Files
zurg/internal/torrent/manager.go
2024-01-30 02:06:39 +01:00

236 lines
6.5 KiB
Go

package torrent
import (
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
mapset "github.com/deckarep/golang-set/v2"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/panjf2000/ants/v2"
)
// Keys of the internal directories that initializeDirectories registers in
// TorrentManager.DirectoryMap alongside the user-configured ones.
const (
// INT_ALL is the DirectoryMap key of the internal directory whose entries
// are keyed by access key.
INT_ALL = "int__all__"
// INT_INFO_CACHE is the DirectoryMap key of the internal directory whose
// entries are keyed by torrent ID.
INT_INFO_CACHE = "int__info__"
)
// TorrentManager holds the in-memory torrent directories, the download
// lookup tables and the worker pools used to refresh and repair them.
type TorrentManager struct {
// Config is the configuration consulted for naming, mounting and
// unrestrict behaviour.
Config config.ConfigInterface
// Api is the Real-Debrid client used for unrestricting links and listing
// downloads.
Api *realdebrid.RealDebrid
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
// DownloadCache is filled by UnrestrictUntilOk, keyed by Download.Link.
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
// DownloadMap is keyed by Download.Filename; it is filled by
// UnrestrictUntilOk and mountDownloads.
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id]
allAccessKeys mapset.Set[string]
latestState *LibraryState
// requiredVersion is stamped into every cache file on write; cache files
// carrying a different version are ignored on read.
requiredVersion string
workerPool *ants.Pool
refreshPool *ants.Pool
// RefreshKillSwitch and RepairKillSwitch are created with a buffer of 1 by
// NewTorrentManager. NOTE(review): presumably used to stop the refresh and
// repair jobs — confirm against StartRefreshJob / StartRepairJob.
RefreshKillSwitch chan struct{}
RepairKillSwitch chan struct{}
repairPool *ants.Pool
// repairTrigger and repairSet are not initialized by NewTorrentManager
// itself. NOTE(review): presumably set up by StartRepairJob — confirm.
repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent]
repairRunning bool
// NOTE(review): by its name repairRunningMu guards repairRunning; the
// locking code is outside this view.
repairRunningMu sync.Mutex
log *logutil.Logger
}
// NewTorrentManager creates a new torrent manager wired to the given config,
// Real-Debrid client, worker pools and logger.
//
// Before returning it loads the fixers from file, registers the internal and
// configured directories, schedules the downloads mount (only when enabled in
// config), refreshes the torrents, records the current library state and
// starts the refresh and repair jobs.
// NOTE(review): the earlier comment said torrents are fetched "in the
// background"; refreshTorrents is invoked directly here, so whether that work
// is asynchronous depends on its implementation — confirm.
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, refreshPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager {
t := &TorrentManager{
Config: cfg,
Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
DownloadCache: cmap.New[*realdebrid.Download](),
DownloadMap: cmap.New[*realdebrid.Download](),
RefreshKillSwitch: make(chan struct{}, 1),
RepairKillSwitch: make(chan struct{}, 1),
allAccessKeys: mapset.NewSet[string](),
latestState: &LibraryState{},
// Version written into and required from the on-disk torrent cache files.
requiredVersion: "0.9.3-hotfix.3",
workerPool: workerPool,
refreshPool: refreshPool,
repairPool: repairPool,
log: log,
}
// The calls below run in this exact order; directories must exist in
// DirectoryMap before anything looks them up.
t.fixers = t.readFixersFromFile()
t.initializeDirectories()
t.mountDownloads()
t.refreshTorrents()
t.SetNewLatestState(t.getCurrentState())
t.StartRefreshJob()
t.StartRepairJob()
return t
}
// UnrestrictUntilOk resolves link to a download through the Real-Debrid API,
// consulting DownloadCache first.
//
// It returns nil when link does not start with "http" or when the API call
// fails (the failure is logged as a warning). A result with a non-empty Link
// and Filename is stored in DownloadCache (by Link) and DownloadMap (by
// Filename) before being returned. A single API call is made per invocation.
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
	// Anything that is not an http(s) URL cannot be unrestricted.
	if !strings.HasPrefix(link, "http") {
		return nil
	}

	if cached, found := t.DownloadCache.Get(link); found {
		return cached
	}

	download, err := t.Api.UnrestrictLink(link, t.Config.ShouldServeFromRclone())
	if err != nil {
		t.log.Warnf("Cannot unrestrict link %s: %v", link, err)
		return nil
	}

	// Incomplete results are handed back as-is but never cached.
	if download == nil || download.Link == "" || download.Filename == "" {
		return download
	}

	t.DownloadCache.Set(download.Link, download)
	t.DownloadMap.Set(download.Filename, download)
	return download
}
// GetKey returns the name under which torrent is exposed.
//
// Precedence:
//  1. the torrent's Rename, when renames are not ignored and one is set;
//  2. the torrent's Name, when the RD torrent name is retained;
//  3. the torrent's Name, when folder-name extensions are retained and Name
//     contains OriginalName;
//  4. otherwise OriginalName with a trailing ".mp4" and then a trailing
//     ".mkv" removed.
func (t *TorrentManager) GetKey(torrent *Torrent) string {
	switch {
	case !t.Config.ShouldIgnoreRenames() && torrent.Rename != "":
		return torrent.Rename
	case t.Config.EnableRetainRDTorrentName():
		return torrent.Name
	case t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName):
		return torrent.Name
	}

	// drop the extension from the name
	key := torrent.OriginalName
	for _, ext := range []string{".mp4", ".mkv"} {
		key = strings.TrimSuffix(key, ext)
	}
	return key
}
// GetPath returns the file name under which file is exposed.
//
// The file's Rename is used when renames are not ignored and one is set.
// Otherwise, when full paths are exposed, the leading "/" is dropped from
// Path and every remaining "/" becomes " - ". In all other cases only the
// last element of Path is returned.
func (t *TorrentManager) GetPath(file *File) string {
	switch {
	case !t.Config.ShouldIgnoreRenames() && file.Rename != "":
		return file.Rename
	case t.Config.ShouldExposeFullPath():
		// Flatten the directory structure into a single file name.
		flattened := strings.TrimPrefix(file.Path, "/")
		return strings.ReplaceAll(flattened, "/", " - ")
	default:
		return filepath.Base(file.Path)
	}
}
// writeTorrentToFile stamps torrent with the manager's required version and
// persists it as JSON to data/<instanceID>.json.
//
// Failures are logged as warnings and otherwise ignored. The torrent is
// serialized before the file is touched, so a marshalling failure leaves any
// previously written cache file intact instead of truncating it.
func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) {
	filePath := "data/" + instanceID + ".json"

	torrent.Version = t.requiredVersion
	jsonData, err := json.Marshal(torrent)
	if err != nil {
		t.log.Warnf("Cannot marshal torrent: %v", err)
		return
	}

	// os.WriteFile creates or truncates the file, writes the payload and
	// reports a failed close as well; 0666 matches what os.Create would use.
	if err := os.WriteFile(filePath, jsonData, 0666); err != nil {
		t.log.Warnf("Cannot write to file %s: %v", filePath, err)
		return
	}
	t.log.Debugf("Saved torrent %s to file", instanceID)
}
// readTorrentFromFile loads the torrent cached in data/<torrentID>.json.
//
// It returns nil (a cache miss) when the file cannot be opened or read, when
// its content is not valid JSON or is the JSON literal null, or when it was
// written by a different version than the manager requires. A file of the
// required version whose torrent has neither downloaded nor in-progress IDs
// is treated as a broken invariant and reported through the logger's Fatal.
func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent {
	filePath := "data/" + torrentID + ".json"
	file, err := os.Open(filePath)
	if err != nil {
		// Missing or unreadable file: nothing cached for this ID.
		return nil
	}
	defer file.Close()

	jsonData, err := io.ReadAll(file)
	if err != nil {
		return nil
	}

	var torrent *Torrent
	if err := json.Unmarshal(jsonData, &torrent); err != nil {
		return nil
	}
	// A payload of `null` decodes without error but leaves the pointer nil.
	if torrent == nil {
		return nil
	}
	// Reject files from another version before inspecting their content, so
	// an outdated layout is ignored rather than tripping the check below.
	if torrent.Version != t.requiredVersion {
		return nil
	}
	if torrent.DownloadedIDs.Union(torrent.InProgressIDs).IsEmpty() {
		t.log.Fatal("Torrent has no downloaded or in progress ids")
	}
	return torrent
}
// deleteTorrentFile removes the cache file data/<torrentID>.json.
// A failed removal is logged as a warning and otherwise ignored.
func (t *TorrentManager) deleteTorrentFile(torrentID string) {
	filePath := "data/" + torrentID + ".json"
	if removeErr := os.Remove(filePath); removeErr != nil {
		t.log.Warnf("Cannot delete file %s: %v", filePath, removeErr)
	}
}
// mountDownloads schedules a worker-pool task that pages through the
// account's downloads and registers each one in DownloadMap by file name.
//
// It does nothing unless the download mount is enabled in the config. An API
// error while listing downloads is reported through the logger's Fatalf. A
// failure to schedule the task is logged as a warning.
func (t *TorrentManager) mountDownloads() {
	if !t.Config.EnableDownloadMount() {
		return
	}
	submitErr := t.workerPool.Submit(func() {
		page, offset := 1, 0
		for {
			downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
			if err != nil {
				t.log.Fatalf("Cannot get downloads: %v", err)
			}
			for i := range downloads {
				// Take the address of the slice element, not of a loop copy.
				t.DownloadMap.Set(downloads[i].Filename, &downloads[i])
			}
			offset += len(downloads)
			page++
			// An empty page means no further progress is possible; stop even
			// if the reported total was never reached, otherwise this loop
			// would request pages forever.
			if len(downloads) == 0 || offset >= totalDownloads {
				break
			}
		}
		// Report the map that was actually populated above.
		t.log.Infof("Compiled into %d downloads", t.DownloadMap.Count())
	})
	if submitErr != nil {
		t.log.Warnf("Cannot schedule downloads mount: %v", submitErr)
	}
}
// initializeDirectories registers an empty torrent map in DirectoryMap for
// the two internal directories and for every directory listed in the config.
func (t *TorrentManager) initializeDirectories() {
	// Internal directories: INT_ALL is keyed by GetAccessKey(),
	// INT_INFO_CACHE is keyed by torrent ID.
	for _, internal := range []string{INT_ALL, INT_INFO_CACHE} {
		t.DirectoryMap.Set(internal, cmap.New[*Torrent]())
	}

	// User-configured directories.
	for _, configured := range t.Config.GetDirectories() {
		t.DirectoryMap.Set(configured, cmap.New[*Torrent]())
	}
}
// saveTorrentChangesToDisk persists every cached info entry that belongs to
// torrent.
//
// For each ID in the union of the torrent's downloaded and in-progress IDs it
// looks up the entry in the INT_INFO_CACHE directory; IDs without an entry
// are skipped. When cb is non-nil it is applied to the entry first, then the
// entry is written to its cache file.
func (t *TorrentManager) saveTorrentChangesToDisk(torrent *Torrent, cb func(*Torrent)) {
	infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
	ids := torrent.DownloadedIDs.Union(torrent.InProgressIDs)
	ids.Each(func(id string) bool {
		if cached, found := infoCache.Get(id); found {
			if cb != nil {
				cb(cached)
			}
			t.writeTorrentToFile(id, cached)
		}
		// Returning false keeps the iteration going over the remaining IDs.
		return false
	})
}