This commit is contained in:
Ben Sarmiento
2023-11-07 00:07:31 +01:00
parent 6be49d8843
commit fb7fce1a43
13 changed files with 0 additions and 1412 deletions

View File

@@ -6,15 +6,12 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"runtime"
"syscall" "syscall"
"time" "time"
"github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/net"
"github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/internal/zfs"
"github.com/debridmediamanager.com/zurg/pkg/chunk"
"github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/hashicorp/golang-lru/v2/expirable" "github.com/hashicorp/golang-lru/v2/expirable"
) )
@@ -38,27 +35,6 @@ func main() {
addr := fmt.Sprintf(":%s", config.GetPort()) addr := fmt.Sprintf(":%s", config.GetPort())
server := &http.Server{Addr: addr, Handler: mux} server := &http.Server{Addr: addr, Handler: mux}
mountPoint := config.GetMountPoint()
if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
if err := os.Mkdir(mountPoint, 0755); err != nil {
log.Panicf("Failed to create mount point: %v", err)
}
}
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
// 64kb request size
chunkMgr, err := chunk.NewManager(
"",
524288, // 512kb
1, // 1 chunk - load ahead (1MB total)
max(runtime.NumCPU()/2, 1), // check threads
max(runtime.NumCPU()/2, 1), // load threads
runtime.NumCPU()*2, // max chunks
config)
if nil != err {
log.Panicf("Failed to initialize chunk manager: %v", err)
}
shutdown := make(chan os.Signal, 1) shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
@@ -74,19 +50,6 @@ func main() {
} }
}() }()
// Start the mount in a goroutine with panic recovery.
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Mount panic: %v\n", r)
}
}()
log.Infof("Mounting on %s", mountPoint)
if err := zfs.Mount(mountPoint, config, torrentMgr, chunkMgr); err != nil {
log.Panicf("Failed to mount: %v", err)
}
}()
<-shutdown <-shutdown
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -95,9 +58,6 @@ func main() {
if err := server.Shutdown(ctx); err != nil { if err := server.Shutdown(ctx); err != nil {
log.Errorf("Server shutdown error: %v\n", err) log.Errorf("Server shutdown error: %v\n", err)
} }
if err := zfs.Unmount(mountPoint); err != nil {
log.Errorf("Unmount error: %v\n", err)
}
log.Info("BYE") log.Info("BYE")
} }

2
go.mod
View File

