package torrent

import (
	"encoding/gob"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/pkg/logutil"
	"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"github.com/nutsdb/nutsdb"
	"go.uber.org/zap"
)

// TorrentManager keeps an in-memory map of the torrents reported by
// Real-Debrid, keyed by access key, and refreshes it periodically.
type TorrentManager struct {
	// TorrentMap maps an access key (see getName) to the merged torrent.
	TorrentMap map[string]*Torrent
	// requiredVersion is stamped into every cache file; cache files carrying
	// a different version are ignored by readFromFile.
	requiredVersion string
	rd              *realdebrid.RealDebrid
	// checksum is the last value returned by getChecksum for which the
	// library was fetched.
	checksum string
	config   config.ConfigInterface
	cache    *expirable.LRU[string, string]
	db       *nutsdb.DB
	// workerPool is a counting semaphore bounding concurrent getMoreInfo calls.
	workerPool        chan bool
	directoryMap      map[string][]string
	processedTorrents map[string][]string
	// mu guards replacement of TorrentMap and checksum.
	mu  *sync.Mutex
	log *zap.SugaredLogger
}

// NewTorrentManager creates a new torrent manager.
// It fetches all torrents and their info before returning (the torrents are
// stored in memory and their info is cached in files), and then starts the
// periodic refresh job in the background.
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string], db *nutsdb.DB) *TorrentManager {
	// A pool of size zero would block every worker forever (and a negative
	// size panics in make), so always allow at least one worker.
	numWorkers := config.GetNumOfWorkers()
	if numWorkers < 1 {
		numWorkers = 1
	}

	t := &TorrentManager{
		TorrentMap:        make(map[string]*Torrent),
		requiredVersion:   fmt.Sprintf("8.11.2023 - retain:%v", config.EnableRetainFolderNameExtension()),
		rd:                realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid")),
		config:            config,
		cache:             cache,
		db:                db,
		workerPool:        make(chan bool, numWorkers),
		directoryMap:      make(map[string][]string),
		processedTorrents: make(map[string][]string),
		mu:                &sync.Mutex{},
		log:               logutil.NewLogger().Named("manager"),
	}

	// start with a clean slate
	newTorrents, _, err := t.rd.GetTorrents(0)
	if err != nil {
		t.log.Fatalf("Cannot get torrents: %v\n", err)
	}
	torrentMap := t.collectTorrents(newTorrents)
	checksum := t.getChecksum()

	t.mu.Lock()
	t.TorrentMap = torrentMap
	t.checksum = checksum
	t.mu.Unlock()

	// if t.config.EnableRepair() {
	// 	go t.repairAll()
	// }

	go t.startRefreshJob()

	return t
}

// collectTorrents resolves every given Real-Debrid torrent through
// getMoreInfo (concurrency bounded by workerPool) and returns a fresh map
// keyed by access key. Torrents sharing an access key are combined with
// mergeToMain; torrents whose info could not be obtained are skipped.
func (t *TorrentManager) collectTorrents(rdTorrents []realdebrid.Torrent) map[string]*Torrent {
	// Buffered to the number of producers: results are only drained after
	// wg.Wait(), so every worker must be able to send without a receiver.
	results := make(chan *Torrent, len(rdTorrents))
	var wg sync.WaitGroup
	for i := range rdTorrents {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			t.workerPool <- true
			defer func() { <-t.workerPool }()
			results <- t.getMoreInfo(rdTorrents[idx])
		}(i)
	}
	wg.Wait()
	close(results)

	torrentMap := make(map[string]*Torrent, len(rdTorrents))
	for torrent := range results {
		if torrent == nil {
			continue
		}
		if existing, ok := torrentMap[torrent.AccessKey]; ok {
			torrentMap[torrent.AccessKey] = t.mergeToMain(existing, torrent)
		} else {
			torrentMap[torrent.AccessKey] = torrent
		}
	}
	return torrentMap
}

