From fb7fce1a438c997ae379f7b5fd178d419b316d79 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Tue, 7 Nov 2023 00:07:31 +0100 Subject: [PATCH] defuse --- cmd/zurg/main.go | 40 ---- go.mod | 2 - go.sum | 6 - internal/zfs/fs.go | 35 ---- internal/zfs/mount.go | 57 ------ internal/zfs/object.go | 189 ----------------- pkg/chunk/chunk.go | 51 ----- pkg/chunk/download.go | 147 ------------- pkg/chunk/manager.go | 265 ------------------------ pkg/chunk/manager_test.go | 53 ----- pkg/chunk/stack.go | 75 ------- pkg/chunk/stack_test.go | 72 ------- pkg/chunk/storage.go | 420 -------------------------------------- 13 files changed, 1412 deletions(-) delete mode 100644 internal/zfs/fs.go delete mode 100644 internal/zfs/mount.go delete mode 100644 internal/zfs/object.go delete mode 100644 pkg/chunk/chunk.go delete mode 100644 pkg/chunk/download.go delete mode 100644 pkg/chunk/manager.go delete mode 100644 pkg/chunk/manager_test.go delete mode 100644 pkg/chunk/stack.go delete mode 100644 pkg/chunk/stack_test.go delete mode 100644 pkg/chunk/storage.go diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index b52100b..9399474 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -6,15 +6,12 @@ import ( "net/http" "os" "os/signal" - "runtime" "syscall" "time" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/internal/zfs" - "github.com/debridmediamanager.com/zurg/pkg/chunk" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" ) @@ -38,27 +35,6 @@ func main() { addr := fmt.Sprintf(":%s", config.GetPort()) server := &http.Server{Addr: addr, Handler: mux} - mountPoint := config.GetMountPoint() - if _, err := os.Stat(mountPoint); os.IsNotExist(err) { - if err := os.Mkdir(mountPoint, 0755); err != nil { - log.Panicf("Failed to create mount point: %v", err) - } - } - - 
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) - // 64kb request size - chunkMgr, err := chunk.NewManager( - "", - 524288, // 512kb - 1, // 1 chunk - load ahead (1MB total) - max(runtime.NumCPU()/2, 1), // check threads - max(runtime.NumCPU()/2, 1), // load threads - runtime.NumCPU()*2, // max chunks - config) - if nil != err { - log.Panicf("Failed to initialize chunk manager: %v", err) - } - shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) @@ -74,19 +50,6 @@ func main() { } }() - // Start the mount in a goroutine with panic recovery. - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Mount panic: %v\n", r) - } - }() - log.Infof("Mounting on %s", mountPoint) - if err := zfs.Mount(mountPoint, config, torrentMgr, chunkMgr); err != nil { - log.Panicf("Failed to mount: %v", err) - } - }() - <-shutdown ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -95,9 +58,6 @@ func main() { if err := server.Shutdown(ctx); err != nil { log.Errorf("Server shutdown error: %v\n", err) } - if err := zfs.Unmount(mountPoint); err != nil { - log.Errorf("Unmount error: %v\n", err) - } log.Info("BYE") } diff --git a/go.mod b/go.mod index eefa01b..69b6a67 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,8 @@ module github.com/debridmediamanager.com/zurg go 1.21.3 require ( - bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 github.com/hashicorp/golang-lru/v2 v2.0.7 go.uber.org/zap v1.26.0 - golang.org/x/sys v0.4.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 252ca36..f49f369 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ= -bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -8,16 +6,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ= -github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/zfs/fs.go b/internal/zfs/fs.go deleted file mode 100644 index 4e6d74c..0000000 --- a/internal/zfs/fs.go +++ /dev/null @@ -1,35 +0,0 @@ -package zfs - -import ( - "os" - "sync" - "time" - - "bazil.org/fuse/fs" - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" - 
"github.com/debridmediamanager.com/zurg/pkg/chunk" - "go.uber.org/zap" -) - -type FS struct { - uid uint32 - gid uint32 - umask os.FileMode - directIO bool - lock sync.RWMutex - c config.ConfigInterface - t *torrent.TorrentManager - log *zap.SugaredLogger - initTime time.Time - chunk *chunk.Manager -} - -// Root returns the root path -func (f *FS) Root() (fs.Node, error) { - return Object{ - fs: f, - objType: ROOT, - mtime: f.initTime, - }, nil -} diff --git a/internal/zfs/mount.go b/internal/zfs/mount.go deleted file mode 100644 index 7145a99..0000000 --- a/internal/zfs/mount.go +++ /dev/null @@ -1,57 +0,0 @@ -package zfs - -import ( - "os" - "time" - - "bazil.org/fuse" - "bazil.org/fuse/fs" - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/chunk" - "github.com/debridmediamanager.com/zurg/pkg/logutil" - - "golang.org/x/sys/unix" -) - -func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager, cMgr *chunk.Manager) error { - rlog := logutil.NewLogger() - log := rlog.Named("zfs") - - options := []fuse.MountOption{ - fuse.AllowOther(), - fuse.AllowNonEmptyMount(), - fuse.MaxReadahead(uint32(128 << 10)), - fuse.DefaultPermissions(), - fuse.FSName("zurgfs"), - } - conn, err := fuse.Mount(mountpoint, options...) 
- if err != nil { - return err - } - defer conn.Close() - - srv := fs.New(conn, nil) - - filesys := &FS{ - uid: uint32(unix.Geteuid()), - gid: uint32(unix.Getegid()), - umask: os.FileMode(0), - c: cfg, - t: tMgr, - log: log, - initTime: time.Now(), - chunk: cMgr, - } - - if err := srv.Serve(filesys); err != nil { - return err - } - - return nil -} - -func Unmount(mountpoint string) error { - fuse.Unmount(mountpoint) - return nil -} diff --git a/internal/zfs/object.go b/internal/zfs/object.go deleted file mode 100644 index 8b4fe0e..0000000 --- a/internal/zfs/object.go +++ /dev/null @@ -1,189 +0,0 @@ -package zfs - -import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - "syscall" - "time" - - "bazil.org/fuse" - "bazil.org/fuse/fs" - "github.com/debridmediamanager.com/zurg/internal/torrent" -) - -// define variable as rootObject id -const ( - ROOT = 0 - DIRECTORY = 1 - TORRENT = 2 - FILE = 3 -) - -type Object struct { - fs *FS - objType int - parentName string - name string - file *torrent.File - size uint64 - mtime time.Time -} - -// Attr returns the attributes for a directory -func (o Object) Attr(ctx context.Context, attr *fuse.Attr) error { - if o.objType == FILE { - attr.Mode = 0644 - } else { - attr.Mode = os.ModeDir | 0755 - } - attr.Size = o.size - - attr.Uid = o.fs.uid - attr.Gid = o.fs.gid - - attr.Ctime = o.mtime - attr.Mtime = o.mtime - - attr.Blocks = (attr.Size + 511) >> 9 - - return nil -} - -// ReadDirAll shows all files in the current directory -func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { - dirs := []fuse.Dirent{} - switch o.objType { - case ROOT: - for _, directory := range o.fs.c.GetDirectories() { - dirs = append(dirs, fuse.Dirent{ - Name: directory, - Type: fuse.DT_Dir, - }) - } - case DIRECTORY: - seen := make(map[string]bool) - for _, item := range o.fs.t.GetByDirectory(o.name) { - if item.Progress != 100 { - continue - } - if _, exists := seen[item.Name]; exists { - continue - } - seen[item.Name] = true 
- dirs = append(dirs, fuse.Dirent{ - Name: item.Name, - Type: fuse.DT_Dir, - }) - } - case TORRENT: - finalName := make(map[string]bool) - for _, item := range o.fs.t.FindAllTorrentsWithName(o.parentName, o.name) { - for _, file := range item.SelectedFiles { - if file.Link == "" { - // log.Println("File has no link, skipping", file.Path) - continue - } - filename := filepath.Base(file.Path) - if finalName[filename] { - // fragment := davextra.GetLinkFragment(file.Link) - // filename = davextra.InsertLinkFragment(filename, fragment) - continue - } - finalName[filename] = true - dirs = append(dirs, fuse.Dirent{ - Name: filename, - Type: fuse.DT_File, - }) - } - } - } - return dirs, nil -} - -// Lookup tests if a file is existent in the current directory -func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) { - switch o.objType { - case ROOT: - for _, directory := range o.fs.c.GetDirectories() { - if directory == name { - return Object{ - fs: o.fs, - objType: DIRECTORY, - parentName: o.name, - name: name, - mtime: o.fs.initTime, - }, nil - } - } - case DIRECTORY: - for _, item := range o.fs.t.GetByDirectory(o.name) { - if item.Name == name && item.Progress == 100 { - return Object{ - fs: o.fs, - objType: TORRENT, - parentName: o.name, - name: name, - mtime: convertRFC3339toTime(item.Added), - }, nil - } - } - case TORRENT: - for _, item := range o.fs.t.FindAllTorrentsWithName(o.parentName, o.name) { - for _, file := range item.SelectedFiles { - if strings.HasSuffix(file.Path, name) && file.Link != "" { - return Object{ - fs: o.fs, - objType: FILE, - parentName: o.name, - name: name, - file: &file, - size: uint64(file.Bytes), - mtime: convertRFC3339toTime(item.Added), - }, nil - } - } - } - } - return nil, syscall.ENOENT -} - -// Open a file -func (o Object) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - resp.Flags |= fuse.OpenDirectIO - return o, nil -} - -// Read reads some bytes or the whole file 
-func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - o.fs.log.Debugf("Read %s (total size %d) req offset %d req size %d", o.name, o.size, req.Offset, req.Size) - data, err := o.fs.chunk.GetChunk(o.file, req.Offset, int64(req.Size)) - if nil != err { - o.fs.log.Warnf("%v", err) - return syscall.EIO - } - - resp.Data = data - return nil -} - -// Remove deletes an element -func (o Object) Remove(ctx context.Context, req *fuse.RemoveRequest) error { - return fmt.Errorf("Remove not yet implemented") -} - -// Rename renames an element -func (o Object) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error { - return fmt.Errorf("Rename not yet implemented") -} - -func convertRFC3339toTime(input string) time.Time { - layout := "2006-01-02T15:04:05.000Z" - t, err := time.Parse(layout, input) - if err != nil { - return time.Now() - } - return t -} diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go deleted file mode 100644 index bc0c964..0000000 --- a/pkg/chunk/chunk.go +++ /dev/null @@ -1,51 +0,0 @@ -package chunk - -import ( - "container/list" - "hash/crc32" -) - -// Chunk of memory -type Chunk struct { - clean bool - *chunkHeader - bytes []byte - item *list.Element -} - -type chunkHeader struct { - id RequestID - size uint32 - checksum uint32 -} - -func (c *Chunk) valid(id RequestID) bool { - if c.id != id { - return false - } - if !c.clean { - c.clean = c.checksum == c.calculateChecksum() - } - return c.clean -} - -func (c *Chunk) update(id RequestID, bytes []byte) { - c.id = id - c.size = uint32(copy(c.bytes, bytes)) - c.checksum = c.calculateChecksum() - c.clean = true -} - -func (c *Chunk) calculateChecksum() uint32 { - size := c.size - if nil == c.bytes || size == 0 { - return 0 - } - maxSize := uint32(len(c.bytes)) - if size > maxSize { - // corrupt size or truncated chunk, fix size - c.size = maxSize - return crc32.Checksum(c.bytes, crc32Table) - } - return crc32.Checksum(c.bytes[:size], 
crc32Table) -} diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go deleted file mode 100644 index a9cd6ec..0000000 --- a/pkg/chunk/download.go +++ /dev/null @@ -1,147 +0,0 @@ -package chunk - -import ( - "fmt" - "io" - "net/http" - "sync" - "syscall" - "time" - - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/pkg/logutil" - "github.com/debridmediamanager.com/zurg/pkg/realdebrid" - "go.uber.org/zap" - "golang.org/x/sys/unix" -) - -// Downloader handles concurrent chunk downloads -type Downloader struct { - BufferSize int64 - queue chan *Request - callbacks map[RequestID][]DownloadCallback - lock sync.Mutex - storage *Storage - c config.ConfigInterface - log *zap.SugaredLogger -} - -type DownloadCallback func(error, []byte) - -// NewDownloader creates a new download manager -func NewDownloader(threads int, storage *Storage, bufferSize int64, c config.ConfigInterface) (*Downloader, error) { - rlog := logutil.NewLogger() - log := rlog.Named("downloader") - - manager := Downloader{ - BufferSize: bufferSize, - queue: make(chan *Request, 100), - callbacks: make(map[RequestID][]DownloadCallback, 100), - storage: storage, - c: c, - log: log, - } - - for i := 0; i < threads; i++ { - go manager.thread(i) - } - - return &manager, nil -} - -// Download starts a new download request -func (d *Downloader) Download(req *Request, callback DownloadCallback) { - d.lock.Lock() - callbacks, exists := d.callbacks[req.id] - if nil != callback { - d.callbacks[req.id] = append(callbacks, callback) - } else if !exists { - d.callbacks[req.id] = callbacks - } - if !exists { - d.queue <- req - } - d.lock.Unlock() -} - -func (d *Downloader) thread(n int) { - buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) - if nil != err { - d.log.Warnf("Failed to mmap download buffer %v: %v", n, err) - buffer = make([]byte, d.BufferSize) - } - for { - req := <-d.queue - 
d.download(req, buffer) - } -} - -func (d *Downloader) download(req *Request, buffer []byte) { - d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload) - err := d.downloadFromAPI(req, buffer, 0) - - d.lock.Lock() - callbacks := d.callbacks[req.id] - for _, callback := range callbacks { - callback(err, buffer) - } - delete(d.callbacks, req.id) - d.lock.Unlock() - - if nil != err { - return - } - - if err := d.storage.Store(req.id, buffer); nil != err { - d.log.Warnf("Could not store chunk %v: %v", req.id, err) - } -} - -func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error { - // sleep if request is throttled - if delay > 0 { - time.Sleep(time.Duration(delay) * time.Second) - } - - unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) { - return realdebrid.UnrestrictLink(d.c.GetToken(), request.file.Link) - } - resp := realdebrid.RetryUntilOk(unrestrictFn) - if resp == nil { - return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link) - } - - downloadURL := resp.Download - req, err := http.NewRequest("GET", downloadURL, nil) - if nil != err { - d.log.Debugf("request init error: %v", err) - return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link) - } - req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1)) - - res, err := http.DefaultClient.Do(req) - if nil != err { - d.log.Debugf("request error: %v", err) - return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link) - } - defer res.Body.Close() - reader := res.Body - - if res.StatusCode != 206 && res.StatusCode != 200 { - return fmt.Errorf("could not read object %s %s / StatusCode: %v", - request.file.Path, request.file.Link, res.StatusCode) - } - - if res.ContentLength == -1 { - return fmt.Errorf("missing Content-Length header in response") - } - - n, err := io.ReadFull(reader, 
buffer[:res.ContentLength:cap(buffer)]) - if nil != err && err != io.ErrUnexpectedEOF { - d.log.Debugf("response read error: %v", err) - return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link) - } - d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link) - - return nil -} diff --git a/pkg/chunk/manager.go b/pkg/chunk/manager.go deleted file mode 100644 index 6713c49..0000000 --- a/pkg/chunk/manager.go +++ /dev/null @@ -1,265 +0,0 @@ -package chunk - -import ( - "crypto/sha256" - "encoding/binary" - "fmt" - "os" - - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" -) - -// Manager manages chunks on disk -type Manager struct { - ChunkSize int64 - LoadAhead int - downloader *Downloader - storage *Storage - queue chan *QueueEntry -} - -type QueueEntry struct { - request *Request - response chan Response -} - -// RequestID is the binary identifier for a chunk request -type RequestID [24]byte - -func (id RequestID) String() string { - return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:])) -} - -// Request represents a chunk request -type Request struct { - id RequestID - file *torrent.File - offsetStart int64 - offsetEnd int64 - chunkOffset int64 - chunkOffsetEnd int64 - sequence int - preload bool -} - -// Response represetns a chunk response -type Response struct { - Sequence int - Error error - Bytes []byte -} - -// NewManager creates a new chunk manager -func NewManager( - chunkFile string, - chunkSize int64, - loadAhead, - checkThreads, - loadThreads, - maxChunks int, - c config.ConfigInterface) (*Manager, error) { - - pageSize := int64(os.Getpagesize()) - if chunkSize < pageSize { - return nil, fmt.Errorf("chunk size must not be < %v", pageSize) - } - if chunkSize%pageSize != 0 { - return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize) - } - // 32-Bit: ~2GB / 64-Bit: ~8EB - maxMmapSize := 
int64(^uint(0) >> 1) - if chunkSize > maxMmapSize { - return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize) - } - if maxChunks < 2 || maxChunks < loadAhead { - return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value") - } - - storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile) - if nil != err { - return nil, err - } - - downloader, err := NewDownloader(loadThreads, storage, chunkSize, c) - if nil != err { - return nil, err - } - - manager := Manager{ - ChunkSize: chunkSize, - LoadAhead: loadAhead, - downloader: downloader, - storage: storage, - queue: make(chan *QueueEntry, 100), - } - - if err := manager.storage.Clear(); nil != err { - return nil, err - } - - for i := 0; i < checkThreads; i++ { - go manager.thread() - } - - return &manager, nil -} - -// GetChunk loads one chunk and starts the preload for the next chunks -func (m *Manager) GetChunk(object *torrent.File, offset, size int64) ([]byte, error) { - maxOffset := object.Bytes - if offset > maxOffset { - return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", object.ID, offset) - } - // Log.Infof("Request %v:%v md5:%v", object.ID, offset, object.MD5Checksum) - if offset+size > maxOffset { - size = object.Bytes - offset - } - - ranges := splitChunkRanges(offset, size, m.ChunkSize) - numRanges := len(ranges) - responses := make(chan Response, numRanges) - - last := numRanges - 1 - for i, r := range ranges { - m.requestChunk(object, r.offset, r.size, i, i == last, responses) - } - - data := make([]byte, size) - for i := 0; i < cap(responses); i++ { - res := <-responses - if nil != res.Error { - return nil, res.Error - } - - dataOffset := ranges[res.Sequence].offset - offset - - if n := copy(data[dataOffset:], res.Bytes); n == 0 { - return nil, fmt.Errorf("request %v slice %v has empty response", object.ID, res.Sequence) - } - } - close(responses) - - return data, nil -} - -func buildRequestID(object *torrent.File, offset int64) 
(id RequestID) { - fileID := object.Link - if fileID == "" { - fileID = object.Path - } - hash := sha256.Sum256([]byte(fileID)) - copy(id[:16], hash[:16]) - binary.BigEndian.PutUint64(id[16:], uint64(offset)) - return -} - -func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) { - chunkOffset := offset % m.ChunkSize - offsetStart := offset - chunkOffset - offsetEnd := offsetStart + m.ChunkSize - - request := &Request{ - id: buildRequestID(object, offsetStart), - file: object, - offsetStart: offsetStart, - offsetEnd: offsetEnd, - chunkOffset: chunkOffset, - chunkOffsetEnd: chunkOffset + size, - sequence: sequence, - preload: false, - } - - m.queue <- &QueueEntry{ - request: request, - response: response, - } - - if !preload { - return - } - - for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize { - aheadOffsetStart := offsetStart + i - aheadOffsetEnd := aheadOffsetStart + m.ChunkSize - if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) { - request := &Request{ - id: buildRequestID(object, aheadOffsetStart), - file: object, - offsetStart: aheadOffsetStart, - offsetEnd: aheadOffsetEnd, - preload: true, - } - m.queue <- &QueueEntry{ - request: request, - } - } - } -} - -type byteRange struct { - offset, size int64 -} - -// Calculate request ranges that span multiple chunks -// -// This can happen with Direct-IO and unaligned reads or -// if the size is bigger than the chunk size. 
-func splitChunkRanges(offset, size, chunkSize int64) []byteRange { - ranges := make([]byteRange, 0, size/chunkSize+2) - for remaining := size; remaining > 0; remaining -= size { - size = min(remaining, chunkSize-offset%chunkSize) - ranges = append(ranges, byteRange{offset, size}) - offset += size - } - return ranges -} - -func (m *Manager) thread() { - for { - queueEntry := <-m.queue - m.checkChunk(queueEntry.request, queueEntry.response) - } -} - -func (m *Manager) checkChunk(req *Request, response chan Response) { - if nil == response { - if nil == m.storage.Load(req.id) { - m.downloader.Download(req, nil) - } - return - } - - if bytes := m.storage.Load(req.id); nil != bytes { - response <- Response{ - Sequence: req.sequence, - Bytes: adjustResponseChunk(req, bytes), - } - return - } - - m.downloader.Download(req, func(err error, bytes []byte) { - response <- Response{ - Sequence: req.sequence, - Error: err, - Bytes: adjustResponseChunk(req, bytes), - } - }) -} - -func adjustResponseChunk(req *Request, bytes []byte) []byte { - if nil == bytes { - return nil - } - bytesLen := int64(len(bytes)) - sOffset := min(req.chunkOffset, bytesLen) - eOffset := min(req.chunkOffsetEnd, bytesLen) - return bytes[sOffset:eOffset] -} - -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} diff --git a/pkg/chunk/manager_test.go b/pkg/chunk/manager_test.go deleted file mode 100644 index ceb78e5..0000000 --- a/pkg/chunk/manager_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package chunk - -import "testing" - -func TestSplitChunkRanges(t *testing.T) { - testcases := []struct { - offset, size, chunkSize int64 - result []byteRange - }{ - {0, 0, 4096, []byteRange{}}, - {0, 4096, 4096, []byteRange{ - {0, 4096}, - }}, - {4095, 4096, 4096, []byteRange{ - {4095, 1}, - {4096, 4095}, - }}, - {0, 8192, 4096, []byteRange{ - {0, 4096}, - {4096, 4096}, - }}, - {2048, 8192, 4096, []byteRange{ - {2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {2048, 8192, 4096, []byteRange{ - 
{2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {17960960, 16777216, 10485760, []byteRange{ - {17960960, 3010560}, - {20971520, 10485760}, - {31457280, 3280896}, - }}, - } - for i, tc := range testcases { - ranges := splitChunkRanges(tc.offset, tc.size, tc.chunkSize) - actualSize := len(ranges) - expectedSize := len(tc.result) - if actualSize != expectedSize { - t.Fatalf("ByteRange %v length mismatch: %v != %v", i, actualSize, expectedSize) - } - for j, r := range ranges { - actual := r - expected := tc.result[j] - if actual != expected { - t.Fatalf("ByteRange %v mismatch: %v != %v", i, actual, expected) - } - } - } -} diff --git a/pkg/chunk/stack.go b/pkg/chunk/stack.go deleted file mode 100644 index 8955306..0000000 --- a/pkg/chunk/stack.go +++ /dev/null @@ -1,75 +0,0 @@ -package chunk - -import ( - "container/list" - "sync" -) - -// Stack is a thread safe list/stack implementation -type Stack struct { - items *list.List - lock sync.Mutex - maxSize int -} - -// NewStack creates a new stack -func NewStack(maxChunks int) *Stack { - return &Stack{ - items: list.New(), - maxSize: maxChunks, - } -} - -// Len gets the number of items on the stack -func (s *Stack) Len() int { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.Len() -} - -// Pop pops the first item from the stack -func (s *Stack) Pop() int { - s.lock.Lock() - defer s.lock.Unlock() - if s.items.Len() < s.maxSize { - return -1 - } - item := s.items.Front() - if nil == item { - return -1 - } - s.items.Remove(item) - return item.Value.(int) -} - -// Touch moves the specified item to the last position of the stack -func (s *Stack) Touch(item *list.Element) { - s.lock.Lock() - if item != s.items.Back() { - s.items.MoveToBack(item) - } - s.lock.Unlock() -} - -// Push adds a new item to the last position of the stack -func (s *Stack) Push(id int) *list.Element { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.PushBack(id) -} - -// Prepend adds a list to the front of the stack -func (s 
*Stack) Prepend(items *list.List) { - s.lock.Lock() - s.items.PushFrontList(items) - s.lock.Unlock() -} - -// Purge an item from the stack -func (s *Stack) Purge(item *list.Element) { - s.lock.Lock() - defer s.lock.Unlock() - if item != s.items.Front() { - s.items.MoveToFront(item) - } -} diff --git a/pkg/chunk/stack_test.go b/pkg/chunk/stack_test.go deleted file mode 100644 index 023bd70..0000000 --- a/pkg/chunk/stack_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package chunk - -import "testing" - -func TestOOB(t *testing.T) { - stack := NewStack(1) - - item := stack.Push(1) - stack.Touch(item) -} - -func TestAddToStack(t *testing.T) { - stack := NewStack(1) - - item1 := stack.Push(1) - item2 := stack.Push(2) - item3 := stack.Push(3) - item4 := stack.Push(4) - - stack.Touch(item1) - stack.Touch(item3) - - stack.Purge(item2) - stack.Purge(item4) - - v := stack.Pop() - if v != 4 { - t.Fatalf("Expected 4 got %v", v) - } - - v = stack.Pop() - if v != 2 { - t.Fatalf("Expected 2 got %v", v) - } - - v = stack.Pop() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - v = stack.Pop() - if v != 3 { - t.Fatalf("Expected 3 got %v", v) - } - - v = stack.Pop() - if v != -1 { - t.Fatalf("Expected -1 got %v", v) - } -} - -func TestLen(t *testing.T) { - stack := NewStack(1) - - v := stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - - stack.Push(1) - v = stack.Len() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - _ = stack.Pop() - v = stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - -} diff --git a/pkg/chunk/storage.go b/pkg/chunk/storage.go deleted file mode 100644 index 264dcc8..0000000 --- a/pkg/chunk/storage.go +++ /dev/null @@ -1,420 +0,0 @@ -package chunk - -import ( - "container/list" - "fmt" - "hash/crc32" - "os" - "os/signal" - "sync" - "syscall" - "time" - "unsafe" - - "go.uber.org/zap" - "golang.org/x/sys/unix" - - "github.com/debridmediamanager.com/zurg/pkg/logutil" -) - -const ( - headerSize = 
int(unsafe.Sizeof(*new(chunkHeader)))
	// tocSize is the size of the on-disk table-of-contents record (one journalHeader).
	tocSize = int64(unsafe.Sizeof(*new(journalHeader)))
	// journalMagic is 'P'<<8|'D' ("PD"); a byte-swapped value on read indicates
	// an endianness mismatch (see checkJournal).
	journalMagic   = uint16('P'<<8 | 'D'&0xFF)
	journalVersion = uint8(2)
)

var (
	// blankRequestID is the zero value of RequestID; chunks whose header id
	// equals it are treated as never-used (see initChunk / Store).
	blankRequestID RequestID
	// crc32Table uses the Castagnoli polynomial for journal and chunk checksums.
	crc32Table = crc32.MakeTable(crc32.Castagnoli)
)

// Storage is a chunk storage: a fixed pool of MaxChunks mmap-backed buffers of
// ChunkSize bytes each, optionally persisted to a cache file. When file-backed,
// the file layout is:
//
//	[MaxChunks * ChunkSize chunk data] [per-chunk headers ("journal")] [journalHeader ("toc")]
//
// (offsets derived in NewStorage: journalOffset = ChunkSize*MaxChunks,
// journalSize = tocSize + headerSize*MaxChunks, toc at the very end).
type Storage struct {
	ChunkFile  *os.File // nil for anonymous (RAM-only) storage
	ChunkSize  int64
	HeaderSize int64
	MaxChunks  int
	chunks     map[RequestID]int // request id -> index into buffers
	stack      *Stack            // recycling order of buffer indices (Pop/Push/Touch/Purge); presumably LRU — semantics live in stack.go
	lock       sync.RWMutex      // guards chunks, buffers content and stack
	buffers    []*Chunk          // one Chunk per index, each a view into an mmap region plus a header in journal
	loadChunks int               // how many leading chunks may be restored from a pre-existing cache file
	signals    chan os.Signal    // SIGINT/SIGTERM, used to abort the startup chunk loader
	journal    []byte            // mmap'd per-chunk header area; chunkHeader pointers in allocateChunk point into it
	mmapRegions [][]byte         // backing memory for chunk buffers, split into regions
	chunksPerRegion int64
	log        *zap.SugaredLogger
}

// journalHeader is the fixed-size toc record persisted at the end of the cache
// file. The checksum covers the 12 bytes of the preceding fields
// (magic..chunkSize); field order therefore must not change.
type journalHeader struct {
	magic      uint16
	version    uint8
	headerSize uint8
	maxChunks  uint32
	chunkSize  uint32
	checksum   uint32
}

// NewStorage creates a new storage.
//
// A non-empty chunkFilePath enables file-backed (persistent) storage; an empty
// path yields anonymous memory only. maxMmapSize caps the size of each mmap
// region used for chunk buffers. On a file-size mismatch the existing journal
// is relocated (best effort) before the file is resized, so previously cached
// chunks can be restored. Returns an error if the cache file cannot be opened,
// resized, or mapped.
func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) {
	rlog := logutil.NewLogger()
	log := rlog.Named("storage")

	s := Storage{
		ChunkSize: chunkSize,
		MaxChunks: maxChunks,
		chunks:    make(map[RequestID]int, maxChunks),
		stack:     NewStack(maxChunks),
		buffers:   make([]*Chunk, maxChunks),
		signals:   make(chan os.Signal, 1),
		log:       log,
	}

	// Journal (all per-chunk headers + toc) sits after the chunk data area.
	journalSize := tocSize + int64(headerSize*maxChunks)
	journalOffset := chunkSize * int64(maxChunks)

	// Non-empty string in chunkFilePath enables MMAP disk storage for chunks
	if chunkFilePath != "" {
		chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("could not open chunk cache file")
		}
		s.ChunkFile = chunkFile
		// Seek to the end to learn the current size (also proves seekability).
		currentSize, err := chunkFile.Seek(0, os.SEEK_END)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("chunk file is not seekable")
		}
		wantedSize := journalOffset + journalSize
		s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize)
		// No-op truncate to the current size probes whether the filesystem
		// supports truncation at all; if not, keep the file as-is.
		if err := chunkFile.Truncate(currentSize); nil != err {
			s.log.Warnf("Could not truncate chunk cache, skip resizing")
		} else if currentSize != wantedSize {
			if currentSize > tocSize {
				// An old journal may exist at a different offset (different
				// maxChunks); move it so cached chunks survive the resize.
				err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset)
				if nil != err {
					s.log.Errorf("%v", err)
				} else {
					s.log.Infof("Relocated chunk cache journal")
				}
			}
			if err := chunkFile.Truncate(wantedSize); nil != err {
				s.log.Debugf("%v", err)
				return nil, fmt.Errorf("could not resize chunk cache file")
			}
		}
		s.log.Infof("Created chunk cache file %v", chunkFile.Name())
		// Only chunks that existed in the file before the resize are candidates
		// for restoration.
		s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks)))
	}

	// Allocate journal (per-chunk headers + toc) via mmap; the toc lives in the
	// trailing tocSize bytes of the mapping.
	if journal, err := s.mmap(journalOffset, journalSize); nil != err {
		return nil, fmt.Errorf("could not allocate journal: %v", err)
	} else {
		// Header accesses are scattered, so hint random access to the kernel.
		if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err {
			s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err)
		}
		tocOffset := journalSize - tocSize
		header := journal[tocOffset:]
		// An invalid toc (fresh file, version change, corruption) is simply
		// re-initialized; stale chunk headers are then ignored via loadChunks.
		if valid := s.checkJournal(header, false); !valid {
			s.initJournal(header)
		}
		s.journal = journal[:tocOffset]
	}

	// Setup sighandler so the (potentially long) chunk loader can be aborted.
	signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)

	// Allocate mmap regions for chunks
	if err := s.allocateMmapRegions(maxMmapSize); nil != err {
		return nil, err
	}
	// Map chunks to slices from mmap regions
	if err := s.mmapChunks(); nil != err {
		return nil, err
	}

	return &s, nil
}

