From db086b19b340a0821b07eca22f2aa01f8995d46d Mon Sep 17 00:00:00 2001 From: Ben Adrian Sarmiento Date: Fri, 7 Jun 2024 19:19:18 +0200 Subject: [PATCH] Remove get torrents count config and use worker pool on all places --- internal/app.go | 42 ++++++++++--------- internal/config/types.go | 10 ----- internal/handlers/home.go | 7 +--- internal/torrent/manager.go | 6 +-- internal/torrent/refresh.go | 6 +-- internal/torrent/repair.go | 6 +-- pkg/premium/monitor.go | 7 ++-- pkg/realdebrid/api.go | 21 ++++++---- sample.yml | 83 +++++++++++++++++++++++++++++++++++++ 9 files changed, 131 insertions(+), 57 deletions(-) create mode 100644 sample.yml diff --git a/internal/app.go b/internal/app.go index 15a9abb..c631192 100644 --- a/internal/app.go +++ b/internal/app.go @@ -54,7 +54,7 @@ func MainApp(configPath string) { apiClient := http.NewHTTPClient( config.GetToken(), - config.GetRetriesUntilFailed(), // default retries = 2, so this is 4 + config.GetRetriesUntilFailed(), // default retries = 2 config.GetApiTimeoutSecs(), // default api timeout = 60 false, // ipv6 support is not needed for api client config, @@ -79,19 +79,6 @@ func MainApp(configPath string) { log.Named("download_client"), ) - api := realdebrid.NewRealDebrid( - apiClient, - unrestrictClient, - downloadClient, - config, - log.Named("realdebrid"), - ) - - premium.MonitorPremiumStatus( - api, - zurglog, - ) - workerPool, err := ants.NewPool(config.GetNumOfWorkers()) if err != nil { zurglog.Errorf("Failed to create worker pool: %v", err) @@ -99,6 +86,21 @@ func MainApp(configPath string) { } defer workerPool.Release() + api := realdebrid.NewRealDebrid( + apiClient, + unrestrictClient, + downloadClient, + workerPool, + config, + log.Named("realdebrid"), + ) + + premium.MonitorPremiumStatus( + workerPool, + api, + zurglog, + ) + torrentMgr := torrent.NewTorrentManager( config, api, @@ -121,12 +123,12 @@ func MainApp(configPath string) { ) //// pprof - // go func() { - // if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed { - // zurglog.Errorf("Failed to start pprof: %v", err) - // os.Exit(1) - // } - // }() + workerPool.Submit(func() { + if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed { + zurglog.Errorf("Failed to start pprof: %v", err) + os.Exit(1) + } + }) addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort()) zurglog.Infof("Starting server on %s", addr) diff --git a/internal/config/types.go b/internal/config/types.go index cb9f9ef..6bdf66e 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -36,7 +36,6 @@ type ConfigInterface interface { GetDownloadsEveryMins() int GetDumpTorrentsEveryMins() int GetPlayableExtensions() []string - GetTorrentsCount() int } type ZurgConfig struct { @@ -46,7 +45,6 @@ type ZurgConfig struct { ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"` CanRepair bool `yaml:"enable_repair" json:"enable_repair"` DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` - DownloadsLimit int `yaml:"downloads_limit" json:"downloads_limit"` DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` @@ -66,7 +64,6 @@ type ZurgConfig struct { RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"` - TorrentsCount int `yaml:"get_torrents_count" json:"get_torrents_count"` Username string `yaml:"username" json:"username"` RarAction string `yaml:"rar_action" json:"rar_action"` } @@ -226,10 +223,3 @@ func (z *ZurgConfig) GetPlayableExtensions() []string { } return z.PlayableExtensions } - -func (z *ZurgConfig) GetTorrentsCount() int { - if z.TorrentsCount == 0 { - return 250 - } - return z.TorrentsCount -} diff --git a/internal/handlers/home.go b/internal/handlers/home.go index d047915..121c766 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -170,7 +170,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { %s - Config + Config Version %s @@ -218,10 +218,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { API Timeout %d secs - - Get torrents count - %d at a time - Download Timeout %d secs @@ -338,7 +334,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Config.GetRepairEveryMins(), response.Config.GetRarAction(), response.Config.GetApiTimeoutSecs(), - response.Config.GetTorrentsCount(), response.Config.GetDownloadTimeoutSecs(), response.Config.GetDownloadsEveryMins(), response.Config.GetDumpTorrentsEveryMins(), diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 5ea240b..50d7876 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -355,7 +355,7 @@ func (t *TorrentManager) mountNewDownloads() { } func (t *TorrentManager) StartDownloadsJob() { - _ = t.workerPool.Submit(func() { + t.workerPool.Submit(func() { remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute) defer remountTicker.Stop() @@ -414,7 +414,7 @@ func copyFile(sourcePath, destPath string) error { } func (t *TorrentManager) StartDumpJob() { - _ = t.workerPool.Submit(func() { + t.workerPool.Submit(func() { dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute) defer dumpTicker.Stop() @@ -444,7 +444,7 @@ func (t *TorrentManager) analyzeAllTorrents() { } func (t *TorrentManager) StartMediaAnalysisJob() { - _ = t.workerPool.Submit(func() { + t.workerPool.Submit(func() { for range t.AnalyzeTrigger { t.analyzeAllTorrents() } diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 8d144b4..bb73169 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -40,7 +40,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { freshIDs.Add(instances[i].ID) wg.Add(1) idx := i - _ = t.workerPool.Submit(func() { + t.workerPool.Submit(func() { defer wg.Done() if t.binImmediately(instances[idx].ID) || t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) || @@ -136,7 +136,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { // StartRefreshJob periodically refreshes the torrents func (t *TorrentManager) StartRefreshJob() { - go func() { + t.workerPool.Submit(func() { t.log.Debug("Starting periodic refresh job") refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) defer refreshTicker.Stop() @@ -157,7 +157,7 @@ func (t *TorrentManager) StartRefreshJob() { return } } - }() + }) } func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.TorrentInfo { diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index d9feea4..46491b9 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -27,7 +27,7 @@ func (t *TorrentManager) StartRepairJob() { t.repairTrigger = make(chan *Torrent) t.repairQueue = mapset.NewSet[*Torrent]() // there is 1 repair worker, with max 1 blocking task - go func() { + t.workerPool.Submit(func() { t.repairLog.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() @@ -44,7 +44,7 @@ func (t *TorrentManager) StartRepairJob() { return } } - }() + }) } func (t *TorrentManager) invokeRepair(torrent *Torrent) { @@ -106,7 +106,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { var wg sync.WaitGroup haystack.IterCb(func(_ string, torrent *Torrent) { wg.Add(1) - _ = t.workerPool.Submit(func() { + t.workerPool.Submit(func() { defer wg.Done() if torrent.UnrepairableReason != "" { return diff --git a/pkg/premium/monitor.go b/pkg/premium/monitor.go index 7c7de7c..3eb7331 100644 --- a/pkg/premium/monitor.go +++ b/pkg/premium/monitor.go @@ -5,6 +5,7 @@ import ( "github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/realdebrid" + "github.com/panjf2000/ants/v2" ) const ( @@ -12,8 +13,8 @@ const ( MINIMUM_SLEEP = 60 // 60 seconds ) -func MonitorPremiumStatus(rd *realdebrid.RealDebrid, zurglog *logutil.Logger) { - go func() { +func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurglog *logutil.Logger) { + workerPool.Submit(func() { for { userInfo, err := rd.GetUserInformation() if err != nil { @@ -41,5 +42,5 @@ func MonitorPremiumStatus(rd *realdebrid.RealDebrid, zurglog *logutil.Logger) { sleepDuration := time.Duration(remaining) * time.Second time.Sleep(sleepDuration) } - }() + }) } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 379a981..bd0cb83 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -11,21 +11,24 @@ import ( "github.com/debridmediamanager/zurg/internal/config" zurghttp "github.com/debridmediamanager/zurg/pkg/http" "github.com/debridmediamanager/zurg/pkg/logutil" + "github.com/panjf2000/ants/v2" ) type RealDebrid struct { apiClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient + workerPool *ants.Pool cfg config.ConfigInterface log *logutil.Logger } -func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { +func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { return &RealDebrid{ apiClient: apiClient, unrestrictClient: unrestrictClient, downloadClient: downloadClient, + workerPool: workerPool, cfg: cfg, log: log, } @@ -173,7 +176,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { allTorrents = []Torrent{} page := 1 // compute ceiling of totalCount / limit - maxPages := (totalCount + rd.cfg.GetTorrentsCount() - 1) / rd.cfg.GetTorrentsCount() + maxPages := (totalCount + 250 - 1) / 250 rd.log.Debugf("Torrents total count is %d", totalCount) maxParallelThreads := 4 if maxPages < maxParallelThreads { @@ -182,13 +185,13 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { for { allResults := make(chan getTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches - go func(add int) { + rd.workerPool.Submit(func() { if page > maxPages { allResults <- getTorrentsResult{nil, nil, 0} return } - allResults <- rd.getPageOfTorrents(page+add, rd.cfg.GetTorrentsCount()) - }(i) + allResults <- rd.getPageOfTorrents(page+i, 250) + }) } // Collect results from all goroutines for i := 0; i < maxParallelThreads; i++ { @@ -379,13 +382,13 @@ func (rd *RealDebrid) GetDownloads() []Download { allResults := make(chan []Download, maxParallelThreads) // Channel to collect results from goroutines errChan := make(chan error, maxParallelThreads) // Channel to collect errors from goroutines for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches - go func(add int) { - if page+add > maxPages { + rd.workerPool.Submit(func() { + if page+i > maxPages { allResults <- nil errChan <- nil return } - result, _, err := rd.fetchPageOfDownloads(page+add, limit) + result, _, err := rd.fetchPageOfDownloads(page+i, limit) if err != nil { allResults <- nil errChan <- err @@ -393,7 +396,7 @@ func (rd *RealDebrid) GetDownloads() []Download { } allResults <- result errChan <- nil - }(i) + }) } // Collect results from all goroutines for i := 0; i < maxParallelThreads; i++ { diff --git a/sample.yml b/sample.yml new file mode 100644 index 0000000..aa31faf --- /dev/null +++ b/sample.yml @@ -0,0 +1,83 @@ +zurg: v1 + +token: +# host: [::] +# port: 9999 +# proxy: http://user:pass@your-http-proxy.com:8080 +# proxy: https://user:pass@your-https-proxy.com:8080 +# proxy: socks5://user:pass@your-socks5-proxy.com +# proxy: + +# secure your zurg deployment +# username: +# password: + +# concurrent_workers: 20 +# check_for_changes_every_secs: 15 +# repair_every_mins: 60 # 1 hr +# downloads_every_mins: 720 # 12 hrs +# dump_torrents_every_mins: 1440 # 24 hrs +enable_repair: true + +# network_buffer_size: 32768 # 32kb +# force_ipv6: false +# serve_from_rclone: false + +# api_timeout_secs: 60 +# download_timeout_secs: 10 +# rate_limit_sleep_secs: 4 +# retries_until_failed: 2 + +# possible values are extract, delete, none +# extract requires enable_repair: true +rar_action: extract + +# retain_folder_name_extension: false +# retain_rd_torrent_name: false + +# add file extensions here that you don't want to be +# moved to the unplayable directory +addl_playable_extensions: +- mp3 +- flac + +# for windows +# on_library_update: '& powershell -ExecutionPolicy Bypass -File .\plex_update.ps1 --% "$args"' +# for linux/mac +# on_library_update: sh plex_update.sh "$@" + +directories: + + audiobooks: + group_order: 5 + group: media + filters: + - and: + - is_music: true + - media_info_duration_gte: 600 + + music: + group_order: 10 + group: media + filters: + - is_music: true + + anime: + group_order: 15 + group: media + filters: + - regex: /\b[a-fA-F0-9]{8}\b/ + - any_file_inside_regex: /\b[a-fA-F0-9]{8}\b/ + + shows: + group_order: 20 + group: media + filters: + - has_episodes: true + + movies: + group_order: 25 + group: media + only_show_the_biggest_file: true + filters: + - regex: /.*/