From 9e3760f275396e6e43daea97be5824435c01340b Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Thu, 30 Nov 2023 00:40:26 +0100 Subject: [PATCH] Use worker pool extensively --- internal/app.go | 2 +- internal/dav/delete.go | 2 +- internal/torrent/latestState.go | 29 ++++++++ internal/torrent/manager.go | 122 +++++++++++++++++--------------- internal/universal/get.go | 6 +- 5 files changed, 97 insertions(+), 64 deletions(-) create mode 100644 internal/torrent/latestState.go diff --git a/internal/app.go b/internal/app.go index 6682e40..0e87d48 100644 --- a/internal/app.go +++ b/internal/app.go @@ -31,7 +31,7 @@ func MainApp(configPath string) { rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid")) - p, err := ants.NewPool(config.GetNumOfWorkers()) + p, err := ants.NewPool(config.GetNumOfWorkers() + 1) if err != nil { zurglog.Errorf("Failed to create worker pool: %v", err) os.Exit(1) diff --git a/internal/dav/delete.go b/internal/dav/delete.go index e75aed3..de5c1e7 100644 --- a/internal/dav/delete.go +++ b/internal/dav/delete.go @@ -83,7 +83,7 @@ func handleDeleteFile(w http.ResponseWriter, segments []string, t *torrent.Torre } file.Link = "unselect" - t.SetChecksum("") + t.SetNewLatestState(torrent.EmptyState()) w.WriteHeader(http.StatusNoContent) return nil } diff --git a/internal/torrent/latestState.go b/internal/torrent/latestState.go new file mode 100644 index 0000000..42ecc9c --- /dev/null +++ b/internal/torrent/latestState.go @@ -0,0 +1,29 @@ +package torrent + +import ( + "time" + + "github.com/debridmediamanager/zurg/pkg/realdebrid" +) + +type LibraryState struct { + TotalCount int + FirstTorrent *realdebrid.Torrent + DownloadingCount int +} + +func (ls LibraryState) equal(a LibraryState) bool { + return a.TotalCount == ls.TotalCount && a.FirstTorrent.ID == ls.FirstTorrent.ID && a.DownloadingCount == ls.DownloadingCount +} + +func EmptyState() LibraryState { + oldestTime := time.Time{} + return LibraryState{ + TotalCount: 0, + FirstTorrent: &realdebrid.Torrent{ + ID: "", + Added: oldestTime.Format(time.RFC3339), + }, + DownloadingCount: 0, + } +} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 52488f6..7fa5bba 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -25,7 +25,6 @@ import ( const ( INT_ALL = "int__all__" INT_INFO_CACHE = "int__info__" - DATA_DIR = "data" ) type TorrentManager struct { @@ -34,10 +33,9 @@ type TorrentManager struct { DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] ResponseCache *ristretto.Cache - checksum string - latestAdded string + latestState *LibraryState requiredVersion string - antsPool *ants.Pool + workerPool *ants.Pool unrestrictPool *ants.Pool log *zap.SugaredLogger } @@ -46,19 +44,23 @@ type TorrentManager struct { // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager { + initialSate := EmptyState() + t := &TorrentManager{ Config: cfg, Api: api, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), ResponseCache: cache, + latestState: &initialSate, requiredVersion: "18.11.2023", - antsPool: p, + workerPool: p, log: log, } + // create unrestrict pool unrestrictPool, err := ants.NewPool(t.Config.GetUnrestrictWorkers()) if err != nil { - t.unrestrictPool = t.antsPool + t.unrestrictPool = t.workerPool } else { t.unrestrictPool = unrestrictPool } @@ -66,17 +68,16 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p // create internal directories t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID - // create directory maps for _, directory := range cfg.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } var initWait sync.WaitGroup - initWait.Add(2) // Fetch downloads - go func() { + initWait.Add(1) + _ = t.workerPool.Submit(func() { defer initWait.Done() downloads, _, err := t.Api.GetDownloads() if err != nil { @@ -88,17 +89,17 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p t.DownloadCache.Set(downloads[i].Link, &downloads[i]) } } - }() + }) - // Fetch torrents var newTorrents []realdebrid.Torrent - go func() { + initWait.Add(1) + _ = t.workerPool.Submit(func() { defer initWait.Done() newTorrents, _, err = t.Api.GetTorrents(0) if err != nil { t.log.Fatalf("Cannot get torrents: %v\n", err) } - }() + }) initWait.Wait() @@ -108,12 +109,11 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) - go func(idx int) { - _ = t.antsPool.Submit(func() { - defer wg.Done() - torrentsChan <- t.getMoreInfo(newTorrents[idx]) - }) - }(i) + idx := i // capture the loop variable + _ = t.workerPool.Submit(func() { + defer wg.Done() + torrentsChan <- t.getMoreInfo(newTorrents[idx]) + }) } wg.Wait() close(torrentsChan) @@ -164,16 +164,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) - t.SetChecksum(t.getChecksum()) + t.SetNewLatestState(t.getCurrentState()) if t.Config.EnableRepair() { t.log.Info("Checking for torrents to repair") t.repairAll() t.log.Info("Finished checking for torrents to repair") } - go t.startRefreshJob() - t.latestAdded = newTorrents[0].Added // set the latest added to the first torrent's added + _ = t.workerPool.Submit(func() { + t.startRefreshJob() + }) + t.log.Info("Finished initializing torrent manager") return t @@ -219,65 +221,67 @@ func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { // return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) } -type torrentsResponse struct { +func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { + t.latestState.DownloadingCount = checksum.DownloadingCount + t.latestState.FirstTorrent = checksum.FirstTorrent + t.latestState.TotalCount = checksum.TotalCount +} + +type torrentsResp struct { torrents []realdebrid.Torrent totalCount int } -func (t *TorrentManager) SetChecksum(checksum string) { - // t.mu.Lock() - t.checksum = checksum - // t.mu.Unlock() -} - // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents -func (t *TorrentManager) getChecksum() string { - torrentsChan := make(chan torrentsResponse, 1) +func (t *TorrentManager) getCurrentState() LibraryState { + torrentsChan := make(chan torrentsResp, 1) countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines - // GetTorrents request - go func() { + _ = t.workerPool.Submit(func() { torrents, totalCount, err := t.Api.GetTorrents(1) if err != nil { errChan <- err return } - torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount} - }() + torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount} + }) - // GetActiveTorrentCount request - go func() { + _ = t.workerPool.Submit(func() { count, err := t.Api.GetActiveTorrentCount() if err != nil { errChan <- err return } countChan <- count.DownloadingCount - }() + }) + // Existing goroutines for GetTorrents and GetActiveTorrentCount var torrents []realdebrid.Torrent var totalCount, count int for i := 0; i < 2; i++ { select { - case torrentsResp := <-torrentsChan: - torrents = torrentsResp.torrents - totalCount = torrentsResp.totalCount + case resp := <-torrentsChan: + torrents = resp.torrents + totalCount = resp.totalCount case count = <-countChan: case err := <-errChan: t.log.Warnf("Checksum API Error: %v\n", err) - return "" + return EmptyState() } } if len(torrents) == 0 { t.log.Error("Huh, no torrents returned") - return "" + return EmptyState() } - checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count) - return checksum + return LibraryState{ + TotalCount: totalCount, + FirstTorrent: &torrents[0], + DownloadingCount: count, + } } // startRefreshJob periodically refreshes the torrents @@ -286,8 +290,8 @@ func (t *TorrentManager) startRefreshJob() { for { <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) - checksum := t.getChecksum() - if checksum == t.checksum { + checksum := t.getCurrentState() + if t.latestState.equal(checksum) { continue } @@ -319,12 +323,11 @@ func (t *TorrentManager) startRefreshJob() { var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) - go func(idx int) { - _ = t.antsPool.Submit(func() { - defer wg.Done() - torrentsChan <- t.getMoreInfo(newTorrents[idx]) - }) - }(i) + idx := i // capture the loop variable + _ = t.workerPool.Submit(func() { + defer wg.Done() + torrentsChan <- t.getMoreInfo(newTorrents[idx]) + }) } wg.Wait() close(torrentsChan) @@ -368,7 +371,7 @@ func (t *TorrentManager) startRefreshJob() { if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(torrent.AccessKey, torrent) - if torrent.LatestAdded > t.latestAdded { + if torrent.LatestAdded > t.latestState.FirstTorrent.Added { updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) } break @@ -392,7 +395,7 @@ func (t *TorrentManager) startRefreshJob() { t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) - t.SetChecksum(t.getChecksum()) + t.SetNewLatestState(t.getCurrentState()) if t.Config.EnableRepair() { t.log.Info("Checking for torrents to repair") @@ -401,9 +404,10 @@ func (t *TorrentManager) startRefreshJob() { } else { t.log.Info("Repair is disabled, skipping repair check") } - go OnLibraryUpdateHook(updatedPaths, t.Config, t.log) + _ = t.workerPool.Submit(func() { + OnLibraryUpdateHook(updatedPaths, t.Config, t.log) + }) - t.latestAdded = newTorrents[0].Added t.log.Info("Finished refreshing torrents") } } @@ -494,7 +498,7 @@ func (t *TorrentManager) getName(name, originalName string) string { } func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) error { - filePath := DATA_DIR + "/" + torrent.ID + ".bin" + filePath := "data/" + torrent.ID + ".bin" file, err := os.Create(filePath) if err != nil { return fmt.Errorf("failed creating file: %w", err) @@ -513,7 +517,7 @@ func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) err } func (t *TorrentManager) readTorrentFromFile(torrentID string) *realdebrid.TorrentInfo { - filePath := DATA_DIR + "/" + torrentID + ".bin" + filePath := "data/" + torrentID + ".bin" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { diff --git a/internal/universal/get.go b/internal/universal/get.go index c226f7a..dee3430 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -97,7 +97,7 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i file.Link = "repair" if c.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - t.SetChecksum("") // force a recheck + t.SetNewLatestState(intTor.EmptyState()) // force a recheck } http.Error(w, "File is not available", http.StatusNotFound) return @@ -181,7 +181,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re file.Link = "repair" if cfg.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.SetChecksum("") // force a recheck + torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck } } http.Error(w, "File is not available", http.StatusNotFound) @@ -195,7 +195,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re file.Link = "repair" if cfg.EnableRepair() { // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.SetChecksum("") // force a recheck + torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck } } http.Error(w, "File is not available", http.StatusNotFound)