diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 5812042..d1276ee 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -51,7 +51,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w repairs: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](), latestState: &LibraryState{}, - requiredVersion: "20.01.2024", + requiredVersion: "24.01.2024", workerPool: workerPool, repairPool: repairPool, log: log, diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 76305b8..b2a406d 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -74,7 +74,7 @@ func (t *TorrentManager) RefreshTorrents() []string { return false }) - freshKeys := mapset.NewSet[string]() + newlyFetchedKeys := mapset.NewSet[string]() noInfoCount := 0 for info := range infoChan { if info == nil { @@ -83,7 +83,7 @@ func (t *TorrentManager) RefreshTorrents() []string { } accessKey := t.GetKey(info) if !info.AnyInProgress() { - freshKeys.Add(accessKey) + newlyFetchedKeys.Add(accessKey) } if torrent, exists := allTorrents.Get(accessKey); !exists { allTorrents.Set(accessKey, info) @@ -96,7 +96,7 @@ func (t *TorrentManager) RefreshTorrents() []string { var updatedPaths []string // torrents yet to be assigned in a directory - freshKeys.Difference(t.allAccessKeys).Each(func(accessKey string) bool { + newlyFetchedKeys.Difference(t.allAccessKeys).Each(func(accessKey string) bool { // assign to directories tor, ok := allTorrents.Get(accessKey) if !ok { @@ -117,7 +117,7 @@ func (t *TorrentManager) RefreshTorrents() []string { return false }) // removed torrents - t.allAccessKeys.Difference(freshKeys).Each(func(accessKey string) bool { + t.allAccessKeys.Difference(newlyFetchedKeys).Each(func(accessKey string) bool { t.Delete(accessKey, false) return false }) @@ -254,11 +254,19 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { Hash: existing.Hash, DownloadedIDs: mapset.NewSet[string](), InProgressIDs: mapset.NewSet[string](), - Unrepairable: existing.Unrepairable || toMerge.Unrepairable, UnassignedLinks: existing.UnassignedLinks.Union(toMerge.UnassignedLinks), BrokenLinks: existing.BrokenLinks.Union(toMerge.BrokenLinks), } + // unrepairable reason + if existing.UnrepairableReason != "" && toMerge.UnrepairableReason != "" && existing.UnrepairableReason != toMerge.UnrepairableReason { + mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", existing.UnrepairableReason, toMerge.UnrepairableReason) + } else if existing.UnrepairableReason != "" { + mainTorrent.UnrepairableReason = existing.UnrepairableReason + } else if toMerge.UnrepairableReason != "" { + mainTorrent.UnrepairableReason = toMerge.UnrepairableReason + } + // this function triggers only when we have a new DownloadedID toMerge.DownloadedIDs.Difference(existing.DownloadedIDs).Each(func(id string) bool { mainTorrent.DownloadedIDs.Add(id) @@ -288,6 +296,18 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { } }) + // broken links + if mainTorrent.BrokenLinks.Cardinality() > 0 { + mainTorrent.SelectedFiles.IterCb(func(_ string, file *File) { + mainTorrent.BrokenLinks.Each(func(brokenLink string) bool { + if file.Link == brokenLink { + file.Link = "" + } + return file.Link == brokenLink + }) + }) + } + if existing.Added < toMerge.Added { mainTorrent.Added = toMerge.Added } else { diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index ebb7121..c70dba6 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -18,68 +18,30 @@ const ( func (t *TorrentManager) RepairAll() { _ = t.repairPool.Submit(func() { - t.log.Info("Checking for broken torrents") + t.log.Info("Periodic repair invoked; searching for broken torrents") t.repairAll() - t.log.Info("Finished checking for broken torrents") }) } func (t *TorrentManager) repairAll() { allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - var hashGroups []mapset.Set[string] - const maxGroupSize = 399 + var toRepair mapset.Set[*Torrent] - currentGroup := mapset.NewSet[string]() - hashGroups = append(hashGroups, currentGroup) - - allTorrents.IterCb(func(_ string, torrent *Torrent) { - if torrent.AnyInProgress() || torrent.Unrepairable { - return - } - if currentGroup.Cardinality() >= maxGroupSize { - currentGroup = mapset.NewSet[string]() - hashGroups = append(hashGroups, currentGroup) - } - currentGroup.Add(torrent.Hash) - }) - - t.log.Debug("Checking if torrents are still cached") - var availabilityChecks = make(map[string]bool) - uncachedCount := 0 - for i := range hashGroups { - if hashGroups[i].Cardinality() == 0 { - break - } - resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice()) - if err != nil { - t.log.Warnf("Cannot check availability: %v", err) - continue - } - - for hash, hosterHash := range resp { - // Check if HosterHash is a map (Variants field is used) - availabilityChecks[hash] = len(hosterHash.Variants) > 0 - if !availabilityChecks[hash] { - uncachedCount++ - } - } + // check 1: for uncached torrents + uncachedTorrents, err := t.getUncachedTorrents() + if err != nil { + t.log.Warnf("Cannot check for uncached torrents: %v", err) + toRepair = mapset.NewSet[*Torrent]() + } else { + toRepair = mapset.NewSet[*Torrent](uncachedTorrents...) } - t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount) - toRepair := mapset.NewSet[*Torrent]() allTorrents.IterCb(func(_ string, torrent *Torrent) { - if torrent.AnyInProgress() || torrent.Unrepairable { + if torrent.AnyInProgress() || torrent.UnrepairableReason != "" { return } - // check 1: for cached status - isCached := true - if _, ok := availabilityChecks[torrent.Hash]; !ok || !availabilityChecks[torrent.Hash] { - isCached = false - } - // todo: also handle file ID checks - // check 2: for broken files hasBrokenFiles := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { @@ -87,16 +49,9 @@ func (t *TorrentManager) repairAll() { hasBrokenFiles = true return } - if !isCached && strings.HasPrefix(file.Link, "http") { - unrestrict := t.UnrestrictUntilOk(file.Link) - if unrestrict == nil || file.Bytes != unrestrict.Filesize { - hasBrokenFiles = true - return - } - } }) - if !isCached || hasBrokenFiles || torrent.UnassignedLinks.Cardinality() > 0 { + if hasBrokenFiles || torrent.UnassignedLinks.Cardinality() > 0 { toRepair.Add(torrent) } }) @@ -108,41 +63,44 @@ func (t *TorrentManager) repairAll() { } func (t *TorrentManager) Repair(torrent *Torrent) { - if torrent.Unrepairable { - t.log.Warnf("Torrent %s is unfixable, skipping repair", t.GetKey(torrent)) + if torrent.UnrepairableReason != "" { + t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", t.GetKey(torrent), torrent.UnrepairableReason) return } if t.repairs.Contains(t.GetKey(torrent)) { t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent)) return } - t.repairs.Add(t.GetKey(torrent)) - - // save the broken files to the file cache - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - torrent.DownloadedIDs.Each(func(id string) bool { - info, _ := infoCache.Get(id) - info.SelectedFiles.IterCb(func(_ string, file *File) { - torrent.BrokenLinks.Each(func(link string) bool { - if file.Link == link { - file.Link = "" - } - return file.Link == link - }) - }) - t.writeTorrentToFile(id, info) - return false - }) - - t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent)) if torrent.AnyInProgress() { t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent)) return } + t.repairs.Add(t.GetKey(torrent)) - proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full - if !proceed { - t.log.Error("Reached the max number of active torrents, cannot continue with the repair") + // save the broken files to the file cache + // broken files are also added when trying to open a file + if torrent.BrokenLinks.Cardinality() > 0 { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + torrent.DownloadedIDs.Each(func(id string) bool { + info, _ := infoCache.Get(id) + info.SelectedFiles.IterCb(func(_ string, file *File) { + torrent.BrokenLinks.Each(func(brokenLink string) bool { + if file.Link == brokenLink { + file.Link = "" + } + return file.Link == brokenLink + }) + }) + t.writeTorrentToFile(id, info) + return false + }) + } + + t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent)) + + // blocks for approx 45 minutes if active torrents are full + if !t.canCapacityHandle() { + t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") return } @@ -185,7 +143,7 @@ func (t *TorrentManager) repair(torrent *Torrent) { if !assigned { // if not assigned and is a rar, likely it was rar'ed by RD - if strings.HasSuffix(unrestrict.Filename, ".rar") { + if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") { rarCount++ } newUnassignedLinks.Set(link, unrestrict) @@ -221,8 +179,8 @@ func (t *TorrentManager) repair(torrent *Torrent) { torrent.SelectedFiles.Set(unassigned.Filename, newFile) }) torrent.UnassignedLinks = mapset.NewSet[string]() - t.markAsUnfixable(torrent) - t.markAsUnplayable(torrent) + t.markAsUnfixable(torrent, "rar'ed by RD") + t.markAsUnplayable(torrent, "rar'ed by RD") } return } @@ -233,22 +191,25 @@ func (t *TorrentManager) repair(torrent *Torrent) { brokenFileIDs := strings.Join(fileIDs, ",") // first solution: reinsert and see if the broken file is now working - t.log.Debugf("repair_method#1: Trying to redownload torrent %s to repair files (%s)", t.GetKey(torrent), brokenFileIDs) + t.log.Debugf("repair_method#1: Trying to redownload torrent %s to repair files", t.GetKey(torrent)) info, err := t.redownloadTorrent(torrent, "") if err != nil { t.log.Warnf("Cannot repair torrent %s using repair_method#1", t.GetKey(torrent)) - } - if info != nil && info.Progress != 100 { + } else if info != nil && info.Progress != 100 { t.log.Infof("Redownloading torrent %s after repair_method#1, it should work once done", t.GetKey(torrent)) return - } - if info != nil && info.IsDone() && !t.isStillBroken(info, brokenFiles) { + } else if info != nil && info.IsDone() && !t.isStillBroken(info, brokenFiles) { t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent)) return - } - if info != nil && info.ID != "" { - t.log.Warnf("Torrent %s is still broken after repair_method#1, cleaning up", t.GetKey(torrent)) + } else if info != nil && info.ID != "" { + t.log.Warnf("Torrent %s is still broken after repair_method#1, cleaning up (deleting ID=%s)", t.GetKey(torrent), info.ID) t.Api.DeleteTorrent(info.ID) + t.fixers.Set(info.ID, torrent) + } + + if torrent.UnrepairableReason != "" { + t.log.Warnf("Torrent %s has been marked as unfixable (%s), ending repair process", t.GetKey(torrent), torrent.UnrepairableReason) + return } // second solution: add only the broken files @@ -261,7 +222,7 @@ func (t *TorrentManager) repair(torrent *Torrent) { t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent)) } } else { - t.log.Warnf("Torrent %s has no broken files to repair", t.GetKey(torrent)) + t.log.Infof("Torrent %s has no broken files to repair", t.GetKey(torrent)) } } @@ -287,7 +248,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) resp, err := t.Api.AddMagnetHash(torrent.Hash) if err != nil { if strings.Contains(err.Error(), "infringing_file") { - t.markAsUnfixable(torrent) + t.markAsUnfixable(torrent, "infringing_file") } return nil, fmt.Errorf("cannot redownload torrent: %v", err) } @@ -330,6 +291,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) if len(oldTorrentIDs) > 0 { for _, id := range oldTorrentIDs { t.Api.DeleteTorrent(id) + t.fixers.Set(id, torrent) } } else { t.fixers.Set(newTorrentID, torrent) @@ -348,6 +310,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) // only triggered when brokenFiles == "" for _, id := range oldTorrentIDs { t.Api.DeleteTorrent(id) + t.fixers.Set(id, torrent) } } else { t.fixers.Set(newTorrentID, torrent) @@ -398,8 +361,8 @@ func (t *TorrentManager) canCapacityHandle() bool { } } -func (t *TorrentManager) markAsUnplayable(torrent *Torrent) { - t.log.Warnf("Marking torrent %s as unplayable", t.GetKey(torrent)) +func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) { + t.log.Warnf("Marking torrent %s as unplayable - %s", t.GetKey(torrent), reason) t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(t.GetKey(torrent)) }) @@ -407,13 +370,13 @@ func (t *TorrentManager) markAsUnplayable(torrent *Torrent) { torrents.Set(t.GetKey(torrent), torrent) } -func (t *TorrentManager) markAsUnfixable(torrent *Torrent) { - t.log.Warnf("Marking torrent %s as unfixable", t.GetKey(torrent)) - torrent.Unrepairable = true +func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) { + t.log.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason) + torrent.UnrepairableReason = reason infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) torrent.DownloadedIDs.Each(func(id string) bool { info, _ := infoCache.Get(id) - info.Unrepairable = true + info.UnrepairableReason = reason t.writeTorrentToFile(id, info) return false }) @@ -431,7 +394,6 @@ func getBrokenFiles(torrent *Torrent) []*File { func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool { var selectedFiles []*File - // if some Links are empty, we need to repair it for _, file := range info.Files { if file.Selected == 0 { continue @@ -448,8 +410,15 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles file.Link = info.Links[i] } } else { + // if we can't assign links, it's still broken return true } + + if len(brokenFiles) == 0 { + // just check for the last file + brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1]) + } + for _, oldFile := range brokenFiles { for idx, newFile := range selectedFiles { if oldFile.Path == newFile.Path || oldFile.Bytes == newFile.Bytes { diff --git a/internal/torrent/types.go b/internal/torrent/types.go index f609659..2141f08 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -14,17 +14,17 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type Torrent struct { - Hash string `json:"Hash"` // immutable - Added string `json:"Added"` // immutable - UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable - DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable - InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable - Name string `json:"Name"` // immutable - OriginalName string `json:"OriginalName"` // immutable - Rename string `json:"Rename"` // modified over time - SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time - Unrepairable bool `json:"Unfixable"` // modified over time - BrokenLinks mapset.Set[string] `json:"BrokenLinks"` // only relevant on repair + Hash string `json:"Hash"` // immutable + Added string `json:"Added"` // immutable + UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable + DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable + InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable + Name string `json:"Name"` // immutable + OriginalName string `json:"OriginalName"` // immutable + Rename string `json:"Rename"` // modified over time + SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time + UnrepairableReason string `json:"Unfixable"` // modified over time + BrokenLinks mapset.Set[string] `json:"BrokenLinks"` // only relevant on repair Version string `json:"Version"` // only used for files } diff --git a/internal/torrent/uncached.go b/internal/torrent/uncached.go new file mode 100644 index 0000000..84e62d7 --- /dev/null +++ b/internal/torrent/uncached.go @@ -0,0 +1,56 @@ +package torrent + +import ( + "fmt" + + mapset "github.com/deckarep/golang-set/v2" +) + +func (t *TorrentManager) getUncachedTorrents() ([]*Torrent, error) { + t.log.Debug("Checking if torrents are still cached") + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + + var hashGroups []mapset.Set[string] + const maxGroupSize = 399 + + currentGroup := mapset.NewSet[string]() + hashGroups = append(hashGroups, currentGroup) + + allTorrents.IterCb(func(_ string, torrent *Torrent) { + if torrent.AnyInProgress() || torrent.UnrepairableReason != "" { + return + } + + if currentGroup.Cardinality() >= maxGroupSize { + currentGroup = mapset.NewSet[string]() + hashGroups = append(hashGroups, currentGroup) + } + currentGroup.Add(torrent.Hash) + }) + + var availabilityChecks = make(map[string]bool) + for i := range hashGroups { + if hashGroups[i].Cardinality() == 0 { + break + } + + resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice()) + if err != nil { + return nil, fmt.Errorf("availability check is incomplete, skipping uncached check: %v", err) + } + for hash, hosterHash := range resp { + // Check if HosterHash is a map (Variants field is used) + availabilityChecks[hash] = len(hosterHash.Variants) > 0 + } + } + + var uncachedTorrents []*Torrent + allTorrents.IterCb(func(_ string, torrent *Torrent) { + if _, ok := availabilityChecks[torrent.Hash]; !ok || !availabilityChecks[torrent.Hash] { + uncachedTorrents = append(uncachedTorrents, torrent) + } + }) + t.log.Debugf("Found %d torrents that are no longer cached", len(uncachedTorrents)) + + return uncachedTorrents, nil +} diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index 61fae82..dfe9914 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -54,9 +54,9 @@ func (dl *Downloader) DownloadFile(directory, torrentName, fileName string, resp unrestrict := torMgr.UnrestrictUntilOk(link) if unrestrict == nil { log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, link) + torrent.BrokenLinks.Add(file.Link) + file.Link = "repair" if cfg.EnableRepair() { - torrent.BrokenLinks.Add(file.Link) - file.Link = "repair" torMgr.SetNewLatestState(intTor.LibraryState{}) } else { log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link) @@ -166,9 +166,9 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor if err != nil { log.Warnf("Cannot download file %s: %v", unrestrict.Download, err) if file != nil && unrestrict.Streamable == 1 { + torrent.BrokenLinks.Add(file.Link) + file.Link = "repair" if cfg.EnableRepair() && torrent != nil { - torrent.BrokenLinks.Add(file.Link) - file.Link = "repair" torMgr.SetNewLatestState(intTor.LibraryState{}) } else { log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) @@ -177,14 +177,13 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor http.Error(resp, "File is not available", http.StatusNotFound) return } - defer download.Body.Close() if download.StatusCode != http.StatusOK && download.StatusCode != http.StatusPartialContent { if file != nil && unrestrict.Streamable == 1 { log.Warnf("Received a %s status code for file %s", download.Status, file.Path) + torrent.BrokenLinks.Add(file.Link) + file.Link = "repair" if cfg.EnableRepair() && torrent != nil { - torrent.BrokenLinks.Add(file.Link) - file.Link = "repair" torMgr.SetNewLatestState(intTor.LibraryState{}) } else { log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link)