// relocateJournal moves an existing journal prior to a resize of the cache
// file. It reads the old toc from the end of the file, derives the old journal
// offset from the old maxChunks, copies the journal bytes into memory,
// truncates away the old journal, resizes the file, and writes journal and
// (re-initialized) toc at their new offsets. All sizes are in bytes.
func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error {
	header := make([]byte, tocSize)
	// The toc always sits in the last tocSize bytes of the file.
	if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err {
		return fmt.Errorf("failed to read journal header: %v", err)
	}

	// skipMaxChunks=true: the whole point is that maxChunks changed.
	if valid := s.checkJournal(header, true); !valid {
		return fmt.Errorf("failed to validate journal header")
	}

	// Reinterpret the raw toc bytes as a journalHeader to read the old layout.
	h := (*journalHeader)(unsafe.Pointer(&header[0]))
	oldJournalOffset := s.ChunkSize * int64(h.maxChunks)
	// Copy at most the new journal size; a shrink drops trailing headers.
	oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize
	journal := make([]byte, journalSize)

	if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err {
		return fmt.Errorf("failed to read journal: %v", err)
	}

	// Stamp the in-memory toc with the NEW geometry before writing it back.
	s.initJournal(header)

	sizeWithoutJournal := currentSize - oldJournalSize - tocSize
	if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err {
		return fmt.Errorf("could not truncate chunk cache journal: %v", err)
	}

	if err := s.ChunkFile.Truncate(wantedSize); nil != err {
		return fmt.Errorf("could not resize chunk cache file: %v", err)
	}

	if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err {
		return fmt.Errorf("failed to write journal: %v", err)
	}
	if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err {
		return fmt.Errorf("failed to write journal header: %v", err)
	}
	return nil
}

