its the fastest ever

This commit is contained in:
Ben Sarmiento
2024-01-28 02:23:19 +01:00
parent abdc8fcbb0
commit f07b65d5da
7 changed files with 70 additions and 66 deletions

View File

@@ -44,7 +44,7 @@ func MainApp(configPath string) {
os.Exit(1) os.Exit(1)
} }
apiClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetRealDebridTimeout(), false, config, log.Named("httpclient")) apiClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetApiTimeoutSecs(), false, config, log.Named("httpclient"))
rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid")) rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid"))
@@ -71,8 +71,8 @@ func MainApp(configPath string) {
utils.EnsureDirExists("data") // Ensure the data directory exists utils.EnsureDirExists("data") // Ensure the data directory exists
torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager")) torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager"))
downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), 0, true, config, log.Named("dlclient")) downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetDownloadTimeoutSecs(), true, config, log.Named("dlclient"))
downloader := universal.NewDownloader(downloadClient, config.GetRealDebridTimeout()) downloader := universal.NewDownloader(downloadClient)
router := chi.NewRouter() router := chi.NewRouter()
handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, log.Named("router")) handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, log.Named("router"))

View File

@@ -5,8 +5,8 @@ type ConfigInterface interface {
GetVersion() string GetVersion() string
GetToken() string GetToken() string
GetNumOfWorkers() int GetNumOfWorkers() int
GetRefreshEverySeconds() int GetRefreshEverySecs() int
GetRepairEveryMinutes() int GetRepairEveryMins() int
EnableRepair() bool EnableRepair() bool
GetHost() string GetHost() string
GetPort() string GetPort() string
@@ -23,10 +23,11 @@ type ConfigInterface interface {
ShouldServeFromRclone() bool ShouldServeFromRclone() bool
ShouldVerifyDownloadLink() bool ShouldVerifyDownloadLink() bool
ShouldForceIPv6() bool ShouldForceIPv6() bool
GetRealDebridTimeout() int GetApiTimeoutSecs() int
GetDownloadTimeoutSecs() int
GetRetriesUntilFailed() int GetRetriesUntilFailed() int
EnableDownloadMount() bool EnableDownloadMount() bool
GetRateLimitSleepSeconds() int GetRateLimitSleepSecs() int
ShouldDeleteRarFiles() bool ShouldDeleteRarFiles() bool
} }
@@ -34,14 +35,14 @@ type ZurgConfig struct {
Version string `yaml:"zurg" json:"-"` Version string `yaml:"zurg" json:"-"`
Token string `yaml:"token" json:"-"` Token string `yaml:"token" json:"-"`
Host string `yaml:"host" json:"host"` Host string `yaml:"host" json:"host"`
Port string `yaml:"port" json:"port"` Port string `yaml:"port" json:"port"`
Username string `yaml:"username" json:"username"` Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"` Password string `yaml:"password" json:"password"`
Proxy string `yaml:"proxy" json:"proxy"` Proxy string `yaml:"proxy" json:"proxy"`
NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"`
RefreshEverySeconds int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"`
RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"`
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
@@ -50,14 +51,15 @@ type ZurgConfig struct {
CanRepair bool `yaml:"enable_repair" json:"enable_repair"` CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"` DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"`
RealDebridTimeout int `yaml:"realdebrid_timeout_secs" json:"realdebrid_timeout_secs"` ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"`
DownloadMount bool `yaml:"enable_download_mount" json:"enable_download_mount"` DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"`
RateLimitSleepSeconds int `yaml:"rate_limit_sleep_secs" json:"rate_limit_sleep_secs"` DownloadMount bool `yaml:"enable_download_mount" json:"enable_download_mount"`
RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` RateLimitSleepSecs int `yaml:"rate_limit_sleep_secs" json:"rate_limit_sleep_secs"`
NetworkBufferSize int `yaml:"network_buffer_size" json:"network_buffer_size"` RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"`
ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"` NetworkBufferSize int `yaml:"network_buffer_size" json:"network_buffer_size"`
VerifyDownloadLink bool `yaml:"verify_download_link" json:"verify_download_link"` ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"`
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` VerifyDownloadLink bool `yaml:"verify_download_link" json:"verify_download_link"`
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"`
OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"` OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"`
} }
@@ -103,14 +105,14 @@ func (z *ZurgConfig) GetNumOfWorkers() int {
return z.NumOfWorkers return z.NumOfWorkers
} }
func (z *ZurgConfig) GetRefreshEverySeconds() int { func (z *ZurgConfig) GetRefreshEverySecs() int {
if z.RefreshEverySeconds == 0 { if z.RefreshEverySecs == 0 {
return 60 return 60
} }
return z.RefreshEverySeconds return z.RefreshEverySecs
} }
func (z *ZurgConfig) GetRepairEveryMinutes() int { func (z *ZurgConfig) GetRepairEveryMins() int {
if z.RepairEveryMins == 0 { if z.RepairEveryMins == 0 {
return 60 return 60
} }
@@ -167,18 +169,25 @@ func (z *ZurgConfig) EnableDownloadMount() bool {
return z.DownloadMount return z.DownloadMount
} }
func (z *ZurgConfig) GetRealDebridTimeout() int { func (z *ZurgConfig) GetApiTimeoutSecs() int {
if z.RealDebridTimeout == 0 { if z.ApiTimeoutSecs == 0 {
return 4 return 4
} }
return z.RealDebridTimeout return z.ApiTimeoutSecs
} }
func (z *ZurgConfig) GetRateLimitSleepSeconds() int { func (z *ZurgConfig) GetDownloadTimeoutSecs() int {
if z.RateLimitSleepSeconds == 0 { if z.DownloadTimeoutSecs == 0 {
return 2
}
return z.DownloadTimeoutSecs
}
func (z *ZurgConfig) GetRateLimitSleepSecs() int {
if z.RateLimitSleepSecs == 0 {
return 4 return 4
} }
return z.RateLimitSleepSeconds return z.RateLimitSleepSecs
} }
func (z *ZurgConfig) ShouldDeleteRarFiles() bool { func (z *ZurgConfig) ShouldDeleteRarFiles() bool {

View File

@@ -179,8 +179,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>%d</td> <td>%d</td>
</tr> </tr>
<tr> <tr>
<td>Refresh Every Seconds</td> <td>Refresh Every...</td>
<td>%d</td> <td>%d secs</td>
</tr> </tr>
<tr> <tr>
<td>Retain RD Torrent Name</td> <td>Retain RD Torrent Name</td>
@@ -199,16 +199,20 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>%t</td> <td>%t</td>
</tr> </tr>
<tr> <tr>
<td>RealDebrid Timeout</td> <td>API Timeout</td>
<td>%d</td> <td>%d secs</td>
</tr>
<tr>
<td>Download Timeout</td>
<td>%d secs</td>
</tr> </tr>
<tr> <tr>
<td>Use Download Mount</td> <td>Use Download Mount</td>
<td>%t</td> <td>%t</td>
</tr> </tr>
<tr> <tr>
<td>Rate Limit Sleep Seconds</td> <td>Rate Limit Sleep for...</td>
<td>%d</td> <td>%d secs</td>
</tr> </tr>
<tr> <tr>
<td>Retries Until Failed</td> <td>Retries Until Failed</td>
@@ -270,14 +274,15 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.GetHost(), response.Config.GetHost(),
response.Config.GetPort(), response.Config.GetPort(),
response.Config.GetNumOfWorkers(), response.Config.GetNumOfWorkers(),
response.Config.GetRefreshEverySeconds(), response.Config.GetRefreshEverySecs(),
response.Config.EnableRetainRDTorrentName(), response.Config.EnableRetainRDTorrentName(),
response.Config.EnableRetainFolderNameExtension(), response.Config.EnableRetainFolderNameExtension(),
response.Config.EnableRepair(), response.Config.EnableRepair(),
response.Config.ShouldDeleteRarFiles(), response.Config.ShouldDeleteRarFiles(),
response.Config.GetRealDebridTimeout(), response.Config.GetApiTimeoutSecs(),
response.Config.GetDownloadTimeoutSecs(),
response.Config.EnableDownloadMount(), response.Config.EnableDownloadMount(),
response.Config.GetRateLimitSleepSeconds(), response.Config.GetRateLimitSleepSecs(),
response.Config.GetRetriesUntilFailed(), response.Config.GetRetriesUntilFailed(),
response.Config.GetNetworkBufferSize(), response.Config.GetNetworkBufferSize(),
response.Config.ShouldServeFromRclone(), response.Config.ShouldServeFromRclone(),

View File

@@ -124,7 +124,7 @@ func (t *TorrentManager) startRefreshJob() {
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Info("Starting periodic refresh job") t.log.Info("Starting periodic refresh job")
for { for {
<-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) <-time.After(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
checksum := t.getCurrentState() checksum := t.getCurrentState()
if t.latestState.equal(checksum) { if t.latestState.equal(checksum) {

View File

@@ -26,7 +26,7 @@ func (t *TorrentManager) startRepairJob() {
// there is 1 repair worker, with max 1 blocking task // there is 1 repair worker, with max 1 blocking task
_ = t.repairPool.Submit(func() { _ = t.repairPool.Submit(func() {
t.log.Info("Starting periodic repair job") t.log.Info("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMinutes()) * time.Minute) repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop() defer repairTicker.Stop()
for { for {

View File

@@ -14,14 +14,12 @@ import (
) )
type Downloader struct { type Downloader struct {
client *zurghttp.HTTPClient client *zurghttp.HTTPClient
timeoutSecs int
} }
func NewDownloader(client *zurghttp.HTTPClient, timeoutSecs int) *Downloader { func NewDownloader(client *zurghttp.HTTPClient) *Downloader {
return &Downloader{ return &Downloader{
client: client, client: client,
timeoutSecs: timeoutSecs,
} }
} }
@@ -157,7 +155,6 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
if req.Header.Get("Range") != "" { if req.Header.Get("Range") != "" {
dlReq.Header.Add("Range", req.Header.Get("Range")) dlReq.Header.Add("Range", req.Header.Get("Range"))
rangeLog = " (range: " + req.Header.Get("Range") + ")" rangeLog = " (range: " + req.Header.Get("Range") + ")"
} }
if torrent != nil { if torrent != nil {
@@ -166,12 +163,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
log.Debugf("Downloading unrestricted link %s (%s)%s", unrestrict.Download, unrestrict.Link, rangeLog) log.Debugf("Downloading unrestricted link %s (%s)%s", unrestrict.Download, unrestrict.Link, rangeLog)
} }
// timeout := time.Duration(dl.timeoutSecs) * time.Second
// ctx, cancel := context.WithTimeout(context.TODO(), timeout)
// dlReq = dlReq.WithContext(ctx)
download, err := dl.client.Do(dlReq) download, err := dl.client.Do(dlReq)
if err != nil { if err != nil {
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err) log.Warnf("Cannot download file %s: %v", unrestrict.Download, err)
if file != nil && unrestrict.Streamable == 1 { if file != nil && unrestrict.Streamable == 1 {
@@ -185,7 +177,6 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return
} }
defer download.Body.Close() defer download.Body.Close()
if download.StatusCode/100 != 2 { if download.StatusCode/100 != 2 {

View File

@@ -144,20 +144,16 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
var resp *http.Response var resp *http.Response
var err error var err error
attempt := 0 attempt := 0
for { for {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
r.replaceHostIfNeeded(req) // needed for ipv6 r.replaceHostIfNeeded(req) // needed for ipv6
if !strings.Contains(req.URL.Host, "api.real-debrid.com") { if !strings.Contains(req.URL.Host, "api.real-debrid.com") {
r.log.Debugf("downloading %s", req.URL) r.log.Debugf("downloading %s", req.URL)
} }
resp, err = r.client.Do(req) resp, err = r.client.Do(req)
// check if error is context deadline exceeded
if r.ensureIPv6Host && r.cfg.ShouldForceIPv6() && err != nil && strings.Contains(err.Error(), "context deadline exceeded") {
attempt += 1
if attempt > r.maxRetries {
break
}
continue
}
if resp != nil && resp.StatusCode/100 >= 4 { if resp != nil && resp.StatusCode/100 >= 4 {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
if body != nil { if body != nil {
@@ -169,7 +165,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
} }
} }
incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSeconds()) incr := r.shouldRetry(resp, reqHasRangeHeader, err, r.cfg.GetRateLimitSleepSecs())
if incr > 0 { if incr > 0 {
attempt += incr attempt += incr
if attempt > r.maxRetries { if attempt > r.maxRetries {
@@ -255,6 +251,9 @@ func (r *HTTPClient) shouldRetry(resp *http.Response, reqHasRangeHeader bool, er
} }
} }
} }
if err != nil && strings.Contains(err.Error(), "timeout awaiting response headers") {
return 1
}
if resp != nil { if resp != nil {
if resp.StatusCode == 429 { if resp.StatusCode == 429 {
time.Sleep(time.Duration(rateLimitSleep) * time.Second) time.Sleep(time.Duration(rateLimitSleep) * time.Second)
@@ -279,7 +278,7 @@ func backoffFunc(attempt int) time.Duration {
} }
func (r *HTTPClient) CanFetchFirstByte(url string) bool { func (r *HTTPClient) CanFetchFirstByte(url string) bool {
timeout := time.Duration(r.cfg.GetRealDebridTimeout()) * time.Second timeout := time.Duration(r.timeoutSecs) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)