package torrent import ( "encoding/gob" "fmt" "log" "math" "os" "strings" "sync" "time" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" "github.com/hashicorp/golang-lru/v2/expirable" ) type TorrentManager struct { requiredVersion string torrents []Torrent inProgress []string checksum string config config.ConfigInterface cache *expirable.LRU[string, string] workerPool chan bool TorrentDirectoriesMap map[string][]string processedTorrents map[string][]string } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager { t := &TorrentManager{ requiredVersion: "28.10.2023", config: config, cache: cache, workerPool: make(chan bool, config.GetNumOfWorkers()), TorrentDirectoriesMap: make(map[string][]string), processedTorrents: make(map[string][]string), } // Initialize torrents for the first time t.torrents = t.getFreshListFromAPI() t.checksum = t.getChecksum() // log.Println("First checksum", t.checksum) var wg sync.WaitGroup for i := range t.torrents { wg.Add(1) go func(idx int) { defer wg.Done() t.workerPool <- true t.addMoreInfo(&t.torrents[idx]) <-t.workerPool }(i) } if t.config.EnableRepair() { go t.repairAll(&wg) } wg.Wait() t.mapToDirectories() go t.startRefreshJob() return t } // GetByDirectory returns all torrents that have a file in the specified directory func (t *TorrentManager) GetByDirectory(directory string) []Torrent { var torrents []Torrent for i := range t.torrents { for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { if dir == directory { torrents = append(torrents, t.torrents[i]) } } } return torrents } // HideTheFile marks a file as deleted func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) { file.Unavailable = true t.repair(torrent.ID, torrent.SelectedFiles, false) } // FindAllTorrentsWithName finds all torrents in a given directory with a given name func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent { var matchingTorrents []Torrent torrents := t.GetByDirectory(directory) for i := range torrents { if torrents[i].Name == torrentName || strings.HasPrefix(torrents[i].Name, torrentName) { matchingTorrents = append(matchingTorrents, torrents[i]) } } return matchingTorrents } // findAllDownloadedFilesFromHash finds all files that were with a given hash func (t *TorrentManager) findAllDownloadedFilesFromHash(hash string) []File { var files []File for _, torrent := range t.torrents { if torrent.Hash == hash { for _, file := range torrent.SelectedFiles { if file.Link != "" { files = append(files, file) } } } } return files } type torrentsResponse struct { torrents []realdebrid.Torrent totalCount int } func (t *TorrentManager) getChecksum() string { torrentsChan := make(chan torrentsResponse) countChan := make(chan int) errChan := make(chan error, 2) // accommodate errors from both goroutines // GetTorrents request go func() { torrents, totalCount, err := realdebrid.GetTorrents(t.config.GetToken(), 1) if err != nil { errChan <- err return } torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount} }() // GetActiveTorrentCount request go func() { count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken()) if err != nil { errChan <- err return } countChan <- count.DownloadingCount }() var torrents []realdebrid.Torrent var totalCount, count int for i := 0; i < 2; i++ { select { case torrentsResp := <-torrentsChan: torrents = torrentsResp.torrents totalCount = torrentsResp.totalCount case count = <-countChan: case err := <-errChan: log.Printf("Error: %v\n", err) return "" } } if len(torrents) == 0 { log.Println("Huh, no torrents returned") return "" } checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count) return checksum } // startRefreshJob periodically refreshes the torrents func (t *TorrentManager) startRefreshJob() { log.Println("Starting periodic refresh") for { <-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second) checksum := t.getChecksum() if checksum == t.checksum { continue } t.cache.Purge() newTorrents := t.getFreshListFromAPI() var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) go func(idx int) { defer wg.Done() t.workerPool <- true t.addMoreInfo(&newTorrents[idx]) <-t.workerPool }(i) } wg.Wait() // apply side effects t.torrents = newTorrents t.checksum = t.getChecksum() // log.Println("Checksum changed", t.checksum) if t.config.EnableRepair() { go t.repairAll(&wg) } go t.mapToDirectories() } } // getFreshListFromAPI returns all torrents func (t *TorrentManager) getFreshListFromAPI() []Torrent { torrents, _, err := realdebrid.GetTorrents(t.config.GetToken(), 0) if err != nil { log.Printf("Cannot get torrents: %v\n", err) return nil } // convert to own internal type without SelectedFiles yet // populate inProgress var torrentsV2 []Torrent t.inProgress = t.inProgress[:0] // reset for _, torrent := range torrents { torrent.Name = strings.TrimSuffix(torrent.Name, "/") torrentV2 := Torrent{ Torrent: torrent, SelectedFiles: nil, } torrentsV2 = append(torrentsV2, torrentV2) if torrent.Progress != 100 { t.inProgress = append(t.inProgress, torrent.Hash) } } log.Printf("Fetched %d torrents", len(torrentsV2)) return torrentsV2 } // addMoreInfo updates the selected files for a torrent func (t *TorrentManager) addMoreInfo(torrent *Torrent) { // file cache torrentFromFile := t.readFromFile(torrent.ID) if torrentFromFile != nil { // see if api data and file data still match // then it means data is still usable if len(torrentFromFile.Links) == len(torrent.Links) { torrent.Name = getName(torrentFromFile) torrent.ForRepair = torrentFromFile.ForRepair torrent.SelectedFiles = torrentFromFile.SelectedFiles[:] return } } // no file data yet as it is still downloading if torrent.Progress != 100 { return } // log.Println("Getting info for", torrent.ID) info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrent.ID) if err != nil { log.Printf("Cannot get info: %v\n", err) return } // SelectedFiles is a subset of Files with only the selected ones // it also has a Link field, which can be empty // if it is empty, it means the file is no longer available // Files+Links together are the same as SelectedFiles var selectedFiles []File var streamableFiles []File // if some Links are empty, we need to repair it forRepair := false for _, file := range info.Files { if isStreamable(file.Path) { streamableFiles = append(streamableFiles, File{ File: file, Link: "", }) } if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, File{ File: file, Link: "", }) } if len(selectedFiles) > len(info.Links) && info.Progress == 100 { log.Printf("Some links has expired for %s, %s: %d selected but only %d link(s)\n", info.ID, info.Name, len(selectedFiles), len(info.Links)) // chaotic file means RD will not output the desired file selection // e.g. even if we select just a single mkv, it will output a rar var isChaotic bool selectedFiles, isChaotic = t.organizeChaos(info, selectedFiles) if isChaotic { log.Println("This torrent is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name) } else { if len(streamableFiles) > 1 { log.Println("Marking for repair", info.ID, info.Name) forRepair = true } else { log.Println("This torrent is unfixable, the lone streamable link has expired, ignoring", info.ID, info.ID) } } } else { // all links are still intact! good! for i, link := range info.Links { selectedFiles[i].Link = link } } // update file cache torrent.OriginalName = info.OriginalName torrent.Name = getName(torrent) if len(selectedFiles) > 0 { // update the torrent with more data! torrent.SelectedFiles = selectedFiles torrent.ForRepair = forRepair t.writeToFile(torrent) } } func getName(torrent *Torrent) string { ret := "" if torrent.OriginalName != "" { ret = torrent.OriginalName } else { ret = torrent.Name } ret = strings.TrimSuffix(ret, ".mkv") ret = strings.TrimSuffix(ret, ".mp4") return ret } // mapToDirectories maps torrents to directories func (t *TorrentManager) mapToDirectories() { // Map torrents to directories switch t.config.GetVersion() { case "v1": configV1 := t.config.(*config.ZurgConfigV1) groupMap := configV1.GetGroupMap() // for every group, iterate over every torrent // and then sprinkle/distribute the torrents to the directories of the group for group, directories := range groupMap { log.Printf("Processing directory group '%s', sequence: %s\n", group, strings.Join(directories, " > ")) counter := make(map[string]int) for i := range t.torrents { // don't process torrents that are already mapped if it is not the first run alreadyMappedToGroup := false for _, mappedGroup := range t.processedTorrents[t.torrents[i].Name] { if mappedGroup == group { alreadyMappedToGroup = true } } if alreadyMappedToGroup { continue } for _, directory := range directories { var filenames []string for _, file := range t.torrents[i].Files { filenames = append(filenames, file.Path) } if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) { found := false // check if it is already mapped to this directory for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { if dir == directory { found = true break // it is already mapped to this directory } } if !found { counter[directory]++ t.TorrentDirectoriesMap[t.torrents[i].Name] = append(t.TorrentDirectoriesMap[t.torrents[i].Name], directory) break // we found a directory for this torrent, so we can stop looking for more } } } t.processedTorrents[t.torrents[i].Name] = append(t.processedTorrents[t.torrents[i].Name], group) } sum := 0 for _, count := range counter { sum += count } if sum > 0 { log.Printf("Directory group processed: %s %v %d\n", group, counter, sum) } else { log.Println("No new additions to directory group", group) } } default: log.Println("Unknown config version") } log.Println("Finished mapping to directories") } // getByID returns a torrent by its ID func (t *TorrentManager) getByID(torrentID string) *Torrent { for i := range t.torrents { if t.torrents[i].ID == torrentID { return &t.torrents[i] } } return nil } // writeToFile writes a torrent to a file func (t *TorrentManager) writeToFile(torrent *Torrent) { filePath := fmt.Sprintf("data/%s.bin", torrent.ID) file, err := os.Create(filePath) if err != nil { log.Fatalf("Failed creating file: %s", err) return } defer file.Close() torrent.Version = t.requiredVersion dataEncoder := gob.NewEncoder(file) dataEncoder.Encode(torrent) } // readFromFile reads a torrent from a file func (t *TorrentManager) readFromFile(torrentID string) *Torrent { filePath := fmt.Sprintf("data/%s.bin", torrentID) fileInfo, err := os.Stat(filePath) if err != nil { return nil } if time.Since(fileInfo.ModTime()) > time.Duration(t.config.GetCacheTimeHours())*time.Hour { return nil } file, err := os.Open(filePath) if err != nil { return nil } defer file.Close() var torrent Torrent dataDecoder := gob.NewDecoder(file) err = dataDecoder.Decode(&torrent) if err != nil { log.Fatalf("Failed decoding file: %s", err) return nil } if torrent.Version != t.requiredVersion { return nil } return &torrent } func (t *TorrentManager) repairAll(wg *sync.WaitGroup) { wg.Wait() for _, torrent := range t.torrents { if torrent.ForRepair { log.Println("Issues were detected on", torrent.Name, "; fixing...") t.repair(torrent.ID, torrent.SelectedFiles, true) } if len(torrent.Links) == 0 && torrent.Progress == 100 { // If the torrent has no links // and already processing repair // delete it! log.Println("Deleting", torrent.Name, "as it has no links") realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID) } } } func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) { torrent := t.getByID(torrentID) if torrent == nil { return } // check if it is already "being" repaired found := false for _, hash := range t.inProgress { if hash == torrent.Hash { found = true break } } if found { log.Println("Repair in progress, skipping", torrentID) return } // check if it is already repaired foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash) var missingFiles []File for _, sFile := range selectedFiles { if sFile.Link == "" || sFile.Unavailable { found := false for _, fFile := range foundFiles { // same file but different link, then yes it has been repaired if sFile.Path == fFile.Path && sFile.Link != fFile.Link { found = true break } } if !found { missingFiles = append(missingFiles, sFile) } } } if len(missingFiles) == 0 { log.Println(torrent.Name, "is already repaired") return } // then we repair it! log.Println("Repairing torrent", torrentID) // check if we can still add more downloads proceed := t.canCapacityHandle() if !proceed { log.Println("Cannot add more torrents, exiting") return } // first solution: add the same selection, maybe it can be fixed by reinsertion? success := false if tryReinsertionFirst { success = t.reinsertTorrent(torrent, "", true) } if !success { // if all the selected files are missing but there are other streamable files var otherStreamableFileIDs []int for _, file := range torrent.Files { found := false for _, selectedFile := range selectedFiles { if selectedFile.ID == file.ID { found = true break } } if !found && isStreamable(file.Path) { otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID) } } if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 { // we will download 1 extra streamable file to force a redownload of the missing files // or if there's only 1 missing file, we will download 1 more to prevent a rename missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",") missingFilesPlus1 += fmt.Sprintf(",%d", otherStreamableFileIDs[0]) log.Println("Trying to reinsert with 1 extra file", missingFilesPlus1) t.reinsertTorrent(torrent, missingFilesPlus1, false) } else if len(selectedFiles) > 1 { // if not, last resort: add only the missing files but do it in 2 batches half := len(missingFiles) / 2 missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") if missingFiles1 != "" { log.Println("Trying to reinsert with 1/2 batches", missingFiles1) t.reinsertTorrent(torrent, missingFiles1, false) } if missingFiles2 != "" { log.Println("Trying to reinsert with 2/2 batches", missingFiles2) t.reinsertTorrent(torrent, missingFiles2, false) } } else { log.Println("Cannot repair", torrent.Name) return } log.Println("Waiting for downloads to finish") } } func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string, deleteIfFailed bool) bool { // if missingFiles is not provided, look for missing files if missingFiles == "" { log.Println("Reinserting whole torrent", torrent.Name) var selection string for _, file := range torrent.SelectedFiles { selection += fmt.Sprintf("%d,", file.ID) } if selection == "" { return false } if len(selection) > 0 { missingFiles = selection[:len(selection)-1] } } else { log.Printf("Reinserting %d missing files for %s", len(strings.Split(missingFiles, ",")), torrent.Name) } // reinsert torrent resp, err := realdebrid.AddMagnetHash(t.config.GetToken(), torrent.Hash) if err != nil { log.Printf("Cannot reinsert torrent: %v\n", err) return false } newTorrentID := resp.ID err = realdebrid.SelectTorrentFiles(t.config.GetToken(), newTorrentID, missingFiles) if err != nil { log.Printf("Cannot select files on reinserted torrent: %v\n", err) } if deleteIfFailed { if err != nil { realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID) return false } time.Sleep(1 * time.Second) // see if the torrent is ready info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), newTorrentID) if err != nil { log.Printf("Cannot get info on reinserted torrent: %v\n", err) if deleteIfFailed { realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID) } return false } time.Sleep(1 * time.Second) if info.Progress != 100 { log.Printf("Torrent is not cached anymore, %d%%\n", info.Progress) realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID) return false } if len(info.Links) != len(torrent.SelectedFiles) { log.Printf("It doesn't fix the problem, got %d links but we need %d\n", len(info.Links), len(torrent.SelectedFiles)) realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID) return false } log.Println("Reinsertion successful, deleting old torrent") realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID) } return true } func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) ([]File, bool) { type Result struct { Response *realdebrid.UnrestrictResponse } resultsChan := make(chan Result, len(info.Links)) var wg sync.WaitGroup // Limit concurrency sem := make(chan struct{}, t.config.GetNumOfWorkers()) for _, link := range info.Links { wg.Add(1) sem <- struct{}{} go func(lnk string) { defer wg.Done() defer func() { <-sem }() unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) { return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk) } resp := realdebrid.RetryUntilOk(unrestrictFn) if resp != nil { resultsChan <- Result{Response: resp} } }(link) } go func() { wg.Wait() close(sem) close(resultsChan) }() isChaotic := false for result := range resultsChan { found := false for i := range selectedFiles { if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) { selectedFiles[i].Link = result.Response.Link found = true } } if !found { // "chaos" file, we don't know where it belongs isChaotic = !isStreamable(result.Response.Filename) selectedFiles = append(selectedFiles, File{ File: realdebrid.File{ Path: result.Response.Filename, Bytes: result.Response.Filesize, Selected: 1, }, Link: result.Response.Link, }) } } return selectedFiles, isChaotic } func (t *TorrentManager) canCapacityHandle() bool { // max waiting time is 45 minutes const maxRetries = 50 const baseDelay = 1 * time.Second const maxDelay = 60 * time.Second retryCount := 0 for { count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken()) if err != nil { log.Printf("Cannot get active torrent count: %v\n", err) if retryCount >= maxRetries { log.Println("Max retries reached. Exiting.") return false } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } time.Sleep(delay) retryCount++ continue } if count.DownloadingCount < count.MaxNumberOfTorrents { log.Printf("We can still add a new torrent, %d/%d\n", count.DownloadingCount, count.MaxNumberOfTorrents) return true } if retryCount >= maxRetries { log.Println("Max retries reached. Exiting.") return false } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } time.Sleep(delay) retryCount++ } }