Files
zurg/internal/torrent/manager.go
2024-07-21 02:37:32 +02:00

599 lines
15 KiB
Go

package torrent
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/internal/fs"
"github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils"
mapset "github.com/deckarep/golang-set/v2"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/panjf2000/ants/v2"
"gopkg.in/vansante/go-ffprobe.v2"
)
// INT_ALL is the DirectoryMap key of the internal directory that
// initializeDirectoryMaps registers first; the initial load of saved torrent
// files and analyzeAllTorrents both fetch their torrent map under this key.
const (
INT_ALL = "int__all__"
)
// TorrentManager holds the in-memory torrent and download maps together with
// the channels, loggers and clients used by the background jobs that
// NewTorrentManager starts.
type TorrentManager struct {
// requiredVersion is stamped onto a torrent before it is saved or sent, and
// a saved torrent file is only accepted when its version matches.
requiredVersion string
Config config.ConfigInterface
rd *realdebrid.RealDebrid
// workerPool runs the initial load tasks and every long-lived job.
workerPool *ants.Pool
log *logutil.Logger
repairLog *logutil.Logger
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
// DownloadMap is cleared and refilled by mountNewDownloads, keyed by the
// download's filename.
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
RootNode *fs.FileNode
RefreshWorkerKillSwitch chan struct{}
RepairWorkerKillSwitch chan struct{}
// RemountTrigger makes the downloads job call mountNewDownloads without
// waiting for its ticker.
RemountTrigger chan struct{}
// RepairAllTrigger is not created by NewTorrentManager; the commented-out
// initializer there says it is initialized in repair.go.
RepairAllTrigger chan struct{}
// DumpTrigger makes the dump job run dumpTorrents and loadDumpedTorrents
// without waiting for its ticker.
DumpTrigger chan struct{}
// AnalyzeTrigger starts one analyzeAllTorrents pass per value received.
AnalyzeTrigger chan struct{}
latestState *LibraryState
RepairQueue mapset.Set[*Torrent]
repairRunning bool
// repairRunningMu presumably guards repairRunning; the code using it is not
// in this part of the file — TODO confirm.
repairRunningMu sync.Mutex
OnceDoneBin mapset.Set[string]
hasFFprobe bool
}
// NewTorrentManager creates a new torrent manager and returns it immediately.
// Loading happens in the background on workerPool: dumped torrents, saved
// *.zurgtorrent files (followed by a refresh) and downloads are loaded by
// three separate tasks, and a fourth task waits for those three before it
// starts the permanent jobs. Torrents are kept in memory and cached in files.
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, hasFFprobe bool, log, repairLog *logutil.Logger) *TorrentManager {
t := &TorrentManager{
requiredVersion: "0.10.0",
Config: cfg,
rd: api,
workerPool: workerPool,
log: log,
repairLog: repairLog,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
DownloadMap: cmap.New[*realdebrid.Download](),
RootNode: fs.NewFileNode("root", true),
RefreshWorkerKillSwitch: make(chan struct{}, 1),
RepairWorkerKillSwitch: make(chan struct{}, 1),
RemountTrigger: make(chan struct{}, 1),
// RepairAllTrigger: make(chan struct{}, 1), // initialized in repair.go
DumpTrigger: make(chan struct{}, 1),
AnalyzeTrigger: make(chan struct{}, 1),
latestState: &LibraryState{log: log},
OnceDoneBin: mapset.NewSet[string](),
hasFFprobe: hasFFprobe,
}
t.initializeBins()
t.initializeDirectoryMaps()
// wg counts the three initial load tasks below; the fourth task blocks on it.
// NOTE(review): the error returned by Submit is ignored, so a rejected task
// would never call wg.Done — confirm the pool cannot reject here.
var wg sync.WaitGroup
wg.Add(3)
t.workerPool.Submit(func() {
defer wg.Done()
t.loadDumpedTorrents()
})
t.workerPool.Submit(func() {
defer wg.Done()
// initial load of existing *.zurgtorrent files
torrents, _ := t.DirectoryMap.Get(INT_ALL)
t.getTorrentFiles("data").Each(func(filePath string) bool {
torrent := t.readTorrentFromFile(filePath)
// unreadable or version-mismatched files come back as nil and are skipped
if torrent != nil {
accessKey := t.GetKey(torrent)
torrents.Set(accessKey, torrent)
t.assignDirectory(torrent, false, false)
}
// per the golang-set documentation, returning true would stop Each early
return false
})
t.refreshTorrents(true)
})
t.workerPool.Submit(func() {
defer wg.Done()
t.mountNewDownloads()
})
t.workerPool.Submit(func() {
// start the permanent jobs only after all three initial loads have finished
wg.Wait()
t.StartRefreshJob()
t.StartDownloadsJob()
t.StartRepairJob()
t.StartDumpJob()
t.StartMediaAnalysisJob()
t.setNewLatestState(t.getCurrentState())
t.EnqueueForRepair(nil)
})
return t
}
// UnrestrictFile is a proxy in front of the Real-Debrid client. Files whose
// state is "deleted_file" or "broken_file" are rejected with an error;
// any other file has its link passed to UnrestrictAndVerify.
func (t *TorrentManager) UnrestrictFile(file *File) (*realdebrid.Download, error) {
	switch {
	case file.State.Is("deleted_file"):
		return nil, fmt.Errorf("file %s has been deleted", file.Path)
	case file.State.Is("broken_file"):
		return nil, fmt.Errorf("file %s is broken", file.Path)
	}
	return t.rd.UnrestrictAndVerify(file.Link)
}
// GetKey returns the access key under which a torrent is stored in a
// directory map. Precedence:
//  1. the torrent's Rename, when renames are not ignored and it is non-empty;
//  2. the torrent's Name, when the RD torrent name is retained;
//  3. the torrent's Name, when folder-name extensions are retained and Name
//     contains OriginalName;
//  4. OriginalName with a trailing ".mp4" and then a trailing ".mkv" removed.
func (t *TorrentManager) GetKey(torrent *Torrent) string {
	renamesHonored := !t.Config.ShouldIgnoreRenames()
	if renamesHonored && torrent.Rename != "" {
		return torrent.Rename
	}
	if t.Config.EnableRetainRDTorrentName() {
		return torrent.Name
	}
	if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) {
		return torrent.Name
	}
	// drop the extension from the name
	withoutMp4 := strings.TrimSuffix(torrent.OriginalName, ".mp4")
	return strings.TrimSuffix(withoutMp4, ".mkv")
}
// GetPath returns the name a file is exposed under: its Rename when renames
// are not ignored and one is set, otherwise the last element of its Path.
func (t *TorrentManager) GetPath(file *File) string {
	if ignoreRenames := t.Config.ShouldIgnoreRenames(); !ignoreRenames && file.Rename != "" {
		return file.Rename
	}
	return filepath.Base(file.Path)
}
/// torrent functions
// getTorrentFiles returns the paths of all *.zurgtorrent files directly inside
// parentDir as a set.
//
// If the glob fails, a warning is logged and an empty, non-nil set is
// returned. Callers invoke methods such as Each or Iter on the result without
// a nil check, and calling a method on a nil interface value panics.
func (t *TorrentManager) getTorrentFiles(parentDir string) mapset.Set[string] {
	files, err := filepath.Glob(parentDir + "/*.zurgtorrent")
	if err != nil {
		t.log.Warnf("Cannot get files in %s directory: %v", parentDir, err)
		return mapset.NewSet[string]()
	}
	return mapset.NewSet[string](files...)
}
// writeTorrentToFile saves the torrent as JSON to
// data/<torrent filename>.zurgtorrent, stamping it with the manager's
// required version first. Failures are logged as warnings, not returned.
//
// The torrent is marshalled before the file is created: os.Create truncates an
// existing file, so marshalling afterwards would leave an empty file behind
// whenever marshalling fails.
func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) {
	torrent.Version = t.requiredVersion
	jsonData, err := json.Marshal(torrent)
	if err != nil {
		t.log.Warnf("Cannot marshal torrent: %v", err)
		return
	}

	filePath := "data/" + t.getTorrentFilename(torrent) + ".zurgtorrent"
	file, err := os.Create(filePath)
	if err != nil {
		t.log.Warnf("Cannot create file %s: %v", filePath, err)
		return
	}
	if _, err := file.Write(jsonData); err != nil {
		file.Close()
		t.log.Warnf("Cannot write to file %s: %v", filePath, err)
		return
	}
	// a failed Close can mean the data never reached the disk, so report it
	if err := file.Close(); err != nil {
		t.log.Warnf("Cannot close file %s: %v", filePath, err)
	}
	// t.log.Debugf("Saved torrent %s to file", t.GetKey(torrent))
}
// sendTorrentToAPI POSTs the torrent as JSON to the zurgtorrent API, stamping
// it with the manager's required version first. It is best effort: marshal,
// request and transport errors are dropped silently, and the response status
// is not inspected.
func (t *TorrentManager) sendTorrentToAPI(torrent *Torrent) {
	torrent.Version = t.requiredVersion
	jsonData, err := json.Marshal(torrent)
	if err != nil {
		return
	}
	req, err := http.NewRequest(
		http.MethodPost,
		"https://zurgtorrent.debridmediamanager.com/api/torrents",
		bytes.NewReader(jsonData),
	)
	if err != nil {
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// A client without a timeout can block its caller forever on a stalled
	// connection.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	// The body must be drained and closed, otherwise the underlying
	// connection is leaked instead of being reused.
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body)
}
// applyMediaInfoDetails probes every selected file of the torrent that has no
// MediaInfo yet, is in state "ok_file" and is a video or otherwise playable.
//
// For each such file the link is unrestricted and the resulting URL is probed
// with ffprobe (5 second timeout). A file that cannot be unrestricted is sent
// the "break_file" event and the torrent is enqueued for repair. Once
// utils.AreAllTokensExpired reports true for an unrestrict error, the
// remaining files are skipped.
//
// The torrent is written to its file when anything changed, and is always sent
// to the API. The returned error is non-nil ("bandwidth limit reached") only
// when files were skipped for that reason.
func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error {
	var (
		dirty          bool
		outOfBandwidth bool
	)

	probeFile := func(_ string, f *File) {
		if outOfBandwidth {
			return
		}
		playable := utils.IsVideo(f.Path) || t.IsPlayable(f.Path)
		if f.MediaInfo != nil || !f.State.Is("ok_file") || !playable {
			return
		}

		download, err := t.UnrestrictFile(f)
		if utils.AreAllTokensExpired(err) {
			outOfBandwidth = true
			return
		}
		if download == nil {
			f.State.Event(context.Background(), "break_file")
			t.EnqueueForRepair(torrent)
			dirty = true
			return
		}

		// the deferred cancel runs when this callback returns, i.e. once per file
		probeCtx, stopProbe := context.WithTimeout(context.Background(), 5*time.Second)
		defer stopProbe()
		info, err := ffprobe.ProbeURL(probeCtx, download.Download)
		if err != nil {
			t.log.Warnf("Cannot probe file %s: %v", f.Path, err)
			return
		}
		f.MediaInfo = info
		dirty = true
	}
	torrent.SelectedFiles.IterCb(probeFile)

	if dirty {
		t.writeTorrentToFile(torrent)
	}
	t.sendTorrentToAPI(torrent)

	if !outOfBandwidth {
		return nil
	}
	t.log.Warnf("Your account has reached the bandwidth limit, cannot apply media info details to the rest of the files")
	return fmt.Errorf("bandwidth limit reached")
}
// readTorrentFromFile loads a torrent previously saved as JSON at filePath.
//
// It returns nil when the file cannot be read, does not decode into a torrent
// (including a file that contains JSON null), or was written with a version
// other than the manager's required version. On success every selected-file
// link and every unassigned link is passed through truncateRDLink.
func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent {
	jsonData, err := os.ReadFile(filePath)
	if err != nil {
		return nil
	}
	var torrent *Torrent
	if err := json.Unmarshal(jsonData, &torrent); err != nil {
		return nil
	}
	// decoding JSON null succeeds but leaves the pointer nil
	if torrent == nil || torrent.Version != t.requiredVersion {
		return nil
	}

	torrent.SelectedFiles.IterCb(func(_ string, file *File) {
		file.Link = truncateRDLink(file.Link)
	})

	unassignedLinks := mapset.NewSet[string]()
	torrent.UnassignedLinks.Each(func(link string) bool {
		unassignedLinks.Add(truncateRDLink(link))
		return false // keep iterating
	})
	torrent.UnassignedLinks = unassignedLinks
	return torrent
}

