Remove get torrents count config and use worker pool on all places
This commit is contained in:
@@ -54,7 +54,7 @@ func MainApp(configPath string) {
|
||||
|
||||
apiClient := http.NewHTTPClient(
|
||||
config.GetToken(),
|
||||
config.GetRetriesUntilFailed(), // default retries = 2, so this is 4
|
||||
config.GetRetriesUntilFailed(), // default retries = 2
|
||||
config.GetApiTimeoutSecs(), // default api timeout = 60
|
||||
false, // ipv6 support is not needed for api client
|
||||
config,
|
||||
@@ -79,19 +79,6 @@ func MainApp(configPath string) {
|
||||
log.Named("download_client"),
|
||||
)
|
||||
|
||||
api := realdebrid.NewRealDebrid(
|
||||
apiClient,
|
||||
unrestrictClient,
|
||||
downloadClient,
|
||||
config,
|
||||
log.Named("realdebrid"),
|
||||
)
|
||||
|
||||
premium.MonitorPremiumStatus(
|
||||
api,
|
||||
zurglog,
|
||||
)
|
||||
|
||||
workerPool, err := ants.NewPool(config.GetNumOfWorkers())
|
||||
if err != nil {
|
||||
zurglog.Errorf("Failed to create worker pool: %v", err)
|
||||
@@ -99,6 +86,21 @@ func MainApp(configPath string) {
|
||||
}
|
||||
defer workerPool.Release()
|
||||
|
||||
api := realdebrid.NewRealDebrid(
|
||||
apiClient,
|
||||
unrestrictClient,
|
||||
downloadClient,
|
||||
workerPool,
|
||||
config,
|
||||
log.Named("realdebrid"),
|
||||
)
|
||||
|
||||
premium.MonitorPremiumStatus(
|
||||
workerPool,
|
||||
api,
|
||||
zurglog,
|
||||
)
|
||||
|
||||
torrentMgr := torrent.NewTorrentManager(
|
||||
config,
|
||||
api,
|
||||
@@ -121,12 +123,12 @@ func MainApp(configPath string) {
|
||||
)
|
||||
|
||||
//// pprof
|
||||
// go func() {
|
||||
// if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
|
||||
// zurglog.Errorf("Failed to start pprof: %v", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
// }()
|
||||
workerPool.Submit(func() {
|
||||
if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
|
||||
zurglog.Errorf("Failed to start pprof: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
})
|
||||
|
||||
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
|
||||
zurglog.Infof("Starting server on %s", addr)
|
||||
|
||||
@@ -36,7 +36,6 @@ type ConfigInterface interface {
|
||||
GetDownloadsEveryMins() int
|
||||
GetDumpTorrentsEveryMins() int
|
||||
GetPlayableExtensions() []string
|
||||
GetTorrentsCount() int
|
||||
}
|
||||
|
||||
type ZurgConfig struct {
|
||||
@@ -46,7 +45,6 @@ type ZurgConfig struct {
|
||||
ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"`
|
||||
CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
|
||||
DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"`
|
||||
DownloadsLimit int `yaml:"downloads_limit" json:"downloads_limit"`
|
||||
DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"`
|
||||
DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"`
|
||||
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"`
|
||||
@@ -66,7 +64,6 @@ type ZurgConfig struct {
|
||||
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
|
||||
RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"`
|
||||
ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"`
|
||||
TorrentsCount int `yaml:"get_torrents_count" json:"get_torrents_count"`
|
||||
Username string `yaml:"username" json:"username"`
|
||||
RarAction string `yaml:"rar_action" json:"rar_action"`
|
||||
}
|
||||
@@ -226,10 +223,3 @@ func (z *ZurgConfig) GetPlayableExtensions() []string {
|
||||
}
|
||||
return z.PlayableExtensions
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) GetTorrentsCount() int {
|
||||
if z.TorrentsCount == 0 {
|
||||
return 250
|
||||
}
|
||||
return z.TorrentsCount
|
||||
}
|
||||
|
||||
@@ -170,7 +170,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<td>%s</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td rowspan="22">Config</td>
|
||||
<td rowspan="21">Config</td>
|
||||
<td>Version</td>
|
||||
<td>%s</td>
|
||||
</tr>
|
||||
@@ -218,10 +218,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<td>API Timeout</td>
|
||||
<td>%d secs</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Get torrents count</td>
|
||||
<td>%d at a time</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Download Timeout</td>
|
||||
<td>%d secs</td>
|
||||
@@ -338,7 +334,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
response.Config.GetRepairEveryMins(),
|
||||
response.Config.GetRarAction(),
|
||||
response.Config.GetApiTimeoutSecs(),
|
||||
response.Config.GetTorrentsCount(),
|
||||
response.Config.GetDownloadTimeoutSecs(),
|
||||
response.Config.GetDownloadsEveryMins(),
|
||||
response.Config.GetDumpTorrentsEveryMins(),
|
||||
|
||||
@@ -355,7 +355,7 @@ func (t *TorrentManager) mountNewDownloads() {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) StartDownloadsJob() {
|
||||
_ = t.workerPool.Submit(func() {
|
||||
t.workerPool.Submit(func() {
|
||||
remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute)
|
||||
defer remountTicker.Stop()
|
||||
|
||||
@@ -414,7 +414,7 @@ func copyFile(sourcePath, destPath string) error {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) StartDumpJob() {
|
||||
_ = t.workerPool.Submit(func() {
|
||||
t.workerPool.Submit(func() {
|
||||
dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute)
|
||||
defer dumpTicker.Stop()
|
||||
|
||||
@@ -444,7 +444,7 @@ func (t *TorrentManager) analyzeAllTorrents() {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) StartMediaAnalysisJob() {
|
||||
_ = t.workerPool.Submit(func() {
|
||||
t.workerPool.Submit(func() {
|
||||
for range t.AnalyzeTrigger {
|
||||
t.analyzeAllTorrents()
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
||||
freshIDs.Add(instances[i].ID)
|
||||
wg.Add(1)
|
||||
idx := i
|
||||
_ = t.workerPool.Submit(func() {
|
||||
t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
if t.binImmediately(instances[idx].ID) ||
|
||||
t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) ||
|
||||
@@ -136,7 +136,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
||||
|
||||
// StartRefreshJob periodically refreshes the torrents
|
||||
func (t *TorrentManager) StartRefreshJob() {
|
||||
go func() {
|
||||
t.workerPool.Submit(func() {
|
||||
t.log.Debug("Starting periodic refresh job")
|
||||
refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
|
||||
defer refreshTicker.Stop()
|
||||
@@ -157,7 +157,7 @@ func (t *TorrentManager) StartRefreshJob() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.TorrentInfo {
|
||||
|
||||
@@ -27,7 +27,7 @@ func (t *TorrentManager) StartRepairJob() {
|
||||
t.repairTrigger = make(chan *Torrent)
|
||||
t.repairQueue = mapset.NewSet[*Torrent]()
|
||||
// there is 1 repair worker, with max 1 blocking task
|
||||
go func() {
|
||||
t.workerPool.Submit(func() {
|
||||
t.repairLog.Debug("Starting periodic repair job")
|
||||
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
|
||||
defer repairTicker.Stop()
|
||||
@@ -44,7 +44,7 @@ func (t *TorrentManager) StartRepairJob() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func (t *TorrentManager) invokeRepair(torrent *Torrent) {
|
||||
@@ -106,7 +106,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
|
||||
var wg sync.WaitGroup
|
||||
haystack.IterCb(func(_ string, torrent *Torrent) {
|
||||
wg.Add(1)
|
||||
_ = t.workerPool.Submit(func() {
|
||||
t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
if torrent.UnrepairableReason != "" {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user