// mergeToMain folds t2 into t1 and returns t1.
// SelectedFiles become the union of both selections with one entry per file
// ID (the first occurrence wins, t1's files first), Instances are
// concatenated, LatestAdded becomes the larger of the two values and
// InProgress is set when any instance is not at 100% progress.
func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
	merged := t1

	// Merge SelectedFiles
	// Build a brand-new slice: merged aliases t1, so appending onto
	// merged.SelectedFiles would duplicate every file t1 already has.
	total := len(t1.SelectedFiles) + len(t2.SelectedFiles)
	seen := make(map[int]struct{}, total)
	files := make([]File, 0, total)
	for _, group := range [][]File{t1.SelectedFiles, t2.SelectedFiles} {
		for _, f := range group {
			if _, dup := seen[f.ID]; dup {
				continue
			}
			seen[f.ID] = struct{}{}
			files = append(files, f)
		}
	}
	merged.SelectedFiles = files

	// Merge Instances
	merged.Instances = append(t1.Instances, t2.Instances...)

	// LatestAdded
	if t1.LatestAdded < t2.LatestAdded {
		merged.LatestAdded = t2.LatestAdded
	}

	// InProgress
	for _, instance := range merged.Instances {
		if instance.Progress != 100 {
			merged.InProgress = true
			break
		}
	}

	return merged
}

// GetByDirectory returns copies of all torrents that are assigned to the
// specified directory.
//
// NOTE(review): TorrentMap is read here without holding mu; the refresh job
// replaces the map rather than mutating it, but callers that need a strictly
// race-free read should confirm the locking policy.
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
	var torrents []Torrent
	for _, torrent := range t.TorrentMap {
		for _, dir := range torrent.Directories {
			if dir == directory {
				torrents = append(torrents, *torrent)
				break
			}
		}
	}
	return torrents
}

// FindAllTorrentsWithName finds all torrents in a given directory whose
// access key contains the given name (an exact match included).
func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent {
	var matchingTorrents []Torrent
	torrents := t.GetByDirectory(directory)
	for i := range torrents {
		if strings.Contains(torrents[i].AccessKey, torrentName) {
			matchingTorrents = append(matchingTorrents, torrents[i])
		}
	}
	return matchingTorrents
}

// UnrestrictUntilOk is a proxy for the Real-Debrid client method of the same
// name.
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
	return t.rd.UnrestrictUntilOk(link)
}

// torrentsResponse carries the result of a GetTorrents call across a channel.
type torrentsResponse struct {
	torrents   []realdebrid.Torrent
	totalCount int
}

// getChecksum generates a checksum based on the number of torrents, the first
// torrent id and the number of active torrents. It returns an empty string
// when either API call fails or when no torrents are returned.
func (t *TorrentManager) getChecksum() string {
	// Every channel has room for its single message, so a goroutine can
	// always finish even after this function returned early on an error.
	torrentsChan := make(chan torrentsResponse, 1)
	countChan := make(chan int, 1)
	errChan := make(chan error, 2) // accommodate errors from both goroutines

	// GetTorrents request
	go func() {
		torrents, totalCount, err := t.rd.GetTorrents(1)
		if err != nil {
			errChan <- err
			return
		}
		torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount}
	}()

	// GetActiveTorrentCount request
	go func() {
		count, err := t.rd.GetActiveTorrentCount()
		if err != nil {
			errChan <- err
			return
		}
		countChan <- count.DownloadingCount
	}()

	var torrents []realdebrid.Torrent
	var totalCount, count int

	for i := 0; i < 2; i++ {
		select {
		case torrentsResp := <-torrentsChan:
			torrents = torrentsResp.torrents
			totalCount = torrentsResp.totalCount
		case count = <-countChan:
		case err := <-errChan:
			t.log.Errorf("Checksum API Error: %v\n", err)
			return ""
		}
	}

	if len(torrents) == 0 {
		t.log.Error("Huh, no torrents returned")
		return ""
	}

	checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count)
	return checksum
}

