Use the chunk manager properly
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -30,3 +30,5 @@ zurg
|
|||||||
config.yml
|
config.yml
|
||||||
|
|
||||||
error_videos/*.mp4
|
error_videos/*.mp4
|
||||||
|
|
||||||
|
mnt/
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/debridmediamanager.com/zurg/internal/version"
|
"github.com/debridmediamanager.com/zurg/internal/version"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/zfs"
|
"github.com/debridmediamanager.com/zurg/internal/zfs"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/chunk"
|
"github.com/debridmediamanager.com/zurg/pkg/chunk"
|
||||||
|
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
||||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
@@ -44,7 +45,7 @@ func main() {
|
|||||||
|
|
||||||
cache := expirable.NewLRU[string, string](1e4, nil, time.Hour)
|
cache := expirable.NewLRU[string, string](1e4, nil, time.Hour)
|
||||||
|
|
||||||
rd := realdebrid.NewRealDebrid(config.GetToken(), config, logutil.NewLogger().Named("realdebrid"))
|
rd := realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid"))
|
||||||
|
|
||||||
torrentMgr := torrent.NewTorrentManager(config, rd)
|
torrentMgr := torrent.NewTorrentManager(config, rd)
|
||||||
|
|
||||||
@@ -65,16 +66,16 @@ func main() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
|
log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
|
||||||
// 64kb request size
|
client := zurghttp.NewHTTPClient(config.GetToken(), 10, config)
|
||||||
chunkMgr, err := chunk.NewManager(
|
chunkMgr, err := chunk.NewManager(
|
||||||
"", // in-memory chunk file
|
"", // in-memory chunks
|
||||||
5242880, // 10MB
|
10485760, // 10MB chunk size
|
||||||
max((runtime.NumCPU()/2)-1, 1), // 1 chunk - load ahead (1MB total)
|
max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead
|
||||||
max(runtime.NumCPU()/2, 1), // check threads
|
max(runtime.NumCPU()/2, 1), // 4 check threads
|
||||||
max(runtime.NumCPU()/2, 1), // load threads
|
max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead
|
||||||
runtime.NumCPU()*4,
|
runtime.NumCPU()*2, // total chunks kept in memory
|
||||||
torrentMgr, // max chunks
|
torrentMgr,
|
||||||
config)
|
client)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
log.Panicf("Failed to initialize chunk manager: %v", err)
|
log.Panicf("Failed to initialize chunk manager: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -11,6 +11,6 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/winfsp/cgofuse v1.5.0 // indirect
|
github.com/winfsp/cgofuse v1.5.0
|
||||||
go.uber.org/multierr v1.10.0 // indirect
|
go.uber.org/multierr v1.10.0 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,3 @@
|
|||||||
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ=
|
|
||||||
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY=
|
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
|
github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
|
||||||
@@ -10,8 +8,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ=
|
|
||||||
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
|
|
||||||
github.com/winfsp/cgofuse v1.5.0 h1:MsBP7Mi/LiJf/7/F3O/7HjjR009ds6KCdqXzKpZSWxI=
|
github.com/winfsp/cgofuse v1.5.0 h1:MsBP7Mi/LiJf/7/F3O/7HjjR009ds6KCdqXzKpZSWxI=
|
||||||
github.com/winfsp/cgofuse v1.5.0/go.mod h1:h3awhoUOcn2VYVKCwDaYxSLlZwnyK+A8KaDoLUp2lbU=
|
github.com/winfsp/cgofuse v1.5.0/go.mod h1:h3awhoUOcn2VYVKCwDaYxSLlZwnyK+A8KaDoLUp2lbU=
|
||||||
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
|
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
"github.com/debridmediamanager.com/zurg/internal/config"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/dav"
|
"github.com/debridmediamanager.com/zurg/internal/dav"
|
||||||
intHttp "github.com/debridmediamanager.com/zurg/internal/http"
|
zurghttp "github.com/debridmediamanager.com/zurg/internal/http"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
"github.com/debridmediamanager.com/zurg/internal/universal"
|
"github.com/debridmediamanager.com/zurg/internal/universal"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
@@ -25,11 +25,11 @@ func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentMana
|
|||||||
if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 {
|
if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 {
|
||||||
universal.HandleGetRequest(w, r, t, c, cache)
|
universal.HandleGetRequest(w, r, t, c, cache)
|
||||||
} else {
|
} else {
|
||||||
intHttp.HandleDirectoryListing(w, r, t, c)
|
zurghttp.HandleDirectoryListing(w, r, t, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
case http.MethodHead:
|
case http.MethodHead:
|
||||||
universal.HandleHeadRequest(w, r, t, c, cache)
|
universal.HandleHeadRequest(w, r, t, cache)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.Errorf("Request %s %s not supported yet", r.Method, r.URL.Path)
|
log.Errorf("Request %s %s not supported yet", r.Method, r.URL.Path)
|
||||||
|
|||||||
@@ -7,13 +7,12 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) {
|
func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) {
|
||||||
log := logutil.NewLogger().Named("unihead")
|
log := logutil.NewLogger().Named("unihead")
|
||||||
|
|
||||||
requestPath := path.Clean(r.URL.Path)
|
requestPath := path.Clean(r.URL.Path)
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
|
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
|
||||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
@@ -23,12 +24,13 @@ type Downloader struct {
|
|||||||
storage *Storage
|
storage *Storage
|
||||||
torMgr *torrent.TorrentManager
|
torMgr *torrent.TorrentManager
|
||||||
log *zap.SugaredLogger
|
log *zap.SugaredLogger
|
||||||
|
client *zurghttp.HTTPClient
|
||||||
}
|
}
|
||||||
|
|
||||||
type DownloadCallback func(error, []byte)
|
type DownloadCallback func(error, []byte)
|
||||||
|
|
||||||
// NewDownloader creates a new download manager
|
// NewDownloader creates a new download manager
|
||||||
func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torrent.TorrentManager) (*Downloader, error) {
|
func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torrent.TorrentManager, client *zurghttp.HTTPClient) (*Downloader, error) {
|
||||||
rlog := logutil.NewLogger()
|
rlog := logutil.NewLogger()
|
||||||
log := rlog.Named("downloader")
|
log := rlog.Named("downloader")
|
||||||
|
|
||||||
@@ -38,6 +40,7 @@ func NewDownloader(threads int, storage *Storage, bufferSize int64, torMgr *torr
|
|||||||
callbacks: make(map[RequestID][]DownloadCallback, 100),
|
callbacks: make(map[RequestID][]DownloadCallback, 100),
|
||||||
storage: storage,
|
storage: storage,
|
||||||
torMgr: torMgr,
|
torMgr: torMgr,
|
||||||
|
client: client,
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,7 +79,6 @@ func (d *Downloader) thread(n int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) download(req *Request, buffer []byte) {
|
func (d *Downloader) download(req *Request, buffer []byte) {
|
||||||
d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
|
|
||||||
err := d.downloadFromAPI(req, buffer, 0)
|
err := d.downloadFromAPI(req, buffer, 0)
|
||||||
|
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
@@ -137,7 +139,7 @@ func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int6
|
|||||||
d.log.Debugf("response read error: %v", err)
|
d.log.Debugf("response read error: %v", err)
|
||||||
return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
|
return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
|
||||||
}
|
}
|
||||||
d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)
|
d.log.Debugf("Downloaded %v bytes of %s", n, request.file.Path)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
"github.com/debridmediamanager.com/zurg/internal/torrent"
|
||||||
|
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Manager manages chunks on disk
|
// Manager manages chunks on disk
|
||||||
@@ -59,7 +59,7 @@ func NewManager(
|
|||||||
loadThreads,
|
loadThreads,
|
||||||
maxChunks int,
|
maxChunks int,
|
||||||
torMgr *torrent.TorrentManager,
|
torMgr *torrent.TorrentManager,
|
||||||
cfg config.ConfigInterface) (*Manager, error) {
|
client *zurghttp.HTTPClient) (*Manager, error) {
|
||||||
|
|
||||||
pageSize := int64(os.Getpagesize())
|
pageSize := int64(os.Getpagesize())
|
||||||
if chunkSize < pageSize {
|
if chunkSize < pageSize {
|
||||||
@@ -82,7 +82,7 @@ func NewManager(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
downloader, err := NewDownloader(loadThreads, storage, chunkSize, torMgr)
|
downloader, err := NewDownloader(loadThreads, storage, chunkSize, torMgr, client)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ type HTTPClient struct {
|
|||||||
Client *http.Client
|
Client *http.Client
|
||||||
MaxRetries int
|
MaxRetries int
|
||||||
Backoff func(attempt int) time.Duration
|
Backoff func(attempt int) time.Duration
|
||||||
CheckRespStatus func(resp *http.Response, err error, log *zap.SugaredLogger) bool
|
CheckRespStatus func(resp *http.Response, err error) bool
|
||||||
BearerToken string
|
BearerToken string
|
||||||
log *zap.SugaredLogger
|
log *zap.SugaredLogger
|
||||||
config config.ConfigInterface
|
config config.ConfigInterface
|
||||||
@@ -34,7 +34,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
|
|||||||
var err error
|
var err error
|
||||||
for attempt := 0; attempt < r.MaxRetries; attempt++ {
|
for attempt := 0; attempt < r.MaxRetries; attempt++ {
|
||||||
resp, err = r.Client.Do(req)
|
resp, err = r.Client.Do(req)
|
||||||
if !r.CheckRespStatus(resp, err, r.log) {
|
if !r.CheckRespStatus(resp, err) {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
time.Sleep(r.Backoff(attempt))
|
time.Sleep(r.Backoff(attempt))
|
||||||
@@ -50,7 +50,7 @@ func NewHTTPClient(token string, maxRetries int, cfg config.ConfigInterface) *HT
|
|||||||
Backoff: func(attempt int) time.Duration {
|
Backoff: func(attempt int) time.Duration {
|
||||||
return time.Duration(attempt) * time.Second
|
return time.Duration(attempt) * time.Second
|
||||||
},
|
},
|
||||||
CheckRespStatus: func(resp *http.Response, err error, log *zap.SugaredLogger) bool {
|
CheckRespStatus: func(resp *http.Response, err error) bool {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
|
||||||
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
|
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@@ -20,7 +19,7 @@ type RealDebrid struct {
|
|||||||
client *zurghttp.HTTPClient
|
client *zurghttp.HTTPClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRealDebrid(accessToken string, config config.ConfigInterface, log *zap.SugaredLogger) *RealDebrid {
|
func NewRealDebrid(accessToken string, log *zap.SugaredLogger) *RealDebrid {
|
||||||
maxRetries := 10
|
maxRetries := 10
|
||||||
client := zurghttp.NewHTTPClient(accessToken, maxRetries, nil)
|
client := zurghttp.NewHTTPClient(accessToken, maxRetries, nil)
|
||||||
return &RealDebrid{
|
return &RealDebrid{
|
||||||
|
|||||||
Reference in New Issue
Block a user