diff --git a/internal/app.go b/internal/app.go index f1f8734..7405dc2 100644 --- a/internal/app.go +++ b/internal/app.go @@ -87,8 +87,13 @@ func MainApp(configPath string) { ) hosts := repo.GetOptimalHosts(config.GetNumberOfHosts(), config.ShouldForceIPv6()) + if len(hosts) == 0 { + zurglog.Fatal("No optimal hosts found. We cannot continue! (check if Real-Debrid is down or they have blocked your IP address)") + } zurglog.Debugf("Optimal hosts (%d): %v", len(hosts), hosts) + // help message zurglog.Debug("To reset optimal hosts, run 'zurg network-test' (Using docker compose? 'docker compose exec zurg ./zurg network-test')") + zurglog.Debug("To run network-test with a proxy, set the PROXY environment variable 'PROXY=http://xyz:123 zurg network-test'") downloadClient := http.NewHTTPClient( "", diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 2dab313..8f1b8bc 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -35,7 +35,6 @@ type RootResponse struct { PID int `json:"pid"` // Process ID Sponsor SponsorResponse `json:"sponsor_zurg"` // Sponsorship links Config config.ZurgConfig `json:"config"` - ImmediateBin []string `json:"immediate_bin"` OnceDoneBin []string `json:"once_done_bin"` } @@ -87,9 +86,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { Github: "https://github.com/sponsors/debridmediamanager", Paypal: "https://paypal.me/yowmamasita", }, - Config: zr.cfg.GetConfig(), - ImmediateBin: zr.torMgr.ImmediateBin.ToSlice(), - OnceDoneBin: zr.torMgr.OnceDoneBin.ToSlice(), + Config: zr.cfg.GetConfig(), + OnceDoneBin: zr.torMgr.OnceDoneBin.ToSlice(), } out := ` @@ -272,11 +270,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { - - - - - + @@ -367,7 +361,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Config.ShouldServeFromRclone(), response.Config.ShouldForceIPv6(), response.Config.GetOnLibraryUpdate(), - response.ImmediateBin, response.OnceDoneBin, ) diff --git a/internal/torrent/bins.go b/internal/torrent/bins.go index 50357df..7b309b6 100644 --- a/internal/torrent/bins.go +++ b/internal/torrent/bins.go @@ -31,17 +31,14 @@ func (t *TorrentManager) initializeBins() { return } - t.ImmediateBin = mapset.NewSet[string](data["trash_bin"]...) t.OnceDoneBin = mapset.NewSet[string](data["repair_bin"]...) - t.repairLog.Debugf("Bin immediately: %v", t.ImmediateBin.ToSlice()) - t.repairLog.Debugf("Bin once done: %v", t.OnceDoneBin.ToSlice()) + t.repairLog.Debugf("These IDs will be deleted after completion: %v", t.OnceDoneBin.ToSlice()) } func (t *TorrentManager) persistBins() { data := map[string]interface{}{ - "trash_bin": t.ImmediateBin.ToSlice(), // Assuming trashBin is a mapset.Set[string] - "repair_bin": t.OnceDoneBin.ToSlice(), // Assuming repairBin is a mapset.Set[string] + "repair_bin": t.OnceDoneBin.ToSlice(), // Assuming repairBin is a mapset.Set[string] } jsonData, err := json.Marshal(data) @@ -63,12 +60,6 @@ func (t *TorrentManager) persistBins() { } } -func (t *TorrentManager) setToBinImmediately(torrentId string) { - t.repairLog.Debugf("id=%s set to delete immediately", torrentId) - t.ImmediateBin.Add(torrentId) - t.persistBins() -} - func (t *TorrentManager) setToBinOnceDone(torrentId string) { t.repairLog.Debugf("id=%s set to delete once it completes", torrentId) t.OnceDoneBin.Add(torrentId) @@ -83,24 +74,19 @@ func (t *TorrentManager) setXToBinOnceYDone(deleteId, completeId string) { } func (t *TorrentManager) cleanupBins(freshIDs mapset.Set[string]) { - t.ImmediateBin.Clone().Each(func(entry string) bool { - if !freshIDs.Contains(entry) { - t.ImmediateBin.Remove(entry) - } - return false - }) t.OnceDoneBin.Clone().Each(func(entry string) bool { - // check if the entry is a special case + // check for: delete x once y is done cases if strings.Contains(entry, "-") { // format is: id1-id2 or id1- // if either id1 or id2 is not fresh, remove the entry ids := strings.Split(entry, "-") - if !freshIDs.Contains(ids[0]) || (ids[1] != "" && !freshIDs.Contains(ids[1])) { + if !freshIDs.ContainsOne(ids[0]) || (ids[1] != "" && !freshIDs.ContainsOne(ids[1])) { t.OnceDoneBin.Remove(entry) } return false } - if !freshIDs.Contains(entry) { + // check for: delete once done cases + if !freshIDs.ContainsOne(entry) { t.OnceDoneBin.Remove(entry) } return false @@ -108,41 +94,22 @@ func (t *TorrentManager) cleanupBins(freshIDs mapset.Set[string]) { t.persistBins() } -// binImmediatelyErrorCheck checks if the torrent is in the ImmediateBin and deletes it if it is. -// returns true if the torrent was in the bin and was deleted, false otherwise -func (t *TorrentManager) binImmediately(torrentId string) bool { - if t.ImmediateBin.Contains(torrentId) { - if err := t.api.DeleteTorrent(torrentId); err != nil { - t.repairLog.Warnf("Failed to delete torrent %s: %v", torrentId, err) - } - t.ImmediateBin.Remove(torrentId) - t.repairLog.Debugf("Bin: immediate deletion of torrent %s", torrentId) - t.persistBins() - return true - } - return false -} - // binOnceDoneErrorCheck checks if the torrent is in error states and then checks if it should be deleted func (t *TorrentManager) binOnceDoneErrorCheck(torrentId, status string) bool { if status == "downloading" || status == "downloaded" || status == "uploading" || status == "queued" || status == "compressing" || status == "waiting_files_selection" { return false } - t.repairLog.Infof("Bin: error status=%s, checking if %s should be deleted", status, torrentId) return t.binOnceDone(torrentId, true) } // binOnceDone checks if the torrent is in the OnceDoneBin and deletes it if it is. // returns true if the torrent was in the bin and was deleted, false otherwise func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool) bool { - if t.OnceDoneBin.Contains(completedTorrentId) { - if err := t.api.DeleteTorrent(completedTorrentId); err != nil { - t.repairLog.Warnf("Failed to delete torrent %s: %v", completedTorrentId, err) - } - t.deleteInfoFile(completedTorrentId) + if t.OnceDoneBin.ContainsOne(completedTorrentId) { + t.DeleteByID(completedTorrentId) t.OnceDoneBin.Remove(completedTorrentId) if errorCheck { - t.repairLog.Errorf("Bin: error deletion of torrent %s", completedTorrentId) + t.repairLog.Infof("Bin: deleting torrent id=%s early because it has encountered an error", completedTorrentId) } else { t.repairLog.Debugf("Bin: done deletion of torrent %s", completedTorrentId) } @@ -152,7 +119,7 @@ func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool) // special case: yyy-xxx means if yyy is done, delete xxx specialCase := fmt.Sprintf("%s-", completedTorrentId) - if !t.OnceDoneBin.Contains(specialCase) { + if !t.OnceDoneBin.ContainsOne(specialCase) { return false } t.deleteInfoFile(completedTorrentId) @@ -160,16 +127,12 @@ func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool) t.OnceDoneBin.Clone().Each(func(entry string) bool { if strings.Contains(entry, specialCase) { if errorCheck { - if err := t.api.DeleteTorrent(completedTorrentId); err != nil { - t.repairLog.Warnf("Failed to delete torrent %s: %v", completedTorrentId, err) - } + t.DeleteByID(completedTorrentId) t.OnceDoneBin.Remove(entry) - t.repairLog.Errorf("Bin: error deletion of torrent %s", completedTorrentId) + t.repairLog.Infof("Bin: deleting torrent id=%s early because it has encountered an error", completedTorrentId) } else { idToDelete := strings.Split(entry, "-")[1] - if err := t.api.DeleteTorrent(idToDelete); err != nil { - t.repairLog.Warnf("Failed to delete torrent %s: %v", idToDelete, err) - } + t.DeleteByID(idToDelete) t.OnceDoneBin.Remove(entry) t.repairLog.Debugf("Bin: %s completed, done deletion of torrent %s", completedTorrentId, idToDelete) } diff --git a/internal/torrent/delete.go b/internal/torrent/delete.go index aa83f9c..539233d 100644 --- a/internal/torrent/delete.go +++ b/internal/torrent/delete.go @@ -25,9 +25,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { if torrent, ok := allTorrents.Get(accessKey); ok { if deleteInRD { torrent.DownloadedIDs.Clone().Each(func(torrentID string) bool { - t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, torrentID) - t.api.DeleteTorrent(torrentID) - t.deleteInfoFile(torrentID) + t.DeleteByID(torrentID) return false }) } @@ -38,3 +36,8 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { }) allTorrents.Remove(accessKey) } + +func (t *TorrentManager) DeleteByID(torrentID string) { + t.api.DeleteTorrent(torrentID) + t.deleteInfoFile(torrentID) +} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index ec31da2..2684ceb 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -53,8 +53,8 @@ type TorrentManager struct { repairRunning bool repairRunningMu sync.Mutex - ImmediateBin mapset.Set[string] - OnceDoneBin mapset.Set[string] + OnceDoneBin mapset.Set[string] + DeleteOnCompletionBin cmap.ConcurrentMap[string, string] } // NewTorrentManager creates a new torrent manager @@ -85,8 +85,8 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w latestState: &LibraryState{log: log}, - ImmediateBin: mapset.NewSet[string](), - OnceDoneBin: mapset.NewSet[string](), + OnceDoneBin: mapset.NewSet[string](), + DeleteOnCompletionBin: cmap.New[string](), } t.initializeBins() @@ -353,6 +353,7 @@ func (t *TorrentManager) mountNewDownloads() { } } +// StartDownloadsJob: permanent job for remounting downloads func (t *TorrentManager) StartDownloadsJob() { t.workerPool.Submit(func() { remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute) @@ -412,6 +413,7 @@ func copyFile(sourcePath, destPath string) error { return nil } +// StartDumpJob: permanent job for dumping torrents func (t *TorrentManager) StartDumpJob() { t.workerPool.Submit(func() { dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute) @@ -442,6 +444,7 @@ func (t *TorrentManager) analyzeAllTorrents() { }) } +// StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user) func (t *TorrentManager) StartMediaAnalysisJob() { t.workerPool.Submit(func() { for range t.AnalyzeTrigger { diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 5ae3789..1b1b440 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -37,8 +37,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { idx := i t.workerPool.Submit(func() { defer wg.Done() - if t.binImmediately(instances[idx].ID) || - t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) || + if t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) || instances[idx].Progress != 100 { mergeChan <- nil return @@ -55,7 +54,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { allTorrents.Set(accessKey, torrent) t.writeTorrentToFile(torrent) t.assignDirectory(torrent, !initialRun) - } else if !mainTorrent.DownloadedIDs.Contains(tInfo.ID) { + } else if !mainTorrent.DownloadedIDs.ContainsOne(tInfo.ID) { forMerging = torrent } @@ -98,20 +97,21 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { t.log.Infof("Compiled into %d unique torrents", allTorrents.Count()) // delete info files that are no longer present + // it also runs binOnceDone (needed for cleanup every refresh) t.getInfoFiles().Each(func(path string) bool { path = filepath.Base(path) torrentID := strings.TrimSuffix(path, ".zurginfo") // if binOnceDone returns true, it means the info file is deleted // if false, then we check if it's one of the torrents we just fetched // if not (both are false), then we delete the info file - if !t.binOnceDone(torrentID, false) && !freshIDs.Contains(torrentID) { + if !t.binOnceDone(torrentID, false) && !freshIDs.ContainsOne(torrentID) { t.deleteInfoFile(torrentID) } return false }) + // cleans up DownloadedIDs field of all torrents t.workerPool.Submit(func() { - // update DownloadedIDs field of torrents allTorrents.IterCb(func(accessKey string, torrent *Torrent) { deletedIDs := torrent.DownloadedIDs.Difference(freshIDs) if deletedIDs.Cardinality() > 0 { @@ -167,7 +167,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.T func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent { torrent := t.readTorrentFromFile("data/" + info.Hash + ".zurgtorrent") - if torrent != nil && torrent.DownloadedIDs.Contains(info.ID) { + if torrent != nil && torrent.DownloadedIDs.ContainsOne(info.ID) { return torrent } @@ -189,7 +189,7 @@ func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent var selectedFiles []*File for _, file := range info.Files { filename := filepath.Base(file.Path) - if allFilenames.Contains(filename) { + if allFilenames.ContainsOne(filename) { dupeFilenames.Add(filename) } else { allFilenames.Add(filename) @@ -228,7 +228,7 @@ func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent for _, file := range selectedFiles { baseFilename := t.GetPath(file) // todo better handling of duplicate filenames - if dupeFilenames.Contains(baseFilename) { + if dupeFilenames.ContainsOne(baseFilename) { extension := filepath.Ext(baseFilename) filenameNoExt := strings.TrimSuffix(baseFilename, extension) newName := fmt.Sprintf("%s (%d)%s", filenameNoExt, file.ID, extension) diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index dc10bcd..7f2ea06 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -29,6 +29,7 @@ func (t *TorrentManager) StartRepairJob() { t.RepairQueue = mapset.NewSet[*Torrent]() t.RepairAllTrigger = make(chan struct{}) + // periodic repair worker t.workerPool.Submit(func() { t.repairLog.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) @@ -46,7 +47,7 @@ func (t *TorrentManager) StartRepairJob() { } }) - // there is 1 repair worker, with max 1 blocking task + // repair worker t.workerPool.Submit(func() { for { select { @@ -90,7 +91,6 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunningMu.Unlock() // Execute the repair job - time.Sleep(10 * time.Second) t.executeRepairJob(torrent) // After repair is done @@ -120,10 +120,11 @@ func (t *TorrentManager) executeRepairJob(torrent *Torrent) { var wg sync.WaitGroup haystack.IterCb(func(_ string, torrent *Torrent) { wg.Add(1) + // temp worker for finding broken torrents t.workerPool.Submit(func() { defer wg.Done() canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar") - if torrent.UnrepairableReason != "" || !canExtract { + if !canExtract || torrent.UnrepairableReason != "" { return } // check 1: for broken files @@ -234,12 +235,12 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { } // if it's still broken, let's delete the newly downloaded torrent - t.setToBinImmediately(info.ID) + t.DeleteByID(info.ID) err = fmt.Errorf("links are still broken") } else if info != nil && info.Progress != 100 { // it's faster to download just the broken files, so let's delete the newly downloaded torrent - t.setToBinImmediately(info.ID) + t.DeleteByID(info.ID) err = fmt.Errorf("no longer cached") } @@ -278,7 +279,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err) // delete the newly downloaded torrents because the operation failed for _, newId := range newlyDownloadedIds { - t.setToBinImmediately(newId) + t.DeleteByID(newId) } return } @@ -414,10 +415,10 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) t.markAsUnfixable(torrent, "rar'ed by RD") t.markAsUnplayable(torrent, "rar'ed by RD") + torrent.State.Event(context.Background(), "mark_as_repaired") } torrent.UnassignedLinks = mapset.NewSet[string]() - // torrent.State.Event(context.Background(), "mark_as_repaired") t.writeTorrentToFile(torrent) return false // end repair @@ -478,20 +479,20 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) for { retries++ if retries > 10 { - t.setToBinImmediately(newTorrentID) + t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: too many retries") } err = t.api.SelectTorrentFiles(newTorrentID, finalSelection) if err != nil { - t.setToBinImmediately(newTorrentID) + t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: %v", err) } time.Sleep(2 * time.Second) info, err = t.api.GetTorrentInfo(newTorrentID) if err != nil { - t.setToBinImmediately(newTorrentID) + t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot get info on redownloaded : %v", err) } @@ -504,13 +505,13 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) // documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead if info.Status != "downloading" && info.Status != "downloaded" && info.Status != "uploading" && info.Status != "queued" && info.Status != "compressing" { - t.setToBinImmediately(newTorrentID) + t.DeleteByID(newTorrentID) return nil, fmt.Errorf("non-OK state: %s", info.Status) } // check if incorrect number of links if info.Progress == 100 && len(info.Links) != len(selection) { - t.setToBinImmediately(newTorrentID) + t.DeleteByID(newTorrentID) return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection)) } else if info.Progress != 100 { t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress) diff --git a/internal/torrent/states.go b/internal/torrent/states.go index abad8ad..db10b01 100644 --- a/internal/torrent/states.go +++ b/internal/torrent/states.go @@ -12,6 +12,9 @@ func NewTorrentState(initial string) *fsm.FSM { {Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"}, // when repair has been started {Name: "repair_torrent", Src: []string{"ok_torrent", "broken_torrent"}, Dst: "under_repair_torrent"}, + // when converting to torrent + // when merging with another same hash torrent + // when a torrent is rar'ed and not extracting {Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"}, }, fsm.Callbacks{},
%v
Immediate Bin%v
Once Done BinIDs to be deleted once download completes %v