Compare commits
10 Commits
7ce8a01a3b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f04a0ff42c | ||
|
|
bac6351071 | ||
|
|
d4adc2ae21 | ||
|
|
83d928a540 | ||
|
|
78be877efe | ||
|
|
54230c9eaa | ||
|
|
11e9c5d431 | ||
|
|
f5cbf150ef | ||
|
|
517aca22ab | ||
|
|
33cfdbbbea |
@@ -57,8 +57,8 @@ func MainApp(configPath string) {
|
||||
proxyURL = os.Getenv("PROXY")
|
||||
}
|
||||
|
||||
repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, nil, log.Named("network_test"))
|
||||
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, nil, log.Named("network_test"))
|
||||
repoClient4 := http.NewHTTPClient("", 0, 2, false, []string{}, proxyURL, nil, log.Named("network_test"))
|
||||
repoClient6 := http.NewHTTPClient("", 0, 2, true, []string{}, proxyURL, nil, log.Named("network_test"))
|
||||
repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test"))
|
||||
|
||||
var hosts []string
|
||||
|
||||
@@ -192,7 +192,7 @@ func (z *ZurgConfig) GetApiTimeoutSecs() int {
|
||||
|
||||
func (z *ZurgConfig) GetDownloadTimeoutSecs() int {
|
||||
if z.DownloadTimeoutSecs == 0 {
|
||||
return 10
|
||||
return 15
|
||||
}
|
||||
return z.DownloadTimeoutSecs
|
||||
}
|
||||
|
||||
@@ -541,6 +541,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<form method="post" action="/torrents/reset-repair-state">
|
||||
<input type="submit" value="Reset repair state" /> Reset repair state of all torrents so they can be repaired again
|
||||
</form>
|
||||
// add more utilities here
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
||||
@@ -396,7 +396,7 @@ func (hs *Handlers) handleDownloadLink(resp http.ResponseWriter, req *http.Reque
|
||||
filename = chi.URLParam(req, "filename")
|
||||
}
|
||||
if download, ok := hs.torMgr.DownloadMap.Get(filename); ok {
|
||||
hs.downloader.DownloadLink(download, resp, req, hs.torMgr, hs.cfg, hs.log)
|
||||
hs.downloader.DownloadLink(download, resp, req, hs.cfg, hs.log)
|
||||
} else {
|
||||
http.NotFound(resp, req)
|
||||
}
|
||||
|
||||
@@ -378,10 +378,10 @@ func (t *TorrentManager) deleteInfoFile(torrentID string) {
|
||||
|
||||
func (t *TorrentManager) mountNewDownloads() {
|
||||
token := t.Config.GetToken()
|
||||
tokenMap, _ := t.rd.UnrestrictMap.Get(token)
|
||||
unrestrictCache, _ := t.rd.UnrestrictCache.Get(token)
|
||||
|
||||
// clear maps
|
||||
tokenMap.Clear()
|
||||
unrestrictCache.Clear()
|
||||
t.DownloadMap.Clear()
|
||||
|
||||
downloads := t.rd.GetDownloads()
|
||||
@@ -391,7 +391,7 @@ func (t *TorrentManager) mountNewDownloads() {
|
||||
|
||||
if strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") {
|
||||
downloads[i].Link = downloads[i].Link[0:39]
|
||||
tokenMap.Set(downloads[i].Link, &downloads[i])
|
||||
unrestrictCache.Set(downloads[i].Link, &downloads[i])
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ const (
|
||||
EXPIRED_LINK_TOLERANCE_HOURS = 24
|
||||
)
|
||||
|
||||
// StartRepairJob is a permanent job that runs every periodically to repair broken torrents
|
||||
// StartRepairJob is a permanent job that runs periodically to repair broken torrents
|
||||
func (t *TorrentManager) StartRepairJob() {
|
||||
if !t.Config.EnableRepair() {
|
||||
t.repairLog.Warn("Repair is disabled, skipping repair job")
|
||||
@@ -29,7 +29,6 @@ func (t *TorrentManager) StartRepairJob() {
|
||||
t.RepairQueue = mapset.NewSet[*Torrent]()
|
||||
t.RepairAllTrigger = make(chan struct{})
|
||||
|
||||
// periodic repair worker
|
||||
t.workerPool.Submit(func() {
|
||||
t.repairLog.Debug("Starting periodic repair job")
|
||||
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
|
||||
@@ -48,6 +47,7 @@ func (t *TorrentManager) StartRepairJob() {
|
||||
}
|
||||
|
||||
// EnqueueForRepair allows an on-demand repair to be initiated.
|
||||
// if torrent is nil, all torrents will be repaired
|
||||
func (t *TorrentManager) EnqueueForRepair(torrent *Torrent) {
|
||||
if !t.Config.EnableRepair() {
|
||||
if torrent != nil {
|
||||
|
||||
@@ -116,10 +116,9 @@ func (dl *Downloader) DownloadFile(
|
||||
unrestrict, err := torMgr.UnrestrictFile(file)
|
||||
if utils.AreAllTokensExpired(err) {
|
||||
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
|
||||
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
|
||||
http.Error(resp, "File is not available (bandwidth limit reached, all tokens are expired)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
if file.State.Event(context.Background(), "break_file") == nil {
|
||||
torMgr.EnqueueForRepair(torrent)
|
||||
}
|
||||
@@ -144,7 +143,7 @@ func (dl *Downloader) DownloadFile(
|
||||
redirect(resp, req, unrestrict.Download)
|
||||
} else {
|
||||
// log.Debugf("Streaming %s", unrestrict.Download)
|
||||
dl.streamFileToResponse(torrent, file, unrestrict, resp, req, torMgr, cfg, log)
|
||||
dl.streamFileToResponse(file, unrestrict, resp, req, cfg, log)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,7 +152,6 @@ func (dl *Downloader) DownloadLink(
|
||||
unrestrict *realdebrid.Download,
|
||||
resp http.ResponseWriter,
|
||||
req *http.Request,
|
||||
torMgr *intTor.TorrentManager,
|
||||
cfg config.ConfigInterface,
|
||||
log *logutil.Logger,
|
||||
) {
|
||||
@@ -162,17 +160,15 @@ func (dl *Downloader) DownloadLink(
|
||||
redirect(resp, req, unrestrict.Download)
|
||||
} else {
|
||||
log.Debugf("Streaming %s", unrestrict.Download)
|
||||
dl.streamFileToResponse(nil, nil, unrestrict, resp, req, torMgr, cfg, log)
|
||||
dl.streamFileToResponse(nil, unrestrict, resp, req, cfg, log)
|
||||
}
|
||||
}
|
||||
|
||||
func (dl *Downloader) streamFileToResponse(
|
||||
torrent *intTor.Torrent,
|
||||
file *intTor.File,
|
||||
file *intTor.File, // can be nil if downloading a link
|
||||
unrestrict *realdebrid.Download,
|
||||
resp http.ResponseWriter,
|
||||
req *http.Request,
|
||||
torMgr *intTor.TorrentManager,
|
||||
cfg config.ConfigInterface,
|
||||
log *logutil.Logger,
|
||||
) {
|
||||
@@ -182,7 +178,7 @@ func (dl *Downloader) streamFileToResponse(
|
||||
if file != nil {
|
||||
log.Errorf("Error creating new request for file %s: %v", file.Path, err)
|
||||
}
|
||||
http.Error(resp, "File is not available (can't create request)", http.StatusBadRequest)
|
||||
http.Error(resp, "File is not available (can't create request)", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -200,31 +196,15 @@ func (dl *Downloader) streamFileToResponse(
|
||||
if utils.IsBytesLimitReached(err) {
|
||||
dl.rd.TokenManager.SetTokenAsExpired(unrestrict.Token, "bandwidth limit exceeded")
|
||||
// log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET")
|
||||
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest)
|
||||
return
|
||||
} else if utils.IsInvalidDownloadCode(err) {
|
||||
http.Error(resp, "File is not available (invalid download code)", http.StatusInternalServerError)
|
||||
http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusInternalServerError)
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Errorf("Cannot download file %s: %v", unrestrict.Download, err)
|
||||
if file != nil && file.State.Event(context.Background(), "break_file") == nil {
|
||||
torMgr.EnqueueForRepair(torrent)
|
||||
}
|
||||
http.Error(resp, "File is not available (can't download)", http.StatusBadRequest)
|
||||
http.Error(resp, "File is not available (can't download)", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer downloadResp.Body.Close()
|
||||
|
||||
// Check if the download was not successful
|
||||
if downloadResp.StatusCode != http.StatusOK && downloadResp.StatusCode != http.StatusPartialContent {
|
||||
log.Errorf("Received a %s status code for file %s", downloadResp.Status, unrestrict.Filename)
|
||||
if file != nil && file.State.Event(context.Background(), "break_file") == nil {
|
||||
torMgr.EnqueueForRepair(torrent)
|
||||
}
|
||||
http.Error(resp, "File is not available (download error)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if cr := downloadResp.Header.Get("Content-Range"); cr != "" {
|
||||
resp.Header().Set("Content-Range", cr)
|
||||
}
|
||||
|
||||
@@ -97,6 +97,9 @@ func NewHTTPClient(
|
||||
MaxIdleConnsPerHost: 32,
|
||||
MaxConnsPerHost: 32,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return dialer.Dial(network, address)
|
||||
},
|
||||
}
|
||||
|
||||
if forceIPv6 {
|
||||
@@ -154,7 +157,7 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
|
||||
}
|
||||
|
||||
if len(r.hosts) > 0 {
|
||||
r.ensureReachableHost(req)
|
||||
r.ensureReachableDownloadServer(req)
|
||||
}
|
||||
|
||||
if r.rateLimiter != nil {
|
||||
@@ -197,15 +200,20 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (r *HTTPClient) ensureReachableHost(req *http.Request) {
|
||||
// ensureReachableDownloadServer ensures that the request is sent to a reachable host
|
||||
// if not, it will replace the host with a reachable one
|
||||
func (r *HTTPClient) ensureReachableDownloadServer(req *http.Request) {
|
||||
if len(r.hosts) == 0 {
|
||||
return
|
||||
}
|
||||
// skip if not a download server
|
||||
if !strings.Contains(req.Host, ".download.real-debrid.") {
|
||||
return
|
||||
}
|
||||
// skip CDN servers
|
||||
// skip if CDN servers
|
||||
if req.Host[0] >= 'a' && req.Host[0] <= 'z' {
|
||||
return
|
||||
}
|
||||
|
||||
// check if req.Host is in r.hosts
|
||||
if r.CheckIfHostIsReachable(req.Host) {
|
||||
return
|
||||
@@ -217,15 +225,17 @@ func (r *HTTPClient) ensureReachableHost(req *http.Request) {
|
||||
} else if strings.HasSuffix(req.Host, ".cloud") {
|
||||
newHost = strings.Replace(req.Host, ".cloud", ".com", 1)
|
||||
}
|
||||
// check if newHost is reachable
|
||||
if r.CheckIfHostIsReachable(newHost) {
|
||||
req.Host = newHost
|
||||
req.URL.Host = req.Host
|
||||
return
|
||||
} else {
|
||||
// just pick a random host
|
||||
req.Host = r.hosts[rand.Intn(len(r.hosts))]
|
||||
}
|
||||
// just pick a random host
|
||||
req.Host = r.hosts[rand.Intn(len(r.hosts))]
|
||||
|
||||
req.URL.Host = req.Host
|
||||
|
||||
// just retain the original host if not in the list of reachable hosts
|
||||
// r.log.Debugf("Host %s is not found on the list of reachable hosts", req.Host)
|
||||
}
|
||||
|
||||
// CheckIfHostIsReachable checks if the given host is passed in the list of reachable hosts
|
||||
@@ -256,6 +266,23 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
|
||||
return false
|
||||
}
|
||||
|
||||
// retry on timeout errors for download requests
|
||||
if err != nil && strings.Contains(err.Error(), "timeout") && strings.Contains(req.Host, ".download.real-debrid.") && len(r.hosts) > 1 {
|
||||
oldHost := req.Host
|
||||
// remove old host from the list of reachable hosts
|
||||
for i, host := range r.hosts {
|
||||
if host == oldHost {
|
||||
r.hosts = append(r.hosts[:i], r.hosts[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
// pick a new host from the list
|
||||
req.Host = r.hosts[rand.Intn(len(r.hosts))]
|
||||
req.URL.Host = req.Host
|
||||
r.log.Debugf("Download timed out, attempt #%d, retrying with a new host (%s -> %s)", attempts+1, oldHost, req.URL.Host)
|
||||
return true
|
||||
}
|
||||
|
||||
if apiErr, ok := err.(*ApiErrorResponse); ok {
|
||||
switch apiErr.Code {
|
||||
case 5: // Slow down (retry infinitely)
|
||||
@@ -291,20 +318,8 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
|
||||
default:
|
||||
return false
|
||||
}
|
||||
} else if downloadErr, ok := err.(*DownloadErrorResponse); ok {
|
||||
switch downloadErr.Message {
|
||||
case "invalid_download_code": // 404
|
||||
if attempts >= r.maxRetries {
|
||||
r.log.Debugf("Invalid download code, attempt #%d", attempts+1)
|
||||
return false
|
||||
}
|
||||
secs := r.backoff(attempts, rateLimitSleep)
|
||||
r.log.Debugf("Invalid download code, attempt #%d, retrying in %d seconds", attempts+1, secs/time.Second)
|
||||
time.Sleep(secs)
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
} else if _, ok := err.(*DownloadErrorResponse); ok {
|
||||
return false
|
||||
}
|
||||
|
||||
// succesful requests
|
||||
@@ -319,7 +334,7 @@ func (r *HTTPClient) shouldRetry(req *http.Request, resp *http.Response, err err
|
||||
}
|
||||
|
||||
if attempts >= r.maxRetries {
|
||||
r.log.Debugf("Request failed, attempt #%d (error=%v)", attempts+1, err)
|
||||
r.log.Errorf("Request failed, attempt #%d (error=%v), giving up", attempts+1, err)
|
||||
return false
|
||||
}
|
||||
secs := r.backoff(attempts, 1)
|
||||
@@ -342,10 +357,6 @@ func (r *HTTPClient) VerifyLink(link string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
timeout := time.Duration(r.timeoutSecs) * time.Second
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := r.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
130
pkg/http/ip.go
130
pkg/http/ip.go
@@ -11,9 +11,12 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/debridmediamanager/zurg/pkg/logutil"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
type IPRepository struct {
|
||||
@@ -89,6 +92,7 @@ func (r *IPRepository) GetHosts(numberOfHosts int, ipv6 bool) []string {
|
||||
kvList = append(kvList, kv{k, v})
|
||||
}
|
||||
|
||||
// Sort by latency from lowest to highest
|
||||
sort.Slice(kvList, func(i, j int) bool {
|
||||
return kvList[i].Value < kvList[j].Value
|
||||
})
|
||||
@@ -105,65 +109,78 @@ func (r *IPRepository) GetHosts(numberOfHosts int, ipv6 bool) []string {
|
||||
}
|
||||
|
||||
func (r *IPRepository) runLatencyTest() {
|
||||
limit := 99
|
||||
start := 0
|
||||
for {
|
||||
lastDomainsWorked := false
|
||||
for i := start; i <= limit; i++ {
|
||||
domain := fmt.Sprintf("%d.download.real-debrid.com", i)
|
||||
ips, err := net.LookupIP(domain)
|
||||
if err != nil || len(ips) == 0 {
|
||||
continue
|
||||
var wg sync.WaitGroup
|
||||
ipv6Enabled := false
|
||||
if r.canConnectToIPv6() {
|
||||
r.log.Info("Your network supports IPv6")
|
||||
ipv6Enabled = true
|
||||
} else {
|
||||
r.log.Info("Your network does not support IPv6")
|
||||
}
|
||||
|
||||
p, _ := ants.NewPoolWithFunc(8, func(h interface{}) {
|
||||
defer wg.Done()
|
||||
host := h.(string)
|
||||
|
||||
ips, err := net.LookupIP(host)
|
||||
if err == nil && len(ips) > 0 {
|
||||
latency, err := r.testDomainLatency(r.ipv4client, host)
|
||||
if err == nil {
|
||||
r.ipv4latencyMap[host] = latency
|
||||
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", host, latency)
|
||||
}
|
||||
|
||||
latency, err := r.testDomainLatency(r.ipv4client, domain)
|
||||
if err == nil {
|
||||
r.ipv4latencyMap[domain] = latency
|
||||
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", domain, latency)
|
||||
if i >= limit-2 {
|
||||
lastDomainsWorked = true
|
||||
if ipv6Enabled {
|
||||
latency, err = r.testDomainLatency(r.ipv6client, host)
|
||||
if err == nil {
|
||||
r.ipv6latencyMap[host] = latency
|
||||
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", host, latency)
|
||||
}
|
||||
}
|
||||
|
||||
latency, err = r.testDomainLatency(r.ipv6client, domain)
|
||||
if err == nil {
|
||||
r.ipv6latencyMap[domain] = latency
|
||||
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", domain, latency)
|
||||
if i >= limit-2 {
|
||||
lastDomainsWorked = true
|
||||
if strings.HasSuffix(host, ".download.real-debrid.com") {
|
||||
ipv4Host := strings.Replace(host, ".download.real-debrid.com", "-4.download.real-debrid.com", 1)
|
||||
latency, err := r.testDomainLatency(r.ipv4client, ipv4Host)
|
||||
if err == nil {
|
||||
r.ipv4latencyMap[ipv4Host] = latency
|
||||
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", ipv4Host, latency)
|
||||
}
|
||||
}
|
||||
|
||||
domain = fmt.Sprintf("%d.download.real-debrid.cloud", i)
|
||||
ips, err = net.LookupIP(domain)
|
||||
if err != nil || len(ips) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
latency, err = r.testDomainLatency(r.ipv4client, domain)
|
||||
if err == nil {
|
||||
r.ipv4latencyMap[domain] = latency
|
||||
r.log.Debugf("Latency from ipv4 %s: %.5f seconds", domain, latency)
|
||||
if i >= limit-2 {
|
||||
lastDomainsWorked = true
|
||||
}
|
||||
}
|
||||
|
||||
latency, err = r.testDomainLatency(r.ipv6client, domain)
|
||||
if err == nil {
|
||||
r.ipv6latencyMap[domain] = latency
|
||||
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", domain, latency)
|
||||
if i >= limit-2 {
|
||||
lastDomainsWorked = true
|
||||
if ipv6Enabled {
|
||||
ipv6Host := strings.Replace(host, ".download.real-debrid.com", "-6.download.real-debrid.com", 1)
|
||||
latency, err = r.testDomainLatency(r.ipv6client, ipv6Host)
|
||||
if err == nil {
|
||||
r.ipv6latencyMap[ipv6Host] = latency
|
||||
r.log.Debugf("Latency from ipv6 %s: %.5f seconds", ipv6Host, latency)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if lastDomainsWorked {
|
||||
start = limit + 1
|
||||
limit += 10
|
||||
} else {
|
||||
break
|
||||
}
|
||||
})
|
||||
|
||||
defer p.Release()
|
||||
|
||||
for i := 1; i <= 99; i++ {
|
||||
wg.Add(2)
|
||||
_ = p.Invoke(fmt.Sprintf("%d.download.real-debrid.com", i))
|
||||
_ = p.Invoke(fmt.Sprintf("%d.download.real-debrid.cloud", i))
|
||||
}
|
||||
|
||||
// for i := 'a'; i <= 'z'; i++ {
|
||||
// for j := 'a'; j <= 'z'; j++ {
|
||||
// for k := 'a'; k <= 'z'; k++ {
|
||||
// wg.Add(2)
|
||||
// _ = p.Invoke(fmt.Sprintf("%c%c%c.download.real-debrid.com", i, j, k))
|
||||
// _ = p.Invoke(fmt.Sprintf("%c%c%c1.download.real-debrid.com", i, j, k))
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
wg.Wait()
|
||||
|
||||
r.log.Infof("Found %d ipv4 hosts", len(r.ipv4latencyMap))
|
||||
if ipv6Enabled {
|
||||
r.log.Infof("Found %d ipv6 hosts", len(r.ipv6latencyMap))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -252,3 +269,18 @@ func (r *IPRepository) writeLatencyFile(latencyFile string, data interface{}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *IPRepository) canConnectToIPv6() bool {
|
||||
address := "[2001:4860:4860::8888]:53" // Google Public DNS IPv6 address on port 53
|
||||
timeout := 5 * time.Second // Timeout duration
|
||||
|
||||
conn, err := net.DialTimeout("tcp6", address, timeout)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to IPv6 address %s: %v\n", address, err)
|
||||
return false
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
fmt.Printf("Successfully connected to IPv6 address %s\n", address)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
type RealDebrid struct {
|
||||
UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]]
|
||||
UnrestrictCache cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]]
|
||||
TokenManager *DownloadTokenManager
|
||||
torrentsCache []Torrent
|
||||
verifiedLinks cmap.ConcurrentMap[string, int64]
|
||||
@@ -45,7 +45,7 @@ func NewRealDebrid(apiClient,
|
||||
}
|
||||
|
||||
rd := &RealDebrid{
|
||||
UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](),
|
||||
UnrestrictCache: cmap.New[cmap.ConcurrentMap[string, *Download]](),
|
||||
TokenManager: NewDownloadTokenManager(downloadTokens, log),
|
||||
torrentsCache: []Torrent{},
|
||||
verifiedLinks: cmap.New[int64](),
|
||||
@@ -59,16 +59,17 @@ func NewRealDebrid(apiClient,
|
||||
}
|
||||
|
||||
for _, token := range downloadTokens {
|
||||
rd.UnrestrictMap.Set(token, cmap.New[*Download]())
|
||||
rd.UnrestrictCache.Set(token, cmap.New[*Download]())
|
||||
}
|
||||
|
||||
return rd
|
||||
}
|
||||
|
||||
const DOWNLOAD_LINK_EXPIRY = 60 * 60 * 1.5 // 1.5 hours
|
||||
const DOWNLOAD_LINK_EXPIRY = 60 * 3 // 3 minutes
|
||||
|
||||
func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
|
||||
for {
|
||||
now := time.Now().Unix()
|
||||
token, err := rd.TokenManager.GetCurrentToken()
|
||||
if err != nil {
|
||||
// when all tokens are expired
|
||||
@@ -76,48 +77,38 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) {
|
||||
}
|
||||
|
||||
// check if the link is already unrestricted
|
||||
tokenMap, _ := rd.UnrestrictMap.Get(token)
|
||||
if tokenMap.Has(link) {
|
||||
download, _ := tokenMap.Get(link)
|
||||
unrestrictCache, _ := rd.UnrestrictCache.Get(token)
|
||||
if unrestrictCache.Has(link) {
|
||||
download, _ := unrestrictCache.Get(link)
|
||||
|
||||
// check if the link is in the verified links cache
|
||||
if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() {
|
||||
// check if the link is in the verified links cache and not expired
|
||||
if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > now {
|
||||
return download, nil
|
||||
} else if ok {
|
||||
// if the link is expired, remove it from the verified links cache
|
||||
rd.verifiedLinks.Remove(download.ID)
|
||||
}
|
||||
|
||||
// check if the link is still valid (not in the cache or expired)
|
||||
// we need to re-verify the link
|
||||
|
||||
rd.verifiedLinks.Remove(download.ID)
|
||||
err := rd.downloadClient.VerifyLink(download.Download)
|
||||
if utils.IsBytesLimitReached(err) {
|
||||
if err == nil {
|
||||
// yes? then extend the expiry time?
|
||||
rd.verifiedLinks.Set(download.ID, now+DOWNLOAD_LINK_EXPIRY)
|
||||
return download, nil
|
||||
} else if utils.IsBytesLimitReached(err) {
|
||||
rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
|
||||
continue
|
||||
} else if err == nil {
|
||||
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
|
||||
return download, nil
|
||||
}
|
||||
|
||||
// if verification failed, remove the link from the token map
|
||||
tokenMap.Remove(link)
|
||||
unrestrictCache.Remove(link)
|
||||
}
|
||||
|
||||
download, err := rd.UnrestrictLinkWithToken(link, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
download.Token = token
|
||||
|
||||
tokenMap.Set(link, download)
|
||||
|
||||
err = rd.downloadClient.VerifyLink(download.Download)
|
||||
if utils.IsBytesLimitReached(err) {
|
||||
rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded")
|
||||
continue
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
|
||||
unrestrictCache.Set(link, download)
|
||||
rd.verifiedLinks.Set(download.ID, now+DOWNLOAD_LINK_EXPIRY)
|
||||
|
||||
return download, err
|
||||
}
|
||||
@@ -162,6 +153,8 @@ func (rd *RealDebrid) UnrestrictLinkWithToken(link, token string) (*Download, er
|
||||
return nil, fmt.Errorf("undecodable response: %v", err)
|
||||
}
|
||||
|
||||
response.Token = token
|
||||
|
||||
// rd.log.Debugf("Unrestricted link %s into %s", link, response.Download)
|
||||
return &response, nil
|
||||
}
|
||||
@@ -428,30 +421,28 @@ func (rd *RealDebrid) MonitorExpiredTokens() {
|
||||
i++
|
||||
expiredTokens := rd.TokenManager.GetExpiredTokens()
|
||||
for _, token := range expiredTokens {
|
||||
tokenMap, _ := rd.UnrestrictMap.Get(token)
|
||||
stillExpired := true
|
||||
unrestrictCache, _ := rd.UnrestrictCache.Get(token)
|
||||
skipAll := false
|
||||
tokenMap.IterCb(func(key string, download *Download) {
|
||||
unrestrictCache.IterCb(func(key string, download *Download) {
|
||||
if skipAll {
|
||||
return
|
||||
}
|
||||
err := rd.downloadClient.VerifyLink(download.Download)
|
||||
if err != nil {
|
||||
if utils.IsBytesLimitReached(err) {
|
||||
if i%15 == 0 {
|
||||
if i%10 == 0 {
|
||||
rd.log.Debugf("Token %s is still expired", utils.MaskToken(token))
|
||||
}
|
||||
// we already know that it is still expired
|
||||
skipAll = true
|
||||
}
|
||||
return
|
||||
}
|
||||
stillExpired = false
|
||||
|
||||
skipAll = true
|
||||
rd.verifiedLinks.Set(download.ID, time.Now().Unix()+DOWNLOAD_LINK_EXPIRY)
|
||||
})
|
||||
if !stillExpired {
|
||||
rd.TokenManager.SetTokenAsUnexpired(token)
|
||||
}
|
||||
})
|
||||
}
|
||||
time.Sleep(sleepPeriod)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user