@@ -3,10 +3,8 @@ module github.com/debridmediamanager.com/zurg
go 1.21.3 go 1.21.3
require ( require (
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5
github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7
go.uber.org/zap v1.26.0 go.uber.org/zap v1.26.0
golang.org/x/sys v0.4.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

6
go.sum
View File

@@ -1,5 +1,3 @@
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ=
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
@@ -8,16 +6,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ=
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -1,35 +0,0 @@
package zfs
import (
"os"
"sync"
"time"
"bazil.org/fuse/fs"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/chunk"
"go.uber.org/zap"
)
// FS is the FUSE filesystem served by the zfs package. It wires the
// configuration, torrent manager and chunk manager together and carries
// the identity (uid/gid/umask) applied to every node it exposes.
type FS struct {
	uid      uint32 // effective user id reported in node attributes
	gid      uint32 // effective group id reported in node attributes
	umask    os.FileMode
	directIO bool
	lock     sync.RWMutex
	c        config.ConfigInterface
	t        *torrent.TorrentManager
	log      *zap.SugaredLogger
	initTime time.Time      // mount time; used as mtime for synthetic directories
	chunk    *chunk.Manager // chunked download cache that serves file reads
}
// Root returns the root node of the filesystem: a synthetic directory
// object whose modification time is the mount time.
func (f *FS) Root() (fs.Node, error) {
	root := Object{
		fs:      f,
		objType: ROOT,
		mtime:   f.initTime,
	}
	return root, nil
}

View File

@@ -1,57 +0,0 @@
package zfs
import (
"os"
"time"
"bazil.org/fuse"
"bazil.org/fuse/fs"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/chunk"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"golang.org/x/sys/unix"
)
// Mount attaches the zurg FUSE filesystem at mountpoint and serves it.
// It blocks for the lifetime of the mount and returns when the FUSE
// connection is closed or Serve fails.
func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager, cMgr *chunk.Manager) error {
	rlog := logutil.NewLogger()
	log := rlog.Named("zfs")
	// Mount options: accessible to all users, allow mounting over a
	// non-empty directory, 128 KiB kernel readahead, kernel-enforced
	// permission checks.
	options := []fuse.MountOption{
		fuse.AllowOther(),
		fuse.AllowNonEmptyMount(),
		fuse.MaxReadahead(uint32(128 << 10)),
		fuse.DefaultPermissions(),
		fuse.FSName("zurgfs"),
	}
	conn, err := fuse.Mount(mountpoint, options...)
	if err != nil {
		return err
	}
	defer conn.Close()
	srv := fs.New(conn, nil)
	// The filesystem reports the effective uid/gid of this process as
	// the owner of every node.
	filesys := &FS{
		uid:      uint32(unix.Geteuid()),
		gid:      uint32(unix.Getegid()),
		umask:    os.FileMode(0),
		c:        cfg,
		t:        tMgr,
		log:      log,
		initTime: time.Now(),
		chunk:    cMgr,
	}
	// Serve blocks until the filesystem is unmounted or errors out.
	if err := srv.Serve(filesys); err != nil {
		return err
	}
	return nil
}
// Unmount detaches the FUSE filesystem mounted at mountpoint.
// The error from fuse.Unmount is now propagated to the caller
// (previously it was silently discarded and nil was always returned),
// so shutdown code can log unmount failures.
func Unmount(mountpoint string) error {
	return fuse.Unmount(mountpoint)
}

View File

@@ -1,189 +0,0 @@
package zfs
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"syscall"
"time"
"bazil.org/fuse"
"bazil.org/fuse/fs"
"github.com/debridmediamanager.com/zurg/internal/torrent"
)
// Node types of the synthetic filesystem hierarchy:
// ROOT -> DIRECTORY (configured dirs) -> TORRENT -> FILE.
const (
	ROOT      = 0
	DIRECTORY = 1
	TORRENT   = 2
	FILE      = 3
)
// Object is a single node in the synthetic filesystem tree. The same
// type represents the root, directories, torrents and files; objType
// selects the behavior.
type Object struct {
	fs         *FS
	objType    int    // one of ROOT, DIRECTORY, TORRENT, FILE
	parentName string // name of the parent node, used to resolve torrents
	name       string
	file       *torrent.File // backing file; set only for FILE objects
	size       uint64        // size in bytes; zero for directories
	mtime      time.Time
}
// Attr fills in the FUSE attributes for this node. Regular files are
// exposed as 0644; every other node type is presented as a 0755
// directory. Ownership mirrors the mounting process.
func (o Object) Attr(ctx context.Context, attr *fuse.Attr) error {
	mode := os.FileMode(0644)
	if o.objType != FILE {
		mode = os.ModeDir | 0755
	}
	attr.Mode = mode
	attr.Size = o.size
	attr.Uid = o.fs.uid
	attr.Gid = o.fs.gid
	attr.Ctime = o.mtime
	attr.Mtime = o.mtime
	// Block count in 512-byte units, rounded up.
	attr.Blocks = (attr.Size + 511) >> 9
	return nil
}
// ReadDirAll lists the entries of this node. ROOT lists the configured
// directories, a DIRECTORY lists completed torrents, and a TORRENT
// lists the base names of its selected files.
func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
	dirs := []fuse.Dirent{}
	switch o.objType {
	case ROOT:
		for _, directory := range o.fs.c.GetDirectories() {
			dirs = append(dirs, fuse.Dirent{
				Name: directory,
				Type: fuse.DT_Dir,
			})
		}
	case DIRECTORY:
		// Only fully downloaded (100%) torrents are exposed, and
		// duplicate torrent names are collapsed into one entry.
		seen := make(map[string]bool)
		for _, item := range o.fs.t.GetByDirectory(o.name) {
			if item.Progress != 100 {
				continue
			}
			if _, exists := seen[item.Name]; exists {
				continue
			}
			seen[item.Name] = true
			dirs = append(dirs, fuse.Dirent{
				Name: item.Name,
				Type: fuse.DT_Dir,
			})
		}
	case TORRENT:
		// Merge files from all torrents sharing this name, skipping
		// files without a download link and duplicate base names.
		finalName := make(map[string]bool)
		for _, item := range o.fs.t.FindAllTorrentsWithName(o.parentName, o.name) {
			for _, file := range item.SelectedFiles {
				if file.Link == "" {
					// log.Println("File has no link, skipping", file.Path)
					continue
				}
				filename := filepath.Base(file.Path)
				if finalName[filename] {
					// fragment := davextra.GetLinkFragment(file.Link)
					// filename = davextra.InsertLinkFragment(filename, fragment)
					continue
				}
				finalName[filename] = true
				dirs = append(dirs, fuse.Dirent{
					Name: filename,
					Type: fuse.DT_File,
				})
			}
		}
	}
	return dirs, nil
}
// Lookup resolves one child name of this node and returns the matching
// node, or syscall.ENOENT if it does not exist.
func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) {
	switch o.objType {
	case ROOT:
		for _, directory := range o.fs.c.GetDirectories() {
			if directory == name {
				return Object{
					fs:         o.fs,
					objType:    DIRECTORY,
					parentName: o.name,
					name:       name,
					mtime:      o.fs.initTime,
				}, nil
			}
		}
	case DIRECTORY:
		for _, item := range o.fs.t.GetByDirectory(o.name) {
			if item.Name == name && item.Progress == 100 {
				return Object{
					fs:         o.fs,
					objType:    TORRENT,
					parentName: o.name,
					name:       name,
					mtime:      convertRFC3339toTime(item.Added),
				}, nil
			}
		}
	case TORRENT:
		for _, item := range o.fs.t.FindAllTorrentsWithName(o.parentName, o.name) {
			for _, file := range item.SelectedFiles {
				// Match on the exact base name — the same name that
				// ReadDirAll lists. The previous suffix-only match could
				// resolve "b.mkv" to an unrelated "ab.mkv"; HasSuffix is
				// kept as a cheap pre-filter.
				if strings.HasSuffix(file.Path, name) && filepath.Base(file.Path) == name && file.Link != "" {
					// Copy the loop variable before taking its address
					// (gosec G601): with Go <1.22 semantics &file would
					// alias the shared iteration variable.
					f := file
					return Object{
						fs:         o.fs,
						objType:    FILE,
						parentName: o.name,
						name:       name,
						file:       &f,
						size:       uint64(file.Bytes),
						mtime:      convertRFC3339toTime(item.Added),
					}, nil
				}
			}
		}
	}
	return nil, syscall.ENOENT
}
// Open opens a node for reading. Direct I/O is forced so the kernel
// page cache is bypassed and every read goes through the chunk manager.
func (o Object) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
	resp.Flags |= fuse.OpenDirectIO
	return o, nil
}
// Read serves a read request by fetching the covered byte range from
// the chunk manager (which downloads/caches chunks as needed). Any
// chunk error is reported to the kernel as EIO.
func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
	o.fs.log.Debugf("Read %s (total size %d) req offset %d req size %d", o.name, o.size, req.Offset, req.Size)
	data, err := o.fs.chunk.GetChunk(o.file, req.Offset, int64(req.Size))
	if nil != err {
		o.fs.log.Warnf("%v", err)
		return syscall.EIO
	}
	resp.Data = data
	return nil
}
// Remove is not implemented; the filesystem is read-only for now.
func (o Object) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
	// Error string lowercased per Go convention (staticcheck ST1005).
	return fmt.Errorf("remove not yet implemented")
}
// Rename is not implemented; the filesystem is read-only for now.
func (o Object) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
	// Error string lowercased per Go convention (staticcheck ST1005).
	return fmt.Errorf("rename not yet implemented")
}
// convertRFC3339toTime parses an RFC 3339 timestamp (as produced by the
// Real-Debrid API, e.g. "2023-11-07T00:07:31.000Z") into a time.Time.
//
// The previous fixed layout "2006-01-02T15:04:05.000Z" only accepted
// exactly three fractional digits and a literal 'Z'. time.RFC3339
// accepts an optional fractional-seconds part and numeric UTC offsets,
// so this is a strict superset of the old behavior. On parse failure
// the current time is returned as a fallback, as before.
func convertRFC3339toTime(input string) time.Time {
	t, err := time.Parse(time.RFC3339, input)
	if err != nil {
		return time.Now()
	}
	return t
}

View File

@@ -1,51 +0,0 @@
package chunk
import (
"container/list"
"hash/crc32"
)
// Chunk is one fixed-size buffer slot holding a downloaded chunk, plus
// the journal metadata needed to validate it across restarts.
type Chunk struct {
	clean bool // true once the checksum has been verified for the current id
	*chunkHeader
	bytes []byte        // backing storage, typically an mmap'd slice
	item  *list.Element // position of this chunk in the eviction stack
}

