Files
zurg/internal/torrent/manager.go
Ben Sarmiento f1717a8d94 Add new configs
2023-10-20 03:59:47 +02:00

312 lines
7.7 KiB
Go

package torrent
import (
"encoding/gob"
"fmt"
"log"
"os"
"strings"
"sync"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
)
type TorrentManager struct {
token string
torrents []Torrent
workerPool chan bool
config config.ConfigInterface
checksum string
}
func (t *TorrentManager) refreshTorrents() {
log.Println("Starting periodic refresh")
for {
<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)
checksum := t.getChecksum()
if checksum == t.checksum {
continue
}
t.checksum = checksum
t.torrents = t.getAll()
for _, torrent := range t.torrents {
go func(id string) {
t.workerPool <- true
t.getInfo(id)
<-t.workerPool
time.Sleep(1 * time.Second) // sleep for 1 second to avoid rate limiting
}(torrent.ID)
}
}
}
// NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background
// and store them in-memory
func NewTorrentManager(config config.ConfigInterface) *TorrentManager {
handler := &TorrentManager{
token: config.GetToken(),
workerPool: make(chan bool, config.GetNumOfWorkers()),
config: config,
}
// Initialize torrents for the first time
handler.torrents = handler.getAll()
for _, torrent := range handler.torrents {
go func(id string) {
handler.workerPool <- true
handler.getInfo(id)
<-handler.workerPool
time.Sleep(1 * time.Second) // sleep for 1 second to avoid rate limiting
}(torrent.ID)
}
// Start the periodic refresh
go handler.refreshTorrents()
return handler
}
func (t *TorrentManager) getChecksum() string {
torrents, totalCount, err := realdebrid.GetTorrents(t.token, 1)
if err != nil {
log.Printf("Cannot get torrents: %v\n", err.Error())
return t.checksum
}
if len(torrents) == 0 {
return t.checksum
}
return fmt.Sprintf("%d-%s", totalCount, torrents[0].ID)
}
func (t *TorrentManager) getAll() []Torrent {
log.Println("Getting all torrents")
torrents, totalCount, err := realdebrid.GetTorrents(t.token, 0)
if err != nil {
log.Printf("Cannot get torrents: %v\n", err.Error())
return nil
}
t.checksum = fmt.Sprintf("%d-%s", totalCount, torrents[0].ID)
var torrentsV2 []Torrent
for _, torrent := range torrents {
torrent.Name = strings.TrimSuffix(torrent.Name, "/")
torrentV2 := Torrent{
Torrent: torrent,
SelectedFiles: nil,
}
torrentsV2 = append(torrentsV2, torrentV2)
}
log.Printf("Fetched %d torrents", len(torrentsV2))
version := t.config.GetVersion()
if version == "v1" {
configV1 := t.config.(*config.ZurgConfigV1)
groupMap := configV1.GetGroupMap()
for group, directories := range groupMap {
log.Printf("Processing directory group: %s, %v\n", group, directories)
for i := range torrents {
for _, directory := range directories {
if configV1.MeetsConditions(directory, torrentsV2[i].ID, torrentsV2[i].Name) {
torrentsV2[i].Directories = append(torrentsV2[i].Directories, directory)
break
}
}
}
}
}
log.Println("Finished mapping to groups")
return torrentsV2
}
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
var torrents []Torrent
for i := range t.torrents {
for _, dir := range t.torrents[i].Directories {
if dir == directory {
torrents = append(torrents, t.torrents[i])
}
}
}
return torrents
}
func (t *TorrentManager) RefreshInfo(torrentID string) {
filePath := fmt.Sprintf("data/%s.bin", torrentID)
// Check the last modified time of the .bin file
fileInfo, err := os.Stat(filePath)
if err == nil {
modTime := fileInfo.ModTime()
// If the file was modified less than an hour ago, don't refresh
if time.Since(modTime) < time.Duration(t.config.GetCacheTimeHours())*time.Hour {
return
}
err = os.Remove(filePath)
if err != nil && !os.IsNotExist(err) { // File doesn't exist or other error
log.Printf("Cannot remove file: %v\n", err.Error())
}
} else if !os.IsNotExist(err) { // Error other than file not existing
log.Printf("Error checking file info: %v\n", err.Error())
return
}
info := t.getInfo(torrentID)
log.Println("Refreshed info for", info.Name)
}
func (t *TorrentManager) getInfo(torrentID string) *Torrent {
torrentFromFile := t.readFromFile(torrentID)
if torrentFromFile != nil {
torrent := t.getByID(torrentID)
if torrent != nil {
torrent.SelectedFiles = torrentFromFile.SelectedFiles
}
return torrent
}
log.Println("Getting info for", torrentID)
info, err := realdebrid.GetTorrentInfo(t.token, torrentID)
if err != nil {
log.Printf("Cannot get info: %v\n", err.Error())
return nil
}
var selectedFiles []File
for _, file := range info.Files {
if file.Selected == 0 {
continue
}
selectedFiles = append(selectedFiles, File{
File: file,
Link: "",
})
}
if len(selectedFiles) != len(info.Links) {
// TODO: This means some files have expired
// we need to 'fix' this torrent then, at least the missing selected files
log.Println("Some links has expired for", info.Name)
type Result struct {
Response *realdebrid.UnrestrictResponse
}
resultsChan := make(chan Result, len(info.Links))
var wg sync.WaitGroup
// Limit concurrency
sem := make(chan struct{}, t.config.GetNumOfWorkers())
for _, link := range info.Links {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(lnk string) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
return realdebrid.UnrestrictCheck(t.token, lnk)
}
resp := realdebrid.RetryUntilOk(unrestrictFn)
if resp != nil {
resultsChan <- Result{Response: resp}
}
}(link)
}
go func() {
wg.Wait()
close(sem)
close(resultsChan)
}()
for result := range resultsChan {
found := false
for i := range selectedFiles {
if strings.HasSuffix(selectedFiles[i].Path, result.Response.Filename) {
selectedFiles[i].Link = result.Response.Link
found = true
}
}
if !found {
selectedFiles = append(selectedFiles, File{
File: realdebrid.File{
Path: result.Response.Filename,
Bytes: result.Response.Filesize,
Selected: 1,
},
Link: result.Response.Link,
})
}
}
} else {
for i, link := range info.Links {
selectedFiles[i].Link = link
}
}
torrent := t.getByID(torrentID)
if torrent != nil {
torrent.SelectedFiles = selectedFiles
}
if len(torrent.SelectedFiles) > 0 {
t.writeToFile(torrentID, torrent)
}
return torrent
}
func (t *TorrentManager) MarkFileAsDeleted(torrent *Torrent, file *File) {
log.Println("Marking file as deleted", file.Path)
file.Link = ""
t.writeToFile(torrent.ID, torrent)
}
func (t *TorrentManager) GetInfo(torrentID string) *Torrent {
for i := range t.torrents {
if t.torrents[i].ID == torrentID {
return &t.torrents[i]
}
}
return t.getInfo(torrentID)
}
func (t *TorrentManager) getByID(torrentID string) *Torrent {
for i := range t.torrents {
if t.torrents[i].ID == torrentID {
return &t.torrents[i]
}
}
return nil
}
func (t *TorrentManager) writeToFile(torrentID string, torrent *Torrent) {
filePath := fmt.Sprintf("data/%s.bin", torrentID)
file, err := os.Create(filePath)
if err != nil {
log.Fatalf("Failed creating file: %s", err)
return
}
defer file.Close()
dataEncoder := gob.NewEncoder(file)
dataEncoder.Encode(torrent)
}
func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
filePath := fmt.Sprintf("data/%s.bin", torrentID)
file, err := os.Open(filePath)
if err != nil {
return nil
}
defer file.Close()
var torrent Torrent
dataDecoder := gob.NewDecoder(file)
err = dataDecoder.Decode(&torrent)
if err != nil {
log.Fatalf("Failed decoding file: %s", err)
return nil
}
return &torrent
}