Readd downloads mount

This commit is contained in:
Ben Sarmiento
2024-01-26 22:13:36 +01:00
parent ef3be36932
commit 17ab115747
9 changed files with 98 additions and 107 deletions

View File

@@ -29,7 +29,7 @@ type ConfigInterface interface {
ShouldForceIPv6() bool ShouldForceIPv6() bool
GetRealDebridTimeout() int GetRealDebridTimeout() int
GetRetriesUntilFailed() int GetRetriesUntilFailed() int
EnableDownloadCache() bool EnableDownloadMount() bool
GetRateLimitSleepSeconds() int GetRateLimitSleepSeconds() int
ShouldDeleteRarFiles() bool ShouldDeleteRarFiles() bool
} }
@@ -54,7 +54,7 @@ type ZurgConfig struct {
DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"` DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"`
RealDebridTimeout int `yaml:"realdebrid_timeout_secs" json:"realdebrid_timeout_secs"` RealDebridTimeout int `yaml:"realdebrid_timeout_secs" json:"realdebrid_timeout_secs"`
UseDownloadCache bool `yaml:"use_download_cache" json:"use_download_cache"` DownloadMount bool `yaml:"enable_download_mount" json:"enable_download_mount"`
RateLimitSleepSeconds int `yaml:"rate_limit_sleep_secs" json:"rate_limit_sleep_secs"` RateLimitSleepSeconds int `yaml:"rate_limit_sleep_secs" json:"rate_limit_sleep_secs"`
RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"`
PreferredHosts []string `yaml:"preferred_hosts" json:"preferred_hosts"` PreferredHosts []string `yaml:"preferred_hosts" json:"preferred_hosts"`
@@ -168,8 +168,8 @@ func (z *ZurgConfig) GetRetriesUntilFailed() int {
return z.RetriesUntilFailed return z.RetriesUntilFailed
} }
func (z *ZurgConfig) EnableDownloadCache() bool { func (z *ZurgConfig) EnableDownloadMount() bool {
return z.UseDownloadCache return z.DownloadMount
} }
func (z *ZurgConfig) GetRealDebridTimeout() int { func (z *ZurgConfig) GetRealDebridTimeout() int {

View File

@@ -23,7 +23,7 @@ func ServeRootDirectoryForInfuse(torMgr *torrent.TorrentManager) ([]byte, error)
} }
buf.WriteString(dav.BaseDirectory(directory, "")) buf.WriteString(dav.BaseDirectory(directory, ""))
} }
if torMgr.Config.GetConfig().UseDownloadCache { if torMgr.Config.EnableDownloadMount() {
buf.WriteString(dav.BaseDirectory(config.DOWNLOADS, "")) buf.WriteString(dav.BaseDirectory(config.DOWNLOADS, ""))
} }
_, size := version.GetFile() _, size := version.GetFile()
@@ -95,7 +95,7 @@ func ServeFilesListForInfuse(directory, torrentName string, torMgr *torrent.Torr
func ServeDownloadsListForInfuse(torMgr *torrent.TorrentManager) ([]byte, error) { func ServeDownloadsListForInfuse(torMgr *torrent.TorrentManager) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
if !torMgr.Config.GetConfig().UseDownloadCache { if !torMgr.Config.EnableDownloadMount() {
buf.WriteString("Enable download cache in config to use this feature") buf.WriteString("Enable download cache in config to use this feature")
return buf.Bytes(), nil return buf.Bytes(), nil
} }

View File

@@ -25,7 +25,7 @@ func ServeRootDirectory(torMgr *torrent.TorrentManager) ([]byte, error) {
} }
buf.WriteString(dav.Directory(directory, "")) buf.WriteString(dav.Directory(directory, ""))
} }
if torMgr.Config.GetConfig().UseDownloadCache { if torMgr.Config.EnableDownloadMount() {
buf.WriteString(dav.Directory(config.DOWNLOADS, "")) buf.WriteString(dav.Directory(config.DOWNLOADS, ""))
} }
_, size := version.GetFile() _, size := version.GetFile()
@@ -121,7 +121,7 @@ func HandleSingleFile(directory, torrentName, fileName string, torMgr *torrent.T
func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) { func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
if !torMgr.Config.GetConfig().UseDownloadCache { if !torMgr.Config.EnableDownloadMount() {
buf.WriteString("Enable download cache in config to use this feature") buf.WriteString("Enable download cache in config to use this feature")
return buf.Bytes(), nil return buf.Bytes(), nil
} }

View File

@@ -280,7 +280,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.EnableRepair(), response.Config.EnableRepair(),
response.Config.ShouldDeleteRarFiles(), response.Config.ShouldDeleteRarFiles(),
response.Config.GetRealDebridTimeout(), response.Config.GetRealDebridTimeout(),
response.Config.EnableDownloadCache(), response.Config.EnableDownloadMount(),
response.Config.GetRateLimitSleepSeconds(), response.Config.GetRateLimitSleepSeconds(),
response.Config.GetRetriesUntilFailed(), response.Config.GetRetriesUntilFailed(),
strings.Join(response.Config.PreferredHosts, ", "), strings.Join(response.Config.PreferredHosts, ", "),

View File

@@ -17,7 +17,7 @@ func ServeRootDirectory(torMgr *torrent.TorrentManager) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
buf.WriteString("<ol>") buf.WriteString("<ol>")
directories := torMgr.DirectoryMap.Keys() directories := torMgr.DirectoryMap.Keys()
if torMgr.Config.GetConfig().UseDownloadCache { if torMgr.Config.EnableDownloadMount() {
directories = append(directories, config.DOWNLOADS) directories = append(directories, config.DOWNLOADS)
} }
sort.Strings(directories) sort.Strings(directories)
@@ -95,7 +95,7 @@ func ServeFilesList(directory, torrentName string, torMgr *torrent.TorrentManage
func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) { func ServeDownloadsList(torMgr *torrent.TorrentManager) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
if !torMgr.Config.GetConfig().UseDownloadCache { if !torMgr.Config.EnableDownloadMount() {
buf.WriteString("Enable download cache in config to use this feature") buf.WriteString("Enable download cache in config to use this feature")
return buf.Bytes(), nil return buf.Bytes(), nil
} }

View File

@@ -2,7 +2,6 @@ package torrent
import ( import (
"io" "io"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -68,7 +67,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
} }
// Fetch downloads // Fetch downloads
if t.Config.EnableDownloadCache() { if t.Config.EnableDownloadMount() {
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
page := 1 page := 1
offset := 0 offset := 0
@@ -78,7 +77,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
t.log.Fatalf("Cannot get downloads: %v", err) t.log.Fatalf("Cannot get downloads: %v", err)
} }
for i := range downloads { for i := range downloads {
t.cacheDownload(&downloads[i]) t.DownloadMap.Set(downloads[i].Filename, &downloads[i])
} }
offset += len(downloads) offset += len(downloads)
page++ page++
@@ -122,7 +121,8 @@ func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
return nil return nil
} }
if ret != nil && ret.Link != "" && ret.Filename != "" { if ret != nil && ret.Link != "" && ret.Filename != "" {
t.cacheDownload(ret) t.DownloadCache.Set(ret.Link, ret)
t.DownloadMap.Set(ret.Filename, ret)
} }
return ret return ret
} }
@@ -242,17 +242,3 @@ func (t *TorrentManager) deleteTorrentFile(torrentID string) {
t.log.Warnf("Cannot delete file %s: %v", filePath, err) t.log.Warnf("Cannot delete file %s: %v", filePath, err)
} }
} }
func replaceHostInURL(inputURL string, newHost string) string {
u, err := url.Parse(inputURL)
if err != nil {
return ""
}
u.Host = newHost
return u.String()
}
func (t *TorrentManager) cacheDownload(ret *realdebrid.Download) {
t.DownloadCache.Set(ret.Link, ret)
t.DownloadMap.Set(ret.Filename, ret)
}

