package torrent import ( "fmt" "math" "strings" "sync" "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/realdebrid" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" ) const ( EXPIRED_LINK_TOLERANCE_HOURS = 24 ) func (t *TorrentManager) StartRepairJob() { if !t.Config.EnableRepair() { t.log.Debug("Repair is disabled, skipping repair job") return } t.repairTrigger = make(chan *Torrent) t.repairSet = mapset.NewSet[*Torrent]() // there is 1 repair worker, with max 1 blocking task go func() { t.log.Info("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() for { select { case <-repairTicker.C: t.invokeRepair(nil) case torrent := <-t.repairTrigger: // On-demand trigger with a specific torrent t.invokeRepair(torrent) case <-t.RepairKillSwitch: t.log.Info("Stopping periodic repair job") return } } }() } func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunningMu.Lock() if t.repairRunning { t.repairRunningMu.Unlock() t.repairSet.Add(torrent) // don't do anything if repair is already running return } t.repairRunning = true t.repairRunningMu.Unlock() // Execute the repair job t.repairAll(torrent) // After repair is done t.repairRunningMu.Lock() t.repairRunning = false t.repairRunningMu.Unlock() // before we let go, let's check repairSet t.workerPool.Submit(func() { queuedTorrent, exists := t.repairSet.Pop() if exists { t.TriggerRepair(queuedTorrent) } }) } // TriggerRepair allows an on-demand repair to be initiated. func (t *TorrentManager) TriggerRepair(torrent *Torrent) { t.repairTrigger <- torrent } func (t *TorrentManager) repairAll(torrent *Torrent) { // todo: a more elegant way to do this var haystack cmap.ConcurrentMap[string, *Torrent] if torrent == nil { haystack, _ = t.DirectoryMap.Get(INT_ALL) t.log.Info("Periodic repair started; searching for broken torrents") } else { haystack = cmap.New[*Torrent]() haystack.Set("", torrent) } // collect all torrents that need to be repaired toRepair := mapset.NewSet[*Torrent]() haystack.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() || torrent.AllInProgress() || torrent.UnrepairableReason != "" { return } // check 1: for broken files brokenFileIDs := mapset.NewSet[int]() torrent.SelectedFiles.IterCb(func(_ string, file *File) { if file.IsBroken && !file.IsDeleted { brokenFileIDs.Add(file.ID) } }) if brokenFileIDs.Cardinality() > 0 { toRepair.Add(torrent) return } // check 2: for unassigned links (this means the torrent has started to deteriorate) if torrent.UnassignedLinks.Cardinality() > 0 { t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent)) toRepair.Add(torrent) return } }) var wg sync.WaitGroup toRepair.Each(func(torrent *Torrent) bool { wg.Add(1) t.Repair(torrent, &wg) return false }) wg.Wait() t.log.Infof("Finished repairing %d broken torrents", toRepair.Cardinality()) } func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) { if torrent.UnrepairableReason != "" { t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", t.GetKey(torrent), torrent.UnrepairableReason) wg.Done() return } if torrent.AnyInProgress() || torrent.AllInProgress() { t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent)) wg.Done() return } // blocks for approx 45 minutes if active torrents are full if !t.canCapacityHandle() { t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") wg.Done() return } // assign to a worker _ = t.workerPool.Submit(func() { defer wg.Done() t.repair(torrent) }) } func (t *TorrentManager) repair(torrent *Torrent) { t.log.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice()) // handle torrents with incomplete links for selected files // torrent can be rare'ed by RD, so we need to check for that if !t.assignUnassignedLinks(torrent) { t.log.Debugf("Ending repair process early for torrent %s", t.GetKey(torrent)) return } // get all broken files brokenFiles, allBroken := getBrokenFiles(torrent) brokenFileIDs := getFileIDs(brokenFiles) t.log.Debugf("Torrent %s has %d broken files (ids=%s; total is %d), repairing by redownloading", t.GetKey(torrent), len(brokenFiles), brokenFileIDs, torrent.SelectedFiles.Count()) // first step: redownload the whole torrent info, err := t.redownloadTorrent(torrent, "") // reinsert the torrent, passing "" if info != nil && info.Progress != 100 { torrent.InProgressIDs.Add(info.ID) t.saveTorrentChangesToDisk(torrent, nil) t.log.Infof("Torrent %s (files=%s) is still in progress after redownloading but it should be repaired once done", t.GetKey(torrent), brokenFileIDs) return } else if info != nil && info.Progress == 100 && !t.isStillBroken(info, brokenFiles) { selectedFiles := getSelectedFiles(info) torrent.SelectedFiles.IterCb(func(_ string, oldFile *File) { for _, newFile := range selectedFiles { if oldFile.Bytes == newFile.Bytes { oldFile.Link = newFile.Link oldFile.IsBroken = false break } } }) torrent.DownloadedIDs.Add(info.ID) t.saveTorrentChangesToDisk(torrent, nil) t.log.Infof("Successfully repaired torrent %s (files=%s) by redownloading", t.GetKey(torrent), brokenFileIDs) return } t.log.Warnf("Cannot repair torrent %s by redownloading (error=%s)", t.GetKey(torrent), err.Error()) if torrent.UnrepairableReason != "" { t.log.Debugf("Torrent %s has been marked as unfixable during redownload (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason) return } // second step: download just the broken files if len(brokenFiles) == 1 && allBroken { // if all files are broken, we can't do anything t.log.Warnf("Torrent %s has only 1 cached file and it's broken, marking as unfixable", t.GetKey(torrent)) t.markAsUnfixable(torrent, "the lone cached file is broken") return } else if len(brokenFiles) > 1 { t.log.Infof("Repairing by downloading 2 batches of the %d broken files of torrent %s", len(brokenFiles), t.GetKey(torrent)) oldTorrentIDs := torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice() newlyDownloadedIds := make([]string, 0) group := make([]*File, 0) for _, file := range brokenFiles { group = append(group, file) if len(group) >= 200 { brokenFileIDs := getFileIDs(group) redownloadedInfo, err := t.redownloadTorrent(torrent, brokenFileIDs) if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) for _, newId := range newlyDownloadedIds { t.registerFixer(newId, "download_failed") } return } newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID) group = make([]*File, 0) } } if len(group) > 0 { brokenFileIDs := getFileIDs(group) _, err := t.redownloadTorrent(torrent, brokenFileIDs) if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) for _, newId := range newlyDownloadedIds { t.registerFixer(newId, "download_failed") } return } } for _, oldId := range oldTorrentIDs { t.registerFixer(oldId, "replaced") } } } func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool { unassignedTotal := torrent.UnassignedLinks.Cardinality() t.log.Infof("Trying to assign %d links to the %d selected of incomplete torrent %s", unassignedTotal, torrent.SelectedFiles.Count(), t.GetKey(torrent)) // handle torrents with incomplete links for selected files assignedCount := 0 expiredCount := 0 rarCount := 0 newUnassignedLinks := cmap.New[*realdebrid.Download]() torrent.UnassignedLinks.Each(func(link string) bool { // unrestrict each unassigned link that was filled out during torrent init unrestrict := t.UnrestrictLinkUntilOk(link) if unrestrict == nil { expiredCount++ // newUnassignedLinks.Set(link, nil) return false // next } // try to assign to a selected file assigned := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { // base it on size because why not? if file.Bytes == unrestrict.Filesize || strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) { file.Link = link file.IsBroken = false assigned = true assignedCount++ } }) if !assigned { // if not assigned and is a rar, likely it was rar'ed by RD if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") { rarCount++ } else { t.log.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) } newUnassignedLinks.Set(link, unrestrict) } processedCount := assignedCount + newUnassignedLinks.Count() + expiredCount if processedCount%10 == 0 { t.log.Infof("Processed %d out of %d links (%d expired) to broken torrent %s", processedCount, unassignedTotal, expiredCount, t.GetKey(torrent)) } return false }) if assignedCount == 0 && rarCount == 1 { // this is a rar'ed torrent, nothing we can do if t.Config.ShouldDeleteRarFiles() { t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) t.Delete(t.GetKey(torrent), true) } else { t.log.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { // if unassigned == nil { // return // } newFile := &File{ File: realdebrid.File{ ID: 0, Path: unassigned.Filename, Bytes: unassigned.Filesize, Selected: 0, }, Ended: torrent.Added, Link: unassigned.Link, IsBroken: false, } torrent.SelectedFiles.Set(unassigned.Filename, newFile) }) torrent.UnassignedLinks = mapset.NewSet[string]() t.markAsUnfixable(torrent, "rar'ed by RD") t.markAsUnplayable(torrent, "rar'ed by RD") } return false } // empty/reset the unassigned links as we have assigned them already if torrent.UnassignedLinks.Cardinality() > 0 { t.saveTorrentChangesToDisk(torrent, func(info *Torrent) { info.UnassignedLinks = mapset.NewSet[string]() }) } return true } func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (*realdebrid.TorrentInfo, error) { // broken files means broken links // if brokenFiles is not provided, we will redownload all files oldTorrentIDs := make([]string, 0) if selection == "" { // only delete the old torrent if we are redownloading all files oldTorrentIDs = torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice() tmpSelection := "" torrent.SelectedFiles.IterCb(func(_ string, file *File) { tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files }) if tmpSelection == "" { return nil, nil // nothing to repair } else { selection = tmpSelection[:len(tmpSelection)-1] } } // redownload torrent resp, err := t.Api.AddMagnetHash(torrent.Hash) if err != nil { if strings.Contains(err.Error(), "infringing") { t.markAsUnfixable(torrent, "infringing torrent") } else if strings.Contains(err.Error(), "unsupported") { t.markAsUnfixable(torrent, "unsupported torrent") } else if strings.Contains(err.Error(), "unavailable") { t.markAsUnfixable(torrent, "unavailable torrent") } else if strings.Contains(err.Error(), "invalid") { t.markAsUnfixable(torrent, "invalid torrent") } else if strings.Contains(err.Error(), "big") { t.markAsUnfixable(torrent, "torrent too big") } else if strings.Contains(err.Error(), "allowed") { t.markAsUnfixable(torrent, "torrent not allowed") } return nil, fmt.Errorf("cannot redownload torrent: %v", err) } newTorrentID := resp.ID // sleep for 1 second to let RD process the magnet time.Sleep(10 * time.Second) var info *realdebrid.TorrentInfo for { // select files err = t.Api.SelectTorrentFiles(newTorrentID, selection) if err != nil { t.registerFixer(newTorrentID, "download_failed") return nil, fmt.Errorf("cannot start redownloading: %v", err) } // sleep for 5 second to let RD process the magnet time.Sleep(10 * time.Second) // see if the torrent is ready info, err = t.Api.GetTorrentInfo(newTorrentID) if err != nil { t.registerFixer(newTorrentID, "download_failed") return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) } if info.Status != "magnet_conversion" && info.Status != "waiting_files_selection" { break } } // documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead isOkStatus := false okStatuses := []string{"downloading", "downloaded", "uploading", "queued", "compressing"} // not compressing because we need playable files for _, status := range okStatuses { if info.Status == status { isOkStatus = true break } } if !isOkStatus { t.registerFixer(info.ID, "download_failed") return nil, fmt.Errorf("the redownloaded torrent %s is in a non-OK state: %s", t.GetKey(torrent), info.Status) } // check if incorrect number of links selectionCount := len(strings.Split(selection, ",")) if info.Progress == 100 && len(info.Links) != selectionCount { t.registerFixer(newTorrentID, "download_failed") return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), selectionCount) } t.log.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress) for _, id := range oldTorrentIDs { t.registerFixer(id, "replaced") } return info, nil } func (t *TorrentManager) canCapacityHandle() bool { // max waiting time is 45 minutes const maxRetries = 50 const baseDelay = 1 * time.Second const maxDelay = 60 * time.Second retryCount := 0 for { count, err := t.Api.GetActiveTorrentCount() if err != nil { t.log.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { t.log.Error("Max retries reached. Exiting.") return false } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } time.Sleep(delay) retryCount++ continue } if count.DownloadingCount < count.MaxNumberOfTorrents-1 { return true } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay) if retryCount >= maxRetries { t.log.Error("Max retries reached. Exiting.") return false } time.Sleep(delay) retryCount++ } } func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) { t.log.Warnf("Marking torrent %s as unplayable - %s", t.GetKey(torrent), reason) t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(t.GetKey(torrent)) }) torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS) torrents.Set(t.GetKey(torrent), torrent) } func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) { t.log.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason) torrent.UnrepairableReason = reason t.saveTorrentChangesToDisk(torrent, func(t *Torrent) { t.UnrepairableReason = reason }) } // getBrokenFiles returns the files that are not http links and not deleted func getBrokenFiles(torrent *Torrent) ([]*File, bool) { var brokenFiles []*File allBroken := true torrent.SelectedFiles.IterCb(func(_ string, file *File) { if file.IsBroken && !file.IsDeleted { brokenFiles = append(brokenFiles, file) } else { allBroken = false } }) return brokenFiles, allBroken } // isStillBroken checks if the torrent is still broken // if it's not broken anymore, it will assign the links to the files func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool { var selectedFiles []*File for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Ended: info.Ended, Link: "", // no link yet IsBroken: true, }) } if len(selectedFiles) == len(info.Links) { // all links are still intact! good! for i, file := range selectedFiles { file.Link = info.Links[i] file.IsBroken = false } } else { // if we can't assign links, it's still broken return true } if len(brokenFiles) == 0 { // just check for the last file brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1]) } // check if the broken files can now be unrestricted for _, oldFile := range brokenFiles { for idx, newFile := range selectedFiles { if oldFile.Bytes == newFile.Bytes { unrestrict := t.UnrestrictFileUntilOk(selectedFiles[idx]) if unrestrict == nil || oldFile.Bytes != unrestrict.Filesize { return true } } } } return false } func getSelectedFiles(info *realdebrid.TorrentInfo) []*File { var selectedFiles []*File // if some Links are empty, we need to repair it for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Ended: info.Ended, Link: "", // no link yet IsBroken: true, }) } if len(selectedFiles) == len(info.Links) { // all links are still intact! good! for i, file := range selectedFiles { file.Link = info.Links[i] file.IsBroken = false } } return selectedFiles }