package torrent

import (
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/debridmediamanager/zurg/internal/config"
	"github.com/debridmediamanager/zurg/pkg/realdebrid"
	mapset "github.com/deckarep/golang-set/v2"
	cmap "github.com/orcaman/concurrent-map/v2"
)

// EXPIRED_LINK_TOLERANCE_HOURS is the age, in hours, a torrent must exceed
// before repair attempts to fix it by reinserting the whole torrent.
const EXPIRED_LINK_TOLERANCE_HOURS = 24

// RepairAll submits a job to the repair worker that scans all torrents and
// repairs the ones found to be broken.
func (t *TorrentManager) RepairAll() {
	// The result of Submit is deliberately discarded, as in the original code.
	// NOTE(review): presumably an error when the worker pool rejects the job —
	// confirm and consider logging it.
	_ = t.repairWorker.Submit(func() {
		t.log.Info("Repairing all broken torrents")
		t.repairAll()
		t.log.Info("Finished repairing all torrents")
	})
}

// repairAll checks every torrent in the INT_ALL directory and repairs those
// that are no longer cached or that have at least one broken file link.
// Torrents that are in progress or already marked unfixable are skipped.
func (t *TorrentManager) repairAll() {
	allTorrents, _ := t.DirectoryMap.Get(INT_ALL)

	// Availability is queried in batches of at most maxGroupSize hashes.
	const maxGroupSize = 399
	currentGroup := mapset.NewSet[string]()
	hashGroups := []mapset.Set[string]{currentGroup}
	allTorrents.IterCb(func(_ string, torrent *Torrent) {
		if torrent.AnyInProgress() || torrent.Unfixable {
			return
		}
		// Start a new group once the current one is full.
		if currentGroup.Cardinality() >= maxGroupSize {
			currentGroup = mapset.NewSet[string]()
			hashGroups = append(hashGroups, currentGroup)
		}
		currentGroup.Add(torrent.Hash)
	})

	t.log.Debug("Checking if torrents are still cached")
	availabilityChecks := make(map[string]bool)
	// Hashes whose availability request failed. Their cached status is
	// unknown, so they must not be treated as "no longer cached": doing so
	// would queue a whole batch of healthy torrents for repair after a single
	// transient API error.
	unverified := make(map[string]struct{})
	uncachedCount := 0
	for _, group := range hashGroups {
		if group.Cardinality() == 0 {
			break
		}
		hashes := group.ToSlice()
		resp, err := t.Api.AvailabilityCheck(hashes)
		if err != nil {
			t.log.Warnf("Cannot check availability: %v", err)
			for _, hash := range hashes {
				unverified[hash] = struct{}{}
			}
			continue
		}
		for hash, hosterHash := range resp {
			// A hash counts as cached when it has at least one variant.
			cached := len(hosterHash.Variants) > 0
			availabilityChecks[hash] = cached
			if !cached {
				uncachedCount++
			}
		}
	}
	t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount)

	var toRepair []*Torrent
	allTorrents.IterCb(func(_ string, torrent *Torrent) {
		if torrent.AnyInProgress() || torrent.Unfixable {
			return
		}

		// check 1: for cached status
		// A hash missing from a successful response is treated as not cached;
		// a hash whose request failed is given the benefit of the doubt.
		_, unknown := unverified[torrent.Hash]
		isCached := unknown || availabilityChecks[torrent.Hash]

		// todo: also handle file ID checks

		// check 2: for broken files
		hasBrokenFiles := false
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			if file.Link == "repair" || file.Link == "" {
				hasBrokenFiles = true
			}
		})

		if !isCached || hasBrokenFiles {
			toRepair = append(toRepair, torrent)
		}
	})
	t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair))

	for _, torrent := range toRepair {
		t.log.Infof("Repairing %s", torrent.AccessKey)
		t.repair(torrent)
	}
}

