Add bandwidth tracking
This commit is contained in:
@@ -102,7 +102,11 @@ func MainApp(configPath string) {
|
|||||||
log.Named("download_client"),
|
log.Named("download_client"),
|
||||||
)
|
)
|
||||||
|
|
||||||
workerPool, err := ants.NewPool(config.GetNumberOfWorkers())
|
workerCount := config.GetNumberOfWorkers()
|
||||||
|
if workerCount < 10 {
|
||||||
|
workerCount = 10
|
||||||
|
}
|
||||||
|
workerPool, err := ants.NewPool(workerCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zurglog.Errorf("Failed to create worker pool: %v", err)
|
zurglog.Errorf("Failed to create worker pool: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -140,7 +144,7 @@ func MainApp(configPath string) {
|
|||||||
log.Named("repair"),
|
log.Named("repair"),
|
||||||
)
|
)
|
||||||
|
|
||||||
downloader := universal.NewDownloader(downloadClient)
|
downloader := universal.NewDownloader(downloadClient, workerPool)
|
||||||
|
|
||||||
router := chi.NewRouter()
|
router := chi.NewRouter()
|
||||||
handlers.AttachHandlers(
|
handlers.AttachHandlers(
|
||||||
|
|||||||
@@ -29,7 +29,9 @@ type RootResponse struct {
|
|||||||
Infuse string `json:"infuse"`
|
Infuse string `json:"infuse"`
|
||||||
Logs string `json:"logs"`
|
Logs string `json:"logs"`
|
||||||
UserInfo *realdebrid.User `json:"user_info"`
|
UserInfo *realdebrid.User `json:"user_info"`
|
||||||
RDTrafficUsed uint64 `json:"rd_traffic_used"`
|
TrafficLogged uint64 `json:"traffic_logged"`
|
||||||
|
RequestedMB uint64 `json:"requested_mb"`
|
||||||
|
ServedMB uint64 `json:"served_mb"`
|
||||||
LibrarySize int `json:"library_size"` // Number of torrents in the library
|
LibrarySize int `json:"library_size"` // Number of torrents in the library
|
||||||
TorrentsToRepair string `json:"repair_queue"` // List of torrents in the repair queue
|
TorrentsToRepair string `json:"repair_queue"` // List of torrents in the repair queue
|
||||||
MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB
|
MemAlloc uint64 `json:"mem_alloc"` // Memory allocation in MB
|
||||||
@@ -82,10 +84,10 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request
|
|||||||
sort.Strings(sortedIDs)
|
sort.Strings(sortedIDs)
|
||||||
|
|
||||||
// check if real-debrid.com is in the traffic details
|
// check if real-debrid.com is in the traffic details
|
||||||
var rdTrafficUsed int64
|
var trafficLogged int64
|
||||||
rdTrafficUsed = 0
|
trafficLogged = 0
|
||||||
if _, ok := trafficDetails["real-debrid.com"]; ok {
|
if _, ok := trafficDetails["real-debrid.com"]; ok {
|
||||||
rdTrafficUsed = trafficDetails["real-debrid.com"]
|
trafficLogged = trafficDetails["real-debrid.com"]
|
||||||
}
|
}
|
||||||
|
|
||||||
userInfo.Premium = userInfo.Premium / 86400
|
userInfo.Premium = userInfo.Premium / 86400
|
||||||
@@ -101,7 +103,9 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request
|
|||||||
Infuse: fmt.Sprintf("//%s/infuse/", req.Host),
|
Infuse: fmt.Sprintf("//%s/infuse/", req.Host),
|
||||||
Logs: fmt.Sprintf("//%s/logs/", req.Host),
|
Logs: fmt.Sprintf("//%s/logs/", req.Host),
|
||||||
UserInfo: userInfo,
|
UserInfo: userInfo,
|
||||||
RDTrafficUsed: bToGb(uint64(rdTrafficUsed)),
|
TrafficLogged: bToMb(uint64(trafficLogged)),
|
||||||
|
RequestedMB: bToMb(zr.downloader.RequestedBytes.Load()),
|
||||||
|
ServedMB: bToMb(zr.downloader.TotalBytes.Load()),
|
||||||
LibrarySize: allTorrents.Count(),
|
LibrarySize: allTorrents.Count(),
|
||||||
TorrentsToRepair: repairQueueStr,
|
TorrentsToRepair: repairQueueStr,
|
||||||
MemAlloc: bToMb(mem.Alloc),
|
MemAlloc: bToMb(mem.Alloc),
|
||||||
@@ -190,6 +194,12 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
|||||||
response.Logs,
|
response.Logs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
denominator := response.RequestedMB
|
||||||
|
if denominator == 0 {
|
||||||
|
denominator = 1
|
||||||
|
}
|
||||||
|
efficiency := response.ServedMB / denominator
|
||||||
|
|
||||||
out += fmt.Sprintf(`
|
out += fmt.Sprintf(`
|
||||||
<tr>
|
<tr>
|
||||||
<td>Library Size</td>
|
<td>Library Size</td>
|
||||||
@@ -216,8 +226,20 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
|||||||
<td colspan="2">%d</td>
|
<td colspan="2">%d</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>RD Traffic Used</td>
|
<td>Traffic Logged</td>
|
||||||
<td colspan="2">%d GB</td>
|
<td colspan="2">%d MB</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Traffic Requested</td>
|
||||||
|
<td colspan="2">%d MB</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Traffic Served</td>
|
||||||
|
<td colspan="2">%d MB</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Traffic Efficiency</td>
|
||||||
|
<td colspan="2">%d%% (wasted %d MB)</td>
|
||||||
</tr>`,
|
</tr>`,
|
||||||
response.LibrarySize,
|
response.LibrarySize,
|
||||||
response.MemAlloc,
|
response.MemAlloc,
|
||||||
@@ -225,7 +247,11 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
|||||||
response.Sys,
|
response.Sys,
|
||||||
response.NumGC,
|
response.NumGC,
|
||||||
response.PID,
|
response.PID,
|
||||||
response.RDTrafficUsed,
|
response.TrafficLogged,
|
||||||
|
response.RequestedMB,
|
||||||
|
response.ServedMB,
|
||||||
|
efficiency,
|
||||||
|
response.RequestedMB-response.ServedMB,
|
||||||
)
|
)
|
||||||
|
|
||||||
out += fmt.Sprintf(`
|
out += fmt.Sprintf(`
|
||||||
@@ -467,7 +493,3 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
|||||||
func bToMb(b uint64) uint64 {
|
func bToMb(b uint64) uint64 {
|
||||||
return b / 1024 / 1024
|
return b / 1024 / 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
func bToGb(b uint64) uint64 {
|
|
||||||
return b / 1024 / 1024 / 1024
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -2,26 +2,55 @@ package universal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/debridmediamanager/zurg/internal/config"
|
"github.com/debridmediamanager/zurg/internal/config"
|
||||||
intTor "github.com/debridmediamanager/zurg/internal/torrent"
|
intTor "github.com/debridmediamanager/zurg/internal/torrent"
|
||||||
zurghttp "github.com/debridmediamanager/zurg/pkg/http"
|
zurghttp "github.com/debridmediamanager/zurg/pkg/http"
|
||||||
"github.com/debridmediamanager/zurg/pkg/logutil"
|
"github.com/debridmediamanager/zurg/pkg/logutil"
|
||||||
"github.com/debridmediamanager/zurg/pkg/realdebrid"
|
"github.com/debridmediamanager/zurg/pkg/realdebrid"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
client *zurghttp.HTTPClient
|
client *zurghttp.HTTPClient
|
||||||
|
RequestedBytes atomic.Uint64
|
||||||
|
TotalBytes atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDownloader(client *zurghttp.HTTPClient) *Downloader {
|
func NewDownloader(client *zurghttp.HTTPClient, workerPool *ants.Pool) *Downloader {
|
||||||
return &Downloader{
|
dl := &Downloader{
|
||||||
client: client,
|
client: client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// track bandwidth usage and reset at 12AM CET
|
||||||
|
now := time.Now()
|
||||||
|
tomorrow := now.AddDate(0, 0, 1)
|
||||||
|
cetTZ, err := time.LoadLocation("CET")
|
||||||
|
if err != nil {
|
||||||
|
cetTZ = time.FixedZone("CET", 1*60*60)
|
||||||
|
}
|
||||||
|
nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ)
|
||||||
|
duration := nextMidnightInCET.Sub(now)
|
||||||
|
timer := time.NewTimer(duration)
|
||||||
|
workerPool.Submit(func() {
|
||||||
|
<-timer.C
|
||||||
|
ticker := time.NewTicker(24 * time.Hour)
|
||||||
|
for {
|
||||||
|
dl.RequestedBytes.Store(0)
|
||||||
|
dl.TotalBytes.Store(0)
|
||||||
|
<-ticker.C
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return dl
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadFile handles a GET request for files in torrents
|
// DownloadFile handles a GET request for files in torrents
|
||||||
@@ -146,7 +175,7 @@ func (dl *Downloader) streamFileToResponse(
|
|||||||
// Add the range header if it exists
|
// Add the range header if it exists
|
||||||
if req.Header.Get("Range") != "" {
|
if req.Header.Get("Range") != "" {
|
||||||
dlReq.Header.Add("Range", req.Header.Get("Range"))
|
dlReq.Header.Add("Range", req.Header.Get("Range"))
|
||||||
// log.Debugf("Range request for file %s: %s", unrestrict.Download, req.Header.Get("Range"))
|
// log.Debugf("Serving file %s: %s", unrestrict.Filename, req.Header.Get("Range"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
@@ -182,9 +211,66 @@ func (dl *Downloader) streamFileToResponse(
|
|||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, cfg.GetNetworkBufferSize())
|
buf := make([]byte, cfg.GetNetworkBufferSize())
|
||||||
io.CopyBuffer(resp, downloadResp.Body, buf)
|
n, _ := io.CopyBuffer(resp, downloadResp.Body, buf)
|
||||||
|
|
||||||
|
// Update the download statistics
|
||||||
|
reqBytes, _ := parseRangeHeader(req.Header.Get("Range"))
|
||||||
|
if reqBytes == 0 && unrestrict != nil {
|
||||||
|
reqBytes = uint64(unrestrict.Filesize)
|
||||||
|
}
|
||||||
|
dl.RequestedBytes.Add(reqBytes)
|
||||||
|
dl.TotalBytes.Add(uint64(n))
|
||||||
|
log.Debugf("Served %d MB of the requested %d MB of file %s (range=%s)", bToMb(uint64(n)), bToMb(reqBytes), unrestrict.Filename, req.Header.Get("Range"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func redirect(resp http.ResponseWriter, req *http.Request, url string) {
|
func redirect(resp http.ResponseWriter, req *http.Request, url string) {
|
||||||
http.Redirect(resp, req, url, http.StatusFound)
|
http.Redirect(resp, req, url, http.StatusFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseRangeHeader(rangeHeader string) (uint64, error) {
|
||||||
|
if rangeHeader == "" { // Empty header means no range request
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(rangeHeader, "bytes=") {
|
||||||
|
return 0, fmt.Errorf("invalid range header format")
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.SplitN(rangeHeader[6:], "-", 2) // [6:] removes "bytes="
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return 0, fmt.Errorf("invalid range specification")
|
||||||
|
}
|
||||||
|
|
||||||
|
var start, end uint64
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Case 1: "bytes=100-" (from byte 100 to the end)
|
||||||
|
if parts[0] != "" {
|
||||||
|
start, err = strconv.ParseUint(parts[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid start value: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case 2: "bytes=-200" (last 200 bytes)
|
||||||
|
if parts[1] != "" {
|
||||||
|
end, err = strconv.ParseUint(parts[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid end value: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle "bytes=500-100" (invalid range)
|
||||||
|
if start > end {
|
||||||
|
return 0, fmt.Errorf("invalid range: start cannot be greater than end")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate bytes to read
|
||||||
|
bytesToRead := end - start + 1 // +1 because ranges are inclusive
|
||||||
|
|
||||||
|
return bytesToRead, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func bToMb(b uint64) uint64 {
|
||||||
|
return b / 1024 / 1024
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user