Optimize library fetching mechanism
This commit is contained in:
@@ -20,6 +20,9 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
|||||||
instances, _, err := t.rd.GetTorrents(false)
|
instances, _, err := t.rd.GetTorrents(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get torrents: %v", err)
|
t.log.Warnf("Cannot get torrents: %v", err)
|
||||||
|
t.log.Info("Retrying in 5 seconds")
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
t.refreshTorrents(initialRun)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,96 +16,64 @@ type fetchTorrentsResult struct {
|
|||||||
|
|
||||||
// GetTorrents returns all torrents, paginated
|
// GetTorrents returns all torrents, paginated
|
||||||
func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
|
func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
|
||||||
result := rd.fetchPageOfTorrents(1, 1)
|
|
||||||
if result.err != nil {
|
|
||||||
return nil, 0, result.err
|
|
||||||
}
|
|
||||||
|
|
||||||
totalElements := result.total
|
|
||||||
|
|
||||||
if onlyOne {
|
if onlyOne {
|
||||||
return result.torrents, totalElements, nil
|
result := rd.fetchPageOfTorrents(1, 1)
|
||||||
|
if result.err != nil {
|
||||||
|
return nil, 0, result.err
|
||||||
|
}
|
||||||
|
return result.torrents, result.total, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
allTorrents := []Torrent{}
|
pageSize := 250
|
||||||
page := 1
|
|
||||||
pageSize := 500
|
|
||||||
|
|
||||||
maxPages := (totalElements + pageSize - 1) / pageSize
|
firstPage := rd.fetchPageOfTorrents(1, pageSize)
|
||||||
rd.log.Debugf("Torrents total count is %d", totalElements)
|
if firstPage.err != nil {
|
||||||
maxParallelThreads := 4
|
return nil, 0, firstPage.err
|
||||||
if maxPages < maxParallelThreads {
|
|
||||||
maxParallelThreads = maxPages
|
|
||||||
}
|
}
|
||||||
found := -1
|
|
||||||
for {
|
totalCount := firstPage.total
|
||||||
allResults := make(chan fetchTorrentsResult, maxParallelThreads) // Channel to collect results from goroutines
|
rd.log.Debugf("Torrents total count is %d", totalCount)
|
||||||
for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches
|
|
||||||
idx := i
|
for cIdx, cached := range rd.torrentsCache { // N cached torrents
|
||||||
rd.workerPool.Submit(func() {
|
for fIdx, fresh := range firstPage.torrents { // 250 torrents in batch
|
||||||
if page+idx > maxPages {
|
cIdxEnd := len(rd.torrentsCache) - 1 - cIdx
|
||||||
allResults <- fetchTorrentsResult{
|
fIdxEnd := totalCount - 1 - fIdx
|
||||||
torrents: nil,
|
if fresh.ID == cached.ID && fresh.Progress == cached.Progress && fIdxEnd == cIdxEnd {
|
||||||
page: page + idx,
|
allTorrents := firstPage.torrents[:fIdx]
|
||||||
total: totalElements,
|
allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...)
|
||||||
err: nil,
|
return allTorrents, len(allTorrents), nil
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
allResults <- rd.fetchPageOfTorrents(page+idx, pageSize)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
batches := make([][]Torrent, maxParallelThreads)
|
|
||||||
for i := 0; i < maxParallelThreads; i++ {
|
|
||||||
result := <-allResults
|
|
||||||
if result.err != nil {
|
|
||||||
rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err)
|
|
||||||
return nil, 0, result.err
|
|
||||||
}
|
}
|
||||||
bIdx := (result.page - 1) % maxParallelThreads
|
|
||||||
batches[bIdx] = []Torrent{}
|
|
||||||
batches[bIdx] = append(batches[bIdx], result.torrents...)
|
|
||||||
}
|
|
||||||
for bIdx, batch := range batches { // 4 batches
|
|
||||||
if found < 0 && len(batch) > 0 {
|
|
||||||
cachedCount := len(rd.torrentsCache)
|
|
||||||
for cIdx, cached := range rd.torrentsCache { // N cached torrents
|
|
||||||
cIdxEnd := cachedCount - 1 - cIdx
|
|
||||||
for tIdx, torrent := range batch { // 250 torrents in batch
|
|
||||||
tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, totalElements)
|
|
||||||
if torrent.ID == cached.ID && torrent.Progress == cached.Progress && tIdxEnd == cIdxEnd {
|
|
||||||
found = ((page + bIdx - 1) * pageSize) + tIdx
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if found >= 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
allTorrents = append(allTorrents, batch...)
|
|
||||||
}
|
|
||||||
if found >= 0 {
|
|
||||||
tIdx := found % pageSize
|
|
||||||
pageNum := (found / pageSize) + 1
|
|
||||||
tIdxEnd := indexFromEnd(tIdx, pageNum, pageSize, totalElements)
|
|
||||||
cIdx := len(rd.torrentsCache) - 1 - tIdxEnd
|
|
||||||
last := len(allTorrents) - 1
|
|
||||||
cIdx += last - found + 1
|
|
||||||
allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rd.log.Debugf("Got %d/%d torrents", len(allTorrents), totalElements)
|
// Fetch all torrents in parallel
|
||||||
|
|
||||||
if len(allTorrents) >= totalElements || page >= maxPages {
|
maxPages := (totalCount + pageSize - 1) / pageSize
|
||||||
break
|
resultsCh := make(chan fetchTorrentsResult, maxPages)
|
||||||
|
for pageIdx := 2; pageIdx <= maxPages; pageIdx++ {
|
||||||
|
pageNum := pageIdx
|
||||||
|
rd.workerPool.Submit(func() {
|
||||||
|
resultsCh <- rd.fetchPageOfTorrents(pageNum, pageSize)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
totalFetched := len(firstPage.torrents)
|
||||||
|
torrentPages := make([][]Torrent, maxPages)
|
||||||
|
for i := 2; i <= maxPages; i++ {
|
||||||
|
result := <-resultsCh
|
||||||
|
if result.err != nil {
|
||||||
|
return nil, 0, result.err
|
||||||
}
|
}
|
||||||
|
torrentPages[result.page-1] = result.torrents
|
||||||
page += maxParallelThreads
|
totalFetched += len(result.torrents)
|
||||||
|
rd.log.Debugf("Got %d/%d torrents", totalFetched, result.total)
|
||||||
|
}
|
||||||
|
allTorrents := firstPage.torrents
|
||||||
|
for _, page := range torrentPages {
|
||||||
|
allTorrents = append(allTorrents, page...)
|
||||||
}
|
}
|
||||||
|
|
||||||
rd.torrentsCache = allTorrents
|
rd.torrentsCache = allTorrents
|
||||||
return allTorrents, len(allTorrents), nil
|
return rd.torrentsCache, len(rd.torrentsCache), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult {
|
func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult {
|
||||||
|
|||||||
Reference in New Issue
Block a user