From 2a5f12e37fde93b9c7abcd45fcba1e9507809202 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Wed, 22 May 2024 04:27:12 +0200 Subject: [PATCH] Remove fixer concept --- internal/torrent/fixer.go | 141 ------------------------------------ internal/torrent/manager.go | 6 +- internal/torrent/refresh.go | 55 ++++++-------- internal/torrent/repair.go | 36 ++++----- 4 files changed, 40 insertions(+), 198 deletions(-) delete mode 100644 internal/torrent/fixer.go diff --git a/internal/torrent/fixer.go b/internal/torrent/fixer.go deleted file mode 100644 index 2a4ef00..0000000 --- a/internal/torrent/fixer.go +++ /dev/null @@ -1,141 +0,0 @@ -package torrent - -import ( - "io" - "os" - - "github.com/debridmediamanager/zurg/pkg/realdebrid" - cmap "github.com/orcaman/concurrent-map/v2" -) - -// fixers are commands that will be run on the next refresh -// they are stored in a file so that they can be run on startup -// they follow the format of: -// key: value: -// id_trigger: this means a specific torrent id's completion -// commands: delete | repair - -func (t *TorrentManager) registerFixer(torrentId, command string) { - t.log.Debugf("Adding fixer command: %s %s", torrentId, command) - t.fixers.Set(torrentId, command) - t.writeFixersToFile() -} - -func (t *TorrentManager) processFixers(instances []realdebrid.Torrent) { - t.log.Debugf("Processing fixers (%d left: %v)", t.fixers.Count(), t.fixers.Keys()) - var toDelete []string - var toRedownload []*Torrent - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - for _, instance := range instances { - if !t.fixers.Has(instance.ID) { - continue - } - - oldTorrentId := instance.ID - command, _ := t.fixers.Pop(oldTorrentId) // delete the fixer if it's done - switch command { - - case "replaced": // id is old torrent id - t.log.Debugf("Deleting old id=%s because it's redundant to fixed torrent %s ", oldTorrentId, instance.Name) - toDelete = append(toDelete, oldTorrentId) - continue - - case "download_failed": // id is failed fixer id - t.log.Debugf("Deleting failed fixer id=%s of torrent %s", oldTorrentId, instance.Name) - toDelete = append(toDelete, oldTorrentId) - continue - - case "repaired": // this torrent contains broken files - if instance.Progress != 100 { - t.fixers.Set(oldTorrentId, command) // requeue the fixer, it's not done yet - continue - } - - fixedTorrent := t.getMoreInfo(instance) - t.log.Debugf("Repairing torrent %s again now that fixer id=%s is done", t.GetKey(fixedTorrent), oldTorrentId) - repairMe, _ := allTorrents.Get(t.GetKey(fixedTorrent)) - toRedownload = append(toRedownload, repairMe) - toDelete = append(toDelete, oldTorrentId) - continue - } - } - - for _, id := range toDelete { - t.api.DeleteTorrent(id) - t.deleteInfoFile(id) - } - - for _, torrent := range toRedownload { - t.redownloadTorrent(torrent, []string{}) - } - - t.writeFixersToFile() - - t.log.Debugf("Finished processing fixers") -} - -func (t *TorrentManager) removeExpiredFixers(instances []realdebrid.Torrent) { - fixers := t.fixers.Keys() - for _, fixerID := range fixers { - found := false - for _, instance := range instances { - if instance.ID == fixerID { - found = true - break - } - } - if !found { - t.log.Debugf("Removing expired fixer id=%s", fixerID) - t.fixers.Remove(fixerID) - } - } -} - -func (t *TorrentManager) writeFixersToFile() { - filePath := "data/fixers.json" - file, err := os.Create(filePath) - if err != nil { - t.log.Warnf("Cannot create fixer file %s: %v", filePath, err) - return - } - defer file.Close() - - fileData, err := t.fixers.MarshalJSON() - if err != nil { - t.log.Warnf("Cannot marshal fixers: %v", err) - return - } - - _, err = file.Write(fileData) - if err != nil { - t.log.Warnf("Cannot write to fixer file %s: %v", filePath, err) - return - } -} - -func (t *TorrentManager) readFixersFromFile() (ret cmap.ConcurrentMap[string, string]) { - ret = cmap.New[string]() - filePath := "data/fixers.json" - file, err := os.Open(filePath) - if err != nil { - if os.IsNotExist(err) { - return - } - t.log.Warnf("Cannot open fixer file %s: %v", filePath, err) - return - } - defer file.Close() - fileData, err := io.ReadAll(file) - if err != nil { - t.log.Warnf("Cannot read fixer file %s: %v", filePath, err) - return - } - - err = ret.UnmarshalJSON(fileData) - if err != nil { - t.log.Warnf("Cannot unmarshal fixer file %s: %v", filePath, err) - return - } - - return -} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 646edc1..192de02 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -40,11 +40,11 @@ type TorrentManager struct { latestState *LibraryState - fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id] repairTrigger chan *Torrent repairSet mapset.Set[*Torrent] repairRunning bool repairRunningMu sync.Mutex + trashBin mapset.Set[string] } // NewTorrentManager creates a new torrent manager @@ -71,10 +71,10 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w latestState: &LibraryState{log: log}, } - t.fixers = t.readFixersFromFile() + t.trashBin = mapset.NewSet[string]() t.initializeDirectories() t.workerPool.Submit(func() { - t.refreshTorrents(true) + t.refreshTorrents() t.setNewLatestState(t.getCurrentState()) t.StartRefreshJob() t.StartRepairJob() diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 8b4a09d..ef8e7a4 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -15,7 +15,7 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" ) -func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string { +func (t *TorrentManager) refreshTorrents() []string { instances, _, err := t.api.GetTorrents(false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) @@ -26,6 +26,11 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string { var wg sync.WaitGroup for i := range instances { + if t.trashBin.Contains(instances[i].ID) { + t.api.DeleteTorrent(instances[i].ID) + t.log.Infof("Skipping trashed torrent %s", instances[i].Name) + torChan <- nil + } idx := i wg.Add(1) _ = t.workerPool.Submit(func() { @@ -128,15 +133,6 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string { return false }) - if t.Config.EnableRepair() { - if isInitialRun { - t.removeExpiredFixers(instances) - } - t.workerPool.Submit(func() { - t.processFixers(instances) - }) - } - return updatedPaths } @@ -157,7 +153,7 @@ func (t *TorrentManager) StartRefreshJob() { t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) t.setNewLatestState(checksum) - updatedPaths := t.refreshTorrents(false) + updatedPaths := t.refreshTorrents() t.log.Info("Finished refreshing torrents") t.TriggerHookOnLibraryUpdate(updatedPaths) @@ -294,36 +290,26 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent { Rename: newer.Rename, Hash: newer.Hash, Added: newer.Added, - - Components: mergedComponents, - UnrepairableReason: newer.UnrepairableReason, + Components: mergedComponents, State: older.State, } // unrepairable reason - if mainTorrent.UnrepairableReason != "" && older.UnrepairableReason != "" && mainTorrent.UnrepairableReason != older.UnrepairableReason { - mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", mainTorrent.UnrepairableReason, older.UnrepairableReason) - } else if older.UnrepairableReason != "" { - mainTorrent.UnrepairableReason = older.UnrepairableReason - } + reasons := mapset.NewSet[string]() + reasons.Add(older.UnrepairableReason) + reasons.Add(newer.UnrepairableReason) + mainTorrent.UnrepairableReason = strings.Join(reasons.ToSlice(), ", ") - // the link can have the following values - // 1. https://*** - the file is available - // 3. empty - the file is not available mainTorrent.SelectedFiles = cmap.New[*File]() - newer.SelectedFiles.IterCb(func(key string, newerFile *File) { - mainTorrent.SelectedFiles.Set(key, newerFile) - }) older.SelectedFiles.IterCb(func(key string, olderFile *File) { - if !mainTorrent.SelectedFiles.Has(key) { - mainTorrent.SelectedFiles.Set(key, olderFile) - } else if olderFile.State.Is("deleted_file") { - newerFile, _ := mainTorrent.SelectedFiles.Get(key) - if err := newerFile.State.Event(context.Background(), "delete_file"); err != nil { - t.log.Errorf("Cannot delete file %s: %v", key, err) - } + mainTorrent.SelectedFiles.Set(key, olderFile) + }) + newer.SelectedFiles.IterCb(func(key string, newerFile *File) { + if f, ok := mainTorrent.SelectedFiles.Get(key); ok && f.State.Is("deleted_file") { + return } + mainTorrent.SelectedFiles.Set(key, newerFile) }) t.CheckDeletedStatus(&mainTorrent) @@ -393,3 +379,8 @@ func (t *TorrentManager) IsPlayable(filePath string) bool { } return false } + +func (t *TorrentManager) trash(torrentId string) { + t.log.Debugf("Trash: %s", torrentId) + t.trashBin.Add(torrentId) +} diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index a32335e..d063c5f 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -176,6 +176,13 @@ func (t *TorrentManager) repair(torrent *Torrent) { t.log.Errorf("Cannot repair torrent %s: %v", torrent.Hash, err) return } + // delete old torrents + for id := range torrent.Components { + if id == info.ID { + continue + } + t.api.DeleteTorrent(id) + } t.log.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent)) return } else if info != nil && info.Progress != 100 { @@ -205,11 +212,6 @@ func (t *TorrentManager) repair(torrent *Torrent) { t.log.Infof("Repairing by downloading %d batches of the %d broken files of torrent %s", int(math.Ceil(float64(len(brokenFiles))/130)), len(brokenFiles), t.GetKey(torrent)) - oldTorrentIDs := []string{} - for id := range torrent.Components { - oldTorrentIDs = append(oldTorrentIDs, id) - } - newlyDownloadedIds := make([]string, 0) group := make([]*File, 0) batchNum := 1 @@ -223,7 +225,7 @@ func (t *TorrentManager) repair(torrent *Torrent) { if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) for _, newId := range newlyDownloadedIds { - t.registerFixer(newId, "download_failed") + t.trash(newId) } return } @@ -240,15 +242,13 @@ func (t *TorrentManager) repair(torrent *Torrent) { if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) for _, newId := range newlyDownloadedIds { - t.registerFixer(newId, "download_failed") + t.trash(newId) } return } } - for _, oldId := range oldTorrentIDs { - t.registerFixer(oldId, "replaced") - } + /// TODO: should we delete the old torrents that were replaced? } func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool { @@ -350,14 +350,9 @@ func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool { func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) { // broken files means broken links // if brokenFiles is not provided, we will redownload all files - oldTorrentIDs := make([]string, 0) finalSelection := strings.Join(selection, ",") selectionCount := len(selection) if selectionCount == 0 { - // only delete the old torrent if we are redownloading all files - for id := range torrent.Components { - oldTorrentIDs = append(oldTorrentIDs, id) - } tmpSelection := "" torrent.SelectedFiles.IterCb(func(_ string, file *File) { tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files @@ -399,7 +394,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) // select files err = t.api.SelectTorrentFiles(newTorrentID, finalSelection) if err != nil { - t.registerFixer(newTorrentID, "download_failed") + t.trash(newTorrentID) return nil, fmt.Errorf("cannot start redownloading torrent %s (id=%s): %v", t.GetKey(torrent), newTorrentID, err) } // sleep for 2 second to let RD process the magnet @@ -408,7 +403,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) // see if the torrent is ready info, err = t.api.GetTorrentInfo(newTorrentID) if err != nil { - t.registerFixer(newTorrentID, "download_failed") + t.trash(newTorrentID) return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) } @@ -434,20 +429,17 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) } if !isOkStatus { - t.registerFixer(info.ID, "download_failed") + t.trash(info.ID) return nil, fmt.Errorf("the redownloaded torrent %s is in a non-OK state: %s", t.GetKey(torrent), info.Status) } // check if incorrect number of links if info.Progress == 100 && len(info.Links) != selectionCount { - t.registerFixer(newTorrentID, "download_failed") + t.trash(newTorrentID) return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), selectionCount) } t.log.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress) - for _, id := range oldTorrentIDs { - t.registerFixer(id, "replaced") - } return info, nil }