package torrent

import (
	"encoding/gob"
	"fmt"
	"math"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/pkg/logutil"
	"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"go.uber.org/zap"
)

// TorrentManager keeps an in-memory copy of the account's torrents, maps them
// to the configured directories and repairs torrents whose links are missing.
type TorrentManager struct {
	// requiredVersion is stamped on every cache file written by writeToFile;
	// readFromFile ignores files carrying a different value.
	requiredVersion string
	// torrents is the current list of torrents.
	torrents []Torrent
	// inProgress holds the hashes of torrents whose progress is not 100.
	inProgress []string
	// checksum is the last value computed by getChecksum.
	checksum string
	config   config.ConfigInterface
	// cache is purged whenever the refresh job detects a changed checksum.
	cache *expirable.LRU[string, string]
	// workerPool is a buffered channel used as a semaphore that bounds the
	// number of concurrent addMoreInfo calls.
	workerPool chan bool
	// directoryMap maps a torrent name to the directories it appears in.
	directoryMap map[string][]string
	// processedTorrents maps a torrent name to the directory groups that
	// mapToDirectories has already evaluated for it.
	processedTorrents map[string][]string
	// mu is held while torrents, checksum, directoryMap and
	// processedTorrents are written.
	mu  *sync.Mutex
	log *zap.SugaredLogger
}

// NewTorrentManager creates a new torrent manager.
// It fetches all torrents and their info, stores them in memory, maps them to
// directories and then starts the periodic refresh job in the background.
// It is called only once at startup.
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
	t := &TorrentManager{
		requiredVersion:   fmt.Sprintf("4.11.2023 - retain:%v", config.EnableRetainFolderNameExtension()),
		config:            config,
		cache:             cache,
		workerPool:        make(chan bool, config.GetNumOfWorkers()),
		directoryMap:      make(map[string][]string),
		processedTorrents: make(map[string][]string),
		mu:                &sync.Mutex{},
		log:               logutil.NewLogger().Named("manager"),
	}

	// Initialize torrents for the first time
	t.mu.Lock()
	t.torrents = t.getFreshListFromAPI()
	t.checksum = t.getChecksum()
	t.mu.Unlock()

	// Enrich every torrent concurrently; workerPool bounds the parallelism.
	var wg sync.WaitGroup
	for i := range t.torrents {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			t.workerPool <- true
			t.addMoreInfo(&t.torrents[idx])
			<-t.workerPool
		}(i)
	}

	if t.config.EnableRepair() {
		// repairAll waits on wg itself before it inspects the torrents.
		go t.repairAll(&wg)
	}

	wg.Wait()
	t.mapToDirectories()

	go t.startRefreshJob()

	return t
}

// GetByDirectory returns all torrents that are mapped to the specified directory.
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
	var torrents []Torrent
	for i := range t.torrents {
		for _, dir := range t.directoryMap[t.torrents[i].Name] {
			if dir == directory {
				torrents = append(torrents, t.torrents[i])
			}
		}
	}
	return torrents
}

// HideTheFile marks a file as unavailable and triggers a repair of its
// torrent (without trying a plain reinsertion first).
func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
	file.Unavailable = true
	t.repair(torrent.ID, torrent.SelectedFiles, false)
}

// FindAllTorrentsWithName returns the torrents in the given directory whose
// name equals torrentName or starts with it.
func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent {
	var matchingTorrents []Torrent
	torrents := t.GetByDirectory(directory)
	for i := range torrents {
		// HasPrefix also covers the exact-match case.
		if strings.HasPrefix(torrents[i].Name, torrentName) {
			matchingTorrents = append(matchingTorrents, torrents[i])
		}
	}
	return matchingTorrents
}

// findAllDownloadedFilesFromHash returns every selected file that has a
// non-empty link, across all torrents with the given hash.
func (t *TorrentManager) findAllDownloadedFilesFromHash(hash string) []File {
	var files []File
	for i := range t.torrents {
		if t.torrents[i].Hash != hash {
			continue
		}
		for _, file := range t.torrents[i].SelectedFiles {
			if file.Link != "" {
				files = append(files, file)
			}
		}
	}
	return files
}

// torrentsResponse carries the result of a GetTorrents call across a channel.
type torrentsResponse struct {
	torrents   []realdebrid.Torrent
	totalCount int
}