// chunkHeader is the per-chunk journal record persisted to disk.
type chunkHeader struct {
	id       RequestID
	size     uint32 // number of valid bytes in the chunk
	checksum uint32 // CRC-32C over bytes[:size]
}

// valid reports whether the chunk currently holds data for the given
// request id. The checksum is verified lazily on first use and the
// result is cached in c.clean.
func (c *Chunk) valid(id RequestID) bool {
	if c.id != id {
		return false
	}
	if !c.clean {
		c.clean = c.checksum == c.calculateChecksum()
	}
	return c.clean
}

// update overwrites the chunk contents with bytes (truncated to the
// buffer size by copy), recomputes the checksum and marks it clean.
func (c *Chunk) update(id RequestID, bytes []byte) {
	c.id = id
	c.size = uint32(copy(c.bytes, bytes))
	c.checksum = c.calculateChecksum()
	c.clean = true
}

// calculateChecksum returns the CRC-32C of the valid portion of the
// buffer. A recorded size larger than the buffer (corrupt or truncated
// journal) is clamped to the buffer length as a side effect.
func (c *Chunk) calculateChecksum() uint32 {
	size := c.size
	if nil == c.bytes || size == 0 {
		return 0
	}
	maxSize := uint32(len(c.bytes))
	if size > maxSize {
		// corrupt size or truncated chunk, fix size
		c.size = maxSize
		return crc32.Checksum(c.bytes, crc32Table)
	}
	return crc32.Checksum(c.bytes[:size], crc32Table)
}

View File

@@ -1,147 +0,0 @@
package chunk
import (
"fmt"
"io"
"net/http"
"sync"
"syscall"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)
// Downloader handles concurrent chunk downloads. Requests are pushed
// onto a queue drained by a fixed pool of worker goroutines; callbacks
// registered under the same request id are coalesced so each chunk is
// downloaded only once.
type Downloader struct {
	BufferSize int64 // size of each worker's download buffer (one chunk)
	queue      chan *Request
	callbacks  map[RequestID][]DownloadCallback // pending callbacks per in-flight id
	lock       sync.Mutex                       // guards callbacks
	storage    *Storage                         // cache the downloaded chunks are stored into
	c          config.ConfigInterface
	log        *zap.SugaredLogger
}

// DownloadCallback is invoked with the download error (if any) and the
// raw chunk bytes.
type DownloadCallback func(error, []byte)
// NewDownloader creates a download manager and spawns `threads` worker
// goroutines that drain its request queue for the process lifetime.
func NewDownloader(threads int, storage *Storage, bufferSize int64, c config.ConfigInterface) (*Downloader, error) {
	d := &Downloader{
		BufferSize: bufferSize,
		queue:      make(chan *Request, 100),
		callbacks:  make(map[RequestID][]DownloadCallback, 100),
		storage:    storage,
		c:          c,
		log:        logutil.NewLogger().Named("downloader"),
	}
	for n := 0; n < threads; n++ {
		go d.thread(n)
	}
	return d, nil
}
// Download registers a request. If the same request id is already in
// flight only the callback is appended; otherwise the request is queued
// for a worker. A nil callback makes this a fire-and-forget prefetch.
func (d *Downloader) Download(req *Request, callback DownloadCallback) {
	d.lock.Lock()
	callbacks, exists := d.callbacks[req.id]
	if nil != callback {
		d.callbacks[req.id] = append(callbacks, callback)
	} else if !exists {
		// Store the (nil) slice so the id is marked as in flight.
		d.callbacks[req.id] = callbacks
	}
	if !exists {
		d.queue <- req
	}
	d.lock.Unlock()
}
// thread is one worker loop. Each worker owns a single reusable
// download buffer — preferably an anonymous mmap, falling back to a
// heap slice if mmap fails — and processes requests forever.
func (d *Downloader) thread(n int) {
	buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
	if nil != err {
		d.log.Warnf("Failed to mmap download buffer %v: %v", n, err)
		buffer = make([]byte, d.BufferSize)
	}
	for {
		req := <-d.queue
		d.download(req, buffer)
	}
}
// download performs one request, fans the result out to every callback
// registered for the id, and then persists the chunk. Callbacks run
// before Store so waiting readers are unblocked as soon as the data is
// in memory; a store failure is only logged.
func (d *Downloader) download(req *Request, buffer []byte) {
	d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
	err := d.downloadFromAPI(req, buffer, 0)
	d.lock.Lock()
	callbacks := d.callbacks[req.id]
	for _, callback := range callbacks {
		callback(err, buffer)
	}
	delete(d.callbacks, req.id)
	d.lock.Unlock()
	if nil != err {
		return
	}
	if err := d.storage.Store(req.id, buffer); nil != err {
		d.log.Warnf("Could not store chunk %v: %v", req.id, err)
	}
}
// downloadFromAPI unrestricts the file's link via Real-Debrid and
// fetches the request's byte range into buffer.
//
// delay, when positive, is slept before issuing the request (throttle
// hook). Responses announcing more data than the buffer can hold are
// rejected instead of panicking on the slice expression below.
func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error {
	// sleep if request is throttled
	if delay > 0 {
		time.Sleep(time.Duration(delay) * time.Second)
	}
	unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
		return realdebrid.UnrestrictLink(d.c.GetToken(), request.file.Link)
	}
	resp := realdebrid.RetryUntilOk(unrestrictFn)
	if resp == nil {
		return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
	}
	downloadURL := resp.Download
	req, err := http.NewRequest("GET", downloadURL, nil)
	if nil != err {
		d.log.Debugf("request init error: %v", err)
		return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link)
	}
	// Request only this chunk's range (offsetEnd is exclusive in the
	// Request struct; the HTTP Range header end is inclusive).
	req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))
	res, err := http.DefaultClient.Do(req)
	if nil != err {
		d.log.Debugf("request error: %v", err)
		return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link)
	}
	defer res.Body.Close()
	reader := res.Body
	if res.StatusCode != 206 && res.StatusCode != 200 {
		return fmt.Errorf("could not read object %s %s / StatusCode: %v",
			request.file.Path, request.file.Link, res.StatusCode)
	}
	if res.ContentLength == -1 {
		return fmt.Errorf("missing Content-Length header in response")
	}
	// Guard against a misbehaving server announcing more bytes than the
	// buffer has room for: the 3-index slice below would otherwise
	// panic with an out-of-range length.
	if res.ContentLength > int64(cap(buffer)) {
		return fmt.Errorf("response size %v exceeds buffer capacity %v for %s %s",
			res.ContentLength, cap(buffer), request.file.Path, request.file.Link)
	}
	n, err := io.ReadFull(reader, buffer[:res.ContentLength:cap(buffer)])
	if nil != err && err != io.ErrUnexpectedEOF {
		d.log.Debugf("response read error: %v", err)
		return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
	}
	d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)
	return nil
}