// Repair stores the torrent's current state in the info cache and on disk for
// each of its downloaded IDs, then submits a repair job for it to the repair
// worker.
func (t *TorrentManager) Repair(torrent *Torrent) {
	infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
	torrent.DownloadedIDs.Each(func(id string) bool {
		infoCache.Set(id, torrent)
		t.writeTorrentToFile(id, torrent)
		return false // false = keep iterating (mapset stops on true)
	})

	// The result of Submit is deliberately discarded, as in the original code.
	// NOTE(review): presumably an error when the worker pool rejects the job —
	// confirm and consider logging it.
	_ = t.repairWorker.Submit(func() {
		t.log.Infof("Repairing torrent %s", torrent.AccessKey)
		t.repair(torrent)
		t.log.Infof("Finished repairing torrent %s", torrent.AccessKey)
	})
}

// repair tries to fix a single torrent. The strategies, in order:
//
//  1. If the torrent is older than EXPIRED_LINK_TOLERANCE_HOURS, reinsert it
//     with the same file selection.
//  2. Match the torrent's unassigned links to selected files by file size.
//     If nothing matched and a rar download was seen, the torrent is either
//     deleted or marked unfixable/unplayable, depending on configuration.
//  3. Reinsert the torrent selecting only the files whose link is broken.
//
// The function blocks: it waits for free capacity and sleeps 30 seconds
// before looking at the unassigned links.
func (t *TorrentManager) repair(torrent *Torrent) {
	if torrent.AnyInProgress() {
		t.log.Infof("Torrent %s is in progress, skipping repair until download is done", torrent.AccessKey)
		return
	}

	// blocks for approx 45 minutes if active torrents are full
	if proceed := t.canCapacityHandle(); !proceed {
		t.log.Error("Reached the max number of active torrents, cannot continue with the repair")
		return
	}

	if torrent.OlderThanDuration(EXPIRED_LINK_TOLERANCE_HOURS * time.Hour) {
		// first solution: reinsert with same selection
		t.log.Infof("Torrent %s is older than %d hours, reinserting it", torrent.AccessKey, EXPIRED_LINK_TOLERANCE_HOURS)
		if t.reinsertTorrent(torrent, "") {
			t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
			return
		}
		// reinsertTorrent may have marked the torrent unfixable; in that case
		// there is no point in trying the remaining strategies.
		if torrent.Unfixable {
			t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
			return
		}
		t.log.Warnf("Failed to repair by reinserting torrent %s, will only redownload broken files...", torrent.AccessKey)
	} else {
		t.log.Warnf("Torrent %s is not older than %d hours to be repaired by reinsertion, will only redownload broken files...", torrent.AccessKey, EXPIRED_LINK_TOLERANCE_HOURS)
	}

	// sleep for 30 seconds to let the torrent accumulate more broken files if scanning
	time.Sleep(30 * time.Second)

	// handle rar'ed torrents
	assignedCount := 0
	rarCount := 0
	var unassignedDownloads []*realdebrid.Download
	torrent.UnassignedLinks.Each(func(link string) bool {
		unrestrict := t.UnrestrictUntilOk(link)
		if unrestrict == nil {
			return false
		}
		// Assign the link to every selected file with the same size.
		assigned := false
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			// if strings.HasSuffix(file.Path, unrestrict.Filename) {
			if file.Bytes == unrestrict.Filesize {
				file.Link = unrestrict.Link
				assigned = true
				assignedCount++
			}
		})
		if !assigned {
			if strings.HasSuffix(unrestrict.Filename, ".rar") {
				rarCount++
			}
			unassignedDownloads = append(unassignedDownloads, unrestrict)
		}
		return false
	})

	if assignedCount > 0 {
		t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, torrent.AccessKey)
	} else if rarCount > 0 {
		// this is a rar'ed torrent, nothing we can do
		if t.Config.ShouldDeleteRarFiles() {
			t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", torrent.AccessKey)
			t.Delete(torrent.AccessKey, true)
			return
		}
		// Keep the torrent, adding the unmatched downloads as extra selected
		// files, then flag it so it is not repaired again.
		for _, unassigned := range unassignedDownloads {
			newFile := &File{
				File: realdebrid.File{
					ID:       0,
					Path:     unassigned.Filename,
					Bytes:    unassigned.Filesize,
					Selected: 1,
				},
				Ended: torrent.LatestAdded,
				Link:  unassigned.Link,
			}
			torrent.SelectedFiles.Set(unassigned.Filename, newFile)
		}
		t.markAsUnfixable(torrent)
		t.markAsUnplayable(torrent)
		return
	}

	// second solution: add only the broken files
	var brokenFiles []File
	torrent.SelectedFiles.IterCb(func(_ string, file *File) {
		if file.Link == "repair" || file.Link == "" {
			brokenFiles = append(brokenFiles, *file)
			// Changing the link means this file no longer matches the
			// broken-link check above on a later pass.
			file.Link = "repairing"
		}
	})
	t.log.Debugf("During repair, zurg found %d broken files for torrent %s", len(brokenFiles), torrent.AccessKey)

	// todo: to verify removed logic when there's only 1 selected file selected and it's broken
	if len(brokenFiles) == 1 && torrent.SelectedFiles.Count() > 1 {
		// if we download a single file, it will be named differently
		// so we need to download 1 extra file to preserve the name
		// this is only relevant if we enable retain_rd_torrent_name
		// add the first file link encountered with a prefix of http
		t.log.Debugf("Torrent %s has only 1 broken file, adding 1 extra file to preserve the name", torrent.AccessKey)
		for _, file := range torrent.SelectedFiles.Items() {
			if strings.HasPrefix(file.Link, "http") {
				brokenFiles = append(brokenFiles, *file)
				break
			}
		}
	}

	if len(brokenFiles) == 0 {
		t.log.Warnf("Torrent %s has no broken files to repair", torrent.AccessKey)
		return
	}

	t.log.Infof("Redownloading the %d broken files for torrent %s", len(brokenFiles), torrent.AccessKey)
	brokenFileIDs := strings.Join(getFileIDs(brokenFiles), ",")
	if t.reinsertTorrent(torrent, brokenFileIDs) {
		t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
	} else {
		t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
	}
}

