package torrent import ( "bufio" "encoding/gob" "fmt" "hash/fnv" "math" "os" "path/filepath" "strings" "sync" "time" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" cmap "github.com/orcaman/concurrent-map/v2" "go.uber.org/zap" ) type TorrentManager struct { cfg config.ConfigInterface DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent requiredVersion string checksum string api *realdebrid.RealDebrid workerPool chan bool log *zap.SugaredLogger } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager { t := &TorrentManager{ cfg: cfg, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), requiredVersion: "10.11.2023", api: api, workerPool: make(chan bool, cfg.GetNumOfWorkers()), log: logutil.NewLogger().Named("manager"), } // create special directory t.DirectoryMap.Set("__all__", cmap.New[*Torrent]()) // key is AccessKey // create directory maps for _, directory := range cfg.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) } newTorrents, _, err := t.api.GetTorrents(0) if err != nil { t.log.Fatalf("Cannot get torrents: %v\n", err) } torrentsChan := make(chan *Torrent, len(newTorrents)) var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) go func(idx int) { defer wg.Done() t.workerPool <- true // TODO wrap getMoreInfo and limit the execution time! torrentsChan <- t.getMoreInfo(newTorrents[idx]) <-t.workerPool }(i) } wg.Wait() close(torrentsChan) t.log.Infof("Fetched info for %d torrents", len(newTorrents)) noInfoCount := 0 allCt := 0 allTorrents, _ := t.DirectoryMap.Get("__all__") for info := range torrentsChan { allCt++ if info == nil { noInfoCount++ continue } if torrent, exists := allTorrents.Get(info.AccessKey); exists { mainTorrent := t.mergeToMain(torrent, info) allTorrents.Set(info.AccessKey, mainTorrent) } else { allTorrents.Set(info.AccessKey, info) } } anotherCt := 0 allTorrents.IterCb(func(accessKey string, torrent *Torrent) { anotherCt++ // get IDs var torrentIDs []string for _, instance := range torrent.Instances { torrentIDs = append(torrentIDs, instance.ID) } // get filenames filenames := torrent.SelectedFiles.Keys() // Map torrents to directories switch t.cfg.GetVersion() { case "v1": configV1 := t.cfg.(*config.ZurgConfigV1) for _, directories := range configV1.GetGroupMap() { for _, directory := range directories { if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(accessKey, torrent) break } } } } }) t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) t.checksum = t.getChecksum() // if t.config.EnableRepair() { // go t.repairAll() // } go t.startRefreshJob() return t } func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { mainTorrent := t1 // Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map t2.SelectedFiles.IterCb(func(key string, file *File) { // see if it already exists in the main torrent if mainFile, ok := mainTorrent.SelectedFiles.Get(key); !ok { mainTorrent.SelectedFiles.Set(key, file) } else if file.Link != "" && mainFile.Link == "" { // if it exists, but the link is empty, then we can update it mainTorrent.SelectedFiles.Set(key, file) } }) // Merge Instances mainTorrent.Instances = append(t1.Instances, t2.Instances...) // LatestAdded if t1.LatestAdded < t2.LatestAdded { mainTorrent.LatestAdded = t2.LatestAdded } // InProgress - if one of the instances is in progress, then the whole torrent is in progress mainTorrent.InProgress = false for _, instance := range mainTorrent.Instances { if instance.Progress != 100 { mainTorrent.InProgress = true } if instance.ForRepair { mainTorrent.ForRepair = true } } return mainTorrent } // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse { t.workerPool <- true ret := t.api.UnrestrictUntilOk(link) <-t.workerPool return ret } type torrentsResponse struct { torrents []realdebrid.Torrent totalCount int } // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents func (t *TorrentManager) getChecksum() string { torrentsChan := make(chan torrentsResponse, 1) countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines // GetTorrents request go func() { torrents, totalCount, err := t.api.GetTorrents(1) if err != nil { errChan <- err return } torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount} }() // GetActiveTorrentCount request go func() { count, err := t.api.GetActiveTorrentCount() if err != nil { errChan <- err return } countChan <- count.DownloadingCount }() var torrents []realdebrid.Torrent var totalCount, count int for i := 0; i < 2; i++ { select { case torrentsResp := <-torrentsChan: torrents = torrentsResp.torrents totalCount = torrentsResp.totalCount case count = <-countChan: case err := <-errChan: t.log.Warnf("Checksum API Error: %v\n", err) return "" } } if len(torrents) == 0 { t.log.Error("Huh, no torrents returned") return "" } checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count) return checksum } // startRefreshJob periodically refreshes the torrents func (t *TorrentManager) startRefreshJob() { t.log.Info("Starting periodic refresh") for { <-time.After(time.Duration(t.cfg.GetRefreshEverySeconds()) * time.Second) checksum := t.getChecksum() if checksum == t.checksum { continue } newTorrents, _, err := t.api.GetTorrents(0) if err != nil { t.log.Warnf("Cannot get torrents: %v\n", err) continue } t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents)) torrentsChan := make(chan *Torrent, len(newTorrents)) var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) go func(idx int) { defer wg.Done() t.workerPool <- true torrentsChan <- t.getMoreInfo(newTorrents[idx]) <-t.workerPool }(i) } wg.Wait() close(torrentsChan) t.log.Infof("Fetched info for %d torrents", len(newTorrents)) noInfoCount := 0 allTorrents, _ := t.DirectoryMap.Get("__all__") var retain []string for info := range torrentsChan { if info == nil { noInfoCount++ continue } retain = append(retain, info.AccessKey) if torrent, exists := allTorrents.Get(info.AccessKey); exists { mainTorrent := t.mergeToMain(torrent, info) allTorrents.Set(info.AccessKey, mainTorrent) } else { allTorrents.Set(info.AccessKey, info) } } allTorrents.IterCb(func(accessKey string, torrent *Torrent) { // get IDs var torrentIDs []string for _, instance := range torrent.Instances { torrentIDs = append(torrentIDs, instance.ID) } // get filenames var filenames []string torrent.SelectedFiles.IterCb(func(_ string, file *File) { filenames = append(filenames, file.Path) }) // Map torrents to directories switch t.cfg.GetVersion() { case "v1": configV1 := t.cfg.(*config.ZurgConfigV1) for _, directories := range configV1.GetGroupMap() { for _, directory := range directories { if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) { torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(accessKey, torrent) break } } } } }) // delete torrents that no longer exist var toDelete []string allTorrents.IterCb(func(_ string, torrent *Torrent) { found := false for _, accessKey := range retain { if torrent.AccessKey == accessKey { found = true break } } if !found { toDelete = append(toDelete, torrent.AccessKey) } }) for _, accessKey := range toDelete { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(accessKey) }) } // end delete torrents that no longer exist t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) t.checksum = t.getChecksum() // if t.config.EnableRepair() { // go t.repairAll() // } go OnLibraryUpdateHook(t.cfg) } } // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { var info *realdebrid.TorrentInfo var err error // file cache torrentFromFile := t.readFromFile(rdTorrent.ID) if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) { if (len(torrentFromFile.Links) > 0 && torrentFromFile.Links[0] == rdTorrent.Links[0]) || len(torrentFromFile.Links) == 0 { // see if api data and file data still match // then it means data is still usable info = torrentFromFile info.Progress = rdTorrent.Progress } } if info == nil { info, err = t.api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) return nil } } // SelectedFiles is a subset of Files with only the selected ones // it also has a Link field, which can be empty // if it is empty, it means the file is no longer available // Files+Links together are the same as SelectedFiles var selectedFiles []*File streamableCount := 0 // if some Links are empty, we need to repair it forRepair := false for _, file := range info.Files { if isStreamable(file.Path) { streamableCount++ } if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Added: info.Added, Link: "", // no link yet ZurgFS: hashStringToFh(file.Path + info.Hash), }) } if len(selectedFiles) > len(info.Links) && info.Progress == 100 { // chaotic file means RD will not output the desired file selection // e.g. even if we select just a single mkv, it will output a rar var isChaotic bool selectedFiles, isChaotic = t.organizeChaos(info.Links, selectedFiles) if isChaotic { t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) return nil } else { if streamableCount > 1 && t.cfg.EnableRepair() { // case for repair 1: it's missing some links (or all links) // if we download it as is, we might get the same file over and over again // so we need to redownload it with other files selected // that is why we check if there are other streamable files t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name) forRepair = true } else if streamableCount == 1 { t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) return nil } } } else if len(selectedFiles) == len(info.Links) { // all links are still intact! good! // side note: iteration works! for i, file := range selectedFiles { file.Link = info.Links[i] i++ } } info.ForRepair = forRepair torrent := Torrent{ AccessKey: t.getName(info.Name, info.OriginalName), LatestAdded: info.Added, InProgress: info.Progress != 100, Instances: []realdebrid.TorrentInfo{*info}, } torrent.SelectedFiles = cmap.New[*File]() for _, file := range selectedFiles { torrent.SelectedFiles.Set(filepath.Base(file.Path), file) } if len(selectedFiles) > 0 && torrentFromFile == nil { t.writeToFile(info) // only when there are selected files, else it's useless } return &torrent } func hashStringToFh(s string) (fh uint64) { hasher := fnv.New64a() _, _ = hasher.Write([]byte(s)) // Write the string to the hasher; ignoring error as it never returns an error return hasher.Sum64() // Returns a 64-bit hash value } func (t *TorrentManager) getName(name, originalName string) string { // drop the extension from the name if t.cfg.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) { return name } else { ret := strings.TrimSuffix(originalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) writeToFile(torrent *realdebrid.TorrentInfo) error { filePath := "data/" + torrent.ID + ".bin" file, err := os.Create(filePath) if err != nil { return fmt.Errorf("failed creating file: %w", err) } defer file.Close() w := bufio.NewWriter(file) defer w.Flush() torrent.Version = t.requiredVersion dataEncoder := gob.NewEncoder(w) if err := dataEncoder.Encode(torrent); err != nil { return fmt.Errorf("failed encoding torrent: %w", err) } return nil } func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo { filePath := "data/" + torrentID + ".bin" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { return nil } return nil } defer file.Close() r := bufio.NewReader(file) var torrent realdebrid.TorrentInfo dataDecoder := gob.NewDecoder(r) if err := dataDecoder.Decode(&torrent); err != nil { return nil } if torrent.Version != t.requiredVersion { return nil } return &torrent } func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([]*File, bool) { type Result struct { Response *realdebrid.UnrestrictResponse } resultsChan := make(chan Result, len(links)) var wg sync.WaitGroup for _, link := range links { wg.Add(1) go func(lnk string) { defer wg.Done() t.workerPool <- true resp := t.api.UnrestrictUntilOk(lnk) <-t.workerPool resultsChan <- Result{Response: resp} }(link) } go func() { wg.Wait() close(resultsChan) }() isChaotic := false for result := range resultsChan { if result.Response == nil { continue } found := false for _, file := range selectedFiles { if strings.Contains(file.Path, result.Response.Filename) { file.Link = result.Response.Link found = true } } if !found { if result.Response.Streamable == 1 { selectedFiles = append(selectedFiles, &File{ File: realdebrid.File{ ID: math.MaxInt32, Path: result.Response.Filename, Bytes: result.Response.Filesize, Selected: 1, }, Added: time.Now().Format(time.RFC3339), Link: result.Response.Link, ZurgFS: hashStringToFh(result.Response.Filename), }) } else { isChaotic = true } } } return selectedFiles, isChaotic } // func (t *TorrentManager) repairAll() { // t.log.Info("Checking for torrents to repair") // // side note: iteration works! // for el := t.TorrentMap.Front(); el != nil; el = el.Next() { // torrent := el.Value // // do not repair if in progress // if torrent.InProgress { // continue // } // // do not repair if all files have links // forRepair := false // for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { // file := el2.Value // if file.Link == "" { // forRepair = true // break // } // } // if !forRepair { // // if it was marked for repair, unmark it // torrent.ForRepair = false // t.mu.Lock() // t.TorrentMap.Set(torrent.AccessKey, torrent) // t.mu.Unlock() // continue // } // // when getting info, we mark it for repair if it's missing some links // if torrent.ForRepair { // t.log.Infof("Found torrent for repair: %s", torrent.AccessKey) // t.Repair(torrent.AccessKey) // break // only repair the first one for repair and then move on // } // } // } // func (t *TorrentManager) Repair(accessKey string) { // if lastRepair, ok := t.repairMap.Get(accessKey); ok { // if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs // return // } // } // t.mu.Lock() // t.repairMap.Set(accessKey, time.Now()) // t.mu.Unlock() // if !t.config.EnableRepair() { // t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair") // return // } // torrent, _ := t.TorrentMap.Get(accessKey) // if torrent == nil { // t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey) // return // } // if torrent.InProgress { // t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) // return // } // // check if we can still add more downloads // proceed := t.canCapacityHandle() // if !proceed { // t.log.Error("Cannot add more torrents, ignoring repair request") // return // } // // make the file messy // var links []string // for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { // file := el.Value // if file.Link != "" { // links = append(links, file.Link) // } // file.Link = "" // } // selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles) // torrent.SelectedFiles = selectedFiles // t.mu.Lock() // t.TorrentMap.Set(torrent.AccessKey, torrent) // t.mu.Unlock() // // first solution: add the same selection, maybe it can be fixed by reinsertion? // if t.reinsertTorrent(torrent, "") { // t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) // return // } // // if all the selected files are missing but there are other streamable files // var missingFiles []File // for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { // file := el.Value // if file.Link == "" { // missingFiles = append(missingFiles, *file) // } // } // if len(missingFiles) > 0 { // t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) // // if not, last resort: add only the missing files but do it in 2 batches // half := len(missingFiles) / 2 // missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") // missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") // if missingFiles1 != "" { // t.reinsertTorrent(torrent, missingFiles1) // } // if missingFiles2 != "" { // t.reinsertTorrent(torrent, missingFiles2) // } // } // } // func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { // // if missingFiles is not provided, look for missing files // if missingFiles == "" { // var tmpSelection string // for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { // file := el.Value // tmpSelection += fmt.Sprintf("%d,", file.ID) // } // if tmpSelection == "" { // return false // } // if len(tmpSelection) > 0 { // missingFiles = tmpSelection[:len(tmpSelection)-1] // } // } // // redownload torrent // resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) // if err != nil { // t.log.Warnf("Cannot redownload torrent: %v", err) // return false // } // time.Sleep(1 * time.Second) // // select files // newTorrentID := resp.ID // err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) // if err != nil { // t.log.Warnf("Cannot start redownloading: %v", err) // t.api.DeleteTorrent(newTorrentID) // return false // } // time.Sleep(10 * time.Second) // // see if the torrent is ready // info, err := t.api.GetTorrentInfo(newTorrentID) // if err != nil { // t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) // t.api.DeleteTorrent(newTorrentID) // return false // } // if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { // t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) // t.api.DeleteTorrent(newTorrentID) // return false // } // if info.Progress != 100 { // t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) // return true // } // missingCount := len(strings.Split(missingFiles, ",")) // if len(info.Links) != missingCount { // t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) // t.api.DeleteTorrent(newTorrentID) // return false // } // t.log.Infof("Repair successful id=%s", newTorrentID) // return true // } // func (t *TorrentManager) canCapacityHandle() bool { // // max waiting time is 45 minutes // const maxRetries = 50 // const baseDelay = 1 * time.Second // const maxDelay = 60 * time.Second // retryCount := 0 // for { // count, err := t.api.GetActiveTorrentCount() // if err != nil { // t.log.Warnf("Cannot get active downloads count: %v", err) // if retryCount >= maxRetries { // t.log.Error("Max retries reached. Exiting.") // return false // } // delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay // if delay > maxDelay { // delay = maxDelay // } // time.Sleep(delay) // retryCount++ // continue // } // if count.DownloadingCount < count.MaxNumberOfTorrents { // t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) // return true // } // if retryCount >= maxRetries { // t.log.Error("Max retries reached, exiting") // return false // } // delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay // if delay > maxDelay { // delay = maxDelay // } // time.Sleep(delay) // retryCount++ // } // }