From 4da9416bec98e2702d8e78a194aa42725f55a267 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Tue, 14 Nov 2023 16:55:24 +0100 Subject: [PATCH] Fix chunk manager --- cmd/zurg/main.go | 12 ++++++------ internal/zfs/zfs.go | 11 ++++------- pkg/chunk/download.go | 11 ++++------- pkg/chunk/manager.go | 15 ++++++++++----- 4 files changed, 24 insertions(+), 25 deletions(-) diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index 576726f..04f881d 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -67,12 +67,12 @@ func main() { log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) // 64kb request size chunkMgr, err := chunk.NewManager( - "", - 1048576, // 1MB - 1, // 1 chunk - load ahead (1MB total) - max(runtime.NumCPU()/2, 1), // check threads - max(runtime.NumCPU()/2, 1), // load threads - runtime.NumCPU()*2, + "", // in-memory chunk file + 5242880, // 10MB + max((runtime.NumCPU()/2)-1, 1), // 1 chunk - load ahead (1MB total) + max(runtime.NumCPU()/2, 1), // check threads + max(runtime.NumCPU()/2, 1), // load threads + runtime.NumCPU()*4, torrentMgr, // max chunks config) if nil != err { diff --git a/internal/zfs/zfs.go b/internal/zfs/zfs.go index a043ccc..5ec7ae8 100644 --- a/internal/zfs/zfs.go +++ b/internal/zfs/zfs.go @@ -108,9 +108,6 @@ func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) return -fuse.ENOENT } else { size := int64(len(buff)) - if size < int64(fs.Config.GetNetworkBufferSize()) { - size = int64(fs.Config.GetNetworkBufferSize()) - } endofst := ofst + size if endofst > file.Bytes { endofst = file.Bytes @@ -118,14 +115,14 @@ func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) if endofst < ofst { return 0 } + // let's request a bigger chunk than we need + if size < int64(fs.Config.GetNetworkBufferSize()) { + size = int64(fs.Config.GetNetworkBufferSize()) + } response, err := fs.Chunk.GetChunk(file, ofst, size) if err != nil { return -fuse.ENOENT } - // response := universal.GetFileReader(torrent, file, ofst, int(size), fs.TorrentManager, fs.Config, fs.Log) - // if response == nil { - // return -fuse.ENOENT - // } n = copy(buff, response[:endofst-ofst]) return n } diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go index c26be05..6f096a3 100644 --- a/pkg/chunk/download.go +++ b/pkg/chunk/download.go @@ -8,7 +8,6 @@ import ( "syscall" "time" - "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/pkg/logutil" "go.uber.org/zap" @@ -22,15 +21,14 @@ type Downloader struct { callbacks map[RequestID][]DownloadCallback lock sync.Mutex storage *Storage - c config.ConfigInterface - t *torrent.TorrentManager + torMgr *torrent.TorrentManager log *zap.SugaredLogger } type DownloadCallback func(error, []byte) // NewDownloader creates a new download manager -func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.TorrentManager, c config.ConfigInterface) (*Downloader, error) { +func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torrent.TorrentManager) (*Downloader, error) { rlog := logutil.NewLogger() log := rlog.Named("downloader") @@ -39,8 +37,7 @@ func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.T queue: make(chan *Request, 100), callbacks: make(map[RequestID][]DownloadCallback, 100), storage: storage, - c: c, - t: t, + torMgr: torMgr, log: log, } @@ -105,7 +102,7 @@ func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int6 time.Sleep(time.Duration(delay) * time.Second) } - resp := d.t.UnrestrictUntilOk(request.file.Link) + resp := d.torMgr.UnrestrictUntilOk(request.file.Link) if resp == nil { return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link) } diff --git a/pkg/chunk/manager.go b/pkg/chunk/manager.go index 00011a4..4ac1334 100644 --- a/pkg/chunk/manager.go +++ b/pkg/chunk/manager.go @@ -1,9 +1,9 @@ package chunk import ( - "crypto/sha256" "encoding/binary" "fmt" + "hash/fnv" "os" "github.com/debridmediamanager.com/zurg/internal/config" @@ -58,8 +58,8 @@ func NewManager( checkThreads, loadThreads, maxChunks int, - t *torrent.TorrentManager, - c config.ConfigInterface) (*Manager, error) { + torMgr *torrent.TorrentManager, + cfg config.ConfigInterface) (*Manager, error) { pageSize := int64(os.Getpagesize()) if chunkSize < pageSize { @@ -82,7 +82,7 @@ func NewManager( return nil, err } - downloader, err := NewDownloader(loadThreads, storage, chunkSize, t, c) + downloader, err := NewDownloader(loadThreads, storage, chunkSize, torMgr) if nil != err { return nil, err } @@ -149,12 +149,17 @@ func buildRequestID(object *torrent.File, offset int64) (id RequestID) { if fileID == "" { fileID = object.Path } - hash := sha256.Sum256([]byte(fileID)) + hash := hashStringToFh(fileID) copy(id[:16], hash[:16]) binary.BigEndian.PutUint64(id[16:], uint64(offset)) return } +func hashStringToFh(s string) []byte { + hasher := fnv.New64a() + return hasher.Sum([]byte(s)) +} + func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) { chunkOffset := offset % m.ChunkSize offsetStart := offset - chunkOffset