// startRefreshJob periodically refreshes the torrents. It never returns.
// After every refresh interval it compares the current checksum with the
// stored one and, when they differ, fetches the whole library again, replaces
// TorrentMap with the freshly built map and triggers the library update hook.
func (t *TorrentManager) startRefreshJob() {
	t.log.Info("Starting periodic refresh")
	for {
		<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)

		checksum := t.getChecksum()
		if checksum == t.checksum {
			continue
		}

		newTorrents, _, err := t.rd.GetTorrents(0)
		if err != nil {
			t.log.Errorf("Cannot get torrents: %v\n", err)
			continue
		}

		// Build a fresh map instead of merging into the current one:
		// merging would fold every torrent into its own previous copy on
		// each refresh and would never drop torrents that were deleted.
		torrentMap := t.collectTorrents(newTorrents)
		newChecksum := t.getChecksum()

		// Hold the lock only for the swap so no early exit above can leave
		// it locked.
		t.mu.Lock()
		t.TorrentMap = torrentMap
		t.checksum = newChecksum
		t.mu.Unlock()

		// if t.config.EnableRepair() {
		// 	go t.repairAll()
		// }

		go OnLibraryUpdateHook(t.config)
	}
}

// getMoreInfo gets original name, size and files for a torrent.
// The info comes from the file cache when the cached entry has the same
// number of links as the API listing, otherwise from the Real-Debrid API.
// It returns nil when the info cannot be fetched.
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
	// file cache
	info := t.readFromFile(rdTorrent.ID)
	// see if api data and file data still match
	// then it means data is still usable
	fromCache := info != nil && len(info.ID) > 0 && len(info.Links) == len(rdTorrent.Links)
	if !fromCache {
		var err error
		info, err = t.rd.GetTorrentInfo(rdTorrent.ID)
		if err != nil {
			t.log.Errorf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
			return nil
		}
	}

	// SelectedFiles is a subset of Files with only the selected ones
	// it also has a Link field, which can be empty
	// if it is empty, it means the file is no longer available
	// Files+Links together are the same as SelectedFiles
	var selectedFiles []File
	streamableCount := 0
	// if some Links are empty, we need to repair it
	forRepair := false
	for _, file := range info.Files {
		if isStreamable(file.Path) {
			streamableCount++
		}
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, File{
			File: file,
			Link: "", // no link yet
		})
	}

	if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
		t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, len(selectedFiles), len(info.Links))
		// chaotic file means RD will not output the desired file selection
		// e.g. even if we select just a single mkv, it will output a rar
		var isChaotic bool
		selectedFiles, isChaotic = t.organizeChaos(&rdTorrent, selectedFiles)
		if isChaotic {
			t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name)
			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
		} else {
			if streamableCount > 1 {
				t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name)
				forRepair = true
			} else {
				t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name)
				t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
			}
		}
	} else {
		// all links are still intact! good!
		// Pair links with files by position, but never index past the
		// shorter of the two lists.
		if len(info.Links) > len(selectedFiles) {
			t.log.Debugf("Torrent %s %s has %d link(s) for only %d selected file(s), ignoring the extra link(s)", info.ID, info.Name, len(info.Links), len(selectedFiles))
		}
		n := len(info.Links)
		if len(selectedFiles) < n {
			n = len(selectedFiles)
		}
		for i := 0; i < n; i++ {
			selectedFiles[i].Link = info.Links[i]
		}
	}

	info.ForRepair = forRepair
	torrent := Torrent{
		AccessKey:     t.getName(info.Name, info.OriginalName),
		SelectedFiles: selectedFiles,
		Directories:   t.getDirectories(info),
		LatestAdded:   info.Added,
		InProgress:    info.Progress != 100,
		Instances:     []realdebrid.TorrentInfo{*info},
	}
	// Persist whenever the info did not come from the cache, including the
	// case where a cache file existed but no longer matched the API data.
	if len(selectedFiles) > 0 && !fromCache {
		t.writeToFile(info) // only when there are selected files, else it's useless
	}
	return &torrent
}

// getName returns the access key for a torrent. When the configuration asks
// to retain folder name extensions and name contains originalName, name is
// returned as is; otherwise originalName is returned with a trailing ".mp4"
// and then a trailing ".mkv" removed.
func (t *TorrentManager) getName(name, originalName string) string {
	if t.config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
		return name
	}
	// drop the extension from the name
	ret := strings.TrimSuffix(originalName, ".mp4")
	ret = strings.TrimSuffix(ret, ".mkv")
	return ret
}

