Reunify workers, add commands

This commit is contained in:
Ben Sarmiento
2024-01-30 16:27:12 +01:00
parent 4e389fa79c
commit 7794e641ab
7 changed files with 29 additions and 54 deletions

View File

@@ -57,32 +57,14 @@ func MainApp(configPath string) {
} }
defer workerPool.Release() defer workerPool.Release()
// extra 1 worker for the refresh job
refreshPool, err := ants.NewPool(1)
if err != nil {
zurglog.Errorf("Failed to create refresh worker: %v", err)
os.Exit(1)
}
defer refreshPool.Release()
var repairPool *ants.Pool
if config.EnableRepair() {
repairPool, err = ants.NewPool(1)
if err != nil {
zurglog.Errorf("Failed to create repair worker: %v", err)
os.Exit(1)
}
defer repairPool.Release()
}
utils.EnsureDirExists("data") // Ensure the data directory exists utils.EnsureDirExists("data") // Ensure the data directory exists
torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, refreshPool, repairPool, log.Named("manager")) torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, log.Named("manager"))
downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetDownloadTimeoutSecs(), true, config, log.Named("dlclient")) downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), config.GetDownloadTimeoutSecs(), true, config, log.Named("dlclient"))
downloader := universal.NewDownloader(downloadClient) downloader := universal.NewDownloader(downloadClient)
router := chi.NewRouter() router := chi.NewRouter()
handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, workerPool, refreshPool, repairPool, log.Named("router")) handlers.AttachHandlers(router, downloader, torrentMgr, config, rd, workerPool, log.Named("router"))
// go func() { // go func() {
// if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed { // if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {

View File

