Fix logic of fetching torrents

This commit is contained in:
Ben Sarmiento
2024-05-06 10:48:01 +02:00
parent b6b59b22e6
commit ae94252156
11 changed files with 205 additions and 49 deletions

87
internal/fs/node.go Normal file
View File

@@ -0,0 +1,87 @@
package fs
import (
"strings"
cmap "github.com/orcaman/concurrent-map/v2"
)
// FileNode represents a directory or a file. If IsDir is true, Children is used.
type FileNode struct {
Name string
IsDir bool
Children cmap.ConcurrentMap[string, *FileNode]
}
// NewFileNode creates a new FileNode, initializing the concurrent map if it is a directory.
func NewFileNode(name string, isDir bool) *FileNode {
node := &FileNode{
Name: name,
IsDir: isDir,
}
if isDir {
node.Children = cmap.New[*FileNode]()
}
return node
}
// AddChild adds a child node to a directory node.
func (n *FileNode) AddChild(child *FileNode) {
if n.IsDir {
n.Children.Set(child.Name, child)
}
}
// AddPath adds a path to the file system like mkdir -p.
func (n *FileNode) MakeDirectoryWithPath(path string) {
currentNode := n
for _, part := range SplitPath(path) {
child, ok := currentNode.Children.Get(part)
if !ok {
child = NewFileNode(part, true)
currentNode.AddChild(child)
}
currentNode = child
}
}
// AddChildToPath adds a child to a path in the file system. Path should be an existing directory.
func (n *FileNode) AddChildToPath(path string, child *FileNode) {
parent := n.GetFileNode(path)
if parent != nil && parent.IsDir {
parent.AddChild(child)
}
}
// ListFiles returns a list of files in the file system.
func (n *FileNode) ListFiles() []*FileNode {
var files []*FileNode
n.Children.IterCb(func(key string, value *FileNode) {
files = append(files, value)
})
return files
}
// GetFileNode returns the FileNode of the given path.
func (n *FileNode) GetFileNode(path string) *FileNode {
currentNode := n
for _, part := range SplitPath(path) {
child, ok := currentNode.Children.Get(part)
if !ok {
return nil
}
currentNode = child
}
return currentNode
}
// SplitPath splits a path into its parts.
func SplitPath(path string) []string {
var parts []string
for _, part := range strings.Split(path, "/") {
if part != "" {
parts = append(parts, part)
}
}
return parts
}

52
internal/fs/node_test.go Normal file
View File

@@ -0,0 +1,52 @@
package fs
import (
"testing"
)
// TestNewFileNode tests the creation of a new FileNode.
func TestNewFileNode(t *testing.T) {
node := NewFileNode("root", true)
if node.Name != "root" || !node.IsDir {
t.Errorf("NewFileNode failed to initialize properly")
}
}
// TestAddChild and TestListFiles tests adding children to a node and listing them.
func TestAddChild(t *testing.T) {
root := NewFileNode("root", true)
child1 := NewFileNode("child1", false)
child2 := NewFileNode("child2", true)
root.AddChild(child1)
root.AddChild(child2)
// Test if children are added properly
if _, ok := root.Children.Get("child1"); !ok {
t.Errorf("Failed to add child1")
}
if _, ok := root.Children.Get("child2"); !ok {
t.Errorf("Failed to add child2")
}
}
// TestGetFileNode tests retrieving a node from the file system based on a given path.
func TestGetFileNode(t *testing.T) {
root := NewFileNode("root", true)
child1 := NewFileNode("child1", false)
child2 := NewFileNode("child2", true)
root.AddChild(child1)
root.AddChild(child2)
// Retrieve node by path
resultNode := root.GetFileNode("child2")
if resultNode != child2 {
t.Errorf("GetFileNode failed to retrieve the correct node")
}
// Test non-existent path
if resultNode = root.GetFileNode("child3"); resultNode != nil {
t.Errorf("GetFileNode should return nil for non-existent node")
}
}

View File

