From fbc431b82bafb5da9b0a1ef0923b2647823fc46d Mon Sep 17 00:00:00 2001 From: Ben Adrian Sarmiento Date: Fri, 12 Jul 2024 14:00:10 +0200 Subject: [PATCH] Add rate limiter --- internal/app.go | 14 ++++++-- internal/commands.go | 5 +-- internal/config/types.go | 74 +++++++++++++++++++++++--------------- pkg/http/client.go | 5 +++ pkg/http/rate_limiter.go | 17 +++++++++ pkg/realdebrid/api.go | 51 +++++++++++++++----------- pkg/realdebrid/torrents.go | 4 ++- 7 files changed, 116 insertions(+), 54 deletions(-) create mode 100644 pkg/http/rate_limiter.go diff --git a/internal/app.go b/internal/app.go index 120668e..4a6b94a 100644 --- a/internal/app.go +++ b/internal/app.go @@ -57,8 +57,10 @@ func MainApp(configPath string) { proxyURL = os.Getenv("PROXY") } - repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) - repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test")) + dummyLimiter := http.NewRateLimiter(1000) + + repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, dummyLimiter, log.Named("network_test")) + repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, dummyLimiter, log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test")) var hosts []string @@ -79,6 +81,8 @@ func MainApp(configPath string) { } } + rateLimiter := http.NewRateLimiter(config.GetAPIRateLimitPerSecond()) + apiClient := http.NewHTTPClient( config.GetToken(), config.GetRetriesUntilFailed(), // default retries = 2 @@ -86,6 +90,7 @@ func MainApp(configPath string) { false, // no need for ipv6 support []string{}, // no optimal hosts needed proxyURL, + rateLimiter, log.Named("api_client"), ) @@ -96,6 +101,7 @@ func MainApp(configPath string) { false, // no need for ipv6 support []string{}, // no optimal hosts needed proxyURL, + rateLimiter, log.Named("unrestrict_client"), ) @@ -106,6 +112,7 @@ func MainApp(configPath string) { config.ShouldForceIPv6(), hosts, proxyURL, + rateLimiter, log.Named("download_client"), ) @@ -120,11 +127,14 @@ func MainApp(configPath string) { } defer workerPool.Release() + torrentsRateLimiter := http.NewRateLimiter(config.GetTorrentsRateLimitPerSecond()) + rd := realdebrid.NewRealDebrid( apiClient, unrestrictClient, downloadClient, workerPool, + torrentsRateLimiter, config, log.Named("realdebrid"), ) diff --git a/internal/commands.go b/internal/commands.go index e1b6bfe..89efcd0 100644 --- a/internal/commands.go +++ b/internal/commands.go @@ -31,8 +31,9 @@ func NetworkTest(testURL string) { log.Info("You can set a proxy by setting the PROXY environment variable") } - repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) - repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test")) + dummyLimiter := http.NewRateLimiter(1000) + repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, dummyLimiter, log.Named("network_test")) + repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, dummyLimiter, log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, testURL, log.Named("network_test")) repo.NetworkTest(true, true) } diff --git a/internal/config/types.go b/internal/config/types.go index bd3c114..29d4f13 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -10,6 +10,7 @@ type ConfigInterface interface { EnableRepair() bool EnableRetainFolderNameExtension() bool EnableRetainRDTorrentName() bool + GetAPIRateLimitPerSecond() int GetApiTimeoutSecs() int GetConfig() ZurgConfig GetDirectories() []string @@ -30,6 +31,7 @@ type ConfigInterface interface { GetRepairEveryMins() int GetRetriesUntilFailed() int GetToken() string + GetTorrentsRateLimitPerSecond() int GetUsername() string GetVersion() string MeetsConditions(directory, torrentName string, torrentSize int64, torrentIDs, fileNames []string, fileSizes []int64, mediaInfos []*ffprobe.ProbeData) bool @@ -46,34 +48,36 @@ type ZurgConfig struct { Version string `yaml:"zurg" json:"-"` Token string `yaml:"token" json:"-"` - ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"` - AutoAnalyzeNewTorrents bool `yaml:"auto_analyze_new_torrents" json:"auto_analyze_new_torrents"` - CacheNetworkTestResults bool `yaml:"cache_network_test_results" json:"cache_network_test_results"` - CanRepair bool `yaml:"enable_repair" json:"enable_repair"` - DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` - DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` - DownloadTokens []string `yaml:"download_tokens" json:"download_tokens"` - DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` - ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` - HideBrokenTorrents bool `yaml:"hide_broken_torrents" json:"hide_broken_torrents"` - Host string `yaml:"host" json:"host"` - IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` - LogRequests bool `yaml:"log_requests" json:"log_requests"` - NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"` - NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` - OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"` - Password string `yaml:"password" json:"password"` - PlayableExtensions []string `yaml:"addl_playable_extensions" json:"addl_playable_extensions"` - Port string `yaml:"port" json:"port"` - Proxy string `yaml:"proxy" json:"proxy"` - RarAction string `yaml:"rar_action" json:"rar_action"` - RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` - RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` - RetainFolderNameExtension bool `yaml:"retain_folder_name_extension" json:"retain_folder_name_extension"` - RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` - RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` - ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"` - Username string `yaml:"username" json:"username"` + APIRateLimitPerSecond int `yaml:"api_rate_limit_per_second" json:"api_rate_limit_per_second"` + ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"` + AutoAnalyzeNewTorrents bool `yaml:"auto_analyze_new_torrents" json:"auto_analyze_new_torrents"` + CacheNetworkTestResults bool `yaml:"cache_network_test_results" json:"cache_network_test_results"` + CanRepair bool `yaml:"enable_repair" json:"enable_repair"` + DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` + DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` + DownloadTokens []string `yaml:"download_tokens" json:"download_tokens"` + DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` + ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` + HideBrokenTorrents bool `yaml:"hide_broken_torrents" json:"hide_broken_torrents"` + Host string `yaml:"host" json:"host"` + IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` + LogRequests bool `yaml:"log_requests" json:"log_requests"` + NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"` + NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` + OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"` + Password string `yaml:"password" json:"password"` + PlayableExtensions []string `yaml:"addl_playable_extensions" json:"addl_playable_extensions"` + Port string `yaml:"port" json:"port"` + Proxy string `yaml:"proxy" json:"proxy"` + RarAction string `yaml:"rar_action" json:"rar_action"` + RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` + RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` + RetainFolderNameExtension bool `yaml:"retain_folder_name_extension" json:"retain_folder_name_extension"` + RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` + RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` + ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"` + TorrentsRateLimitPerSecond int `yaml:"torrents_rate_limit_per_second" json:"torrents_rate_limit_per_second"` + Username string `yaml:"username" json:"username"` } func (z *ZurgConfig) GetConfig() ZurgConfig { @@ -238,3 +242,17 @@ func (z *ZurgConfig) GetDownloadTokens() []string { func (z *ZurgConfig) ShouldHideBrokenTorrents() bool { return z.HideBrokenTorrents } + +func (z *ZurgConfig) GetAPIRateLimitPerSecond() int { + if z.APIRateLimitPerSecond == 0 { + return 250 + } + return z.APIRateLimitPerSecond +} + +func (z *ZurgConfig) GetTorrentsRateLimitPerSecond() int { + if z.TorrentsRateLimitPerSecond == 0 { + return 1 + } + return z.TorrentsRateLimitPerSecond +} diff --git a/pkg/http/client.go b/pkg/http/client.go index 18c8046..127a099 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -30,6 +30,7 @@ type HTTPClient struct { backoff func(int, int) time.Duration dnsCache cmap.ConcurrentMap[string, string] hosts []string + rateLimiter *RateLimiter log *logutil.Logger } @@ -58,6 +59,7 @@ func NewHTTPClient( forceIPv6 bool, hosts []string, proxyURL string, + rateLimiter *RateLimiter, log *logutil.Logger, ) *HTTPClient { client := HTTPClient{ @@ -69,6 +71,7 @@ func NewHTTPClient( backoff: backoffFunc, dnsCache: cmap.New[string](), hosts: hosts, + rateLimiter: rateLimiter, log: log, } @@ -154,6 +157,8 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { r.ensureReachableHost(req) } + r.rateLimiter.Wait() + resp, err = r.client.Do(req) // http 4xx and 5xx errors diff --git a/pkg/http/rate_limiter.go b/pkg/http/rate_limiter.go new file mode 100644 index 0000000..1389187 --- /dev/null +++ b/pkg/http/rate_limiter.go @@ -0,0 +1,17 @@ +package http + +import "time" + +type RateLimiter struct { + ticker *time.Ticker +} + +func NewRateLimiter(apiRateLimitPerSecond int) *RateLimiter { + return &RateLimiter{ + ticker: time.NewTicker(time.Second / time.Duration(apiRateLimitPerSecond)), + } +} + +func (r *RateLimiter) Wait() { + <-r.ticker.C +} diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 504449e..27a6dfc 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -17,19 +17,27 @@ import ( ) type RealDebrid struct { - UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] - TokenManager *DownloadTokenManager - torrentsCache []Torrent - verifiedLinks cmap.ConcurrentMap[string, int64] - apiClient *zurghttp.HTTPClient - unrestrictClient *zurghttp.HTTPClient - downloadClient *zurghttp.HTTPClient - workerPool *ants.Pool - cfg config.ConfigInterface - log *logutil.Logger + UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] + TokenManager *DownloadTokenManager + torrentsCache []Torrent + verifiedLinks cmap.ConcurrentMap[string, int64] + apiClient *zurghttp.HTTPClient + unrestrictClient *zurghttp.HTTPClient + downloadClient *zurghttp.HTTPClient + workerPool *ants.Pool + torrentsRateLimiter *zurghttp.RateLimiter + cfg config.ConfigInterface + log *logutil.Logger } -func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { +func NewRealDebrid(apiClient, + unrestrictClient, + downloadClient *zurghttp.HTTPClient, + workerPool *ants.Pool, + torrentsRateLimiter *zurghttp.RateLimiter, + cfg config.ConfigInterface, + log *logutil.Logger, +) *RealDebrid { mainToken := cfg.GetToken() downloadTokens := cfg.GetDownloadTokens() if !strings.Contains(strings.Join(downloadTokens, ","), mainToken) { @@ -37,16 +45,17 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli } rd := &RealDebrid{ - UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), - TokenManager: NewDownloadTokenManager(downloadTokens, log), - torrentsCache: []Torrent{}, - verifiedLinks: cmap.New[int64](), - apiClient: apiClient, - unrestrictClient: unrestrictClient, - downloadClient: downloadClient, - workerPool: workerPool, - cfg: cfg, - log: log, + UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), + TokenManager: NewDownloadTokenManager(downloadTokens, log), + torrentsCache: []Torrent{}, + verifiedLinks: cmap.New[int64](), + apiClient: apiClient, + unrestrictClient: unrestrictClient, + downloadClient: downloadClient, + workerPool: workerPool, + torrentsRateLimiter: torrentsRateLimiter, + cfg: cfg, + log: log, } for _, token := range downloadTokens { diff --git a/pkg/realdebrid/torrents.go b/pkg/realdebrid/torrents.go index d750306..0c4a9cd 100644 --- a/pkg/realdebrid/torrents.go +++ b/pkg/realdebrid/torrents.go @@ -29,7 +29,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { allTorrents := []Torrent{} page := 1 - pageSize := 250 + pageSize := 500 maxPages := (totalElements + pageSize - 1) / pageSize rd.log.Debugf("Torrents total count is %d", totalElements) @@ -127,6 +127,8 @@ func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult { } } + rd.torrentsRateLimiter.Wait() + resp, err := rd.apiClient.Do(req) if err != nil { rd.log.Errorf("Error when executing the get torrents request: %v", err)