package torrent

import (
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/debridmediamanager/zurg/internal/config"
	"github.com/debridmediamanager/zurg/pkg/realdebrid"
	mapset "github.com/deckarep/golang-set/v2"
	cmap "github.com/orcaman/concurrent-map/v2"
)

const (
	EXPIRED_LINK_TOLERANCE_HOURS = 24
)

// RepairAll schedules a scan of all torrents on the repair pool. The scan
// itself (repairAll) runs asynchronously inside the pool.
func (t *TorrentManager) RepairAll() {
	err := t.repairPool.Submit(func() {
		t.log.Info("Checking for broken torrents")
		t.repairAll()
		t.log.Info("Finished checking for broken torrents")
	})
	if err != nil {
		t.log.Warnf("Cannot schedule the check for broken torrents: %v", err)
	}
}

// repairAll checks every torrent that is neither in progress nor marked
// unrepairable, and calls Repair on those that are no longer cached, have
// broken files, or still have unassigned links.
func (t *TorrentManager) repairAll() {
	allTorrents, ok := t.DirectoryMap.Get(INT_ALL)
	if !ok {
		t.log.Warnf("Cannot find the directory holding all torrents, skipping the repair check")
		return
	}

	// Split the hashes into groups so each availability request stays below
	// maxGroupSize hashes.
	const maxGroupSize = 399
	currentGroup := mapset.NewSet[string]()
	hashGroups := []mapset.Set[string]{currentGroup}
	allTorrents.IterCb(func(_ string, torrent *Torrent) {
		if torrent.AnyInProgress() || torrent.Unrepairable {
			return
		}
		if currentGroup.Cardinality() >= maxGroupSize {
			currentGroup = mapset.NewSet[string]()
			hashGroups = append(hashGroups, currentGroup)
		}
		currentGroup.Add(torrent.Hash)
	})

	t.log.Debug("Checking if torrents are still cached")
	availabilityChecks := make(map[string]bool)
	uncachedCount := 0
	for i := range hashGroups {
		// A new group is only started once the previous one is full, so an
		// empty group means there is nothing left to check.
		if hashGroups[i].Cardinality() == 0 {
			break
		}
		resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice())
		if err != nil {
			t.log.Warnf("Cannot check availability: %v", err)
			continue
		}
		for hash, hosterHash := range resp {
			// Check if HosterHash is a map (Variants field is used)
			cached := len(hosterHash.Variants) > 0
			availabilityChecks[hash] = cached
			if !cached {
				uncachedCount++
			}
		}
	}
	t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount)

	toRepair := mapset.NewSet[*Torrent]()
	allTorrents.IterCb(func(_ string, torrent *Torrent) {
		if torrent.AnyInProgress() || torrent.Unrepairable {
			return
		}

		// check 1: for cached status
		// A hash that is missing from the map reads as false (not cached).
		isCached := availabilityChecks[torrent.Hash]

		// todo: also handle file ID checks

		// check 2: for broken files
		hasBrokenFiles := false
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			hasLink := strings.HasPrefix(file.Link, "http")
			if !hasLink && file.Link != "unselect" {
				hasBrokenFiles = true
				return
			}
			if !isCached && hasLink {
				// The torrent is not cached anymore: verify that the link
				// still resolves to a file of the expected size.
				unrestrict := t.UnrestrictUntilOk(file.Link)
				if unrestrict == nil || file.Bytes != unrestrict.Filesize {
					hasBrokenFiles = true
					return
				}
			}
		})

		if !isCached || hasBrokenFiles || torrent.UnassignedLinks.Cardinality() > 0 {
			toRepair.Add(torrent)
		}
	})
	t.log.Debugf("Found %d broken torrents to repair in total", toRepair.Cardinality())

	toRepair.Each(func(torrent *Torrent) bool {
		t.Repair(torrent)
		return false // false keeps the iteration going
	})
}

