Files
zurg/internal/torrent/manager.go
2023-11-07 18:32:30 +01:00

748 lines
21 KiB
Go

package torrent
import (
"encoding/gob"
"fmt"
"math"
"os"
"strings"
"sync"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
)
// TorrentManager holds the in-memory torrent list together with the state
// needed to refresh it, map torrents to directories and repair broken ones.
type TorrentManager struct {
// requiredVersion is stamped into every cache file on write and compared on
// read; a cache file carrying a different value is ignored.
requiredVersion string
// torrents is the current in-memory torrent list.
torrents []Torrent
// inProgress holds the hashes of torrents whose progress is not 100,
// rebuilt every time the list is fetched from the API.
inProgress []string
// checksum is the last fingerprint stored from getChecksum; the refresh job
// compares a fresh fingerprint against it to detect changes.
checksum string
config config.ConfigInterface
// cache is purged by the refresh job whenever the checksum changes.
cache *expirable.LRU[string, string]
// workerPool is a buffered channel used as a semaphore that bounds the
// number of concurrent addMoreInfo calls.
workerPool chan bool
// directoryMap maps a torrent name to the directories it is listed under.
directoryMap map[string][]string
// processedTorrents maps a torrent name to the directory groups it has
// already been evaluated against.
processedTorrents map[string][]string
// mu is held while torrents/checksum are replaced and while the two maps
// above are written.
mu *sync.Mutex
log *zap.SugaredLogger
}
// NewTorrentManager builds the manager, loads the torrent list and the
// per-torrent details into memory, maps the torrents to directories and
// finally starts the periodic refresh job in the background.
// It blocks until every torrent has been enriched and is meant to be called
// once at startup.
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
	manager := &TorrentManager{
		requiredVersion:   fmt.Sprintf("4.11.2023 - retain:%v", config.EnableRetainFolderNameExtension()),
		config:            config,
		cache:             cache,
		workerPool:        make(chan bool, config.GetNumOfWorkers()),
		directoryMap:      make(map[string][]string),
		processedTorrents: make(map[string][]string),
		mu:                &sync.Mutex{},
		log:               logutil.NewLogger().Named("manager"),
	}

	// First load: fetch the list and remember its checksum under the lock.
	manager.mu.Lock()
	manager.torrents = manager.getFreshListFromAPI()
	manager.checksum = manager.getChecksum()
	manager.mu.Unlock()

	// Enrich every torrent concurrently; workerPool bounds the parallelism.
	var pending sync.WaitGroup
	pending.Add(len(manager.torrents))
	for i := range manager.torrents {
		go func(entry *Torrent) {
			defer pending.Done()
			manager.workerPool <- true
			defer func() { <-manager.workerPool }()
			manager.addMoreInfo(entry)
		}(&manager.torrents[i])
	}

	// repairAll waits on the same group, so it only starts once all the
	// details are in.
	if manager.config.EnableRepair() {
		go manager.repairAll(&pending)
	}

	pending.Wait()
	manager.mapToDirectories()
	go manager.startRefreshJob()
	return manager
}
// GetByDirectory collects every torrent whose directoryMap entry lists the
// given directory. The result is nil when nothing matches.
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
	var matches []Torrent
	for idx := range t.torrents {
		candidate := &t.torrents[idx]
		for _, mapped := range t.directoryMap[candidate.Name] {
			if mapped != directory {
				continue
			}
			matches = append(matches, *candidate)
		}
	}
	return matches
}
// HideTheFile flags the file as unavailable and then runs a repair of the
// owning torrent with its current file selection, without attempting a
// plain reinsertion first.
func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
	const tryReinsertionFirst = false

	file.Unavailable = true
	t.repair(torrent.ID, torrent.SelectedFiles, tryReinsertionFirst)
}
// FindAllTorrentsWithName returns the torrents mapped to the given directory
// whose name equals torrentName or starts with it.
func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent {
	var matches []Torrent
	for _, candidate := range t.GetByDirectory(directory) {
		// an exact match is also a prefix match, so one check covers both
		if !strings.HasPrefix(candidate.Name, torrentName) {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// findAllDownloadedFilesFromHash gathers, across every torrent carrying the
// given hash, the selected files that have a non-empty link.
func (t *TorrentManager) findAllDownloadedFilesFromHash(hash string) []File {
	var downloaded []File
	list := t.torrents
	for i := range list {
		if list[i].Hash != hash {
			continue
		}
		for _, candidate := range list[i].SelectedFiles {
			if candidate.Link == "" {
				continue
			}
			downloaded = append(downloaded, candidate)
		}
	}
	return downloaded
}
// torrentsResponse carries the result of the GetTorrents call made inside
// getChecksum from its goroutine back to the collecting loop.
type torrentsResponse struct {
// torrents is the list returned by realdebrid.GetTorrents.
torrents []realdebrid.Torrent
// totalCount is the second value returned by realdebrid.GetTorrents.
totalCount int
}
// getChecksum builds a cheap fingerprint of the account state by combining
// the total torrent count, the ID of the first torrent returned by
// GetTorrents and the number of currently downloading torrents.
// The two API requests run concurrently. An empty string is returned when
// either request fails or when no torrents come back.
func (t *TorrentManager) getChecksum() string {
	// Every channel is buffered so that each goroutine can always deliver its
	// single message and exit, even when this function has already returned
	// because the other request failed. With unbuffered result channels the
	// surviving goroutine would block on its send forever.
	torrentsChan := make(chan torrentsResponse, 1)
	countChan := make(chan int, 1)
	errChan := make(chan error, 2) // accommodate errors from both goroutines

	// GetTorrents request
	go func() {
		torrents, totalCount, err := realdebrid.GetTorrents(t.config.GetToken(), 1)
		if err != nil {
			errChan <- err
			return
		}
		torrentsChan <- torrentsResponse{torrents: torrents, totalCount: totalCount}
	}()

	// GetActiveTorrentCount request
	go func() {
		count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
		if err != nil {
			errChan <- err
			return
		}
		countChan <- count.DownloadingCount
	}()

	var torrents []realdebrid.Torrent
	var totalCount, count int
	// exactly two messages are expected, one per goroutine
	for i := 0; i < 2; i++ {
		select {
		case torrentsResp := <-torrentsChan:
			torrents = torrentsResp.torrents
			totalCount = torrentsResp.totalCount
		case count = <-countChan:
		case err := <-errChan:
			t.log.Errorf("Checksum API Error: %v\n", err)
			return ""
		}
	}

	if len(torrents) == 0 {
		t.log.Error("Huh, no torrents returned")
		return ""
	}

	return fmt.Sprintf("%d%s%d", totalCount, torrents[0].ID, count)
}
// startRefreshJob loops forever: it sleeps for the configured interval,
// recomputes the checksum and, when it differs from the stored one, purges
// the cache, rebuilds the torrent list with full details and swaps it in.
// Repair (if enabled), directory mapping and the library update hook are
// then started in the background.
func (t *TorrentManager) startRefreshJob() {
	t.log.Info("Starting periodic refresh")
	for {
		interval := time.Duration(t.config.GetRefreshEverySeconds()) * time.Second
		<-time.After(interval)

		if latest := t.getChecksum(); latest == t.checksum {
			continue
		}

		t.cache.Purge()
		fresh := t.getFreshListFromAPI()

		// enrich the new list before it becomes visible
		var enrich sync.WaitGroup
		enrich.Add(len(fresh))
		for i := range fresh {
			go func(entry *Torrent) {
				defer enrich.Done()
				t.workerPool <- true
				defer func() { <-t.workerPool }()
				t.addMoreInfo(entry)
			}(&fresh[i])
		}
		enrich.Wait()

		// apply side effects
		t.mu.Lock()
		t.torrents = fresh
		t.checksum = t.getChecksum()
		t.mu.Unlock()

		if t.config.EnableRepair() {
			go t.repairAll(&enrich)
		}
		go t.mapToDirectories()
		go OnLibraryUpdateHook(t.config)
	}
}
// getFreshListFromAPI downloads the torrent list and converts every entry to
// the internal Torrent type, with SelectedFiles still unset. As a side
// effect it rebuilds t.inProgress with the hashes of the torrents whose
// progress is not 100. It returns nil when the API call fails.
func (t *TorrentManager) getFreshListFromAPI() []Torrent {
	apiTorrents, _, err := realdebrid.GetTorrents(t.config.GetToken(), 0)
	if err != nil {
		t.log.Errorf("Cannot get torrents: %v\n", err)
		return nil
	}

	var converted []Torrent
	t.inProgress = t.inProgress[:0] // start over, keeping the backing array
	for _, apiTorrent := range apiTorrents {
		apiTorrent.Name = strings.TrimSuffix(apiTorrent.Name, "/")
		if apiTorrent.Progress != 100 {
			t.inProgress = append(t.inProgress, apiTorrent.Hash)
		}
		// SelectedFiles and ForRepair keep their zero values for now
		converted = append(converted, Torrent{
			Torrent: apiTorrent,
			lock:    &sync.Mutex{},
		})
	}

	t.log.Infof("Fetched %d torrents", len(converted))
	return converted
}
// addMoreInfo fills in the name, repair flag and selected files of a torrent.
//
// It first tries the on-disk cache: when a cache entry exists and has the
// same number of links as the API data, the cached values are reused.
// Otherwise, for a fully downloaded torrent, the details are fetched from the
// API, the selected files are paired with their links (or the torrent is
// flagged for repair when links are missing) and the result is written back
// to the cache. Torrents that are still downloading are left untouched.
func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
	// file cache
	torrentFromFile := t.readFromFile(torrent.ID)
	if torrentFromFile != nil {
		// see if api data and file data still match
		// then it means data is still usable
		if len(torrentFromFile.Links) == len(torrent.Links) {
			torrent.Name = t.getName(torrentFromFile)
			torrent.ForRepair = torrentFromFile.ForRepair
			torrent.SelectedFiles = torrentFromFile.SelectedFiles[:]
			return
		}
	}
	// no file data yet as it is still downloading
	if torrent.Progress != 100 {
		return
	}

	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrent.ID)
	if err != nil {
		t.log.Errorf("Cannot get info: %v\n", err)
		return
	}

	// SelectedFiles is a subset of Files with only the selected ones
	// it also has a Link field, which can be empty
	// if it is empty, it means the file is no longer available
	// Files+Links together are the same as SelectedFiles
	var selectedFiles []File
	var streamableFiles []File
	// if some Links are empty, we need to repair it
	forRepair := false
	for _, file := range info.Files {
		if isStreamable(file.Path) {
			streamableFiles = append(streamableFiles, File{
				File: file,
				Link: "",
			})
		}
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, File{
			File: file,
			Link: "",
		})
	}

	if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
		t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, len(selectedFiles), len(info.Links))
		// chaotic file means RD will not output the desired file selection
		// e.g. even if we select just a single mkv, it will output a rar
		var isChaotic bool
		selectedFiles, isChaotic = t.organizeChaos(info, selectedFiles)
		if isChaotic {
			t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name)
			t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
		} else {
			if len(streamableFiles) > 1 {
				t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name)
				forRepair = true
			} else {
				t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name)
				t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
			}
		}
	} else {
		// all links are still intact! good!
		for i, link := range info.Links {
			// This branch is also reached when there are MORE links than
			// selected files; indexing past the end would panic, so the
			// surplus links are skipped instead.
			if i >= len(selectedFiles) {
				t.log.Debugf("Torrent %s %s has %d link(s) but only %d selected file(s), ignoring the extra links", info.ID, info.Name, len(info.Links), len(selectedFiles))
				break
			}
			selectedFiles[i].Link = link
		}
	}

	// update file cache
	torrent.OriginalName = info.OriginalName
	torrent.Name = t.getName(torrent)
	if len(selectedFiles) > 0 {
		// update the torrent with more data!
		torrent.SelectedFiles = selectedFiles
		torrent.ForRepair = forRepair
		t.writeToFile(torrent)
	}
}
// getName picks the display name of a torrent. When the retain-extension
// option is on and Name contains OriginalName, Name is returned unchanged;
// otherwise OriginalName is returned with a trailing ".mp4" and then a
// trailing ".mkv" removed.
func (t *TorrentManager) getName(torrent *Torrent) string {
	t.log.Debugf("Original name: %s", torrent.OriginalName)
	t.log.Debugf("Name: %s", torrent.Name)

	keepAsIs := t.config.EnableRetainFolderNameExtension() &&
		strings.Contains(torrent.Name, torrent.OriginalName)
	if keepAsIs {
		return torrent.Name
	}

	// drop the extension from the name
	withoutMp4 := strings.TrimSuffix(torrent.OriginalName, ".mp4")
	return strings.TrimSuffix(withoutMp4, ".mkv")
}
// mapToDirectories assigns torrents to the directories defined by the config.
//
// Only config version "v1" is supported. For every directory group, each
// torrent not yet processed for that group is tested against the group's
// directories in order; it is added to the first matching directory it is
// not already mapped to, and then recorded as processed for the group.
// A summary line is logged per group.
func (t *TorrentManager) mapToDirectories() {
	if t.config.GetVersion() != "v1" {
		t.log.Error("Unknown config version")
		return
	}
	// checked assertion: a mismatching implementation must not panic here
	configV1, ok := t.config.(*config.ZurgConfigV1)
	if !ok {
		t.log.Error("Config reports version v1 but is not a v1 config")
		return
	}

	// for every group, iterate over every torrent
	// and then sprinkle/distribute the torrents to the directories of the group
	for group, directories := range configV1.GetGroupMap() {
		counter := make(map[string]int)
		for i := range t.torrents {
			torrent := &t.torrents[i]

			// don't process torrents that are already mapped if it is not the first run
			alreadyMappedToGroup := false
			for _, mappedGroup := range t.processedTorrents[torrent.Name] {
				if mappedGroup == group {
					alreadyMappedToGroup = true
					break
				}
			}
			if alreadyMappedToGroup {
				continue
			}

			// the file list does not depend on the directory, so build it once
			var filenames []string
			for _, file := range torrent.SelectedFiles {
				filenames = append(filenames, file.Path)
			}

			for _, directory := range directories {
				if !configV1.MeetsConditions(directory, torrent.ID, torrent.Name, filenames) {
					continue
				}
				// check if it is already mapped to this directory
				found := false
				for _, dir := range t.directoryMap[torrent.Name] {
					if dir == directory {
						found = true
						break
					}
				}
				if found {
					continue
				}
				counter[directory]++
				t.mu.Lock()
				t.directoryMap[torrent.Name] = append(t.directoryMap[torrent.Name], directory)
				t.mu.Unlock()
				break // we found a directory for this torrent, so we can stop looking for more
			}

			t.mu.Lock()
			t.processedTorrents[torrent.Name] = append(t.processedTorrents[torrent.Name], group)
			t.mu.Unlock()
		}

		sum := 0
		for _, count := range counter {
			sum += count
		}
		if sum > 0 {
			t.log.Infof("Group processing completed: %s %v total: %d", group, counter, sum)
		} else {
			t.log.Infof("No new additions to directory group %s", group)
		}
	}
}
// getByID looks up a torrent by ID and returns a pointer to its slot in the
// in-memory list, or nil when no torrent carries that ID.
func (t *TorrentManager) getByID(torrentID string) *Torrent {
	for idx := range t.torrents {
		if candidate := &t.torrents[idx]; candidate.ID == torrentID {
			return candidate
		}
	}
	return nil
}
// writeToFile stores a torrent as a gob file at data/<ID>.bin, after stamping
// it with the manager's requiredVersion.
//
// The file is only a cache (readFromFile simply reports a miss when it cannot
// be used), so failures are logged and the function returns instead of
// terminating the process.
func (t *TorrentManager) writeToFile(torrent *Torrent) {
	filePath := fmt.Sprintf("data/%s.bin", torrent.ID)
	file, err := os.Create(filePath)
	if err != nil {
		t.log.Errorf("Failed creating file: %s", err)
		return
	}
	defer file.Close()

	torrent.Version = t.requiredVersion
	// do not drop the encode error: a truncated file would otherwise go unnoticed
	if err := gob.NewEncoder(file).Encode(torrent); err != nil {
		t.log.Errorf("Failed writing torrent %s to %s: %v", torrent.ID, filePath, err)
	}
}
// readFromFile loads the cached torrent stored at data/<ID>.bin.
// It returns nil when the file is missing, older than the configured cache
// time, unreadable, not decodable, or written with a different
// requiredVersion.
func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
	cachePath := fmt.Sprintf("data/%s.bin", torrentID)

	stat, err := os.Stat(cachePath)
	if err != nil {
		return nil
	}
	maxAge := time.Duration(t.config.GetCacheTimeHours()) * time.Hour
	if time.Since(stat.ModTime()) > maxAge {
		return nil
	}

	handle, err := os.Open(cachePath)
	if err != nil {
		return nil
	}
	defer handle.Close()

	cached := new(Torrent)
	if err := gob.NewDecoder(handle).Decode(cached); err != nil {
		return nil
	}
	if cached.Version != t.requiredVersion {
		return nil
	}
	return cached
}
// repairAll waits for the given wait group and then walks the torrent list:
// torrents flagged ForRepair are repaired (trying a reinsertion first), and
// fully downloaded torrents that have no links at all are deleted.
func (t *TorrentManager) repairAll(wg *sync.WaitGroup) {
	wg.Wait()

	snapshot := t.torrents
	for i := range snapshot {
		current := snapshot[i]
		if current.ForRepair {
			t.log.Infof("Issues were detected on %s %s; fixing...", current.ID, current.Name)
			t.repair(current.ID, current.SelectedFiles, true)
		}

		hasNoFiles := current.Progress == 100 && len(current.Links) == 0
		if !hasNoFiles {
			continue
		}
		// finished torrent with no links at all: nothing to serve, delete it
		t.log.Infof("Deleting broken torrent %s %s as it doesn't contain any files", current.ID, current.Name)
		realdebrid.DeleteTorrent(t.config.GetToken(), current.ID)
	}
}
// repair tries to restore the missing files of a torrent.
//
// torrentID identifies the torrent, selectedFiles is the file selection to
// check, and tryReinsertionFirst asks for a plain re-add of the same
// selection before falling back to the partial strategies.
//
// Nothing is done when the torrent is unknown, when its hash is currently in
// progress, when every missing file is already available under another
// torrent with the same hash, or when the account has no capacity left.
func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) {
	torrent := t.getByID(torrentID)
	if torrent == nil {
		return
	}

	// check if it is already "being" repaired
	for _, hash := range t.inProgress {
		if hash == torrent.Hash {
			t.log.Infof("Repair in progress, skipping %s", torrentID)
			return
		}
	}

	// check if it is already repaired
	foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash)
	var missingFiles []File
	for _, sFile := range selectedFiles {
		if sFile.Link != "" && !sFile.Unavailable {
			continue
		}
		repaired := false
		for _, fFile := range foundFiles {
			// same file but different link, then yes it has been repaired
			if sFile.Path == fFile.Path && sFile.Link != fFile.Link {
				repaired = true
				break
			}
		}
		if !repaired {
			missingFiles = append(missingFiles, sFile)
		}
	}
	if len(missingFiles) == 0 {
		t.log.Infof("Torrent %s %s is already repaired", torrent.ID, torrent.Name)
		return
	}

	// then we repair it!
	t.log.Infof("Repairing torrent %s %s", torrent.ID, torrent.Name)
	// check if we can still add more downloads
	if !t.canCapacityHandle() {
		t.log.Error("Cannot add more torrents, exiting")
		return
	}

	// first solution: add the same selection, maybe it can be fixed by reinsertion?
	if tryReinsertionFirst && t.reinsertTorrent(torrent, "", true) {
		return
	}

	// streamable files of the torrent that are not part of the selection
	var otherStreamableFileIDs []int
	for _, file := range torrent.Files {
		selected := false
		for _, selectedFile := range selectedFiles {
			if selectedFile.ID == file.ID {
				selected = true
				break
			}
		}
		if !selected && isStreamable(file.Path) {
			otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
		}
	}

	switch {
	case (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0:
		// we will download 1 extra streamable file to force a redownload of the missing files
		// or if there's only 1 missing file, we will download 1 more to prevent a rename
		ids := append(getFileIDs(missingFiles), fmt.Sprintf("%d", otherStreamableFileIDs[0]))
		missingFilesPlus1 := strings.Join(ids, ",")
		t.log.Infof("Redownloading %d missing files", len(missingFiles))
		t.reinsertTorrent(torrent, missingFilesPlus1, false)
	case len(selectedFiles) > 1:
		// if not, last resort: add only the missing files but do it in 2 batches
		half := len(missingFiles) / 2
		batch1, batch2 := missingFiles[:half], missingFiles[half:]
		// log the number of files in each batch, not the length of the joined ID string
		if len(batch1) > 0 {
			t.log.Infof("Redownloading %d missing files; batch 1 of 2", len(batch1))
			t.reinsertTorrent(torrent, strings.Join(getFileIDs(batch1), ","), false)
		}
		if len(batch2) > 0 {
			t.log.Infof("Redownloading %d missing files; batch 2 of 2", len(batch2))
			t.reinsertTorrent(torrent, strings.Join(getFileIDs(batch2), ","), false)
		} else {
			t.log.Info("No other missing files left to reinsert")
		}
	default:
		t.log.Infof("Torrent %s %s is unfixable as the only link cached in RD is already broken", torrent.ID, torrent.Name)
		t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Hash)
		return
	}
	t.log.Info("Waiting for downloads to finish")
}
// reinsertTorrent re-adds the torrent's hash to Real-Debrid and selects the
// given files on the new torrent.
//
// missingFiles is a comma-separated list of file IDs; when empty, the IDs of
// torrent.SelectedFiles are used (and false is returned if there are none).
//
// With deleteIfFailed set, the new torrent is verified after a short wait: it
// must be at 100% and expose as many links as the torrent has selected files.
// If so, the OLD torrent is deleted and true is returned; otherwise the NEW
// torrent is deleted and false is returned. Without deleteIfFailed the
// function returns as soon as the file selection was accepted.
//
// It returns false whenever adding the magnet or selecting the files fails.
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string, deleteIfFailed bool) bool {
	// if missingFiles is not provided, fall back to the whole selection
	if missingFiles == "" {
		t.log.Infof("Redownloading whole torrent %s", torrent.Name)
		ids := make([]string, 0, len(torrent.SelectedFiles))
		for _, file := range torrent.SelectedFiles {
			ids = append(ids, fmt.Sprintf("%d", file.ID))
		}
		if len(ids) == 0 {
			return false
		}
		missingFiles = strings.Join(ids, ",")
	}

	// redownload torrent
	resp, err := realdebrid.AddMagnetHash(t.config.GetToken(), torrent.Hash)
	if err != nil {
		t.log.Errorf("Cannot redownload torrent: %v", err)
		return false
	}
	newTorrentID := resp.ID

	err = realdebrid.SelectTorrentFiles(t.config.GetToken(), newTorrentID, missingFiles)
	if err != nil {
		t.log.Errorf("Cannot start redownloading: %v", err)
		if deleteIfFailed {
			realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		}
		// a failed selection is never a success, whatever the cleanup policy
		return false
	}
	if !deleteIfFailed {
		return true
	}

	time.Sleep(1 * time.Second)
	// see if the torrent is ready
	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), newTorrentID)
	if err != nil {
		t.log.Errorf("Cannot get info on redownloaded torrent: %v", err)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}
	time.Sleep(1 * time.Second)

	if info.Progress != 100 {
		t.log.Infof("Torrent is not cached anymore so we have to wait until completion, currently %d%%", info.Progress)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}
	if len(info.Links) != len(torrent.SelectedFiles) {
		t.log.Infof("It didn't fix the issue, only got %d files but we need %d, undoing", len(info.Links), len(torrent.SelectedFiles))
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}

	t.log.Info("Redownload successful, deleting old torrent")
	realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID)
	return true
}
// organizeChaos resolves every link of the torrent through UnrestrictCheck
// (concurrently, bounded by the configured number of workers) and matches the
// resulting filenames against the selected files.
//
// A selected file whose path ends with a returned filename receives that
// link. A result that matches no selected file is appended as an extra file.
// The returned flag is true when at least one such unmatched result is not
// streamable; the returned slice is selectedFiles with links filled in plus
// any appended extras.
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) ([]File, bool) {
	type Result struct {
		Response *realdebrid.UnrestrictResponse
	}

	// buffered for every link so that workers never block on delivery
	resultsChan := make(chan Result, len(info.Links))
	var wg sync.WaitGroup

	// Limit concurrency
	sem := make(chan struct{}, t.config.GetNumOfWorkers())
	for _, link := range info.Links {
		wg.Add(1)
		sem <- struct{}{}
		go func(lnk string) {
			defer wg.Done()
			defer func() { <-sem }()

			unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
				return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk)
			}
			resp := realdebrid.RetryUntilOk(unrestrictFn)
			if resp != nil {
				resultsChan <- Result{Response: resp}
			}
		}(link)
	}

	go func() {
		wg.Wait()
		close(sem)
		close(resultsChan)
	}()

	isChaotic := false
	for result := range resultsChan {
		found := false
		for i := range selectedFiles {
			if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) {
				selectedFiles[i].Link = result.Response.Link
				found = true
			}
		}
		if found {
			continue
		}

		// "chaos" file, we don't know where it belongs.
		// Results arrive in completion order, so the flag must be sticky:
		// overwriting it per result would make the verdict depend on which
		// request happened to finish last.
		if !isStreamable(result.Response.Filename) {
			isChaotic = true
		}
		selectedFiles = append(selectedFiles, File{
			File: realdebrid.File{
				Path:     result.Response.Filename,
				Bytes:    result.Response.Filesize,
				Selected: 1,
			},
			Link: result.Response.Link,
		})
	}
	return selectedFiles, isChaotic
}
// canCapacityHandle reports whether another torrent can be added right now.
//
// It polls the active torrent count and returns true as soon as the number of
// downloading torrents is below the account maximum. Both API errors and a
// full queue are retried with exponential backoff (1s, 2s, 4s, ... capped at
// 60s); after maxRetries retries it gives up and returns false.
func (t *TorrentManager) canCapacityHandle() bool {
	// max waiting time is 45 minutes
	const maxRetries = 50
	const baseDelay = 1 * time.Second
	const maxDelay = 60 * time.Second

	// backoff returns baseDelay * 2^retry capped at maxDelay. The cap is
	// applied on the float factor BEFORE converting to time.Duration: from
	// roughly retry 34 onwards the product no longer fits in an int64 and
	// would wrap to a negative or zero delay, turning the wait into a busy
	// loop against the API.
	backoff := func(retry int) time.Duration {
		factor := math.Pow(2, float64(retry))
		if factor >= float64(maxDelay/baseDelay) {
			return maxDelay
		}
		return time.Duration(factor) * baseDelay
	}

	for retryCount := 0; ; retryCount++ {
		count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
		switch {
		case err != nil:
			t.log.Errorf("Cannot get active downloads count: %v", err)
			if retryCount >= maxRetries {
				t.log.Error("Max retries reached. Exiting.")
				return false
			}
		case count.DownloadingCount < count.MaxNumberOfTorrents:
			t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
			return true
		case retryCount >= maxRetries:
			t.log.Error("Max retries reached, exiting")
			return false
		}
		time.Sleep(backoff(retryCount))
	}
}