package torrent import ( "io" "os" "strings" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/realdebrid" cmap "github.com/orcaman/concurrent-map/v2" "github.com/panjf2000/ants/v2" "github.com/scylladb/go-set" "github.com/scylladb/go-set/strset" "go.uber.org/zap" ) const ( INT_ALL = "int__all__" INT_INFO_CACHE = "int__info__" ) type TorrentManager struct { Config config.ConfigInterface Api *realdebrid.RealDebrid DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] accessKeySet *strset.Set latestState *LibraryState requiredVersion string workerPool *ants.Pool repairWorker *ants.Pool log *zap.SugaredLogger } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, log *zap.SugaredLogger) *TorrentManager { initialSate := EmptyState() t := &TorrentManager{ Config: cfg, Api: api, accessKeySet: set.NewStringSet(), latestState: &initialSate, requiredVersion: "06.12.2023", workerPool: p, log: log, } // create internal directories t.DirectoryMap = cmap.New[cmap.ConcurrentMap[string, *Torrent]]() t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID // create directory maps for _, directory := range cfg.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } // Fetch downloads t.DownloadCache = cmap.New[*realdebrid.Download]() if t.Config.EnableDownloadCache() { _ = t.workerPool.Submit(func() { page := 1 offset := 0 for { downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) if err != nil { t.log.Fatalf("Cannot get downloads: %v\n", err) } for i := range downloads { if !t.DownloadCache.Has(downloads[i].Link) { t.DownloadCache.Set(downloads[i].Link, &downloads[i]) } } offset += len(downloads) page++ if offset >= totalDownloads { t.log.Infof("Fetched %d downloads", t.DownloadCache.Count()) break } } }) } t.RefreshTorrents() if t.Config.EnableRepair() { repairWorker, err := ants.NewPool(1) if err != nil { log.Fatalf("Failed to create repair worker: %v", err) } t.repairWorker = repairWorker t.RepairAll() // initial repair } else { t.log.Info("Repair is disabled, skipping repair check") } t.log.Info("Finished initializing torrent manager") _ = t.workerPool.Submit(func() { t.startRefreshJob() }) return t } // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { return t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone()) } func (t *TorrentManager) TriggerHookOnLibraryUpdate(updatedPaths []string) { t.log.Debugf("Triggering hook on_library_update for %d path(s)", len(updatedPaths)) _ = t.workerPool.Submit(func() { OnLibraryUpdateHook(updatedPaths, t.Config, t.log) }) } func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) { torrentIDs := strset.Union(tor.DownloadedIDs, tor.InProgressIDs).List() // get filenames needed for directory conditions filenames := tor.SelectedFiles.Keys() // Map torrents to directories switch t.Config.GetVersion() { case "v1": configV1 := t.Config.(*config.ZurgConfigV1) for _, directories := range configV1.GetGroupMap() { for _, directory := range directories { if t.Config.MeetsConditions(directory, tor.AccessKey, torrentIDs, filenames) { cb(directory) break } } } } } func (t *TorrentManager) computeAccessKey(name, originalName string) string { if t.Config.EnableRetainRDTorrentName() { return name } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) { return name } else { ret := strings.TrimSuffix(originalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) { filePath := "data/" + instanceID + ".json" file, err := os.Create(filePath) if err != nil { t.log.Warnf("Cannot create file %s: %v", filePath, err) return } defer file.Close() torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { t.log.Warnf("Cannot marshal torrent: %v", err) return } if _, err := file.Write(jsonData); err != nil { t.log.Warnf("Cannot write to file %s: %v", filePath, err) return } t.log.Debugf("Saved torrent %s to file", instanceID) } func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { filePath := "data/" + torrentID + ".json" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { return nil } return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var torrent *Torrent if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } if strset.Union(torrent.DownloadedIDs, torrent.InProgressIDs).IsEmpty() { t.log.Fatal("Torrent has no downloaded or in progress ids") } if torrent.Version != t.requiredVersion { return nil } return torrent } func (t *TorrentManager) deleteTorrentFile(torrentID string) { filePath := "data/" + torrentID + ".json" err := os.Remove(filePath) if err != nil { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } }