From 1591b15b378107669af512f7d33902e59d33ff7f Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Mon, 6 Nov 2023 01:07:44 +0100 Subject: [PATCH] Resolve all errors --- pkg/chunk/chunk.go | 51 +++++ pkg/chunk/download.go | 149 ++++++++++++++ pkg/chunk/manager.go | 262 ++++++++++++++++++++++++ pkg/chunk/manager_test.go | 53 +++++ pkg/chunk/stack.go | 75 +++++++ pkg/chunk/stack_test.go | 72 +++++++ pkg/chunk/storage.go | 420 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 1082 insertions(+) create mode 100644 pkg/chunk/chunk.go create mode 100644 pkg/chunk/download.go create mode 100644 pkg/chunk/manager.go create mode 100644 pkg/chunk/manager_test.go create mode 100644 pkg/chunk/stack.go create mode 100644 pkg/chunk/stack_test.go create mode 100644 pkg/chunk/storage.go diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go new file mode 100644 index 0000000..bc0c964 --- /dev/null +++ b/pkg/chunk/chunk.go @@ -0,0 +1,51 @@ +package chunk + +import ( + "container/list" + "hash/crc32" +) + +// Chunk of memory +type Chunk struct { + clean bool + *chunkHeader + bytes []byte + item *list.Element +} + +type chunkHeader struct { + id RequestID + size uint32 + checksum uint32 +} + +func (c *Chunk) valid(id RequestID) bool { + if c.id != id { + return false + } + if !c.clean { + c.clean = c.checksum == c.calculateChecksum() + } + return c.clean +} + +func (c *Chunk) update(id RequestID, bytes []byte) { + c.id = id + c.size = uint32(copy(c.bytes, bytes)) + c.checksum = c.calculateChecksum() + c.clean = true +} + +func (c *Chunk) calculateChecksum() uint32 { + size := c.size + if nil == c.bytes || size == 0 { + return 0 + } + maxSize := uint32(len(c.bytes)) + if size > maxSize { + // corrupt size or truncated chunk, fix size + c.size = maxSize + return crc32.Checksum(c.bytes, crc32Table) + } + return crc32.Checksum(c.bytes[:size], crc32Table) +} diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go new file mode 100644 index 0000000..b56b053 --- /dev/null +++ 
// ---- file: pkg/chunk/download.go ----

package chunk

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"syscall"
	"time"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/pkg/logutil"
	"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
	"go.uber.org/zap"
	"golang.org/x/sys/unix"
)

// Downloader handles concurrent chunk downloads.
// Requests are deduplicated by RequestID: the first caller enqueues the
// download, later callers only register an additional callback.
type Downloader struct {
	BufferSize int64                           // size of each per-thread download buffer
	queue      chan *Request                   // pending downloads, consumed by worker threads
	callbacks  map[RequestID][]DownloadCallback // waiters per in-flight request; guarded by lock
	lock       sync.Mutex
	storage    *Storage
	c          config.ConfigInterface
	log        *zap.SugaredLogger
}

// DownloadCallback is invoked exactly once per registered waiter with the
// download error (if any) and the raw download buffer.
type DownloadCallback func(error, []byte)

// NewDownloader creates a new download manager and starts the given number
// of worker goroutines. The workers run for the lifetime of the process.
func NewDownloader(threads int, storage *Storage, bufferSize int64, c config.ConfigInterface) (*Downloader, error) {
	rlog := logutil.NewLogger()
	log := rlog.Named("downloader")

	manager := Downloader{
		BufferSize: bufferSize,
		queue:      make(chan *Request, 100),
		callbacks:  make(map[RequestID][]DownloadCallback, 100),
		storage:    storage,
		c:          c,
		log:        log,
	}

	for i := 0; i < threads; i++ {
		go manager.thread(i)
	}

	return &manager, nil
}

// Download starts a new download request.
// If a request with the same id is already in flight, only the callback is
// appended; the request itself is enqueued once. A nil callback (preload)
// still enqueues the request but registers no waiter.
func (d *Downloader) Download(req *Request, callback DownloadCallback) {
	d.lock.Lock()
	callbacks, exists := d.callbacks[req.id]
	if nil != callback {
		d.callbacks[req.id] = append(callbacks, callback)
	} else if !exists {
		// register the id with an empty waiter list so later calls dedup
		d.callbacks[req.id] = callbacks
	}
	if !exists {
		d.queue <- req
	}
	d.lock.Unlock()
}