// reinsertTorrent adds the torrent again by hash and selects the given files.
//
// brokenFiles is a comma-separated list of file IDs to select. When it is
// empty, all selected files of the torrent are chosen and, on success, the
// torrent's previous downloaded IDs are deleted (the new torrent replaces the
// old ones). When it is non-empty, the old torrents are kept.
//
// It returns true when the reinsertion is considered successful: either there
// was nothing to select, the new torrent is still downloading (progress is not
// 100), or it produced exactly as many links as files were requested. On
// failure after the torrent was added, the new torrent is deleted again.
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, brokenFiles string) bool {
	// broken files means broken links
	var oldTorrentIDs []string
	if brokenFiles == "" {
		// only replace the torrent if we are reinserting all files
		oldTorrentIDs = torrent.DownloadedIDs.ToSlice()
		var allFileIDs []string
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			allFileIDs = append(allFileIDs, fmt.Sprintf("%d", file.ID)) // select all files
		})
		if len(allFileIDs) == 0 {
			return true // nothing to repair
		}
		brokenFiles = strings.Join(allFileIDs, ",")
	}

	// redownload torrent
	resp, err := t.Api.AddMagnetHash(torrent.Hash)
	if err != nil {
		t.log.Warnf("Cannot redownload torrent: %v", err)
		if strings.Contains(err.Error(), "infringing_file") {
			t.markAsUnfixable(torrent)
		}
		return false
	}
	time.Sleep(1 * time.Second)

	// select files
	newTorrentID := resp.ID
	if err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles); err != nil {
		t.log.Warnf("Cannot start redownloading: %v", err)
		t.Api.DeleteTorrent(newTorrentID)
		return false
	}
	time.Sleep(10 * time.Second)

	// see if the torrent is ready
	info, err := t.Api.GetTorrentInfo(newTorrentID)
	if err != nil {
		t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
		t.Api.DeleteTorrent(newTorrentID)
		return false
	}

	switch info.Status {
	case "magnet_error", "error", "virus", "dead":
		t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
		t.Api.DeleteTorrent(newTorrentID)
		return false
	}

	if info.Progress != 100 {
		t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
		for _, id := range oldTorrentIDs {
			t.Api.DeleteTorrent(id)
		}
		return true
	}

	// A finished torrent must expose one link per requested file.
	brokenCount := len(strings.Split(brokenFiles, ","))
	if len(info.Links) != brokenCount {
		t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), brokenCount)
		t.Api.DeleteTorrent(newTorrentID)
		return false
	}

	t.log.Infof("Repair successful id=%s", newTorrentID)
	for _, id := range oldTorrentIDs {
		t.Api.DeleteTorrent(id)
	}
	return true
}

