Implement autoheal feature

This commit is contained in:
Ben Sarmiento
2023-10-23 20:01:55 +02:00
parent 6298830ea6
commit 21cbb16b88
6 changed files with 218 additions and 86 deletions

View File

@@ -4,6 +4,7 @@ import (
"encoding/gob"
"fmt"
"log"
"math"
"os"
"strings"
"sync"
@@ -91,6 +92,8 @@ func (t *TorrentManager) MarkFileAsDeleted(torrent *Torrent, file *File) {
log.Println("Marking file as deleted", file.Path)
file.Link = ""
t.writeToFile(torrent.ID, torrent)
log.Println("Healing a single file in the torrent", torrent.Name)
t.heal(torrent.ID, []File{*file})
}
// GetInfo returns the info for a torrent
@@ -114,7 +117,7 @@ func (t *TorrentManager) getChecksum() string {
log.Println("Huh, no torrents returned")
return t.checksum
}
return fmt.Sprintf("%d-%s", totalCount, torrents[0].ID)
return fmt.Sprintf("%d-%s-%v", totalCount, torrents[0].ID, torrents[0].Progress == 100)
}
// refreshTorrents periodically refreshes the torrents
@@ -221,9 +224,11 @@ func (t *TorrentManager) getInfo(torrentID string) *Torrent {
if torrentFromFile != nil {
torrent := t.getByID(torrentID)
if torrent != nil {
torrent.SelectedFiles = torrentFromFile.SelectedFiles
if len(torrentFromFile.SelectedFiles) == len(torrent.Links) {
torrent.SelectedFiles = torrentFromFile.SelectedFiles
return torrent
}
}
return torrent
}
log.Println("Getting info for", torrentID)
info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrentID)
@@ -242,62 +247,9 @@ func (t *TorrentManager) getInfo(torrentID string) *Torrent {
})
}
if len(selectedFiles) != len(info.Links) {
// TODO: This means some files have expired
// we need to 'fix' this torrent then, at least the missing selected files
log.Println("Some links has expired for", info.Name)
type Result struct {
Response *realdebrid.UnrestrictResponse
}
resultsChan := make(chan Result, len(info.Links))
var wg sync.WaitGroup
// Limit concurrency
sem := make(chan struct{}, t.config.GetNumOfWorkers())
for _, link := range info.Links {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(lnk string) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk)
}
resp := realdebrid.RetryUntilOk(unrestrictFn)
if resp != nil {
resultsChan <- Result{Response: resp}
}
}(link)
}
go func() {
wg.Wait()
close(sem)
close(resultsChan)
}()
for result := range resultsChan {
found := false
for i := range selectedFiles {
if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) {
selectedFiles[i].Link = result.Response.Link
found = true
}
}
if !found {
selectedFiles = append(selectedFiles, File{
File: realdebrid.File{
Path: result.Response.Filename,
Bytes: result.Response.Filesize,
Selected: 1,
},
Link: result.Response.Link,
})
}
}
selectedFiles = t.organizeChaos(info, selectedFiles)
t.heal(torrentID, selectedFiles)
} else {
for i, link := range info.Links {
selectedFiles[i].Link = link
@@ -365,3 +317,191 @@ func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
return &torrent
}
func (t *TorrentManager) reinsertTorrent(oldTorrentID string, missingFiles string, deleteIfFailed bool) bool {
torrent := t.GetInfo(oldTorrentID)
if torrent == nil {
return false
}
if missingFiles == "" {
var selection string
for _, file := range torrent.SelectedFiles {
if file.Link == "" {
selection += fmt.Sprintf("%d,", file.ID)
}
}
if selection == "" {
return false
}
missingFiles = selection[:len(selection)-1]
}
// reinsert torrent
resp, err := realdebrid.AddMagnetHash(t.config.GetToken(), torrent.Hash)
if err != nil {
log.Printf("Cannot reinsert torrent: %v\n", err)
return false
}
newTorrentID := resp.ID
err = realdebrid.SelectTorrentFiles(t.config.GetToken(), newTorrentID, missingFiles)
if err != nil {
log.Printf("Cannot select files on reinserted torrent: %v\n", err)
}
if deleteIfFailed {
if err != nil {
realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
return false
}
time.Sleep(1 * time.Second)
// see if the torrent is ready
info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), newTorrentID)
if err != nil {
log.Printf("Cannot get info on reinserted torrent: %v\n", err)
if deleteIfFailed {
realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
}
return false
}
time.Sleep(1 * time.Second)
if info.Progress != 100 {
log.Printf("Torrent is not cached anymore, %d%%\n", info.Progress)
realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
return false
}
if len(info.Links) != len(torrent.SelectedFiles) {
log.Printf("It doesn't fix the problem, got %d but we need %d\n", len(info.Links), len(torrent.SelectedFiles))
realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
return false
}
log.Println("Reinsertion successful, deleting old torrent")
realdebrid.DeleteTorrent(t.config.GetToken(), oldTorrentID)
}
return true
}
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) []File {
type Result struct {
Response *realdebrid.UnrestrictResponse
}
resultsChan := make(chan Result, len(info.Links))
var wg sync.WaitGroup
// Limit concurrency
sem := make(chan struct{}, t.config.GetNumOfWorkers())
for _, link := range info.Links {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(lnk string) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk)
}
resp := realdebrid.RetryUntilOk(unrestrictFn)
if resp != nil {
resultsChan <- Result{Response: resp}
}
}(link)
}
go func() {
wg.Wait()
close(sem)
close(resultsChan)
}()
for result := range resultsChan {
found := false
for i := range selectedFiles {
if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) {
selectedFiles[i].Link = result.Response.Link
found = true
}
}
if !found {
selectedFiles = append(selectedFiles, File{
File: realdebrid.File{
Path: result.Response.Filename,
Bytes: result.Response.Filesize,
Selected: 1,
},
Link: result.Response.Link,
})
}
}
return selectedFiles
}
func (t *TorrentManager) heal(torrentID string, selectedFiles []File) {
// max waiting time is 45 minutes
const maxRetries = 50
const baseDelay = 1 * time.Second
const maxDelay = 60 * time.Second
retryCount := 0
for {
count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
if err != nil {
log.Printf("Cannot get active torrent count: %v\n", err)
if retryCount >= maxRetries {
log.Println("Max retries reached. Exiting.")
return
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
retryCount++
continue
}
if count.DownloadingCount < count.MaxNumberOfTorrents {
log.Printf("We can still add a new torrent, %d/%d\n", count.DownloadingCount, count.MaxNumberOfTorrents)
break
}
if retryCount >= maxRetries {
log.Println("Max retries reached. Exiting.")
return
}
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
retryCount++
}
// now we can get the missing files
half := len(selectedFiles) / 2
missingFiles1 := getMissingFiles(0, half, selectedFiles)
missingFiles2 := getMissingFiles(half, len(selectedFiles), selectedFiles)
// first solution: add the same selection, maybe it can be fixed by reinsertion?
success := t.reinsertTorrent(torrentID, "", true)
if !success {
// if not, last resort: add only the missing files and do it in 2 batches
t.reinsertTorrent(torrentID, missingFiles1, false)
t.reinsertTorrent(torrentID, missingFiles2, false)
}
}
func getMissingFiles(start, end int, files []File) string {
var missingFiles string
for i := start; i < end; i++ {
if files[i].File.Selected == 1 && files[i].ID != 0 && files[i].Link == "" {
missingFiles += fmt.Sprintf("%d,", files[i].ID)
}
}
if len(missingFiles) > 0 {
missingFiles = missingFiles[:len(missingFiles)-1]
}
return missingFiles
}