// thread is one download worker. Each worker owns a single reusable
// buffer (anonymous mmap, falling back to a heap slice) and processes
// queued requests sequentially.
func (d *Downloader) thread(n int) {
	buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
	if nil != err {
		d.log.Warnf("Failed to mmap download buffer %v: %v", n, err)
		buffer = make([]byte, d.BufferSize)
	}
	for {
		req := <-d.queue
		d.download(req, buffer)
	}
}

// download performs one request, notifies all registered callbacks and,
// on success, hands the buffer to storage.
//
// NOTE(review): callbacks receive the worker's shared buffer, which is
// reused for the next request as soon as this function returns. Any
// consumer that holds the bytes beyond the callback (e.g. sends a slice
// of it over a channel) must copy before this worker moves on — confirm
// all consumers do.
func (d *Downloader) download(req *Request, buffer []byte) {
	d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
	err := d.downloadFromAPI(req, buffer, 0)

	d.lock.Lock()
	callbacks := d.callbacks[req.id]
	for _, callback := range callbacks {
		callback(err, buffer)
	}
	delete(d.callbacks, req.id)
	d.lock.Unlock()

	if nil != err {
		return
	}

	if err := d.storage.Store(req.id, buffer); nil != err {
		d.log.Warnf("Could not store chunk %v: %v", req.id, err)
	}
}

// downloadFromAPI unrestricts the file link, issues a ranged GET for
// [offsetStart, offsetEnd) and reads exactly Content-Length bytes into
// buffer. delay (seconds) is slept before the attempt; callers pass 0.
func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error {
	// sleep if request is throttled
	if delay > 0 {
		time.Sleep(time.Duration(delay) * time.Second)
	}

	unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
		return realdebrid.UnrestrictLink(d.c.GetToken(), request.file.Link)
	}
	resp := realdebrid.RetryUntilOk(unrestrictFn)
	if resp == nil {
		return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
	}

	downloadURL := resp.Download
	req, err := http.NewRequest("GET", downloadURL, nil)
	if nil != err {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link)
	}
	// Range end is inclusive in HTTP, hence offsetEnd-1.
	req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))

	d.log.Debugw("Sending HTTP Request %v", req)

	res, err := http.DefaultClient.Do(req)
	if nil != err {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link)
	}
	defer res.Body.Close()
	reader := res.Body

	// 206 Partial Content is the expected answer; 200 means the server
	// ignored the Range header and returned the whole object.
	if res.StatusCode != 206 && res.StatusCode != 200 {
		return fmt.Errorf("could not read object %s %s / StatusCode: %v",
			request.file.Path, request.file.Link, res.StatusCode)
	}

	if res.ContentLength == -1 {
		return fmt.Errorf("missing Content-Length header in response")
	}

	// NOTE(review): assumes ContentLength <= cap(buffer); a larger reply
	// would panic on the reslice — confirm the server honors the Range.
	n, err := io.ReadFull(reader, buffer[:res.ContentLength:cap(buffer)])
	if nil != err {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
	}
	d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)

	return nil
}

// ---- file: pkg/chunk/manager.go ----

package chunk

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"net/http"
	"os"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/internal/torrent"
)

// Manager manages chunks on disk
type Manager struct {
	ChunkSize  int64            // fixed size of every chunk in bytes
	LoadAhead  int              // number of chunks to preload past a read
	downloader *Downloader
	storage    *Storage
	queue      chan *QueueEntry // work queue consumed by check threads
}

// QueueEntry pairs a chunk request with the channel its response is sent
// on; a nil response channel marks a preload with no waiter.
type QueueEntry struct {
	request  *Request
	response chan Response
}

// RequestID is the binary identifier for a chunk request:
// 16 bytes derived from the file link followed by the big-endian
// chunk-aligned offset.
type RequestID [24]byte

func (id RequestID) String() string {
	return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:]))
}

// Request represents a chunk request
type Request struct {
	id             RequestID
	file           *torrent.File
	offsetStart    int64 // chunk-aligned start offset in the file
	offsetEnd      int64 // chunk-aligned end offset (exclusive)
	chunkOffset    int64 // caller's offset within the chunk
	chunkOffsetEnd int64 // caller's end offset within the chunk
	sequence       int   // position of this range in the caller's read
	preload        bool  // true for read-ahead requests with no waiter
}

