diff --git a/internal/app.go b/internal/app.go index b7e75a3..496da60 100644 --- a/internal/app.go +++ b/internal/app.go @@ -57,32 +57,14 @@ func MainApp(configPath string) { } defer workerPool.Release() - // extra 1 worker for the refresh job - refreshPool, err := ants.NewPool(1) - if err != nil { - zurglog.Errorf("Failed to create refresh worker: %v", err) - os.Exit(1) - } - defer refreshPool.Release() - - var repairPool *ants.Pool - if config.EnableRepair() { - repairPool, err = ants.NewPool(1) - if err != nil { - zurglog.Errorf("Failed to create repair worker: %v", err) - os.Exit(1) - } - defer repairPool.Release() - } - utils.EnsureDirExists("data") // Ensure the data directory exists - torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, refreshPool, repairPool, log.Named("manager")) + torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, log.Named("manager")) downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetDownloadTimeoutSecs(), true, config, log.Named("dlclient")) downloader := universal.NewDownloader(downloadClient) router := chi.NewRouter() - handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, workerPool, refreshPool, repairPool, log.Named("router")) + handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, workerPool, log.Named("router")) // go func() { // if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed { diff --git a/internal/config/types.go b/internal/config/types.go index 65724a0..59a9875 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -100,7 +100,7 @@ func (z *ZurgConfig) GetProxy() string { func (z *ZurgConfig) GetNumOfWorkers() int { if z.NumOfWorkers == 0 { - return 32 + return 20 } return z.NumOfWorkers } diff --git a/internal/handlers/router.go b/internal/handlers/router.go index 7b2148b..8668100 100644 --- a/internal/handlers/router.go +++ b/internal/handlers/router.go @@ -36,16 +36,14 @@ func init() { chi.RegisterMethod("MOVE") } -func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) { +func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) { hs := &Handlers{ - downloader: downloader, - torMgr: torMgr, - cfg: cfg, - api: api, - workerPool: workerPool, - refreshPool: refreshPool, - repairPool: repairPool, - log: log, + downloader: downloader, + torMgr: torMgr, + cfg: cfg, + api: api, + workerPool: workerPool, + log: log, } if cfg.GetUsername() != "" { diff --git a/internal/torrent/fixer.go b/internal/torrent/fixer.go index 811f16e..141202f 100644 --- a/internal/torrent/fixer.go +++ b/internal/torrent/fixer.go @@ -30,17 +30,20 @@ func (t *TorrentManager) handleFixers() { } command, _ := t.fixers.Pop(id) switch command { - case "delete": + case "delete_replaced": + t.log.Debugf("Deleting old id=%s because it's redundant to fixed %s ", id, t.GetKey(torrent)) + toDelete = append(toDelete, id) + case "delete_failed": + t.log.Debugf("Deleting failed fixer id=%s of torrent %s", id, t.GetKey(torrent)) toDelete = append(toDelete, id) case "repair": + t.log.Debugf("Repairing torrent %s again now that fixer id=%s is done", t.GetKey(torrent), id) toDelete = append(toDelete, id) - t.log.Debugf("Repairing torrent %s again now that fixer is done", t.GetKey(torrent)) repairMe, _ := allTorrents.Get(t.GetKey(torrent)) t.TriggerRepair(repairMe) } }) for _, id := range toDelete { - t.log.Debugf("Deleting fixer torrent id=%s", id) t.Api.DeleteTorrent(id) infoCache.Remove(id) t.deleteTorrentFile(id) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index cfd9037..c8f0b75 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -31,10 +31,8 @@ type TorrentManager struct { latestState *LibraryState requiredVersion string workerPool *ants.Pool - refreshPool *ants.Pool RefreshKillSwitch chan struct{} RepairKillSwitch chan struct{} - repairPool *ants.Pool repairTrigger chan *Torrent repairSet mapset.Set[*Torrent] repairRunning bool @@ -45,7 +43,7 @@ type TorrentManager struct { // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files -func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager { +func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) *TorrentManager { t := &TorrentManager{ Config: cfg, Api: api, @@ -58,8 +56,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w latestState: &LibraryState{}, requiredVersion: "0.9.3-hotfix.4", workerPool: workerPool, - refreshPool: refreshPool, - repairPool: repairPool, log: log, } t.fixers = t.readFixersFromFile() diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 4ab290a..0fb4d42 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -95,7 +95,7 @@ func (t *TorrentManager) refreshTorrents() []string { // StartRefreshJob periodically refreshes the torrents func (t *TorrentManager) StartRefreshJob() { - _ = t.refreshPool.Submit(func() { + _ = t.workerPool.Submit(func() { t.log.Info("Starting periodic refresh job") refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) defer refreshTicker.Stop() diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 520ec67..766c440 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -25,7 +25,7 @@ func (t *TorrentManager) StartRepairJob() { t.repairTrigger = make(chan *Torrent) t.repairSet = mapset.NewSet[*Torrent]() // there is 1 repair worker, with max 1 blocking task - _ = t.repairPool.Submit(func() { + _ = t.workerPool.Submit(func() { t.log.Info("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() @@ -211,13 +211,13 @@ func (t *TorrentManager) repair(torrent *Torrent) { // second step: download the broken files if len(brokenFiles) > 0 { t.log.Infof("Repairing by downloading only the %d broken out of %d files of torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent)) - info, err := t.redownloadTorrent(torrent, brokenFileIDs) + redownloadedTorrent, err := t.redownloadTorrent(torrent, brokenFileIDs) if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) return } - if info != nil { - t.fixerAddCommand(info.ID, "repair") + if redownloadedTorrent != nil { + t.fixerAddCommand(redownloadedTorrent.ID, "repair") return } } @@ -335,14 +335,15 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( return nil, fmt.Errorf("cannot redownload torrent: %v", err) } + newTorrentID := resp.ID + // sleep for 1 second to let RD process the magnet time.Sleep(1 * time.Second) // select files - newTorrentID := resp.ID err = t.Api.SelectTorrentFiles(newTorrentID, selection) if err != nil { - t.fixerAddCommand(newTorrentID, "delete") + t.fixerAddCommand(newTorrentID, "delete_failed") return nil, fmt.Errorf("cannot start redownloading: %v", err) } @@ -352,7 +353,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( // see if the torrent is ready info, err := t.Api.GetTorrentInfo(newTorrentID) if err != nil { - t.fixerAddCommand(newTorrentID, "delete") + t.fixerAddCommand(newTorrentID, "delete_failed") return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) } @@ -367,14 +368,14 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( } } if !isOkStatus { - t.fixerAddCommand(newTorrentID, "delete") + t.fixerAddCommand(newTorrentID, "delete_failed") return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status) } // check if incorrect number of links selectionCount := len(strings.Split(selection, ",")) if info.Progress == 100 && len(info.Links) != selectionCount { - t.fixerAddCommand(newTorrentID, "delete") + t.fixerAddCommand(newTorrentID, "delete_failed") return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), selectionCount) } @@ -382,13 +383,8 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( if len(oldTorrentIDs) > 0 { // replace the old torrent (empty selection) - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) for _, id := range oldTorrentIDs { - t.log.Debugf("Deleting torrent %s (id=%s) to replace with repaired torrent", t.GetKey(torrent), id) - torrent.DownloadedIDs.Remove(id) - t.Api.DeleteTorrent(id) - infoCache.Remove(id) - t.deleteTorrentFile(id) + t.fixerAddCommand(id, "delete_replaced") } } return info, nil