Files
zurg/pkg/chunk/download.go
2023-11-11 11:52:34 +01:00

147 lines
3.9 KiB
Go

package chunk
import (
"fmt"
"io"
"net/http"
"sync"
"syscall"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)
// Downloader fetches chunks using a fixed set of worker goroutines that
// consume requests from a shared queue.
type Downloader struct {
	// BufferSize is the size in bytes of the buffer each worker allocates.
	BufferSize int64

	// queue carries pending requests to the worker goroutines.
	queue chan *Request
	// callbacks holds, per request ID, the callbacks to invoke once that
	// request's download has finished. An entry also marks the request as
	// already scheduled.
	callbacks map[RequestID][]DownloadCallback
	// lock guards callbacks.
	lock sync.Mutex

	// storage receives every successfully downloaded chunk.
	storage *Storage
	// c is the configuration handed to NewDownloader.
	c config.ConfigInterface
	// t is used to unrestrict a file link before it is downloaded.
	t *torrent.TorrentManager
	// log is the logger named "downloader".
	log *zap.SugaredLogger
}
// DownloadCallback is invoked once a download has finished. err is the
// download error (nil on success) and data is the worker's buffer, which the
// worker reuses for its next request.
type DownloadCallback func(err error, data []byte)
// NewDownloader builds a Downloader and starts `threads` worker goroutines
// that serve its request queue. Each worker allocates its own buffer of
// bufferSize bytes. The returned error is always nil.
func NewDownloader(threads int, storage *Storage, bufferSize int64, t *torrent.TorrentManager, c config.ConfigInterface) (*Downloader, error) {
	d := &Downloader{
		BufferSize: bufferSize,
		queue:      make(chan *Request, 100),
		callbacks:  make(map[RequestID][]DownloadCallback, 100),
		storage:    storage,
		c:          c,
		t:          t,
		log:        logutil.NewLogger().Named("downloader"),
	}

	// Workers are numbered from 0; the number is only used in log output.
	for worker := 0; worker < threads; worker++ {
		go d.thread(worker)
	}

	return d, nil
}
// Download schedules req for download and registers callback (if non-nil) to
// be invoked when the download finishes.
//
// If a request with the same ID is already scheduled, the callback is added
// to that request and no second download is queued. A nil callback still
// schedules the download (e.g. for preloading).
//
// Download blocks while the request queue is full.
func (d *Downloader) Download(req *Request, callback DownloadCallback) {
	d.lock.Lock()
	callbacks, pending := d.callbacks[req.id]
	if callback != nil {
		callbacks = append(callbacks, callback)
	}
	// Always write the entry, even when it is nil: its presence in the map
	// is what marks the request as already scheduled.
	d.callbacks[req.id] = callbacks
	d.lock.Unlock()

	// Enqueue only after releasing the lock. Sending while holding it can
	// deadlock: with a full queue the send blocks, and workers that have
	// finished a download wait for this same lock before they return to
	// receive from the queue.
	if !pending {
		d.queue <- req
	}
}
// thread is the body of worker goroutine n. It allocates one buffer of
// d.BufferSize bytes and then handles queued requests forever, reusing that
// buffer for every request.
func (d *Downloader) thread(n int) {
	const (
		prot  = syscall.PROT_READ | syscall.PROT_WRITE
		flags = syscall.MAP_ANON | syscall.MAP_PRIVATE
	)

	// Try an anonymous private mapping first; if that fails, fall back to
	// an ordinary slice.
	buf, mmapErr := unix.Mmap(-1, 0, int(d.BufferSize), prot, flags)
	if mmapErr != nil {
		d.log.Warnf("Failed to mmap download buffer %v: %v", n, mmapErr)
		buf = make([]byte, d.BufferSize)
	}

	for {
		d.download(<-d.queue, buf)
	}
}
// download fetches req into buffer, notifies every callback registered for
// the request (passing the download error and the buffer), removes the
// request's callback entry, and stores the buffer if the download succeeded.
func (d *Downloader) download(req *Request, buffer []byte) {
	d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
	dlErr := d.downloadFromAPI(req, buffer, 0)

	// Callbacks run while the lock is held, before the entry is removed.
	d.lock.Lock()
	for _, notify := range d.callbacks[req.id] {
		notify(dlErr, buffer)
	}
	delete(d.callbacks, req.id)
	d.lock.Unlock()

	// Only successful downloads are stored.
	if dlErr != nil {
		return
	}

	storeErr := d.storage.Store(req.id, buffer)
	if storeErr != nil {
		d.log.Warnf("Could not store chunk %v: %v", req.id, storeErr)
	}
}
// downloadFromAPI unrestricts the request's file link and downloads the byte
// range offsetStart..offsetEnd-1 (inclusive) into buffer.
//
// delay is a number of seconds to sleep before starting; values <= 0 skip
// the sleep.
//
// An error is returned when the link cannot be unrestricted, the HTTP request
// cannot be created or executed, the status is neither 200 nor 206, the
// response has no Content-Length, the announced length does not fit into
// buffer, or reading the body fails. A body shorter than the announced
// Content-Length (io.ErrUnexpectedEOF) is not treated as an error.
func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error {
	// sleep if request is throttled
	if delay > 0 {
		time.Sleep(time.Duration(delay) * time.Second)
	}

	resp := d.t.UnrestrictUntilOk(request.file.Link)
	if resp == nil {
		return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
	}

	req, err := http.NewRequest(http.MethodGet, resp.Download, nil)
	if err != nil {
		d.log.Debugf("request init error: %v", err)
		return fmt.Errorf("could not create request object %s %s from API", request.file.Path, request.file.Link)
	}
	// HTTP ranges are inclusive on both ends, hence offsetEnd-1.
	req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		d.log.Debugf("request error: %v", err)
		return fmt.Errorf("could not request object %s %s from API", request.file.Path, request.file.Link)
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusPartialContent && res.StatusCode != http.StatusOK {
		return fmt.Errorf("could not read object %s %s / StatusCode: %v",
			request.file.Path, request.file.Link, res.StatusCode)
	}

	if res.ContentLength < 0 {
		return fmt.Errorf("missing Content-Length header in response")
	}

	// Guard the slice expression below: a Content-Length larger than the
	// buffer (e.g. a 200 response carrying the whole file because the
	// server ignored the Range header) would otherwise panic with
	// "slice bounds out of range".
	if res.ContentLength > int64(cap(buffer)) {
		return fmt.Errorf("response for %s %s is %v bytes, which exceeds the %v byte download buffer",
			request.file.Path, request.file.Link, res.ContentLength, cap(buffer))
	}

	n, err := io.ReadFull(res.Body, buffer[:res.ContentLength:cap(buffer)])
	// A short body is tolerated; any other read error fails the download.
	if err != nil && err != io.ErrUnexpectedEOF {
		d.log.Debugf("response read error: %v", err)
		return fmt.Errorf("could not read objects %s %s API response", request.file.Path, request.file.Link)
	}
	d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)

	return nil
}