Zfs fixes
This commit is contained in:
42
Dockerfile
42
Dockerfile
@@ -12,16 +12,46 @@ RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -ldflags="-s -w" -o zur
|
|||||||
FROM alpine:3 AS obfuscator
|
FROM alpine:3 AS obfuscator
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY --from=builder /app/zurg .
|
COPY --from=builder /app/zurg .
|
||||||
RUN apk add --no-cache upx
|
# RUN apk add --no-cache upx
|
||||||
RUN upx --brute zurg
|
# RUN upx --brute zurg
|
||||||
|
# Create a health check script that extracts the port from the config file
|
||||||
|
RUN echo $'#!/bin/sh\n\
|
||||||
|
port=$(yaml read /app/config.yml port)\n\
|
||||||
|
nc -z localhost $port || exit 1' > /app/healthcheck.sh && \
|
||||||
|
chmod +x /app/healthcheck.sh
|
||||||
|
|
||||||
# Final stage
|
# Final stage
|
||||||
FROM alpine:3
|
FROM alpine:3
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY --from=builder /app/zurg .
|
|
||||||
RUN apk add --no-cache fuse3 netcat-openbsd
|
|
||||||
|
|
||||||
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
|
# Accept UID and GID as build arguments with default values
|
||||||
CMD nc -z localhost 9999 || exit 1
|
ARG UID=1000
|
||||||
|
ARG GID=1000
|
||||||
|
|
||||||
|
# Add a group with the specified GID
|
||||||
|
RUN addgroup -g ${GID} appgroup
|
||||||
|
|
||||||
|
# Add a user with the specified UID and add to the group
|
||||||
|
RUN adduser -u ${UID} -D -G appgroup appuser
|
||||||
|
|
||||||
|
# Change the ownership of the /app directory to the appuser
|
||||||
|
RUN chown -R appuser:appgroup /app
|
||||||
|
|
||||||
|
# Copy the obfuscated binary from the obfuscator stage
|
||||||
|
COPY --from=obfuscator /app/zurg .
|
||||||
|
COPY --from=obfuscator /app/healthcheck.sh .
|
||||||
|
|
||||||
|
# Copy the rest of the application files, including the config.yml
|
||||||
|
COPY config.yml.example /app/config.yml
|
||||||
|
|
||||||
|
# Install runtime dependencies and configure FUSE
|
||||||
|
RUN apk add --no-cache fuse3 netcat-openbsd yaml-cpp \
|
||||||
|
&& echo 'user_allow_other' >> /etc/fuse.conf
|
||||||
|
|
||||||
|
# Use the non-root user to run the application
|
||||||
|
USER appuser
|
||||||
|
|
||||||
|
# Use the script for the health check
|
||||||
|
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 CMD /app/healthcheck.sh
|
||||||
|
|
||||||
ENTRYPOINT ["./zurg"]
|
ENTRYPOINT ["./zurg"]
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -14,7 +13,6 @@ import (
|
|||||||
"github.com/debridmediamanager.com/zurg/internal/net"
|
"github.com/debridmediamanager.com/zurg/internal/net"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/zfs"
|
"github.com/debridmediamanager.com/zurg/internal/zfs"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/chunk"
|
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
||||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
@@ -45,28 +43,6 @@ func main() {
|
|||||||
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
|
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
|
||||||
server := &http.Server{Addr: addr, Handler: mux}
|
server := &http.Server{Addr: addr, Handler: mux}
|
||||||
|
|
||||||
mountPoint := config.GetMountPoint()
|
|
||||||
if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
|
|
||||||
if err := os.Mkdir(mountPoint, 0755); err != nil {
|
|
||||||
log.Panicf("Failed to create mount point: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
|
|
||||||
// 64kb request size
|
|
||||||
chunkMgr, err := chunk.NewManager(
|
|
||||||
"",
|
|
||||||
524288, // 512kb
|
|
||||||
1, // 1 chunk - load ahead (1MB total)
|
|
||||||
max(runtime.NumCPU()/2, 1), // check threads
|
|
||||||
max(runtime.NumCPU()/2, 1), // load threads
|
|
||||||
runtime.NumCPU()*2,
|
|
||||||
torrentMgr, // max chunks
|
|
||||||
config)
|
|
||||||
if nil != err {
|
|
||||||
log.Panicf("Failed to initialize chunk manager: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
shutdown := make(chan os.Signal, 1)
|
shutdown := make(chan os.Signal, 1)
|
||||||
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
@@ -78,9 +54,15 @@ func main() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// Start the mount in a goroutine with panic recovery.
|
// Start the mount in a goroutine with panic recovery.
|
||||||
|
mountPoint := config.GetMountPoint()
|
||||||
|
if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
|
||||||
|
if err := os.Mkdir(mountPoint, 0755); err != nil {
|
||||||
|
log.Panicf("Failed to create mount point: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
log.Infof("Mounting on %s", mountPoint)
|
log.Infof("Mounting on %s", mountPoint)
|
||||||
if err := zfs.Mount(mountPoint, config, torrentMgr, chunkMgr); err != nil {
|
if err := zfs.Mount(mountPoint, config, torrentMgr); err != nil {
|
||||||
log.Panicf("Failed to mount: %v", err)
|
log.Panicf("Failed to mount: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ func (z *ZurgConfig) GetNetworkBufferSize() int {
|
|||||||
|
|
||||||
func (z *ZurgConfig) GetMountPoint() string {
|
func (z *ZurgConfig) GetMountPoint() string {
|
||||||
if z.MountPoint == "" {
|
if z.MountPoint == "" {
|
||||||
return "/mnt/zurg"
|
return "/app/mnt"
|
||||||
}
|
}
|
||||||
return z.MountPoint
|
return z.MountPoint
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,12 +18,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TorrentManager struct {
|
type TorrentManager struct {
|
||||||
|
config config.ConfigInterface
|
||||||
TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent
|
TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent
|
||||||
repairMap *orderedmap.OrderedMap[string, time.Time] // accessKey -> time last repaired
|
repairMap *orderedmap.OrderedMap[string, time.Time] // accessKey -> time last repaired
|
||||||
requiredVersion string
|
requiredVersion string
|
||||||
rd *realdebrid.RealDebrid
|
|
||||||
checksum string
|
checksum string
|
||||||
config config.ConfigInterface
|
api *realdebrid.RealDebrid
|
||||||
workerPool chan bool
|
workerPool chan bool
|
||||||
mu *sync.Mutex
|
mu *sync.Mutex
|
||||||
log *zap.SugaredLogger
|
log *zap.SugaredLogger
|
||||||
@@ -32,13 +32,13 @@ type TorrentManager struct {
|
|||||||
// NewTorrentManager creates a new torrent manager
|
// NewTorrentManager creates a new torrent manager
|
||||||
// it will fetch all torrents and their info in the background
|
// it will fetch all torrents and their info in the background
|
||||||
// and store them in-memory and cached in files
|
// and store them in-memory and cached in files
|
||||||
func NewTorrentManager(config config.ConfigInterface, rd *realdebrid.RealDebrid) *TorrentManager {
|
func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager {
|
||||||
t := &TorrentManager{
|
t := &TorrentManager{
|
||||||
|
config: config,
|
||||||
TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](),
|
TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](),
|
||||||
repairMap: orderedmap.NewOrderedMap[string, time.Time](),
|
repairMap: orderedmap.NewOrderedMap[string, time.Time](),
|
||||||
requiredVersion: "10.11.2023",
|
requiredVersion: "10.11.2023",
|
||||||
rd: rd,
|
api: api,
|
||||||
config: config,
|
|
||||||
workerPool: make(chan bool, config.GetNumOfWorkers()),
|
workerPool: make(chan bool, config.GetNumOfWorkers()),
|
||||||
mu: &sync.Mutex{},
|
mu: &sync.Mutex{},
|
||||||
log: logutil.NewLogger().Named("manager"),
|
log: logutil.NewLogger().Named("manager"),
|
||||||
@@ -46,7 +46,7 @@ func NewTorrentManager(config config.ConfigInterface, rd *realdebrid.RealDebrid)
|
|||||||
|
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
|
||||||
newTorrents, _, err := t.rd.GetTorrents(0)
|
newTorrents, _, err := t.api.GetTorrents(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Fatalf("Cannot get torrents: %v\n", err)
|
t.log.Fatalf("Cannot get torrents: %v\n", err)
|
||||||
}
|
}
|
||||||
@@ -123,7 +123,7 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
|
|||||||
// proxy
|
// proxy
|
||||||
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
|
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
|
||||||
t.workerPool <- true
|
t.workerPool <- true
|
||||||
ret := t.rd.UnrestrictUntilOk(link)
|
ret := t.api.UnrestrictUntilOk(link)
|
||||||
<-t.workerPool
|
<-t.workerPool
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
@@ -141,7 +141,7 @@ func (t *TorrentManager) getChecksum() string {
|
|||||||
|
|
||||||
// GetTorrents request
|
// GetTorrents request
|
||||||
go func() {
|
go func() {
|
||||||
torrents, totalCount, err := t.rd.GetTorrents(1)
|
torrents, totalCount, err := t.api.GetTorrents(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
@@ -151,7 +151,7 @@ func (t *TorrentManager) getChecksum() string {
|
|||||||
|
|
||||||
// GetActiveTorrentCount request
|
// GetActiveTorrentCount request
|
||||||
go func() {
|
go func() {
|
||||||
count, err := t.rd.GetActiveTorrentCount()
|
count, err := t.api.GetActiveTorrentCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
@@ -196,7 +196,7 @@ func (t *TorrentManager) startRefreshJob() {
|
|||||||
|
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
|
||||||
newTorrents, _, err := t.rd.GetTorrents(0)
|
newTorrents, _, err := t.api.GetTorrents(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get torrents: %v\n", err)
|
t.log.Warnf("Cannot get torrents: %v\n", err)
|
||||||
continue
|
continue
|
||||||
@@ -262,13 +262,13 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
|||||||
var err error
|
var err error
|
||||||
// file cache
|
// file cache
|
||||||
torrentFromFile := t.readFromFile(rdTorrent.ID)
|
torrentFromFile := t.readFromFile(rdTorrent.ID)
|
||||||
if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) {
|
if torrentFromFile != nil && len(torrentFromFile.ID) > 0 && len(torrentFromFile.Links) == len(rdTorrent.Links) && torrentFromFile.Links[0] == rdTorrent.Links[0] {
|
||||||
// see if api data and file data still match
|
// see if api data and file data still match
|
||||||
// then it means data is still usable
|
// then it means data is still usable
|
||||||
info = torrentFromFile
|
info = torrentFromFile
|
||||||
}
|
}
|
||||||
if info == nil {
|
if info == nil {
|
||||||
info, err = t.rd.GetTorrentInfo(rdTorrent.ID)
|
info, err = t.api.GetTorrentInfo(rdTorrent.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
|
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
|
||||||
return nil
|
return nil
|
||||||
@@ -386,7 +386,7 @@ func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []strin
|
|||||||
default:
|
default:
|
||||||
t.log.Error("Unknown config version")
|
t.log.Error("Unknown config version")
|
||||||
}
|
}
|
||||||
t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret)
|
// t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret)
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -446,7 +446,7 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap
|
|||||||
go func(lnk string) {
|
go func(lnk string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
t.workerPool <- true
|
t.workerPool <- true
|
||||||
resp := t.rd.UnrestrictUntilOk(lnk)
|
resp := t.api.UnrestrictUntilOk(lnk)
|
||||||
<-t.workerPool
|
<-t.workerPool
|
||||||
resultsChan <- Result{Response: resp}
|
resultsChan <- Result{Response: resp}
|
||||||
}(link)
|
}(link)
|
||||||
@@ -613,7 +613,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// redownload torrent
|
// redownload torrent
|
||||||
resp, err := t.rd.AddMagnetHash(torrent.Instances[0].Hash)
|
resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot redownload torrent: %v", err)
|
t.log.Warnf("Cannot redownload torrent: %v", err)
|
||||||
return false
|
return false
|
||||||
@@ -622,25 +622,25 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
|||||||
|
|
||||||
// select files
|
// select files
|
||||||
newTorrentID := resp.ID
|
newTorrentID := resp.ID
|
||||||
err = t.rd.SelectTorrentFiles(newTorrentID, missingFiles)
|
err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot start redownloading: %v", err)
|
t.log.Warnf("Cannot start redownloading: %v", err)
|
||||||
t.rd.DeleteTorrent(newTorrentID)
|
t.api.DeleteTorrent(newTorrentID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
|
||||||
// see if the torrent is ready
|
// see if the torrent is ready
|
||||||
info, err := t.rd.GetTorrentInfo(newTorrentID)
|
info, err := t.api.GetTorrentInfo(newTorrentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
||||||
t.rd.DeleteTorrent(newTorrentID)
|
t.api.DeleteTorrent(newTorrentID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
||||||
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
||||||
t.rd.DeleteTorrent(newTorrentID)
|
t.api.DeleteTorrent(newTorrentID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -652,7 +652,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
|||||||
missingCount := len(strings.Split(missingFiles, ","))
|
missingCount := len(strings.Split(missingFiles, ","))
|
||||||
if len(info.Links) != missingCount {
|
if len(info.Links) != missingCount {
|
||||||
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
|
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
|
||||||
t.rd.DeleteTorrent(newTorrentID)
|
t.api.DeleteTorrent(newTorrentID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -667,7 +667,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
|||||||
const maxDelay = 60 * time.Second
|
const maxDelay = 60 * time.Second
|
||||||
retryCount := 0
|
retryCount := 0
|
||||||
for {
|
for {
|
||||||
count, err := t.rd.GetActiveTorrentCount()
|
count, err := t.api.GetActiveTorrentCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get active downloads count: %v", err)
|
t.log.Warnf("Cannot get active downloads count: %v", err)
|
||||||
if retryCount >= maxRetries {
|
if retryCount >= maxRetries {
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"bazil.org/fuse/fs"
|
"bazil.org/fuse/fs"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
"github.com/debridmediamanager.com/zurg/internal/config"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/chunk"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,11 +17,10 @@ type FS struct {
|
|||||||
umask os.FileMode
|
umask os.FileMode
|
||||||
directIO bool
|
directIO bool
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
c config.ConfigInterface
|
config config.ConfigInterface
|
||||||
t *torrent.TorrentManager
|
tMgr *torrent.TorrentManager
|
||||||
log *zap.SugaredLogger
|
log *zap.SugaredLogger
|
||||||
initTime time.Time
|
initTime time.Time
|
||||||
chunk *chunk.Manager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Root returns the root path
|
// Root returns the root path
|
||||||
|
|||||||
@@ -8,15 +8,13 @@ import (
|
|||||||
"bazil.org/fuse/fs"
|
"bazil.org/fuse/fs"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
"github.com/debridmediamanager.com/zurg/internal/config"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/chunk"
|
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
|
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager, cMgr *chunk.Manager) error {
|
func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentManager) error {
|
||||||
rlog := logutil.NewLogger()
|
log := logutil.NewLogger().Named("zfs")
|
||||||
log := rlog.Named("zfs")
|
|
||||||
|
|
||||||
options := []fuse.MountOption{
|
options := []fuse.MountOption{
|
||||||
fuse.AllowOther(),
|
fuse.AllowOther(),
|
||||||
@@ -37,11 +35,10 @@ func Mount(mountpoint string, cfg config.ConfigInterface, tMgr *torrent.TorrentM
|
|||||||
uid: uint32(unix.Geteuid()),
|
uid: uint32(unix.Geteuid()),
|
||||||
gid: uint32(unix.Getegid()),
|
gid: uint32(unix.Getegid()),
|
||||||
umask: os.FileMode(0),
|
umask: os.FileMode(0),
|
||||||
c: cfg,
|
config: cfg,
|
||||||
t: tMgr,
|
tMgr: tMgr,
|
||||||
log: log,
|
log: log,
|
||||||
initTime: time.Now(),
|
initTime: time.Now(),
|
||||||
chunk: cMgr,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := srv.Serve(filesys); err != nil {
|
if err := srv.Serve(filesys); err != nil {
|
||||||
|
|||||||
@@ -58,14 +58,14 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
|||||||
dirs := []fuse.Dirent{}
|
dirs := []fuse.Dirent{}
|
||||||
switch o.objType {
|
switch o.objType {
|
||||||
case ROOT:
|
case ROOT:
|
||||||
for _, directory := range o.fs.c.GetDirectories() {
|
for _, directory := range o.fs.config.GetDirectories() {
|
||||||
dirs = append(dirs, fuse.Dirent{
|
dirs = append(dirs, fuse.Dirent{
|
||||||
Name: directory,
|
Name: directory,
|
||||||
Type: fuse.DT_Dir,
|
Type: fuse.DT_Dir,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
case DIRECTORY:
|
case DIRECTORY:
|
||||||
for el := o.fs.t.TorrentMap.Front(); el != nil; el = el.Next() {
|
for el := o.fs.tMgr.TorrentMap.Front(); el != nil; el = el.Next() {
|
||||||
torrent := el.Value
|
torrent := el.Value
|
||||||
if torrent.InProgress {
|
if torrent.InProgress {
|
||||||
continue
|
continue
|
||||||
@@ -76,7 +76,7 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
case TORRENT:
|
case TORRENT:
|
||||||
torrent, _ := o.fs.t.TorrentMap.Get(o.name)
|
torrent, _ := o.fs.tMgr.TorrentMap.Get(o.name)
|
||||||
if torrent == nil || torrent.InProgress {
|
if torrent == nil || torrent.InProgress {
|
||||||
return nil, syscall.ENOENT
|
return nil, syscall.ENOENT
|
||||||
}
|
}
|
||||||
@@ -100,7 +100,7 @@ func (o Object) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
|||||||
func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||||
switch o.objType {
|
switch o.objType {
|
||||||
case ROOT:
|
case ROOT:
|
||||||
for _, directory := range o.fs.c.GetDirectories() {
|
for _, directory := range o.fs.config.GetDirectories() {
|
||||||
if directory == name {
|
if directory == name {
|
||||||
return Object{
|
return Object{
|
||||||
fs: o.fs,
|
fs: o.fs,
|
||||||
@@ -112,7 +112,7 @@ func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case DIRECTORY:
|
case DIRECTORY:
|
||||||
torrent, _ := o.fs.t.TorrentMap.Get(name)
|
torrent, _ := o.fs.tMgr.TorrentMap.Get(name)
|
||||||
if torrent == nil {
|
if torrent == nil {
|
||||||
return nil, syscall.ENOENT
|
return nil, syscall.ENOENT
|
||||||
}
|
}
|
||||||
@@ -125,7 +125,7 @@ func (o Object) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
|
|
||||||
case TORRENT:
|
case TORRENT:
|
||||||
torrent, _ := o.fs.t.TorrentMap.Get(o.name)
|
torrent, _ := o.fs.tMgr.TorrentMap.Get(o.name)
|
||||||
if torrent == nil {
|
if torrent == nil {
|
||||||
return nil, syscall.ENOENT
|
return nil, syscall.ENOENT
|
||||||
}
|
}
|
||||||
@@ -155,7 +155,7 @@ func (o Object) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open
|
|||||||
|
|
||||||
// Read reads some bytes or the whole file
|
// Read reads some bytes or the whole file
|
||||||
func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
func (o Object) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||||
reader := universal.GetFileReader(o.torrent, o.file, req.Offset, int(req.Size), o.fs.t, o.fs.c, o.fs.log)
|
reader := universal.GetFileReader(o.torrent, o.file, req.Offset, int(req.Size), o.fs.tMgr, o.fs.config, o.fs.log)
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
return syscall.EIO
|
return syscall.EIO
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,51 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"container/list"
|
|
||||||
"hash/crc32"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Chunk of memory
|
|
||||||
type Chunk struct {
|
|
||||||
clean bool
|
|
||||||
*chunkHeader
|
|
||||||
bytes []byte
|
|
||||||
item *list.Element
|
|
||||||
}
|
|
||||||
|
|
||||||
type chunkHeader struct {
|
|
||||||
id RequestID
|
|
||||||
size uint32
|
|
||||||
checksum uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Chunk) valid(id RequestID) bool {
|
|
||||||
if c.id != id {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !c.clean {
|
|
||||||
c.clean = c.checksum == c.calculateChecksum()
|
|
||||||
}
|
|
||||||
return c.clean
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Chunk) update(id RequestID, bytes []byte) {
|
|
||||||
c.id = id
|
|
||||||
c.size = uint32(copy(c.bytes, bytes))
|
|
||||||
c.checksum = c.calculateChecksum()
|
|
||||||
c.clean = true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Chunk) calculateChecksum() uint32 {
|
|
||||||
size := c.size
|
|
||||||
if nil == c.bytes || size == 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
maxSize := uint32(len(c.bytes))
|
|
||||||
if size > maxSize {
|
|
||||||
// corrupt size or truncated chunk, fix size
|
|
||||||
c.size = maxSize
|
|
||||||
return crc32.Checksum(c.bytes, crc32Table)
|
|
||||||
}
|
|
||||||
return crc32.Checksum(c.bytes[:size], crc32Table)
|
|
||||||
}
|
|
||||||
@@ -1,146 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"golang.org/x/sys/unix"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Downloader handles concurrent chunk downloads
|
|
||||||
type Downloader struct {
|
|
||||||
BufferSize int64
|
|
||||||
queue chan *Request
|
|
||||||
callbacks map[RequestID][]DownloadCallback
|
|
||||||
lock sync.Mutex
|
|
||||||
storage *Storage
|
|
||||||
c config.ConfigInterface
|
|
||||||
t *torrent.TorrentManager
|
|
||||||
log *zap.SugaredLogger
|
|
||||||
}
|
|
||||||
|
|
||||||
type DownloadCallback func(error, []byte)
|
|
||||||
|
|
||||||
// NewDownloader creates a new download manager
|
|
||||||
func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.TorrentManager, c config.ConfigInterface) (*Downloader, error) {
|
|
||||||
rlog := logutil.NewLogger()
|
|
||||||
log := rlog.Named("downloader")
|
|
||||||
|
|
||||||
manager := Downloader{
|
|
||||||
BufferSize: bufferSize,
|
|
||||||
queue: make(chan *Request, 100),
|
|
||||||
callbacks: make(map[RequestID][]DownloadCallback, 100),
|
|
||||||
storage: storage,
|
|
||||||
c: c,
|
|
||||||
t: t,
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < threads; i++ {
|
|
||||||
go manager.thread(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &manager, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Download starts a new download request
|
|
||||||
func (d *Downloader) Download(req *Request, callback DownloadCallback) {
|
|
||||||
d.lock.Lock()
|
|
||||||
callbacks, exists := d.callbacks[req.id]
|
|
||||||
if nil != callback {
|
|
||||||
d.callbacks[req.id] = append(callbacks, callback)
|
|
||||||
} else if !exists {
|
|
||||||
d.callbacks[req.id] = callbacks
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
d.queue <- req
|
|
||||||
}
|
|
||||||
d.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) thread(n int) {
|
|
||||||
buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
|
|
||||||
if nil != err {
|
|
||||||
d.log.Warnf("Failed to mmap download buffer %v: %v", n, err)
|
|
||||||
buffer = make([]byte, d.BufferSize)
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
req := <-d.queue
|
|
||||||
d.download(req, buffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) download(req *Request, buffer []byte) {
|
|
||||||
d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
|
|
||||||
err := d.downloadFromAPI(req, buffer, 0)
|
|
||||||
|
|
||||||
d.lock.Lock()
|
|
||||||
callbacks := d.callbacks[req.id]
|
|
||||||
for _, callback := range callbacks {
|
|
||||||
callback(err, buffer)
|
|
||||||
}
|
|
||||||
delete(d.callbacks, req.id)
|
|
||||||
d.lock.Unlock()
|
|
||||||
|
|
||||||
if nil != err {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.storage.Store(req.id, buffer); nil != err {
|
|
||||||
d.log.Warnf("Could not store chunk %v: %v", req.id, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error {
|
|
||||||
// sleep if request is throttled
|
|
||||||
if delay > 0 {
|
|
||||||
time.Sleep(time.Duration(delay) * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := d.t.UnrestrictUntilOk(request.file.Link)
|
|
||||||
if resp == nil {
|
|
||||||
return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
|
|
||||||
}
|
|
||||||
|
|
||||||
downloadURL := resp.Download
|
|
||||||
req, err := http.NewRequest("GET", downloadURL, nil)
|
|
||||||
if nil != err {
|
|
||||||
d.log.Debugf("request init error: %v", err)
|
|
||||||
return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link)
|
|
||||||
}
|
|
||||||
req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))
|
|
||||||
|
|
||||||
res, err := http.DefaultClient.Do(req)
|
|
||||||
if nil != err {
|
|
||||||
d.log.Debugf("request error: %v", err)
|
|
||||||
return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link)
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
reader := res.Body
|
|
||||||
|
|
||||||
if res.StatusCode != 206 && res.StatusCode != 200 {
|
|
||||||
return fmt.Errorf("could not read object %s %s / StatusCode: %v",
|
|
||||||
request.file.Path, request.file.Link, res.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
if res.ContentLength == -1 {
|
|
||||||
return fmt.Errorf("missing Content-Length header in response")
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := io.ReadFull(reader, buffer[:res.ContentLength:cap(buffer)])
|
|
||||||
if nil != err && err != io.ErrUnexpectedEOF {
|
|
||||||
d.log.Debugf("response read error: %v", err)
|
|
||||||
return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
|
|
||||||
}
|
|
||||||
d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,266 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Manager manages chunks on disk
|
|
||||||
type Manager struct {
|
|
||||||
ChunkSize int64
|
|
||||||
LoadAhead int
|
|
||||||
downloader *Downloader
|
|
||||||
storage *Storage
|
|
||||||
queue chan *QueueEntry
|
|
||||||
}
|
|
||||||
|
|
||||||
type QueueEntry struct {
|
|
||||||
request *Request
|
|
||||||
response chan Response
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestID is the binary identifier for a chunk request
|
|
||||||
type RequestID [24]byte
|
|
||||||
|
|
||||||
func (id RequestID) String() string {
|
|
||||||
return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:]))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request represents a chunk request
|
|
||||||
type Request struct {
|
|
||||||
id RequestID
|
|
||||||
file *torrent.File
|
|
||||||
offsetStart int64
|
|
||||||
offsetEnd int64
|
|
||||||
chunkOffset int64
|
|
||||||
chunkOffsetEnd int64
|
|
||||||
sequence int
|
|
||||||
preload bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Response represetns a chunk response
|
|
||||||
type Response struct {
|
|
||||||
Sequence int
|
|
||||||
Error error
|
|
||||||
Bytes []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewManager creates a new chunk manager
|
|
||||||
func NewManager(
|
|
||||||
chunkFile string,
|
|
||||||
chunkSize int64,
|
|
||||||
loadAhead,
|
|
||||||
checkThreads,
|
|
||||||
loadThreads,
|
|
||||||
maxChunks int,
|
|
||||||
t *torrent.TorrentManager,
|
|
||||||
c config.ConfigInterface) (*Manager, error) {
|
|
||||||
|
|
||||||
pageSize := int64(os.Getpagesize())
|
|
||||||
if chunkSize < pageSize {
|
|
||||||
return nil, fmt.Errorf("chunk size must not be < %v", pageSize)
|
|
||||||
}
|
|
||||||
if chunkSize%pageSize != 0 {
|
|
||||||
return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize)
|
|
||||||
}
|
|
||||||
// 32-Bit: ~2GB / 64-Bit: ~8EB
|
|
||||||
maxMmapSize := int64(^uint(0) >> 1)
|
|
||||||
if chunkSize > maxMmapSize {
|
|
||||||
return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize)
|
|
||||||
}
|
|
||||||
if maxChunks < 2 || maxChunks < loadAhead {
|
|
||||||
return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value")
|
|
||||||
}
|
|
||||||
|
|
||||||
storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile)
|
|
||||||
if nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
downloader, err := NewDownloader(loadThreads, storage, chunkSize, t, c)
|
|
||||||
if nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
manager := Manager{
|
|
||||||
ChunkSize: chunkSize,
|
|
||||||
LoadAhead: loadAhead,
|
|
||||||
downloader: downloader,
|
|
||||||
storage: storage,
|
|
||||||
queue: make(chan *QueueEntry, 100),
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := manager.storage.Clear(); nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < checkThreads; i++ {
|
|
||||||
go manager.thread()
|
|
||||||
}
|
|
||||||
|
|
||||||
return &manager, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetChunk loads one chunk and starts the preload for the next chunks
//
// The requested byte window may span several chunks: it is split on
// chunk boundaries, every part is requested concurrently and the
// responses are reassembled into one buffer via their sequence numbers.
func (m *Manager) GetChunk(file *torrent.File, offset, size int64) ([]byte, error) {
	maxOffset := file.Bytes
	if offset > maxOffset {
		return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", file.ID, offset)
	}
	// Log.Infof("Request %v:%v md5:%v", object.ID, offset, object.MD5Checksum)
	// Clamp reads that extend past the end of the file.
	if offset+size > maxOffset {
		size = file.Bytes - offset
	}

	ranges := splitChunkRanges(offset, size, m.ChunkSize)
	numRanges := len(ranges)
	// Buffered for all parts so workers never block on send.
	responses := make(chan Response, numRanges)

	// Only the last sub-request triggers the read-ahead preload.
	last := numRanges - 1
	for i, r := range ranges {
		m.requestChunk(file, r.offset, r.size, i, i == last, responses)
	}

	// Responses may arrive out of order; each slice is copied to the
	// position derived from its sequence number.
	data := make([]byte, size)
	for i := 0; i < cap(responses); i++ {
		res := <-responses
		if nil != res.Error {
			// NOTE(review): returning early leaves later responses
			// unread; the channel is buffered for every range, so
			// the workers should not block — confirm.
			return nil, res.Error
		}

		dataOffset := ranges[res.Sequence].offset - offset

		if n := copy(data[dataOffset:], res.Bytes); n == 0 {
			return nil, fmt.Errorf("request %v slice %v has empty response", file.ID, res.Sequence)
		}
	}
	close(responses)

	return data, nil
}
|
|
||||||
|
|
||||||
func buildRequestID(object *torrent.File, offset int64) (id RequestID) {
|
|
||||||
fileID := object.Link
|
|
||||||
if fileID == "" {
|
|
||||||
fileID = object.Path
|
|
||||||
}
|
|
||||||
hash := sha256.Sum256([]byte(fileID))
|
|
||||||
copy(id[:16], hash[:16])
|
|
||||||
binary.BigEndian.PutUint64(id[16:], uint64(offset))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// requestChunk queues the chunk containing [offset, offset+size) and,
// when preload is set, additionally queues up to LoadAhead follow-up
// chunks without a response channel.
func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) {
	// Align the request to the chunk grid.
	chunkOffset := offset % m.ChunkSize
	offsetStart := offset - chunkOffset
	offsetEnd := offsetStart + m.ChunkSize

	request := &Request{
		id: buildRequestID(object, offsetStart),
		file: object,
		offsetStart: offsetStart,
		offsetEnd: offsetEnd,
		chunkOffset: chunkOffset,
		chunkOffsetEnd: chunkOffset + size,
		sequence: sequence,
		// The direct request itself is never a preload; the preload
		// parameter only controls the read-ahead scheduling below.
		preload: false,
	}

	m.queue <- &QueueEntry{
		request: request,
		response: response,
	}

	if !preload {
		return
	}

	// Queue up to LoadAhead whole chunks following this one. Only
	// chunks that fit entirely inside the file are preloaded; a
	// trailing partial chunk is skipped (presumably intentional —
	// TODO confirm).
	for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize {
		aheadOffsetStart := offsetStart + i
		aheadOffsetEnd := aheadOffsetStart + m.ChunkSize
		if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) {
			request := &Request{
				id: buildRequestID(object, aheadOffsetStart),
				file: object,
				offsetStart: aheadOffsetStart,
				offsetEnd: aheadOffsetEnd,
				preload: true,
			}
			m.queue <- &QueueEntry{
				request: request,
			}
		}
	}
}
|
|
||||||
|
|
||||||
// byteRange describes one contiguous part of a read request.
type byteRange struct {
	offset, size int64
}

// splitChunkRanges calculates request ranges that span multiple chunks.
//
// This can happen with Direct-IO and unaligned reads or if the size
// is bigger than the chunk size. Every returned range lies within a
// single chunk; consecutive ranges are contiguous.
func splitChunkRanges(offset, size, chunkSize int64) []byteRange {
	ranges := make([]byteRange, 0, size/chunkSize+2)
	remaining := size
	for remaining > 0 {
		// Take at most up to the next chunk boundary.
		step := chunkSize - offset%chunkSize
		if remaining < step {
			step = remaining
		}
		ranges = append(ranges, byteRange{offset, step})
		offset += step
		remaining -= step
	}
	return ranges
}
|
|
||||||
|
|
||||||
func (m *Manager) thread() {
|
|
||||||
for {
|
|
||||||
queueEntry := <-m.queue
|
|
||||||
m.checkChunk(queueEntry.request, queueEntry.response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// checkChunk serves one queued request from the chunk storage or hands
// it to the downloader on a cache miss.
//
// A nil response channel marks a preload request: the chunk is only
// pulled into storage, nobody waits for the bytes.
func (m *Manager) checkChunk(req *Request, response chan Response) {
	if nil == response {
		// Preload: only download when the chunk is not cached yet.
		if nil == m.storage.Load(req.id) {
			m.downloader.Download(req, nil)
		}
		return
	}

	// Cache hit: answer directly with the requested byte window.
	if bytes := m.storage.Load(req.id); nil != bytes {
		response <- Response{
			Sequence: req.sequence,
			Bytes: adjustResponseChunk(req, bytes),
		}
		return
	}

	// Cache miss: download and answer from the completion callback.
	m.downloader.Download(req, func(err error, bytes []byte) {
		response <- Response{
			Sequence: req.sequence,
			Error: err,
			Bytes: adjustResponseChunk(req, bytes),
		}
	})
}
|
|
||||||
|
|
||||||
func adjustResponseChunk(req *Request, bytes []byte) []byte {
|
|
||||||
if nil == bytes {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
bytesLen := int64(len(bytes))
|
|
||||||
sOffset := min(req.chunkOffset, bytesLen)
|
|
||||||
eOffset := min(req.chunkOffsetEnd, bytesLen)
|
|
||||||
return bytes[sOffset:eOffset]
|
|
||||||
}
|
|
||||||
|
|
||||||
// min returns the smaller of two int64 values.
func min(x, y int64) int64 {
	if y < x {
		return y
	}
	return x
}
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TestSplitChunkRanges(t *testing.T) {
|
|
||||||
testcases := []struct {
|
|
||||||
offset, size, chunkSize int64
|
|
||||||
result []byteRange
|
|
||||||
}{
|
|
||||||
{0, 0, 4096, []byteRange{}},
|
|
||||||
{0, 4096, 4096, []byteRange{
|
|
||||||
{0, 4096},
|
|
||||||
}},
|
|
||||||
{4095, 4096, 4096, []byteRange{
|
|
||||||
{4095, 1},
|
|
||||||
{4096, 4095},
|
|
||||||
}},
|
|
||||||
{0, 8192, 4096, []byteRange{
|
|
||||||
{0, 4096},
|
|
||||||
{4096, 4096},
|
|
||||||
}},
|
|
||||||
{2048, 8192, 4096, []byteRange{
|
|
||||||
{2048, 2048},
|
|
||||||
{4096, 4096},
|
|
||||||
{8192, 2048},
|
|
||||||
}},
|
|
||||||
{2048, 8192, 4096, []byteRange{
|
|
||||||
{2048, 2048},
|
|
||||||
{4096, 4096},
|
|
||||||
{8192, 2048},
|
|
||||||
}},
|
|
||||||
{17960960, 16777216, 10485760, []byteRange{
|
|
||||||
{17960960, 3010560},
|
|
||||||
{20971520, 10485760},
|
|
||||||
{31457280, 3280896},
|
|
||||||
}},
|
|
||||||
}
|
|
||||||
for i, tc := range testcases {
|
|
||||||
ranges := splitChunkRanges(tc.offset, tc.size, tc.chunkSize)
|
|
||||||
actualSize := len(ranges)
|
|
||||||
expectedSize := len(tc.result)
|
|
||||||
if actualSize != expectedSize {
|
|
||||||
t.Fatalf("ByteRange %v length mismatch: %v != %v", i, actualSize, expectedSize)
|
|
||||||
}
|
|
||||||
for j, r := range ranges {
|
|
||||||
actual := r
|
|
||||||
expected := tc.result[j]
|
|
||||||
if actual != expected {
|
|
||||||
t.Fatalf("ByteRange %v mismatch: %v != %v", i, actual, expected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,75 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"container/list"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stack is a thread safe list/stack implementation
//
// It keeps chunk buffer indices in least-recently-used order: the
// front of the list is the next eviction candidate, the back holds
// the most recently used entries.
type Stack struct {
	items *list.List
	lock sync.Mutex
	maxSize int
}

// NewStack creates a new stack that starts yielding entries from Pop
// only once it holds maxChunks items.
func NewStack(maxChunks int) *Stack {
	st := Stack{
		items: list.New(),
		maxSize: maxChunks,
	}
	return &st
}

// Len gets the number of items on the stack.
func (st *Stack) Len() int {
	st.lock.Lock()
	defer st.lock.Unlock()
	return st.items.Len()
}

// Pop removes and returns the least recently used entry. It returns
// -1 while the stack is below capacity (nothing needs evicting) or
// when it is empty.
func (st *Stack) Pop() int {
	st.lock.Lock()
	defer st.lock.Unlock()
	if st.items.Len() < st.maxSize {
		return -1
	}
	front := st.items.Front()
	if front == nil {
		return -1
	}
	st.items.Remove(front)
	return front.Value.(int)
}

// Touch marks the entry as most recently used by moving it to the
// back of the stack.
func (st *Stack) Touch(item *list.Element) {
	st.lock.Lock()
	defer st.lock.Unlock()
	if st.items.Back() != item {
		st.items.MoveToBack(item)
	}
}

// Push adds a new entry as most recently used and returns its list
// element for later Touch/Purge calls.
func (st *Stack) Push(id int) *list.Element {
	st.lock.Lock()
	defer st.lock.Unlock()
	return st.items.PushBack(id)
}

// Prepend adds a whole list of entries to the front of the stack,
// making them the first eviction candidates.
func (st *Stack) Prepend(items *list.List) {
	st.lock.Lock()
	defer st.lock.Unlock()
	st.items.PushFrontList(items)
}

// Purge marks the entry as the next eviction candidate by moving it
// to the front of the stack; the entry itself is not removed.
func (st *Stack) Purge(item *list.Element) {
	st.lock.Lock()
	defer st.lock.Unlock()
	if st.items.Front() != item {
		st.items.MoveToFront(item)
	}
}
|
|
||||||
@@ -1,72 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TestOOB(t *testing.T) {
|
|
||||||
stack := NewStack(1)
|
|
||||||
|
|
||||||
item := stack.Push(1)
|
|
||||||
stack.Touch(item)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddToStack(t *testing.T) {
|
|
||||||
stack := NewStack(1)
|
|
||||||
|
|
||||||
item1 := stack.Push(1)
|
|
||||||
item2 := stack.Push(2)
|
|
||||||
item3 := stack.Push(3)
|
|
||||||
item4 := stack.Push(4)
|
|
||||||
|
|
||||||
stack.Touch(item1)
|
|
||||||
stack.Touch(item3)
|
|
||||||
|
|
||||||
stack.Purge(item2)
|
|
||||||
stack.Purge(item4)
|
|
||||||
|
|
||||||
v := stack.Pop()
|
|
||||||
if v != 4 {
|
|
||||||
t.Fatalf("Expected 4 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v = stack.Pop()
|
|
||||||
if v != 2 {
|
|
||||||
t.Fatalf("Expected 2 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v = stack.Pop()
|
|
||||||
if v != 1 {
|
|
||||||
t.Fatalf("Expected 1 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v = stack.Pop()
|
|
||||||
if v != 3 {
|
|
||||||
t.Fatalf("Expected 3 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v = stack.Pop()
|
|
||||||
if v != -1 {
|
|
||||||
t.Fatalf("Expected -1 got %v", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLen(t *testing.T) {
|
|
||||||
stack := NewStack(1)
|
|
||||||
|
|
||||||
v := stack.Len()
|
|
||||||
if v != 0 {
|
|
||||||
t.Fatalf("Expected 0 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
stack.Push(1)
|
|
||||||
v = stack.Len()
|
|
||||||
if v != 1 {
|
|
||||||
t.Fatalf("Expected 1 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = stack.Pop()
|
|
||||||
v = stack.Len()
|
|
||||||
if v != 0 {
|
|
||||||
t.Fatalf("Expected 0 got %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,420 +0,0 @@
|
|||||||
package chunk
|
|
||||||
|
|
||||||
import (
	"container/list"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
	"unsafe"

	"go.uber.org/zap"
	"golang.org/x/sys/unix"

	"github.com/debridmediamanager.com/zurg/pkg/logutil"
)
|
|
||||||
|
|
||||||
const (
	// headerSize is the size of one per-chunk header in the journal.
	headerSize = int(unsafe.Sizeof(*new(chunkHeader)))
	// tocSize is the size of the journal table-of-contents record.
	tocSize = int64(unsafe.Sizeof(*new(journalHeader)))
	// journalMagic identifies a journal ("PD"); an endianness
	// mismatch reads it back as "DP" and fails validation.
	journalMagic = uint16('P'<<8 | 'D'&0xFF)
	// journalVersion is the current on-disk journal format version.
	journalVersion = uint8(2)
)
|
|
||||||
|
|
||||||
var (
	// blankRequestID is the zero value used to detect unused chunk slots.
	blankRequestID RequestID
	// crc32Table is the Castagnoli polynomial table used for all
	// journal and chunk checksums.
	crc32Table = crc32.MakeTable(crc32.Castagnoli)
)
|
|
||||||
|
|
||||||
// Storage is a chunk storage
//
// Chunks live in mmap-backed buffers: anonymous memory, or a disk
// file when ChunkFile is set. A journal region behind the chunk area
// persists per-chunk headers so the cache can be restored after a
// restart; a TOC record at the very end identifies the layout.
type Storage struct {
	// ChunkFile is the backing cache file; nil means RAM-only (anonymous mmap).
	ChunkFile *os.File
	// ChunkSize is the size of a single chunk in bytes.
	ChunkSize int64
	// HeaderSize is exported but not assigned within this file — TODO confirm use.
	HeaderSize int64
	// MaxChunks is the maximum number of cached chunks.
	MaxChunks int
	// chunks maps a request ID to its buffer index.
	chunks map[RequestID]int
	// stack keeps buffer indices in LRU order for eviction.
	stack *Stack
	// lock guards chunks, buffers and checksum verification.
	lock sync.RWMutex
	// buffers holds one Chunk per buffer slot.
	buffers []*Chunk
	// loadChunks is how many chunks can be restored from an existing cache file.
	loadChunks int
	// signals lets SIGINT/SIGTERM abort the (possibly long) chunk loader.
	signals chan os.Signal
	// journal is the mmapped chunk header area (TOC excluded).
	journal []byte
	// mmapRegions are the memory mappings the chunk buffers are sliced from.
	mmapRegions [][]byte
	// chunksPerRegion is the number of chunks that fit one mmap region.
	chunksPerRegion int64
	// log is the named logger for this storage.
	log *zap.SugaredLogger
}
|
|
||||||
|
|
||||||
// journalHeader is the on-disk table-of-contents record stored at the
// very end of the chunk cache file. It pins the parameters the cache
// was written with so an incompatible cache is detected and rebuilt.
type journalHeader struct {
	// magic is journalMagic ("PD"); also detects endianness mismatches.
	magic uint16
	// version is the journal format version (journalVersion).
	version uint8
	// headerSize is the per-chunk header size used when writing.
	headerSize uint8
	// maxChunks is the chunk count the cache file was sized for.
	maxChunks uint32
	// chunkSize is the chunk size the cache file was written with.
	chunkSize uint32
	// checksum is the CRC-32C over the 12 bytes preceding this field.
	checksum uint32
}
|
|
||||||
|
|
||||||
// NewStorage creates a new storage
|
|
||||||
func NewStorage(chunkSize int64, maxChunks int, maxMmapSize int64, chunkFilePath string) (*Storage, error) {
|
|
||||||
rlog := logutil.NewLogger()
|
|
||||||
log := rlog.Named("storage")
|
|
||||||
|
|
||||||
s := Storage{
|
|
||||||
ChunkSize: chunkSize,
|
|
||||||
MaxChunks: maxChunks,
|
|
||||||
chunks: make(map[RequestID]int, maxChunks),
|
|
||||||
stack: NewStack(maxChunks),
|
|
||||||
buffers: make([]*Chunk, maxChunks),
|
|
||||||
signals: make(chan os.Signal, 1),
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
|
|
||||||
journalSize := tocSize + int64(headerSize*maxChunks)
|
|
||||||
journalOffset := chunkSize * int64(maxChunks)
|
|
||||||
|
|
||||||
// Non-empty string in chunkFilePath enables MMAP disk storage for chunks
|
|
||||||
if chunkFilePath != "" {
|
|
||||||
chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
|
|
||||||
if nil != err {
|
|
||||||
s.log.Debugf("%v", err)
|
|
||||||
return nil, fmt.Errorf("could not open chunk cache file")
|
|
||||||
}
|
|
||||||
s.ChunkFile = chunkFile
|
|
||||||
currentSize, err := chunkFile.Seek(0, os.SEEK_END)
|
|
||||||
if nil != err {
|
|
||||||
s.log.Debugf("%v", err)
|
|
||||||
return nil, fmt.Errorf("chunk file is not seekable")
|
|
||||||
}
|
|
||||||
wantedSize := journalOffset + journalSize
|
|
||||||
s.log.Debugf("Current chunk cache file size: %v B (wanted: %v B)", currentSize, wantedSize)
|
|
||||||
if err := chunkFile.Truncate(currentSize); nil != err {
|
|
||||||
s.log.Warnf("Could not truncate chunk cache, skip resizing")
|
|
||||||
} else if currentSize != wantedSize {
|
|
||||||
if currentSize > tocSize {
|
|
||||||
err = s.relocateJournal(currentSize, wantedSize, journalSize, journalOffset)
|
|
||||||
if nil != err {
|
|
||||||
s.log.Errorf("%v", err)
|
|
||||||
} else {
|
|
||||||
s.log.Infof("Relocated chunk cache journal")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := chunkFile.Truncate(wantedSize); nil != err {
|
|
||||||
s.log.Debugf("%v", err)
|
|
||||||
return nil, fmt.Errorf("could not resize chunk cache file")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.log.Infof("Created chunk cache file %v", chunkFile.Name())
|
|
||||||
s.loadChunks = int(min(currentSize/chunkSize, int64(maxChunks)))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Alocate journal
|
|
||||||
if journal, err := s.mmap(journalOffset, journalSize); nil != err {
|
|
||||||
return nil, fmt.Errorf("could not allocate journal: %v", err)
|
|
||||||
} else {
|
|
||||||
if err := unix.Madvise(journal, syscall.MADV_RANDOM); nil != err {
|
|
||||||
s.log.Warnf("Madvise MADV_RANDOM for journal failed: %v", err)
|
|
||||||
}
|
|
||||||
tocOffset := journalSize - tocSize
|
|
||||||
header := journal[tocOffset:]
|
|
||||||
if valid := s.checkJournal(header, false); !valid {
|
|
||||||
s.initJournal(header)
|
|
||||||
}
|
|
||||||
s.journal = journal[:tocOffset]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup sighandler
|
|
||||||
signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
|
|
||||||
// Allocate mmap regions for chunks
|
|
||||||
if err := s.allocateMmapRegions(maxMmapSize); nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Map chunks to slices from mmap regions
|
|
||||||
if err := s.mmapChunks(); nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// relocateJournal moves existing journal prior to resize
//
// Called when the cache file size does not match the current
// maxChunks/chunkSize parameters: the old journal (chunk headers +
// TOC) is read from its old location, the file is resized and the
// journal is written back at the new journalOffset.
func (s *Storage) relocateJournal(currentSize, wantedSize, journalSize, journalOffset int64) error {
	// The old TOC sits at the very end of the file.
	header := make([]byte, tocSize)
	if _, err := s.ChunkFile.ReadAt(header, currentSize-tocSize); nil != err {
		return fmt.Errorf("failed to read journal header: %v", err)
	}

	// skipMaxChunks=true: a changed maxChunks is exactly what we handle.
	if valid := s.checkJournal(header, true); !valid {
		return fmt.Errorf("failed to validate journal header")
	}

	h := (*journalHeader)(unsafe.Pointer(&header[0]))
	// The old journal started right behind the old chunk area.
	oldJournalOffset := s.ChunkSize * int64(h.maxChunks)
	// Copy at most the new journal's worth of old chunk headers.
	oldJournalSize := min(journalSize, currentSize-oldJournalOffset) - tocSize
	journal := make([]byte, journalSize)

	if _, err := s.ChunkFile.ReadAt(journal[:oldJournalSize], oldJournalOffset); nil != err {
		return fmt.Errorf("failed to read journal: %v", err)
	}

	// Rewrite the TOC with the current parameters and checksum.
	s.initJournal(header)

	// Shrink first, then grow to the final size — presumably so an
	// interrupted relocation cannot leave a stale journal at the old
	// position (TODO confirm).
	sizeWithoutJournal := currentSize - oldJournalSize - tocSize
	if err := s.ChunkFile.Truncate(sizeWithoutJournal); nil != err {
		return fmt.Errorf("could not truncate chunk cache journal: %v", err)
	}

	if err := s.ChunkFile.Truncate(wantedSize); nil != err {
		return fmt.Errorf("could not resize chunk cache file: %v", err)
	}

	if _, err := s.ChunkFile.WriteAt(journal, journalOffset); nil != err {
		return fmt.Errorf("failed to write journal: %v", err)
	}
	if _, err := s.ChunkFile.WriteAt(header, wantedSize-tocSize); nil != err {
		return fmt.Errorf("failed to write journal header: %v", err)
	}
	return nil
}
|
|
||||||
|
|
||||||
// checkJournal verifies the journal header
//
// Returns true only when magic, checksum, version, header size, chunk
// size and (unless skipMaxChunks is set) max chunks all match the
// current configuration; any mismatch means the on-disk cache cannot
// be reused as-is.
func (s *Storage) checkJournal(journal []byte, skipMaxChunks bool) bool {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	// check magic bytes / endianess mismatch ('PD' vs 'DP')
	if h.magic != journalMagic {
		s.log.Debugf("Journal magic mismatch: %v != %v", h.magic, journalMagic)
		return false
	}
	// The checksum covers the 12 bytes preceding the checksum field.
	checksum := crc32.Checksum(journal[:12], crc32Table)
	if h.checksum != checksum {
		s.log.Debugf("Journal checksum mismatch: %08X != %08X", h.checksum, checksum)
		return false
	}
	if h.version != journalVersion {
		s.log.Debugf("Journal version mismatch: %v != %v", h.version, journalVersion)
		return false
	}
	if h.headerSize != uint8(headerSize) {
		s.log.Debugf("Journal chunk header size mismatch: %v != %v", h.headerSize, headerSize)
		return false
	}
	// relocateJournal passes skipMaxChunks=true since a changed
	// maxChunks is exactly what it repairs.
	if !skipMaxChunks && h.maxChunks != uint32(s.MaxChunks) {
		s.log.Debugf("Journal max chunks mismatch: %v != %v", h.maxChunks, s.MaxChunks)
		return false
	}
	if h.chunkSize != uint32(s.ChunkSize) {
		s.log.Debugf("Journal chunk size mismatch: %v != %v", h.chunkSize, s.ChunkSize)
		return false
	}
	s.log.Debug("Journal is valid")
	return true
}
|
|
||||||
|
|
||||||
// initJournal initializes the journal
//
// Writes the TOC fields in place (journal is mmapped) and finally the
// CRC-32C checksum over the first 12 bytes — everything before the
// checksum field itself.
func (s *Storage) initJournal(journal []byte) {
	h := (*journalHeader)(unsafe.Pointer(&journal[0]))
	h.magic = journalMagic
	h.version = journalVersion
	h.headerSize = uint8(headerSize)
	h.maxChunks = uint32(s.MaxChunks)
	h.chunkSize = uint32(s.ChunkSize)
	// Must be computed last so it covers the fields set above.
	h.checksum = crc32.Checksum(journal[:12], crc32Table)
}
|
|
||||||
|
|
||||||
// allocateMmapRegions creates memory mappings to fit all chunks
//
// A single mapping may not exceed maxMmapSize, so the chunk area is
// split into regions of chunksPerRegion chunks each; the last region
// may be smaller when MaxChunks is not an exact multiple.
func (s *Storage) allocateMmapRegions(maxMmapSize int64) error {
	s.chunksPerRegion = maxMmapSize / s.ChunkSize
	regionSize := s.chunksPerRegion * s.ChunkSize
	numRegions := int64(s.MaxChunks) / s.chunksPerRegion
	remChunks := int64(s.MaxChunks) % s.chunksPerRegion
	// One extra (partial) region for the remainder.
	if remChunks != 0 {
		numRegions++
	}
	s.mmapRegions = make([][]byte, numRegions)
	for i := int64(0); i < int64(len(s.mmapRegions)); i++ {
		size := regionSize
		if i == numRegions-1 && remChunks != 0 {
			size = remChunks * s.ChunkSize
		}
		s.log.Debugf("Allocate mmap region %v/%v with size %v B", i+1, numRegions, size)
		region, err := s.mmap(i*regionSize, size)
		if nil != err {
			s.log.Errorf("failed to mmap region %v/%v with size %v B", i+1, numRegions, size)
			return err
		}
		// Chunk data is read/written mostly sequentially within a region.
		if err := unix.Madvise(region, syscall.MADV_SEQUENTIAL); nil != err {
			s.log.Warnf("Madvise MADV_SEQUENTIAL for region %v/%v failed: %v", i+1, numRegions, err)
		}
		s.mmapRegions[i] = region
	}
	return nil
}
|
|
||||||
|
|
||||||
// mmapChunks slices buffers from mmap regions and loads chunk metadata
//
// Chunks restored from a previous run are queued behind the empty
// buffers on the LRU stack so empty slots are reused before any
// restored data is evicted. The loop may be aborted by SIGINT/SIGTERM
// since restoring a large cache can take a while.
func (s *Storage) mmapChunks() error {
	start := time.Now()
	empty := list.New()
	restored := list.New()
	loadedChunks := 0
	for i := 0; i < s.MaxChunks; i++ {
		select {
		case sig := <-s.signals:
			s.log.Warnf("Received signal %v, aborting chunk loader", sig)
			return fmt.Errorf("aborted by signal")
		default:
			if loaded, err := s.initChunk(i, empty, restored); nil != err {
				s.log.Errorf("failed to allocate chunk %v: %v", i, err)
				return fmt.Errorf("failed to initialize chunks")
			} else if loaded {
				loadedChunks++
			}
		}
	}
	// restored first, then empty on top: Pop takes from the front, so
	// empty buffers are consumed before restored chunks are recycled.
	s.stack.Prepend(restored)
	s.stack.Prepend(empty)
	elapsed := time.Since(start)
	if nil != s.ChunkFile {
		s.log.Infof("Loaded %v/%v cache chunks in %v", loadedChunks, s.MaxChunks, elapsed)
	} else {
		s.log.Infof("Allocated %v cache chunks in %v", s.MaxChunks, elapsed)
	}
	return nil
}
|
|
||||||
|
|
||||||
// initChunk tries to restore a chunk from disk
//
// Returns true when the chunk carried a non-blank id from a previous
// run and was registered in the lookup map; blank or out-of-range
// chunks are appended to the empty list instead.
func (s *Storage) initChunk(index int, empty *list.List, restored *list.List) (bool, error) {
	chunk, err := s.allocateChunk(index)
	if err != nil {
		s.log.Debugf("%v", err)
		return false, err
	}

	s.buffers[index] = chunk

	// The id comes from the mmapped journal header of this slot.
	id := chunk.id

	// Slots beyond the previous file size cannot hold restorable data.
	if blankRequestID == id || index >= s.loadChunks {
		chunk.item = empty.PushBack(index)
		// s.log.Tracef("Allocate chunk %v/%v", index+1, s.MaxChunks)
		return false, nil
	}

	chunk.item = restored.PushBack(index)
	// s.log.Tracef("Load chunk %v/%v (restored: %v)", index+1, s.MaxChunks, id)
	s.chunks[id] = index

	return true, nil
}
|
|
||||||
|
|
||||||
// allocateChunk creates a new mmap-backed chunk
//
// The chunk's data buffer is a sub-slice of its mmap region and its
// header points directly into the mmapped journal, so header writes
// persist without extra I/O. Never returns an error in the current
// implementation; the signature leaves room for failure paths.
func (s *Storage) allocateChunk(index int) (*Chunk, error) {
	region := int64(index) / s.chunksPerRegion
	offset := (int64(index) - region*s.chunksPerRegion) * s.ChunkSize
	// s.log.Tracef("Allocate chunk %v from region %v at offset %v", index+1, region, offset)
	// Full slice expression caps the buffer so an append could never
	// spill into the neighbouring chunk.
	bytes := s.mmapRegions[region][offset : offset+s.ChunkSize : offset+s.ChunkSize]
	headerOffset := index * headerSize
	header := (*chunkHeader)(unsafe.Pointer(&s.journal[headerOffset]))
	chunk := Chunk{
		chunkHeader: header,
		bytes: bytes,
	}
	return &chunk, nil
}
|
|
||||||
|
|
||||||
func (s *Storage) mmap(offset, size int64) ([]byte, error) {
|
|
||||||
if s.ChunkFile != nil {
|
|
||||||
return unix.Mmap(int(s.ChunkFile.Fd()), offset, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
|
|
||||||
} else {
|
|
||||||
return unix.Mmap(-1, 0, int(size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear removes all old chunks on disk (will be called on each program start)
//
// NOTE(review): currently a no-op — restored chunks are kept and
// re-validated via their checksums on Load instead of being wiped.
// Confirm this is intentional before relying on a cold cache at start.
func (s *Storage) Clear() error {
	return nil
}
|
|
||||||
|
|
||||||
// Load a chunk from ram or creates it
//
// Returns the chunk bytes when the chunk is cached and intact, nil
// otherwise. Non-clean (restored) chunks are checksum-verified under
// the write lock before being handed out; chunks failing verification
// are moved to the eviction end of the LRU stack.
func (s *Storage) Load(id RequestID) []byte {
	s.lock.RLock()
	chunk := s.fetch(id)
	if nil == chunk {
		// s.log.Tracef("Load chunk %v (missing)", id)
		s.lock.RUnlock()
		return nil
	}
	// Clean chunks were written during this run and need no check.
	if chunk.clean {
		// s.log.Tracef("Load chunk %v (clean)", id)
		defer s.lock.RUnlock()
		return chunk.bytes
	}
	s.lock.RUnlock()
	// Switch to write lock to avoid races on crc verification
	// NOTE(review): the chunk could be evicted/reused between the
	// RUnlock and the Lock below; valid(id) re-checks the id, which
	// presumably covers that window — confirm.
	s.lock.Lock()
	defer s.lock.Unlock()
	if chunk.valid(id) {
		s.log.Debugf("Load chunk %v (verified)", id)
		return chunk.bytes
	}
	s.log.Warnf("Load chunk %v (bad checksum: %08x <> %08x)", id, chunk.checksum, chunk.calculateChecksum())
	// Make the corrupt chunk the next eviction candidate.
	s.stack.Purge(chunk.item)
	return nil
}
|
|
||||||
|
|
||||||
// Store stores a chunk in the RAM and adds it to the disk storage queue
|
|
||||||
func (s *Storage) Store(id RequestID, bytes []byte) (err error) {
|
|
||||||
s.lock.RLock()
|
|
||||||
|
|
||||||
// Avoid storing same chunk multiple times
|
|
||||||
chunk := s.fetch(id)
|
|
||||||
if nil != chunk && chunk.clean {
|
|
||||||
// s.log.Tracef("Create chunk %v (exists: clean)", id)
|
|
||||||
s.lock.RUnlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
s.lock.RUnlock()
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
|
|
||||||
if nil != chunk {
|
|
||||||
if chunk.valid(id) {
|
|
||||||
s.log.Debugf("Create chunk %v (exists: valid)", id)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
s.log.Warnf("Create chunk %v(exists: overwrite)", id)
|
|
||||||
} else {
|
|
||||||
index := s.stack.Pop()
|
|
||||||
if index == -1 {
|
|
||||||
s.log.Debugf("Create chunk %v (failed)", id)
|
|
||||||
return fmt.Errorf("no buffers available")
|
|
||||||
}
|
|
||||||
chunk = s.buffers[index]
|
|
||||||
deleteID := chunk.id
|
|
||||||
if blankRequestID != deleteID {
|
|
||||||
delete(s.chunks, deleteID)
|
|
||||||
s.log.Debugf("Create chunk %v (reused)", id)
|
|
||||||
} else {
|
|
||||||
s.log.Debugf("Create chunk %v (stored)", id)
|
|
||||||
}
|
|
||||||
s.chunks[id] = index
|
|
||||||
chunk.item = s.stack.Push(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk.update(id, bytes)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch chunk and index by id
|
|
||||||
func (s *Storage) fetch(id RequestID) *Chunk {
|
|
||||||
index, exists := s.chunks[id]
|
|
||||||
if !exists {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
chunk := s.buffers[index]
|
|
||||||
s.stack.Touch(chunk.item)
|
|
||||||
return chunk
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user