package torrent

import (
	"encoding/gob"
	"fmt"
	"math"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/pkg/logutil"
	"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"go.uber.org/zap"
)

// TorrentManager keeps an in-memory view of the Real-Debrid library and
// refreshes, maps and repairs it in the background.
type TorrentManager struct {
	// requiredVersion is stamped on every cached torrent file; cache files
	// carrying a different version are ignored by readFromFile.
	requiredVersion string
	rd              *realdebrid.RealDebrid
	// torrents is the current list of torrents with their detailed info.
	torrents []Torrent
	// torrentMap indexes torrents by AccessKey.
	torrentMap map[string]*Torrent
	// inProgress lists hashes of torrents currently being repaired.
	inProgress []string
	// checksum is the library fingerprint computed by getChecksum at the
	// time of the last refresh.
	checksum string
	config   config.ConfigInterface
	cache    *expirable.LRU[string, string]
	// workerPool bounds the number of concurrent getMoreInfo calls.
	workerPool chan bool
	// directoryMap maps an AccessKey to the directories it is listed in.
	directoryMap map[string][]string
	// processedTorrents maps an AccessKey to the groups already evaluated
	// for it by mapToDirectories.
	processedTorrents map[string][]string
	mu                *sync.Mutex
	log               *zap.SugaredLogger
}

// NewTorrentManager creates a new torrent manager.
// It fetches all torrents and their info and stores them in-memory, then
// starts the background repair (when enabled) and periodic refresh jobs.
// It is called only once at startup and terminates the process when the
// initial torrent list cannot be fetched.
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
	t := &TorrentManager{
		requiredVersion:   fmt.Sprintf("8.11.2023 - retain:%v", config.EnableRetainFolderNameExtension()),
		rd:                realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid")),
		torrentMap:        make(map[string]*Torrent),
		config:            config,
		cache:             cache,
		workerPool:        make(chan bool, config.GetNumOfWorkers()),
		directoryMap:      make(map[string][]string),
		processedTorrents: make(map[string][]string),
		mu:                &sync.Mutex{},
		log:               logutil.NewLogger().Named("manager"),
	}

	newTorrents, _, err := t.rd.GetTorrents(0)
	if err != nil {
		t.log.Fatalf("Cannot get torrents: %v\n", err)
	}
	t.replaceTorrents(t.fetchTorrentDetails(newTorrents))

	if t.config.EnableRepair() {
		go t.repairAll()
	}
	go t.startRefreshJob()

	return t
}

// fetchTorrentDetails resolves the detailed info of every given torrent
// concurrently, bounded by the worker pool. Torrents for which getMoreInfo
// returns nil are omitted from the result.
func (t *TorrentManager) fetchTorrentDetails(rdTorrents []realdebrid.Torrent) []*Torrent {
	// Buffered to the number of producers: the results are only drained
	// after wg.Wait(), so an unbuffered channel would deadlock here.
	results := make(chan *Torrent, len(rdTorrents))
	var wg sync.WaitGroup
	for i := range rdTorrents {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			t.workerPool <- true
			defer func() { <-t.workerPool }()
			results <- t.getMoreInfo(rdTorrents[idx])
		}(i)
	}
	wg.Wait()
	close(results)

	detailed := make([]*Torrent, 0, len(rdTorrents))
	for torrent := range results {
		if torrent != nil {
			detailed = append(detailed, torrent)
		}
	}
	return detailed
}

// replaceTorrents swaps the in-memory torrent list for the given one, merges
// the entries into torrentMap, purges the cache and records a new checksum.
// Existing torrentMap entries are updated in place so pointers stay valid.
func (t *TorrentManager) replaceTorrents(detailed []*Torrent) {
	// Build the new list before taking the lock so the old list stays
	// visible until the swap.
	fresh := make([]Torrent, 0, len(detailed))
	for _, torrent := range detailed {
		fresh = append(fresh, *torrent)
	}
	checksum := t.getChecksum()

	t.mu.Lock()
	defer t.mu.Unlock()

	t.cache.Purge()
	t.torrents = fresh
	for _, torrent := range detailed {
		if existing, ok := t.torrentMap[torrent.AccessKey]; ok {
			existing.Files = torrent.Files
			existing.Links = torrent.Links
			existing.SelectedFiles = torrent.SelectedFiles
			existing.ForRepair = torrent.ForRepair
		} else {
			t.torrentMap[torrent.AccessKey] = torrent
		}
	}
	t.checksum = checksum
}

