Use normal pool, add semaphore lock

This commit is contained in:
Ben Sarmiento
2024-01-11 06:10:43 +01:00
parent 666975211a
commit 3cdc2f8791
2 changed files with 11 additions and 15 deletions

View File

@@ -31,7 +31,6 @@ type TorrentManager struct {
latestState *LibraryState latestState *LibraryState
requiredVersion string requiredVersion string
workerPool *ants.Pool workerPool *ants.Pool
repairWorker *ants.Pool
log *logutil.Logger log *logutil.Logger
} }
@@ -95,11 +94,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.SetNewLatestState(t.getCurrentState()) t.SetNewLatestState(t.getCurrentState())
if t.Config.EnableRepair() { if t.Config.EnableRepair() {
repairWorker, err := ants.NewPool(1)
if err != nil {
log.Fatalf("Failed to create repair worker: %v", err)
}
t.repairWorker = repairWorker
t.RepairAll() // initial repair t.RepairAll() // initial repair
} else { } else {
t.log.Info("Repair is disabled, skipping repair check") t.log.Info("Repair is disabled, skipping repair check")

View File

@@ -12,10 +12,13 @@ import (
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
) )
const EXPIRED_LINK_TOLERANCE_HOURS = 24 const (
EXPIRED_LINK_TOLERANCE_HOURS = 24
REPAIR_SEMAPHORE = "semaphore"
)
func (t *TorrentManager) RepairAll() { func (t *TorrentManager) RepairAll() {
_ = t.repairWorker.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Info("Repairing all broken torrents") t.log.Info("Repairing all broken torrents")
t.repairAll() t.repairAll()
t.log.Info("Finished repairing all torrents") t.log.Info("Finished repairing all torrents")
@@ -93,21 +96,22 @@ func (t *TorrentManager) repairAll() {
t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair)) t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair))
for i := range toRepair { for i := range toRepair {
torrent := toRepair[i] torrent := toRepair[i]
t.log.Infof("Repairing %s", t.GetKey(torrent)) t.Repair(torrent)
t.repair(torrent)
} }
} }
func (t *TorrentManager) Repair(torrent *Torrent) { func (t *TorrentManager) Repair(torrent *Torrent) {
// save the broken files to the file cache
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
torrent.DownloadedIDs.Each(func(id string) bool { torrent.DownloadedIDs.Each(func(id string) bool {
infoCache.Set(id, torrent) infoCache.Set(id, torrent)
t.writeTorrentToFile(id, torrent, false) t.writeTorrentToFile(id, torrent, false)
return false return false
}) })
_ = t.repairWorker.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Infof("Repairing torrent %s", t.GetKey(torrent)) t.log.Infof("Repairing torrent %s", t.GetKey(torrent))
t.repair(torrent) t.repair(torrent)
torrent.InProgressIDs.Remove(REPAIR_SEMAPHORE)
t.log.Infof("Finished repairing torrent %s", t.GetKey(torrent)) t.log.Infof("Finished repairing torrent %s", t.GetKey(torrent))
}) })
} }
@@ -118,6 +122,8 @@ func (t *TorrentManager) repair(torrent *Torrent) {
return return
} }
torrent.InProgressIDs.Add(REPAIR_SEMAPHORE)
proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full
if !proceed { if !proceed {
t.log.Error("Reached the max number of active torrents, cannot continue with the repair") t.log.Error("Reached the max number of active torrents, cannot continue with the repair")
@@ -140,9 +146,6 @@ func (t *TorrentManager) repair(torrent *Torrent) {
t.log.Warnf("Torrent %s is not older than %d hours to be repaired by reinsertion, will only redownload broken files...", t.GetKey(torrent), EXPIRED_LINK_TOLERANCE_HOURS) t.log.Warnf("Torrent %s is not older than %d hours to be repaired by reinsertion, will only redownload broken files...", t.GetKey(torrent), EXPIRED_LINK_TOLERANCE_HOURS)
} }
// sleep for 30 seconds to let the torrent accumulate more broken files if scanning
time.Sleep(30 * time.Second)
// handle rar'ed torrents // handle rar'ed torrents
assignedCount := 0 assignedCount := 0
rarCount := 0 rarCount := 0
@@ -271,7 +274,6 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, brokenFiles string) b
t.Api.DeleteTorrent(newTorrentID) t.Api.DeleteTorrent(newTorrentID)
return false return false
} }
time.Sleep(10 * time.Second)
// see if the torrent is ready // see if the torrent is ready
info, err := t.Api.GetTorrentInfo(newTorrentID) info, err := t.Api.GetTorrentInfo(newTorrentID)