Add context deadline to request

This commit is contained in:
Ben Sarmiento
2024-01-27 21:09:00 +01:00
parent 1aabcfd322
commit ce4b794098
4 changed files with 54 additions and 40 deletions

View File

@@ -58,12 +58,15 @@ func MainApp(configPath string) {
} }
defer workerPool.Release() defer workerPool.Release()
repairPool, err := ants.NewPool(1, ants.WithMaxBlockingTasks(1)) var repairPool *ants.Pool
if err != nil { if config.EnableRepair() {
zurglog.Errorf("Failed to create repair pool: %v", err) repairPool, err := ants.NewPool(1)
os.Exit(1) if err != nil {
zurglog.Errorf("Failed to create repair pool: %v", err)
os.Exit(1)
}
defer repairPool.Release()
} }
defer repairPool.Release()
utils.EnsureDirExists("data") // Ensure the data directory exists utils.EnsureDirExists("data") // Ensure the data directory exists
torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager")) torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager"))

View File

@@ -26,7 +26,6 @@ type TorrentManager struct {
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
fixers cmap.ConcurrentMap[string, *Torrent] fixers cmap.ConcurrentMap[string, *Torrent]
repairs mapset.Set[string]
deleteOnceDone mapset.Set[string] deleteOnceDone mapset.Set[string]
allAccessKeys mapset.Set[string] allAccessKeys mapset.Set[string]
latestState *LibraryState latestState *LibraryState
@@ -50,7 +49,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
DownloadCache: cmap.New[*realdebrid.Download](), DownloadCache: cmap.New[*realdebrid.Download](),
DownloadMap: cmap.New[*realdebrid.Download](), DownloadMap: cmap.New[*realdebrid.Download](),
fixers: cmap.New[*Torrent](), fixers: cmap.New[*Torrent](),
repairs: mapset.NewSet[string](),
deleteOnceDone: mapset.NewSet[string](), deleteOnceDone: mapset.NewSet[string](),
allAccessKeys: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](),
latestState: &LibraryState{}, latestState: &LibraryState{},

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"math" "math"
"strings" "strings"
"sync"
"time" "time"
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
@@ -70,26 +71,25 @@ func (t *TorrentManager) TriggerRepair(torrent *Torrent) {
} }
func (t *TorrentManager) repairAll(torrent *Torrent) { func (t *TorrentManager) repairAll(torrent *Torrent) {
t.log.Info("Periodic repair invoked; searching for broken torrents")
// todo: a more elegant way to do this // todo: a more elegant way to do this
var allTorrents cmap.ConcurrentMap[string, *Torrent] var haystack cmap.ConcurrentMap[string, *Torrent]
if torrent == nil { if torrent == nil {
allTorrents, _ = t.DirectoryMap.Get(INT_ALL) haystack, _ = t.DirectoryMap.Get(INT_ALL)
t.log.Info("Periodic repair started; searching for broken torrents")
} else { } else {
allTorrents = cmap.New[*Torrent]() haystack = cmap.New[*Torrent]()
allTorrents.Set("", torrent) haystack.Set("", torrent)
t.log.Infof("Repair invoked for torrent %s", t.GetKey(torrent))
} }
// collect all torrents that need to be repaired // collect all torrents that need to be repaired
toRepair := mapset.NewSet[*Torrent]() toRepair := mapset.NewSet[*Torrent]()
allTorrents.IterCb(func(_ string, torrent *Torrent) { haystack.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() || torrent.AllInProgress() || torrent.UnrepairableReason != "" { if torrent.AnyInProgress() || torrent.AllInProgress() || torrent.UnrepairableReason != "" {
return return
} }
// check 1: for broken files
// check 2: for broken files
brokenFileIDs := mapset.NewSet[int]() brokenFileIDs := mapset.NewSet[int]()
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" { if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
@@ -101,8 +101,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
toRepair.Add(torrent) toRepair.Add(torrent)
return return
} }
// check 2: for expired links
// check 3: for expired links
if torrent.UnassignedLinks.Cardinality() > 0 { if torrent.UnassignedLinks.Cardinality() > 0 {
t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent)) t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent))
toRepair.Add(torrent) toRepair.Add(torrent)
@@ -110,46 +109,43 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
} }
}) })
if toRepair.Cardinality() == 0 { t.log.Infof("Found %d broken torrents to repair in total", toRepair.Cardinality())
t.log.Info("Periodic repair found no broken torrents to repair")
} else {
t.log.Info("Periodic repair found %d broken torrents to repair in total", toRepair.Cardinality())
toRepair.Each(func(torrent *Torrent) bool { var wg sync.WaitGroup
t.Repair(torrent) toRepair.Each(func(torrent *Torrent) bool {
return false wg.Add(1)
}) t.Repair(torrent, &wg)
} return false
})
wg.Wait()
t.log.Infof("Finished repairing %d broken torrents", toRepair.Cardinality())
} }
func (t *TorrentManager) Repair(torrent *Torrent) { func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
if torrent.UnrepairableReason != "" { if torrent.UnrepairableReason != "" {
t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", t.GetKey(torrent), torrent.UnrepairableReason) t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", t.GetKey(torrent), torrent.UnrepairableReason)
return wg.Done()
}
if t.repairs.Contains(t.GetKey(torrent)) {
t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent))
return return
} }
if torrent.AnyInProgress() || torrent.AllInProgress() { if torrent.AnyInProgress() || torrent.AllInProgress() {
t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent)) t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent))
wg.Done()
return return
} }
t.repairs.Add(t.GetKey(torrent))
t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent)) t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent))
// blocks for approx 45 minutes if active torrents are full // blocks for approx 45 minutes if active torrents are full
if !t.canCapacityHandle() { if !t.canCapacityHandle() {
t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
wg.Done()
return return
} }
// assign to a worker // assign to a worker
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
defer wg.Done()
t.repair(torrent) t.repair(torrent)
t.repairs.Remove(t.GetKey(torrent))
}) })
} }