@@ -33,7 +33,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) {
if torrent, ok := allTorrents.Get(accessKey); ok {
torrent.DownloadedIDs.Union(torrent.InProgressIDs).Each(func(id string) bool {
t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, id)
t.Api.DeleteTorrent(id)
t.api.DeleteTorrent(id)
infoCache.Remove(id)
t.deleteTorrentFile(id)
return false

View File

@@ -62,7 +62,7 @@ func (t *TorrentManager) processFixers(instances []realdebrid.Torrent) {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
for _, id := range toDelete {
t.Api.DeleteTorrent(id)
t.api.DeleteTorrent(id)
infoCache.Remove(id)
t.deleteTorrentFile(id)
}

View File

@@ -23,7 +23,7 @@ func (t *TorrentManager) setNewLatestState(checksum LibraryState) {
func (t *TorrentManager) getCurrentState() LibraryState {
var state LibraryState
torrents, totalCount, err := t.Api.GetTorrents(true)
torrents, totalCount, err := t.api.GetTorrents(true)
if err != nil {
t.log.Errorf("Checksum API Error (GetTorrents): %v", err)
return LibraryState{}
@@ -33,7 +33,7 @@ func (t *TorrentManager) getCurrentState() LibraryState {
state.FirstTorrentId = torrents[0].ID
}
count, err := t.Api.GetActiveTorrentCount()
count, err := t.api.GetActiveTorrentCount()
if err != nil {
t.log.Errorf("Checksum API Error (GetActiveTorrentCount): %v", err)
return LibraryState{}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/internal/fs"
"github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
mapset "github.com/deckarep/golang-set/v2"
@@ -22,24 +23,31 @@ const (
)
type TorrentManager struct {
Config config.ConfigInterface
Api *realdebrid.RealDebrid
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id]
allAccessKeys mapset.Set[string]
allIDs mapset.Set[string]
latestState *LibraryState
requiredVersion string
workerPool *ants.Pool
requiredVersion string
Config config.ConfigInterface
api *realdebrid.RealDebrid
workerPool *ants.Pool
log *logutil.Logger
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download]
RootNode *fs.FileNode
RefreshKillSwitch chan struct{}
RepairKillSwitch chan struct{}
RemountTrigger chan struct{}
repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent]
repairRunning bool
repairRunningMu sync.Mutex
log *logutil.Logger
latestState *LibraryState
allAccessKeys mapset.Set[string]
allIDs mapset.Set[string]
fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id]
repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent]
repairRunning bool
repairRunningMu sync.Mutex
}
// NewTorrentManager creates a new torrent manager
@@ -47,18 +55,24 @@ type TorrentManager struct {
// and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) *TorrentManager {
t := &TorrentManager{
Config: cfg,
Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
requiredVersion: "0.9.3-hotfix.10",
Config: cfg,
api: api,
workerPool: workerPool,
log: log,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
RootNode: fs.NewFileNode("root", true),
RefreshKillSwitch: make(chan struct{}, 1),
RepairKillSwitch: make(chan struct{}, 1),
RemountTrigger: make(chan struct{}, 1),
allAccessKeys: mapset.NewSet[string](),
allIDs: mapset.NewSet[string](),
latestState: &LibraryState{},
requiredVersion: "0.9.3-hotfix.10",
workerPool: workerPool,
log: log,
latestState: &LibraryState{},
allAccessKeys: mapset.NewSet[string](),
allIDs: mapset.NewSet[string](),
}
t.fixers = t.readFixersFromFile()
@@ -78,9 +92,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
return t
}
// proxy
// proxy function
func (t *TorrentManager) UnrestrictLinkUntilOk(link string) *realdebrid.Download {
ret, err := t.Api.UnrestrictLink(link, t.Config.ShouldServeFromRclone())
ret, err := t.api.UnrestrictLink(link, t.Config.ShouldServeFromRclone())
if err != nil {
t.log.Warnf("Cannot unrestrict link %s: %v", link, err)
return nil
@@ -194,7 +208,7 @@ func (t *TorrentManager) mountDownloads() {
page := 1
offset := 0
for {
downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
downloads, totalDownloads, err := t.api.GetDownloads(page, offset)
if err != nil {
// if we get an error, we just stop
t.log.Warnf("Cannot get downloads on page %d: %v", page, err)
@@ -236,6 +250,7 @@ func (t *TorrentManager) initializeDirectories() {
// create directory maps
for _, directory := range t.Config.GetDirectories() {
t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
// t.RootNode.AddChild(fs.NewFileNode(directory, true))
}
}

View File

@@ -15,7 +15,7 @@ import (
)
func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
instances, _, err := t.Api.GetTorrents(false)
instances, _, err := t.api.GetTorrents(false)
if err != nil {
t.log.Warnf("Cannot get torrents: %v", err)
return nil
@@ -72,6 +72,7 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
t.assignedDirectoryCb(mainTor, func(directory string) {
listing, _ := t.DirectoryMap.Get(directory)
listing.Set(accessKey, mainTor)
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey))
// this is just for the logs
if directory != config.ALL_TORRENTS {
@@ -151,7 +152,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
return diskTor
}
info, err := t.Api.GetTorrentInfo(rdTorrent.ID)
info, err := t.api.GetTorrentInfo(rdTorrent.ID)
if err != nil {
t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err)
return nil

View File

@@ -356,7 +356,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
}
// redownload torrent
resp, err := t.Api.AddMagnetHash(torrent.Hash)
resp, err := t.api.AddMagnetHash(torrent.Hash)
if err != nil {
if strings.Contains(err.Error(), "infringing") {
t.markAsUnfixable(torrent, "infringing torrent")
@@ -382,7 +382,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
var info *realdebrid.TorrentInfo
for {
// select files
err = t.Api.SelectTorrentFiles(newTorrentID, selection)
err = t.api.SelectTorrentFiles(newTorrentID, selection)
if err != nil {
t.registerFixer(newTorrentID, "download_failed")
return nil, fmt.Errorf("cannot start redownloading: %v", err)
@@ -391,7 +391,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
time.Sleep(10 * time.Second)
// see if the torrent is ready
info, err = t.Api.GetTorrentInfo(newTorrentID)
info, err = t.api.GetTorrentInfo(newTorrentID)
if err != nil {
t.registerFixer(newTorrentID, "download_failed")
return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
@@ -440,7 +440,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
const maxDelay = 60 * time.Second
retryCount := 0
for {
count, err := t.Api.GetActiveTorrentCount()
count, err := t.api.GetActiveTorrentCount()
if err != nil {
t.log.Warnf("Cannot get active downloads count: %v", err)
if retryCount >= maxRetries {

View File

@@ -14,16 +14,17 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Torrent struct {
Hash string `json:"Hash"` // immutable
Added string `json:"Added"` // immutable
UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable
DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable
InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable
Name string `json:"Name"` // immutable
OriginalName string `json:"OriginalName"` // immutable
Rename string `json:"Rename"` // modified over time
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
UnrepairableReason string `json:"Unfixable"` // modified over time
Name string `json:"Name"` // immutable
OriginalName string `json:"OriginalName"` // immutable
Hash string `json:"Hash"` // immutable
Added string `json:"Added"` // immutable
DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable
InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable
UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable
Rename string `json:"Rename"` // modified over time
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
UnrepairableReason string `json:"Unfixable"` // modified over time
Version string `json:"Version"` // only used for files
}

View File

@@ -34,7 +34,7 @@ func (t *TorrentManager) GetUncachedTorrents() ([]*Torrent, error) {
break
}
resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice())
resp, err := t.api.AvailabilityCheck(hashGroups[i].ToSlice())
if err != nil {
return nil, fmt.Errorf("availability check is incomplete, skipping uncached check: %v", err)
}

View File

@@ -165,7 +165,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
allTorrents = append(allTorrents, result.torrents...)
totalCount := result.totalCount
if onlyOne || totalCount == len(allTorrents) {
if onlyOne {
return allTorrents, totalCount, nil
}