// Repair clears the broken links recorded on the torrent from the cached
// info files and then schedules the actual repair on the worker pool.
// Torrents marked unrepairable, or already being repaired, are skipped.
func (t *TorrentManager) Repair(torrent *Torrent) {
	// Use the same key for Add and Remove so the in-progress marker is
	// always cleared.
	key := t.GetKey(torrent)

	if torrent.Unrepairable {
		t.log.Warnf("Torrent %s is unfixable, skipping repair", key)
		return
	}

	if t.repairs.Contains(key) {
		t.log.Warnf("Torrent %s is already being repaired, skipping repair", key)
		return
	}
	t.repairs.Add(key)

	// save the broken files to the file cache
	if infoCache, ok := t.DirectoryMap.Get(INT_INFO_CACHE); ok {
		torrent.DownloadedIDs.Each(func(id string) bool {
			info, found := infoCache.Get(id)
			if !found || info == nil {
				// nothing cached for this ID, move on to the next one
				return false
			}
			info.SelectedFiles.IterCb(func(_ string, file *File) {
				torrent.BrokenLinks.Each(func(link string) bool {
					if file.Link != link {
						return false
					}
					file.Link = ""
					return true // matched, stop scanning the broken links
				})
			})
			t.writeTorrentToFile(id, info)
			return false
		})
	}

	err := t.workerPool.Submit(func() {
		// Deferred so the marker is cleared even if repair panics.
		defer t.repairs.Remove(key)
		t.repair(torrent)
	})
	if err != nil {
		// The task will never run, so clear the marker here; otherwise the
		// torrent would be reported as "already being repaired" forever.
		t.repairs.Remove(key)
		t.log.Warnf("Cannot schedule repair for torrent %s: %v", key, err)
	}
}

// repair performs the repair of a single torrent:
//  1. unrestrict the unassigned links and try to match them to selected
//     files by size;
//  2. redownload the whole torrent (repair_method#1);
//  3. redownload only the broken files (repair_method#2).
func (t *TorrentManager) repair(torrent *Torrent) {
	t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent))

	if torrent.AnyInProgress() {
		t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent))
		return
	}

	proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full
	if !proceed {
		t.log.Error("Reached the max number of active torrents, cannot continue with the repair")
		return
	}

	// handle torrents with incomplete links for selected files
	assignedCount := 0
	// number of rar files detected from the unrestricted links
	rarCount := 0
	newUnassignedLinks := cmap.New[*realdebrid.Download]()

	// unrestrict each unassigned link that was filled out during torrent init
	torrent.UnassignedLinks.Each(func(link string) bool {
		unrestrict := t.UnrestrictUntilOk(link)
		if unrestrict == nil {
			newUnassignedLinks.Set(link, nil)
			// nothing more to do for this link; returning false moves on to
			// the next one
			return false
		}

		// try to assign to a selected file
		assigned := false
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			// base it on size because why not?
			if file.Bytes == unrestrict.Filesize {
				file.Link = link
				assigned = true
				assignedCount++
			}
		})

		if !assigned {
			// if not assigned and is a rar, likely it was rar'ed by RD
			if strings.HasSuffix(unrestrict.Filename, ".rar") {
				rarCount++
			}
			newUnassignedLinks.Set(link, unrestrict)
		}
		return false
	})

	if assignedCount > 0 {
		t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, t.GetKey(torrent))
	} else if rarCount > 0 {
		// assignedCount is 0 here: this is a rar'ed torrent, nothing we can do
		if t.Config.ShouldDeleteRarFiles() {
			t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
			t.Delete(t.GetKey(torrent), true)
		} else {
			t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, skipping repair", t.GetKey(torrent))
			// Add the unrestricted downloads as selected files of the torrent.
			newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
				if unassigned == nil {
					return
				}
				newFile := &File{
					File: realdebrid.File{
						ID:       0,
						Path:     unassigned.Filename,
						Bytes:    unassigned.Filesize,
						Selected: 1,
					},
					Ended: torrent.Added,
					Link:  unassigned.Link,
				}
				torrent.SelectedFiles.Set(unassigned.Filename, newFile)
			})
			torrent.UnassignedLinks = mapset.NewSet[string]()
			t.markAsUnfixable(torrent)
			t.markAsUnplayable(torrent)
		}
		return
	}

	// get all broken files
	brokenFiles := getBrokenFiles(torrent)
	fileIDs := getFileIDs(brokenFiles)
	brokenFileIDs := strings.Join(fileIDs, ",")

	// first solution: reinsert and see if the broken file is now working
	t.log.Debugf("repair_method#1: Trying to redownload torrent %s to repair files (%s)", t.GetKey(torrent), brokenFileIDs)
	info, err := t.redownloadTorrent(torrent, "")
	if err != nil {
		t.log.Warnf("Cannot repair torrent %s using repair_method#1: %v", t.GetKey(torrent), err)
	}
	if info != nil && info.Progress != 100 {
		t.log.Infof("Redownloading torrent %s after repair_method#1, it should work once done", t.GetKey(torrent))
		return
	}
	if info != nil && info.IsDone() && !t.isStillBroken(info, brokenFiles) {
		t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent))
		return
	}

	// second solution: add only the broken files
	if len(brokenFiles) == 0 {
		t.log.Warnf("Torrent %s has no broken files to repair", t.GetKey(torrent))
		return
	}
	t.log.Infof("repair_method#2: Redownloading %dof%d broken files for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent))
	if _, err := t.redownloadTorrent(torrent, brokenFileIDs); err != nil {
		t.log.Warnf("Cannot repair torrent %s using repair_method#2: %v", t.GetKey(torrent), err)
		return
	}
	t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent))
}

