Compare commits

..

10 Commits

Author SHA1 Message Date
Ben Adrian Sarmiento
f04a0ff42c Ensure we have reachable hosts 2024-08-28 01:10:09 +02:00
Ben Adrian Sarmiento
bac6351071 Retry on download requests 2024-08-26 20:19:12 +02:00
Ben Adrian Sarmiento
d4adc2ae21 Change default download timeout from 10 to 15 2024-08-26 12:49:30 +02:00
Ben Adrian Sarmiento
83d928a540 Do not verify if fresh link 2024-08-26 10:22:37 +02:00
Ben Adrian Sarmiento
78be877efe Do not repair if download error, just retry from client side 2024-08-25 13:48:01 +02:00
Ben Adrian Sarmiento
54230c9eaa Set verified links cache time to 3 mins 2024-08-25 13:30:40 +02:00
Ben Adrian Sarmiento
11e9c5d431 Only test for ipv6 if network supports it 2024-08-25 13:29:50 +02:00
Ben Adrian Sarmiento
f5cbf150ef Do no replace with random host if not on reachable host list 2024-08-21 23:27:22 +02:00
Ben Adrian Sarmiento
517aca22ab Use non default dial context 2024-08-21 23:05:06 +02:00
Ben Adrian Sarmiento
33cfdbbbea Add new ipv4 and ipv6 hostnames 2024-08-21 23:03:59 +02:00
10 changed files with 168 additions and 153 deletions

View File

@@ -57,8 +57,8 @@ func MainApp(configPath string) {
proxyURL = os.Getenv("PROXY") proxyURL = os.Getenv("PROXY")
} }
repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, nil, log.Named("network_test")) repoClient4 := http.NewHTTPClient("", 0, 2, false, []string{}, proxyURL, nil, log.Named("network_test"))
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, nil, log.Named("network_test")) repoClient6 := http.NewHTTPClient("", 0, 2, true, []string{}, proxyURL, nil, log.Named("network_test"))
repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test"))
var hosts []string var hosts []string

View File

@@ -192,7 +192,7 @@ func (z *ZurgConfig) GetApiTimeoutSecs() int {
func (z *ZurgConfig) GetDownloadTimeoutSecs() int { func (z *ZurgConfig) GetDownloadTimeoutSecs() int {
if z.DownloadTimeoutSecs == 0 { if z.DownloadTimeoutSecs == 0 {
return 10 return 15
} }
return z.DownloadTimeoutSecs return z.DownloadTimeoutSecs
} }

View File

@@ -541,6 +541,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<form method="post" action="/torrents/reset-repair-state"> <form method="post" action="/torrents/reset-repair-state">
<input type="submit" value="Reset repair state" /> Reset repair state of all torrents so they can be repaired again <input type="submit" value="Reset repair state" /> Reset repair state of all torrents so they can be repaired again
</form> </form>
// add more utilities here
</td> </td>
</tr> </tr>
<tr> <tr>

View File

@@ -396,7 +396,7 @@ func (hs *Handlers) handleDownloadLink(resp http.ResponseWriter, req *http.Reque
filename = chi.URLParam(req, "filename") filename = chi.URLParam(req, "filename")
} }
if download, ok := hs.torMgr.DownloadMap.Get(filename); ok { if download, ok := hs.torMgr.DownloadMap.Get(filename); ok {
hs.downloader.DownloadLink(download, resp, req, hs.torMgr, hs.cfg, hs.log) hs.downloader.DownloadLink(download, resp, req, hs.cfg, hs.log)
} else { } else {
http.NotFound(resp, req) http.NotFound(resp, req)
} }

View File

