package torrent import ( "context" "fmt" "math" "strings" "sync" "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/http" "github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/utils" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" ) const ( EXPIRED_LINK_TOLERANCE_HOURS = 24 ) func (t *TorrentManager) StartRepairJob() { if !t.Config.EnableRepair() { t.repairLog.Warn("Repair is disabled, skipping repair job") return } t.repairChan = make(chan *Torrent) t.RepairQueue = mapset.NewSet[*Torrent]() t.RepairAllTrigger = make(chan struct{}) // periodic repair worker t.workerPool.Submit(func() { t.repairLog.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() for { select { case <-repairTicker.C: t.repairLog.Debug("Periodic repair started; searching for broken torrents") t.EnqueueForRepair(nil) case <-t.RepairAllTrigger: t.repairLog.Debug("Manual repair of all torrents triggered; searching for broken torrents") t.EnqueueForRepair(nil) } } }) // repair worker t.workerPool.Submit(func() { for { select { case torrent := <-t.repairChan: t.invokeRepair(torrent) case <-t.RepairWorkerKillSwitch: t.repairLog.Info("Stopping periodic repair job") return } } }) } // EnqueueForRepair allows an on-demand repair to be initiated. func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) { if !t.Config.EnableRepair() { if torrent != nil { t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent)) } return } if torrent != nil && torrent.State.Event(context.Background(), "break_torrent") != nil { // t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err) return } t.workerPool.Submit(func() { t.invokeRepair(torrent) }) } // invokeRepair runs a sync repair job func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunningMu.Lock() if t.repairRunning { t.repairRunningMu.Unlock() t.RepairQueue.Add(torrent) // don't do anything if repair is already running return } t.repairRunning = true t.repairRunningMu.Unlock() // Execute the repair job t.executeRepairJob(torrent) // After repair is done t.repairRunningMu.Lock() t.repairRunning = false t.repairRunningMu.Unlock() if queuedTorrent, exists := t.RepairQueue.Pop(); exists { t.workerPool.Submit(func() { t.invokeRepair(queuedTorrent) }) } } func (t *TorrentManager) executeRepairJob(torrent *Torrent) { var haystack cmap.ConcurrentMap[string, *Torrent] if torrent == nil { haystack, _ = t.DirectoryMap.Get(INT_ALL) } else { haystack = cmap.New[*Torrent]() haystack.Set("", torrent) } // collect all torrents that need to be repaired asynchronously toRepair := mapset.NewSet[*Torrent]() var wg sync.WaitGroup haystack.IterCb(func(_ string, torrent *Torrent) { wg.Add(1) // temp worker for finding broken torrents t.workerPool.Submit(func() { defer wg.Done() canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar") if torrent.UnrepairableReason != "" && !canExtract { return } // check 1: for broken files brokenFileCount := 0 torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) { return } if file.State.Is("broken_file") { brokenFileCount++ } }) if brokenFileCount > 0 { t.repairLog.Debugf("Torrent %s has %d broken file(s), adding to repair list", t.GetKey(torrent), brokenFileCount) toRepair.Add(torrent) return } // check 2: for unassigned links (this means the torrent has started to deteriorate) unassignedCount := torrent.UnassignedLinks.Cardinality() if unassignedCount > 0 { t.repairLog.Debugf("Torrent %s has %d unassigned link(s), adding to repair list", t.GetKey(torrent), unassignedCount) toRepair.Add(torrent) return } // if marked as broken, but no broken files or unassigned links, mark as repaired if torrent.State.Is("broken_torrent") { torrent.State.Event(context.Background(), "mark_as_repaired") } }) }) wg.Wait() toRepair.Each(func(torrent *Torrent) bool { wg.Add(1) t.repair(torrent, &wg) return false }) wg.Wait() t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality()) } // repairman func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { defer wg.Done() if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil { // t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err) return } // blocks for approx 45 minutes if active torrents are full if !t.canCapacityHandle() { t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") return } t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice()) if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) { return } bwLimitReached := false // check for other broken file torrent.SelectedFiles.IterCb(func(_ string, file *File) { if bwLimitReached || !file.State.Is("ok_file") { return } _, err := t.UnrestrictFile(file, true) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { bwLimitReached = true return } if err != nil { file.State.Event(context.Background(), "break_file") } }) if bwLimitReached { t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent)) return } brokenFiles, allBroken := t.getBrokenFiles(torrent) // check if broken files are playable allPlayable := true videoFiles := []string{} for _, file := range brokenFiles { if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) { continue } if utils.IsVideo(file.Path) { videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID)) continue } allPlayable = false if t.Config.GetRarAction() == "extract" && file.ID != 0 { t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent)) info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)}) if info != nil { t.willDeleteOnceDone(info.ID) } } } if !allPlayable { if len(videoFiles) > 0 { t.repairLog.Debugf("Extracting %d video file(s) from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent)) info, _ := t.redownloadTorrent(torrent, videoFiles) if info != nil { t.willDeleteOnceDone(info.ID) } } return } oldDownloadedIDs := torrent.DownloadedIDs.Clone() // first step: redownload the whole torrent t.repairLog.Debugf("Torrent %s has %d broken files (out of %d); repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count()) info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection if info != nil && info.Progress == 100 { err = t.checkIfBroken(info, brokenFiles) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent)) return } if err == nil { // delete the torrents it replaced oldDownloadedIDs.Each(func(torrentID string) bool { t.DeleteByID(torrentID) return false }) t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent)) return } t.DeleteByID(info.ID) } else if info != nil && info.Progress != 100 { // it's faster to download just the broken files, so let's delete the newly downloaded torrent t.DeleteByID(info.ID) err = fmt.Errorf("no longer cached") } if err != nil { t.repairLog.Warnf("Cannot repair torrent %s by redownloading whole torrent (error=%v)", t.GetKey(torrent), err) } if torrent.UnrepairableReason != "" { t.repairLog.Debugf("Torrent %s has been marked as unfixable after redownloading torrent %s; ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason) return } // second step: download just the broken files if len(brokenFiles) == 1 && allBroken { // if all files are broken, we can't do anything! t.repairLog.Warnf("Torrent %s has only 1 cached file and it's broken; marking as unfixable (to fix, select other files)", t.GetKey(torrent)) t.markAsUnfixable(torrent, "the lone cached file is broken") return } totalBatches := int(math.Ceil(float64(len(brokenFiles)) / 100)) t.repairLog.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", t.GetKey(torrent), totalBatches, len(brokenFiles)) newlyDownloadedIds := make([]string, 0) batchNum := 1 brokenFileIDs := getFileIDs(brokenFiles) var group []string for i, fileIDStr := range brokenFileIDs { group = append(group, fileIDStr) if len(group) >= 100 || i == len(brokenFileIDs)-1 { t.repairLog.Debugf("Downloading batch %d/%d of broken files of torrent %s", batchNum, totalBatches, t.GetKey(torrent)) batchNum++ redownloadedInfo, err := t.redownloadTorrent(torrent, group) if err != nil { t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err) // delete the newly downloaded torrents because the operation failed for _, newId := range newlyDownloadedIds { t.DeleteByID(newId) } return } newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID) group = make([]string, 0) } } // once done, we can delete the newly downloaded torrents because we only need the links for _, newId := range newlyDownloadedIds { t.willDeleteOnceDone(newId) } } func (t *TorrentManager) assignLinks(torrent *Torrent) bool { unassignedTotal := torrent.UnassignedLinks.Cardinality() t.repairLog.Infof("Trying to assign %d link(s) to the %d selected files of incomplete torrent %s", unassignedTotal, torrent.SelectedFiles.Count(), t.GetKey(torrent)) // handle torrents with incomplete links for selected files assignedCount := 0 expiredCount := 0 rarCount := 0 unassignedCount := 0 newUnassignedLinks := cmap.New[*realdebrid.Download]() var assignedLinks []string bwLimitReached := false torrent.UnassignedLinks.Clone().Each(func(link string) bool { // unrestrict each unassigned link that was filled out during torrent init unrestrict, err := t.UnrestrictLink(link, true) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { bwLimitReached = true return true } if unrestrict == nil { expiredCount++ return false // next unassigned link } // try to assign to a selected file assigned := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !assigned && file.State.Is("broken_file") && file.Bytes == unrestrict.Filesize && strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) { file.Link = link assignedLinks = append(assignedLinks, link) if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") { file.Link = file.Link[0:39] } file.State.Event(context.Background(), "repair_file") assigned = true assignedCount++ } }) if !assigned { // if not assigned and is a rar, likely it was rar'ed by RD if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") { // t.log.Debugf("Trying to get contents of rar file %s", unrestrict.Filename) // contents, err := t.rarReader.GetRARContents(unrestrict.Download) // if err != nil { // t.repairLog.Warnf("Cannot get contents of rar file %s: %v", unrestrict.Filename, err) // } // t.log.Debugf("contents: %v", contents) rarCount++ } else { // it's possible that it is already repaired t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) } newUnassignedLinks.Set(link, unrestrict) } processedCount := assignedCount + unassignedCount + expiredCount if processedCount%10 == 0 || processedCount == unassignedTotal { // save each progress for _, assignedLink := range assignedLinks { torrent.UnassignedLinks.Remove(assignedLink) } t.writeTorrentToFile(torrent) t.repairLog.Infof("Processed %d out of %d links (%d expired) to broken torrent %s", processedCount, unassignedTotal, expiredCount, t.GetKey(torrent)) } return false // next unassigned link }) if bwLimitReached { t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue assigning links to torrent %s", t.GetKey(torrent)) return false } // empty/reset the unassigned links as we have assigned them already if unassignedTotal > 0 { torrent.UnassignedLinks = mapset.NewSet[string]() t.writeTorrentToFile(torrent) } if assignedCount != 0 || rarCount != 1 { return true // continue repair } action := t.Config.GetRarAction() if action == "delete" { t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) t.Delete(t.GetKey(torrent), true) return false // end repair } newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { torrent.SelectedFiles.Set(unassigned.Filename, &File{ File: realdebrid.File{ ID: 0, Path: unassigned.Filename, Bytes: unassigned.Filesize, Selected: 0, }, Ended: torrent.Added, Link: unassigned.Link, State: NewFileState("ok_file"), }) }) if action == "extract" { videoFiles := []string{} torrent.SelectedFiles.IterCb(func(_ string, file *File) { if utils.IsVideo(file.Path) { videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID)) } else if file.ID != 0 && t.IsPlayable(file.Path) { t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent)) info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)}) if info != nil { t.willDeleteOnceDone(info.ID) } } }) if len(videoFiles) > 0 { t.repairLog.Debugf("Extracting %d video file(s) from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent)) info, _ := t.redownloadTorrent(torrent, videoFiles) if info != nil { t.willDeleteOnceDone(info.ID) } } } else { t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) t.markAsUnfixable(torrent, "rar'ed by RD") t.markAsUnplayable(torrent, "rar'ed by RD") torrent.State.Event(context.Background(), "mark_as_repaired") } torrent.UnassignedLinks = mapset.NewSet[string]() t.writeTorrentToFile(torrent) return false // end repair } func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) { finalSelection := strings.Join(selection, ",") if len(selection) == 0 { // if brokenFiles is not provided, we will redownload all files torrent.SelectedFiles.IterCb(func(_ string, file *File) { selection = append(selection, fmt.Sprintf("%d", file.ID)) }) if len(selection) == 0 { return nil, nil } else { finalSelection = strings.Join(selection, ",") } } // redownload torrent var newTorrentID string prevState := t.latestState resp, err := t.api.AddMagnetHash(torrent.Hash) if err != nil { if strings.Contains(err.Error(), "timeout") { newState := t.getCurrentState() if prevState.Eq(newState) { return t.redownloadTorrent(torrent, selection) } // sometimes, adding a new hash will encounter a timeout but the torrent is still added newTorrentID = t.latestState.FirstTorrentId } else { if strings.Contains(err.Error(), "infringing") { t.markAsUnfixable(torrent, "infringing torrent") } else if strings.Contains(err.Error(), "unsupported") { t.markAsUnfixable(torrent, "unsupported torrent") } else if strings.Contains(err.Error(), "unavailable") { t.markAsUnfixable(torrent, "unavailable torrent") } else if strings.Contains(err.Error(), "invalid") { t.markAsUnfixable(torrent, "invalid torrent") } else if strings.Contains(err.Error(), "big") { t.markAsUnfixable(torrent, "torrent too big") } else if strings.Contains(err.Error(), "allowed") { t.markAsUnfixable(torrent, "torrent not allowed") } return nil, fmt.Errorf("cannot add magnet: %v", err) } } if resp != nil { newTorrentID = resp.ID } time.Sleep(2 * time.Second) var info *realdebrid.TorrentInfo retries := 0 for { retries++ if retries > 10 { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: too many retries") } err = t.api.SelectTorrentFiles(newTorrentID, finalSelection) if err != nil { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: %v", err) } time.Sleep(2 * time.Second) info, err = t.api.GetTorrentInfo(newTorrentID) if err != nil { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot get info on redownloaded : %v", err) } if info.Status == "magnet_conversion" { time.Sleep(60 * time.Second) continue } break } // documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead if info.Status != "downloading" && info.Status != "downloaded" && info.Status != "uploading" && info.Status != "queued" && info.Status != "compressing" { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("non-OK state: %s", info.Status) } if info.Progress != 100 { t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress) } else if info.Progress == 100 && len(info.Links) == len(selection) { t.repairLog.Infof("Downloaded %d file(s) of torrent %s (id=%s)", len(selection), t.GetKey(torrent), info.ID) } else if info.Progress == 100 && len(info.Links) != len(selection) { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection)) } return info, nil } func (t *TorrentManager) canCapacityHandle() bool { // max waiting time is 45 minutes const maxRetries = 50 const baseDelay = 1 * time.Second const maxDelay = 60 * time.Second retryCount := 0 for { count, err := t.api.GetActiveTorrentCount() if err != nil { t.repairLog.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { t.repairLog.Error("Max retries reached. Exiting.") return false } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } time.Sleep(delay) retryCount++ continue } if count.DownloadingCount < count.MaxNumberOfTorrents-1 { return true } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } t.repairLog.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay) if retryCount >= maxRetries { t.repairLog.Error("Max retries reached. Exiting.") return false } time.Sleep(delay) retryCount++ } } func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) { t.repairLog.Debugf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason) // reassign to unplayable torrents directory t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { if strings.HasPrefix(directory, "int__") { return } torrents.Remove(t.GetKey(torrent)) }) torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS) torrents.Set(t.GetKey(torrent), torrent) } func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) { t.repairLog.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason) torrent.UnrepairableReason = reason t.writeTorrentToFile(torrent) } // getBrokenFiles returns the files that are not http links and not deleted func (t *TorrentManager) getBrokenFiles(torrent *Torrent) ([]*File, bool) { var brokenFiles []*File allBroken := true torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) { return } if file.State.Is("broken_file") { brokenFiles = append(brokenFiles, file) } else { // file is ok allBroken = false } }) return brokenFiles, allBroken } // checkIfBroken checks if the torrent is still broken // if it's not broken anymore, it will assign the links to the files func (t *TorrentManager) checkIfBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) error { var selectedFiles []*File for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Ended: info.Ended, Link: "", State: NewFileState("broken_file"), }) } if len(selectedFiles) != len(info.Links) { return fmt.Errorf("number of selected files and links do not match") } for i, file := range selectedFiles { file.Link = info.Links[i] if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") { file.Link = file.Link[0:39] } file.State.Event(context.Background(), "repair_file") } if len(brokenFiles) == 0 { // just check for the last file brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1]) } // check if the broken files can now be unrestricted and downloaded for _, oldFile := range brokenFiles { for idx, newFile := range selectedFiles { if oldFile.ID != newFile.ID { continue } if _, err := t.UnrestrictFile(selectedFiles[idx], true); err != nil { return err } } } return nil } func (t *TorrentManager) ResetRepairState() { if !t.Config.EnableRepair() { return } allTorrents, _ := t.DirectoryMap.Get(INT_ALL) allTorrents.IterCb(func(_ string, torrent *Torrent) { err := torrent.State.Event(context.Background(), "reset_repair") if err == nil { t.repairLog.Debugf("Repair state of torrent %s has been reset", t.GetKey(torrent)) t.writeTorrentToFile(torrent) } }) }