Add download cache again
This commit is contained in:
@@ -30,8 +30,9 @@ type TorrentManager struct {
|
|||||||
log *logutil.Logger
|
log *logutil.Logger
|
||||||
repairLog *logutil.Logger
|
repairLog *logutil.Logger
|
||||||
|
|
||||||
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
||||||
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
||||||
|
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
|
||||||
|
|
||||||
RootNode *fs.FileNode
|
RootNode *fs.FileNode
|
||||||
|
|
||||||
@@ -63,8 +64,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
|||||||
log: log,
|
log: log,
|
||||||
repairLog: repairLog,
|
repairLog: repairLog,
|
||||||
|
|
||||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||||
DownloadMap: cmap.New[*realdebrid.Download](),
|
DownloadMap: cmap.New[*realdebrid.Download](),
|
||||||
|
DownloadCache: cmap.New[*realdebrid.Download](),
|
||||||
|
|
||||||
RootNode: fs.NewFileNode("root", true),
|
RootNode: fs.NewFileNode("root", true),
|
||||||
|
|
||||||
@@ -94,15 +96,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
|||||||
|
|
||||||
// proxy function
|
// proxy function
|
||||||
func (t *TorrentManager) UnrestrictLinkUntilOk(link string) *realdebrid.Download {
|
func (t *TorrentManager) UnrestrictLinkUntilOk(link string) *realdebrid.Download {
|
||||||
|
if strings.HasPrefix(link, "https://real-debrid.com/d/") && t.DownloadCache.Has(link[0:39]) {
|
||||||
|
ret, _ := t.DownloadCache.Get(link[0:39])
|
||||||
|
return ret
|
||||||
|
}
|
||||||
ret, err := t.api.UnrestrictLink(link, t.Config.ShouldServeFromRclone())
|
ret, err := t.api.UnrestrictLink(link, t.Config.ShouldServeFromRclone())
|
||||||
|
t.DownloadCache.Set(ret.Link[0:39], ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot unrestrict link %s: %v", link, err)
|
t.log.Warnf("Cannot unrestrict link %s: %v", link, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if ret != nil && ret.Link != "" && ret.Filename != "" {
|
if t.Config.EnableDownloadMount() {
|
||||||
if t.Config.EnableDownloadMount() {
|
t.DownloadMap.Set(ret.Filename, ret)
|
||||||
t.DownloadMap.Set(ret.Filename, ret)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
@@ -272,26 +277,20 @@ func (t *TorrentManager) mountDownloads() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.DownloadMap.Clear()
|
t.DownloadMap.Clear()
|
||||||
|
t.DownloadCache.Clear()
|
||||||
_ = t.workerPool.Submit(func() {
|
_ = t.workerPool.Submit(func() {
|
||||||
page := 1
|
downloads, totalDownloads, err := t.api.GetDownloads()
|
||||||
offset := 0
|
if err != nil {
|
||||||
for {
|
t.log.Errorf("Cannot get downloads: %v", err)
|
||||||
downloads, totalDownloads, err := t.api.GetDownloads(page, offset)
|
}
|
||||||
if err != nil {
|
t.log.Debugf("Got %d downloads", totalDownloads)
|
||||||
// if we get an error, we just stop
|
for i := range downloads {
|
||||||
t.log.Warnf("Cannot get downloads on page %d: %v", page, err)
|
idx := i
|
||||||
continue
|
if strings.HasPrefix(downloads[idx].Link, "https://real-debrid.com/d/") {
|
||||||
}
|
t.DownloadCache.Set(downloads[idx].Link[0:39], &downloads[idx])
|
||||||
for i := range downloads {
|
}
|
||||||
t.DownloadMap.Set(downloads[i].Filename, &downloads[i])
|
t.DownloadMap.Set(downloads[idx].Filename, &downloads[idx])
|
||||||
}
|
|
||||||
offset += len(downloads)
|
|
||||||
page++
|
|
||||||
if offset >= totalDownloads {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.log.Infof("Compiled into %d downloads", t.DownloadMap.Count())
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -356,9 +356,73 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetDownloads returns all torrents, paginated
|
// GetDownloads returns all torrents, paginated
|
||||||
func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) {
|
func (rd *RealDebrid) GetDownloads() ([]Download, int, error) {
|
||||||
|
_, totalCount, err := rd.fetchPageOfDownloads(1, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset allDownloads
|
||||||
|
allDownloads := []Download{}
|
||||||
|
page := 1
|
||||||
|
offset := 0
|
||||||
|
limit := 100
|
||||||
|
|
||||||
|
// compute ceiling of totalCount / limit
|
||||||
|
maxPages := (totalCount + limit - 1) / limit
|
||||||
|
// rd.log.Debugf("Total count is %d, max pages is %d", totalCount, maxPages)
|
||||||
|
maxParallelThreads := 8
|
||||||
|
if maxPages < maxParallelThreads {
|
||||||
|
maxParallelThreads = maxPages
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
allResults := make(chan []Download, maxParallelThreads) // Channel to collect results from goroutines
|
||||||
|
errChan := make(chan error, maxParallelThreads) // Channel to collect errors from goroutines
|
||||||
|
for i := 0; i < maxParallelThreads; i++ { // Launch GET_PARALLEL concurrent fetches
|
||||||
|
go func(add int) {
|
||||||
|
if page+add > maxPages {
|
||||||
|
allResults <- nil
|
||||||
|
errChan <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
result, _, err := rd.fetchPageOfDownloads(page+add, offset+add*limit)
|
||||||
|
if err != nil {
|
||||||
|
allResults <- nil
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
allResults <- result
|
||||||
|
errChan <- nil
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
// Collect results from all goroutines
|
||||||
|
for i := 0; i < maxParallelThreads; i++ {
|
||||||
|
res := <-allResults
|
||||||
|
err := <-errChan
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
allDownloads = append(allDownloads, res...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rd.log.Debugf("Got %d/%d downloads", len(allDownloads), totalCount)
|
||||||
|
|
||||||
|
if len(allDownloads) >= totalCount || page >= maxPages {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
page += maxParallelThreads
|
||||||
|
offset += maxParallelThreads * limit
|
||||||
|
}
|
||||||
|
|
||||||
|
rd.log.Debugf("Got %d downloads", len(allDownloads))
|
||||||
|
|
||||||
|
return allDownloads, totalCount, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rd *RealDebrid) fetchPageOfDownloads(page, offset int) ([]Download, int, error) {
|
||||||
baseURL := "https://api.real-debrid.com/rest/1.0/downloads"
|
baseURL := "https://api.real-debrid.com/rest/1.0/downloads"
|
||||||
var allDownloads []Download
|
var downloads []Download
|
||||||
limit := 500
|
limit := 500
|
||||||
totalCount := 0
|
totalCount := 0
|
||||||
|
|
||||||
@@ -384,12 +448,15 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) {
|
|||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNoContent {
|
if resp.StatusCode == http.StatusNoContent {
|
||||||
return allDownloads, 0, nil
|
return downloads, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if status code is not 2xx, return erro
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||||
|
rd.log.Errorf("Error when executing the get downloads request: %v", err)
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
var downloads []Download
|
|
||||||
decoder := json.NewDecoder(resp.Body)
|
decoder := json.NewDecoder(resp.Body)
|
||||||
err = decoder.Decode(&downloads)
|
err = decoder.Decode(&downloads)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -397,16 +464,13 @@ func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) {
|
|||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
allDownloads = append(allDownloads, downloads...)
|
|
||||||
|
|
||||||
totalCountHeader := resp.Header.Get("x-total-count")
|
totalCountHeader := resp.Header.Get("x-total-count")
|
||||||
totalCount, err = strconv.Atoi(totalCountHeader)
|
totalCount, err = strconv.Atoi(totalCountHeader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
totalCount = 0
|
totalCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
rd.log.Debugf("Got %d downloads (page %d), total count is %d", len(allDownloads)+offset, page, totalCount)
|
return downloads, totalCount, nil
|
||||||
return allDownloads, totalCount, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetUserInformation gets the current user information.
|
// GetUserInformation gets the current user information.
|
||||||
|
|||||||
Reference in New Issue
Block a user