Optimize torrent fetching
This commit is contained in:
174
pkg/realdebrid/torrents.go
Normal file
174
pkg/realdebrid/torrents.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package realdebrid
|
||||
|
||||
import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)
|
||||
|
||||
// fetchTorrentsResult bundles the outcome of fetching one page of torrents
// so a worker goroutine can send it over a channel as a single value.
type fetchTorrentsResult struct {
	torrents []Torrent // torrents decoded from this page; nil on error
	page     int       // 1-based page number this result belongs to
	total    int       // total torrent count reported by the API (x-total-count header); 0 when unknown
	err      error     // non-nil when fetching or decoding the page failed
}
|
||||
|
||||
// GetTorrents returns all torrents, paginated.
//
// It first fetches page 1 with limit 1 to learn the total count. When
// onlyOne is true it returns immediately with that single-torrent page and
// the total. Otherwise it fetches all pages of 250 torrents in parallel
// batches of up to 4 workers, merging with rd.torrentsCache when a fetched
// torrent lines up with a cached one (same ID at the same position counted
// from the end of the full list), in which case the cached suffix is reused
// and fetching stops early.
//
// Page-level fetch errors are deliberately ignored (logged at Warn), so the
// returned list can be shorter than the reported total. The returned count
// is len of the returned slice, not the API's total. The result is also
// stored back into rd.torrentsCache.
func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
	// Probe request: limit 1 is enough to read the total from the header.
	result := rd.fetchPageOfTorrents(1, 1)
	if result.err != nil {
		return nil, 0, result.err
	}

	if onlyOne {
		return result.torrents, result.total, nil
	}

	allTorrents := []Torrent{}
	page := 1
	pageSize := 250

	// Ceiling division: number of pages needed to cover result.total.
	maxPages := (result.total + pageSize - 1) / pageSize
	rd.log.Debugf("Torrents total count is %d", result.total)
	maxParallelThreads := 4
	if maxPages < maxParallelThreads {
		maxParallelThreads = maxPages
	}
	for {
		allResults := make(chan fetchTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines
		for i := 0; i < maxParallelThreads; i++ { // Launch up to maxParallelThreads concurrent fetches
			idx := i // capture per-iteration copy for the closure
			rd.workerPool.Submit(func() {
				// Past the last page: send an empty placeholder so the
				// collector still receives exactly one result per worker.
				if page+idx > maxPages {
					allResults <- fetchTorrentsResult{
						torrents: nil,
						page:     page + idx,
						total:    result.total,
						err:      nil,
					}
					return
				}
				allResults <- rd.fetchPageOfTorrents(page+idx, pageSize)
			})
		}
		// Collect results from all goroutines. Results may arrive out of
		// order, so they are slotted into buffer by page number to restore
		// page order before merging.
		buffer := make([][]Torrent, maxParallelThreads)
		for i := 0; i < maxParallelThreads; i++ {
			result := <-allResults
			// page is always 1 mod maxParallelThreads here, so this maps
			// pages page..page+maxParallelThreads-1 onto slots 0..n-1.
			bufferIdx := (result.page - 1) % maxParallelThreads
			buffer[bufferIdx] = []Torrent{}
			if result.err != nil {
				// Best-effort: a failed page is skipped, not fatal.
				rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err)
				continue
			}
			buffer[bufferIdx] = append(buffer[bufferIdx], result.torrents...)
		}
		for bIdx, batch := range buffer {
			for tIdx, torrent := range batch {
				for cIdx, cached := range rd.torrentsCache {
					// indexFromEnd presumably converts (index-in-page, page)
					// into a position counted from the end of the full list —
					// TODO confirm against its definition elsewhere.
					tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, result.total)
					cIdxEnd := len(rd.torrentsCache) - 1 - cIdx
					// Same torrent at the same end-relative position: the
					// rest of the list is unchanged since the last fetch, so
					// reuse the cached suffix and stop fetching.
					if torrent.ID == cached.ID && tIdxEnd == cIdxEnd {
						allTorrents = append(allTorrents, batch[:tIdx]...)
						allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...)
						rd.log.Debugf("Fresh %d, cached %d", len(batch[:tIdx]), len(rd.torrentsCache[cIdx:]))
						rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total)
						rd.torrentsCache = allTorrents
						return allTorrents, len(allTorrents), nil
					}
				}
			}
			allTorrents = append(allTorrents, batch...)
		}

		rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total)

		if len(allTorrents) >= result.total || page >= maxPages {
			break
		}

		// Safe to bump page here: one result was received per submitted
		// task, so no pending task will read the old value.
		page += maxParallelThreads
	}

	rd.torrentsCache = allTorrents
	return allTorrents, len(allTorrents), nil
}
|
||||
|
||||
func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult {
|
||||
baseURL := "https://api.real-debrid.com/rest/1.0/torrents"
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("page", fmt.Sprintf("%d", page))
|
||||
params.Set("limit", fmt.Sprintf("%d", limit))
|
||||
|
||||
reqURL := baseURL + "?" + params.Encode()
|
||||
req, err := http.NewRequest("GET", reqURL, nil)
|
||||
if err != nil {
|
||||
rd.log.Errorf("Error when creating a get torrents request: %v", err)
|
||||
return fetchTorrentsResult{
|
||||
torrents: nil,
|
||||
page: page,
|
||||
total: 0,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := rd.apiClient.Do(req)
|
||||
if err != nil {
|
||||
rd.log.Errorf("Error when executing the get torrents request: %v", err)
|
||||
return fetchTorrentsResult{
|
||||
torrents: nil,
|
||||
page: page,
|
||||
total: 0,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNoContent {
|
||||
return fetchTorrentsResult{
|
||||
torrents: []Torrent{},
|
||||
page: page,
|
||||
total: 0,
|
||||
err: nil,
|
||||
}
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
rd.log.Errorf("Error when executing the get torrents request: %v", err)
|
||||
return fetchTorrentsResult{
|
||||
torrents: nil,
|
||||
page: page,
|
||||
total: 0,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
totalCountHeader := resp.Header.Get("x-total-count")
|
||||
totalCount, err := strconv.Atoi(totalCountHeader)
|
||||
if err != nil {
|
||||
totalCount = 0
|
||||
}
|
||||
|
||||
var torrents []Torrent
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
err = decoder.Decode(&torrents)
|
||||
if err != nil {
|
||||
rd.log.Errorf("Error when decoding the body of get torrents response: %v", err)
|
||||
return fetchTorrentsResult{
|
||||
torrents: nil,
|
||||
page: page,
|
||||
total: 0,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return fetchTorrentsResult{
|
||||
torrents: torrents,
|
||||
page: page,
|
||||
total: totalCount,
|
||||
err: nil,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user