Cache for directories and torrents

This commit is contained in:
Ben Sarmiento
2023-11-30 03:16:10 +01:00
parent 253b92a3b6
commit 5914af80fd
6 changed files with 226 additions and 177 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/gob"
"fmt"
"math"
"net/url"
"os"
"path/filepath"
"sort"
@@ -64,36 +65,35 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
}
var initWait sync.WaitGroup
// Fetch downloads
initWait.Add(1)
t.DownloadCache = cmap.New[*realdebrid.Download]()
_ = t.workerPool.Submit(func() {
defer initWait.Done()
downloads, _, err := t.Api.GetDownloads()
if err != nil {
t.log.Fatalf("Cannot get downloads: %v\n", err)
}
t.DownloadCache = cmap.New[*realdebrid.Download]()
for i := range downloads {
if !t.DownloadCache.Has(downloads[i].Link) {
t.DownloadCache.Set(downloads[i].Link, &downloads[i])
page := 1
offset := 0
for {
downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
if err != nil {
t.log.Fatalf("Cannot get downloads: %v\n", err)
}
for i := range downloads {
if !t.DownloadCache.Has(downloads[i].Link) {
t.DownloadCache.Set(downloads[i].Link, &downloads[i])
}
}
offset += len(downloads)
page++
if offset >= totalDownloads {
break
}
}
})
var newTorrents []realdebrid.Torrent
var err error
initWait.Add(1)
_ = t.workerPool.Submit(func() {
defer initWait.Done()
newTorrents, _, err = t.Api.GetTorrents(0)
if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err)
}
})
initWait.Wait()
newTorrents, _, err = t.Api.GetTorrents(0)
if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err)
}
t.log.Infof("Fetched %d downloads", t.DownloadCache.Count())
@@ -112,47 +112,34 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
noInfoCount := 0
allCt := 0
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
for info := range torrentsChan {
allCt++
if info == nil {
for tor := range torrentsChan {
if tor == nil {
noInfoCount++
continue
}
if torrent, exists := allTorrents.Get(info.AccessKey); !exists {
allTorrents.Set(info.AccessKey, info)
if torrent, exists := allTorrents.Get(tor.AccessKey); !exists {
allTorrents.Set(tor.AccessKey, tor)
} else {
mainTorrent := t.mergeToMain(torrent, info)
allTorrents.Set(info.AccessKey, mainTorrent)
mainTorrent := t.mergeToMain(torrent, tor)
allTorrents.Set(tor.AccessKey, mainTorrent)
}
}
allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
// get IDs
var torrentIDs []string
for _, instance := range torrent.Instances {
torrentIDs = append(torrentIDs, instance.ID)
}
// get filenames
filenames := torrent.SelectedFiles.Keys()
// Map torrents to directories
switch t.Config.GetVersion() {
case "v1":
configV1 := t.Config.(*config.ZurgConfigV1)
for _, directories := range configV1.GetGroupMap() {
for _, directory := range directories {
if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(accessKey, torrent)
break
}
}
}
}
allTorrents.IterCb(func(_ string, torrent *Torrent) {
dav, html := t.buildTorrentResponses(torrent)
t.AssignedDirectoryCb(torrent, func(directory string) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
// torrent responses
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".html", &newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".dav", &newDav, 1)
})
})
t.updateSortedKeys()
t.updateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
@@ -218,6 +205,10 @@ func (t *TorrentManager) SetNewLatestState(checksum LibraryState) {
t.latestState.TotalCount = checksum.TotalCount
}
func (t *TorrentManager) ScheduleForRefresh() {
t.SetNewLatestState(EmptyState())
}
type torrentsResp struct {
torrents []realdebrid.Torrent
totalCount int
@@ -345,31 +336,20 @@ func (t *TorrentManager) startRefreshJob() {
var updatedPaths []string
newSet.IterCb(func(_ string, torrent *Torrent) {
// get IDs
var torrentIDs []string
for _, instance := range torrent.Instances {
torrentIDs = append(torrentIDs, instance.ID)
}
// get filenames
filenames := torrent.SelectedFiles.Keys()
// Map torrents to directories
switch t.Config.GetVersion() {
case "v1":
configV1 := t.Config.(*config.ZurgConfigV1)
for _, directories := range configV1.GetGroupMap() {
for _, directory := range directories {
if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
if torrent.LatestAdded > t.latestState.FirstTorrent.Added {
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey))
}
break
}
}
dav, html := t.buildTorrentResponses(torrent)
t.AssignedDirectoryCb(torrent, func(directory string) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
if torrent.LatestAdded > t.latestState.FirstTorrent.Added {
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey))
}
}
// torrent responses
cacheKey := directory + "/" + torrent.AccessKey
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(cacheKey+".html", &newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(cacheKey+".dav", &newDav, 1)
})
})
// delete torrents that no longer exist
@@ -382,7 +362,7 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Infof("Deleted torrent: %s\n", oldAccessKey)
}
}
t.updateSortedKeys()
t.updateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
@@ -698,7 +678,7 @@ func (t *TorrentManager) Repair(accessKey string) {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(torrent.AccessKey)
})
t.SetNewLatestState(EmptyState())
t.ScheduleForRefresh()
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
return
} else if streamableCount == 1 {
@@ -707,7 +687,7 @@ func (t *TorrentManager) Repair(accessKey string) {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(torrent.AccessKey)
})
t.SetNewLatestState(EmptyState())
t.ScheduleForRefresh()
return
}
// t.log.Debugf("Identified the expired files of torrent id=%s", info.ID)
@@ -877,7 +857,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
}
}
func (t *TorrentManager) updateSortedKeys() {
func (t *TorrentManager) updateDirectoryResponsesCache() {
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
allKeys := torrents.Keys()
sort.Strings(allKeys)
@@ -893,10 +873,55 @@ func (t *TorrentManager) updateSortedKeys() {
}
}
cacheKey := directory
davRet = "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">" + dav.BaseDirectory(directory, "") + dav.BaseDirectory(directory, "") + davRet + "</d:multistatus>"
t.ResponseCache.Set(directory+".dav", davRet, 1)
t.ResponseCache.Set(cacheKey+".dav", &davRet, 1)
htmlRet = "<ol>" + htmlRet
t.ResponseCache.Set(directory+".html", "<ol>"+htmlRet, 1)
t.ResponseCache.Set(cacheKey+".html", &htmlRet, 1)
})
}
func (t *TorrentManager) buildTorrentResponses(tor *Torrent) (string, string) {
davRet := "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">" + dav.BaseDirectory(filepath.Join("$dir", tor.AccessKey), tor.LatestAdded)
htmlRet := "<ol>"
filenames := tor.SelectedFiles.Keys()
sort.Strings(filenames)
for _, filename := range filenames {
file, _ := tor.SelectedFiles.Get(filename)
if file == nil || !strings.HasPrefix(file.Link, "http") {
// will be caught by torrent manager's repairAll
// just skip it for now
continue
}
davRet += dav.File(filename, file.Bytes, file.Ended)
filePath := filepath.Join("$dir", tor.AccessKey, url.PathEscape(filename))
htmlRet += fmt.Sprintf("<li><a href=\"%s\">%s</a></li>", filePath, filename)
}
davRet += "</d:multistatus>"
return davRet, htmlRet
}
func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) {
var torrentIDs []string
for _, instance := range tor.Instances {
torrentIDs = append(torrentIDs, instance.ID)
}
// get filenames needed for directory conditions
filenames := tor.SelectedFiles.Keys()
// Map torrents to directories
switch t.Config.GetVersion() {
case "v1":
configV1 := t.Config.(*config.ZurgConfigV1)
for _, directories := range configV1.GetGroupMap() {
for _, directory := range directories {
if t.Config.MeetsConditions(directory, tor.AccessKey, torrentIDs, filenames) {
cb(directory)
break
}
}
}
}
}