// Response represents a chunk response
type Response struct {
	Sequence int
	Error    error
	Bytes    []byte
}

// NewManager creates a new chunk manager.
// chunkSize must be a positive multiple of the OS page size (required for
// mmap) and maxChunks must be at least 2 and not smaller than loadAhead.
func NewManager(
	chunkFile string,
	chunkSize int64,
	loadAhead,
	checkThreads int,
	loadThreads int,
	client *http.Client,
	maxChunks int,
	c config.ConfigInterface) (*Manager, error) {

	pageSize := int64(os.Getpagesize())
	if chunkSize < pageSize {
		return nil, fmt.Errorf("chunk size must not be < %v", pageSize)
	}
	if chunkSize%pageSize != 0 {
		return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize)
	}
	// 32-Bit: ~2GB / 64-Bit: ~8EB
	maxMmapSize := int64(^uint(0) >> 1)
	if chunkSize > maxMmapSize {
		return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize)
	}
	if maxChunks < 2 || maxChunks < loadAhead {
		return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value")
	}

	storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile)
	if nil != err {
		return nil, err
	}

	downloader, err := NewDownloader(loadThreads, storage, chunkSize, c)
	if nil != err {
		return nil, err
	}

	manager := Manager{
		ChunkSize:  chunkSize,
		LoadAhead:  loadAhead,
		downloader: downloader,
		storage:    storage,
		queue:      make(chan *QueueEntry, 100),
	}

	if err := manager.storage.Clear(); nil != err {
		return nil, err
	}

	for i := 0; i < checkThreads; i++ {
		go manager.thread()
	}

	return &manager, nil
}

// GetChunk loads one chunk and starts the preload for the next chunks.
// The requested [offset, offset+size) window is clamped to the file size,
// split into chunk-aligned ranges, fetched concurrently and reassembled
// in sequence order into a single buffer.
func (m *Manager) GetChunk(object *torrent.File, offset, size int64) ([]byte, error) {
	maxOffset := object.Bytes
	if offset > maxOffset {
		return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", object.ID, offset)
	}
	// Log.Infof("Request %v:%v md5:%v", object.ID, offset, object.MD5Checksum)
	if offset+size > maxOffset {
		size = object.Bytes - offset
	}

	ranges := splitChunkRanges(offset, size, m.ChunkSize)
	numRanges := len(ranges)
	responses := make(chan Response, numRanges)

	// Only the last range triggers read-ahead for subsequent chunks.
	last := numRanges - 1
	for i, r := range ranges {
		m.requestChunk(object, r.offset, r.size, i, i == last, responses)
	}

	data := make([]byte, size)
	for i := 0; i < cap(responses); i++ {
		res := <-responses
		if nil != res.Error {
			return nil, res.Error
		}

		// Sequence identifies which range this response belongs to, so
		// out-of-order arrivals land at the right offset.
		dataOffset := ranges[res.Sequence].offset - offset

		if n := copy(data[dataOffset:], res.Bytes); n == 0 {
			return nil, fmt.Errorf("request %v slice %v has empty response", object.ID, res.Sequence)
		}
	}
	close(responses)

	return data, nil
}

// buildRequestID derives the 24-byte request id: the first 16 bytes are
// hex-decoded from the file link, the last 8 the big-endian chunk offset.
// NOTE(review): hex.Decode's error and byte count are ignored — assumes
// object.Link always starts with >= 32 hex characters; confirm upstream.
func buildRequestID(object *torrent.File, offset int64) (id RequestID) {
	hex.Decode(id[:], []byte(object.Link))
	binary.BigEndian.PutUint64(id[16:], uint64(offset))
	return
}

