package torrent import ( "io" "net/url" "os" "path/filepath" "strings" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/utils" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" "github.com/panjf2000/ants/v2" ) const ( INT_ALL = "int__all__" INT_INFO_CACHE = "int__info__" ) type TorrentManager struct { Config config.ConfigInterface Api *realdebrid.RealDebrid DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] fixers cmap.ConcurrentMap[string, *Torrent] repairs mapset.Set[string] allAccessKeys mapset.Set[string] latestState *LibraryState requiredVersion string workerPool *ants.Pool repairPool *ants.Pool log *logutil.Logger } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager { t := &TorrentManager{ Config: cfg, Api: api, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), DownloadCache: cmap.New[*realdebrid.Download](), DownloadMap: cmap.New[*realdebrid.Download](), fixers: cmap.New[*Torrent](), repairs: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](), latestState: &LibraryState{}, requiredVersion: "11.01.2024", workerPool: workerPool, repairPool: repairPool, log: log, } // create internal directories t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey() t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID // create directory maps for _, directory := range cfg.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } // Fetch downloads if t.Config.EnableDownloadCache() { _ = t.workerPool.Submit(func() { page := 1 offset := 0 for { downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) if err != nil { t.log.Fatalf("Cannot get downloads: %v", err) } for i := range downloads { if !t.DownloadCache.Has(downloads[i].Link) { if strings.Contains(downloads[i].Download, "download.real-debrid.") { prefHost := t.Config.GetRandomPreferredHost() if prefHost != "" { downloads[i].Download = replaceHostInURL(downloads[i].Download, prefHost) } } t.cacheDownload(&downloads[i]) } } offset += len(downloads) page++ if offset >= totalDownloads { t.log.Infof("Compiled into %d downloads", t.DownloadCache.Count()) break } } }) } t.RefreshTorrents() t.SetNewLatestState(t.getCurrentState()) if t.Config.EnableRepair() { t.RepairAll() // initial repair } else { t.log.Info("Repair is disabled, skipping repair check") } t.log.Info("Finished initializing torrent manager") _ = t.workerPool.Submit(func() { t.startRefreshJob() }) return t } // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { if !strings.HasPrefix(link, "http") { return nil } if download, exists := t.DownloadCache.Get(link); exists { return download } ret, err := t.Api.UnrestrictLink(link, t.Config.ShouldServeFromRclone()) if err != nil { t.log.Warnf("Cannot unrestrict link %s: %v", link, err) return nil } if ret != nil && ret.Link != "" { if strings.Contains(ret.Download, "download.real-debrid.") { prefHost := t.Config.GetRandomPreferredHost() if prefHost != "" { ret.Download = replaceHostInURL(ret.Download, prefHost) } } t.cacheDownload(ret) } return ret } func (t *TorrentManager) TriggerHookOnLibraryUpdate(updatedPaths []string) { t.log.Debugf("Triggering hook on_library_update for %d path(s)", len(updatedPaths)) _ = t.workerPool.Submit(func() { OnLibraryUpdateHook(updatedPaths, t.Config, t.log) }) } func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) { torrentIDs := tor.DownloadedIDs.Union(tor.InProgressIDs).ToSlice() // get filenames needed for directory conditions var filenames []string var fileSizes []int64 unplayable := true tor.SelectedFiles.IterCb(func(key string, file *File) { filenames = append(filenames, filepath.Base(file.Path)) fileSizes = append(fileSizes, file.Bytes) if !tor.Unrepairable && unplayable && utils.IsStreamable(file.Path) { unplayable = false } }) // Map torrents to directories switch t.Config.GetVersion() { case "v1": configV1 := t.Config.(*config.ZurgConfigV1) for _, directories := range configV1.GetGroupMap() { for _, directory := range directories { if unplayable { cb(config.UNPLAYABLE_TORRENTS) break } if t.Config.MeetsConditions(directory, t.GetKey(tor), tor.ComputeTotalSize(), torrentIDs, filenames, fileSizes) { cb(directory) break } } } } } func (t *TorrentManager) GetKey(torrent *Torrent) string { if !t.Config.ShouldIgnoreRenames() && torrent.Rename != "" { return torrent.Rename } if t.Config.EnableRetainRDTorrentName() { return torrent.Name } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) { return torrent.Name } else { ret := strings.TrimSuffix(torrent.OriginalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) { filePath := "data/" + instanceID + ".json" file, err := os.Create(filePath) if err != nil { t.log.Warnf("Cannot create file %s: %v", filePath, err) return } defer file.Close() torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { t.log.Warnf("Cannot marshal torrent: %v", err) return } if _, err := file.Write(jsonData); err != nil { t.log.Warnf("Cannot write to file %s: %v", filePath, err) return } t.log.Debugf("Saved torrent %s to file", instanceID) } func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { filePath := "data/" + torrentID + ".json" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { return nil } return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var torrent *Torrent if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } if torrent.DownloadedIDs.Union(torrent.InProgressIDs).IsEmpty() { t.log.Fatal("Torrent has no downloaded or in progress ids") } if torrent.Version != t.requiredVersion { return nil } return torrent } func (t *TorrentManager) deleteTorrentFile(torrentID string) { filePath := "data/" + torrentID + ".json" err := os.Remove(filePath) if err != nil { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } } func replaceHostInURL(inputURL string, newHost string) string { u, err := url.Parse(inputURL) if err != nil { return "" } u.Host = newHost return u.String() } func (t *TorrentManager) cacheDownload(ret *realdebrid.Download) { t.DownloadCache.Set(ret.Link, ret) t.DownloadMap.Set(ret.Filename, ret) }