Files
zurg/internal/torrent/repair.go
Ben Adrian Sarmiento 3abf48514d Return an error for 503
2024-06-23 22:08:54 +02:00

677 lines
21 KiB
Go

package torrent
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils"
mapset "github.com/deckarep/golang-set/v2"
cmap "github.com/orcaman/concurrent-map/v2"
)
const (
EXPIRED_LINK_TOLERANCE_HOURS = 24
)
func (t *TorrentManager) StartRepairJob() {
if !t.Config.EnableRepair() {
t.repairLog.Warn("Repair is disabled, skipping repair job")
return
}
t.repairChan = make(chan *Torrent)
t.RepairQueue = mapset.NewSet[*Torrent]()
t.RepairAllTrigger = make(chan struct{})
// periodic repair worker
t.workerPool.Submit(func() {
t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop()
for {
select {
case <-repairTicker.C:
t.repairLog.Debug("Periodic repair started; searching for broken torrents")
t.EnqueueForRepair(nil)
case <-t.RepairAllTrigger:
t.repairLog.Debug("Manual repair of all torrents triggered; searching for broken torrents")
t.EnqueueForRepair(nil)
}
}
})
// repair worker
t.workerPool.Submit(func() {
for {
select {
case torrent := <-t.repairChan:
t.invokeRepair(torrent)
case <-t.RepairWorkerKillSwitch:
t.repairLog.Info("Stopping periodic repair job")
return
}
}
})
}
// EnqueueForRepair allows an on-demand repair to be initiated.
func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) {
if !t.Config.EnableRepair() {
if torrent != nil {
t.repairLog.Warnf("Repair is disabled, skipping repair for torrent %s", t.GetKey(torrent))
}
return
}
if torrent != nil && torrent.State.Event(context.Background(), "break_torrent") != nil {
// t.repairLog.Errorf("Failed to mark torrent %s as broken: %v", t.GetKey(torrent), err)
return
}
t.workerPool.Submit(func() {
t.invokeRepair(torrent)
})
}
// invokeRepair runs a sync repair job
func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock()
if t.repairRunning {
t.repairRunningMu.Unlock()
t.RepairQueue.Add(torrent)
// don't do anything if repair is already running
return
}
t.repairRunning = true
t.repairRunningMu.Unlock()
// Execute the repair job
t.executeRepairJob(torrent)
// After repair is done
t.repairRunningMu.Lock()
t.repairRunning = false
t.repairRunningMu.Unlock()
if queuedTorrent, exists := t.RepairQueue.Pop(); exists {
t.workerPool.Submit(func() {
t.invokeRepair(queuedTorrent)
})
}
}
func (t *TorrentManager) executeRepairJob(torrent *Torrent) {
var haystack cmap.ConcurrentMap[string, *Torrent]
if torrent == nil {
haystack, _ = t.DirectoryMap.Get(INT_ALL)
} else {
haystack = cmap.New[*Torrent]()
haystack.Set("", torrent)
}
// collect all torrents that need to be repaired asynchronously
toRepair := mapset.NewSet[*Torrent]()
var wg sync.WaitGroup
haystack.IterCb(func(_ string, torrent *Torrent) {
wg.Add(1)
// temp worker for finding broken torrents
t.workerPool.Submit(func() {
defer wg.Done()
canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar")
if torrent.UnrepairableReason != "" && !canExtract {
return
}
// check 1: for broken files
brokenFileCount := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) {
return
}
if file.State.Is("broken_file") {
brokenFileCount++
}
})
if brokenFileCount > 0 {
t.repairLog.Debugf("Torrent %s has %d broken file(s), adding to repair list", t.GetKey(torrent), brokenFileCount)
toRepair.Add(torrent)
return
}
// check 2: for unassigned links (this means the torrent has started to deteriorate)
unassignedCount := torrent.UnassignedLinks.Cardinality()
if unassignedCount > 0 {
t.repairLog.Debugf("Torrent %s has %d unassigned link(s), adding to repair list", t.GetKey(torrent), unassignedCount)
toRepair.Add(torrent)
return
}
// if marked as broken, but no broken files or unassigned links, mark as repaired
if torrent.State.Is("broken_torrent") {
torrent.State.Event(context.Background(), "mark_as_repaired")
}
})
})
wg.Wait()
toRepair.Each(func(torrent *Torrent) bool {
wg.Add(1)
t.repair(torrent, &wg)
return false
})
wg.Wait()
t.repairLog.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality())
}
// repairman
func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
defer wg.Done()
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil {
// t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
return
}
// blocks for approx 45 minutes if active torrents are full
if !t.canCapacityHandle() {
t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
return
}
t.repairLog.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.ToSlice())
if torrent.UnassignedLinks.Cardinality() > 0 && !t.assignLinks(torrent) {
return
}
// check for other broken file
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !file.State.Is("ok_file") {
return
}
if t.UnrestrictFile(file, true) == nil {
file.State.Event(context.Background(), "break_file")
}
})
brokenFiles, allBroken := t.getBrokenFiles(torrent)
// check if broken files are playable
allPlayable := true
videoFiles := []string{}
for _, file := range brokenFiles {
if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) {
continue
}
if utils.IsVideo(file.Path) {
videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID))
continue
}
allPlayable = false
if t.Config.GetRarAction() == "extract" && file.ID != 0 {
t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent))
info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)})
if info != nil {
t.willDeleteOnceDone(info.ID)
}
}
}
if !allPlayable {
if len(videoFiles) > 0 {
t.repairLog.Debugf("Extracting %d video file(s) from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent))
info, _ := t.redownloadTorrent(torrent, videoFiles)
if info != nil {
t.willDeleteOnceDone(info.ID)
}
}
return
}
oldDownloadedIDs := torrent.DownloadedIDs.Clone()
// first step: redownload the whole torrent
t.repairLog.Debugf("Torrent %s has %d broken files (out of %d); repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
if info != nil && info.Progress == 100 {
if !t.isStillBroken(info, brokenFiles) {
// delete the torrents it replaced
oldDownloadedIDs.Each(func(torrentID string) bool {
t.DeleteByID(torrentID)
return false
})
t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent))
return
}
// if it's still broken, let's delete the newly downloaded torrent
t.DeleteByID(info.ID)
err = fmt.Errorf("links are still broken")
} else if info != nil && info.Progress != 100 {
// it's faster to download just the broken files, so let's delete the newly downloaded torrent
t.DeleteByID(info.ID)
err = fmt.Errorf("no longer cached")
}
if err != nil {
t.repairLog.Warnf("Cannot repair torrent %s by redownloading whole torrent (error=%v)", t.GetKey(torrent), err)
}
if torrent.UnrepairableReason != "" {
t.repairLog.Debugf("Torrent %s has been marked as unfixable after redownloading torrent %s; ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
return
}
// second step: download just the broken files
if len(brokenFiles) == 1 && allBroken {
// if all files are broken, we can't do anything!
t.repairLog.Warnf("Torrent %s has only 1 cached file and it's broken; marking as unfixable (to fix, select other files)", t.GetKey(torrent))
t.markAsUnfixable(torrent, "the lone cached file is broken")
return
}
totalBatches := int(math.Ceil(float64(len(brokenFiles)) / 100))
t.repairLog.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", t.GetKey(torrent), totalBatches, len(brokenFiles))
newlyDownloadedIds := make([]string, 0)
batchNum := 1
brokenFileIDs := getFileIDs(brokenFiles)
var group []string
for i, fileIDStr := range brokenFileIDs {
group = append(group, fileIDStr)
if len(group) >= 100 || i == len(brokenFileIDs)-1 {
t.repairLog.Debugf("Downloading batch %d/%d of broken files of torrent %s", batchNum, totalBatches, t.GetKey(torrent))
batchNum++
redownloadedInfo, err := t.redownloadTorrent(torrent, group)
if err != nil {
t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
// delete the newly downloaded torrents because the operation failed
for _, newId := range newlyDownloadedIds {
t.DeleteByID(newId)
}
return
}
newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID)
group = make([]string, 0)
}
}
// once done, we can delete the newly downloaded torrents because we only need the links
for _, newId := range newlyDownloadedIds {
t.willDeleteOnceDone(newId)
}
}
func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
unassignedTotal := torrent.UnassignedLinks.Cardinality()
t.repairLog.Infof("Trying to assign %d link(s) to the %d selected files of incomplete torrent %s", unassignedTotal, torrent.SelectedFiles.Count(), t.GetKey(torrent))
// handle torrents with incomplete links for selected files
assignedCount := 0
expiredCount := 0
rarCount := 0
unassignedCount := 0
newUnassignedLinks := cmap.New[*realdebrid.Download]()
var assignedLinks []string
torrent.UnassignedLinks.Clone().Each(func(link string) bool {
// unrestrict each unassigned link that was filled out during torrent init
unrestrict := t.UnrestrictLink(link, true)
if unrestrict == nil {
expiredCount++
return false // next unassigned link
}
// try to assign to a selected file
assigned := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !assigned && file.State.Is("broken_file") && file.Bytes == unrestrict.Filesize && strings.HasSuffix(strings.ToLower(file.Path), strings.ToLower(unrestrict.Filename)) {
file.Link = link
assignedLinks = append(assignedLinks, link)
if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") {
file.Link = file.Link[0:39]
}
file.State.Event(context.Background(), "repair_file")
assigned = true
assignedCount++
}
})
if !assigned {
// if not assigned and is a rar, likely it was rar'ed by RD
if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") {
// t.log.Debugf("Trying to get contents of rar file %s", unrestrict.Filename)
// contents, err := t.rarReader.GetRARContents(unrestrict.Download)
// if err != nil {
// t.repairLog.Warnf("Cannot get contents of rar file %s: %v", unrestrict.Filename, err)
// }
// t.log.Debugf("contents: %v", contents)
rarCount++
} else {
// it's possible that it is already repaired
t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent))
}
newUnassignedLinks.Set(link, unrestrict)
}
processedCount := assignedCount + unassignedCount + expiredCount
if processedCount%10 == 0 || processedCount == unassignedTotal {
// save each progress
for _, assignedLink := range assignedLinks {
torrent.UnassignedLinks.Remove(assignedLink)
}
t.writeTorrentToFile(torrent)
t.repairLog.Infof("Processed %d out of %d links (%d expired) to broken torrent %s", processedCount, unassignedTotal, expiredCount, t.GetKey(torrent))
}
return false // next unassigned link
})
// empty/reset the unassigned links as we have assigned them already
if unassignedTotal > 0 {
torrent.UnassignedLinks = mapset.NewSet[string]()
t.writeTorrentToFile(torrent)
}
if assignedCount != 0 || rarCount != 1 {
return true // continue repair
}
action := t.Config.GetRarAction()
if action == "delete" {
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
t.Delete(t.GetKey(torrent), true)
return false // end repair
}
newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
torrent.SelectedFiles.Set(unassigned.Filename, &File{
File: realdebrid.File{
ID: 0,
Path: unassigned.Filename,
Bytes: unassigned.Filesize,
Selected: 0,
},
Ended: torrent.Added,
Link: unassigned.Link,
State: NewFileState("ok_file"),
})
})
if action == "extract" {
videoFiles := []string{}
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if utils.IsVideo(file.Path) {
videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID))
} else if file.ID != 0 && t.IsPlayable(file.Path) {
t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent))
info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)})
if info != nil {
t.willDeleteOnceDone(info.ID)
}
}
})
if len(videoFiles) > 0 {
t.repairLog.Debugf("Extracting %d video file(s) from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent))
info, _ := t.redownloadTorrent(torrent, videoFiles)
if info != nil {
t.willDeleteOnceDone(info.ID)
}
}
} else {
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent))
t.markAsUnfixable(torrent, "rar'ed by RD")
t.markAsUnplayable(torrent, "rar'ed by RD")
torrent.State.Event(context.Background(), "mark_as_repaired")
}
torrent.UnassignedLinks = mapset.NewSet[string]()
t.writeTorrentToFile(torrent)
return false // end repair
}
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) {
finalSelection := strings.Join(selection, ",")
if len(selection) == 0 {
// if brokenFiles is not provided, we will redownload all files
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
selection = append(selection, fmt.Sprintf("%d", file.ID))
})
if len(selection) == 0 {
return nil, nil
} else {
finalSelection = strings.Join(selection, ",")
}
}
// redownload torrent
var newTorrentID string
prevState := t.latestState
resp, err := t.api.AddMagnetHash(torrent.Hash)
if err != nil {
if strings.Contains(err.Error(), "timeout") {
newState := t.getCurrentState()
if prevState.Eq(newState) {
return t.redownloadTorrent(torrent, selection)
}
// sometimes, adding a new hash will encounter a timeout but the torrent is still added
newTorrentID = t.latestState.FirstTorrentId
} else {
if strings.Contains(err.Error(), "infringing") {
t.markAsUnfixable(torrent, "infringing torrent")
} else if strings.Contains(err.Error(), "unsupported") {
t.markAsUnfixable(torrent, "unsupported torrent")
} else if strings.Contains(err.Error(), "unavailable") {
t.markAsUnfixable(torrent, "unavailable torrent")
} else if strings.Contains(err.Error(), "invalid") {
t.markAsUnfixable(torrent, "invalid torrent")
} else if strings.Contains(err.Error(), "big") {
t.markAsUnfixable(torrent, "torrent too big")
} else if strings.Contains(err.Error(), "allowed") {
t.markAsUnfixable(torrent, "torrent not allowed")
}
return nil, fmt.Errorf("cannot add magnet: %v", err)
}
}
if resp != nil {
newTorrentID = resp.ID
}
time.Sleep(2 * time.Second)
var info *realdebrid.TorrentInfo
retries := 0
for {
retries++
if retries > 10 {
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot start redownloading: too many retries")
}
err = t.api.SelectTorrentFiles(newTorrentID, finalSelection)
if err != nil {
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot start redownloading: %v", err)
}
time.Sleep(2 * time.Second)
info, err = t.api.GetTorrentInfo(newTorrentID)
if err != nil {
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot get info on redownloaded : %v", err)
}
if info.Status == "magnet_conversion" {
time.Sleep(60 * time.Second)
continue
}
break
}
// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
if info.Status != "downloading" && info.Status != "downloaded" && info.Status != "uploading" && info.Status != "queued" && info.Status != "compressing" {
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("non-OK state: %s", info.Status)
}
// check if incorrect number of links
if info.Progress == 100 && len(info.Links) != len(selection) {
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection))
} else if info.Progress != 100 {
t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
} else {
t.repairLog.Infof("Downloaded %d file(s) of torrent %s (id=%s)", len(selection), t.GetKey(torrent), info.ID)
}
return info, nil
}
func (t *TorrentManager) canCapacityHandle() bool {
// max waiting time is 45 minutes
const maxRetries = 50
const baseDelay = 1 * time.Second
const maxDelay = 60 * time.Second
retryCount := 0
for {
count, err := t.api.GetActiveTorrentCount()
if err != nil {
t.repairLog.Warnf("Cannot get active downloads count: %v", err)
if retryCount >= maxRetries {
t.repairLog.Error("Max retries reached. Exiting.")
return false
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
retryCount++
continue
}
if count.DownloadingCount < count.MaxNumberOfTorrents-1 {
return true
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
t.repairLog.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
if retryCount >= maxRetries {
t.repairLog.Error("Max retries reached. Exiting.")
return false
}
time.Sleep(delay)
retryCount++
}
}
func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) {
t.repairLog.Debugf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason)
// reassign to unplayable torrents directory
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if strings.HasPrefix(directory, "int__") {
return
}
torrents.Remove(t.GetKey(torrent))
})
torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
torrents.Set(t.GetKey(torrent), torrent)
}
func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) {
t.repairLog.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason)
torrent.UnrepairableReason = reason
t.writeTorrentToFile(torrent)
}
// getBrokenFiles returns the files that are not http links and not deleted
func (t *TorrentManager) getBrokenFiles(torrent *Torrent) ([]*File, bool) {
var brokenFiles []*File
allBroken := true
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !utils.IsVideo(file.Path) && !t.IsPlayable(file.Path) {
return
}
if file.State.Is("broken_file") {
brokenFiles = append(brokenFiles, file)
} else {
// file is ok
allBroken = false
}
})
return brokenFiles, allBroken
}
// isStillBroken checks if the torrent is still broken
// if it's not broken anymore, it will assign the links to the files
func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool {
var selectedFiles []*File
for _, file := range info.Files {
if file.Selected == 0 {
continue
}
selectedFiles = append(selectedFiles, &File{
File: file,
Ended: info.Ended,
Link: "",
State: NewFileState("broken_file"),
})
}
if len(selectedFiles) != len(info.Links) {
return true
}
for i, file := range selectedFiles {
file.Link = info.Links[i]
if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") {
file.Link = file.Link[0:39]
}
file.State.Event(context.Background(), "repair_file")
}
if len(brokenFiles) == 0 {
// just check for the last file
brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1])
}
// check if the broken files can now be unrestricted and downloaded
for _, oldFile := range brokenFiles {
for idx, newFile := range selectedFiles {
if oldFile.ID == newFile.ID && t.UnrestrictFile(selectedFiles[idx], true) == nil {
return true
}
}
}
return false
}
func (t *TorrentManager) ResetRepairState() {
if !t.Config.EnableRepair() {
return
}
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
allTorrents.IterCb(func(_ string, torrent *Torrent) {
err := torrent.State.Event(context.Background(), "reset_repair")
if err == nil {
t.repairLog.Debugf("Repair state of torrent %s has been reset", t.GetKey(torrent))
t.writeTorrentToFile(torrent)
}
})
}