periodic repair

This commit is contained in:
Ben Sarmiento
2024-01-27 14:14:11 +01:00
parent 05d2544fe8
commit a851282b2a
7 changed files with 97 additions and 19 deletions

View File

@@ -29,9 +29,9 @@ func MainApp(configPath string) {
zurglog := log.Named("zurg") zurglog := log.Named("zurg")
zurglog.Debugf("PID: %d", os.Getpid()) zurglog.Debugf("PID: %d", os.Getpid())
zurglog.Debugf("Version: %s", version.GetVersion()) zurglog.Infof("Version: %s", version.GetVersion())
zurglog.Debugf("GitCommit: %s", version.GetGitCommit()) zurglog.Infof("GitCommit: %s", version.GetGitCommit())
zurglog.Debugf("BuiltAt: %s", version.GetBuiltAt()) zurglog.Infof("BuiltAt: %s", version.GetBuiltAt())
config, configErr := config.LoadZurgConfig(configPath, log.Named("config")) config, configErr := config.LoadZurgConfig(configPath, log.Named("config"))
if configErr != nil { if configErr != nil {

View File

@@ -6,6 +6,7 @@ type ConfigInterface interface {
GetToken() string GetToken() string
GetNumOfWorkers() int GetNumOfWorkers() int
GetRefreshEverySeconds() int GetRefreshEverySeconds() int
GetRepairEveryMinutes() int
EnableRepair() bool EnableRepair() bool
GetHost() string GetHost() string
GetPort() string GetPort() string
@@ -40,6 +41,7 @@ type ZurgConfig struct {
Proxy string `yaml:"proxy" json:"proxy"` Proxy string `yaml:"proxy" json:"proxy"`
NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"`
RefreshEverySeconds int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` RefreshEverySeconds int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"`
RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"`
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
@@ -108,6 +110,13 @@ func (z *ZurgConfig) GetRefreshEverySeconds() int {
return z.RefreshEverySeconds return z.RefreshEverySeconds
} }
func (z *ZurgConfig) GetRepairEveryMinutes() int {
if z.RepairEveryMins == 0 {
return 10
}
return z.RepairEveryMins
}
func (z *ZurgConfig) EnableRepair() bool { func (z *ZurgConfig) EnableRepair() bool {
return z.CanRepair return z.CanRepair
} }

View File

@@ -4,6 +4,7 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
@@ -32,6 +33,9 @@ type TorrentManager struct {
requiredVersion string requiredVersion string
workerPool *ants.Pool workerPool *ants.Pool
repairPool *ants.Pool repairPool *ants.Pool
repairTrigger chan *Torrent
repairRunning bool
repairRunningMu sync.Mutex
log *logutil.Logger log *logutil.Logger
} }

View File

@@ -46,7 +46,7 @@ func (t *TorrentManager) refreshTorrents() []string {
wg.Wait() wg.Wait()
close(infoChan) close(infoChan)
t.log.Debugf("Fetched info for %d torrents", len(instances)) t.log.Infof("Fetched info for %d torrents", len(instances))
// delete expired fixers // delete expired fixers
doesNotExist.Each(func(fixerID string) bool { doesNotExist.Each(func(fixerID string) bool {
@@ -122,7 +122,7 @@ func (t *TorrentManager) refreshTorrents() []string {
// startRefreshJob periodically refreshes the torrents // startRefreshJob periodically refreshes the torrents
func (t *TorrentManager) startRefreshJob() { func (t *TorrentManager) startRefreshJob() {
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
t.log.Info("Starting periodic refresh") t.log.Info("Starting periodic refresh job")
for { for {
<-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second)

View File

@@ -18,17 +18,68 @@ const (
func (t *TorrentManager) startRepairJob() { func (t *TorrentManager) startRepairJob() {
if !t.Config.EnableRepair() { if !t.Config.EnableRepair() {
t.log.Info("Repair is disabled, skipping repair job") t.log.Debug("Repair is disabled, skipping repair job")
return
} }
t.repairTrigger = make(chan *Torrent)
// there is 1 repair worker, with max 1 blocking task // there is 1 repair worker, with max 1 blocking task
_ = t.repairPool.Submit(func() { _ = t.repairPool.Submit(func() {
t.repairAll() t.log.Info("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMinutes()) * time.Minute)
defer repairTicker.Stop()
for {
select {
case <-repairTicker.C:
t.invokeRepair(nil)
case torrent := <-t.repairTrigger:
// On-demand trigger with a specific torrent
t.invokeRepair(torrent)
}
}
}) })
} }
func (t *TorrentManager) repairAll() { func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock()
if t.repairRunning {
t.repairRunningMu.Unlock()
// don't do anything if repair is already running
return
}
t.repairRunning = true
t.repairRunningMu.Unlock()
// Execute the repair job
t.repairAll(torrent)
// After repair is done
t.repairRunningMu.Lock()
t.repairRunning = false
t.repairRunningMu.Unlock()
}
// TriggerRepair allows an on-demand repair to be initiated.
func (t *TorrentManager) TriggerRepair(torrent *Torrent) {
select {
case t.repairTrigger <- torrent:
// Repair triggered
default:
// Already a repair request pending, so do nothing
}
}
func (t *TorrentManager) repairAll(torrent *Torrent) {
t.log.Info("Periodic repair invoked; searching for broken torrents") t.log.Info("Periodic repair invoked; searching for broken torrents")
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
// todo: a more elegant way to do this
var allTorrents cmap.ConcurrentMap[string, *Torrent]
if torrent == nil {
allTorrents, _ = t.DirectoryMap.Get(INT_ALL)
} else {
allTorrents = cmap.New[*Torrent]()
allTorrents.Set(t.GetKey(torrent), torrent)
}
// collect all torrents that need to be repaired // collect all torrents that need to be repaired
toRepair := mapset.NewSet[*Torrent]() toRepair := mapset.NewSet[*Torrent]()

View File

@@ -57,9 +57,9 @@ func (dl *Downloader) DownloadFile(directory, torrentName, fileName string, resp
torrent.BrokenLinks.Add(file.Link) torrent.BrokenLinks.Add(file.Link)
// file.Link = "repair" // file.Link = "repair"
if cfg.EnableRepair() { if cfg.EnableRepair() {
torMgr.SetNewLatestState(intTor.LibraryState{}) torMgr.TriggerRepair(torrent)
} else { } else {
log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link) log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link)
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return
@@ -167,9 +167,9 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
torrent.BrokenLinks.Add(file.Link) torrent.BrokenLinks.Add(file.Link)
// file.Link = "repair" // file.Link = "repair"
if cfg.EnableRepair() && torrent != nil { if cfg.EnableRepair() && torrent != nil {
torMgr.SetNewLatestState(intTor.LibraryState{}) torMgr.TriggerRepair(torrent)
} else { } else {
log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link)
} }
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
@@ -184,9 +184,9 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
torrent.BrokenLinks.Add(file.Link) torrent.BrokenLinks.Add(file.Link)
// file.Link = "repair" // file.Link = "repair"
if cfg.EnableRepair() && torrent != nil { if cfg.EnableRepair() && torrent != nil {
torMgr.SetNewLatestState(intTor.LibraryState{}) torMgr.TriggerRepair(torrent)
} else { } else {
log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link)
} }
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
@@ -199,7 +199,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
} }
} }
log.Infof("Started serving file %s%s", unrestrict.Filename, rangeLog) log.Debugf("Started serving file %s%s", unrestrict.Filename, rangeLog)
buf := make([]byte, cfg.GetNetworkBufferSize()) buf := make([]byte, cfg.GetNetworkBufferSize())
io.CopyBuffer(resp, download.Body, buf) io.CopyBuffer(resp, download.Body, buf)

View File

@@ -1,6 +1,7 @@
package logutil package logutil
import ( import (
"bufio"
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
@@ -106,12 +107,25 @@ func (l *Logger) GetLogsFromFile() (string, error) {
} }
defer file.Close() defer file.Close()
var buffer bytes.Buffer const maxLines = 100000
_, err = io.Copy(&buffer, file) var lines []string
if err != nil {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) > maxLines {
lines = lines[1:]
}
}
if err := scanner.Err(); err != nil {
return "", err return "", err
} }
var buffer bytes.Buffer
for _, line := range lines {
buffer.WriteString(line + "\n")
}
return buffer.String(), nil return buffer.String(), nil
} }