// GetByDirectory returns all torrents that are mapped to the specified directory.
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
	var matches []Torrent
	// Work on a snapshot so a concurrent refresh swapping the slice cannot
	// invalidate the indices.
	torrents := t.torrents
	for i := range torrents {
		for _, dir := range t.directoryMap[torrents[i].AccessKey] {
			if dir == directory {
				matches = append(matches, torrents[i])
				break // never list the same torrent twice
			}
		}
	}
	return matches
}

// HideTheFile marks a file as unavailable and triggers a repair of its torrent.
func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
	file.Unavailable = true
	t.repair(torrent.ID, torrent.SelectedFiles, false)
}

// FindAllTorrentsWithName finds all torrents in a given directory whose
// access key contains the given name.
func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent {
	var matchingTorrents []Torrent
	candidates := t.GetByDirectory(directory)
	for i := range candidates {
		// Contains also covers the exact-match case.
		if strings.Contains(candidates[i].AccessKey, torrentName) {
			matchingTorrents = append(matchingTorrents, candidates[i])
		}
	}
	return matchingTorrents
}

// UnrestrictUntilOk proxies the call to the Real-Debrid client.
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
	return t.rd.UnrestrictUntilOk(link)
}

// findAllDownloadedFilesFromHash collects, across all torrents with the given
// hash, every selected file that has a non-empty link.
func (t *TorrentManager) findAllDownloadedFilesFromHash(hash string) []File {
	var files []File
	for _, torrent := range t.torrents {
		if torrent.Hash != hash {
			continue
		}
		for _, file := range torrent.SelectedFiles {
			if file.Link != "" {
				files = append(files, file)
			}
		}
	}
	return files
}

// torrentsResponse bundles the two values returned by GetTorrents so they can
// travel over a single channel.
type torrentsResponse struct {
	torrents   []realdebrid.Torrent
	totalCount int
}

// getChecksum builds a fingerprint of the library from the total torrent
// count, the ID of the first torrent returned by GetTorrents(1) and the
// number of downloading torrents. It returns "" when either API call fails
// or when no torrents are returned.
func (t *TorrentManager) getChecksum() string {
	// Every channel has room for its single producer, so a goroutine can
	// always finish even after this function has returned early on an error.
	torrentsChan := make(chan torrentsResponse, 1)
	countChan := make(chan int, 1)
	errChan := make(chan error, 2) // accommodate errors from both goroutines

	// GetTorrents request
	go func() {
		torrents, totalCount, err := t.rd.GetTorrents(1)
		if err != nil {
			errChan <- err
			return
		}
		torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount}
	}()

	// GetActiveTorrentCount request
	go func() {
		count, err := t.rd.GetActiveTorrentCount()
		if err != nil {
			errChan <- err
			return
		}
		countChan <- count.DownloadingCount
	}()

	var torrents []realdebrid.Torrent
	var totalCount, count int

	for i := 0; i < 2; i++ {
		select {
		case torrentsResp := <-torrentsChan:
			torrents = torrentsResp.torrents
			totalCount = torrentsResp.totalCount
		case count = <-countChan:
		case err := <-errChan:
			t.log.Errorf("Checksum API Error: %v\n", err)
			return ""
		}
	}

	if len(torrents) == 0 {
		t.log.Error("Huh, no torrents returned")
		return ""
	}

	return fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count)
}

// startRefreshJob periodically compares the library checksum with the stored
// one and reloads all torrents when it differs. It never returns.
func (t *TorrentManager) startRefreshJob() {
	t.log.Info("Starting periodic refresh")
	for {
		time.Sleep(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)

		if t.getChecksum() == t.checksum {
			continue
		}

		// Fetch before touching any state: on failure the current library
		// stays intact and no lock is left held.
		newTorrents, _, err := t.rd.GetTorrents(0)
		if err != nil {
			t.log.Errorf("Cannot get torrents: %v\n", err)
			continue
		}
		t.replaceTorrents(t.fetchTorrentDetails(newTorrents))

		if t.config.EnableRepair() {
			go t.repairAll()
		}
		go OnLibraryUpdateHook(t.config)
	}
}