// getChecksum builds a cheap fingerprint of the library from the total
// torrent count, the ID of the first torrent returned and the number of
// downloading torrents. It returns "" when either API call fails or when no
// torrents are returned.
func (t *TorrentManager) getChecksum() string {
	// Every channel is buffered so that each goroutine can always deliver its
	// result and exit, even if this function has already returned because
	// the other request failed. Unbuffered channels would leak the goroutine.
	torrentsChan := make(chan torrentsResponse, 1)
	countChan := make(chan int, 1)
	errChan := make(chan error, 2) // accommodate errors from both goroutines

	// GetTorrents request
	go func() {
		torrents, totalCount, err := realdebrid.GetTorrents(t.config.GetToken(), 1)
		if err != nil {
			errChan <- err
			return
		}
		torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount}
	}()

	// GetActiveTorrentCount request
	go func() {
		count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
		if err != nil {
			errChan <- err
			return
		}
		countChan <- count.DownloadingCount
	}()

	var torrents []realdebrid.Torrent
	var totalCount, count int

	// Exactly two messages are expected, one per goroutine.
	for i := 0; i < 2; i++ {
		select {
		case torrentsResp := <-torrentsChan:
			torrents = torrentsResp.torrents
			totalCount = torrentsResp.totalCount
		case count = <-countChan:
		case err := <-errChan:
			t.log.Errorf("Checksum API Error: %v\n", err)
			return ""
		}
	}

	if len(torrents) == 0 {
		t.log.Error("Huh, no torrents returned")
		return ""
	}

	checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count)
	return checksum
}

// startRefreshJob periodically recomputes the checksum and, when it differs
// from the stored one, reloads all torrents. It never returns.
func (t *TorrentManager) startRefreshJob() {
	t.log.Info("Starting periodic refresh")
	for {
		<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)

		checksum := t.getChecksum()
		if checksum == t.checksum {
			continue
		}
		t.cache.Purge()

		newTorrents := t.getFreshListFromAPI()
		var wg sync.WaitGroup

		for i := range newTorrents {
			wg.Add(1)
			go func(idx int) {
				defer wg.Done()
				t.workerPool <- true
				t.addMoreInfo(&newTorrents[idx])
				<-t.workerPool
			}(i)
		}
		wg.Wait()

		// apply side effects
		t.mu.Lock()
		t.torrents = newTorrents
		t.checksum = t.getChecksum()
		t.mu.Unlock()

		if t.config.EnableRepair() {
			go t.repairAll(&wg)
		}
		go t.mapToDirectories()
		go OnLibraryUpdateHook(t.config)
	}
}

// getFreshListFromAPI fetches all torrents from the API and converts them to
// the internal Torrent type (without SelectedFiles). As a side effect it
// rebuilds t.inProgress. It returns nil when the API call fails.
func (t *TorrentManager) getFreshListFromAPI() []Torrent {
	torrents, _, err := realdebrid.GetTorrents(t.config.GetToken(), 0)
	if err != nil {
		t.log.Errorf("Cannot get torrents: %v\n", err)
		return nil
	}

	// convert to own internal type without SelectedFiles yet
	// populate inProgress
	var torrentsV2 []Torrent
	t.inProgress = t.inProgress[:0] // reset
	for _, torrent := range torrents {
		torrent.Name = strings.TrimSuffix(torrent.Name, "/")
		torrentV2 := Torrent{
			Torrent:       torrent,
			SelectedFiles: nil,
			ForRepair:     false,
			lock:          &sync.Mutex{},
		}
		torrentsV2 = append(torrentsV2, torrentV2)

		if torrent.Progress != 100 {
			t.inProgress = append(t.inProgress, torrent.Hash)
		}
	}

	t.log.Infof("Fetched %d torrents", len(torrentsV2))
	return torrentsV2
}

