From 2f777f63e91cdf7e5a0389c19c88cff3e398240f Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sat, 25 May 2024 06:01:20 +0200 Subject: [PATCH] Add download cache again --- internal/torrent/manager.go | 51 +++++++++++------------ pkg/realdebrid/api.go | 82 +++++++++++++++++++++++++++++++++---- 2 files changed, 98 insertions(+), 35 deletions(-) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 566aca0..40502cf 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -30,8 +30,9 @@ type TorrentManager struct { log *logutil.Logger repairLog *logutil.Logger - DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent - DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] + DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent + DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] + DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] RootNode *fs.FileNode @@ -63,8 +64,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w log: log, repairLog: repairLog, - DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), - DownloadMap: cmap.New[*realdebrid.Download](), + DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), + DownloadMap: cmap.New[*realdebrid.Download](), + DownloadCache: cmap.New[*realdebrid.Download](), RootNode: fs.NewFileNode("root", true), @@ -94,15 +96,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w // proxy function func (t *TorrentManager) UnrestrictLinkUntilOk(link string) *realdebrid.Download { + if strings.HasPrefix(link, "https://real-debrid.com/d/") && t.DownloadCache.Has(link[0:39]) { + ret, _ := t.DownloadCache.Get(link[0:39]) + return ret + } ret, err := t.api.UnrestrictLink(link, t.Config.ShouldServeFromRclone()) + t.DownloadCache.Set(ret.Link[0:39], ret) if err != nil { t.log.Warnf("Cannot unrestrict link %s: %v", link, err) return nil } - if ret != nil && ret.Link != "" && ret.Filename != "" { - if t.Config.EnableDownloadMount() { - t.DownloadMap.Set(ret.Filename, ret) - } + if t.Config.EnableDownloadMount() { + t.DownloadMap.Set(ret.Filename, ret) } return ret } @@ -272,26 +277,20 @@ func (t *TorrentManager) mountDownloads() { return } t.DownloadMap.Clear() + t.DownloadCache.Clear() _ = t.workerPool.Submit(func() { - page := 1 - offset := 0 - for { - downloads, totalDownloads, err := t.api.GetDownloads(page, offset) - if err != nil { - // if we get an error, we just stop - t.log.Warnf("Cannot get downloads on page %d: %v", page, err) - continue - } - for i := range downloads { - t.DownloadMap.Set(downloads[i].Filename, &downloads[i]) - } - offset += len(downloads) - page++ - if offset >= totalDownloads { - break - } + downloads, totalDownloads, err := t.api.GetDownloads() + if err != nil { + t.log.Errorf("Cannot get downloads: %v", err) + } + t.log.Debugf("Got %d downloads", totalDownloads) + for i := range downloads { + idx := i + if strings.HasPrefix(downloads[idx].Link, "https://real-debrid.com/d/") { + t.DownloadCache.Set(downloads[idx].Link[0:39], &downloads[idx]) + } + t.DownloadMap.Set(downloads[idx].Filename, &downloads[idx]) } - t.log.Infof("Compiled into %d downloads", t.DownloadMap.Count()) }) } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index d77d758..6dac4ce 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -356,9 +356,73 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro } // GetDownloads returns all torrents, paginated -func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) { +func (rd *RealDebrid) GetDownloads() ([]Download, int, error) { + _, totalCount, err := rd.fetchPageOfDownloads(1, 0) + if err != nil { + return nil, 0, err + } + + // reset allDownloads + allDownloads := []Download{} + page := 1 + offset := 0 + limit := 100 + + // compute ceiling of totalCount / limit + maxPages := (totalCount + limit - 1) / limit + // rd.log.Debugf("Total count is %d, max pages is %d", totalCount, maxPages) + maxParallelThreads := 8 + if maxPages < maxParallelThreads { + maxParallelThreads = maxPages + } + for { + allResults := make(chan []Download, maxParallelThreads) // Channel to collect results from goroutines + errChan := make(chan error, maxParallelThreads) // Channel to collect errors from goroutines + for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches + go func(add int) { + if page+add > maxPages { + allResults <- nil + errChan <- nil + return + } + result, _, err := rd.fetchPageOfDownloads(page+add, offset+add*limit) + if err != nil { + allResults <- nil + errChan <- err + return + } + allResults <- result + errChan <- nil + }(i) + } + // Collect results from all goroutines + for i := 0; i < maxParallelThreads; i++ { + res := <-allResults + err := <-errChan + if err != nil { + return nil, 0, err + } + allDownloads = append(allDownloads, res...) + } + + // rd.log.Debugf("Got %d/%d downloads", len(allDownloads), totalCount) + + if len(allDownloads) >= totalCount || page >= maxPages { + break + } + + page += maxParallelThreads + offset += maxParallelThreads * limit + } + + rd.log.Debugf("Got %d downloads", len(allDownloads)) + + return allDownloads, totalCount, nil +} + +func (rd *RealDebrid) fetchPageOfDownloads(page, offset int) ([]Download, int, error) { baseURL := "https://api.real-debrid.com/rest/1.0/downloads" - var allDownloads []Download + var downloads []Download limit := 500 totalCount := 0 @@ -384,12 +448,15 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) { defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent { - return allDownloads, 0, nil + return downloads, 0, nil } - // if status code is not 2xx, return erro + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + rd.log.Errorf("Error when executing the get downloads request: %v", err) + return nil, 0, err + } - var downloads []Download decoder := json.NewDecoder(resp.Body) err = decoder.Decode(&downloads) if err != nil { @@ -397,16 +464,13 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) { return nil, 0, err } - allDownloads = append(allDownloads, downloads...) - totalCountHeader := resp.Header.Get("x-total-count") totalCount, err = strconv.Atoi(totalCountHeader) if err != nil { totalCount = 0 } - rd.log.Debugf("Got %d downloads (page %d), total count is %d", len(allDownloads)+offset, page, totalCount) - return allDownloads, totalCount, nil + return downloads, totalCount, nil } // GetUserInformation gets the current user information.