// getDirectories returns the directories the torrent belongs to: for every
// group of the v1 configuration, the first directory whose conditions the
// torrent meets. Other config versions are logged as errors and yield no
// directories.
func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []string {
	var ret []string
	// Map torrents to directories
	switch t.config.GetVersion() {
	case "v1":
		configV1, ok := t.config.(*config.ZurgConfigV1)
		if !ok {
			t.log.Errorf("Config reports version v1 but has unexpected type %T", t.config)
			return ret
		}

		// The selected file names and the access key do not depend on the
		// directory being tested, so compute them once up front.
		var filenames []string
		for _, file := range torrent.Files {
			if file.Selected == 0 {
				continue
			}
			filenames = append(filenames, file.Path)
		}
		accessKey := t.getName(torrent.Name, torrent.OriginalName)

		groupMap := configV1.GetGroupMap()
		// for every group, iterate over every torrent
		// and then sprinkle/distribute the torrents to the directories of the group
		for _, directories := range groupMap {
			for _, directory := range directories {
				if configV1.MeetsConditions(directory, torrent.ID, accessKey, filenames) {
					ret = append(ret, directory)
					break // we found a directory for this torrent for this group, so we can stop looking for more
				}
			}
		}
	default:
		t.log.Error("Unknown config version")
	}
	return ret
}

// writeToFile stores the torrent info, stamped with the required version, as
// a gob file at data/<id>.bin.
func (t *TorrentManager) writeToFile(torrent *realdebrid.TorrentInfo) {
	filePath := filepath.Join("data", torrent.ID+".bin")
	file, err := os.Create(filePath)
	if err != nil {
		t.log.Fatalf("Failed creating file: %s", err)
		return
	}
	defer file.Close()

	torrent.Version = t.requiredVersion

	dataEncoder := gob.NewEncoder(file)
	if err := dataEncoder.Encode(torrent); err != nil {
		// A file that cannot be decoded is ignored by readFromFile, so
		// reporting the failure is enough.
		t.log.Errorf("Failed writing cache file %s: %v", filePath, err)
	}
}

// readFromFile loads the cached torrent info from data/<id>.bin.
// It returns nil when the file is missing or unreadable, older than the
// configured cache time, cannot be decoded, or was written with a different
// required version.
func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo {
	filePath := filepath.Join("data", torrentID+".bin")
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		return nil
	}

	if time.Since(fileInfo.ModTime()) > time.Duration(t.config.GetCacheTimeHours())*time.Hour {
		return nil
	}

	file, err := os.Open(filePath)
	if err != nil {
		return nil
	}
	defer file.Close()

	var torrent realdebrid.TorrentInfo
	dataDecoder := gob.NewDecoder(file)
	if err := dataDecoder.Decode(&torrent); err != nil {
		return nil
	}
	if torrent.Version != t.requiredVersion {
		return nil
	}
	return &torrent
}

// organizeChaos unrestricts every link of the torrent and matches the
// resulting file names against the selected files. Matching files get their
// Link filled in; files that are not part of the selection are appended to
// it. The returned flag is true when at least one such unexpected file is
// reported as not streamable.
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) ([]File, bool) {
	type Result struct {
		Response *realdebrid.UnrestrictResponse
	}

	resultsChan := make(chan Result, len(info.Links))
	var wg sync.WaitGroup

	// Limit concurrency
	// (at least one slot, otherwise the first send below blocks forever)
	numWorkers := t.config.GetNumOfWorkers()
	if numWorkers < 1 {
		numWorkers = 1
	}
	sem := make(chan bool, numWorkers)

	for _, link := range info.Links {
		wg.Add(1)
		sem <- true
		go func(lnk string) {
			defer wg.Done()
			defer func() { <-sem }()

			resp := t.rd.UnrestrictUntilOk(lnk)
			resultsChan <- Result{Response: resp}
		}(link)
	}

	go func() {
		t.log.Debugf("Checking %d link(s) for problematic torrent id=%s", len(info.Links), info.ID)
		wg.Wait()
		close(resultsChan)
		t.log.Debugf("Closing results channel for torrent id=%s, checking...", info.ID)
	}()

	isChaotic := false
	for result := range resultsChan {
		if result.Response == nil {
			continue
		}
		found := false
		for i := range selectedFiles {
			if strings.Contains(selectedFiles[i].Path, result.Response.Filename) {
				t.log.Debugf("Found a file that is in the selection for torrent id=%s: %s", info.ID, result.Response.Filename)
				selectedFiles[i].Link = result.Response.Link
				found = true
			}
		}
		if !found {
			// Results arrive in completion order, so the flag must be
			// sticky: a later streamable file may not reset it.
			if result.Response.Streamable == 0 {
				isChaotic = true
			}
			t.log.Debugf("Found a file that is not in the selection for torrent id=%s: %s %v", info.ID, result.Response.Filename, result.Response.Streamable)
			selectedFiles = append(selectedFiles, File{
				File: realdebrid.File{
					Path:     result.Response.Filename,
					Bytes:    result.Response.Filesize,
					Selected: 1,
				},
				Link: result.Response.Link,
			})
		}
	}

	return selectedFiles, isChaotic
}