// addMoreInfo fills in the name, selected files and repair flag of a torrent,
// from the on-disk cache when it is still usable and from the API otherwise.
// Torrents that are still downloading and have no usable cache are left as is.
func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
	// file cache: if the number of links still matches the API data,
	// the cached data is considered usable
	torrentFromFile := t.readFromFile(torrent.ID)
	if torrentFromFile != nil && len(torrentFromFile.Links) == len(torrent.Links) {
		torrent.Name = t.getName(torrentFromFile)
		torrent.ForRepair = torrentFromFile.ForRepair
		torrent.SelectedFiles = torrentFromFile.SelectedFiles[:]
		return
	}

	// no file data yet as it is still downloading
	if torrent.Progress != 100 {
		return
	}

	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrent.ID)
	if err != nil {
		t.log.Errorf("Cannot get info: %v\n", err)
		return
	}

	// SelectedFiles is a subset of Files with only the selected ones
	// it also has a Link field, which can be empty
	// if it is empty, it means the file is no longer available
	// Files+Links together are the same as SelectedFiles
	var selectedFiles []File
	var streamableFiles []File
	// if some Links are empty, we need to repair it
	forRepair := false
	for _, file := range info.Files {
		if isStreamable(file.Path) {
			streamableFiles = append(streamableFiles, File{
				File: file,
				Link: "",
			})
		}
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, File{
			File: file,
			Link: "",
		})
	}

	if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
		t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, len(selectedFiles), len(info.Links))
		// chaotic file means RD will not output the desired file selection
		// e.g. even if we select just a single mkv, it will output a rar
		var isChaotic bool
		selectedFiles, isChaotic = t.organizeChaos(info, selectedFiles)
		if isChaotic {
			t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name)
			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
		} else {
			if len(streamableFiles) > 1 {
				t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name)
				forRepair = true
			} else {
				t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name)
				t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
			}
		}
	} else {
		// all links are still intact! good!
		for i, link := range info.Links {
			if i >= len(selectedFiles) {
				// More links than selected files: indexing further would
				// panic, so the surplus links are ignored.
				t.log.Debugf("Torrent %s %s has %d link(s) but only %d selected file(s), ignoring the extra links", info.ID, info.Name, len(info.Links), len(selectedFiles))
				break
			}
			selectedFiles[i].Link = link
		}
	}

	// update file cache
	torrent.OriginalName = info.OriginalName
	torrent.Name = t.getName(torrent)
	if len(selectedFiles) > 0 {
		// update the torrent with more data!
		torrent.SelectedFiles = selectedFiles
		torrent.ForRepair = forRepair
		t.writeToFile(torrent)
	}
}

// getName returns the display name of a torrent. When the config asks to
// retain the folder name extension and Name contains OriginalName, Name is
// returned unchanged; otherwise OriginalName is returned with a trailing
// ".mp4" and then a trailing ".mkv" removed.
func (t *TorrentManager) getName(torrent *Torrent) string {
	if t.config.EnableRetainFolderNameExtension() && strings.Contains(torrent.Name, torrent.OriginalName) {
		return torrent.Name
	}
	// drop the extension from the name
	ret := strings.TrimSuffix(torrent.OriginalName, ".mp4")
	ret = strings.TrimSuffix(ret, ".mkv")
	return ret
}

// mapToDirectories assigns torrents to the directories of every configured
// group. For each group a torrent name is evaluated once; it is placed in
// the first directory of the group whose conditions it meets.
func (t *TorrentManager) mapToDirectories() {
	switch t.config.GetVersion() {
	case "v1":
		configV1, ok := t.config.(*config.ZurgConfigV1)
		if !ok {
			// Avoid a panic when the reported version and the concrete
			// config type disagree.
			t.log.Error("Config reports version v1 but is not a ZurgConfigV1")
			return
		}
		groupMap := configV1.GetGroupMap()
		// for every group, iterate over every torrent
		// and then sprinkle/distribute the torrents to the directories of the group
		for group, directories := range groupMap {
			counter := make(map[string]int)
			for i := range t.torrents {
				torrent := &t.torrents[i]

				// don't process torrents that are already mapped if it is not the first run
				alreadyMappedToGroup := false
				for _, mappedGroup := range t.processedTorrents[torrent.Name] {
					if mappedGroup == group {
						alreadyMappedToGroup = true
						break
					}
				}
				if alreadyMappedToGroup {
					continue
				}

				// The file list does not depend on the directory, so it is
				// built once per torrent.
				var filenames []string
				for _, file := range torrent.SelectedFiles {
					filenames = append(filenames, file.Path)
				}

				for _, directory := range directories {
					if !configV1.MeetsConditions(directory, torrent.ID, torrent.Name, filenames) {
						continue
					}
					// check if it is already mapped to this directory
					found := false
					for _, dir := range t.directoryMap[torrent.Name] {
						if dir == directory {
							found = true
							break
						}
					}
					if !found {
						counter[directory]++
						t.mu.Lock()
						t.directoryMap[torrent.Name] = append(t.directoryMap[torrent.Name], directory)
						t.mu.Unlock()
						break // we found a directory for this torrent, so we can stop looking for more
					}
				}

				t.mu.Lock()
				t.processedTorrents[torrent.Name] = append(t.processedTorrents[torrent.Name], group)
				t.mu.Unlock()
			}

			sum := 0
			for _, count := range counter {
				sum += count
			}
			if sum > 0 {
				t.log.Infof("Group processing completed: %s %v total: %d", group, counter, sum)
			} else {
				t.log.Infof("No new additions to directory group %s", group)
			}
		}
	default:
		t.log.Error("Unknown config version")
	}
}

