Parallel torrent fetching
This commit is contained in:
@@ -14,16 +14,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RealDebrid struct {
|
type RealDebrid struct {
|
||||||
client *zurghttp.HTTPClient
|
apiClient *zurghttp.HTTPClient
|
||||||
unrestrictClient *zurghttp.HTTPClient
|
unrestrictClient *zurghttp.HTTPClient
|
||||||
downloadClient *zurghttp.HTTPClient
|
downloadClient *zurghttp.HTTPClient
|
||||||
cfg config.ConfigInterface
|
cfg config.ConfigInterface
|
||||||
log *logutil.Logger
|
log *logutil.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRealDebrid(client, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid {
|
func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid {
|
||||||
return &RealDebrid{
|
return &RealDebrid{
|
||||||
client: client,
|
apiClient: apiClient,
|
||||||
unrestrictClient: unrestrictClient,
|
unrestrictClient: unrestrictClient,
|
||||||
downloadClient: downloadClient,
|
downloadClient: downloadClient,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
@@ -31,6 +31,7 @@ func NewRealDebrid(client, unrestrictClient, downloadClient *zurghttp.HTTPClient
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currently unused
|
||||||
func (rd *RealDebrid) UnrestrictCheck(link string) (*Download, error) {
|
func (rd *RealDebrid) UnrestrictCheck(link string) (*Download, error) {
|
||||||
data := url.Values{}
|
data := url.Values{}
|
||||||
data.Set("link", link)
|
data.Set("link", link)
|
||||||
@@ -117,6 +118,7 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa
|
|||||||
// GetTorrents returns all torrents, paginated
|
// GetTorrents returns all torrents, paginated
|
||||||
// if customLimit is 0, the default limit of 500 is used
|
// if customLimit is 0, the default limit of 500 is used
|
||||||
func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int, error) {
|
func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int, error) {
|
||||||
|
const GET_PARALLEL = 4
|
||||||
baseURL := "https://api.real-debrid.com/rest/1.0/torrents"
|
baseURL := "https://api.real-debrid.com/rest/1.0/torrents"
|
||||||
var allTorrents []Torrent
|
var allTorrents []Torrent
|
||||||
page := 1
|
page := 1
|
||||||
@@ -126,59 +128,78 @@ func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int,
|
|||||||
}
|
}
|
||||||
totalCount := 0
|
totalCount := 0
|
||||||
|
|
||||||
for {
|
type fetchResult struct {
|
||||||
params := url.Values{}
|
torrents []Torrent
|
||||||
params.Set("page", fmt.Sprintf("%d", page))
|
err error
|
||||||
params.Set("limit", fmt.Sprintf("%d", limit))
|
count int
|
||||||
if active {
|
|
||||||
params.Set("filter", "active")
|
|
||||||
}
|
|
||||||
|
|
||||||
reqURL := baseURL + "?" + params.Encode()
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", reqURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
rd.log.Errorf("Error when creating a get torrents request: %v", err)
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := rd.client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
rd.log.Errorf("Error when executing the get torrents request: %v", err)
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNoContent {
|
|
||||||
return allTorrents, 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// if status code is not 2xx, return erro
|
|
||||||
|
|
||||||
var torrents []Torrent
|
|
||||||
decoder := json.NewDecoder(resp.Body)
|
|
||||||
err = decoder.Decode(&torrents)
|
|
||||||
if err != nil {
|
|
||||||
rd.log.Errorf("Error when decoding get torrents JSON: %v", err)
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
allTorrents = append(allTorrents, torrents...)
|
|
||||||
|
|
||||||
totalCountHeader := resp.Header.Get("x-total-count")
|
|
||||||
totalCount, err = strconv.Atoi(totalCountHeader)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(allTorrents) >= totalCount || (customLimit != 0 && customLimit <= len(allTorrents) && customLimit <= totalCount) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
rd.log.Debugf("Got %d torrents (page %d), total count is %d", len(allTorrents), page, totalCount)
|
|
||||||
|
|
||||||
page++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
results := make(chan fetchResult, 4) // Channel to collect results from goroutines
|
||||||
|
for i := 0; i < GET_PARALLEL; i++ { // Launch GET_PARALLEL concurrent fetches
|
||||||
|
go func(p int) {
|
||||||
|
params := url.Values{}
|
||||||
|
params.Set("page", fmt.Sprintf("%d", p))
|
||||||
|
params.Set("limit", fmt.Sprintf("%d", limit))
|
||||||
|
if active {
|
||||||
|
params.Set("filter", "active")
|
||||||
|
}
|
||||||
|
|
||||||
|
reqURL := baseURL + "?" + params.Encode()
|
||||||
|
req, err := http.NewRequest("GET", reqURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
results <- fetchResult{nil, err, 0}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := rd.apiClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
results <- fetchResult{nil, err, 0}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNoContent {
|
||||||
|
results <- fetchResult{nil, nil, 0}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var torrents []Torrent
|
||||||
|
decoder := json.NewDecoder(resp.Body)
|
||||||
|
err = decoder.Decode(&torrents)
|
||||||
|
if err != nil {
|
||||||
|
results <- fetchResult{nil, err, 0}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
countHeader := resp.Header.Get("x-total-count")
|
||||||
|
count, _ := strconv.Atoi(countHeader) // In real use, handle this error
|
||||||
|
|
||||||
|
results <- fetchResult{torrents, nil, count}
|
||||||
|
}(page + i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect results from all goroutines
|
||||||
|
for i := 0; i < GET_PARALLEL; i++ {
|
||||||
|
result := <-results
|
||||||
|
if result.err != nil {
|
||||||
|
return nil, 0, result.err
|
||||||
|
}
|
||||||
|
allTorrents = append(allTorrents, result.torrents...)
|
||||||
|
if totalCount == 0 { // Set totalCount from the first successful fetch
|
||||||
|
totalCount = result.count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment page by GET_PARALLEL for the next iteration of GET_PARALLEL concurrent fetches
|
||||||
|
page += GET_PARALLEL
|
||||||
|
|
||||||
|
// Break loop if all torrents fetched or the limit is reached
|
||||||
|
if len(allTorrents) >= totalCount || (customLimit != 0 && len(allTorrents) >= customLimit) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return allTorrents, totalCount, nil
|
return allTorrents, totalCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,7 +212,7 @@ func (rd *RealDebrid) GetTorrentInfo(id string) (*TorrentInfo, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the get info request: %v", err)
|
rd.log.Errorf("Error when executing the get info request: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -230,7 +251,7 @@ func (rd *RealDebrid) SelectTorrentFiles(id string, files string) error {
|
|||||||
|
|
||||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the select files request: %v", err)
|
rd.log.Errorf("Error when executing the select files request: %v", err)
|
||||||
return err
|
return err
|
||||||
@@ -252,7 +273,7 @@ func (rd *RealDebrid) DeleteTorrent(id string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send the request
|
// Send the request
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the delete torrent request: %v", err)
|
rd.log.Errorf("Error when executing the delete torrent request: %v", err)
|
||||||
return err
|
return err
|
||||||
@@ -281,7 +302,7 @@ func (rd *RealDebrid) AddMagnetHash(magnet string) (*MagnetResponse, error) {
|
|||||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
|
||||||
// Send the request
|
// Send the request
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the add magnet request: %v", err)
|
rd.log.Errorf("Error when executing the add magnet request: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -310,7 +331,7 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send the request
|
// Send the request
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the active torrents request: %v", err)
|
rd.log.Errorf("Error when executing the active torrents request: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -347,7 +368,7 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) {
|
|||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the get downloads request: %v", err)
|
rd.log.Errorf("Error when executing the get downloads request: %v", err)
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
@@ -391,7 +412,7 @@ func (rd *RealDebrid) GetUserInformation() (*User, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send the request
|
// Send the request
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rd.log.Errorf("Error when executing the user information request: %v", err)
|
rd.log.Errorf("Error when executing the user information request: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -422,7 +443,7 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := rd.client.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user