Files
zurg/internal/torrent/repair.go
Ben Sarmiento 727c694c02 Repairs
2024-01-16 20:10:46 +01:00

401 lines
12 KiB
Go

package torrent
import (
"fmt"
"math"
"strings"
"time"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
mapset "github.com/deckarep/golang-set/v2"
cmap "github.com/orcaman/concurrent-map/v2"
)
const (
EXPIRED_LINK_TOLERANCE_HOURS = 24
)
func (t *TorrentManager) RepairAll() {
_ = t.workerPool.Submit(func() {
t.log.Info("Repairing all broken torrents")
t.repairAll()
t.log.Info("Finished repairing all torrents")
})
}
func (t *TorrentManager) repairAll() {
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
var hashGroups []mapset.Set[string]
const maxGroupSize = 399
currentGroup := mapset.NewSet[string]()
hashGroups = append(hashGroups, currentGroup)
allTorrents.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() || torrent.Unfixable {
return
}
if currentGroup.Cardinality() >= maxGroupSize {
currentGroup = mapset.NewSet[string]()
hashGroups = append(hashGroups, currentGroup)
}
currentGroup.Add(torrent.Hash)
})
t.log.Debug("Checking if torrents are still cached")
var availabilityChecks = make(map[string]bool)
uncachedCount := 0
for i := range hashGroups {
if hashGroups[i].Cardinality() == 0 {
break
}
resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice())
if err != nil {
t.log.Warnf("Cannot check availability: %v", err)
continue
}
for hash, hosterHash := range resp {
// Check if HosterHash is a map (Variants field is used)
availabilityChecks[hash] = len(hosterHash.Variants) > 0
if !availabilityChecks[hash] {
uncachedCount++
}
}
}
t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount)
var toRepair []*Torrent
allTorrents.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() || torrent.Unfixable {
return
}
// check 1: for cached status
isCached := true
if _, ok := availabilityChecks[torrent.Hash]; !ok || !availabilityChecks[torrent.Hash] {
isCached = false
}
// todo: also handle file ID checks
// check 2: for broken files
hasBrokenFiles := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.Link == "repair" || file.Link == "" {
hasBrokenFiles = true
}
})
if !isCached || hasBrokenFiles {
toRepair = append(toRepair, torrent)
}
})
t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair))
for i := range toRepair {
torrent := toRepair[i]
t.Repair(torrent)
}
}
func (t *TorrentManager) Repair(torrent *Torrent) {
if repairing, ok := t.Repairs.Get(t.GetKey(torrent)); ok && repairing {
t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent))
return
}
t.Repairs.Set(t.GetKey(torrent), true)
if torrent.Unfixable {
t.log.Warnf("Torrent %s is unfixable, skipping repair", t.GetKey(torrent))
return
}
// save the broken files to the file cache
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
torrent.DownloadedIDs.Each(func(id string) bool {
info, _ := infoCache.Get(id)
info.SelectedFiles.IterCb(func(_ string, file *File) {
torrent.BrokenLinks.Each(func(link string) bool {
if file.Link == link {
file.Link = "repair"
}
return file.Link == link
})
})
t.writeTorrentToFile(id, info)
return false
})
_ = t.workerPool.Submit(func() {
t.log.Infof("Repairing torrent %s", t.GetKey(torrent))
t.repair(torrent)
t.log.Infof("Finished repairing torrent %s", t.GetKey(torrent))
})
}
func (t *TorrentManager) repair(torrent *Torrent) {
if torrent.AnyInProgress() {
t.log.Infof("Torrent %s is in progress, skipping repair until download is done", t.GetKey(torrent))
return
}
proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full
if !proceed {
t.log.Error("Reached the max number of active torrents, cannot continue with the repair")
return
}
// handle torrents with incomplete links for selected files
assignedCount := 0
rarCount := 0
unassignedDownloads := make([]*realdebrid.Download, 0)
assignedLinks := make([]string, 0)
torrent.UnassignedLinks.Each(func(link string) bool {
unrestrict := t.UnrestrictUntilOk(link)
if unrestrict == nil {
return false
}
// assign to a selected file
assigned := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// if strings.HasSuffix(file.Path, unrestrict.Filename) {
if file.Bytes == unrestrict.Filesize {
file.Link = unrestrict.Link
assigned = true
assignedCount++
}
})
if !assigned {
if strings.HasSuffix(unrestrict.Filename, ".rar") {
rarCount++
}
unassignedDownloads = append(unassignedDownloads, unrestrict)
} else {
assignedLinks = append(assignedLinks, unrestrict.Link)
}
return false
})
torrent.UnassignedLinks = torrent.UnassignedLinks.Difference(mapset.NewSet(assignedLinks...))
if assignedCount > 0 {
t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, t.GetKey(torrent))
} else if rarCount > 0 {
// this is a rar'ed torrent, nothing we can do
if t.Config.ShouldDeleteRarFiles() {
t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
t.Delete(t.GetKey(torrent), true)
} else {
for _, unassigned := range unassignedDownloads {
newFile := &File{
File: realdebrid.File{
ID: 0,
Path: unassigned.Filename,
Bytes: unassigned.Filesize,
Selected: 1,
},
Ended: torrent.Added,
Link: unassigned.Link,
}
torrent.SelectedFiles.Set(unassigned.Filename, newFile)
}
t.markAsUnfixable(torrent)
t.markAsUnplayable(torrent)
}
return
}
// second solution: add only the broken files
var brokenFiles []File
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
brokenFiles = append(brokenFiles, *file)
}
})
t.log.Debugf("During repair, zurg found %d broken files for torrent %s", len(brokenFiles), t.GetKey(torrent))
if len(brokenFiles) == 1 && torrent.SelectedFiles.Count() > 2 {
// if we download a single file, it will be named differently
// so we need to download 1 extra file to preserve the name
// this is only relevant if we enable retain_rd_torrent_name
// add the first file link encountered with a prefix of http
t.log.Debugf("Torrent %s has only 1 broken file, adding 1 extra file to preserve the name", t.GetKey(torrent))
for _, file := range torrent.SelectedFiles.Items() {
if strings.HasPrefix(file.Link, "http") {
brokenFiles = append(brokenFiles, *file)
break
}
}
}
if len(brokenFiles) > 0 {
t.log.Infof("Redownloading %dof%d files for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent))
brokenFileIDs := strings.Join(getFileIDs(brokenFiles), ",")
if t.reinsertTorrent(torrent, brokenFileIDs) {
t.log.Infof("Successfully downloaded torrent %s to repair it", t.GetKey(torrent))
} else {
t.log.Warnf("Cannot repair torrent %s", t.GetKey(torrent))
}
} else {
t.log.Warnf("Torrent %s has no broken files to repair", t.GetKey(torrent))
}
}
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, brokenFiles string) bool {
// broken files means broken links
oldTorrentIDs := make([]string, 0)
// if brokenFiles is not provided
if brokenFiles == "" {
// only replace the torrent if we are reinserting all files
oldTorrentIDs = torrent.DownloadedIDs.ToSlice()
tmpSelection := ""
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files
})
if tmpSelection == "" {
return true // nothing to repair
} else {
brokenFiles = tmpSelection[:len(tmpSelection)-1]
}
}
// redownload torrent
resp, err := t.Api.AddMagnetHash(torrent.Hash)
if err != nil {
t.log.Warnf("Cannot redownload torrent: %v", err)
if strings.Contains(err.Error(), "infringing_file") {
t.markAsUnfixable(torrent)
}
return false
}
time.Sleep(1 * time.Second)
// select files
newTorrentID := resp.ID
err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles)
if err != nil {
t.log.Warnf("Cannot start redownloading: %v", err)
t.Api.DeleteTorrent(newTorrentID)
return false
}
// see if the torrent is ready
info, err := t.Api.GetTorrentInfo(newTorrentID)
if err != nil {
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
t.Api.DeleteTorrent(newTorrentID)
return false
}
// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
okStatuses := []string{"magnet_conversion", "waiting_files_selection", "queued", "downloading", "downloaded", "uploading"}
// not compressing because we need playable files
isOkStatus := false
for _, status := range okStatuses {
if info.Status == status {
isOkStatus = true
break
}
}
if !isOkStatus {
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
t.Api.DeleteTorrent(newTorrentID)
return false
}
if info.Progress != 100 {
t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
t.forRepairs.Add(newTorrentID)
if len(oldTorrentIDs) > 0 {
for _, id := range oldTorrentIDs {
t.Api.DeleteTorrent(id)
}
} else {
t.forRepairs.Add(newTorrentID)
}
return true
}
brokenCount := len(strings.Split(brokenFiles, ","))
if len(info.Links) != brokenCount {
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), brokenCount)
t.Api.DeleteTorrent(newTorrentID)
return false
}
t.log.Infof("Repair successful id=%s", newTorrentID)
t.forRepairs.Add(newTorrentID)
if len(oldTorrentIDs) > 0 {
for _, id := range oldTorrentIDs {
t.Api.DeleteTorrent(id)
}
} else {
t.forRepairs.Add(newTorrentID)
}
return true
}
func (t *TorrentManager) canCapacityHandle() bool {
// max waiting time is 45 minutes
const maxRetries = 50
const baseDelay = 1 * time.Second
const maxDelay = 60 * time.Second
retryCount := 0
for {
count, err := t.Api.GetActiveTorrentCount()
if err != nil {
t.log.Warnf("Cannot get active downloads count: %v", err)
if retryCount >= maxRetries {
t.log.Error("Max retries reached. Exiting.")
return false
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
retryCount++
continue
}
if count.DownloadingCount < count.MaxNumberOfTorrents {
return true
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay)
if retryCount >= maxRetries {
t.log.Error("Max retries reached. Exiting.")
return false
}
time.Sleep(delay)
retryCount++
}
}
func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
t.log.Warnf("Marking torrent %s as unplayable", t.GetKey(torrent))
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(t.GetKey(torrent))
})
torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
torrents.Set(t.GetKey(torrent), torrent)
}
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
t.log.Warnf("Marking torrent %s as unfixable", t.GetKey(torrent))
torrent.Unfixable = true
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
torrent.DownloadedIDs.Each(func(id string) bool {
info, _ := infoCache.Get(id)
info.Unfixable = true
t.writeTorrentToFile(id, info)
return false
})
}