// HideTheFile marks a file as deleted
// func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
// 	file.Unavailable = true
// 	t.repair(torrent, false)
// }

// func (t *TorrentManager) repairAll() {
// 	for _, torrent := range t.torrentMap {
// 		// do not repair if:
// 		// in progress
// 		hasInProgress := false
// 		for _, info := range torrent.Instances {
// 			if info.Progress != 100 {
// 				hasInProgress = true
// 				break
// 			}
// 		}
// 		if hasInProgress {
// 			continue
// 		}
// 		// already repaired based on other instances
// 		var missingFiles []File
// 		for _, file := range torrent.SelectedFiles {
// 			if file.Link == "" || file.Unavailable {
// 				missingFiles = append(missingFiles, file)
// 			}
// 		}
// 		for _, sFile := range selectedFiles {
// 			if sFile.Link == "" || sFile.Unavailable {
// 				found := false
// 				for _, fFile := range foundFiles {
// 					// same file but different link, then yes it has been repaired
// 					if sFile.Path == fFile.Path && sFile.Link != fFile.Link {
// 						found = true
// 						break
// 					}
// 				}
// 				if !found {
// 					missingFiles = append(missingFiles, sFile)
// 				}
// 			}
// 		}
// 		if len(missingFiles) == 0 {
// 			t.log.Infof("Torrent id=%s is already repaired", info.ID)
// 			return
// 		}
// 		for _, info := range torrent.Instances {
// 			if info.Progress != 100 {
// 				continue
// 			}
// 			if info.ForRepair {
// 				t.log.Infof("There were less links than was expected on %s %s; fixing...", info.ID, info.Name)
// 				t.repair(&info, true)
// 				break // only repair the first one for repair and then move on
// 			}
// 			if len(info.Links) == 0 && info.Progress == 100 {
// 				// If the torrent has no links
// 				// and already processing repair
// 				// delete it!
// 				t.log.Infof("Deleting broken torrent id=%s as it doesn't contain any files", info.ID)
// 				t.rd.DeleteTorrent(info.ID)
// 			}
// 		}
// 	}
// }

// func (t *TorrentManager) repair(info *realdebrid.TorrentInfo, tryReinsertionFirst bool) {
// 	// then we repair it!
// 	t.log.Infof("Repairing torrent id=%s", info.ID)
// 	// check if we can still add more downloads
// 	proceed := t.canCapacityHandle()
// 	if !proceed {
// 		t.log.Error("Cannot add more torrents, exiting")
// 		return
// 	}
// 	// first solution: add the same selection, maybe it can be fixed by reinsertion?
// 	success := false
// 	if tryReinsertionFirst {
// 		success = t.reinsertTorrent(info, "", true)
// 	}
// 	if !success {
// 		// if all the selected files are missing but there are other streamable files
// 		var otherStreamableFileIDs []int
// 		for _, file := range info.Files {
// 			found := false
// 			for _, selectedFile := range selectedFiles {
// 				if selectedFile.ID == file.ID {
// 					found = true
// 					break
// 				}
// 			}
// 			if !found && isStreamable(file.Path) {
// 				otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
// 			}
// 		}
// 		if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 {
// 			// we will download 1 extra streamable file to force a redownload of the missing files
// 			// or if there's only 1 missing file, we will download 1 more to prevent a rename
// 			missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",")
// 			t.log.Infof("Redownloading %d missing files", len(missingFiles))
// 			t.reinsertTorrent(info, missingFilesPlus1, false)
// 		} else if len(selectedFiles) > 1 {
// 			// if not, last resort: add only the missing files but do it in 2 batches
// 			half := len(missingFiles) / 2
// 			missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
// 			missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
// 			if missingFiles1 != "" {
// 				t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(missingFiles1))
// 				t.reinsertTorrent(info, missingFiles1, false)
// 			}
// 			if missingFiles2 != "" {
// 				t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(missingFiles2))
// 				t.reinsertTorrent(info, missingFiles2, false)
// 			} else {
// 				t.log.Info("No other missing files left to reinsert")
// 			}
// 		} else {
// 			t.log.Infof("Torrent id=%s is unfixable as the only link cached in RD is already broken", info.ID)
// 			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
// 			return
// 		}
// 		t.log.Info("Waiting for downloads to finish")
// 	}
// }

