Fix repair :)

This commit is contained in:
Ben Sarmiento
2023-11-18 23:39:42 +01:00
parent 34b1a19478
commit ff93baa6c1
7 changed files with 264 additions and 348 deletions

View File

@@ -20,14 +20,17 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const ALL_TORRENTS = "__all__"
type TorrentManager struct { type TorrentManager struct {
cfg config.ConfigInterface
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
requiredVersion string
checksum string checksum string
requiredVersion string
cfg config.ConfigInterface
api *realdebrid.RealDebrid api *realdebrid.RealDebrid
antsPool *ants.Pool antsPool *ants.Pool
log *zap.SugaredLogger log *zap.SugaredLogger
mu *sync.Mutex
} }
// NewTorrentManager creates a new torrent manager // NewTorrentManager creates a new torrent manager
@@ -41,6 +44,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
api: api, api: api,
antsPool: p, antsPool: p,
log: logutil.NewLogger().Named("manager"), log: logutil.NewLogger().Named("manager"),
mu: &sync.Mutex{},
} }
// create special directory // create special directory
@@ -72,7 +76,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
noInfoCount := 0 noInfoCount := 0
allCt := 0 allCt := 0
allTorrents, _ := t.DirectoryMap.Get("__all__") allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
for info := range torrentsChan { for info := range torrentsChan {
allCt++ allCt++
if info == nil { if info == nil {
@@ -116,11 +120,11 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
t.checksum = t.getChecksum() t.SetChecksum(t.getChecksum())
// if t.config.EnableRepair() { if t.cfg.EnableRepair() {
// go t.repairAll() go t.repairAll()
// } }
go t.startRefreshJob() go t.startRefreshJob()
return t return t
@@ -148,17 +152,6 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
mainTorrent.LatestAdded = t2.LatestAdded mainTorrent.LatestAdded = t2.LatestAdded
} }
// InProgress - if one of the instances is in progress, then the whole torrent is in progress
mainTorrent.InProgress = false
for _, instance := range mainTorrent.Instances {
if instance.Progress != 100 {
mainTorrent.InProgress = true
}
if instance.ForRepair {
mainTorrent.ForRepair = true
}
}
return mainTorrent return mainTorrent
} }
@@ -173,6 +166,12 @@ type torrentsResponse struct {
totalCount int totalCount int
} }
func (t *TorrentManager) SetChecksum(checksum string) {
t.mu.Lock()
t.checksum = checksum
t.mu.Unlock()
}
// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents
func (t *TorrentManager) getChecksum() string { func (t *TorrentManager) getChecksum() string {
torrentsChan := make(chan torrentsResponse, 1) torrentsChan := make(chan torrentsResponse, 1)
@@ -257,7 +256,7 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Infof("Fetched info for %d torrents", len(newTorrents)) t.log.Infof("Fetched info for %d torrents", len(newTorrents))
noInfoCount := 0 noInfoCount := 0
allTorrents, _ := t.DirectoryMap.Get("__all__") allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
retain := make(map[string]bool) retain := make(map[string]bool)
for info := range torrentsChan { for info := range torrentsChan {
if info == nil { if info == nil {
@@ -309,17 +308,17 @@ func (t *TorrentManager) startRefreshJob() {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(oldAccessKey) torrents.Remove(oldAccessKey)
}) })
fmt.Printf("Deleted torrent: %s\n", oldAccessKey) t.log.Infof("Deleted torrent: %s\n", oldAccessKey)
} }
} }
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
t.checksum = t.getChecksum() t.SetChecksum(t.getChecksum())
// if t.config.EnableRepair() { if t.cfg.EnableRepair() {
// go t.repairAll() go t.repairAll()
// } }
go OnLibraryUpdateHook(t.cfg) go OnLibraryUpdateHook(t.cfg)
} }
} }
@@ -352,7 +351,6 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
var selectedFiles []*File var selectedFiles []*File
streamableCount := 0 streamableCount := 0
// if some Links are empty, we need to repair it // if some Links are empty, we need to repair it
forRepair := false
for _, file := range info.Files { for _, file := range info.Files {
if isStreamable(file.Path) { if isStreamable(file.Path) {
streamableCount++ streamableCount++
@@ -376,34 +374,22 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name) t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name)
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
return nil return nil
} else { } else if streamableCount == 1 {
if streamableCount > 1 && t.cfg.EnableRepair() { t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
// case for repair 1: it's missing some links (or all links) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
// if we download it as is, we might get the same file over and over again return nil
// so we need to redownload it with other files selected
// that is why we check if there are other streamable files
t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name)
forRepair = true
} else if streamableCount == 1 {
t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
return nil
}
} }
} else if len(selectedFiles) == len(info.Links) { } else if len(selectedFiles) == len(info.Links) {
// all links are still intact! good! // all links are still intact! good!
// side note: iteration works!
for i, file := range selectedFiles { for i, file := range selectedFiles {
file.Link = info.Links[i] file.Link = info.Links[i]
i++ i++
} }
} }
info.ForRepair = forRepair
torrent := Torrent{ torrent := Torrent{
AccessKey: t.getName(info.Name, info.OriginalName), AccessKey: t.getName(info.Name, info.OriginalName),
LatestAdded: info.Added, LatestAdded: info.Added,
InProgress: info.Progress != 100,
Instances: []realdebrid.TorrentInfo{*info}, Instances: []realdebrid.TorrentInfo{*info},
} }
torrent.SelectedFiles = cmap.New[*File]() torrent.SelectedFiles = cmap.New[*File]()
@@ -536,219 +522,201 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
return selectedFiles, isChaotic return selectedFiles, isChaotic
} }
// func (t *TorrentManager) repairAll() { func (t *TorrentManager) repairAll() {
// t.log.Info("Checking for torrents to repair") t.log.Info("Checking for torrents to repair")
// // side note: iteration works! allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
// for el := t.TorrentMap.Front(); el != nil; el = el.Next() { allTorrents.IterCb(func(_ string, torrent *Torrent) {
// torrent := el.Value if torrent.InProgress() {
// // do not repair if in progress t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
// if torrent.InProgress { return
// continue }
// } forRepair := false
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.Link == "" {
forRepair = true
}
})
if forRepair {
t.Repair(torrent.AccessKey)
}
})
}
// // do not repair if all files have links func (t *TorrentManager) Repair(accessKey string) {
// forRepair := false if !t.cfg.EnableRepair() {
// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
// file := el2.Value return
// if file.Link == "" { }
// forRepair = true
// break
// }
// }
// if !forRepair {
// // if it was marked for repair, unmark it
// torrent.ForRepair = false
// t.mu.Lock()
// t.TorrentMap.Set(torrent.AccessKey, torrent)
// t.mu.Unlock()
// continue
// }
// // when getting info, we mark it for repair if it's missing some links allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
// if torrent.ForRepair { torrent, _ := allTorrents.Get(accessKey)
// t.log.Infof("Found torrent for repair: %s", torrent.AccessKey) if torrent == nil {
// t.Repair(torrent.AccessKey) t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
// break // only repair the first one for repair and then move on return
// } }
// }
// }
// func (t *TorrentManager) Repair(accessKey string) { if torrent.InProgress() {
// if lastRepair, ok := t.repairMap.Get(accessKey); ok { t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
// if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs return
// return }
// }
// }
// t.mu.Lock()
// t.repairMap.Set(accessKey, time.Now())
// t.mu.Unlock()
// if !t.config.EnableRepair() { proceed := t.canCapacityHandle() // blocks for approx 45 minutes
// t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair") if !proceed {
// return t.log.Error("Cannot add more torrents, ignoring repair request")
// } return
}
// torrent, _ := t.TorrentMap.Get(accessKey) // make the file messy
// if torrent == nil { t.log.Infof("Evaluating whole torrent to find the correct files for torrent: %s", torrent.AccessKey)
// t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
// return
// }
// if torrent.InProgress {
// t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
// return
// }
// // check if we can still add more downloads var selectedFiles []*File
// proceed := t.canCapacityHandle() var links []string
// if !proceed { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// t.log.Error("Cannot add more torrents, ignoring repair request") selectedFiles = append(selectedFiles, file)
// return if file.Link != "" {
// } links = append(links, file.Link)
}
file.Link = "" // empty the links = chaos!
})
selectedFiles, _ = t.organizeChaos(links, selectedFiles)
for _, newFile := range selectedFiles {
if file, exists := torrent.SelectedFiles.Get(filepath.Base(newFile.Path)); exists {
file.Link = newFile.Link
} else {
torrent.SelectedFiles.Set(filepath.Base(newFile.Path), newFile)
}
}
// // make the file messy // first solution: add the same selection, maybe it can be fixed by reinsertion?
// var links []string if t.reinsertTorrent(torrent, "") {
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
// file := el.Value return
// if file.Link != "" { }
// links = append(links, file.Link)
// }
// file.Link = ""
// }
// selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles)
// torrent.SelectedFiles = selectedFiles
// t.mu.Lock()
// t.TorrentMap.Set(torrent.AccessKey, torrent)
// t.mu.Unlock()
// // first solution: add the same selection, maybe it can be fixed by reinsertion? // if all the selected files are missing but there are other streamable files
// if t.reinsertTorrent(torrent, "") { var missingFiles []File
// t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// return if file.Link == "" {
// } missingFiles = append(missingFiles, *file)
// // if all the selected files are missing but there are other streamable files }
// var missingFiles []File })
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { if len(missingFiles) > 0 {
// file := el.Value t.log.Infof("Redownloading in multiple batches the %d missing files for torrent %s", len(missingFiles), torrent.AccessKey)
// if file.Link == "" { // if not, last resort: add only the missing files but do it in 2 batches
// missingFiles = append(missingFiles, *file) half := len(missingFiles) / 2
// } missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
// } missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
// if len(missingFiles) > 0 { if missingFiles1 != "" {
// t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) t.reinsertTorrent(torrent, missingFiles1)
// // if not, last resort: add only the missing files but do it in 2 batches }
// half := len(missingFiles) / 2 if missingFiles2 != "" {
// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") t.reinsertTorrent(torrent, missingFiles2)
// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") }
// if missingFiles1 != "" { }
// t.reinsertTorrent(torrent, missingFiles1) }
// }
// if missingFiles2 != "" {
// t.reinsertTorrent(torrent, missingFiles2)
// }
// }
// }
// func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool {
// // if missingFiles is not provided, look for missing files // if missingFiles is not provided, missing files means missing links
// if missingFiles == "" { if missingFiles == "" {
// var tmpSelection string tmpSelection := ""
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// file := el.Value if file.Link == "" {
// tmpSelection += fmt.Sprintf("%d,", file.ID) tmpSelection += fmt.Sprintf("%d,", file.ID)
// } }
// if tmpSelection == "" { })
// return false if tmpSelection == "" {
// } return false
// if len(tmpSelection) > 0 { }
// missingFiles = tmpSelection[:len(tmpSelection)-1] if len(tmpSelection) > 0 {
// } missingFiles = tmpSelection[:len(tmpSelection)-1]
// } }
}
// // redownload torrent // redownload torrent
// resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
// if err != nil { if err != nil {
// t.log.Warnf("Cannot redownload torrent: %v", err) t.log.Warnf("Cannot redownload torrent: %v", err)
// return false return false
// } }
// time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// // select files // select files
// newTorrentID := resp.ID newTorrentID := resp.ID
// err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
// if err != nil { if err != nil {
// t.log.Warnf("Cannot start redownloading: %v", err) t.log.Warnf("Cannot start redownloading: %v", err)
// t.api.DeleteTorrent(newTorrentID) t.api.DeleteTorrent(newTorrentID)
// return false return false
// } }
// time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
// // see if the torrent is ready // see if the torrent is ready
// info, err := t.api.GetTorrentInfo(newTorrentID) info, err := t.api.GetTorrentInfo(newTorrentID)
// if err != nil { if err != nil {
// t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
// t.api.DeleteTorrent(newTorrentID) t.api.DeleteTorrent(newTorrentID)
// return false return false
// } }
// if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
// t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
// t.api.DeleteTorrent(newTorrentID) t.api.DeleteTorrent(newTorrentID)
// return false return false
// } }
// if info.Progress != 100 { if info.Progress != 100 {
// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
// return true return true
// } }
// missingCount := len(strings.Split(missingFiles, ",")) missingCount := len(strings.Split(missingFiles, ","))
// if len(info.Links) != missingCount { if len(info.Links) != missingCount {
// t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
// t.api.DeleteTorrent(newTorrentID) t.api.DeleteTorrent(newTorrentID)
// return false return false
// } }
// t.log.Infof("Repair successful id=%s", newTorrentID) t.log.Infof("Repair successful id=%s", newTorrentID)
// return true return true
// } }
// func (t *TorrentManager) canCapacityHandle() bool { func (t *TorrentManager) canCapacityHandle() bool {
// // max waiting time is 45 minutes // max waiting time is 45 minutes
// const maxRetries = 50 const maxRetries = 50
// const baseDelay = 1 * time.Second const baseDelay = 1 * time.Second
// const maxDelay = 60 * time.Second const maxDelay = 60 * time.Second
// retryCount := 0 retryCount := 0
// for { for {
// count, err := t.api.GetActiveTorrentCount() count, err := t.api.GetActiveTorrentCount()
// if err != nil { if err != nil {
// t.log.Warnf("Cannot get active downloads count: %v", err) t.log.Warnf("Cannot get active downloads count: %v", err)
// if retryCount >= maxRetries { if retryCount >= maxRetries {
// t.log.Error("Max retries reached. Exiting.") t.log.Error("Max retries reached. Exiting.")
// return false return false
// } }
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// if delay > maxDelay { if delay > maxDelay {
// delay = maxDelay delay = maxDelay
// } }
// time.Sleep(delay) time.Sleep(delay)
// retryCount++ retryCount++
// continue continue
// } }
// if count.DownloadingCount < count.MaxNumberOfTorrents { if count.DownloadingCount < count.MaxNumberOfTorrents {
// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
// return true return true
// } }
// if retryCount >= maxRetries { if retryCount >= maxRetries {
// t.log.Error("Max retries reached, exiting") t.log.Error("Max retries reached, exiting")
// return false return false
// } }
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// if delay > maxDelay { if delay > maxDelay {
// delay = maxDelay delay = maxDelay
// } }
// time.Sleep(delay) time.Sleep(delay)
// retryCount++ retryCount++
// } }
// } }

View File

@@ -9,12 +9,19 @@ type Torrent struct {
AccessKey string AccessKey string
SelectedFiles cmap.ConcurrentMap[string, *File] SelectedFiles cmap.ConcurrentMap[string, *File]
LatestAdded string LatestAdded string
InProgress bool
ForRepair bool
Instances []realdebrid.TorrentInfo Instances []realdebrid.TorrentInfo
} }
func (t *Torrent) InProgress() bool {
for _, instance := range t.Instances {
if instance.Progress < 100 {
return true
}
}
return false
}
type File struct { type File struct {
realdebrid.File realdebrid.File
Added string Added string

View File

@@ -1,7 +1,6 @@
package universal package universal
import ( import (
"fmt"
"io" "io"
"net/http" "net/http"
"path" "path"
@@ -67,7 +66,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
} }
if data, exists := cache.Get(requestPath); exists { if data, exists := cache.Get(requestPath); exists {
streamFileToResponse(data, w, r, t, c, log) streamFileToResponse(file, data, w, r, t, c, log)
return return
} }
@@ -81,8 +80,9 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
resp := t.UnrestrictUntilOk(link) resp := t.UnrestrictUntilOk(link)
if resp == nil { if resp == nil {
// go t.Repair(torrent.AccessKey) file.Link = ""
log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path) t.SetChecksum("") // force a recheck
log.Warnf("File %s is no longer available", file.Path)
streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
return return
} else if resp.Filename != filename { } else if resp.Filename != filename {
@@ -99,14 +99,16 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
} }
} }
cache.Add(requestPath, resp.Download) cache.Add(requestPath, resp.Download)
streamFileToResponse(resp.Download, w, r, t, c, log) streamFileToResponse(file, resp.Download, w, r, t, c, log)
} }
func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
// Create a new request for the file download. // Create a new request for the file download.
req, err := http.NewRequest(http.MethodGet, url, nil) req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil { if err != nil {
log.Errorf("Error creating new request: %v", err) if file != nil {
log.Errorf("Error creating new request for file %s: %v", file.Path, err)
}
streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log) streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log)
return return
} }
@@ -121,20 +123,22 @@ func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, to
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
log.Warnf("Cannot download file %v ; torrent is marked for repair", err) if file != nil {
// if torrent != nil { log.Warnf("Cannot download file %s: %v", file.Path, err)
// go t.Repair(torrent.AccessKey) file.Link = ""
// } torMgr.SetChecksum("") // force a recheck
}
streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log) streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
return return
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status) if file != nil {
// if torrent != nil { log.Warnf("Received a %s status code for file %s", resp.Status, file.Path)
// go t.Repair(torrent.AccessKey) file.Link = ""
// } torMgr.SetChecksum("") // force a recheck
}
streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log) streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
return return
} }
@@ -155,83 +159,5 @@ func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *in
http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError) http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError)
return return
} }
streamFileToResponse(resp.Download, w, r, t, c, log) streamFileToResponse(nil, resp.Download, w, r, t, c, log)
}
func createErrorFile(path, link string) *intTor.File {
ret := intTor.File{
Link: link,
}
ret.Path = path
return &ret
}
func GetFileReader(file *intTor.File, offset int64, size int, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) []byte {
unres := torMgr.UnrestrictUntilOk(file.Link)
if unres == nil {
if strings.Contains(file.Link, "www.youtube.com") {
log.Errorf("Even the error page is broken! Sorry!")
return nil
}
log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path)
// if torrent != nil {
// go torMgr.Repair(torrent.AccessKey)
// }
errFile := createErrorFile("unavailable.mp4", "https://www.youtube.com/watch?v=gea_FJrtFVA")
return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
req, err := http.NewRequest(http.MethodGet, unres.Download, nil)
if err != nil {
if strings.Contains(file.Link, "www.youtube.com") {
log.Errorf("Even the error page is broken! Sorry!")
return nil
}
log.Errorf("Error creating new request: %v", err)
errFile := createErrorFile("new_request.mp4", "https://www.youtube.com/watch?v=H3NSrObyAxM")
return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
if size == 0 {
size = int(file.Bytes)
}
req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", offset, offset+int64(size)-1))
client := zurghttp.NewHTTPClient(cfg.GetToken(), 10, cfg)
resp, err := client.Do(req)
if err != nil {
if strings.Contains(file.Link, "www.youtube.com") {
log.Errorf("Even the error page is broken! Sorry!")
return nil
}
log.Warnf("Cannot download file %v ; torrent is marked for repair", err)
// if torrent != nil {
// go torMgr.Repair(torrent.AccessKey)
// }
errFile := createErrorFile("cannot_download.mp4", "https://www.youtube.com/watch?v=FSSd8cponAA")
return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
if strings.Contains(file.Link, "www.youtube.com") {
log.Errorf("Even the error page is broken! Sorry!")
return nil
}
log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status)
// if torrent != nil {
// go torMgr.Repair(torrent.AccessKey)
// }
errFile := createErrorFile("not_ok_status.mp4", "https://www.youtube.com/watch?v=BcseUxviVqE")
return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
defer resp.Body.Close()
requestedBytes, err := io.ReadAll(resp.Body)
if err != nil {
if err != io.EOF {
log.Errorf("Error reading bytes: %v", err)
errFile := createErrorFile("read_error.mp4", "https://www.youtube.com/watch?v=t9VgOriBHwE")
return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
}
return requestedBytes
} }

View File

@@ -1,7 +1,6 @@
package zfs package zfs
import ( import (
"fmt"
"strings" "strings"
"github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/config"
@@ -100,7 +99,6 @@ func (fs *ZurgFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int)
func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) {
segments := splitIntoSegments(path) segments := splitIntoSegments(path)
fmt.Println("seg", segments)
if len(segments) != 3 { if len(segments) != 3 {
return -fuse.ENOENT return -fuse.ENOENT
} else if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { } else if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {

View File

@@ -277,24 +277,25 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// at this point, any errors mean that the link has expired and we need to repair it
resp, err := rd.client.Do(req) resp, err := rd.client.Do(req)
if err != nil { if err != nil {
rd.log.Errorf("Error when executing the unrestrict link request: %v", err) // rd.log.Errorf("Error when executing the unrestrict link request: %v", err)
return nil, err return nil, fmt.Errorf("unrestrict link request failed so likely it has expired")
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
rd.log.Errorf("Error when reading the body of unrestrict link response: %v", err) // rd.log.Errorf("Error when reading the body of unrestrict link response: %v", err)
return nil, err return nil, fmt.Errorf("unreadable body so likely it has expired")
} }
var response UnrestrictResponse var response UnrestrictResponse
err = json.Unmarshal(body, &response) err = json.Unmarshal(body, &response)
if err != nil { if err != nil {
rd.log.Errorf("Error when decoding unrestrict link JSON: %v", err) // rd.log.Errorf("Error when decoding unrestrict link JSON: %v", err)
return nil, err return nil, fmt.Errorf("undecodable response so likely it has expired")
} }
if !canFetchFirstByte(response.Download) { if !canFetchFirstByte(response.Download) {

View File

@@ -58,7 +58,6 @@ type TorrentInfo struct {
OriginalName string `json:"original_filename"` // from info OriginalName string `json:"original_filename"` // from info
OriginalBytes int64 `json:"original_bytes"` // from info OriginalBytes int64 `json:"original_bytes"` // from info
Files []File `json:"files"` // from info Files []File `json:"files"` // from info
ForRepair bool `json:"-"`
Version string `json:"-"` Version string `json:"-"`
} }

View File

@@ -20,14 +20,31 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse {
} }
func retryUntilOk[T any](fn func() (T, error)) T { func retryUntilOk[T any](fn func() (T, error)) T {
const initialDelay = 1 * time.Second const initialDelay = 2 * time.Second
const maxDelay = 128 * time.Second const maxDelay = 128 * time.Second
for i := 0; ; i++ { const maxRetries = 5 // Maximum retries for non-429 errors
result, err := fn()
if err == nil || !strings.Contains(err.Error(), "429") { var result T
var err error
var retryCount int
for {
result, err = fn()
if err == nil {
return result return result
} }
delay := time.Duration(math.Min(float64(initialDelay*time.Duration(math.Pow(2, float64(i)))), float64(maxDelay)))
// If error is 429, we retry indefinitely, hence no condition to break the loop.
if !strings.Contains(err.Error(), "429") {
retryCount++
if retryCount >= maxRetries {
// If we've reached the maximum retries for errors other than 429, return the last result.
return result
}
}
// Calculate delay with exponential backoff
delay := time.Duration(math.Min(float64(initialDelay)*math.Pow(2, float64(retryCount)), float64(maxDelay)))
time.Sleep(delay) time.Sleep(delay)
} }
} }