Repair optimizations
This commit is contained in:
@@ -125,32 +125,39 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
|||||||
if t.cfg.EnableRepair() {
|
if t.cfg.EnableRepair() {
|
||||||
t.log.Info("Checking for torrents to repair")
|
t.log.Info("Checking for torrents to repair")
|
||||||
t.repairAll()
|
t.repairAll()
|
||||||
|
t.log.Info("Finished checking for torrents to repair")
|
||||||
}
|
}
|
||||||
go t.startRefreshJob()
|
go t.startRefreshJob()
|
||||||
|
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
|
func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent {
|
||||||
mainTorrent := t1
|
|
||||||
|
|
||||||
// Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map
|
// Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map
|
||||||
t2.SelectedFiles.IterCb(func(key string, file *File) {
|
torrentToMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) {
|
||||||
// see if it already exists in the main torrent
|
// see if it already exists in the main torrent
|
||||||
if mainFile, ok := mainTorrent.SelectedFiles.Get(key); !ok {
|
if mainFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok {
|
||||||
mainTorrent.SelectedFiles.Set(key, file)
|
// if it doesn't exist in the main torrent, add it
|
||||||
} else if file.Link != "" && mainFile.Link == "" {
|
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
|
||||||
// if it exists, but the link is empty, then we can update it
|
} else {
|
||||||
mainTorrent.SelectedFiles.Set(key, file)
|
// if it exists, compare the LatestAdded property and the link
|
||||||
|
if mainTorrent.LatestAdded < torrentToMerge.LatestAdded && fileToMerge.Link != "" {
|
||||||
|
// if torrentToMerge is more recent and its file has a link, update the main torrent's file
|
||||||
|
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
|
||||||
|
} else if mainFile.Link == "" && fileToMerge.Link != "" {
|
||||||
|
// if the main torrent's file link is empty and t2's file has a link, even if it's older, update it
|
||||||
|
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
|
||||||
|
}
|
||||||
|
// else do nothing, the main torrent's file is more recent or has a valid link
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Merge Instances
|
// Merge Instances
|
||||||
mainTorrent.Instances = append(t1.Instances, t2.Instances...)
|
mainTorrent.Instances = append(mainTorrent.Instances, torrentToMerge.Instances...)
|
||||||
|
|
||||||
// LatestAdded
|
// LatestAdded
|
||||||
if t1.LatestAdded < t2.LatestAdded {
|
if mainTorrent.LatestAdded < torrentToMerge.LatestAdded {
|
||||||
mainTorrent.LatestAdded = t2.LatestAdded
|
mainTorrent.LatestAdded = torrentToMerge.LatestAdded
|
||||||
}
|
}
|
||||||
|
|
||||||
return mainTorrent
|
return mainTorrent
|
||||||
@@ -320,6 +327,7 @@ func (t *TorrentManager) startRefreshJob() {
|
|||||||
if t.cfg.EnableRepair() {
|
if t.cfg.EnableRepair() {
|
||||||
t.log.Info("Checking for torrents to repair")
|
t.log.Info("Checking for torrents to repair")
|
||||||
t.repairAll()
|
t.repairAll()
|
||||||
|
t.log.Info("Finished checking for torrents to repair")
|
||||||
}
|
}
|
||||||
go OnLibraryUpdateHook(t.cfg)
|
go OnLibraryUpdateHook(t.cfg)
|
||||||
}
|
}
|
||||||
@@ -534,17 +542,19 @@ func (t *TorrentManager) repairAll() {
|
|||||||
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
|
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
|
||||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||||
if torrent.InProgress() {
|
if torrent.InProgress() {
|
||||||
|
t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
forRepair := false
|
forRepair := false
|
||||||
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
||||||
if file.Link == "" {
|
if file.Link == "@" || file.ForRepair {
|
||||||
|
t.log.Debugf("Found a file to repair for torrent %s", torrent.AccessKey)
|
||||||
forRepair = true
|
forRepair = true
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if forRepair {
|
if forRepair {
|
||||||
t.log.Infof("Repairing %s", torrent.AccessKey)
|
t.log.Infof("Repairing %s", torrent.AccessKey)
|
||||||
go t.Repair(torrent.AccessKey)
|
t.Repair(torrent.AccessKey)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -717,7 +727,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if count.DownloadingCount < count.MaxNumberOfTorrents {
|
if count.DownloadingCount < count.MaxNumberOfTorrents {
|
||||||
t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
|
// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,4 +27,5 @@ type File struct {
|
|||||||
Added string
|
Added string
|
||||||
Link string
|
Link string
|
||||||
ZurgFS uint64
|
ZurgFS uint64
|
||||||
|
ForRepair bool
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -81,7 +81,8 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
|
|||||||
resp := t.UnrestrictUntilOk(link)
|
resp := t.UnrestrictUntilOk(link)
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
log.Warnf("File %s is no longer available", file.Path)
|
log.Warnf("File %s is no longer available", file.Path)
|
||||||
file.Link = ""
|
file.Link = "@"
|
||||||
|
file.ForRepair = true
|
||||||
t.SetChecksum("") // force a recheck
|
t.SetChecksum("") // force a recheck
|
||||||
streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
|
streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
|
||||||
} else {
|
} else {
|
||||||
@@ -126,7 +127,8 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if file != nil {
|
if file != nil {
|
||||||
log.Warnf("Cannot download file %s: %v", file.Path, err)
|
log.Warnf("Cannot download file %s: %v", file.Path, err)
|
||||||
file.Link = ""
|
file.Link = "@"
|
||||||
|
file.ForRepair = true
|
||||||
torMgr.SetChecksum("") // force a recheck
|
torMgr.SetChecksum("") // force a recheck
|
||||||
}
|
}
|
||||||
streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
|
streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
|
||||||
@@ -137,7 +139,8 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter,
|
|||||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||||
if file != nil {
|
if file != nil {
|
||||||
log.Warnf("Received a %s status code for file %s", resp.Status, file.Path)
|
log.Warnf("Received a %s status code for file %s", resp.Status, file.Path)
|
||||||
file.Link = ""
|
file.Link = "@"
|
||||||
|
file.ForRepair = true
|
||||||
torMgr.SetChecksum("") // force a recheck
|
torMgr.SetChecksum("") // force a recheck
|
||||||
}
|
}
|
||||||
streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
|
streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package realdebrid
|
package realdebrid
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -20,8 +19,8 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func retryUntilOk[T any](fn func() (T, error)) T {
|
func retryUntilOk[T any](fn func() (T, error)) T {
|
||||||
const initialDelay = 1 * time.Second
|
// const initialDelay = 1 * time.Second
|
||||||
const maxDelay = 128 * time.Second
|
// const maxDelay = 128 * time.Second
|
||||||
const maxRetries = 2 // Maximum retries for non-429 errors
|
const maxRetries = 2 // Maximum retries for non-429 errors
|
||||||
|
|
||||||
var result T
|
var result T
|
||||||
@@ -34,7 +33,9 @@ func retryUntilOk[T any](fn func() (T, error)) T {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// If error is 429, we retry indefinitely, hence no condition to break the loop.
|
if strings.Contains(err.Error(), "first byte") || strings.Contains(err.Error(), "expired") {
|
||||||
|
return result
|
||||||
|
}
|
||||||
if !strings.Contains(err.Error(), "429") {
|
if !strings.Contains(err.Error(), "429") {
|
||||||
retryCount++
|
retryCount++
|
||||||
if retryCount >= maxRetries {
|
if retryCount >= maxRetries {
|
||||||
@@ -44,7 +45,8 @@ func retryUntilOk[T any](fn func() (T, error)) T {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Calculate delay with exponential backoff
|
// Calculate delay with exponential backoff
|
||||||
delay := time.Duration(math.Min(float64(initialDelay)*math.Pow(2, float64(retryCount)), float64(maxDelay)))
|
// delay := time.Duration(math.Min(float64(initialDelay)*math.Pow(2, float64(retryCount)), float64(maxDelay)))
|
||||||
|
delay := 500 * time.Millisecond
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -85,7 +87,7 @@ func canFetchFirstByte(url string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(1 * time.Second) // Add a delay before the next retry
|
time.Sleep(500 * time.Millisecond) // Add a delay before the next retry
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user