diff --git a/internal/fs/node.go b/internal/fs/node.go new file mode 100644 index 0000000..9b427d1 --- /dev/null +++ b/internal/fs/node.go @@ -0,0 +1,87 @@ +package fs + +import ( + "strings" + + cmap "github.com/orcaman/concurrent-map/v2" +) + +// FileNode represents a directory or a file. If IsDir is true, Children is used. +type FileNode struct { + Name string + IsDir bool + Children cmap.ConcurrentMap[string, *FileNode] +} + +// NewFileNode creates a new FileNode, initializing the concurrent map if it is a directory. +func NewFileNode(name string, isDir bool) *FileNode { + node := &FileNode{ + Name: name, + IsDir: isDir, + } + if isDir { + node.Children = cmap.New[*FileNode]() + } + return node +} + +// AddChild adds a child node to a directory node. +func (n *FileNode) AddChild(child *FileNode) { + if n.IsDir { + n.Children.Set(child.Name, child) + } +} + +// AddPath adds a path to the file system like mkdir -p. +func (n *FileNode) MakeDirectoryWithPath(path string) { + currentNode := n + for _, part := range SplitPath(path) { + child, ok := currentNode.Children.Get(part) + if !ok { + child = NewFileNode(part, true) + currentNode.AddChild(child) + } + currentNode = child + } +} + +// AddChildToPath adds a child to a path in the file system. Path should be an existing directory. +func (n *FileNode) AddChildToPath(path string, child *FileNode) { + parent := n.GetFileNode(path) + if parent != nil && parent.IsDir { + parent.AddChild(child) + } +} + +// ListFiles returns a list of files in the file system. +func (n *FileNode) ListFiles() []*FileNode { + var files []*FileNode + n.Children.IterCb(func(key string, value *FileNode) { + files = append(files, value) + }) + return files +} + +// GetFileNode returns the FileNode of the given path. +func (n *FileNode) GetFileNode(path string) *FileNode { + currentNode := n + for _, part := range SplitPath(path) { + child, ok := currentNode.Children.Get(part) + if !ok { + return nil + } + currentNode = child + } + return currentNode +} + +// SplitPath splits a path into its parts. +func SplitPath(path string) []string { + var parts []string + for _, part := range strings.Split(path, "/") { + if part != "" { + parts = append(parts, part) + } + } + return parts +} diff --git a/internal/fs/node_test.go b/internal/fs/node_test.go new file mode 100644 index 0000000..bd90f1b --- /dev/null +++ b/internal/fs/node_test.go @@ -0,0 +1,52 @@ +package fs + +import ( + "testing" +) + +// TestNewFileNode tests the creation of a new FileNode. +func TestNewFileNode(t *testing.T) { + node := NewFileNode("root", true) + if node.Name != "root" || !node.IsDir { + t.Errorf("NewFileNode failed to initialize properly") + } +} + +// TestAddChild and TestListFiles tests adding children to a node and listing them. +func TestAddChild(t *testing.T) { + root := NewFileNode("root", true) + child1 := NewFileNode("child1", false) + child2 := NewFileNode("child2", true) + + root.AddChild(child1) + root.AddChild(child2) + + // Test if children are added properly + if _, ok := root.Children.Get("child1"); !ok { + t.Errorf("Failed to add child1") + } + if _, ok := root.Children.Get("child2"); !ok { + t.Errorf("Failed to add child2") + } +} + +// TestGetFileNode tests retrieving a node from the file system based on a given path. +func TestGetFileNode(t *testing.T) { + root := NewFileNode("root", true) + child1 := NewFileNode("child1", false) + child2 := NewFileNode("child2", true) + + root.AddChild(child1) + root.AddChild(child2) + + // Retrieve node by path + resultNode := root.GetFileNode("child2") + if resultNode != child2 { + t.Errorf("GetFileNode failed to retrieve the correct node") + } + + // Test non-existent path + if resultNode = root.GetFileNode("child3"); resultNode != nil { + t.Errorf("GetFileNode should return nil for non-existent node") + } +} diff --git a/internal/torrent/delete.go b/internal/torrent/delete.go index 463c22e..6656f61 100644 --- a/internal/torrent/delete.go +++ b/internal/torrent/delete.go @@ -33,7 +33,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { if torrent, ok := allTorrents.Get(accessKey); ok { torrent.DownloadedIDs.Union(torrent.InProgressIDs).Each(func(id string) bool { t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, id) - t.Api.DeleteTorrent(id) + t.api.DeleteTorrent(id) infoCache.Remove(id) t.deleteTorrentFile(id) return false diff --git a/internal/torrent/fixer.go b/internal/torrent/fixer.go index 9bc586f..30d2b53 100644 --- a/internal/torrent/fixer.go +++ b/internal/torrent/fixer.go @@ -62,7 +62,7 @@ func (t *TorrentManager) processFixers(instances []realdebrid.Torrent) { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) for _, id := range toDelete { - t.Api.DeleteTorrent(id) + t.api.DeleteTorrent(id) infoCache.Remove(id) t.deleteTorrentFile(id) } diff --git a/internal/torrent/latestState.go b/internal/torrent/latestState.go index 5f99149..488b067 100644 --- a/internal/torrent/latestState.go +++ b/internal/torrent/latestState.go @@ -23,7 +23,7 @@ func (t *TorrentManager) setNewLatestState(checksum LibraryState) { func (t *TorrentManager) getCurrentState() LibraryState { var state LibraryState - torrents, totalCount, err := t.Api.GetTorrents(true) + torrents, totalCount, err := t.api.GetTorrents(true) if err != nil { t.log.Errorf("Checksum API Error (GetTorrents): %v", err) return LibraryState{} @@ -33,7 +33,7 @@ func (t *TorrentManager) getCurrentState() LibraryState { state.FirstTorrentId = torrents[0].ID } - count, err := t.Api.GetActiveTorrentCount() + count, err := t.api.GetActiveTorrentCount() if err != nil { t.log.Errorf("Checksum API Error (GetActiveTorrentCount): %v", err) return LibraryState{} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 6a6ec48..b26121e 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -9,6 +9,7 @@ import ( "time" "github.com/debridmediamanager/zurg/internal/config" + "github.com/debridmediamanager/zurg/internal/fs" "github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/realdebrid" mapset "github.com/deckarep/golang-set/v2" @@ -22,24 +23,31 @@ const ( ) type TorrentManager struct { - Config config.ConfigInterface - Api *realdebrid.RealDebrid - DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent - DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] - fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id] - allAccessKeys mapset.Set[string] - allIDs mapset.Set[string] - latestState *LibraryState - requiredVersion string - workerPool *ants.Pool + requiredVersion string + + Config config.ConfigInterface + api *realdebrid.RealDebrid + workerPool *ants.Pool + log *logutil.Logger + + DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent + DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] + + RootNode *fs.FileNode + RefreshKillSwitch chan struct{} RepairKillSwitch chan struct{} RemountTrigger chan struct{} - repairTrigger chan *Torrent - repairSet mapset.Set[*Torrent] - repairRunning bool - repairRunningMu sync.Mutex - log *logutil.Logger + + latestState *LibraryState + allAccessKeys mapset.Set[string] + allIDs mapset.Set[string] + + fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id] + repairTrigger chan *Torrent + repairSet mapset.Set[*Torrent] + repairRunning bool + repairRunningMu sync.Mutex } // NewTorrentManager creates a new torrent manager @@ -47,18 +55,24 @@ type TorrentManager struct { // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, log *logutil.Logger) *TorrentManager { t := &TorrentManager{ - Config: cfg, - Api: api, - DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), + requiredVersion: "0.9.3-hotfix.10", + + Config: cfg, + api: api, + workerPool: workerPool, + log: log, + + DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), + + RootNode: fs.NewFileNode("root", true), + RefreshKillSwitch: make(chan struct{}, 1), RepairKillSwitch: make(chan struct{}, 1), RemountTrigger: make(chan struct{}, 1), - allAccessKeys: mapset.NewSet[string](), - allIDs: mapset.NewSet[string](), - latestState: &LibraryState{}, - requiredVersion: "0.9.3-hotfix.10", - workerPool: workerPool, - log: log, + + latestState: &LibraryState{}, + allAccessKeys: mapset.NewSet[string](), + allIDs: mapset.NewSet[string](), } t.fixers = t.readFixersFromFile() @@ -78,9 +92,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w return t } -// proxy +// proxy function func (t *TorrentManager) UnrestrictLinkUntilOk(link string) *realdebrid.Download { - ret, err := t.Api.UnrestrictLink(link, t.Config.ShouldServeFromRclone()) + ret, err := t.api.UnrestrictLink(link, t.Config.ShouldServeFromRclone()) if err != nil { t.log.Warnf("Cannot unrestrict link %s: %v", link, err) return nil @@ -194,7 +208,7 @@ func (t *TorrentManager) mountDownloads() { page := 1 offset := 0 for { - downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) + downloads, totalDownloads, err := t.api.GetDownloads(page, offset) if err != nil { // if we get an error, we just stop t.log.Warnf("Cannot get downloads on page %d: %v", page, err) @@ -236,6 +250,7 @@ func (t *TorrentManager) initializeDirectories() { // create directory maps for _, directory := range t.Config.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) + // t.RootNode.AddChild(fs.NewFileNode(directory, true)) } } diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 15235f3..fdadd27 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -15,7 +15,7 @@ import ( ) func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string { - instances, _, err := t.Api.GetTorrents(false) + instances, _, err := t.api.GetTorrents(false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) return nil @@ -72,6 +72,7 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string { t.assignedDirectoryCb(mainTor, func(directory string) { listing, _ := t.DirectoryMap.Get(directory) listing.Set(accessKey, mainTor) + updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey)) // this is just for the logs if directory != config.ALL_TORRENTS { @@ -151,7 +152,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { return diskTor } - info, err := t.Api.GetTorrentInfo(rdTorrent.ID) + info, err := t.api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err) return nil diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 92c1c87..c203f5f 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -356,7 +356,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( } // redownload torrent - resp, err := t.Api.AddMagnetHash(torrent.Hash) + resp, err := t.api.AddMagnetHash(torrent.Hash) if err != nil { if strings.Contains(err.Error(), "infringing") { t.markAsUnfixable(torrent, "infringing torrent") @@ -382,7 +382,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( var info *realdebrid.TorrentInfo for { // select files - err = t.Api.SelectTorrentFiles(newTorrentID, selection) + err = t.api.SelectTorrentFiles(newTorrentID, selection) if err != nil { t.registerFixer(newTorrentID, "download_failed") return nil, fmt.Errorf("cannot start redownloading: %v", err) @@ -391,7 +391,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) ( time.Sleep(10 * time.Second) // see if the torrent is ready - info, err = t.Api.GetTorrentInfo(newTorrentID) + info, err = t.api.GetTorrentInfo(newTorrentID) if err != nil { t.registerFixer(newTorrentID, "download_failed") return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err) @@ -440,7 +440,7 @@ func (t *TorrentManager) canCapacityHandle() bool { const maxDelay = 60 * time.Second retryCount := 0 for { - count, err := t.Api.GetActiveTorrentCount() + count, err := t.api.GetActiveTorrentCount() if err != nil { t.log.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { diff --git a/internal/torrent/types.go b/internal/torrent/types.go index 81d32e6..17f5f36 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -14,16 +14,17 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type Torrent struct { - Hash string `json:"Hash"` // immutable - Added string `json:"Added"` // immutable - UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable - DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable - InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable - Name string `json:"Name"` // immutable - OriginalName string `json:"OriginalName"` // immutable - Rename string `json:"Rename"` // modified over time - SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time - UnrepairableReason string `json:"Unfixable"` // modified over time + Name string `json:"Name"` // immutable + OriginalName string `json:"OriginalName"` // immutable + Hash string `json:"Hash"` // immutable + Added string `json:"Added"` // immutable + DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // immutable + InProgressIDs mapset.Set[string] `json:"InProgressIDs"` // immutable + UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // immutable + + Rename string `json:"Rename"` // modified over time + SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time + UnrepairableReason string `json:"Unfixable"` // modified over time Version string `json:"Version"` // only used for files } diff --git a/internal/torrent/uncached.go b/internal/torrent/uncached.go index 9c92e38..c2ff497 100644 --- a/internal/torrent/uncached.go +++ b/internal/torrent/uncached.go @@ -34,7 +34,7 @@ func (t *TorrentManager) GetUncachedTorrents() ([]*Torrent, error) { break } - resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice()) + resp, err := t.api.AvailabilityCheck(hashGroups[i].ToSlice()) if err != nil { return nil, fmt.Errorf("availability check is incomplete, skipping uncached check: %v", err) } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 50f10bd..3898fe8 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -165,7 +165,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { allTorrents = append(allTorrents, result.torrents...) totalCount := result.totalCount - if onlyOne || totalCount == len(allTorrents) { + if onlyOne { return allTorrents, totalCount, nil }