View File

@@ -1,265 +0,0 @@
package chunk
import (
"crypto/sha256"
"encoding/binary"
"fmt"
"os"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent"
)
// Manager manages chunks on disk. It owns the storage and downloader
// and fans incoming read requests out to its check threads.
type Manager struct {
	ChunkSize  int64 // size of one chunk in bytes
	LoadAhead  int   // number of chunks to prefetch after a read
	downloader *Downloader
	storage    *Storage
	queue      chan *QueueEntry // pending chunk checks
}

// QueueEntry pairs a chunk request with the channel its response must
// be delivered on (nil for fire-and-forget preloads).
type QueueEntry struct {
	request  *Request
	response chan Response
}

// RequestID is the binary identifier for a chunk request: the first 16
// bytes of the SHA-256 of the file id, followed by the big-endian
// chunk offset (see buildRequestID).
type RequestID [24]byte

// String renders the id as "<hash hex>:<offset>".
func (id RequestID) String() string {
	return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:]))
}

// Request represents a chunk request.
type Request struct {
	id             RequestID
	file           *torrent.File
	offsetStart    int64 // chunk-aligned start offset within the file
	offsetEnd      int64 // exclusive end offset (offsetStart + chunk size)
	chunkOffset    int64 // start of the requested data inside the chunk
	chunkOffsetEnd int64 // exclusive end of the requested data inside the chunk
	sequence       int   // position of this range in the originating read
	preload        bool  // true for fire-and-forget look-ahead requests
}

// Response represents a chunk response.
type Response struct {
	Sequence int
	Error    error
	Bytes    []byte
}
// NewManager creates a new chunk manager.
//
// chunkFile enables persistent mmap-backed chunk storage when non-empty;
// chunkSize is the size of a single chunk in bytes and must be a
// positive multiple of the OS page size; loadAhead is the number of
// chunks to prefetch after a read; checkThreads and loadThreads size
// the worker pools; maxChunks bounds the number of cached chunks.
//
// Error messages have been corrected to match the conditions actually
// enforced ("divisible", "at least 2").
func NewManager(
	chunkFile string,
	chunkSize int64,
	loadAhead,
	checkThreads,
	loadThreads,
	maxChunks int,
	c config.ConfigInterface) (*Manager, error) {
	pageSize := int64(os.Getpagesize())
	if chunkSize < pageSize {
		return nil, fmt.Errorf("chunk size must not be < %v", pageSize)
	}
	if chunkSize%pageSize != 0 {
		return nil, fmt.Errorf("chunk size must be divisible by %v", pageSize)
	}
	// 32-Bit: ~2GB / 64-Bit: ~8EB
	maxMmapSize := int64(^uint(0) >> 1)
	if chunkSize > maxMmapSize {
		return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize)
	}
	if maxChunks < 2 || maxChunks < loadAhead {
		return nil, fmt.Errorf("max-chunks must be at least 2 and not smaller than the load ahead value")
	}
	storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile)
	if nil != err {
		return nil, err
	}
	downloader, err := NewDownloader(loadThreads, storage, chunkSize, c)
	if nil != err {
		return nil, err
	}
	manager := Manager{
		ChunkSize:  chunkSize,
		LoadAhead:  loadAhead,
		downloader: downloader,
		storage:    storage,
		queue:      make(chan *QueueEntry, 100),
	}
	// Start from an empty cache state.
	if err := manager.storage.Clear(); nil != err {
		return nil, err
	}
	for i := 0; i < checkThreads; i++ {
		go manager.thread()
	}
	return &manager, nil
}
// GetChunk loads one chunk and starts the preload for the next chunks.
// The read is clamped to the file size, split into chunk-aligned
// sub-ranges that are resolved concurrently, and the pieces are copied
// into a single result buffer by sequence number.
func (m *Manager) GetChunk(object *torrent.File, offset, size int64) ([]byte, error) {
	maxOffset := object.Bytes
	if offset > maxOffset {
		return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", object.ID, offset)
	}
	// Clamp reads extending past the end of the file.
	if offset+size > maxOffset {
		size = object.Bytes - offset
	}
	ranges := splitChunkRanges(offset, size, m.ChunkSize)
	numRanges := len(ranges)
	responses := make(chan Response, numRanges)
	last := numRanges - 1
	// Only the last range triggers the look-ahead preload.
	for i, r := range ranges {
		m.requestChunk(object, r.offset, r.size, i, i == last, responses)
	}
	data := make([]byte, size)
	for i := 0; i < cap(responses); i++ {
		res := <-responses
		if nil != res.Error {
			return nil, res.Error
		}
		// Responses may arrive out of order; place each piece by the
		// sequence number of its originating range.
		dataOffset := ranges[res.Sequence].offset - offset
		if n := copy(data[dataOffset:], res.Bytes); n == 0 {
			return nil, fmt.Errorf("request %v slice %v has empty response", object.ID, res.Sequence)
		}
	}
	close(responses)
	return data, nil
}
// buildRequestID derives the 24-byte chunk identifier: the first 16
// bytes of the SHA-256 of the file's link (or path when no link is
// set), followed by the chunk offset in big-endian order.
func buildRequestID(object *torrent.File, offset int64) (id RequestID) {
	key := object.Link
	if key == "" {
		key = object.Path
	}
	sum := sha256.Sum256([]byte(key))
	copy(id[:16], sum[:16])
	binary.BigEndian.PutUint64(id[16:], uint64(offset))
	return id
}
// requestChunk queues the chunk covering [offset, offset+size) for the
// given response channel and, when preload is set (the last range of a
// read), also queues fire-and-forget requests for the next LoadAhead
// chunks.
func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) {
	chunkOffset := offset % m.ChunkSize
	offsetStart := offset - chunkOffset
	offsetEnd := offsetStart + m.ChunkSize
	request := &Request{
		id:             buildRequestID(object, offsetStart),
		file:           object,
		offsetStart:    offsetStart,
		offsetEnd:      offsetEnd,
		chunkOffset:    chunkOffset,
		chunkOffsetEnd: chunkOffset + size,
		sequence:       sequence,
		preload:        false, // this is the primary request, not a look-ahead
	}
	m.queue <- &QueueEntry{
		request:  request,
		response: response,
	}
	if !preload {
		return
	}
	// Queue look-ahead requests for chunks lying entirely inside the
	// file. NOTE(review): the uint64 casts presumably also reject
	// negative offsets from overflow — confirm.
	for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize {
		aheadOffsetStart := offsetStart + i
		aheadOffsetEnd := aheadOffsetStart + m.ChunkSize
		if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) {
			request := &Request{
				id:          buildRequestID(object, aheadOffsetStart),
				file:        object,
				offsetStart: aheadOffsetStart,
				offsetEnd:   aheadOffsetEnd,
				preload:     true,
			}
			m.queue <- &QueueEntry{
				request: request,
			}
		}
	}
}
// byteRange describes one contiguous sub-range of a read.
type byteRange struct {
	offset, size int64
}

