Add rate limiter

This commit is contained in:
Ben Adrian Sarmiento
2024-07-12 14:00:10 +02:00
parent fded8ee8aa
commit fbc431b82b
7 changed files with 116 additions and 54 deletions

View File

@@ -57,8 +57,10 @@ func MainApp(configPath string) {
proxyURL = os.Getenv("PROXY") proxyURL = os.Getenv("PROXY")
} }
repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) dummyLimiter := http.NewRateLimiter(1000)
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test"))
repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, dummyLimiter, log.Named("network_test"))
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, dummyLimiter, log.Named("network_test"))
repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, "", log.Named("network_test"))
var hosts []string var hosts []string
@@ -79,6 +81,8 @@ func MainApp(configPath string) {
} }
} }
rateLimiter := http.NewRateLimiter(config.GetAPIRateLimitPerSecond())
apiClient := http.NewHTTPClient( apiClient := http.NewHTTPClient(
config.GetToken(), config.GetToken(),
config.GetRetriesUntilFailed(), // default retries = 2 config.GetRetriesUntilFailed(), // default retries = 2
@@ -86,6 +90,7 @@ func MainApp(configPath string) {
false, // no need for ipv6 support false, // no need for ipv6 support
[]string{}, // no optimal hosts needed []string{}, // no optimal hosts needed
proxyURL, proxyURL,
rateLimiter,
log.Named("api_client"), log.Named("api_client"),
) )
@@ -96,6 +101,7 @@ func MainApp(configPath string) {
false, // no need for ipv6 support false, // no need for ipv6 support
[]string{}, // no optimal hosts needed []string{}, // no optimal hosts needed
proxyURL, proxyURL,
rateLimiter,
log.Named("unrestrict_client"), log.Named("unrestrict_client"),
) )
@@ -106,6 +112,7 @@ func MainApp(configPath string) {
config.ShouldForceIPv6(), config.ShouldForceIPv6(),
hosts, hosts,
proxyURL, proxyURL,
rateLimiter,
log.Named("download_client"), log.Named("download_client"),
) )
@@ -120,11 +127,14 @@ func MainApp(configPath string) {
} }
defer workerPool.Release() defer workerPool.Release()
torrentsRateLimiter := http.NewRateLimiter(config.GetTorrentsRateLimitPerSecond())
rd := realdebrid.NewRealDebrid( rd := realdebrid.NewRealDebrid(
apiClient, apiClient,
unrestrictClient, unrestrictClient,
downloadClient, downloadClient,
workerPool, workerPool,
torrentsRateLimiter,
config, config,
log.Named("realdebrid"), log.Named("realdebrid"),
) )

View File

@@ -31,8 +31,9 @@ func NetworkTest(testURL string) {
log.Info("You can set a proxy by setting the PROXY environment variable") log.Info("You can set a proxy by setting the PROXY environment variable")
} }
repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, log.Named("network_test")) dummyLimiter := http.NewRateLimiter(1000)
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, log.Named("network_test")) repoClient4 := http.NewHTTPClient("", 0, 1, false, []string{}, proxyURL, dummyLimiter, log.Named("network_test"))
repoClient6 := http.NewHTTPClient("", 0, 1, true, []string{}, proxyURL, dummyLimiter, log.Named("network_test"))
repo := http.NewIPRepository(repoClient4, repoClient6, testURL, log.Named("network_test")) repo := http.NewIPRepository(repoClient4, repoClient6, testURL, log.Named("network_test"))
repo.NetworkTest(true, true) repo.NetworkTest(true, true)
} }

View File

