Files
zurg/internal/torrent/repair.go
2024-01-26 16:47:54 +01:00

521 lines
16 KiB
Go

package torrent
import (
"fmt"
"math"
"strings"
"time"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
mapset "github.com/deckarep/golang-set/v2"
cmap "github.com/orcaman/concurrent-map/v2"
)
// Tunables for the repair logic in this file.
const (
// EXPIRED_LINK_TOLERANCE_HOURS is not referenced by any code visible in this
// part of the file.
// NOTE(review): presumably the number of hours an expired link is tolerated
// before it is treated as broken — confirm against its users.
EXPIRED_LINK_TOLERANCE_HOURS = 24
)
// RepairAll hands one periodic repair pass over to the repair pool. The pass
// itself (repairAll) runs inside the pool, not on the caller's goroutine, and
// the result of the submission is intentionally discarded.
func (t *TorrentManager) RepairAll() {
	periodicRepair := func() {
		t.log.Info("Periodic repair invoked; searching for broken torrents")
		t.repairAll()
	}

	// per the original note: there is 1 repair worker, with max 1 blocking task
	_ = t.repairPool.Submit(periodicRepair)
}
// repairAll walks every torrent in the INT_ALL directory, collects the ones
// that need repairing and then calls Repair on each of them.
//
// Torrents that are still in progress or already marked unrepairable are
// skipped. For the rest, three steps run in order:
//  1. links recorded in BrokenLinks are blanked out on the cached infos and
//     persisted to the file cache;
//  2. a torrent with any selected file whose link is neither an http link nor
//     "unselect" is queued for repair;
//  3. a torrent with unassigned links is queued for repair.
func (t *TorrentManager) repairAll() {
	allTorrents, _ := t.DirectoryMap.Get(INT_ALL)

	// collect all torrents that need to be repaired
	toRepair := mapset.NewSet[*Torrent]()
	allTorrents.IterCb(func(_ string, torrent *Torrent) {
		if torrent.AnyInProgress() || torrent.UnrepairableReason != "" {
			return
		}

		// check 1: save the broken files to the file cache
		// broken files are also added when trying to open a file
		if torrent.BrokenLinks.Cardinality() > 0 {
			infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
			torrent.DownloadedIDs.Each(func(id string) bool {
				info, ok := infoCache.Get(id)
				if !ok || info == nil {
					// the cache and DownloadedIDs are out of sync; skip this id
					// instead of dereferencing a nil info
					t.log.Warnf("Torrent %s (id=%s) is missing from the info cache, cannot record its broken links", t.GetKey(torrent), id)
					return false
				}
				hasBrokenFiles := false
				info.SelectedFiles.IterCb(func(_ string, file *File) {
					if torrent.BrokenLinks.Contains(file.Link) {
						hasBrokenFiles = true
						file.Link = "" // a blank link is what check 2 looks for
					}
				})
				if hasBrokenFiles {
					info.BrokenLinks = torrent.BrokenLinks
					t.writeTorrentToFile(id, info)
				}
				return false // false = keep iterating over the remaining ids
			})
		}

		// check 2: for broken files
		brokenFileIDs := mapset.NewSet[int]()
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
				brokenFileIDs.Add(file.ID)
			}
		})
		if brokenFileIDs.Cardinality() > 0 {
			t.log.Debugf("Torrent %s has broken files (ids=%v), adding to repair list", t.GetKey(torrent), brokenFileIDs.ToSlice())
			toRepair.Add(torrent)
			return
		}

		// check 3: for links that could not be assigned to a selected file
		if torrent.UnassignedLinks.Cardinality() > 0 {
			t.log.Debugf("Torrent %s has unassigned links, adding to repair list", t.GetKey(torrent))
			toRepair.Add(torrent)
			return
		}
	})

	if toRepair.Cardinality() == 0 {
		t.log.Info("Periodic repair found no broken torrents to repair")
		return
	}

	t.log.Debugf("Periodic repair found %d broken torrents to repair in total", toRepair.Cardinality())
	toRepair.Each(func(torrent *Torrent) bool {
		t.Repair(torrent)
		return false
	})
}
// Repair schedules a repair of the given torrent on the worker pool.
//
// The call is a no-op (with a log line) when the torrent is marked
// unrepairable, is already registered in t.repairs, or still has downloads in
// progress. Otherwise the torrent key is registered in t.repairs, the call
// waits in canCapacityHandle until there is room for another active torrent,
// and the actual work (t.repair) is submitted to the worker pool.
//
// The key is always removed from t.repairs again: after the repair ran, when
// capacity never became available, and when the task could not be submitted.
// Without that, a failed attempt would leave the torrent permanently flagged
// as "already being repaired".
func (t *TorrentManager) Repair(torrent *Torrent) {
	key := t.GetKey(torrent)

	if torrent.UnrepairableReason != "" {
		t.log.Warnf("Torrent %s is unfixable (%s), skipping repair", key, torrent.UnrepairableReason)
		return
	}
	if t.repairs.Contains(key) {
		t.log.Warnf("Torrent %s is already being repaired, skipping repair", key)
		return
	}
	if torrent.AnyInProgress() {
		t.log.Infof("Torrent %s is in progress, skipping repair until download is done", key)
		return
	}

	t.repairs.Add(key)
	t.log.Infof("Attempting repair for torrent %s", key)

	// blocks for approx 45 minutes if active torrents are full
	if !t.canCapacityHandle() {
		t.repairs.Remove(key)
		t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
		return
	}

	// assign to a worker
	err := t.workerPool.Submit(func() {
		// deferred so the registration is cleared even if repair panics
		defer t.repairs.Remove(key)
		t.repair(torrent)
	})
	if err != nil {
		// the task will never run, so nobody else would clear the registration
		t.repairs.Remove(key)
		t.log.Warnf("Cannot schedule repair for torrent %s: %v", key, err)
	}
}
// repair runs the repair strategies for one torrent.
//
// It first tries to match unassigned links to selected files
// (assignUnassignedLinks); if that ends the process, repair stops. Then:
//
//   - repair_method#1 reinserts the whole torrent. If the new torrent is not
//     at 100% yet, repair returns and leaves it downloading. If it is done and
//     isStillBroken reports it as fixed, the new links are copied onto the
//     torrent's selected files and repair returns.
//   - repair_method#2 redownloads only the broken files, unless the torrent
//     was marked unfixable during method #1 or every selected file is broken.
func (t *TorrentManager) repair(torrent *Torrent) {
	t.log.Infof("Started repair process for torrent %s", t.GetKey(torrent))

	// handle torrents with incomplete links for selected files
	if !t.assignUnassignedLinks(torrent) {
		return
	}

	// get all broken files
	brokenFiles := getBrokenFiles(torrent)
	t.log.Debugf("Torrent %s has %d broken out of %d files", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
	brokenFileIDs := getFileIDs(brokenFiles)

	// first solution: reinsert and see if the broken file is now working
	t.log.Debugf("repair_method#1: Redownloading whole torrent %s", t.GetKey(torrent))
	info, err := t.redownloadTorrent(torrent, "") // reinsert the torrent, passing ""
	switch {
	case err != nil:
		t.log.Warnf("repair_method#1: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error())
	case info == nil:
		// nothing was redownloaded; continue with the checks below
	case info.Progress != 100:
		t.log.Infof("repair_method#1: Torrent %s is still in progress but it should work once done (torrent is temporarily hidden until download has completed)", t.GetKey(torrent))
		return
	case info.IsDone() && !t.isStillBroken(info, brokenFiles):
		// Pair each link with the file it belongs to. Links are matched to the
		// selected entries of info.Files in order (the same pairing
		// isStillBroken uses) and then looked up by file ID, because the
		// iteration order of SelectedFiles is not the order of the links.
		linkByFileID := make(map[int]string, len(info.Links))
		next := 0
		for _, f := range info.Files {
			if f.Selected == 0 {
				continue
			}
			if next >= len(info.Links) {
				break
			}
			linkByFileID[f.ID] = info.Links[next]
			next++
		}
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			if link, ok := linkByFileID[file.ID]; ok {
				file.Link = link
			}
		})
		t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent))
		return
	}

	if torrent.UnrepairableReason != "" {
		t.log.Debugf("Torrent %s has been marked as unfixable during repair_method#1 (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
		return
	}

	if len(brokenFiles) == torrent.SelectedFiles.Count() {
		// NOTE(review): the message says "marking as unplayable" but nothing is
		// marked here; confirm whether markAsUnplayable should be called.
		t.log.Warnf("Torrent %s has all broken files, marking as unplayable", t.GetKey(torrent))
		return
	}

	// second solution: add only the broken files
	if len(brokenFiles) > 0 {
		t.log.Infof("repair_method#2: Redownloading %dof%d files only for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent))
		if _, err := t.redownloadTorrent(torrent, brokenFileIDs); err != nil {
			t.log.Warnf("repair_method#2: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error())
		}
		return
	}

	t.log.Infof("Torrent %s has no broken files to repair", t.GetKey(torrent))
}
// assignUnassignedLinks tries to attach each of the torrent's unassigned links
// to a selected file, matching on file size after unrestricting the link.
//
// It returns false when the repair process should stop: none of the links
// could be assigned and at least one of them unrestricts to a ".rar" file. In
// that case the torrent is either deleted (when the config asks for rar files
// to be deleted) or the unrestricted downloads are added as files and the
// torrent is marked unfixable and unplayable.
//
// It returns true otherwise. If the torrent had unassigned links, they are
// cleared on every cached info of the torrent and written to the file cache.
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
	// handle torrents with incomplete links for selected files
	assignedCount := 0
	rarCount := 0
	newUnassignedLinks := cmap.New[*realdebrid.Download]()
	torrent.UnassignedLinks.Each(func(link string) bool {
		// unrestrict each unassigned link that was filled out during torrent init
		unrestrict := t.UnrestrictUntilOk(link)
		if unrestrict == nil {
			newUnassignedLinks.Set(link, nil)
			// nothing more to do for this link; returning false moves on to
			// the next one (it does not stop the iteration)
			return false
		}

		// try to assign to a selected file
		assigned := false
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			// matched on size only; every file of that size gets the link
			if file.Bytes == unrestrict.Filesize {
				file.Link = link
				assigned = true
				assignedCount++
			}
		})

		if !assigned {
			// if not assigned and is a rar, likely it was rar'ed by RD
			if strings.HasSuffix(strings.ToLower(unrestrict.Filename), ".rar") {
				rarCount++
			}
			newUnassignedLinks.Set(link, unrestrict)
		}
		return false
	})

	if assignedCount == 0 && rarCount > 0 && newUnassignedLinks.Count() > 0 {
		// this is a rar'ed torrent, nothing we can do
		if t.Config.ShouldDeleteRarFiles() {
			t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
			t.Delete(t.GetKey(torrent), true)
		} else {
			t.log.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent))
			// add the unrestricted downloads as files of the torrent
			newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
				if unassigned == nil {
					// link could not be unrestricted, there is nothing to add
					return
				}
				newFile := &File{
					File: realdebrid.File{
						ID:       0,
						Path:     unassigned.Filename,
						Bytes:    unassigned.Filesize,
						Selected: 1,
					},
					Ended: torrent.Added,
					Link:  unassigned.Link,
				}
				torrent.SelectedFiles.Set(unassigned.Filename, newFile)
			})
			torrent.UnassignedLinks = mapset.NewSet[string]()
			t.markAsUnfixable(torrent, "rar'ed by RD")
			t.markAsUnplayable(torrent, "rar'ed by RD")
		}
		t.log.Debugf("Ending repair process early for torrent %s", t.GetKey(torrent))
		return false
	}

	// empty the unassigned links on the cached infos as we have assigned them
	// NOTE(review): torrent.UnassignedLinks itself is left untouched here;
	// confirm that it is rebuilt from the cached infos elsewhere.
	if torrent.UnassignedLinks.Cardinality() > 0 {
		infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
		torrent.DownloadedIDs.Each(func(id string) bool {
			info, ok := infoCache.Get(id)
			if !ok || info == nil {
				// cache miss: skip instead of dereferencing a nil info
				t.log.Warnf("Torrent %s (id=%s) is missing from the info cache, cannot clear its unassigned links", t.GetKey(torrent), id)
				return false
			}
			info.UnassignedLinks = mapset.NewSet[string]()
			t.writeTorrentToFile(id, info)
			return false
		})
	}

	return true
}
// redownloadTorrent adds the torrent's hash again and selects the given
// comma-separated file IDs on the new torrent.
//
// An empty selection means "all selected files of the torrent"; only in that
// mode are the previously downloaded torrent IDs deleted once the new torrent
// looks healthy. With a non-empty selection the new torrent is registered as
// a fixer instead.
//
// It returns (nil, nil) when the selection is empty and the torrent has no
// selected files. It returns an error when the magnet cannot be added (known
// failure keywords also mark the torrent unfixable), when the files cannot be
// selected, when the info cannot be fetched, when the new torrent is in a
// non-OK status, or when it is complete with an unexpected number of links.
// In every error case after the magnet was added, the new torrent is deleted.
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (*realdebrid.TorrentInfo, error) {
	// old torrents are only replaced when everything is redownloaded
	var replacedIDs []string
	if selection == "" {
		replacedIDs = torrent.DownloadedIDs.ToSlice()

		var fileIDs []string
		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			fileIDs = append(fileIDs, fmt.Sprintf("%d", file.ID)) // select all files
		})
		if len(fileIDs) == 0 {
			return nil, nil // nothing to repair
		}
		selection = strings.Join(fileIDs, ",")
	}

	// redownload torrent
	resp, err := t.Api.AddMagnetHash(torrent.Hash)
	if err != nil {
		// the first keyword found in the error decides the unfixable reason
		knownFailures := []struct{ keyword, reason string }{
			{"infringing", "infringing torrent"},
			{"unsupported", "unsupported torrent"},
			{"unavailable", "unavailable torrent"},
			{"invalid", "invalid torrent"},
			{"big", "torrent too big"},
			{"allowed", "torrent not allowed"},
		}
		message := err.Error()
		for _, failure := range knownFailures {
			if strings.Contains(message, failure.keyword) {
				t.markAsUnfixable(torrent, failure.reason)
				break
			}
		}
		return nil, fmt.Errorf("cannot redownload torrent: %v", err)
	}

	// sleep for 1 second to let RD process the magnet
	time.Sleep(1 * time.Second)

	newTorrentID := resp.ID
	// undo the insertion whenever the new torrent turns out to be unusable
	discardNewTorrent := func() {
		t.Api.DeleteTorrent(newTorrentID)
		t.deleteOnceDone.Add(newTorrentID)
	}

	// select files
	if err = t.Api.SelectTorrentFiles(newTorrentID, selection); err != nil {
		discardNewTorrent()
		return nil, fmt.Errorf("cannot start redownloading: %v", err)
	}

	// sleep for 1 second to let RD process the magnet
	time.Sleep(1 * time.Second)

	// see if the torrent is ready
	info, err := t.Api.GetTorrentInfo(newTorrentID)
	if err != nil {
		discardNewTorrent()
		return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
	}

	// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
	switch info.Status {
	case "magnet_conversion", "waiting_files_selection", "queued", "downloading", "downloaded", "uploading":
		// acceptable; "compressing" is excluded because we need playable files
	default:
		discardNewTorrent()
		return nil, fmt.Errorf("the redownloaded torrent %s (id=%s) is in error state: %s", t.GetKey(torrent), newTorrentID, info.Status)
	}

	// check if incorrect number of links
	wanted := len(strings.Split(selection, ","))
	if info.Progress == 100 && len(info.Links) != wanted {
		discardNewTorrent()
		return nil, fmt.Errorf("it did not fix the issue for %s (id=%s), only got %d files but we need %d, undoing", t.GetKey(torrent), info.ID, len(info.Links), wanted)
	}

	// looks like it's fixed
	if len(replacedIDs) == 0 {
		// it's a fixer
		t.fixers.Set(newTorrentID, torrent)
		t.deleteOnceDone.Add(newTorrentID)
		return info, nil
	}

	// replace the old torrent
	for _, oldID := range replacedIDs {
		t.Api.DeleteTorrent(oldID)
		t.deleteOnceDone.Add(oldID)
	}
	return info, nil
}
// canCapacityHandle waits until the account has room for another active
// torrent. It returns true as soon as the downloading count is below the
// maximum number of torrents minus one, and false once maxRetries attempts
// have been used up (either because the count could not be fetched or because
// the account stayed full).
//
// Between attempts it sleeps with an exponential backoff capped at maxDelay,
// which puts the total waiting time at roughly 45 minutes.
func (t *TorrentManager) canCapacityHandle() bool {
	// max waiting time is 45 minutes
	const maxRetries = 50
	const baseDelay = 1 * time.Second
	const maxDelay = 60 * time.Second

	retryCount := 0
	for {
		count, err := t.Api.GetActiveTorrentCount()
		if err != nil {
			t.log.Warnf("Cannot get active downloads count: %v", err)
			if retryCount >= maxRetries {
				t.log.Error("Max retries reached. Exiting.")
				return false
			}
			time.Sleep(repairBackoffDelay(retryCount, baseDelay, maxDelay))
			retryCount++
			continue
		}

		if count.DownloadingCount < count.MaxNumberOfTorrents-1 {
			return true
		}

		delay := repairBackoffDelay(retryCount, baseDelay, maxDelay)
		t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
		if retryCount >= maxRetries {
			t.log.Error("Max retries reached. Exiting.")
			return false
		}
		time.Sleep(delay)
		retryCount++
	}
}

