Fix chunk manager

This commit is contained in:
Ben Sarmiento
2023-11-14 16:55:24 +01:00
parent 16505ec33f
commit 4da9416bec
4 changed files with 24 additions and 25 deletions

View File

@@ -67,12 +67,12 @@ func main() {
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
// 64kb request size // 64kb request size
chunkMgr, err := chunk.NewManager( chunkMgr, err := chunk.NewManager(
"", "", // in-memory chunk file
1048576, // 1MB 5242880, // 10MB
1, // 1 chunk - load ahead (1MB total) max((runtime.NumCPU()/2)-1, 1), // 1 chunk - load ahead (1MB total)
max(runtime.NumCPU()/2, 1), // check threads max(runtime.NumCPU()/2, 1), // check threads
max(runtime.NumCPU()/2, 1), // load threads max(runtime.NumCPU()/2, 1), // load threads
runtime.NumCPU()*2, runtime.NumCPU()*4,
torrentMgr, // max chunks torrentMgr, // max chunks
config) config)
if nil != err { if nil != err {

View File

@@ -108,9 +108,6 @@ func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int)
return -fuse.ENOENT return -fuse.ENOENT
} else { } else {
size := int64(len(buff)) size := int64(len(buff))
if size < int64(fs.Config.GetNetworkBufferSize()) {
size = int64(fs.Config.GetNetworkBufferSize())
}
endofst := ofst + size endofst := ofst + size
if endofst > file.Bytes { if endofst > file.Bytes {
endofst = file.Bytes endofst = file.Bytes
@@ -118,14 +115,14 @@ func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int)
if endofst < ofst { if endofst < ofst {
return 0 return 0
} }
// let's request a bigger chunk than we need
if size < int64(fs.Config.GetNetworkBufferSize()) {
size = int64(fs.Config.GetNetworkBufferSize())
}
response, err := fs.Chunk.GetChunk(file, ofst, size) response, err := fs.Chunk.GetChunk(file, ofst, size)
if err != nil { if err != nil {
return -fuse.ENOENT return -fuse.ENOENT
} }
// response := universal.GetFileReader(torrent, file, ofst, int(size), fs.TorrentManager, fs.Config, fs.Log)
// if response == nil {
// return -fuse.ENOENT
// }
n = copy(buff, response[:endofst-ofst]) n = copy(buff, response[:endofst-ofst])
return n return n
} }

View File

@@ -8,7 +8,6 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/logutil"
"go.uber.org/zap" "go.uber.org/zap"
@@ -22,15 +21,14 @@ type Downloader struct {
callbacks map[RequestID][]DownloadCallback callbacks map[RequestID][]DownloadCallback
lock sync.Mutex lock sync.Mutex
storage *Storage storage *Storage
c config.ConfigInterface torMgr *torrent.TorrentManager
t *torrent.TorrentManager
log *zap.SugaredLogger log *zap.SugaredLogger
} }
type DownloadCallback func(error, []byte) type DownloadCallback func(error, []byte)
// NewDownloader creates a new download manager // NewDownloader creates a new download manager
func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.TorrentManager, c config.ConfigInterface) (*Downloader, error) { func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torrent.TorrentManager) (*Downloader, error) {
rlog := logutil.NewLogger() rlog := logutil.NewLogger()
log := rlog.Named("downloader") log := rlog.Named("downloader")
@@ -39,8 +37,7 @@ func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.T
queue: make(chan *Request, 100), queue: make(chan *Request, 100),
callbacks: make(map[RequestID][]DownloadCallback, 100), callbacks: make(map[RequestID][]DownloadCallback, 100),
storage: storage, storage: storage,
c: c, torMgr: torMgr,
t: t,
log: log, log: log,
} }
@@ -105,7 +102,7 @@ func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int6
time.Sleep(time.Duration(delay) * time.Second) time.Sleep(time.Duration(delay) * time.Second)
} }
resp := d.t.UnrestrictUntilOk(request.file.Link) resp := d.torMgr.UnrestrictUntilOk(request.file.Link)
if resp == nil { if resp == nil {
return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link) return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
} }

View File

@@ -1,9 +1,9 @@
package chunk package chunk
import ( import (
"crypto/sha256"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash/fnv"
"os" "os"
"github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/config"
@@ -58,8 +58,8 @@ func NewManager(
checkThreads, checkThreads,
loadThreads, loadThreads,
maxChunks int, maxChunks int,
t *torrent.TorrentManager, torMgr *torrent.TorrentManager,
c config.ConfigInterface) (*Manager, error) { cfg config.ConfigInterface) (*Manager, error) {
pageSize := int64(os.Getpagesize()) pageSize := int64(os.Getpagesize())
if chunkSize < pageSize { if chunkSize < pageSize {
@@ -82,7 +82,7 @@ func NewManager(
return nil, err return nil, err
} }
downloader, err := NewDownloader(loadThreads, storage, chunkSize, t, c) downloader, err := NewDownloader(loadThreads, storage, chunkSize, torMgr)
if nil != err { if nil != err {
return nil, err return nil, err
} }
@@ -149,12 +149,17 @@ func buildRequestID(object *torrent.File, offset int64) (id RequestID) {
if fileID == "" { if fileID == "" {
fileID = object.Path fileID = object.Path
} }
hash := sha256.Sum256([]byte(fileID)) hash := hashStringToFh(fileID)
copy(id[:16], hash[:16]) copy(id[:16], hash[:16])
binary.BigEndian.PutUint64(id[16:], uint64(offset)) binary.BigEndian.PutUint64(id[16:], uint64(offset))
return return
} }
func hashStringToFh(s string) []byte {
hasher := fnv.New64a()
return hasher.Sum([]byte(s))
}
func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) { func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) {
chunkOffset := offset % m.ChunkSize chunkOffset := offset % m.ChunkSize
offsetStart := offset - chunkOffset offsetStart := offset - chunkOffset