diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 070b834..2dab313 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -18,24 +18,25 @@ type SponsorResponse struct { Paypal string `json:"paypal"` } type RootResponse struct { - Version string `json:"version"` - BuiltAt string `json:"built_at"` - GitCommit string `json:"git_commit"` - Html string `json:"html"` - Dav string `json:"dav"` - Infuse string `json:"infuse"` - Logs string `json:"logs"` - UserInfo *realdebrid.User `json:"user_info"` // Replace UserInfoType with the actual type - LibrarySize int `json:"library_size"` // Number of torrents in the library - MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB - TotalAlloc uint64 `json:"total_alloc"` // Total memory allocated in MB - Sys uint64 `json:"sys"` // System memory in MB - NumGC uint32 `json:"num_gc"` // Number of completed GC cycles - PID int `json:"pid"` // Process ID - Sponsor SponsorResponse `json:"sponsor_zurg"` // Sponsorship links - Config config.ZurgConfig `json:"config"` - ImmediateBin []string `json:"immediate_bin"` - OnceDoneBin []string `json:"once_done_bin"` + Version string `json:"version"` + BuiltAt string `json:"built_at"` + GitCommit string `json:"git_commit"` + Html string `json:"html"` + Dav string `json:"dav"` + Infuse string `json:"infuse"` + Logs string `json:"logs"` + UserInfo *realdebrid.User `json:"user_info"` // Replace UserInfoType with the actual type + LibrarySize int `json:"library_size"` // Number of torrents in the library + RepairQueueStr string `json:"repair_queue"` // List of torrents in the repair queue + MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB + TotalAlloc uint64 `json:"total_alloc"` // Total memory allocated in MB + Sys uint64 `json:"sys"` // System memory in MB + NumGC uint32 `json:"num_gc"` // Number of completed GC cycles + PID int `json:"pid"` // Process ID + Sponsor SponsorResponse `json:"sponsor_zurg"` // Sponsorship links + Config config.ZurgConfig `json:"config"` + ImmediateBin []string `json:"immediate_bin"` + OnceDoneBin []string `json:"once_done_bin"` } func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { @@ -50,21 +51,37 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { allTorrents, _ := zr.torMgr.DirectoryMap.Get(config.ALL_TORRENTS) + repairQueueStr := "" + if zr.torMgr.RepairQueue == nil { + repairQueueStr = "repair is disabled" + } else if zr.torMgr.RepairQueue.IsEmpty() { + repairQueueStr = "empty" + } else { + for _, torrent := range zr.torMgr.RepairQueue.ToSlice() { + if torrent == nil { + repairQueueStr += " ⬅ 'all torrents'" + } else { + repairQueueStr += fmt.Sprintf(" ⬅ %s", zr.torMgr.GetKey(torrent)) + } + } + } + response := RootResponse{ - Version: version.GetVersion(), - BuiltAt: version.GetBuiltAt(), - GitCommit: version.GetGitCommit(), - Html: fmt.Sprintf("//%s/http/", req.Host), - Dav: fmt.Sprintf("//%s/dav/", req.Host), - Infuse: fmt.Sprintf("//%s/infuse/", req.Host), - Logs: fmt.Sprintf("//%s/logs/", req.Host), - UserInfo: userInfo, - LibrarySize: allTorrents.Count(), - MemAlloc: bToMb(mem.Alloc), - TotalAlloc: bToMb(mem.TotalAlloc), - Sys: bToMb(mem.Sys), - NumGC: mem.NumGC, - PID: os.Getpid(), + Version: version.GetVersion(), + BuiltAt: version.GetBuiltAt(), + GitCommit: version.GetGitCommit(), + Html: fmt.Sprintf("//%s/http/", req.Host), + Dav: fmt.Sprintf("//%s/dav/", req.Host), + Infuse: fmt.Sprintf("//%s/infuse/", req.Host), + Logs: fmt.Sprintf("//%s/logs/", req.Host), + UserInfo: userInfo, + LibrarySize: allTorrents.Count(), + RepairQueueStr: repairQueueStr, + MemAlloc: bToMb(mem.Alloc), + TotalAlloc: bToMb(mem.TotalAlloc), + Sys: bToMb(mem.Sys), + NumGC: mem.NumGC, + PID: os.Getpid(), Sponsor: SponsorResponse{ Patreon: "https://www.patreon.com/debridmediamanager", Github: "https://github.com/sponsors/debridmediamanager", @@ -111,6 +128,10 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { Library Size %d items + + Repair Queue + %s + Memory Allocation %d MB @@ -262,8 +283,21 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { Utilities
- + Share the link to get help
+
+ Copy all your zurgtorrent files to your dump folder +
+
+ Required to use media_info_* filters +
+
+ Trigger repair of all torrents +
+ + + Debug +
@@ -276,15 +310,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
-
- -
-
- -
-
- -
@@ -302,6 +327,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Logs, response.Logs, response.LibrarySize, + response.RepairQueueStr, response.MemAlloc, response.TotalAlloc, response.Sys, @@ -395,7 +421,7 @@ func (zr *Handlers) handleAnalyzeTorrents(resp http.ResponseWriter, req *http.Re func (zr *Handlers) handleTriggerRepairAll(resp http.ResponseWriter, req *http.Request) { resp.Header().Set("Refresh", "2; url=/") - zr.torMgr.RepairAllTrigger <- struct{}{} + zr.torMgr.EnqueueForRepair(nil) zr.log.Infof("Triggered repair of all torrents") fmt.Fprint(resp, "Repairing all torrents...") } diff --git a/internal/handlers/router.go b/internal/handlers/router.go index 0cc70be..d8c528e 100644 --- a/internal/handlers/router.go +++ b/internal/handlers/router.go @@ -50,10 +50,12 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t router.Use(hs.options) router.Get("/", hs.handleHome) + // debug router.Post("/reboot-worker", hs.handleRebootWorkerPool) router.Post("/reboot-refresh", hs.handleRebootRefreshWorker) router.Post("/reboot-repair", hs.handleRebootRepairWorker) router.Post("/downloads/remount", hs.handleRemountDownloads) + // utils router.Post("/torrents/dump", hs.handleDumpTorrents) router.Post("/torrents/analyze", hs.handleAnalyzeTorrents) router.Post("/torrents/repair", hs.handleTriggerRepairAll) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index f5dbe8d..c3ca4d9 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -49,8 +49,8 @@ type TorrentManager struct { latestState *LibraryState inProgressHashes mapset.Set[string] - repairTrigger chan *Torrent - repairQueue mapset.Set[*Torrent] + repairChan chan *Torrent + RepairQueue mapset.Set[*Torrent] repairRunning bool repairRunningMu sync.Mutex @@ -129,7 +129,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w t.setNewLatestState(t.getCurrentState()) - t.TriggerRepair(nil) + t.EnqueueForRepair(nil) }) return t diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 393cbd8..541be58 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -21,13 +21,14 @@ const ( func (t *TorrentManager) StartRepairJob() { if !t.Config.EnableRepair() { - t.repairLog.Debug("Repair is disabled, skipping repair job") + t.repairLog.Warn("Repair is disabled, skipping repair job") return } - t.repairTrigger = make(chan *Torrent) - t.repairQueue = mapset.NewSet[*Torrent]() + + t.repairChan = make(chan *Torrent) + t.RepairQueue = mapset.NewSet[*Torrent]() t.RepairAllTrigger = make(chan struct{}) - // there is 1 repair worker, with max 1 blocking task + t.workerPool.Submit(func() { t.repairLog.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) @@ -36,11 +37,20 @@ func (t *TorrentManager) StartRepairJob() { for { select { case <-repairTicker.C: - t.invokeRepair(nil) + t.repairLog.Debug("Periodic repair started; searching for broken torrents") + t.EnqueueForRepair(nil) case <-t.RepairAllTrigger: - t.invokeRepair(nil) - case torrent := <-t.repairTrigger: - // On-demand trigger with a specific torrent + t.repairLog.Debug("Manual repair of all torrents triggered; searching for broken torrents") + t.EnqueueForRepair(nil) + } + } + }) + + // there is 1 repair worker, with max 1 blocking task + t.workerPool.Submit(func() { + for { + select { + case torrent := <-t.repairChan: t.invokeRepair(torrent) case <-t.RepairWorkerKillSwitch: t.repairLog.Info("Stopping periodic repair job") @@ -50,64 +60,61 @@ func (t *TorrentManager) StartRepairJob() { }) } +// EnqueueForRepair allows an on-demand repair to be initiated. +func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) { + if !t.Config.EnableRepair() { + if torrent != nil { + t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent)) + } + return + } + if torrent != nil && torrent.State.Event(context.Background(), "break_torrent") != nil { + // t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err) + return + } + t.workerPool.Submit(func() { + t.invokeRepair(torrent) + }) +} + +// invokeRepair runs a sync repair job func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunningMu.Lock() if t.repairRunning { t.repairRunningMu.Unlock() - t.repairQueue.Add(torrent) + t.RepairQueue.Add(torrent) // don't do anything if repair is already running return } - t.repairRunning = true t.repairRunningMu.Unlock() // Execute the repair job - t.repairAll(torrent) + time.Sleep(10 * time.Second) + t.executeRepairJob(torrent) // After repair is done t.repairRunningMu.Lock() t.repairRunning = false t.repairRunningMu.Unlock() - // before we let go, let's check repairQueue - queuedTorrent, exists := t.repairQueue.Pop() - if exists { - t.TriggerRepair(queuedTorrent) + if queuedTorrent, exists := t.RepairQueue.Pop(); exists { + t.workerPool.Submit(func() { + t.invokeRepair(queuedTorrent) + }) } } -// TriggerRepair allows an on-demand repair to be initiated. -func (t *TorrentManager) TriggerRepair(torrent *Torrent) { - if !t.Config.EnableRepair() { - if torrent != nil { - t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent)) - } else { - t.repairLog.Warn("Repair is disabled, skipping repair") - } - return - } - if torrent != nil { - if err := torrent.State.Event(context.Background(), "break_torrent"); err != nil { - // t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err) - return - } - } - t.repairTrigger <- torrent -} - -func (t *TorrentManager) repairAll(torrent *Torrent) { - // todo: a more elegant way to do this +func (t *TorrentManager) executeRepairJob(torrent *Torrent) { var haystack cmap.ConcurrentMap[string, *Torrent] if torrent == nil { haystack, _ = t.DirectoryMap.Get(INT_ALL) - t.repairLog.Debug("Periodic repair started; searching for broken torrents") } else { haystack = cmap.New[*Torrent]() haystack.Set("", torrent) } - // collect all torrents that need to be repaired + // collect all torrents that need to be repaired asynchronously toRepair := mapset.NewSet[*Torrent]() var wg sync.WaitGroup @@ -115,7 +122,8 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { wg.Add(1) t.workerPool.Submit(func() { defer wg.Done() - if torrent.UnrepairableReason != "" { + canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar") + if torrent.UnrepairableReason != "" || !canExtract { return } // check 1: for broken files @@ -144,7 +152,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { toRepair.Each(func(torrent *Torrent) bool { wg.Add(1) - t.Repair(torrent, &wg) + t.repair(torrent, &wg) return false }) wg.Wait() @@ -152,24 +160,21 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality()) } -func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) { +// repairman +func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { defer wg.Done() + if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) { + // t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err) + return + } + // blocks for approx 45 minutes if active torrents are full if !t.canCapacityHandle() { t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") return } - if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) { - // t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err) - return - } - t.repair(torrent) -} - -// repairman -func (t *TorrentManager) repair(torrent *Torrent) { t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice()) if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) { @@ -311,7 +316,6 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { // try to assign to a selected file assigned := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { - // base it on size because why not? if !assigned && file.State.Is("broken_file") && file.Bytes == unrestrict.Filesize && strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) { file.Link = link assignedLinks = append(assignedLinks, link) @@ -335,6 +339,7 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { // t.log.Debugf("contents: %v", contents) rarCount++ } else { + // it's possible that it is already repaired t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) } newUnassignedLinks.Set(link, unrestrict) @@ -508,10 +513,12 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) if info.Progress == 100 && len(info.Links) != len(selection) { t.setToBinImmediately(newTorrentID) return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection)) + } else if info.Progress != 100 { + t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress) + } else { + t.repairLog.Infof("Downloaded %d file(s) of torrent %s (id=%s)", len(selection), t.GetKey(torrent), info.ID, info.Progress) } - t.repairLog.Infof("Redownloading %d file(s) of torrent %s successful (id=%s, progress=%d)", len(selection), t.GetKey(torrent), info.ID, info.Progress) - return info, nil } diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index f3eed3b..d8d7d17 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -69,7 +69,7 @@ func (dl *Downloader) DownloadFile( http.Error(resp, "File is stale, please try again", http.StatusLocked) return } - torMgr.TriggerRepair(torrent) + torMgr.EnqueueForRepair(torrent) http.Error(resp, "File is not available", http.StatusNotFound) return } else { @@ -155,7 +155,7 @@ func (dl *Downloader) streamFileToResponse( http.Error(resp, "File is stale, please try again", http.StatusLocked) return } - torMgr.TriggerRepair(torrent) + torMgr.EnqueueForRepair(torrent) } http.Error(resp, "File is not available", http.StatusNotFound) return @@ -171,7 +171,7 @@ func (dl *Downloader) streamFileToResponse( http.Error(resp, "File is stale, please try again", http.StatusLocked) return } - torMgr.TriggerRepair(torrent) + torMgr.EnqueueForRepair(torrent) } http.Error(resp, "File is not available", http.StatusNotFound) return