Big refactor
pkg/chunk/storage.go (new file, 420 lines)
@@ -0,0 +1,420 @@
package chunk

import (
    "container/list"
    "fmt"
    "hash/crc32"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
    "unsafe"

    "go.uber.org/zap"
    "golang.org/x/sys/unix"

    "github.com/debridmediamanager.com/zurg/pkg/logutil"
)

const (
    headerSize     = int(unsafe.Sizeof(*new(chunkHeader)))
    tocSize        = int64(unsafe.Sizeof(*new(journalHeader)))
    journalMagic   = uint16('P'<<8 | 'D'&0xFF)
    journalVersion = uint8(2)
)

var (
    blankRequestID RequestID
    crc32Table     = crc32.MakeTable(crc32.Castagnoli)
)

// Storage is a chunk cache backed by mmap regions, optionally persisted
// to a chunk file on disk
type Storage struct {
    ChunkFile       *os.File
    ChunkSize       int64
    HeaderSize      int64
    MaxChunks       int
    chunks          map[RequestID]int
    stack           *Stack
    lock            sync.RWMutex
    buffers         []*Chunk
    loadChunks      int
    signals         chan os.Signal
    journal         []byte
    mmapRegions     [][]byte
    chunksPerRegion int64
    log             *zap.SugaredLogger
}
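
// On-disk layout, as implied by the offsets computed in NewStorage
// (a sketch of our reading, not a documented format):
//
//	[chunk 0 .. chunk maxChunks-1][chunkHeader x maxChunks][journalHeader]
//
// The trailing journalHeader acts as a TOC; its checksum covers the
// 12 bytes of fields that precede it.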

type journalHeader struct {
    magic      uint16
    version    uint8
    headerSize uint8
    maxChunks  uint32
    chunkSize  uint32
    checksum   uint32
}
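
// A minimal usage sketch (hypothetical sizes, path, and id; RequestID
// values are produced by the caller):
//
//	s, err := NewStorage(1<<20, 1024, 1<<30, "/tmp/chunks.bin")
//	if err != nil {
//		return err
//	}
//	_ = s.Store(id, payload)
//	data := s.Load(id)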

// NewStorage creates a new storage
func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) {
    rlog := logutil.NewLogger()
    log := rlog.Named("storage")

    s := Storage{
        ChunkSize: chunkSize,
        MaxChunks: maxChunks,
        chunks:    make(map[RequestID]int, maxChunks),
        stack:     NewStack(maxChunks),
        buffers:   make([]*Chunk, maxChunks),
        signals:   make(chan os.Signal, 1),
        log:       log,
    }

    journalSize := tocSize + int64(headerSize*maxChunks)
    journalOffset := chunkSize * int64(maxChunks)
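
    // Example (hypothetical values): with chunkSize = 1 MiB and
    // maxChunks = 1024, the chunk region ends at the 1 GiB mark and the
    // journal (1024 chunk headers plus one journalHeader) follows it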

    // A non-empty chunkFilePath enables mmap-backed disk storage for chunks
    if chunkFilePath != "" {
        chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
        if nil != err {
            s.log.Debugf("%v", err)
            return nil, fmt.Errorf("could not open chunk cache file")
        }
        s.ChunkFile = chunkFile
        currentSize, err := chunkFile.Seek(0, os.SEEK_END)
        if nil != err {
            s.log.Debugf("%v", err)
            return nil, fmt.Errorf("chunk file is not seekable")
        }
        wantedSize := journalOffset + journalSize
        s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize)
        if err := chunkFile.Truncate(currentSize); nil != err {
            s.log.Warnf("Could not truncate chunk cache, skipping resize")
        } else if currentSize != wantedSize {
            if currentSize > tocSize {
                err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset)
                if nil != err {
                    s.log.Errorf("%v", err)
                } else {
                    s.log.Infof("Relocated chunk cache journal")
                }
            }
            if err := chunkFile.Truncate(wantedSize); nil != err {
                s.log.Debugf("%v", err)
                return nil, fmt.Errorf("could not resize chunk cache file")
            }
        }
        s.log.Infof("Created chunk cache file %v", chunkFile.Name())
        s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks)))
    }
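
    // The journal below is advised MADV_RANDOM while chunk regions get
    // MADV_SEQUENTIAL: header slots are touched at arbitrary indices,
    // whereas chunk payloads tend to be read front to back (our reading
    // of the access pattern, not a documented guarantee)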

    // Allocate the journal
    if journal, err := s.mmap(journalOffset, journalSize); nil != err {
        return nil, fmt.Errorf("could not allocate journal: %v", err)
    } else {
        if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err {
            s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err)
        }
        tocOffset := journalSize - tocSize
        header := journal[tocOffset:]
        if valid := s.checkJournal(header, false); !valid {
            s.initJournal(header)
        }
        s.journal = journal[:tocOffset]
    }

    // Set up signal handler
    signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)

    // Allocate mmap regions for chunks
    if err := s.allocateMmapRegions(maxMmapSize); nil != err {
        return nil, err
    }
    // Map chunks to slices from mmap regions
    if err := s.mmapChunks(); nil != err {
        return nil, err
    }

    return &s, nil
}

// relocateJournal moves the existing journal before the file is resized
func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error {
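    // The journal always lives at the end of the file, so a resize has to
    // copy it from the old tail position to the new one; the trailing
    // journalHeader records the old geometry (maxChunks) needed to find it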
    header := make([]byte, tocSize)
    if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err {
        return fmt.Errorf("failed to read journal header: %v", err)
    }

    if valid := s.checkJournal(header, true); !valid {
        return fmt.Errorf("failed to validate journal header")
    }

    h := (*journalHeader)(unsafe.Pointer(&header[0]))
    oldJournalOffset := s.ChunkSize * int64(h.maxChunks)
    oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize
    journal := make([]byte, journalSize)

    if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err {
        return fmt.Errorf("failed to read journal: %v", err)
    }

    s.initJournal(header)

    sizeWithoutJournal := currentSize - oldJournalSize - tocSize
    if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err {
        return fmt.Errorf("could not truncate chunk cache journal: %v", err)
    }

    if err := s.ChunkFile.Truncate(wantedSize); nil != err {
        return fmt.Errorf("could not resize chunk cache file: %v", err)
    }

    if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err {
        return fmt.Errorf("failed to write journal: %v", err)
    }
    if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err {
        return fmt.Errorf("failed to write journal header: %v", err)
    }
    return nil
}

// checkJournal verifies the journal header
func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool {
    h := (*journalHeader)(unsafe.Pointer(&journal[0]))
    // check magic bytes / endianness mismatch ('PD' vs 'DP')
    if h.magic != journalMagic {
        s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic)
        return false
    }
    checksum := crc32.Checksum(journal[:12], crc32Table)
    if h.checksum != checksum {
        s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum)
        return false
    }
    if h.version != journalVersion {
        s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion)
        return false
    }
    if h.headerSize != uint8(headerSize) {
        s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize)
        return false
    }
    if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) {
        s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks)
        return false
    }
    if h.chunkSize != uint32(s.ChunkSize) {
        s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize)
        return false
    }
    s.log.Debug("Journal is valid")
    return true
}

// initJournal initializes the journal
func (s *Storage) initJournal(journal []byte) {
    h := (*journalHeader)(unsafe.Pointer(&journal[0]))
    h.magic = journalMagic
    h.version = journalVersion
    h.headerSize = uint8(headerSize)
    h.maxChunks = uint32(s.MaxChunks)
    h.chunkSize = uint32(s.ChunkSize)
    h.checksum = crc32.Checksum(journal[:12], crc32Table)
}

// allocateMmapRegions creates memory mappings to fit all chunks
func (s *Storage) allocateMmapRegions(maxMmapSize int64) error {
    s.chunksPerRegion = maxMmapSize / s.ChunkSize
    regionSize := s.chunksPerRegion * s.ChunkSize
    numRegions := int64(s.MaxChunks) / s.chunksPerRegion
    remChunks := int64(s.MaxChunks) % s.chunksPerRegion
    if remChunks != 0 {
        numRegions++
    }
    s.mmapRegions = make([][]byte, numRegions)
    for i := int64(0); i < int64(len(s.mmapRegions)); i++ {
        size := regionSize
        if i == numRegions-1 && remChunks != 0 {
            size = remChunks * s.ChunkSize
        }
        s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size)
        region, err := s.mmap(i*regionSize, size)
        if nil != err {
            s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size)
            return err
        }
        if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err {
            s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err)
        }
        s.mmapRegions[i] = region
    }
    return nil
}
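
// For instance (hypothetical values), with maxMmapSize = 256 MiB and
// ChunkSize = 1 MiB, chunksPerRegion is 256; 1000 chunks then need four
// regions, the last one holding the remaining 232 chunks.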

// mmapChunks slices buffers from mmap regions and loads chunk metadata
func (s *Storage) mmapChunks() error {
    start := time.Now()
    empty := list.New()
    restored := list.New()
    loadedChunks := 0
    for i := 0; i < s.MaxChunks; i++ {
        select {
        case sig := <-s.signals:
            s.log.Warnf("Received signal %v, aborting chunk loader", sig)
            return fmt.Errorf("aborted by signal")
        default:
            if loaded, err := s.initChunk(i, empty, restored); nil != err {
                s.log.Errorf("failed to allocate chunk %v: %v", i, err)
                return fmt.Errorf("failed to initialize chunks")
            } else if loaded {
                loadedChunks++
            }
        }
    }
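
    // Prepend empty last so empty buffers end up on top of the stack and
    // are consumed before restored chunks get evicted (our reading of
    // Stack's ordering)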
    s.stack.Prepend(restored)
    s.stack.Prepend(empty)
    elapsed := time.Since(start)
    if nil != s.ChunkFile {
        s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed)
    } else {
        s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed)
    }
    return nil
}

// initChunk tries to restore a chunk from disk
func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) {
    chunk, err := s.allocateChunk(index)
    if err != nil {
        s.log.Debugf("%v", err)
        return false, err
    }

    s.buffers[index] = chunk

    id := chunk.id

    if blankRequestID == id || index >= s.loadChunks {
        chunk.item = empty.PushBack(index)
        // s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks)
        return false, nil
    }

    chunk.item = restored.PushBack(index)
    // s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id)
    s.chunks[id] = index

    return true, nil
}

// allocateChunk creates a new mmap-backed chunk
func (s *Storage) allocateChunk(index int) (*Chunk, error) {
    region := int64(index) / s.chunksPerRegion
    offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize
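    // e.g. with chunksPerRegion = 256 (hypothetical), index 300 maps to
    // region 1 at offset (300-256)*ChunkSize = 44*ChunkSize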
    // s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset)
    bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize]
    headerOffset := index * headerSize
    header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset]))
    chunk := Chunk{
        chunkHeader: header,
        bytes:       bytes,
    }
    return &chunk, nil
}
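
// mmap maps size bytes of the chunk file at offset, or falls back to an
// anonymous private mapping when no chunk file is configured (RAM-only mode)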
func (s *Storage) mmap(offset, size int64) ([]byte, error) {
    if s.ChunkFile != nil {
        return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
    } else {
        return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
    }
}

// Clear removes all old chunks on disk (called on each program start);
// currently a no-op
func (s *Storage) Clear() error {
    return nil
}

// Load returns a chunk's bytes from RAM, or nil if the chunk is missing
// or fails verification
func (s *Storage) Load(id RequestID) []byte {
    s.lock.RLock()
    chunk := s.fetch(id)
    if nil == chunk {
        // s.log.Tracef("Load chunk %v (missing)", id)
        s.lock.RUnlock()
        return nil
    }
    if chunk.clean {
        // s.log.Tracef("Load chunk %v (clean)", id)
        defer s.lock.RUnlock()
        return chunk.bytes
    }
    s.lock.RUnlock()
    // Switch to the write lock to avoid races during crc verification
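    // (the chunk pointer was obtained under the read lock; if it was
    // recycled for another id in the meantime, the valid(id) check below
    // should catch the mismatch -- our reading of valid)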
    s.lock.Lock()
    defer s.lock.Unlock()
    if chunk.valid(id) {
        s.log.Debugf("Load chunk %v (verified)", id)
        return chunk.bytes
    }
    s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum())
    s.stack.Purge(chunk.item)
    return nil
}

// Store writes a chunk into an mmap-backed buffer; when a chunk file is
// configured, the shared mapping persists it to disk
func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
    s.lock.RLock()

    // Avoid storing the same chunk multiple times
    chunk := s.fetch(id)
    if nil != chunk && chunk.clean {
        // s.log.Tracef("Create chunk %v (exists: clean)", id)
        s.lock.RUnlock()
        return nil
    }

    s.lock.RUnlock()
    s.lock.Lock()
    defer s.lock.Unlock()
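
    // Re-check under the write lock: chunk was looked up while holding
    // only the read lock, so its state may have changed in between (a
    // double-checked locking pattern, per our reading of valid())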
    if nil != chunk {
        if chunk.valid(id) {
            s.log.Debugf("Create chunk %v (exists: valid)", id)
            return nil
        }
        s.log.Warnf("Create chunk %v (exists: overwrite)", id)
    } else {
        index := s.stack.Pop()
        if index == -1 {
            s.log.Debugf("Create chunk %v (failed)", id)
            return fmt.Errorf("no buffers available")
        }
        chunk = s.buffers[index]
        deleteID := chunk.id
        if blankRequestID != deleteID {
            delete(s.chunks, deleteID)
            s.log.Debugf("Create chunk %v (reused)", id)
        } else {
            s.log.Debugf("Create chunk %v (stored)", id)
        }
        s.chunks[id] = index
        chunk.item = s.stack.Push(index)
    }

    chunk.update(id, bytes)

    return nil
}

// fetch looks up a chunk by id and marks it as recently used
func (s *Storage) fetch(id RequestID) *Chunk {
    index, exists := s.chunks[id]
    if !exists {
        return nil
    }
    chunk := s.buffers[index]
    s.stack.Touch(chunk.item)
    return chunk
}