// redownloadTorrent re-adds the torrent by hash and selects the given
// comma-separated file IDs. When brokenFiles is empty, every selected file
// is requested and, once the new torrent is accepted, the torrent's
// previously downloaded IDs are deleted. Otherwise the new torrent is
// recorded in t.fixers.
//
// It returns (nil, nil) when brokenFiles is empty and the torrent has no
// selected files. On any failure after the magnet was added, the newly
// added torrent is deleted again and an error is returned.
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) (*realdebrid.TorrentInfo, error) {
	t.log.Debugf("Redownloading torrent %s, broken files=%s (all if empty)", t.GetKey(torrent), brokenFiles)

	// broken files means broken links
	var oldTorrentIDs []string
	if brokenFiles == "" {
		// only replace the torrent if we are redownloading all files
		oldTorrentIDs = torrent.DownloadedIDs.ToSlice()
		var selection []string
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			selection = append(selection, fmt.Sprintf("%d", file.ID)) // select all files
		})
		if len(selection) == 0 {
			return nil, nil // nothing to repair
		}
		brokenFiles = strings.Join(selection, ",")
	}

	// redownload torrent
	resp, err := t.Api.AddMagnetHash(torrent.Hash)
	if err != nil {
		if strings.Contains(err.Error(), "infringing_file") {
			t.markAsUnfixable(torrent)
		}
		return nil, fmt.Errorf("cannot redownload torrent: %w", err)
	}
	time.Sleep(1 * time.Second)

	// select files
	newTorrentID := resp.ID
	if err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles); err != nil {
		t.Api.DeleteTorrent(newTorrentID)
		return nil, fmt.Errorf("cannot start redownloading: %w", err)
	}
	time.Sleep(1 * time.Second)

	// see if the torrent is ready
	info, err := t.Api.GetTorrentInfo(newTorrentID)
	if err != nil {
		t.Api.DeleteTorrent(newTorrentID)
		return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %w", t.GetKey(torrent), newTorrentID, err)
	}

	// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
	switch info.Status {
	case "magnet_conversion", "waiting_files_selection", "queued", "downloading", "downloaded", "uploading":
		// ok; "compressing" is deliberately excluded because we need playable files
	default:
		t.Api.DeleteTorrent(newTorrentID)
		return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status)
	}

	// acceptNewTorrent either replaces the old torrents with the new one
	// (full redownload) or records the new torrent in t.fixers.
	acceptNewTorrent := func() {
		if len(oldTorrentIDs) > 0 {
			// only triggered when brokenFiles == ""
			for _, id := range oldTorrentIDs {
				t.Api.DeleteTorrent(id)
			}
			return
		}
		t.fixers.Set(newTorrentID, torrent)
	}

	if info.Progress != 100 {
		t.log.Infof("Torrent %s (id=%s) is not cached anymore so we have to wait until completion (this should fix the issue already)", t.GetKey(torrent), info.ID)
		acceptNewTorrent()
		return info, nil
	}

	brokenCount := len(strings.Split(brokenFiles, ","))
	if len(info.Links) != brokenCount {
		t.Api.DeleteTorrent(newTorrentID)
		return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), brokenCount)
	}

	acceptNewTorrent()
	return info, nil
}

