diff --git a/internal/torrent/delete.go b/internal/torrent/delete.go index a23a907..a42a72f 100644 --- a/internal/torrent/delete.go +++ b/internal/torrent/delete.go @@ -36,7 +36,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if torrent, ok := allTorrents.Get(accessKey); ok { torrent.DownloadedIDs.Union(torrent.InProgressIDs).Each(func(id string) bool { - t.log.Debugf("Deleting torrent %s %s in RD", id, accessKey) + t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, id) t.Api.DeleteTorrent(id) infoCache.Remove(id) t.deleteTorrentFile(id) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 9ca1121..7f2492b 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -29,7 +29,7 @@ type TorrentManager struct { DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] fixers cmap.ConcurrentMap[string, *Torrent] repairs mapset.Set[string] - ensureDelete mapset.Set[string] + deleteOnceDone mapset.Set[string] allAccessKeys mapset.Set[string] latestState *LibraryState requiredVersion string @@ -50,7 +50,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w DownloadMap: cmap.New[*realdebrid.Download](), fixers: cmap.New[*Torrent](), repairs: mapset.NewSet[string](), - ensureDelete: mapset.NewSet[string](), + deleteOnceDone: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](), latestState: &LibraryState{}, requiredVersion: "24.01.2024", diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 4bfdaa8..a3d98ce 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -23,43 +23,23 @@ func (t *TorrentManager) RefreshTorrents() []string { var wg sync.WaitGroup allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - expiredFixers := t.ensureDelete.Clone() + doesNotExist := t.deleteOnceDone.Clone() for i := range instances { idx := i - expiredFixers.Remove(instances[idx].ID) + doesNotExist.Remove(instances[idx].ID) wg.Add(1) _ = t.workerPool.Submit(func() { defer wg.Done() if !t.fixers.Has(instances[idx].ID) { + // not a fixer, just regular torrent infoChan <- t.getMoreInfo(instances[idx]) return } else if instances[idx].IsDone() { - fixer := instances[idx] - torrent, _ := t.fixers.Pop(fixer.ID) - t.log.Debugf("Fixer %s is done, let's check if it fixed torrent %s by redownloading", instances[idx].ID, t.GetKey(torrent)) - brokenFiles := getBrokenFiles(torrent) - info, err := t.redownloadTorrent(torrent, "") - if err != nil { - t.log.Warnf("Cannot redownload torrent %s after fixer is done: %v", t.GetKey(torrent), err) - infoChan <- nil - return - } - if info.IsDone() { - if t.isStillBroken(info, brokenFiles) { - t.log.Warnf("Fixer is done but torrent %s is still broken; let's keep the fixer", t.GetKey(torrent)) - infoChan <- t.getMoreInfo(fixer) - return - } else { - t.log.Infof("Fixer resolved issues for torrent %s, broken files are repaired", t.GetKey(torrent)) - } - } else { - t.log.Warnf("Torrent %s is still not done after redownload; likely the fixer did its job", t.GetKey(torrent)) - } - t.Api.DeleteTorrent(fixer.ID) // delete the fixer - infoChan <- nil + // fixer is done, let's check if it fixed the torrent + infoChan <- t.handleFixers(instances[idx]) return - } + infoChan <- nil }) } @@ -68,21 +48,21 @@ func (t *TorrentManager) RefreshTorrents() []string { t.log.Debugf("Fetched info for %d torrents", len(instances)) // delete expired fixers - expiredFixers.Each(func(fixerID string) bool { - t.log.Debugf("Deleting expired fixer %s", fixerID) + doesNotExist.Each(func(fixerID string) bool { t.fixers.Remove(fixerID) - t.ensureDelete.Remove(fixerID) + t.deleteOnceDone.Remove(fixerID) return false }) // ensure delete infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - t.ensureDelete.Each(func(fixerID string) bool { + t.deleteOnceDone.Each(func(fixerID string) bool { torrent, exists := infoCache.Get(fixerID) if exists && torrent.AnyInProgress() { return false } t.log.Debugf("Ensuring that torrent id=%s is deleted", fixerID) t.Delete(t.GetKey(torrent), true) + t.Api.DeleteTorrent(fixerID) return false }) @@ -260,12 +240,13 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { mainTorrent := Torrent{ - Name: existing.Name, - OriginalName: existing.OriginalName, - Rename: existing.Rename, - Hash: existing.Hash, - DownloadedIDs: mapset.NewSet[string](), - InProgressIDs: mapset.NewSet[string](), + Name: existing.Name, + OriginalName: existing.OriginalName, + Rename: existing.Rename, + Hash: existing.Hash, + DownloadedIDs: mapset.NewSet[string](), + InProgressIDs: mapset.NewSet[string](), + // UnassignedLinks: mapset.NewSet[string](), UnassignedLinks: existing.UnassignedLinks.Union(toMerge.UnassignedLinks), BrokenLinks: existing.BrokenLinks.Union(toMerge.BrokenLinks), } diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index f008e2d..7d92c4b 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -17,6 +17,7 @@ const ( ) func (t *TorrentManager) RepairAll() { + // there is 1 repair worker, with max 1 blocking task _ = t.repairPool.Submit(func() { t.log.Info("Periodic repair invoked; searching for broken torrents") t.repairAll() @@ -26,17 +27,9 @@ func (t *TorrentManager) RepairAll() { func (t *TorrentManager) repairAll() { allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + // collect all torrents that need to be repaired var toRepair mapset.Set[*Torrent] - // check 1: for uncached torrents - uncachedTorrents, err := t.getUncachedTorrents() - if err != nil { - t.log.Warnf("Cannot check for uncached torrents: %v", err) - toRepair = mapset.NewSet[*Torrent]() - } else { - toRepair = mapset.NewSet[*Torrent](uncachedTorrents...) - } - allTorrents.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() || torrent.UnrepairableReason != "" { return @@ -53,15 +46,18 @@ func (t *TorrentManager) repairAll() { if hasBrokenFiles { t.log.Debugf("Torrent %s has broken files, adding to repair list", t.GetKey(torrent)) toRepair.Add(torrent) + return } // check 3: for expired links if torrent.UnassignedLinks.Cardinality() > 0 { t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent)) toRepair.Add(torrent) + return } }) t.log.Debugf("Found %d broken torrents to repair in total", toRepair.Cardinality()) + toRepair.Each(func(torrent *Torrent) bool { t.Repair(torrent) return false @@ -81,6 +77,7 @@ func (t *TorrentManager) Repair(torrent *Torrent) { t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent)) return } + t.repairs.Add(t.GetKey(torrent)) // save the broken files to the file cache @@ -89,15 +86,21 @@ func (t *TorrentManager) Repair(torrent *Torrent) { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) torrent.DownloadedIDs.Each(func(id string) bool { info, _ := infoCache.Get(id) + hasBrokenFiles := false info.SelectedFiles.IterCb(func(_ string, file *File) { torrent.BrokenLinks.Each(func(brokenLink string) bool { if file.Link == brokenLink { + hasBrokenFiles = true file.Link = "" + return true } - return file.Link == brokenLink + return false }) }) - t.writeTorrentToFile(id, info) + if hasBrokenFiles { + info.BrokenLinks = torrent.BrokenLinks + t.writeTorrentToFile(id, info) + } return false }) } @@ -110,6 +113,7 @@ func (t *TorrentManager) Repair(torrent *Torrent) { return } + // assign to a worker _ = t.workerPool.Submit(func() { t.repair(torrent) t.repairs.Remove(t.GetKey(torrent)) @@ -119,16 +123,59 @@ func (t *TorrentManager) Repair(torrent *Torrent) { func (t *TorrentManager) repair(torrent *Torrent) { t.log.Infof("Started repair process for torrent %s", t.GetKey(torrent)) + // handle torrents with incomplete links for selected files + if !t.assignUnassignedLinks(torrent) { + return + } + + // get all broken files + brokenFiles := getBrokenFiles(torrent) + t.log.Debugf("Torrent %s has %d broken out of %d files", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count()) + brokenFileIDs := getFileIDs(brokenFiles) + + // first solution: reinsert and see if the broken file is now working + t.log.Debugf("repair_method#1: Redownloading whole torrent %s", t.GetKey(torrent)) + info, err := t.redownloadTorrent(torrent, "") // reinsert the torrent, passing "" + if err != nil { + t.log.Warnf("repair_method#1: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error()) + } else if info != nil && info.Progress != 100 { + t.log.Infof("repair_method#1: Torrent %s is still in progress but it should work once done (torrent is temporarily hidden until download has completed)", t.GetKey(torrent)) + return + } else if info != nil && info.IsDone() && !t.isStillBroken(info, brokenFiles) { + t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent)) + return + } + + if torrent.UnrepairableReason != "" { + t.log.Debugf("Torrent %s has been marked as unfixable during repair_method#1 (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason) + return + } + + if len(brokenFiles) == torrent.SelectedFiles.Count() { + t.log.Warnf("Torrent %s has all broken files, marking as unplayable", t.GetKey(torrent)) + return + } + + // second solution: add only the broken files + if len(brokenFiles) > 0 { + t.log.Infof("repair_method#2: Redownloading %dof%d files only for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent)) + _, err := t.redownloadTorrent(torrent, brokenFileIDs) + if err != nil { + t.log.Warnf("repair_method#2: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error()) + } + return + } + + t.log.Infof("Torrent %s has no broken files to repair", t.GetKey(torrent)) +} + +func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool { // handle torrents with incomplete links for selected files assignedCount := 0 - - // number of rar files detected from the unrestricted links rarCount := 0 - newUnassignedLinks := cmap.New[*realdebrid.Download]() - - // unrestrict each unassigned link that was filled out during torrent init torrent.UnassignedLinks.Each(func(link string) bool { + // unrestrict each unassigned link that was filled out during torrent init unrestrict := t.UnrestrictUntilOk(link) if unrestrict == nil { newUnassignedLinks.Set(link, nil) @@ -156,18 +203,13 @@ func (t *TorrentManager) repair(torrent *Torrent) { } return false }) - - if assignedCount > 0 { - // if there are any assigned count - t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, t.GetKey(torrent)) - } else if rarCount == 1 && newUnassignedLinks.Count() == 1 { - // also is assignedCount=0 + if assignedCount == 0 && rarCount > 0 && newUnassignedLinks.Count() > 0 { // this is a rar'ed torrent, nothing we can do if t.Config.ShouldDeleteRarFiles() { t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) t.Delete(t.GetKey(torrent), true) } else { - t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, skipping repair", t.GetKey(torrent)) + t.log.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { if unassigned == nil { return @@ -188,56 +230,29 @@ func (t *TorrentManager) repair(torrent *Torrent) { t.markAsUnfixable(torrent, "rar'ed by RD") t.markAsUnplayable(torrent, "rar'ed by RD") } - return + t.log.Debugf("Ending repair process early for torrent %s", t.GetKey(torrent)) + return false + } + // empty the unassigned links as we have assigned them + if torrent.UnassignedLinks.Cardinality() > 0 { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + torrent.DownloadedIDs.Each(func(id string) bool { + info, _ := infoCache.Get(id) + info.UnassignedLinks = mapset.NewSet[string]() + t.writeTorrentToFile(id, info) + return false + }) } - // get all broken files - brokenFiles := getBrokenFiles(torrent) - fileIDs := getFileIDs(brokenFiles) - brokenFileIDs := strings.Join(fileIDs, ",") - - // first solution: reinsert and see if the broken file is now working - t.log.Debugf("repair_method#1: Trying to redownload torrent %s to repair files", t.GetKey(torrent)) - info, err := t.redownloadTorrent(torrent, "") - if err != nil { - t.log.Warnf("Cannot repair torrent %s using repair_method#1", t.GetKey(torrent)) - } else if info != nil && info.Progress != 100 { - t.log.Infof("Redownloading torrent %s after repair_method#1, it should work once done", t.GetKey(torrent)) - return - } else if info != nil && info.IsDone() && !t.isStillBroken(info, brokenFiles) { - t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent)) - return - } else if info != nil && info.ID != "" { - t.log.Warnf("Torrent %s is still broken after repair_method#1, cleaning up (deleting ID=%s)", t.GetKey(torrent), info.ID) - t.Api.DeleteTorrent(info.ID) - t.ensureDelete.Add(info.ID) - } - - if torrent.UnrepairableReason != "" { - t.log.Warnf("Torrent %s has been marked as unfixable (%s), ending repair process", t.GetKey(torrent), torrent.UnrepairableReason) - return - } - - // second solution: add only the broken files - if len(brokenFiles) > 0 { - t.log.Infof("repair_method#2: Redownloading %dof%d broken files for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent)) - _, err := t.redownloadTorrent(torrent, brokenFileIDs) - if err != nil { - t.log.Warnf("Cannot repair torrent %s using repair_method#2", t.GetKey(torrent)) - } else { - t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent)) - } - } else { - t.log.Infof("Torrent %s has no broken files to repair", t.GetKey(torrent)) - } + return true } -func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) (*realdebrid.TorrentInfo, error) { - oldTorrentIDs := make([]string, 0) +func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (*realdebrid.TorrentInfo, error) { // broken files means broken links - // if brokenFiles is not provided - if brokenFiles == "" { - // only replace the torrent if we are redownloading all files + // if brokenFiles is not provided, we will redownload all files + oldTorrentIDs := make([]string, 0) + if selection == "" { + // only delete the old torrent if we are redownloading all files oldTorrentIDs = torrent.DownloadedIDs.ToSlice() tmpSelection := "" torrent.SelectedFiles.IterCb(func(_ string, file *File) { @@ -246,33 +261,49 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) if tmpSelection == "" { return nil, nil // nothing to repair } else { - brokenFiles = tmpSelection[:len(tmpSelection)-1] + selection = tmpSelection[:len(tmpSelection)-1] } } // redownload torrent resp, err := t.Api.AddMagnetHash(torrent.Hash) if err != nil { - if strings.Contains(err.Error(), "infringing_file") { - t.markAsUnfixable(torrent, "infringing_file") + if strings.Contains(err.Error(), "infringing") { + t.markAsUnfixable(torrent, "infringing torrent") + } else if strings.Contains(err.Error(), "unsupported") { + t.markAsUnfixable(torrent, "unsupported torrent") + } else if strings.Contains(err.Error(), "unavailable") { + t.markAsUnfixable(torrent, "unavailable torrent") + } else if strings.Contains(err.Error(), "invalid") { + t.markAsUnfixable(torrent, "invalid torrent") + } else if strings.Contains(err.Error(), "big") { + t.markAsUnfixable(torrent, "torrent too big") + } else if strings.Contains(err.Error(), "allowed") { + t.markAsUnfixable(torrent, "torrent not allowed") } return nil, fmt.Errorf("cannot redownload torrent: %v", err) } + // sleep for 1 second to let RD process the magnet + time.Sleep(1 * time.Second) + // select files newTorrentID := resp.ID - err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles) + err = t.Api.SelectTorrentFiles(newTorrentID, selection) if err != nil { t.Api.DeleteTorrent(newTorrentID) - t.ensureDelete.Add(newTorrentID) + t.deleteOnceDone.Add(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: %v", err) } + // sleep for 1 second to let RD process the magnet + time.Sleep(1 * time.Second) + // see if the torrent is ready info, err := t.Api.GetTorrentInfo(newTorrentID) if err != nil { t.Api.DeleteTorrent(newTorrentID) - t.ensureDelete.Add(newTorrentID) + t.deleteOnceDone.Add(newTorrentID) return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) } @@ -288,40 +319,30 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) } if !isOkStatus { t.Api.DeleteTorrent(newTorrentID) - t.ensureDelete.Add(newTorrentID) + t.deleteOnceDone.Add(newTorrentID) return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status) } - if info.Progress != 100 { - t.log.Infof("Torrent %s (id=%s) is not cached anymore so we have to wait until completion (this should fix the issue already)", t.GetKey(torrent), info.ID) - if len(oldTorrentIDs) > 0 { - for _, id := range oldTorrentIDs { - t.Api.DeleteTorrent(id) - t.ensureDelete.Add(id) - } - } else { - t.fixers.Set(newTorrentID, torrent) - t.ensureDelete.Add(newTorrentID) - } - return info, nil + // check if incorrect number of links + selectionCount := len(strings.Split(selection, ",")) + if info.Progress == 100 && len(info.Links) != selectionCount { + t.Api.DeleteTorrent(newTorrentID) + t.deleteOnceDone.Add(newTorrentID) + return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), selectionCount) } - brokenCount := len(strings.Split(brokenFiles, ",")) - if len(info.Links) != brokenCount { - t.Api.DeleteTorrent(newTorrentID) - t.ensureDelete.Add(newTorrentID) - return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), brokenCount) - } + // looks like it's fixed if len(oldTorrentIDs) > 0 { - // only triggered when brokenFiles == "" + // replace the old torrent for _, id := range oldTorrentIDs { t.Api.DeleteTorrent(id) - t.ensureDelete.Add(id) + t.deleteOnceDone.Add(id) } } else { + // it's a fixer t.fixers.Set(newTorrentID, torrent) - t.ensureDelete.Add(newTorrentID) + t.deleteOnceDone.Add(newTorrentID) } return info, nil } @@ -390,6 +411,7 @@ func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) { }) } +// getBrokenFiles returns the files that are not http links and not unselect func getBrokenFiles(torrent *Torrent) []*File { var brokenFiles []*File torrent.SelectedFiles.IterCb(func(_ string, file *File) { @@ -400,6 +422,8 @@ func getBrokenFiles(torrent *Torrent) []*File { return brokenFiles } +// isStillBroken checks if the torrent is still broken +// if it's not broken anymore, it will assign the links to the files func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool { var selectedFiles []*File for _, file := range info.Files { @@ -427,6 +451,7 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1]) } + // check if the broken files can now be unrestricted for _, oldFile := range brokenFiles { for idx, newFile := range selectedFiles { if oldFile.Path == newFile.Path || oldFile.Bytes == newFile.Bytes { @@ -439,3 +464,38 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles } return false } + +func (t *TorrentManager) handleFixers(fixer realdebrid.Torrent) *Torrent { + // since fixer is done, we can pop it from the fixers set + torrent, _ := t.fixers.Pop(fixer.ID) + if torrent == nil { + t.log.Warnf("repair_method#2: Fixer for %s (id=%s) is done but torrent has been deleted, deleting fixer...", fixer.Name, fixer.ID) + t.Api.DeleteTorrent(fixer.ID) + t.deleteOnceDone.Add(fixer.ID) + return nil + } + + brokenFiles := getBrokenFiles(torrent) + info, err := t.redownloadTorrent(torrent, "") // reinsert + if err != nil { + t.log.Warnf("repair_method#2: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error()) + return nil + } + + if info.IsDone() { + if t.isStillBroken(info, brokenFiles) { + t.log.Warnf("repair_method#2: Fixer is done but torrent %s is still broken; let's keep the fixer", t.GetKey(torrent)) + t.Api.DeleteTorrent(info.ID) + t.deleteOnceDone.Add(fixer.ID) + return t.getMoreInfo(fixer) + } else { + t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent)) + } + } else { + t.log.Infof("repair_method#2: Torrent %s is still in progress but it should work once done (torrent is temporarily hidden until download has completed)", t.GetKey(torrent)) + } + + t.Api.DeleteTorrent(fixer.ID) // delete the fixer + t.deleteOnceDone.Add(fixer.ID) + return nil +} diff --git a/internal/torrent/util.go b/internal/torrent/util.go index f09fad1..532be15 100644 --- a/internal/torrent/util.go +++ b/internal/torrent/util.go @@ -2,14 +2,15 @@ package torrent import ( "fmt" + "strings" ) -func getFileIDs(files []*File) []string { +func getFileIDs(files []*File) string { var fileIDs []string for _, file := range files { if file.ID != 0 { fileIDs = append(fileIDs, fmt.Sprintf("%d", file.ID)) } } - return fileIDs + return strings.Join(fileIDs, ",") }