// requestChunk enqueues the chunk containing [offset, offset+size) and,
// when preload is set, also enqueues up to LoadAhead following chunks as
// waiter-less preload requests.
func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) {
	chunkOffset := offset % m.ChunkSize
	offsetStart := offset - chunkOffset
	offsetEnd := offsetStart + m.ChunkSize

	request := &Request{
		id:             buildRequestID(object, offsetStart),
		file:           object,
		offsetStart:    offsetStart,
		offsetEnd:      offsetEnd,
		chunkOffset:    chunkOffset,
		chunkOffsetEnd: chunkOffset + size,
		sequence:       sequence,
		preload:        false,
	}

	m.queue <- &QueueEntry{
		request:  request,
		response: response,
	}

	if !preload {
		return
	}

	for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize {
		aheadOffsetStart := offsetStart + i
		aheadOffsetEnd := aheadOffsetStart + m.ChunkSize
		// Only preload chunks that lie entirely within the file.
		if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) {
			request := &Request{
				id:          buildRequestID(object, aheadOffsetStart),
				file:        object,
				offsetStart: aheadOffsetStart,
				offsetEnd:   aheadOffsetEnd,
				preload:     true,
			}
			m.queue <- &QueueEntry{
				request: request,
			}
		}
	}
}

// byteRange is one chunk-aligned slice of a larger read request.
type byteRange struct {
	offset, size int64
}
+func splitChunkRanges(offset, size, chunkSize int64) []byteRange { + ranges := make([]byteRange, 0, size/chunkSize+2) + for remaining := size; remaining > 0; remaining -= size { + size = min(remaining, chunkSize-offset%chunkSize) + ranges = append(ranges, byteRange{offset, size}) + offset += size + } + return ranges +} + +func (m *Manager) thread() { + for { + queueEntry := <-m.queue + m.checkChunk(queueEntry.request, queueEntry.response) + } +} + +func (m *Manager) checkChunk(req *Request, response chan Response) { + if nil == response { + if nil == m.storage.Load(req.id) { + m.downloader.Download(req, nil) + } + return + } + + if bytes := m.storage.Load(req.id); nil != bytes { + response <- Response{ + Sequence: req.sequence, + Bytes: adjustResponseChunk(req, bytes), + } + return + } + + m.downloader.Download(req, func(err error, bytes []byte) { + response <- Response{ + Sequence: req.sequence, + Error: err, + Bytes: adjustResponseChunk(req, bytes), + } + }) +} + +func adjustResponseChunk(req *Request, bytes []byte) []byte { + if nil == bytes { + return nil + } + bytesLen := int64(len(bytes)) + sOffset := min(req.chunkOffset, bytesLen) + eOffset := min(req.chunkOffsetEnd, bytesLen) + return bytes[sOffset:eOffset] +} + +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/pkg/chunk/manager_test.go b/pkg/chunk/manager_test.go new file mode 100644 index 0000000..ceb78e5 --- /dev/null +++ b/pkg/chunk/manager_test.go @@ -0,0 +1,53 @@ +package chunk + +import "testing" + +func TestSplitChunkRanges(t *testing.T) { + testcases := []struct { + offset, size, chunkSize int64 + result []byteRange + }{ + {0, 0, 4096, []byteRange{}}, + {0, 4096, 4096, []byteRange{ + {0, 4096}, + }}, + {4095, 4096, 4096, []byteRange{ + {4095, 1}, + {4096, 4095}, + }}, + {0, 8192, 4096, []byteRange{ + {0, 4096}, + {4096, 4096}, + }}, + {2048, 8192, 4096, []byteRange{ + {2048, 2048}, + {4096, 4096}, + {8192, 2048}, + }}, + {2048, 8192, 4096, []byteRange{ + {2048, 
// Stack is a thread safe list/stack implementation.
// It keeps chunk indices in least-recently-used order: Pop evicts from
// the front, Touch and Push place items at the back.
type Stack struct {
	items   *list.List
	lock    sync.Mutex
	maxSize int
}

// NewStack creates a new stack that only starts evicting once it holds
// maxChunks items.
func NewStack(maxChunks int) *Stack {
	return &Stack{
		items:   list.New(),
		maxSize: maxChunks,
	}
}

// Len gets the number of items on the stack.
func (s *Stack) Len() int {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.items.Len()
}

// Pop removes and returns the least-recently-used item, or -1 when the
// stack is empty or has not yet reached capacity (callers should allocate
// a fresh slot instead of reusing one in that case).
func (s *Stack) Pop() int {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.items.Len() < s.maxSize {
		return -1
	}
	front := s.items.Front()
	if front == nil {
		return -1
	}
	s.items.Remove(front)
	return front.Value.(int)
}

// Touch marks an item as recently used by moving it to the back.
func (s *Stack) Touch(item *list.Element) {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.items.Back() != item {
		s.items.MoveToBack(item)
	}
}

// Push adds a new item at the most-recently-used end and returns its
// list element for later Touch/Purge calls.
func (s *Stack) Push(id int) *list.Element {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.items.PushBack(id)
}
// Prepend adds a list to the front of the stack
//
// Used at startup to seed the stack with restored/empty chunk indices;
// items at the front are the first eviction candidates.
func (s *Stack) Prepend(items *list.List) {
	s.lock.Lock()
	s.items.PushFrontList(items)
	s.lock.Unlock()
}

// Purge an item from the stack
//
// NOTE(review): this does not remove the element — it moves it to the
// front of the LRU order so it becomes the next candidate returned by
// Pop. The name is misleading; Load relies on this "evict soon" behavior
// for chunks that fail their checksum. Confirm before renaming/removing.
func (s *Stack) Purge(item *list.Element) {
	s.lock.Lock()
	defer s.lock.Unlock()
	if item != s.items.Front() {
		s.items.MoveToFront(item)
	}
}

// ---- file: pkg/chunk/stack_test.go ----

package chunk

import "testing"

// TestOOB checks that touching the only element does not panic.
func TestOOB(t *testing.T) {
	stack := NewStack(1)

	item := stack.Push(1)
	stack.Touch(item)
}

// TestAddToStack verifies LRU ordering under Touch and Purge.
func TestAddToStack(t *testing.T) {
	stack := NewStack(1)

	item1 := stack.Push(1)
	item2 := stack.Push(2)
	item3 := stack.Push(3)
	item4 := stack.Push(4)

	stack.Touch(item1)
	stack.Touch(item3)

	stack.Purge(item2)
	stack.Purge(item4)

	v := stack.Pop()
	if v != 4 {
		t.Fatalf("Expected 4 got %v", v)
	}

	v = stack.Pop()
	if v != 2 {
		t.Fatalf("Expected 2 got %v", v)
	}

	v = stack.Pop()
	if v != 1 {
		t.Fatalf("Expected 1 got %v", v)
	}

	v = stack.Pop()
	if v != 3 {
		t.Fatalf("Expected 3 got %v", v)
	}

	v = stack.Pop()
	if v != -1 {
		t.Fatalf("Expected -1 got %v", v)
	}
}

// TestLen verifies the length bookkeeping across Push and Pop.
func TestLen(t *testing.T) {
	stack := NewStack(1)

	v := stack.Len()
	if v != 0 {
		t.Fatalf("Expected 0 got %v", v)
	}

	stack.Push(1)
	v = stack.Len()
	if v != 1 {
		t.Fatalf("Expected 1 got %v", v)
	}

	_ = stack.Pop()
	v = stack.Len()
	if v != 0 {
		t.Fatalf("Expected 0 got %v", v)
	}

}

// ---- file: pkg/chunk/storage.go ----

package chunk

import (
	"container/list"
	"fmt"
	"hash/crc32"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
	"unsafe"

	"go.uber.org/zap"
	"golang.org/x/sys/unix"

	"github.com/debridmediamanager.com/zurg/pkg/logutil"
)

const (
	// headerSize/tocSize mirror the in-memory struct layouts; they are
	// also the on-disk record sizes, so the structs must stay fixed.
	headerSize     = int(unsafe.Sizeof(*new(chunkHeader)))
	tocSize        = int64(unsafe.Sizeof(*new(journalHeader)))
	journalMagic   = uint16('P'<<8 | 'D'&0xFF) // "PD"; also detects endianness mismatch
	journalVersion = uint8(2)
)

var (
	blankRequestID RequestID // zero value marks an unused chunk slot
	crc32Table     = crc32.MakeTable(crc32.Castagnoli)
)

// Storage is a chunk storage.
// File layout: maxChunks fixed-size chunks, then the journal (one
// chunkHeader per chunk), then a trailing journalHeader (the TOC).
type Storage struct {
	ChunkFile       *os.File
	ChunkSize       int64
	HeaderSize      int64
	MaxChunks       int
	chunks          map[RequestID]int // request id -> buffer index; guarded by lock
	stack           *Stack            // LRU order of buffer indices
	lock            sync.RWMutex
	buffers         []*Chunk
	loadChunks      int // number of chunks that may be restored from disk
	signals         chan os.Signal
	journal         []byte // mmap'd per-chunk header area (without TOC)
	mmapRegions     [][]byte
	chunksPerRegion int64
	log             *zap.SugaredLogger
}

// journalHeader is the on-disk TOC record. The checksum covers the first
// 12 bytes (magic through chunkSize); layout must not change without
// bumping journalVersion.
type journalHeader struct {
	magic      uint16
	version    uint8
	headerSize uint8
	maxChunks  uint32
	chunkSize  uint32
	checksum   uint32
}

// NewStorage creates a new storage.
// With a non-empty chunkFilePath the chunk area and journal are mmap'd
// from that file (persistent cache, resized/relocated as needed);
// otherwise anonymous memory is used.
func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) {
	rlog := logutil.NewLogger()
	log := rlog.Named("storage")

	s := Storage{
		ChunkSize: chunkSize,
		MaxChunks: maxChunks,
		chunks:    make(map[RequestID]int, maxChunks),
		stack:     NewStack(maxChunks),
		buffers:   make([]*Chunk, maxChunks),
		signals:   make(chan os.Signal, 1),
		log:       log,
	}

	journalSize := tocSize + int64(headerSize*maxChunks)
	journalOffset := chunkSize * int64(maxChunks)

	// Non-empty string in chunkFilePath enables MMAP disk storage for chunks
	if chunkFilePath != "" {
		chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("could not open chunk cache file")
		}
		s.ChunkFile = chunkFile
		currentSize, err := chunkFile.Seek(0, os.SEEK_END)
		if nil != err {
			s.log.Debugf("%v", err)
			return nil, fmt.Errorf("chunk file is not seekable")
		}
		wantedSize := journalOffset + journalSize
		s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize)
		// Truncate to the current size first as a writability probe.
		if err := chunkFile.Truncate(currentSize); nil != err {
			s.log.Warnf("Could not truncate chunk cache, skip resizing")
		} else if currentSize != wantedSize {
			// Size changed (e.g. different maxChunks): move the journal
			// to its new position before resizing, best effort.
			if currentSize > tocSize {
				err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset)
				if nil != err {
					s.log.Errorf("%v", err)
				} else {
					s.log.Infof("Relocated chunk cache journal")
				}
			}
			if err := chunkFile.Truncate(wantedSize); nil != err {
				s.log.Debugf("%v", err)
				return nil, fmt.Errorf("could not resize chunk cache file")
			}
		}
		s.log.Infof("Created chunk cache file %v", chunkFile.Name())
		// Only chunks that existed in the old file can be restored.
		s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks)))
	}

	// Allocate journal
	if journal, err := s.mmap(journalOffset, journalSize); nil != err {
		return nil, fmt.Errorf("could not allocate journal: %v", err)
	} else {
		if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err {
			s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err)
		}
		// The TOC sits at the end of the journal mapping; an invalid TOC
		// (first run, format change) is reinitialized, which implicitly
		// invalidates all restored chunk headers.
		tocOffset := journalSize - tocSize
		header := journal[tocOffset:]
		if valid := s.checkJournal(header, false); !valid {
			s.initJournal(header)
		}
		s.journal = journal[:tocOffset]
	}

	// Setup sighandler so the (potentially long) chunk restore loop can
	// be aborted cleanly.
	signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)

	// Allocate mmap regions for chunks
	if err := s.allocateMmapRegions(maxMmapSize); nil != err {
		return nil, err
	}
	// Map chunks to slices from mmap regions
	if err := s.mmapChunks(); nil != err {
		return nil, err
	}

	return &s, nil
}