// checkJournal verifies the journal header (toc): magic, checksum over the
// first 12 bytes, version, chunk-header size, maxChunks (unless skipMaxChunks,
// used during relocation where maxChunks is expected to differ) and chunk
// size. Returns true only if every check passes; each failure is logged at
// debug level.
func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	// check magic bytes / endianness mismatch ('PD' vs 'DP')
	if h.magic != journalMagic {
		s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic)
		return false
	}
	// Checksum covers the 12 bytes preceding the checksum field itself.
	checksum := crc32.Checksum(journal[:12], crc32Table)
	if h.checksum != checksum {
		s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum)
		return false
	}
	if h.version != journalVersion {
		s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion)
		return false
	}
	if h.headerSize != uint8(headerSize) {
		s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize)
		return false
	}
	if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) {
		s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks)
		return false
	}
	if h.chunkSize != uint32(s.ChunkSize) {
		s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize)
		return false
	}
	s.log.Debug("Journal is valid")
	return true
}

// initJournal initializes the journal header in place with the current
// geometry and a checksum over the 12 bytes preceding the checksum field.
func (s *Storage) initJournal(journal []byte) {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	h.magic = journalMagic
	h.version = journalVersion
	h.headerSize = uint8(headerSize)
	h.maxChunks = uint32(s.MaxChunks)
	h.chunkSize = uint32(s.ChunkSize)
	h.checksum = crc32.Checksum(journal[:12], crc32Table)
}

