Repair logger

This commit is contained in:
Ben Sarmiento
2024-05-25 05:45:50 +02:00
parent a681912fc1
commit 53c71b1249
4 changed files with 45 additions and 40 deletions

View File

@@ -99,6 +99,7 @@ func MainApp(configPath string) {
api,
workerPool,
log.Named("manager"),
log.Named("repair"),
)
downloader := universal.NewDownloader(downloadClient)

View File

@@ -253,11 +253,11 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
</tr>
<tr>
<td>Immediate Bin</td>
<td>%s</td>
<td colspan="2">%s</td>
</tr>
<tr>
<td>Once Done Bin</td>
<td>%s</td>
<td colspan="2">%s</td>
</tr>
<tr>
<td>Utilities</td>

View File

@@ -28,6 +28,7 @@ type TorrentManager struct {
api *realdebrid.RealDebrid
workerPool *ants.Pool
log *logutil.Logger
repairLog *logutil.Logger
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
@@ -52,7 +53,7 @@ type TorrentManager struct {
// NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background
// and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) *TorrentManager {
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log, repairLog *logutil.Logger) *TorrentManager {
t := &TorrentManager{
requiredVersion: "0.10.0",
@@ -60,6 +61,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
api: api,
workerPool: workerPool,
log: log,
repairLog: repairLog,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
DownloadMap: cmap.New[*realdebrid.Download](),

View File

@@ -20,14 +20,14 @@ const (
func (t *TorrentManager) StartRepairJob() {
if !t.Config.EnableRepair() {
t.log.Debug("Repair is disabled, skipping repair job")
t.repairLog.Debug("Repair is disabled, skipping repair job")
return
}
t.repairTrigger = make(chan *Torrent)
t.repairQueue = mapset.NewSet[*Torrent]()
// there is 1 repair worker, with max 1 blocking task
go func() {
t.log.Debug("Starting periodic repair job")
t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop()
@@ -39,7 +39,7 @@ func (t *TorrentManager) StartRepairJob() {
// On-demand trigger with a specific torrent
t.invokeRepair(torrent)
case <-t.RepairKillSwitch:
t.log.Info("Stopping periodic repair job")
t.repairLog.Info("Stopping periodic repair job")
return
}
}
@@ -79,11 +79,11 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
func (t *TorrentManager) TriggerRepair(torrent *Torrent) {
if torrent != nil {
if err := torrent.State.Event(context.Background(), "break_torrent"); err != nil {
t.log.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err)
t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err)
return
}
if !t.Config.EnableRepair() {
t.log.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent))
t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent))
return
}
}
@@ -95,7 +95,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
var haystack cmap.ConcurrentMap[string, *Torrent]
if torrent == nil {
haystack, _ = t.DirectoryMap.Get(INT_ALL)
t.log.Debug("Periodic repair started; searching for broken torrents")
t.repairLog.Debug("Periodic repair started; searching for broken torrents")
} else {
haystack = cmap.New[*Torrent]()
haystack.Set("", torrent)
@@ -120,14 +120,13 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
}
})
if brokenFileCount > 0 {
t.log.Debugf("Torrent %s has %d/%d broken files, adding to repair list", t.GetKey(torrent), brokenFileCount, torrent.SelectedFiles.Count())
toRepair.Add(torrent)
return
}
// check 2: for unassigned links (this means the torrent has started to deteriorate)
unassignedCount := torrent.UnassignedLinks.Cardinality()
if unassignedCount > 0 {
t.log.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount)
t.repairLog.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount)
toRepair.Add(torrent)
return
}
@@ -143,13 +142,13 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
})
wg.Wait()
t.log.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality())
t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality())
}
func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
// blocks for approx 45 minutes if active torrents are full
if !t.canCapacityHandle() {
t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
wg.Done()
return
}
@@ -158,7 +157,7 @@ func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
_ = t.workerPool.Submit(func() {
defer wg.Done()
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil {
t.log.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
return
}
t.repair(torrent)
@@ -167,7 +166,7 @@ func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
// repairman
func (t *TorrentManager) repair(torrent *Torrent) {
t.log.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice())
t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice())
if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) {
return
@@ -187,7 +186,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
// first step: redownload the whole torrent
t.log.Debugf("Torrent %s has %d broken files (out of %d), repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
t.repairLog.Debugf("Torrent %s has %d broken files (out of %d), repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
if info != nil && info.Progress == 100 && !t.isStillBroken(info, brokenFiles) {
@@ -196,26 +195,26 @@ func (t *TorrentManager) repair(torrent *Torrent) {
// delete old torrents
torrent.DownloadedIDs.Each(func(torrentID string) bool {
if torrentID != info.ID {
t.setToBinOnceDone(torrentID)
t.setToBinImmediately(torrentID)
}
return false
})
t.log.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent))
t.repairLog.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent))
return
} else if info != nil && info.Progress != 100 {
t.log.Infof("Torrent %s is still in progress after redownloading but it should be repaired once done", t.GetKey(torrent))
t.repairLog.Infof("Torrent %s is still in progress after redownloading but it should be repaired once done", t.GetKey(torrent))
return
}
if err != nil {
t.log.Warnf("Cannot repair torrent %s by redownloading all files (error=%v)", t.GetKey(torrent), err)
t.repairLog.Warnf("Cannot repair torrent %s by redownloading all files (error=%v)", t.GetKey(torrent), err)
} else {
t.log.Warnf("Cannot repair torrent %s by redownloading all files", t.GetKey(torrent))
t.repairLog.Warnf("Cannot repair torrent %s by redownloading all files", t.GetKey(torrent))
}
if torrent.UnrepairableReason != "" {
t.log.Debugf("Torrent %s has been marked as unfixable during redownload (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
t.repairLog.Debugf("Torrent %s has been marked as unfixable during redownload (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
return
}
@@ -223,12 +222,12 @@ func (t *TorrentManager) repair(torrent *Torrent) {
if len(brokenFiles) == 1 && allBroken {
// if all files are broken, we can't do anything!
t.log.Warnf("Torrent %s has only 1 cached file and it's broken; marking as unfixable (to fix, select other files)", t.GetKey(torrent))
t.repairLog.Warnf("Torrent %s has only 1 cached file and it's broken; marking as unfixable (to fix, select other files)", t.GetKey(torrent))
t.markAsUnfixable(torrent, "the lone cached file is broken")
return
}
t.log.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", t.GetKey(torrent), int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles))
t.repairLog.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", t.GetKey(torrent), int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles))
newlyDownloadedIds := make([]string, 0)
batchNum := 1
@@ -237,11 +236,12 @@ func (t *TorrentManager) repair(torrent *Torrent) {
for _, fileIDStr := range brokenFileIDs {
group = append(group, fileIDStr)
if len(group) >= 100 {
t.log.Debugf("Downloading batch %d of broken files of torrent %s", batchNum, t.GetKey(torrent))
t.repairLog.Debugf("Downloading batch %d of broken files of torrent %s", batchNum, t.GetKey(torrent))
batchNum++
redownloadedInfo, err := t.redownloadTorrent(torrent, group)
if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
// delete the newly downloaded torrents because the operation failed
for _, newId := range newlyDownloadedIds {
t.setToBinImmediately(newId)
}
@@ -252,12 +252,13 @@ func (t *TorrentManager) repair(torrent *Torrent) {
}
}
t.log.Debugf("Downloading last batch of broken files of torrent %s", t.GetKey(torrent))
t.repairLog.Debugf("Downloading last batch of broken files of torrent %s", t.GetKey(torrent))
if len(group) > 0 {
redownloadedInfo, err := t.redownloadTorrent(torrent, group)
if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
// delete the newly downloaded torrents because the operation failed
for _, newId := range newlyDownloadedIds {
t.setToBinImmediately(newId)
}
@@ -266,6 +267,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID)
}
// once done, we can delete the newly downloaded torrents because we only need the links
for _, newId := range newlyDownloadedIds {
t.setToBinOnceDone(newId)
}
@@ -273,7 +275,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
unassignedTotal := torrent.UnassignedLinks.Cardinality()
t.log.Infof("Trying to assign %d links to the %d selected of incomplete torrent %s", unassignedTotal, torrent.SelectedFiles.Count(), t.GetKey(torrent))
t.repairLog.Infof("Trying to assign %d links to the %d selected of incomplete torrent %s", unassignedTotal, torrent.SelectedFiles.Count(), t.GetKey(torrent))
// handle torrents with incomplete links for selected files
assignedCount := 0
@@ -310,14 +312,14 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") {
rarCount++
} else {
t.log.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent))
t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent))
}
newUnassignedLinks.Set(link, unrestrict)
}
processedCount := assignedCount + unassignedCount + expiredCount
if processedCount%10 == 0 || processedCount == unassignedTotal {
t.log.Infof("Processed %d out of %d links (%d expired) to broken torrent %s", processedCount, unassignedTotal, expiredCount, t.GetKey(torrent))
t.repairLog.Infof("Processed %d out of %d links (%d expired) to broken torrent %s", processedCount, unassignedTotal, expiredCount, t.GetKey(torrent))
}
return false // next unassigned link
@@ -333,10 +335,10 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
// this is a rar'ed torrent, nothing we can do
if assignedCount == 0 && rarCount == 1 {
if t.Config.ShouldDeleteRarFiles() {
t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
t.Delete(t.GetKey(torrent), true)
} else {
t.log.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent))
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent))
newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
// if unassigned == nil {
// return
@@ -466,7 +468,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), len(selection))
}
t.log.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
t.repairLog.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
return info, nil
}
@@ -480,9 +482,9 @@ func (t *TorrentManager) canCapacityHandle() bool {
for {
count, err := t.api.GetActiveTorrentCount()
if err != nil {
t.log.Warnf("Cannot get active downloads count: %v", err)
t.repairLog.Warnf("Cannot get active downloads count: %v", err)
if retryCount >= maxRetries {
t.log.Error("Max retries reached. Exiting.")
t.repairLog.Error("Max retries reached. Exiting.")
return false
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
@@ -502,10 +504,10 @@ func (t *TorrentManager) canCapacityHandle() bool {
if delay > maxDelay {
delay = maxDelay
}
t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
t.repairLog.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
if retryCount >= maxRetries {
t.log.Error("Max retries reached. Exiting.")
t.repairLog.Error("Max retries reached. Exiting.")
return false
}
@@ -515,7 +517,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
}
func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) {
t.log.Warnf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason)
t.repairLog.Warnf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason)
// reassign to unplayable torrents directory
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if strings.HasPrefix(directory, "int__") {
@@ -528,7 +530,7 @@ func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) {
}
func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) {
t.log.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason)
t.repairLog.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason)
torrent.UnrepairableReason = reason
t.writeTorrentToFile(torrent)
}