From aca2347ffccd8a240130203c8af6a7c1438bd1c8 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Mon, 29 Apr 2024 09:37:59 +0200 Subject: [PATCH] Parallel torrent fetching --- pkg/realdebrid/api.go | 147 ++++++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 63 deletions(-) diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index f212b36..8e7a675 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -14,16 +14,16 @@ import ( ) type RealDebrid struct { - client *zurghttp.HTTPClient + apiClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient cfg config.ConfigInterface log *logutil.Logger } -func NewRealDebrid(client, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { +func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { return &RealDebrid{ - client: client, + apiClient: apiClient, unrestrictClient: unrestrictClient, downloadClient: downloadClient, cfg: cfg, @@ -31,6 +31,7 @@ func NewRealDebrid(client, unrestrictClient, downloadClient *zurghttp.HTTPClient } } +// currently unused func (rd *RealDebrid) UnrestrictCheck(link string) (*Download, error) { data := url.Values{} data.Set("link", link) @@ -117,6 +118,7 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa // GetTorrents returns all torrents, paginated // if customLimit is 0, the default limit of 500 is used func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int, error) { + const GET_PARALLEL = 4 baseURL := "https://api.real-debrid.com/rest/1.0/torrents" var allTorrents []Torrent page := 1 @@ -126,59 +128,78 @@ func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int, } totalCount := 0 - for { - params := url.Values{} - params.Set("page", fmt.Sprintf("%d", page)) - params.Set("limit", fmt.Sprintf("%d", limit)) - if active { - params.Set("filter", "active") - } - - reqURL := baseURL + "?" + params.Encode() - - req, err := http.NewRequest("GET", reqURL, nil) - if err != nil { - rd.log.Errorf("Error when creating a get torrents request: %v", err) - return nil, 0, err - } - - resp, err := rd.client.Do(req) - if err != nil { - rd.log.Errorf("Error when executing the get torrents request: %v", err) - return nil, 0, err - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusNoContent { - return allTorrents, 0, nil - } - - // if status code is not 2xx, return erro - - var torrents []Torrent - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&torrents) - if err != nil { - rd.log.Errorf("Error when decoding get torrents JSON: %v", err) - return nil, 0, err - } - - allTorrents = append(allTorrents, torrents...) - - totalCountHeader := resp.Header.Get("x-total-count") - totalCount, err = strconv.Atoi(totalCountHeader) - if err != nil { - break - } - - if len(allTorrents) >= totalCount || (customLimit != 0 && customLimit <= len(allTorrents) && customLimit <= totalCount) { - break - } - - rd.log.Debugf("Got %d torrents (page %d), total count is %d", len(allTorrents), page, totalCount) - - page++ + type fetchResult struct { + torrents []Torrent + err error + count int } + + for { + results := make(chan fetchResult, 4) // Channel to collect results from goroutines + for i := 0; i < GET_PARALLEL; i++ { // Launch GET_PARALLEL concurrent fetches + go func(p int) { + params := url.Values{} + params.Set("page", fmt.Sprintf("%d", p)) + params.Set("limit", fmt.Sprintf("%d", limit)) + if active { + params.Set("filter", "active") + } + + reqURL := baseURL + "?" + params.Encode() + req, err := http.NewRequest("GET", reqURL, nil) + if err != nil { + results <- fetchResult{nil, err, 0} + return + } + + resp, err := rd.apiClient.Do(req) + if err != nil { + results <- fetchResult{nil, err, 0} + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + results <- fetchResult{nil, nil, 0} + return + } + + var torrents []Torrent + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&torrents) + if err != nil { + results <- fetchResult{nil, err, 0} + return + } + + countHeader := resp.Header.Get("x-total-count") + count, _ := strconv.Atoi(countHeader) // In real use, handle this error + + results <- fetchResult{torrents, nil, count} + }(page + i) + } + + // Collect results from all goroutines + for i := 0; i < GET_PARALLEL; i++ { + result := <-results + if result.err != nil { + return nil, 0, result.err + } + allTorrents = append(allTorrents, result.torrents...) + if totalCount == 0 { // Set totalCount from the first successful fetch + totalCount = result.count + } + } + + // Increment page by GET_PARALLEL for the next iteration of GET_PARALLEL concurrent fetches + page += GET_PARALLEL + + // Break loop if all torrents fetched or the limit is reached + if len(allTorrents) >= totalCount || (customLimit != 0 && len(allTorrents) >= customLimit) { + break + } + } + return allTorrents, totalCount, nil } @@ -191,7 +212,7 @@ func (rd *RealDebrid) GetTorrentInfo(id string) (*TorrentInfo, error) { return nil, err } - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the get info request: %v", err) return nil, err @@ -230,7 +251,7 @@ func (rd *RealDebrid) SelectTorrentFiles(id string, files string) error { req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the select files request: %v", err) return err @@ -252,7 +273,7 @@ func (rd *RealDebrid) DeleteTorrent(id string) error { } // Send the request - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the delete torrent request: %v", err) return err @@ -281,7 +302,7 @@ func (rd *RealDebrid) AddMagnetHash(magnet string) (*MagnetResponse, error) { req.Header.Set("Content-Type", "application/x-www-form-urlencoded") // Send the request - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the add magnet request: %v", err) return nil, err @@ -310,7 +331,7 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro } // Send the request - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the active torrents request: %v", err) return nil, err @@ -347,7 +368,7 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) { return nil, 0, err } - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the get downloads request: %v", err) return nil, 0, err @@ -391,7 +412,7 @@ func (rd *RealDebrid) GetUserInformation() (*User, error) { } // Send the request - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the user information request: %v", err) return nil, err @@ -422,7 +443,7 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse, return nil, err } - resp, err := rd.client.Do(req) + resp, err := rd.apiClient.Do(req) if err != nil { return nil, err }