// allocateMmapRegions creates memory mappings to fit all chunks. Each region
// holds chunksPerRegion whole chunks (region size is a multiple of ChunkSize,
// at most maxMmapSize); the final region is shrunk to the remaining chunks.
// NOTE(review): chunksPerRegion is a divisor below — maxMmapSize must be at
// least ChunkSize or this divides by zero; confirm callers guarantee that.
func (s *Storage) allocateMmapRegions(maxMmapSize int64) error {
	s.chunksPerRegion = maxMmapSize / s.ChunkSize
	regionSize := s.chunksPerRegion * s.ChunkSize
	numRegions := int64(s.MaxChunks) / s.chunksPerRegion
	remChunks := int64(s.MaxChunks) % s.chunksPerRegion
	if remChunks != 0 {
		numRegions++
	}
	s.mmapRegions = make([][]byte, numRegions)
	for i := int64(0); i < int64(len(s.mmapRegions)); i++ {
		size := regionSize
		if i == numRegions-1 && remChunks != 0 {
			// Last region only needs to cover the leftover chunks.
			size = remChunks * s.ChunkSize
		}
		s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size)
		region, err := s.mmap(i*regionSize, size)
		if nil != err {
			s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size)
			return err
		}
		// Chunk bodies are read/written linearly, so hint sequential access.
		if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err {
			s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err)
		}
		s.mmapRegions[i] = region
	}
	return nil
}

