Catch bw limit errors and prevent repair loops

This commit is contained in:
Ben Adrian Sarmiento
2024-06-24 00:38:57 +02:00
parent a0c13af94b
commit 449c0f71cf
5 changed files with 139 additions and 83 deletions

View File

@@ -2,6 +2,7 @@ package torrent
import ( import (
"context" "context"
"fmt"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@@ -11,6 +12,7 @@ import (
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/internal/fs" "github.com/debridmediamanager/zurg/internal/fs"
"github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils" "github.com/debridmediamanager/zurg/pkg/utils"
@@ -138,33 +140,34 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
} }
// proxy function // proxy function
func (t *TorrentManager) UnrestrictLink(link string, verifyURL bool) *realdebrid.Download { func (t *TorrentManager) UnrestrictFile(file *File, checkFirstByte bool) (*realdebrid.Download, error) {
if file.State.Is("deleted_file") {
return nil, fmt.Errorf("file %s has been deleted", file.Path)
} else if file.State.Is("broken_file") {
return nil, fmt.Errorf("file %s is broken", file.Path)
}
return t.UnrestrictLink(file.Link, checkFirstByte)
}
func (t *TorrentManager) UnrestrictLink(link string, verifyURL bool) (*realdebrid.Download, error) {
isRealDebrid := strings.HasPrefix(link, "https://real-debrid.com/d/") isRealDebrid := strings.HasPrefix(link, "https://real-debrid.com/d/")
if isRealDebrid && t.UnrestrictMap.Has(link[0:39]) { if isRealDebrid && t.UnrestrictMap.Has(link[0:39]) {
ret, _ := t.UnrestrictMap.Get(link[0:39]) ret, _ := t.UnrestrictMap.Get(link[0:39])
return ret return ret, nil
} else if !isRealDebrid && t.UnrestrictMap.Has(link) { } else if !isRealDebrid && t.UnrestrictMap.Has(link) {
ret, _ := t.UnrestrictMap.Get(link) ret, _ := t.UnrestrictMap.Get(link)
return ret return ret, nil
} }
ret, err := t.api.UnrestrictLink(link, verifyURL) ret, err := t.api.UnrestrictLink(link, verifyURL)
if err != nil { if err != nil {
t.log.Warnf("Cannot unrestrict link %s: %v", link, err) return nil, err
return nil
} }
if isRealDebrid { if isRealDebrid {
t.UnrestrictMap.Set(ret.Link[0:39], ret) t.UnrestrictMap.Set(ret.Link[0:39], ret)
} else { } else {
t.UnrestrictMap.Set(ret.Link, ret) t.UnrestrictMap.Set(ret.Link, ret)
} }
return ret return ret, nil
}
func (t *TorrentManager) UnrestrictFile(file *File, checkFirstByte bool) *realdebrid.Download {
if !file.State.Is("ok_file") {
return nil
}
return t.UnrestrictLink(file.Link, checkFirstByte)
} }
func (t *TorrentManager) GetKey(torrent *Torrent) string { func (t *TorrentManager) GetKey(torrent *Torrent) string {
@@ -228,14 +231,22 @@ func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) {
// t.log.Debugf("Saved torrent %s to file", t.GetKey(torrent)) // t.log.Debugf("Saved torrent %s to file", t.GetKey(torrent))
} }
func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) { func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error {
changesApplied := false changesApplied := false
bwLimitReached := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if bwLimitReached {
return
}
isPlayable := utils.IsVideo(file.Path) || t.IsPlayable(file.Path) isPlayable := utils.IsVideo(file.Path) || t.IsPlayable(file.Path)
if file.MediaInfo != nil || !file.State.Is("ok_file") || !isPlayable { if file.MediaInfo != nil || !file.State.Is("ok_file") || !isPlayable {
return return
} }
unrestrict := t.UnrestrictFile(file, true) unrestrict, err := t.UnrestrictFile(file, true)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
bwLimitReached = true
return
}
if unrestrict == nil { if unrestrict == nil {
file.State.Event(context.Background(), "break_file") file.State.Event(context.Background(), "break_file")
t.EnqueueForRepair(torrent) t.EnqueueForRepair(torrent)
@@ -253,9 +264,13 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) {
changesApplied = true changesApplied = true
}) })
if changesApplied { if changesApplied {
t.assignDirectory(torrent, true)
t.writeTorrentToFile(torrent) t.writeTorrentToFile(torrent)
} }
if bwLimitReached {
t.log.Warnf("Your account has reached the bandwidth limit, cannot apply media info details to the rest of the files")
return fmt.Errorf("bandwidth limit reached")
}
return nil
} }
func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent { func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent {
@@ -437,11 +452,22 @@ func (t *TorrentManager) analyzeAllTorrents() {
totalCount := allTorrents.Count() totalCount := allTorrents.Count()
t.log.Infof("Applying media info details to all %d torrents", totalCount) t.log.Infof("Applying media info details to all %d torrents", totalCount)
idx := 0 idx := 0
skipTheRest := false
allTorrents.IterCb(func(_ string, torrent *Torrent) { allTorrents.IterCb(func(_ string, torrent *Torrent) {
t.applyMediaInfoDetails(torrent) if skipTheRest {
return
}
err := t.applyMediaInfoDetails(torrent)
if err != nil && err.Error() == "bandwidth limit reached" {
skipTheRest = true
return
}
idx++ idx++
t.log.Debugf("Applied media info details to torrent %s (%d/%d)", t.GetKey(torrent), idx, totalCount) t.log.Debugf("Applied media info details to torrent %s (%d/%d)", t.GetKey(torrent), idx, totalCount)
}) })
if skipTheRest {
t.log.Warnf("Bandwidth limit reached, skipped the rest of the torrents")
}
} }
// StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user) // StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user)

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils" "github.com/debridmediamanager/zurg/pkg/utils"
mapset "github.com/deckarep/golang-set/v2" mapset "github.com/deckarep/golang-set/v2"
@@ -189,16 +190,30 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
return return
} }
bwLimitReached := false
// check for other broken file // check for other broken file
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if bwLimitReached {
return
}
if !file.State.Is("ok_file") { if !file.State.Is("ok_file") {
return return
} }
if t.UnrestrictFile(file, true) == nil { _, err := t.UnrestrictFile(file, true)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
bwLimitReached = true
return
}
if err != nil {
file.State.Event(context.Background(), "break_file") file.State.Event(context.Background(), "break_file")
} }
}) })
if bwLimitReached {
t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent))
return
}
brokenFiles, allBroken := t.getBrokenFiles(torrent) brokenFiles, allBroken := t.getBrokenFiles(torrent)
// check if broken files are playable // check if broken files are playable
@@ -240,7 +255,12 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
if info != nil && info.Progress == 100 { if info != nil && info.Progress == 100 {
if !t.isStillBroken(info, brokenFiles) { err = t.checkIfBroken(info, brokenFiles)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent))
return
}
if err == nil {
// delete the torrents it replaced // delete the torrents it replaced
oldDownloadedIDs.Each(func(torrentID string) bool { oldDownloadedIDs.Each(func(torrentID string) bool {
t.DeleteByID(torrentID) t.DeleteByID(torrentID)
@@ -249,10 +269,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent)) t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent))
return return
} }
// if it's still broken, let's delete the newly downloaded torrent
t.DeleteByID(info.ID) t.DeleteByID(info.ID)
err = fmt.Errorf("links are still broken")
} else if info != nil && info.Progress != 100 { } else if info != nil && info.Progress != 100 {
// it's faster to download just the broken files, so let's delete the newly downloaded torrent // it's faster to download just the broken files, so let's delete the newly downloaded torrent
@@ -322,9 +339,14 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
newUnassignedLinks := cmap.New[*realdebrid.Download]() newUnassignedLinks := cmap.New[*realdebrid.Download]()
var assignedLinks []string var assignedLinks []string
bwLimitReached := false
torrent.UnassignedLinks.Clone().Each(func(link string) bool { torrent.UnassignedLinks.Clone().Each(func(link string) bool {
// unrestrict each unassigned link that was filled out during torrent init // unrestrict each unassigned link that was filled out during torrent init
unrestrict := t.UnrestrictLink(link, true) unrestrict, err := t.UnrestrictLink(link, true)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
bwLimitReached = true
return true
}
if unrestrict == nil { if unrestrict == nil {
expiredCount++ expiredCount++
return false // next unassigned link return false // next unassigned link
@@ -376,6 +398,11 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
return false // next unassigned link return false // next unassigned link
}) })
if bwLimitReached {
t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue assigning links to torrent %s", t.GetKey(torrent))
return false
}
// empty/reset the unassigned links as we have assigned them already // empty/reset the unassigned links as we have assigned them already
if unassignedTotal > 0 { if unassignedTotal > 0 {
torrent.UnassignedLinks = mapset.NewSet[string]() torrent.UnassignedLinks = mapset.NewSet[string]()
@@ -617,9 +644,9 @@ func (t *TorrentManager) getBrokenFiles(torrent *Torrent) ([]*File, bool) {
return brokenFiles, allBroken return brokenFiles, allBroken
} }
// isStillBroken checks if the torrent is still broken // checkIfBroken checks if the torrent is still broken
// if it's not broken anymore, it will assign the links to the files // if it's not broken anymore, it will assign the links to the files
func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) bool { func (t *TorrentManager) checkIfBroken(info *realdebrid.TorrentInfo, brokenFiles []*File) error {
var selectedFiles []*File var selectedFiles []*File
for _, file := range info.Files { for _, file := range info.Files {
if file.Selected == 0 { if file.Selected == 0 {
@@ -633,7 +660,7 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles
}) })
} }
if len(selectedFiles) != len(info.Links) { if len(selectedFiles) != len(info.Links) {
return true return fmt.Errorf("number of selected files and links do not match")
} }
for i, file := range selectedFiles { for i, file := range selectedFiles {
@@ -652,12 +679,15 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles
// check if the broken files can now be unrestricted and downloaded // check if the broken files can now be unrestricted and downloaded
for _, oldFile := range brokenFiles { for _, oldFile := range brokenFiles {
for idx, newFile := range selectedFiles { for idx, newFile := range selectedFiles {
if oldFile.ID == newFile.ID && t.UnrestrictFile(selectedFiles[idx], true) == nil { if oldFile.ID != newFile.ID {
return true continue
}
if _, err := t.UnrestrictFile(selectedFiles[idx], true); err != nil {
return err
} }
} }
} }
return false return nil
} }
func (t *TorrentManager) ResetRepairState() { func (t *TorrentManager) ResetRepairState() {

View File

@@ -61,35 +61,36 @@ func (dl *Downloader) DownloadFile(
return return
} }
unrestrict := torMgr.UnrestrictFile(file, cfg.ShouldServeFromRclone()) unrestrict, err := torMgr.UnrestrictFile(file, cfg.ShouldServeFromRclone())
if unrestrict == nil { if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, file.Link) log.Warnf("Your account has reached the bandwidth limit, please try again after 12AM CET")
if err := file.State.Event(context.Background(), "break_file"); err != nil { http.Error(resp, "File is not available", http.StatusLocked)
log.Errorf("File %s is stale: %v", fileName, err) return
http.Error(resp, "File is stale, please try again", http.StatusLocked) }
return if err != nil {
log.Errorf("Error unrestricting file %s: %v", file.Path, err)
if file.State.Event(context.Background(), "break_file") == nil {
torMgr.EnqueueForRepair(torrent)
} }
torMgr.EnqueueForRepair(torrent)
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
return return
} else { }
if unrestrict.Filesize != file.Bytes {
// this is possible if there's only 1 streamable file in the torrent if unrestrict.Filesize != file.Bytes {
// and then suddenly it's a rar file // this is possible if there's only 1 streamable file in the torrent
actualExt := strings.ToLower(filepath.Ext(unrestrict.Filename)) // and then suddenly it's a rar file
expectedExt := strings.ToLower(filepath.Ext(fileName)) actualExt := strings.ToLower(filepath.Ext(unrestrict.Filename))
if actualExt != expectedExt && unrestrict.Streamable != 1 { expectedExt := strings.ToLower(filepath.Ext(fileName))
log.Warnf("File was changed and is not streamable: %s and %s (link=%s)", fileName, unrestrict.Filename, unrestrict.Link) if actualExt != expectedExt && unrestrict.Streamable != 1 {
} else { log.Warnf("File was changed and is not streamable: %s and %s (link=%s)", fileName, unrestrict.Filename, unrestrict.Link)
log.Warnf("Filename mismatch: %s and %s", fileName, unrestrict.Filename)
}
}
if cfg.ShouldServeFromRclone() {
redirect(resp, req, unrestrict.Download)
} else { } else {
dl.streamFileToResponse(torrent, file, unrestrict, resp, req, torMgr, cfg, log) log.Warnf("Filename mismatch: %s and %s", fileName, unrestrict.Filename)
} }
return }
if cfg.ShouldServeFromRclone() {
redirect(resp, req, unrestrict.Download)
} else {
dl.streamFileToResponse(torrent, file, unrestrict, resp, req, torMgr, cfg, log)
} }
} }
@@ -104,18 +105,21 @@ func (dl *Downloader) DownloadLink(
log *logutil.Logger, log *logutil.Logger,
) { ) {
// log.Debugf("Opening file %s (%s)", fileName, link) // log.Debugf("Opening file %s (%s)", fileName, link)
unrestrict := torMgr.UnrestrictLink(link, cfg.ShouldServeFromRclone()) unrestrict, err := torMgr.UnrestrictLink(link, cfg.ShouldServeFromRclone())
if unrestrict == nil { if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, link) log.Warnf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "Link is not available", http.StatusLocked)
return
}
if err != nil {
log.Errorf("Error unrestricting link %s: %v", link, err)
http.Error(resp, "File is not available", http.StatusInternalServerError) http.Error(resp, "File is not available", http.StatusInternalServerError)
return return
}
if cfg.ShouldServeFromRclone() {
redirect(resp, req, unrestrict.Download)
} else { } else {
if cfg.ShouldServeFromRclone() { dl.streamFileToResponse(nil, nil, unrestrict, resp, req, torMgr, cfg, log)
redirect(resp, req, unrestrict.Download)
} else {
dl.streamFileToResponse(nil, nil, unrestrict, resp, req, torMgr, cfg, log)
}
return
} }
} }
@@ -149,12 +153,7 @@ func (dl *Downloader) streamFileToResponse(
downloadResp, err := dl.client.Do(dlReq) downloadResp, err := dl.client.Do(dlReq)
if err != nil { if err != nil {
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err) log.Warnf("Cannot download file %s: %v", unrestrict.Download, err)
if file != nil { if file != nil && file.State.Event(context.Background(), "break_file") == nil {
if err := file.State.Event(context.Background(), "break_file"); err != nil {
log.Errorf("File %s is stale: %v", file.Path, err)
http.Error(resp, "File is stale, please try again", http.StatusLocked)
return
}
torMgr.EnqueueForRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)
@@ -165,12 +164,7 @@ func (dl *Downloader) streamFileToResponse(
// Check if the download was not successful // Check if the download was not successful
if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent { if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent {
log.Warnf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename) log.Warnf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename)
if file != nil { if file != nil && file.State.Event(context.Background(), "break_file") == nil {
if err := file.State.Event(context.Background(), "break_file"); err != nil {
log.Errorf("File %s is stale: %v", file.Path, err)
http.Error(resp, "File is stale, please try again", http.StatusLocked)
return
}
torMgr.EnqueueForRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
http.Error(resp, "File is not available", http.StatusNotFound) http.Error(resp, "File is not available", http.StatusNotFound)

View File

@@ -293,19 +293,22 @@ func backoffFunc(attempt int) time.Duration {
return time.Duration(backoff) * time.Second return time.Duration(backoff) * time.Second
} }
func (r *HTTPClient) VerifyURL(url string) bool { func (r *HTTPClient) VerifyURL(url string) error {
req, err := http.NewRequest(http.MethodHead, url, nil) req, err := http.NewRequest(http.MethodHead, url, nil)
if err != nil { if err != nil {
return false return err
} }
timeout := time.Duration(r.timeoutSecs) * time.Second timeout := time.Duration(r.timeoutSecs) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
req = req.WithContext(ctx) req = req.WithContext(ctx)
resp, _ := r.Do(req) resp, err := r.Do(req)
if resp != nil { if err != nil {
defer resp.Body.Close() return err
return resp.StatusCode == http.StatusOK
} }
return false defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
} }

View File

@@ -113,8 +113,11 @@ func (rd *RealDebrid) UnrestrictLink(link string, verifyDownloadURL bool) (*Down
} }
// will only check for first byte if serving from rclone // will only check for first byte if serving from rclone
if verifyDownloadURL && !rd.downloadClient.VerifyURL(response.Download) { if verifyDownloadURL {
return nil, fmt.Errorf("download URL verification failed: %s", response.Download) err := rd.downloadClient.VerifyURL(response.Download)
if err != nil {
return nil, err
}
} }
// rd.log.Debugf("Unrestricted link %s into %s", link, response.Download) // rd.log.Debugf("Unrestricted link %s into %s", link, response.Download)