@@ -10,6 +10,7 @@ type ConfigInterface interface {
EnableRepair() bool EnableRepair() bool
EnableRetainFolderNameExtension() bool EnableRetainFolderNameExtension() bool
EnableRetainRDTorrentName() bool EnableRetainRDTorrentName() bool
GetAPIRateLimitPerSecond() int
GetApiTimeoutSecs() int GetApiTimeoutSecs() int
GetConfig() ZurgConfig GetConfig() ZurgConfig
GetDirectories() []string GetDirectories() []string
@@ -30,6 +31,7 @@ type ConfigInterface interface {
GetRepairEveryMins() int GetRepairEveryMins() int
GetRetriesUntilFailed() int GetRetriesUntilFailed() int
GetToken() string GetToken() string
GetTorrentsRateLimitPerSecond() int
GetUsername() string GetUsername() string
GetVersion() string GetVersion() string
MeetsConditions(directory, torrentName string, torrentSize int64, torrentIDs, fileNames []string, fileSizes []int64, mediaInfos []*ffprobe.ProbeData) bool MeetsConditions(directory, torrentName string, torrentSize int64, torrentIDs, fileNames []string, fileSizes []int64, mediaInfos []*ffprobe.ProbeData) bool
@@ -46,34 +48,36 @@ type ZurgConfig struct {
Version string `yaml:"zurg" json:"-"` Version string `yaml:"zurg" json:"-"`
Token string `yaml:"token" json:"-"` Token string `yaml:"token" json:"-"`
ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"` APIRateLimitPerSecond int `yaml:"api_rate_limit_per_second" json:"api_rate_limit_per_second"`
AutoAnalyzeNewTorrents bool `yaml:"auto_analyze_new_torrents" json:"auto_analyze_new_torrents"` ApiTimeoutSecs int `yaml:"api_timeout_secs" json:"api_timeout_secs"`
CacheNetworkTestResults bool `yaml:"cache_network_test_results" json:"cache_network_test_results"` AutoAnalyzeNewTorrents bool `yaml:"auto_analyze_new_torrents" json:"auto_analyze_new_torrents"`
CanRepair bool `yaml:"enable_repair" json:"enable_repair"` CacheNetworkTestResults bool `yaml:"cache_network_test_results" json:"cache_network_test_results"`
DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"`
DownloadTokens []string `yaml:"download_tokens" json:"download_tokens"` DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"`
DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` DownloadTokens []string `yaml:"download_tokens" json:"download_tokens"`
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"`
HideBrokenTorrents bool `yaml:"hide_broken_torrents" json:"hide_broken_torrents"` ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"`
Host string `yaml:"host" json:"host"` HideBrokenTorrents bool `yaml:"hide_broken_torrents" json:"hide_broken_torrents"`
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` Host string `yaml:"host" json:"host"`
LogRequests bool `yaml:"log_requests" json:"log_requests"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"` LogRequests bool `yaml:"log_requests" json:"log_requests"`
NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` NumberOfHosts int `yaml:"number_of_hosts" json:"number_of_hosts"`
OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"`
Password string `yaml:"password" json:"password"` OnLibraryUpdate string `yaml:"on_library_update" json:"on_library_update"`
PlayableExtensions []string `yaml:"addl_playable_extensions" json:"addl_playable_extensions"` Password string `yaml:"password" json:"password"`
Port string `yaml:"port" json:"port"` PlayableExtensions []string `yaml:"addl_playable_extensions" json:"addl_playable_extensions"`
Proxy string `yaml:"proxy" json:"proxy"` Port string `yaml:"port" json:"port"`
RarAction string `yaml:"rar_action" json:"rar_action"` Proxy string `yaml:"proxy" json:"proxy"`
RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` RarAction string `yaml:"rar_action" json:"rar_action"`
RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"`
RetainFolderNameExtension bool `yaml:"retain_folder_name_extension" json:"retain_folder_name_extension"` RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` RetainFolderNameExtension bool `yaml:"retain_folder_name_extension" json:"retain_folder_name_extension"`
RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"` RetriesUntilFailed int `yaml:"retries_until_failed" json:"retries_until_failed"`
Username string `yaml:"username" json:"username"` ServeFromRclone bool `yaml:"serve_from_rclone" json:"serve_from_rclone"`
TorrentsRateLimitPerSecond int `yaml:"torrents_rate_limit_per_second" json:"torrents_rate_limit_per_second"`
Username string `yaml:"username" json:"username"`
} }
func (z *ZurgConfig) GetConfig() ZurgConfig { func (z *ZurgConfig) GetConfig() ZurgConfig {
@@ -238,3 +242,17 @@ func (z *ZurgConfig) GetDownloadTokens() []string {
func (z *ZurgConfig) ShouldHideBrokenTorrents() bool { func (z *ZurgConfig) ShouldHideBrokenTorrents() bool {
return z.HideBrokenTorrents return z.HideBrokenTorrents
} }
func (z *ZurgConfig) GetAPIRateLimitPerSecond() int {
if z.APIRateLimitPerSecond == 0 {
return 250
}
return z.APIRateLimitPerSecond
}
func (z *ZurgConfig) GetTorrentsRateLimitPerSecond() int {
if z.TorrentsRateLimitPerSecond == 0 {
return 1
}
return z.TorrentsRateLimitPerSecond
}

View File

@@ -30,6 +30,7 @@ type HTTPClient struct {
backoff func(int, int) time.Duration backoff func(int, int) time.Duration
dnsCache cmap.ConcurrentMap[string, string] dnsCache cmap.ConcurrentMap[string, string]
hosts []string hosts []string
rateLimiter *RateLimiter
log *logutil.Logger log *logutil.Logger
} }
@@ -58,6 +59,7 @@ func NewHTTPClient(
forceIPv6 bool, forceIPv6 bool,
hosts []string, hosts []string,
proxyURL string, proxyURL string,
rateLimiter *RateLimiter,
log *logutil.Logger, log *logutil.Logger,
) *HTTPClient { ) *HTTPClient {
client := HTTPClient{ client := HTTPClient{
@@ -69,6 +71,7 @@ func NewHTTPClient(
backoff: backoffFunc, backoff: backoffFunc,
dnsCache: cmap.New[string](), dnsCache: cmap.New[string](),
hosts: hosts, hosts: hosts,
rateLimiter: rateLimiter,
log: log, log: log,
} }
@@ -154,6 +157,8 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
r.ensureReachableHost(req) r.ensureReachableHost(req)
} }
r.rateLimiter.Wait()
resp, err = r.client.Do(req) resp, err = r.client.Do(req)
// http 4xx and 5xx errors // http 4xx and 5xx errors

17
pkg/http/rate_limiter.go Normal file
View File

@@ -0,0 +1,17 @@
package http
import "time"
type RateLimiter struct {
ticker *time.Ticker
}
func NewRateLimiter(apiRateLimitPerSecond int) *RateLimiter {
return &RateLimiter{
ticker: time.NewTicker(time.Second / time.Duration(apiRateLimitPerSecond)),
}
}
func (r *RateLimiter) Wait() {
<-r.ticker.C
}

View File

@@ -17,19 +17,27 @@ import (
) )
type RealDebrid struct { type RealDebrid struct {
UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]] UnrestrictMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Download]]
TokenManager *DownloadTokenManager TokenManager *DownloadTokenManager
torrentsCache []Torrent torrentsCache []Torrent
verifiedLinks cmap.ConcurrentMap[string, int64] verifiedLinks cmap.ConcurrentMap[string, int64]
apiClient *zurghttp.HTTPClient apiClient *zurghttp.HTTPClient
unrestrictClient *zurghttp.HTTPClient unrestrictClient *zurghttp.HTTPClient
downloadClient *zurghttp.HTTPClient downloadClient *zurghttp.HTTPClient
workerPool *ants.Pool workerPool *ants.Pool
cfg config.ConfigInterface torrentsRateLimiter *zurghttp.RateLimiter
log *logutil.Logger cfg config.ConfigInterface
log *logutil.Logger
} }
func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPClient, workerPool *ants.Pool, cfg config.ConfigInterface, log *logutil.Logger) *RealDebrid { func NewRealDebrid(apiClient,
unrestrictClient,
downloadClient *zurghttp.HTTPClient,
workerPool *ants.Pool,
torrentsRateLimiter *zurghttp.RateLimiter,
cfg config.ConfigInterface,
log *logutil.Logger,
) *RealDebrid {
mainToken := cfg.GetToken() mainToken := cfg.GetToken()
downloadTokens := cfg.GetDownloadTokens() downloadTokens := cfg.GetDownloadTokens()
if !strings.Contains(strings.Join(downloadTokens, ","), mainToken) { if !strings.Contains(strings.Join(downloadTokens, ","), mainToken) {
@@ -37,16 +45,17 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli
} }
rd := &RealDebrid{ rd := &RealDebrid{
UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](),
TokenManager: NewDownloadTokenManager(downloadTokens, log), TokenManager: NewDownloadTokenManager(downloadTokens, log),
torrentsCache: []Torrent{}, torrentsCache: []Torrent{},
verifiedLinks: cmap.New[int64](), verifiedLinks: cmap.New[int64](),
apiClient: apiClient, apiClient: apiClient,
unrestrictClient: unrestrictClient, unrestrictClient: unrestrictClient,
downloadClient: downloadClient, downloadClient: downloadClient,
workerPool: workerPool, workerPool: workerPool,
cfg: cfg, torrentsRateLimiter: torrentsRateLimiter,
log: log, cfg: cfg,
log: log,
} }
for _, token := range downloadTokens { for _, token := range downloadTokens {

View File

@@ -29,7 +29,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
allTorrents := []Torrent{} allTorrents := []Torrent{}
page := 1 page := 1
pageSize := 250 pageSize := 500
maxPages := (totalElements + pageSize - 1) / pageSize maxPages := (totalElements + pageSize - 1) / pageSize
rd.log.Debugf("Torrents total count is %d", totalElements) rd.log.Debugf("Torrents total count is %d", totalElements)
@@ -127,6 +127,8 @@ func (rd *RealDebrid) fetchPageOfTorrents(page, limit int) fetchTorrentsResult {
} }
} }
rd.torrentsRateLimiter.Wait()
resp, err := rd.apiClient.Do(req) resp, err := rd.apiClient.Do(req)
if err != nil { if err != nil {
rd.log.Errorf("Error when executing the get torrents request: %v", err) rd.log.Errorf("Error when executing the get torrents request: %v", err)