package torrent import ( "fmt" "path/filepath" "strings" "sync" "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/utils" mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" ) func (t *TorrentManager) refreshTorrents() []string { instances, _, err := t.Api.GetTorrents(0, false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) return nil } infoChan := make(chan *Torrent, len(instances)) var wg sync.WaitGroup allTorrents, _ := t.DirectoryMap.Get(INT_ALL) for i := range instances { idx := i wg.Add(1) _ = t.workerPool.Submit(func() { defer wg.Done() infoChan <- t.getMoreInfo(instances[idx]) }) } wg.Wait() close(infoChan) t.log.Infof("Fetched info for %d torrents", len(instances)) newlyFetchedKeys := mapset.NewSet[string]() noInfoCount := 0 for info := range infoChan { if info == nil { noInfoCount++ continue } accessKey := t.GetKey(info) if !info.AllInProgress() { newlyFetchedKeys.Add(accessKey) } if torrent, exists := allTorrents.Get(accessKey); !exists { allTorrents.Set(accessKey, info) } else if !info.DownloadedIDs.Difference(torrent.DownloadedIDs).IsEmpty() { mainTorrent := t.mergeToMain(torrent, info) allTorrents.Set(accessKey, &mainTorrent) } } t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) var updatedPaths []string // torrents yet to be assigned in a directory newlyFetchedKeys.Difference(t.allAccessKeys).Each(func(accessKey string) bool { // assign to directories tor, ok := allTorrents.Get(accessKey) if !ok { return false } var directories []string t.assignedDirectoryCb(tor, func(directory string) { torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(accessKey, tor) updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey)) // this is just for the logs if directory != config.ALL_TORRENTS { directories = append(directories, directory) } }) // t.log.Debugf("Added %s to %v", accessKey, directories) t.allAccessKeys.Add(accessKey) return false }) // removed torrents t.allAccessKeys.Difference(newlyFetchedKeys).Each(func(accessKey string) bool { t.Delete(accessKey, false) return false }) if t.Config.EnableRepair() { t.workerPool.Submit(func() { t.handleFixers() }) } return updatedPaths } // StartRefreshJob periodically refreshes the torrents func (t *TorrentManager) StartRefreshJob() { _ = t.workerPool.Submit(func() { t.log.Info("Starting periodic refresh job") refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) defer refreshTicker.Stop() for { select { case <-refreshTicker.C: checksum := t.getCurrentState() if t.latestState.Eq(checksum) { continue } t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) t.setNewLatestState(checksum) updatedPaths := t.refreshTorrents() t.log.Info("Finished refreshing torrents") t.TriggerHookOnLibraryUpdate(updatedPaths) case <-t.RefreshKillSwitch: t.log.Info("Stopping periodic refresh job") return } } }) } // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if torrentFromCache, exists := infoCache.Get(rdTorrent.ID); exists && !torrentFromCache.AnyInProgress() && torrentFromCache.SelectedFiles.Count() == len(rdTorrent.Links) { return torrentFromCache } else if !exists { torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) if torrentFromFile != nil && !torrentFromFile.AnyInProgress() && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { hasBrokenFiles := false torrentFromFile.SelectedFiles.IterCb(func(filepath string, file *File) { if file.IsBroken && !file.IsDeleted { hasBrokenFiles = true } }) if !hasBrokenFiles { infoCache.Set(rdTorrent.ID, torrentFromFile) t.ResetSelectedFiles(torrentFromFile) return torrentFromFile } else { t.log.Warnf("Torrent %s has broken files, will not save on info cache", rdTorrent.ID) } } } info, err := t.Api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err) return nil } torrent := Torrent{ Name: info.Name, OriginalName: info.OriginalName, Added: info.Added, Hash: info.Hash, } // SelectedFiles is a subset of Files with only the selected ones // it also has a Link field, which can be empty // if it is empty, it means the file is no longer available // Files+Links together are the same as SelectedFiles var selectedFiles []*File // if some Links are empty, we need to repair it for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Ended: info.Ended, Link: "", // no link yet IsBroken: true, }) } if len(selectedFiles) == len(info.Links) { // all links are still intact! good! for i, file := range selectedFiles { file.Link = info.Links[i] file.IsBroken = false } torrent.UnassignedLinks = mapset.NewSet[string]() } else { torrent.UnassignedLinks = mapset.NewSet[string](info.Links...) } torrent.SelectedFiles = cmap.New[*File]() for _, file := range selectedFiles { filename := t.GetPath(file) // todo better handling of duplicate filenames if torrent.SelectedFiles.Has(filename) { oldName := filename ext := filepath.Ext(oldName) noExtension := strings.TrimSuffix(oldName, ext) newName := fmt.Sprintf("%s (%d)%s", noExtension, file.ID, ext) torrent.SelectedFiles.Set(newName, file) } else { torrent.SelectedFiles.Set(filename, file) } } torrent.DownloadedIDs = mapset.NewSet[string]() torrent.InProgressIDs = mapset.NewSet[string]() if info.IsDone() { torrent.DownloadedIDs.Add(info.ID) } else { torrent.InProgressIDs.Add(info.ID) } infoCache.Set(rdTorrent.ID, &torrent) t.saveTorrentChangesToDisk(&torrent, nil) return &torrent } // ResetSelectedFiles will rename the file based on config func (t *TorrentManager) ResetSelectedFiles(torrent *Torrent) { // reset selected files newSelectedFiles := cmap.New[*File]() torrent.SelectedFiles.IterCb(func(_ string, file *File) { filename := t.GetPath(file) if newSelectedFiles.Has(filename) { oldName := filename ext := filepath.Ext(oldName) noExtension := strings.TrimSuffix(oldName, ext) newName := fmt.Sprintf("%s (%d)%s", noExtension, file.ID, ext) newSelectedFiles.Set(newName, file) } else { newSelectedFiles.Set(filename, file) } }) torrent.SelectedFiles = newSelectedFiles } func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { var newer, older *Torrent if existing.Added < toMerge.Added { newer = toMerge older = existing } else { newer = existing older = toMerge } // build the main torrent mainTorrent := Torrent{ Name: newer.Name, OriginalName: newer.OriginalName, Rename: newer.Rename, Hash: newer.Hash, Added: newer.Added, DownloadedIDs: newer.DownloadedIDs.Union(older.DownloadedIDs), InProgressIDs: newer.InProgressIDs.Union(older.InProgressIDs), UnassignedLinks: newer.UnassignedLinks.Union(older.UnassignedLinks), SelectedFiles: newer.SelectedFiles, UnrepairableReason: newer.UnrepairableReason, } // unrepairable reason if mainTorrent.UnrepairableReason != "" && older.UnrepairableReason != "" && mainTorrent.UnrepairableReason != older.UnrepairableReason { mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", mainTorrent.UnrepairableReason, older.UnrepairableReason) } else if older.UnrepairableReason != "" { mainTorrent.UnrepairableReason = older.UnrepairableReason } // update in progress ids mainTorrent.DownloadedIDs.Each(func(id string) bool { mainTorrent.InProgressIDs.Remove(id) return false }) // the link can have the following values // 1. https://*** - the file is available // 3. empty - the file is not available mainTorrent.SelectedFiles.IterCb(func(key string, file *File) { if file.IsBroken { file, ok := older.SelectedFiles.Get(key) if ok { mainTorrent.SelectedFiles.Set(key, file) } } }) older.SelectedFiles.IterCb(func(key string, file *File) { if !mainTorrent.SelectedFiles.Has(key) { mainTorrent.SelectedFiles.Set(key, file) } else if file.IsDeleted { mainTorrent.SelectedFiles.Set(key, file) } }) return mainTorrent } func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) { torrentIDs := tor.DownloadedIDs.Union(tor.InProgressIDs).ToSlice() // get filenames needed for directory conditions var filenames []string var fileSizes []int64 unplayable := true tor.SelectedFiles.IterCb(func(key string, file *File) { filenames = append(filenames, filepath.Base(file.Path)) fileSizes = append(fileSizes, file.Bytes) if unplayable && utils.IsStreamable(file.Path) { unplayable = false } }) // Map torrents to directories switch t.Config.GetVersion() { case "v1": configV1 := t.Config.(*config.ZurgConfigV1) for _, directories := range configV1.GetGroupMap() { for _, directory := range directories { if unplayable { cb(config.UNPLAYABLE_TORRENTS) break } if t.Config.MeetsConditions(directory, t.GetKey(tor), tor.ComputeTotalSize(), torrentIDs, filenames, fileSizes) { cb(directory) break } } } } }