Fix repairs

This commit is contained in:
Ben Sarmiento
2024-01-18 20:54:41 +01:00
parent 49fd17c3d4
commit 0a451cccde
4 changed files with 93 additions and 77 deletions

View File

@@ -28,8 +28,8 @@ type TorrentManager struct {
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
Repairs cmap.ConcurrentMap[string, bool] Repairs cmap.ConcurrentMap[string, bool]
onlyForRepair cmap.ConcurrentMap[string, *Torrent]
allAccessKeys mapset.Set[string] allAccessKeys mapset.Set[string]
onlyForRepair mapset.Set[string]
latestState *LibraryState latestState *LibraryState
requiredVersion string requiredVersion string
workerPool *ants.Pool workerPool *ants.Pool
@@ -43,8 +43,12 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t := &TorrentManager{ t := &TorrentManager{
Config: cfg, Config: cfg,
Api: api, Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
DownloadCache: cmap.New[*realdebrid.Download](),
DownloadMap: cmap.New[*realdebrid.Download](),
onlyForRepair: cmap.New[*Torrent](),
Repairs: cmap.New[bool](),
allAccessKeys: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](),
onlyForRepair: mapset.NewSet[string](),
latestState: &LibraryState{}, latestState: &LibraryState{},
requiredVersion: "11.01.2024", requiredVersion: "11.01.2024",
workerPool: p, workerPool: p,
@@ -52,7 +56,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
} }
// create internal directories // create internal directories
t.DirectoryMap = cmap.New[cmap.ConcurrentMap[string, *Torrent]]()
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey() t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey()
t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID
// create directory maps // create directory maps
@@ -61,9 +64,6 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
} }
// Fetch downloads // Fetch downloads
t.DownloadCache = cmap.New[*realdebrid.Download]()
t.DownloadMap = cmap.New[*realdebrid.Download]()
t.Repairs = cmap.New[bool]()
if t.Config.EnableDownloadCache() { if t.Config.EnableDownloadCache() {
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
page := 1 page := 1

View File

@@ -22,12 +22,23 @@ func (t *TorrentManager) RefreshTorrents() []string {
infoChan := make(chan *Torrent, len(instances)) infoChan := make(chan *Torrent, len(instances))
var wg sync.WaitGroup var wg sync.WaitGroup
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
for i := range instances { for i := range instances {
idx := i idx := i
wg.Add(1) wg.Add(1)
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
if instances[idx].Progress == 100 && t.onlyForRepair.Has(instances[idx].ID) {
torrent, _ := t.onlyForRepair.Get(instances[idx].ID)
brokenFiles := getBrokenFiles(torrent)
info, err := t.redownloadTorrent(torrent, "")
if err == nil && info.Progress == 100 && !t.isStillBroken(info, brokenFiles) {
t.onlyForRepair.Remove(instances[idx].ID)
}
infoChan <- nil
} else {
infoChan <- t.getMoreInfo(instances[idx]) infoChan <- t.getMoreInfo(instances[idx])
}
}) })
} }
@@ -36,19 +47,13 @@ func (t *TorrentManager) RefreshTorrents() []string {
t.log.Debugf("Fetched info for %d torrents", len(instances)) t.log.Debugf("Fetched info for %d torrents", len(instances))
freshKeys := mapset.NewSet[string]() freshKeys := mapset.NewSet[string]()
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
noInfoCount := 0 noInfoCount := 0
for info := range infoChan { for info := range infoChan {
if info == nil { if info == nil {
noInfoCount++ noInfoCount++
continue continue
} }
accessKey := t.GetKey(info) accessKey := t.GetKey(info)
if t.handleRepairTorrents(info) {
continue
}
if !info.AnyInProgress() { if !info.AnyInProgress() {
freshKeys.Add(accessKey) freshKeys.Add(accessKey)
} }

View File

@@ -88,7 +88,7 @@ func (t *TorrentManager) repairAll() {
} }
}) })
if !isCached || hasBrokenFiles { if !isCached || hasBrokenFiles || torrent.UnassignedLinks.Cardinality() > 0 {
toRepair = append(toRepair, torrent) toRepair = append(toRepair, torrent)
} }
}) })
@@ -147,45 +147,56 @@ func (t *TorrentManager) repair(torrent *Torrent) {
// handle torrents with incomplete links for selected files // handle torrents with incomplete links for selected files
assignedCount := 0 assignedCount := 0
// number of rar files detected from the unrestricted links
rarCount := 0 rarCount := 0
unassignedDownloads := make([]*realdebrid.Download, 0)
assignedLinks := make([]string, 0) newUnassignedLinks := cmap.New[*realdebrid.Download]()
// unrestrict each unassigned link that was filled out during torrent init
torrent.UnassignedLinks.Each(func(link string) bool { torrent.UnassignedLinks.Each(func(link string) bool {
unrestrict := t.UnrestrictUntilOk(link) unrestrict := t.UnrestrictUntilOk(link)
if unrestrict == nil { if unrestrict == nil {
newUnassignedLinks.Set(link, nil)
// return early, no point continuing
return false return false
} }
// assign to a selected file
// try to assign to a selected file
assigned := false assigned := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// if strings.HasSuffix(file.Path, unrestrict.Filename) { // base it on size because why not?
if file.Bytes == unrestrict.Filesize { if file.Bytes == unrestrict.Filesize {
file.Link = unrestrict.Link file.Link = link
assigned = true assigned = true
assignedCount++ assignedCount++
} }
}) })
if !assigned { if !assigned {
// if not assigned and is a rar, likely it was rar'ed by RD
if strings.HasSuffix(unrestrict.Filename, ".rar") { if strings.HasSuffix(unrestrict.Filename, ".rar") {
rarCount++ rarCount++
} }
unassignedDownloads = append(unassignedDownloads, unrestrict) newUnassignedLinks.Set(link, unrestrict)
} else {
assignedLinks = append(assignedLinks, unrestrict.Link)
} }
return false return false
}) })
torrent.UnassignedLinks = torrent.UnassignedLinks.Difference(mapset.NewSet(assignedLinks...))
if assignedCount > 0 { if assignedCount > 0 {
// if there are any assigned count
t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, t.GetKey(torrent)) t.log.Infof("Assigned %d links to selected files for torrent %s", assignedCount, t.GetKey(torrent))
} else if rarCount > 0 { } else if rarCount > 0 {
// also is assignedCount=0
// this is a rar'ed torrent, nothing we can do // this is a rar'ed torrent, nothing we can do
if t.Config.ShouldDeleteRarFiles() { if t.Config.ShouldDeleteRarFiles() {
t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) t.log.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
t.Delete(t.GetKey(torrent), true) t.Delete(t.GetKey(torrent), true)
} else { } else {
for _, unassigned := range unassignedDownloads { newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
if unassigned == nil {
return
}
newFile := &File{ newFile := &File{
File: realdebrid.File{ File: realdebrid.File{
ID: 0, ID: 0,
@@ -197,40 +208,47 @@ func (t *TorrentManager) repair(torrent *Torrent) {
Link: unassigned.Link, Link: unassigned.Link,
} }
torrent.SelectedFiles.Set(unassigned.Filename, newFile) torrent.SelectedFiles.Set(unassigned.Filename, newFile)
} })
torrent.UnassignedLinks = mapset.NewSet[string]()
t.markAsUnfixable(torrent) t.markAsUnfixable(torrent)
t.markAsUnplayable(torrent) t.markAsUnplayable(torrent)
} }
return return
} }
// second solution: add only the broken files // get all broken files
var brokenFiles []File brokenFiles := getBrokenFiles(torrent)
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
brokenFiles = append(brokenFiles, *file)
}
})
t.log.Debugf("During repair, zurg found %d broken files for torrent %s", len(brokenFiles), t.GetKey(torrent)) t.log.Debugf("During repair, zurg found %d broken files for torrent %s", len(brokenFiles), t.GetKey(torrent))
if len(brokenFiles) > 0 { // first solution: reinsert and see if the broken file is now working
t.log.Infof("Redownloading %dof%d files for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent)) t.log.Debugf("Repair_try#1: Trying to redownload torrent %s to repair it", t.GetKey(torrent))
brokenFileIDs := strings.Join(getFileIDs(brokenFiles), ",") info, err := t.redownloadTorrent(torrent, "")
if t.redownloadTorrent(torrent, brokenFileIDs) { if err != nil {
t.log.Infof("Successfully downloaded torrent %s to repair it", t.GetKey(torrent))
} else {
t.log.Warnf("Cannot repair torrent %s", t.GetKey(torrent)) t.log.Warnf("Cannot repair torrent %s", t.GetKey(torrent))
} }
if info.Progress != 100 || (info.Progress == 100 && !t.isStillBroken(info, brokenFiles)) {
t.log.Infof("Successfully repaired torrent %s", t.GetKey(torrent))
return
}
// second solution: add only the broken files
if len(brokenFiles) > 0 {
t.log.Infof("Repair_try#2: Redownloading %dof%d broken files for torrent %s", len(brokenFiles), torrent.SelectedFiles.Count(), t.GetKey(torrent))
brokenFileIDs := strings.Join(getFileIDs(brokenFiles), ",")
_, err := t.redownloadTorrent(torrent, brokenFileIDs)
if err != nil {
t.log.Warnf("Cannot repair torrent %s", t.GetKey(torrent))
} else {
t.log.Infof("Successfully repaired torrent %s", t.GetKey(torrent))
}
} else { } else {
t.log.Warnf("Torrent %s has no broken files to repair", t.GetKey(torrent)) t.log.Warnf("Torrent %s has no broken files to repair", t.GetKey(torrent))
} }
} }
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) bool { func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string) (*realdebrid.TorrentInfo, error) {
t.log.Debugf("Redownloading torrent %s, broken files=%s (all if empty)", t.GetKey(torrent), brokenFiles) t.log.Debugf("Redownloading torrent %s, broken files=%s (all if empty)", t.GetKey(torrent), brokenFiles)
oldTorrentIDs := make([]string, 0) oldTorrentIDs := make([]string, 0)
// broken files means broken links // broken files means broken links
// if brokenFiles is not provided // if brokenFiles is not provided
if brokenFiles == "" { if brokenFiles == "" {
@@ -241,7 +259,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string)
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files
}) })
if tmpSelection == "" { if tmpSelection == "" {
return true // nothing to repair return nil, nil // nothing to repair
} else { } else {
brokenFiles = tmpSelection[:len(tmpSelection)-1] brokenFiles = tmpSelection[:len(tmpSelection)-1]
} }
@@ -250,11 +268,10 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string)
// redownload torrent // redownload torrent
resp, err := t.Api.AddMagnetHash(torrent.Hash) resp, err := t.Api.AddMagnetHash(torrent.Hash)
if err != nil { if err != nil {
t.log.Warnf("Cannot redownload torrent: %v", err)
if strings.Contains(err.Error(), "infringing_file") { if strings.Contains(err.Error(), "infringing_file") {
t.markAsUnfixable(torrent) t.markAsUnfixable(torrent)
} }
return false return nil, fmt.Errorf("cannot redownload torrent: %v", err)
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@@ -262,17 +279,16 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string)
newTorrentID := resp.ID newTorrentID := resp.ID
err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles) err = t.Api.SelectTorrentFiles(newTorrentID, brokenFiles)
if err != nil { if err != nil {
t.log.Warnf("Cannot start redownloading: %v", err)
t.Api.DeleteTorrent(newTorrentID) t.Api.DeleteTorrent(newTorrentID)
return false return nil, fmt.Errorf("cannot start redownloading: %v", err)
} }
time.Sleep(1 * time.Second)
// see if the torrent is ready // see if the torrent is ready
info, err := t.Api.GetTorrentInfo(newTorrentID) info, err := t.Api.GetTorrentInfo(newTorrentID)
if err != nil { if err != nil {
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
t.Api.DeleteTorrent(newTorrentID) t.Api.DeleteTorrent(newTorrentID)
return false return nil, fmt.Errorf("cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
} }
// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead // documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
@@ -286,41 +302,39 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, brokenFiles string)
} }
} }
if !isOkStatus { if !isOkStatus {
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
t.Api.DeleteTorrent(newTorrentID) t.Api.DeleteTorrent(newTorrentID)
return false return nil, fmt.Errorf("the redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
} }
if info.Progress != 100 { if info.Progress != 100 {
t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
t.onlyForRepair.Add(newTorrentID)
if len(oldTorrentIDs) > 0 { if len(oldTorrentIDs) > 0 {
// only triggered when brokenFiles == ""
for _, id := range oldTorrentIDs { for _, id := range oldTorrentIDs {
t.Api.DeleteTorrent(id) t.Api.DeleteTorrent(id)
} }
} else { } else {
t.onlyForRepair.Add(newTorrentID) t.onlyForRepair.Set(newTorrentID, torrent)
} }
return true return info, nil
} }
brokenCount := len(strings.Split(brokenFiles, ",")) brokenCount := len(strings.Split(brokenFiles, ","))
if len(info.Links) != brokenCount { if len(info.Links) != brokenCount {
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), brokenCount)
t.Api.DeleteTorrent(newTorrentID) t.Api.DeleteTorrent(newTorrentID)
return false return nil, fmt.Errorf("it did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), brokenCount)
} }
t.log.Infof("Redownload successful id=%s", newTorrentID) t.log.Infof("Redownload successful id=%s", newTorrentID)
t.onlyForRepair.Add(newTorrentID)
if len(oldTorrentIDs) > 0 { if len(oldTorrentIDs) > 0 {
// only triggered when brokenFiles == ""
for _, id := range oldTorrentIDs { for _, id := range oldTorrentIDs {
t.Api.DeleteTorrent(id) t.Api.DeleteTorrent(id)
} }
} else { } else {
t.onlyForRepair.Add(newTorrentID) t.onlyForRepair.Set(newTorrentID, torrent)
} }
return true return info, nil
} }
func (t *TorrentManager) canCapacityHandle() bool { func (t *TorrentManager) canCapacityHandle() bool {
@@ -387,29 +401,26 @@ func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
}) })
} }
func (t *TorrentManager) handleRepairTorrents(info *Torrent) bool { func getBrokenFiles(torrent *Torrent) []*File {
allTorrents, _ := t.DirectoryMap.Get(INT_ALL) var brokenFiles []*File
accessKey := t.GetKey(info) torrent.SelectedFiles.IterCb(func(_ string, file *File) {
torrentIDs := info.DownloadedIDs.ToSlice() if !strings.HasPrefix(file.Link, "http") && file.Link != "unselect" {
inRepairList := false brokenFiles = append(brokenFiles, file)
for _, torrentID := range torrentIDs {
if t.onlyForRepair.Contains(torrentID) {
inRepairList = true
break
}
}
if !info.AnyInProgress() && inRepairList {
t.log.Debugf("Newly downloaded %s (id=%s) is in repair list", info.Name, torrentIDs[0])
torrent, stillExists := allTorrents.Get(accessKey)
if stillExists && t.redownloadTorrent(torrent, "") {
t.log.Debugf("Deleting repair temp id=%s because it has served its purpose", torrentIDs[0])
// if it's 100% and it's a temp repair, remove it
for _, torrentID := range torrentIDs {
t.Api.DeleteTorrent(torrentID)
t.onlyForRepair.Remove(torrentID)
}
} }
})
return brokenFiles
}
func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool {
for _, oldFile := range brokenFiles {
for idx, newFile := range info.Files {
if oldFile.Path == newFile.Path {
unrestrict := t.UnrestrictUntilOk(info.Links[idx])
if unrestrict == nil || oldFile.Bytes != unrestrict.Filesize {
return true return true
} }
}
}
}
return false return false
} }

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
) )
func getFileIDs(files []File) []string { func getFileIDs(files []*File) []string {
var fileIDs []string var fileIDs []string
for _, file := range files { for _, file := range files {
if file.ID != 0 { if file.ID != 0 {