// splitChunkRanges breaks a read of `size` bytes starting at `offset`
// into sub-ranges that each stay within a single chunk of `chunkSize`
// bytes. Multiple ranges occur with Direct-IO and unaligned reads, or
// when the read is larger than the chunk size.
func splitChunkRanges(offset, size, chunkSize int64) []byteRange {
	ranges := make([]byteRange, 0, size/chunkSize+2)
	remaining := size
	for remaining > 0 {
		// Take at most up to the next chunk boundary.
		step := chunkSize - offset%chunkSize
		if remaining < step {
			step = remaining
		}
		ranges = append(ranges, byteRange{offset, step})
		offset += step
		remaining -= step
	}
	return ranges
}
// thread is one check-worker loop: it takes queued entries and resolves
// them against the cache or the downloader. Runs for the process
// lifetime.
func (m *Manager) thread() {
	for {
		queueEntry := <-m.queue
		m.checkChunk(queueEntry.request, queueEntry.response)
	}
}
// checkChunk answers a request from storage when possible, otherwise
// delegates to the downloader. A nil response channel marks a preload:
// it only makes sure the chunk ends up in storage.
func (m *Manager) checkChunk(req *Request, response chan Response) {
	if nil == response {
		// Preload: download only when not already cached.
		if nil == m.storage.Load(req.id) {
			m.downloader.Download(req, nil)
		}
		return
	}
	// Cache hit: respond immediately with the requested sub-slice.
	if bytes := m.storage.Load(req.id); nil != bytes {
		response <- Response{
			Sequence: req.sequence,
			Bytes:    adjustResponseChunk(req, bytes),
		}
		return
	}
	// Cache miss: respond from the downloader's callback.
	m.downloader.Download(req, func(err error, bytes []byte) {
		response <- Response{
			Sequence: req.sequence,
			Error:    err,
			Bytes:    adjustResponseChunk(req, bytes),
		}
	})
}
// adjustResponseChunk trims a whole-chunk buffer down to the sub-range
// the request asked for, clamping both bounds to the available length.
func adjustResponseChunk(req *Request, bytes []byte) []byte {
	if bytes == nil {
		return nil
	}
	total := int64(len(bytes))
	start := min(req.chunkOffset, total)
	end := min(req.chunkOffsetEnd, total)
	return bytes[start:end]
}
// min returns the smaller of the two given int64 values.
func min(x, y int64) int64 {
	if y < x {
		return y
	}
	return x
}

View File

@@ -1,53 +0,0 @@
package chunk
import "testing"
// TestSplitChunkRanges checks that reads are split into chunk-aligned
// sub-ranges: empty reads produce no ranges, aligned reads map 1:1, and
// unaligned reads produce a leading partial range up to the next chunk
// boundary.
func TestSplitChunkRanges(t *testing.T) {
	testcases := []struct {
		offset, size, chunkSize int64
		result                  []byteRange
	}{
		{0, 0, 4096, []byteRange{}},
		{0, 4096, 4096, []byteRange{
			{0, 4096},
		}},
		{4095, 4096, 4096, []byteRange{
			{4095, 1},
			{4096, 4095},
		}},
		{0, 8192, 4096, []byteRange{
			{0, 4096},
			{4096, 4096},
		}},
		{2048, 8192, 4096, []byteRange{
			{2048, 2048},
			{4096, 4096},
			{8192, 2048},
		}},
		{2048, 8192, 4096, []byteRange{
			{2048, 2048},
			{4096, 4096},
			{8192, 2048},
		}},
		{17960960, 16777216, 10485760, []byteRange{
			{17960960, 3010560},
			{20971520, 10485760},
			{31457280, 3280896},
		}},
	}
	for i, tc := range testcases {
		ranges := splitChunkRanges(tc.offset, tc.size, tc.chunkSize)
		actualSize := len(ranges)
		expectedSize := len(tc.result)
		if actualSize != expectedSize {
			t.Fatalf("ByteRange %v length mismatch: %v != %v", i, actualSize, expectedSize)
		}
		for j, r := range ranges {
			actual := r
			expected := tc.result[j]
			if actual != expected {
				t.Fatalf("ByteRange %v mismatch: %v != %v", i, actual, expected)
			}
		}
	}
}

View File

@@ -1,75 +0,0 @@
package chunk
import (
"container/list"
"sync"
)
// Stack is a thread safe list/stack implementation used as an LRU of
// chunk slot indices: the front holds the least recently used item
// (the next eviction candidate), the back the most recently used.
type Stack struct {
	items   *list.List
	lock    sync.Mutex
	maxSize int // Pop only yields items once the stack has reached this size
}
// NewStack creates an empty stack that starts evicting (via Pop) once
// it holds maxChunks items.
func NewStack(maxChunks int) *Stack {
	s := &Stack{
		items:   list.New(),
		maxSize: maxChunks,
	}
	return s
}
// Len returns the number of items currently on the stack.
func (s *Stack) Len() int {
	s.lock.Lock()
	n := s.items.Len()
	s.lock.Unlock()
	return n
}
// Pop removes and returns the id at the front of the stack (the least
// recently used item). It returns -1 while the stack has fewer than
// maxSize items — eviction only starts once the cache is full — or
// when the stack is empty.
func (s *Stack) Pop() int {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.items.Len() < s.maxSize {
		return -1
	}
	item := s.items.Front()
	if nil == item {
		return -1
	}
	s.items.Remove(item)
	return item.Value.(int)
}
// Touch marks item as most recently used by moving it to the back of
// the stack (a no-op when it is already last).
func (s *Stack) Touch(item *list.Element) {
	s.lock.Lock()
	defer s.lock.Unlock()
	if item != s.items.Back() {
		s.items.MoveToBack(item)
	}
}
// Push appends id as the most recently used entry and returns its list
// element for later Touch/Purge calls.
func (s *Stack) Push(id int) *list.Element {
	s.lock.Lock()
	e := s.items.PushBack(id)
	s.lock.Unlock()
	return e
}
// Prepend inserts all items of the given list at the front of the
// stack, making them the first eviction candidates.
func (s *Stack) Prepend(items *list.List) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.items.PushFrontList(items)
}
// Purge marks an item as the next eviction candidate by moving it to
// the front of the stack. NOTE(review): despite the name, the item is
// not removed here — it is only reprioritized and reclaimed by a later
// Pop (see TestAddToStack for the expected ordering).
func (s *Stack) Purge(item *list.Element) {
	s.lock.Lock()
	defer s.lock.Unlock()
	if item != s.items.Front() {
		s.items.MoveToFront(item)
	}
}

