From 0e9302f3b562354ac979b0f54e9f98b7a45310a8 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sat, 18 Nov 2023 12:53:39 +0100 Subject: [PATCH] use a new thread safe map --- cmd/zurg/main.go | 57 ++- go.mod | 3 +- go.sum | 4 +- internal/config/load.go | 2 +- internal/config/v1.go | 18 +- internal/dav/listing.go | 117 +++--- internal/http/listing.go | 67 ++-- internal/torrent/manager.go | 703 ++++++++++++++++++------------------ internal/torrent/types.go | 5 +- internal/universal/get.go | 76 ++-- internal/universal/head.go | 10 +- internal/zfs/zfs.go | 27 +- pkg/chunk/download.go | 1 - pkg/chunk/storage.go | 5 - pkg/realdebrid/types.go | 17 + 15 files changed, 577 insertions(+), 535 deletions(-) diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index 228c578..a9c7353 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -6,7 +6,6 @@ import ( "net/http" "os" "os/signal" - "runtime" "syscall" "time" @@ -14,13 +13,11 @@ import ( "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/version" - "github.com/debridmediamanager.com/zurg/internal/zfs" - "github.com/debridmediamanager.com/zurg/pkg/chunk" - zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" + + // zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/winfsp/cgofuse/fuse" ) func main() { @@ -65,29 +62,29 @@ func main() { } }() - log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) - client := zurghttp.NewHTTPClient(config.GetToken(), 10, config) - chunkMgr, err := chunk.NewManager( - "", // in-memory chunks - 10485760, // 10MB chunk size - max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead - max(runtime.NumCPU()/2, 1), // 4 check threads - max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead - runtime.NumCPU()*2, // total chunks kept in memory - torrentMgr, - client) - if nil != err { - log.Panicf("Failed to initialize chunk manager: %v", err) - } + // log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) + // client := zurghttp.NewHTTPClient(config.GetToken(), 10, config) + // chunkMgr, err := chunk.NewManager( + // "", // in-memory chunks + // 10485760, // 10MB chunk size + // max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead + // max(runtime.NumCPU()/2, 1), // 4 check threads + // max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead + // runtime.NumCPU()*2, // total chunks kept in memory + // torrentMgr, + // client) + // if nil != err { + // log.Panicf("Failed to initialize chunk manager: %v", err) + // } - fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs")) - host := fuse.NewFileSystemHost(fs) - go func() { - log.Infof("Mounting on %s", config.GetMountPoint()) - if err := zfs.Mount(host, config); err != nil { - log.Panicf("Failed to mount: %v", err) - } - }() + // fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs")) + // host := fuse.NewFileSystemHost(fs) + // go func() { + // log.Infof("Mounting on %s", config.GetMountPoint()) + // if err := zfs.Mount(host, config); err != nil { + // log.Panicf("Failed to mount: %v", err) + // } + // }() <-shutdown @@ -97,9 +94,9 @@ func main() { if err := server.Shutdown(ctx); err != nil { log.Errorf("Server shutdown error: %v\n", err) } - if err := zfs.Unmount(host); err != nil { - log.Errorf("Unmount error: %v\n", err) - } + // if err := zfs.Unmount(host); err != nil { + // log.Errorf("Unmount error: %v\n", err) + // } log.Info("BYE") } diff --git a/go.mod b/go.mod index 89a76d9..1056e14 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/debridmediamanager.com/zurg go 1.21.3 require ( - github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/hashicorp/golang-lru/v2 v2.0.7 go.uber.org/zap v1.26.0 golang.org/x/sys v0.14.0 gopkg.in/yaml.v3 v3.0.1 ) +require github.com/orcaman/concurrent-map/v2 v2.0.1 + require ( github.com/winfsp/cgofuse v1.5.0 go.uber.org/multierr v1.10.0 // indirect diff --git a/go.sum b/go.sum index 963bc15..1896074 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= -github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= diff --git a/internal/config/load.go b/internal/config/load.go index b1bb3ba..3ee3ac6 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -18,7 +18,7 @@ type ConfigInterface interface { GetHost() string GetPort() string GetDirectories() []string - MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool + MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool GetOnLibraryUpdate() string GetNetworkBufferSize() int GetMountPoint() string diff --git a/internal/config/v1.go b/internal/config/v1.go index dd2bf9b..78f2bf2 100644 --- a/internal/config/v1.go +++ b/internal/config/v1.go @@ -63,21 +63,25 @@ func (z *ZurgConfigV1) GetGroupMap() map[string][]string { return result } -func (z *ZurgConfigV1) MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool { +func (z *ZurgConfigV1) MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool { if _, ok := z.Directories[directory]; !ok { return false } for _, filter := range z.Directories[directory].Filters { - if z.matchFilter(torrentID, torrentName, fileNames, filter) { + if z.matchFilter(torrentName, torrentIDs, fileNames, filter) { return true } } return false } -func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []string, filter *FilterConditionsV1) bool { - if filter.ID != "" && fileID == filter.ID { - return true +func (z *ZurgConfigV1) matchFilter(torrentName string, torrentIDs, fileNames []string, filter *FilterConditionsV1) bool { + if filter.ID != "" { + for _, torrentID := range torrentIDs { + if torrentID == filter.ID { + return true + } + } } if filter.RegexStr != "" { regex := compilePattern(filter.RegexStr) @@ -100,7 +104,7 @@ func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []strin if len(filter.And) > 0 { andResult := true for _, andFilter := range filter.And { - andResult = andResult && z.matchFilter(fileID, torrentName, fileNames, andFilter) + andResult = andResult && z.matchFilter(torrentName, torrentIDs, fileNames, andFilter) if !andResult { return false } @@ -109,7 +113,7 @@ func (z *ZurgConfigV1) matchFilter(fileID, torrentName string, fileNames []strin } if len(filter.Or) > 0 { for _, orFilter := range filter.Or { - if z.matchFilter(fileID, torrentName, fileNames, orFilter) { + if z.matchFilter(torrentName, torrentIDs, fileNames, orFilter) { return true } } diff --git a/internal/dav/listing.go b/internal/dav/listing.go index b9bb1ce..0c8ffb4 100644 --- a/internal/dav/listing.go +++ b/internal/dav/listing.go @@ -4,8 +4,10 @@ import ( "encoding/xml" "fmt" "net/http" + "net/url" "path" "path/filepath" + "sort" "strings" "github.com/debridmediamanager.com/zurg/internal/config" @@ -27,11 +29,11 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To switch { case len(filteredSegments) == 1 && filteredSegments[0] == "": - output, err = handleRoot(w, r, c) + output, err = handleRoot(c) case len(filteredSegments) == 1: - output, err = handleListOfTorrents(requestPath, w, r, t, c) + output, err = handleListOfTorrents(requestPath, t) case len(filteredSegments) == 2: - output, err = handleSingleTorrent(requestPath, w, r, t) + output, err = handleSingleTorrent(requestPath, t) default: log.Warnf("Request %s %s not found", r.Method, requestPath) http.Error(w, "Not Found", http.StatusNotFound) @@ -56,7 +58,7 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To } } -func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface) ([]byte, error) { +func handleRoot(c config.ConfigInterface) ([]byte, error) { var responses []dav.Response responses = append(responses, dav.Directory("")) for _, directory := range c.GetDirectories() { @@ -69,68 +71,23 @@ func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface return xml.Marshal(rootResponse) } -func handleListOfTorrents(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface) ([]byte, error) { +func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) ([]byte, error) { basePath := path.Base(requestPath) - - for _, directory := range c.GetDirectories() { - if basePath == directory { - var responses []dav.Response - - responses = append(responses, dav.Directory(basePath)) - - for el := t.TorrentMap.Front(); el != nil; el = el.Next() { - accessKey := el.Key - torrent := el.Value - if torrent.InProgress { - continue - } - for _, dir := range torrent.Directories { - if dir == basePath { - path := filepath.Join(basePath, accessKey) - responses = append(responses, dav.Directory(path)) - break - } - } - } - - resp := &dav.MultiStatus{ - XMLNS: "DAV:", - Response: responses, - } - return xml.Marshal(resp) - } + torrents, ok := t.DirectoryMap.Get(basePath) + if !ok { + return nil, fmt.Errorf("cannot find directory %s", basePath) } - - return nil, fmt.Errorf("cannot find directory when generating list: %s", requestPath) -} - -func handleSingleTorrent(requestPath string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager) ([]byte, error) { - accessKey := path.Base(requestPath) - torrent, exists := t.TorrentMap.Get(accessKey) - if !exists { - return nil, fmt.Errorf("cannot find torrent %s", accessKey) - } - var responses []dav.Response - // initial response is the directory itself - responses = append(responses, dav.Directory(requestPath)) + responses = append(responses, dav.Directory(basePath)) - for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() { - file := el.Value - if file.Link == "" { - // will be caught by torrent manager's repairAll - // just skip it for now - continue - } - filename := filepath.Base(file.Path) - filePath := filepath.Join(requestPath, filename) - responses = append(responses, dav.File( - filePath, - file.Bytes, - convertRFC3339toRFC1123(torrent.LatestAdded), - file.Link, - )) + var accessKeys []string + torrents.IterCb(func(accessKey string, _ *torrent.Torrent) { + accessKeys = append(accessKeys, accessKey) + }) + sort.Strings(accessKeys) + for _, accessKey := range accessKeys { + responses = append(responses, dav.Directory(filepath.Join(basePath, accessKey))) } resp := &dav.MultiStatus{ @@ -139,3 +96,41 @@ func handleSingleTorrent(requestPath string, w http.ResponseWriter, r *http.Requ } return xml.Marshal(resp) } + +func handleSingleTorrent(requestPath string, t *torrent.TorrentManager) ([]byte, error) { + basePath := path.Base(path.Dir(requestPath)) + torrents, ok := t.DirectoryMap.Get(basePath) + if !ok { + return nil, fmt.Errorf("cannot find directory %s", basePath) + } + accessKey := path.Base(requestPath) + tor, ok := torrents.Get(accessKey) + if !ok { + return nil, fmt.Errorf("cannot find torrent %s", accessKey) + } + + var responses []dav.Response + // initial response is the directory itself + responses = append(responses, dav.Directory(requestPath)) + + tor.SelectedFiles.IterCb(func(filename string, file *torrent.File) { + if file.Link == "" { + // will be caught by torrent manager's repairAll + // just skip it for now + return + } + filePath := filepath.Join(requestPath, url.PathEscape(filename)) + responses = append(responses, dav.File( + filePath, + file.Bytes, + convertRFC3339toRFC1123(tor.LatestAdded), + file.Link, + )) + }) + + resp := &dav.MultiStatus{ + XMLNS: "DAV:", + Response: responses, + } + return xml.Marshal(resp) +} diff --git a/internal/http/listing.go b/internal/http/listing.go index ddf0db7..210976b 100644 --- a/internal/http/listing.go +++ b/internal/http/listing.go @@ -6,6 +6,7 @@ import ( "net/url" "path" "path/filepath" + "sort" "strings" "github.com/debridmediamanager.com/zurg/internal/config" @@ -24,11 +25,11 @@ func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.T filteredSegments := removeEmptySegments(strings.Split(requestPath, "/")) switch { case len(filteredSegments) == 1: - output, err = handleRoot(w, r, c) + output, err = handleRoot(c) case len(filteredSegments) == 2: - output, err = handleListOfTorrents(requestPath, w, r, t, c) + output, err = handleListOfTorrents(requestPath, t) case len(filteredSegments) == 3: - output, err = handleSingleTorrent(requestPath, w, r, t) + output, err = handleSingleTorrent(requestPath, t) default: log.Warnf("Request %s %s not found", r.Method, requestPath) http.Error(w, "Not Found", http.StatusNotFound) @@ -51,7 +52,7 @@ func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.T } } -func handleRoot(w http.ResponseWriter, r *http.Request, c config.ConfigInterface) (*string, error) { +func handleRoot(c config.ConfigInterface) (*string, error) { htmlDoc := "