View File

@@ -28,7 +28,6 @@ type HTTPClient struct {
maxRetries int maxRetries int
timeoutSecs int timeoutSecs int
backoff func(attempt int) time.Duration backoff func(attempt int) time.Duration
shouldRetry func(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int
bearerToken string bearerToken string
ensureIPv6Host bool ensureIPv6Host bool
cfg config.ConfigInterface cfg config.ConfigInterface
@@ -58,7 +57,6 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
maxRetries: maxRetries, maxRetries: maxRetries,
timeoutSecs: timeoutSecs, timeoutSecs: timeoutSecs,
backoff: backoffFunc, backoff: backoffFunc,
shouldRetry: shouldRetryFunc,
ensureIPv6Host: ensureIPv6Host, ensureIPv6Host: ensureIPv6Host,
cfg: cfg, cfg: cfg,
ipv6: cmap.New[string](), ipv6: cmap.New[string](),
@@ -148,7 +146,20 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
attempt := 0 attempt := 0
for { for {
r.replaceHostIfNeeded(req) // needed for ipv6 r.replaceHostIfNeeded(req) // needed for ipv6
r.log.Debugf("downloading %s", req.URL)
timeout := time.Duration(r.cfg.GetRealDebridTimeout()) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req = req.WithContext(ctx)
resp, err = r.client.Do(req) resp, err = r.client.Do(req)
// check if error is context deadline exceeded
if r.ensureIPv6Host && r.cfg.ShouldForceIPv6() && err != nil && strings.Contains(err.Error(), "context deadline exceeded") {
attempt += 1
if attempt > r.maxRetries {
break
}
continue
}
if resp != nil && resp.StatusCode/100 >= 4 { if resp != nil && resp.StatusCode/100 >= 4 {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
if body != nil { if body != nil {
@@ -161,6 +172,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
} }
incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds()) incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds())
r.log.Debugf("got %s incr %d/%d", req.URL, incr, attempt)
if incr > 0 { if incr > 0 {
attempt += incr attempt += incr
if attempt > r.maxRetries { if attempt > r.maxRetries {
@@ -217,7 +229,10 @@ func (r *HTTPClient) proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) {
return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme) return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
} }
func shouldRetryFunc(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int { func (r *HTTPClient) shouldRetry(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int {
if err != nil {
r.log.Errorf("http error +%v", err)
}
if err != nil && strings.HasPrefix(err.Error(), "api response error:") { if err != nil && strings.HasPrefix(err.Error(), "api response error:") {
if apiErr, ok := err.(*ApiErrorResponse); ok { if apiErr, ok := err.(*ApiErrorResponse); ok {
switch apiErr.Code { switch apiErr.Code {
@@ -270,13 +285,15 @@ func backoffFunc(attempt int) time.Duration {
} }
func (r *HTTPClient) CanFetchFirstByte(url string) bool { func (r *HTTPClient) CanFetchFirstByte(url string) bool {
timeout := time.Duration(r.cfg.GetRealDebridTimeout()) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
return false return false
} }
req.Header.Set("Range", "bytes=0-0") req.Header.Set("Range", "bytes=0-0")
req = req.WithContext(ctx)
resp, err := r.client.Do(req) resp, err := r.client.Do(req)
if err != nil { if err != nil {
return false return false