From 8aa1362df2153ddf17440651d898b756a859ca37 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Mon, 22 Jan 2024 19:41:29 +0100 Subject: [PATCH] Refactor robust retry in http client --- internal/universal/downloader.go | 2 +- pkg/http/client.go | 105 +++++++++++++++---------------- 2 files changed, 52 insertions(+), 55 deletions(-) diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index 062dcf8..61fae82 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -164,8 +164,8 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor download, err := dl.client.Do(dlReq) if err != nil { + log.Warnf("Cannot download file %s: %v", unrestrict.Download, err) if file != nil && unrestrict.Streamable == 1 { - log.Warnf("Cannot download file %s: %v", file.Path, err) if cfg.EnableRepair() && torrent != nil { torrent.BrokenLinks.Add(file.Link) file.Link = "repair" diff --git a/pkg/http/client.go b/pkg/http/client.go index 6141243..fb8df26 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -22,15 +22,11 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" ) -const ( - RATE_LIMIT_FACTOR = 4 // should always be > 1 -) - type HTTPClient struct { client *http.Client maxRetries int backoff func(attempt int) time.Duration - getRetryIncr func(resp *http.Response, hasRangeHeader bool, err error) int + getRetryIncr func(resp *http.Response, reqHasRangeHeader bool, err error) int bearerToken string ensureIPv6Host bool cfg config.ConfigInterface @@ -44,12 +40,12 @@ type HTTPClient struct { // "error_code": 35 // } -type ErrorResponse struct { +type ApiErrorResponse struct { Message string `json:"error"` Code int `json:"error_code"` } -func (e *ErrorResponse) Error() string { +func (e *ApiErrorResponse) Error() string { return fmt.Sprintf("api response error: %s (code: %d)", e.Message, e.Code) } @@ -59,7 +55,7 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host client: &http.Client{ Timeout: time.Duration(timeoutSecs) * time.Second, }, - maxRetries: maxRetries * RATE_LIMIT_FACTOR, + maxRetries: maxRetries, backoff: func(attempt int) time.Duration { maxDuration := 60 backoff := int(math.Pow(2, float64(attempt))) @@ -68,44 +64,45 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host } return time.Duration(backoff) * time.Second }, - getRetryIncr: func(resp *http.Response, hasRangeHeader bool, err error) int { - if resp != nil { - if resp.StatusCode == 429 { - return 1 - } - if resp.StatusCode == http.StatusOK && hasRangeHeader { - return 1 - } - return 0 // don't retry - } else if err != nil { - log.Errorf("Client request error: %s", err.Error()) - if strings.Contains(err.Error(), "api response error") { - if apiErr, ok := err.(*ErrorResponse); ok { - switch apiErr.Code { - case -1: // Internal error - return 1 - case 5: // Slow down - return 1 - case 6: // Ressource unreachable - return 1 - case 17: // Hoster in maintenance - return 1 - case 19: // Hoster temporarily unavailable - return 1 - case 25: // Service unavailable - return 1 - case 34: // Too many requests - return 1 - case 36: // Fair Usage Limit - return 1 - default: - return 0 // don't retry - } + getRetryIncr: func(resp *http.Response, reqHasRangeHeader bool, err error) int { + if err != nil && strings.HasPrefix(err.Error(), "api response error:") { + if apiErr, ok := err.(*ApiErrorResponse); ok { + switch apiErr.Code { + case -1: // Internal error + return 1 + case 5: // Slow down (retry infinitely) + time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) + return -1 + case 6: // Ressource unreachable + return 1 + case 17: // Hoster in maintenance + return 1 + case 19: // Hoster temporarily unavailable + return 1 + case 25: // Service unavailable + return 1 + case 34: // Too many requests (retry infinitely) + time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) + return -1 + case 36: // Fair Usage Limit + return 1 + default: + return 0 // don't retry } } - return 1 } - return RATE_LIMIT_FACTOR + if resp != nil { + if resp.StatusCode == 429 { + time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) + return -1 + } + if resp.Header.Get("Content-Range") == "" && reqHasRangeHeader { + time.Sleep(10 * time.Millisecond) + return -1 + } + return 0 // don't retry + } + return 1 }, ensureIPv6Host: ensureIPv6Host, cfg: cfg, @@ -177,7 +174,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { } r.replaceHostIfNeeded(req) // check if Range header is set - hasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-" + reqHasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-" var resp *http.Response var err error @@ -187,7 +184,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { if resp != nil && (resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent) { body, _ := io.ReadAll(resp.Body) if body != nil { - var errResp ErrorResponse + var errResp ApiErrorResponse jsonErr := json.Unmarshal(body, &errResp) if jsonErr == nil { errResp.Message += fmt.Sprintf(" (status code: %d)", resp.StatusCode) @@ -195,23 +192,23 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { } } } - if incr := r.getRetryIncr(resp, hasRangeHeader, err); incr > 0 { + incr := r.getRetryIncr(resp, reqHasRangeHeader, err) + if resp != nil { + resp.Body.Close() + } + if incr > 0 { attempt += incr if attempt > r.maxRetries { break } - if incr >= RATE_LIMIT_FACTOR { + if incr > 0 { time.Sleep(r.backoff(attempt)) - } else { - time.Sleep(time.Duration(r.cfg.GetRateLimitSleepSeconds()) * time.Second) // extra delay } - if resp != nil { - resp.Body.Close() - } - } else { - // if incr == 0, don't retry anymore + } else if incr == 0 { + // don't retry anymore break } + // if incr < 0, retry infinitely } return resp, err }