diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go
index 228c578..a9c7353 100644
--- a/cmd/zurg/main.go
+++ b/cmd/zurg/main.go
@@ -6,7 +6,6 @@ import (
"net/http"
"os"
"os/signal"
- "runtime"
"syscall"
"time"
@@ -14,13 +13,11 @@ import (
"github.com/debridmediamanager.com/zurg/internal/net"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/internal/version"
- "github.com/debridmediamanager.com/zurg/internal/zfs"
- "github.com/debridmediamanager.com/zurg/pkg/chunk"
- zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
+
+ // zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"github.com/hashicorp/golang-lru/v2/expirable"
- "github.com/winfsp/cgofuse/fuse"
)
func main() {
@@ -65,29 +62,29 @@ func main() {
}
}()
- log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
- client := zurghttp.NewHTTPClient(config.GetToken(), 10, config)
- chunkMgr, err := chunk.NewManager(
- "", // in-memory chunks
- 10485760, // 10MB chunk size
- max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead
- max(runtime.NumCPU()/2, 1), // 4 check threads
- max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead
- runtime.NumCPU()*2, // total chunks kept in memory
- torrentMgr,
- client)
- if nil != err {
- log.Panicf("Failed to initialize chunk manager: %v", err)
- }
+ // log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
+ // client := zurghttp.NewHTTPClient(config.GetToken(), 10, config)
+ // chunkMgr, err := chunk.NewManager(
+ // "", // in-memory chunks
+ // 10485760, // 10MB chunk size
+ // max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead
+ // max(runtime.NumCPU()/2, 1), // 4 check threads
+ // max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead
+ // runtime.NumCPU()*2, // total chunks kept in memory
+ // torrentMgr,
+ // client)
+ // if nil != err {
+ // log.Panicf("Failed to initialize chunk manager: %v", err)
+ // }
- fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs"))
- host := fuse.NewFileSystemHost(fs)
- go func() {
- log.Infof("Mounting on %s", config.GetMountPoint())
- if err := zfs.Mount(host, config); err != nil {
- log.Panicf("Failed to mount: %v", err)
- }
- }()
+ // fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs"))
+ // host := fuse.NewFileSystemHost(fs)
+ // go func() {
+ // log.Infof("Mounting on %s", config.GetMountPoint())
+ // if err := zfs.Mount(host, config); err != nil {
+ // log.Panicf("Failed to mount: %v", err)
+ // }
+ // }()
<-shutdown
@@ -97,9 +94,9 @@ func main() {
if err := server.Shutdown(ctx); err != nil {
log.Errorf("Server shutdown error: %v\n", err)
}
- if err := zfs.Unmount(host); err != nil {
- log.Errorf("Unmount error: %v\n", err)
- }
+ // if err := zfs.Unmount(host); err != nil {
+ // log.Errorf("Unmount error: %v\n", err)
+ // }
log.Info("BYE")
}
diff --git a/go.mod b/go.mod
index 89a76d9..1056e14 100644
--- a/go.mod
+++ b/go.mod
@@ -3,13 +3,14 @@ module github.com/debridmediamanager.com/zurg
go 1.21.3
require (
- github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/hashicorp/golang-lru/v2 v2.0.7
go.uber.org/zap v1.26.0
golang.org/x/sys v0.14.0
gopkg.in/yaml.v3 v3.0.1
)
+require github.com/orcaman/concurrent-map/v2 v2.0.1
+
require (
github.com/winfsp/cgofuse v1.5.0
go.uber.org/multierr v1.10.0 // indirect
diff --git a/go.sum b/go.sum
index 963bc15..1896074 100644
--- a/go.sum
+++ b/go.sum
@@ -1,9 +1,9 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
-github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
+github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
+github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
diff --git a/internal/config/load.go b/internal/config/load.go
index b1bb3ba..3ee3ac6 100644
--- a/internal/config/load.go
+++ b/internal/config/load.go
@@ -18,7 +18,7 @@ type ConfigInterface interface {
GetHost() string
GetPort() string
GetDirectories() []string
- MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool
+ MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool
GetOnLibraryUpdate() string
GetNetworkBufferSize() int
GetMountPoint() string
diff --git a/internal/config/v1.go b/internal/config/v1.go
index dd2bf9b..78f2bf2 100644
--- a/internal/config/v1.go
+++ b/internal/config/v1.go
@@ -63,21 +63,25 @@ func (z *ZurgConfigV1) GetGroupMap() map[string][]string {
return result
}
-func (z *ZurgConfigV1) MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool {
+func (z *ZurgConfigV1) MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool {
if _, ok := z.Directories[directory]; !ok {
return false
}
for _, filter := range z.Directories[directory].Filters {
- if z.matchFilter(torrentID, torrentName, fileNames, filter) {
+ if z.matchFilter(torrentName, torrentIDs, fileNames, filter) {
return true
}
}
return false
}
-func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []string, filter *FilterConditionsV1) bool {
- if filter.ID != "" && fileID == filter.ID {
- return true
+func (z *ZurgConfigV1) matchFilter(torrentName string, torrentIDs, fileNames []string, filter *FilterConditionsV1) bool {
+ if filter.ID != "" {
+ for _, torrentID := range torrentIDs {
+ if torrentID == filter.ID {
+ return true
+ }
+ }
}
if filter.RegexStr != "" {
regex := compilePattern(filter.RegexStr)
@@ -100,7 +104,7 @@ func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []strin
if len(filter.And) > 0 {
andResult := true
for _, andFilter := range filter.And {
- andResult = andResult && z.matchFilter(fileID, torrentName, fileNames, andFilter)
+ andResult = andResult && z.matchFilter(torrentName, torrentIDs, fileNames, andFilter)
if !andResult {
return false
}
@@ -109,7 +113,7 @@ func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []strin
}
if len(filter.Or) > 0 {
for _, orFilter := range filter.Or {
- if z.matchFilter(fileID, torrentName, fileNames, orFilter) {
+ if z.matchFilter(torrentName, torrentIDs, fileNames, orFilter) {
return true
}
}
diff --git a/internal/dav/listing.go b/internal/dav/listing.go
index b9bb1ce..0c8ffb4 100644
--- a/internal/dav/listing.go
+++ b/internal/dav/listing.go
@@ -4,8 +4,10 @@ import (
"encoding/xml"
"fmt"
"net/http"
+ "net/url"
"path"
"path/filepath"
+ "sort"
"strings"
"github.com/debridmediamanager.com/zurg/internal/config"
@@ -27,11 +29,11 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To
switch {
case len(filteredSegments) == 1 && filteredSegments[0] == "":
- output, err = handleRoot(w, r, c)
+ output, err = handleRoot(c)
case len(filteredSegments) == 1:
- output, err = handleListOfTorrents(requestPath, w, r, t, c)
+ output, err = handleListOfTorrents(requestPath, t)
case len(filteredSegments) == 2:
- output, err = handleSingleTorrent(requestPath, w, r, t)
+ output, err = handleSingleTorrent(requestPath, t)
default:
log.Warnf("Request %s %s not found", r.Method, requestPath)
http.Error(w, "Not Found", http.StatusNotFound)
@@ -56,7 +58,7 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To
}
}
-func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface) ([]byte, error) {
+func handleRoot(c config.ConfigInterface) ([]byte, error) {
var responses []dav.Response
responses = append(responses, dav.Directory(""))
for _, directory := range c.GetDirectories() {
@@ -69,68 +71,23 @@ func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface
return xml.Marshal(rootResponse)
}
-func handleListOfTorrents(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface) ([]byte, error) {
+func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) ([]byte, error) {
basePath := path.Base(requestPath)
-
- for _, directory := range c.GetDirectories() {
- if basePath == directory {
- var responses []dav.Response
-
- responses = append(responses, dav.Directory(basePath))
-
- for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
- accessKey := el.Key
- torrent := el.Value
- if torrent.InProgress {
- continue
- }
- for _, dir := range torrent.Directories {
- if dir == basePath {
- path := filepath.Join(basePath, accessKey)
- responses = append(responses, dav.Directory(path))
- break
- }
- }
- }
-
- resp := &dav.MultiStatus{
- XMLNS: "DAV:",
- Response: responses,
- }
- return xml.Marshal(resp)
- }
+ torrents, ok := t.DirectoryMap.Get(basePath)
+ if !ok {
+ return nil, fmt.Errorf("cannot find directory %s", basePath)
}
-
- return nil, fmt.Errorf("cannot find directory when generating list: %s", requestPath)
-}
-
-func handleSingleTorrent(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager) ([]byte, error) {
- accessKey := path.Base(requestPath)
- torrent, exists := t.TorrentMap.Get(accessKey)
- if !exists {
- return nil, fmt.Errorf("cannot find torrent %s", accessKey)
- }
-
var responses []dav.Response
-
// initial response is the directory itself
- responses = append(responses, dav.Directory(requestPath))
+ responses = append(responses, dav.Directory(basePath))
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- file := el.Value
- if file.Link == "" {
- // will be caught by torrent manager's repairAll
- // just skip it for now
- continue
- }
- filename := filepath.Base(file.Path)
- filePath := filepath.Join(requestPath, filename)
- responses = append(responses, dav.File(
- filePath,
- file.Bytes,
- convertRFC3339toRFC1123(torrent.LatestAdded),
- file.Link,
- ))
+ var accessKeys []string
+ torrents.IterCb(func(accessKey string, _ *torrent.Torrent) {
+ accessKeys = append(accessKeys, accessKey)
+ })
+ sort.Strings(accessKeys)
+ for _, accessKey := range accessKeys {
+ responses = append(responses, dav.Directory(filepath.Join(basePath, accessKey)))
}
resp := &dav.MultiStatus{
@@ -139,3 +96,41 @@ func handleSingleTorrent(requestPath string, w http.ResponseWriter, r *http.Requ
}
return xml.Marshal(resp)
}
+
+func handleSingleTorrent(requestPath string, t *torrent.TorrentManager) ([]byte, error) {
+ basePath := path.Base(path.Dir(requestPath))
+ torrents, ok := t.DirectoryMap.Get(basePath)
+ if !ok {
+ return nil, fmt.Errorf("cannot find directory %s", basePath)
+ }
+ accessKey := path.Base(requestPath)
+ tor, ok := torrents.Get(accessKey)
+ if !ok {
+ return nil, fmt.Errorf("cannot find torrent %s", accessKey)
+ }
+
+ var responses []dav.Response
+ // initial response is the directory itself
+ responses = append(responses, dav.Directory(requestPath))
+
+ tor.SelectedFiles.IterCb(func(filename string, file *torrent.File) {
+ if file.Link == "" {
+ // will be caught by torrent manager's repairAll
+ // just skip it for now
+ return
+ }
+ filePath := filepath.Join(requestPath, url.PathEscape(filename))
+ responses = append(responses, dav.File(
+ filePath,
+ file.Bytes,
+ convertRFC3339toRFC1123(tor.LatestAdded),
+ file.Link,
+ ))
+ })
+
+ resp := &dav.MultiStatus{
+ XMLNS: "DAV:",
+ Response: responses,
+ }
+ return xml.Marshal(resp)
+}
diff --git a/internal/http/listing.go b/internal/http/listing.go
index ddf0db7..210976b 100644
--- a/internal/http/listing.go
+++ b/internal/http/listing.go
@@ -6,6 +6,7 @@ import (
"net/url"
"path"
"path/filepath"
+ "sort"
"strings"
"github.com/debridmediamanager.com/zurg/internal/config"
@@ -24,11 +25,11 @@ func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.T
filteredSegments := removeEmptySegments(strings.Split(requestPath, "/"))
switch {
case len(filteredSegments) == 1:
- output, err = handleRoot(w, r, c)
+ output, err = handleRoot(c)
case len(filteredSegments) == 2:
- output, err = handleListOfTorrents(requestPath, w, r, t, c)
+ output, err = handleListOfTorrents(requestPath, t)
case len(filteredSegments) == 3:
- output, err = handleSingleTorrent(requestPath, w, r, t)
+ output, err = handleSingleTorrent(requestPath, t)
default:
log.Warnf("Request %s %s not found", r.Method, requestPath)
http.Error(w, "Not Found", http.StatusNotFound)
@@ -51,7 +52,7 @@ func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.T
}
}
-func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface) (*string, error) {
+func handleRoot(c config.ConfigInterface) (*string, error) {
htmlDoc := "
"
for _, directory := range c.GetDirectories() {
@@ -62,50 +63,50 @@ func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface
return &htmlDoc, nil
}
-func handleListOfTorrents(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface) (*string, error) {
+func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) (*string, error) {
basePath := path.Base(requestPath)
-
- for _, directory := range c.GetDirectories() {
- if basePath == directory {
- htmlDoc := ""
- for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
- accessKey := el.Key
- torrent := el.Value
- if torrent.InProgress {
- continue
- }
- for _, dir := range torrent.Directories {
- if dir == basePath {
- htmlDoc += fmt.Sprintf("- %s
", filepath.Join(requestPath, url.PathEscape(accessKey)), accessKey)
- break
- }
- }
- }
- return &htmlDoc, nil
- }
+ torrents, ok := t.DirectoryMap.Get(basePath)
+ if !ok {
+ return nil, fmt.Errorf("cannot find directory %s", basePath)
}
- return nil, fmt.Errorf("cannot find directory when generating list: %s", requestPath)
+ htmlDoc := ""
+
+ var accessKeys []string
+ torrents.IterCb(func(accessKey string, _ *torrent.Torrent) {
+ accessKeys = append(accessKeys, accessKey)
+ })
+ sort.Strings(accessKeys)
+ for _, accessKey := range accessKeys {
+ htmlDoc = htmlDoc + fmt.Sprintf("- %s
", filepath.Join(requestPath, url.PathEscape(accessKey)), accessKey)
+ }
+
+ return &htmlDoc, nil
}
-func handleSingleTorrent(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager) (*string, error) {
+func handleSingleTorrent(requestPath string, t *torrent.TorrentManager) (*string, error) {
+ basePath := path.Base(path.Dir(requestPath))
+ torrents, ok := t.DirectoryMap.Get(basePath)
+ if !ok {
+ return nil, fmt.Errorf("cannot find directory %s", basePath)
+ }
accessKey := path.Base(requestPath)
- torrent, _ := t.TorrentMap.Get(accessKey)
- if torrent == nil {
+ tor, ok := torrents.Get(accessKey)
+ if !ok {
return nil, fmt.Errorf("cannot find torrent %s", accessKey)
}
htmlDoc := ""
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- file := el.Value
+
+ tor.SelectedFiles.IterCb(func(filename string, file *torrent.File) {
if file.Link == "" {
// will be caught by torrent manager's repairAll
// just skip it for now
- continue
+ return
}
- filename := filepath.Base(file.Path)
filePath := filepath.Join(requestPath, url.PathEscape(filename))
htmlDoc += fmt.Sprintf("- %s
", filePath, filename)
- }
+ })
+
return &htmlDoc, nil
}
diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go
index 1e3d8bc..3594e73 100644
--- a/internal/torrent/manager.go
+++ b/internal/torrent/manager.go
@@ -15,15 +15,13 @@ import (
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
- "github.com/elliotchance/orderedmap/v2"
+ cmap "github.com/orcaman/concurrent-map/v2"
"go.uber.org/zap"
)
type TorrentManager struct {
- config config.ConfigInterface
- DirectoryMap *orderedmap.OrderedMap[string, *orderedmap.OrderedMap[string, *Torrent]]
- TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent
- repairMap *orderedmap.OrderedMap[string, time.Time] // accessKey -> time last repaired
+ cfg config.ConfigInterface
+ DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
requiredVersion string
checksum string
api *realdebrid.RealDebrid
@@ -35,19 +33,24 @@ type TorrentManager struct {
// NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background
// and store them in-memory and cached in files
-func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager {
+func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager {
t := &TorrentManager{
- config: config,
- DirectoryMap: orderedmap.NewOrderedMap[string, *orderedmap.OrderedMap[string, *Torrent]](),
- TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](),
- repairMap: orderedmap.NewOrderedMap[string, time.Time](),
+ cfg: cfg,
+ DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
requiredVersion: "10.11.2023",
api: api,
- workerPool: make(chan bool, config.GetNumOfWorkers()),
+ workerPool: make(chan bool, cfg.GetNumOfWorkers()),
mu: &sync.Mutex{},
log: logutil.NewLogger().Named("manager"),
}
+ // create special directory
+ t.DirectoryMap.Set("__all__", cmap.New[*Torrent]()) // key is AccessKey
+ // create directory maps
+ for _, directory := range cfg.GetDirectories() {
+ t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
+ }
+
newTorrents, _, err := t.api.GetTorrents(0)
if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err)
@@ -60,72 +63,109 @@ func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid
go func(idx int) {
defer wg.Done()
t.workerPool <- true
+ // TODO wrap getMoreInfo and limit the execution time!
torrentsChan <- t.getMoreInfo(newTorrents[idx])
<-t.workerPool
}(i)
}
- t.log.Infof("Received %d torrents", len(newTorrents))
wg.Wait()
- t.log.Infof("Fetched info for %d torrents", len(newTorrents))
close(torrentsChan)
- count := 0
- for newTorrent := range torrentsChan {
- if newTorrent == nil {
- count++
+ t.log.Infof("Fetched info for %d torrents", len(newTorrents))
+
+ noInfoCount := 0
+ allCt := 0
+ allTorrents, _ := t.DirectoryMap.Get("__all__")
+ for info := range torrentsChan {
+ allCt++
+ if info == nil {
+ noInfoCount++
continue
}
- torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
- if torrent != nil {
- t.mu.Lock()
- t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
- t.mu.Unlock()
+ if torrent, exists := allTorrents.Get(info.AccessKey); exists {
+ mainTorrent := t.mergeToMain(torrent, info)
+ allTorrents.Set(info.AccessKey, mainTorrent)
} else {
- t.mu.Lock()
- t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
- t.mu.Unlock()
+ allTorrents.Set(info.AccessKey, info)
}
}
- t.log.Infof("Compiled all torrents to %d unique movies and shows, %d were missing info", t.TorrentMap.Len(), count)
+
+ anotherCt := 0
+ allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
+ anotherCt++
+ // get IDs
+ var torrentIDs []string
+ for _, instance := range torrent.Instances {
+ torrentIDs = append(torrentIDs, instance.ID)
+ }
+
+ // get filenames
+ var filenames []string
+ torrent.SelectedFiles.IterCb(func(_ string, file *File) {
+ filenames = append(filenames, file.Path)
+ })
+
+ // Map torrents to directories
+ switch t.cfg.GetVersion() {
+ case "v1":
+ configV1 := t.cfg.(*config.ZurgConfigV1)
+ for _, directories := range configV1.GetGroupMap() {
+ for _, directory := range directories {
+ if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
+ torrents, _ := t.DirectoryMap.Get(directory)
+ torrents.Set(accessKey, torrent)
+ break
+ }
+ }
+ }
+ }
+ })
+
+ t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
+
t.checksum = t.getChecksum()
- if t.config.EnableRepair() {
- go t.repairAll()
- }
- // go t.startRefreshJob()
+ // if t.config.EnableRepair() {
+ // go t.repairAll()
+ // }
+ go t.startRefreshJob()
return t
}
func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
- merged := t1
+ mainTorrent := t1
- // Merge SelectedFiles
- // side note: iteration works!
- for el := t2.SelectedFiles.Front(); el != nil; el = el.Next() {
- if _, ok := merged.SelectedFiles.Get(el.Key); !ok {
- merged.SelectedFiles.Set(el.Key, el.Value)
+ // Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map
+ t2.SelectedFiles.IterCb(func(key string, file *File) {
+ // see if it already exists in the main torrent
+ if mainFile, ok := mainTorrent.SelectedFiles.Get(key); !ok {
+ mainTorrent.SelectedFiles.Set(key, file)
+ } else if file.Link != "" && mainFile.Link == "" {
+ // if it exists, but the link is empty, then we can update it
+ mainTorrent.SelectedFiles.Set(key, file)
}
- }
+ })
// Merge Instances
- merged.Instances = append(t1.Instances, t2.Instances...)
+ mainTorrent.Instances = append(t1.Instances, t2.Instances...)
// LatestAdded
if t1.LatestAdded < t2.LatestAdded {
- merged.LatestAdded = t2.LatestAdded
+ mainTorrent.LatestAdded = t2.LatestAdded
}
// InProgress - if one of the instances is in progress, then the whole torrent is in progress
- for _, instance := range merged.Instances {
+ mainTorrent.InProgress = false
+ for _, instance := range mainTorrent.Instances {
if instance.Progress != 100 {
- merged.InProgress = true
+ mainTorrent.InProgress = true
}
if instance.ForRepair {
- merged.ForRepair = true
+ mainTorrent.ForRepair = true
}
}
- return merged
+ return mainTorrent
}
// proxy
@@ -195,7 +235,7 @@ func (t *TorrentManager) getChecksum() string {
func (t *TorrentManager) startRefreshJob() {
t.log.Info("Starting periodic refresh")
for {
- <-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)
+ <-time.After(time.Duration(t.cfg.GetRefreshEverySeconds()) * time.Second)
checksum := t.getChecksum()
if checksum == t.checksum {
@@ -220,57 +260,85 @@ func (t *TorrentManager) startRefreshJob() {
<-t.workerPool
}(i)
}
+ wg.Wait()
+ close(torrentsChan)
+ t.log.Infof("Fetched info for %d torrents", len(newTorrents))
- // side note: iteration works!
+ noInfoCount := 0
+ allTorrents, _ := t.DirectoryMap.Get("__all__")
+ var retain []string
+ for info := range torrentsChan {
+ if info == nil {
+ noInfoCount++
+ continue
+ }
+ retain = append(retain, info.AccessKey)
+ if torrent, exists := allTorrents.Get(info.AccessKey); exists {
+ mainTorrent := t.mergeToMain(torrent, info)
+ allTorrents.Set(info.AccessKey, mainTorrent)
+ } else {
+ allTorrents.Set(info.AccessKey, info)
+ }
+ }
+
+ allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
+ // get IDs
+ var torrentIDs []string
+ for _, instance := range torrent.Instances {
+ torrentIDs = append(torrentIDs, instance.ID)
+ }
+
+ // get filenames
+ var filenames []string
+ torrent.SelectedFiles.IterCb(func(_ string, file *File) {
+ filenames = append(filenames, file.Path)
+ })
+
+ // Map torrents to directories
+ switch t.cfg.GetVersion() {
+ case "v1":
+ configV1 := t.cfg.(*config.ZurgConfigV1)
+ for _, directories := range configV1.GetGroupMap() {
+ for _, directory := range directories {
+ if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
+ torrents, _ := t.DirectoryMap.Get(directory)
+ torrents.Set(accessKey, torrent)
+ break
+ }
+ }
+ }
+ }
+ })
+
+ // delete torrents that no longer exist
var toDelete []string
- for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
+ allTorrents.IterCb(func(_ string, torrent *Torrent) {
found := false
- for _, newTorrent := range newTorrents {
- if newTorrent.ID == el.Value.AccessKey {
+ for _, accessKey := range retain {
+ if torrent.AccessKey == accessKey {
found = true
break
}
}
if !found {
- toDelete = append(toDelete, el.Key)
+ toDelete = append(toDelete, torrent.AccessKey)
}
- }
+ })
for _, accessKey := range toDelete {
- t.TorrentMap.Delete(accessKey)
- for el := t.DirectoryMap.Front(); el != nil; el = el.Next() {
- torrents := el.Value
- for el2 := torrents.Front(); el2 != nil; el2 = el2.Next() {
- if el2.Key == accessKey {
- torrents.Delete(accessKey)
- break
- }
- }
- }
+ t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
+ torrents.Remove(accessKey)
+ })
}
+ // end delete torrents that no longer exist
+
+ t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
- wg.Wait()
- close(torrentsChan)
- for newTorrent := range torrentsChan {
- if newTorrent == nil {
- continue
- }
- torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
- if torrent != nil {
- t.mu.Lock()
- t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
- t.mu.Unlock()
- } else {
- t.mu.Lock()
- t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
- t.mu.Unlock()
- }
- }
t.checksum = t.getChecksum()
- if t.config.EnableRepair() {
- go t.repairAll()
- }
- go OnLibraryUpdateHook(t.config)
+ // if t.config.EnableRepair() {
+ // go t.repairAll()
+ // }
+ go OnLibraryUpdateHook(t.cfg)
}
}
@@ -299,7 +367,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
// it also has a Link field, which can be empty
// if it is empty, it means the file is no longer available
// Files+Links together are the same as SelectedFiles
- selectedFiles := orderedmap.NewOrderedMap[string, *File]()
+ selectedFiles := cmap.New[*File]()
streamableCount := 0
// if some Links are empty, we need to repair it
forRepair := false
@@ -317,66 +385,50 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
ZurgFS: hashStringToFh(file.Path + info.Hash),
})
}
- if selectedFiles.Len() > len(info.Links) && info.Progress == 100 {
+ if selectedFiles.Count() > len(info.Links) && info.Progress == 100 {
// chaotic file means RD will not output the desired file selection
// e.g. even if we select just a single mkv, it will output a rar
var isChaotic bool
selectedFiles, isChaotic = t.organizeChaos(info.Links, selectedFiles)
if isChaotic {
- t.log.Warnf("Torrent id=%s %s is unrepairable, it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name)
+ t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name)
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
return nil
} else {
- if streamableCount > 1 {
+ if streamableCount > 1 && t.cfg.EnableRepair() {
// case for repair 1: it's missing some links (or all links)
// if we download it as is, we might get the same file over and over again
// so we need to redownload it with other files selected
// that is why we check if there are other streamable files
t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name)
forRepair = true
- } else {
- t.log.Warnf("Torrent id=%s %s is unrepairable, the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
+ } else if streamableCount == 1 {
+ t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
return nil
}
}
- } else if selectedFiles.Len() == len(info.Links) {
+ } else if selectedFiles.Count() == len(info.Links) {
// all links are still intact! good!
// side note: iteration works!
i := 0
- for el := selectedFiles.Front(); el != nil; el = el.Next() {
+ selectedFiles.IterCb(func(_ string, file *File) {
if i < len(info.Links) {
- file := el.Value
file.Link = info.Links[i] // verified working!
- selectedFiles.Set(el.Key, file)
i++
}
- }
+ })
}
info.ForRepair = forRepair
torrent := Torrent{
AccessKey: t.getName(info.Name, info.OriginalName),
SelectedFiles: selectedFiles,
- Directories: t.getDirectories(info),
LatestAdded: info.Added,
InProgress: info.Progress != 100,
Instances: []realdebrid.TorrentInfo{*info},
}
- for _, directory := range torrent.Directories {
- if _, ok := t.DirectoryMap.Get(directory); !ok {
- newMap := orderedmap.NewOrderedMap[string, *Torrent]()
- t.mu.Lock()
- t.DirectoryMap.Set(directory, newMap)
- t.mu.Unlock()
- } else {
- torrents, _ := t.DirectoryMap.Get(directory)
- t.mu.Lock()
- torrents.Set(torrent.AccessKey, &torrent)
- t.mu.Unlock()
- }
- }
- if selectedFiles.Len() > 0 && torrentFromFile == nil {
+ if selectedFiles.Count() > 0 && torrentFromFile == nil {
t.writeToFile(info) // only when there are selected files, else it's useless
}
return &torrent
@@ -390,7 +442,7 @@ func hashStringToFh(s string) (fh uint64) {
func (t *TorrentManager) getName(name, originalName string) string {
// drop the extension from the name
- if t.config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
+ if t.cfg.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
return name
} else {
ret := strings.TrimSuffix(originalName, ".mp4")
@@ -399,38 +451,6 @@ func (t *TorrentManager) getName(name, originalName string) string {
}
}
-func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []string {
- var ret []string
- // Map torrents to directories
- switch t.config.GetVersion() {
- case "v1":
- configV1 := t.config.(*config.ZurgConfigV1)
- groupMap := configV1.GetGroupMap()
- // for every group, iterate over every torrent
- // and then sprinkle/distribute the torrents to the directories of the group
- for _, directories := range groupMap {
- for _, directory := range directories {
- var filenames []string
- for _, file := range torrent.Files {
- if file.Selected == 0 {
- continue
- }
- filenames = append(filenames, file.Path)
- }
- accessKey := t.getName(torrent.Name, torrent.OriginalName)
- if configV1.MeetsConditions(directory, torrent.ID, accessKey, filenames) {
- ret = append(ret, directory)
- break // we found a directory for this torrent for this group, so we can stop looking for more
- }
- }
- }
- default:
- t.log.Error("Unknown config version")
- }
- // t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret)
- return ret
-}
-
func (t *TorrentManager) writeToFile(torrent *realdebrid.TorrentInfo) error {
filePath := "data/" + torrent.ID + ".bin"
file, err := os.Create(filePath)
@@ -473,7 +493,7 @@ func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo
return &torrent
}
-func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap.OrderedMap[string, *File]) (*orderedmap.OrderedMap[string, *File], bool) {
+func (t *TorrentManager) organizeChaos(links []string, selectedFiles cmap.ConcurrentMap[string, *File]) (cmap.ConcurrentMap[string, *File], bool) {
type Result struct {
Response *realdebrid.UnrestrictResponse
}
@@ -503,13 +523,12 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap
continue
}
found := false
- // side note: iteration works!
- for el := selectedFiles.Front(); el != nil; el = el.Next() {
- if file, _ := selectedFiles.Get(el.Key); strings.Contains(file.Path, result.Response.Filename) {
+ selectedFiles.IterCb(func(_ string, file *File) {
+ if strings.Contains(file.Path, result.Response.Filename) {
file.Link = result.Response.Link
found = true
}
- }
+ })
if !found {
if result.Response.Streamable == 1 {
selectedFiles.Set(filepath.Base(result.Response.Filename), &File{
@@ -532,219 +551,219 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap
return selectedFiles, isChaotic
}
-func (t *TorrentManager) repairAll() {
- t.log.Info("Checking for torrents to repair")
- // side note: iteration works!
- for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
- torrent := el.Value
- // do not repair if in progress
- if torrent.InProgress {
- continue
- }
+// func (t *TorrentManager) repairAll() {
+// t.log.Info("Checking for torrents to repair")
+// // side note: iteration works!
+// for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
+// torrent := el.Value
+// // do not repair if in progress
+// if torrent.InProgress {
+// continue
+// }
- // do not repair if all files have links
- forRepair := false
- for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() {
- file := el2.Value
- if file.Link == "" {
- forRepair = true
- break
- }
- }
- if !forRepair {
- // if it was marked for repair, unmark it
- torrent.ForRepair = false
- t.mu.Lock()
- t.TorrentMap.Set(torrent.AccessKey, torrent)
- t.mu.Unlock()
- continue
- }
+// // do not repair if all files have links
+// forRepair := false
+// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() {
+// file := el2.Value
+// if file.Link == "" {
+// forRepair = true
+// break
+// }
+// }
+// if !forRepair {
+// // if it was marked for repair, unmark it
+// torrent.ForRepair = false
+// t.mu.Lock()
+// t.TorrentMap.Set(torrent.AccessKey, torrent)
+// t.mu.Unlock()
+// continue
+// }
- // when getting info, we mark it for repair if it's missing some links
- if torrent.ForRepair {
- t.log.Infof("Found torrent for repair: %s", torrent.AccessKey)
- t.Repair(torrent.AccessKey)
- break // only repair the first one for repair and then move on
- }
- }
-}
+// // when getting info, we mark it for repair if it's missing some links
+// if torrent.ForRepair {
+// t.log.Infof("Found torrent for repair: %s", torrent.AccessKey)
+// t.Repair(torrent.AccessKey)
+// break // only repair the first one for repair and then move on
+// }
+// }
+// }
-func (t *TorrentManager) Repair(accessKey string) {
- if lastRepair, ok := t.repairMap.Get(accessKey); ok {
- if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs
- return
- }
- }
- t.mu.Lock()
- t.repairMap.Set(accessKey, time.Now())
- t.mu.Unlock()
+// func (t *TorrentManager) Repair(accessKey string) {
+// if lastRepair, ok := t.repairMap.Get(accessKey); ok {
+// if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs
+// return
+// }
+// }
+// t.mu.Lock()
+// t.repairMap.Set(accessKey, time.Now())
+// t.mu.Unlock()
- if !t.config.EnableRepair() {
- t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
- return
- }
+// if !t.config.EnableRepair() {
+// t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
+// return
+// }
- torrent, _ := t.TorrentMap.Get(accessKey)
- if torrent == nil {
- t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
- return
- }
- if torrent.InProgress {
- t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
- return
- }
+// torrent, _ := t.TorrentMap.Get(accessKey)
+// if torrent == nil {
+// t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
+// return
+// }
+// if torrent.InProgress {
+// t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
+// return
+// }
- // check if we can still add more downloads
- proceed := t.canCapacityHandle()
- if !proceed {
- t.log.Error("Cannot add more torrents, ignoring repair request")
- return
- }
+// // check if we can still add more downloads
+// proceed := t.canCapacityHandle()
+// if !proceed {
+// t.log.Error("Cannot add more torrents, ignoring repair request")
+// return
+// }
- // make the file messy
- var links []string
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- file := el.Value
- if file.Link != "" {
- links = append(links, file.Link)
- }
- file.Link = ""
- }
- selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles)
- torrent.SelectedFiles = selectedFiles
- t.mu.Lock()
- t.TorrentMap.Set(torrent.AccessKey, torrent)
- t.mu.Unlock()
+// // make the file messy
+// var links []string
+// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
+// file := el.Value
+// if file.Link != "" {
+// links = append(links, file.Link)
+// }
+// file.Link = ""
+// }
+// selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles)
+// torrent.SelectedFiles = selectedFiles
+// t.mu.Lock()
+// t.TorrentMap.Set(torrent.AccessKey, torrent)
+// t.mu.Unlock()
- // first solution: add the same selection, maybe it can be fixed by reinsertion?
- if t.reinsertTorrent(torrent, "") {
- t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
- return
- }
- // if all the selected files are missing but there are other streamable files
- var missingFiles []File
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- file := el.Value
- if file.Link == "" {
- missingFiles = append(missingFiles, *file)
- }
- }
- if len(missingFiles) > 0 {
- t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey)
- // if not, last resort: add only the missing files but do it in 2 batches
- half := len(missingFiles) / 2
- missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
- missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
- if missingFiles1 != "" {
- t.reinsertTorrent(torrent, missingFiles1)
- }
- if missingFiles2 != "" {
- t.reinsertTorrent(torrent, missingFiles2)
- }
- }
-}
+// // first solution: add the same selection, maybe it can be fixed by reinsertion?
+// if t.reinsertTorrent(torrent, "") {
+// t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
+// return
+// }
+// // if all the selected files are missing but there are other streamable files
+// var missingFiles []File
+// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
+// file := el.Value
+// if file.Link == "" {
+// missingFiles = append(missingFiles, *file)
+// }
+// }
+// if len(missingFiles) > 0 {
+// t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey)
+// // if not, last resort: add only the missing files but do it in 2 batches
+// half := len(missingFiles) / 2
+// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
+// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
+// if missingFiles1 != "" {
+// t.reinsertTorrent(torrent, missingFiles1)
+// }
+// if missingFiles2 != "" {
+// t.reinsertTorrent(torrent, missingFiles2)
+// }
+// }
+// }
-func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool {
- // if missingFiles is not provided, look for missing files
- if missingFiles == "" {
- var tmpSelection string
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- file := el.Value
- tmpSelection += fmt.Sprintf("%d,", file.ID)
- }
- if tmpSelection == "" {
- return false
- }
- if len(tmpSelection) > 0 {
- missingFiles = tmpSelection[:len(tmpSelection)-1]
- }
- }
+// func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool {
+// // if missingFiles is not provided, look for missing files
+// if missingFiles == "" {
+// var tmpSelection string
+// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
+// file := el.Value
+// tmpSelection += fmt.Sprintf("%d,", file.ID)
+// }
+// if tmpSelection == "" {
+// return false
+// }
+// if len(tmpSelection) > 0 {
+// missingFiles = tmpSelection[:len(tmpSelection)-1]
+// }
+// }
- // redownload torrent
- resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
- if err != nil {
- t.log.Warnf("Cannot redownload torrent: %v", err)
- return false
- }
- time.Sleep(1 * time.Second)
+// // redownload torrent
+// resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
+// if err != nil {
+// t.log.Warnf("Cannot redownload torrent: %v", err)
+// return false
+// }
+// time.Sleep(1 * time.Second)
- // select files
- newTorrentID := resp.ID
- err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
- if err != nil {
- t.log.Warnf("Cannot start redownloading: %v", err)
- t.api.DeleteTorrent(newTorrentID)
- return false
- }
- time.Sleep(10 * time.Second)
+// // select files
+// newTorrentID := resp.ID
+// err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
+// if err != nil {
+// t.log.Warnf("Cannot start redownloading: %v", err)
+// t.api.DeleteTorrent(newTorrentID)
+// return false
+// }
+// time.Sleep(10 * time.Second)
- // see if the torrent is ready
- info, err := t.api.GetTorrentInfo(newTorrentID)
- if err != nil {
- t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
- t.api.DeleteTorrent(newTorrentID)
- return false
- }
+// // see if the torrent is ready
+// info, err := t.api.GetTorrentInfo(newTorrentID)
+// if err != nil {
+// t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
+// t.api.DeleteTorrent(newTorrentID)
+// return false
+// }
- if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
- t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
- t.api.DeleteTorrent(newTorrentID)
- return false
- }
+// if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
+// t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
+// t.api.DeleteTorrent(newTorrentID)
+// return false
+// }
- if info.Progress != 100 {
- t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
- return true
- }
+// if info.Progress != 100 {
+// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
+// return true
+// }
- missingCount := len(strings.Split(missingFiles, ","))
- if len(info.Links) != missingCount {
- t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
- t.api.DeleteTorrent(newTorrentID)
- return false
- }
+// missingCount := len(strings.Split(missingFiles, ","))
+// if len(info.Links) != missingCount {
+// t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
+// t.api.DeleteTorrent(newTorrentID)
+// return false
+// }
- t.log.Infof("Repair successful id=%s", newTorrentID)
- return true
-}
+// t.log.Infof("Repair successful id=%s", newTorrentID)
+// return true
+// }
-func (t *TorrentManager) canCapacityHandle() bool {
- // max waiting time is 45 minutes
- const maxRetries = 50
- const baseDelay = 1 * time.Second
- const maxDelay = 60 * time.Second
- retryCount := 0
- for {
- count, err := t.api.GetActiveTorrentCount()
- if err != nil {
- t.log.Warnf("Cannot get active downloads count: %v", err)
- if retryCount >= maxRetries {
- t.log.Error("Max retries reached. Exiting.")
- return false
- }
- delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
- if delay > maxDelay {
- delay = maxDelay
- }
- time.Sleep(delay)
- retryCount++
- continue
- }
+// func (t *TorrentManager) canCapacityHandle() bool {
+// // max waiting time is 45 minutes
+// const maxRetries = 50
+// const baseDelay = 1 * time.Second
+// const maxDelay = 60 * time.Second
+// retryCount := 0
+// for {
+// count, err := t.api.GetActiveTorrentCount()
+// if err != nil {
+// t.log.Warnf("Cannot get active downloads count: %v", err)
+// if retryCount >= maxRetries {
+// t.log.Error("Max retries reached. Exiting.")
+// return false
+// }
+// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
+// if delay > maxDelay {
+// delay = maxDelay
+// }
+// time.Sleep(delay)
+// retryCount++
+// continue
+// }
- if count.DownloadingCount < count.MaxNumberOfTorrents {
- t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
- return true
- }
+// if count.DownloadingCount < count.MaxNumberOfTorrents {
+// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
+// return true
+// }
- if retryCount >= maxRetries {
- t.log.Error("Max retries reached, exiting")
- return false
- }
- delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
- if delay > maxDelay {
- delay = maxDelay
- }
- time.Sleep(delay)
- retryCount++
- }
-}
+// if retryCount >= maxRetries {
+// t.log.Error("Max retries reached, exiting")
+// return false
+// }
+// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
+// if delay > maxDelay {
+// delay = maxDelay
+// }
+// time.Sleep(delay)
+// retryCount++
+// }
+// }
diff --git a/internal/torrent/types.go b/internal/torrent/types.go
index 9e30a8f..9cba950 100644
--- a/internal/torrent/types.go
+++ b/internal/torrent/types.go
@@ -2,13 +2,12 @@ package torrent
import (
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
- "github.com/elliotchance/orderedmap/v2"
+ cmap "github.com/orcaman/concurrent-map/v2"
)
type Torrent struct {
AccessKey string
- SelectedFiles *orderedmap.OrderedMap[string, *File]
- Directories []string
+ SelectedFiles cmap.ConcurrentMap[string, *File]
LatestAdded string
InProgress bool
ForRepair bool
diff --git a/internal/universal/get.go b/internal/universal/get.go
index 11ee023..4f2634e 100644
--- a/internal/universal/get.go
+++ b/internal/universal/get.go
@@ -46,22 +46,28 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
accessKey := segments[len(segments)-2]
filename := segments[len(segments)-1]
- torrent, _ := t.TorrentMap.Get(accessKey)
- if torrent == nil {
+ torrents, ok := t.DirectoryMap.Get(baseDirectory)
+ if !ok {
+ log.Warnf("Cannot find directory %s", baseDirectory)
+ http.Error(w, "File not found", http.StatusNotFound)
+ return
+ }
+ torrent, ok := torrents.Get(accessKey)
+ if !ok {
log.Warnf("Cannot find torrent %s in the directory %s", accessKey, baseDirectory)
http.Error(w, "File not found", http.StatusNotFound)
return
}
- file, _ := torrent.SelectedFiles.Get(filename)
- if file == nil {
+ file, ok := torrent.SelectedFiles.Get(filename)
+ if !ok {
log.Warnf("Cannot find file from path %s", requestPath)
http.Error(w, "File not found", http.StatusNotFound)
return
}
if data, exists := cache.Get(requestPath); exists {
- streamFileToResponse(torrent, data, w, r, t, c, log)
+ streamFileToResponse(data, w, r, t, c, log)
return
}
@@ -75,7 +81,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
resp := t.UnrestrictUntilOk(link)
if resp == nil {
- go t.Repair(torrent.AccessKey)
+ // go t.Repair(torrent.AccessKey)
log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path)
streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
return
@@ -93,15 +99,15 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
}
}
cache.Add(requestPath, resp.Download)
- streamFileToResponse(torrent, resp.Download, w, r, t, c, log)
+ streamFileToResponse(resp.Download, w, r, t, c, log)
}
-func streamFileToResponse(torrent *intTor.Torrent, url string, w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) {
+func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
// Create a new request for the file download.
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
log.Errorf("Error creating new request: %v", err)
- streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, t, c, log)
+ streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log)
return
}
@@ -111,25 +117,25 @@ func streamFileToResponse(torrent *intTor.Torrent, url string, w http.ResponseWr
}
// Create a custom HTTP client
- client := zurghttp.NewHTTPClient(c.GetToken(), 10, c)
+ client := zurghttp.NewHTTPClient(cfg.GetToken(), 10, cfg)
resp, err := client.Do(req)
if err != nil {
log.Warnf("Cannot download file %v ; torrent is marked for repair", err)
- if torrent != nil {
- go t.Repair(torrent.AccessKey)
- }
- streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, t, c, log)
+ // if torrent != nil {
+ // go t.Repair(torrent.AccessKey)
+ // }
+ streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status)
- if torrent != nil {
- go t.Repair(torrent.AccessKey)
- }
- streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, t, c, log)
+ // if torrent != nil {
+ // go t.Repair(torrent.AccessKey)
+ // }
+ streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
return
}
@@ -139,7 +145,7 @@ func streamFileToResponse(torrent *intTor.Torrent, url string, w http.ResponseWr
}
}
- buf := make([]byte, c.GetNetworkBufferSize())
+ buf := make([]byte, cfg.GetNetworkBufferSize())
io.CopyBuffer(w, resp.Body, buf)
}
@@ -149,7 +155,7 @@ func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *in
http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError)
return
}
- streamFileToResponse(nil, resp.Download, w, r, t, c, log)
+ streamFileToResponse(resp.Download, w, r, t, c, log)
}
func createErrorFile(path, link string) *intTor.File {
@@ -160,7 +166,7 @@ func createErrorFile(path, link string) *intTor.File {
return &ret
}
-func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, size int, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) []byte {
+func GetFileReader(file *intTor.File, offset int64, size int, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) []byte {
unres := torMgr.UnrestrictUntilOk(file.Link)
if unres == nil {
if strings.Contains(file.Link, "www.youtube.com") {
@@ -168,11 +174,11 @@ func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, siz
return nil
}
log.Warnf("File %s is no longer available, torrent is marked for repair", file.Path)
- if torrent != nil {
- go torMgr.Repair(torrent.AccessKey)
- }
+ // if torrent != nil {
+ // go torMgr.Repair(torrent.AccessKey)
+ // }
errFile := createErrorFile("unavailable.mp4", "https://www.youtube.com/watch?v=gea_FJrtFVA")
- return GetFileReader(nil, errFile, 0, 0, torMgr, cfg, log)
+ return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
req, err := http.NewRequest(http.MethodGet, unres.Download, nil)
@@ -183,7 +189,7 @@ func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, siz
}
log.Errorf("Error creating new request: %v", err)
errFile := createErrorFile("new_request.mp4", "https://www.youtube.com/watch?v=H3NSrObyAxM")
- return GetFileReader(nil, errFile, 0, 0, torMgr, cfg, log)
+ return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
if size == 0 {
@@ -199,11 +205,11 @@ func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, siz
return nil
}
log.Warnf("Cannot download file %v ; torrent is marked for repair", err)
- if torrent != nil {
- go torMgr.Repair(torrent.AccessKey)
- }
+ // if torrent != nil {
+ // go torMgr.Repair(torrent.AccessKey)
+ // }
errFile := createErrorFile("cannot_download.mp4", "https://www.youtube.com/watch?v=FSSd8cponAA")
- return GetFileReader(nil, errFile, 0, 0, torMgr, cfg, log)
+ return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
@@ -212,11 +218,11 @@ func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, siz
return nil
}
log.Warnf("Received a %s status code ; torrent is marked for repair", resp.Status)
- if torrent != nil {
- go torMgr.Repair(torrent.AccessKey)
- }
+ // if torrent != nil {
+ // go torMgr.Repair(torrent.AccessKey)
+ // }
errFile := createErrorFile("not_ok_status.mp4", "https://www.youtube.com/watch?v=BcseUxviVqE")
- return GetFileReader(nil, errFile, 0, 0, torMgr, cfg, log)
+ return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
defer resp.Body.Close()
requestedBytes, err := io.ReadAll(resp.Body)
@@ -224,7 +230,7 @@ func GetFileReader(torrent *intTor.Torrent, file *intTor.File, offset int64, siz
if err != io.EOF {
log.Errorf("Error reading bytes: %v", err)
errFile := createErrorFile("read_error.mp4", "https://www.youtube.com/watch?v=t9VgOriBHwE")
- return GetFileReader(nil, errFile, 0, 0, torMgr, cfg, log)
+ return GetFileReader(errFile, 0, 0, torMgr, cfg, log)
}
}
return requestedBytes
diff --git a/internal/universal/head.go b/internal/universal/head.go
index 160dff0..62d210f 100644
--- a/internal/universal/head.go
+++ b/internal/universal/head.go
@@ -43,8 +43,14 @@ func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torren
accessKey := segments[len(segments)-2]
filename := segments[len(segments)-1]
- torrent, _ := t.TorrentMap.Get(accessKey)
- if torrent == nil {
+ torrents, ok := t.DirectoryMap.Get(baseDirectory)
+ if !ok {
+ log.Warnf("Cannot find directory %s", baseDirectory)
+ http.Error(w, "File not found", http.StatusNotFound)
+ return
+ }
+ torrent, ok := torrents.Get(accessKey)
+ if !ok {
log.Warnf("Cannot find torrent %s in the directory %s", accessKey, baseDirectory)
http.Error(w, "File not found", http.StatusNotFound)
return
diff --git a/internal/zfs/zfs.go b/internal/zfs/zfs.go
index 5ec7ae8..66a473a 100644
--- a/internal/zfs/zfs.go
+++ b/internal/zfs/zfs.go
@@ -1,11 +1,13 @@
package zfs
import (
+ "fmt"
"strings"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/chunk"
+ cmap "github.com/orcaman/concurrent-map/v2"
"github.com/winfsp/cgofuse/fuse"
"go.uber.org/zap"
)
@@ -98,6 +100,7 @@ func (fs *ZurgFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int)
func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) {
segments := splitIntoSegments(path)
+ fmt.Println("seg", segments)
if len(segments) != 3 {
return -fuse.ENOENT
} else if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {
@@ -138,30 +141,30 @@ func (fs *ZurgFS) Readdir(path string,
case 0:
fill(".", nil, 0)
fill("..", nil, 0)
- for el := fs.TorrentManager.DirectoryMap.Front(); el != nil; el = el.Next() {
- fill(el.Key, nil, 0)
- }
+ fs.TorrentManager.DirectoryMap.IterCb(func(directoryName string, _ cmap.ConcurrentMap[string, *torrent.Torrent]) {
+ fill(directoryName, nil, 0)
+ })
case 1:
fill(".", nil, 0)
fill("..", nil, 0)
- if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {
+ if torrents, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {
return -fuse.ENOENT
} else {
- for el := directory.Front(); el != nil; el = el.Next() {
- fill(el.Key, nil, 0)
- }
+ torrents.IterCb(func(accessKey string, _ *torrent.Torrent) {
+ fill(accessKey, nil, 0)
+ })
}
case 2:
fill(".", nil, 0)
fill("..", nil, 0)
- if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {
+ if torrents, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound {
return -fuse.ENOENT
- } else if torrent, torFound := directory.Get(segments[1]); !torFound {
+ } else if tor, torFound := torrents.Get(segments[1]); !torFound {
return -fuse.ENOENT
} else {
- for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
- fill(el.Key, nil, 0)
- }
+ tor.SelectedFiles.IterCb(func(filename string, _ *torrent.File) {
+ fill(filename, nil, 0)
+ })
}
default:
return -fuse.ENOENT
diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go
index 532373a..125170d 100644
--- a/pkg/chunk/download.go
+++ b/pkg/chunk/download.go
@@ -112,7 +112,6 @@ func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int6
downloadURL := resp.Download
req, err := http.NewRequest("GET", downloadURL, nil)
if nil != err {
- d.log.Debugf("request init error: %v", err)
return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link)
}
req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))
diff --git a/pkg/chunk/storage.go b/pkg/chunk/storage.go
index 264dcc8..96e759f 100644
--- a/pkg/chunk/storage.go
+++ b/pkg/chunk/storage.go
@@ -381,23 +381,18 @@ func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
if nil != chunk {
if chunk.valid(id) {
- s.log.Debugf("Create chunk %v (exists: valid)", id)
return nil
}
s.log.Warnf("Create chunk %v(exists: overwrite)", id)
} else {
index := s.stack.Pop()
if index == -1 {
- s.log.Debugf("Create chunk %v (failed)", id)
return fmt.Errorf("no buffers available")
}
chunk = s.buffers[index]
deleteID := chunk.id
if blankRequestID != deleteID {
delete(s.chunks, deleteID)
- s.log.Debugf("Create chunk %v (reused)", id)
- } else {
- s.log.Debugf("Create chunk %v (stored)", id)
}
s.chunks[id] = index
chunk.item = s.stack.Push(index)
diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go
index a27fb92..5d01982 100644
--- a/pkg/realdebrid/types.go
+++ b/pkg/realdebrid/types.go
@@ -29,6 +29,23 @@ type Torrent struct {
Links []string `json:"links"`
}
+func (i *Torrent) UnmarshalJSON(data []byte) error {
+ type Alias Torrent
+ aux := &struct {
+ Progress float64 `json:"progress"`
+ *Alias
+ }{
+ Alias: (*Alias)(i),
+ }
+
+ if err := json.Unmarshal(data, &aux); err != nil {
+ return err
+ }
+
+ i.Progress = int(math.Round(aux.Progress))
+ return nil
+}
+
type TorrentInfo struct {
ID string `json:"id"`
Name string `json:"filename"`