// getByID returns a pointer to the torrent with the given ID, or nil when
// there is none.
func (t *TorrentManager) getByID(torrentID string) *Torrent {
	for i := range t.torrents {
		if t.torrents[i].ID == torrentID {
			return &t.torrents[i]
		}
	}
	return nil
}

// writeToFile gob-encodes a torrent into data/<ID>.bin after stamping it
// with the manager's required version.
func (t *TorrentManager) writeToFile(torrent *Torrent) {
	filePath := fmt.Sprintf("data/%s.bin", torrent.ID)
	file, err := os.Create(filePath)
	if err != nil {
		// NOTE: zap's Fatalf logs the message and then exits the process.
		t.log.Fatalf("Failed creating file: %s", err)
		return
	}
	defer file.Close()

	torrent.Version = t.requiredVersion

	dataEncoder := gob.NewEncoder(file)
	if err := dataEncoder.Encode(torrent); err != nil {
		t.log.Errorf("Failed writing torrent %s to %s: %v", torrent.ID, filePath, err)
	}
}

// readFromFile loads a torrent from data/<ID>.bin. It returns nil when the
// file is missing, older than the configured cache time, cannot be decoded,
// or was written with a different required version.
func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
	filePath := fmt.Sprintf("data/%s.bin", torrentID)
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		return nil
	}

	if time.Since(fileInfo.ModTime()) > time.Duration(t.config.GetCacheTimeHours())*time.Hour {
		return nil
	}

	file, err := os.Open(filePath)
	if err != nil {
		return nil
	}
	defer file.Close()

	var torrent Torrent
	dataDecoder := gob.NewDecoder(file)
	err = dataDecoder.Decode(&torrent)
	if err != nil {
		return nil
	}
	if torrent.Version != t.requiredVersion {
		return nil
	}
	return &torrent
}

// repairAll waits for wg, then repairs every torrent flagged ForRepair and
// deletes completed torrents that have no links at all.
func (t *TorrentManager) repairAll(wg *sync.WaitGroup) {
	wg.Wait()
	for _, torrent := range t.torrents {
		if torrent.ForRepair {
			t.log.Infof("Issues were detected on %s %s; fixing...", torrent.ID, torrent.Name)
			t.repair(torrent.ID, torrent.SelectedFiles, true)
		}
		if len(torrent.Links) == 0 && torrent.Progress == 100 {
			// If the torrent has no links
			// and already processing repair
			// delete it!
			t.log.Infof("Deleting broken torrent %s %s as it doesn't contain any files", torrent.ID, torrent.Name)
			realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID)
		}
	}
}

