From a7fd68b3fda99925da87fc66a2580da790792106 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Tue, 28 Nov 2023 01:06:11 +0100 Subject: [PATCH] Add configs --- config.example.yml | 2 ++ go.mod | 7 +----- go.sum | 6 ----- internal/config/types.go | 20 ++++++++++++++- internal/torrent/manager.go | 47 ++++++++++++++++++++---------------- internal/universal/get.go | 42 +++++++++++++++++++++++++++++--- pkg/realdebrid/api.go | 3 +-- pkg/realdebrid/unrestrict.go | 2 +- 8 files changed, 89 insertions(+), 40 deletions(-) diff --git a/config.example.yml b/config.example.yml index ba8232e..b3dc261 100644 --- a/config.example.yml +++ b/config.example.yml @@ -7,6 +7,8 @@ host: "[::]" # do not change this if you are running it inside a docker containe port: 9999 # do not change this if you are running it inside a docker container concurrent_workers: 200 check_for_changes_every_secs: 15 +unrestrict_workers: 10 # since unrestricting has a different rate limit, use a different worker pool. decrease this if you are getting 429s +release_unrestrict_after_ms: 100 # wait time for every unrestrict worker to be released. increase this if you are getting 429s enable_repair: true # BEWARE! THERE CAN ONLY BE 1 INSTANCE OF ZURG THAT SHOULD REPAIR YOUR TORRENTS retain_folder_name_extension: false # if true, zurg won't modify the filenames from real-debrid diff --git a/go.mod b/go.mod index b938258..f457ffa 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,7 @@ module github.com/debridmediamanager.com/zurg go 1.21.3 require ( - github.com/hashicorp/golang-lru/v2 v2.0.7 go.uber.org/zap v1.26.0 - golang.org/x/sys v0.14.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -13,7 +11,4 @@ require github.com/orcaman/concurrent-map/v2 v2.0.1 require github.com/panjf2000/ants/v2 v2.8.2 -require ( - github.com/winfsp/cgofuse v1.5.0 - go.uber.org/multierr v1.10.0 // indirect -) +require go.uber.org/multierr v1.10.0 // indirect diff --git a/go.sum b/go.sum index febb751..6f5023b 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/panjf2000/ants/v2 v2.8.2 h1:D1wfANttg8uXhC9149gRt1PDQ+dLVFjNXkCEycMcvQQ= @@ -16,8 +14,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/winfsp/cgofuse v1.5.0 h1:MsBP7Mi/LiJf/7/F3O/7HjjR009ds6KCdqXzKpZSWxI= -github.com/winfsp/cgofuse v1.5.0/go.mod h1:h3awhoUOcn2VYVKCwDaYxSLlZwnyK+A8KaDoLUp2lbU= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= @@ -26,8 +22,6 @@ go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/types.go b/internal/config/types.go index 0734aee..5d31034 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -20,6 +20,8 @@ type ConfigInterface interface { GetRandomPreferredHost() string ShouldServeFromRclone() bool ShouldForceIPv6() bool + GetUnrestrictWorkers() int + GetReleaseUnrestrictAfterMs() int } type ZurgConfig struct { @@ -28,6 +30,8 @@ type ZurgConfig struct { Host string `yaml:"host"` Port string `yaml:"port"` NumOfWorkers int `yaml:"concurrent_workers"` + UnrestrictWorkers int `yaml:"unrestrict_workers"` + ReleaseUnrestrictAfterMs int `yaml:"release_unrestrict_after_ms"` RefreshEverySeconds int `yaml:"check_for_changes_every_secs"` CanRepair bool `yaml:"enable_repair"` OnLibraryUpdate string `yaml:"on_library_update"` @@ -60,7 +64,7 @@ func (z *ZurgConfig) GetPort() string { func (z *ZurgConfig) GetNumOfWorkers() int { if z.NumOfWorkers == 0 { - return 10 + return 50 } return z.NumOfWorkers } @@ -117,3 +121,17 @@ func (z *ZurgConfig) ShouldServeFromRclone() bool { func (z *ZurgConfig) ShouldForceIPv6() bool { return z.ForceIPv6 } + +func (z *ZurgConfig) GetUnrestrictWorkers() int { + if z.UnrestrictWorkers == 0 { + return 20 + } + return z.UnrestrictWorkers +} + +func (z *ZurgConfig) GetReleaseUnrestrictAfterMs() int { + if z.ReleaseUnrestrictAfterMs == 0 { + return 100 + } + return z.ReleaseUnrestrictAfterMs +} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 1152b4a..1ff8dbe 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -32,7 +32,7 @@ type TorrentManager struct { latestAdded string requiredVersion string cfg config.ConfigInterface - api *realdebrid.RealDebrid + Api *realdebrid.RealDebrid antsPool *ants.Pool unrestrictPool *ants.Pool log *zap.SugaredLogger @@ -47,13 +47,13 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p cfg: cfg, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), requiredVersion: "18.11.2023", - api: api, + Api: api, antsPool: p, log: log, mu: &sync.Mutex{}, } - unrestrictPool, err := ants.NewPool(10) + unrestrictPool, err := ants.NewPool(t.cfg.GetUnrestrictWorkers()) if err != nil { t.unrestrictPool = t.antsPool } else { @@ -77,7 +77,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p // Fetch downloads go func() { defer initWait.Done() - downloads, _, err := t.api.GetDownloads() + downloads, _, err := t.Api.GetDownloads() if err != nil { t.log.Fatalf("Cannot get downloads: %v\n", err) } @@ -93,7 +93,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p var newTorrents []realdebrid.Torrent go func() { defer initWait.Done() - newTorrents, _, err = t.api.GetTorrents(0) + newTorrents, _, err = t.Api.GetTorrents(0) if err != nil { t.log.Fatalf("Cannot get torrents: %v\n", err) } @@ -207,8 +207,8 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { retChan := make(chan *realdebrid.Download, 1) t.unrestrictPool.Submit(func() { - retChan <- t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) - time.Sleep(1 * time.Second) + retChan <- t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) + time.Sleep(time.Duration(t.cfg.GetReleaseUnrestrictAfterMs()) * time.Millisecond) }) defer close(retChan) return <-retChan @@ -234,7 +234,7 @@ func (t *TorrentManager) getChecksum() string { // GetTorrents request go func() { - torrents, totalCount, err := t.api.GetTorrents(1) + torrents, totalCount, err := t.Api.GetTorrents(1) if err != nil { errChan <- err return @@ -244,7 +244,7 @@ func (t *TorrentManager) getChecksum() string { // GetActiveTorrentCount request go func() { - count, err := t.api.GetActiveTorrentCount() + count, err := t.Api.GetActiveTorrentCount() if err != nil { errChan <- err return @@ -287,7 +287,7 @@ func (t *TorrentManager) startRefreshJob() { continue } - newTorrents, _, err := t.api.GetTorrents(0) + newTorrents, _, err := t.Api.GetTorrents(0) if err != nil { t.log.Warnf("Cannot get torrents: %v\n", err) continue @@ -422,7 +422,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { torrentFromFile = nil } if info == nil { - info, err = t.api.GetTorrentInfo(rdTorrent.ID) + info, err = t.Api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) return nil @@ -544,7 +544,12 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([ // Use the existing worker pool to submit tasks _ = t.unrestrictPool.Submit(func() { defer wg.Done() - resp := t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) + if t.DownloadCache.Has(link) { + download, _ := t.DownloadCache.Get(link) + resultsChan <- Result{Response: download} + return + } + resp := t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) resultsChan <- Result{Response: resp} time.Sleep(1 * time.Second) }) @@ -634,7 +639,7 @@ func (t *TorrentManager) Delete(accessKey string) { if torrent, ok := allTorrents.Get(accessKey); ok { for _, instance := range torrent.Instances { infoCache.Remove(instance.ID) - t.api.DeleteTorrent(instance.ID) + t.Api.DeleteTorrent(instance.ID) } } t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { @@ -779,7 +784,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) } // redownload torrent - resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) + resp, err := t.Api.AddMagnetHash(torrent.Instances[0].Hash) if err != nil { t.log.Warnf("Cannot redownload torrent: %v", err) return false @@ -788,25 +793,25 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) // select files newTorrentID := resp.ID - err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) + err = t.Api.SelectTorrentFiles(newTorrentID, missingFiles) if err != nil { t.log.Warnf("Cannot start redownloading: %v", err) - t.api.DeleteTorrent(newTorrentID) + t.Api.DeleteTorrent(newTorrentID) return false } time.Sleep(10 * time.Second) // see if the torrent is ready - info, err := t.api.GetTorrentInfo(newTorrentID) + info, err := t.Api.GetTorrentInfo(newTorrentID) if err != nil { t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) - t.api.DeleteTorrent(newTorrentID) + t.Api.DeleteTorrent(newTorrentID) return false } if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) - t.api.DeleteTorrent(newTorrentID) + t.Api.DeleteTorrent(newTorrentID) return false } @@ -818,7 +823,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) missingCount := len(strings.Split(missingFiles, ",")) if len(info.Links) != missingCount { t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) - t.api.DeleteTorrent(newTorrentID) + t.Api.DeleteTorrent(newTorrentID) return false } @@ -833,7 +838,7 @@ func (t *TorrentManager) canCapacityHandle() bool { const maxDelay = 60 * time.Second retryCount := 0 for { - count, err := t.api.GetActiveTorrentCount() + count, err := t.Api.GetActiveTorrentCount() if err != nil { t.log.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { diff --git a/internal/universal/get.go b/internal/universal/get.go index d2cbac4..9786258 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -1,6 +1,7 @@ package universal import ( + "fmt" "io" "net/http" "net/url" @@ -79,12 +80,14 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i link := file.Link if download, exists := t.DownloadCache.Get(link); exists { - if c.ShouldServeFromRclone() { + if c.ShouldServeFromRclone() && t.Api.CanFetchFirstByte(download.Download) { redirect(w, r, download.Download, c) } else { - gf.streamFileToResponse(file, download.Download, w, r, t, c, log) + err := gf.streamCachedLinkToResponse(download.Download, w, r, t, c, log) + if err == nil { + return + } } - return } resp := t.UnrestrictUntilOk(link) @@ -121,6 +124,39 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i } } +func (gf *GetFile) streamCachedLinkToResponse(url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) error { + // Create a new request for the file download. + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("file is not available") + } + + // copy range header if it exists + if r.Header.Get("Range") != "" { + req.Header.Add("Range", r.Header.Get("Range")) + } + + resp, err := gf.client.Do(req) + if err != nil { + return fmt.Errorf("file is not available") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + return fmt.Errorf("file is not available") + } + + for k, vv := range resp.Header { + for _, v := range vv { + w.Header().Add(k, v) + } + } + + buf := make([]byte, cfg.GetNetworkBufferSize()) + io.CopyBuffer(w, resp.Body, buf) + return nil +} + func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. req, err := http.NewRequest(http.MethodGet, url, nil) diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index f34b7af..e808cc9 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -266,7 +266,6 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro } func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Download, error) { - fmt.Println("Unrestricting link via api", link) data := url.Values{} data.Set("link", link) @@ -304,7 +303,7 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa return nil, fmt.Errorf("undecodable response so likely it has expired") } - if checkFirstByte && !rd.canFetchFirstByte(response.Download) { + if checkFirstByte && !rd.CanFetchFirstByte(response.Download) { return nil, fmt.Errorf("can't fetch first byte") } diff --git a/pkg/realdebrid/unrestrict.go b/pkg/realdebrid/unrestrict.go index 69d900b..0a2d9d8 100644 --- a/pkg/realdebrid/unrestrict.go +++ b/pkg/realdebrid/unrestrict.go @@ -16,7 +16,7 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string, serveFromRclone bool) *Down return nil } -func (rd *RealDebrid) canFetchFirstByte(url string) bool { +func (rd *RealDebrid) CanFetchFirstByte(url string) bool { req, err := http.NewRequest("GET", url, nil) if err != nil { return false