Add job for monitoring bw limit status of tokens

This commit is contained in:
Ben Adrian Sarmiento
2024-06-28 18:55:02 +02:00
parent c3aea427d0
commit 67111696a2
13 changed files with 136 additions and 55 deletions

View File

@@ -111,8 +111,8 @@ func MainApp(configPath string) {
) )
workerCount := config.GetNumberOfWorkers() workerCount := config.GetNumberOfWorkers()
if workerCount < 10 { if workerCount < 20 {
workerCount = 10 workerCount = 20
} }
workerPool, err := ants.NewPool(workerCount) workerPool, err := ants.NewPool(workerCount)
if err != nil { if err != nil {
@@ -129,6 +129,7 @@ func MainApp(configPath string) {
config, config,
log.Named("realdebrid"), log.Named("realdebrid"),
) )
rd.MonitorExpiredTokens()
premium.MonitorPremiumStatus( premium.MonitorPremiumStatus(
workerPool, workerPool,
@@ -153,6 +154,7 @@ func MainApp(configPath string) {
) )
downloader := universal.NewDownloader(rd, workerPool) downloader := universal.NewDownloader(rd, workerPool)
downloader.StartResetBandwidthCountersJob()
router := chi.NewRouter() router := chi.NewRouter()
handlers.AttachHandlers( handlers.AttachHandlers(

View File

@@ -28,7 +28,7 @@ func loadV1Config(content []byte, log *logutil.Logger) (*ZurgConfigV1, error) {
// don't log token and password // don't log token and password
bufToken := configV1.Token bufToken := configV1.Token
configV1.Token = strings.Repeat("*", len(bufToken)-4) + bufToken[len(bufToken)-4:] configV1.Token = utils.MaskToken(configV1.Token)
bufPassword := configV1.Password bufPassword := configV1.Password
configV1.Password = strings.Repeat("*", len(bufPassword)) configV1.Password = strings.Repeat("*", len(bufPassword))
log.Debugf("Config dump: %+v", configV1) log.Debugf("Config dump: %+v", configV1)

View File

@@ -12,6 +12,7 @@ import (
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/internal/version" "github.com/debridmediamanager/zurg/internal/version"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils"
) )
type SponsorResponse struct { type SponsorResponse struct {
@@ -117,7 +118,7 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request
Paypal: "https://paypal.me/yowmamasita", Paypal: "https://paypal.me/yowmamasita",
}, },
Config: zr.cfg.GetConfig(), Config: zr.cfg.GetConfig(),
Token: strings.Replace(token, token[len(token)-48:], "*****", 1), Token: utils.MaskToken(token),
IDsToDelete: sortedIDs, IDsToDelete: sortedIDs,
Hosts: zr.hosts, Hosts: zr.hosts,
}, nil }, nil

View File

@@ -219,7 +219,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error {
return return
} }
unrestrict, err := t.UnrestrictFile(file) unrestrict, err := t.UnrestrictFile(file)
if utils.IsBWLimitExceeded(err) { if utils.AreAllTokensExpired(err) {
bwLimitReached = true bwLimitReached = true
return return
} }
@@ -337,6 +337,7 @@ func (t *TorrentManager) mountNewDownloads() {
downloads := t.rd.GetDownloads() downloads := t.rd.GetDownloads()
mountedCount := 0 mountedCount := 0
for i := range downloads { for i := range downloads {
downloads[i].Token = token
isRealDebrid := strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") isRealDebrid := strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/")
if !isRealDebrid { if !isRealDebrid {
filename := filepath.Base(downloads[i].Filename) filename := filepath.Base(downloads[i].Filename)

View File

@@ -294,16 +294,15 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent {
newer.SelectedFiles.IterCb(func(key string, file *File) { newer.SelectedFiles.IterCb(func(key string, file *File) {
mergedTorrent.SelectedFiles.SetIfAbsent(key, file) mergedTorrent.SelectedFiles.SetIfAbsent(key, file)
}) })
older.SelectedFiles.IterCb(func(key string, file *File) { older.SelectedFiles.IterCb(func(key string, olderFile *File) {
mediaInfo := file.MediaInfo file, ok := mergedTorrent.SelectedFiles.Get(key)
f, ok := mergedTorrent.SelectedFiles.Get(key) if !ok || (file.State.Is("broken_file") && olderFile.State.Is("ok_file")) {
if !ok || !f.State.Is("ok_file") { mergedTorrent.SelectedFiles.Set(key, olderFile)
mergedTorrent.SelectedFiles.Set(key, file)
} }
// get the file again, set the media info // get the file again, set the media info
f, ok = mergedTorrent.SelectedFiles.Get(key) file, ok = mergedTorrent.SelectedFiles.Get(key)
if ok && f.MediaInfo == nil && mediaInfo != nil { if ok && file.MediaInfo == nil && olderFile.MediaInfo != nil {
f.MediaInfo = mediaInfo file.MediaInfo = olderFile.MediaInfo
} }
}) })

View File

@@ -19,6 +19,7 @@ const (
EXPIRED_LINK_TOLERANCE_HOURS = 24 EXPIRED_LINK_TOLERANCE_HOURS = 24
) )
// StartRepairJob is a permanent job that runs every periodically to repair broken torrents
func (t *TorrentManager) StartRepairJob() { func (t *TorrentManager) StartRepairJob() {
if !t.Config.EnableRepair() { if !t.Config.EnableRepair() {
t.repairLog.Warn("Repair is disabled, skipping repair job") t.repairLog.Warn("Repair is disabled, skipping repair job")
@@ -34,7 +35,6 @@ func (t *TorrentManager) StartRepairJob() {
t.repairLog.Debug("Starting periodic repair job") t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop() defer repairTicker.Stop()
for { for {
select { select {
case <-repairTicker.C: case <-repairTicker.C:
@@ -192,11 +192,12 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
bwLimitReached := false bwLimitReached := false
// check for other broken file // check for other broken file
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
// we will only check for working files, we ignore broken and deleted files
if bwLimitReached || !file.State.Is("ok_file") { if bwLimitReached || !file.State.Is("ok_file") {
return return
} }
_, err := t.UnrestrictFile(file) _, err := t.UnrestrictFile(file)
if utils.IsBWLimitExceeded(err) { if utils.AreAllTokensExpired(err) {
bwLimitReached = true bwLimitReached = true
return return
} }
@@ -252,7 +253,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
if info != nil && info.Progress == 100 { if info != nil && info.Progress == 100 {
err = t.checkIfBroken(info, brokenFiles) err = t.checkIfBroken(info, brokenFiles)
if utils.IsBWLimitExceeded(err) { if utils.AreAllTokensExpired(err) {
t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent)) t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent))
return return
} }
@@ -332,14 +333,13 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
expiredCount := 0 expiredCount := 0
rarCount := 0 rarCount := 0
unassignedCount := 0 unassignedCount := 0
newUnassignedLinks := cmap.New[*realdebrid.Download]()
var assignedLinks []string var assignedLinks []string
bwLimitReached := false bwLimitReached := false
torrent.UnassignedLinks.Clone().Each(func(link string) bool { torrent.UnassignedLinks.Clone().Each(func(link string) bool {
// unrestrict each unassigned link that was filled out during torrent init // unrestrict each unassigned link that was filled out during torrent init
unrestrict, err := t.rd.UnrestrictAndVerify(link) unrestrict, err := t.rd.UnrestrictAndVerify(link)
if utils.IsBWLimitExceeded(err) { if utils.AreAllTokensExpired(err) {
bwLimitReached = true bwLimitReached = true
return true return true
} }
@@ -377,7 +377,18 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
// it's possible that it is already repaired // it's possible that it is already repaired
t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent))
} }
newUnassignedLinks.Set(link, unrestrict)
torrent.SelectedFiles.Set(unrestrict.Filename, &File{
File: realdebrid.File{
ID: 0,
Path: unrestrict.Filename,
Bytes: unrestrict.Filesize,
Selected: 0,
},
Ended: torrent.Added,
Link: unrestrict.Link,
State: NewFileState("ok_file"),
})
} }
processedCount := assignedCount + unassignedCount + expiredCount processedCount := assignedCount + unassignedCount + expiredCount
@@ -416,20 +427,6 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
return false // end repair return false // end repair
} }
newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
torrent.SelectedFiles.Set(unassigned.Filename, &File{
File: realdebrid.File{
ID: 0,
Path: unassigned.Filename,
Bytes: unassigned.Filesize,
Selected: 0,
},
Ended: torrent.Added,
Link: unassigned.Link,
State: NewFileState("ok_file"),
})
})
if action == "extract" { if action == "extract" {
videoFiles := []string{} videoFiles := []string{}
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {

View File

@@ -27,11 +27,14 @@ type Downloader struct {
} }
func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader { func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader {
dl := &Downloader{ return &Downloader{
rd: rd, rd: rd,
workerPool: workerPool, workerPool: workerPool,
} }
}
// StartResetBandwidthCountersJob is a permanent job that resets the bandwidth counters at 12AM CET
func (dl *Downloader) StartResetBandwidthCountersJob() {
// track bandwidth usage and reset at 12AM CET // track bandwidth usage and reset at 12AM CET
now := time.Now() now := time.Now()
tomorrow := now.AddDate(0, 0, 1) tomorrow := now.AddDate(0, 0, 1)
@@ -42,19 +45,17 @@ func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader
nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ) nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ)
duration := nextMidnightInCET.Sub(now) duration := nextMidnightInCET.Sub(now)
timer := time.NewTimer(duration) timer := time.NewTimer(duration)
// permanent job for bandwidth reset
workerPool.Submit(func() { dl.workerPool.Submit(func() {
<-timer.C <-timer.C
ticker := time.NewTicker(24 * time.Hour) ticker := time.NewTicker(24 * time.Hour)
for { for {
rd.TokenManager.ResetAllTokens() dl.rd.TokenManager.ResetAllTokens()
dl.RequestedBytes.Store(0) dl.RequestedBytes.Store(0)
dl.TotalBytes.Store(0) dl.TotalBytes.Store(0)
<-ticker.C <-ticker.C
} }
}) })
return dl
} }
// DownloadFile handles a GET request for files in torrents // DownloadFile handles a GET request for files in torrents
@@ -90,7 +91,7 @@ func (dl *Downloader) DownloadFile(
} }
unrestrict, err := torMgr.UnrestrictFile(file) unrestrict, err := torMgr.UnrestrictFile(file)
if utils.IsBWLimitExceeded(err) { if utils.AreAllTokensExpired(err) {
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return
@@ -164,7 +165,8 @@ func (dl *Downloader) streamFileToResponse(
} }
downloadResp, err := dl.rd.DownloadFile(dlReq) downloadResp, err := dl.rd.DownloadFile(dlReq)
if utils.IsBWLimitExceeded(err) { if utils.IsBytesLimitReached(err) {
dl.rd.TokenManager.SetTokenAsExpired(unrestrict.Token, "bandwidth limit exceeded")
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return

View File

@@ -13,6 +13,7 @@ const (
MINIMUM_SLEEP = 60 // 60 seconds MINIMUM_SLEEP = 60 // 60 seconds
) )
// MonitorPremiumStatus is a permanent job that monitors the premium status of the user
func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurglog *logutil.Logger) { func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurglog *logutil.Logger) {
var userInfo *realdebrid.User var userInfo *realdebrid.User
var err error var err error
@@ -52,8 +53,7 @@ func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurg
userInfo, err = rd.GetUserInformation() userInfo, err = rd.GetUserInformation()
if err != nil { if err != nil {
zurglog.Errorf("Failed to get user information: %v", err) zurglog.Errorf("Failed to get user information: %v", err)
time.Sleep(5 * time.Minute) time.Sleep(time.Duration(MINIMUM_SLEEP) * time.Second)
continue
} }
} }
}) })

View File

@@ -38,7 +38,7 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli
rd := &RealDebrid{ rd := &RealDebrid{
UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](),
TokenManager: NewDownloadTokenManager(downloadTokens), TokenManager: NewDownloadTokenManager(downloadTokens, log),
torrentsCache: []Torrent{}, torrentsCache: []Torrent{},
verifiedLinks: cmap.New[int64](), verifiedLinks: cmap.New[int64](),
apiClient: apiClient, apiClient: apiClient,
@@ -69,8 +69,8 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
} }
// check if the link is already unrestricted // check if the link is already unrestricted
tokenMap, ok := rd.UnrestrictMap.Get(token) tokenMap, _ := rd.UnrestrictMap.Get(token)
if ok && tokenMap.Has(link) { if tokenMap.Has(link) {
download, _ := tokenMap.Get(link) download, _ := tokenMap.Get(link)
// check if the link is in the verified links cache // check if the link is in the verified links cache
if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() { if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() {
@@ -78,8 +78,8 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
} }
err := rd.downloadClient.VerifyLink(download.Download) err := rd.downloadClient.VerifyLink(download.Download)
if utils.IsBWLimitExceeded(err) { if utils.IsBytesLimitReached(err) {
rd.TokenManager.SetTokenAsExpired(token) rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
continue continue
} }
if err != nil { if err != nil {
@@ -94,12 +94,13 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
download.Token = token
tokenMap.Set(link, download) tokenMap.Set(link, download)
err = rd.downloadClient.VerifyLink(download.Download) err = rd.downloadClient.VerifyLink(download.Download)
if utils.IsBWLimitExceeded(err) { if utils.IsBytesLimitReached(err) {
rd.TokenManager.SetTokenAsExpired(token) rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
continue continue
} }
if err != nil { if err != nil {
@@ -404,3 +405,35 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse,
func (rd *RealDebrid) DownloadFile(req *http.Request) (*http.Response, error) { func (rd *RealDebrid) DownloadFile(req *http.Request) (*http.Response, error) {
return rd.downloadClient.Do(req) return rd.downloadClient.Do(req)
} }
// MonitorExpiredTokens is a permanent job for monitoring expired tokens if they are still expired
func (rd *RealDebrid) MonitorExpiredTokens() {
sleepPeriod := 5 * time.Minute
rd.workerPool.Submit(func() {
for {
expiredTokens := rd.TokenManager.GetExpiredTokens()
for _, token := range expiredTokens {
tokenMap, _ := rd.UnrestrictMap.Get(token)
stillExpired := true
skipAll := false
tokenMap.IterCb(func(key string, download *Download) {
if skipAll {
return
}
err := rd.downloadClient.VerifyLink(download.Download)
if err != nil {
skipAll = utils.IsBytesLimitReached(err)
return
}
stillExpired = false
skipAll = true
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+60*60*24)
})
if !stillExpired {
rd.TokenManager.SetTokenAsUnexpired(token)
}
}
time.Sleep(sleepPeriod)
}
})
}

View File

@@ -3,6 +3,9 @@ package realdebrid
import ( import (
"fmt" "fmt"
"sync" "sync"
"github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/utils"
) )
type Token struct { type Token struct {
@@ -14,15 +17,16 @@ type DownloadTokenManager struct {
tokens []Token tokens []Token
current int current int
mu sync.RWMutex mu sync.RWMutex
log *logutil.Logger
} }
// NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens. // NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens.
func NewDownloadTokenManager(tokenStrings []string) *DownloadTokenManager { func NewDownloadTokenManager(tokenStrings []string, log *logutil.Logger) *DownloadTokenManager {
tokens := make([]Token, len(tokenStrings)) tokens := make([]Token, len(tokenStrings))
for i, t := range tokenStrings { for i, t := range tokenStrings {
tokens[i] = Token{value: t, expired: false} tokens[i] = Token{value: t, expired: false}
} }
return &DownloadTokenManager{tokens: tokens, current: 0} return &DownloadTokenManager{tokens: tokens, current: 0, log: log}
} }
// GetCurrentToken returns the current non-expired token. // GetCurrentToken returns the current non-expired token.
@@ -38,19 +42,36 @@ func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) {
dtm.current = (dtm.current + 1) % len(dtm.tokens) dtm.current = (dtm.current + 1) % len(dtm.tokens)
if dtm.current == 0 { if dtm.current == 0 {
return "", fmt.Errorf("all tokens are bandwidth-limited") return "", fmt.Errorf("all tokens are expired")
} }
} }
} }
// SetTokenAsExpired sets the specified token as expired. // SetTokenAsExpired sets the specified token as expired.
func (dtm *DownloadTokenManager) SetTokenAsExpired(token string) error { func (dtm *DownloadTokenManager) SetTokenAsExpired(token, reason string) error {
dtm.mu.Lock() dtm.mu.Lock()
defer dtm.mu.Unlock() defer dtm.mu.Unlock()
for i, t := range dtm.tokens { for i, t := range dtm.tokens {
if t.value == token { if t.value == token {
dtm.tokens[i].expired = true dtm.tokens[i].expired = true
dtm.log.Debugf("Token %s set as expired (reason=%s)", utils.MaskToken(token), reason)
return nil
}
}
return fmt.Errorf("token not found")
}
// SetTokenAsUnexpired sets the specified token as unexpired.
func (dtm *DownloadTokenManager) SetTokenAsUnexpired(token string) error {
dtm.mu.Lock()
defer dtm.mu.Unlock()
for i, t := range dtm.tokens {
if t.value == token {
dtm.tokens[i].expired = false
dtm.log.Debugf("Token %s set as unexpired", utils.MaskToken(token))
return nil return nil
} }
} }
@@ -68,3 +89,16 @@ func (dtm *DownloadTokenManager) ResetAllTokens() {
} }
dtm.current = 0 dtm.current = 0
} }
func (dtm *DownloadTokenManager) GetExpiredTokens() []string {
dtm.mu.RLock()
defer dtm.mu.RUnlock()
var tokens []string
for _, t := range dtm.tokens {
if t.expired {
tokens = append(tokens, t.value)
}
}
return tokens
}

View File

@@ -24,6 +24,7 @@ type Download struct {
Download string `json:"download"` // Generated link Download string `json:"download"` // Generated link
Streamable int `json:"streamable"` Streamable int `json:"streamable"`
Generated string `json:"-"` // jsonDate Generated string `json:"-"` // jsonDate
Token string `json:"-"`
} }
func (d *Download) UnmarshalJSON(data []byte) error { func (d *Download) UnmarshalJSON(data []byte) error {

View File

@@ -2,9 +2,13 @@ package utils
import "github.com/debridmediamanager/zurg/pkg/http" import "github.com/debridmediamanager/zurg/pkg/http"
func IsBWLimitExceeded(err error) bool { func IsBytesLimitReached(err error) bool {
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
return true return true
} }
return false return false
} }
func AreAllTokensExpired(err error) bool {
return err.Error() == "all tokens are expired"
}

7
pkg/utils/mask_token.go Normal file
View File

@@ -0,0 +1,7 @@
package utils
import "strings"
func MaskToken(token string) string {
return strings.Repeat("*", len(token)-4) + token[len(token)-4:]
}