diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index 7b7b90f..b555940 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -13,6 +13,7 @@ import ( "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/zfs" + "github.com/debridmediamanager.com/zurg/pkg/chunk" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" ) @@ -28,10 +29,10 @@ func main() { cache := expirable.NewLRU[string, string](1e4, nil, time.Hour) - t := torrent.NewTorrentManager(config, cache) + torrentMgr := torrent.NewTorrentManager(config, cache) mux := http.NewServeMux() - net.Router(mux, config, t, cache) + net.Router(mux, config, torrentMgr, cache) addr := fmt.Sprintf(":%s", config.GetPort()) server := &http.Server{Addr: addr, Handler: mux} @@ -43,6 +44,18 @@ func main() { } } + chunkMgr, err := chunk.NewManager( + "", + 10485760, + 4, + 4, + 4, + 32, + config) + if nil != err { + log.Panicf("Failed to initialize chunk manager: %v", err) + } + shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) @@ -66,7 +79,7 @@ func main() { } }() log.Infof("Mounting on %s", mountPoint) - if err := zfs.Mount(mountPoint, config, t); err != nil { + if err := zfs.Mount(mountPoint, config, torrentMgr, chunkMgr); err != nil { log.Panicf("Failed to mount: %v", err) } }() diff --git a/internal/zfs/fs.go b/internal/zfs/fs.go index 2967f1e..4e6d74c 100644 --- a/internal/zfs/fs.go +++ b/internal/zfs/fs.go @@ -8,6 +8,7 @@ import ( "bazil.org/fuse/fs" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/torrent" + "github.com/debridmediamanager.com/zurg/pkg/chunk" "go.uber.org/zap" ) @@ -21,6 +22,7 @@ type FS struct { t *torrent.TorrentManager log *zap.SugaredLogger initTime time.Time + chunk *chunk.Manager } // Root returns the root path diff --git a/internal/zfs/mount.go b/internal/zfs/mount.go index 8dad4cf..7145a99 100644 --- a/internal/zfs/mount.go +++ b/internal/zfs/mount.go @@ -8,12 +8,13 @@ import ( "bazil.org/fuse/fs" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/torrent" + "github.com/debridmediamanager.com/zurg/pkg/chunk" "github.com/debridmediamanager.com/zurg/pkg/logutil" "golang.org/x/sys/unix" ) -func Mount(mountpoint string, c config.ConfigInterface, t *torrent.TorrentManager) error { +func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager, cMgr *chunk.Manager) error { rlog := logutil.NewLogger() log := rlog.Named("zfs") @@ -36,10 +37,11 @@ func Mount(mountpoint string, c config.ConfigInterface, t *torrent.TorrentManage uid: uint32(unix.Geteuid()), gid: uint32(unix.Getegid()), umask: os.FileMode(0), - c: c, - t: t, + c: cfg, + t: tMgr, log: log, initTime: time.Now(), + chunk: cMgr, } if err := srv.Serve(filesys); err != nil { diff --git a/internal/zfs/object.go b/internal/zfs/object.go index dc34dab..c0574f8 100644 --- a/internal/zfs/object.go +++ b/internal/zfs/object.go @@ -3,8 +3,6 @@ package zfs import ( "context" "fmt" - "io" - "net/http" "os" "path/filepath" "strings" @@ -13,7 +11,7 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" - "github.com/debridmediamanager.com/zurg/pkg/realdebrid" + "github.com/debridmediamanager.com/zurg/internal/torrent" ) // define variable as rootObject id @@ -29,7 +27,7 @@ type Object struct { objType int parentName string name string - link string + file *torrent.File size uint64 mtime time.Time } @@ -141,7 +139,7 @@ func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) { objType: FILE, parentName: o.name, name: name, - link: file.Link, + file: &file, size: uint64(file.Bytes), mtime: convertRFC3339toTime(item.Added), }, nil @@ -160,69 +158,13 @@ func (o Object) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open // Read reads some bytes or the whole file func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - o.fs.log.Debugf("Reading offset %d size %d", req.Offset, req.Size) - unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) { - return realdebrid.UnrestrictLink(o.fs.c.GetToken(), o.link) - } - unrestrictResp := realdebrid.RetryUntilOk(unrestrictFn) - if unrestrictResp == nil { - o.fs.log.Errorf("Cannot unrestrict link %s", o.link) - return syscall.EIO - } else if unrestrictResp.Filename != o.name { - actualExt := filepath.Ext(unrestrictResp.Filename) - expectedExt := filepath.Ext(o.name) - if actualExt != expectedExt { - o.fs.log.Errorf("File extension mismatch: expected %s, got %s", expectedExt, actualExt) - } else { - o.fs.log.Errorf("Filename mismatch: expected %s, got %s", o.name, unrestrictResp.Filename) - } - } - o.fs.log.Debugf("Unrestricted link: %s", unrestrictResp.Download) - - httpClient := &http.Client{ - Timeout: 30 * time.Second, // Set a timeout to prevent hanging + data, err := o.fs.chunk.GetChunk(o.file, req.Offset, int64(req.Size)) + if nil != err { + o.fs.log.Warnf("%v", err) + return fuse.EIO } - streamReq, err := http.NewRequest(http.MethodGet, unrestrictResp.Download, nil) - if err != nil { - o.fs.log.Errorf("Error creating new request %v", err) - return syscall.EIO - } - - // Ensure that the byte range is correctly specified - endByte := req.Offset + int64(req.Size) - 1 - streamReq.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", req.Offset, endByte)) - o.fs.log.Debugf("Requested bytes=%d-%d", req.Offset, endByte) - - streamResp, err := httpClient.Do(streamReq.WithContext(ctx)) - if err != nil { - o.fs.log.Errorf("Error downloading file %s %v", unrestrictResp.Download, err) - return syscall.EIO - } - defer streamResp.Body.Close() - - // Check for successful status codes - if streamResp.StatusCode != http.StatusOK && streamResp.StatusCode != http.StatusPartialContent { - o.fs.log.Errorf("Received a non-ok status code %d for %s", streamResp.StatusCode, unrestrictResp.Download) - return syscall.EIO - } - - // Optionally check if the server returned the expected byte range - contentRange := streamResp.Header.Get("Content-Range") - if contentRange == "" && streamResp.StatusCode == http.StatusPartialContent { - o.fs.log.Error("Content-Range header is missing in HTTP partial content response") - return syscall.EIO - } - - // Process the stream in chunks. - // Allocate a buffer to read into. - respData, readErr := io.ReadAll(streamResp.Body) - if readErr != nil { - o.fs.log.Errorf("Error reading bytes from download response %v", readErr) - return syscall.EIO - } - resp.Data = respData - o.fs.log.Debugf("Read %d bytes in total", len(resp.Data)) + resp.Data = data return nil } diff --git a/pkg/chunk/manager.go b/pkg/chunk/manager.go index 6302f2d..8e397e2 100644 --- a/pkg/chunk/manager.go +++ b/pkg/chunk/manager.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "net/http" "os" "github.com/debridmediamanager.com/zurg/internal/config" @@ -58,7 +57,6 @@ func NewManager( loadAhead, checkThreads int, loadThreads int, - client *http.Client, maxChunks int, c config.ConfigInterface) (*Manager, error) {