package http import ( "context" "math" "net" "net/http" "strings" "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/logutil" cmap "github.com/orcaman/concurrent-map/v2" ) const ( RATE_LIMIT_FACTOR = 4 // should always be > 1 ) type HTTPClient struct { client *http.Client maxRetries int backoff func(attempt int) time.Duration getRetryIncr func(resp *http.Response, hasRangeHeader bool, err error) int bearerToken string cfg config.ConfigInterface ipv6 cmap.ConcurrentMap[string, string] log *logutil.Logger } func NewHTTPClient(token string, maxRetries int, timeoutSecs int, cfg config.ConfigInterface, log *logutil.Logger) *HTTPClient { client := HTTPClient{ bearerToken: token, client: &http.Client{ Timeout: time.Duration(timeoutSecs) * time.Second, }, maxRetries: maxRetries * RATE_LIMIT_FACTOR, backoff: func(attempt int) time.Duration { maxDuration := 60 backoff := int(math.Pow(2, float64(attempt))) if backoff > maxDuration { backoff = maxDuration } return time.Duration(backoff) * time.Second }, getRetryIncr: func(resp *http.Response, hasRangeHeader bool, err error) int { if resp != nil { if resp.StatusCode == 429 || resp.StatusCode == 400 || resp.StatusCode == 403 { return 1 // retry but don't increment attempt } if resp.StatusCode != http.StatusPartialContent && hasRangeHeader { return 1 } return 0 // don't retry } else if err != nil { errStr := err.Error() if strings.Contains(errStr, "EOF") || strings.Contains(errStr, "connection reset") || strings.Contains(errStr, "no such host") { return 1 // retry but don't increment attempt } else { return RATE_LIMIT_FACTOR } } return RATE_LIMIT_FACTOR // retry and increment attempt }, cfg: cfg, ipv6: cmap.New[string](), log: log, } if cfg.ShouldForceIPv6() { dialer := &net.Dialer{} dialContext := func(ctx context.Context, network, address string) (net.Conn, error) { host, port, err := net.SplitHostPort(address) if err != nil { return nil, err } if ipv6Address, ok := client.ipv6.Get(address); ok { return dialer.DialContext(ctx, network, ipv6Address) } ips, err := net.DefaultResolver.LookupIPAddr(ctx, host) if err != nil { return nil, err } for _, ip := range ips { if ip.IP.To4() == nil { // IPv6 address found ip6Host := ip.IP.String() ipv6Address := net.JoinHostPort(ip6Host, port) client.ipv6.Set(address, ipv6Address) return dialer.DialContext(ctx, network, ipv6Address) } } return dialer.DialContext(ctx, network, address) } transport := &http.Transport{ DialContext: dialContext, } client.client.Transport = transport } return &client } func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { if r.bearerToken != "" { req.Header.Set("Authorization", "Bearer "+r.bearerToken) } // check if Range header is set hasRangeHeader := req.Header.Get("Range") != "" var resp *http.Response var err error attempt := 0 for { resp, err = r.client.Do(req) if incr := r.getRetryIncr(resp, hasRangeHeader, err); incr > 0 { attempt += incr if attempt > r.maxRetries { break } if incr >= RATE_LIMIT_FACTOR { time.Sleep(r.backoff(attempt)) } else { time.Sleep(time.Duration(r.cfg.GetRateLimitSleepSeconds()) * time.Second) // extra delay } if resp != nil { resp.Body.Close() } } else { // if incr == 0, don't retry anymore break } } return resp, err }