// mmapChunks slices buffers from mmap regions and loads chunk metadata.
// Previously persisted chunks (index < loadChunks with a non-blank id) are
// restored into the id map; all others start empty. Restored chunks are
// stacked behind empty ones so empty buffers are recycled first. The loop is
// abortable via SIGINT/SIGTERM because restoring a large cache can be slow.
func (s *Storage) mmapChunks() error {
	start := time.Now()
	empty := list.New()
	restored := list.New()
	loadedChunks := 0
	for i := 0; i < s.MaxChunks; i++ {
		select {
		case sig := <-s.signals:
			s.log.Warnf("Received signal %v, aborting chunk loader", sig)
			return fmt.Errorf("aborted by signal")
		default:
			if loaded, err := s.initChunk(i, empty, restored); nil != err {
				s.log.Errorf("failed to allocate chunk %v: %v", i, err)
				return fmt.Errorf("failed to initialize chunks")
			} else if loaded {
				loadedChunks++
			}
		}
	}
	// Prepend restored first, then empty, so empty indices end up on top.
	s.stack.Prepend(restored)
	s.stack.Prepend(empty)
	elapsed := time.Since(start)
	if nil != s.ChunkFile {
		s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed)
	} else {
		s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed)
	}
	return nil
}

// initChunk tries to restore a chunk from disk. It allocates the buffer at
// index, then classifies it: a blank id or an index beyond the restorable
// range goes onto the empty list (returns false); otherwise the chunk is
// registered in the id map and appended to the restored list (returns true).
func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) {
	chunk, err := s.allocateChunk(index)
	if err != nil {
		s.log.Debugf("%v", err)
		return false, err
	}

	s.buffers[index] = chunk

	// id comes from the persisted chunk header mapped in allocateChunk.
	id := chunk.id

	if blankRequestID == id || index >= s.loadChunks {
		chunk.item = empty.PushBack(index)
		// s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks)
		return false, nil
	}

	chunk.item = restored.PushBack(index)
	// s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id)
	s.chunks[id] = index

	return true, nil
}

