diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 1a55098..3fc11f4 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -20,14 +20,17 @@ import ( "go.uber.org/zap" ) +const ALL_TORRENTS = "__all__" + type TorrentManager struct { - cfg config.ConfigInterface DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent - requiredVersion string checksum string + requiredVersion string + cfg config.ConfigInterface api *realdebrid.RealDebrid antsPool *ants.Pool log *zap.SugaredLogger + mu *sync.Mutex } // NewTorrentManager creates a new torrent manager @@ -41,6 +44,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p api: api, antsPool: p, log: logutil.NewLogger().Named("manager"), + mu: &sync.Mutex{}, } // create special directory @@ -72,7 +76,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p noInfoCount := 0 allCt := 0 - allTorrents, _ := t.DirectoryMap.Get("__all__") + allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) for info := range torrentsChan { allCt++ if info == nil { @@ -116,11 +120,11 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) - t.checksum = t.getChecksum() + t.SetChecksum(t.getChecksum()) - // if t.config.EnableRepair() { - // go t.repairAll() - // } + if t.cfg.EnableRepair() { + go t.repairAll() + } go t.startRefreshJob() return t @@ -148,17 +152,6 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { mainTorrent.LatestAdded = t2.LatestAdded } - // InProgress - if one of the instances is in progress, then the whole torrent is in progress - mainTorrent.InProgress = false - for _, instance := range mainTorrent.Instances { - if instance.Progress != 100 { - mainTorrent.InProgress = true - } - if instance.ForRepair { - mainTorrent.ForRepair = true - } - } - return mainTorrent } @@ -173,6 +166,12 @@ type torrentsResponse struct { totalCount int } +func (t *TorrentManager) SetChecksum(checksum string) { + t.mu.Lock() + t.checksum = checksum + t.mu.Unlock() +} + // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents func (t *TorrentManager) getChecksum() string { torrentsChan := make(chan torrentsResponse, 1) @@ -257,7 +256,7 @@ func (t *TorrentManager) startRefreshJob() { t.log.Infof("Fetched info for %d torrents", len(newTorrents)) noInfoCount := 0 - allTorrents, _ := t.DirectoryMap.Get("__all__") + allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) retain := make(map[string]bool) for info := range torrentsChan { if info == nil { @@ -309,17 +308,17 @@ func (t *TorrentManager) startRefreshJob() { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(oldAccessKey) }) - fmt.Printf("Deleted torrent: %s\n", oldAccessKey) + t.log.Infof("Deleted torrent: %s\n", oldAccessKey) } } t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) - t.checksum = t.getChecksum() + t.SetChecksum(t.getChecksum()) - // if t.config.EnableRepair() { - // go t.repairAll() - // } + if t.cfg.EnableRepair() { + go t.repairAll() + } go OnLibraryUpdateHook(t.cfg) } } @@ -352,7 +351,6 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { var selectedFiles []*File streamableCount := 0 // if some Links are empty, we need to repair it - forRepair := false for _, file := range info.Files { if isStreamable(file.Path) { streamableCount++ @@ -376,34 +374,22 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) return nil - } else { - if streamableCount > 1 && t.cfg.EnableRepair() { - // case for repair 1: it's missing some links (or all links) - // if we download it as is, we might get the same file over and over again - // so we need to redownload it with other files selected - // that is why we check if there are other streamable files - t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name) - forRepair = true - } else if streamableCount == 1 { - t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name) - // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) - return nil - } + } else if streamableCount == 1 { + t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name) + // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) + return nil } } else if len(selectedFiles) == len(info.Links) { // all links are still intact! good! - // side note: iteration works! for i, file := range selectedFiles { file.Link = info.Links[i] i++ } } - info.ForRepair = forRepair torrent := Torrent{ AccessKey: t.getName(info.Name, info.OriginalName), LatestAdded: info.Added, - InProgress: info.Progress != 100, Instances: []realdebrid.TorrentInfo{*info}, } torrent.SelectedFiles = cmap.New[*File]() @@ -536,219 +522,201 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([ return selectedFiles, isChaotic } -// func (t *TorrentManager) repairAll() { -// t.log.Info("Checking for torrents to repair") -// // side note: iteration works! -// for el := t.TorrentMap.Front(); el != nil; el = el.Next() { -// torrent := el.Value -// // do not repair if in progress -// if torrent.InProgress { -// continue -// } +func (t *TorrentManager) repairAll() { + t.log.Info("Checking for torrents to repair") + allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + allTorrents.IterCb(func(_ string, torrent *Torrent) { + if torrent.InProgress() { + t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) + return + } + forRepair := false + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.Link == "" { + forRepair = true + } + }) + if forRepair { + t.Repair(torrent.AccessKey) + } + }) +} -// // do not repair if all files have links -// forRepair := false -// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { -// file := el2.Value -// if file.Link == "" { -// forRepair = true -// break -// } -// } -// if !forRepair { -// // if it was marked for repair, unmark it -// torrent.ForRepair = false -// t.mu.Lock() -// t.TorrentMap.Set(torrent.AccessKey, torrent) -// t.mu.Unlock() -// continue -// } +func (t *TorrentManager) Repair(accessKey string) { + if !t.cfg.EnableRepair() { + t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair") + return + } -// // when getting info, we mark it for repair if it's missing some links -// if torrent.ForRepair { -// t.log.Infof("Found torrent for repair: %s", torrent.AccessKey) -// t.Repair(torrent.AccessKey) -// break // only repair the first one for repair and then move on -// } -// } -// } + allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + torrent, _ := allTorrents.Get(accessKey) + if torrent == nil { + t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey) + return + } -// func (t *TorrentManager) Repair(accessKey string) { -// if lastRepair, ok := t.repairMap.Get(accessKey); ok { -// if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs -// return -// } -// } -// t.mu.Lock() -// t.repairMap.Set(accessKey, time.Now()) -// t.mu.Unlock() + if torrent.InProgress() { + t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) + return + } -// if !t.config.EnableRepair() { -// t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair") -// return -// } + proceed := t.canCapacityHandle() // blocks for approx 45 minutes + if !proceed { + t.log.Error("Cannot add more torrents, ignoring repair request") + return + } -// torrent, _ := t.TorrentMap.Get(accessKey) -// if torrent == nil { -// t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey) -// return -// } -// if torrent.InProgress { -// t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) -// return -// } + // make the file messy + t.log.Infof("Evaluating whole torrent to find the correct files for torrent: %s", torrent.AccessKey) -// // check if we can still add more downloads -// proceed := t.canCapacityHandle() -// if !proceed { -// t.log.Error("Cannot add more torrents, ignoring repair request") -// return -// } + var selectedFiles []*File + var links []string + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + selectedFiles = append(selectedFiles, file) + if file.Link != "" { + links = append(links, file.Link) + } + file.Link = "" // empty the links = chaos! + }) + selectedFiles, _ = t.organizeChaos(links, selectedFiles) + for _, newFile := range selectedFiles { + if file, exists := torrent.SelectedFiles.Get(filepath.Base(newFile.Path)); exists { + file.Link = newFile.Link + } else { + torrent.SelectedFiles.Set(filepath.Base(newFile.Path), newFile) + } + } -// // make the file messy -// var links []string -// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { -// file := el.Value -// if file.Link != "" { -// links = append(links, file.Link) -// } -// file.Link = "" -// } -// selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles) -// torrent.SelectedFiles = selectedFiles -// t.mu.Lock() -// t.TorrentMap.Set(torrent.AccessKey, torrent) -// t.mu.Unlock() + // first solution: add the same selection, maybe it can be fixed by reinsertion? + if t.reinsertTorrent(torrent, "") { + t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) + return + } -// // first solution: add the same selection, maybe it can be fixed by reinsertion? -// if t.reinsertTorrent(torrent, "") { -// t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) -// return -// } -// // if all the selected files are missing but there are other streamable files -// var missingFiles []File -// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { -// file := el.Value -// if file.Link == "" { -// missingFiles = append(missingFiles, *file) -// } -// } -// if len(missingFiles) > 0 { -// t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) -// // if not, last resort: add only the missing files but do it in 2 batches -// half := len(missingFiles) / 2 -// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") -// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") -// if missingFiles1 != "" { -// t.reinsertTorrent(torrent, missingFiles1) -// } -// if missingFiles2 != "" { -// t.reinsertTorrent(torrent, missingFiles2) -// } -// } -// } + // if all the selected files are missing but there are other streamable files + var missingFiles []File + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.Link == "" { + missingFiles = append(missingFiles, *file) + } + }) + if len(missingFiles) > 0 { + t.log.Infof("Redownloading in multiple batches the %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) + // if not, last resort: add only the missing files but do it in 2 batches + half := len(missingFiles) / 2 + missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") + missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") + if missingFiles1 != "" { + t.reinsertTorrent(torrent, missingFiles1) + } + if missingFiles2 != "" { + t.reinsertTorrent(torrent, missingFiles2) + } + } +} -// func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { -// // if missingFiles is not provided, look for missing files -// if missingFiles == "" { -// var tmpSelection string -// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { -// file := el.Value -// tmpSelection += fmt.Sprintf("%d,", file.ID) -// } -// if tmpSelection == "" { -// return false -// } -// if len(tmpSelection) > 0 { -// missingFiles = tmpSelection[:len(tmpSelection)-1] -// } -// } +func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { + // if missingFiles is not provided, missing files means missing links + if missingFiles == "" { + tmpSelection := "" + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.Link == "" { + tmpSelection += fmt.Sprintf("%d,", file.ID) + } + }) + if tmpSelection == "" { + return false + } + if len(tmpSelection) > 0 { + missingFiles = tmpSelection[:len(tmpSelection)-1] + } + } -// // redownload torrent -// resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) -// if err != nil { -// t.log.Warnf("Cannot redownload torrent: %v", err) -// return false -// } -// time.Sleep(1 * time.Second) + // redownload torrent + resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) + if err != nil { + t.log.Warnf("Cannot redownload torrent: %v", err) + return false + } + time.Sleep(1 * time.Second) -// // select files -// newTorrentID := resp.ID -// err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) -// if err != nil { -// t.log.Warnf("Cannot start redownloading: %v", err) -// t.api.DeleteTorrent(newTorrentID) -// return false -// } -// time.Sleep(10 * time.Second) + // select files + newTorrentID := resp.ID + err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) + if err != nil { + t.log.Warnf("Cannot start redownloading: %v", err) + t.api.DeleteTorrent(newTorrentID) + return false + } + time.Sleep(10 * time.Second) -// // see if the torrent is ready -// info, err := t.api.GetTorrentInfo(newTorrentID) -// if err != nil { -// t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) -// t.api.DeleteTorrent(newTorrentID) -// return false -// } + // see if the torrent is ready + info, err := t.api.GetTorrentInfo(newTorrentID) + if err != nil { + t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) + t.api.DeleteTorrent(newTorrentID) + return false + } -// if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { -// t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) -// t.api.DeleteTorrent(newTorrentID) -// return false -// } + if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { + t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) + t.api.DeleteTorrent(newTorrentID) + return false + } -// if info.Progress != 100 { -// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) -// return true -// } + if info.Progress != 100 { + t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) + return true + } -// missingCount := len(strings.Split(missingFiles, ",")) -// if len(info.Links) != missingCount { -// t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) -// t.api.DeleteTorrent(newTorrentID) -// return false -// } + missingCount := len(strings.Split(missingFiles, ",")) + if len(info.Links) != missingCount { + t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) + t.api.DeleteTorrent(newTorrentID) + return false + } -// t.log.Infof("Repair successful id=%s", newTorrentID) -// return true -// } + t.log.Infof("Repair successful id=%s", newTorrentID) + return true +} -// func (t *TorrentManager) canCapacityHandle() bool { -// // max waiting time is 45 minutes -// const maxRetries = 50 -// const baseDelay = 1 * time.Second -// const maxDelay = 60 * time.Second -// retryCount := 0 -// for { -// count, err := t.api.GetActiveTorrentCount() -// if err != nil { -// t.log.Warnf("Cannot get active downloads count: %v", err) -// if retryCount >= maxRetries { -// t.log.Error("Max retries reached. Exiting.") -// return false -// } -// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay -// if delay > maxDelay { -// delay = maxDelay -// } -// time.Sleep(delay) -// retryCount++ -// continue -// } +func (t *TorrentManager) canCapacityHandle() bool { + // max waiting time is 45 minutes + const maxRetries = 50 + const baseDelay = 1 * time.Second + const maxDelay = 60 * time.Second + retryCount := 0 + for { + count, err := t.api.GetActiveTorrentCount() + if err != nil { + t.log.Warnf("Cannot get active downloads count: %v", err) + if retryCount >= maxRetries { + t.log.Error("Max retries reached. Exiting.") + return false + } + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + time.Sleep(delay) + retryCount++ + continue + } -// if count.DownloadingCount < count.MaxNumberOfTorrents { -// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) -// return true -// } + if count.DownloadingCount < count.MaxNumberOfTorrents { + t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) + return true + } -// if retryCount >= maxRetries { -// t.log.Error("Max retries reached, exiting") -// return false -// } -// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay -// if delay > maxDelay { -// delay = maxDelay -// } -// time.Sleep(delay) -// retryCount++ -// } -// } + if retryCount >= maxRetries { + t.log.Error("Max retries reached, exiting") + return false + } + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + time.Sleep(delay) + retryCount++ + } +} diff --git a/internal/torrent/types.go b/internal/torrent/types.go index 9cba950..1ed2045 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -9,12 +9,19 @@ type Torrent struct { AccessKey string SelectedFiles cmap.ConcurrentMap[string, *File] LatestAdded string - InProgress bool - ForRepair bool Instances []realdebrid.TorrentInfo } +func (t *Torrent) InProgress() bool { + for _, instance := range t.Instances { + if instance.Progress < 100 { + return true + } + } + return false +} + type File struct { realdebrid.File Added string diff --git a/internal/universal/get.go b/internal/universal/get.go index 4f2634e..2992940 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -1,7 +1,6 @@ package universal import ( - "fmt" "io" "net/http" "path" @@ -67,7 +66,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM } if data, exists := cache.Get(requestPath); exists { - streamFileToResponse(data, w, r, t, c, log) + streamFileToResponse(file, data, w, r, t, c, log) return } @@ -81,8 +80,9 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM resp := t.UnrestrictUntilOk(link) if resp == nil { - // go t.Repair(torrent.AccessKey) - log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path) + file.Link = "" + t.SetChecksum("") // force a recheck + log.Warnf("File %s is no longer available", file.Path) streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) return } else if resp.Filename != filename { @@ -99,14 +99,16 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM } } cache.Add(requestPath, resp.Download) - streamFileToResponse(resp.Download, w, r, t, c, log) + streamFileToResponse(file, resp.Download, w, r, t, c, log) } -func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { +func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { - log.Errorf("Error creating new request: %v", err) + if file != nil { + log.Errorf("Error creating new request for file %s: %v", file.Path, err) + } streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log) return } @@ -121,20 +123,22 @@ func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, to resp, err := client.Do(req) if err != nil { - log.Warnf("Cannot download file %v ; torrent is marked for repair", err) - // if torrent != nil { - // go t.Repair(torrent.AccessKey) - // } + if file != nil { + log.Warnf("Cannot download file %s: %v", file.Path, err) + file.Link = "" + torMgr.SetChecksum("") // force a recheck + } streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status) - // if torrent != nil { - // go t.Repair(torrent.AccessKey) - // } + if file != nil { + log.Warnf("Received a %s status code for file %s", resp.Status, file.Path) + file.Link = "" + torMgr.SetChecksum("") // force a recheck + } streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log) return } @@ -155,83 +159,5 @@ func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *in http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError) return } - streamFileToResponse(resp.Download, w, r, t, c, log) -} - -func createErrorFile(path, link string) *intTor.File { - ret := intTor.File{ - Link: link, - } - ret.Path = path - return &ret -} - -func GetFileReader(file *intTor.File, offset int64, size int, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) []byte { - unres := torMgr.UnrestrictUntilOk(file.Link) - if unres == nil { - if strings.Contains(file.Link, "www.youtube.com") { - log.Errorf("Even the error page is broken! Sorry!") - return nil - } - log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path) - // if torrent != nil { - // go torMgr.Repair(torrent.AccessKey) - // } - errFile := createErrorFile("unavailable.mp4", "https://www.youtube.com/watch?v=gea_FJrtFVA") - return GetFileReader(errFile, 0, 0, torMgr, cfg, log) - } - - req, err := http.NewRequest(http.MethodGet, unres.Download, nil) - if err != nil { - if strings.Contains(file.Link, "www.youtube.com") { - log.Errorf("Even the error page is broken! Sorry!") - return nil - } - log.Errorf("Error creating new request: %v", err) - errFile := createErrorFile("new_request.mp4", "https://www.youtube.com/watch?v=H3NSrObyAxM") - return GetFileReader(errFile, 0, 0, torMgr, cfg, log) - } - - if size == 0 { - size = int(file.Bytes) - } - req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", offset, offset+int64(size)-1)) - - client := zurghttp.NewHTTPClient(cfg.GetToken(), 10, cfg) - resp, err := client.Do(req) - if err != nil { - if strings.Contains(file.Link, "www.youtube.com") { - log.Errorf("Even the error page is broken! Sorry!") - return nil - } - log.Warnf("Cannot download file %v ; torrent is marked for repair", err) - // if torrent != nil { - // go torMgr.Repair(torrent.AccessKey) - // } - errFile := createErrorFile("cannot_download.mp4", "https://www.youtube.com/watch?v=FSSd8cponAA") - return GetFileReader(errFile, 0, 0, torMgr, cfg, log) - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - if strings.Contains(file.Link, "www.youtube.com") { - log.Errorf("Even the error page is broken! Sorry!") - return nil - } - log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status) - // if torrent != nil { - // go torMgr.Repair(torrent.AccessKey) - // } - errFile := createErrorFile("not_ok_status.mp4", "https://www.youtube.com/watch?v=BcseUxviVqE") - return GetFileReader(errFile, 0, 0, torMgr, cfg, log) - } - defer resp.Body.Close() - requestedBytes, err := io.ReadAll(resp.Body) - if err != nil { - if err != io.EOF { - log.Errorf("Error reading bytes: %v", err) - errFile := createErrorFile("read_error.mp4", "https://www.youtube.com/watch?v=t9VgOriBHwE") - return GetFileReader(errFile, 0, 0, torMgr, cfg, log) - } - } - return requestedBytes + streamFileToResponse(nil, resp.Download, w, r, t, c, log) } diff --git a/internal/zfs/zfs.go b/internal/zfs/zfs.go index 66a473a..7463ab4 100644 --- a/internal/zfs/zfs.go +++ b/internal/zfs/zfs.go @@ -1,7 +1,6 @@ package zfs import ( - "fmt" "strings" "github.com/debridmediamanager.com/zurg/internal/config" @@ -100,7 +99,6 @@ func (fs *ZurgFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { segments := splitIntoSegments(path) - fmt.Println("seg", segments) if len(segments) != 3 { return -fuse.ENOENT } else if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 8f66dee..03dfca5 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -277,24 +277,25 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) { req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + // at this point, any errors mean that the link has expired and we need to repair it resp, err := rd.client.Do(req) if err != nil { - rd.log.Errorf("Error when executing the unrestrict link request: %v", err) - return nil, err + // rd.log.Errorf("Error when executing the unrestrict link request: %v", err) + return nil, fmt.Errorf("unrestrict link request failed so likely it has expired") } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - rd.log.Errorf("Error when reading the body of unrestrict link response: %v", err) - return nil, err + // rd.log.Errorf("Error when reading the body of unrestrict link response: %v", err) + return nil, fmt.Errorf("unreadable body so likely it has expired") } var response UnrestrictResponse err = json.Unmarshal(body, &response) if err != nil { - rd.log.Errorf("Error when decoding unrestrict link JSON: %v", err) - return nil, err + // rd.log.Errorf("Error when decoding unrestrict link JSON: %v", err) + return nil, fmt.Errorf("undecodable response so likely it has expired") } if !canFetchFirstByte(response.Download) { diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go index 5d01982..2270c4b 100644 --- a/pkg/realdebrid/types.go +++ b/pkg/realdebrid/types.go @@ -58,7 +58,6 @@ type TorrentInfo struct { OriginalName string `json:"original_filename"` // from info OriginalBytes int64 `json:"original_bytes"` // from info Files []File `json:"files"` // from info - ForRepair bool `json:"-"` Version string `json:"-"` } diff --git a/pkg/realdebrid/unrestrict.go b/pkg/realdebrid/unrestrict.go index 72fee99..811b609 100644 --- a/pkg/realdebrid/unrestrict.go +++ b/pkg/realdebrid/unrestrict.go @@ -20,14 +20,31 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse { } func retryUntilOk[T any](fn func() (T, error)) T { - const initialDelay = 1 * time.Second + const initialDelay = 2 * time.Second const maxDelay = 128 * time.Second - for i := 0; ; i++ { - result, err := fn() - if err == nil || !strings.Contains(err.Error(), "429") { + const maxRetries = 5 // Maximum retries for non-429 errors + + var result T + var err error + var retryCount int + + for { + result, err = fn() + if err == nil { return result } - delay := time.Duration(math.Min(float64(initialDelay*time.Duration(math.Pow(2, float64(i)))), float64(maxDelay))) + + // If error is 429, we retry indefinitely, hence no condition to break the loop. + if !strings.Contains(err.Error(), "429") { + retryCount++ + if retryCount >= maxRetries { + // If we've reached the maximum retries for errors other than 429, return the last result. + return result + } + } + + // Calculate delay with exponential backoff + delay := time.Duration(math.Min(float64(initialDelay)*math.Pow(2, float64(retryCount)), float64(maxDelay))) time.Sleep(delay) } }