View File

@@ -1,72 +0,0 @@
package chunk
import "testing"
// TestOOB ensures Touch on a freshly pushed item does not panic when
// the stack is at capacity.
func TestOOB(t *testing.T) {
	stack := NewStack(1)
	item := stack.Push(1)
	stack.Touch(item)
}
// TestAddToStack verifies the interaction of Push, Touch and Purge:
// purged items move to the front (popped first, most recent purge
// first), touched items move to the back (popped last), and Pop yields
// -1 once the stack is below capacity.
func TestAddToStack(t *testing.T) {
	stack := NewStack(1)
	item1 := stack.Push(1)
	item2 := stack.Push(2)
	item3 := stack.Push(3)
	item4 := stack.Push(4)
	stack.Touch(item1)
	stack.Touch(item3)
	stack.Purge(item2)
	stack.Purge(item4)
	v := stack.Pop()
	if v != 4 {
		t.Fatalf("Expected 4 got %v", v)
	}
	v = stack.Pop()
	if v != 2 {
		t.Fatalf("Expected 2 got %v", v)
	}
	v = stack.Pop()
	if v != 1 {
		t.Fatalf("Expected 1 got %v", v)
	}
	v = stack.Pop()
	if v != 3 {
		t.Fatalf("Expected 3 got %v", v)
	}
	v = stack.Pop()
	if v != -1 {
		t.Fatalf("Expected -1 got %v", v)
	}
}
// TestLen checks that Len tracks Push and Pop.
func TestLen(t *testing.T) {
	stack := NewStack(1)
	v := stack.Len()
	if v != 0 {
		t.Fatalf("Expected 0 got %v", v)
	}
	stack.Push(1)
	v = stack.Len()
	if v != 1 {
		t.Fatalf("Expected 1 got %v", v)
	}
	_ = stack.Pop()
	v = stack.Len()
	if v != 0 {
		t.Fatalf("Expected 0 got %v", v)
	}
}

View File

@@ -1,420 +0,0 @@
package chunk
import (
"container/list"
"fmt"
"hash/crc32"
"os"
"os/signal"
"sync"
"syscall"
"time"
"unsafe"
"go.uber.org/zap"
"golang.org/x/sys/unix"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
)
const (
	// headerSize is the on-disk size of one chunkHeader journal record.
	headerSize = int(unsafe.Sizeof(*new(chunkHeader)))
	// tocSize is the size of the journal trailer (journalHeader).
	tocSize = int64(unsafe.Sizeof(*new(journalHeader)))
	// journalMagic encodes 'P','D'; reading it back swapped ("DP")
	// indicates an endianness mismatch (see checkJournal).
	journalMagic   = uint16('P'<<8 | 'D'&0xFF)
	journalVersion = uint8(2)
)

var (
	// blankRequestID is the zero value used to recognize unused slots.
	blankRequestID RequestID
	// crc32Table uses the Castagnoli polynomial (CRC-32C).
	crc32Table = crc32.MakeTable(crc32.Castagnoli)
)

// Storage is a chunk storage backed by mmap regions (anonymous or
// file-backed) plus a persisted journal of per-chunk metadata.
type Storage struct {
	ChunkFile       *os.File // nil when running purely in memory
	ChunkSize       int64
	HeaderSize      int64
	MaxChunks       int
	chunks          map[RequestID]int // request id -> chunk slot index
	stack           *Stack            // LRU stack of slot indices
	lock            sync.RWMutex
	buffers         []*Chunk
	loadChunks      int            // number of chunks to try restoring from an existing file
	signals         chan os.Signal // lets SIGINT/SIGTERM abort the slow chunk load
	journal         []byte
	mmapRegions     [][]byte
	chunksPerRegion int64
	log             *zap.SugaredLogger
}

