From 962845fb81ddf725dfa11157c0c8c3d2f8e20a43 Mon Sep 17 00:00:00 2001 From: Ben Adrian Sarmiento Date: Fri, 28 Jun 2024 04:47:43 +0200 Subject: [PATCH] Multi-token support --- internal/app.go | 7 +-- internal/commands.go | 4 +- internal/config/types.go | 6 +++ internal/handlers/home.go | 18 ++++---- internal/handlers/router.go | 22 ++++----- internal/torrent/delete.go | 2 +- internal/torrent/latestState.go | 4 +- internal/torrent/manager.go | 63 +++++++++++--------------- internal/torrent/refresh.go | 4 +- internal/torrent/repair.go | 14 +++--- internal/torrent/uncached.go | 2 +- internal/universal/downloader.go | 17 +------ pkg/http/client.go | 18 +++++--- pkg/realdebrid/api.go | 77 ++++++++++++++++++++++++++++---- pkg/realdebrid/token_manager.go | 64 ++++++++++++++++++++++++++ 15 files changed, 214 insertions(+), 108 deletions(-) create mode 100644 pkg/realdebrid/token_manager.go diff --git a/internal/app.go b/internal/app.go index fc1f1e4..b004b12 100644 --- a/internal/app.go +++ b/internal/app.go @@ -61,8 +61,8 @@ func MainApp(configPath string) { proxyURL = os.Getenv("PROXY") } - repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) - repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test")) + repoClient4 := http.NewHTTPClient(0, 1, false, []string{}, proxyURL, log.Named("network_test")) + repoClient6 := http.NewHTTPClient(0, 1, true, []string{}, proxyURL, log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test")) var hosts []string @@ -84,7 +84,6 @@ func MainApp(configPath string) { } apiClient := http.NewHTTPClient( - config.GetToken(), config.GetRetriesUntilFailed(), // default retries = 2 config.GetApiTimeoutSecs(), // default api timeout = 60 false, // no need for ipv6 support @@ -94,7 +93,6 @@ func MainApp(configPath string) { ) unrestrictClient := http.NewHTTPClient( - config.GetToken(), config.GetRetriesUntilFailed(), // default retries = 2 config.GetDownloadTimeoutSecs(), // default download timeout = 10 false, // no need for ipv6 support @@ -104,7 +102,6 @@ func MainApp(configPath string) { ) downloadClient := http.NewHTTPClient( - "", config.GetRetriesUntilFailed(), config.GetDownloadTimeoutSecs(), config.ShouldForceIPv6(), diff --git a/internal/commands.go b/internal/commands.go index 54cb35e..c7861e4 100644 --- a/internal/commands.go +++ b/internal/commands.go @@ -35,8 +35,8 @@ func NetworkTest(testURL string) { log.Info("You can set a proxy by setting the PROXY environment variable") } - repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) - repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test")) + repoClient4 := http.NewHTTPClient(0, 1, false, []string{}, proxyURL, log.Named("network_test")) + repoClient6 := http.NewHTTPClient(0, 1, true, []string{}, proxyURL, log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, testURL, log.Named("network_test")) repo.NetworkTest(true, true) } diff --git a/internal/config/types.go b/internal/config/types.go index 7a4b224..04cdd21 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -15,6 +15,7 @@ type ConfigInterface interface { GetDirectories() []string GetDownloadsEveryMins() int GetDownloadTimeoutSecs() int + GetDownloadTokens() []string GetDumpTorrentsEveryMins() int GetHost() string GetNumberOfHosts() int @@ -50,6 +51,7 @@ type ZurgConfig struct { CanRepair bool `yaml:"enable_repair" json:"enable_repair"` DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` + DownloadTokens []string `yaml:"download_tokens" json:"download_tokens"` DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` Host string `yaml:"host" json:"host"` @@ -226,3 +228,7 @@ func (z *ZurgConfig) ShouldCacheNetworkTestResults() bool { func (z *ZurgConfig) ShouldLogRequests() bool { return z.LogRequests } + +func (z *ZurgConfig) GetDownloadTokens() []string { + return z.DownloadTokens +} diff --git a/internal/handlers/home.go b/internal/handlers/home.go index b18daa3..10fe0b9 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -29,7 +29,7 @@ type RootResponse struct { Infuse string `json:"infuse"` Logs string `json:"logs"` UserInfo *realdebrid.User `json:"user_info"` - TrafficLogged uint64 `json:"traffic_logged"` + APITrafficMB uint64 `json:"traffic_from_api"` RequestedMB uint64 `json:"requested_mb"` ServedMB uint64 `json:"served_mb"` LibrarySize int `json:"library_size"` // Number of torrents in the library @@ -82,10 +82,10 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request http.Error(resp, err.Error(), http.StatusInternalServerError) return nil, err } - var trafficLogged int64 - trafficLogged = 0 + var trafficFromAPI int64 + trafficFromAPI = 0 if _, ok := trafficDetails["real-debrid.com"]; ok { - trafficLogged = trafficDetails["real-debrid.com"] + trafficFromAPI = trafficDetails["real-debrid.com"] } userInfo.Premium = userInfo.Premium / 86400 @@ -101,7 +101,7 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request Infuse: fmt.Sprintf("//%s/infuse/", req.Host), Logs: fmt.Sprintf("//%s/logs/", req.Host), UserInfo: userInfo, - TrafficLogged: bToMb(uint64(trafficLogged)), + APITrafficMB: bToMb(uint64(trafficFromAPI)), RequestedMB: bToMb(zr.downloader.RequestedBytes.Load()), ServedMB: bToMb(zr.downloader.TotalBytes.Load()), LibrarySize: allTorrents.Count(), @@ -198,10 +198,10 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { } efficiency := response.ServedMB * 100 / denominator - if zr.initialTraffic.Load() > response.TrafficLogged { + if zr.trafficOnStartup.Load() > response.APITrafficMB { // it cannot be bigger than traffic logged // so it must be a reset back to 0 - zr.initialTraffic.Store(0) + zr.trafficOnStartup.Store(response.APITrafficMB * 1024 * 1024) } out += fmt.Sprintf(` @@ -251,8 +251,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Sys, response.NumGC, response.PID, - response.TrafficLogged, - response.TrafficLogged-bToMb(zr.initialTraffic.Load()), + response.APITrafficMB, + response.APITrafficMB-bToMb(zr.trafficOnStartup.Load()), response.RequestedMB, response.ServedMB, efficiency, diff --git a/internal/handlers/router.go b/internal/handlers/router.go index 6fb5441..61f0884 100644 --- a/internal/handlers/router.go +++ b/internal/handlers/router.go @@ -21,14 +21,14 @@ import ( ) type Handlers struct { - downloader *universal.Downloader - torMgr *torrent.TorrentManager - cfg config.ConfigInterface - api *realdebrid.RealDebrid - workerPool *ants.Pool - hosts []string - initialTraffic atomic.Uint64 - log *logutil.Logger + downloader *universal.Downloader + torMgr *torrent.TorrentManager + cfg config.ConfigInterface + api *realdebrid.RealDebrid + workerPool *ants.Pool + hosts []string + trafficOnStartup atomic.Uint64 + log *logutil.Logger } func init() { @@ -53,9 +53,9 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t log.Errorf("Failed to get traffic details: %v", err) trafficDetails = make(map[string]int64) } - hs.initialTraffic.Store(uint64(0)) + hs.trafficOnStartup.Store(uint64(0)) if _, ok := trafficDetails["real-debrid.com"]; ok { - hs.initialTraffic.Store(uint64(trafficDetails["real-debrid.com"])) + hs.trafficOnStartup.Store(uint64(trafficDetails["real-debrid.com"])) } if cfg.GetUsername() != "" { @@ -426,7 +426,7 @@ func (hs *Handlers) handleDownloadLink(resp http.ResponseWriter, req *http.Reque filename = chi.URLParam(req, "filename") } if download, ok := hs.torMgr.DownloadMap.Get(filename); ok { - hs.downloader.DownloadLink(download.Filename, download.Download, resp, req, hs.torMgr, hs.cfg, hs.log) + hs.downloader.DownloadLink(download, resp, req, hs.torMgr, hs.cfg, hs.log) } else { http.NotFound(resp, req) } diff --git a/internal/torrent/delete.go b/internal/torrent/delete.go index 539233d..282179c 100644 --- a/internal/torrent/delete.go +++ b/internal/torrent/delete.go @@ -38,6 +38,6 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { } func (t *TorrentManager) DeleteByID(torrentID string) { - t.api.DeleteTorrent(torrentID) + t.rd.DeleteTorrent(torrentID) t.deleteInfoFile(torrentID) } diff --git a/internal/torrent/latestState.go b/internal/torrent/latestState.go index a558ad4..4167d71 100644 --- a/internal/torrent/latestState.go +++ b/internal/torrent/latestState.go @@ -35,7 +35,7 @@ func (t *TorrentManager) setNewLatestState(checksum LibraryState) { func (t *TorrentManager) getCurrentState() LibraryState { var state LibraryState - torrents, totalCount, err := t.api.GetTorrents(true) + torrents, totalCount, err := t.rd.GetTorrents(true) if err != nil { t.log.Errorf("Checksum API Error (GetTorrents): %v", err) return LibraryState{} @@ -45,7 +45,7 @@ func (t *TorrentManager) getCurrentState() LibraryState { state.FirstTorrentId = torrents[0].ID } - count, err := t.api.GetActiveTorrentCount() + count, err := t.rd.GetActiveTorrentCount() if err != nil { t.log.Errorf("Checksum API Error (GetActiveTorrentCount): %v", err) return LibraryState{} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 8ad48b1..d29dccc 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -30,14 +30,13 @@ type TorrentManager struct { requiredVersion string Config config.ConfigInterface - api *realdebrid.RealDebrid + rd *realdebrid.RealDebrid workerPool *ants.Pool log *logutil.Logger repairLog *logutil.Logger - DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent - DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] - UnrestrictMap cmap.ConcurrentMap[string, *realdebrid.Download] + DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent + DownloadMap cmap.ConcurrentMap[string, *realdebrid.Download] RootNode *fs.FileNode @@ -67,14 +66,13 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w requiredVersion: "0.10.0", Config: cfg, - api: api, + rd: api, workerPool: workerPool, log: log, repairLog: repairLog, - DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), - DownloadMap: cmap.New[*realdebrid.Download](), - UnrestrictMap: cmap.New[*realdebrid.Download](), + DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), + DownloadMap: cmap.New[*realdebrid.Download](), RootNode: fs.NewFileNode("root", true), @@ -140,34 +138,13 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w } // proxy function -func (t *TorrentManager) UnrestrictFile(file *File, checkFirstByte bool) (*realdebrid.Download, error) { +func (t *TorrentManager) UnrestrictFile(file *File) (*realdebrid.Download, error) { if file.State.Is("deleted_file") { return nil, fmt.Errorf("file %s has been deleted", file.Path) } else if file.State.Is("broken_file") { return nil, fmt.Errorf("file %s is broken", file.Path) } - return t.UnrestrictLink(file.Link, checkFirstByte) -} - -func (t *TorrentManager) UnrestrictLink(link string, verifyURL bool) (*realdebrid.Download, error) { - isRealDebrid := strings.HasPrefix(link, "https://real-debrid.com/d/") - if isRealDebrid && t.UnrestrictMap.Has(link[0:39]) { - ret, _ := t.UnrestrictMap.Get(link[0:39]) - return ret, nil - } else if !isRealDebrid && t.UnrestrictMap.Has(link) { - ret, _ := t.UnrestrictMap.Get(link) - return ret, nil - } - ret, err := t.api.UnrestrictLink(link, verifyURL) - if err != nil { - return nil, err - } - if isRealDebrid { - t.UnrestrictMap.Set(ret.Link[0:39], ret) - } else { - t.UnrestrictMap.Set(ret.Link, ret) - } - return ret, nil + return t.rd.UnrestrictLink(file.Link) } func (t *TorrentManager) GetKey(torrent *Torrent) string { @@ -242,7 +219,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error { if file.MediaInfo != nil || !file.State.Is("ok_file") || !isPlayable { return } - unrestrict, err := t.UnrestrictFile(file, true) + unrestrict, err := t.UnrestrictFile(file) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { bwLimitReached = true return @@ -355,17 +332,29 @@ func (t *TorrentManager) deleteInfoFile(torrentID string) { /// end info functions func (t *TorrentManager) mountNewDownloads() { - downloads := t.api.GetDownloads() + token, _ := t.rd.GetToken() + var tokenMap cmap.ConcurrentMap[string, *realdebrid.Download] + if token != "" { + tokenMap, _ = t.rd.UnrestrictMap.Get(token) + } + + downloads := t.rd.GetDownloads() + mountedCount := 0 for i := range downloads { isRealDebrid := strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") - if isRealDebrid { - t.UnrestrictMap.SetIfAbsent(downloads[i].Link[0:39], &downloads[i]) - } else { - t.UnrestrictMap.SetIfAbsent(downloads[i].Link, &downloads[i]) + if !isRealDebrid { filename := filepath.Base(downloads[i].Filename) t.DownloadMap.Set(filename, &downloads[i]) + mountedCount++ + } else if token != "" { + tokenMap.Set(downloads[i].Link, &downloads[i]) } } + if mountedCount > 0 { + t.log.Infof("Mounted %d new downloads", mountedCount) + } else { + t.log.Debugf("No new downloads to mount") + } } // StartDownloadsJob: permanent job for remounting downloads diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 8f3abad..7aaa4de 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -17,7 +17,7 @@ import ( ) func (t *TorrentManager) refreshTorrents(initialRun bool) { - instances, _, err := t.api.GetTorrents(false) + instances, _, err := t.rd.GetTorrents(false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) return @@ -173,7 +173,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.T info := t.readInfoFromFile(rdTorrent.ID) if info == nil { var err error - info, err = t.api.GetTorrentInfo(rdTorrent.ID) + info, err = t.rd.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err) return nil diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 9f0253d..be837e9 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -196,7 +196,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { if bwLimitReached || !file.State.Is("ok_file") { return } - _, err := t.UnrestrictFile(file, true) + _, err := t.UnrestrictFile(file) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { bwLimitReached = true return @@ -339,7 +339,7 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { bwLimitReached := false torrent.UnassignedLinks.Clone().Each(func(link string) bool { // unrestrict each unassigned link that was filled out during torrent init - unrestrict, err := t.UnrestrictLink(link, true) + unrestrict, err := t.rd.UnrestrictLink(link) if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { bwLimitReached = true return true @@ -481,7 +481,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) // redownload torrent var newTorrentID string prevState := t.latestState - resp, err := t.api.AddMagnetHash(torrent.Hash) + resp, err := t.rd.AddMagnetHash(torrent.Hash) if err != nil { if strings.Contains(err.Error(), "timeout") { newState := t.getCurrentState() @@ -523,14 +523,14 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) return nil, fmt.Errorf("cannot start redownloading: too many retries") } - err = t.api.SelectTorrentFiles(newTorrentID, finalSelection) + err = t.rd.SelectTorrentFiles(newTorrentID, finalSelection) if err != nil { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot start redownloading: %v", err) } time.Sleep(2 * time.Second) - info, err = t.api.GetTorrentInfo(newTorrentID) + info, err = t.rd.GetTorrentInfo(newTorrentID) if err != nil { t.DeleteByID(newTorrentID) return nil, fmt.Errorf("cannot get info on redownloaded : %v", err) @@ -568,7 +568,7 @@ func (t *TorrentManager) canCapacityHandle() bool { const maxDelay = 60 * time.Second retryCount := 0 for { - count, err := t.api.GetActiveTorrentCount() + count, err := t.rd.GetActiveTorrentCount() if err != nil { t.repairLog.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { @@ -679,7 +679,7 @@ func (t *TorrentManager) checkIfBroken(info *realdebrid.TorrentInfo, brokenFiles if oldFile.ID != newFile.ID { continue } - if _, err := t.UnrestrictFile(selectedFiles[idx], true); err != nil { + if _, err := t.UnrestrictFile(selectedFiles[idx]); err != nil { return err } } diff --git a/internal/torrent/uncached.go b/internal/torrent/uncached.go index 75633d8..597e506 100644 --- a/internal/torrent/uncached.go +++ b/internal/torrent/uncached.go @@ -34,7 +34,7 @@ func (t *TorrentManager) GetUncachedTorrents() ([]*Torrent, error) { break } - resp, err := t.api.AvailabilityCheck(hashGroups[i].ToSlice()) + resp, err := t.rd.AvailabilityCheck(hashGroups[i].ToSlice()) if err != nil { return nil, fmt.Errorf("availability check is incomplete, skipping uncached check: %v", err) } diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index 6884871..0017632 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -87,7 +87,7 @@ func (dl *Downloader) DownloadFile( return } - unrestrict, err := torMgr.UnrestrictFile(file, cfg.ShouldServeFromRclone()) + unrestrict, err := torMgr.UnrestrictFile(file) if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) @@ -122,26 +122,13 @@ func (dl *Downloader) DownloadFile( // DownloadLink handles a GET request for downloads func (dl *Downloader) DownloadLink( - fileName, - link string, + unrestrict *realdebrid.Download, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *logutil.Logger, ) { - // log.Debugf("Opening file %s (%s)", fileName, link) - unrestrict, err := torMgr.UnrestrictLink(link, cfg.ShouldServeFromRclone()) - if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { - // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") - http.Error(resp, "Link is not available (bandwidth limit reached)", http.StatusBadRequest) - return - } - if err != nil { - log.Errorf("Error unrestricting link %s: %v", link, err) - http.Error(resp, "File is not available (can't unrestrict)", http.StatusInternalServerError) - return - } if cfg.ShouldServeFromRclone() { redirect(resp, req, unrestrict.Download) } else { diff --git a/pkg/http/client.go b/pkg/http/client.go index 34cca84..e4b28a3 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "strings" + "sync/atomic" "time" "github.com/debridmediamanager/zurg/pkg/logutil" @@ -27,7 +28,7 @@ type HTTPClient struct { timeoutSecs int rateLimitSleepSecs int backoff func(attempt int) time.Duration - bearerToken string + token atomic.Value dnsCache cmap.ConcurrentMap[string, string] hosts []string log *logutil.Logger @@ -52,7 +53,6 @@ func (e *DownloadErrorResponse) Error() string { } func NewHTTPClient( - token string, maxRetries int, timeoutSecs int, forceIPv6 bool, @@ -61,7 +61,6 @@ func NewHTTPClient( log *logutil.Logger, ) *HTTPClient { client := HTTPClient{ - bearerToken: token, client: &http.Client{}, maxRetries: maxRetries, timeoutSecs: timeoutSecs, @@ -128,9 +127,14 @@ func NewHTTPClient( return &client } +func (r *HTTPClient) SetToken(token string) { + r.token.Store(token) +} + func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { - if r.bearerToken != "" { - req.Header.Set("Authorization", "Bearer "+r.bearerToken) + token := r.token.Load() + if token != nil && token.(string) != "" { + req.Header.Set("Authorization", "Bearer "+token.(string)) } var resp *http.Response @@ -321,8 +325,8 @@ func backoffFunc(attempt int) time.Duration { return time.Duration(backoff) * time.Second } -func (r *HTTPClient) VerifyURL(url string) error { - req, err := http.NewRequest(http.MethodHead, url, nil) +func (r *HTTPClient) VerifyLink(link string) error { + req, err := http.NewRequest(http.MethodHead, link, nil) if err != nil { return err } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 156200b..d39ee7d 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/debridmediamanager/zurg/internal/config" zurghttp "github.com/debridmediamanager/zurg/pkg/http" @@ -17,25 +18,44 @@ import ( type RealDebrid struct { torrentsCache []Torrent UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] + verifiedLinks cmap.ConcurrentMap[string, int64] apiClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient + tokenManager *DownloadTokenManager workerPool *ants.Pool cfg config.ConfigInterface log *logutil.Logger } func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { + mainToken := cfg.GetToken() + downloadTokens := cfg.GetDownloadTokens() + if !strings.Contains(strings.Join(downloadTokens, ","), mainToken) { + downloadTokens = append([]string{mainToken}, downloadTokens...) + } + rd := &RealDebrid{ torrentsCache: []Torrent{}, + UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), + verifiedLinks: cmap.New[int64](), apiClient: apiClient, unrestrictClient: unrestrictClient, downloadClient: downloadClient, + tokenManager: NewDownloadTokenManager(downloadTokens), workerPool: workerPool, cfg: cfg, log: log, } + + apiClient.SetToken(mainToken) + unrestrictClient.SetToken(mainToken) + for _, token := range downloadTokens { + rd.UnrestrictMap.Set(token, cmap.New[*Download]()) + } + rd.loadCachedTorrents() + return rd } @@ -76,10 +96,42 @@ func (rd *RealDebrid) UnrestrictCheck(link string) (*Download, error) { return &response, nil } -func (rd *RealDebrid) UnrestrictLink(link string, verifyDownloadURL bool) (*Download, error) { +func (rd *RealDebrid) UnrestrictLink(link string) (*Download, error) { + for { + token, err := rd.tokenManager.GetCurrentToken() + if err != nil { + // when all tokens are expired + return nil, err + } + download, err := rd.UnrestrictLinkWithToken(token, link) + if dlErr, ok := err.(*zurghttp.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { + rd.tokenManager.SetCurrentTokenExpired() + continue + } + return download, err + } +} + +func (rd *RealDebrid) UnrestrictLinkWithToken(token, link string) (*Download, error) { + // check if the link is already unrestricted + if tokenMap, ok := rd.UnrestrictMap.Get(token); ok { + if d, ok := tokenMap.Get(link); ok { + // check if the link is in the verified links cache + if expiry, ok := rd.verifiedLinks.Get(d.Download); ok && expiry > time.Now().Unix() { + return d, nil + } + err := rd.downloadClient.VerifyLink(d.Download) + if err != nil { + return nil, err + } + rd.verifiedLinks.Set(d.Download, time.Now().Unix()+60*60*24) + return d, nil + } + } + data := url.Values{} if strings.HasPrefix(link, "https://real-debrid.com/d/") { - // set link to max 39 chars + // set link to max 39 chars (26 + 13) link = link[0:39] } data.Set("link", link) @@ -87,12 +139,13 @@ func (rd *RealDebrid) UnrestrictLink(link string, verifyDownloadURL bool) (*Down req, err := http.NewRequest(http.MethodPost, "https://api.real-debrid.com/rest/1.0/unrestrict/link", requestBody) if err != nil { - rd.log.Errorf("Error when creating a unrestrict link request: %v", err) + // rd.log.Errorf("Error when creating a unrestrict link request: %v", err) return nil, err } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + rd.unrestrictClient.SetToken(token) // at this point, any errors mean that the link has expired and we need to repair it resp, err := rd.unrestrictClient.Do(req) if err != nil { @@ -114,14 +167,16 @@ func (rd *RealDebrid) UnrestrictLink(link string, verifyDownloadURL bool) (*Down return nil, fmt.Errorf("undecodable response: %v", err) } - // will only check for first byte if serving from rclone - if verifyDownloadURL { - err := rd.downloadClient.VerifyURL(response.Download) - if err != nil { - return nil, err - } + tokenMap, _ := rd.UnrestrictMap.Get(token) + tokenMap.Set(link, &response) + + err = rd.downloadClient.VerifyLink(response.Download) + if err != nil { + return nil, err } + rd.verifiedLinks.Set(response.Download, time.Now().Unix()+60*60*24) + // rd.log.Debugf("Unrestricted link %s into %s", link, response.Download) return &response, nil } @@ -372,3 +427,7 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse, return response, nil } + +func (rd *RealDebrid) GetToken() (string, error) { + return rd.tokenManager.GetCurrentToken() +} diff --git a/pkg/realdebrid/token_manager.go b/pkg/realdebrid/token_manager.go new file mode 100644 index 0000000..d385c2b --- /dev/null +++ b/pkg/realdebrid/token_manager.go @@ -0,0 +1,64 @@ +package realdebrid + +import ( + "fmt" + "sync" +) + +type Token struct { + value string + expired bool +} + +type DownloadTokenManager struct { + tokens []Token + current int + mu sync.Mutex +} + +// NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens. +func NewDownloadTokenManager(tokenStrings []string) *DownloadTokenManager { + tokens := make([]Token, len(tokenStrings)) + for i, t := range tokenStrings { + tokens[i] = Token{value: t, expired: false} + } + return &DownloadTokenManager{tokens: tokens, current: 0} +} + +// GetCurrentToken returns the current non-expired token. +func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) { + dtm.mu.Lock() + defer dtm.mu.Unlock() + + for { + if !dtm.tokens[dtm.current].expired { + return dtm.tokens[dtm.current].value, nil + } + + dtm.current = (dtm.current + 1) % len(dtm.tokens) + + if dtm.current == 0 { + return "", fmt.Errorf("all tokens are bandwidth-limited") + } + } +} + +// SetCurrentTokenExpired sets the current token as expired. +func (dtm *DownloadTokenManager) SetCurrentTokenExpired() { + dtm.mu.Lock() + defer dtm.mu.Unlock() + + dtm.tokens[dtm.current].expired = true + dtm.current = (dtm.current + 1) % len(dtm.tokens) +} + +// ResetAllTokens resets all tokens to expired=false. +func (dtm *DownloadTokenManager) ResetAllTokens() { + dtm.mu.Lock() + defer dtm.mu.Unlock() + + for i := range dtm.tokens { + dtm.tokens[i].expired = false + } + dtm.current = 0 +}