diff --git a/internal/app.go b/internal/app.go index 6e40e3c..27c377c 100644 --- a/internal/app.go +++ b/internal/app.go @@ -58,12 +58,15 @@ func MainApp(configPath string) { } defer workerPool.Release() - repairPool, err := ants.NewPool(1, ants.WithMaxBlockingTasks(1)) - if err != nil { - zurglog.Errorf("Failed to create repair pool: %v", err) - os.Exit(1) + var repairPool *ants.Pool + if config.EnableRepair() { + repairPool, err := ants.NewPool(1) + if err != nil { + zurglog.Errorf("Failed to create repair pool: %v", err) + os.Exit(1) + } + defer repairPool.Release() } - defer repairPool.Release() utils.EnsureDirExists("data") // Ensure the data directory exists torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager")) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index e554ded..27b2ec8 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -26,7 +26,6 @@ type TorrentManager struct { DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] fixers cmap.ConcurrentMap[string, *Torrent] - repairs mapset.Set[string] deleteOnceDone mapset.Set[string] allAccessKeys mapset.Set[string] latestState *LibraryState @@ -50,7 +49,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w DownloadCache: cmap.New[*realdebrid.Download](), DownloadMap: cmap.New[*realdebrid.Download](), fixers: cmap.New[*Torrent](), - repairs: mapset.NewSet[string](), deleteOnceDone: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](), latestState: &LibraryState{}, diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index fdce862..6f3795f 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "strings" + "sync" "time" "github.com/debridmediamanager/zurg/internal/config" @@ -70,26 +71,25 @@ func (t *TorrentManager) TriggerRepair(torrent *Torrent) { } func (t *TorrentManager) repairAll(torrent *Torrent) { - t.log.Info("Periodic repair invoked; searching for broken torrents") - // todo: a more elegant way to do this - var allTorrents cmap.ConcurrentMap[string, *Torrent] + var haystack cmap.ConcurrentMap[string, *Torrent] if torrent == nil { - allTorrents, _ = t.DirectoryMap.Get(INT_ALL) + haystack, _ = t.DirectoryMap.Get(INT_ALL) + t.log.Info("Periodic repair started; searching for broken torrents") } else { - allTorrents = cmap.New[*Torrent]() - allTorrents.Set("", torrent) + haystack = cmap.New[*Torrent]() + haystack.Set("", torrent) + t.log.Infof("Repair invoked for torrent %s", t.GetKey(torrent)) } // collect all torrents that need to be repaired toRepair := mapset.NewSet[*Torrent]() - allTorrents.IterCb(func(_ string, torrent *Torrent) { + haystack.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() || torrent.AllInProgress() || torrent.UnrepairableReason != "" { return } - - // check 2: for broken files + // check 1: for broken files brokenFileIDs := mapset.NewSet[int]() torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" { @@ -101,8 +101,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { toRepair.Add(torrent) return } - - // check 3: for expired links + // check 2: for expired links if torrent.UnassignedLinks.Cardinality() > 0 { t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent)) toRepair.Add(torrent) @@ -110,46 +109,43 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { } }) - if toRepair.Cardinality() == 0 { - t.log.Info("Periodic repair found no broken torrents to repair") - } else { - t.log.Info("Periodic repair found %d broken torrents to repair in total", toRepair.Cardinality()) + t.log.Infof("Found %d broken torrents to repair in total", toRepair.Cardinality()) - toRepair.Each(func(torrent *Torrent) bool { - t.Repair(torrent) - return false - }) - } + var wg sync.WaitGroup + toRepair.Each(func(torrent *Torrent) bool { + wg.Add(1) + t.Repair(torrent, &wg) + return false + }) + wg.Wait() + t.log.Infof("Finished repairing %d broken torrents", toRepair.Cardinality()) } -func (t *TorrentManager) Repair(torrent *Torrent) { +func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) { if torrent.UnrepairableReason != "" { t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", t.GetKey(torrent), torrent.UnrepairableReason) - return - } - if t.repairs.Contains(t.GetKey(torrent)) { - t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent)) + wg.Done() return } if torrent.AnyInProgress() || torrent.AllInProgress() { t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent)) + wg.Done() return } - t.repairs.Add(t.GetKey(torrent)) - t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent)) // blocks for approx 45 minutes if active torrents are full if !t.canCapacityHandle() { t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") + wg.Done() return } // assign to a worker _ = t.workerPool.Submit(func() { + defer wg.Done() t.repair(torrent) - t.repairs.Remove(t.GetKey(torrent)) }) } diff --git a/pkg/http/client.go b/pkg/http/client.go index 4cae707..a6a895b 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -28,7 +28,6 @@ type HTTPClient struct { maxRetries int timeoutSecs int backoff func(attempt int) time.Duration - shouldRetry func(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int bearerToken string ensureIPv6Host bool cfg config.ConfigInterface @@ -58,7 +57,6 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host maxRetries: maxRetries, timeoutSecs: timeoutSecs, backoff: backoffFunc, - shouldRetry: shouldRetryFunc, ensureIPv6Host: ensureIPv6Host, cfg: cfg, ipv6: cmap.New[string](), @@ -148,7 +146,20 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { attempt := 0 for { r.replaceHostIfNeeded(req) // needed for ipv6 + r.log.Debugf("downloading %s", req.URL) + timeout := time.Duration(r.cfg.GetRealDebridTimeout()) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + req = req.WithContext(ctx) resp, err = r.client.Do(req) + // check if error is context deadline exceeded + if r.ensureIPv6Host && r.cfg.ShouldForceIPv6() && err != nil && strings.Contains(err.Error(), "context deadline exceeded") { + attempt += 1 + if attempt > r.maxRetries { + break + } + continue + } if resp != nil && resp.StatusCode/100 >= 4 { body, _ := io.ReadAll(resp.Body) if body != nil { @@ -161,6 +172,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { } } incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds()) + r.log.Debugf("got %s incr %d/%d", req.URL, incr, attempt) if incr > 0 { attempt += incr if attempt > r.maxRetries { @@ -217,7 +229,10 @@ func (r *HTTPClient) proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) { return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme) } -func shouldRetryFunc(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int { +func (r *HTTPClient) shouldRetry(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int { + if err != nil { + r.log.Errorf("http error +%v", err) + } if err != nil && strings.HasPrefix(err.Error(), "api response error:") { if apiErr, ok := err.(*ApiErrorResponse); ok { switch apiErr.Code { @@ -270,13 +285,15 @@ func backoffFunc(attempt int) time.Duration { } func (r *HTTPClient) CanFetchFirstByte(url string) bool { + timeout := time.Duration(r.cfg.GetRealDebridTimeout()) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() req, err := http.NewRequest("GET", url, nil) if err != nil { return false } - req.Header.Set("Range", "bytes=0-0") - + req = req.WithContext(ctx) resp, err := r.client.Do(req) if err != nil { return false