diff --git a/internal/app.go b/internal/app.go
index 15a9abb..c631192 100644
--- a/internal/app.go
+++ b/internal/app.go
@@ -54,7 +54,7 @@ func MainApp(configPath string) {
apiClient := http.NewHTTPClient(
config.GetToken(),
- config.GetRetriesUntilFailed(), // default retries = 2, so this is 4
+ config.GetRetriesUntilFailed(), // default retries = 2
config.GetApiTimeoutSecs(), // default api timeout = 60
false, // ipv6 support is not needed for api client
config,
@@ -79,19 +79,6 @@ func MainApp(configPath string) {
log.Named("download_client"),
)
- api := realdebrid.NewRealDebrid(
- apiClient,
- unrestrictClient,
- downloadClient,
- config,
- log.Named("realdebrid"),
- )
-
- premium.MonitorPremiumStatus(
- api,
- zurglog,
- )
-
workerPool, err := ants.NewPool(config.GetNumOfWorkers())
if err != nil {
zurglog.Errorf("Failed to create worker pool: %v", err)
@@ -99,6 +86,21 @@ func MainApp(configPath string) {
}
defer workerPool.Release()
+ api := realdebrid.NewRealDebrid(
+ apiClient,
+ unrestrictClient,
+ downloadClient,
+ workerPool,
+ config,
+ log.Named("realdebrid"),
+ )
+
+ premium.MonitorPremiumStatus(
+ workerPool,
+ api,
+ zurglog,
+ )
+
torrentMgr := torrent.NewTorrentManager(
config,
api,
@@ -121,12 +123,12 @@ func MainApp(configPath string) {
)
//// pprof
- // go func() {
- // if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
- // zurglog.Errorf("Failed to start pprof: %v", err)
- // os.Exit(1)
- // }
- // }()
+ workerPool.Submit(func() {
+ if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
+ zurglog.Errorf("Failed to start pprof: %v", err)
+ os.Exit(1)
+ }
+ })
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
zurglog.Infof("Starting server on %s", addr)
diff --git a/internal/config/types.go b/internal/config/types.go
index cb9f9ef..6bdf66e 100644
--- a/internal/config/types.go
+++ b/internal/config/types.go
@@ -36,7 +36,6 @@ type ConfigInterface interface {
GetDownloadsEveryMins() int
GetDumpTorrentsEveryMins() int
GetPlayableExtensions() []string
- GetTorrentsCount() int
}
type ZurgConfig struct {
@@ -46,7 +45,6 @@ type ZurgConfig struct {
ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"`
CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"`
- DownloadsLimit int `yaml:"downloads_limit" json:"downloads_limit"`
DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"`
DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"`
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"`
@@ -66,7 +64,6 @@ type ZurgConfig struct {
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"`
ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"`
- TorrentsCount int `yaml:"get_torrents_count" json:"get_torrents_count"`
Username string `yaml:"username" json:"username"`
RarAction string `yaml:"rar_action" json:"rar_action"`
}
@@ -226,10 +223,3 @@ func (z *ZurgConfig) GetPlayableExtensions() []string {
}
return z.PlayableExtensions
}
-
-func (z *ZurgConfig) GetTorrentsCount() int {
- if z.TorrentsCount == 0 {
- return 250
- }
- return z.TorrentsCount
-}
diff --git a/internal/handlers/home.go b/internal/handlers/home.go
index d047915..121c766 100644
--- a/internal/handlers/home.go
+++ b/internal/handlers/home.go
@@ -170,7 +170,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
%s |
- | Config |
+ Config |
Version |
%s |
@@ -218,10 +218,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
API Timeout |
%d secs |
-
- | Get torrents count |
- %d at a time |
-
| Download Timeout |
%d secs |
@@ -338,7 +334,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.GetRepairEveryMins(),
response.Config.GetRarAction(),
response.Config.GetApiTimeoutSecs(),
- response.Config.GetTorrentsCount(),
response.Config.GetDownloadTimeoutSecs(),
response.Config.GetDownloadsEveryMins(),
response.Config.GetDumpTorrentsEveryMins(),
diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go
index 5ea240b..50d7876 100644
--- a/internal/torrent/manager.go
+++ b/internal/torrent/manager.go
@@ -355,7 +355,7 @@ func (t *TorrentManager) mountNewDownloads() {
}
func (t *TorrentManager) StartDownloadsJob() {
- _ = t.workerPool.Submit(func() {
+ t.workerPool.Submit(func() {
remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute)
defer remountTicker.Stop()
@@ -414,7 +414,7 @@ func copyFile(sourcePath, destPath string) error {
}
func (t *TorrentManager) StartDumpJob() {
- _ = t.workerPool.Submit(func() {
+ t.workerPool.Submit(func() {
dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute)
defer dumpTicker.Stop()
@@ -444,7 +444,7 @@ func (t *TorrentManager) analyzeAllTorrents() {
}
func (t *TorrentManager) StartMediaAnalysisJob() {
- _ = t.workerPool.Submit(func() {
+ t.workerPool.Submit(func() {
for range t.AnalyzeTrigger {
t.analyzeAllTorrents()
}
diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go
index 8d144b4..bb73169 100644
--- a/internal/torrent/refresh.go
+++ b/internal/torrent/refresh.go
@@ -40,7 +40,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
freshIDs.Add(instances[i].ID)
wg.Add(1)
idx := i
- _ = t.workerPool.Submit(func() {
+ t.workerPool.Submit(func() {
defer wg.Done()
if t.binImmediately(instances[idx].ID) ||
t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) ||
@@ -136,7 +136,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
// StartRefreshJob periodically refreshes the torrents
func (t *TorrentManager) StartRefreshJob() {
- go func() {
+ t.workerPool.Submit(func() {
t.log.Debug("Starting periodic refresh job")
refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
defer refreshTicker.Stop()
@@ -157,7 +157,7 @@ func (t *TorrentManager) StartRefreshJob() {
return
}
}
- }()
+ })
}
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.TorrentInfo {
diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go
index d9feea4..46491b9 100644
--- a/internal/torrent/repair.go
+++ b/internal/torrent/repair.go
@@ -27,7 +27,7 @@ func (t *TorrentManager) StartRepairJob() {
t.repairTrigger = make(chan *Torrent)
t.repairQueue = mapset.NewSet[*Torrent]()
// there is 1 repair worker, with max 1 blocking task
- go func() {
+ t.workerPool.Submit(func() {
t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop()
@@ -44,7 +44,7 @@ func (t *TorrentManager) StartRepairJob() {
return
}
}
- }()
+ })
}
func (t *TorrentManager) invokeRepair(torrent *Torrent) {
@@ -106,7 +106,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
var wg sync.WaitGroup
haystack.IterCb(func(_ string, torrent *Torrent) {
wg.Add(1)
- _ = t.workerPool.Submit(func() {
+ t.workerPool.Submit(func() {
defer wg.Done()
if torrent.UnrepairableReason != "" {
return
diff --git a/pkg/premium/monitor.go b/pkg/premium/monitor.go
index 7c7de7c..3eb7331 100644
--- a/pkg/premium/monitor.go
+++ b/pkg/premium/monitor.go
@@ -5,6 +5,7 @@ import (
"github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
+ "github.com/panjf2000/ants/v2"
)
const (
@@ -12,8 +13,8 @@ const (
MINIMUM_SLEEP = 60 // 60 seconds
)
-func MonitorPremiumStatus(rd *realdebrid.RealDebrid, zurglog *logutil.Logger) {
- go func() {
+func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurglog *logutil.Logger) {
+ workerPool.Submit(func() {
for {
userInfo, err := rd.GetUserInformation()
if err != nil {
@@ -41,5 +42,5 @@ func MonitorPremiumStatus(rd *realdebrid.RealDebrid, zurglog *logutil.Logger) {
sleepDuration := time.Duration(remaining) * time.Second
time.Sleep(sleepDuration)
}
- }()
+ })
}
diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go
index 379a981..bd0cb83 100644
--- a/pkg/realdebrid/api.go
+++ b/pkg/realdebrid/api.go
@@ -11,21 +11,24 @@ import (
"github.com/debridmediamanager/zurg/internal/config"
zurghttp "github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/logutil"
+ "github.com/panjf2000/ants/v2"
)
type RealDebrid struct {
apiClient *zurghttp.HTTPClient
unrestrictClient *zurghttp.HTTPClient
downloadClient *zurghttp.HTTPClient
+ workerPool *ants.Pool
cfg config.ConfigInterface
log *logutil.Logger
}
-func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid {
+func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid {
return &RealDebrid{
apiClient: apiClient,
unrestrictClient: unrestrictClient,
downloadClient: downloadClient,
+ workerPool: workerPool,
cfg: cfg,
log: log,
}
@@ -173,7 +176,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
allTorrents = []Torrent{}
page := 1
// compute ceiling of totalCount / limit
- maxPages := (totalCount + rd.cfg.GetTorrentsCount() - 1) / rd.cfg.GetTorrentsCount()
+ maxPages := (totalCount + 250 - 1) / 250
rd.log.Debugf("Torrents total count is %d", totalCount)
maxParallelThreads := 4
if maxPages < maxParallelThreads {
@@ -182,13 +185,13 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
for {
allResults := make(chan getTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines
for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches
- go func(add int) {
+ rd.workerPool.Submit(func() {
if page > maxPages {
allResults <- getTorrentsResult{nil, nil, 0}
return
}
- allResults <- rd.getPageOfTorrents(page+add, rd.cfg.GetTorrentsCount())
- }(i)
+ allResults <- rd.getPageOfTorrents(page+i, 250)
+ })
}
// Collect results from all goroutines
for i := 0; i < maxParallelThreads; i++ {
@@ -379,13 +382,13 @@ func (rd *RealDebrid) GetDownloads() []Download {
allResults := make(chan []Download, maxParallelThreads) // Channel to collect results from goroutines
errChan := make(chan error, maxParallelThreads) // Channel to collect errors from goroutines
for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches
- go func(add int) {
- if page+add > maxPages {
+ rd.workerPool.Submit(func() {
+ if page+i > maxPages {
allResults <- nil
errChan <- nil
return
}
- result, _, err := rd.fetchPageOfDownloads(page+add, limit)
+ result, _, err := rd.fetchPageOfDownloads(page+i, limit)
if err != nil {
allResults <- nil
errChan <- err
@@ -393,7 +396,7 @@ func (rd *RealDebrid) GetDownloads() []Download {
}
allResults <- result
errChan <- nil
- }(i)
+ })
}
// Collect results from all goroutines
for i := 0; i < maxParallelThreads; i++ {
diff --git a/sample.yml b/sample.yml
new file mode 100644
index 0000000..aa31faf
--- /dev/null
+++ b/sample.yml
@@ -0,0 +1,83 @@
+zurg: v1
+
+token:
+# host: [::]
+# port: 9999
+# proxy: http://user:pass@your-http-proxy.com:8080
+# proxy: https://user:pass@your-https-proxy.com:8080
+# proxy: socks5://user:pass@your-socks5-proxy.com
+# proxy:
+
+# secure your zurg deployment
+# username:
+# password:
+
+# concurrent_workers: 20
+# check_for_changes_every_secs: 15
+# repair_every_mins: 60 # 1 hr
+# downloads_every_mins: 720 # 12 hrs
+# dump_torrents_every_mins: 1440 # 24 hrs
+enable_repair: true
+
+# network_buffer_size: 32768 # 32kb
+# force_ipv6: false
+# serve_from_rclone: false
+
+# api_timeout_secs: 60
+# download_timeout_secs: 10
+# rate_limit_sleep_secs: 4
+# retries_until_failed: 2
+
+# possible values are extract, delete, none
+# extract requires enable_repair: true
+rar_action: extract
+
+# retain_folder_name_extension: false
+# retain_rd_torrent_name: false
+
+# add file extensions here that you don't want to be
+# moved to the unplayable directory
+addl_playable_extensions:
+- mp3
+- flac
+
+# for windows
+# on_library_update: '& powershell -ExecutionPolicy Bypass -File .\plex_update.ps1 --% "$args"'
+# for linux/mac
+# on_library_update: sh plex_update.sh "$@"
+
+directories:
+
+ audiobooks:
+ group_order: 5
+ group: media
+ filters:
+ - and:
+ - is_music: true
+ - media_info_duration_gte: 600
+
+ music:
+ group_order: 10
+ group: media
+ filters:
+ - is_music: true
+
+ anime:
+ group_order: 15
+ group: media
+ filters:
+ - regex: /\b[a-fA-F0-9]{8}\b/
+ - any_file_inside_regex: /\b[a-fA-F0-9]{8}\b/
+
+ shows:
+ group_order: 20
+ group: media
+ filters:
+ - has_episodes: true
+
+ movies:
+ group_order: 25
+ group: media
+ only_show_the_biggest_file: true
+ filters:
+ - regex: /.*/