package torrent import ( "bytes" "context" "fmt" "io" "net/http" "os" "path/filepath" "regexp" "runtime" "strings" "sync" "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/fs" "github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/utils" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" "github.com/panjf2000/ants/v2" "gopkg.in/vansante/go-ffprobe.v2" ) const ( INT_ALL = "int__all__" ) type TorrentManager struct { requiredVersion string Config config.ConfigInterface rd *realdebrid.RealDebrid workerPool *ants.Pool log *logutil.Logger repairLog *logutil.Logger DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] RootNode *fs.FileNode RefreshWorkerKillSwitch chan struct{} RepairWorkerKillSwitch chan struct{} RemountTrigger chan struct{} RepairAllTrigger chan struct{} DumpTrigger chan struct{} AnalyzeTrigger chan struct{} latestState *LibraryState RepairQueue mapset.Set[*Torrent] repairRunning bool repairRunningMu sync.Mutex OnceDoneBin mapset.Set[string] hasFFprobe bool } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, hasFFprobe bool, log, repairLog *logutil.Logger) *TorrentManager { t := &TorrentManager{ requiredVersion: "0.10.0", Config: cfg, rd: api, workerPool: workerPool, log: log, repairLog: repairLog, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), DownloadMap: cmap.New[*realdebrid.Download](), RootNode: fs.NewFileNode("root", true), RefreshWorkerKillSwitch: make(chan struct{}, 1), RepairWorkerKillSwitch: make(chan struct{}, 1), RemountTrigger: make(chan struct{}, 1), // RepairAllTrigger: make(chan struct{}, 1), // initialized in repair.go DumpTrigger: make(chan struct{}, 1), AnalyzeTrigger: make(chan struct{}, 1), latestState: &LibraryState{log: log}, OnceDoneBin: mapset.NewSet[string](), hasFFprobe: hasFFprobe, } t.initializeBins() t.initializeDirectoryMaps() var wg sync.WaitGroup wg.Add(3) t.workerPool.Submit(func() { defer wg.Done() t.loadDumpedTorrents() }) t.workerPool.Submit(func() { defer wg.Done() // initial load of existing *.zurgtorrent files torrents, _ := t.DirectoryMap.Get(INT_ALL) t.getTorrentFiles("data").Each(func(filePath string) bool { torrent := t.readTorrentFromFile(filePath) if torrent != nil { accessKey := t.GetKey(torrent) torrents.Set(accessKey, torrent) t.assignDirectory(torrent, false, false) } return false }) t.refreshTorrents(true) }) t.workerPool.Submit(func() { defer wg.Done() t.mountNewDownloads() }) t.workerPool.Submit(func() { wg.Wait() t.StartRefreshJob() t.StartDownloadsJob() t.StartRepairJob() t.StartDumpJob() t.StartMediaAnalysisJob() t.setNewLatestState(t.getCurrentState()) t.EnqueueForRepair(nil) }) return t } // proxy function func (t *TorrentManager) UnrestrictFile(file *File) (*realdebrid.Download, error) { if file.State.Is("deleted_file") { return nil, fmt.Errorf("file %s has been deleted", file.Path) } else if file.State.Is("broken_file") { return nil, fmt.Errorf("file %s is broken", file.Path) } return t.rd.UnrestrictAndVerify(file.Link) } func (t *TorrentManager) GetKey(torrent *Torrent) string { if !t.Config.ShouldIgnoreRenames() && torrent.Rename != "" { return torrent.Rename } if t.Config.EnableRetainRDTorrentName() { return torrent.Name } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) { return torrent.Name } else { ret := strings.TrimSuffix(torrent.OriginalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) GetPath(file *File) string { if !t.Config.ShouldIgnoreRenames() && file.Rename != "" { return file.Rename } filename := filepath.Base(file.Path) return filename } /// torrent functions func (t *TorrentManager) getTorrentFiles(parentDir string) mapset.Set[string] { files, err := filepath.Glob(parentDir + "/*.zurgtorrent") if err != nil { t.log.Warnf("Cannot get files in %s directory: %v", parentDir, err) return nil } return mapset.NewSet[string](files...) } func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) { filePath := "data/" + t.getTorrentFilename(torrent) + ".zurgtorrent" file, err := os.Create(filePath) if err != nil { t.log.Warnf("Cannot create file %s: %v", filePath, err) return } defer file.Close() torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { t.log.Warnf("Cannot marshal torrent: %v", err) return } if _, err := file.Write(jsonData); err != nil { t.log.Warnf("Cannot write to file %s: %v", filePath, err) return } // t.log.Debugf("Saved torrent %s to file", t.GetKey(torrent)) } func (t *TorrentManager) sendTorrentToAPI(torrent *Torrent) { torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { return } req, err := http.NewRequest( "POST", "https://zurgtorrent.debridmediamanager.com/api/torrents", bytes.NewBuffer(jsonData), ) if err != nil { return } req.Header.Set("Content-Type", "application/json") client := &http.Client{} client.Do(req) } func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error { changesApplied := false bwLimitReached := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { if bwLimitReached { return } isPlayable := utils.IsVideo(file.Path) || t.IsPlayable(file.Path) if file.MediaInfo != nil || !file.State.Is("ok_file") || !isPlayable { return } unrestrict, err := t.UnrestrictFile(file) if utils.AreAllTokensExpired(err) { bwLimitReached = true return } if unrestrict == nil { file.State.Event(context.Background(), "break_file") t.EnqueueForRepair(torrent) changesApplied = true return } ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() data, err := ffprobe.ProbeURL(ctx, unrestrict.Download) if err != nil { t.log.Warnf("Cannot probe file %s: %v", file.Path, err) return } file.MediaInfo = data changesApplied = true }) if changesApplied { t.writeTorrentToFile(torrent) } t.sendTorrentToAPI(torrent) if bwLimitReached { t.log.Warnf("Your account has reached the bandwidth limit, cannot apply media info details to the rest of the files") return fmt.Errorf("bandwidth limit reached") } return nil } func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent { file, err := os.Open(filePath) if err != nil { return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var torrent *Torrent if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } if torrent.Version != t.requiredVersion { return nil } torrent.SelectedFiles.IterCb(func(_ string, file *File) { if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") { // set link to max 39 chars (26 + 13) file.Link = file.Link[0:39] } }) unassignedLinks := mapset.NewSet[string]() torrent.UnassignedLinks.Each(func(link string) bool { if strings.HasPrefix(link, "https://real-debrid.com/d/") { // set link to max 39 chars (26 + 13) unassignedLinks.Add(link[0:39]) } else { unassignedLinks.Add(link) } return false }) torrent.UnassignedLinks = unassignedLinks return torrent } /// end torrent functions /// info functions func (t *TorrentManager) getInfoFiles() mapset.Set[string] { files, err := filepath.Glob("data/info/*.zurginfo") if err != nil { t.log.Warnf("Cannot get files in data directory: %v", err) return nil } return mapset.NewSet[string](files...) } func (t *TorrentManager) writeInfoToFile(info *realdebrid.TorrentInfo) { filePath := "data/info/" + info.ID + ".zurginfo" file, err := os.Create(filePath) if err != nil { t.log.Warnf("Cannot create info file %s: %v", filePath, err) return } defer file.Close() jsonData, err := json.Marshal(info) if err != nil { t.log.Warnf("Cannot marshal torrent info: %v", err) return } if _, err := file.Write(jsonData); err != nil { t.log.Warnf("Cannot write to info file %s: %v", filePath, err) return } // t.log.Debugf("Saved torrent %s to info file", info.ID) } func (t *TorrentManager) readInfoFromFile(torrentID string) *realdebrid.TorrentInfo { filePath := "data/info/" + torrentID + ".zurginfo" file, err := os.Open(filePath) if err != nil { return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var info *realdebrid.TorrentInfo if err := json.Unmarshal(jsonData, &info); err != nil { return nil } if info.Progress != 100 { return nil } return info } func (t *TorrentManager) deleteInfoFile(torrentID string) { filePath := "data/info/" + torrentID + ".zurginfo" _ = os.Remove(filePath) } /// end info functions func (t *TorrentManager) mountNewDownloads() { token := t.Config.GetToken() tokenMap, _ := t.rd.UnrestrictMap.Get(token) // clear maps tokenMap.Clear() t.DownloadMap.Clear() downloads := t.rd.GetDownloads() mountedCount := 0 for i := range downloads { downloads[i].Token = token if strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") { downloads[i].Link = downloads[i].Link[0:39] tokenMap.Set(downloads[i].Link, &downloads[i]) continue } filename := filepath.Base(downloads[i].Filename) // account for resolution in the type if strings.Contains(downloads[i].Type, "x") { // extract extension from the filename ext := filepath.Ext(filename) trimmed := strings.TrimSuffix(filename, ext) // it's a resolution so extract 2nd part and add it to the filename parts := strings.Split(downloads[i].Type, "x") if len(parts) > 1 { filename = fmt.Sprintf("%s (%sp)%s", trimmed, parts[1], ext) } } // t.log.Debugf("Download dump: %+v", downloads[i]) t.DownloadMap.Set(filename, &downloads[i]) mountedCount++ } if mountedCount > 0 { t.log.Infof("Mounted %d new downloads", mountedCount) } else { t.log.Debugf("No new downloads to mount") } } // StartDownloadsJob: permanent job for remounting downloads func (t *TorrentManager) StartDownloadsJob() { t.workerPool.Submit(func() { remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute) defer remountTicker.Stop() for { select { case <-remountTicker.C: t.mountNewDownloads() case <-t.RemountTrigger: t.mountNewDownloads() } } }) } func (t *TorrentManager) dumpTorrents() { files := t.getTorrentFiles("data") for file := range files.Iter() { destPath := "dump/" + filepath.Base(file) if err := copyFile(file, destPath); err != nil { t.log.Warnf("Cannot copy file %s to %s: %v", file, destPath, err) } } } func copyFile(sourcePath, destPath string) error { source, err := os.Open(sourcePath) if err != nil { return err } defer source.Close() destination, err := os.Create(destPath) if err != nil { return err } defer destination.Close() buf := make([]byte, 4096) for { n, err := source.Read(buf) if err != nil && err != io.EOF { return err } if n == 0 { break } if _, err := destination.Write(buf[:n]); err != nil { return err } } return nil } // StartDumpJob: permanent job for dumping torrents func (t *TorrentManager) StartDumpJob() { t.workerPool.Submit(func() { dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute) defer dumpTicker.Stop() for { select { case <-dumpTicker.C: t.dumpTorrents() t.loadDumpedTorrents() case <-t.DumpTrigger: t.dumpTorrents() t.loadDumpedTorrents() } } }) } func (t *TorrentManager) analyzeAllTorrents() { torrents, _ := t.DirectoryMap.Get(INT_ALL) totalCount := torrents.Count() t.log.Infof("Applying media info details to all %d torrents", totalCount) idx := 0 skipTheRest := false torrents.IterCb(func(_ string, torrent *Torrent) { if skipTheRest { return } err := t.applyMediaInfoDetails(torrent) if err != nil && err.Error() == "bandwidth limit reached" { skipTheRest = true return } idx++ t.log.Debugf("Applied media info details to torrent %s (%d/%d)", t.GetKey(torrent), idx, totalCount) }) if skipTheRest { t.log.Warnf("Bandwidth limit reached, skipped the rest of the torrents") } } // StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user) func (t *TorrentManager) StartMediaAnalysisJob() { t.workerPool.Submit(func() { for range t.AnalyzeTrigger { t.analyzeAllTorrents() } }) } func (t *TorrentManager) initializeDirectoryMaps() { // create internal directories t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey() // create directory maps for _, directory := range t.Config.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } // create special directories t.DirectoryMap.Set(config.ALL_TORRENTS, cmap.New[*Torrent]()) t.DirectoryMap.Set(config.DOWNLOADS, cmap.New[*Torrent]()) t.DirectoryMap.Set(config.DUMPED_TORRENTS, cmap.New[*Torrent]()) t.DirectoryMap.Set(config.UNPLAYABLE_TORRENTS, cmap.New[*Torrent]()) } func (t *TorrentManager) getTorrentFilename(torrent *Torrent) string { if t.Config.EnableRetainRDTorrentName() { return sanitizeFileName(torrent.Name) } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) { return sanitizeFileName(torrent.Name) } ret := strings.TrimSuffix(torrent.OriginalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return sanitizeFileName(ret) } func (t *TorrentManager) getTorrentInfoFilename(torrent *realdebrid.TorrentInfo) string { if t.Config.EnableRetainRDTorrentName() { return sanitizeFileName(torrent.Name) } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) { return sanitizeFileName(torrent.Name) } ret := strings.TrimSuffix(torrent.OriginalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return sanitizeFileName(ret) } // sanitizeFileName takes a string and converts it to a valid Windows filename func sanitizeFileName(input string) string { if !isWindows() { return input } // Define a regex pattern to match invalid filename characters invalidChars := regexp.MustCompile(`[<>:"/\\|?*]+`) // Replace invalid characters with an underscore sanitized := invalidChars.ReplaceAllString(input, "_") // Trim leading and trailing whitespace and dots sanitized = strings.TrimSpace(sanitized) sanitized = strings.Trim(sanitized, ".") // Ensure the filename is not empty if sanitized == "" { sanitized = "default_filename" } return sanitized } func isWindows() bool { return runtime.GOOS == "windows" }