Files
zurg/internal/torrent/manager.go
2023-11-10 19:03:07 +01:00

699 lines
20 KiB
Go

package torrent
import (
"encoding/gob"
"fmt"
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"github.com/elliotchance/orderedmap/v2"
"github.com/nutsdb/nutsdb"
"go.uber.org/zap"
)
type TorrentManager struct {
TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent
requiredVersion string
rd *realdebrid.RealDebrid
checksum string
config config.ConfigInterface
db *nutsdb.DB
workerPool chan bool
directoryMap map[string][]string
processedTorrents map[string][]string
mu *sync.Mutex
log *zap.SugaredLogger
}
// NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background
// and store them in-memory and cached in files
func NewTorrentManager(config config.ConfigInterface, db *nutsdb.DB) *TorrentManager {
t := &TorrentManager{
TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](),
requiredVersion: fmt.Sprintf("10.11.2023/retain=%t", config.EnableRetainFolderNameExtension()),
rd: realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid")),
config: config,
db: db,
workerPool: make(chan bool, config.GetNumOfWorkers()),
directoryMap: make(map[string][]string),
processedTorrents: make(map[string][]string),
mu: &sync.Mutex{},
log: logutil.NewLogger().Named("manager"),
}
// start with a clean slate
t.mu.Lock()
newTorrents, _, err := t.rd.GetTorrents(0)
if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err)
}
torrentsChan := make(chan *Torrent, len(newTorrents))
var wg sync.WaitGroup
for i := range newTorrents {
wg.Add(1)
go func(idx int) {
defer wg.Done()
t.workerPool <- true
torrentsChan <- t.getMoreInfo(newTorrents[idx])
<-t.workerPool
}(i)
}
wg.Wait()
close(torrentsChan)
for newTorrent := range torrentsChan {
if newTorrent == nil {
continue
}
torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
if torrent != nil {
t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
} else {
t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
}
}
t.checksum = t.getChecksum()
t.mu.Unlock()
// if t.config.EnableRepair() {
// go t.repairAll()
// }
go t.startRefreshJob()
return t
}
func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
merged := t1
// Merge SelectedFiles
for el := t2.SelectedFiles.Front(); el != nil; el = el.Next() {
if _, ok := merged.SelectedFiles.Get(el.Key); !ok {
merged.SelectedFiles.Set(el.Key, el.Value)
}
}
// Merge Instances
merged.Instances = append(t1.Instances, t2.Instances...)
// LatestAdded
if t1.LatestAdded < t2.LatestAdded {
merged.LatestAdded = t2.LatestAdded
}
// InProgress - if one of the instances is in progress, then the whole torrent is in progress
for _, instance := range merged.Instances {
if instance.Progress != 100 {
merged.InProgress = true
break
}
}
return merged
}
// proxy
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
return t.rd.UnrestrictUntilOk(link)
}
type torrentsResponse struct {
torrents []realdebrid.Torrent
totalCount int
}
// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents
func (t *TorrentManager) getChecksum() string {
torrentsChan := make(chan torrentsResponse)
countChan := make(chan int)
errChan := make(chan error, 2) // accommodate errors from both goroutines
// GetTorrents request
go func() {
torrents, totalCount, err := t.rd.GetTorrents(1)
if err != nil {
errChan <- err
return
}
torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount}
}()
// GetActiveTorrentCount request
go func() {
count, err := t.rd.GetActiveTorrentCount()
if err != nil {
errChan <- err
return
}
countChan <- count.DownloadingCount
}()
var torrents []realdebrid.Torrent
var totalCount, count int
for i := 0; i < 2; i++ {
select {
case torrentsResp := <-torrentsChan:
torrents = torrentsResp.torrents
totalCount = torrentsResp.totalCount
case count = <-countChan:
case err := <-errChan:
t.log.Errorf("Checksum API Error: %v\n", err)
return ""
}
}
if len(torrents) == 0 {
t.log.Error("Huh, no torrents returned")
return ""
}
checksum := fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count)
return checksum
}
// startRefreshJob periodically refreshes the torrents
func (t *TorrentManager) startRefreshJob() {
t.log.Info("Starting periodic refresh")
for {
<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)
checksum := t.getChecksum()
if checksum == t.checksum {
continue
}
t.mu.Lock()
newTorrents, _, err := t.rd.GetTorrents(0)
if err != nil {
t.log.Errorf("Cannot get torrents: %v\n", err)
continue
}
torrentsChan := make(chan *Torrent)
var wg sync.WaitGroup
for i := range newTorrents {
wg.Add(1)
go func(idx int) {
defer wg.Done()
t.workerPool <- true
torrentsChan <- t.getMoreInfo(newTorrents[idx])
<-t.workerPool
}(i)
}
// deletes
// for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
// found := false
// for _, newTorrent := range newTorrents {
// if newTorrent.ID == el.Value.AccessKey {
// found = true
// break
// }
// }
// if !found {
// t.log.Infof("Torrent id=%s is no longer found", accessKey)
// t.TorrentMap.Delete(accessKey)
// }
// }
wg.Wait()
close(torrentsChan)
for newTorrent := range torrentsChan {
if newTorrent == nil {
continue
}
torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
if torrent != nil {
t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
} else {
t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
}
}
t.checksum = t.getChecksum()
t.mu.Unlock()
// if t.config.EnableRepair() {
// go t.repairAll()
// }
go OnLibraryUpdateHook(t.config)
}
}
// getMoreInfo gets original name, size and files for a torrent
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
var info *realdebrid.TorrentInfo
var err error
// file cache
torrentFromFile := t.readFromFile(rdTorrent.ID)
if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) {
// see if api data and file data still match
// then it means data is still usable
info = torrentFromFile
}
if info == nil {
info, err = t.rd.GetTorrentInfo(rdTorrent.ID)
if err != nil {
t.log.Errorf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
return nil
}
}
// SelectedFiles is a subset of Files with only the selected ones
// it also has a Link field, which can be empty
// if it is empty, it means the file is no longer available
// Files+Links together are the same as SelectedFiles
selectedFiles := orderedmap.NewOrderedMap[string, *File]()
streamableCount := 0
// if some Links are empty, we need to repair it
forRepair := false
for _, file := range info.Files {
if isStreamable(file.Path) {
streamableCount++
}
if file.Selected == 0 {
continue
}
selectedFiles.Set(filepath.Base(file.Path), &File{
File: file,
Link: "", // no link yet
})
}
if selectedFiles.Len() > len(info.Links) && info.Progress == 100 {
t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, selectedFiles.Len(), len(info.Links))
// chaotic file means RD will not output the desired file selection
// e.g. even if we select just a single mkv, it will output a rar
var isChaotic bool
selectedFiles, isChaotic = t.organizeChaos(&rdTorrent, selectedFiles)
if isChaotic && selectedFiles.Len() == 1 {
t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name)
t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
} else {
if streamableCount > 1 {
// case for repair 1: it's missing some links (or all links)
// if we download it as is, we might get the same file over and over again
// so we need to redownload it with other files selected
// that is why we check if there are other streamable files
t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name)
forRepair = true
} else {
t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name)
t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
}
}
} else if selectedFiles.Len() == len(info.Links) {
// all links are still intact! good!
i := 0
for el := selectedFiles.Front(); el != nil; el = el.Next() {
if i < len(info.Links) {
file := el.Value
file.Link = info.Links[i]
selectedFiles.Set(el.Key, file)
i++
}
}
}
info.ForRepair = forRepair
torrent := Torrent{
AccessKey: t.getName(info.Name, info.OriginalName),
SelectedFiles: selectedFiles,
Directories: t.getDirectories(info),
LatestAdded: info.Added,
InProgress: info.Progress != 100,
Instances: []realdebrid.TorrentInfo{*info},
}
if selectedFiles.Len() > 0 && torrentFromFile == nil {
t.writeToFile(info) // only when there are selected files, else it's useless
}
return &torrent
}
func (t *TorrentManager) getName(name, originalName string) string {
// drop the extension from the name
if t.config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
return name
} else {
ret := strings.TrimSuffix(originalName, ".mp4")
ret = strings.TrimSuffix(ret, ".mkv")
return ret
}
}
func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []string {
var ret []string
// Map torrents to directories
switch t.config.GetVersion() {
case "v1":
configV1 := t.config.(*config.ZurgConfigV1)
groupMap := configV1.GetGroupMap()
// for every group, iterate over every torrent
// and then sprinkle/distribute the torrents to the directories of the group
for _, directories := range groupMap {
for _, directory := range directories {
var filenames []string
for _, file := range torrent.Files {
if file.Selected == 0 {
continue
}
filenames = append(filenames, file.Path)
}
accessKey := t.getName(torrent.Name, torrent.OriginalName)
if configV1.MeetsConditions(directory, torrent.ID, accessKey, filenames) {
ret = append(ret, directory)
break // we found a directory for this torrent for this group, so we can stop looking for more
}
}
}
default:
t.log.Error("Unknown config version")
}
return ret
}
func (t *TorrentManager) writeToFile(torrent *realdebrid.TorrentInfo) {
filePath := fmt.Sprintf("data/%s.bin", torrent.ID)
file, err := os.Create(filePath)
if err != nil {
t.log.Fatalf("Failed creating file: %s", err)
return
}
defer file.Close()
torrent.Version = t.requiredVersion
dataEncoder := gob.NewEncoder(file)
dataEncoder.Encode(torrent)
}
func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo {
filePath := fmt.Sprintf("data/%s.bin", torrentID)
fileInfo, err := os.Stat(filePath)
if err != nil {
return nil
}
if time.Since(fileInfo.ModTime()) > time.Duration(t.config.GetCacheTimeHours())*time.Hour {
return nil
}
file, err := os.Open(filePath)
if err != nil {
return nil
}
defer file.Close()
var torrent realdebrid.TorrentInfo
dataDecoder := gob.NewDecoder(file)
err = dataDecoder.Decode(&torrent)
if err != nil {
return nil
}
if torrent.Version != t.requiredVersion {
return nil
}
return &torrent
}
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles *orderedmap.OrderedMap[string, *File]) (*orderedmap.OrderedMap[string, *File], bool) {
type Result struct {
Response *realdebrid.UnrestrictResponse
}
resultsChan := make(chan Result, len(info.Links))
var wg sync.WaitGroup
// Limit concurrency
sem := make(chan bool, t.config.GetNumOfWorkers())
for _, link := range info.Links {
wg.Add(1)
sem <- true
go func(lnk string) {
defer wg.Done()
defer func() { <-sem }()
resp := t.rd.UnrestrictUntilOk(lnk)
resultsChan <- Result{Response: resp}
}(link)
}
go func() {
t.log.Debugf("Checking %d link(s) for problematic torrent id=%s", len(info.Links), info.ID)
wg.Wait()
close(sem)
close(resultsChan)
t.log.Debugf("Closing results channel for torrent id=%s, checking...", info.ID)
}()
isChaotic := false
for result := range resultsChan {
if result.Response == nil {
continue
}
found := false
for el := selectedFiles.Front(); el != nil; el = el.Next() {
if file, _ := selectedFiles.Get(el.Key); strings.Contains(file.Path, result.Response.Filename) {
t.log.Debugf("Found a file that is in the selection for torrent id=%s: %s", info.ID, result.Response.Filename)
file.Link = result.Response.Link
found = true
}
}
if !found {
t.log.Debugf("Found a file that is NOT in the selection for torrent id=%s: %s %v", info.ID, result.Response.Filename, result.Response.Streamable)
if result.Response.Streamable == 1 {
selectedFiles.Set(filepath.Base(result.Response.Filename), &File{
File: realdebrid.File{
ID: math.MaxInt32,
Path: result.Response.Filename,
Bytes: result.Response.Filesize,
Selected: 1,
},
Link: result.Response.Link,
})
}
}
}
return selectedFiles, isChaotic
}
// func (t *TorrentManager) repairAll() {
// for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
// torrent := el.Value
// // do not repair if: in progress
// if torrent.InProgress {
// continue
// }
// var missingFiles []File
// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() {
// file, ok := torrent.SelectedFiles.Get(el2.Key)
// if !ok {
// continue
// }
// // check for case of repairs like
// // case 1: missing links
// // case 2: unrestrictable links TODO
// if file.Link == "" {
// missingFiles = append(missingFiles, *file)
// }
// }
// if len(missingFiles) == 0 {
// continue
// }
// for _, info := range torrent.Instances {
// if info.ForRepair {
// t.log.Infof("There were less links than was expected on %s %s; fixing...", info.ID, info.Name)
// // t.repair(&info, true)
// break // only repair the first one for repair and then move on
// }
// if len(info.Links) == 0 && info.Progress == 100 {
// // If the torrent has no links
// // and already processing repair
// // delete it!
// t.log.Infof("Deleting broken torrent id=%s as it doesn't contain any files", info.ID)
// t.rd.DeleteTorrent(info.ID)
// }
// }
// }
// }
// func (t *TorrentManager) repair(info *realdebrid.TorrentInfo, tryReinsertionFirst bool) {
// // file.Link == "" should be repaired
// // then we repair it!
// t.log.Infof("Repairing torrent id=%s", info.ID)
// // check if we can still add more downloads
// proceed := t.canCapacityHandle()
// if !proceed {
// t.log.Error("Cannot add more torrents, exiting")
// return
// }
// // first solution: add the same selection, maybe it can be fixed by reinsertion?
// success := false
// if tryReinsertionFirst {
// success = t.reinsertTorrent(info, "", true)
// }
// if !success {
// // if all the selected files are missing but there are other streamable files
// var otherStreamableFileIDs []int
// for _, file := range info.Files {
// found := false
// for el := selectedFiles.Front(); el != nil; el = el.Next() {
// }
// for _, selectedFile := range selectedFiles {
// if selectedFile.ID == file.ID {
// found = true
// break
// }
// }
// if !found && isStreamable(file.Path) {
// otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
// }
// }
// if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 {
// // we will download 1 extra streamable file to force a redownload of the missing files
// // or if there's only 1 missing file, we will download 1 more to prevent a rename
// missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",")
// t.log.Infof("Redownloading %d missing files", len(missingFiles))
// t.reinsertTorrent(info, missingFilesPlus1, false)
// } else if len(selectedFiles) > 1 {
// // if not, last resort: add only the missing files but do it in 2 batches
// half := len(missingFiles) / 2
// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
// if missingFiles1 != "" {
// t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(missingFiles1))
// t.reinsertTorrent(info, missingFiles1, false)
// }
// if missingFiles2 != "" {
// t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(missingFiles2))
// t.reinsertTorrent(info, missingFiles2, false)
// } else {
// t.log.Info("No other missing files left to reinsert")
// }
// } else {
// t.log.Infof("Torrent id=%s is unfixable as the only link cached in RD is already broken", info.ID)
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
// return
// }
// t.log.Info("Waiting for downloads to finish")
// }
// }
// func (t *TorrentManager) reinsertTorrent(torrent *realdebrid.TorrentInfo, missingFiles string, deleteIfFailed bool) bool {
// // if missingFiles is not provided, look for missing files
// if missingFiles == "" {
// var tmpSelection string
// for _, file := range torrent.Files {
// if file.Selected == 0 {
// continue
// }
// tmpSelection += fmt.Sprintf("%d,", file.ID)
// }
// if tmpSelection == "" {
// return false
// }
// if len(tmpSelection) > 0 {
// missingFiles = tmpSelection[:len(tmpSelection)-1]
// }
// }
// // redownload torrent
// resp, err := t.rd.AddMagnetHash(torrent.Hash)
// if err != nil {
// t.log.Errorf("Cannot redownload torrent: %v", err)
// return false
// }
// newTorrentID := resp.ID
// err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles)
// if err != nil {
// t.log.Errorf("Cannot start redownloading: %v", err)
// }
// if deleteIfFailed {
// if err != nil {
// t.rd.DeleteTorrent(newTorrentID)
// return false
// }
// time.Sleep(1 * time.Second)
// // see if the torrent is ready
// info, err := t.rd.GetTorrentInfo(newTorrentID)
// if err != nil {
// t.log.Errorf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
// if deleteIfFailed {
// t.rd.DeleteTorrent(newTorrentID)
// }
// return false
// }
// time.Sleep(1 * time.Second)
// if info.Progress != 100 {
// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion, currently %d%%", info.ID, info.Progress)
// t.rd.DeleteTorrent(newTorrentID)
// return false
// }
// missingCount := len(strings.Split(missingFiles, ","))
// if len(info.Links) != missingCount {
// t.log.Infof("It didn't fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
// t.rd.DeleteTorrent(newTorrentID)
// return false
// }
// t.log.Infof("Redownload successful id=%s, deleting old torrent id=%s", newTorrentID, torrent.ID)
// t.rd.DeleteTorrent(torrent.ID)
// }
// return true
// }
// func (t *TorrentManager) canCapacityHandle() bool {
// // max waiting time is 45 minutes
// const maxRetries = 50
// const baseDelay = 1 * time.Second
// const maxDelay = 60 * time.Second
// retryCount := 0
// for {
// count, err := t.rd.GetActiveTorrentCount()
// if err != nil {
// t.log.Errorf("Cannot get active downloads count: %v", err)
// if retryCount >= maxRetries {
// t.log.Error("Max retries reached. Exiting.")
// return false
// }
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// if delay > maxDelay {
// delay = maxDelay
// }
// time.Sleep(delay)
// retryCount++
// continue
// }
// if count.DownloadingCount < count.MaxNumberOfTorrents {
// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
// return true
// }
// if retryCount >= maxRetries {
// t.log.Error("Max retries reached, exiting")
// return false
// }
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
// if delay > maxDelay {
// delay = maxDelay
// }
// time.Sleep(delay)
// retryCount++
// }
// }