// repair tries to restore the missing files of a torrent.
//
// It does nothing when the torrent is unknown, when a torrent with the same
// hash is still in progress, or when every missing file is already available
// through another torrent with the same hash. Otherwise, once there is
// download capacity, it optionally reinserts the whole torrent first
// (tryReinsertionFirst) and then falls back to redownloading only the missing
// files.
func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) {
	torrent := t.getByID(torrentID)
	if torrent == nil {
		return
	}

	// check if it is already "being" repaired
	for _, hash := range t.inProgress {
		if hash == torrent.Hash {
			t.log.Infof("Repair in progress, skipping %s", torrentID)
			return
		}
	}

	// check if it is already repaired
	foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash)
	var missingFiles []File
	for _, sFile := range selectedFiles {
		if sFile.Link != "" && !sFile.Unavailable {
			continue
		}
		found := false
		for _, fFile := range foundFiles {
			// same file but different link, then yes it has been repaired
			if sFile.Path == fFile.Path && sFile.Link != fFile.Link {
				found = true
				break
			}
		}
		if !found {
			missingFiles = append(missingFiles, sFile)
		}
	}
	if len(missingFiles) == 0 {
		t.log.Infof("Torrent %s %s is already repaired", torrent.ID, torrent.Name)
		return
	}

	// then we repair it!
	t.log.Infof("Repairing torrent %s %s", torrent.ID, torrent.Name)

	// check if we can still add more downloads
	if !t.canCapacityHandle() {
		t.log.Error("Cannot add more torrents, exiting")
		return
	}

	// first solution: add the same selection, maybe it can be fixed by reinsertion?
	success := false
	if tryReinsertionFirst {
		success = t.reinsertTorrent(torrent, "", true)
	}
	if success {
		return
	}

	// collect streamable files that are not part of the current selection
	var otherStreamableFileIDs []int
	for _, file := range torrent.Files {
		found := false
		for _, selectedFile := range selectedFiles {
			if selectedFile.ID == file.ID {
				found = true
				break
			}
		}
		if !found && isStreamable(file.Path) {
			otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
		}
	}

	if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 {
		// we will download 1 extra streamable file to force a redownload of the missing files
		// or if there's only 1 missing file, we will download 1 more to prevent a rename
		missingFilesPlus1 := fmt.Sprintf("%s,%d", strings.Join(getFileIDs(missingFiles), ","), otherStreamableFileIDs[0])
		t.log.Infof("Redownloading %d missing files", len(missingFiles))
		t.reinsertTorrent(torrent, missingFilesPlus1, false)
	} else if len(selectedFiles) > 1 {
		// if not, last resort: add only the missing files but do it in 2 batches
		half := len(missingFiles) / 2
		batch1 := missingFiles[:half]
		batch2 := missingFiles[half:]
		missingFiles1 := strings.Join(getFileIDs(batch1), ",")
		missingFiles2 := strings.Join(getFileIDs(batch2), ",")
		if missingFiles1 != "" {
			// log the number of files, not the length of the joined ID string
			t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(batch1))
			t.reinsertTorrent(torrent, missingFiles1, false)
		}
		if missingFiles2 != "" {
			t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(batch2))
			t.reinsertTorrent(torrent, missingFiles2, false)
		} else {
			t.log.Info("No other missing files left to reinsert")
		}
	} else {
		t.log.Infof("Torrent %s %s is unfixable as the only link cached in RD is already broken", torrent.ID, torrent.Name)
		t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Hash)
		return
	}
	t.log.Info("Waiting for downloads to finish")
}

// reinsertTorrent adds the torrent's hash again and selects the given
// comma-separated file IDs. When missingFiles is empty, all of the torrent's
// selected files are requested.
//
// With deleteIfFailed set, the new torrent is verified (progress 100 and the
// same number of links as selected files); on success the old torrent is
// deleted, on failure the new one is deleted. It returns true when the
// reinsertion was started (and, with deleteIfFailed, verified), false
// otherwise.
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string, deleteIfFailed bool) bool {
	// if missingFiles is not provided, request the whole current selection
	if missingFiles == "" {
		t.log.Infof("Redownloading whole torrent %s", torrent.Name)
		ids := make([]string, 0, len(torrent.SelectedFiles))
		for _, file := range torrent.SelectedFiles {
			ids = append(ids, fmt.Sprintf("%d", file.ID))
		}
		if len(ids) == 0 {
			return false
		}
		missingFiles = strings.Join(ids, ",")
	}

	// redownload torrent
	resp, err := realdebrid.AddMagnetHash(t.config.GetToken(), torrent.Hash)
	if err != nil {
		t.log.Errorf("Cannot redownload torrent: %v", err)
		return false
	}
	newTorrentID := resp.ID

	err = realdebrid.SelectTorrentFiles(t.config.GetToken(), newTorrentID, missingFiles)
	if err != nil {
		t.log.Errorf("Cannot start redownloading: %v", err)
		if deleteIfFailed {
			realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		}
		// a failed selection is never a success, whatever deleteIfFailed says
		return false
	}

	if !deleteIfFailed {
		return true
	}

	time.Sleep(1 * time.Second)

	// see if the torrent is ready
	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), newTorrentID)
	if err != nil {
		t.log.Errorf("Cannot get info on redownloaded torrent: %v", err)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}
	time.Sleep(1 * time.Second)

	if info.Progress != 100 {
		t.log.Infof("Torrent is not cached anymore so we have to wait until completion, currently %d%%", info.Progress)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}

	if len(info.Links) != len(torrent.SelectedFiles) {
		t.log.Infof("It didn't fix the issue, only got %d files but we need %d, undoing", len(info.Links), len(torrent.SelectedFiles))
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}

	t.log.Info("Redownload successful, deleting old torrent")
	realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID)
	return true
}