// relocateJournal moves existing journal prior to resize.
// It validates the old TOC (ignoring maxChunks, which may differ), reads
// the old journal from its old offset and rewrites journal + TOC at the
// positions implied by the new geometry.
func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error {
	header := make([]byte, tocSize)
	if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err {
		return fmt.Errorf("failed to read journal header: %v", err)
	}

	if valid := s.checkJournal(header, true); !valid {
		return fmt.Errorf("failed to validate journal header")
	}

	h := (*journalHeader)(unsafe.Pointer(&header[0]))
	oldJournalOffset := s.ChunkSize * int64(h.maxChunks)
	oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize

	journal := make([]byte, journalSize)

	if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err {
		return fmt.Errorf("failed to read journal: %v", err)
	}

	// Rewrite the TOC with the new maxChunks/checksum.
	s.initJournal(header)

	sizeWithoutJournal := currentSize - oldJournalSize - tocSize
	if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err {
		return fmt.Errorf("could not truncate chunk cache journal: %v", err)
	}

	if err := s.ChunkFile.Truncate(wantedSize); nil != err {
		return fmt.Errorf("could not resize chunk cache file: %v", err)
	}

	if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err {
		return fmt.Errorf("failed to write journal: %v", err)
	}
	if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err {
		return fmt.Errorf("failed to write journal header: %v", err)
	}
	return nil
}

