diff --git a/internal/app.go b/internal/app.go index ead8a47..3282e63 100644 --- a/internal/app.go +++ b/internal/app.go @@ -29,9 +29,9 @@ func MainApp(configPath string) { zurglog := log.Named("zurg") zurglog.Debugf("PID: %d", os.Getpid()) - zurglog.Debugf("Version: %s", version.GetVersion()) - zurglog.Debugf("GitCommit: %s", version.GetGitCommit()) - zurglog.Debugf("BuiltAt: %s", version.GetBuiltAt()) + zurglog.Infof("Version: %s", version.GetVersion()) + zurglog.Infof("GitCommit: %s", version.GetGitCommit()) + zurglog.Infof("BuiltAt: %s", version.GetBuiltAt()) config, configErr := config.LoadZurgConfig(configPath, log.Named("config")) if configErr != nil { diff --git a/internal/config/types.go b/internal/config/types.go index 97f238a..735c818 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -6,6 +6,7 @@ type ConfigInterface interface { GetToken() string GetNumOfWorkers() int GetRefreshEverySeconds() int + GetRepairEveryMinutes() int EnableRepair() bool GetHost() string GetPort() string @@ -40,6 +41,7 @@ type ZurgConfig struct { Proxy string `yaml:"proxy" json:"proxy"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` RefreshEverySeconds int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` + RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` @@ -108,6 +110,13 @@ func (z *ZurgConfig) GetRefreshEverySeconds() int { return z.RefreshEverySeconds } +func (z *ZurgConfig) GetRepairEveryMinutes() int { + if z.RepairEveryMins == 0 { + return 10 + } + return z.RepairEveryMins +} + func (z *ZurgConfig) EnableRepair() bool { return z.CanRepair } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 208f9db..92d6b71 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -4,6 +4,7 @@ import ( "io" "os" "strings" + "sync" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/logutil" @@ -32,6 +33,9 @@ type TorrentManager struct { requiredVersion string workerPool *ants.Pool repairPool *ants.Pool + repairTrigger chan *Torrent + repairRunning bool + repairRunningMu sync.Mutex log *logutil.Logger } diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index b28291a..8358479 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -46,7 +46,7 @@ func (t *TorrentManager) refreshTorrents() []string { wg.Wait() close(infoChan) - t.log.Debugf("Fetched info for %d torrents", len(instances)) + t.log.Infof("Fetched info for %d torrents", len(instances)) // delete expired fixers doesNotExist.Each(func(fixerID string) bool { @@ -122,7 +122,7 @@ func (t *TorrentManager) refreshTorrents() []string { // startRefreshJob periodically refreshes the torrents func (t *TorrentManager) startRefreshJob() { _ = t.workerPool.Submit(func() { - t.log.Info("Starting periodic refresh") + t.log.Info("Starting periodic refresh job") for { <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 4c05acc..2dc6ee0 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -18,17 +18,68 @@ const ( func (t *TorrentManager) startRepairJob() { if !t.Config.EnableRepair() { - t.log.Info("Repair is disabled, skipping repair job") + t.log.Debug("Repair is disabled, skipping repair job") + return } + t.repairTrigger = make(chan *Torrent) // there is 1 repair worker, with max 1 blocking task _ = t.repairPool.Submit(func() { - t.repairAll() + t.log.Info("Starting periodic repair job") + repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMinutes()) * time.Minute) + defer repairTicker.Stop() + + for { + select { + case <-repairTicker.C: + t.invokeRepair(nil) + case torrent := <-t.repairTrigger: + // On-demand trigger with a specific torrent + t.invokeRepair(torrent) + } + } }) } -func (t *TorrentManager) repairAll() { +func (t *TorrentManager) invokeRepair(torrent *Torrent) { + t.repairRunningMu.Lock() + if t.repairRunning { + t.repairRunningMu.Unlock() + // don't do anything if repair is already running + return + } + t.repairRunning = true + t.repairRunningMu.Unlock() + + // Execute the repair job + t.repairAll(torrent) + + // After repair is done + t.repairRunningMu.Lock() + t.repairRunning = false + t.repairRunningMu.Unlock() +} + +// TriggerRepair allows an on-demand repair to be initiated. +func (t *TorrentManager) TriggerRepair(torrent *Torrent) { + select { + case t.repairTrigger <- torrent: + // Repair triggered + default: + // Already a repair request pending, so do nothing + } +} + +func (t *TorrentManager) repairAll(torrent *Torrent) { t.log.Info("Periodic repair invoked; searching for broken torrents") - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + + // todo: a more elegant way to do this + var allTorrents cmap.ConcurrentMap[string, *Torrent] + if torrent == nil { + allTorrents, _ = t.DirectoryMap.Get(INT_ALL) + } else { + allTorrents = cmap.New[*Torrent]() + allTorrents.Set(t.GetKey(torrent), torrent) + } // collect all torrents that need to be repaired toRepair := mapset.NewSet[*Torrent]() diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index dd3c491..5c0d79f 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -57,9 +57,9 @@ func (dl *Downloader) DownloadFile(directory, torrentName, fileName string, resp torrent.BrokenLinks.Add(file.Link) // file.Link = "repair" if cfg.EnableRepair() { - torMgr.SetNewLatestState(intTor.LibraryState{}) + torMgr.TriggerRepair(torrent) } else { - log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link) + log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link) } http.Error(resp, "File is not available", http.StatusNotFound) return @@ -167,9 +167,9 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor torrent.BrokenLinks.Add(file.Link) // file.Link = "repair" if cfg.EnableRepair() && torrent != nil { - torMgr.SetNewLatestState(intTor.LibraryState{}) + torMgr.TriggerRepair(torrent) } else { - log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) + log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) } } http.Error(resp, "File is not available", http.StatusNotFound) @@ -184,9 +184,9 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor torrent.BrokenLinks.Add(file.Link) // file.Link = "repair" if cfg.EnableRepair() && torrent != nil { - torMgr.SetNewLatestState(intTor.LibraryState{}) + torMgr.TriggerRepair(torrent) } else { - log.Infof("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) + log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", file.Path, file.Link) } } http.Error(resp, "File is not available", http.StatusNotFound) @@ -199,7 +199,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor } } - log.Infof("Started serving file %s%s", unrestrict.Filename, rangeLog) + log.Debugf("Started serving file %s%s", unrestrict.Filename, rangeLog) buf := make([]byte, cfg.GetNetworkBufferSize()) io.CopyBuffer(resp, download.Body, buf) diff --git a/pkg/logutil/factory.go b/pkg/logutil/factory.go index c21d75a..1ed814d 100644 --- a/pkg/logutil/factory.go +++ b/pkg/logutil/factory.go @@ -1,6 +1,7 @@ package logutil import ( + "bufio" "bytes" "fmt" "io" @@ -106,12 +107,25 @@ func (l *Logger) GetLogsFromFile() (string, error) { } defer file.Close() - var buffer bytes.Buffer - _, err = io.Copy(&buffer, file) - if err != nil { + const maxLines = 100000 + var lines []string + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + if len(lines) > maxLines { + lines = lines[1:] + } + } + if err := scanner.Err(); err != nil { return "", err } + var buffer bytes.Buffer + for _, line := range lines { + buffer.WriteString(line + "\n") + } + return buffer.String(), nil }