// repairBackoffDelay returns baseDelay * 2^retryCount, capped at maxDelay.
//
// The product is computed and clamped in floating point before it is turned
// into a time.Duration. Multiplying durations directly overflows int64 for
// larger retry counts, which yields negative or arbitrary delays that slip
// past a "delay > maxDelay" check.
func repairBackoffDelay(retryCount int, baseDelay, maxDelay time.Duration) time.Duration {
	if retryCount < 0 {
		retryCount = 0
	}
	delay := math.Pow(2, float64(retryCount)) * float64(baseDelay)
	if delay >= float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(delay)
}
// markAsUnplayable removes the torrent from every directory in DirectoryMap
// and then lists it under config.UNPLAYABLE_TORRENTS only. The reason is used
// for the log line.
func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) {
	key := t.GetKey(torrent)
	t.log.Warnf("Marking torrent %s as unplayable - %s", key, reason)

	// take it out of every listing first ...
	t.DirectoryMap.IterCb(func(_ string, listing cmap.ConcurrentMap[string, *Torrent]) {
		listing.Remove(key)
	})

	// ... then put it back under the unplayable directory
	unplayable, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
	unplayable.Set(key, torrent)
}
// markAsUnfixable records reason as the torrent's UnrepairableReason, both on
// the torrent itself and on each of its cached infos, which are then written
// to the file cache. Downloaded IDs that are missing from the info cache are
// skipped with a warning.
func (t *TorrentManager) markAsUnfixable(torrent *Torrent, reason string) {
	t.log.Warnf("Marking torrent %s as unfixable - %s", t.GetKey(torrent), reason)
	torrent.UnrepairableReason = reason

	infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
	torrent.DownloadedIDs.Each(func(id string) bool {
		info, ok := infoCache.Get(id)
		if !ok || info == nil {
			// cache miss: skip instead of dereferencing a nil info
			t.log.Warnf("Torrent %s (id=%s) is missing from the info cache, cannot persist its unfixable reason", t.GetKey(torrent), id)
			return false
		}
		info.UnrepairableReason = reason
		t.writeTorrentToFile(id, info)
		return false // false = keep iterating over the remaining ids
	})
}
// getBrokenFiles collects the selected files of the torrent whose link is
// neither an http(s) link nor the "unselect" marker. It returns nil when
// there are none.
func getBrokenFiles(torrent *Torrent) []*File {
	isUsable := func(link string) bool {
		return link == "unselect" || strings.HasPrefix(link, "http")
	}

	var broken []*File
	torrent.SelectedFiles.IterCb(func(_ string, candidate *File) {
		if isUsable(candidate.Link) {
			return
		}
		broken = append(broken, candidate)
	})
	return broken
}
// isStillBroken reports whether the redownloaded torrent described by info
// still fails for the given broken files.
//
// The selected files of info are paired with info.Links in order. The torrent
// counts as still broken when the number of selected files and links differ,
// when there are no selected files at all, or when a link belonging to one of
// the broken files (matched by path or by size) cannot be unrestricted or
// unrestricts to a different size. When brokenFiles is empty, only the last
// selected file is checked.
//
// The links are paired on local copies only; neither brokenFiles nor the
// torrent's own files are modified.
func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool {
	var selectedFiles []*File
	for _, file := range info.Files {
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, &File{
			File:  file,
			Ended: info.Ended,
			Link:  "", // no link yet
		})
	}

	// if we can't assign links, it's still broken
	if len(selectedFiles) != len(info.Links) {
		return true
	}
	// nothing selected means nothing can be verified; this also protects the
	// "last file" lookup below from indexing an empty slice
	if len(selectedFiles) == 0 {
		return true
	}

	// all links are still intact! good!
	for i, file := range selectedFiles {
		file.Link = info.Links[i]
	}

	if len(brokenFiles) == 0 {
		// just check for the last file; a fresh slice keeps the caller's intact
		brokenFiles = []*File{selectedFiles[len(selectedFiles)-1]}
	}

	// check if the broken files can now be unrestricted
	for _, oldFile := range brokenFiles {
		for _, newFile := range selectedFiles {
			if oldFile.Path != newFile.Path && oldFile.Bytes != newFile.Bytes {
				continue
			}
			unrestrict := t.UnrestrictUntilOk(newFile.Link)
			if unrestrict == nil || oldFile.Bytes != unrestrict.Filesize {
				return true
			}
		}
	}

	return false
}
// handleFixers is called for a finished fixer torrent (one created by
// redownloadTorrent with a partial selection). It pops the torrent the fixer
// was created for, reinserts that torrent as a whole and, if the reinserted
// torrent is done and no longer broken, copies the new links onto the files
// that were broken.
//
// It returns nil in every case except when the reinserted torrent is done but
// still broken; then the reinserted torrent is deleted and the result of
// getMoreInfo(fixer) is returned. In the other completed paths the fixer
// itself is deleted. When the reinsertion fails, nil is returned and the fixer
// is left as it is.
func (t *TorrentManager) handleFixers(fixer realdebrid.Torrent) *Torrent {
	// since fixer is done, we can pop it from the fixers set
	torrent, _ := t.fixers.Pop(fixer.ID)
	if torrent == nil {
		t.log.Warnf("repair_method#2: Fixer for %s (id=%s) is done but torrent has been deleted, deleting fixer...", fixer.Name, fixer.ID)
		t.Api.DeleteTorrent(fixer.ID)
		t.deleteOnceDone.Add(fixer.ID)
		return nil
	}

	brokenFiles := getBrokenFiles(torrent)
	info, err := t.redownloadTorrent(torrent, "") // reinsert
	if err != nil {
		t.log.Warnf("repair_method#2: Cannot repair torrent %s (error=%s)", t.GetKey(torrent), err.Error())
		return nil
	}

	switch {
	case info == nil:
		// redownloadTorrent returns (nil, nil) when the torrent has no selected
		// files; there is nothing to verify, only the fixer to clean up
		t.log.Warnf("repair_method#2: Torrent %s has no selected files to redownload", t.GetKey(torrent))

	case !info.IsDone():
		t.log.Infof("repair_method#2: Torrent %s is still in progress but it should work once done (torrent is temporarily hidden until download has completed)", t.GetKey(torrent))

	case t.isStillBroken(info, brokenFiles):
		t.log.Warnf("repair_method#2: Fixer is done but torrent %s is still broken; let's keep the fixer", t.GetKey(torrent))
		t.Api.DeleteTorrent(info.ID)
		// NOTE(review): the reinserted torrent (info.ID) is deleted, but it is
		// the fixer's ID that gets registered in deleteOnceDone — confirm this
		// is intended.
		t.deleteOnceDone.Add(fixer.ID)
		return t.getMoreInfo(fixer)

	default:
		// Pair each link with its file: links are matched to the selected
		// entries of info.Files in order (the same pairing isStillBroken uses)
		// and looked up by file ID. The reinserted torrent carries links for
		// all selected files, so a running index over the broken files alone
		// would hand out the wrong links.
		linkByFileID := make(map[int]string, len(info.Links))
		next := 0
		for _, f := range info.Files {
			if f.Selected == 0 {
				continue
			}
			if next >= len(info.Links) {
				break
			}
			linkByFileID[f.ID] = info.Links[next]
			next++
		}

		brokenIDs := make(map[int]struct{}, len(brokenFiles))
		for _, brokenFile := range brokenFiles {
			brokenIDs[brokenFile.ID] = struct{}{}
		}

		torrent.SelectedFiles.IterCb(func(_ string, file *File) {
			if _, wasBroken := brokenIDs[file.ID]; !wasBroken {
				return
			}
			if link, ok := linkByFileID[file.ID]; ok {
				file.Link = link
			}
		})
		t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent))
	}

	t.Api.DeleteTorrent(fixer.ID) // delete the fixer
	t.deleteOnceDone.Add(fixer.ID)
	return nil
}