View File

@@ -180,7 +180,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
defer download.Body.Close() defer download.Body.Close()
if download.StatusCode != http.StatusOK && download.StatusCode != http.StatusPartialContent { if download.StatusCode/100 != 2 {
if file != nil && unrestrict.Streamable == 1 { if file != nil && unrestrict.Streamable == 1 {
log.Warnf("Received a %s status code for file %s", download.Status, file.Path) log.Warnf("Received a %s status code for file %s", download.Status, file.Path)
torrent.BrokenLinks.Add(file.Link) torrent.BrokenLinks.Add(file.Link)

View File

@@ -28,7 +28,7 @@ type HTTPClient struct {
maxRetries int maxRetries int
timeoutSecs int timeoutSecs int
backoff func(attempt int) time.Duration backoff func(attempt int) time.Duration
getRetryIncr func(resp *http.Response, reqHasRangeHeader bool, err error) int shouldRetry func(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int
bearerToken string bearerToken string
ensureIPv6Host bool ensureIPv6Host bool
cfg config.ConfigInterface cfg config.ConfigInterface
@@ -57,54 +57,8 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
client: &http.Client{}, client: &http.Client{},
maxRetries: maxRetries, maxRetries: maxRetries,
timeoutSecs: timeoutSecs, timeoutSecs: timeoutSecs,
backoff: func(attempt int) time.Duration { backoff: backoffFunc,
maxDuration := 60 shouldRetry: shouldRetryFunc,
backoff := int(math.Pow(2, float64(attempt)))
if backoff > maxDuration {
backoff = maxDuration
}
return time.Duration(backoff) * time.Second
},
getRetryIncr: func(resp *http.Response, reqHasRangeHeader bool, err error) int {
if err != nil && strings.HasPrefix(err.Error(), "api response error:") {
if apiErr, ok := err.(*ApiErrorResponse); ok {
switch apiErr.Code {
case -1: // Internal error
return 1
case 5: // Slow down (retry infinitely)
time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return -1
case 6: // Ressource unreachable
return 1
case 17: // Hoster in maintenance
return 1
case 19: // Hoster temporarily unavailable
return 1
case 25: // Service unavailable
return 1
case 34: // Too many requests (retry infinitely)
time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return -1
case 36: // Fair Usage Limit
return 1
default:
return 0 // don't retry
}
}
}
if resp != nil {
if resp.StatusCode == 429 {
time.Sleep(time.Duration(cfg.GetRateLimitSleepSeconds()) * time.Second)
return -1
}
if resp.Header.Get("Content-Range") == "" && reqHasRangeHeader {
time.Sleep(10 * time.Millisecond)
return -1
}
return 0 // don't retry
}
return 1
},
ensureIPv6Host: ensureIPv6Host, ensureIPv6Host: ensureIPv6Host,
cfg: cfg, cfg: cfg,
ipv6: cmap.New[string](), ipv6: cmap.New[string](),
@@ -120,7 +74,7 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
log.Errorf("Failed to parse proxy URL: %v", err) log.Errorf("Failed to parse proxy URL: %v", err)
return nil return nil
} }
dialer, err = proxyDialer(proxyURL) dialer, err = client.proxyDialer(proxyURL)
if err != nil { if err != nil {
log.Errorf("Failed to create proxy dialer: %v", err) log.Errorf("Failed to create proxy dialer: %v", err)
return nil return nil
@@ -170,7 +124,8 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, ensureIPv6Host
} }
} else { } else {
client.client.Transport = &http.Transport{ client.client.Transport = &http.Transport{
MaxIdleConns: maxConnections, ResponseHeaderTimeout: time.Duration(timeoutSecs) * time.Second,
MaxIdleConns: 0,
MaxConnsPerHost: maxConnections, MaxConnsPerHost: maxConnections,
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
return dialer.Dial(network, address) return dialer.Dial(network, address)
@@ -186,16 +141,15 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", "Bearer "+r.bearerToken) req.Header.Set("Authorization", "Bearer "+r.bearerToken)
} }
// check if Range header is set // check if Range header is set
reqHasRangeHeader := req.Header.Get("Range") != "" && req.Header.Get("Range") != "bytes=0-" reqHasRangeHeader := req.Header.Get("Range") != "" && !strings.HasPrefix(req.Header.Get("Range"), "bytes=0-")
var resp *http.Response var resp *http.Response
var err error var err error
attempt := 0 attempt := 0
for { for {
r.replaceHostIfNeeded(req) r.replaceHostIfNeeded(req) // needed for ipv6
resp, err = r.client.Do(req) resp, err = r.client.Do(req)
if resp != nil && (resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent) { if resp != nil && resp.StatusCode/100 >= 4 {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
if body != nil { if body != nil {
var errResp ApiErrorResponse var errResp ApiErrorResponse
@@ -206,59 +160,110 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
} }
} }
incr := r.getRetryIncr(resp, reqHasRangeHeader, err) incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds())
if incr > 0 { if incr > 0 {
attempt += incr attempt += incr
if attempt > r.maxRetries { if attempt > r.maxRetries {
break break
} }
if incr > 0 {
time.Sleep(r.backoff(attempt)) time.Sleep(r.backoff(attempt))
}
} else if incr == 0 { } else if incr == 0 {
time.Sleep(10 * time.Millisecond)
} else {
// don't retry anymore // don't retry anymore
break break
} }
// if incr < 0, retry infinitely
} }
return resp, err return resp, err
} }
func (r *HTTPClient) replaceHostIfNeeded(req *http.Request) { func (r *HTTPClient) replaceHostIfNeeded(req *http.Request) {
if !r.ensureIPv6Host && !r.cfg.ShouldForceIPv6() || !strings.HasSuffix(req.URL.Host, "download.real-debrid.com") { if !r.ensureIPv6Host && !r.cfg.ShouldForceIPv6() || !strings.HasSuffix(req.Host, "download.real-debrid.com") {
return return
} }
// get subdomain of req.URL.Host // get subdomain of req.Host
subdomain := strings.Split(req.URL.Host, ".")[0] subdomain := strings.Split(req.Host, ".")[0]
// check if subdomain is numeric // check if subdomain is numeric
_, err := strconv.Atoi(subdomain) _, err := strconv.Atoi(subdomain)
if err == nil { if err == nil {
// subdomain is numeric, replace it with .cloud // subdomain is numeric, replace it with .cloud
req.URL.Host = strings.Replace(req.URL.Host, ".com", ".cloud", 1) req.Host = strings.Replace(req.Host, ".com", ".cloud", 1)
req.URL.Host = req.Host
} }
// check if host is in the list of IPv6 hosts // check if host is in the list of IPv6 hosts
found := false found := false
for _, h := range r.ipv6Hosts { for _, h := range r.ipv6Hosts {
if h == req.URL.Host { if h == req.Host {
found = true found = true
break break
} }
} }
if !found { if !found {
r.log.Debug("Not found, assigning random IPv6 host")
// random IPv6 host // random IPv6 host
req.URL.Host = r.ipv6Hosts[rand.Intn(len(r.ipv6Hosts))] req.Host = r.ipv6Hosts[rand.Intn(len(r.ipv6Hosts))]
req.URL.Host = req.Host
} }
fmt.Println(req.URL.Host)
} }
func proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) { func (r *HTTPClient) proxyDialer(proxyURL *url.URL) (proxy.Dialer, error) {
if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" { if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" {
// Create a new HTTP proxy dialer httpProxyDialer := http_dialer.New(proxyURL, http_dialer.WithConnectionTimeout(time.Duration(r.timeoutSecs)*time.Second))
httpProxyDialer := http_dialer.New(proxyURL)
return httpProxyDialer, nil return httpProxyDialer, nil
} else if proxyURL.Scheme == "socks5" { } else if proxyURL.Scheme == "socks5" {
// For SOCKS5 proxies, use the proxy package's FromURL
return proxy.FromURL(proxyURL, proxy.Direct) return proxy.FromURL(proxyURL, proxy.Direct)
} }
return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme) return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
} }
func shouldRetryFunc(resp *http.Response, reqHasRangeHeader bool, err error, rateLimitSleep int) int {
if err != nil && strings.HasPrefix(err.Error(), "api response error:") {
if apiErr, ok := err.(*ApiErrorResponse); ok {
switch apiErr.Code {
case -1: // Internal error
return 1
case 5: // Slow down (retry infinitely)
time.Sleep(time.Duration(rateLimitSleep) * time.Second)
return 0
case 6: // Ressource unreachable
return 1
case 17: // Hoster in maintenance
return 1
case 18: // Hoster limit reached
return 1
case 19: // Hoster temporarily unavailable
return 1
case 25: // Service unavailable
return 1
case 34: // Too many requests (retry infinitely)
time.Sleep(time.Duration(rateLimitSleep) * time.Second)
return 0
case 36: // Fair Usage Limit
return 1
default:
return -1 // don't retry
}
}
}
if resp != nil {
if resp.StatusCode == 429 {
time.Sleep(time.Duration(rateLimitSleep) * time.Second)
return 0
}
if resp.StatusCode/100 == 2 && resp.Header.Get("Content-Range") == "" && reqHasRangeHeader {
time.Sleep(10 * time.Millisecond)
return 0
}
return -1 // don't retry
}
return 1
}
func backoffFunc(attempt int) time.Duration {
maxDuration := 60
backoff := int(math.Pow(2, float64(attempt)))
if backoff > maxDuration {
backoff = maxDuration
}
return time.Duration(backoff) * time.Second
}

View File

@@ -19,7 +19,7 @@ func (rd *RealDebrid) CanFetchFirstByte(url string) bool {
defer resp.Body.Close() defer resp.Body.Close()
// If server supports partial content // If server supports partial content
if resp.StatusCode == http.StatusPartialContent || resp.StatusCode == http.StatusOK { if resp.StatusCode/100 == 2 {
buffer := make([]byte, 1) buffer := make([]byte, 1)
_, err = resp.Body.Read(buffer) _, err = resp.Body.Read(buffer)
if err == nil { if err == nil {