// canCapacityHandle reports whether there is room for another active
// torrent. It polls the active torrent count with an exponential backoff
// capped at maxDelay and gives up (returning false) after maxRetries
// retries, which adds up to roughly 45 minutes of waiting.
func (t *TorrentManager) canCapacityHandle() bool {
	// max waiting time is 45 minutes
	const (
		maxRetries = 50
		baseDelay  = 1 * time.Second
		maxDelay   = 60 * time.Second
	)

	// backoff returns baseDelay * 2^attempt, capped at maxDelay. The cap is
	// applied to the factor before multiplying so that large attempt values
	// cannot overflow time.Duration (which would yield a negative delay and
	// turn the wait into a busy loop).
	backoff := func(attempt int) time.Duration {
		factor := math.Pow(2, float64(attempt))
		if factor >= float64(maxDelay/baseDelay) {
			return maxDelay
		}
		return time.Duration(factor) * baseDelay
	}

	for retryCount := 0; ; retryCount++ {
		count, err := t.Api.GetActiveTorrentCount()
		if err == nil && count.DownloadingCount < count.MaxNumberOfTorrents-1 {
			return true
		}

		delay := backoff(retryCount)
		if err != nil {
			t.log.Warnf("Cannot get active downloads count: %v", err)
		} else {
			t.log.Infof("We have reached the max number of active torrents, waiting for %s before retrying", delay)
		}

		if retryCount >= maxRetries {
			t.log.Error("Max retries reached. Exiting.")
			return false
		}
		time.Sleep(delay)
	}
}

// markAsUnplayable removes the torrent from every directory and then adds
// it to the unplayable torrents directory.
func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
	key := t.GetKey(torrent)
	t.log.Warnf("Marking torrent %s as unplayable", key)
	t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
		torrents.Remove(key)
	})
	unplayable, ok := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
	if !ok {
		// Writing to the zero-value map returned for a missing key would panic.
		t.log.Warnf("Cannot find the unplayable torrents directory, torrent %s was not added to it", key)
		return
	}
	unplayable.Set(key, torrent)
}

// markAsUnfixable flags the torrent as unrepairable and persists the flag
// on the cached info of each of its downloaded IDs.
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
	t.log.Warnf("Marking torrent %s as unfixable", t.GetKey(torrent))
	torrent.Unrepairable = true
	infoCache, ok := t.DirectoryMap.Get(INT_INFO_CACHE)
	if !ok {
		return
	}
	torrent.DownloadedIDs.Each(func(id string) bool {
		info, found := infoCache.Get(id)
		if !found || info == nil {
			// nothing cached for this ID, move on to the next one
			return false
		}
		info.Unrepairable = true
		t.writeTorrentToFile(id, info)
		return false
	})
}

// getBrokenFiles returns the selected files whose link neither starts with
// "http" nor equals "unselect".
func getBrokenFiles(torrent *Torrent) []*File {
	var brokenFiles []*File
	torrent.SelectedFiles.IterCb(func(_ string, file *File) {
		if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
			brokenFiles = append(brokenFiles, file)
		}
	})
	return brokenFiles
}

// isStillBroken reports whether any of the previously broken files is still
// broken in the redownloaded torrent: its link is missing, cannot be
// unrestricted, or resolves to a file of a different size.
func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool {
	for _, oldFile := range brokenFiles {
		for idx, newFile := range info.Files {
			if oldFile.Path != newFile.Path {
				continue
			}
			// NOTE(review): info.Links is indexed by the file's position in
			// info.Files; presumably the two slices are aligned - confirm.
			// A missing link is treated as still broken instead of panicking.
			if idx >= len(info.Links) {
				return true
			}
			unrestrict := t.UnrestrictUntilOk(info.Links[idx])
			if unrestrict == nil || oldFile.Bytes != unrestrict.Filesize {
				return true
			}
		}
	}
	return false
}