Files
zurg/internal/torrent/manager.go
2023-10-23 20:01:55 +02:00

508 lines
13 KiB
Go

package torrent
import (
"encoding/gob"
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"github.com/hashicorp/golang-lru/v2/expirable"
)
// TorrentManager keeps the list of known torrents in memory and bundles the
// dependencies the methods in this file use to load and refresh that list.
type TorrentManager struct {
// torrents is the in-memory torrent list; it is filled by getAll in
// NewTorrentManager and updated by refreshTorrents.
torrents []Torrent
// checksum is the last stored fingerprint of the remote torrent list;
// refreshTorrents compares it with the value returned by getChecksum.
checksum string
// config supplies the API token, worker count, refresh interval and cache
// lifetime read throughout this file.
config config.ConfigInterface
// cache is purged by refreshTorrents whenever the checksum changes; what it
// stores is not visible in this file.
cache *expirable.LRU[string, string]
// workerPool is a buffered channel (capacity GetNumOfWorkers) used as a
// counting semaphore around background getInfo calls.
workerPool chan bool
}
// NewTorrentManager builds a TorrentManager around the given configuration
// and cache.
//
// The torrent list is fetched synchronously via getAll; the per-torrent
// details are then loaded by one background goroutine per torrent, with at
// most config.GetNumOfWorkers() of them inside getInfo at the same time.
// Finally the periodic refresh loop is started in its own goroutine.
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
	manager := new(TorrentManager)
	manager.config = config
	manager.cache = cache
	manager.workerPool = make(chan bool, config.GetNumOfWorkers())

	// First listing happens inline so the caller gets a populated manager.
	manager.torrents = manager.getAll()
	for i := range manager.torrents {
		go func(id string) {
			manager.workerPool <- true // take a worker slot
			manager.getInfo(id)
			<-manager.workerPool // hand the slot back
			// NOTE(review): this pause runs after the slot was released, so it
			// does not delay other workers — confirm whether that is intended.
			time.Sleep(1 * time.Second) // sleep for 1 second to avoid rate limiting
		}(manager.torrents[i].ID)
	}

	// Keep the list up to date for the lifetime of the process.
	go manager.refreshTorrents()
	return manager
}
// GetByDirectory collects copies of every torrent whose Directories list
// contains the given directory. A torrent is added once per matching entry,
// and the result is nil when nothing matches.
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
	var matches []Torrent
	for idx := range t.torrents {
		candidate := &t.torrents[idx]
		for d := 0; d < len(candidate.Directories); d++ {
			if candidate.Directories[d] != directory {
				continue
			}
			matches = append(matches, *candidate)
		}
	}
	return matches
}
// RefreshInfo re-fetches the info for a torrent unless its on-disk cache file
// (data/<torrentID>.bin) is still fresh.
//
// A cache file younger than the configured cache time (GetCacheTimeHours)
// short-circuits the refresh. An older file is removed first so that getInfo
// cannot serve it again. Any stat error other than "file does not exist"
// aborts the refresh. Failures are logged; nothing is returned.
func (t *TorrentManager) RefreshInfo(torrentID string) {
	filePath := fmt.Sprintf("data/%s.bin", torrentID)

	fileInfo, err := os.Stat(filePath)
	switch {
	case err == nil:
		// Cache file exists: keep it while it is within the configured lifetime.
		if time.Since(fileInfo.ModTime()) < time.Duration(t.config.GetCacheTimeHours())*time.Hour {
			return
		}
		if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
			log.Printf("Cannot remove file: %v\n", err)
		}
	case !os.IsNotExist(err):
		// Error other than file not existing
		log.Printf("Error checking file info: %v\n", err)
		return
	}

	info := t.getInfo(torrentID)
	if info == nil {
		// getInfo returns nil when the info could not be fetched; the original
		// code dereferenced it unconditionally and panicked.
		log.Printf("Cannot refresh info for %s\n", torrentID)
		return
	}
	log.Println("Refreshed info for", info.Name)
}
// MarkFileAsDeleted clears the link of the given file, persists the torrent
// to its cache file and then starts healing the torrent for that single file.
// Both steps are announced in the log.
func (t *TorrentManager) MarkFileAsDeleted(torrent *Torrent, file *File) {
	log.Println("Marking file as deleted", file.Path)
	file.Link = ""
	t.writeToFile(torrent.ID, torrent)

	log.Println("Healing a single file in the torrent", torrent.Name)
	// heal receives a copy of the file taken after its link was cleared.
	broken := make([]File, 0, 1)
	broken = append(broken, *file)
	t.heal(torrent.ID, broken)
}
// GetInfo returns a pointer to the in-memory torrent with the given ID.
// When the ID is not in the in-memory list, the lookup is delegated to
// getInfo and its result is returned as is.
func (t *TorrentManager) GetInfo(torrentID string) *Torrent {
	if known := t.getByID(torrentID); known != nil {
		return known
	}
	return t.getInfo(torrentID)
}
// getChecksum builds a fingerprint of the remote torrent list from the total
// count, the ID of the first returned torrent and whether that torrent's
// progress is 100. If the request fails or returns no torrents, the currently
// stored checksum is returned unchanged.
func (t *TorrentManager) getChecksum() string {
	listed, total, err := realdebrid.GetTorrents(t.config.GetToken(), 1)
	switch {
	case err != nil:
		log.Printf("Cannot get torrents: %v\n", err)
		return t.checksum
	case len(listed) == 0:
		log.Println("Huh, no torrents returned")
		return t.checksum
	}

	head := listed[0]
	finished := head.Progress == 100
	return fmt.Sprintf("%d-%s-%v", total, head.ID, finished)
}
// refreshTorrents runs forever: every GetRefreshEverySeconds seconds it
// compares the remote checksum with the stored one and, when they differ,
// reconciles the in-memory torrent list with a fresh listing.
//
// Torrents missing from the new listing are dropped (order of the remaining
// ones is preserved); torrents that are new are appended and their details
// are loaded in the background through the worker pool.
//
// If the fresh listing comes back empty, the current list and checksum are
// left untouched so the refresh is retried on the next tick. getChecksum
// never reports a change for an empty remote list, so an empty result here
// means the fetch failed.
func (t *TorrentManager) refreshTorrents() {
	log.Println("Starting periodic refresh")
	for {
		<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)

		checksum := t.getChecksum()
		if checksum == t.checksum {
			continue
		}

		newTorrents := t.getAll()
		if len(newTorrents) == 0 {
			// Do not wipe the list (and do not record the checksum) on a
			// failed fetch; try again on the next tick.
			log.Println("Torrent list could not be refreshed, keeping the current one")
			continue
		}
		// Record the checksum only after a successful fetch, and in the
		// getChecksum format, so the next comparison can actually match.
		t.checksum = checksum

		// Index the fresh listing by ID for constant-time membership tests.
		incoming := make(map[string]struct{}, len(newTorrents))
		for i := range newTorrents {
			incoming[newTorrents[i].ID] = struct{}{}
		}

		// Identify removed torrents: filter in place, keeping the order.
		known := make(map[string]struct{}, len(newTorrents))
		kept := t.torrents[:0]
		for i := range t.torrents {
			id := t.torrents[i].ID
			if _, ok := incoming[id]; !ok {
				continue
			}
			kept = append(kept, t.torrents[i])
			known[id] = struct{}{}
		}
		t.torrents = kept

		// Identify and handle added torrents
		for i := range newTorrents {
			id := newTorrents[i].ID
			if _, ok := known[id]; ok {
				continue
			}
			known[id] = struct{}{}
			t.torrents = append(t.torrents, newTorrents[i])
			go func(id string) {
				t.workerPool <- true
				t.getInfo(id)
				<-t.workerPool
				// NOTE(review): this pause runs after the slot was released, so
				// it does not delay other workers — confirm whether intended.
				time.Sleep(1 * time.Second) // sleep for 1 second to avoid rate limiting
			}(id)
		}

		// Purge once the list reflects the new state so nothing computed from
		// the old list is repopulated while the fetch is in flight.
		t.cache.Purge()
	}
}
// getAll fetches the complete torrent list from the API and converts it into
// the package's Torrent type.
//
// On success it also stores the list checksum, using the same format as
// getChecksum so the two values can be compared. Trailing slashes are trimmed
// from torrent names. For a "v1" configuration each torrent is assigned, per
// directory group, the first directory of that group whose conditions it
// meets.
//
// Returns nil when the request fails or when no torrents are returned; the
// stored checksum is left unchanged in both cases.
func (t *TorrentManager) getAll() []Torrent {
	log.Println("Getting all torrents")
	torrents, totalCount, err := realdebrid.GetTorrents(t.config.GetToken(), 0)
	if err != nil {
		log.Printf("Cannot get torrents: %v\n", err)
		return nil
	}
	if len(torrents) == 0 {
		// Guard the torrents[0] access below; mirrors getChecksum.
		log.Println("Huh, no torrents returned")
		return nil
	}
	// Must match the format produced by getChecksum, otherwise the periodic
	// refresh sees a "change" on every tick.
	t.checksum = fmt.Sprintf("%d-%s-%v", totalCount, torrents[0].ID, torrents[0].Progress == 100)

	torrentsV2 := make([]Torrent, 0, len(torrents))
	for _, torrent := range torrents {
		torrent.Name = strings.TrimSuffix(torrent.Name, "/")
		torrentsV2 = append(torrentsV2, Torrent{
			Torrent:       torrent,
			SelectedFiles: nil,
		})
	}
	log.Printf("Fetched %d torrents", len(torrentsV2))

	if t.config.GetVersion() == "v1" {
		configV1, ok := t.config.(*config.ZurgConfigV1)
		if !ok {
			// Avoid a panic if another implementation reports version "v1".
			log.Printf("Config reports version v1 but has type %T, skipping directory mapping\n", t.config)
			return torrentsV2
		}
		for group, directories := range configV1.GetGroupMap() {
			log.Printf("Processing directory group: %s\n", group)
			directoryMap := make(map[string]int)
			for i := range torrentsV2 {
				for _, directory := range directories {
					if configV1.MeetsConditions(directory, torrentsV2[i].ID, torrentsV2[i].Name) {
						torrentsV2[i].Directories = append(torrentsV2[i].Directories, directory)
						directoryMap[directory]++
						// Only the first matching directory of a group is used.
						break
					}
				}
			}
			log.Printf("Finished processing directory group: %v\n", directoryMap)
		}
	}
	log.Println("Finished mapping to groups")
	return torrentsV2
}
// getInfo loads the selected files for a torrent and attaches them to the
// matching entry of the in-memory list.
//
// A fresh cache file is used when its number of selected files equals the
// number of links of the in-memory torrent. Otherwise the info is fetched
// from the API. When the number of selected files and links disagree, the
// links are matched to files via organizeChaos and the torrent is healed;
// otherwise links are assigned to the selected files by position.
//
// Returns the in-memory torrent, or nil when the API call fails or the
// torrent is not part of the in-memory list.
func (t *TorrentManager) getInfo(torrentID string) *Torrent {
	if cached := t.readFromFile(torrentID); cached != nil {
		if torrent := t.getByID(torrentID); torrent != nil && len(cached.SelectedFiles) == len(torrent.Links) {
			torrent.SelectedFiles = cached.SelectedFiles
			return torrent
		}
	}

	log.Println("Getting info for", torrentID)
	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrentID)
	if err != nil {
		log.Printf("Cannot get info: %v\n", err)
		return nil
	}

	var selectedFiles []File
	for _, file := range info.Files {
		if file.Selected == 0 {
			continue
		}
		selectedFiles = append(selectedFiles, File{
			File: file,
			Link: "",
		})
	}

	if len(selectedFiles) != len(info.Links) {
		log.Println("Some links has expired for", info.Name)
		selectedFiles = t.organizeChaos(info, selectedFiles)
		t.heal(torrentID, selectedFiles)
	} else {
		for i, link := range info.Links {
			selectedFiles[i].Link = link
		}
	}

	torrent := t.getByID(torrentID)
	if torrent == nil {
		// The ID is not (or no longer) in the in-memory list. The original
		// code went on to dereference the nil pointer and panicked.
		log.Printf("Torrent %s is not in the torrent list\n", torrentID)
		return nil
	}
	torrent.SelectedFiles = selectedFiles
	if len(torrent.SelectedFiles) > 0 {
		t.writeToFile(torrentID, torrent)
	}
	return torrent
}
// getByID looks up a torrent in the in-memory list and returns a pointer to
// the first list element with the given ID, or nil when there is none.
func (t *TorrentManager) getByID(torrentID string) *Torrent {
	for idx := range t.torrents {
		if candidate := &t.torrents[idx]; candidate.ID == torrentID {
			return candidate
		}
	}
	return nil
}
// writeToFile gob-encodes the torrent into data/<torrentID>.bin, replacing
// any existing file.
//
// The file is only a cache, so failures are logged instead of terminating the
// process. If encoding or closing fails, the incomplete file is removed so a
// later read does not try to decode a truncated file.
func (t *TorrentManager) writeToFile(torrentID string, torrent *Torrent) {
	filePath := fmt.Sprintf("data/%s.bin", torrentID)
	file, err := os.Create(filePath)
	if err != nil {
		log.Printf("Failed creating file: %s", err)
		return
	}

	err = gob.NewEncoder(file).Encode(torrent)
	// Always close; report the close error only if encoding itself succeeded.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		return
	}

	log.Printf("Failed writing file %s: %s", filePath, err)
	if rmErr := os.Remove(filePath); rmErr != nil && !os.IsNotExist(rmErr) {
		log.Printf("Cannot remove file: %v\n", rmErr)
	}
}
// readFromFile loads a torrent from its cache file data/<torrentID>.bin.
//
// It returns nil (a cache miss) when the file cannot be stat'ed or opened,
// when it is older than the configured cache time (GetCacheTimeHours), or
// when its content cannot be decoded. An undecodable file is logged rather
// than terminating the process, so the caller can fall back to fetching the
// info again.
func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
	filePath := fmt.Sprintf("data/%s.bin", torrentID)
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		return nil
	}
	if time.Since(fileInfo.ModTime()) > time.Duration(t.config.GetCacheTimeHours())*time.Hour {
		return nil
	}

	file, err := os.Open(filePath)
	if err != nil {
		return nil
	}
	defer file.Close()

	var torrent Torrent
	if err := gob.NewDecoder(file).Decode(&torrent); err != nil {
		log.Printf("Failed decoding file: %s", err)
		return nil
	}
	return &torrent
}
// reinsertTorrent adds the torrent's hash to the account again and selects
// the given files on the new torrent.
//
// missingFiles is a comma-separated list of file IDs. When it is empty, the
// IDs of all selected files that currently have no link are used; if there
// are none, nothing is done and false is returned.
//
// With deleteIfFailed set, the new torrent is verified after selection: it
// must report progress 100 and as many links as the old torrent has selected
// files. If so, the old torrent is deleted; otherwise the new torrent is
// deleted again. Without deleteIfFailed no verification or deletion happens.
//
// Returns true when the reinsertion (and, if requested, the verification)
// succeeded, false otherwise.
func (t *TorrentManager) reinsertTorrent(oldTorrentID string, missingFiles string, deleteIfFailed bool) bool {
	torrent := t.GetInfo(oldTorrentID)
	if torrent == nil {
		return false
	}

	if missingFiles == "" {
		var ids []string
		for _, file := range torrent.SelectedFiles {
			if file.Link == "" {
				ids = append(ids, fmt.Sprintf("%d", file.ID))
			}
		}
		if len(ids) == 0 {
			return false
		}
		missingFiles = strings.Join(ids, ",")
	}

	// reinsert torrent
	resp, err := realdebrid.AddMagnetHash(t.config.GetToken(), torrent.Hash)
	if err != nil {
		log.Printf("Cannot reinsert torrent: %v\n", err)
		return false
	}
	newTorrentID := resp.ID

	if err := realdebrid.SelectTorrentFiles(t.config.GetToken(), newTorrentID, missingFiles); err != nil {
		log.Printf("Cannot select files on reinserted torrent: %v\n", err)
		if deleteIfFailed {
			realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		}
		// A failed selection is a failed reinsertion, whether or not the new
		// torrent is cleaned up.
		return false
	}
	if !deleteIfFailed {
		return true
	}

	time.Sleep(1 * time.Second)
	// see if the torrent is ready
	info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), newTorrentID)
	if err != nil {
		log.Printf("Cannot get info on reinserted torrent: %v\n", err)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}
	time.Sleep(1 * time.Second)

	if info.Progress != 100 {
		log.Printf("Torrent is not cached anymore, %d%%\n", info.Progress)
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}
	if len(info.Links) != len(torrent.SelectedFiles) {
		log.Printf("It doesn't fix the problem, got %d but we need %d\n", len(info.Links), len(torrent.SelectedFiles))
		realdebrid.DeleteTorrent(t.config.GetToken(), newTorrentID)
		return false
	}

	log.Println("Reinsertion successful, deleting old torrent")
	realdebrid.DeleteTorrent(t.config.GetToken(), oldTorrentID)
	return true
}
// organizeChaos runs an unrestrict check on every link of the torrent and
// uses the responses to attach links to the selected files.
//
// The checks run concurrently, at most GetNumOfWorkers at a time. Links for
// which no response is obtained are skipped. Each response's link is assigned
// to every selected file whose path ends with the response's filename; a
// response that matches no file is appended as a new selected file built from
// the response's filename, size and link. Responses are processed in the
// order they complete, so the order of appended files is not fixed.
func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles []File) []File {
	// Buffered for every link, so workers never block when delivering.
	responses := make(chan *realdebrid.UnrestrictResponse, len(info.Links))
	slots := make(chan struct{}, t.config.GetNumOfWorkers())

	var pending sync.WaitGroup
	for _, link := range info.Links {
		pending.Add(1)
		slots <- struct{}{} // wait for a free worker slot
		go func(lnk string) {
			defer pending.Done()
			defer func() { <-slots }() // free the slot
			resp := realdebrid.RetryUntilOk(func() (*realdebrid.UnrestrictResponse, error) {
				return realdebrid.UnrestrictCheck(t.config.GetToken(), lnk)
			})
			if resp == nil {
				return
			}
			responses <- resp
		}(link)
	}

	// All workers can finish without a reader thanks to the buffer.
	pending.Wait()
	close(responses)

	for resp := range responses {
		matched := false
		for i := range selectedFiles {
			if !strings.HasSuffix(selectedFiles[i].Path, resp.Filename) {
				continue
			}
			selectedFiles[i].Link = resp.Link
			matched = true
		}
		if matched {
			continue
		}
		selectedFiles = append(selectedFiles, File{
			File: realdebrid.File{
				Path:     resp.Filename,
				Bytes:    resp.Filesize,
				Selected: 1,
			},
			Link: resp.Link,
		})
	}
	return selectedFiles
}
// heal tries to restore missing links of a torrent by reinserting it.
//
// It first waits until the account has room for another active torrent,
// polling with exponential backoff (1s, 2s, 4s, ... capped at 60s) for at
// most maxRetries retries — roughly 45 minutes in total — and gives up
// silently (apart from a log line) when that limit is reached.
//
// It then reinserts the torrent with its current missing selection, replacing
// the old torrent on success. If that fails, the missing files of
// selectedFiles are reinserted in two batches (first and second half of the
// slice); an empty batch is skipped.
func (t *TorrentManager) heal(torrentID string, selectedFiles []File) {
	// max waiting time is 45 minutes
	const maxRetries = 50
	const baseDelay = 1 * time.Second
	const maxDelay = 60 * time.Second

	// backoff returns baseDelay * 2^attempt, capped at maxDelay. The cap is
	// applied to the multiplier before converting to a Duration, because
	// 2^attempt seconds overflows int64 nanoseconds for large attempts.
	backoff := func(attempt int) time.Duration {
		factor := math.Min(math.Pow(2, float64(attempt)), float64(maxDelay/baseDelay))
		return time.Duration(factor) * baseDelay
	}

	for retryCount := 0; ; retryCount++ {
		count, err := realdebrid.GetActiveTorrentCount(t.config.GetToken())
		if err != nil {
			log.Printf("Cannot get active torrent count: %v\n", err)
		} else if count.DownloadingCount < count.MaxNumberOfTorrents {
			log.Printf("We can still add a new torrent, %d/%d\n", count.DownloadingCount, count.MaxNumberOfTorrents)
			break
		}
		if retryCount >= maxRetries {
			log.Println("Max retries reached. Exiting.")
			return
		}
		time.Sleep(backoff(retryCount))
	}

	// now we can get the missing files
	half := len(selectedFiles) / 2
	missingFiles1 := getMissingFiles(0, half, selectedFiles)
	missingFiles2 := getMissingFiles(half, len(selectedFiles), selectedFiles)

	// first solution: add the same selection, maybe it can be fixed by reinsertion?
	if t.reinsertTorrent(torrentID, "", true) {
		return
	}

	// if not, last resort: add only the missing files and do it in 2 batches.
	// An empty batch must be skipped: reinsertTorrent treats "" as "all
	// missing files", which would reinsert the other batch's files twice.
	if missingFiles1 != "" {
		t.reinsertTorrent(torrentID, missingFiles1, false)
	}
	if missingFiles2 != "" {
		t.reinsertTorrent(torrentID, missingFiles2, false)
	}
}
// getMissingFiles returns the comma-separated IDs of the files in
// files[start:end) that are selected (Selected == 1), have a non-zero ID and
// have no link. The result is the empty string when no file qualifies.
func getMissingFiles(start, end int, files []File) string {
	var ids []string
	for idx := start; idx < end; idx++ {
		f := &files[idx]
		if f.Link != "" || f.ID == 0 || f.File.Selected != 1 {
			continue
		}
		ids = append(ids, fmt.Sprintf("%d", f.ID))
	}
	return strings.Join(ids, ",")
}