// getMoreInfo builds the detailed Torrent for an API torrent, including its
// selected files and their links. A cached file is reused when its link count
// still matches the API data. It returns nil when the info cannot be fetched.
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
	t.log.Infof("Getting more info for %s", rdTorrent.ID)

	// file cache: if api data and file data still match, the cached data is still usable
	if cached := t.readFromFile(rdTorrent.ID); cached != nil && len(cached.Links) == len(rdTorrent.Links) {
		return cached
	}

	t.log.Debugf("Getting info for %s", rdTorrent.ID)
	info, err := t.rd.GetTorrentInfo(rdTorrent.ID)
	if err != nil {
		t.log.Errorf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
		return nil
	}

	torrent := Torrent{
		Version:       t.requiredVersion,
		Torrent:       *info,
		SelectedFiles: nil,
		ForRepair:     false,
	}

	// SelectedFiles is a subset of Files with only the selected ones
	// it also has a Link field, which can be empty
	// if it is empty, it means the file is no longer available
	// Files+Links together are the same as SelectedFiles
	var selectedFiles []File
	var streamableFiles []File
	for _, file := range info.Files {
		if isStreamable(file.Path) {
			streamableFiles = append(streamableFiles, File{File: file, Link: ""})
		}
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, File{File: file, Link: ""})
	}

	// if some Links are empty, we need to repair it
	forRepair := false
	if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
		t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, len(selectedFiles), len(info.Links))
		// chaotic file means RD will not output the desired file selection
		// e.g. even if we select just a single mkv, it will output a rar
		var isChaotic bool
		selectedFiles, isChaotic = t.organizeChaos(info, selectedFiles)
		switch {
		case isChaotic:
			t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name)
			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
		case len(streamableFiles) > 1:
			t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name)
			forRepair = true
		default:
			t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name)
			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
		}
	} else {
		// all links are still intact! good!
		for i, link := range info.Links {
			if i >= len(selectedFiles) {
				// More links than selected files: there is no file left
				// to attach the remaining links to.
				break
			}
			selectedFiles[i].Link = link
		}
	}

	torrent.ForRepair = forRepair
	torrent.SelectedFiles = selectedFiles
	torrent.AccessKey = t.getName(info.Name, info.OriginalName)

	// update file cache
	if len(selectedFiles) > 0 {
		t.writeToFile(&torrent)
	}
	t.log.Debugf("Got info for %s %s", torrent.ID, torrent.AccessKey)
	return &torrent
}

// getName derives the access key of a torrent. When folder-name extensions
// are retained and name contains originalName, name is returned unchanged;
// otherwise originalName is returned with a trailing ".mp4" and then a
// trailing ".mkv" removed.
func (t *TorrentManager) getName(name, originalName string) string {
	if t.config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
		return name
	}
	// drop the extension from the name
	ret := strings.TrimSuffix(originalName, ".mp4")
	return strings.TrimSuffix(ret, ".mkv")
}

// mapToDirectories assigns every torrent to at most one new directory per
// configured group and records which groups were already processed for it.
func (t *TorrentManager) mapToDirectories() {
	switch t.config.GetVersion() {
	case "v1":
		configV1, ok := t.config.(*config.ZurgConfigV1)
		if !ok {
			t.log.Error("Config reports version v1 but is not a v1 config")
			return
		}
		groupMap := configV1.GetGroupMap()
		torrents := t.torrents
		// for every group, iterate over every torrent
		// and then sprinkle/distribute the torrents to the directories of the group
		for group, directories := range groupMap {
			counter := make(map[string]int)
			for i := range torrents {
				torrent := &torrents[i]

				// don't process torrents that are already mapped if it is not the first run
				alreadyMappedToGroup := false
				for _, mappedGroup := range t.processedTorrents[torrent.AccessKey] {
					if mappedGroup == group {
						alreadyMappedToGroup = true
						break
					}
				}
				if alreadyMappedToGroup {
					continue
				}

				// The file list does not depend on the directory, build it once.
				filenames := make([]string, 0, len(torrent.SelectedFiles))
				for _, file := range torrent.SelectedFiles {
					filenames = append(filenames, file.Path)
				}

				for _, directory := range directories {
					if !configV1.MeetsConditions(directory, torrent.ID, torrent.AccessKey, filenames) {
						continue
					}
					// check if it is already mapped to this directory
					found := false
					for _, dir := range t.directoryMap[torrent.AccessKey] {
						if dir == directory {
							found = true
							break
						}
					}
					if !found {
						counter[directory]++
						t.mu.Lock()
						t.directoryMap[torrent.AccessKey] = append(t.directoryMap[torrent.AccessKey], directory)
						t.mu.Unlock()
						break // we found a directory for this torrent, so we can stop looking for more
					}
				}

				t.mu.Lock()
				t.processedTorrents[torrent.AccessKey] = append(t.processedTorrents[torrent.AccessKey], group)
				t.mu.Unlock()
			}

			sum := 0
			for _, count := range counter {
				sum += count
			}
			if sum > 0 {
				t.log.Infof("Group processing completed: %s %v total: %d", group, counter, sum)
			} else {
				t.log.Infof("No new additions to directory group %s", group)
			}
		}
	default:
		t.log.Error("Unknown config version")
	}
}