// checkJournal verifies the journal header.
// skipMaxChunks allows validating a TOC written with a different chunk
// count (used during relocation).
func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	// check magic bytes / endianess mismatch ('PD' vs 'DP')
	if h.magic != journalMagic {
		s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic)
		return false
	}
	// checksum covers magic..chunkSize (bytes 0-11)
	checksum := crc32.Checksum(journal[:12], crc32Table)
	if h.checksum != checksum {
		s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum)
		return false
	}
	if h.version != journalVersion {
		s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion)
		return false
	}
	if h.headerSize != uint8(headerSize) {
		s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize)
		return false
	}
	if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) {
		s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks)
		return false
	}
	if h.chunkSize != uint32(s.ChunkSize) {
		s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize)
		return false
	}

	s.log.Debug("Journal is valid")
	return true
}

// initJournal initializes the journal TOC in place with the current
// geometry and a checksum over the first 12 bytes.
func (s *Storage) initJournal(journal []byte) {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	h.magic = journalMagic
	h.version = journalVersion
	h.headerSize = uint8(headerSize)
	h.maxChunks = uint32(s.MaxChunks)
	h.chunkSize = uint32(s.ChunkSize)
	h.checksum = crc32.Checksum(journal[:12], crc32Table)
}

// allocateMmapRegions creates memory mappings to fit all chunks.
// Chunks are packed into regions of at most maxMmapSize bytes; the last
// region may be smaller.
func (s *Storage) allocateMmapRegions(maxMmapSize int64) error {
	s.chunksPerRegion = maxMmapSize / s.ChunkSize
	regionSize := s.chunksPerRegion * s.ChunkSize
	numRegions := int64(s.MaxChunks) / s.chunksPerRegion
	remChunks := int64(s.MaxChunks) % s.chunksPerRegion
	if remChunks != 0 {
		numRegions++
	}
	s.mmapRegions = make([][]byte, numRegions)
	for i := int64(0); i < int64(len(s.mmapRegions)); i++ {
		size := regionSize
		if i == numRegions-1 && remChunks != 0 {
			size = remChunks * s.ChunkSize
		}
		s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size)
		region, err := s.mmap(i*regionSize, size)
		if nil != err {
			s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size)
			return err
		}
		if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err {
			s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err)
		}
		s.mmapRegions[i] = region
	}
	return nil
}

