package torrent import ( "fmt" "path/filepath" "strings" "sync" "time" "github.com/debridmediamanager/zurg/pkg/realdebrid" cmap "github.com/orcaman/concurrent-map/v2" "github.com/scylladb/go-set" "github.com/scylladb/go-set/strset" ) func (t *TorrentManager) RefreshTorrents() []string { instances, _, err := t.Api.GetTorrents(0) if err != nil { t.log.Warnf("Cannot get torrents: %v\n", err) return nil } infoChan := make(chan *Torrent, len(instances)) var wg sync.WaitGroup for i := range instances { wg.Add(1) idx := i // capture the loop variable _ = t.workerPool.Submit(func() { defer wg.Done() infoChan <- t.getMoreInfo(instances[idx]) }) } wg.Wait() close(infoChan) t.log.Infof("Fetched info for %d torrents", len(instances)) freshKeys := set.NewStringSet() allTorrents, _ := t.DirectoryMap.Get(INT_ALL) noInfoCount := 0 for info := range infoChan { if info == nil { noInfoCount++ continue } freshKeys.Add(info.AccessKey) if torrent, exists := allTorrents.Get(info.AccessKey); !exists { t.allAccessKeys.Add(info.AccessKey) allTorrents.Set(info.AccessKey, info) } else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() { mainTorrent := t.mergeToMain(torrent, info) allTorrents.Set(info.AccessKey, &mainTorrent) } } t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) var updatedPaths []string // torrents yet to be assigned in a directory strset.Difference(freshKeys, t.allAccessKeys).Each(func(accessKey string) bool { // assign to directories tor, _ := allTorrents.Get(accessKey) t.assignedDirectoryCb(tor, func(directory string) { if strings.HasPrefix(directory, "int__") { return } torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(accessKey, tor) updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey)) }) return true }) // removed torrents strset.Difference(t.allAccessKeys, freshKeys).Each(func(accessKey string) bool { t.Delete(accessKey, false) return true }) t.SetNewLatestState(t.getCurrentState()) return updatedPaths } func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { t.latestState.DownloadingCount = checksum.DownloadingCount t.latestState.FirstTorrent = checksum.FirstTorrent t.latestState.TotalCount = checksum.TotalCount } // startRefreshJob periodically refreshes the torrents func (t *TorrentManager) startRefreshJob() { t.log.Info("Starting periodic refresh") for { <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) checksum := t.getCurrentState() if t.latestState.equal(checksum) { continue } t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) updatedPaths := t.RefreshTorrents() t.log.Info("Finished refreshing torrents") t.TriggerHookOnLibraryUpdate(updatedPaths) if t.Config.EnableRepair() { t.RepairAll() } else { t.log.Info("Repair is disabled, skipping repair check") } } } // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { return tor } torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { infoCache.Set(rdTorrent.ID, torrentFromFile) return torrentFromFile } info, err := t.Api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) return nil } torrent := Torrent{ AccessKey: t.computeAccessKey(info.Name, info.OriginalName), LatestAdded: info.Added, Hash: info.Hash, } // SelectedFiles is a subset of Files with only the selected ones // it also has a Link field, which can be empty // if it is empty, it means the file is no longer available // Files+Links together are the same as SelectedFiles var selectedFiles []*File // if some Links are empty, we need to repair it for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Added: info.Added, Ended: info.Ended, Link: "", // no link yet }) } if len(selectedFiles) > len(info.Links) && info.Progress == 100 { if len(info.Links) == 1 { // this usually is a rar file issue if t.Config.ShouldDeleteRarFiles() { t.log.Warnf("Torrent %s id=%s is a rar file, it cannot be repaired. Deleting...", info.Name, info.ID) t.Api.DeleteTorrent(info.ID) } else { t.log.Warnf("Torrent %s id=%s is a rar file, it cannot be repaired as it's a known Real-Debrid limitation. zurg recommends you delete this torrent or add auto_delete_rar_torrents: true in your config.yml", info.Name, info.ID) torrent.Unfixable = true } return nil } else { t.log.Warnf("Torrent id=%s is partly expired. It has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links)) torrent.ForRepair = true } } else if len(selectedFiles) == len(info.Links) { // all links are still intact! good! for i, file := range selectedFiles { file.Link = info.Links[i] } } torrent.SelectedFiles = cmap.New[*File]() for _, file := range selectedFiles { // todo better handling of duplicate filenames if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { oldName := filepath.Base(file.Path) ext := filepath.Ext(oldName) filename := strings.TrimSuffix(oldName, ext) newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) torrent.SelectedFiles.Set(newName, file) } else { torrent.SelectedFiles.Set(filepath.Base(file.Path), file) } } torrent.DownloadedIDs = strset.New() torrent.InProgressIDs = strset.New() if info.Progress == 100 { torrent.DownloadedIDs.Add(info.ID) } else { torrent.InProgressIDs.Add(info.ID) } t.writeTorrentToFile(rdTorrent.ID, &torrent) infoCache.Set(rdTorrent.ID, &torrent) return &torrent } func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { mainTorrent := Torrent{} mainTorrent.AccessKey = existing.AccessKey mainTorrent.Hash = existing.Hash mainTorrent.DownloadedIDs = strset.New() mainTorrent.InProgressIDs = strset.New() // this function triggers only when we have a new DownloadedID strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool { mainTorrent.DownloadedIDs.Add(id) mainTorrent.InProgressIDs.Remove(id) return true }) // the link can have the following values // 1. https://*** - the file is available // 2. repair - the file is available but we need to repair it // 3. repairing - the file is being repaired // 4. unselect - the file is deleted mainTorrent.SelectedFiles = existing.SelectedFiles toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { // see if it already exists in the main torrent if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { // if it doesn't exist in the main torrent, add it mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } else if originalFile.Link != "unselect" { // if it exists, compare the LatestAdded property and the link if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { // if torrentToMerge is more recent and its file has a link, update the main torrent's file mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } // else do nothing, the main torrent's file is more recent or has a valid link } }) if existing.LatestAdded < toMerge.LatestAdded { mainTorrent.LatestAdded = toMerge.LatestAdded } else { mainTorrent.LatestAdded = existing.LatestAdded } return mainTorrent } type torrentsResp struct { torrents []realdebrid.Torrent totalCount int } // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents func (t *TorrentManager) getCurrentState() LibraryState { torrentsChan := make(chan torrentsResp, 1) countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines defer close(torrentsChan) defer close(countChan) defer close(errChan) _ = t.workerPool.Submit(func() { torrents, totalCount, err := t.Api.GetTorrents(1) if err != nil { errChan <- err return } torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount} }) _ = t.workerPool.Submit(func() { count, err := t.Api.GetActiveTorrentCount() if err != nil { errChan <- err return } countChan <- count.DownloadingCount }) // Existing goroutines for GetTorrents and GetActiveTorrentCount var torrents []realdebrid.Torrent var totalCount, count int for i := 0; i < 2; i++ { select { case resp := <-torrentsChan: torrents = resp.torrents totalCount = resp.totalCount case count = <-countChan: case err := <-errChan: t.log.Warnf("Checksum API Error: %v\n", err) return EmptyState() } } if len(torrents) == 0 { t.log.Error("Huh, no torrents returned") return EmptyState() } return LibraryState{ TotalCount: totalCount, FirstTorrent: &torrents[0], DownloadingCount: count, } }