// getDirectories returns, for every configured group, the first directory
// whose conditions the torrent meets.
func (t *TorrentManager) getDirectories(torrent *Torrent) []string {
	var ret []string
	switch t.config.GetVersion() {
	case "v1":
		configV1, ok := t.config.(*config.ZurgConfigV1)
		if !ok {
			t.log.Error("Config reports version v1 but is not a v1 config")
			return ret
		}
		// The file list does not depend on the directory, build it once.
		filenames := make([]string, 0, len(torrent.SelectedFiles))
		for _, file := range torrent.SelectedFiles {
			filenames = append(filenames, file.Path)
		}
		for _, directories := range configV1.GetGroupMap() {
			for _, directory := range directories {
				if configV1.MeetsConditions(directory, torrent.ID, torrent.AccessKey, filenames) {
					ret = append(ret, directory)
					break // we found a directory for this torrent for this group, so we can stop looking for more
				}
			}
		}
	default:
		t.log.Error("Unknown config version")
	}
	return ret
}

// getByID returns a pointer to the torrent with the given ID, or nil when
// there is none.
func (t *TorrentManager) getByID(torrentID string) *Torrent {
	torrents := t.torrents
	for i := range torrents {
		if torrents[i].ID == torrentID {
			return &torrents[i]
		}
	}
	return nil
}

// writeToFile gob-encodes a torrent into data/<id>.bin, stamping it with the
// current required version. Failures are logged; the file is only a cache, so
// they are not fatal.
func (t *TorrentManager) writeToFile(torrent *Torrent) {
	filePath := fmt.Sprintf("data/%s.bin", torrent.ID)
	file, err := os.Create(filePath)
	if err != nil {
		t.log.Errorf("Failed creating file: %s", err)
		return
	}
	defer file.Close()

	torrent.Version = t.requiredVersion
	if err := gob.NewEncoder(file).Encode(torrent); err != nil {
		t.log.Errorf("Failed writing torrent %s to file: %v", torrent.ID, err)
	}
}

// readFromFile loads a torrent from data/<id>.bin. It returns nil when the
// file is missing, older than the configured cache time, undecodable, or was
// written with a different required version.
func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
	filePath := fmt.Sprintf("data/%s.bin", torrentID)
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		return nil
	}
	maxAge := time.Duration(t.config.GetCacheTimeHours()) * time.Hour
	if time.Since(fileInfo.ModTime()) > maxAge {
		return nil
	}

	file, err := os.Open(filePath)
	if err != nil {
		return nil
	}
	defer file.Close()

	var torrent Torrent
	if err := gob.NewDecoder(file).Decode(&torrent); err != nil {
		return nil
	}
	if torrent.Version != t.requiredVersion {
		return nil
	}
	return &torrent
}

// repairAll repairs every torrent flagged for repair and deletes completed
// torrents that have no links at all.
func (t *TorrentManager) repairAll() {
	for _, torrent := range t.torrents {
		if torrent.ForRepair {
			t.log.Infof("There were less links than was expected on %s %s; fixing...", torrent.ID, torrent.AccessKey)
			t.repair(torrent.ID, torrent.SelectedFiles, true)
		}
		if len(torrent.Links) == 0 && torrent.Progress == 100 {
			// If the torrent has no links
			// and already processing repair
			// delete it!
			t.log.Infof("Deleting broken torrent %s %s as it doesn't contain any files", torrent.ID, torrent.AccessKey)
			t.rd.DeleteTorrent(torrent.ID)
		}
	}
}