// mmapChunks slices buffers from mmap regions and loads chunk metadata.
// Restored chunks are prepended to the LRU stack behind empty slots so
// empty slots are reused first; a SIGINT/SIGTERM aborts the loop.
func (s *Storage) mmapChunks() error {
	start := time.Now()
	empty := list.New()
	restored := list.New()
	loadedChunks := 0
	for i := 0; i < s.MaxChunks; i++ {
		select {
		case sig := <-s.signals:
			s.log.Warnf("Received signal %v, aborting chunk loader", sig)
			return fmt.Errorf("aborted by signal")
		default:
			if loaded, err := s.initChunk(i, empty, restored); nil != err {
				s.log.Errorf("failed to allocate chunk %v: %v", i, err)
				return fmt.Errorf("failed to initialize chunks")
			} else if loaded {
				loadedChunks++
			}
		}
	}
	s.stack.Prepend(restored)
	s.stack.Prepend(empty)
	elapsed := time.Since(start)
	if nil != s.ChunkFile {
		s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed)
	} else {
		s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed)
	}
	return nil
}

// initChunk tries to restore a chunk from disk.
// Returns true when the chunk carried a valid-looking id and was indexed
// as restored; false for blank/out-of-range slots (pushed to the empty
// list).
func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) {
	chunk, err := s.allocateChunk(index)
	if err != nil {
		s.log.Debugf("%v", err)
		return false, err
	}

	s.buffers[index] = chunk

	id := chunk.id

	if blankRequestID == id || index >= s.loadChunks {
		chunk.item = empty.PushBack(index)
		// s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks)
		return false, nil
	}

	chunk.item = restored.PushBack(index)
	// s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id)
	s.chunks[id] = index

	return true, nil
}

