Use worker pool extensively

This commit is contained in:
Ben Sarmiento
2023-11-30 00:40:26 +01:00
parent 6e54fa760b
commit 9e3760f275
5 changed files with 97 additions and 64 deletions

View File

@@ -31,7 +31,7 @@ func MainApp(configPath string) {
rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid")) rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid"))
p, err := ants.NewPool(config.GetNumOfWorkers()) p, err := ants.NewPool(config.GetNumOfWorkers() + 1)
if err != nil { if err != nil {
zurglog.Errorf("Failed to create worker pool: %v", err) zurglog.Errorf("Failed to create worker pool: %v", err)
os.Exit(1) os.Exit(1)

View File

@@ -83,7 +83,7 @@ func handleDeleteFile(w http.ResponseWriter, segments []string, t *torrent.Torre
} }
file.Link = "unselect" file.Link = "unselect"
t.SetChecksum("") t.SetNewLatestState(torrent.EmptyState())
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
return nil return nil
} }

View File

@@ -0,0 +1,29 @@
package torrent
import (
"time"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
)
type LibraryState struct {
TotalCount int
FirstTorrent *realdebrid.Torrent
DownloadingCount int
}
func (ls LibraryState) equal(a LibraryState) bool {
return a.TotalCount == ls.TotalCount && a.FirstTorrent.ID == ls.FirstTorrent.ID && a.DownloadingCount == ls.DownloadingCount
}
func EmptyState() LibraryState {
oldestTime := time.Time{}
return LibraryState{
TotalCount: 0,
FirstTorrent: &realdebrid.Torrent{
ID: "",
Added: oldestTime.Format(time.RFC3339),
},
DownloadingCount: 0,
}
}

View File