// repair tries to restore the missing files of a torrent.
// When tryReinsertionFirst is set, the whole selection is re-added first; if
// that does not succeed (or is not attempted), only the missing files are
// re-added, either with one extra streamable file or in two batches.
func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) {
	torrent := t.getByID(torrentID)
	if torrent == nil {
		return
	}

	// check if it is already "being" repaired
	for _, hash := range t.inProgress {
		if hash == torrent.Hash {
			t.log.Infof("Repair in progress, skipping %s", torrentID)
			return
		}
	}

	// check if it is already repaired
	foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash)
	var missingFiles []File
	for _, sFile := range selectedFiles {
		if sFile.Link != "" && !sFile.Unavailable {
			continue
		}
		repaired := false
		for _, fFile := range foundFiles {
			// same file but different link, then yes it has been repaired
			if sFile.Path == fFile.Path && sFile.Link != fFile.Link {
				repaired = true
				break
			}
		}
		if !repaired {
			missingFiles = append(missingFiles, sFile)
		}
	}
	if len(missingFiles) == 0 {
		t.log.Infof("Torrent %s %s is already repaired", torrent.ID, torrent.AccessKey)
		return
	}

	// then we repair it!
	t.log.Infof("Repairing torrent %s %s", torrent.ID, torrent.AccessKey)

	// check if we can still add more downloads
	if !t.canCapacityHandle() {
		t.log.Error("Cannot add more torrents, exiting")
		return
	}

	// first solution: add the same selection, maybe it can be fixed by reinsertion?
	if tryReinsertionFirst && t.reinsertTorrent(torrent, "", true) {
		return
	}

	// collect the streamable files that are not part of the selection
	var otherStreamableFileIDs []int
	for _, file := range torrent.Files {
		selected := false
		for _, selectedFile := range selectedFiles {
			if selectedFile.ID == file.ID {
				selected = true
				break
			}
		}
		if !selected && isStreamable(file.Path) {
			otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
		}
	}

	switch {
	case (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0:
		// we will download 1 extra streamable file to force a redownload of the missing files
		// or if there's only 1 missing file, we will download 1 more to prevent a rename
		missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",") +
			fmt.Sprintf(",%d", otherStreamableFileIDs[0])
		t.log.Infof("Redownloading %d missing files", len(missingFiles))
		t.reinsertTorrent(torrent, missingFilesPlus1, false)
	case len(selectedFiles) > 1:
		// if not, last resort: add only the missing files but do it in 2 batches
		half := len(missingFiles) / 2
		batch1, batch2 := missingFiles[:half], missingFiles[half:]
		missingFiles1 := strings.Join(getFileIDs(batch1), ",")
		missingFiles2 := strings.Join(getFileIDs(batch2), ",")
		if missingFiles1 != "" {
			t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(batch1))
			t.reinsertTorrent(torrent, missingFiles1, false)
		}
		if missingFiles2 != "" {
			t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(batch2))
			t.reinsertTorrent(torrent, missingFiles2, false)
		} else {
			t.log.Info("No other missing files left to reinsert")
		}
	default:
		t.log.Infof("Torrent %s %s is unfixable as the only link cached in RD is already broken", torrent.ID, torrent.AccessKey)
		t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Hash)
		return
	}
	t.log.Info("Waiting for downloads to finish")
}

// reinsertTorrent re-adds the torrent by hash and selects the given
// comma-separated file IDs; an empty missingFiles selects all currently
// selected files.
//
// With deleteIfFailed set, the new torrent is verified (must be complete and
// carry as many links as there are selected files): on success the old
// torrent is deleted, otherwise the new one is deleted. It reports whether
// the reinsertion succeeded.
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string, deleteIfFailed bool) bool {
	// if missingFiles is not provided, fall back to the whole selection
	if missingFiles == "" {
		t.log.Infof("Redownloading whole torrent %s", torrent.AccessKey)
		ids := make([]string, 0, len(torrent.SelectedFiles))
		for _, file := range torrent.SelectedFiles {
			ids = append(ids, fmt.Sprintf("%d", file.ID))
		}
		if len(ids) == 0 {
			return false
		}
		missingFiles = strings.Join(ids, ",")
	}

	// redownload torrent
	resp, err := t.rd.AddMagnetHash(torrent.Hash)
	if err != nil {
		t.log.Errorf("Cannot redownload torrent: %v", err)
		return false
	}
	newTorrentID := resp.ID

	if err := t.rd.SelectTorrentFiles(newTorrentID, missingFiles); err != nil {
		t.log.Errorf("Cannot start redownloading: %v", err)
		if deleteIfFailed {
			t.rd.DeleteTorrent(newTorrentID)
		}
		return false
	}
	if !deleteIfFailed {
		return true
	}

	time.Sleep(1 * time.Second)

	// see if the torrent is ready
	info, err := t.rd.GetTorrentInfo(newTorrentID)
	if err != nil {
		t.log.Errorf("Cannot get info on redownloaded torrent: %v", err)
		t.rd.DeleteTorrent(newTorrentID)
		return false
	}
	time.Sleep(1 * time.Second)

	if info.Progress != 100 {
		t.log.Infof("Torrent is not cached anymore so we have to wait until completion, currently %d%%", info.Progress)
		t.rd.DeleteTorrent(newTorrentID)
		return false
	}
	if len(info.Links) != len(torrent.SelectedFiles) {
		t.log.Infof("It didn't fix the issue, only got %d files but we need %d, undoing", len(info.Links), len(torrent.SelectedFiles))
		t.rd.DeleteTorrent(newTorrentID)
		return false
	}

	t.log.Info("Redownload successful, deleting old torrent")
	t.rd.DeleteTorrent(torrent.ID)
	return true
}

