From 17ab115747fb3a00feb02e1d6c23136ef17d59be Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Fri, 26 Jan 2024 22:13:36 +0100 Subject: [PATCH] Readd downloads mount --- internal/config/types.go | 8 +- internal/dav/infuse.go | 4 +- internal/dav/listing.go | 4 +- internal/handlers/home.go | 2 +- internal/http/listing.go | 4 +- internal/torrent/manager.go | 22 +---- internal/universal/downloader.go | 2 +- pkg/http/client.go | 157 ++++++++++++++++--------------- pkg/realdebrid/unrestrict.go | 2 +- 9 files changed, 98 insertions(+), 107 deletions(-) diff --git a/internal/config/types.go b/internal/config/types.go index aea10e2..20bd7c1 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -29,7 +29,7 @@ type ConfigInterface interface { ShouldForceIPv6() bool GetRealDebridTimeout() int GetRetriesUntilFailed() int - EnableDownloadCache() bool + EnableDownloadMount() bool GetRateLimitSleepSeconds() int ShouldDeleteRarFiles() bool } @@ -54,7 +54,7 @@ type ZurgConfig struct { DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"` RealDebridTimeout int `yaml:"realdebrid_timeout_secs" json:"realdebrid_timeout_secs"` - UseDownloadCache bool `yaml:"use_download_cache" json:"use_download_cache"` + DownloadMount bool `yaml:"enable_download_mount" json:"enable_download_mount"` RateLimitSleepSeconds int `yaml:"rate_limit_sleep_secs" json:"rate_limit_sleep_secs"` RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` PreferredHosts []string `yaml:"preferred_hosts" json:"preferred_hosts"` @@ -168,8 +168,8 @@ func (z *ZurgConfig) GetRetriesUntilFailed() int { return z.RetriesUntilFailed } -func (z *ZurgConfig) EnableDownloadCache() bool { - return z.UseDownloadCache +func (z *ZurgConfig) EnableDownloadMount() bool { + return z.DownloadMount } func (z *ZurgConfig) GetRealDebridTimeout() int { diff --git a/internal/dav/infuse.go b/internal/dav/infuse.go index 923125b..ec1461a 100644 --- a/internal/dav/infuse.go +++ b/internal/dav/infuse.go @@ -23,7 +23,7 @@ func ServeRootDirectoryForInfuse(torMgr *torrent.TorrentManager) ([]byte, error) } buf.WriteString(dav.BaseDirectory(directory, "")) } - if torMgr.Config.GetConfig().UseDownloadCache { + if torMgr.Config.EnableDownloadMount() { buf.WriteString(dav.BaseDirectory(config.DOWNLOADS, "")) } _, size := version.GetFile() @@ -95,7 +95,7 @@ func ServeFilesListForInfuse(directory, torrentName string, torMgr *torrent.Torr func ServeDownloadsListForInfuse(torMgr *torrent.TorrentManager) ([]byte, error) { var buf bytes.Buffer - if !torMgr.Config.GetConfig().UseDownloadCache { + if !torMgr.Config.EnableDownloadMount() { buf.WriteString("Enable download cache in config to use this feature") return buf.Bytes(), nil } diff --git a/internal/dav/listing.go b/internal/dav/listing.go index 4e7e25b..85e0503 100644 --- a/internal/dav/listing.go +++ b/internal/dav/listing.go @@ -25,7 +25,7 @@ func ServeRootDirectory(torMgr *torrent.TorrentManager) ([]byte, error) { } buf.WriteString(dav.Directory(directory, "")) } - if torMgr.Config.GetConfig().UseDownloadCache { + if torMgr.Config.EnableDownloadMount() { buf.WriteString(dav.Directory(config.DOWNLOADS, "")) } _, size := version.GetFile() @@ -121,7 +121,7 @@ func HandleSingleFile(directory, torrentName, fileName string, torMgr *torrent.T func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) { var buf bytes.Buffer - if !torMgr.Config.GetConfig().UseDownloadCache { + if !torMgr.Config.EnableDownloadMount() { buf.WriteString("Enable download cache in config to use this feature") return buf.Bytes(), nil } diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 51ffc65..6271736 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -280,7 +280,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Config.EnableRepair(), response.Config.ShouldDeleteRarFiles(), response.Config.GetRealDebridTimeout(), - response.Config.EnableDownloadCache(), + response.Config.EnableDownloadMount(), response.Config.GetRateLimitSleepSeconds(), response.Config.GetRetriesUntilFailed(), strings.Join(response.Config.PreferredHosts, ", "), diff --git a/internal/http/listing.go b/internal/http/listing.go index ec125ae..ba3b012 100644 --- a/internal/http/listing.go +++ b/internal/http/listing.go @@ -17,7 +17,7 @@ func ServeRootDirectory(torMgr *torrent.TorrentManager) ([]byte, error) { var buf bytes.Buffer buf.WriteString("
    ") directories := torMgr.DirectoryMap.Keys() - if torMgr.Config.GetConfig().UseDownloadCache { + if torMgr.Config.EnableDownloadMount() { directories = append(directories, config.DOWNLOADS) } sort.Strings(directories) @@ -95,7 +95,7 @@ func ServeFilesList(directory, torrentName string, torMgr *torrent.TorrentManage func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) { var buf bytes.Buffer - if !torMgr.Config.GetConfig().UseDownloadCache { + if !torMgr.Config.EnableDownloadMount() { buf.WriteString("Enable download cache in config to use this feature") return buf.Bytes(), nil } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index a51250b..5dcc629 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -2,7 +2,6 @@ package torrent import ( "io" - "net/url" "os" "path/filepath" "strings" @@ -68,7 +67,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w } // Fetch downloads - if t.Config.EnableDownloadCache() { + if t.Config.EnableDownloadMount() { _ = t.workerPool.Submit(func() { page := 1 offset := 0 @@ -78,7 +77,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w t.log.Fatalf("Cannot get downloads: %v", err) } for i := range downloads { - t.cacheDownload(&downloads[i]) + t.DownloadMap.Set(downloads[i].Filename, &downloads[i]) } offset += len(downloads) page++ @@ -122,7 +121,8 @@ func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { return nil } if ret != nil && ret.Link != "" && ret.Filename != "" { - t.cacheDownload(ret) + t.DownloadCache.Set(ret.Link, ret) + t.DownloadMap.Set(ret.Filename, ret) } return ret } @@ -242,17 +242,3 @@ func (t *TorrentManager) deleteTorrentFile(torrentID string) { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } } - -func replaceHostInURL(inputURL string, newHost string) string { - u, err := url.Parse(inputURL) - if err != nil { - return "" - } - u.Host = newHost - return u.String() -} - -func (t *TorrentManager) cacheDownload(ret *realdebrid.Download) { - t.DownloadCache.Set(ret.Link, ret) - t.DownloadMap.Set(ret.Filename, ret) -} diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index 8a1075e..e7f1c8e 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -180,7 +180,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor defer download.Body.Close() - if download.StatusCode != http.StatusOK && download.StatusCode != http.StatusPartialContent { + if download.StatusCode/100 != 2 { if file != nil && unrestrict.Streamable == 1 { log.Warnf("Received a %s status code for file %s", download.Status, file.Path) torrent.BrokenLinks.Add(file.Link) diff --git a/pkg/http/client.go b/pkg/http/client.go index 93f6fa2..e286626 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -28,7 +28,7 @@ type HTTPClient struct { maxRetries int timeoutSecs int backoff func(attempt int) time.Duration - getRetryIncr func(resp *http.Response, reqHasRangeHeader bool, err error) int + shouldRetry func(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int bearerToken string ensureIPv6Host bool cfg config.ConfigInterface @@ -53,58 +53,12 @@ func (e *ApiErrorResponse) Error() string { func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host bool, cfg config.ConfigInterface, log *logutil.Logger) *HTTPClient { client := HTTPClient{ - bearerToken: token, - client: &http.Client{}, - maxRetries: maxRetries, - timeoutSecs: timeoutSecs, - backoff: func(attempt int) time.Duration { - maxDuration := 60 - backoff := int(math.Pow(2, float64(attempt))) - if backoff > maxDuration { - backoff = maxDuration - } - return time.Duration(backoff) * time.Second - }, - getRetryIncr: func(resp *http.Response, reqHasRangeHeader bool, err error) int { - if err != nil && strings.HasPrefix(err.Error(), "api response error:") { - if apiErr, ok := err.(*ApiErrorResponse); ok { - switch apiErr.Code { - case -1: // Internal error - return 1 - case 5: // Slow down (retry infinitely) - time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) - return -1 - case 6: // Ressource unreachable - return 1 - case 17: // Hoster in maintenance - return 1 - case 19: // Hoster temporarily unavailable - return 1 - case 25: // Service unavailable - return 1 - case 34: // Too many requests (retry infinitely) - time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) - return -1 - case 36: // Fair Usage Limit - return 1 - default: - return 0 // don't retry - } - } - } - if resp != nil { - if resp.StatusCode == 429 { - time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second) - return -1 - } - if resp.Header.Get("Content-Range") == "" && reqHasRangeHeader { - time.Sleep(10 * time.Millisecond) - return -1 - } - return 0 // don't retry - } - return 1 - }, + bearerToken: token, + client: &http.Client{}, + maxRetries: maxRetries, + timeoutSecs: timeoutSecs, + backoff: backoffFunc, + shouldRetry: shouldRetryFunc, ensureIPv6Host: ensureIPv6Host, cfg: cfg, ipv6: cmap.New[string](), @@ -120,7 +74,7 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host log.Errorf("Failed to parse proxy URL: %v", err) return nil } - dialer, err = proxyDialer(proxyURL) + dialer, err = client.proxyDialer(proxyURL) if err != nil { log.Errorf("Failed to create proxy dialer: %v", err) return nil @@ -170,8 +124,9 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host } } else { client.client.Transport = &http.Transport{ - MaxIdleConns: maxConnections, - MaxConnsPerHost: maxConnections, + ResponseHeaderTimeout: time.Duration(timeoutSecs) * time.Second, + MaxIdleConns: 0, + MaxConnsPerHost: maxConnections, DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { return dialer.Dial(network, address) }, @@ -186,16 +141,15 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { req.Header.Set("Authorization", "Bearer "+r.bearerToken) } // check if Range header is set - reqHasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-" + reqHasRangeHeader := req.Header.Get("Range") != "" && !strings.HasPrefix(req.Header.Get("Range"), "bytes=0-") var resp *http.Response var err error attempt := 0 for { - r.replaceHostIfNeeded(req) - + r.replaceHostIfNeeded(req) // needed for ipv6 resp, err = r.client.Do(req) - if resp != nil && (resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent) { + if resp != nil && resp.StatusCode/100 >= 4 { body, _ := io.ReadAll(resp.Body) if body != nil { var errResp ApiErrorResponse @@ -206,59 +160,110 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { } } } - incr := r.getRetryIncr(resp, reqHasRangeHeader, err) + incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds()) if incr > 0 { attempt += incr if attempt > r.maxRetries { break } - if incr > 0 { - time.Sleep(r.backoff(attempt)) - } + time.Sleep(r.backoff(attempt)) } else if incr == 0 { + time.Sleep(10 * time.Millisecond) + } else { // don't retry anymore break } - // if incr < 0, retry infinitely } return resp, err } func (r *HTTPClient) replaceHostIfNeeded(req *http.Request) { - if !r.ensureIPv6Host && !r.cfg.ShouldForceIPv6() || !strings.HasSuffix(req.URL.Host, "download.real-debrid.com") { + if !r.ensureIPv6Host && !r.cfg.ShouldForceIPv6() || !strings.HasSuffix(req.Host, "download.real-debrid.com") { return } - // get subdomain of req.URL.Host - subdomain := strings.Split(req.URL.Host, ".")[0] + // get subdomain of req.Host + subdomain := strings.Split(req.Host, ".")[0] // check if subdomain is numeric _, err := strconv.Atoi(subdomain) if err == nil { // subdomain is numeric, replace it with .cloud - req.URL.Host = strings.Replace(req.URL.Host, ".com", ".cloud", 1) + req.Host = strings.Replace(req.Host, ".com", ".cloud", 1) + req.URL.Host = req.Host } // check if host is in the list of IPv6 hosts found := false for _, h := range r.ipv6Hosts { - if h == req.URL.Host { + if h == req.Host { found = true break } } if !found { + r.log.Debug("Not found, assigning random IPv6 host") // random IPv6 host - req.URL.Host = r.ipv6Hosts[rand.Intn(len(r.ipv6Hosts))] + req.Host = r.ipv6Hosts[rand.Intn(len(r.ipv6Hosts))] + req.URL.Host = req.Host } - fmt.Println(req.URL.Host) } -func proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) { +func (r *HTTPClient) proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) { if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" { - // Create a new HTTP proxy dialer - httpProxyDialer := http_dialer.New(proxyURL) + httpProxyDialer := http_dialer.New(proxyURL, http_dialer.WithConnectionTimeout(time.Duration(r.timeoutSecs)*time.Second)) return httpProxyDialer, nil } else if proxyURL.Scheme == "socks5" { - // For SOCKS5 proxies, use the proxy package's FromURL return proxy.FromURL(proxyURL, proxy.Direct) } return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme) } + +func shouldRetryFunc(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int { + if err != nil && strings.HasPrefix(err.Error(), "api response error:") { + if apiErr, ok := err.(*ApiErrorResponse); ok { + switch apiErr.Code { + case -1: // Internal error + return 1 + case 5: // Slow down (retry infinitely) + time.Sleep(time.Duration(rateLimitSleep) * time.Second) + return 0 + case 6: // Ressource unreachable + return 1 + case 17: // Hoster in maintenance + return 1 + case 18: // Hoster limit reached + return 1 + case 19: // Hoster temporarily unavailable + return 1 + case 25: // Service unavailable + return 1 + case 34: // Too many requests (retry infinitely) + time.Sleep(time.Duration(rateLimitSleep) * time.Second) + return 0 + case 36: // Fair Usage Limit + return 1 + default: + return -1 // don't retry + } + } + } + if resp != nil { + if resp.StatusCode == 429 { + time.Sleep(time.Duration(rateLimitSleep) * time.Second) + return 0 + } + if resp.StatusCode/100 == 2 && resp.Header.Get("Content-Range") == "" && reqHasRangeHeader { + time.Sleep(10 * time.Millisecond) + return 0 + } + return -1 // don't retry + } + return 1 +} + +func backoffFunc(attempt int) time.Duration { + maxDuration := 60 + backoff := int(math.Pow(2, float64(attempt))) + if backoff > maxDuration { + backoff = maxDuration + } + return time.Duration(backoff) * time.Second +} diff --git a/pkg/realdebrid/unrestrict.go b/pkg/realdebrid/unrestrict.go index 7594bf0..45dad97 100644 --- a/pkg/realdebrid/unrestrict.go +++ b/pkg/realdebrid/unrestrict.go @@ -19,7 +19,7 @@ func (rd *RealDebrid) CanFetchFirstByte(url string) bool { defer resp.Body.Close() // If server supports partial content - if resp.StatusCode == http.StatusPartialContent || resp.StatusCode == http.StatusOK { + if resp.StatusCode/100 == 2 { buffer := make([]byte, 1) _, err = resp.Body.Read(buffer) if err == nil {