Optimize streaming from Real-Debrid

This commit is contained in:
Ben Adrian Sarmiento
2024-06-27 12:35:44 +02:00
parent bd1a163002
commit 7a7a79e882
6 changed files with 52 additions and 36 deletions

View File

@@ -53,9 +53,6 @@ on_library_update: |
echo "detected update on: $arg" echo "detected update on: $arg"
done done
# buffer size when zurg is streaming files
network_buffer_size: 1048576 # 1 MiB
# true = send link to rclone and rclone will stream the file # true = send link to rclone and rclone will stream the file
# false = zurg will stream the file # false = zurg will stream the file
serve_from_rclone: false serve_from_rclone: false

View File

@@ -17,7 +17,6 @@ type ConfigInterface interface {
GetDownloadTimeoutSecs() int GetDownloadTimeoutSecs() int
GetDumpTorrentsEveryMins() int GetDumpTorrentsEveryMins() int
GetHost() string GetHost() string
GetNetworkBufferSize() int
GetNumberOfHosts() int GetNumberOfHosts() int
GetNumberOfWorkers() int GetNumberOfWorkers() int
GetOnLibraryUpdate() string GetOnLibraryUpdate() string
@@ -56,7 +55,6 @@ type ZurgConfig struct {
Host string `yaml:"host" json:"host"` Host string `yaml:"host" json:"host"`
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
LogRequests bool `yaml:"log_requests" json:"log_requests"` LogRequests bool `yaml:"log_requests" json:"log_requests"`
NetworkBufferSize int `yaml:"network_buffer_size" json:"network_buffer_size"`
NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"` NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"`
NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"`
OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"` OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"`
@@ -150,13 +148,6 @@ func (z *ZurgConfig) GetOnLibraryUpdate() string {
return z.OnLibraryUpdate return z.OnLibraryUpdate
} }
func (z *ZurgConfig) GetNetworkBufferSize() int {
if z.NetworkBufferSize == 0 {
return 32 * 1024 // 32kb
}
return z.NetworkBufferSize
}
func (z *ZurgConfig) EnableRetainFolderNameExtension() bool { func (z *ZurgConfig) EnableRetainFolderNameExtension() bool {
return z.RetainFolderNameExtension return z.RetainFolderNameExtension
} }

View File

@@ -315,7 +315,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
out += fmt.Sprintf(` out += fmt.Sprintf(`
<tr> <tr>
<td rowspan="23">Config</td> <td rowspan="22">Config</td>
<td>Version</td> <td>Version</td>
<td>%s</td> <td>%s</td>
</tr> </tr>
@@ -391,10 +391,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>Additional Playable Extensions</td> <td>Additional Playable Extensions</td>
<td>%s</td> <td>%s</td>
</tr> </tr>
<tr>
<td>Network Buffer Size</td>
<td>%d bytes</td>
</tr>
<tr> <tr>
<td>Serve From Rclone</td> <td>Serve From Rclone</td>
<td>%t</td> <td>%t</td>
@@ -428,7 +424,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.ShouldAutoAnalyzeNewTorrents(), response.Config.ShouldAutoAnalyzeNewTorrents(),
response.Config.ShouldCacheNetworkTestResults(), response.Config.ShouldCacheNetworkTestResults(),
response.Config.GetPlayableExtensions(), response.Config.GetPlayableExtensions(),
response.Config.GetNetworkBufferSize(),
response.Config.ShouldServeFromRclone(), response.Config.ShouldServeFromRclone(),
response.Config.ShouldForceIPv6(), response.Config.ShouldForceIPv6(),
response.Config.GetOnLibraryUpdate(), response.Config.GetOnLibraryUpdate(),

View File

@@ -68,28 +68,28 @@ func (dl *Downloader) DownloadFile(
) { ) {
torrents, ok := torMgr.DirectoryMap.Get(directory) torrents, ok := torMgr.DirectoryMap.Get(directory)
if !ok { if !ok {
log.Warnf("Cannot find directory %s", directory) log.Errorf("Cannot find directory %s", directory)
http.Error(resp, "File not found", http.StatusNotFound) http.Error(resp, "File not found", http.StatusNotFound)
return return
} }
torrent, ok := torrents.Get(torrentName) torrent, ok := torrents.Get(torrentName)
if !ok { if !ok {
log.Warnf("Cannot find torrent %sfrom path %s", torrentName, req.URL.Path) log.Errorf("Cannot find torrent %s from path %s", torrentName, req.URL.Path)
http.Error(resp, "File not found", http.StatusNotFound) http.Error(resp, "File not found", http.StatusNotFound)
return return
} }
file, ok := torrent.SelectedFiles.Get(fileName) file, ok := torrent.SelectedFiles.Get(fileName)
if !ok || !file.State.Is("ok_file") { if !ok || !file.State.Is("ok_file") {
// log.Warnf("Cannot find file %s from path %s", fileName, req.URL.Path) log.Errorf("Cannot find file %s from path %s", fileName, req.URL.Path)
http.Error(resp, "File not found", http.StatusNotFound) http.Error(resp, "File not found", http.StatusNotFound)
return return
} }
unrestrict, err := torMgr.UnrestrictFile(file, cfg.ShouldServeFromRclone()) unrestrict, err := torMgr.UnrestrictFile(file, cfg.ShouldServeFromRclone())
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
// log.Warnf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return
} }
@@ -133,7 +133,7 @@ func (dl *Downloader) DownloadLink(
// log.Debugf("Opening file %s (%s)", fileName, link) // log.Debugf("Opening file %s (%s)", fileName, link)
unrestrict, err := torMgr.UnrestrictLink(link, cfg.ShouldServeFromRclone()) unrestrict, err := torMgr.UnrestrictLink(link, cfg.ShouldServeFromRclone())
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
// log.Warnf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "Link is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "Link is not available (bandwidth limit reached)", http.StatusBadRequest)
return return
} }
@@ -172,18 +172,16 @@ func (dl *Downloader) streamFileToResponse(
// Add the range header if it exists // Add the range header if it exists
if req.Header.Get("Range") != "" { if req.Header.Get("Range") != "" {
dlReq.Header.Add("Range", req.Header.Get("Range")) dlReq.Header.Add("Range", req.Header.Get("Range"))
// log.Debugf("Serving file %s: %s", unrestrict.Filename, req.Header.Get("Range"))
} }
// Perform the request
downloadResp, err := dl.client.Do(dlReq) downloadResp, err := dl.client.Do(dlReq)
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
// log.Warnf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return
} }
if err != nil { if err != nil {
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err) log.Errorf("Cannot download file %s: %v", unrestrict.Download, err)
if file != nil && file.State.Event(context.Background(), "break_file") == nil { if file != nil && file.State.Event(context.Background(), "break_file") == nil {
torMgr.EnqueueForRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
@@ -194,7 +192,7 @@ func (dl *Downloader) streamFileToResponse(
// Check if the download was not successful // Check if the download was not successful
if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent { if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent {
log.Warnf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename) log.Errorf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename)
if file != nil && file.State.Event(context.Background(), "break_file") == nil { if file != nil && file.State.Event(context.Background(), "break_file") == nil {
torMgr.EnqueueForRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
@@ -202,15 +200,43 @@ func (dl *Downloader) streamFileToResponse(
return return
} }
// Copy the content-range header from the download response to the response
if cr := downloadResp.Header.Get("Content-Range"); cr != "" { if cr := downloadResp.Header.Get("Content-Range"); cr != "" {
resp.Header().Set("Content-Range", cr) resp.Header().Set("Content-Range", cr)
} }
buf := make([]byte, cfg.GetNetworkBufferSize()) var n int64
n, _ := io.CopyBuffer(resp, downloadResp.Body, buf)
if cfg.ShouldLogRequests() {
var totalBytes int64
ticker := time.NewTicker(60 * time.Second)
done := make(chan bool)
go func() {
for {
select {
case <-ticker.C:
mbps := float64(totalBytes*8) / 60_000_000
if mbps > 0 {
log.Debugf("%s | %.2f MB/s", dlReq.URL, mbps)
}
totalBytes = 0
case <-done:
ticker.Stop()
return
}
}
}()
defer func() {
done <- true
}()
n, _ = io.Copy(io.MultiWriter(resp, &byteCounter{&totalBytes}), downloadResp.Body)
} else {
n, _ = io.Copy(resp, downloadResp.Body)
}
dl.workerPool.Submit(func() { dl.workerPool.Submit(func() {
if n == 0 {
return
}
// Update the download statistics // Update the download statistics
reqBytes, _ := parseRangeHeader(req.Header.Get("Range")) reqBytes, _ := parseRangeHeader(req.Header.Get("Range"))
if reqBytes == 0 && unrestrict != nil { if reqBytes == 0 && unrestrict != nil {
@@ -272,6 +298,16 @@ func parseRangeHeader(rangeHeader string) (uint64, error) {
return bytesToRead, nil return bytesToRead, nil
} }
type byteCounter struct {
totalBytes *int64
}
func (bc *byteCounter) Write(p []byte) (int, error) {
n := len(p)
*bc.totalBytes += int64(n)
return n, nil
}
func bToMb(b uint64) uint64 { func bToMb(b uint64) uint64 {
return b / 1024 / 1024 return b / 1024 / 1024
} }

View File

@@ -91,11 +91,9 @@ func NewHTTPClient(
client.client.Transport = &http.Transport{ client.client.Transport = &http.Transport{
ResponseHeaderTimeout: time.Duration(timeoutSecs) * time.Second, ResponseHeaderTimeout: time.Duration(timeoutSecs) * time.Second,
MaxIdleConns: 0, MaxIdleConnsPerHost: 32,
MaxConnsPerHost: 32, MaxConnsPerHost: 32,
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { IdleConnTimeout: 90 * time.Second,
return dialer.Dial(network, address)
},
} }
if forceIPv6 { if forceIPv6 {

View File

@@ -19,7 +19,6 @@ token:
# dump_torrents_every_mins: 1440 # 24 hrs # dump_torrents_every_mins: 1440 # 24 hrs
enable_repair: true enable_repair: true
# network_buffer_size: 32768 # 32kb
# force_ipv6: false # force_ipv6: false
# serve_from_rclone: false # serve_from_rclone: false