package chunk

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"syscall"
	"time"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/pkg/logutil"
	"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
	"go.uber.org/zap"
	"golang.org/x/sys/unix"
)

// Downloader handles concurrent chunk downloads.
//
// Requests are pushed onto queue and consumed by a fixed set of worker
// goroutines started in NewDownloader. callbacks holds, per request ID, the
// functions to invoke once that request's download has finished; it is
// guarded by lock.
type Downloader struct {
	BufferSize int64
	queue      chan *Request
	callbacks  map[RequestID][]DownloadCallback
	lock       sync.Mutex
	storage    *Storage
	c          config.ConfigInterface
	log        *zap.SugaredLogger
}

// DownloadCallback is invoked when a download finishes. It receives the
// download error (nil on success) and the worker's download buffer.
type DownloadCallback func(error, []byte)

// NewDownloader creates a new download manager and starts `threads` worker
// goroutines that consume the request queue. The returned error is always
// nil.
func NewDownloader(threads int, storage *Storage, bufferSize int64, c config.ConfigInterface) (*Downloader, error) {
	rlog := logutil.NewLogger()
	log := rlog.Named("downloader")

	manager := &Downloader{
		BufferSize: bufferSize,
		queue:      make(chan *Request, 100),
		callbacks:  make(map[RequestID][]DownloadCallback, 100),
		storage:    storage,
		c:          c,
		log:        log,
	}

	for i := 0; i < threads; i++ {
		go manager.thread(i)
	}

	return manager, nil
}

// Download starts a new download request.
//
// If callback is non-nil it is registered for req.id. The request is only
// queued when no download for the same ID is already registered; otherwise
// the callback is attached to the pending one.
func (d *Downloader) Download(req *Request, callback DownloadCallback) {
	d.lock.Lock()
	callbacks, exists := d.callbacks[req.id]
	if callback != nil {
		d.callbacks[req.id] = append(callbacks, callback)
	} else if !exists {
		// Register the ID even without a callback so later calls for the
		// same request do not queue a duplicate download.
		d.callbacks[req.id] = callbacks
	}
	d.lock.Unlock()

	// Enqueue only after releasing the lock: workers take the same lock to
	// deliver callbacks, so blocking on a full queue while holding it would
	// stall every worker and the queue would never drain.
	if !exists {
		d.queue <- req
	}
}

// thread is the body of worker number n. It allocates one download buffer of
// BufferSize bytes (an anonymous private mmap, falling back to a regular
// slice if mmap fails) and then processes queued requests forever, reusing
// that buffer for every download.
func (d *Downloader) thread(n int) {
	buffer, err := unix.Mmap(-1, 0, int(d.BufferSize), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
	if err != nil {
		d.log.Warnf("Failed to mmap download buffer %v: %v", n, err)
		buffer = make([]byte, d.BufferSize)
	}

	for {
		req := <-d.queue
		d.download(req, buffer)
	}
}

// download fetches the chunk described by req into buffer, notifies every
// callback registered for req.id, and stores the buffer on success.
func (d *Downloader) download(req *Request, buffer []byte) {
	d.log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
	err := d.downloadFromAPI(req, buffer, 0)

	// Callbacks run while the lock is held, so registrations for this ID
	// made in the meantime wait and then start a fresh download.
	d.lock.Lock()
	callbacks := d.callbacks[req.id]
	for _, callback := range callbacks {
		callback(err, buffer)
	}
	delete(d.callbacks, req.id)
	d.lock.Unlock()

	if err != nil {
		return
	}

	if err := d.storage.Store(req.id, buffer); err != nil {
		d.log.Warnf("Could not store chunk %v: %v", req.id, err)
	}
}

// downloadFromAPI unrestricts the request's link, issues a ranged GET for
// [offsetStart, offsetEnd) against the resulting download URL and reads the
// response body into buffer.
//
// delay, when positive, is the number of seconds to sleep before starting.
// An error is returned if the link cannot be unrestricted, the HTTP request
// fails, the status is neither 200 nor 206, the Content-Length is missing or
// larger than the buffer, or the body cannot be read in full.
func (d *Downloader) downloadFromAPI(request *Request, buffer []byte, delay int64) error {
	// sleep if request is throttled
	if delay > 0 {
		time.Sleep(time.Duration(delay) * time.Second)
	}

	unrestrictFn := func() (*realdebrid.UnrestrictResponse, error) {
		return realdebrid.UnrestrictLink(d.c.GetToken(), request.file.Link)
	}
	resp := realdebrid.RetryUntilOk(unrestrictFn)
	if resp == nil {
		return fmt.Errorf("cannot unrestrict file %s %s", request.file.Path, request.file.Link)
	}

	downloadURL := resp.Download
	req, err := http.NewRequest("GET", downloadURL, nil)
	if err != nil {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not create request object %s %s from API: %w", request.file.Path, request.file.Link, err)
	}

	// HTTP byte ranges are inclusive on both ends, hence offsetEnd-1.
	req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", request.offsetStart, request.offsetEnd-1))

	d.log.Debugf("Sending HTTP Request %v", req)

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not request object %s %s from API: %w", request.file.Path, request.file.Link, err)
	}
	defer res.Body.Close()

	if res.StatusCode != 206 && res.StatusCode != 200 {
		return fmt.Errorf("could not read object %s %s / StatusCode: %v", request.file.Path, request.file.Link, res.StatusCode)
	}

	if res.ContentLength < 0 {
		return fmt.Errorf("missing Content-Length header in response")
	}

	// Guard the slice expression below: a body larger than the buffer (for
	// example a 200 response carrying the whole file) would otherwise panic.
	if res.ContentLength > int64(cap(buffer)) {
		return fmt.Errorf("response of %v bytes for %s %s exceeds download buffer of %v bytes", res.ContentLength, request.file.Path, request.file.Link, cap(buffer))
	}

	n, err := io.ReadFull(res.Body, buffer[:res.ContentLength:cap(buffer)])
	if err != nil {
		d.log.Debugf("%v", err)
		return fmt.Errorf("could not read objects %s %s API response: %w", request.file.Path, request.file.Link, err)
	}
	d.log.Debugf("Downloaded %v bytes of %s %s", n, request.file.Path, request.file.Link)

	return nil
}