@@ -100,7 +100,7 @@ func (z *ZurgConfig) GetProxy() string {
func (z *ZurgConfig) GetNumOfWorkers() int { func (z *ZurgConfig) GetNumOfWorkers() int {
if z.NumOfWorkers == 0 { if z.NumOfWorkers == 0 {
return 32 return 20
} }
return z.NumOfWorkers return z.NumOfWorkers
} }

View File

@@ -36,15 +36,13 @@ func init() {
chi.RegisterMethod("MOVE") chi.RegisterMethod("MOVE")
} }
func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) { func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) {
hs := &Handlers{ hs := &Handlers{
downloader: downloader, downloader: downloader,
torMgr: torMgr, torMgr: torMgr,
cfg: cfg, cfg: cfg,
api: api, api: api,
workerPool: workerPool, workerPool: workerPool,
refreshPool: refreshPool,
repairPool: repairPool,
log: log, log: log,
} }

View File

@@ -30,17 +30,20 @@ func (t *TorrentManager) handleFixers() {
} }
command, _ := t.fixers.Pop(id) command, _ := t.fixers.Pop(id)
switch command { switch command {
case "delete": case "delete_replaced":
t.log.Debugf("Deleting old id=%s because it's redundant to fixed %s ", id, t.GetKey(torrent))
toDelete = append(toDelete, id)
case "delete_failed":
t.log.Debugf("Deleting failed fixer id=%s of torrent %s", id, t.GetKey(torrent))
toDelete = append(toDelete, id) toDelete = append(toDelete, id)
case "repair": case "repair":
t.log.Debugf("Repairing torrent %s again now that fixer id=%s is done", t.GetKey(torrent), id)
toDelete = append(toDelete, id) toDelete = append(toDelete, id)
t.log.Debugf("Repairing torrent %s again now that fixer is done", t.GetKey(torrent))
repairMe, _ := allTorrents.Get(t.GetKey(torrent)) repairMe, _ := allTorrents.Get(t.GetKey(torrent))
t.TriggerRepair(repairMe) t.TriggerRepair(repairMe)
} }
}) })
for _, id := range toDelete { for _, id := range toDelete {
t.log.Debugf("Deleting fixer torrent id=%s", id)
t.Api.DeleteTorrent(id) t.Api.DeleteTorrent(id)
infoCache.Remove(id) infoCache.Remove(id)
t.deleteTorrentFile(id) t.deleteTorrentFile(id)

View File

@@ -31,10 +31,8 @@ type TorrentManager struct {
latestState *LibraryState latestState *LibraryState
requiredVersion string requiredVersion string
workerPool *ants.Pool workerPool *ants.Pool
refreshPool *ants.Pool
RefreshKillSwitch chan struct{} RefreshKillSwitch chan struct{}
RepairKillSwitch chan struct{} RepairKillSwitch chan struct{}
repairPool *ants.Pool
repairTrigger chan *Torrent repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent] repairSet mapset.Set[*Torrent]
repairRunning bool repairRunning bool
@@ -45,7 +43,7 @@ type TorrentManager struct {
// NewTorrentManager creates a new torrent manager // NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background // it will fetch all torrents and their info in the background
// and store them in-memory and cached in files // and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager { func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) *TorrentManager {
t := &TorrentManager{ t := &TorrentManager{
Config: cfg, Config: cfg,
Api: api, Api: api,
@@ -58,8 +56,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
latestState: &LibraryState{}, latestState: &LibraryState{},
requiredVersion: "0.9.3-hotfix.4", requiredVersion: "0.9.3-hotfix.4",
workerPool: workerPool, workerPool: workerPool,
refreshPool: refreshPool,
repairPool: repairPool,
log: log, log: log,
} }
t.fixers = t.readFixersFromFile() t.fixers = t.readFixersFromFile()

View File

@@ -95,7 +95,7 @@ func (t *TorrentManager) refreshTorrents() []string {
// StartRefreshJob periodically refreshes the torrents // StartRefreshJob periodically refreshes the torrents
func (t *TorrentManager) StartRefreshJob() { func (t *TorrentManager) StartRefreshJob() {
_ = t.refreshPool.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Info("Starting periodic refresh job") t.log.Info("Starting periodic refresh job")
refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
defer refreshTicker.Stop() defer refreshTicker.Stop()

View File

@@ -25,7 +25,7 @@ func (t *TorrentManager) StartRepairJob() {
t.repairTrigger = make(chan *Torrent) t.repairTrigger = make(chan *Torrent)
t.repairSet = mapset.NewSet[*Torrent]() t.repairSet = mapset.NewSet[*Torrent]()
// there is 1 repair worker, with max 1 blocking task // there is 1 repair worker, with max 1 blocking task
_ = t.repairPool.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Info("Starting periodic repair job") t.log.Info("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop() defer repairTicker.Stop()
@@ -211,13 +211,13 @@ func (t *TorrentManager) repair(torrent *Torrent) {
// second step: download the broken files // second step: download the broken files
if len(brokenFiles) > 0 { if len(brokenFiles) > 0 {
t.log.Infof("Repairing by downloading only the %d broken out of %d files of torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent)) t.log.Infof("Repairing by downloading only the %d broken out of %d files of torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent))
info, err := t.redownloadTorrent(torrent, brokenFileIDs) redownloadedTorrent, err := t.redownloadTorrent(torrent, brokenFileIDs)
if err != nil { if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
return return
} }
if info != nil { if redownloadedTorrent != nil {
t.fixerAddCommand(info.ID, "repair") t.fixerAddCommand(redownloadedTorrent.ID, "repair")
return return
} }
} }
@@ -335,14 +335,15 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
return nil, fmt.Errorf("cannot redownload torrent: %v", err) return nil, fmt.Errorf("cannot redownload torrent: %v", err)
} }
newTorrentID := resp.ID
// sleep for 1 second to let RD process the magnet // sleep for 1 second to let RD process the magnet
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// select files // select files
newTorrentID := resp.ID
err = t.Api.SelectTorrentFiles(newTorrentID, selection) err = t.Api.SelectTorrentFiles(newTorrentID, selection)
if err != nil { if err != nil {
t.fixerAddCommand(newTorrentID, "delete") t.fixerAddCommand(newTorrentID, "delete_failed")
return nil, fmt.Errorf("cannot start redownloading: %v", err) return nil, fmt.Errorf("cannot start redownloading: %v", err)
} }
@@ -352,7 +353,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
// see if the torrent is ready // see if the torrent is ready
info, err := t.Api.GetTorrentInfo(newTorrentID) info, err := t.Api.GetTorrentInfo(newTorrentID)
if err != nil { if err != nil {
t.fixerAddCommand(newTorrentID, "delete") t.fixerAddCommand(newTorrentID, "delete_failed")
return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
} }
@@ -367,14 +368,14 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
} }
} }
if !isOkStatus { if !isOkStatus {
t.fixerAddCommand(newTorrentID, "delete") t.fixerAddCommand(newTorrentID, "delete_failed")
return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status) return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status)
} }
// check if incorrect number of links // check if incorrect number of links
selectionCount := len(strings.Split(selection, ",")) selectionCount := len(strings.Split(selection, ","))
if info.Progress == 100 && len(info.Links) != selectionCount { if info.Progress == 100 && len(info.Links) != selectionCount {
t.fixerAddCommand(newTorrentID, "delete") t.fixerAddCommand(newTorrentID, "delete_failed")
return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), selectionCount) return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), selectionCount)
} }
@@ -382,13 +383,8 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
if len(oldTorrentIDs) > 0 { if len(oldTorrentIDs) > 0 {
// replace the old torrent (empty selection) // replace the old torrent (empty selection)
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
for _, id := range oldTorrentIDs { for _, id := range oldTorrentIDs {
t.log.Debugf("Deleting torrent %s (id=%s) to replace with repaired torrent", t.GetKey(torrent), id) t.fixerAddCommand(id, "delete_replaced")
torrent.DownloadedIDs.Remove(id)
t.Api.DeleteTorrent(id)
infoCache.Remove(id)
t.deleteTorrentFile(id)
} }
} }
return info, nil return info, nil