Optimizations
This commit is contained in:
@@ -7,13 +7,16 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
||||
"github.com/debridmediamanager.com/zurg/pkg/dav"
|
||||
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
||||
"github.com/debridmediamanager.com/zurg/pkg/utils"
|
||||
"github.com/dgraph-io/ristretto"
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
@@ -26,13 +29,14 @@ const (
|
||||
)
|
||||
|
||||
type TorrentManager struct {
|
||||
Config config.ConfigInterface
|
||||
Api *realdebrid.RealDebrid
|
||||
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
||||
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
|
||||
ResponseCache *ristretto.Cache
|
||||
checksum string
|
||||
latestAdded string
|
||||
requiredVersion string
|
||||
cfg config.ConfigInterface
|
||||
Api *realdebrid.RealDebrid
|
||||
antsPool *ants.Pool
|
||||
unrestrictPool *ants.Pool
|
||||
log *zap.SugaredLogger
|
||||
@@ -42,18 +46,19 @@ type TorrentManager struct {
|
||||
// NewTorrentManager creates a new torrent manager
|
||||
// it will fetch all torrents and their info in the background
|
||||
// and store them in-memory and cached in files
|
||||
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, log *zap.SugaredLogger) *TorrentManager {
|
||||
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager {
|
||||
t := &TorrentManager{
|
||||
cfg: cfg,
|
||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||
requiredVersion: "18.11.2023",
|
||||
Config: cfg,
|
||||
Api: api,
|
||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||
ResponseCache: cache,
|
||||
requiredVersion: "18.11.2023",
|
||||
antsPool: p,
|
||||
log: log,
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
|
||||
unrestrictPool, err := ants.NewPool(t.cfg.GetUnrestrictWorkers())
|
||||
unrestrictPool, err := ants.NewPool(t.Config.GetUnrestrictWorkers())
|
||||
if err != nil {
|
||||
t.unrestrictPool = t.antsPool
|
||||
} else {
|
||||
@@ -145,12 +150,12 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
// get filenames
|
||||
filenames := torrent.SelectedFiles.Keys()
|
||||
// Map torrents to directories
|
||||
switch t.cfg.GetVersion() {
|
||||
switch t.Config.GetVersion() {
|
||||
case "v1":
|
||||
configV1 := t.cfg.(*config.ZurgConfigV1)
|
||||
configV1 := t.Config.(*config.ZurgConfigV1)
|
||||
for _, directories := range configV1.GetGroupMap() {
|
||||
for _, directory := range directories {
|
||||
if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
torrents, _ := t.DirectoryMap.Get(directory)
|
||||
torrents.Set(accessKey, torrent)
|
||||
break
|
||||
@@ -159,12 +164,13 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
}
|
||||
}
|
||||
})
|
||||
t.updateSortedKeys()
|
||||
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
t.SetChecksum(t.getChecksum())
|
||||
|
||||
if t.cfg.EnableRepair() {
|
||||
if t.Config.EnableRepair() {
|
||||
t.log.Info("Checking for torrents to repair")
|
||||
t.repairAll()
|
||||
t.log.Info("Finished checking for torrents to repair")
|
||||
@@ -209,8 +215,8 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr
|
||||
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
|
||||
retChan := make(chan *realdebrid.Download, 1)
|
||||
t.unrestrictPool.Submit(func() {
|
||||
retChan <- t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
time.Sleep(time.Duration(t.cfg.GetReleaseUnrestrictAfterMs()) * time.Millisecond)
|
||||
retChan <- t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone())
|
||||
time.Sleep(time.Duration(t.Config.GetReleaseUnrestrictAfterMs()) * time.Millisecond)
|
||||
})
|
||||
defer close(retChan)
|
||||
return <-retChan
|
||||
@@ -282,7 +288,7 @@ func (t *TorrentManager) getChecksum() string {
|
||||
func (t *TorrentManager) startRefreshJob() {
|
||||
t.log.Info("Starting periodic refresh")
|
||||
for {
|
||||
<-time.After(time.Duration(t.cfg.GetRefreshEverySeconds()) * time.Second)
|
||||
<-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second)
|
||||
|
||||
checksum := t.getChecksum()
|
||||
if checksum == t.checksum {
|
||||
@@ -358,12 +364,12 @@ func (t *TorrentManager) startRefreshJob() {
|
||||
// get filenames
|
||||
filenames := torrent.SelectedFiles.Keys()
|
||||
// Map torrents to directories
|
||||
switch t.cfg.GetVersion() {
|
||||
switch t.Config.GetVersion() {
|
||||
case "v1":
|
||||
configV1 := t.cfg.(*config.ZurgConfigV1)
|
||||
configV1 := t.Config.(*config.ZurgConfigV1)
|
||||
for _, directories := range configV1.GetGroupMap() {
|
||||
for _, directory := range directories {
|
||||
if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
if t.Config.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
torrents, _ := t.DirectoryMap.Get(directory)
|
||||
torrents.Set(torrent.AccessKey, torrent)
|
||||
if torrent.LatestAdded > t.latestAdded {
|
||||
@@ -386,19 +392,20 @@ func (t *TorrentManager) startRefreshJob() {
|
||||
t.log.Infof("Deleted torrent: %s\n", oldAccessKey)
|
||||
}
|
||||
}
|
||||
t.updateSortedKeys()
|
||||
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
|
||||
|
||||
t.SetChecksum(t.getChecksum())
|
||||
|
||||
if t.cfg.EnableRepair() {
|
||||
if t.Config.EnableRepair() {
|
||||
t.log.Info("Checking for torrents to repair")
|
||||
t.repairAll()
|
||||
t.log.Info("Finished checking for torrents to repair")
|
||||
} else {
|
||||
t.log.Info("Repair is disabled, skipping repair check")
|
||||
}
|
||||
go OnLibraryUpdateHook(updatedPaths, t.cfg, t.log)
|
||||
go OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
|
||||
|
||||
t.latestAdded = newTorrents[0].Added
|
||||
t.log.Info("Finished refreshing torrents")
|
||||
@@ -477,11 +484,11 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) getName(name, originalName string) string {
|
||||
if t.cfg.EnableRetainRDTorrentName() {
|
||||
if t.Config.EnableRetainRDTorrentName() {
|
||||
return name
|
||||
}
|
||||
// drop the extension from the name
|
||||
if t.cfg.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
|
||||
if t.Config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
|
||||
return name
|
||||
} else {
|
||||
ret := strings.TrimSuffix(originalName, ".mp4")
|
||||
@@ -551,9 +558,9 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
|
||||
resultsChan <- Result{Response: download}
|
||||
return
|
||||
}
|
||||
resp := t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
resp := t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone())
|
||||
resultsChan <- Result{Response: resp}
|
||||
time.Sleep(time.Duration(t.cfg.GetReleaseUnrestrictAfterMs()) * time.Millisecond)
|
||||
time.Sleep(time.Duration(t.Config.GetReleaseUnrestrictAfterMs()) * time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -649,10 +656,11 @@ func (t *TorrentManager) Delete(accessKey string) {
|
||||
torrents.Remove(accessKey)
|
||||
}
|
||||
})
|
||||
t.updateSortedKeys()
|
||||
}
|
||||
|
||||
func (t *TorrentManager) Repair(accessKey string) {
|
||||
if !t.cfg.EnableRepair() {
|
||||
if !t.Config.EnableRepair() {
|
||||
t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
|
||||
return
|
||||
}
|
||||
@@ -700,6 +708,7 @@ func (t *TorrentManager) Repair(accessKey string) {
|
||||
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
torrents.Remove(torrent.AccessKey)
|
||||
})
|
||||
t.updateSortedKeys()
|
||||
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
|
||||
return
|
||||
} else if streamableCount == 1 {
|
||||
@@ -708,6 +717,7 @@ func (t *TorrentManager) Repair(accessKey string) {
|
||||
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
torrents.Remove(torrent.AccessKey)
|
||||
})
|
||||
t.updateSortedKeys()
|
||||
return
|
||||
}
|
||||
// t.log.Debugf("Identified the expired files of torrent id=%s", info.ID)
|
||||
@@ -876,3 +886,23 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
||||
retryCount++
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TorrentManager) updateSortedKeys() {
|
||||
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
allKeys := torrents.Keys()
|
||||
sort.Strings(allKeys)
|
||||
davRet := ""
|
||||
htmlRet := ""
|
||||
for _, accessKey := range allKeys {
|
||||
if tor, ok := torrents.Get(accessKey); ok {
|
||||
if tor.AnyInProgress() {
|
||||
continue
|
||||
}
|
||||
davRet += dav.Directory(tor.AccessKey, tor.LatestAdded)
|
||||
htmlRet += fmt.Sprintf("<a href=\"/%s/%s\">%s</a><br>", directory, tor.AccessKey, tor.AccessKey)
|
||||
}
|
||||
}
|
||||
t.ResponseCache.Set(directory+".dav", davRet, 1)
|
||||
t.ResponseCache.Set(directory+".html", "<ol>"+htmlRet, 1)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user