// capacityBackoffDelay returns the wait before retry number retryCount
// (0-based): baseDelay doubled retryCount times, capped at maxDelay.
//
// The cap is applied to the floating-point factor before it is converted to a
// time.Duration. Converting first and multiplying afterwards overflows int64
// for large retry counts (with a one-second base, from about retry 34), which
// yields a negative or otherwise meaningless delay.
func capacityBackoffDelay(retryCount int, baseDelay, maxDelay time.Duration) time.Duration {
	if baseDelay <= 0 {
		return 0
	}
	factor := math.Pow(2, float64(retryCount))
	if factor >= float64(maxDelay)/float64(baseDelay) {
		return maxDelay
	}
	return time.Duration(factor) * baseDelay
}

// canCapacityHandle reports whether there is room for another active torrent.
// It polls the active torrent count, backing off exponentially (1s, 2s, 4s, ...
// capped at 60s) while the count cannot be fetched or the limit is reached.
// It gives up and returns false after maxRetries retries, i.e. after roughly
// 45 minutes of waiting.
func (t *TorrentManager) canCapacityHandle() bool {
	const (
		maxRetries = 50
		baseDelay  = 1 * time.Second
		maxDelay   = 60 * time.Second
	)

	for retryCount := 0; ; retryCount++ {
		delay := capacityBackoffDelay(retryCount, baseDelay, maxDelay)

		count, err := t.Api.GetActiveTorrentCount()
		switch {
		case err != nil:
			t.log.Warnf("Cannot get active downloads count: %v", err)
		case count.DownloadingCount < count.MaxNumberOfTorrents:
			return true
		default:
			// %s on a time.Duration already prints the unit (e.g. "1m0s").
			t.log.Infof("We have reached the max number of active torrents, waiting for %s before retrying", delay)
		}

		if retryCount >= maxRetries {
			t.log.Error("Max retries reached. Exiting.")
			return false
		}
		time.Sleep(delay)
	}
}

// markAsUnplayable removes the torrent from every directory in DirectoryMap
// and then adds it to the config.UNPLAYABLE_TORRENTS directory.
func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
	t.log.Warnf("Marking torrent %s as unplayable", torrent.AccessKey)
	t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
		torrents.Remove(torrent.AccessKey)
	})
	torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
	torrents.Set(torrent.AccessKey, torrent)
}

// markAsUnfixable flags the torrent as unfixable, mirrors the flag onto the
// info-cache entries of its downloaded IDs, and writes the torrent to file
// for each of those IDs.
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
	t.log.Warnf("Marking torrent %s as unfixable", torrent.AccessKey)
	torrent.Unfixable = true
	infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
	torrent.DownloadedIDs.Each(func(id string) bool {
		// The ID may be absent from the info cache; Get then returns a nil
		// *Torrent, which must not be dereferenced.
		if info, ok := infoCache.Get(id); ok && info != nil {
			info.Unfixable = true
		}
		t.writeTorrentToFile(id, torrent)
		return false // false = keep iterating (mapset stops on true)
	})
}