Refactor robust retry in http client

This commit is contained in:
Ben Sarmiento
2024-01-22 19:41:29 +01:00
parent 3131e3cbdf
commit 8aa1362df2
2 changed files with 52 additions and 55 deletions

View File

@@ -164,8 +164,8 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
download, err := dl.client.Do(dlReq) download, err := dl.client.Do(dlReq)
if err != nil { if err != nil {
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err)
if file != nil && unrestrict.Streamable == 1 { if file != nil && unrestrict.Streamable == 1 {
log.Warnf("Cannot download file %s: %v", file.Path, err)
if cfg.EnableRepair() && torrent != nil { if cfg.EnableRepair() && torrent != nil {
torrent.BrokenLinks.Add(file.Link) torrent.BrokenLinks.Add(file.Link)
file.Link = "repair" file.Link = "repair"

View File

@@ -22,15 +22,11 @@ import (
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
) )
const (
RATE_LIMIT_FACTOR = 4 // should always be > 1
)
type HTTPClient struct { type HTTPClient struct {
client *http.Client client *http.Client
maxRetries int maxRetries int
backoff func(attempt int) time.Duration backoff func(attempt int) time.Duration
getRetryIncr func(resp *http.Response, hasRangeHeader bool, err error) int getRetryIncr func(resp *http.Response, reqHasRangeHeader bool, err error) int
bearerToken string bearerToken string
ensureIPv6Host bool ensureIPv6Host bool
cfg config.ConfigInterface cfg config.ConfigInterface
@@ -44,12 +40,12 @@ type HTTPClient struct {
// "error_code": 35 // "error_code": 35
// } // }
type ErrorResponse struct { type ApiErrorResponse struct {
Message string `json:"error"` Message string `json:"error"`
Code int `json:"error_code"` Code int `json:"error_code"`
} }
func (e *ErrorResponse) Error() string { func (e *ApiErrorResponse) Error() string {
return fmt.Sprintf("api response error: %s (code: %d)", e.Message, e.Code) return fmt.Sprintf("api response error: %s (code: %d)", e.Message, e.Code)
} }
@@ -59,7 +55,7 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
client: &http.Client{ client: &http.Client{
Timeout: time.Duration(timeoutSecs) * time.Second, Timeout: time.Duration(timeoutSecs) * time.Second,
}, },
maxRetries: maxRetries * RATE_LIMIT_FACTOR, maxRetries: maxRetries,
backoff: func(attempt int) time.Duration { backoff: func(attempt int) time.Duration {
maxDuration := 60 maxDuration := 60
backoff := int(math.Pow(2, float64(attempt))) backoff := int(math.Pow(2, float64(attempt)))
@@ -68,44 +64,45 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
} }
return time.Duration(backoff) * time.Second return time.Duration(backoff) * time.Second
}, },
getRetryIncr: func(resp *http.Response, hasRangeHeader bool, err error) int { getRetryIncr: func(resp *http.Response, reqHasRangeHeader bool, err error) int {
if resp != nil { if err != nil && strings.HasPrefix(err.Error(), "api response error:") {
if resp.StatusCode == 429 { if apiErr, ok := err.(*ApiErrorResponse); ok {
return 1 switch apiErr.Code {
} case -1: // Internal error
if resp.StatusCode == http.StatusOK && hasRangeHeader { return 1
return 1 case 5: // Slow down (retry infinitely)
} time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return 0 // don't retry return -1
} else if err != nil { case 6: // Ressource unreachable
log.Errorf("Client request error: %s", err.Error()) return 1
if strings.Contains(err.Error(), "api response error") { case 17: // Hoster in maintenance
if apiErr, ok := err.(*ErrorResponse); ok { return 1
switch apiErr.Code { case 19: // Hoster temporarily unavailable
case -1: // Internal error return 1
return 1 case 25: // Service unavailable
case 5: // Slow down return 1
return 1 case 34: // Too many requests (retry infinitely)
case 6: // Ressource unreachable time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return 1 return -1
case 17: // Hoster in maintenance case 36: // Fair Usage Limit
return 1 return 1
case 19: // Hoster temporarily unavailable default:
return 1 return 0 // don't retry
case 25: // Service unavailable
return 1
case 34: // Too many requests
return 1
case 36: // Fair Usage Limit
return 1
default:
return 0 // don't retry
}
} }
} }
return 1
} }
return RATE_LIMIT_FACTOR if resp != nil {
if resp.StatusCode == 429 {
time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return -1
}
if resp.Header.Get("Content-Range") == "" && reqHasRangeHeader {
time.Sleep(10 * time.Millisecond)
return -1
}
return 0 // don't retry
}
return 1
}, },
ensureIPv6Host: ensureIPv6Host, ensureIPv6Host: ensureIPv6Host,
cfg: cfg, cfg: cfg,
@@ -177,7 +174,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
r.replaceHostIfNeeded(req) r.replaceHostIfNeeded(req)
// check if Range header is set // check if Range header is set
hasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-" reqHasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-"
var resp *http.Response var resp *http.Response
var err error var err error
@@ -187,7 +184,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
if resp != nil && (resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent) { if resp != nil && (resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent) {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
if body != nil { if body != nil {
var errResp ErrorResponse var errResp ApiErrorResponse
jsonErr := json.Unmarshal(body, &errResp) jsonErr := json.Unmarshal(body, &errResp)
if jsonErr == nil { if jsonErr == nil {
errResp.Message += fmt.Sprintf(" (status code: %d)", resp.StatusCode) errResp.Message += fmt.Sprintf(" (status code: %d)", resp.StatusCode)
@@ -195,23 +192,23 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
} }
} }
if incr := r.getRetryIncr(resp, hasRangeHeader, err); incr > 0 { incr := r.getRetryIncr(resp, reqHasRangeHeader, err)
if resp != nil {
resp.Body.Close()
}
if incr > 0 {
attempt += incr attempt += incr
if attempt > r.maxRetries { if attempt > r.maxRetries {
break break
} }
if incr >= RATE_LIMIT_FACTOR { if incr > 0 {
time.Sleep(r.backoff(attempt)) time.Sleep(r.backoff(attempt))
} else {
time.Sleep(time.Duration(r.cfg.GetRateLimitSleepSeconds()) * time.Second) // extra delay
} }
if resp != nil { } else if incr == 0 {
resp.Body.Close() // don't retry anymore
}
} else {
// if incr == 0, don't retry anymore
break break
} }
// if incr < 0, retry infinitely
} }
return resp, err return resp, err
} }