@@ -25,7 +25,6 @@ import (
const ( const (
INT_ALL = "int__all__" INT_ALL = "int__all__"
INT_INFO_CACHE = "int__info__" INT_INFO_CACHE = "int__info__"
DATA_DIR = "data"
) )
type TorrentManager struct { type TorrentManager struct {
@@ -34,10 +33,9 @@ type TorrentManager struct {
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
ResponseCache *ristretto.Cache ResponseCache *ristretto.Cache
checksum string latestState *LibraryState
latestAdded string
requiredVersion string requiredVersion string
antsPool *ants.Pool workerPool *ants.Pool
unrestrictPool *ants.Pool unrestrictPool *ants.Pool
log *zap.SugaredLogger log *zap.SugaredLogger
} }
@@ -46,19 +44,23 @@ type TorrentManager struct {
// it will fetch all torrents and their info in the background // it will fetch all torrents and their info in the background
// and store them in-memory and cached in files // and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager { func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager {
initialSate := EmptyState()
t := &TorrentManager{ t := &TorrentManager{
Config: cfg, Config: cfg,
Api: api, Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
ResponseCache: cache, ResponseCache: cache,
latestState: &initialSate,
requiredVersion: "18.11.2023", requiredVersion: "18.11.2023",
antsPool: p, workerPool: p,
log: log, log: log,
} }
// create unrestrict pool
unrestrictPool, err := ants.NewPool(t.Config.GetUnrestrictWorkers()) unrestrictPool, err := ants.NewPool(t.Config.GetUnrestrictWorkers())
if err != nil { if err != nil {
t.unrestrictPool = t.antsPool t.unrestrictPool = t.workerPool
} else { } else {
t.unrestrictPool = unrestrictPool t.unrestrictPool = unrestrictPool
} }
@@ -66,17 +68,16 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
// create internal directories // create internal directories
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey
t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID
// create directory maps // create directory maps
for _, directory := range cfg.GetDirectories() { for _, directory := range cfg.GetDirectories() {
t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
} }
var initWait sync.WaitGroup var initWait sync.WaitGroup
initWait.Add(2)
// Fetch downloads // Fetch downloads
go func() { initWait.Add(1)
_ = t.workerPool.Submit(func() {
defer initWait.Done() defer initWait.Done()
downloads, _, err := t.Api.GetDownloads() downloads, _, err := t.Api.GetDownloads()
if err != nil { if err != nil {
@@ -88,17 +89,17 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.DownloadCache.Set(downloads[i].Link, &downloads[i]) t.DownloadCache.Set(downloads[i].Link, &downloads[i])
} }
} }
}() })
// Fetch torrents
var newTorrents []realdebrid.Torrent var newTorrents []realdebrid.Torrent
go func() { initWait.Add(1)
_ = t.workerPool.Submit(func() {
defer initWait.Done() defer initWait.Done()
newTorrents, _, err = t.Api.GetTorrents(0) newTorrents, _, err = t.Api.GetTorrents(0)
if err != nil { if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err) t.log.Fatalf("Cannot get torrents: %v\n", err)
} }
}() })
initWait.Wait() initWait.Wait()
@@ -108,12 +109,11 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
var wg sync.WaitGroup var wg sync.WaitGroup
for i := range newTorrents { for i := range newTorrents {
wg.Add(1) wg.Add(1)
go func(idx int) { idx := i // capture the loop variable
_ = t.antsPool.Submit(func() { _ = t.workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
torrentsChan <- t.getMoreInfo(newTorrents[idx]) torrentsChan <- t.getMoreInfo(newTorrents[idx])
}) })
}(i)
} }
wg.Wait() wg.Wait()
close(torrentsChan) close(torrentsChan)
@@ -164,16 +164,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
t.SetChecksum(t.getChecksum()) t.SetNewLatestState(t.getCurrentState())
if t.Config.EnableRepair() { if t.Config.EnableRepair() {
t.log.Info("Checking for torrents to repair") t.log.Info("Checking for torrents to repair")
t.repairAll() t.repairAll()
t.log.Info("Finished checking for torrents to repair") t.log.Info("Finished checking for torrents to repair")
} }
go t.startRefreshJob()
t.latestAdded = newTorrents[0].Added // set the latest added to the first torrent's added _ = t.workerPool.Submit(func() {
t.startRefreshJob()
})
t.log.Info("Finished initializing torrent manager") t.log.Info("Finished initializing torrent manager")
return t return t
@@ -219,65 +221,67 @@ func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
// return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) // return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
} }
type torrentsResponse struct { func (t *TorrentManager) SetNewLatestState(checksum LibraryState) {
t.latestState.DownloadingCount = checksum.DownloadingCount
t.latestState.FirstTorrent = checksum.FirstTorrent
t.latestState.TotalCount = checksum.TotalCount
}
type torrentsResp struct {
torrents []realdebrid.Torrent torrents []realdebrid.Torrent
totalCount int totalCount int
} }
func (t *TorrentManager) SetChecksum(checksum string) {
// t.mu.Lock()
t.checksum = checksum
// t.mu.Unlock()
}
// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents
func (t *TorrentManager) getChecksum() string { func (t *TorrentManager) getCurrentState() LibraryState {
torrentsChan := make(chan torrentsResponse, 1) torrentsChan := make(chan torrentsResp, 1)
countChan := make(chan int, 1) countChan := make(chan int, 1)
errChan := make(chan error, 2) // accommodate errors from both goroutines errChan := make(chan error, 2) // accommodate errors from both goroutines
// GetTorrents request _ = t.workerPool.Submit(func() {
go func() {
torrents, totalCount, err := t.Api.GetTorrents(1) torrents, totalCount, err := t.Api.GetTorrents(1)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount} torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount}
}() })
// GetActiveTorrentCount request _ = t.workerPool.Submit(func() {
go func() {
count, err := t.Api.GetActiveTorrentCount() count, err := t.Api.GetActiveTorrentCount()
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
countChan <- count.DownloadingCount countChan <- count.DownloadingCount
}() })
// Existing goroutines for GetTorrents and GetActiveTorrentCount
var torrents []realdebrid.Torrent var torrents []realdebrid.Torrent
var totalCount, count int var totalCount, count int
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
select { select {
case torrentsResp := <-torrentsChan: case resp := <-torrentsChan:
torrents = torrentsResp.torrents torrents = resp.torrents
totalCount = torrentsResp.totalCount totalCount = resp.totalCount
case count = <-countChan: case count = <-countChan:
case err := <-errChan: case err := <-errChan:
t.log.Warnf("Checksum API Error: %v\n", err) t.log.Warnf("Checksum API Error: %v\n", err)
return "" return EmptyState()
} }
} }
if len(torrents) == 0 { if len(torrents) == 0 {
t.log.Error("Huh, no torrents returned") t.log.Error("Huh, no torrents returned")
return "" return EmptyState()
} }
checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count) return LibraryState{
return checksum TotalCount: totalCount,
FirstTorrent: &torrents[0],
DownloadingCount: count,
}
} }
// startRefreshJob periodically refreshes the torrents // startRefreshJob periodically refreshes the torrents
@@ -286,8 +290,8 @@ func (t *TorrentManager) startRefreshJob() {
for { for {
<-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second)
checksum := t.getChecksum() checksum := t.getCurrentState()
if checksum == t.checksum { if t.latestState.equal(checksum) {
continue continue
} }
@@ -319,12 +323,11 @@ func (t *TorrentManager) startRefreshJob() {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := range newTorrents { for i := range newTorrents {
wg.Add(1) wg.Add(1)
go func(idx int) { idx := i // capture the loop variable
_ = t.antsPool.Submit(func() { _ = t.workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
torrentsChan <- t.getMoreInfo(newTorrents[idx]) torrentsChan <- t.getMoreInfo(newTorrents[idx])
}) })
}(i)
} }
wg.Wait() wg.Wait()
close(torrentsChan) close(torrentsChan)
@@ -368,7 +371,7 @@ func (t *TorrentManager) startRefreshJob() {
if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
torrents, _ := t.DirectoryMap.Get(directory) torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent) torrents.Set(torrent.AccessKey, torrent)
if torrent.LatestAdded > t.latestAdded { if torrent.LatestAdded > t.latestState.FirstTorrent.Added {
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey))
} }
break break
@@ -392,7 +395,7 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
t.SetChecksum(t.getChecksum()) t.SetNewLatestState(t.getCurrentState())
if t.Config.EnableRepair() { if t.Config.EnableRepair() {
t.log.Info("Checking for torrents to repair") t.log.Info("Checking for torrents to repair")
@@ -401,9 +404,10 @@ func (t *TorrentManager) startRefreshJob() {
} else { } else {
t.log.Info("Repair is disabled, skipping repair check") t.log.Info("Repair is disabled, skipping repair check")
} }
go OnLibraryUpdateHook(updatedPaths, t.Config, t.log) _ = t.workerPool.Submit(func() {
OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
})
t.latestAdded = newTorrents[0].Added
t.log.Info("Finished refreshing torrents") t.log.Info("Finished refreshing torrents")
} }
} }
@@ -494,7 +498,7 @@ func (t *TorrentManager) getName(name, originalName string) string {
} }
func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) error { func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) error {
filePath := DATA_DIR + "/" + torrent.ID + ".bin" filePath := "data/" + torrent.ID + ".bin"
file, err := os.Create(filePath) file, err := os.Create(filePath)
if err != nil { if err != nil {
return fmt.Errorf("failed creating file: %w", err) return fmt.Errorf("failed creating file: %w", err)
@@ -513,7 +517,7 @@ func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) err
} }
func (t *TorrentManager) readTorrentFromFile(torrentID string) *realdebrid.TorrentInfo { func (t *TorrentManager) readTorrentFromFile(torrentID string) *realdebrid.TorrentInfo {
filePath := DATA_DIR + "/" + torrentID + ".bin" filePath := "data/" + torrentID + ".bin"
file, err := os.Open(filePath) file, err := os.Open(filePath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {

View File

@@ -97,7 +97,7 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i
file.Link = "repair" file.Link = "repair"
if c.EnableRepair() { if c.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) // log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
t.SetChecksum("") // force a recheck t.SetNewLatestState(intTor.EmptyState()) // force a recheck
} }
http.Error(w, "File is not available", http.StatusNotFound) http.Error(w, "File is not available", http.StatusNotFound)
return return
@@ -181,7 +181,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re
file.Link = "repair" file.Link = "repair"
if cfg.EnableRepair() { if cfg.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) // log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.SetChecksum("") // force a recheck torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck
} }
} }
http.Error(w, "File is not available", http.StatusNotFound) http.Error(w, "File is not available", http.StatusNotFound)
@@ -195,7 +195,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re
file.Link = "repair" file.Link = "repair"
if cfg.EnableRepair() { if cfg.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) // log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.SetChecksum("") // force a recheck torMgr.SetNewLatestState(intTor.EmptyState()) // force a recheck
} }
} }
http.Error(w, "File is not available", http.StatusNotFound) http.Error(w, "File is not available", http.StatusNotFound)