// organizeChaos unrestricts every link of the torrent and matches the
// resulting filenames against the selected files. Matched files get their
// link filled in; unmatched results are appended as extra selected files.
// The boolean is true when at least one unmatched result is not streamable.
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) ([]File, bool) {
	// One slot per link, so producers never block on send.
	results := make(chan *realdebrid.UnrestrictResponse, len(info.Links))
	// Limit concurrency
	sem := make(chan bool, t.config.GetNumOfWorkers())
	var wg sync.WaitGroup

	for _, link := range info.Links {
		wg.Add(1)
		sem <- true
		go func(lnk string) {
			defer wg.Done()
			defer func() { <-sem }()
			results <- t.rd.UnrestrictUntilOk(lnk)
		}(link)
	}

	t.log.Debugf("Checking %d link(s) for problematic torrent id=%s", len(info.Links), info.ID)
	wg.Wait()
	close(results)
	t.log.Debugf("Closing results channel for torrent id=%s, checking...", info.ID)

	isChaotic := false
	for resp := range results {
		if resp == nil {
			continue
		}
		found := false
		for i := range selectedFiles {
			if strings.Contains(selectedFiles[i].Path, resp.Filename) {
				t.log.Debugf("Found a file that is in the selection for torrent id=%s: %s", info.ID, resp.Filename)
				selectedFiles[i].Link = resp.Link
				found = true
			}
		}
		if found {
			continue
		}

		// Sticky on purpose: results arrive in completion order, so letting
		// the last one win would make the verdict nondeterministic.
		if resp.Streamable == 0 {
			isChaotic = true
		}
		t.log.Debugf("Found a file that is not in the selection for torrent id=%s: %s %v", info.ID, resp.Filename, resp.Streamable)
		selectedFiles = append(selectedFiles, File{
			File: realdebrid.File{
				Path:     resp.Filename,
				Bytes:    resp.Filesize,
				Selected: 1,
			},
			Link: resp.Link,
		})
	}

	return selectedFiles, isChaotic
}

// capacityRetryDelay returns base*2^retry capped at maxDelay. The comparison
// happens in floating point so large retry counts cannot overflow the
// int64-based time.Duration.
func capacityRetryDelay(retry int, base, maxDelay time.Duration) time.Duration {
	delay := math.Pow(2, float64(retry)) * float64(base)
	if delay > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(delay)
}

// canCapacityHandle polls the active torrent count until there is room for
// another download. It backs off exponentially (capped at one minute) between
// attempts and gives up after maxRetries retries.
func (t *TorrentManager) canCapacityHandle() bool {
	// max waiting time is 45 minutes
	const (
		maxRetries = 50
		baseDelay  = 1 * time.Second
		maxDelay   = 60 * time.Second
	)

	for retryCount := 0; ; retryCount++ {
		count, err := t.rd.GetActiveTorrentCount()
		if err != nil {
			t.log.Errorf("Cannot get active downloads count: %v", err)
		} else if count.DownloadingCount < count.MaxNumberOfTorrents {
			t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
			return true
		}

		if retryCount >= maxRetries {
			if err != nil {
				t.log.Error("Max retries reached. Exiting.")
			} else {
				t.log.Error("Max retries reached, exiting")
			}
			return false
		}
		time.Sleep(capacityRetryDelay(retryCount, baseDelay, maxDelay))
	}
}