diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 2775279..3715f3f 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -93,7 +93,7 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { merged := t1 // Merge SelectedFiles - // iteration works! + // side note: iteration works! for el := t2.SelectedFiles.Front(); el != nil; el = el.Next() { if _, ok := merged.SelectedFiles.Get(el.Key); !ok { merged.SelectedFiles.Set(el.Key, el.Value) @@ -112,7 +112,9 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { for _, instance := range merged.Instances { if instance.Progress != 100 { merged.InProgress = true - break + } + if instance.ForRepair { + merged.ForRepair = true } } @@ -121,7 +123,10 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse { - return t.rd.UnrestrictUntilOk(link) + t.workerPool <- true + ret := t.rd.UnrestrictUntilOk(link) + <-t.workerPool + return ret } type torrentsResponse struct { @@ -211,7 +216,7 @@ func (t *TorrentManager) startRefreshJob() { }(i) } - // iteration works! + // side note: iteration works! var toDelete []string for el := t.TorrentMap.Front(); el != nil; el = el.Next() { found := false @@ -295,9 +300,9 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { // chaotic file means RD will not output the desired file selection // e.g. even if we select just a single mkv, it will output a rar var isChaotic bool - selectedFiles, isChaotic = t.organizeChaos(&rdTorrent, selectedFiles) + selectedFiles, isChaotic = t.organizeChaos(info.Links, selectedFiles) if isChaotic { - t.log.Errorf("Torrent id=%s %s is unfixable, it's always returning an unstreamable link (it is no longer shown in your directories)", info.ID, info.Name) + t.log.Errorf("Torrent id=%s %s is unfixable, it is always returning an unstreamable link (it is no longer shown in your directories)", info.ID, info.Name) t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) return nil } else { @@ -316,7 +321,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { } } else if selectedFiles.Len() == len(info.Links) { // all links are still intact! good! - // iteration works! + // side note: iteration works! i := 0 for el := selectedFiles.Front(); el != nil; el = el.Next() { if i < len(info.Links) { @@ -428,31 +433,27 @@ func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo return &torrent } -func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles *orderedmap.OrderedMap[string, *File]) (*orderedmap.OrderedMap[string, *File], bool) { +func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap.OrderedMap[string, *File]) (*orderedmap.OrderedMap[string, *File], bool) { type Result struct { Response *realdebrid.UnrestrictResponse } - resultsChan := make(chan Result, len(info.Links)) + resultsChan := make(chan Result, len(links)) var wg sync.WaitGroup - // Limit concurrency - sem := make(chan bool, t.config.GetNumOfWorkers()) - - for _, link := range info.Links { + for _, link := range links { wg.Add(1) - sem <- true go func(lnk string) { defer wg.Done() - defer func() { <-sem }() + t.workerPool <- true resp := t.rd.UnrestrictUntilOk(lnk) + <-t.workerPool resultsChan <- Result{Response: resp} }(link) } go func() { wg.Wait() - close(sem) close(resultsChan) }() @@ -462,7 +463,7 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * continue } found := false - // iteration works! + // side note: iteration works! for el := selectedFiles.Front(); el != nil; el = el.Next() { if file, _ := selectedFiles.Get(el.Key); strings.Contains(file.Path, result.Response.Filename) { file.Link = result.Response.Link @@ -490,7 +491,7 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * } func (t *TorrentManager) repairAll() { - // iteration works! + // side note: iteration works! for el := t.TorrentMap.Front(); el != nil; el = el.Next() { torrent := el.Value // do not repair if in progress @@ -498,199 +499,191 @@ func (t *TorrentManager) repairAll() { continue } - var missingFiles []File + // do not repair if all files have links + forRepair := false for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { - file, ok := torrent.SelectedFiles.Get(el2.Key) - if !ok { - continue - } - // check for case of repairs like - // case 1: missing links - // case 2: unrestrictable links TODO + file := el2.Value if file.Link == "" { - missingFiles = append(missingFiles, *file) + forRepair = true + break } } - if len(missingFiles) == 0 { + if !forRepair { + // if it was marked for repair, unmark it + torrent.ForRepair = false + t.TorrentMap.Set(torrent.AccessKey, torrent) continue } - for _, info := range torrent.Instances { - if info.ForRepair { - t.log.Infof("There were less links than was expected on %s %s; fixing...", info.ID, info.Name) - // t.repair(&info, true) - break // only repair the first one for repair and then move on - } + // when getting info, we mark it for repair if it's missing some links + if torrent.ForRepair { + t.log.Infof("There were less links than was expected on %s; fixing...", torrent.AccessKey) + t.Repair(torrent.AccessKey) + break // only repair the first one for repair and then move on } } } -// func (t *TorrentManager) repair(info *realdebrid.TorrentInfo, tryReinsertionFirst bool) { -// // file.Link == "" should be repaired -// // then we repair it! -// t.log.Infof("Repairing torrent id=%s", info.ID) -// // check if we can still add more downloads -// proceed := t.canCapacityHandle() -// if !proceed { -// t.log.Error("Cannot add more torrents, exiting") -// return -// } +func (t *TorrentManager) Repair(accessKey string) { + torrent, _ := t.TorrentMap.Get(accessKey) + if torrent == nil { + t.log.Errorf("Cannot find torrent %s anymore to repair it", accessKey) + return + } + if torrent.InProgress { + t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) + return + } -// // first solution: add the same selection, maybe it can be fixed by reinsertion? -// success := false -// if tryReinsertionFirst { -// success = t.reinsertTorrent(info, "", true) -// } -// if !success { -// // if all the selected files are missing but there are other streamable files -// var otherStreamableFileIDs []int -// for _, file := range info.Files { -// found := false -// for el := selectedFiles.Front(); el != nil; el = el.Next() { + // check if we can still add more downloads + proceed := t.canCapacityHandle() + if !proceed { + t.log.Error("Cannot add more torrents, ignoring repair request") + return + } -// } -// for _, selectedFile := range selectedFiles { -// if selectedFile.ID == file.ID { -// found = true -// break -// } -// } -// if !found && isStreamable(file.Path) { -// otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID) -// } -// } -// if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 { -// // we will download 1 extra streamable file to force a redownload of the missing files -// // or if there's only 1 missing file, we will download 1 more to prevent a rename -// missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",") -// t.log.Infof("Redownloading %d missing files", len(missingFiles)) -// t.reinsertTorrent(info, missingFilesPlus1, false) -// } else if len(selectedFiles) > 1 { -// // if not, last resort: add only the missing files but do it in 2 batches -// half := len(missingFiles) / 2 -// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") -// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") -// if missingFiles1 != "" { -// t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(missingFiles1)) -// t.reinsertTorrent(info, missingFiles1, false) -// } -// if missingFiles2 != "" { -// t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(missingFiles2)) -// t.reinsertTorrent(info, missingFiles2, false) -// } else { -// t.log.Info("No other missing files left to reinsert") -// } -// } else { -// t.log.Infof("Torrent id=%s is unfixable as the only link cached in RD is already broken (it is no longer shown in your directories)", info.ID) -// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) -// return -// } -// t.log.Info("Waiting for downloads to finish") -// } -// } + // make the file messy + var links []string + for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { + file := el.Value + if file.Link != "" { + links = append(links, file.Link) + } + file.Link = "" + } + selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles) + torrent.SelectedFiles = selectedFiles + t.TorrentMap.Set(torrent.AccessKey, torrent) -// func (t *TorrentManager) reinsertTorrent(torrent *realdebrid.TorrentInfo, missingFiles string, deleteIfFailed bool) bool { -// // if missingFiles is not provided, look for missing files -// if missingFiles == "" { -// var tmpSelection string -// for _, file := range torrent.Files { -// if file.Selected == 0 { -// continue -// } -// tmpSelection += fmt.Sprintf("%d,", file.ID) -// } -// if tmpSelection == "" { -// return false -// } -// if len(tmpSelection) > 0 { -// missingFiles = tmpSelection[:len(tmpSelection)-1] -// } -// } + // first solution: add the same selection, maybe it can be fixed by reinsertion? + if t.reinsertTorrent(torrent, "") { + t.log.Infof("Redownloaded torrent %s to repair it", torrent.AccessKey) + return + } + // if all the selected files are missing but there are other streamable files + var missingFiles []File + for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { + file := el.Value + if file.Link == "" { + missingFiles = append(missingFiles, *file) + } + } + if len(missingFiles) > 0 { + t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) + // if not, last resort: add only the missing files but do it in 2 batches + half := len(missingFiles) / 2 + missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") + missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") + if missingFiles1 != "" { + t.reinsertTorrent(torrent, missingFiles1) + } + if missingFiles2 != "" { + t.reinsertTorrent(torrent, missingFiles2) + } + } +} -// // redownload torrent -// resp, err := t.rd.AddMagnetHash(torrent.Hash) -// if err != nil { -// t.log.Errorf("Cannot redownload torrent: %v", err) -// return false -// } -// newTorrentID := resp.ID -// err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles) -// if err != nil { -// t.log.Errorf("Cannot start redownloading: %v", err) -// } +func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { + // if missingFiles is not provided, look for missing files + if missingFiles == "" { + var tmpSelection string + for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { + file := el.Value + tmpSelection += fmt.Sprintf("%d,", file.ID) + } + if tmpSelection == "" { + return false + } + if len(tmpSelection) > 0 { + missingFiles = tmpSelection[:len(tmpSelection)-1] + } + } -// if deleteIfFailed { -// if err != nil { -// t.rd.DeleteTorrent(newTorrentID) -// return false -// } -// time.Sleep(1 * time.Second) -// // see if the torrent is ready -// info, err := t.rd.GetTorrentInfo(newTorrentID) -// if err != nil { -// t.log.Errorf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) -// if deleteIfFailed { -// t.rd.DeleteTorrent(newTorrentID) -// } -// return false -// } -// time.Sleep(1 * time.Second) + // redownload torrent + resp, err := t.rd.AddMagnetHash(torrent.Instances[0].Hash) + if err != nil { + t.log.Errorf("Cannot redownload torrent: %v", err) + return false + } + time.Sleep(1 * time.Second) -// if info.Progress != 100 { -// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion, currently %d%%", info.ID, info.Progress) -// t.rd.DeleteTorrent(newTorrentID) -// return false -// } + // select files + newTorrentID := resp.ID + err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles) + if err != nil { + t.log.Errorf("Cannot start redownloading: %v", err) + t.rd.DeleteTorrent(newTorrentID) + return false + } + time.Sleep(10 * time.Second) -// missingCount := len(strings.Split(missingFiles, ",")) -// if len(info.Links) != missingCount { -// t.log.Infof("It didn't fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) -// t.rd.DeleteTorrent(newTorrentID) -// return false -// } -// t.log.Infof("Redownload successful id=%s, deleting old torrent id=%s", newTorrentID, torrent.ID) -// t.rd.DeleteTorrent(torrent.ID) -// } -// return true -// } + // see if the torrent is ready + info, err := t.rd.GetTorrentInfo(newTorrentID) + if err != nil { + t.log.Errorf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) + t.rd.DeleteTorrent(newTorrentID) + return false + } -// func (t *TorrentManager) canCapacityHandle() bool { -// // max waiting time is 45 minutes -// const maxRetries = 50 -// const baseDelay = 1 * time.Second -// const maxDelay = 60 * time.Second -// retryCount := 0 -// for { -// count, err := t.rd.GetActiveTorrentCount() -// if err != nil { -// t.log.Errorf("Cannot get active downloads count: %v", err) -// if retryCount >= maxRetries { -// t.log.Error("Max retries reached. Exiting.") -// return false -// } -// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay -// if delay > maxDelay { -// delay = maxDelay -// } -// time.Sleep(delay) -// retryCount++ -// continue -// } + if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { + t.log.Errorf("Redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) + t.rd.DeleteTorrent(newTorrentID) + return false + } -// if count.DownloadingCount < count.MaxNumberOfTorrents { -// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) -// return true -// } + if info.Progress != 100 { + t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) + return true + } -// if retryCount >= maxRetries { -// t.log.Error("Max retries reached, exiting") -// return false -// } -// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay -// if delay > maxDelay { -// delay = maxDelay -// } -// time.Sleep(delay) -// retryCount++ -// } -// } + missingCount := len(strings.Split(missingFiles, ",")) + if len(info.Links) != missingCount { + t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) + t.rd.DeleteTorrent(newTorrentID) + return false + } + + t.log.Infof("Repair successful id=%s", newTorrentID) + return true +} + +func (t *TorrentManager) canCapacityHandle() bool { + // max waiting time is 45 minutes + const maxRetries = 50 + const baseDelay = 1 * time.Second + const maxDelay = 60 * time.Second + retryCount := 0 + for { + count, err := t.rd.GetActiveTorrentCount() + if err != nil { + t.log.Errorf("Cannot get active downloads count: %v", err) + if retryCount >= maxRetries { + t.log.Error("Max retries reached. Exiting.") + return false + } + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + time.Sleep(delay) + retryCount++ + continue + } + + if count.DownloadingCount < count.MaxNumberOfTorrents { + t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) + return true + } + + if retryCount >= maxRetries { + t.log.Error("Max retries reached, exiting") + return false + } + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + time.Sleep(delay) + retryCount++ + } +} diff --git a/internal/torrent/types.go b/internal/torrent/types.go index f4c35a7..9e1888f 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -11,6 +11,7 @@ type Torrent struct { Directories []string LatestAdded string InProgress bool + ForRepair bool Instances []realdebrid.TorrentInfo } diff --git a/internal/universal/get.go b/internal/universal/get.go index 6f190d0..4551039 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -40,10 +40,6 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torrent } return } - if data, exists := cache.Get(requestPath); exists { - streamFileToResponse(data, w, r, t, c, log) - return - } baseDirectory := segments[len(segments)-3] accessKey := segments[len(segments)-2] @@ -62,9 +58,15 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torrent http.Error(w, "File not found", http.StatusNotFound) return } + + if data, exists := cache.Get(requestPath); exists { + streamFileToResponse(torrent, data, w, r, t, c, log) + return + } + if file.Link == "" { // This is a dead file, serve an alternate file - log.Errorf("File %s is no longer available", filename) + log.Errorf("File %s is not yet available, zurg is repairing the torrent", filename) streamErrorVideo("https://www.youtube.com/watch?v=bGTqwt6vdcY", w, r, t, c, log) return } @@ -72,8 +74,8 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torrent resp := t.UnrestrictUntilOk(link) if resp == nil { - log.Errorf("The link cannot be unrestricted, file %s is no longer available", file.Path) - // TODO: maybe repair the torrent? + go t.Repair(torrent.AccessKey) + log.Errorf("File %s is no longer available, torrent is marked for repair", file.Path) streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) return } else if resp.Filename != filename { @@ -88,10 +90,10 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torrent } } cache.Add(requestPath, resp.Download) - streamFileToResponse(resp.Download, w, r, t, c, log) + streamFileToResponse(torrent, resp.Download, w, r, t, c, log) } -func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) { +func streamFileToResponse(torrent *torrent.Torrent, url string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { @@ -105,19 +107,25 @@ func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, t req.Header.Add("Range", r.Header.Get("Range")) } - // Create a custom HTTP client with a timeout. + // Create a custom HTTP client client := zurghttp.NewHTTPClient(c.GetToken(), 10) resp, err := client.Do(req) if err != nil { - log.Errorf("Error downloading file %v", err) + log.Errorf("Error downloading file %v ; torrent is marked for repair", err) + if torrent != nil { + go t.Repair(torrent.AccessKey) + } streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, t, c, log) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - log.Errorf("Received a nonOK status code %d", resp.StatusCode) + log.Errorf("Received a %s status code ; torrent is marked for repair", resp.Status) + if torrent != nil { + go t.Repair(torrent.AccessKey) + } streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, t, c, log) return } @@ -138,5 +146,5 @@ func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *to http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError) return } - streamFileToResponse(resp.Download, w, r, t, c, log) + streamFileToResponse(nil, resp.Download, w, r, t, c, log) } diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go index 230cf19..a27fb92 100644 --- a/pkg/realdebrid/types.go +++ b/pkg/realdebrid/types.go @@ -34,6 +34,7 @@ type TorrentInfo struct { Name string `json:"filename"` Hash string `json:"hash"` Progress int `json:"-"` + Status string `json:"status"` Added string `json:"added"` Bytes int64 `json:"bytes"` Links []string `json:"links"`