Refactor torrent fetching
This commit is contained in:
@@ -3,21 +3,19 @@ package torrent
|
|||||||
type LibraryState struct {
|
type LibraryState struct {
|
||||||
TotalCount int
|
TotalCount int
|
||||||
ActiveCount int
|
ActiveCount int
|
||||||
FirstActiveTorrentId string
|
|
||||||
FirstTorrentId string
|
FirstTorrentId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *LibraryState) Eq(a LibraryState) bool {
|
func (ls *LibraryState) Eq(a LibraryState) bool {
|
||||||
if ls.TotalCount == 0 || ls.FirstActiveTorrentId == "" || ls.FirstTorrentId == "" {
|
if ls.TotalCount == 0 || ls.FirstTorrentId == "" {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return a.TotalCount == ls.TotalCount && a.ActiveCount == ls.ActiveCount && a.FirstActiveTorrentId == ls.FirstActiveTorrentId && a.FirstTorrentId == ls.FirstTorrentId
|
return a.TotalCount == ls.TotalCount && a.ActiveCount == ls.ActiveCount && a.FirstTorrentId == ls.FirstTorrentId
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) setNewLatestState(checksum LibraryState) {
|
func (t *TorrentManager) setNewLatestState(checksum LibraryState) {
|
||||||
t.latestState.ActiveCount = checksum.ActiveCount
|
t.latestState.ActiveCount = checksum.ActiveCount
|
||||||
t.latestState.TotalCount = checksum.TotalCount
|
t.latestState.TotalCount = checksum.TotalCount
|
||||||
t.latestState.FirstActiveTorrentId = checksum.FirstActiveTorrentId
|
|
||||||
t.latestState.FirstTorrentId = checksum.FirstTorrentId
|
t.latestState.FirstTorrentId = checksum.FirstTorrentId
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,16 +23,7 @@ func (t *TorrentManager) setNewLatestState(checksum LibraryState) {
|
|||||||
func (t *TorrentManager) getCurrentState() LibraryState {
|
func (t *TorrentManager) getCurrentState() LibraryState {
|
||||||
var state LibraryState
|
var state LibraryState
|
||||||
|
|
||||||
activeTorrents, _, err := t.Api.GetTorrents(1, true)
|
torrents, totalCount, err := t.Api.GetTorrents(true)
|
||||||
if err != nil {
|
|
||||||
t.log.Errorf("Checksum API Error (GetActiveTorrents): %v", err)
|
|
||||||
return LibraryState{}
|
|
||||||
}
|
|
||||||
if len(activeTorrents) > 0 {
|
|
||||||
state.FirstActiveTorrentId = activeTorrents[0].ID
|
|
||||||
}
|
|
||||||
|
|
||||||
torrents, totalCount, err := t.Api.GetTorrents(1, true)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Errorf("Checksum API Error (GetTorrents): %v", err)
|
t.log.Errorf("Checksum API Error (GetTorrents): %v", err)
|
||||||
return LibraryState{}
|
return LibraryState{}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
||||||
instances, _, err := t.Api.GetTorrents(0, false)
|
instances, _, err := t.Api.GetTorrents(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get torrents: %v", err)
|
t.log.Warnf("Cannot get torrents: %v", err)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -262,7 +262,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
||||||
t.log.Infof("Trying to assign links to incomplete torrent %s", t.GetKey(torrent))
|
t.log.Infof("Trying to assign %d links to incomplete torrent %s", torrent.UnassignedLinks.Cardinality(), t.GetKey(torrent))
|
||||||
// handle torrents with incomplete links for selected files
|
// handle torrents with incomplete links for selected files
|
||||||
assignedCount := 0
|
assignedCount := 0
|
||||||
rarCount := 0
|
rarCount := 0
|
||||||
@@ -272,8 +272,7 @@ func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
|||||||
unrestrict := t.UnrestrictLinkUntilOk(link)
|
unrestrict := t.UnrestrictLinkUntilOk(link)
|
||||||
if unrestrict == nil {
|
if unrestrict == nil {
|
||||||
newUnassignedLinks.Set(link, nil)
|
newUnassignedLinks.Set(link, nil)
|
||||||
// return early, no point continuing
|
return false // next
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to assign to a selected file
|
// try to assign to a selected file
|
||||||
|
|||||||
@@ -115,89 +115,88 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTorrents returns all torrents, paginated
|
type getTorrentsResult struct {
|
||||||
// if customLimit is 0, the default limit of 500 is used
|
|
||||||
func (rd *RealDebrid) GetTorrents(customLimit int, active bool) ([]Torrent, int, error) {
|
|
||||||
const MAX_PARALLEL = 4
|
|
||||||
baseURL := "https://api.real-debrid.com/rest/1.0/torrents"
|
|
||||||
var allTorrents []Torrent
|
|
||||||
page := 1
|
|
||||||
limit := customLimit
|
|
||||||
if limit == 0 {
|
|
||||||
limit = rd.cfg.GetTorrentsCount()
|
|
||||||
}
|
|
||||||
totalCount := 0
|
|
||||||
|
|
||||||
type fetchResult struct {
|
|
||||||
torrents []Torrent
|
torrents []Torrent
|
||||||
err error
|
err error
|
||||||
count int
|
totalCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rd *RealDebrid) getPageOfTorrents(page, limit int) getTorrentsResult {
|
||||||
|
baseURL := "https://api.real-debrid.com/rest/1.0/torrents"
|
||||||
|
|
||||||
for {
|
|
||||||
results := make(chan fetchResult, 4) // Channel to collect results from goroutines
|
|
||||||
for i := 0; i < MAX_PARALLEL; i++ { // Launch GET_PARALLEL concurrent fetches
|
|
||||||
go func(p int) {
|
|
||||||
params := url.Values{}
|
params := url.Values{}
|
||||||
params.Set("page", fmt.Sprintf("%d", p))
|
params.Set("page", fmt.Sprintf("%d", page))
|
||||||
params.Set("limit", fmt.Sprintf("%d", limit))
|
params.Set("limit", fmt.Sprintf("%d", limit))
|
||||||
if active {
|
|
||||||
params.Set("filter", "active")
|
|
||||||
}
|
|
||||||
|
|
||||||
reqURL := baseURL + "?" + params.Encode()
|
reqURL := baseURL + "?" + params.Encode()
|
||||||
req, err := http.NewRequest("GET", reqURL, nil)
|
req, err := http.NewRequest("GET", reqURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results <- fetchResult{nil, err, 0}
|
return getTorrentsResult{nil, err, 0}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := rd.apiClient.Do(req)
|
resp, err := rd.apiClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results <- fetchResult{nil, err, 0}
|
return getTorrentsResult{nil, err, 0}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNoContent {
|
if resp.StatusCode == http.StatusNoContent {
|
||||||
results <- fetchResult{nil, nil, 0}
|
return getTorrentsResult{nil, nil, 0}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var torrents []Torrent
|
var torrents []Torrent
|
||||||
decoder := json.NewDecoder(resp.Body)
|
decoder := json.NewDecoder(resp.Body)
|
||||||
err = decoder.Decode(&torrents)
|
err = decoder.Decode(&torrents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results <- fetchResult{nil, err, 0}
|
return getTorrentsResult{nil, err, 0}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
countHeader := resp.Header.Get("x-total-count")
|
countHeader := resp.Header.Get("x-total-count")
|
||||||
count, _ := strconv.Atoi(countHeader) // In real use, handle this error
|
count, _ := strconv.Atoi(countHeader) // In real use, handle this error
|
||||||
|
|
||||||
results <- fetchResult{torrents, nil, count}
|
return getTorrentsResult{torrents, nil, count}
|
||||||
}(page + i)
|
}
|
||||||
|
|
||||||
|
func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
|
||||||
|
var allTorrents []Torrent
|
||||||
|
|
||||||
|
// fetch 1 to get total count
|
||||||
|
result := rd.getPageOfTorrents(1, 1)
|
||||||
|
allTorrents = append(allTorrents, result.torrents...)
|
||||||
|
totalCount := result.totalCount
|
||||||
|
|
||||||
|
if onlyOne || totalCount == len(allTorrents) {
|
||||||
|
return allTorrents, totalCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const MAX_PARALLEL = 4
|
||||||
|
page := 1
|
||||||
|
maxPages := totalCount / rd.cfg.GetTorrentsCount()
|
||||||
|
for {
|
||||||
|
allResults := make(chan getTorrentsResult, MAX_PARALLEL) // Channel to collect results from goroutines
|
||||||
|
for i := 0; i < MAX_PARALLEL; i++ { // Launch GET_PARALLEL concurrent fetches
|
||||||
|
go func(add int) {
|
||||||
|
if page > maxPages {
|
||||||
|
allResults <- getTorrentsResult{nil, nil, 0}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
allResults <- rd.getPageOfTorrents(page+add, rd.cfg.GetTorrentsCount())
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
// Collect results from all goroutines
|
// Collect results from all goroutines
|
||||||
for i := 0; i < MAX_PARALLEL; i++ {
|
for i := 0; i < MAX_PARALLEL; i++ {
|
||||||
result := <-results
|
res := <-allResults
|
||||||
if result.err != nil {
|
if res.err != nil {
|
||||||
return nil, 0, result.err
|
return nil, 0, res.err
|
||||||
}
|
|
||||||
allTorrents = append(allTorrents, result.torrents...)
|
|
||||||
if totalCount == 0 { // Set totalCount from the first successful fetch
|
|
||||||
totalCount = result.count
|
|
||||||
}
|
}
|
||||||
|
allTorrents = append(allTorrents, res.torrents...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Increment page by GET_PARALLEL for the next iteration of GET_PARALLEL concurrent fetches
|
if len(allTorrents) >= totalCount {
|
||||||
page += MAX_PARALLEL
|
|
||||||
|
|
||||||
// Break loop if all torrents fetched or the limit is reached
|
|
||||||
if len(allTorrents) >= totalCount || (customLimit != 0 && len(allTorrents) >= customLimit) {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
page += MAX_PARALLEL
|
||||||
}
|
}
|
||||||
|
|
||||||
return allTorrents, totalCount, nil
|
return allTorrents, totalCount, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user