// organizeChaos unrestricts every link of a torrent and matches the results
// back to the selected files by filename suffix. Results that match no
// selected file are appended as extra files. The returned flag is true when
// at least one such unmatched result is not streamable.
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) ([]File, bool) {
	type Result struct {
		Response *realdebrid.UnrestrictResponse
	}

	// Buffered for every link so workers never block on sending.
	resultsChan := make(chan Result, len(info.Links))
	var wg sync.WaitGroup

	// Limit concurrency
	sem := make(chan struct{}, t.config.GetNumOfWorkers())

	for _, link := range info.Links {
		wg.Add(1)
		sem <- struct{}{}
		go func(lnk string) {
			defer wg.Done()
			defer func() { <-sem }()

			unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
				return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk)
			}
			resp := realdebrid.RetryUntilOk(unrestrictFn)
			if resp != nil {
				resultsChan <- Result{Response: resp}
			}
		}(link)
	}

	go func() {
		wg.Wait()
		close(sem)
		close(resultsChan)
	}()

	isChaotic := false
	for result := range resultsChan {
		found := false
		for i := range selectedFiles {
			if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) {
				selectedFiles[i].Link = result.Response.Link
				found = true
			}
		}
		if found {
			continue
		}
		// "chaos" file, we don't know where it belongs.
		// The flag is sticky: results arrive in no particular order, so a
		// later streamable result must not clear an earlier finding.
		if !isStreamable(result.Response.Filename) {
			isChaotic = true
		}
		selectedFiles = append(selectedFiles, File{
			File: realdebrid.File{
				Path:     result.Response.Filename,
				Bytes:    result.Response.Filesize,
				Selected: 1,
			},
			Link: result.Response.Link,
		})
	}

	return selectedFiles, isChaotic
}

// backoffDelay returns base * 2^retry, capped at ceiling. The cap is applied
// before the multiplication so that large retry counts cannot overflow
// time.Duration.
func backoffDelay(retry int, base, ceiling time.Duration) time.Duration {
	factor := math.Pow(2, float64(retry))
	if factor >= float64(ceiling)/float64(base) {
		return ceiling
	}
	return time.Duration(factor) * base
}

// canCapacityHandle polls the active torrent count until there is room for
// another download. It returns true as soon as the downloading count is below
// the account's maximum, and false once the retry budget is exhausted.
func (t *TorrentManager) canCapacityHandle() bool {
	// max waiting time is 45 minutes
	const maxRetries = 50
	const baseDelay = 1 * time.Second
	const maxDelay = 60 * time.Second
	retryCount := 0
	for {
		count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
		if err != nil {
			t.log.Errorf("Cannot get active downloads count: %v", err)
			if retryCount >= maxRetries {
				t.log.Error("Max retries reached. Exiting.")
				return false
			}
			time.Sleep(backoffDelay(retryCount, baseDelay, maxDelay))
			retryCount++
			continue
		}

		if count.DownloadingCount < count.MaxNumberOfTorrents {
			t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
			return true
		}

		if retryCount >= maxRetries {
			t.log.Error("Max retries reached, exiting")
			return false
		}
		time.Sleep(backoffDelay(retryCount, baseDelay, maxDelay))
		retryCount++
	}
}