480 lines
12 KiB
Go
480 lines
12 KiB
Go
package torrent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/debridmediamanager/zurg/internal/config"
|
|
"github.com/debridmediamanager/zurg/internal/fs"
|
|
"github.com/debridmediamanager/zurg/pkg/http"
|
|
"github.com/debridmediamanager/zurg/pkg/logutil"
|
|
"github.com/debridmediamanager/zurg/pkg/realdebrid"
|
|
"github.com/debridmediamanager/zurg/pkg/utils"
|
|
mapset "github.com/deckarep/golang-set/v2"
|
|
cmap "github.com/orcaman/concurrent-map/v2"
|
|
"github.com/panjf2000/ants/v2"
|
|
"gopkg.in/vansante/go-ffprobe.v2"
|
|
)
|
|
|
|
const (
|
|
INT_ALL = "int__all__"
|
|
)
|
|
|
|
type TorrentManager struct {
|
|
requiredVersion string
|
|
|
|
Config config.ConfigInterface
|
|
rd *realdebrid.RealDebrid
|
|
workerPool *ants.Pool
|
|
log *logutil.Logger
|
|
repairLog *logutil.Logger
|
|
|
|
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
|
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
|
|
|
RootNode *fs.FileNode
|
|
|
|
RefreshWorkerKillSwitch chan struct{}
|
|
RepairWorkerKillSwitch chan struct{}
|
|
RemountTrigger chan struct{}
|
|
RepairAllTrigger chan struct{}
|
|
DumpTrigger chan struct{}
|
|
AnalyzeTrigger chan struct{}
|
|
|
|
latestState *LibraryState
|
|
|
|
repairChan chan *Torrent
|
|
RepairQueue mapset.Set[*Torrent]
|
|
repairRunning bool
|
|
repairRunningMu sync.Mutex
|
|
|
|
OnceDoneBin mapset.Set[string]
|
|
hasFFprobe bool
|
|
}
|
|
|
|
// NewTorrentManager creates a new torrent manager
|
|
// it will fetch all torrents and their info in the background
|
|
// and store them in-memory and cached in files
|
|
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, hasFFprobe bool, log, repairLog *logutil.Logger) *TorrentManager {
|
|
t := &TorrentManager{
|
|
requiredVersion: "0.10.0",
|
|
|
|
Config: cfg,
|
|
rd: api,
|
|
workerPool: workerPool,
|
|
log: log,
|
|
repairLog: repairLog,
|
|
|
|
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
|
DownloadMap: cmap.New[*realdebrid.Download](),
|
|
|
|
RootNode: fs.NewFileNode("root", true),
|
|
|
|
RefreshWorkerKillSwitch: make(chan struct{}, 1),
|
|
RepairWorkerKillSwitch: make(chan struct{}, 1),
|
|
RemountTrigger: make(chan struct{}, 1),
|
|
// RepairAllTrigger: make(chan struct{}, 1), // initialized in repair.go
|
|
DumpTrigger: make(chan struct{}, 1),
|
|
AnalyzeTrigger: make(chan struct{}, 1),
|
|
|
|
latestState: &LibraryState{log: log},
|
|
|
|
OnceDoneBin: mapset.NewSet[string](),
|
|
hasFFprobe: hasFFprobe,
|
|
}
|
|
|
|
t.initializeBins()
|
|
t.initializeDirectoryMaps()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(3)
|
|
|
|
t.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
t.loadDumpedTorrents()
|
|
})
|
|
t.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
|
|
// load *.zurgtorrent files
|
|
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
|
t.getTorrentFiles("data").Each(func(filePath string) bool {
|
|
torrent := t.readTorrentFromFile(filePath)
|
|
if torrent != nil {
|
|
accessKey := t.GetKey(torrent)
|
|
allTorrents.Set(accessKey, torrent)
|
|
t.assignDirectory(torrent, false)
|
|
}
|
|
return false
|
|
})
|
|
|
|
t.refreshTorrents(true)
|
|
})
|
|
t.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
t.mountNewDownloads()
|
|
})
|
|
|
|
t.workerPool.Submit(func() {
|
|
wg.Wait()
|
|
t.StartRefreshJob()
|
|
t.StartDownloadsJob()
|
|
t.StartRepairJob()
|
|
t.StartDumpJob()
|
|
t.StartMediaAnalysisJob()
|
|
|
|
t.setNewLatestState(t.getCurrentState())
|
|
|
|
t.EnqueueForRepair(nil)
|
|
})
|
|
|
|
return t
|
|
}
|
|
|
|
// proxy function
|
|
func (t *TorrentManager) UnrestrictFile(file *File) (*realdebrid.Download, error) {
|
|
if file.State.Is("deleted_file") {
|
|
return nil, fmt.Errorf("file %s has been deleted", file.Path)
|
|
} else if file.State.Is("broken_file") {
|
|
return nil, fmt.Errorf("file %s is broken", file.Path)
|
|
}
|
|
return t.rd.UnrestrictLink(file.Link)
|
|
}
|
|
|
|
func (t *TorrentManager) GetKey(torrent *Torrent) string {
|
|
if !t.Config.ShouldIgnoreRenames() && torrent.Rename != "" {
|
|
return torrent.Rename
|
|
}
|
|
if t.Config.EnableRetainRDTorrentName() {
|
|
return torrent.Name
|
|
}
|
|
// drop the extension from the name
|
|
if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) {
|
|
return torrent.Name
|
|
} else {
|
|
ret := strings.TrimSuffix(torrent.OriginalName, ".mp4")
|
|
ret = strings.TrimSuffix(ret, ".mkv")
|
|
return ret
|
|
}
|
|
}
|
|
|
|
func (t *TorrentManager) GetPath(file *File) string {
|
|
if !t.Config.ShouldIgnoreRenames() && file.Rename != "" {
|
|
return file.Rename
|
|
}
|
|
filename := filepath.Base(file.Path)
|
|
return filename
|
|
}
|
|
|
|
/// torrent functions
|
|
|
|
func (t *TorrentManager) getTorrentFiles(parentDir string) mapset.Set[string] {
|
|
files, err := filepath.Glob(parentDir + "/*.zurgtorrent")
|
|
if err != nil {
|
|
t.log.Warnf("Cannot get files in %s directory: %v", parentDir, err)
|
|
return nil
|
|
}
|
|
return mapset.NewSet[string](files...)
|
|
}
|
|
|
|
func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) {
|
|
filePath := "data/" + t.GetKey(torrent) + ".zurgtorrent"
|
|
file, err := os.Create(filePath)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot create file %s: %v", filePath, err)
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
torrent.Version = t.requiredVersion
|
|
|
|
jsonData, err := json.Marshal(torrent)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot marshal torrent: %v", err)
|
|
return
|
|
}
|
|
|
|
if _, err := file.Write(jsonData); err != nil {
|
|
t.log.Warnf("Cannot write to file %s: %v", filePath, err)
|
|
return
|
|
}
|
|
|
|
// t.log.Debugf("Saved torrent %s to file", t.GetKey(torrent))
|
|
}
|
|
|
|
func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error {
|
|
changesApplied := false
|
|
bwLimitReached := false
|
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
if bwLimitReached {
|
|
return
|
|
}
|
|
isPlayable := utils.IsVideo(file.Path) || t.IsPlayable(file.Path)
|
|
if file.MediaInfo != nil || !file.State.Is("ok_file") || !isPlayable {
|
|
return
|
|
}
|
|
unrestrict, err := t.UnrestrictFile(file)
|
|
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
|
|
bwLimitReached = true
|
|
return
|
|
}
|
|
if unrestrict == nil {
|
|
file.State.Event(context.Background(), "break_file")
|
|
t.EnqueueForRepair(torrent)
|
|
changesApplied = true
|
|
return
|
|
}
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancelFn()
|
|
data, err := ffprobe.ProbeURL(ctx, unrestrict.Download)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot probe file %s: %v", file.Path, err)
|
|
return
|
|
}
|
|
file.MediaInfo = data
|
|
changesApplied = true
|
|
})
|
|
if changesApplied {
|
|
t.writeTorrentToFile(torrent)
|
|
}
|
|
if bwLimitReached {
|
|
t.log.Warnf("Your account has reached the bandwidth limit, cannot apply media info details to the rest of the files")
|
|
return fmt.Errorf("bandwidth limit reached")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent {
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer file.Close()
|
|
jsonData, err := io.ReadAll(file)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
var torrent *Torrent
|
|
if err := json.Unmarshal(jsonData, &torrent); err != nil {
|
|
return nil
|
|
}
|
|
if torrent.Version != t.requiredVersion {
|
|
return nil
|
|
}
|
|
return torrent
|
|
}
|
|
|
|
/// end torrent functions
|
|
|
|
/// info functions
|
|
|
|
func (t *TorrentManager) getInfoFiles() mapset.Set[string] {
|
|
files, err := filepath.Glob("data/info/*.zurginfo")
|
|
if err != nil {
|
|
t.log.Warnf("Cannot get files in data directory: %v", err)
|
|
return nil
|
|
}
|
|
return mapset.NewSet[string](files...)
|
|
}
|
|
|
|
func (t *TorrentManager) writeInfoToFile(info *realdebrid.TorrentInfo) {
|
|
filePath := "data/info/" + info.ID + ".zurginfo"
|
|
file, err := os.Create(filePath)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot create info file %s: %v", filePath, err)
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
jsonData, err := json.Marshal(info)
|
|
if err != nil {
|
|
t.log.Warnf("Cannot marshal torrent info: %v", err)
|
|
return
|
|
}
|
|
|
|
if _, err := file.Write(jsonData); err != nil {
|
|
t.log.Warnf("Cannot write to info file %s: %v", filePath, err)
|
|
return
|
|
}
|
|
|
|
// t.log.Debugf("Saved torrent %s to info file", info.ID)
|
|
}
|
|
|
|
func (t *TorrentManager) readInfoFromFile(torrentID string) *realdebrid.TorrentInfo {
|
|
filePath := "data/info/" + torrentID + ".zurginfo"
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer file.Close()
|
|
jsonData, err := io.ReadAll(file)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
var info *realdebrid.TorrentInfo
|
|
if err := json.Unmarshal(jsonData, &info); err != nil {
|
|
return nil
|
|
}
|
|
return info
|
|
}
|
|
|
|
func (t *TorrentManager) deleteInfoFile(torrentID string) {
|
|
filePath := "data/info/" + torrentID + ".zurginfo"
|
|
_ = os.Remove(filePath)
|
|
}
|
|
|
|
/// end info functions
|
|
|
|
func (t *TorrentManager) mountNewDownloads() {
|
|
token, _ := t.rd.GetToken()
|
|
var tokenMap cmap.ConcurrentMap[string, *realdebrid.Download]
|
|
if token != "" {
|
|
tokenMap, _ = t.rd.UnrestrictMap.Get(token)
|
|
}
|
|
|
|
downloads := t.rd.GetDownloads()
|
|
mountedCount := 0
|
|
for i := range downloads {
|
|
isRealDebrid := strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/")
|
|
if !isRealDebrid {
|
|
filename := filepath.Base(downloads[i].Filename)
|
|
t.DownloadMap.Set(filename, &downloads[i])
|
|
mountedCount++
|
|
} else if token != "" {
|
|
tokenMap.Set(downloads[i].Link, &downloads[i])
|
|
}
|
|
}
|
|
if mountedCount > 0 {
|
|
t.log.Infof("Mounted %d new downloads", mountedCount)
|
|
} else {
|
|
t.log.Debugf("No new downloads to mount")
|
|
}
|
|
}
|
|
|
|
// StartDownloadsJob: permanent job for remounting downloads
|
|
func (t *TorrentManager) StartDownloadsJob() {
|
|
t.workerPool.Submit(func() {
|
|
remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute)
|
|
defer remountTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-remountTicker.C:
|
|
t.DownloadMap.Clear()
|
|
t.mountNewDownloads()
|
|
case <-t.RemountTrigger:
|
|
t.DownloadMap.Clear()
|
|
t.mountNewDownloads()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (t *TorrentManager) dumpTorrents() {
|
|
files := t.getTorrentFiles("data")
|
|
for file := range files.Iter() {
|
|
destPath := "dump/" + filepath.Base(file)
|
|
if err := copyFile(file, destPath); err != nil {
|
|
t.log.Warnf("Cannot copy file %s to %s: %v", file, destPath, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func copyFile(sourcePath, destPath string) error {
|
|
source, err := os.Open(sourcePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer source.Close()
|
|
|
|
destination, err := os.Create(destPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer destination.Close()
|
|
|
|
buf := make([]byte, 4096)
|
|
for {
|
|
n, err := source.Read(buf)
|
|
if err != nil && err != io.EOF {
|
|
return err
|
|
}
|
|
if n == 0 {
|
|
break
|
|
}
|
|
|
|
if _, err := destination.Write(buf[:n]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StartDumpJob: permanent job for dumping torrents
|
|
func (t *TorrentManager) StartDumpJob() {
|
|
t.workerPool.Submit(func() {
|
|
dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute)
|
|
defer dumpTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-dumpTicker.C:
|
|
t.dumpTorrents()
|
|
t.loadDumpedTorrents()
|
|
case <-t.DumpTrigger:
|
|
t.dumpTorrents()
|
|
t.loadDumpedTorrents()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (t *TorrentManager) analyzeAllTorrents() {
|
|
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
|
totalCount := allTorrents.Count()
|
|
t.log.Infof("Applying media info details to all %d torrents", totalCount)
|
|
idx := 0
|
|
skipTheRest := false
|
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
|
if skipTheRest {
|
|
return
|
|
}
|
|
err := t.applyMediaInfoDetails(torrent)
|
|
if err != nil && err.Error() == "bandwidth limit reached" {
|
|
skipTheRest = true
|
|
return
|
|
}
|
|
idx++
|
|
t.log.Debugf("Applied media info details to torrent %s (%d/%d)", t.GetKey(torrent), idx, totalCount)
|
|
})
|
|
if skipTheRest {
|
|
t.log.Warnf("Bandwidth limit reached, skipped the rest of the torrents")
|
|
}
|
|
}
|
|
|
|
// StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user)
|
|
func (t *TorrentManager) StartMediaAnalysisJob() {
|
|
t.workerPool.Submit(func() {
|
|
for range t.AnalyzeTrigger {
|
|
t.analyzeAllTorrents()
|
|
}
|
|
})
|
|
}
|
|
|
|
func (t *TorrentManager) initializeDirectoryMaps() {
|
|
// create internal directories
|
|
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey()
|
|
// create directory maps
|
|
for _, directory := range t.Config.GetDirectories() {
|
|
t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
|
|
// t.RootNode.AddChild(fs.NewFileNode(directory, true))
|
|
}
|
|
}
|