// allocateChunk creates a new mmap-backed chunk: its body is a full-capacity
// slice (three-index slicing prevents appends from bleeding into the next
// chunk) of the owning region, and its header is an unsafe view into the
// journal mapping, so header mutations land directly in the persisted journal.
func (s *Storage) allocateChunk(index int) (*Chunk, error) {
	region := int64(index) / s.chunksPerRegion
	offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize
	// s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset)
	bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize]
	headerOffset := index * headerSize
	header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset]))
	chunk := Chunk{
		chunkHeader: header,
		bytes:       bytes,
	}
	return &chunk, nil
}

// mmap maps size bytes at offset of the chunk file (shared, read/write), or —
// when no chunk file is configured — an anonymous private mapping, in which
// case offset is ignored.
func (s *Storage) mmap(offset, size int64) ([]byte, error) {
	if s.ChunkFile != nil {
		return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	} else {
		return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
	}
}

// Clear removes all old chunks on disk (will be called on each program start).
// Intentionally a no-op here: persisted chunks are restored, not discarded.
func (s *Storage) Clear() error {
	return nil
}

// Load a chunk from ram or creates it. Returns the chunk's bytes, or nil if
// the id is unknown or the stored data fails checksum verification (in which
// case the buffer is purged from the recycling stack). A clean chunk is
// returned under the read lock; a dirty one is verified under the write lock
// to avoid concurrent crc verification.
// NOTE(review): the chunk pointer obtained under RLock is re-used after the
// lock is upgraded — verify the chunk cannot be recycled for another id in
// that window.
func (s *Storage) Load(id RequestID) []byte {
	s.lock.RLock()
	chunk := s.fetch(id)
	if nil == chunk {
		// s.log.Tracef("Load chunk %v (missing)", id)
		s.lock.RUnlock()
		return nil
	}
	if chunk.clean {
		// s.log.Tracef("Load chunk %v (clean)", id)
		defer s.lock.RUnlock()
		return chunk.bytes
	}
	s.lock.RUnlock()
	// Switch to write lock to avoid races on crc verification
	s.lock.Lock()
	defer s.lock.Unlock()
	if chunk.valid(id) {
		s.log.Debugf("Load chunk %v (verified)", id)
		return chunk.bytes
	}
	s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum())
	// Drop the corrupt buffer from the recycling stack.
	s.stack.Purge(chunk.item)
	return nil
}