// journalHeader is the trailer record that validates the journal layout
// across restarts.
type journalHeader struct {
	magic      uint16
	version    uint8
	headerSize uint8 // sizeof(chunkHeader) at write time
	maxChunks  uint32
	chunkSize  uint32
	checksum   uint32 // CRC-32C over the 12 bytes preceding this field
}
// NewStorage creates a new storage.
//
// When chunkFilePath is non-empty the chunks and their journal are
// persisted in (and mmap'd from) that file, relocating an existing
// journal if the file geometry changed; otherwise anonymous memory is
// used. maxMmapSize caps the size of a single mmap region.
func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) {
	rlog := logutil.NewLogger()
	log := rlog.Named("storage")
	s := Storage{
		ChunkSize: chunkSize,
		MaxChunks: maxChunks,
		chunks:    make(map[RequestID]int, maxChunks),
		stack:     NewStack(maxChunks),
		buffers:   make([]*Chunk, maxChunks),
		signals:   make(chan os.Signal, 1),
		log:       log,
	}
	// The journal (chunk headers + trailer) lives after the chunk data.
	journalSize := tocSize + int64(headerSize*maxChunks)
	journalOffset := chunkSize * int64(maxChunks)
	// Non-empty string in chunkFilePath enables MMAP disk storage for chunks
	if chunkFilePath != "" {
		chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("could not open chunk cache file")
		}
		s.ChunkFile = chunkFile
		currentSize, err := chunkFile.Seek(0, os.SEEK_END)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("chunk file is not seekable")
		}
		wantedSize := journalOffset + journalSize
		s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize)
		// NOTE(review): truncating to the current size looks like a
		// writability probe (it changes nothing on success) — confirm.
		if err := chunkFile.Truncate(currentSize); nil != err {
			s.log.Warnf("Could not truncate chunk cache, skip resizing")
		} else if currentSize != wantedSize {
			if currentSize > tocSize {
				// Geometry changed: move the old journal before resizing.
				err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset)
				if nil != err {
					s.log.Errorf("%v", err)
				} else {
					s.log.Infof("Relocated chunk cache journal")
				}
			}
			if err := chunkFile.Truncate(wantedSize); nil != err {
				s.log.Debugf("%v", err)
				return nil, fmt.Errorf("could not resize chunk cache file")
			}
		}
		s.log.Infof("Created chunk cache file %v", chunkFile.Name())
		// Number of chunks that may be restorable from the old file.
		s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks)))
	}
	// Alocate journal
	if journal, err := s.mmap(journalOffset, journalSize); nil != err {
		return nil, fmt.Errorf("could not allocate journal: %v", err)
	} else {
		if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err {
			s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err)
		}
		// The trailer (journalHeader) sits at the end of the mapping;
		// an invalid trailer resets the journal to the current geometry.
		tocOffset := journalSize - tocSize
		header := journal[tocOffset:]
		if valid := s.checkJournal(header, false); !valid {
			s.initJournal(header)
		}
		s.journal = journal[:tocOffset]
	}
	// Setup sighandler
	signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)
	// Allocate mmap regions for chunks
	if err := s.allocateMmapRegions(maxMmapSize); nil != err {
		return nil, err
	}
	// Map chunks to slices from mmap regions
	if err := s.mmapChunks(); nil != err {
		return nil, err
	}
	return &s, nil
}
// relocateJournal moves existing journal prior to resize.
//
// The old trailer is read from the end of the file and validated while
// ignoring maxChunks (which is exactly what changed); the old journal
// bytes are then rewritten at the offset implied by the new geometry.
func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error {
	header := make([]byte, tocSize)
	if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err {
		return fmt.Errorf("failed to read journal header: %v", err)
	}
	if valid := s.checkJournal(header, true); !valid {
		return fmt.Errorf("failed to validate journal header")
	}
	h := (*journalHeader)(unsafe.Pointer(&header[0]))
	// The old journal starts right after the old chunk area.
	oldJournalOffset := s.ChunkSize * int64(h.maxChunks)
	oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize
	journal := make([]byte, journalSize)
	if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err {
		return fmt.Errorf("failed to read journal: %v", err)
	}
	// Rewrite the trailer for the new geometry before writing back.
	s.initJournal(header)
	sizeWithoutJournal := currentSize - oldJournalSize - tocSize
	if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err {
		return fmt.Errorf("could not truncate chunk cache journal: %v", err)
	}
	if err := s.ChunkFile.Truncate(wantedSize); nil != err {
		return fmt.Errorf("could not resize chunk cache file: %v", err)
	}
	if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err {
		return fmt.Errorf("failed to write journal: %v", err)
	}
	if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err {
		return fmt.Errorf("failed to write journal header: %v", err)
	}
	return nil
}
// checkJournal verifies the journal header. skipMaxChunks allows
// validating an old trailer whose maxChunks differs from the current
// configuration (used during journal relocation).
func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	// check magic bytes / endianess mismatch ('PD' vs 'DP')
	if h.magic != journalMagic {
		s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic)
		return false
	}
	// The checksum covers the 12 bytes preceding the checksum field.
	checksum := crc32.Checksum(journal[:12], crc32Table)
	if h.checksum != checksum {
		s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum)
		return false
	}
	if h.version != journalVersion {
		s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion)
		return false
	}
	if h.headerSize != uint8(headerSize) {
		s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize)
		return false
	}
	if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) {
		s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks)
		return false
	}
	if h.chunkSize != uint32(s.ChunkSize) {
		s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize)
		return false
	}
	s.log.Debug("Journal is valid")
	return true
}
// initJournal initializes the journal
//
// Writes a fresh journalHeader (magic, version, sizes) into the first
// bytes of the journal buffer and seals it with a CRC32 checksum.
func (s *Storage) initJournal(journal []byte) {
	header := (*journalHeader)(unsafe.Pointer(&journal[0]))
	header.magic = journalMagic
	header.version = journalVersion
	header.headerSize = uint8(headerSize)
	header.maxChunks = uint32(s.MaxChunks)
	header.chunkSize = uint32(s.ChunkSize)
	// Checksum covers the first 12 bytes, so it must be computed last,
	// after all checksummed fields are in place.
	header.checksum = crc32.Checksum(journal[:12], crc32Table)
}
// allocateMmapRegions creates memory mappings to fit all chunks
//
// Chunks are packed into regions of at most maxMmapSize bytes; the final
// region may be smaller when MaxChunks does not divide evenly.
func (s *Storage) allocateMmapRegions(maxMmapSize int64) error {
	s.chunksPerRegion = maxMmapSize / s.ChunkSize
	fullRegionSize := s.chunksPerRegion * s.ChunkSize
	totalChunks := int64(s.MaxChunks)
	regionCount := totalChunks / s.chunksPerRegion
	tailChunks := totalChunks % s.chunksPerRegion
	if tailChunks > 0 {
		// One extra, partially-filled region for the remainder.
		regionCount++
	}
	s.mmapRegions = make([][]byte, regionCount)
	for idx := range s.mmapRegions {
		i := int64(idx)
		size := fullRegionSize
		if tailChunks > 0 && i == regionCount-1 {
			size = tailChunks * s.ChunkSize
		}
		s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, regionCount, size)
		region, err := s.mmap(i*fullRegionSize, size)
		if err != nil {
			s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, regionCount, size)
			return err
		}
		// Access-pattern hint for the kernel; a failure here is non-fatal.
		if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); err != nil {
			s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, regionCount, err)
		}
		s.mmapRegions[idx] = region
	}
	return nil
}
// mmapChunks slices buffers from mmap regions and loads chunk metadata
//
// Walks every chunk slot, restoring persisted chunks where possible, and
// seeds the LRU stack with the empty and restored lists. Aborts early if
// a shutdown signal arrives during initialization.
func (s *Storage) mmapChunks() error {
	begin := time.Now()
	emptyList := list.New()
	restoredList := list.New()
	restoredCount := 0
	for index := 0; index < s.MaxChunks; index++ {
		// Non-blocking signal check so a long restore can be interrupted.
		select {
		case sig := <-s.signals:
			s.log.Warnf("Received signal %v, aborting chunk loader", sig)
			return fmt.Errorf("aborted by signal")
		default:
		}
		loaded, err := s.initChunk(index, emptyList, restoredList)
		if err != nil {
			s.log.Errorf("failed to allocate chunk %v: %v", index, err)
			return fmt.Errorf("failed to initialize chunks")
		}
		if loaded {
			restoredCount++
		}
	}
	// Empty buffers are prepended after the restored ones, placing them
	// ahead in the stack (presumably so they are reclaimed first).
	s.stack.Prepend(restoredList)
	s.stack.Prepend(emptyList)
	took := time.Since(begin)
	if s.ChunkFile != nil {
		s.log.Infof("Loaded %v/%v cache chunks in %v", restoredCount, s.MaxChunks, took)
	} else {
		s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, took)
	}
	return nil
}
// initChunk tries to restore a chunk from disk
//
// Allocates the mmap-backed buffer for slot index and files it either
// into the empty list (nothing persisted, or slot beyond the restore
// window) or the restored list (persisted chunk resurrected). Returns
// true when a chunk was restored.
func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) {
	c, err := s.allocateChunk(index)
	if err != nil {
		s.log.Debugf("%v", err)
		return false, err
	}
	s.buffers[index] = c
	id := c.id
	if id == blankRequestID || index >= s.loadChunks {
		// Nothing to resurrect for this slot.
		c.item = empty.PushBack(index)
		return false, nil
	}
	c.item = restored.PushBack(index)
	// Register the id→slot mapping so lookups find the restored chunk.
	s.chunks[id] = index
	return true, nil
}
// allocateChunk creates a new mmap-backed chunk
//
// Locates slot index inside the mmap regions and pairs its byte range
// with the slot's persistent header inside the journal mapping.
func (s *Storage) allocateChunk(index int) (*Chunk, error) {
	regionIdx := int64(index) / s.chunksPerRegion
	chunkOffset := (int64(index) % s.chunksPerRegion) * s.ChunkSize
	// Full slice expression caps capacity so the buffer can never grow
	// into the neighbouring chunk's bytes.
	buf := s.mmapRegions[regionIdx][chunkOffset : chunkOffset+s.ChunkSize : chunkOffset+s.ChunkSize]
	header := (*chunkHeader)(unsafe.Pointer(&s.journal[index*headerSize]))
	return &Chunk{
		chunkHeader: header,
		bytes:       buf,
	}, nil
}
// mmap maps size bytes of the chunk cache file at offset into memory.
// When no backing file is configured, it maps anonymous (RAM-only)
// memory instead and offset is ignored.
func (s *Storage) mmap(offset, size int64) ([]byte, error) {
	if s.ChunkFile != nil {
		// File-backed shared mapping: stores are visible through the file.
		return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	}
	// Anonymous private mapping: contents are lost when the process exits.
	return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
}
// Clear removes all old chunks on disk (will be called on each program start)
//
// NOTE(review): the body is currently a no-op — nothing is deleted here,
// which lets mmapChunks restore persisted chunks on startup. Confirm the
// comment above still reflects the intent, or update it.
func (s *Storage) Clear() error {
	return nil
}
// Load a chunk from ram or creates it
//
// Returns the chunk's backing bytes, or nil when the chunk is unknown or
// fails checksum verification (in which case it is purged from the LRU).
func (s *Storage) Load(id RequestID) []byte {
	// Fast path: the read lock suffices for missing chunks and for chunks
	// already marked clean (previously verified).
	s.lock.RLock()
	chunk := s.fetch(id)
	if nil == chunk {
		// s.log.Tracef("Load chunk %v (missing)", id)
		s.lock.RUnlock()
		return nil
	}
	if chunk.clean {
		// s.log.Tracef("Load chunk %v (clean)", id)
		defer s.lock.RUnlock()
		return chunk.bytes
	}
	s.lock.RUnlock()
	// Switch to write lock to avoid races on crc verification
	// NOTE(review): between RUnlock and Lock another goroutine could evict
	// and reuse this buffer; chunk.valid(id) below presumably re-checks the
	// stored id/checksum so a reused buffer fails verification — confirm
	// against Chunk.valid's implementation.
	s.lock.Lock()
	defer s.lock.Unlock()
	if chunk.valid(id) {
		s.log.Debugf("Load chunk %v (verified)", id)
		return chunk.bytes
	}
	// Verification failed: drop the chunk from the LRU so it gets refilled.
	s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum())
	s.stack.Purge(chunk.item)
	return nil
}
// Store stores a chunk in the RAM and adds it to the disk storage queue
//
// Returns an error only when no buffer can be reclaimed from the LRU stack.
func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
	s.lock.RLock()
	// Avoid storing same chunk multiple times
	chunk := s.fetch(id)
	if nil != chunk && chunk.clean {
		// s.log.Tracef("Create chunk %v (exists: clean)", id)
		s.lock.RUnlock()
		return nil
	}
	s.lock.RUnlock()
	// Upgrade to the write lock for mutation.
	// NOTE(review): chunk was fetched under the read lock; another goroutine
	// could evict/reuse it before the write lock is acquired, after which
	// the s.chunks mapping may no longer match this buffer. Confirm callers
	// serialize stores per id, or consider re-fetching here.
	s.lock.Lock()
	defer s.lock.Unlock()
	if nil != chunk {
		if chunk.valid(id) {
			// Buffer already holds verified content for this id; nothing to do.
			s.log.Debugf("Create chunk %v (exists: valid)", id)
			return nil
		}
		s.log.Warnf("Create chunk %v(exists: overwrite)", id)
	} else {
		// No buffer assigned to this id yet: reclaim one from the LRU stack.
		index := s.stack.Pop()
		if index == -1 {
			s.log.Debugf("Create chunk %v (failed)", id)
			return fmt.Errorf("no buffers available")
		}
		chunk = s.buffers[index]
		deleteID := chunk.id
		if blankRequestID != deleteID {
			// The reclaimed buffer held another chunk; drop its id mapping.
			delete(s.chunks, deleteID)
			s.log.Debugf("Create chunk %v (reused)", id)
		} else {
			s.log.Debugf("Create chunk %v (stored)", id)
		}
		s.chunks[id] = index
		chunk.item = s.stack.Push(index)
	}
	// chunk.update presumably copies bytes into the buffer and refreshes the
	// persistent header (id/checksum) — implementation not shown here.
	chunk.update(id, bytes)
	return nil
}
// fetch chunk and index by id
//
// Looks up the buffer assigned to id and bumps it in the LRU stack.
// Returns nil when the id is unknown. Callers hold s.lock.
func (s *Storage) fetch(id RequestID) *Chunk {
	if index, ok := s.chunks[id]; ok {
		c := s.buffers[index]
		// Mark as recently used. NOTE(review): this mutates the LRU stack
		// even when the caller only holds the read lock — confirm that
		// Stack.Touch is internally synchronized.
		s.stack.Touch(c.item)
		return c
	}
	return nil
}