diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index d51078d..05a952e 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -20,6 +20,9 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { instances, _, err := t.rd.GetTorrents(false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) + t.log.Info("Retrying in 5 seconds") + time.Sleep(5 * time.Second) + t.refreshTorrents(initialRun) return } diff --git a/pkg/realdebrid/torrents.go b/pkg/realdebrid/torrents.go index 0c4a9cd..55c156a 100644 --- a/pkg/realdebrid/torrents.go +++ b/pkg/realdebrid/torrents.go @@ -16,96 +16,64 @@ type fetchTorrentsResult struct { // GetTorrents returns all torrents, paginated func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { - result := rd.fetchPageOfTorrents(1, 1) - if result.err != nil { - return nil, 0, result.err - } - - totalElements := result.total - if onlyOne { - return result.torrents, totalElements, nil + result := rd.fetchPageOfTorrents(1, 1) + if result.err != nil { + return nil, 0, result.err + } + return result.torrents, result.total, nil } - allTorrents := []Torrent{} - page := 1 - pageSize := 500 + pageSize := 250 - maxPages := (totalElements + pageSize - 1) / pageSize - rd.log.Debugf("Torrents total count is %d", totalElements) - maxParallelThreads := 4 - if maxPages < maxParallelThreads { - maxParallelThreads = maxPages + firstPage := rd.fetchPageOfTorrents(1, pageSize) + if firstPage.err != nil { + return nil, 0, firstPage.err } - found := -1 - for { - allResults := make(chan fetchTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines - for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches - idx := i - rd.workerPool.Submit(func() { - if page+idx > maxPages { - allResults <- fetchTorrentsResult{ - torrents: nil, - page: page + idx, - total: totalElements, - err: nil, - } - return - } - allResults <- rd.fetchPageOfTorrents(page+idx, pageSize) - }) - } - batches := make([][]Torrent, maxParallelThreads) - for i := 0; i < maxParallelThreads; i++ { - result := <-allResults - if result.err != nil { - rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err) - return nil, 0, result.err + + totalCount := firstPage.total + rd.log.Debugf("Torrents total count is %d", totalCount) + + for cIdx, cached := range rd.torrentsCache { // N cached torrents + for fIdx, fresh := range firstPage.torrents { // 250 torrents in batch + cIdxEnd := len(rd.torrentsCache) - 1 - cIdx + fIdxEnd := totalCount - 1 - fIdx + if fresh.ID == cached.ID && fresh.Progress == cached.Progress && fIdxEnd == cIdxEnd { + allTorrents := firstPage.torrents[:fIdx] + allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...) + return allTorrents, len(allTorrents), nil } - bIdx := (result.page - 1) % maxParallelThreads - batches[bIdx] = []Torrent{} - batches[bIdx] = append(batches[bIdx], result.torrents...) - } - for bIdx, batch := range batches { // 4 batches - if found < 0 && len(batch) > 0 { - cachedCount := len(rd.torrentsCache) - for cIdx, cached := range rd.torrentsCache { // N cached torrents - cIdxEnd := cachedCount - 1 - cIdx - for tIdx, torrent := range batch { // 250 torrents in batch - tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, totalElements) - if torrent.ID == cached.ID && torrent.Progress == cached.Progress && tIdxEnd == cIdxEnd { - found = ((page + bIdx - 1) * pageSize) + tIdx - break - } - } - if found >= 0 { - break - } - } - } - allTorrents = append(allTorrents, batch...) - } - if found >= 0 { - tIdx := found % pageSize - pageNum := (found / pageSize) + 1 - tIdxEnd := indexFromEnd(tIdx, pageNum, pageSize, totalElements) - cIdx := len(rd.torrentsCache) - 1 - tIdxEnd - last := len(allTorrents) - 1 - cIdx += last - found + 1 - allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...) } + } - rd.log.Debugf("Got %d/%d torrents", len(allTorrents), totalElements) + // Fetch all torrents in parallel - if len(allTorrents) >= totalElements || page >= maxPages { - break + maxPages := (totalCount + pageSize - 1) / pageSize + resultsCh := make(chan fetchTorrentsResult, maxPages) + for pageIdx := 2; pageIdx <= maxPages; pageIdx++ { + pageNum := pageIdx + rd.workerPool.Submit(func() { + resultsCh <- rd.fetchPageOfTorrents(pageNum, pageSize) + }) + } + totalFetched := len(firstPage.torrents) + torrentPages := make([][]Torrent, maxPages) + for i := 2; i <= maxPages; i++ { + result := <-resultsCh + if result.err != nil { + return nil, 0, result.err } - - page += maxParallelThreads + torrentPages[result.page-1] = result.torrents + totalFetched += len(result.torrents) + rd.log.Debugf("Got %d/%d torrents", totalFetched, result.total) + } + allTorrents := firstPage.torrents + for _, page := range torrentPages { + allTorrents = append(allTorrents, page...) } rd.torrentsCache = allTorrents - return allTorrents, len(allTorrents), nil + return rd.torrentsCache, len(rd.torrentsCache), nil } func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult {