From 0c2cff238737fa0d2cf094fb6e06f9a7ff30787f Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sun, 12 Nov 2023 02:05:45 +0100 Subject: [PATCH] Zfs fixes --- Dockerfile | 42 +++- cmd/zurg/main.go | 32 +-- internal/config/types.go | 2 +- internal/torrent/manager.go | 44 ++-- internal/zfs/fs.go | 6 +- internal/zfs/mount.go | 11 +- internal/zfs/object.go | 14 +- pkg/chunk/chunk.go | 51 ----- pkg/chunk/download.go | 146 ------------- pkg/chunk/manager.go | 266 ----------------------- pkg/chunk/manager_test.go | 53 ----- pkg/chunk/stack.go | 75 ------- pkg/chunk/stack_test.go | 72 ------- pkg/chunk/storage.go | 420 ------------------------------------ 14 files changed, 79 insertions(+), 1155 deletions(-) delete mode 100644 pkg/chunk/chunk.go delete mode 100644 pkg/chunk/download.go delete mode 100644 pkg/chunk/manager.go delete mode 100644 pkg/chunk/manager_test.go delete mode 100644 pkg/chunk/stack.go delete mode 100644 pkg/chunk/stack_test.go delete mode 100644 pkg/chunk/storage.go diff --git a/Dockerfile b/Dockerfile index d8c8cd9..69cafd6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,16 +12,46 @@ RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -ldflags="-s -w" -o zur FROM alpine:3 AS obfuscator WORKDIR /app COPY --from=builder /app/zurg . -RUN apk add --no-cache upx -RUN upx --brute zurg +# RUN apk add --no-cache upx +# RUN upx --brute zurg +# Create a health check script that extracts the port from the config file +RUN echo $'#!/bin/sh\n\ +port=$(yaml read /app/config.yml port)\n\ +nc -z localhost $port || exit 1' > /app/healthcheck.sh && \ + chmod +x /app/healthcheck.sh # Final stage FROM alpine:3 WORKDIR /app -COPY --from=builder /app/zurg . -RUN apk add --no-cache fuse3 netcat-openbsd -HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ - CMD nc -z localhost 9999 || exit 1 +# Accept UID and GID as build arguments with default values +ARG UID=1000 +ARG GID=1000 + +# Add a group with the specified GID +RUN addgroup -g ${GID} appgroup + +# Add a user with the specified UID and add to the group +RUN adduser -u ${UID} -D -G appgroup appuser + +# Change the ownership of the /app directory to the appuser +RUN chown -R appuser:appgroup /app + +# Copy the obfuscated binary from the obfuscator stage +COPY --from=obfuscator /app/zurg . +COPY --from=obfuscator /app/healthcheck.sh . + +# Copy the rest of the application files, including the config.yml +COPY config.yml.example /app/config.yml + +# Install runtime dependencies and configure FUSE +RUN apk add --no-cache fuse3 netcat-openbsd yaml-cpp \ + && echo 'user_allow_other' >> /etc/fuse.conf + +# Use the non-root user to run the application +USER appuser + +# Use the script for the health check +HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 CMD /app/healthcheck.sh ENTRYPOINT ["./zurg"] diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index aa2be09..db9a4a0 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -6,7 +6,6 @@ import ( "net/http" "os" "os/signal" - "runtime" "syscall" "time" @@ -14,7 +13,6 @@ import ( "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/zfs" - "github.com/debridmediamanager.com/zurg/pkg/chunk" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" "github.com/hashicorp/golang-lru/v2/expirable" @@ -45,28 +43,6 @@ func main() { addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort()) server := &http.Server{Addr: addr, Handler: mux} - mountPoint := config.GetMountPoint() - if _, err := os.Stat(mountPoint); os.IsNotExist(err) { - if err := os.Mkdir(mountPoint, 0755); err != nil { - log.Panicf("Failed to create mount point: %v", err) - } - } - - log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) - // 64kb request size - chunkMgr, err := chunk.NewManager( - "", - 524288, // 512kb - 1, // 1 chunk - load ahead (1MB total) - max(runtime.NumCPU()/2, 1), // check threads - max(runtime.NumCPU()/2, 1), // load threads - runtime.NumCPU()*2, - torrentMgr, // max chunks - config) - if nil != err { - log.Panicf("Failed to initialize chunk manager: %v", err) - } - shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) @@ -78,9 +54,15 @@ func main() { }() // Start the mount in a goroutine with panic recovery. + mountPoint := config.GetMountPoint() + if _, err := os.Stat(mountPoint); os.IsNotExist(err) { + if err := os.Mkdir(mountPoint, 0755); err != nil { + log.Panicf("Failed to create mount point: %v", err) + } + } go func() { log.Infof("Mounting on %s", mountPoint) - if err := zfs.Mount(mountPoint, config, torrentMgr, chunkMgr); err != nil { + if err := zfs.Mount(mountPoint, config, torrentMgr); err != nil { log.Panicf("Failed to mount: %v", err) } }() diff --git a/internal/config/types.go b/internal/config/types.go index 0cdc22a..fe89ed7 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -71,7 +71,7 @@ func (z *ZurgConfig) GetNetworkBufferSize() int { func (z *ZurgConfig) GetMountPoint() string { if z.MountPoint == "" { - return "/mnt/zurg" + return "/app/mnt" } return z.MountPoint } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 3204799..3e89c82 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -18,12 +18,12 @@ import ( ) type TorrentManager struct { + config config.ConfigInterface TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent repairMap *orderedmap.OrderedMap[string, time.Time] // accessKey -> time last repaired requiredVersion string - rd *realdebrid.RealDebrid checksum string - config config.ConfigInterface + api *realdebrid.RealDebrid workerPool chan bool mu *sync.Mutex log *zap.SugaredLogger @@ -32,13 +32,13 @@ type TorrentManager struct { // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files -func NewTorrentManager(config config.ConfigInterface, rd *realdebrid.RealDebrid) *TorrentManager { +func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager { t := &TorrentManager{ + config: config, TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](), repairMap: orderedmap.NewOrderedMap[string, time.Time](), requiredVersion: "10.11.2023", - rd: rd, - config: config, + api: api, workerPool: make(chan bool, config.GetNumOfWorkers()), mu: &sync.Mutex{}, log: logutil.NewLogger().Named("manager"), @@ -46,7 +46,7 @@ func NewTorrentManager(config config.ConfigInterface, rd *realdebrid.RealDebrid) t.mu.Lock() - newTorrents, _, err := t.rd.GetTorrents(0) + newTorrents, _, err := t.api.GetTorrents(0) if err != nil { t.log.Fatalf("Cannot get torrents: %v\n", err) } @@ -123,7 +123,7 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse { t.workerPool <- true - ret := t.rd.UnrestrictUntilOk(link) + ret := t.api.UnrestrictUntilOk(link) <-t.workerPool return ret } @@ -141,7 +141,7 @@ func (t *TorrentManager) getChecksum() string { // GetTorrents request go func() { - torrents, totalCount, err := t.rd.GetTorrents(1) + torrents, totalCount, err := t.api.GetTorrents(1) if err != nil { errChan <- err return @@ -151,7 +151,7 @@ func (t *TorrentManager) getChecksum() string { // GetActiveTorrentCount request go func() { - count, err := t.rd.GetActiveTorrentCount() + count, err := t.api.GetActiveTorrentCount() if err != nil { errChan <- err return @@ -196,7 +196,7 @@ func (t *TorrentManager) startRefreshJob() { t.mu.Lock() - newTorrents, _, err := t.rd.GetTorrents(0) + newTorrents, _, err := t.api.GetTorrents(0) if err != nil { t.log.Warnf("Cannot get torrents: %v\n", err) continue @@ -262,13 +262,13 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { var err error // file cache torrentFromFile := t.readFromFile(rdTorrent.ID) - if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) { + if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) && torrentFromFile.Links[0] == rdTorrent.Links[0] { // see if api data and file data still match // then it means data is still usable info = torrentFromFile } if info == nil { - info, err = t.rd.GetTorrentInfo(rdTorrent.ID) + info, err = t.api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) return nil @@ -386,7 +386,7 @@ func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []strin default: t.log.Error("Unknown config version") } - t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret) + // t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret) return ret } @@ -446,7 +446,7 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap go func(lnk string) { defer wg.Done() t.workerPool <- true - resp := t.rd.UnrestrictUntilOk(lnk) + resp := t.api.UnrestrictUntilOk(lnk) <-t.workerPool resultsChan <- Result{Response: resp} }(link) @@ -613,7 +613,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) } // redownload torrent - resp, err := t.rd.AddMagnetHash(torrent.Instances[0].Hash) + resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash) if err != nil { t.log.Warnf("Cannot redownload torrent: %v", err) return false @@ -622,25 +622,25 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) // select files newTorrentID := resp.ID - err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles) + err = t.api.SelectTorrentFiles(newTorrentID, missingFiles) if err != nil { t.log.Warnf("Cannot start redownloading: %v", err) - t.rd.DeleteTorrent(newTorrentID) + t.api.DeleteTorrent(newTorrentID) return false } time.Sleep(10 * time.Second) // see if the torrent is ready - info, err := t.rd.GetTorrentInfo(newTorrentID) + info, err := t.api.GetTorrentInfo(newTorrentID) if err != nil { t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) - t.rd.DeleteTorrent(newTorrentID) + t.api.DeleteTorrent(newTorrentID) return false } if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) - t.rd.DeleteTorrent(newTorrentID) + t.api.DeleteTorrent(newTorrentID) return false } @@ -652,7 +652,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) missingCount := len(strings.Split(missingFiles, ",")) if len(info.Links) != missingCount { t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) - t.rd.DeleteTorrent(newTorrentID) + t.api.DeleteTorrent(newTorrentID) return false } @@ -667,7 +667,7 @@ func (t *TorrentManager) canCapacityHandle() bool { const maxDelay = 60 * time.Second retryCount := 0 for { - count, err := t.rd.GetActiveTorrentCount() + count, err := t.api.GetActiveTorrentCount() if err != nil { t.log.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { diff --git a/internal/zfs/fs.go b/internal/zfs/fs.go index 4e6d74c..eec4eac 100644 --- a/internal/zfs/fs.go +++ b/internal/zfs/fs.go @@ -8,7 +8,6 @@ import ( "bazil.org/fuse/fs" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/chunk" "go.uber.org/zap" ) @@ -18,11 +17,10 @@ type FS struct { umask os.FileMode directIO bool lock sync.RWMutex - c config.ConfigInterface - t *torrent.TorrentManager + config config.ConfigInterface + tMgr *torrent.TorrentManager log *zap.SugaredLogger initTime time.Time - chunk *chunk.Manager } // Root returns the root path diff --git a/internal/zfs/mount.go b/internal/zfs/mount.go index 7145a99..bfe1c31 100644 --- a/internal/zfs/mount.go +++ b/internal/zfs/mount.go @@ -8,15 +8,13 @@ import ( "bazil.org/fuse/fs" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/chunk" "github.com/debridmediamanager.com/zurg/pkg/logutil" "golang.org/x/sys/unix" ) -func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager, cMgr *chunk.Manager) error { - rlog := logutil.NewLogger() - log := rlog.Named("zfs") +func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager) error { + log := logutil.NewLogger().Named("zfs") options := []fuse.MountOption{ fuse.AllowOther(), @@ -37,11 +35,10 @@ func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentM uid: uint32(unix.Geteuid()), gid: uint32(unix.Getegid()), umask: os.FileMode(0), - c: cfg, - t: tMgr, + config: cfg, + tMgr: tMgr, log: log, initTime: time.Now(), - chunk: cMgr, } if err := srv.Serve(filesys); err != nil { diff --git a/internal/zfs/object.go b/internal/zfs/object.go index b7f61ad..4ec1d10 100644 --- a/internal/zfs/object.go +++ b/internal/zfs/object.go @@ -58,14 +58,14 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { dirs := []fuse.Dirent{} switch o.objType { case ROOT: - for _, directory := range o.fs.c.GetDirectories() { + for _, directory := range o.fs.config.GetDirectories() { dirs = append(dirs, fuse.Dirent{ Name: directory, Type: fuse.DT_Dir, }) } case DIRECTORY: - for el := o.fs.t.TorrentMap.Front(); el != nil; el = el.Next() { + for el := o.fs.tMgr.TorrentMap.Front(); el != nil; el = el.Next() { torrent := el.Value if torrent.InProgress { continue @@ -76,7 +76,7 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { }) } case TORRENT: - torrent, _ := o.fs.t.TorrentMap.Get(o.name) + torrent, _ := o.fs.tMgr.TorrentMap.Get(o.name) if torrent == nil || torrent.InProgress { return nil, syscall.ENOENT } @@ -100,7 +100,7 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) { switch o.objType { case ROOT: - for _, directory := range o.fs.c.GetDirectories() { + for _, directory := range o.fs.config.GetDirectories() { if directory == name { return Object{ fs: o.fs, @@ -112,7 +112,7 @@ func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) { } } case DIRECTORY: - torrent, _ := o.fs.t.TorrentMap.Get(name) + torrent, _ := o.fs.tMgr.TorrentMap.Get(name) if torrent == nil { return nil, syscall.ENOENT } @@ -125,7 +125,7 @@ func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) { }, nil case TORRENT: - torrent, _ := o.fs.t.TorrentMap.Get(o.name) + torrent, _ := o.fs.tMgr.TorrentMap.Get(o.name) if torrent == nil { return nil, syscall.ENOENT } @@ -155,7 +155,7 @@ func (o Object) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open // Read reads some bytes or the whole file func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - reader := universal.GetFileReader(o.torrent, o.file, req.Offset, int(req.Size), o.fs.t, o.fs.c, o.fs.log) + reader := universal.GetFileReader(o.torrent, o.file, req.Offset, int(req.Size), o.fs.tMgr, o.fs.config, o.fs.log) if reader == nil { return syscall.EIO } diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go deleted file mode 100644 index bc0c964..0000000 --- a/pkg/chunk/chunk.go +++ /dev/null @@ -1,51 +0,0 @@ -package chunk - -import ( - "container/list" - "hash/crc32" -) - -// Chunk of memory -type Chunk struct { - clean bool - *chunkHeader - bytes []byte - item *list.Element -} - -type chunkHeader struct { - id RequestID - size uint32 - checksum uint32 -} - -func (c *Chunk) valid(id RequestID) bool { - if c.id != id { - return false - } - if !c.clean { - c.clean = c.checksum == c.calculateChecksum() - } - return c.clean -} - -func (c *Chunk) update(id RequestID, bytes []byte) { - c.id = id - c.size = uint32(copy(c.bytes, bytes)) - c.checksum = c.calculateChecksum() - c.clean = true -} - -func (c *Chunk) calculateChecksum() uint32 { - size := c.size - if nil == c.bytes || size == 0 { - return 0 - } - maxSize := uint32(len(c.bytes)) - if size > maxSize { - // corrupt size or truncated chunk, fix size - c.size = maxSize - return crc32.Checksum(c.bytes, crc32Table) - } - return crc32.Checksum(c.bytes[:size], crc32Table) -} diff --git a/pkg/chunk/download.go b/pkg/chunk/download.go deleted file mode 100644 index c26be05..0000000 --- a/pkg/chunk/download.go +++ /dev/null @@ -1,146 +0,0 @@ -package chunk - -import ( - "fmt" - "io" - "net/http" - "sync" - "syscall" - "time" - - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/logutil" - "go.uber.org/zap" - "golang.org/x/sys/unix" -) - -// Downloader handles concurrent chunk downloads -type Downloader struct { - BufferSize int64 - queue chan *Request - callbacks map[RequestID][]DownloadCallback - lock sync.Mutex - storage *Storage - c config.ConfigInterface - t *torrent.TorrentManager - log *zap.SugaredLogger -} - -type DownloadCallback func(error, []byte) - -// NewDownloader creates a new download manager -func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.TorrentManager, c config.ConfigInterface) (*Downloader, error) { - rlog := logutil.NewLogger() - log := rlog.Named("downloader") - - manager := Downloader{ - BufferSize: bufferSize, - queue: make(chan *Request, 100), - callbacks: make(map[RequestID][]DownloadCallback, 100), - storage: storage, - c: c, - t: t, - log: log, - } - - for i := 0; i < threads; i++ { - go manager.thread(i) - } - - return &manager, nil -} - -// Download starts a new download request -func (d *Downloader) Download(req *Request, callback DownloadCallback) { - d.lock.Lock() - callbacks, exists := d.callbacks[req.id] - if nil != callback { - d.callbacks[req.id] = append(callbacks, callback) - } else if !exists { - d.callbacks[req.id] = callbacks - } - if !exists { - d.queue <- req - } - d.lock.Unlock() -} - -func (d *Downloader) thread(n int) { - buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) - if nil != err { - d.log.Warnf("Failed to mmap download buffer %v: %v", n, err) - buffer = make([]byte, d.BufferSize) - } - for { - req := <-d.queue - d.download(req, buffer) - } -} - -func (d *Downloader) download(req *Request, buffer []byte) { - d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload) - err := d.downloadFromAPI(req, buffer, 0) - - d.lock.Lock() - callbacks := d.callbacks[req.id] - for _, callback := range callbacks { - callback(err, buffer) - } - delete(d.callbacks, req.id) - d.lock.Unlock() - - if nil != err { - return - } - - if err := d.storage.Store(req.id, buffer); nil != err { - d.log.Warnf("Could not store chunk %v: %v", req.id, err) - } -} - -func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error { - // sleep if request is throttled - if delay > 0 { - time.Sleep(time.Duration(delay) * time.Second) - } - - resp := d.t.UnrestrictUntilOk(request.file.Link) - if resp == nil { - return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link) - } - - downloadURL := resp.Download - req, err := http.NewRequest("GET", downloadURL, nil) - if nil != err { - d.log.Debugf("request init error: %v", err) - return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link) - } - req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1)) - - res, err := http.DefaultClient.Do(req) - if nil != err { - d.log.Debugf("request error: %v", err) - return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link) - } - defer res.Body.Close() - reader := res.Body - - if res.StatusCode != 206 && res.StatusCode != 200 { - return fmt.Errorf("could not read object %s %s / StatusCode: %v", - request.file.Path, request.file.Link, res.StatusCode) - } - - if res.ContentLength == -1 { - return fmt.Errorf("missing Content-Length header in response") - } - - n, err := io.ReadFull(reader, buffer[:res.ContentLength:cap(buffer)]) - if nil != err && err != io.ErrUnexpectedEOF { - d.log.Debugf("response read error: %v", err) - return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link) - } - d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link) - - return nil -} diff --git a/pkg/chunk/manager.go b/pkg/chunk/manager.go deleted file mode 100644 index 00011a4..0000000 --- a/pkg/chunk/manager.go +++ /dev/null @@ -1,266 +0,0 @@ -package chunk - -import ( - "crypto/sha256" - "encoding/binary" - "fmt" - "os" - - "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/internal/torrent" -) - -// Manager manages chunks on disk -type Manager struct { - ChunkSize int64 - LoadAhead int - downloader *Downloader - storage *Storage - queue chan *QueueEntry -} - -type QueueEntry struct { - request *Request - response chan Response -} - -// RequestID is the binary identifier for a chunk request -type RequestID [24]byte - -func (id RequestID) String() string { - return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:])) -} - -// Request represents a chunk request -type Request struct { - id RequestID - file *torrent.File - offsetStart int64 - offsetEnd int64 - chunkOffset int64 - chunkOffsetEnd int64 - sequence int - preload bool -} - -// Response represetns a chunk response -type Response struct { - Sequence int - Error error - Bytes []byte -} - -// NewManager creates a new chunk manager -func NewManager( - chunkFile string, - chunkSize int64, - loadAhead, - checkThreads, - loadThreads, - maxChunks int, - t *torrent.TorrentManager, - c config.ConfigInterface) (*Manager, error) { - - pageSize := int64(os.Getpagesize()) - if chunkSize < pageSize { - return nil, fmt.Errorf("chunk size must not be < %v", pageSize) - } - if chunkSize%pageSize != 0 { - return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize) - } - // 32-Bit: ~2GB / 64-Bit: ~8EB - maxMmapSize := int64(^uint(0) >> 1) - if chunkSize > maxMmapSize { - return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize) - } - if maxChunks < 2 || maxChunks < loadAhead { - return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value") - } - - storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile) - if nil != err { - return nil, err - } - - downloader, err := NewDownloader(loadThreads, storage, chunkSize, t, c) - if nil != err { - return nil, err - } - - manager := Manager{ - ChunkSize: chunkSize, - LoadAhead: loadAhead, - downloader: downloader, - storage: storage, - queue: make(chan *QueueEntry, 100), - } - - if err := manager.storage.Clear(); nil != err { - return nil, err - } - - for i := 0; i < checkThreads; i++ { - go manager.thread() - } - - return &manager, nil -} - -// GetChunk loads one chunk and starts the preload for the next chunks -func (m *Manager) GetChunk(file *torrent.File, offset, size int64) ([]byte, error) { - maxOffset := file.Bytes - if offset > maxOffset { - return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", file.ID, offset) - } - // Log.Infof("Request %v:%v md5:%v", object.ID, offset, object.MD5Checksum) - if offset+size > maxOffset { - size = file.Bytes - offset - } - - ranges := splitChunkRanges(offset, size, m.ChunkSize) - numRanges := len(ranges) - responses := make(chan Response, numRanges) - - last := numRanges - 1 - for i, r := range ranges { - m.requestChunk(file, r.offset, r.size, i, i == last, responses) - } - - data := make([]byte, size) - for i := 0; i < cap(responses); i++ { - res := <-responses - if nil != res.Error { - return nil, res.Error - } - - dataOffset := ranges[res.Sequence].offset - offset - - if n := copy(data[dataOffset:], res.Bytes); n == 0 { - return nil, fmt.Errorf("request %v slice %v has empty response", file.ID, res.Sequence) - } - } - close(responses) - - return data, nil -} - -func buildRequestID(object *torrent.File, offset int64) (id RequestID) { - fileID := object.Link - if fileID == "" { - fileID = object.Path - } - hash := sha256.Sum256([]byte(fileID)) - copy(id[:16], hash[:16]) - binary.BigEndian.PutUint64(id[16:], uint64(offset)) - return -} - -func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) { - chunkOffset := offset % m.ChunkSize - offsetStart := offset - chunkOffset - offsetEnd := offsetStart + m.ChunkSize - - request := &Request{ - id: buildRequestID(object, offsetStart), - file: object, - offsetStart: offsetStart, - offsetEnd: offsetEnd, - chunkOffset: chunkOffset, - chunkOffsetEnd: chunkOffset + size, - sequence: sequence, - preload: false, - } - - m.queue <- &QueueEntry{ - request: request, - response: response, - } - - if !preload { - return - } - - for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize { - aheadOffsetStart := offsetStart + i - aheadOffsetEnd := aheadOffsetStart + m.ChunkSize - if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) { - request := &Request{ - id: buildRequestID(object, aheadOffsetStart), - file: object, - offsetStart: aheadOffsetStart, - offsetEnd: aheadOffsetEnd, - preload: true, - } - m.queue <- &QueueEntry{ - request: request, - } - } - } -} - -type byteRange struct { - offset, size int64 -} - -// Calculate request ranges that span multiple chunks -// -// This can happen with Direct-IO and unaligned reads or -// if the size is bigger than the chunk size. -func splitChunkRanges(offset, size, chunkSize int64) []byteRange { - ranges := make([]byteRange, 0, size/chunkSize+2) - for remaining := size; remaining > 0; remaining -= size { - size = min(remaining, chunkSize-offset%chunkSize) - ranges = append(ranges, byteRange{offset, size}) - offset += size - } - return ranges -} - -func (m *Manager) thread() { - for { - queueEntry := <-m.queue - m.checkChunk(queueEntry.request, queueEntry.response) - } -} - -func (m *Manager) checkChunk(req *Request, response chan Response) { - if nil == response { - if nil == m.storage.Load(req.id) { - m.downloader.Download(req, nil) - } - return - } - - if bytes := m.storage.Load(req.id); nil != bytes { - response <- Response{ - Sequence: req.sequence, - Bytes: adjustResponseChunk(req, bytes), - } - return - } - - m.downloader.Download(req, func(err error, bytes []byte) { - response <- Response{ - Sequence: req.sequence, - Error: err, - Bytes: adjustResponseChunk(req, bytes), - } - }) -} - -func adjustResponseChunk(req *Request, bytes []byte) []byte { - if nil == bytes { - return nil - } - bytesLen := int64(len(bytes)) - sOffset := min(req.chunkOffset, bytesLen) - eOffset := min(req.chunkOffsetEnd, bytesLen) - return bytes[sOffset:eOffset] -} - -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} diff --git a/pkg/chunk/manager_test.go b/pkg/chunk/manager_test.go deleted file mode 100644 index ceb78e5..0000000 --- a/pkg/chunk/manager_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package chunk - -import "testing" - -func TestSplitChunkRanges(t *testing.T) { - testcases := []struct { - offset, size, chunkSize int64 - result []byteRange - }{ - {0, 0, 4096, []byteRange{}}, - {0, 4096, 4096, []byteRange{ - {0, 4096}, - }}, - {4095, 4096, 4096, []byteRange{ - {4095, 1}, - {4096, 4095}, - }}, - {0, 8192, 4096, []byteRange{ - {0, 4096}, - {4096, 4096}, - }}, - {2048, 8192, 4096, []byteRange{ - {2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {2048, 8192, 4096, []byteRange{ - {2048, 2048}, - {4096, 4096}, - {8192, 2048}, - }}, - {17960960, 16777216, 10485760, []byteRange{ - {17960960, 3010560}, - {20971520, 10485760}, - {31457280, 3280896}, - }}, - } - for i, tc := range testcases { - ranges := splitChunkRanges(tc.offset, tc.size, tc.chunkSize) - actualSize := len(ranges) - expectedSize := len(tc.result) - if actualSize != expectedSize { - t.Fatalf("ByteRange %v length mismatch: %v != %v", i, actualSize, expectedSize) - } - for j, r := range ranges { - actual := r - expected := tc.result[j] - if actual != expected { - t.Fatalf("ByteRange %v mismatch: %v != %v", i, actual, expected) - } - } - } -} diff --git a/pkg/chunk/stack.go b/pkg/chunk/stack.go deleted file mode 100644 index 8955306..0000000 --- a/pkg/chunk/stack.go +++ /dev/null @@ -1,75 +0,0 @@ -package chunk - -import ( - "container/list" - "sync" -) - -// Stack is a thread safe list/stack implementation -type Stack struct { - items *list.List - lock sync.Mutex - maxSize int -} - -// NewStack creates a new stack -func NewStack(maxChunks int) *Stack { - return &Stack{ - items: list.New(), - maxSize: maxChunks, - } -} - -// Len gets the number of items on the stack -func (s *Stack) Len() int { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.Len() -} - -// Pop pops the first item from the stack -func (s *Stack) Pop() int { - s.lock.Lock() - defer s.lock.Unlock() - if s.items.Len() < s.maxSize { - return -1 - } - item := s.items.Front() - if nil == item { - return -1 - } - s.items.Remove(item) - return item.Value.(int) -} - -// Touch moves the specified item to the last position of the stack -func (s *Stack) Touch(item *list.Element) { - s.lock.Lock() - if item != s.items.Back() { - s.items.MoveToBack(item) - } - s.lock.Unlock() -} - -// Push adds a new item to the last position of the stack -func (s *Stack) Push(id int) *list.Element { - s.lock.Lock() - defer s.lock.Unlock() - return s.items.PushBack(id) -} - -// Prepend adds a list to the front of the stack -func (s *Stack) Prepend(items *list.List) { - s.lock.Lock() - s.items.PushFrontList(items) - s.lock.Unlock() -} - -// Purge an item from the stack -func (s *Stack) Purge(item *list.Element) { - s.lock.Lock() - defer s.lock.Unlock() - if item != s.items.Front() { - s.items.MoveToFront(item) - } -} diff --git a/pkg/chunk/stack_test.go b/pkg/chunk/stack_test.go deleted file mode 100644 index 023bd70..0000000 --- a/pkg/chunk/stack_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package chunk - -import "testing" - -func TestOOB(t *testing.T) { - stack := NewStack(1) - - item := stack.Push(1) - stack.Touch(item) -} - -func TestAddToStack(t *testing.T) { - stack := NewStack(1) - - item1 := stack.Push(1) - item2 := stack.Push(2) - item3 := stack.Push(3) - item4 := stack.Push(4) - - stack.Touch(item1) - stack.Touch(item3) - - stack.Purge(item2) - stack.Purge(item4) - - v := stack.Pop() - if v != 4 { - t.Fatalf("Expected 4 got %v", v) - } - - v = stack.Pop() - if v != 2 { - t.Fatalf("Expected 2 got %v", v) - } - - v = stack.Pop() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - v = stack.Pop() - if v != 3 { - t.Fatalf("Expected 3 got %v", v) - } - - v = stack.Pop() - if v != -1 { - t.Fatalf("Expected -1 got %v", v) - } -} - -func TestLen(t *testing.T) { - stack := NewStack(1) - - v := stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - - stack.Push(1) - v = stack.Len() - if v != 1 { - t.Fatalf("Expected 1 got %v", v) - } - - _ = stack.Pop() - v = stack.Len() - if v != 0 { - t.Fatalf("Expected 0 got %v", v) - } - -} diff --git a/pkg/chunk/storage.go b/pkg/chunk/storage.go deleted file mode 100644 index 264dcc8..0000000 --- a/pkg/chunk/storage.go +++ /dev/null @@ -1,420 +0,0 @@ -package chunk - -import ( - "container/list" - "fmt" - "hash/crc32" - "os" - "os/signal" - "sync" - "syscall" - "time" - "unsafe" - - "go.uber.org/zap" - "golang.org/x/sys/unix" - - "github.com/debridmediamanager.com/zurg/pkg/logutil" -) - -const ( - headerSize = int(unsafe.Sizeof(*new(chunkHeader))) - tocSize = int64(unsafe.Sizeof(*new(journalHeader))) - journalMagic = uint16('P'<<8 | 'D'&0xFF) - journalVersion = uint8(2) -) - -var ( - blankRequestID RequestID - crc32Table = crc32.MakeTable(crc32.Castagnoli) -) - -// Storage is a chunk storage -type Storage struct { - ChunkFile *os.File - ChunkSize int64 - HeaderSize int64 - MaxChunks int - chunks map[RequestID]int - stack *Stack - lock sync.RWMutex - buffers []*Chunk - loadChunks int - signals chan os.Signal - journal []byte - mmapRegions [][]byte - chunksPerRegion int64 - log *zap.SugaredLogger -} - -type journalHeader struct { - magic uint16 - version uint8 - headerSize uint8 - maxChunks uint32 - chunkSize uint32 - checksum uint32 -} - -// NewStorage creates a new storage -func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) { - rlog := logutil.NewLogger() - log := rlog.Named("storage") - - s := Storage{ - ChunkSize: chunkSize, - MaxChunks: maxChunks, - chunks: make(map[RequestID]int, maxChunks), - stack: NewStack(maxChunks), - buffers: make([]*Chunk, maxChunks), - signals: make(chan os.Signal, 1), - log: log, - } - - journalSize := tocSize + int64(headerSize*maxChunks) - journalOffset := chunkSize * int64(maxChunks) - - // Non-empty string in chunkFilePath enables MMAP disk storage for chunks - if chunkFilePath != "" { - chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600) - if nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("could not open chunk cache file") - } - s.ChunkFile = chunkFile - currentSize, err := chunkFile.Seek(0, os.SEEK_END) - if nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("chunk file is not seekable") - } - wantedSize := journalOffset + journalSize - s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize) - if err := chunkFile.Truncate(currentSize); nil != err { - s.log.Warnf("Could not truncate chunk cache, skip resizing") - } else if currentSize != wantedSize { - if currentSize > tocSize { - err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset) - if nil != err { - s.log.Errorf("%v", err) - } else { - s.log.Infof("Relocated chunk cache journal") - } - } - if err := chunkFile.Truncate(wantedSize); nil != err { - s.log.Debugf("%v", err) - return nil, fmt.Errorf("could not resize chunk cache file") - } - } - s.log.Infof("Created chunk cache file %v", chunkFile.Name()) - s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks))) - } - - // Alocate journal - if journal, err := s.mmap(journalOffset, journalSize); nil != err { - return nil, fmt.Errorf("could not allocate journal: %v", err) - } else { - if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err { - s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err) - } - tocOffset := journalSize - tocSize - header := journal[tocOffset:] - if valid := s.checkJournal(header, false); !valid { - s.initJournal(header) - } - s.journal = journal[:tocOffset] - } - - // Setup sighandler - signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM) - - // Allocate mmap regions for chunks - if err := s.allocateMmapRegions(maxMmapSize); nil != err { - return nil, err - } - // Map chunks to slices from mmap regions - if err := s.mmapChunks(); nil != err { - return nil, err - } - - return &s, nil -} - -// relocateJournal moves existing journal prior to resize -func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error { - header := make([]byte, tocSize) - if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err { - return fmt.Errorf("failed to read journal header: %v", err) - } - - if valid := s.checkJournal(header, true); !valid { - return fmt.Errorf("failed to validate journal header") - } - - h := (*journalHeader)(unsafe.Pointer(&header[0])) - oldJournalOffset := s.ChunkSize * int64(h.maxChunks) - oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize - journal := make([]byte, journalSize) - - if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err { - return fmt.Errorf("failed to read journal: %v", err) - } - - s.initJournal(header) - - sizeWithoutJournal := currentSize - oldJournalSize - tocSize - if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err { - return fmt.Errorf("could not truncate chunk cache journal: %v", err) - } - - if err := s.ChunkFile.Truncate(wantedSize); nil != err { - return fmt.Errorf("could not resize chunk cache file: %v", err) - } - - if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err { - return fmt.Errorf("failed to write journal: %v", err) - } - if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err { - return fmt.Errorf("failed to write journal header: %v", err) - } - return nil -} - -// checkJournal verifies the journal header -func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool { - h := (*journalHeader)(unsafe.Pointer(&journal[0])) - // check magic bytes / endianess mismatch ('PD' vs 'DP') - if h.magic != journalMagic { - s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic) - return false - } - checksum := crc32.Checksum(journal[:12], crc32Table) - if h.checksum != checksum { - s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum) - return false - } - if h.version != journalVersion { - s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion) - return false - } - if h.headerSize != uint8(headerSize) { - s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize) - return false - } - if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) { - s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks) - return false - } - if h.chunkSize != uint32(s.ChunkSize) { - s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize) - return false - } - s.log.Debug("Journal is valid") - return true -} - -// initJournal initializes the journal -func (s *Storage) initJournal(journal []byte) { - h := (*journalHeader)(unsafe.Pointer(&journal[0])) - h.magic = journalMagic - h.version = journalVersion - h.headerSize = uint8(headerSize) - h.maxChunks = uint32(s.MaxChunks) - h.chunkSize = uint32(s.ChunkSize) - h.checksum = crc32.Checksum(journal[:12], crc32Table) -} - -// allocateMmapRegions creates memory mappings to fit all chunks -func (s *Storage) allocateMmapRegions(maxMmapSize int64) error { - s.chunksPerRegion = maxMmapSize / s.ChunkSize - regionSize := s.chunksPerRegion * s.ChunkSize - numRegions := int64(s.MaxChunks) / s.chunksPerRegion - remChunks := int64(s.MaxChunks) % s.chunksPerRegion - if remChunks != 0 { - numRegions++ - } - s.mmapRegions = make([][]byte, numRegions) - for i := int64(0); i < int64(len(s.mmapRegions)); i++ { - size := regionSize - if i == numRegions-1 && remChunks != 0 { - size = remChunks * s.ChunkSize - } - s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size) - region, err := s.mmap(i*regionSize, size) - if nil != err { - s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size) - return err - } - if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err { - s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err) - } - s.mmapRegions[i] = region - } - return nil -} - -// mmapChunks slices buffers from mmap regions and loads chunk metadata -func (s *Storage) mmapChunks() error { - start := time.Now() - empty := list.New() - restored := list.New() - loadedChunks := 0 - for i := 0; i < s.MaxChunks; i++ { - select { - case sig := <-s.signals: - s.log.Warnf("Received signal %v, aborting chunk loader", sig) - return fmt.Errorf("aborted by signal") - default: - if loaded, err := s.initChunk(i, empty, restored); nil != err { - s.log.Errorf("failed to allocate chunk %v: %v", i, err) - return fmt.Errorf("failed to initialize chunks") - } else if loaded { - loadedChunks++ - } - } - } - s.stack.Prepend(restored) - s.stack.Prepend(empty) - elapsed := time.Since(start) - if nil != s.ChunkFile { - s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed) - } else { - s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed) - } - return nil -} - -// initChunk tries to restore a chunk from disk -func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) { - chunk, err := s.allocateChunk(index) - if err != nil { - s.log.Debugf("%v", err) - return false, err - } - - s.buffers[index] = chunk - - id := chunk.id - - if blankRequestID == id || index >= s.loadChunks { - chunk.item = empty.PushBack(index) - // s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks) - return false, nil - } - - chunk.item = restored.PushBack(index) - // s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id) - s.chunks[id] = index - - return true, nil -} - -// allocateChunk creates a new mmap-backed chunk -func (s *Storage) allocateChunk(index int) (*Chunk, error) { - region := int64(index) / s.chunksPerRegion - offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize - // s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset) - bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize] - headerOffset := index * headerSize - header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset])) - chunk := Chunk{ - chunkHeader: header, - bytes: bytes, - } - return &chunk, nil -} - -func (s *Storage) mmap(offset, size int64) ([]byte, error) { - if s.ChunkFile != nil { - return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) - } else { - return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) - } -} - -// Clear removes all old chunks on disk (will be called on each program start) -func (s *Storage) Clear() error { - return nil -} - -// Load a chunk from ram or creates it -func (s *Storage) Load(id RequestID) []byte { - s.lock.RLock() - chunk := s.fetch(id) - if nil == chunk { - // s.log.Tracef("Load chunk %v (missing)", id) - s.lock.RUnlock() - return nil - } - if chunk.clean { - // s.log.Tracef("Load chunk %v (clean)", id) - defer s.lock.RUnlock() - return chunk.bytes - } - s.lock.RUnlock() - // Switch to write lock to avoid races on crc verification - s.lock.Lock() - defer s.lock.Unlock() - if chunk.valid(id) { - s.log.Debugf("Load chunk %v (verified)", id) - return chunk.bytes - } - s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum()) - s.stack.Purge(chunk.item) - return nil -} - -// Store stores a chunk in the RAM and adds it to the disk storage queue -func (s *Storage) Store(id RequestID, bytes []byte) (err error) { - s.lock.RLock() - - // Avoid storing same chunk multiple times - chunk := s.fetch(id) - if nil != chunk && chunk.clean { - // s.log.Tracef("Create chunk %v (exists: clean)", id) - s.lock.RUnlock() - return nil - } - - s.lock.RUnlock() - s.lock.Lock() - defer s.lock.Unlock() - - if nil != chunk { - if chunk.valid(id) { - s.log.Debugf("Create chunk %v (exists: valid)", id) - return nil - } - s.log.Warnf("Create chunk %v(exists: overwrite)", id) - } else { - index := s.stack.Pop() - if index == -1 { - s.log.Debugf("Create chunk %v (failed)", id) - return fmt.Errorf("no buffers available") - } - chunk = s.buffers[index] - deleteID := chunk.id - if blankRequestID != deleteID { - delete(s.chunks, deleteID) - s.log.Debugf("Create chunk %v (reused)", id) - } else { - s.log.Debugf("Create chunk %v (stored)", id) - } - s.chunks[id] = index - chunk.item = s.stack.Push(index) - } - - chunk.update(id, bytes) - - return nil -} - -// fetch chunk and index by id -func (s *Storage) fetch(id RequestID) *Chunk { - index, exists := s.chunks[id] - if !exists { - return nil - } - chunk := s.buffers[index] - s.stack.Touch(chunk.item) - return chunk -}