Support rebooting workers
This commit is contained in:
@@ -177,21 +177,21 @@ func (z *ZurgConfig) EnableDownloadMount() bool {
|
|||||||
|
|
||||||
func (z *ZurgConfig) GetApiTimeoutSecs() int {
|
func (z *ZurgConfig) GetApiTimeoutSecs() int {
|
||||||
if z.ApiTimeoutSecs == 0 {
|
if z.ApiTimeoutSecs == 0 {
|
||||||
return 30
|
return 15
|
||||||
}
|
}
|
||||||
return z.ApiTimeoutSecs
|
return z.ApiTimeoutSecs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *ZurgConfig) GetDownloadTimeoutSecs() int {
|
func (z *ZurgConfig) GetDownloadTimeoutSecs() int {
|
||||||
if z.DownloadTimeoutSecs == 0 {
|
if z.DownloadTimeoutSecs == 0 {
|
||||||
return 15
|
return 10
|
||||||
}
|
}
|
||||||
return z.DownloadTimeoutSecs
|
return z.DownloadTimeoutSecs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *ZurgConfig) GetRateLimitSleepSecs() int {
|
func (z *ZurgConfig) GetRateLimitSleepSecs() int {
|
||||||
if z.RateLimitSleepSecs == 0 {
|
if z.RateLimitSleepSecs == 0 {
|
||||||
return 4
|
return 6
|
||||||
}
|
}
|
||||||
return z.RateLimitSleepSecs
|
return z.RateLimitSleepSecs
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -243,6 +243,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
|||||||
<td colspan="2">
|
<td colspan="2">
|
||||||
<button type="button" onclick="window.open('/logs/upload')">Upload logs</button>
|
<button type="button" onclick="window.open('/logs/upload')">Upload logs</button>
|
||||||
<button type="button" onclick="window.open('/reboot/worker')">Reboot worker pool</button>
|
<button type="button" onclick="window.open('/reboot/worker')">Reboot worker pool</button>
|
||||||
|
<button type="button" onclick="window.open('/reboot/refresh')">Reboot refresh worker</button>
|
||||||
|
<button type="button" onclick="window.open('/reboot/repair')">Reboot repair worker</button>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
</table>
|
</table>
|
||||||
@@ -310,6 +312,20 @@ func (zr *Handlers) handleRebootWorkerPool(resp http.ResponseWriter, req *http.R
|
|||||||
fmt.Fprint(resp, "Rebooted worker pool, please close this window")
|
fmt.Fprint(resp, "Rebooted worker pool, please close this window")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (zr *Handlers) handleRebootRefreshPool(resp http.ResponseWriter, req *http.Request) {
|
||||||
|
zr.torMgr.RefreshKillSwitch <- struct{}{}
|
||||||
|
zr.torMgr.StartRefreshJob()
|
||||||
|
zr.log.Infof("Rebooted refresh worker")
|
||||||
|
fmt.Fprint(resp, "Rebooted refresh worker, please close this window")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (zr *Handlers) handleRebootRepairPool(resp http.ResponseWriter, req *http.Request) {
|
||||||
|
zr.torMgr.RepairKillSwitch <- struct{}{}
|
||||||
|
zr.torMgr.StartRepairJob()
|
||||||
|
zr.log.Infof("Rebooted repair worker")
|
||||||
|
fmt.Fprint(resp, "Rebooted repair worker, please close this window")
|
||||||
|
}
|
||||||
|
|
||||||
func bToMb(b uint64) uint64 {
|
func bToMb(b uint64) uint64 {
|
||||||
return b / 1024 / 1024
|
return b / 1024 / 1024
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,6 +55,8 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t
|
|||||||
|
|
||||||
router.Get("/", hs.handleHome)
|
router.Get("/", hs.handleHome)
|
||||||
router.Get("/reboot/worker", hs.handleRebootWorkerPool)
|
router.Get("/reboot/worker", hs.handleRebootWorkerPool)
|
||||||
|
router.Get("/reboot/refresh", hs.handleRebootRefreshPool)
|
||||||
|
router.Get("/reboot/repair", hs.handleRebootRepairPool)
|
||||||
// version
|
// version
|
||||||
router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile)
|
router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile)
|
||||||
router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile)
|
router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile)
|
||||||
|
|||||||
@@ -21,24 +21,26 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TorrentManager struct {
|
type TorrentManager struct {
|
||||||
Config config.ConfigInterface
|
Config config.ConfigInterface
|
||||||
Api *realdebrid.RealDebrid
|
Api *realdebrid.RealDebrid
|
||||||
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
||||||
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
|
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
|
||||||
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
||||||
fixers cmap.ConcurrentMap[string, *Torrent]
|
fixers cmap.ConcurrentMap[string, *Torrent]
|
||||||
deleteOnceDone mapset.Set[string]
|
deleteOnceDone mapset.Set[string]
|
||||||
allAccessKeys mapset.Set[string]
|
allAccessKeys mapset.Set[string]
|
||||||
latestState *LibraryState
|
latestState *LibraryState
|
||||||
requiredVersion string
|
requiredVersion string
|
||||||
workerPool *ants.Pool
|
workerPool *ants.Pool
|
||||||
refreshPool *ants.Pool
|
refreshPool *ants.Pool
|
||||||
repairPool *ants.Pool
|
RefreshKillSwitch chan struct{}
|
||||||
repairTrigger chan *Torrent
|
RepairKillSwitch chan struct{}
|
||||||
repairSet mapset.Set[*Torrent]
|
repairPool *ants.Pool
|
||||||
repairRunning bool
|
repairTrigger chan *Torrent
|
||||||
repairRunningMu sync.Mutex
|
repairSet mapset.Set[*Torrent]
|
||||||
log *logutil.Logger
|
repairRunning bool
|
||||||
|
repairRunningMu sync.Mutex
|
||||||
|
log *logutil.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTorrentManager creates a new torrent manager
|
// NewTorrentManager creates a new torrent manager
|
||||||
@@ -46,20 +48,22 @@ type TorrentManager struct {
|
|||||||
// and store them in-memory and cached in files
|
// and store them in-memory and cached in files
|
||||||
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager {
|
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager {
|
||||||
t := &TorrentManager{
|
t := &TorrentManager{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
Api: api,
|
Api: api,
|
||||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||||
DownloadCache: cmap.New[*realdebrid.Download](),
|
DownloadCache: cmap.New[*realdebrid.Download](),
|
||||||
DownloadMap: cmap.New[*realdebrid.Download](),
|
DownloadMap: cmap.New[*realdebrid.Download](),
|
||||||
fixers: cmap.New[*Torrent](),
|
RefreshKillSwitch: make(chan struct{}, 1),
|
||||||
deleteOnceDone: mapset.NewSet[string](),
|
RepairKillSwitch: make(chan struct{}, 1),
|
||||||
allAccessKeys: mapset.NewSet[string](),
|
fixers: cmap.New[*Torrent](),
|
||||||
latestState: &LibraryState{},
|
deleteOnceDone: mapset.NewSet[string](),
|
||||||
requiredVersion: "0.9.3-hotfix.3",
|
allAccessKeys: mapset.NewSet[string](),
|
||||||
workerPool: workerPool,
|
latestState: &LibraryState{},
|
||||||
refreshPool: refreshPool,
|
requiredVersion: "0.9.3-hotfix.3",
|
||||||
repairPool: repairPool,
|
workerPool: workerPool,
|
||||||
log: log,
|
refreshPool: refreshPool,
|
||||||
|
repairPool: repairPool,
|
||||||
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
||||||
t.initializeDirectories()
|
t.initializeDirectories()
|
||||||
|
|||||||
@@ -123,20 +123,27 @@ func (t *TorrentManager) refreshTorrents() []string {
|
|||||||
func (t *TorrentManager) StartRefreshJob() {
|
func (t *TorrentManager) StartRefreshJob() {
|
||||||
_ = t.refreshPool.Submit(func() {
|
_ = t.refreshPool.Submit(func() {
|
||||||
t.log.Info("Starting periodic refresh job")
|
t.log.Info("Starting periodic refresh job")
|
||||||
|
refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
|
||||||
|
defer refreshTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
<-time.After(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
|
select {
|
||||||
|
case <-refreshTicker.C:
|
||||||
|
checksum := t.getCurrentState()
|
||||||
|
if t.latestState.equal(checksum) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.SetNewLatestState(checksum)
|
||||||
|
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
|
||||||
|
|
||||||
checksum := t.getCurrentState()
|
updatedPaths := t.refreshTorrents()
|
||||||
if t.latestState.equal(checksum) {
|
t.log.Info("Finished refreshing torrents")
|
||||||
continue
|
|
||||||
|
t.TriggerHookOnLibraryUpdate(updatedPaths)
|
||||||
|
case <-t.RefreshKillSwitch:
|
||||||
|
t.log.Info("Stopping periodic refresh job")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
t.SetNewLatestState(checksum)
|
|
||||||
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
|
|
||||||
|
|
||||||
updatedPaths := t.refreshTorrents()
|
|
||||||
t.log.Info("Finished refreshing torrents")
|
|
||||||
|
|
||||||
t.TriggerHookOnLibraryUpdate(updatedPaths)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,9 @@ func (t *TorrentManager) StartRepairJob() {
|
|||||||
case torrent := <-t.repairTrigger:
|
case torrent := <-t.repairTrigger:
|
||||||
// On-demand trigger with a specific torrent
|
// On-demand trigger with a specific torrent
|
||||||
t.invokeRepair(torrent)
|
t.invokeRepair(torrent)
|
||||||
|
case <-t.RepairKillSwitch:
|
||||||
|
t.log.Info("Stopping periodic repair job")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -248,7 +248,7 @@ func (r *HTTPClient) shouldRetry(resp *http.Response, reqHasRangeHeader bool, er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil && strings.Contains(err.Error(), "timeout awaiting response headers") {
|
if err != nil && strings.Contains(err.Error(), "timeout") {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user