From 5914af80fdde1f13ded21ed5f9bf72117035c648 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Thu, 30 Nov 2023 03:16:10 +0100 Subject: [PATCH] Cache for directories and torrents --- internal/dav/delete.go | 2 +- internal/dav/listing.go | 53 ++++++---- internal/http/listing.go | 63 ++++++++---- internal/torrent/manager.go | 195 ++++++++++++++++++++---------------- internal/universal/get.go | 6 +- pkg/realdebrid/api.go | 84 +++++++--------- 6 files changed, 226 insertions(+), 177 deletions(-) diff --git a/internal/dav/delete.go b/internal/dav/delete.go index de5c1e7..b7846d0 100644 --- a/internal/dav/delete.go +++ b/internal/dav/delete.go @@ -83,7 +83,7 @@ func handleDeleteFile(w http.ResponseWriter, segments []string, t *torrent.Torre } file.Link = "unselect" - t.SetNewLatestState(torrent.EmptyState()) + t.ScheduleForRefresh() w.WriteHeader(http.StatusNoContent) return nil } diff --git a/internal/dav/listing.go b/internal/dav/listing.go index 37aa51e..7ba4830 100644 --- a/internal/dav/listing.go +++ b/internal/dav/listing.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "path" + "path/filepath" "sort" "strings" @@ -74,18 +75,27 @@ func handleListTorrents(w http.ResponseWriter, requestPath string, t *torrent.To return nil, fmt.Errorf("cannot find directory %s", basePath) } - resp, _ := t.ResponseCache.Get(basePath + ".dav") - davDoc := resp.(string) - - return &davDoc, nil + if resp, ok := t.ResponseCache.Get(basePath + ".dav"); !ok { + davDoc := "" + davDoc += dav.Directory("", "") + directories := t.DirectoryMap.Keys() + sort.Strings(directories) + for _, directory := range directories { + davDoc += dav.Directory(directory, "") + } + davDoc += "" + return &davDoc, nil + } else { + davDoc := resp.(*string) + return davDoc, nil + } } func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.TorrentManager) (*string, error) { - requestPath = strings.Trim(requestPath, "/") - basePath := path.Base(path.Dir(requestPath)) - torrents, ok := t.DirectoryMap.Get(basePath) + directory := path.Base(path.Dir(requestPath)) + torrents, ok := t.DirectoryMap.Get(directory) if !ok { - return nil, fmt.Errorf("cannot find directory %s", basePath) + return nil, fmt.Errorf("cannot find directory %s", directory) } accessKey := path.Base(requestPath) tor, ok := torrents.Get(accessKey) @@ -93,18 +103,21 @@ func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.Torre return nil, fmt.Errorf("cannot find torrent %s", accessKey) } - davDoc := "" + dav.BaseDirectory(requestPath, tor.LatestAdded) - - filenames := tor.SelectedFiles.Keys() - sort.Strings(filenames) - for _, filename := range filenames { - file, _ := tor.SelectedFiles.Get(filename) - if file == nil || !strings.HasPrefix(file.Link, "http") { - continue + if resp, ok := t.ResponseCache.Get(directory + "/" + accessKey + ".dav"); !ok { + davDoc := "" + dav.BaseDirectory(filepath.Join(directory, tor.AccessKey), tor.LatestAdded) + filenames := tor.SelectedFiles.Keys() + sort.Strings(filenames) + for _, filename := range filenames { + file, _ := tor.SelectedFiles.Get(filename) + if file == nil || !strings.HasPrefix(file.Link, "http") { + continue + } + davDoc += dav.File(filename, file.Bytes, file.Ended) } - davDoc += dav.File(filename, file.Bytes, file.Ended) + davDoc += "" + return &davDoc, nil + } else { + davDoc := resp.(*string) + return davDoc, nil } - - davDoc += "" - return &davDoc, nil } diff --git a/internal/http/listing.go b/internal/http/listing.go index c1df15d..6eb5d0f 100644 --- a/internal/http/listing.go +++ b/internal/http/listing.go @@ -65,23 +65,39 @@ func handleRoot(t *torrent.TorrentManager) (*string, error) { } func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) (*string, error) { - basePath := path.Base(requestPath) - _, ok := t.DirectoryMap.Get(basePath) + directory := path.Base(requestPath) + torrents, ok := t.DirectoryMap.Get(directory) if !ok { - return nil, fmt.Errorf("cannot find directory %s", basePath) + return nil, fmt.Errorf("cannot find directory %s", directory) } - resp, _ := t.ResponseCache.Get(basePath + ".html") - htmlDoc := resp.(string) - - return &htmlDoc, nil + if resp, ok := t.ResponseCache.Get(directory + ".html"); !ok { + htmlDoc := "
    " + var allTorrents []*torrent.Torrent + torrents.IterCb(func(_ string, tor *torrent.Torrent) { + if tor.AllInProgress() { + return + } + allTorrents = append(allTorrents, tor) + }) + sort.Slice(allTorrents, func(i, j int) bool { + return allTorrents[i].AccessKey < allTorrents[j].AccessKey + }) + for _, tor := range allTorrents { + htmlDoc = htmlDoc + fmt.Sprintf("
  1. %s
  2. ", filepath.Join(requestPath, url.PathEscape(tor.AccessKey)), tor.AccessKey) + } + return &htmlDoc, nil + } else { + htmlDoc := resp.(*string) + return htmlDoc, nil + } } func handleSingleTorrent(requestPath string, t *torrent.TorrentManager) (*string, error) { - basePath := path.Base(path.Dir(requestPath)) - torrents, ok := t.DirectoryMap.Get(basePath) + directory := path.Base(path.Dir(requestPath)) + torrents, ok := t.DirectoryMap.Get(directory) if !ok { - return nil, fmt.Errorf("cannot find directory %s", basePath) + return nil, fmt.Errorf("cannot find directory %s", directory) } accessKey := path.Base(requestPath) tor, ok := torrents.Get(accessKey) @@ -89,18 +105,21 @@ func handleSingleTorrent(requestPath string, t *torrent.TorrentManager) (*string return nil, fmt.Errorf("cannot find torrent %s", accessKey) } - htmlDoc := "
      " - filenames := tor.SelectedFiles.Keys() - sort.Strings(filenames) - for _, filename := range filenames { - file, _ := tor.SelectedFiles.Get(filename) - if file == nil || !strings.HasPrefix(file.Link, "http") { - // will be caught by torrent manager's repairAll - // just skip it for now - continue + if resp, ok := t.ResponseCache.Get(directory + "/" + accessKey + ".html"); !ok { + htmlDoc := "
        " + filenames := tor.SelectedFiles.Keys() + sort.Strings(filenames) + for _, filename := range filenames { + file, _ := tor.SelectedFiles.Get(filename) + if file == nil || !strings.HasPrefix(file.Link, "http") { + continue + } + filePath := filepath.Join(requestPath, url.PathEscape(filename)) + htmlDoc += fmt.Sprintf("
      1. %s
      2. ", filePath, filename) } - filePath := filepath.Join(requestPath, url.PathEscape(filename)) - htmlDoc += fmt.Sprintf("
      3. %s
      4. ", filePath, filename) + return &htmlDoc, nil + } else { + htmlDoc := resp.(*string) + return htmlDoc, nil } - return &htmlDoc, nil } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 0495b39..cac60b7 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "fmt" "math" + "net/url" "os" "path/filepath" "sort" @@ -64,36 +65,35 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } - var initWait sync.WaitGroup - // Fetch downloads - initWait.Add(1) + t.DownloadCache = cmap.New[*realdebrid.Download]() _ = t.workerPool.Submit(func() { - defer initWait.Done() - downloads, _, err := t.Api.GetDownloads() - if err != nil { - t.log.Fatalf("Cannot get downloads: %v\n", err) - } - t.DownloadCache = cmap.New[*realdebrid.Download]() - for i := range downloads { - if !t.DownloadCache.Has(downloads[i].Link) { - t.DownloadCache.Set(downloads[i].Link, &downloads[i]) + page := 1 + offset := 0 + for { + downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) + if err != nil { + t.log.Fatalf("Cannot get downloads: %v\n", err) + } + for i := range downloads { + if !t.DownloadCache.Has(downloads[i].Link) { + t.DownloadCache.Set(downloads[i].Link, &downloads[i]) + } + } + offset += len(downloads) + page++ + if offset >= totalDownloads { + break } } }) var newTorrents []realdebrid.Torrent var err error - initWait.Add(1) - _ = t.workerPool.Submit(func() { - defer initWait.Done() - newTorrents, _, err = t.Api.GetTorrents(0) - if err != nil { - t.log.Fatalf("Cannot get torrents: %v\n", err) - } - }) - - initWait.Wait() + newTorrents, _, err = t.Api.GetTorrents(0) + if err != nil { + t.log.Fatalf("Cannot get torrents: %v\n", err) + } t.log.Infof("Fetched %d downloads", t.DownloadCache.Count()) @@ -112,47 +112,34 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p t.log.Infof("Fetched info for %d torrents", len(newTorrents)) noInfoCount := 0 - allCt := 0 allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - for info := range torrentsChan { - allCt++ - if info == nil { + for tor := range torrentsChan { + if tor == nil { noInfoCount++ continue } - if torrent, exists := allTorrents.Get(info.AccessKey); !exists { - allTorrents.Set(info.AccessKey, info) + if torrent, exists := allTorrents.Get(tor.AccessKey); !exists { + allTorrents.Set(tor.AccessKey, tor) } else { - mainTorrent := t.mergeToMain(torrent, info) - allTorrents.Set(info.AccessKey, mainTorrent) + mainTorrent := t.mergeToMain(torrent, tor) + allTorrents.Set(tor.AccessKey, mainTorrent) } } - allTorrents.IterCb(func(accessKey string, torrent *Torrent) { - // get IDs - var torrentIDs []string - for _, instance := range torrent.Instances { - torrentIDs = append(torrentIDs, instance.ID) - } - - // get filenames - filenames := torrent.SelectedFiles.Keys() - // Map torrents to directories - switch t.Config.GetVersion() { - case "v1": - configV1 := t.Config.(*config.ZurgConfigV1) - for _, directories := range configV1.GetGroupMap() { - for _, directory := range directories { - if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { - torrents, _ := t.DirectoryMap.Get(directory) - torrents.Set(accessKey, torrent) - break - } - } - } - } + allTorrents.IterCb(func(_ string, torrent *Torrent) { + dav, html := t.buildTorrentResponses(torrent) + t.AssignedDirectoryCb(torrent, func(directory string) { + torrents, _ := t.DirectoryMap.Get(directory) + torrents.Set(torrent.AccessKey, torrent) + // torrent responses + newHtml := strings.ReplaceAll(html, "$dir", directory) + t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".html", &newHtml, 1) + newDav := strings.ReplaceAll(dav, "$dir", directory) + t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".dav", &newDav, 1) + }) }) - t.updateSortedKeys() + + t.updateDirectoryResponsesCache() t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) @@ -218,6 +205,10 @@ func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { t.latestState.TotalCount = checksum.TotalCount } +func (t *TorrentManager) ScheduleForRefresh() { + t.SetNewLatestState(EmptyState()) +} + type torrentsResp struct { torrents []realdebrid.Torrent totalCount int @@ -345,31 +336,20 @@ func (t *TorrentManager) startRefreshJob() { var updatedPaths []string newSet.IterCb(func(_ string, torrent *Torrent) { - // get IDs - var torrentIDs []string - for _, instance := range torrent.Instances { - torrentIDs = append(torrentIDs, instance.ID) - } - - // get filenames - filenames := torrent.SelectedFiles.Keys() - // Map torrents to directories - switch t.Config.GetVersion() { - case "v1": - configV1 := t.Config.(*config.ZurgConfigV1) - for _, directories := range configV1.GetGroupMap() { - for _, directory := range directories { - if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { - torrents, _ := t.DirectoryMap.Get(directory) - torrents.Set(torrent.AccessKey, torrent) - if torrent.LatestAdded > t.latestState.FirstTorrent.Added { - updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) - } - break - } - } + dav, html := t.buildTorrentResponses(torrent) + t.AssignedDirectoryCb(torrent, func(directory string) { + torrents, _ := t.DirectoryMap.Get(directory) + torrents.Set(torrent.AccessKey, torrent) + if torrent.LatestAdded > t.latestState.FirstTorrent.Added { + updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) } - } + // torrent responses + cacheKey := directory + "/" + torrent.AccessKey + newHtml := strings.ReplaceAll(html, "$dir", directory) + t.ResponseCache.Set(cacheKey+".html", &newHtml, 1) + newDav := strings.ReplaceAll(dav, "$dir", directory) + t.ResponseCache.Set(cacheKey+".dav", &newDav, 1) + }) }) // delete torrents that no longer exist @@ -382,7 +362,7 @@ func (t *TorrentManager) startRefreshJob() { t.log.Infof("Deleted torrent: %s\n", oldAccessKey) } } - t.updateSortedKeys() + t.updateDirectoryResponsesCache() t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) @@ -698,7 +678,7 @@ func (t *TorrentManager) Repair(accessKey string) { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(torrent.AccessKey) }) - t.SetNewLatestState(EmptyState()) + t.ScheduleForRefresh() // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) return } else if streamableCount == 1 { @@ -707,7 +687,7 @@ func (t *TorrentManager) Repair(accessKey string) { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(torrent.AccessKey) }) - t.SetNewLatestState(EmptyState()) + t.ScheduleForRefresh() return } // t.log.Debugf("Identified the expired files of torrent id=%s", info.ID) @@ -877,7 +857,7 @@ func (t *TorrentManager) canCapacityHandle() bool { } } -func (t *TorrentManager) updateSortedKeys() { +func (t *TorrentManager) updateDirectoryResponsesCache() { t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { allKeys := torrents.Keys() sort.Strings(allKeys) @@ -893,10 +873,55 @@ func (t *TorrentManager) updateSortedKeys() { } } + cacheKey := directory davRet = "" + dav.BaseDirectory(directory, "") + dav.BaseDirectory(directory, "") + davRet + "" - t.ResponseCache.Set(directory+".dav", davRet, 1) - + t.ResponseCache.Set(cacheKey+".dav", &davRet, 1) htmlRet = "
          " + htmlRet - t.ResponseCache.Set(directory+".html", "
            "+htmlRet, 1) + t.ResponseCache.Set(cacheKey+".html", &htmlRet, 1) }) } + +func (t *TorrentManager) buildTorrentResponses(tor *Torrent) (string, string) { + davRet := "" + dav.BaseDirectory(filepath.Join("$dir", tor.AccessKey), tor.LatestAdded) + htmlRet := "
              " + filenames := tor.SelectedFiles.Keys() + sort.Strings(filenames) + for _, filename := range filenames { + file, _ := tor.SelectedFiles.Get(filename) + if file == nil || !strings.HasPrefix(file.Link, "http") { + // will be caught by torrent manager's repairAll + // just skip it for now + continue + } + + davRet += dav.File(filename, file.Bytes, file.Ended) + + filePath := filepath.Join("$dir", tor.AccessKey, url.PathEscape(filename)) + htmlRet += fmt.Sprintf("
            1. %s
            2. ", filePath, filename) + } + davRet += "" + + return davRet, htmlRet +} + +func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) { + var torrentIDs []string + for _, instance := range tor.Instances { + torrentIDs = append(torrentIDs, instance.ID) + } + // get filenames needed for directory conditions + filenames := tor.SelectedFiles.Keys() + // Map torrents to directories + switch t.Config.GetVersion() { + case "v1": + configV1 := t.Config.(*config.ZurgConfigV1) + for _, directories := range configV1.GetGroupMap() { + for _, directory := range directories { + if t.Config.MeetsConditions(directory, tor.AccessKey, torrentIDs, filenames) { + cb(directory) + break + } + } + } + } +} diff --git a/internal/universal/get.go b/internal/universal/get.go index dee3430..6b8d412 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -97,7 +97,7 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i file.Link = "repair" if c.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - t.SetNewLatestState(intTor.EmptyState()) // force a recheck + t.ScheduleForRefresh() // force a recheck } http.Error(w, "File is not available", http.StatusNotFound) return @@ -181,7 +181,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re file.Link = "repair" if cfg.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck + torMgr.ScheduleForRefresh() // force a recheck } } http.Error(w, "File is not available", http.StatusNotFound) @@ -195,7 +195,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re file.Link = "repair" if cfg.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck + torMgr.ScheduleForRefresh() // force a recheck } } http.Error(w, "File is not available", http.StatusNotFound) diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 200e086..923be0f 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -312,59 +312,51 @@ func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*Downloa } // GetDownloads returns all torrents, paginated -func (rd *RealDebrid) GetDownloads() ([]Download, int, error) { +func (rd *RealDebrid) GetDownloads(page, offset int) ([]Download, int, error) { baseURL := "https://api.real-debrid.com/rest/1.0/downloads" var allDownloads []Download - page := 1 limit := 1000 totalCount := 0 - for { - params := url.Values{} - params.Set("page", fmt.Sprintf("%d", page)) - params.Set("limit", fmt.Sprintf("%d", limit)) - // params.Set("filter", "active") + params := url.Values{} + params.Set("page", fmt.Sprintf("%d", page)) + params.Set("offset", fmt.Sprintf("%d", offset)) + params.Set("limit", fmt.Sprintf("%d", limit)) + // params.Set("filter", "active") - reqURL := baseURL + "?" + params.Encode() + reqURL := baseURL + "?" + params.Encode() - req, err := http.NewRequest("GET", reqURL, nil) - if err != nil { - rd.log.Errorf("Error when creating a get downloads request: %v", err) - return nil, 0, err - } - - resp, err := rd.client.Do(req) - if err != nil { - rd.log.Errorf("Error when executing the get downloads request: %v", err) - return nil, 0, err - } - defer resp.Body.Close() - - // if status code is not 2xx, return erro - - var downloads []Download - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&downloads) - if err != nil { - rd.log.Errorf("Error when decoding get downloads JSON: %v", err) - return nil, 0, err - } - - allDownloads = append(allDownloads, downloads...) - - totalCountHeader := resp.Header.Get("x-total-count") - totalCount, err = strconv.Atoi(totalCountHeader) - if err != nil { - break - } - - if len(allDownloads) >= totalCount { - break - } - - rd.log.Debugf("Got %d downloads (page %d), total count is %d", len(allDownloads), page, totalCount) - - page++ + req, err := http.NewRequest("GET", reqURL, nil) + if err != nil { + rd.log.Errorf("Error when creating a get downloads request: %v", err) + return nil, 0, err } + + resp, err := rd.client.Do(req) + if err != nil { + rd.log.Errorf("Error when executing the get downloads request: %v", err) + return nil, 0, err + } + defer resp.Body.Close() + + // if status code is not 2xx, return erro + + var downloads []Download + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&downloads) + if err != nil { + rd.log.Errorf("Error when decoding get downloads JSON: %v", err) + return nil, 0, err + } + + allDownloads = append(allDownloads, downloads...) + + totalCountHeader := resp.Header.Get("x-total-count") + totalCount, err = strconv.Atoi(totalCountHeader) + if err != nil { + totalCount = 0 + } + + rd.log.Debugf("Got %d downloads (page %d), total count is %d", len(allDownloads), page, totalCount) return allDownloads, totalCount, nil }