diff --git a/internal/config/types.go b/internal/config/types.go index 46e146c..d86de7d 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -177,21 +177,21 @@ func (z *ZurgConfig) EnableDownloadMount() bool { func (z *ZurgConfig) GetApiTimeoutSecs() int { if z.ApiTimeoutSecs == 0 { - return 30 + return 15 } return z.ApiTimeoutSecs } func (z *ZurgConfig) GetDownloadTimeoutSecs() int { if z.DownloadTimeoutSecs == 0 { - return 15 + return 10 } return z.DownloadTimeoutSecs } func (z *ZurgConfig) GetRateLimitSleepSecs() int { if z.RateLimitSleepSecs == 0 { - return 4 + return 6 } return z.RateLimitSleepSecs } diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 8e271c1..6d58c2e 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -243,6 +243,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { + + @@ -310,6 +312,20 @@ func (zr *Handlers) handleRebootWorkerPool(resp http.ResponseWriter, req *http.R fmt.Fprint(resp, "Rebooted worker pool, please close this window") } +func (zr *Handlers) handleRebootRefreshPool(resp http.ResponseWriter, req *http.Request) { + zr.torMgr.RefreshKillSwitch <- struct{}{} + zr.torMgr.StartRefreshJob() + zr.log.Infof("Rebooted refresh worker") + fmt.Fprint(resp, "Rebooted refresh worker, please close this window") +} + +func (zr *Handlers) handleRebootRepairPool(resp http.ResponseWriter, req *http.Request) { + zr.torMgr.RepairKillSwitch <- struct{}{} + zr.torMgr.StartRepairJob() + zr.log.Infof("Rebooted repair worker") + fmt.Fprint(resp, "Rebooted repair worker, please close this window") +} + func bToMb(b uint64) uint64 { return b / 1024 / 1024 } diff --git a/internal/handlers/router.go b/internal/handlers/router.go index 84b7a9c..7b2148b 100644 --- a/internal/handlers/router.go +++ b/internal/handlers/router.go @@ -55,6 +55,8 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t router.Get("/", hs.handleHome) router.Get("/reboot/worker", hs.handleRebootWorkerPool) + router.Get("/reboot/refresh", hs.handleRebootRefreshPool) + router.Get("/reboot/repair", hs.handleRebootRepairPool) // version router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile) router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 51e7c38..53ea670 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -21,24 +21,26 @@ const ( ) type TorrentManager struct { - Config config.ConfigInterface - Api *realdebrid.RealDebrid - DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent - DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] - DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] - fixers cmap.ConcurrentMap[string, *Torrent] - deleteOnceDone mapset.Set[string] - allAccessKeys mapset.Set[string] - latestState *LibraryState - requiredVersion string - workerPool *ants.Pool - refreshPool *ants.Pool - repairPool *ants.Pool - repairTrigger chan *Torrent - repairSet mapset.Set[*Torrent] - repairRunning bool - repairRunningMu sync.Mutex - log *logutil.Logger + Config config.ConfigInterface + Api *realdebrid.RealDebrid + DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent + DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] + DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] + fixers cmap.ConcurrentMap[string, *Torrent] + deleteOnceDone mapset.Set[string] + allAccessKeys mapset.Set[string] + latestState *LibraryState + requiredVersion string + workerPool *ants.Pool + refreshPool *ants.Pool + RefreshKillSwitch chan struct{} + RepairKillSwitch chan struct{} + repairPool *ants.Pool + repairTrigger chan *Torrent + repairSet mapset.Set[*Torrent] + repairRunning bool + repairRunningMu sync.Mutex + log *logutil.Logger } // NewTorrentManager creates a new torrent manager @@ -46,20 +48,22 @@ type TorrentManager struct { // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager { t := &TorrentManager{ - Config: cfg, - Api: api, - DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), - DownloadCache: cmap.New[*realdebrid.Download](), - DownloadMap: cmap.New[*realdebrid.Download](), - fixers: cmap.New[*Torrent](), - deleteOnceDone: mapset.NewSet[string](), - allAccessKeys: mapset.NewSet[string](), - latestState: &LibraryState{}, - requiredVersion: "0.9.3-hotfix.3", - workerPool: workerPool, - refreshPool: refreshPool, - repairPool: repairPool, - log: log, + Config: cfg, + Api: api, + DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), + DownloadCache: cmap.New[*realdebrid.Download](), + DownloadMap: cmap.New[*realdebrid.Download](), + RefreshKillSwitch: make(chan struct{}, 1), + RepairKillSwitch: make(chan struct{}, 1), + fixers: cmap.New[*Torrent](), + deleteOnceDone: mapset.NewSet[string](), + allAccessKeys: mapset.NewSet[string](), + latestState: &LibraryState{}, + requiredVersion: "0.9.3-hotfix.3", + workerPool: workerPool, + refreshPool: refreshPool, + repairPool: repairPool, + log: log, } t.initializeDirectories() diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 719d36b..0ae3821 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -123,20 +123,27 @@ func (t *TorrentManager) refreshTorrents() []string { func (t *TorrentManager) StartRefreshJob() { _ = t.refreshPool.Submit(func() { t.log.Info("Starting periodic refresh job") + refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) + defer refreshTicker.Stop() + for { - <-time.After(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) + select { + case <-refreshTicker.C: + checksum := t.getCurrentState() + if t.latestState.equal(checksum) { + continue + } + t.SetNewLatestState(checksum) + t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) - checksum := t.getCurrentState() - if t.latestState.equal(checksum) { - continue + updatedPaths := t.refreshTorrents() + t.log.Info("Finished refreshing torrents") + + t.TriggerHookOnLibraryUpdate(updatedPaths) + case <-t.RefreshKillSwitch: + t.log.Info("Stopping periodic refresh job") + return } - t.SetNewLatestState(checksum) - t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) - - updatedPaths := t.refreshTorrents() - t.log.Info("Finished refreshing torrents") - - t.TriggerHookOnLibraryUpdate(updatedPaths) } }) } diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 3955516..d8c070b 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -37,6 +37,9 @@ func (t *TorrentManager) StartRepairJob() { case torrent := <-t.repairTrigger: // On-demand trigger with a specific torrent t.invokeRepair(torrent) + case <-t.RepairKillSwitch: + t.log.Info("Stopping periodic repair job") + return } } }) diff --git a/pkg/http/client.go b/pkg/http/client.go index 1c48224..67a2abe 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -248,7 +248,7 @@ func (r *HTTPClient) shouldRetry(resp *http.Response, reqHasRangeHeader bool, er } } } - if err != nil && strings.Contains(err.Error(), "timeout awaiting response headers") { + if err != nil && strings.Contains(err.Error(), "timeout") { return 1 } if resp != nil {