Add token management

This commit is contained in:
Ben Adrian Sarmiento
2024-06-28 13:19:09 +02:00
parent 962845fb81
commit c3aea427d0
9 changed files with 100 additions and 114 deletions

View File

@@ -121,7 +121,7 @@ func MainApp(configPath string) {
} }
defer workerPool.Release() defer workerPool.Release()
api := realdebrid.NewRealDebrid( rd := realdebrid.NewRealDebrid(
apiClient, apiClient,
unrestrictClient, unrestrictClient,
downloadClient, downloadClient,
@@ -132,7 +132,7 @@ func MainApp(configPath string) {
premium.MonitorPremiumStatus( premium.MonitorPremiumStatus(
workerPool, workerPool,
api, rd,
zurglog, zurglog,
) )
@@ -145,14 +145,14 @@ func MainApp(configPath string) {
torrentMgr := torrent.NewTorrentManager( torrentMgr := torrent.NewTorrentManager(
config, config,
api, rd,
workerPool, workerPool,
hasFFprobe, hasFFprobe,
log.Named("manager"), log.Named("manager"),
log.Named("repair"), log.Named("repair"),
) )
downloader := universal.NewDownloader(downloadClient, workerPool) downloader := universal.NewDownloader(rd, workerPool)
router := chi.NewRouter() router := chi.NewRouter()
handlers.AttachHandlers( handlers.AttachHandlers(
@@ -160,7 +160,7 @@ func MainApp(configPath string) {
downloader, downloader,
torrentMgr, torrentMgr,
config, config,
api, rd,
workerPool, workerPool,
hosts, hosts,
log.Named("router"), log.Named("router"),

View File

@@ -47,7 +47,7 @@ type RootResponse struct {
} }
func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request) (*RootResponse, error) { func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request) (*RootResponse, error) {
userInfo, err := zr.api.GetUserInformation() userInfo, err := zr.rd.GetUserInformation()
if err != nil { if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError) http.Error(resp, err.Error(), http.StatusInternalServerError)
return nil, err return nil, err
@@ -77,7 +77,7 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request
sortedIDs := zr.torMgr.OnceDoneBin.ToSlice() sortedIDs := zr.torMgr.OnceDoneBin.ToSlice()
sort.Strings(sortedIDs) sort.Strings(sortedIDs)
trafficDetails, err := zr.api.GetTrafficDetails() trafficDetails, err := zr.rd.GetTrafficDetails()
if err != nil { if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError) http.Error(resp, err.Error(), http.StatusInternalServerError)
return nil, err return nil, err

View File

@@ -24,7 +24,7 @@ type Handlers struct {
downloader *universal.Downloader downloader *universal.Downloader
torMgr *torrent.TorrentManager torMgr *torrent.TorrentManager
cfg config.ConfigInterface cfg config.ConfigInterface
api *realdebrid.RealDebrid rd *realdebrid.RealDebrid
workerPool *ants.Pool workerPool *ants.Pool
hosts []string hosts []string
trafficOnStartup atomic.Uint64 trafficOnStartup atomic.Uint64
@@ -37,18 +37,18 @@ func init() {
chi.RegisterMethod("MOVE") chi.RegisterMethod("MOVE")
} }
func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool *ants.Pool, hosts []string, log *logutil.Logger) { func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *torrent.TorrentManager, cfg config.ConfigInterface, rd *realdebrid.RealDebrid, workerPool *ants.Pool, hosts []string, log *logutil.Logger) {
hs := &Handlers{ hs := &Handlers{
downloader: downloader, downloader: downloader,
torMgr: torMgr, torMgr: torMgr,
cfg: cfg, cfg: cfg,
api: api, rd: rd,
workerPool: workerPool, workerPool: workerPool,
hosts: hosts, hosts: hosts,
log: log, log: log,
} }
trafficDetails, err := api.GetTrafficDetails() trafficDetails, err := rd.GetTrafficDetails()
if err != nil { if err != nil {
log.Errorf("Failed to get traffic details: %v", err) log.Errorf("Failed to get traffic details: %v", err)
trafficDetails = make(map[string]int64) trafficDetails = make(map[string]int64)

View File

@@ -12,7 +12,6 @@ import (
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/internal/fs" "github.com/debridmediamanager/zurg/internal/fs"
"github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils" "github.com/debridmediamanager/zurg/pkg/utils"
@@ -144,7 +143,7 @@ func (t *TorrentManager) UnrestrictFile(file *File) (*realdebrid.Download, error
} else if file.State.Is("broken_file") { } else if file.State.Is("broken_file") {
return nil, fmt.Errorf("file %s is broken", file.Path) return nil, fmt.Errorf("file %s is broken", file.Path)
} }
return t.rd.UnrestrictLink(file.Link) return t.rd.UnrestrictAndVerify(file.Link)
} }
func (t *TorrentManager) GetKey(torrent *Torrent) string { func (t *TorrentManager) GetKey(torrent *Torrent) string {
@@ -220,7 +219,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error {
return return
} }
unrestrict, err := t.UnrestrictFile(file) unrestrict, err := t.UnrestrictFile(file)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
bwLimitReached = true bwLimitReached = true
return return
} }
@@ -332,11 +331,8 @@ func (t *TorrentManager) deleteInfoFile(torrentID string) {
/// end info functions /// end info functions
func (t *TorrentManager) mountNewDownloads() { func (t *TorrentManager) mountNewDownloads() {
token, _ := t.rd.GetToken() token := t.Config.GetToken()
var tokenMap cmap.ConcurrentMap[string, *realdebrid.Download] tokenMap, _ := t.rd.UnrestrictMap.Get(token)
if token != "" {
tokenMap, _ = t.rd.UnrestrictMap.Get(token)
}
downloads := t.rd.GetDownloads() downloads := t.rd.GetDownloads()
mountedCount := 0 mountedCount := 0

View File

@@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils" "github.com/debridmediamanager/zurg/pkg/utils"
mapset "github.com/deckarep/golang-set/v2" mapset "github.com/deckarep/golang-set/v2"
@@ -197,7 +196,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
return return
} }
_, err := t.UnrestrictFile(file) _, err := t.UnrestrictFile(file)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
bwLimitReached = true bwLimitReached = true
return return
} }
@@ -253,7 +252,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
if info != nil && info.Progress == 100 { if info != nil && info.Progress == 100 {
err = t.checkIfBroken(info, brokenFiles) err = t.checkIfBroken(info, brokenFiles)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent)) t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent))
return return
} }
@@ -339,8 +338,8 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
bwLimitReached := false bwLimitReached := false
torrent.UnassignedLinks.Clone().Each(func(link string) bool { torrent.UnassignedLinks.Clone().Each(func(link string) bool {
// unrestrict each unassigned link that was filled out during torrent init // unrestrict each unassigned link that was filled out during torrent init
unrestrict, err := t.rd.UnrestrictLink(link) unrestrict, err := t.rd.UnrestrictAndVerify(link)
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
bwLimitReached = true bwLimitReached = true
return true return true
} }

View File

@@ -13,22 +13,22 @@ import (
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
intTor "github.com/debridmediamanager/zurg/internal/torrent" intTor "github.com/debridmediamanager/zurg/internal/torrent"
zurghttp "github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/realdebrid" "github.com/debridmediamanager/zurg/pkg/realdebrid"
"github.com/debridmediamanager/zurg/pkg/utils"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
) )
type Downloader struct { type Downloader struct {
client *zurghttp.HTTPClient rd *realdebrid.RealDebrid
workerPool *ants.Pool workerPool *ants.Pool
RequestedBytes atomic.Uint64 RequestedBytes atomic.Uint64
TotalBytes atomic.Uint64 TotalBytes atomic.Uint64
} }
func NewDownloader(client *zurghttp.HTTPClient, workerPool *ants.Pool) *Downloader { func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader {
dl := &Downloader{ dl := &Downloader{
client: client, rd: rd,
workerPool: workerPool, workerPool: workerPool,
} }
@@ -42,10 +42,12 @@ func NewDownloader(client *zurghttp.HTTPClient, workerPool *ants.Pool) *Download
nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ) nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ)
duration := nextMidnightInCET.Sub(now) duration := nextMidnightInCET.Sub(now)
timer := time.NewTimer(duration) timer := time.NewTimer(duration)
// permanent job for bandwidth reset
workerPool.Submit(func() { workerPool.Submit(func() {
<-timer.C <-timer.C
ticker := time.NewTicker(24 * time.Hour) ticker := time.NewTicker(24 * time.Hour)
for { for {
rd.TokenManager.ResetAllTokens()
dl.RequestedBytes.Store(0) dl.RequestedBytes.Store(0)
dl.TotalBytes.Store(0) dl.TotalBytes.Store(0)
<-ticker.C <-ticker.C
@@ -88,7 +90,7 @@ func (dl *Downloader) DownloadFile(
} }
unrestrict, err := torMgr.UnrestrictFile(file) unrestrict, err := torMgr.UnrestrictFile(file)
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return
@@ -161,8 +163,8 @@ func (dl *Downloader) streamFileToResponse(
dlReq.Header.Add("Range", req.Header.Get("Range")) dlReq.Header.Add("Range", req.Header.Get("Range"))
} }
downloadResp, err := dl.client.Do(dlReq) downloadResp, err := dl.rd.DownloadFile(dlReq)
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { if utils.IsBWLimitExceeded(err) {
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
return return

View File

@@ -11,18 +11,19 @@ import (
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
zurghttp "github.com/debridmediamanager/zurg/pkg/http" zurghttp "github.com/debridmediamanager/zurg/pkg/http"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/debridmediamanager/zurg/pkg/utils"
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
) )
type RealDebrid struct { type RealDebrid struct {
torrentsCache []Torrent
UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]]
TokenManager *DownloadTokenManager
torrentsCache []Torrent
verifiedLinks cmap.ConcurrentMap[string, int64] verifiedLinks cmap.ConcurrentMap[string, int64]
apiClient *zurghttp.HTTPClient apiClient *zurghttp.HTTPClient
unrestrictClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient
downloadClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient
tokenManager *DownloadTokenManager
workerPool *ants.Pool workerPool *ants.Pool
cfg config.ConfigInterface cfg config.ConfigInterface
log *logutil.Logger log *logutil.Logger
@@ -36,13 +37,13 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli
} }
rd := &RealDebrid{ rd := &RealDebrid{
torrentsCache: []Torrent{},
UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](),
TokenManager: NewDownloadTokenManager(downloadTokens),
torrentsCache: []Torrent{},
verifiedLinks: cmap.New[int64](), verifiedLinks: cmap.New[int64](),
apiClient: apiClient, apiClient: apiClient,
unrestrictClient: unrestrictClient, unrestrictClient: unrestrictClient,
downloadClient: downloadClient, downloadClient: downloadClient,
tokenManager: NewDownloadTokenManager(downloadTokens),
workerPool: workerPool, workerPool: workerPool,
cfg: cfg, cfg: cfg,
log: log, log: log,
@@ -59,76 +60,59 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli
return rd return rd
} }
// currently unused func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
func (rd *RealDebrid) UnrestrictCheck(link string) (*Download, error) {
data := url.Values{}
data.Set("link", link)
requestBody := strings.NewReader(data.Encode())
req, err := http.NewRequest(http.MethodPost, "https://api.real-debrid.com/rest/1.0/unrestrict/check", requestBody)
if err != nil {
rd.log.Errorf("Error when creating a unrestrict check request: %v", err)
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := rd.unrestrictClient.Do(req)
if err != nil {
rd.log.Errorf("Error when executing the unrestrict check request: %v", err)
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
rd.log.Errorf("Error when reading the body of unrestrict check response: %v", err)
return nil, err
}
var response Download
err = json.Unmarshal(body, &response)
if err != nil {
rd.log.Errorf("Error when decoding unrestrict check JSON: %v", err)
return nil, err
}
rd.log.Debugf("Link %s is streamable? %v", response.Streamable)
return &response, nil
}
func (rd *RealDebrid) UnrestrictLink(link string) (*Download, error) {
for { for {
token, err := rd.tokenManager.GetCurrentToken() token, err := rd.TokenManager.GetCurrentToken()
if err != nil { if err != nil {
// when all tokens are expired // when all tokens are expired
return nil, err return nil, err
} }
download, err := rd.UnrestrictLinkWithToken(token, link)
if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { // check if the link is already unrestricted
rd.tokenManager.SetCurrentTokenExpired() tokenMap, ok := rd.UnrestrictMap.Get(token)
if ok && tokenMap.Has(link) {
download, _ := tokenMap.Get(link)
// check if the link is in the verified links cache
if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() {
return download, nil
}
err := rd.downloadClient.VerifyLink(download.Download)
if utils.IsBWLimitExceeded(err) {
rd.TokenManager.SetTokenAsExpired(token)
continue continue
} }
if err != nil {
return nil, err
}
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+60*60*24)
return download, nil
}
download, err := rd.UnrestrictLink(link)
if err != nil {
return nil, err
}
tokenMap.Set(link, download)
err = rd.downloadClient.VerifyLink(download.Download)
if utils.IsBWLimitExceeded(err) {
rd.TokenManager.SetTokenAsExpired(token)
continue
}
if err != nil {
return nil, err
}
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+60*60*24)
return download, err return download, err
} }
} }
func (rd *RealDebrid) UnrestrictLinkWithToken(token, link string) (*Download, error) { func (rd *RealDebrid) UnrestrictLink(link string) (*Download, error) {
// check if the link is already unrestricted
if tokenMap, ok := rd.UnrestrictMap.Get(token); ok {
if d, ok := tokenMap.Get(link); ok {
// check if the link is in the verified links cache
if expiry, ok := rd.verifiedLinks.Get(d.Download); ok && expiry > time.Now().Unix() {
return d, nil
}
err := rd.downloadClient.VerifyLink(d.Download)
if err != nil {
return nil, err
}
rd.verifiedLinks.Set(d.Download, time.Now().Unix()+60*60*24)
return d, nil
}
}
data := url.Values{} data := url.Values{}
if strings.HasPrefix(link, "https://real-debrid.com/d/") { if strings.HasPrefix(link, "https://real-debrid.com/d/") {
// set link to max 39 chars (26 + 13) // set link to max 39 chars (26 + 13)
@@ -145,7 +129,6 @@ func (rd *RealDebrid) UnrestrictLinkWithToken(token, link string) (*Download, er
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
rd.unrestrictClient.SetToken(token)
// at this point, any errors mean that the link has expired and we need to repair it // at this point, any errors mean that the link has expired and we need to repair it
resp, err := rd.unrestrictClient.Do(req) resp, err := rd.unrestrictClient.Do(req)
if err != nil { if err != nil {
@@ -167,16 +150,6 @@ func (rd *RealDebrid) UnrestrictLinkWithToken(token, link string) (*Download, er
return nil, fmt.Errorf("undecodable response: %v", err) return nil, fmt.Errorf("undecodable response: %v", err)
} }
tokenMap, _ := rd.UnrestrictMap.Get(token)
tokenMap.Set(link, &response)
err = rd.downloadClient.VerifyLink(response.Download)
if err != nil {
return nil, err
}
rd.verifiedLinks.Set(response.Download, time.Now().Unix()+60*60*24)
// rd.log.Debugf("Unrestricted link %s into %s", link, response.Download) // rd.log.Debugf("Unrestricted link %s into %s", link, response.Download)
return &response, nil return &response, nil
} }
@@ -428,6 +401,6 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse,
return response, nil return response, nil
} }
func (rd *RealDebrid) GetToken() (string, error) { func (rd *RealDebrid) DownloadFile(req *http.Request) (*http.Response, error) {
return rd.tokenManager.GetCurrentToken() return rd.downloadClient.Do(req)
} }

View File

@@ -13,7 +13,7 @@ type Token struct {
type DownloadTokenManager struct { type DownloadTokenManager struct {
tokens []Token tokens []Token
current int current int
mu sync.Mutex mu sync.RWMutex
} }
// NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens. // NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens.
@@ -27,8 +27,8 @@ func NewDownloadTokenManager(tokenStrings []string) *DownloadTokenManager {
// GetCurrentToken returns the current non-expired token. // GetCurrentToken returns the current non-expired token.
func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) { func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) {
dtm.mu.Lock() dtm.mu.RLock()
defer dtm.mu.Unlock() defer dtm.mu.RUnlock()
for { for {
if !dtm.tokens[dtm.current].expired { if !dtm.tokens[dtm.current].expired {
@@ -43,13 +43,19 @@ func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) {
} }
} }
// SetCurrentTokenExpired sets the current token as expired. // SetTokenAsExpired sets the specified token as expired.
func (dtm *DownloadTokenManager) SetCurrentTokenExpired() { func (dtm *DownloadTokenManager) SetTokenAsExpired(token string) error {
dtm.mu.Lock() dtm.mu.Lock()
defer dtm.mu.Unlock() defer dtm.mu.Unlock()
dtm.tokens[dtm.current].expired = true for i, t := range dtm.tokens {
dtm.current = (dtm.current + 1) % len(dtm.tokens) if t.value == token {
dtm.tokens[i].expired = true
return nil
}
}
return fmt.Errorf("token not found")
} }
// ResetAllTokens resets all tokens to expired=false. // ResetAllTokens resets all tokens to expired=false.

10
pkg/utils/bwlimit.go Normal file
View File

@@ -0,0 +1,10 @@
package utils
import "github.com/debridmediamanager/zurg/pkg/http"
func IsBWLimitExceeded(err error) bool {
if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" {
return true
}
return false
}