diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 02d024e..9298fdd 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -5,7 +5,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "github.com/debridmediamanager/zurg/internal/config" @@ -15,6 +14,7 @@ import ( ) type RealDebrid struct { + torrentsCache []Torrent apiClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient @@ -25,6 +25,7 @@ type RealDebrid struct { func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { return &RealDebrid{ + torrentsCache: []Torrent{}, apiClient: apiClient, unrestrictClient: unrestrictClient, downloadClient: downloadClient, @@ -118,104 +119,6 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa return &response, nil } -type getTorrentsResult struct { - torrents []Torrent - err error - totalCount int -} - -func (rd *RealDebrid) getPageOfTorrents(page, limit int) getTorrentsResult { - baseURL := "https://api.real-debrid.com/rest/1.0/torrents" - - params := url.Values{} - params.Set("page", fmt.Sprintf("%d", page)) - params.Set("limit", fmt.Sprintf("%d", limit)) - - reqURL := baseURL + "?" + params.Encode() - req, err := http.NewRequest("GET", reqURL, nil) - if err != nil { - return getTorrentsResult{nil, err, 0} - } - - resp, err := rd.apiClient.Do(req) - if err != nil { - return getTorrentsResult{nil, err, 0} - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusNoContent { - return getTorrentsResult{nil, nil, 0} - } - - var torrents []Torrent - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&torrents) - if err != nil { - return getTorrentsResult{nil, err, 0} - } - - countHeader := resp.Header.Get("x-total-count") - count, _ := strconv.Atoi(countHeader) // In real use, handle this error - - return getTorrentsResult{torrents, nil, count} -} - -func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { - var allTorrents []Torrent - - // fetch 1 to get total count - result := rd.getPageOfTorrents(1, 1) - allTorrents = append(allTorrents, result.torrents...) - totalCount := result.totalCount - - if onlyOne { - return allTorrents, totalCount, nil - } - - // reset allTorrents - allTorrents = []Torrent{} - page := 1 - // compute ceiling of totalCount / limit - maxPages := (totalCount + 250 - 1) / 250 - rd.log.Debugf("Torrents total count is %d", totalCount) - maxParallelThreads := 4 - if maxPages < maxParallelThreads { - maxParallelThreads = maxPages - } - for { - allResults := make(chan getTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines - for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches - idx := i - rd.workerPool.Submit(func() { - if page > maxPages { - allResults <- getTorrentsResult{nil, nil, 0} - return - } - allResults <- rd.getPageOfTorrents(page+idx, 250) - }) - } - // Collect results from all goroutines - for i := 0; i < maxParallelThreads; i++ { - res := <-allResults - if res.err != nil { - rd.log.Warnf("Ignoring error when fetching torrents: %v", res.err) - continue - } - allTorrents = append(allTorrents, res.torrents...) - } - - rd.log.Debugf("Got %d/%d torrents", len(allTorrents), totalCount) - - if len(allTorrents) >= totalCount || page >= maxPages { - break - } - - page += maxParallelThreads - } - - return allTorrents, totalCount, nil -} - func (rd *RealDebrid) GetTorrentInfo(id string) (*TorrentInfo, error) { url := "https://api.real-debrid.com/rest/1.0/torrents/info/" + id @@ -360,120 +263,6 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro return &response, nil } -// GetDownloads returns all torrents, paginated -func (rd *RealDebrid) GetDownloads() []Download { - _, totalCount, err := rd.fetchPageOfDownloads(1, 1) - if err != nil { - return nil - } - - // reset allDownloads - allDownloads := []Download{} - page := 1 - limit := 250 - - // compute ceiling of totalCount / limit - maxPages := (totalCount + limit - 1) / limit - rd.log.Debugf("Total downloads count is %d", totalCount) - maxParallelThreads := 4 - if maxPages < maxParallelThreads { - maxParallelThreads = maxPages - } - for { - allResults := make(chan []Download, maxParallelThreads) // Channel to collect results from goroutines - errChan := make(chan error, maxParallelThreads) // Channel to collect errors from goroutines - for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches - idx := i - rd.workerPool.Submit(func() { - if page+idx > maxPages { - allResults <- nil - errChan <- nil - return - } - result, _, err := rd.fetchPageOfDownloads(page+idx, limit) - if err != nil { - allResults <- nil - errChan <- err - return - } - allResults <- result - errChan <- nil - }) - } - // Collect results from all goroutines - for i := 0; i < maxParallelThreads; i++ { - res := <-allResults - err := <-errChan - if err != nil { - rd.log.Warnf("Ignoring error when fetching downloads: %v", err) - continue - } - allDownloads = append(allDownloads, res...) - } - - rd.log.Debugf("Got %d/%d downloads", len(allDownloads), totalCount) - - if len(allDownloads) >= totalCount || page >= maxPages { - break - } - - page += maxParallelThreads - } - - return allDownloads -} - -func (rd *RealDebrid) fetchPageOfDownloads(page, limit int) ([]Download, int, error) { - baseURL := "https://api.real-debrid.com/rest/1.0/downloads" - var downloads []Download - totalCount := 0 - - params := url.Values{} - params.Set("page", fmt.Sprintf("%d", page)) - params.Set("limit", fmt.Sprintf("%d", limit)) - // params.Set("filter", "active") - - reqURL := baseURL + "?" + params.Encode() - - req, err := http.NewRequest("GET", reqURL, nil) - if err != nil { - rd.log.Errorf("Error when creating a get downloads request: %v", err) - return nil, 0, err - } - - resp, err := rd.apiClient.Do(req) - if err != nil { - rd.log.Errorf("Error when executing the get downloads request: %v", err) - return nil, 0, err - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusNoContent { - return downloads, 0, nil - } - - if resp.StatusCode != http.StatusOK { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - rd.log.Errorf("Error when executing the get downloads request: %v", err) - return nil, 0, err - } - - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&downloads) - if err != nil { - rd.log.Errorf("Error when decoding get downloads JSON: %v", err) - return nil, 0, err - } - - totalCountHeader := resp.Header.Get("x-total-count") - totalCount, err = strconv.Atoi(totalCountHeader) - if err != nil { - totalCount = 0 - } - - return downloads, totalCount, nil -} - // GetUserInformation gets the current user information. func (rd *RealDebrid) GetUserInformation() (*User, error) { // Construct request URL diff --git a/pkg/realdebrid/downloads.go b/pkg/realdebrid/downloads.go new file mode 100644 index 0000000..c8026b7 --- /dev/null +++ b/pkg/realdebrid/downloads.go @@ -0,0 +1,151 @@ +package realdebrid + +import ( + "fmt" + "net/http" + "net/url" + "strconv" +) + +type fetchDownloadsResult struct { + downloads []Download + page int + total int + err error +} + +// GetDownloads returns all torrents, paginated +func (rd *RealDebrid) GetDownloads() []Download { + result := rd.fetchPageOfDownloads(1, 1) + if result.err != nil { + return nil + } + + allDownloads := []Download{} + page := 1 + pageSize := 250 + + maxPages := (result.total + pageSize - 1) / pageSize + rd.log.Debugf("Total downloads count is %d", result.total) + maxParallelThreads := 4 + if maxPages < maxParallelThreads { + maxParallelThreads = maxPages + } + for { + allResults := make(chan fetchDownloadsResult, maxParallelThreads) // Channel to collect results from goroutines + for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches + idx := i + rd.workerPool.Submit(func() { + if page+idx > maxPages { + allResults <- fetchDownloadsResult{ + downloads: nil, + page: page + idx, + total: result.total, + err: nil, + } + return + } + result := rd.fetchPageOfDownloads(page+idx, pageSize) + allResults <- result + }) + } + // Collect results from all goroutines + for i := 0; i < maxParallelThreads; i++ { + result := <-allResults + if result.err != nil { + rd.log.Warnf("Encountered an error when fetching downloads pg %d: %v", result.page, result.err) + continue + } + allDownloads = append(allDownloads, result.downloads...) + } + + rd.log.Debugf("Got %d/%d downloads", len(allDownloads), result.total) + + if len(allDownloads) >= result.total || page >= maxPages { + break + } + + page += maxParallelThreads + } + + return allDownloads +} + +func (rd *RealDebrid) fetchPageOfDownloads(page, limit int) fetchDownloadsResult { + baseURL := "https://api.real-debrid.com/rest/1.0/downloads" + + params := url.Values{} + params.Set("page", fmt.Sprintf("%d", page)) + params.Set("limit", fmt.Sprintf("%d", limit)) + + reqURL := baseURL + "?" + params.Encode() + + req, err := http.NewRequest("GET", reqURL, nil) + if err != nil { + rd.log.Errorf("Error when creating a get downloads request: %v", err) + return fetchDownloadsResult{ + downloads: nil, + page: page, + total: 0, + err: err, + } + } + + resp, err := rd.apiClient.Do(req) + if err != nil { + rd.log.Errorf("Error when executing the get downloads request: %v", err) + return fetchDownloadsResult{ + downloads: nil, + page: page, + total: 0, + err: err, + } + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + return fetchDownloadsResult{ + downloads: []Download{}, + page: page, + total: 0, + err: nil, + } + } + + if resp.StatusCode != http.StatusOK { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + rd.log.Errorf("Error when executing the get downloads request: %v", err) + return fetchDownloadsResult{ + downloads: nil, + page: page, + total: 0, + err: err, + } + } + + totalCountHeader := resp.Header.Get("x-total-count") + totalCount, err := strconv.Atoi(totalCountHeader) + if err != nil { + totalCount = 0 + } + + var downloads []Download + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&downloads) + if err != nil { + rd.log.Errorf("Error when decoding get downloads JSON: %v", err) + return fetchDownloadsResult{ + downloads: nil, + page: page, + total: totalCount, + err: err, + } + } + + return fetchDownloadsResult{ + downloads: downloads, + page: page, + total: totalCount, + err: nil, + } +} diff --git a/pkg/realdebrid/torrents.go b/pkg/realdebrid/torrents.go new file mode 100644 index 0000000..d9cab66 --- /dev/null +++ b/pkg/realdebrid/torrents.go @@ -0,0 +1,174 @@ +package realdebrid + +import ( + "fmt" + "net/http" + "net/url" + "strconv" +) + +type fetchTorrentsResult struct { + torrents []Torrent + page int + total int + err error +} + +// GetTorrents returns all torrents, paginated +func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { + result := rd.fetchPageOfTorrents(1, 1) + if result.err != nil { + return nil, 0, result.err + } + + if onlyOne { + return result.torrents, result.total, nil + } + + allTorrents := []Torrent{} + page := 1 + pageSize := 250 + + maxPages := (result.total + pageSize - 1) / pageSize + rd.log.Debugf("Torrents total count is %d", result.total) + maxParallelThreads := 4 + if maxPages < maxParallelThreads { + maxParallelThreads = maxPages + } + for { + allResults := make(chan fetchTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines + for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches + idx := i + rd.workerPool.Submit(func() { + if page+idx > maxPages { + allResults <- fetchTorrentsResult{ + torrents: nil, + page: page + idx, + total: result.total, + err: nil, + } + return + } + allResults <- rd.fetchPageOfTorrents(page+idx, pageSize) + }) + } + // Collect results from all goroutines + buffer := make([][]Torrent, maxParallelThreads) + for i := 0; i < maxParallelThreads; i++ { + result := <-allResults + bufferIdx := (result.page - 1) % maxParallelThreads + buffer[bufferIdx] = []Torrent{} + if result.err != nil { + rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err) + continue + } + buffer[bufferIdx] = append(buffer[bufferIdx], result.torrents...) + } + for bIdx, batch := range buffer { + for tIdx, torrent := range batch { + for cIdx, cached := range rd.torrentsCache { + tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, result.total) + cIdxEnd := len(rd.torrentsCache) - 1 - cIdx + if torrent.ID == cached.ID && tIdxEnd == cIdxEnd { + allTorrents = append(allTorrents, batch[:tIdx]...) + allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...) + rd.log.Debugf("Fresh %d, cached %d", len(batch[:tIdx]), len(rd.torrentsCache[cIdx:])) + rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total) + rd.torrentsCache = allTorrents + return allTorrents, len(allTorrents), nil + } + } + } + allTorrents = append(allTorrents, batch...) + } + + rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total) + + if len(allTorrents) >= result.total || page >= maxPages { + break + } + + page += maxParallelThreads + } + + rd.torrentsCache = allTorrents + return allTorrents, len(allTorrents), nil +} + +func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult { + baseURL := "https://api.real-debrid.com/rest/1.0/torrents" + + params := url.Values{} + params.Set("page", fmt.Sprintf("%d", page)) + params.Set("limit", fmt.Sprintf("%d", limit)) + + reqURL := baseURL + "?" + params.Encode() + req, err := http.NewRequest("GET", reqURL, nil) + if err != nil { + rd.log.Errorf("Error when creating a get torrents request: %v", err) + return fetchTorrentsResult{ + torrents: nil, + page: page, + total: 0, + err: err, + } + } + + resp, err := rd.apiClient.Do(req) + if err != nil { + rd.log.Errorf("Error when executing the get torrents request: %v", err) + return fetchTorrentsResult{ + torrents: nil, + page: page, + total: 0, + err: err, + } + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + return fetchTorrentsResult{ + torrents: []Torrent{}, + page: page, + total: 0, + err: nil, + } + } + + if resp.StatusCode != http.StatusOK { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + rd.log.Errorf("Error when executing the get torrents request: %v", err) + return fetchTorrentsResult{ + torrents: nil, + page: page, + total: 0, + err: err, + } + } + + totalCountHeader := resp.Header.Get("x-total-count") + totalCount, err := strconv.Atoi(totalCountHeader) + if err != nil { + totalCount = 0 + } + + var torrents []Torrent + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&torrents) + if err != nil { + rd.log.Errorf("Error when decoding the body of get torrents response: %v", err) + return fetchTorrentsResult{ + torrents: nil, + page: page, + total: 0, + err: err, + } + } + + return fetchTorrentsResult{ + torrents: torrents, + page: page, + total: totalCount, + err: nil, + } +} diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go index 9fff189..afa1230 100644 --- a/pkg/realdebrid/types.go +++ b/pkg/realdebrid/types.go @@ -16,6 +16,7 @@ type FileJSON struct { } type Download struct { + ID string `json:"id"` Filename string `json:"filename"` Filesize int64 `json:"filesize"` // bytes, 0 if unknown Link string `json:"link"` // Original link diff --git a/pkg/realdebrid/util.go b/pkg/realdebrid/util.go new file mode 100644 index 0000000..93ff173 --- /dev/null +++ b/pkg/realdebrid/util.go @@ -0,0 +1,14 @@ +package realdebrid + +func indexFromEnd(subIndex int, pageNumber int, pageSize int, totalElements int) int { + // Adjust pageNumber for 1-based index + adjustedPageNumber := pageNumber - 1 + + // Calculate the overall index in the array + overallIndex := (adjustedPageNumber * pageSize) + subIndex + + // Calculate the index from the end + indexFromEnd := totalElements - 1 - overallIndex + + return indexFromEnd +}