// Store stores a chunk in the RAM and adds it to the disk storage queue.
// An already-present clean or checksum-valid chunk is left untouched. For a
// new id, a buffer index is popped from the recycling stack (evicting whatever
// id previously owned it); returns an error only when no buffer is available.
// NOTE(review): same RLock→Lock upgrade window as Load — the fetched chunk
// could in principle be recycled between the two locks; confirm callers
// serialize per id.
func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
	s.lock.RLock()

	// Avoid storing same chunk multiple times
	chunk := s.fetch(id)
	if nil != chunk && chunk.clean {
		// s.log.Tracef("Create chunk %v (exists: clean)", id)
		s.lock.RUnlock()
		return nil
	}

	s.lock.RUnlock()
	s.lock.Lock()
	defer s.lock.Unlock()

	if nil != chunk {
		if chunk.valid(id) {
			s.log.Debugf("Create chunk %v (exists: valid)", id)
			return nil
		}
		s.log.Warnf("Create chunk %v(exists: overwrite)", id)
	} else {
		index := s.stack.Pop()
		if index == -1 {
			s.log.Debugf("Create chunk %v (failed)", id)
			return fmt.Errorf("no buffers available")
		}
		chunk = s.buffers[index]
		// Evict the previous owner of this buffer, if any.
		deleteID := chunk.id
		if blankRequestID != deleteID {
			delete(s.chunks, deleteID)
			s.log.Debugf("Create chunk %v (reused)", id)
		} else {
			s.log.Debugf("Create chunk %v (stored)", id)
		}
		s.chunks[id] = index
		chunk.item = s.stack.Push(index)
	}

	chunk.update(id, bytes)

	return nil
}

// fetch returns the chunk for id (or nil if unknown) and marks it as recently
// used on the recycling stack. Caller must hold at least the read lock.
func (s *Storage) fetch(id RequestID) *Chunk {
	index, exists := s.chunks[id]
	if !exists {
		return nil
	}
	chunk := s.buffers[index]
	s.stack.Touch(chunk.item)
	return chunk
}