package torrent import ( "io" "os" "path/filepath" "strings" "sync" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/realdebrid" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" "github.com/panjf2000/ants/v2" ) const ( INT_ALL = "int__all__" INT_INFO_CACHE = "int__info__" ) type TorrentManager struct { Config config.ConfigInterface Api *realdebrid.RealDebrid DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] fixers cmap.ConcurrentMap[string, *Torrent] deleteOnceDone mapset.Set[string] allAccessKeys mapset.Set[string] latestState *LibraryState requiredVersion string workerPool *ants.Pool repairPool *ants.Pool repairTrigger chan *Torrent repairSet mapset.Set[*Torrent] repairRunning bool repairRunningMu sync.Mutex log *logutil.Logger } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager { t := &TorrentManager{ Config: cfg, Api: api, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), DownloadCache: cmap.New[*realdebrid.Download](), DownloadMap: cmap.New[*realdebrid.Download](), fixers: cmap.New[*Torrent](), deleteOnceDone: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](), latestState: &LibraryState{}, requiredVersion: "0.9.3-hotfix.3", workerPool: workerPool, repairPool: repairPool, log: log, } t.initializeDirectories() t.mountDownloads() t.refreshTorrents() t.SetNewLatestState(t.getCurrentState()) t.startRefreshJob() t.startRepairJob() return t } // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { if !strings.HasPrefix(link, "http") { return nil } if download, exists := t.DownloadCache.Get(link); exists { return download } ret, err := t.Api.UnrestrictLink(link, t.Config.ShouldServeFromRclone()) if err != nil { t.log.Warnf("Cannot unrestrict link %s: %v", link, err) return nil } if ret != nil && ret.Link != "" && ret.Filename != "" { t.DownloadCache.Set(ret.Link, ret) t.DownloadMap.Set(ret.Filename, ret) } return ret } func (t *TorrentManager) GetKey(torrent *Torrent) string { if !t.Config.ShouldIgnoreRenames() && torrent.Rename != "" { return torrent.Rename } if t.Config.EnableRetainRDTorrentName() { return torrent.Name } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) { return torrent.Name } else { ret := strings.TrimSuffix(torrent.OriginalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) GetPath(file *File) string { if t.Config.ShouldExposeFullPath() { filename := strings.TrimPrefix(file.Path, "/") filename = strings.ReplaceAll(filename, "/", " - ") return filename } filename := filepath.Base(file.Path) return filename } func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) { filePath := "data/" + instanceID + ".json" file, err := os.Create(filePath) if err != nil { t.log.Warnf("Cannot create file %s: %v", filePath, err) return } defer file.Close() torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { t.log.Warnf("Cannot marshal torrent: %v", err) return } if _, err := file.Write(jsonData); err != nil { t.log.Warnf("Cannot write to file %s: %v", filePath, err) return } t.log.Debugf("Saved torrent %s to file", instanceID) } func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { filePath := "data/" + torrentID + ".json" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { return nil } return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var torrent *Torrent if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } if torrent.DownloadedIDs.Union(torrent.InProgressIDs).IsEmpty() { t.log.Fatal("Torrent has no downloaded or in progress ids") } if torrent.Version != t.requiredVersion { return nil } return torrent } func (t *TorrentManager) deleteTorrentFile(torrentID string) { filePath := "data/" + torrentID + ".json" err := os.Remove(filePath) if err != nil { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } } func (t *TorrentManager) mountDownloads() { if !t.Config.EnableDownloadMount() { return } _ = t.workerPool.Submit(func() { page := 1 offset := 0 for { downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) if err != nil { t.log.Fatalf("Cannot get downloads: %v", err) } for i := range downloads { t.DownloadMap.Set(downloads[i].Filename, &downloads[i]) } offset += len(downloads) page++ if offset >= totalDownloads { break } } t.log.Infof("Compiled into %d downloads", t.DownloadCache.Count()) }) } func (t *TorrentManager) initializeDirectories() { // create internal directories t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey() t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID // create directory maps for _, directory := range t.Config.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } } func (t *TorrentManager) saveTorrentChangesToDisk(torrent *Torrent, cb func(*Torrent)) { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) torrent.DownloadedIDs.Union(torrent.InProgressIDs).Each(func(id string) bool { info, exists := infoCache.Get(id) if !exists { return false } if cb != nil { cb(info) } t.writeTorrentToFile(id, info) return false }) }