@@ -378,10 +378,10 @@ func (t *TorrentManager) deleteInfoFile(torrentID string) {
func (t *TorrentManager) mountNewDownloads() { func (t *TorrentManager) mountNewDownloads() {
token := t.Config.GetToken() token := t.Config.GetToken()
tokenMap, _ := t.rd.UnrestrictMap.Get(token) unrestrictCache, _ := t.rd.UnrestrictCache.Get(token)
// clear maps // clear maps
tokenMap.Clear() unrestrictCache.Clear()
t.DownloadMap.Clear() t.DownloadMap.Clear()
downloads := t.rd.GetDownloads() downloads := t.rd.GetDownloads()
@@ -391,7 +391,7 @@ func (t *TorrentManager) mountNewDownloads() {
if strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") { if strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") {
downloads[i].Link = downloads[i].Link[0:39] downloads[i].Link = downloads[i].Link[0:39]
tokenMap.Set(downloads[i].Link, &downloads[i]) unrestrictCache.Set(downloads[i].Link, &downloads[i])
continue continue
} }

View File

@@ -19,7 +19,7 @@ const (
EXPIRED_LINK_TOLERANCE_HOURS = 24 EXPIRED_LINK_TOLERANCE_HOURS = 24
) )
// StartRepairJob is a permanent job that runs every periodically to repair broken torrents // StartRepairJob is a permanent job that runs periodically to repair broken torrents
func (t *TorrentManager) StartRepairJob() { func (t *TorrentManager) StartRepairJob() {
if !t.Config.EnableRepair() { if !t.Config.EnableRepair() {
t.repairLog.Warn("Repair is disabled, skipping repair job") t.repairLog.Warn("Repair is disabled, skipping repair job")
@@ -29,7 +29,6 @@ func (t *TorrentManager) StartRepairJob() {
t.RepairQueue = mapset.NewSet[*Torrent]() t.RepairQueue = mapset.NewSet[*Torrent]()
t.RepairAllTrigger = make(chan struct{}) t.RepairAllTrigger = make(chan struct{})
// periodic repair worker
t.workerPool.Submit(func() { t.workerPool.Submit(func() {
t.repairLog.Debug("Starting periodic repair job") t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
@@ -48,6 +47,7 @@ func (t *TorrentManager) StartRepairJob() {
} }
// EnqueueForRepair allows an on-demand repair to be initiated. // EnqueueForRepair allows an on-demand repair to be initiated.
// if torrent is nil, all torrents will be repaired
func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) { func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) {
if !t.Config.EnableRepair() { if !t.Config.EnableRepair() {
if torrent != nil { if torrent != nil {

View File

@@ -116,10 +116,9 @@ func (dl *Downloader) DownloadFile(
unrestrict, err := torMgr.UnrestrictFile(file) unrestrict, err := torMgr.UnrestrictFile(file)
if utils.AreAllTokensExpired(err) { if utils.AreAllTokensExpired(err) {
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached, all tokens are expired)", http.StatusBadRequest)
return return
} } else if err != nil {
if err != nil {
if file.State.Event(context.Background(), "break_file") == nil { if file.State.Event(context.Background(), "break_file") == nil {
torMgr.EnqueueForRepair(torrent) torMgr.EnqueueForRepair(torrent)
} }
@@ -144,7 +143,7 @@ func (dl *Downloader) DownloadFile(
redirect(resp, req, unrestrict.Download) redirect(resp, req, unrestrict.Download)
} else { } else {
// log.Debugf("Streaming %s", unrestrict.Download) // log.Debugf("Streaming %s", unrestrict.Download)
dl.streamFileToResponse(torrent, file, unrestrict, resp, req, torMgr, cfg, log) dl.streamFileToResponse(file, unrestrict, resp, req, cfg, log)
} }
} }
@@ -153,7 +152,6 @@ func (dl *Downloader) DownloadLink(
unrestrict *realdebrid.Download, unrestrict *realdebrid.Download,
resp http.ResponseWriter, resp http.ResponseWriter,
req *http.Request, req *http.Request,
torMgr *intTor.TorrentManager,
cfg config.ConfigInterface, cfg config.ConfigInterface,
log *logutil.Logger, log *logutil.Logger,
) { ) {
@@ -162,17 +160,15 @@ func (dl *Downloader) DownloadLink(
redirect(resp, req, unrestrict.Download) redirect(resp, req, unrestrict.Download)
} else { } else {
log.Debugf("Streaming %s", unrestrict.Download) log.Debugf("Streaming %s", unrestrict.Download)
dl.streamFileToResponse(nil, nil, unrestrict, resp, req, torMgr, cfg, log) dl.streamFileToResponse(nil, unrestrict, resp, req, cfg, log)
} }
} }
func (dl *Downloader) streamFileToResponse( func (dl *Downloader) streamFileToResponse(
torrent *intTor.Torrent, file *intTor.File, // can be nil if downloading a link
file *intTor.File,
unrestrict *realdebrid.Download, unrestrict *realdebrid.Download,
resp http.ResponseWriter, resp http.ResponseWriter,
req *http.Request, req *http.Request,
torMgr *intTor.TorrentManager,
cfg config.ConfigInterface, cfg config.ConfigInterface,
log *logutil.Logger, log *logutil.Logger,
) { ) {
@@ -182,7 +178,7 @@ func (dl *Downloader) streamFileToResponse(
if file != nil { if file != nil {
log.Errorf("Error creating new request for file %s: %v", file.Path, err) log.Errorf("Error creating new request for file %s: %v", file.Path, err)
} }
http.Error(resp, "File is not available (can't create request)", http.StatusBadRequest) http.Error(resp, "File is not available (can't create request)", http.StatusInternalServerError)
return return
} }
@@ -200,31 +196,15 @@ func (dl *Downloader) streamFileToResponse(
if utils.IsBytesLimitReached(err) { if utils.IsBytesLimitReached(err) {
dl.rd.TokenManager.SetTokenAsExpired(unrestrict.Token, "bandwidth limit exceeded") dl.rd.TokenManager.SetTokenAsExpired(unrestrict.Token, "bandwidth limit exceeded")
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusInternalServerError)
return
} else if utils.IsInvalidDownloadCode(err) {
http.Error(resp, "File is not available (invalid download code)", http.StatusInternalServerError)
return return
} else if err != nil { } else if err != nil {
log.Errorf("Cannot download file %s: %v", unrestrict.Download, err) log.Errorf("Cannot download file %s: %v", unrestrict.Download, err)
if file != nil && file.State.Event(context.Background(), "break_file") == nil { http.Error(resp, "File is not available (can't download)", http.StatusInternalServerError)
torMgr.EnqueueForRepair(torrent)
}
http.Error(resp, "File is not available (can't download)", http.StatusBadRequest)
return return
} }
defer downloadResp.Body.Close() defer downloadResp.Body.Close()
// Check if the download was not successful
if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent {
log.Errorf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename)
if file != nil && file.State.Event(context.Background(), "break_file") == nil {
torMgr.EnqueueForRepair(torrent)
}
http.Error(resp, "File is not available (download error)", http.StatusBadRequest)
return
}
if cr := downloadResp.Header.Get("Content-Range"); cr != "" { if cr := downloadResp.Header.Get("Content-Range"); cr != "" {
resp.Header().Set("Content-Range", cr) resp.Header().Set("Content-Range", cr)
} }

View File

@@ -97,6 +97,9 @@ func NewHTTPClient(
MaxIdleConnsPerHost: 32, MaxIdleConnsPerHost: 32,
MaxConnsPerHost: 32, MaxConnsPerHost: 32,
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 90 * time.Second,
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
return dialer.Dial(network, address)
},
} }
if forceIPv6 { if forceIPv6 {
@@ -154,7 +157,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
} }
if len(r.hosts) > 0 { if len(r.hosts) > 0 {
r.ensureReachableHost(req) r.ensureReachableDownloadServer(req)
} }
if r.rateLimiter != nil { if r.rateLimiter != nil {
@@ -197,15 +200,20 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
return resp, err return resp, err
} }
func (r *HTTPClient) ensureReachableHost(req *http.Request) { // ensureReachableDownloadServer ensures that the request is sent to a reachable host
// if not, it will replace the host with a reachable one
func (r *HTTPClient) ensureReachableDownloadServer(req *http.Request) {
if len(r.hosts) == 0 {
return
}
// skip if not a download server
if !strings.Contains(req.Host, ".download.real-debrid.") { if !strings.Contains(req.Host, ".download.real-debrid.") {
return return
} }
// skip CDN servers // skip if CDN servers
if req.Host[0] >= 'a' && req.Host[0] <= 'z' { if req.Host[0] >= 'a' && req.Host[0] <= 'z' {
return return
} }
// check if req.Host is in r.hosts // check if req.Host is in r.hosts
if r.CheckIfHostIsReachable(req.Host) { if r.CheckIfHostIsReachable(req.Host) {
return return
@@ -217,15 +225,17 @@ func (r *HTTPClient) ensureReachableHost(req *http.Request) {
} else if strings.HasSuffix(req.Host, ".cloud") { } else if strings.HasSuffix(req.Host, ".cloud") {
newHost = strings.Replace(req.Host, ".cloud", ".com", 1) newHost = strings.Replace(req.Host, ".cloud", ".com", 1)
} }
// check if newHost is reachable
if r.CheckIfHostIsReachable(newHost) { if r.CheckIfHostIsReachable(newHost) {
req.Host = newHost req.Host = newHost
req.URL.Host = req.Host } else {
return
}
// just pick a random host // just pick a random host
req.Host = r.hosts[rand.Intn(len(r.hosts))] req.Host = r.hosts[rand.Intn(len(r.hosts))]
}
req.URL.Host = req.Host req.URL.Host = req.Host
// just retain the original host if not in the list of reachable hosts
// r.log.Debugf("Host %s is not found on the list of reachable hosts", req.Host)
} }
// CheckIfHostIsReachable checks if the given host is passed in the list of reachable hosts // CheckIfHostIsReachable checks if the given host is passed in the list of reachable hosts
@@ -256,6 +266,23 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
return false return false
} }
// retry on timeout errors for download requests
if err != nil && strings.Contains(err.Error(), "timeout") && strings.Contains(req.Host, ".download.real-debrid.") && len(r.hosts) > 1 {
oldHost := req.Host
// remove old host from the list of reachable hosts
for i, host := range r.hosts {
if host == oldHost {
r.hosts = append(r.hosts[:i], r.hosts[i+1:]...)
break
}
}
// pick a new host from the list
req.Host = r.hosts[rand.Intn(len(r.hosts))]
req.URL.Host = req.Host
r.log.Debugf("Download timed out, attempt #%d, retrying with a new host (%s -> %s)", attempts+1, oldHost, req.URL.Host)
return true
}
if apiErr, ok := err.(*ApiErrorResponse); ok { if apiErr, ok := err.(*ApiErrorResponse); ok {
switch apiErr.Code { switch apiErr.Code {
case 5: // Slow down (retry infinitely) case 5: // Slow down (retry infinitely)
@@ -291,21 +318,9 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
default: default:
return false return false
} }
} else if downloadErr, ok := err.(*DownloadErrorResponse); ok { } else if _, ok := err.(*DownloadErrorResponse); ok {
switch downloadErr.Message {
case "invalid_download_code": // 404
if attempts >= r.maxRetries {
r.log.Debugf("Invalid download code, attempt #%d", attempts+1)
return false return false
} }
secs := r.backoff(attempts, rateLimitSleep)
r.log.Debugf("Invalid download code, attempt #%d, retrying in %d seconds", attempts+1, secs/time.Second)
time.Sleep(secs)
return true
default:
return false
}
}
// succesful requests // succesful requests
if resp != nil { if resp != nil {
@@ -319,7 +334,7 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
} }
if attempts >= r.maxRetries { if attempts >= r.maxRetries {
r.log.Debugf("Request failed, attempt #%d (error=%v)", attempts+1, err) r.log.Errorf("Request failed, attempt #%d (error=%v), giving up", attempts+1, err)
return false return false
} }
secs := r.backoff(attempts, 1) secs := r.backoff(attempts, 1)
@@ -342,10 +357,6 @@ func (r *HTTPClient) VerifyLink(link string) error {
if err != nil { if err != nil {
return err return err
} }
timeout := time.Duration(r.timeoutSecs) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req = req.WithContext(ctx)
resp, err := r.Do(req) resp, err := r.Do(req)
if err != nil { if err != nil {
return err return err

View File

@@ -11,9 +11,12 @@ import (
"net/url" "net/url"
"os" "os"
"sort" "sort"
"strings"
"sync"
"time" "time"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
"github.com/panjf2000/ants/v2"
) )
type IPRepository struct { type IPRepository struct {
@@ -89,6 +92,7 @@ func (r *IPRepository) GetHosts(numberOfHosts int, ipv6 bool) []string {
kvList = append(kvList, kv{k, v}) kvList = append(kvList, kv{k, v})
} }
// Sort by latency from lowest to highest
sort.Slice(kvList, func(i, j int) bool { sort.Slice(kvList, func(i, j int) bool {
return kvList[i].Value < kvList[j].Value return kvList[i].Value < kvList[j].Value
}) })
@@ -105,65 +109,78 @@ func (r *IPRepository) GetHosts(numberOfHosts int, ipv6 bool) []string {
} }
func (r *IPRepository) runLatencyTest() { func (r *IPRepository) runLatencyTest() {
limit := 99 var wg sync.WaitGroup
start := 0 ipv6Enabled := false
for { if r.canConnectToIPv6() {
lastDomainsWorked := false r.log.Info("Your network supports IPv6")
for i := start; i <= limit; i++ { ipv6Enabled = true
domain := fmt.Sprintf("%d.download.real-debrid.com", i)
ips, err := net.LookupIP(domain)
if err != nil || len(ips) == 0 {
continue
}
latency, err := r.testDomainLatency(r.ipv4client, domain)
if err == nil {
r.ipv4latencyMap[domain] = latency
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", domain, latency)
if i >= limit-2 {
lastDomainsWorked = true
}
}
latency, err = r.testDomainLatency(r.ipv6client, domain)
if err == nil {
r.ipv6latencyMap[domain] = latency
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", domain, latency)
if i >= limit-2 {
lastDomainsWorked = true
}
}
domain = fmt.Sprintf("%d.download.real-debrid.cloud", i)
ips, err = net.LookupIP(domain)
if err != nil || len(ips) == 0 {
continue
}
latency, err = r.testDomainLatency(r.ipv4client, domain)
if err == nil {
r.ipv4latencyMap[domain] = latency
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", domain, latency)
if i >= limit-2 {
lastDomainsWorked = true
}
}
latency, err = r.testDomainLatency(r.ipv6client, domain)
if err == nil {
r.ipv6latencyMap[domain] = latency
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", domain, latency)
if i >= limit-2 {
lastDomainsWorked = true
}
}
}
if lastDomainsWorked {
start = limit + 1
limit += 10
} else { } else {
break r.log.Info("Your network does not support IPv6")
} }
p, _ := ants.NewPoolWithFunc(8, func(h interface{}) {
defer wg.Done()
host := h.(string)
ips, err := net.LookupIP(host)
if err == nil && len(ips) > 0 {
latency, err := r.testDomainLatency(r.ipv4client, host)
if err == nil {
r.ipv4latencyMap[host] = latency
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", host, latency)
}
if ipv6Enabled {
latency, err = r.testDomainLatency(r.ipv6client, host)
if err == nil {
r.ipv6latencyMap[host] = latency
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", host, latency)
}
}
if strings.HasSuffix(host, ".download.real-debrid.com") {
ipv4Host := strings.Replace(host, ".download.real-debrid.com", "-4.download.real-debrid.com", 1)
latency, err := r.testDomainLatency(r.ipv4client, ipv4Host)
if err == nil {
r.ipv4latencyMap[ipv4Host] = latency
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", ipv4Host, latency)
}
if ipv6Enabled {
ipv6Host := strings.Replace(host, ".download.real-debrid.com", "-6.download.real-debrid.com", 1)
latency, err = r.testDomainLatency(r.ipv6client, ipv6Host)
if err == nil {
r.ipv6latencyMap[ipv6Host] = latency
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", ipv6Host, latency)
}
}
}
}
})
defer p.Release()
for i := 1; i <= 99; i++ {
wg.Add(2)
_ = p.Invoke(fmt.Sprintf("%d.download.real-debrid.com", i))
_ = p.Invoke(fmt.Sprintf("%d.download.real-debrid.cloud", i))
}
// for i := 'a'; i <= 'z'; i++ {
// for j := 'a'; j <= 'z'; j++ {
// for k := 'a'; k <= 'z'; k++ {
// wg.Add(2)
// _ = p.Invoke(fmt.Sprintf("%c%c%c.download.real-debrid.com", i, j, k))
// _ = p.Invoke(fmt.Sprintf("%c%c%c1.download.real-debrid.com", i, j, k))
// }
// }
// }
wg.Wait()
r.log.Infof("Found %d ipv4 hosts", len(r.ipv4latencyMap))
if ipv6Enabled {
r.log.Infof("Found %d ipv6 hosts", len(r.ipv6latencyMap))
} }
} }
@@ -252,3 +269,18 @@ func (r *IPRepository) writeLatencyFile(latencyFile string, data interface{}) {
return return
} }
} }
func (r *IPRepository) canConnectToIPv6() bool {
address := "[2001:4860:4860::8888]:53" // Google Public DNS IPv6 address on port 53
timeout := 5 * time.Second // Timeout duration
conn, err := net.DialTimeout("tcp6", address, timeout)
if err != nil {
fmt.Printf("Failed to connect to IPv6 address %s: %v\n", address, err)
return false
}
defer conn.Close()
fmt.Printf("Successfully connected to IPv6 address %s\n", address)
return true
}

View File

@@ -17,7 +17,7 @@ import (
) )
type RealDebrid struct { type RealDebrid struct {
UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] UnrestrictCache cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]]
TokenManager *DownloadTokenManager TokenManager *DownloadTokenManager
torrentsCache []Torrent torrentsCache []Torrent
verifiedLinks cmap.ConcurrentMap[string, int64] verifiedLinks cmap.ConcurrentMap[string, int64]
@@ -45,7 +45,7 @@ func NewRealDebrid(apiClient,
} }
rd := &RealDebrid{ rd := &RealDebrid{
UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), UnrestrictCache: cmap.New[cmap.ConcurrentMap[string, *Download]](),
TokenManager: NewDownloadTokenManager(downloadTokens, log), TokenManager: NewDownloadTokenManager(downloadTokens, log),
torrentsCache: []Torrent{}, torrentsCache: []Torrent{},
verifiedLinks: cmap.New[int64](), verifiedLinks: cmap.New[int64](),
@@ -59,16 +59,17 @@ func NewRealDebrid(apiClient,
} }
for _, token := range downloadTokens { for _, token := range downloadTokens {
rd.UnrestrictMap.Set(token, cmap.New[*Download]()) rd.UnrestrictCache.Set(token, cmap.New[*Download]())
} }
return rd return rd
} }
const DOWNLOAD_LINK_EXPIRY = 60 * 60 * 1.5 // 1.5 hours const DOWNLOAD_LINK_EXPIRY = 60 * 3 // 3 minutes
func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) { func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
for { for {
now := time.Now().Unix()
token, err := rd.TokenManager.GetCurrentToken() token, err := rd.TokenManager.GetCurrentToken()
if err != nil { if err != nil {
// when all tokens are expired // when all tokens are expired
@@ -76,48 +77,38 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
} }
// check if the link is already unrestricted // check if the link is already unrestricted
tokenMap, _ := rd.UnrestrictMap.Get(token) unrestrictCache, _ := rd.UnrestrictCache.Get(token)
if tokenMap.Has(link) { if unrestrictCache.Has(link) {
download, _ := tokenMap.Get(link) download, _ := unrestrictCache.Get(link)
// check if the link is in the verified links cache // check if the link is in the verified links cache and not expired
if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() { if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > now {
return download, nil return download, nil
} else if ok {
// if the link is expired, remove it from the verified links cache
rd.verifiedLinks.Remove(download.ID)
} }
// check if the link is still valid (not in the cache or expired) // we need to re-verify the link
rd.verifiedLinks.Remove(download.ID)
err := rd.downloadClient.VerifyLink(download.Download) err := rd.downloadClient.VerifyLink(download.Download)
if utils.IsBytesLimitReached(err) { if err == nil {
// yes? then extend the expiry time?
rd.verifiedLinks.Set(download.ID, now+DOWNLOAD_LINK_EXPIRY)
return download, nil
} else if utils.IsBytesLimitReached(err) {
rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded") rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
continue continue
} else if err == nil {
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
return download, nil
} }
// if verification failed, remove the link from the token map // if verification failed, remove the link from the token map
tokenMap.Remove(link) unrestrictCache.Remove(link)
} }
download, err := rd.UnrestrictLinkWithToken(link, token) download, err := rd.UnrestrictLinkWithToken(link, token)
if err != nil { if err != nil {
return nil, err return nil, err
} }
download.Token = token unrestrictCache.Set(link, download)
rd.verifiedLinks.Set(download.ID, now+DOWNLOAD_LINK_EXPIRY)
tokenMap.Set(link, download)
err = rd.downloadClient.VerifyLink(download.Download)
if utils.IsBytesLimitReached(err) {
rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
continue
} else if err != nil {
return nil, err
}
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
return download, err return download, err
} }
@@ -162,6 +153,8 @@ func (rd *RealDebrid) UnrestrictLinkWithToken(link, token string) (*Download, er
return nil, fmt.Errorf("undecodable response: %v", err) return nil, fmt.Errorf("undecodable response: %v", err)
} }
response.Token = token
// rd.log.Debugf("Unrestricted link %s into %s", link, response.Download) // rd.log.Debugf("Unrestricted link %s into %s", link, response.Download)
return &response, nil return &response, nil
} }
@@ -428,30 +421,28 @@ func (rd *RealDebrid) MonitorExpiredTokens() {
i++ i++
expiredTokens := rd.TokenManager.GetExpiredTokens() expiredTokens := rd.TokenManager.GetExpiredTokens()
for _, token := range expiredTokens { for _, token := range expiredTokens {
tokenMap, _ := rd.UnrestrictMap.Get(token) unrestrictCache, _ := rd.UnrestrictCache.Get(token)
stillExpired := true
skipAll := false skipAll := false
tokenMap.IterCb(func(key string, download *Download) { unrestrictCache.IterCb(func(key string, download *Download) {
if skipAll { if skipAll {
return return
} }
err := rd.downloadClient.VerifyLink(download.Download) err := rd.downloadClient.VerifyLink(download.Download)
if err != nil { if err != nil {
if utils.IsBytesLimitReached(err) { if utils.IsBytesLimitReached(err) {
if i%15 == 0 { if i%10 == 0 {
rd.log.Debugf("Token %s is still expired", utils.MaskToken(token)) rd.log.Debugf("Token %s is still expired", utils.MaskToken(token))
} }
// we already know that it is still expired
skipAll = true skipAll = true
} }
return return
} }
stillExpired = false
skipAll = true skipAll = true
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY) rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
})
if !stillExpired {
rd.TokenManager.SetTokenAsUnexpired(token) rd.TokenManager.SetTokenAsUnexpired(token)
} })
} }
time.Sleep(sleepPeriod) time.Sleep(sleepPeriod)
} }