Create repair queue

This commit is contained in:
Ben Sarmiento
2024-01-28 03:20:58 +01:00
parent f07b65d5da
commit 30dc080dba
3 changed files with 29 additions and 6 deletions

View File

@@ -19,6 +19,7 @@ type ConfigInterface interface {
GetNetworkBufferSize() int
EnableRetainFolderNameExtension() bool
EnableRetainRDTorrentName() bool
ShouldExposeFullPath() bool
ShouldIgnoreRenames() bool
ShouldServeFromRclone() bool
ShouldVerifyDownloadLink() bool
@@ -47,6 +48,7 @@ type ZurgConfig struct {
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
RetainFolderNameExtension bool `yaml:"retain_folder_name_extension" json:"retain_folder_name_extension"`
ExposeFullPath bool `yaml:"expose_full_path" json:"expose_full_path"`
CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"`
@@ -142,6 +144,10 @@ func (z *ZurgConfig) EnableRetainRDTorrentName() bool {
return z.RetainRDTorrentName
}
func (z *ZurgConfig) ShouldExposeFullPath() bool {
return z.ExposeFullPath
}
func (z *ZurgConfig) ShouldIgnoreRenames() bool {
return !z.IgnoreRenames
}

View File

@@ -3,6 +3,7 @@ package torrent
import (
"io"
"os"
"path/filepath"
"strings"
"sync"
@@ -33,6 +34,7 @@ type TorrentManager struct {
workerPool *ants.Pool
repairPool *ants.Pool
repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent]
repairRunning bool
repairRunningMu sync.Mutex
log *logutil.Logger
@@ -105,6 +107,14 @@ func (t *TorrentManager) GetKey(torrent *Torrent) string {
}
}
func (t *TorrentManager) GetPath(file *File) string {
if t.Config.ShouldExposeFullPath() {
filename := strings.TrimPrefix(file.Path, "/")
return strings.ReplaceAll(filename, "/", " - ")
}
return filepath.Base(file.Path)
}
func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) {
filePath := "data/" + instanceID + ".json"
file, err := os.Create(filePath)

View File

@@ -23,6 +23,7 @@ func (t *TorrentManager) startRepairJob() {
return
}
t.repairTrigger = make(chan *Torrent)
t.repairSet = mapset.NewSet[*Torrent]()
// there is 1 repair worker, with max 1 blocking task
_ = t.repairPool.Submit(func() {
t.log.Info("Starting periodic repair job")
@@ -45,9 +46,11 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock()
if t.repairRunning {
t.repairRunningMu.Unlock()
t.repairSet.Add(torrent)
// don't do anything if repair is already running
return
}
t.repairRunning = true
t.repairRunningMu.Unlock()
@@ -58,16 +61,19 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock()
t.repairRunning = false
t.repairRunningMu.Unlock()
// before we let go, let's check repairSet
t.workerPool.Submit(func() {
queuedTorrent, exists := t.repairSet.Pop()
if exists {
t.TriggerRepair(queuedTorrent)
}
})
}
// TriggerRepair allows an on-demand repair to be initiated.
func (t *TorrentManager) TriggerRepair(torrent *Torrent) {
select {
case t.repairTrigger <- torrent:
// Repair triggered
default:
// Already a repair request pending, so do nothing
}
t.repairTrigger <- torrent
}
func (t *TorrentManager) repairAll(torrent *Torrent) {
@@ -118,6 +124,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
return false
})
wg.Wait()
t.log.Infof("Finished repairing %d broken torrents", toRepair.Cardinality())
}