diff --git a/internal/zfs/mount.go b/internal/zfs/mount.go deleted file mode 100644 index 6825ebe..0000000 --- a/internal/zfs/mount.go +++ /dev/null @@ -1,18 +0,0 @@ -package zfs - -import ( - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/winfsp/cgofuse/fuse" -) - -func Mount(host *fuse.FileSystemHost, cfg config.ConfigInterface) error { - host.SetCapCaseInsensitive(false) - host.SetCapReaddirPlus(false) - host.Mount(cfg.GetMountPoint(), []string{}) - return nil -} - -func Unmount(host *fuse.FileSystemHost) error { - _ = host.Unmount() - return nil -} diff --git a/internal/zfs/zfs.go b/internal/zfs/zfs.go deleted file mode 100644 index 7463ab4..0000000 --- a/internal/zfs/zfs.go +++ /dev/null @@ -1,183 +0,0 @@ -package zfs - -import ( - "strings" - - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/chunk" - cmap "github.com/orcaman/concurrent-map/v2" - "github.com/winfsp/cgofuse/fuse" - "go.uber.org/zap" -) - -type ZurgFS struct { - fuse.FileSystemBase - TorrentManager *torrent.TorrentManager - Config config.ConfigInterface - Chunk *chunk.Manager - Log *zap.SugaredLogger -} - -func NewZurgFS(tm *torrent.TorrentManager, cfg config.ConfigInterface, chunk *chunk.Manager, log *zap.SugaredLogger) *ZurgFS { - return &ZurgFS{ - TorrentManager: tm, - Config: cfg, - Chunk: chunk, - Log: log, - } -} - -func (fs *ZurgFS) Open(path string, flags int) (errc int, fh uint64) { - segments := splitIntoSegments(path) - switch len(segments) { - case 0: - return 0, 0 - case 1: - if _, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT, ^uint64(0) - } - return 0, 0 - case 2: - if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT, ^uint64(0) - } else if _, torFound := directory.Get(segments[1]); !torFound { - return -fuse.ENOENT, ^uint64(0) - } - return 0, 0 - case 3: - if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT, ^uint64(0) - } else if torrent, torFound := directory.Get(segments[1]); !torFound { - return -fuse.ENOENT, ^uint64(0) - } else if file, fileFound := torrent.SelectedFiles.Get(segments[2]); !fileFound { - return -fuse.ENOENT, ^uint64(0) - } else { - return 0, file.ZurgFS - } - } - - return -fuse.ENOENT, ^uint64(0) -} - -func (fs *ZurgFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) { - segments := splitIntoSegments(path) - switch len(segments) { - case 0: - stat.Mode = fuse.S_IFDIR | 0555 - return 0 - case 1: - if _, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } - stat.Mode = fuse.S_IFDIR | 0555 - return 0 - case 2: - if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } else if _, torFound := directory.Get(segments[1]); !torFound { - return -fuse.ENOENT - } - stat.Mode = fuse.S_IFDIR | 0555 - return 0 - case 3: - if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } else if torrent, torFound := directory.Get(segments[1]); !torFound { - return -fuse.ENOENT - } else if file, fileFound := torrent.SelectedFiles.Get(segments[2]); !fileFound { - return -fuse.ENOENT - } else { - stat.Mode = fuse.S_IFREG | 0444 - stat.Size = file.Bytes - return 0 - } - } - - return -fuse.ENOENT -} - -func (fs *ZurgFS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { - segments := splitIntoSegments(path) - if len(segments) != 3 { - return -fuse.ENOENT - } else if directory, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } else if torrent, torFound := directory.Get(segments[1]); !torFound { - return -fuse.ENOENT - } else if file, fileFound := torrent.SelectedFiles.Get(segments[2]); !fileFound { - return -fuse.ENOENT - } else { - size := int64(len(buff)) - endofst := ofst + size - if endofst > file.Bytes { - endofst = file.Bytes - } - if endofst < ofst { - return 0 - } - // let's request a bigger chunk than we need - if size < int64(fs.Config.GetNetworkBufferSize()) { - size = int64(fs.Config.GetNetworkBufferSize()) - } - response, err := fs.Chunk.GetChunk(file, ofst, size) - if err != nil { - return -fuse.ENOENT - } - n = copy(buff, response[:endofst-ofst]) - return n - } -} - -func (fs *ZurgFS) Readdir(path string, - fill func(name string, stat *fuse.Stat_t, ofst int64) bool, - ofst int64, - fh uint64) (errc int) { - - segments := splitIntoSegments(path) - switch len(segments) { - case 0: - fill(".", nil, 0) - fill("..", nil, 0) - fs.TorrentManager.DirectoryMap.IterCb(func(directoryName string, _ cmap.ConcurrentMap[string, *torrent.Torrent]) { - fill(directoryName, nil, 0) - }) - case 1: - fill(".", nil, 0) - fill("..", nil, 0) - if torrents, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } else { - torrents.IterCb(func(accessKey string, _ *torrent.Torrent) { - fill(accessKey, nil, 0) - }) - } - case 2: - fill(".", nil, 0) - fill("..", nil, 0) - if torrents, dirFound := fs.TorrentManager.DirectoryMap.Get(segments[0]); !dirFound { - return -fuse.ENOENT - } else if tor, torFound := torrents.Get(segments[1]); !torFound { - return -fuse.ENOENT - } else { - tor.SelectedFiles.IterCb(func(filename string, _ *torrent.File) { - fill(filename, nil, 0) - }) - } - default: - return -fuse.ENOENT - } - return 0 -} - -func splitIntoSegments(path string) []string { - segments := strings.Split(path, "/") - // remove empty segments - for i := 0; i < len(segments); i++ { - if segments[i] == "" { - segments = append(segments[:i], segments[i+1:]...) - i-- - } - } - return segments -} diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go deleted file mode 100644 index bc0c964..0000000 --- a/pkg/chunk/chunk.go +++ /dev/null @@ -1,51 +0,0 @@ -package chunk - -import ( - "container/list" - "hash/crc32" -) - -// Chunk of memory -type Chunk struct { - clean bool - *chunkHeader - bytes []byte - item *list.Element -} - -type chunkHeader struct { - id RequestID - size uint32 - checksum uint32 -} - -func (c *Chunk) valid(id RequestID) bool { - if c.id != id { - return false - } - if !c.clean { - c.clean = c.checksum == c.calculateChecksum() - } - return c.clean -} - -func (c *Chunk) update(id RequestID, bytes []byte) { - c.id = id - c.size = uint32(copy(c.bytes, bytes)) - c.checksum = c.calculateChecksum() - c.clean = true -} - -func (c *Chunk) calculateChecksum() uint32 { - size := c.size - if nil == c.bytes || size == 0 { - return 0 - } - maxSize := uint32(len(c.bytes)) - if size > maxSize { - // corrupt size or truncated chunk, fix size - c.size = maxSize - return crc32.Checksum(c.bytes, crc32Table) - } - return crc32.Checksum(c.bytes[:size], crc32Table) -} diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go deleted file mode 100644 index 125170d..0000000 --- a/pkg/chunk/download.go +++ /dev/null @@ -1,144 +0,0 @@ -package chunk - -import ( - "fmt" - "io" - "net/http" - "sync" - "syscall" - "time" - - "github.com/debridmediamanager.com/zurg/internal/torrent" - zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" - "github.com/debridmediamanager.com/zurg/pkg/logutil" - "go.uber.org/zap" - "golang.org/x/sys/unix" -) - -// Downloader handles concurrent chunk downloads -type Downloader struct { - BufferSize int64 - queue chan *Request - callbacks map[RequestID][]DownloadCallback - lock sync.Mutex - storage *Storage - torMgr *torrent.TorrentManager - log *zap.SugaredLogger - client *zurghttp.HTTPClient -} - -type DownloadCallback func(error, []byte) - -// NewDownloader creates a new download manager -func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torrent.TorrentManager, client *zurghttp.HTTPClient) (*Downloader, error) { - rlog := logutil.NewLogger() - log := rlog.Named("downloader") - - manager := Downloader{ - BufferSize: bufferSize, - queue: make(chan *Request, 100), - callbacks: make(map[RequestID][]DownloadCallback, 100), - storage: storage, - torMgr: torMgr, - client: client, - log: log, - } - - for i := 0; i < threads; i++ { - go manager.thread(i) - } - - return &manager, nil -} - -// Download starts a new download request -func (d *Downloader) Download(req *Request, callback DownloadCallback) { - d.lock.Lock() - callbacks, exists := d.callbacks[req.id] - if nil != callback { - d.callbacks[req.id] = append(callbacks, callback) - } else if !exists { - d.callbacks[req.id] = callbacks - } - if !exists { - d.queue <- req - } - d.lock.Unlock() -} - -func (d *Downloader) thread(n int) { - buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) - if nil != err { - d.log.Warnf("Failed to mmap download buffer %v: %v", n, err) - buffer = make([]byte, d.BufferSize) - } - for { - req := <-d.queue - d.download(req, buffer) - } -} - -func (d *Downloader) download(req *Request, buffer []byte) { - err := d.downloadFromAPI(req, buffer, 0) - - d.lock.Lock() - callbacks := d.callbacks[req.id] - for _, callback := range callbacks { - callback(err, buffer) - } - delete(d.callbacks, req.id) - d.lock.Unlock() - - if nil != err { - return - } - - if err := d.storage.Store(req.id, buffer); nil != err { - d.log.Warnf("Could not store chunk %v: %v", req.id, err) - } -} - -func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error { - // sleep if request is throttled - if delay > 0 { - time.Sleep(time.Duration(delay) * time.Second) - } - - resp := d.torMgr.UnrestrictUntilOk(request.file.Link) - if resp == nil { - return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link) - } - - downloadURL := resp.Download - req, err := http.NewRequest("GET", downloadURL, nil) - if nil != err { - return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link) - } - req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1)) - - res, err := http.DefaultClient.Do(req) - if nil != err { - d.log.Debugf("request error: %v", err) - return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link) - } - defer res.Body.Close() - reader := res.Body - - if res.StatusCode != 206 && res.StatusCode != 200 { - return fmt.Errorf("could not read object %s %s / StatusCode: %v", - request.file.Path, request.file.Link, res.StatusCode) - } - - if res.ContentLength == -1 { - return fmt.Errorf("missing Content-Length header in response") - } - - n, err := io.ReadFull(reader, buffer[:res.ContentLength:cap(buffer)]) - if nil != err && err != io.ErrUnexpectedEOF { - d.log.Debugf("response read error: %v", err) - return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link) - } - d.log.Debugf("Downloaded %v bytes of %s", n, request.file.Path) - - return nil -} diff --git a/pkg/chunk/manager.go b/pkg/chunk/manager.go deleted file mode 100644 index d18d333..0000000 --- a/pkg/chunk/manager.go +++ /dev/null @@ -1,271 +0,0 @@ -package chunk - -import ( - "encoding/binary" - "fmt" - "hash/fnv" - "os" - - "github.com/debridmediamanager.com/zurg/internal/torrent" - zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" -) - -// Manager manages chunks on disk -type Manager struct { - ChunkSize int64 - LoadAhead int - downloader *Downloader - storage *Storage - queue chan *QueueEntry -} - -type QueueEntry struct { - request *Request - response chan Response -} - -// RequestID is the binary identifier for a chunk request -type RequestID [24]byte - -func (id RequestID) String() string { - return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:])) -} - -// Request represents a chunk request -type Request struct { - id RequestID - file *torrent.File - offsetStart int64 - offsetEnd int64 - chunkOffset int64 - chunkOffsetEnd int64 - sequence int - preload bool -} - -// Response represetns a chunk response -type Response struct { - Sequence int - Error error - Bytes []byte -} - -// NewManager creates a new chunk manager -func NewManager( - chunkFile string, - chunkSize int64, - loadAhead, - checkThreads, - loadThreads, - maxChunks int, - torMgr *torrent.TorrentManager, - client *zurghttp.HTTPClient) (*Manager, error) { - - pageSize := int64(os.Getpagesize()) - if chunkSize < pageSize { - return nil, fmt.Errorf("chunk size must not be < %v", pageSize) - } - if chunkSize%pageSize != 0 { - return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize) - } - // 32-Bit: ~2GB / 64-Bit: ~8EB - maxMmapSize := int64(^uint(0) >> 1) - if chunkSize > maxMmapSize { - return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize) - } - if maxChunks < 2 || maxChunks < loadAhead { - return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value") - } - - storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile) - if nil != err { - return nil, err - } - - downloader, err := NewDownloader(loadThreads, storage, chunkSize, torMgr, client) - if nil != err { - return nil, err - } - - manager := Manager{ - ChunkSize: chunkSize, - LoadAhead: loadAhead, - downloader: downloader, - storage: storage, - queue: make(chan *QueueEntry, 100), - } - - if err := manager.storage.Clear(); nil != err { - return nil, err - } - - for i := 0; i < checkThreads; i++ { - go manager.thread() - } - - return &manager, nil -} - -// GetChunk loads one chunk and starts the preload for the next chunks -func (m *Manager) GetChunk(file *torrent.File, offset, size int64) ([]byte, error) { - maxOffset := file.Bytes - if offset > maxOffset { - return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", file.ID, offset) - } - // Log.Infof("Request %v:%v md5:%v", object.ID, offset, object.MD5Checksum) - if offset+size > maxOffset { - size = file.Bytes - offset - } - - ranges := splitChunkRanges(offset, size, m.ChunkSize) - numRanges := len(ranges) - responses := make(chan Response, numRanges) - - last := numRanges - 1 - for i, r := range ranges { - m.requestChunk(file, r.offset, r.size, i, i == last, responses) - } - - data := make([]byte, size) - for i := 0; i < cap(responses); i++ { - res := <-responses - if nil != res.Error { - return nil, res.Error - } - - dataOffset := ranges[res.Sequence].offset - offset - - if n := copy(data[dataOffset:], res.Bytes); n == 0 { - return nil, fmt.Errorf("request %v slice %v has empty response", file.ID, res.Sequence) - } - } - close(responses) - - return data, nil -} - -func buildRequestID(object *torrent.File, offset int64) (id RequestID) { - fileID := object.Link - if fileID == "" { - fileID = object.Path - } - hash := hashStringToFh(fileID) - copy(id[:16], hash[:16]) - binary.BigEndian.PutUint64(id[16:], uint64(offset)) - return -} - -func hashStringToFh(s string) []byte { - hasher := fnv.New64a() - return hasher.Sum([]byte(s)) -} - -func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) { - chunkOffset := offset % m.ChunkSize - offsetStart := offset - chunkOffset - offsetEnd := offsetStart + m.ChunkSize - - request := &Request{ - id: buildRequestID(object, offsetStart), - file: object, - offsetStart: offsetStart, - offsetEnd: offsetEnd, - chunkOffset: chunkOffset, - chunkOffsetEnd: chunkOffset + size, - sequence: sequence, - preload: false, - } - - m.queue <- &QueueEntry{ - request: request, - response: response, - } - - if !preload { - return - } - - for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize { - aheadOffsetStart := offsetStart + i - aheadOffsetEnd := aheadOffsetStart + m.ChunkSize - if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) { - request := &Request{ - id: buildRequestID(object, aheadOffsetStart), - file: object, - offsetStart: aheadOffsetStart, - offsetEnd: aheadOffsetEnd, - preload: true, - } - m.queue <- &QueueEntry{ - request: request, - } - } - } -} - -type byteRange struct { - offset, size int64 -} - -// Calculate request ranges that span multiple chunks -// -// This can happen with Direct-IO and unaligned reads or -// if the size is bigger than the chunk size. -func splitChunkRanges(offset, size, chunkSize int64) []byteRange { - ranges := make([]byteRange, 0, size/chunkSize+2) - for remaining := size; remaining > 0; remaining -= size { - size = min(remaining, chunkSize-offset%chunkSize) - ranges = append(ranges, byteRange{offset, size}) - offset += size - } - return ranges -} - -func (m *Manager) thread() { - for { - queueEntry := <-m.queue - m.checkChunk(queueEntry.request, queueEntry.response) - } -} - -func (m *Manager) checkChunk(req *Request, response chan Response) { - if nil == response { - if nil == m.storage.Load(req.id) { - m.downloader.Download(req, nil) - } - return - } - - if bytes := m.storage.Load(req.id); nil != bytes { - response <- Response{ - Sequence: req.sequence, - Bytes: adjustResponseChunk(req, bytes), - } - return - } - - m.downloader.Download(req, func(err error, bytes []byte) { - response <- Response{ - Sequence: req.sequence, - Error: err, - Bytes: adjustResponseChunk(req, bytes), - } - }) -} - -func adjustResponseChunk(req *Request, bytes []byte) []byte { - if nil == bytes { - return nil - } - bytesLen := int64(len(bytes)) - sOffset := min(req.chunkOffset, bytesLen) - eOffset := min(req.chunkOffsetEnd, bytesLen) - return bytes[sOffset:eOffset] -} - -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} diff --git a/pkg/chunk/manager_test.go b/pkg/chunk/manager_test.go deleted file mode 100644 index ceb78e5..0000000 --- a/pkg/chunk/manager_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package chunk - -import "testing" - -func TestSplitChunkRanges(t *testing.T) { - testcases := []struct { - offset, size, chunkSize int64 - result []byteRange - }{ - {0, 0, 4096, []byteRange{}}, - {0, 4096, 4096, []byteRange{ - {0, 4096}, - }}, - {4095, 4096, 4096, []byteRange{ - {4095, 1}, - {4096, 4095}, - }}, - {0, 8192, 4096, []byteRange{ - {0, 4096}, - {4096, 4096}, - }}, - {2048, 8192, 4096, []byteRange{ - {2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {2048, 8192, 4096, []byteRange{ - {2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {17960960, 16777216, 10485760, []byteRange{ - {17960960, 3010560}, - {20971520, 10485760}, - {31457280, 3280896}, - }}, - } - for i, tc := range testcases { - ranges := splitChunkRanges(tc.offset, tc.size, tc.chunkSize) - actualSize := len(ranges) - expectedSize := len(tc.result) - if actualSize != expectedSize { - t.Fatalf("ByteRange %v length mismatch: %v != %v", i, actualSize, expectedSize) - } - for j, r := range ranges { - actual := r - expected := tc.result[j] - if actual != expected { - t.Fatalf("ByteRange %v mismatch: %v != %v", i, actual, expected) - } - } - } -} diff --git a/pkg/chunk/stack.go b/pkg/chunk/stack.go deleted file mode 100644 index 8955306..0000000 --- a/pkg/chunk/stack.go +++ /dev/null @@ -1,75 +0,0 @@ -package chunk - -import ( - "container/list" - "sync" -) - -// Stack is a thread safe list/stack implementation -type Stack struct { - items *list.List - lock sync.Mutex - maxSize int -} - -// NewStack creates a new stack -func NewStack(maxChunks int) *Stack { - return &Stack{ - items: list.New(), - maxSize: maxChunks, - } -} - -// Len gets the number of items on the stack -func (s *Stack) Len() int { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.Len() -} - -// Pop pops the first item from the stack -func (s *Stack) Pop() int { - s.lock.Lock() - defer s.lock.Unlock() - if s.items.Len() < s.maxSize { - return -1 - } - item := s.items.Front() - if nil == item { - return -1 - } - s.items.Remove(item) - return item.Value.(int) -} - -// Touch moves the specified item to the last position of the stack -func (s *Stack) Touch(item *list.Element) { - s.lock.Lock() - if item != s.items.Back() { - s.items.MoveToBack(item) - } - s.lock.Unlock() -} - -// Push adds a new item to the last position of the stack -func (s *Stack) Push(id int) *list.Element { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.PushBack(id) -} - -// Prepend adds a list to the front of the stack -func (s *Stack) Prepend(items *list.List) { - s.lock.Lock() - s.items.PushFrontList(items) - s.lock.Unlock() -} - -// Purge an item from the stack -func (s *Stack) Purge(item *list.Element) { - s.lock.Lock() - defer s.lock.Unlock() - if item != s.items.Front() { - s.items.MoveToFront(item) - } -} diff --git a/pkg/chunk/stack_test.go b/pkg/chunk/stack_test.go deleted file mode 100644 index 023bd70..0000000 --- a/pkg/chunk/stack_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package chunk - -import "testing" - -func TestOOB(t *testing.T) { - stack := NewStack(1) - - item := stack.Push(1) - stack.Touch(item) -} - -func TestAddToStack(t *testing.T) { - stack := NewStack(1) - - item1 := stack.Push(1) - item2 := stack.Push(2) - item3 := stack.Push(3) - item4 := stack.Push(4) - - stack.Touch(item1) - stack.Touch(item3) - - stack.Purge(item2) - stack.Purge(item4) - - v := stack.Pop() - if v != 4 { - t.Fatalf("Expected 4 got %v", v) - } - - v = stack.Pop() - if v != 2 { - t.Fatalf("Expected 2 got %v", v) - } - - v = stack.Pop() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - v = stack.Pop() - if v != 3 { - t.Fatalf("Expected 3 got %v", v) - } - - v = stack.Pop() - if v != -1 { - t.Fatalf("Expected -1 got %v", v) - } -} - -func TestLen(t *testing.T) { - stack := NewStack(1) - - v := stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - - stack.Push(1) - v = stack.Len() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - _ = stack.Pop() - v = stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - -} diff --git a/pkg/chunk/storage.go b/pkg/chunk/storage.go deleted file mode 100644 index 96e759f..0000000 --- a/pkg/chunk/storage.go +++ /dev/null @@ -1,415 +0,0 @@ -package chunk - -import ( - "container/list" - "fmt" - "hash/crc32" - "os" - "os/signal" - "sync" - "syscall" - "time" - "unsafe" - - "go.uber.org/zap" - "golang.org/x/sys/unix" - - "github.com/debridmediamanager.com/zurg/pkg/logutil" -) - -const ( - headerSize = int(unsafe.Sizeof(*new(chunkHeader))) - tocSize = int64(unsafe.Sizeof(*new(journalHeader))) - journalMagic = uint16('P'<<8 | 'D'&0xFF) - journalVersion = uint8(2) -) - -var ( - blankRequestID RequestID - crc32Table = crc32.MakeTable(crc32.Castagnoli) -) - -// Storage is a chunk storage -type Storage struct { - ChunkFile *os.File - ChunkSize int64 - HeaderSize int64 - MaxChunks int - chunks map[RequestID]int - stack *Stack - lock sync.RWMutex - buffers []*Chunk - loadChunks int - signals chan os.Signal - journal []byte - mmapRegions [][]byte - chunksPerRegion int64 - log *zap.SugaredLogger -} - -type journalHeader struct { - magic uint16 - version uint8 - headerSize uint8 - maxChunks uint32 - chunkSize uint32 - checksum uint32 -} - -// NewStorage creates a new storage -func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) { - rlog := logutil.NewLogger() - log := rlog.Named("storage") - - s := Storage{ - ChunkSize: chunkSize, - MaxChunks: maxChunks, - chunks: make(map[RequestID]int, maxChunks), - stack: NewStack(maxChunks), - buffers: make([]*Chunk, maxChunks), - signals: make(chan os.Signal, 1), - log: log, - } - - journalSize := tocSize + int64(headerSize*maxChunks) - journalOffset := chunkSize * int64(maxChunks) - - // Non-empty string in chunkFilePath enables MMAP disk storage for chunks - if chunkFilePath != "" { - chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600) - if nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("could not open chunk cache file") - } - s.ChunkFile = chunkFile - currentSize, err := chunkFile.Seek(0, os.SEEK_END) - if nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("chunk file is not seekable") - } - wantedSize := journalOffset + journalSize - s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize) - if err := chunkFile.Truncate(currentSize); nil != err { - s.log.Warnf("Could not truncate chunk cache, skip resizing") - } else if currentSize != wantedSize { - if currentSize > tocSize { - err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset) - if nil != err { - s.log.Errorf("%v", err) - } else { - s.log.Infof("Relocated chunk cache journal") - } - } - if err := chunkFile.Truncate(wantedSize); nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("could not resize chunk cache file") - } - } - s.log.Infof("Created chunk cache file %v", chunkFile.Name()) - s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks))) - } - - // Alocate journal - if journal, err := s.mmap(journalOffset, journalSize); nil != err { - return nil, fmt.Errorf("could not allocate journal: %v", err) - } else { - if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err { - s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err) - } - tocOffset := journalSize - tocSize - header := journal[tocOffset:] - if valid := s.checkJournal(header, false); !valid { - s.initJournal(header) - } - s.journal = journal[:tocOffset] - } - - // Setup sighandler - signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM) - - // Allocate mmap regions for chunks - if err := s.allocateMmapRegions(maxMmapSize); nil != err { - return nil, err - } - // Map chunks to slices from mmap regions - if err := s.mmapChunks(); nil != err { - return nil, err - } - - return &s, nil -} - -// relocateJournal moves existing journal prior to resize -func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error { - header := make([]byte, tocSize) - if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err { - return fmt.Errorf("failed to read journal header: %v", err) - } - - if valid := s.checkJournal(header, true); !valid { - return fmt.Errorf("failed to validate journal header") - } - - h := (*journalHeader)(unsafe.Pointer(&header[0])) - oldJournalOffset := s.ChunkSize * int64(h.maxChunks) - oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize - journal := make([]byte, journalSize) - - if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err { - return fmt.Errorf("failed to read journal: %v", err) - } - - s.initJournal(header) - - sizeWithoutJournal := currentSize - oldJournalSize - tocSize - if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err { - return fmt.Errorf("could not truncate chunk cache journal: %v", err) - } - - if err := s.ChunkFile.Truncate(wantedSize); nil != err { - return fmt.Errorf("could not resize chunk cache file: %v", err) - } - - if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err { - return fmt.Errorf("failed to write journal: %v", err) - } - if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err { - return fmt.Errorf("failed to write journal header: %v", err) - } - return nil -} - -// checkJournal verifies the journal header -func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool { - h := (*journalHeader)(unsafe.Pointer(&journal[0])) - // check magic bytes / endianess mismatch ('PD' vs 'DP') - if h.magic != journalMagic { - s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic) - return false - } - checksum := crc32.Checksum(journal[:12], crc32Table) - if h.checksum != checksum { - s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum) - return false - } - if h.version != journalVersion { - s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion) - return false - } - if h.headerSize != uint8(headerSize) { - s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize) - return false - } - if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) { - s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks) - return false - } - if h.chunkSize != uint32(s.ChunkSize) { - s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize) - return false - } - s.log.Debug("Journal is valid") - return true -} - -// initJournal initializes the journal -func (s *Storage) initJournal(journal []byte) { - h := (*journalHeader)(unsafe.Pointer(&journal[0])) - h.magic = journalMagic - h.version = journalVersion - h.headerSize = uint8(headerSize) - h.maxChunks = uint32(s.MaxChunks) - h.chunkSize = uint32(s.ChunkSize) - h.checksum = crc32.Checksum(journal[:12], crc32Table) -} - -// allocateMmapRegions creates memory mappings to fit all chunks -func (s *Storage) allocateMmapRegions(maxMmapSize int64) error { - s.chunksPerRegion = maxMmapSize / s.ChunkSize - regionSize := s.chunksPerRegion * s.ChunkSize - numRegions := int64(s.MaxChunks) / s.chunksPerRegion - remChunks := int64(s.MaxChunks) % s.chunksPerRegion - if remChunks != 0 { - numRegions++ - } - s.mmapRegions = make([][]byte, numRegions) - for i := int64(0); i < int64(len(s.mmapRegions)); i++ { - size := regionSize - if i == numRegions-1 && remChunks != 0 { - size = remChunks * s.ChunkSize - } - s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size) - region, err := s.mmap(i*regionSize, size) - if nil != err { - s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size) - return err - } - if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err { - s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err) - } - s.mmapRegions[i] = region - } - return nil -} - -// mmapChunks slices buffers from mmap regions and loads chunk metadata -func (s *Storage) mmapChunks() error { - start := time.Now() - empty := list.New() - restored := list.New() - loadedChunks := 0 - for i := 0; i < s.MaxChunks; i++ { - select { - case sig := <-s.signals: - s.log.Warnf("Received signal %v, aborting chunk loader", sig) - return fmt.Errorf("aborted by signal") - default: - if loaded, err := s.initChunk(i, empty, restored); nil != err { - s.log.Errorf("failed to allocate chunk %v: %v", i, err) - return fmt.Errorf("failed to initialize chunks") - } else if loaded { - loadedChunks++ - } - } - } - s.stack.Prepend(restored) - s.stack.Prepend(empty) - elapsed := time.Since(start) - if nil != s.ChunkFile { - s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed) - } else { - s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed) - } - return nil -} - -// initChunk tries to restore a chunk from disk -func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) { - chunk, err := s.allocateChunk(index) - if err != nil { - s.log.Debugf("%v", err) - return false, err - } - - s.buffers[index] = chunk - - id := chunk.id - - if blankRequestID == id || index >= s.loadChunks { - chunk.item = empty.PushBack(index) - // s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks) - return false, nil - } - - chunk.item = restored.PushBack(index) - // s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id) - s.chunks[id] = index - - return true, nil -} - -// allocateChunk creates a new mmap-backed chunk -func (s *Storage) allocateChunk(index int) (*Chunk, error) { - region := int64(index) / s.chunksPerRegion - offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize - // s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset) - bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize] - headerOffset := index * headerSize - header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset])) - chunk := Chunk{ - chunkHeader: header, - bytes: bytes, - } - return &chunk, nil -} - -func (s *Storage) mmap(offset, size int64) ([]byte, error) { - if s.ChunkFile != nil { - return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) - } else { - return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) - } -} - -// Clear removes all old chunks on disk (will be called on each program start) -func (s *Storage) Clear() error { - return nil -} - -// Load a chunk from ram or creates it -func (s *Storage) Load(id RequestID) []byte { - s.lock.RLock() - chunk := s.fetch(id) - if nil == chunk { - // s.log.Tracef("Load chunk %v (missing)", id) - s.lock.RUnlock() - return nil - } - if chunk.clean { - // s.log.Tracef("Load chunk %v (clean)", id) - defer s.lock.RUnlock() - return chunk.bytes - } - s.lock.RUnlock() - // Switch to write lock to avoid races on crc verification - s.lock.Lock() - defer s.lock.Unlock() - if chunk.valid(id) { - s.log.Debugf("Load chunk %v (verified)", id) - return chunk.bytes - } - s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum()) - s.stack.Purge(chunk.item) - return nil -} - -// Store stores a chunk in the RAM and adds it to the disk storage queue -func (s *Storage) Store(id RequestID, bytes []byte) (err error) { - s.lock.RLock() - - // Avoid storing same chunk multiple times - chunk := s.fetch(id) - if nil != chunk && chunk.clean { - // s.log.Tracef("Create chunk %v (exists: clean)", id) - s.lock.RUnlock() - return nil - } - - s.lock.RUnlock() - s.lock.Lock() - defer s.lock.Unlock() - - if nil != chunk { - if chunk.valid(id) { - return nil - } - s.log.Warnf("Create chunk %v(exists: overwrite)", id) - } else { - index := s.stack.Pop() - if index == -1 { - return fmt.Errorf("no buffers available") - } - chunk = s.buffers[index] - deleteID := chunk.id - if blankRequestID != deleteID { - delete(s.chunks, deleteID) - } - s.chunks[id] = index - chunk.item = s.stack.Push(index) - } - - chunk.update(id, bytes) - - return nil -} - -// fetch chunk and index by id -func (s *Storage) fetch(id RequestID) *Chunk { - index, exists := s.chunks[id] - if !exists { - return nil - } - chunk := s.buffers[index] - s.stack.Touch(chunk.item) - return chunk -}