Repair enqueque

This commit is contained in:
Ben Adrian Sarmiento
2024-06-17 23:48:21 +02:00
parent ad06d8fea3
commit d53ba8c973
5 changed files with 137 additions and 102 deletions

View File

@@ -27,6 +27,7 @@ type RootResponse struct {
Logs string `json:"logs"` Logs string `json:"logs"`
UserInfo *realdebrid.User `json:"user_info"` // Replace UserInfoType with the actual type UserInfo *realdebrid.User `json:"user_info"` // Replace UserInfoType with the actual type
LibrarySize int `json:"library_size"` // Number of torrents in the library LibrarySize int `json:"library_size"` // Number of torrents in the library
RepairQueueStr string `json:"repair_queue"` // List of torrents in the repair queue
MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB
TotalAlloc uint64 `json:"total_alloc"` // Total memory allocated in MB TotalAlloc uint64 `json:"total_alloc"` // Total memory allocated in MB
Sys uint64 `json:"sys"` // System memory in MB Sys uint64 `json:"sys"` // System memory in MB
@@ -50,6 +51,21 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
allTorrents, _ := zr.torMgr.DirectoryMap.Get(config.ALL_TORRENTS) allTorrents, _ := zr.torMgr.DirectoryMap.Get(config.ALL_TORRENTS)
repairQueueStr := ""
if zr.torMgr.RepairQueue == nil {
repairQueueStr = "repair is disabled"
} else if zr.torMgr.RepairQueue.IsEmpty() {
repairQueueStr = "empty"
} else {
for _, torrent := range zr.torMgr.RepairQueue.ToSlice() {
if torrent == nil {
repairQueueStr += " ⬅ 'all torrents'"
} else {
repairQueueStr += fmt.Sprintf(" ⬅ %s", zr.torMgr.GetKey(torrent))
}
}
}
response := RootResponse{ response := RootResponse{
Version: version.GetVersion(), Version: version.GetVersion(),
BuiltAt: version.GetBuiltAt(), BuiltAt: version.GetBuiltAt(),
@@ -60,6 +76,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
Logs: fmt.Sprintf("//%s/logs/", req.Host), Logs: fmt.Sprintf("//%s/logs/", req.Host),
UserInfo: userInfo, UserInfo: userInfo,
LibrarySize: allTorrents.Count(), LibrarySize: allTorrents.Count(),
RepairQueueStr: repairQueueStr,
MemAlloc: bToMb(mem.Alloc), MemAlloc: bToMb(mem.Alloc),
TotalAlloc: bToMb(mem.TotalAlloc), TotalAlloc: bToMb(mem.TotalAlloc),
Sys: bToMb(mem.Sys), Sys: bToMb(mem.Sys),
@@ -111,6 +128,10 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>Library Size</td> <td>Library Size</td>
<td colspan="2">%d items</td> <td colspan="2">%d items</td>
</tr> </tr>
<tr>
<td>Repair Queue</td>
<td colspan="2">%s</td>
</tr>
<tr> <tr>
<td>Memory Allocation</td> <td>Memory Allocation</td>
<td colspan="2">%d MB</td> <td colspan="2">%d MB</td>
@@ -262,8 +283,21 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>Utilities</td> <td>Utilities</td>
<td colspan="2"> <td colspan="2">
<form method="get" action="/logs/upload"> <form method="get" action="/logs/upload">
<input type="submit" value="Upload logs" /> <input type="submit" value="Upload logs" /> Share the link to get help
</form> </form>
<form method="post" action="/torrents/dump">
<input type="submit" value="Dump torrents" /> Copy all your zurgtorrent files to your dump folder
</form>
<form method="post" action="/torrents/analyze">
<input type="submit" value="Analyze torrents" /> Required to use media_info_* filters
</form>
<form method="post" action="/torrents/repair">
<input type="submit" value="Repair torrents" /> Trigger repair of all torrents
</form>
</td>
<tr>
<td>Debug</td>
<td colspan="2">
<form method="post" action="/reboot-worker"> <form method="post" action="/reboot-worker">
<input type="submit" value="Reboot worker pool" /> <input type="submit" value="Reboot worker pool" />
</form> </form>
@@ -276,15 +310,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<form method="post" action="/downloads/remount"> <form method="post" action="/downloads/remount">
<input type="submit" value="Remount downloads" /> <input type="submit" value="Remount downloads" />
</form> </form>
<form method="post" action="/torrents/dump">
<input type="submit" value="Dump torrents" />
</form>
<form method="post" action="/torrents/analyze">
<input type="submit" value="Analyze torrents" />
</form>
<form method="post" action="/torrents/repair">
<input type="submit" value="Repair torrents" />
</form>
</td> </td>
</tr> </tr>
</table> </table>
@@ -302,6 +327,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Logs, response.Logs,
response.Logs, response.Logs,
response.LibrarySize, response.LibrarySize,
response.RepairQueueStr,
response.MemAlloc, response.MemAlloc,
response.TotalAlloc, response.TotalAlloc,
response.Sys, response.Sys,
@@ -395,7 +421,7 @@ func (zr *Handlers) handleAnalyzeTorrents(resp http.ResponseWriter, req *http.Re
func (zr *Handlers) handleTriggerRepairAll(resp http.ResponseWriter, req *http.Request) { func (zr *Handlers) handleTriggerRepairAll(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Refresh", "2; url=/") resp.Header().Set("Refresh", "2; url=/")
zr.torMgr.RepairAllTrigger <- struct{}{} zr.torMgr.EnqueueForRepair(nil)
zr.log.Infof("Triggered repair of all torrents") zr.log.Infof("Triggered repair of all torrents")
fmt.Fprint(resp, "Repairing all torrents...") fmt.Fprint(resp, "Repairing all torrents...")
} }

View File

@@ -50,10 +50,12 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t
router.Use(hs.options) router.Use(hs.options)
router.Get("/", hs.handleHome) router.Get("/", hs.handleHome)
// debug
router.Post("/reboot-worker", hs.handleRebootWorkerPool) router.Post("/reboot-worker", hs.handleRebootWorkerPool)
router.Post("/reboot-refresh", hs.handleRebootRefreshWorker) router.Post("/reboot-refresh", hs.handleRebootRefreshWorker)
router.Post("/reboot-repair", hs.handleRebootRepairWorker) router.Post("/reboot-repair", hs.handleRebootRepairWorker)
router.Post("/downloads/remount", hs.handleRemountDownloads) router.Post("/downloads/remount", hs.handleRemountDownloads)
// utils
router.Post("/torrents/dump", hs.handleDumpTorrents) router.Post("/torrents/dump", hs.handleDumpTorrents)
router.Post("/torrents/analyze", hs.handleAnalyzeTorrents) router.Post("/torrents/analyze", hs.handleAnalyzeTorrents)
router.Post("/torrents/repair", hs.handleTriggerRepairAll) router.Post("/torrents/repair", hs.handleTriggerRepairAll)

View File

@@ -49,8 +49,8 @@ type TorrentManager struct {
latestState *LibraryState latestState *LibraryState
inProgressHashes mapset.Set[string] inProgressHashes mapset.Set[string]
repairTrigger chan *Torrent repairChan chan *Torrent
repairQueue mapset.Set[*Torrent] RepairQueue mapset.Set[*Torrent]
repairRunning bool repairRunning bool
repairRunningMu sync.Mutex repairRunningMu sync.Mutex
@@ -129,7 +129,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
t.setNewLatestState(t.getCurrentState()) t.setNewLatestState(t.getCurrentState())
t.TriggerRepair(nil) t.EnqueueForRepair(nil)
}) })
return t return t

View File

@@ -21,13 +21,14 @@ const (
func (t *TorrentManager) StartRepairJob() { func (t *TorrentManager) StartRepairJob() {
if !t.Config.EnableRepair() { if !t.Config.EnableRepair() {
t.repairLog.Debug("Repair is disabled, skipping repair job") t.repairLog.Warn("Repair is disabled, skipping repair job")
return return
} }
t.repairTrigger = make(chan *Torrent)
t.repairQueue = mapset.NewSet[*Torrent]() t.repairChan = make(chan *Torrent)
t.RepairQueue = mapset.NewSet[*Torrent]()
t.RepairAllTrigger = make(chan struct{}) t.RepairAllTrigger = make(chan struct{})
// there is 1 repair worker, with max 1 blocking task
t.workerPool.Submit(func() { t.workerPool.Submit(func() {
t.repairLog.Debug("Starting periodic repair job") t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
@@ -36,11 +37,20 @@ func (t *TorrentManager) StartRepairJob() {
for { for {
select { select {
case <-repairTicker.C: case <-repairTicker.C:
t.invokeRepair(nil) t.repairLog.Debug("Periodic repair started; searching for broken torrents")
t.EnqueueForRepair(nil)
case <-t.RepairAllTrigger: case <-t.RepairAllTrigger:
t.invokeRepair(nil) t.repairLog.Debug("Manual repair of all torrents triggered; searching for broken torrents")
case torrent := <-t.repairTrigger: t.EnqueueForRepair(nil)
// On-demand trigger with a specific torrent }
}
})
// there is 1 repair worker, with max 1 blocking task
t.workerPool.Submit(func() {
for {
select {
case torrent := <-t.repairChan:
t.invokeRepair(torrent) t.invokeRepair(torrent)
case <-t.RepairWorkerKillSwitch: case <-t.RepairWorkerKillSwitch:
t.repairLog.Info("Stopping periodic repair job") t.repairLog.Info("Stopping periodic repair job")
@@ -50,64 +60,61 @@ func (t *TorrentManager) StartRepairJob() {
}) })
} }
// EnqueueForRepair allows an on-demand repair to be initiated.
func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) {
if !t.Config.EnableRepair() {
if torrent != nil {
t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent))
}
return
}
if torrent != nil && torrent.State.Event(context.Background(), "break_torrent") != nil {
// t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err)
return
}
t.workerPool.Submit(func() {
t.invokeRepair(torrent)
})
}
// invokeRepair runs a sync repair job
func (t *TorrentManager) invokeRepair(torrent *Torrent) { func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock() t.repairRunningMu.Lock()
if t.repairRunning { if t.repairRunning {
t.repairRunningMu.Unlock() t.repairRunningMu.Unlock()
t.repairQueue.Add(torrent) t.RepairQueue.Add(torrent)
// don't do anything if repair is already running // don't do anything if repair is already running
return return
} }
t.repairRunning = true t.repairRunning = true
t.repairRunningMu.Unlock() t.repairRunningMu.Unlock()
// Execute the repair job // Execute the repair job
t.repairAll(torrent) time.Sleep(10 * time.Second)
t.executeRepairJob(torrent)
// After repair is done // After repair is done
t.repairRunningMu.Lock() t.repairRunningMu.Lock()
t.repairRunning = false t.repairRunning = false
t.repairRunningMu.Unlock() t.repairRunningMu.Unlock()
// before we let go, let's check repairQueue if queuedTorrent, exists := t.RepairQueue.Pop(); exists {
queuedTorrent, exists := t.repairQueue.Pop() t.workerPool.Submit(func() {
if exists { t.invokeRepair(queuedTorrent)
t.TriggerRepair(queuedTorrent) })
} }
} }
// TriggerRepair allows an on-demand repair to be initiated. func (t *TorrentManager) executeRepairJob(torrent *Torrent) {
func (t *TorrentManager) TriggerRepair(torrent *Torrent) {
if !t.Config.EnableRepair() {
if torrent != nil {
t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent))
} else {
t.repairLog.Warn("Repair is disabled, skipping repair")
}
return
}
if torrent != nil {
if err := torrent.State.Event(context.Background(), "break_torrent"); err != nil {
// t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err)
return
}
}
t.repairTrigger <- torrent
}
func (t *TorrentManager) repairAll(torrent *Torrent) {
// todo: a more elegant way to do this
var haystack cmap.ConcurrentMap[string, *Torrent] var haystack cmap.ConcurrentMap[string, *Torrent]
if torrent == nil { if torrent == nil {
haystack, _ = t.DirectoryMap.Get(INT_ALL) haystack, _ = t.DirectoryMap.Get(INT_ALL)
t.repairLog.Debug("Periodic repair started; searching for broken torrents")
} else { } else {
haystack = cmap.New[*Torrent]() haystack = cmap.New[*Torrent]()
haystack.Set("", torrent) haystack.Set("", torrent)
} }
// collect all torrents that need to be repaired // collect all torrents that need to be repaired asynchronously
toRepair := mapset.NewSet[*Torrent]() toRepair := mapset.NewSet[*Torrent]()
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -115,7 +122,8 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
wg.Add(1) wg.Add(1)
t.workerPool.Submit(func() { t.workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
if torrent.UnrepairableReason != "" { canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar")
if torrent.UnrepairableReason != "" || !canExtract {
return return
} }
// check 1: for broken files // check 1: for broken files
@@ -144,7 +152,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
toRepair.Each(func(torrent *Torrent) bool { toRepair.Each(func(torrent *Torrent) bool {
wg.Add(1) wg.Add(1)
t.Repair(torrent, &wg) t.repair(torrent, &wg)
return false return false
}) })
wg.Wait() wg.Wait()
@@ -152,24 +160,21 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality()) t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality())
} }
func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) { // repairman
func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) {
// t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
return
}
// blocks for approx 45 minutes if active torrents are full // blocks for approx 45 minutes if active torrents are full
if !t.canCapacityHandle() { if !t.canCapacityHandle() {
t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
return return
} }
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) {
// t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
return
}
t.repair(torrent)
}
// repairman
func (t *TorrentManager) repair(torrent *Torrent) {
t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice()) t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice())
if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) { if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) {
@@ -311,7 +316,6 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
// try to assign to a selected file // try to assign to a selected file
assigned := false assigned := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// base it on size because why not?
if !assigned && file.State.Is("broken_file") && file.Bytes == unrestrict.Filesize && strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) { if !assigned && file.State.Is("broken_file") && file.Bytes == unrestrict.Filesize && strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) {
file.Link = link file.Link = link
assignedLinks = append(assignedLinks, link) assignedLinks = append(assignedLinks, link)
@@ -335,6 +339,7 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
// t.log.Debugf("contents: %v", contents) // t.log.Debugf("contents: %v", contents)
rarCount++ rarCount++
} else { } else {
// it's possible that it is already repaired
t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent))
} }
newUnassignedLinks.Set(link, unrestrict) newUnassignedLinks.Set(link, unrestrict)
@@ -508,10 +513,12 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
if info.Progress == 100 && len(info.Links) != len(selection) { if info.Progress == 100 && len(info.Links) != len(selection) {
t.setToBinImmediately(newTorrentID) t.setToBinImmediately(newTorrentID)
return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection)) return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection))
} else if info.Progress != 100 {
t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
} else {
t.repairLog.Infof("Downloaded %d file(s) of torrent %s (id=%s)", len(selection), t.GetKey(torrent), info.ID, info.Progress)
} }
t.repairLog.Infof("Redownloading %d file(s) of torrent %s successful (id=%s, progress=%d)", len(selection), t.GetKey(torrent), info.ID, info.Progress)
return info, nil return info, nil
} }

View File

@@ -69,7 +69,7 @@ func (dl *Downloader) DownloadFile(
http.Error(resp, "File is stale, please try again", http.StatusLocked) http.Error(resp, "File is stale, please try again", http.StatusLocked)
return return
} }
torMgr.TriggerRepair(torrent) torMgr.EnqueueForRepair(torrent)
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return
} else { } else {
@@ -155,7 +155,7 @@ func (dl *Downloader) streamFileToResponse(
http.Error(resp, "File is stale, please try again", http.StatusLocked) http.Error(resp, "File is stale, please try again", http.StatusLocked)
return return
} }
torMgr.TriggerRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return
@@ -171,7 +171,7 @@ func (dl *Downloader) streamFileToResponse(
http.Error(resp, "File is stale, please try again", http.StatusLocked) http.Error(resp, "File is stale, please try again", http.StatusLocked)
return return
} }
torMgr.TriggerRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return