// func (t *TorrentManager) reinsertTorrent(torrent *realdebrid.TorrentInfo, missingFiles string, deleteIfFailed bool) bool {
// 	// if missingFiles is not provided, look for missing files
// 	if missingFiles == "" {
// 		var tmpSelection string
// 		for _, file := range torrent.Files {
// 			if file.Selected == 0 {
// 				continue
// 			}
// 			tmpSelection += fmt.Sprintf("%d,", file.ID)
// 		}
// 		if tmpSelection == "" {
// 			return false
// 		}
// 		if len(tmpSelection) > 0 {
// 			missingFiles = tmpSelection[:len(tmpSelection)-1]
// 		}
// 	}
// 	// redownload torrent
// 	resp, err := t.rd.AddMagnetHash(torrent.Hash)
// 	if err != nil {
// 		t.log.Errorf("Cannot redownload torrent: %v", err)
// 		return false
// 	}
// 	newTorrentID := resp.ID
// 	err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles)
// 	if err != nil {
// 		t.log.Errorf("Cannot start redownloading: %v", err)
// 	}
// 	if deleteIfFailed {
// 		if err != nil {
// 			t.rd.DeleteTorrent(newTorrentID)
// 			return false
// 		}
// 		time.Sleep(1 * time.Second)
// 		// see if the torrent is ready
// 		info, err := t.rd.GetTorrentInfo(newTorrentID)
// 		if err != nil {
// 			t.log.Errorf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
// 			if deleteIfFailed {
// 				t.rd.DeleteTorrent(newTorrentID)
// 			}
// 			return false
// 		}
// 		time.Sleep(1 * time.Second)
// 		if info.Progress != 100 {
// 			t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion, currently %d%%", info.ID, info.Progress)
// 			t.rd.DeleteTorrent(newTorrentID)
// 			return false
// 		}
// 		missingCount := len(strings.Split(missingFiles, ","))
// 		if len(info.Links) != missingCount {
// 			t.log.Infof("It didn't fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
// 			t.rd.DeleteTorrent(newTorrentID)
// 			return false
// 		}
// 		t.log.Infof("Redownload successful id=%s, deleting old torrent id=%s", newTorrentID, torrent.ID)
// 		t.rd.DeleteTorrent(torrent.ID)
// 	}
// 	return true
// }

// func (t *TorrentManager) canCapacityHandle() bool {
// 	// max waiting time is 45 minutes
// 	const maxRetries = 50
// 	const baseDelay = 1 * time.Second
// 	const maxDelay = 60 * time.Second
// 	retryCount := 0
// 	for {
// 		count, err := t.rd.GetActiveTorrentCount()
// 		if err != nil {
// 			t.log.Errorf("Cannot get active downloads count: %v", err)
// 			if retryCount >= maxRetries {
// 				t.log.Error("Max retries reached. Exiting.")
// 				return false
// 			}
// 			delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// 			if delay > maxDelay {
// 				delay = maxDelay
// 			}
// 			time.Sleep(delay)
// 			retryCount++
// 			continue
// 		}
// 		if count.DownloadingCount < count.MaxNumberOfTorrents {
// 			t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
// 			return true
// 		}
// 		if retryCount >= maxRetries {
// 			t.log.Error("Max retries reached, exiting")
// 			return false
// 		}
// 		delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// 		if delay > maxDelay {
// 			delay = maxDelay
// 		}
// 		time.Sleep(delay)
// 		retryCount++
// 	}
// }