// truncateRDLink cuts a link that starts with "https://real-debrid.com/d/" down
// to its first 39 bytes (the 26-byte prefix plus 13 more). Any other link, and
// any link that is already 39 bytes or shorter, is returned unchanged, so a
// short link can no longer cause a slice-bounds panic.
func truncateRDLink(link string) string {
	const maxLen = 39
	if len(link) > maxLen && strings.HasPrefix(link, "https://real-debrid.com/d/") {
		return link[:maxLen]
	}
	return link
}
/// end torrent functions
/// info functions
// getInfoFiles returns the paths of all *.zurginfo files directly inside
// data/info as a set. If the glob fails, a warning is logged and nil is
// returned.
func (t *TorrentManager) getInfoFiles() mapset.Set[string] {
	matches, globErr := filepath.Glob("data/info/*.zurginfo")
	if globErr == nil {
		return mapset.NewSet[string](matches...)
	}
	t.log.Warnf("Cannot get files in data directory: %v", globErr)
	return nil
}
// writeInfoToFile saves the torrent info as JSON to data/info/<ID>.zurginfo.
// Failures are logged as warnings, not returned.
//
// The info is marshalled before the file is created: os.Create truncates an
// existing file, so marshalling afterwards would leave an empty file behind
// whenever marshalling fails.
func (t *TorrentManager) writeInfoToFile(info *realdebrid.TorrentInfo) {
	jsonData, err := json.Marshal(info)
	if err != nil {
		t.log.Warnf("Cannot marshal torrent info: %v", err)
		return
	}

	filePath := "data/info/" + info.ID + ".zurginfo"
	file, err := os.Create(filePath)
	if err != nil {
		t.log.Warnf("Cannot create info file %s: %v", filePath, err)
		return
	}
	if _, err := file.Write(jsonData); err != nil {
		file.Close()
		t.log.Warnf("Cannot write to info file %s: %v", filePath, err)
		return
	}
	// a failed Close can mean the data never reached the disk, so report it
	if err := file.Close(); err != nil {
		t.log.Warnf("Cannot close info file %s: %v", filePath, err)
	}
	// t.log.Debugf("Saved torrent %s to info file", info.ID)
}
// readInfoFromFile loads the torrent info saved at data/info/<torrentID>.zurginfo.
//
// It returns nil when the file cannot be read, does not decode into a torrent
// info (including a file that contains JSON null), or describes a torrent
// whose Progress is not 100.
func (t *TorrentManager) readInfoFromFile(torrentID string) *realdebrid.TorrentInfo {
	filePath := "data/info/" + torrentID + ".zurginfo"
	jsonData, err := os.ReadFile(filePath)
	if err != nil {
		return nil
	}
	var info *realdebrid.TorrentInfo
	if err := json.Unmarshal(jsonData, &info); err != nil {
		return nil
	}
	// decoding JSON null succeeds but leaves the pointer nil
	if info == nil || info.Progress != 100 {
		return nil
	}
	return info
}
// deleteInfoFile removes data/info/<torrentID>.zurginfo. The removal is best
// effort: its error is discarded.
func (t *TorrentManager) deleteInfoFile(torrentID string) {
	infoPath := fmt.Sprintf("data/info/%s.zurginfo", torrentID)
	_ = os.Remove(infoPath)
}
/// end info functions
// mountNewDownloads rebuilds the download maps from the Real-Debrid downloads
// list. Both the unrestrict map of the configured token and DownloadMap are
// cleared first. Then, for every download:
//   - a link starting with "https://real-debrid.com/d/" is cut to at most 39
//     bytes and stored in the token's unrestrict map under that link;
//   - any other download is stored in DownloadMap under its base filename.
//     When the download's Type contains an "x", the part between the first
//     and second "x" is appended to the filename as " (<part>p)" before the
//     extension.
//
// Only DownloadMap entries count as mounted in the log message.
func (t *TorrentManager) mountNewDownloads() {
	token := t.Config.GetToken()
	// NOTE(review): the "found" result is ignored; confirm UnrestrictMap
	// always holds an entry for the configured token.
	tokenMap, _ := t.rd.UnrestrictMap.Get(token)
	// clear maps
	tokenMap.Clear()
	t.DownloadMap.Clear()

	downloads := t.rd.GetDownloads()
	mountedCount := 0
	for i := range downloads {
		download := &downloads[i]
		download.Token = token
		if strings.HasPrefix(download.Link, "https://real-debrid.com/d/") {
			// set link to max 39 chars (26 + 13); the length check keeps a
			// shorter link from causing a slice-bounds panic
			if len(download.Link) > 39 {
				download.Link = download.Link[:39]
			}
			tokenMap.Set(download.Link, download)
			continue
		}

		filename := filepath.Base(download.Filename)
		// account for resolution in the type
		if strings.Contains(download.Type, "x") {
			ext := filepath.Ext(filename)
			trimmed := strings.TrimSuffix(filename, ext)
			// it's a resolution so extract 2nd part and add it to the filename
			parts := strings.Split(download.Type, "x")
			if len(parts) > 1 {
				filename = fmt.Sprintf("%s (%sp)%s", trimmed, parts[1], ext)
			}
		}
		// t.log.Debugf("Download dump: %+v", downloads[i])
		t.DownloadMap.Set(filename, download)
		mountedCount++
	}

	if mountedCount > 0 {
		t.log.Infof("Mounted %d new downloads", mountedCount)
	} else {
		t.log.Debugf("No new downloads to mount")
	}
}
// StartDownloadsJob submits a permanent job to the worker pool that remounts
// downloads every GetDownloadsEveryMins minutes and whenever RemountTrigger
// receives a value. The job has no exit condition.
func (t *TorrentManager) StartDownloadsJob() {
	remountLoop := func() {
		// NOTE(review): time.NewTicker panics on a non-positive interval —
		// confirm the configuration guarantees a positive value.
		interval := time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute
		remountTicker := time.NewTicker(interval)
		defer remountTicker.Stop()
		for {
			// either event leads to the same remount
			select {
			case <-remountTicker.C:
			case <-t.RemountTrigger:
			}
			t.mountNewDownloads()
		}
	}
	t.workerPool.Submit(remountLoop)
}
// dumpTorrents copies every *.zurgtorrent file from "data" into "dump",
// keeping the base filename. A file that cannot be copied is logged as a
// warning and the remaining files are still processed.
func (t *TorrentManager) dumpTorrents() {
	files := t.getTorrentFiles("data")
	if files == nil {
		// the lookup failed and has already been logged; calling a method on
		// the nil set would panic
		return
	}
	files.Each(func(file string) bool {
		destPath := "dump/" + filepath.Base(file)
		if err := copyFile(file, destPath); err != nil {
			t.log.Warnf("Cannot copy file %s to %s: %v", file, destPath, err)
		}
		return false // keep iterating
	})
}
// copyFile copies the contents of sourcePath to destPath, creating destPath or
// truncating it if it already exists.
//
// It returns the first error encountered while opening, creating, copying or
// closing. The destination's Close error is reported because buffered data may
// only fail to reach the disk at that point.
func copyFile(sourcePath, destPath string) error {
	source, err := os.Open(sourcePath)
	if err != nil {
		return err
	}
	defer source.Close()

	destination, err := os.Create(destPath)
	if err != nil {
		return err
	}
	if _, err := io.Copy(destination, source); err != nil {
		destination.Close()
		return err
	}
	return destination.Close()
}
// StartDumpJob submits a permanent job to the worker pool that dumps the
// torrent files and then reloads the dumped torrents, every
// GetDumpTorrentsEveryMins minutes and whenever DumpTrigger receives a value.
// The job has no exit condition.
func (t *TorrentManager) StartDumpJob() {
	dumpLoop := func() {
		// NOTE(review): time.NewTicker panics on a non-positive interval —
		// confirm the configuration guarantees a positive value.
		interval := time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute
		dumpTicker := time.NewTicker(interval)
		defer dumpTicker.Stop()
		for {
			// either event leads to the same dump-and-reload
			select {
			case <-dumpTicker.C:
			case <-t.DumpTrigger:
			}
			t.dumpTorrents()
			t.loadDumpedTorrents()
		}
	}
	t.workerPool.Submit(dumpLoop)
}
// analyzeAllTorrents runs applyMediaInfoDetails on every torrent in the
// INT_ALL directory and logs progress after each one. As soon as a torrent
// reports the "bandwidth limit reached" error, all remaining torrents are
// skipped and a warning is logged at the end.
func (t *TorrentManager) analyzeAllTorrents() {
	all, _ := t.DirectoryMap.Get(INT_ALL)
	total := all.Count()
	t.log.Infof("Applying media info details to all %d torrents", total)

	done, halted := 0, false
	all.IterCb(func(_ string, torrent *Torrent) {
		if halted {
			return
		}
		if err := t.applyMediaInfoDetails(torrent); err != nil && err.Error() == "bandwidth limit reached" {
			halted = true
			return
		}
		done++
		t.log.Debugf("Applied media info details to torrent %s (%d/%d)", t.GetKey(torrent), done, total)
	})

	if halted {
		t.log.Warnf("Bandwidth limit reached, skipped the rest of the torrents")
	}
}
// StartMediaAnalysisJob submits a permanent job to the worker pool that runs
// analyzeAllTorrents once for every value received on AnalyzeTrigger (the
// analysis is triggered by the user). The job ends only when the channel is
// closed.
func (t *TorrentManager) StartMediaAnalysisJob() {
	analyzeOnSignal := func() {
		for {
			if _, open := <-t.AnalyzeTrigger; !open {
				return
			}
			t.analyzeAllTorrents()
		}
	}
	t.workerPool.Submit(analyzeOnSignal)
}
// initializeDirectoryMaps registers an empty torrent map in DirectoryMap for,
// in this order: the internal INT_ALL directory, every directory from the
// configuration, and the three special directories.
func (t *TorrentManager) initializeDirectoryMaps() {
	// internal directory; its entries are keyed by access key (see GetKey)
	t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]())

	// directories from the configuration
	for _, configured := range t.Config.GetDirectories() {
		t.DirectoryMap.Set(configured, cmap.New[*Torrent]())
	}

	// special directories
	specialDirs := []string{
		config.ALL_TORRENTS,
		config.DUMPED_TORRENTS,
		config.UNPLAYABLE_TORRENTS,
	}
	for _, special := range specialDirs {
		t.DirectoryMap.Set(special, cmap.New[*Torrent]())
	}
}
// getTorrentFilename returns the sanitized base name (without directory or
// ".zurgtorrent" suffix) used for a torrent's saved file. The torrent's Name
// is used when the RD torrent name is retained, or when folder-name extensions
// are retained and Name contains OriginalName. Otherwise OriginalName is used
// with a trailing ".mp4" and then a trailing ".mkv" removed.
func (t *TorrentManager) getTorrentFilename(torrent *Torrent) string {
	name := torrent.Name
	switch {
	case t.Config.EnableRetainRDTorrentName():
		// keep the RD torrent name
	case t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName):
		// keep the name including its extension
	default:
		// drop the extension from the name
		name = strings.TrimSuffix(strings.TrimSuffix(torrent.OriginalName, ".mp4"), ".mkv")
	}
	return sanitizeFileName(name)
}
// getTorrentInfoFilename computes, for a Real-Debrid torrent info, the same
// sanitized base name that getTorrentFilename computes for a torrent. Name is
// used when the RD torrent name is retained, or when folder-name extensions
// are retained and Name contains OriginalName. Otherwise OriginalName is used
// with a trailing ".mp4" and then a trailing ".mkv" removed.
func (t *TorrentManager) getTorrentInfoFilename(torrent *realdebrid.TorrentInfo) string {
	name := torrent.Name
	switch {
	case t.Config.EnableRetainRDTorrentName():
		// keep the RD torrent name
	case t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName):
		// keep the name including its extension
	default:
		// drop the extension from the name
		name = strings.TrimSuffix(strings.TrimSuffix(torrent.OriginalName, ".mp4"), ".mkv")
	}
	return sanitizeFileName(name)
}
// invalidFileNameChars matches runs of characters that are replaced when
// building a Windows filename. It is compiled once at package scope instead of
// on every sanitizeFileName call.
var invalidFileNameChars = regexp.MustCompile(`[<>:"/\\|?*]+`)

// sanitizeFileName takes a string and converts it to a valid Windows filename.
// On any other operating system the input is returned unchanged.
//
// On Windows, each run of invalid characters (< > : " / \ | ? *) becomes a
// single underscore, surrounding whitespace is trimmed, then leading and
// trailing dots are trimmed. An empty result is replaced by
// "default_filename".
func sanitizeFileName(input string) string {
	if !isWindows() {
		return input
	}
	sanitized := invalidFileNameChars.ReplaceAllString(input, "_")
	// whitespace is trimmed before dots, so whitespace uncovered by removing
	// dots is kept
	sanitized = strings.TrimSpace(sanitized)
	sanitized = strings.Trim(sanitized, ".")
	if sanitized == "" {
		return "default_filename"
	}
	return sanitized
}

// isWindows reports whether the program is running on Windows.
func isWindows() bool {
	return runtime.GOOS == "windows"
}