380 lines
11 KiB
Go
380 lines
11 KiB
Go
package torrent
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/debridmediamanager/zurg/internal/config"
|
|
"github.com/debridmediamanager/zurg/pkg/realdebrid"
|
|
mapset "github.com/deckarep/golang-set/v2"
|
|
cmap "github.com/orcaman/concurrent-map/v2"
|
|
)
|
|
|
|
const EXPIRED_LINK_TOLERANCE_HOURS = 24
|
|
|
|
func (t *TorrentManager) RepairAll() {
|
|
_ = t.repairWorker.Submit(func() {
|
|
t.log.Info("Repairing all broken torrents")
|
|
t.repairAll()
|
|
t.log.Info("Finished repairing all torrents")
|
|
})
|
|
}
|
|
|
|
func (t *TorrentManager) repairAll() {
|
|
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
|
|
|
var hashGroups []mapset.Set[string]
|
|
const maxGroupSize = 399
|
|
|
|
currentGroup := mapset.NewSet[string]()
|
|
hashGroups = append(hashGroups, currentGroup)
|
|
|
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
|
if torrent.AnyInProgress() || torrent.Unfixable {
|
|
return
|
|
}
|
|
// Check if the current group is full
|
|
if currentGroup.Cardinality() >= maxGroupSize {
|
|
// Create a new group and add it to the hashGroups
|
|
currentGroup = mapset.NewSet[string]()
|
|
hashGroups = append(hashGroups, currentGroup)
|
|
}
|
|
// Add the hash to the current group
|
|
currentGroup.Add(torrent.Hash)
|
|
})
|
|
|
|
t.log.Debug("Checking if torrents are still cached")
|
|
var availabilityChecks = make(map[string]bool)
|
|
uncachedCount := 0
|
|
for i := range hashGroups {
|
|
if hashGroups[i].Cardinality() == 0 {
|
|
break
|
|
}
|
|
resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice())
|
|
if err != nil {
|
|
t.log.Warnf("Cannot check availability: %v", err)
|
|
continue
|
|
}
|
|
|
|
for hash, hosterHash := range resp {
|
|
// Check if HosterHash is a map (Variants field is used)
|
|
availabilityChecks[hash] = len(hosterHash.Variants) > 0
|
|
if !availabilityChecks[hash] {
|
|
uncachedCount++
|
|
}
|
|
}
|
|
}
|
|
t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount)
|
|
|
|
var toRepair []*Torrent
|
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
|
if torrent.AnyInProgress() || torrent.Unfixable {
|
|
return
|
|
}
|
|
|
|
// check 1: for cached status
|
|
isCached := true
|
|
if _, ok := availabilityChecks[torrent.Hash]; !ok || !availabilityChecks[torrent.Hash] {
|
|
isCached = false
|
|
}
|
|
// todo: also handle file ID checks
|
|
|
|
// check 2: for broken files
|
|
hasBrokenFiles := false
|
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
if file.Link == "repair" || file.Link == "" {
|
|
hasBrokenFiles = true
|
|
}
|
|
})
|
|
|
|
if !isCached || hasBrokenFiles {
|
|
toRepair = append(toRepair, torrent)
|
|
}
|
|
})
|
|
t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair))
|
|
for i := range toRepair {
|
|
torrent := toRepair[i]
|
|
t.log.Infof("Repairing %s", torrent.AccessKey)
|
|
t.repair(torrent)
|
|
}
|
|
}
|
|
|
|
func (t *TorrentManager) Repair(torrent *Torrent) {
|
|
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
|
torrent.DownloadedIDs.Each(func(id string) bool {
|
|
infoCache.Set(id, torrent)
|
|
t.writeTorrentToFile(id, torrent)
|
|
return false
|
|
})
|
|
_ = t.repairWorker.Submit(func() {
|
|
t.log.Infof("Repairing torrent %s", torrent.AccessKey)
|
|
t.repair(torrent)
|
|
t.log.Infof("Finished repairing torrent %s", torrent.AccessKey)
|
|
})
|
|
}
|
|
|
|
func (t *TorrentManager) repair(torrent *Torrent) {
|
|
if torrent.AnyInProgress() {
|
|
t.log.Infof("Torrent %s is in progress, skipping repair until download is done", torrent.AccessKey)
|
|
return
|
|
}
|
|
|
|
proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full
|
|
if !proceed {
|
|
t.log.Error("Reached the max number of active torrents, cannot continue with the repair")
|
|
return
|
|
}
|
|
|
|
if torrent.OlderThanDuration(EXPIRED_LINK_TOLERANCE_HOURS * time.Hour) {
|
|
// first solution: reinsert with same selection
|
|
t.log.Infof("Torrent %s is older than %d hours, reinserting it", torrent.AccessKey, EXPIRED_LINK_TOLERANCE_HOURS)
|
|
if t.reinsertTorrent(torrent, "") {
|
|
t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
|
return
|
|
} else if !torrent.Unfixable {
|
|
t.log.Warnf("Failed to repair by reinserting torrent %s, will only redownload broken files...", torrent.AccessKey)
|
|
} else {
|
|
t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
|
|
return
|
|
}
|
|
} else {
|
|
t.log.Warnf("Torrent %s is not older than %d hours to be repaired by reinsertion, will only redownload broken files...", torrent.AccessKey, EXPIRED_LINK_TOLERANCE_HOURS)
|
|
}
|
|
|
|
// sleep for 30 seconds to let the torrent accumulate more broken files if scanning
|
|
time.Sleep(30 * time.Second)
|
|
|
|
// handle rar'ed torrents
|
|
assignedCount := 0
|
|
rarCount := 0
|
|
unassignedDownloads := make([]*realdebrid.Download, 0)
|
|
torrent.UnassignedLinks.Each(func(link string) bool {
|
|
unrestrict := t.UnrestrictUntilOk(link)
|
|
if unrestrict != nil && unrestrict.Link != "" {
|
|
// assign to a selected file
|
|
assigned := false
|
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
// if strings.HasSuffix(file.Path, unrestrict.Filename) {
|
|
if file.Bytes == unrestrict.Filesize {
|
|
file.Link = unrestrict.Link
|
|
assigned = true
|
|
assignedCount++
|
|
}
|
|
})
|
|
if !assigned {
|
|
if strings.HasSuffix(unrestrict.Filename, ".rar") {
|
|
rarCount++
|
|
}
|
|
unassignedDownloads = append(unassignedDownloads, unrestrict)
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if assignedCount > 0 {
|
|
t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, torrent.AccessKey)
|
|
} else if rarCount > 0 {
|
|
// this is a rar'ed torrent, nothing we can do
|
|
if t.Config.ShouldDeleteRarFiles() {
|
|
t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", torrent.AccessKey)
|
|
t.Delete(torrent.AccessKey, true)
|
|
} else {
|
|
for _, unassigned := range unassignedDownloads {
|
|
newFile := &File{
|
|
File: realdebrid.File{
|
|
ID: 0,
|
|
Path: unassigned.Filename,
|
|
Bytes: unassigned.Filesize,
|
|
Selected: 1,
|
|
},
|
|
Ended: torrent.LatestAdded,
|
|
Link: unassigned.Link,
|
|
}
|
|
torrent.SelectedFiles.Set(unassigned.Filename, newFile)
|
|
}
|
|
t.markAsUnfixable(torrent)
|
|
t.markAsUnplayable(torrent)
|
|
}
|
|
return
|
|
}
|
|
|
|
// second solution: add only the broken files
|
|
var brokenFiles []File
|
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
if file.Link == "repair" || file.Link == "" {
|
|
brokenFiles = append(brokenFiles, *file)
|
|
file.Link = "repairing"
|
|
}
|
|
})
|
|
t.log.Debugf("During repair, zurg found %d broken files for torrent %s", len(brokenFiles), torrent.AccessKey)
|
|
|
|
// todo: to verify removed logic when there's only 1 selected file selected and it's broken
|
|
|
|
if len(brokenFiles) == 1 && torrent.SelectedFiles.Count() > 1 {
|
|
// if we download a single file, it will be named differently
|
|
// so we need to download 1 extra file to preserve the name
|
|
// this is only relevant if we enable retain_rd_torrent_name
|
|
// add the first file link encountered with a prefix of http
|
|
t.log.Debugf("Torrent %s has only 1 broken file, adding 1 extra file to preserve the name", torrent.AccessKey)
|
|
for _, file := range torrent.SelectedFiles.Items() {
|
|
if strings.HasPrefix(file.Link, "http") {
|
|
brokenFiles = append(brokenFiles, *file)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(brokenFiles) > 0 {
|
|
t.log.Infof("Redownloading the %d broken files for torrent %s", len(brokenFiles), torrent.AccessKey)
|
|
brokenFileIDs := strings.Join(getFileIDs(brokenFiles), ",")
|
|
if t.reinsertTorrent(torrent, brokenFileIDs) {
|
|
t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
|
} else {
|
|
t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
|
|
}
|
|
} else {
|
|
t.log.Warnf("Torrent %s has no broken files to repair", torrent.AccessKey)
|
|
}
|
|
}
|
|
|
|
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, brokenFiles string) bool {
|
|
oldTorrentIDs := make([]string, 0)
|
|
// broken files means broken links
|
|
// if brokenFiles is not provided
|
|
if brokenFiles == "" {
|
|
// only replace the torrent if we are reinserting all files
|
|
oldTorrentIDs = torrent.DownloadedIDs.ToSlice()
|
|
tmpSelection := ""
|
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files
|
|
})
|
|
if tmpSelection == "" {
|
|
return true // nothing to repair
|
|
} else {
|
|
brokenFiles = tmpSelection[:len(tmpSelection)-1]
|
|
}
|
|
}
|
|
|
|
// redownload torrent
|
|
resp, err := t.Api.AddMagnetHash(torrent.Hash)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot redownload torrent: %v", err)
|
|
if strings.Contains(err.Error(), "infringing_file") {
|
|
t.markAsUnfixable(torrent)
|
|
}
|
|
return false
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
|
|
// select files
|
|
newTorrentID := resp.ID
|
|
err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot start redownloading: %v", err)
|
|
t.Api.DeleteTorrent(newTorrentID)
|
|
return false
|
|
}
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// see if the torrent is ready
|
|
info, err := t.Api.GetTorrentInfo(newTorrentID)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
|
t.Api.DeleteTorrent(newTorrentID)
|
|
return false
|
|
}
|
|
|
|
if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
|
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
|
t.Api.DeleteTorrent(newTorrentID)
|
|
return false
|
|
}
|
|
|
|
if info.Progress != 100 {
|
|
t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
|
|
for _, id := range oldTorrentIDs {
|
|
t.Api.DeleteTorrent(id)
|
|
}
|
|
return true
|
|
}
|
|
|
|
brokenCount := len(strings.Split(brokenFiles, ","))
|
|
if len(info.Links) != brokenCount {
|
|
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), brokenCount)
|
|
t.Api.DeleteTorrent(newTorrentID)
|
|
return false
|
|
}
|
|
|
|
t.log.Infof("Repair successful id=%s", newTorrentID)
|
|
for _, id := range oldTorrentIDs {
|
|
t.Api.DeleteTorrent(id)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (t *TorrentManager) canCapacityHandle() bool {
|
|
// max waiting time is 45 minutes
|
|
const maxRetries = 50
|
|
const baseDelay = 1 * time.Second
|
|
const maxDelay = 60 * time.Second
|
|
retryCount := 0
|
|
for {
|
|
count, err := t.Api.GetActiveTorrentCount()
|
|
if err != nil {
|
|
t.log.Warnf("Cannot get active downloads count: %v", err)
|
|
if retryCount >= maxRetries {
|
|
t.log.Error("Max retries reached. Exiting.")
|
|
return false
|
|
}
|
|
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
|
if delay > maxDelay {
|
|
delay = maxDelay
|
|
}
|
|
time.Sleep(delay)
|
|
retryCount++
|
|
continue
|
|
}
|
|
|
|
if count.DownloadingCount < count.MaxNumberOfTorrents {
|
|
return true
|
|
}
|
|
|
|
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
|
if delay > maxDelay {
|
|
delay = maxDelay
|
|
}
|
|
t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
|
|
|
|
if retryCount >= maxRetries {
|
|
t.log.Error("Max retries reached. Exiting.")
|
|
return false
|
|
}
|
|
|
|
time.Sleep(delay)
|
|
retryCount++
|
|
}
|
|
}
|
|
|
|
func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
|
|
t.log.Warnf("Marking torrent %s as unplayable", torrent.AccessKey)
|
|
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
|
torrents.Remove(torrent.AccessKey)
|
|
})
|
|
torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
|
|
torrents.Set(torrent.AccessKey, torrent)
|
|
}
|
|
|
|
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
|
|
t.log.Warnf("Marking torrent %s as unfixable", torrent.AccessKey)
|
|
torrent.Unfixable = true
|
|
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
|
torrent.DownloadedIDs.Each(func(id string) bool {
|
|
info, _ := infoCache.Get(id)
|
|
info.Unfixable = true
|
|
t.writeTorrentToFile(id, torrent)
|
|
return false
|
|
})
|
|
}
|