// allocateChunk creates a new mmap-backed chunk.
// The data slice is cut from the owning region (full-capacity slice so
// appends cannot cross into the neighbor) and the header is a pointer
// into the mmap'd journal, so header writes persist automatically.
func (s *Storage) allocateChunk(index int) (*Chunk, error) {
	region := int64(index) / s.chunksPerRegion
	offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize
	// s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset)
	bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize]
	headerOffset := index * headerSize
	header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset]))
	chunk := Chunk{
		chunkHeader: header,
		bytes:       bytes,
	}
	return &chunk, nil
}

// mmap maps size bytes at offset of the chunk file, or anonymous memory
// when no chunk file is configured (offset is then ignored).
func (s *Storage) mmap(offset, size int64) ([]byte, error) {
	if s.ChunkFile != nil {
		return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	} else {
		return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
	}
}

// Clear removes all old chunks on disk (will be called on each program start)
//
// NOTE(review): currently a no-op — restored chunks are intentionally
// kept across restarts. Confirm the doc comment or the implementation.
func (s *Storage) Clear() error {
	return nil
}

// Load a chunk from ram or creates it.
// A clean cached chunk is returned under the read lock; an unverified one
// is re-checked under the write lock and purged (moved to the eviction
// front) when its checksum fails.
//
// NOTE(review): between RUnlock and Lock the chunk can be reassigned by a
// concurrent Store; valid(id) rejects a reassigned chunk, but the Purge
// then targets whatever the slot holds now — verify this is acceptable.
func (s *Storage) Load(id RequestID) []byte {
	s.lock.RLock()
	chunk := s.fetch(id)
	if nil == chunk {
		// s.log.Tracef("Load chunk %v (missing)", id)
		s.lock.RUnlock()
		return nil
	}
	if chunk.clean {
		// s.log.Tracef("Load chunk %v (clean)", id)
		defer s.lock.RUnlock()
		return chunk.bytes
	}
	s.lock.RUnlock()
	// Switch to write lock to avoid races on crc verification
	s.lock.Lock()
	defer s.lock.Unlock()
	if chunk.valid(id) {
		s.log.Debugf("Load chunk %v (verified)", id)
		return chunk.bytes
	}
	s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum())
	s.stack.Purge(chunk.item)
	return nil
}

// Store stores a chunk in the RAM and adds it to the disk storage queue.
// Duplicate stores of an already-valid chunk are no-ops; otherwise an LRU
// slot is popped from the stack, re-indexed under the new id and
// overwritten with the payload.
func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
	s.lock.RLock()

	// Avoid storing same chunk multiple times
	chunk := s.fetch(id)
	if nil != chunk && chunk.clean {
		// s.log.Tracef("Create chunk %v (exists: clean)", id)
		s.lock.RUnlock()
		return nil
	}

	s.lock.RUnlock()
	s.lock.Lock()
	defer s.lock.Unlock()

	if nil != chunk {
		if chunk.valid(id) {
			s.log.Debugf("Create chunk %v (exists: valid)", id)
			return nil
		}
		s.log.Warnf("Create chunk %v(exists: overwrite)", id)
	} else {
		index := s.stack.Pop()
		if index == -1 {
			// Stack below capacity or empty: no reusable slot available.
			s.log.Debugf("Create chunk %v (failed)", id)
			return fmt.Errorf("no buffers available")
		}
		chunk = s.buffers[index]
		deleteID := chunk.id
		if blankRequestID != deleteID {
			// Evicting a previously used slot: drop its old index entry.
			delete(s.chunks, deleteID)
			s.log.Debugf("Create chunk %v (reused)", id)
		} else {
			s.log.Debugf("Create chunk %v (stored)", id)
		}
		s.chunks[id] = index
		chunk.item = s.stack.Push(index)
	}

	chunk.update(id, bytes)

	return nil
}

// fetch chunk and index by id.
// Touches the chunk's LRU element as a side effect; callers must hold at
// least the read lock.
func (s *Storage) fetch(id RequestID) *Chunk {
	index, exists := s.chunks[id]
	if !exists {
		return nil
	}
	chunk := s.buffers[index]
	s.stack.Touch(chunk.item)
	return chunk
}