diff --git a/internal/app.go b/internal/app.go index d7c44bb..8017c7b 100644 --- a/internal/app.go +++ b/internal/app.go @@ -111,8 +111,8 @@ func MainApp(configPath string) { ) workerCount := config.GetNumberOfWorkers() - if workerCount < 10 { - workerCount = 10 + if workerCount < 20 { + workerCount = 20 } workerPool, err := ants.NewPool(workerCount) if err != nil { @@ -129,6 +129,7 @@ func MainApp(configPath string) { config, log.Named("realdebrid"), ) + rd.MonitorExpiredTokens() premium.MonitorPremiumStatus( workerPool, @@ -153,6 +154,7 @@ func MainApp(configPath string) { ) downloader := universal.NewDownloader(rd, workerPool) + downloader.StartResetBandwidthCountersJob() router := chi.NewRouter() handlers.AttachHandlers( diff --git a/internal/config/v1.go b/internal/config/v1.go index be36357..13171a4 100644 --- a/internal/config/v1.go +++ b/internal/config/v1.go @@ -28,7 +28,7 @@ func loadV1Config(content []byte, log *logutil.Logger) (*ZurgConfigV1, error) { // don't log token and password bufToken := configV1.Token - configV1.Token = strings.Repeat("*", len(bufToken)-4) + bufToken[len(bufToken)-4:] + configV1.Token = utils.MaskToken(configV1.Token) bufPassword := configV1.Password configV1.Password = strings.Repeat("*", len(bufPassword)) log.Debugf("Config dump: %+v", configV1) diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 43bf9d4..c66ae9b 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -12,6 +12,7 @@ import ( "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/version" "github.com/debridmediamanager/zurg/pkg/realdebrid" + "github.com/debridmediamanager/zurg/pkg/utils" ) type SponsorResponse struct { @@ -117,7 +118,7 @@ func (zr *Handlers) generateResponse(resp http.ResponseWriter, req *http.Request Paypal: "https://paypal.me/yowmamasita", }, Config: zr.cfg.GetConfig(), - Token: strings.Replace(token, token[len(token)-48:], "*****", 1), + Token: utils.MaskToken(token), IDsToDelete: sortedIDs, Hosts: zr.hosts, }, nil diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 2b5f159..4de2b4b 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -219,7 +219,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) error { return } unrestrict, err := t.UnrestrictFile(file) - if utils.IsBWLimitExceeded(err) { + if utils.AreAllTokensExpired(err) { bwLimitReached = true return } @@ -337,6 +337,7 @@ func (t *TorrentManager) mountNewDownloads() { downloads := t.rd.GetDownloads() mountedCount := 0 for i := range downloads { + downloads[i].Token = token isRealDebrid := strings.HasPrefix(downloads[i].Link, "https://real-debrid.com/d/") if !isRealDebrid { filename := filepath.Base(downloads[i].Filename) diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 7aaa4de..989ccc5 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -294,16 +294,15 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent { newer.SelectedFiles.IterCb(func(key string, file *File) { mergedTorrent.SelectedFiles.SetIfAbsent(key, file) }) - older.SelectedFiles.IterCb(func(key string, file *File) { - mediaInfo := file.MediaInfo - f, ok := mergedTorrent.SelectedFiles.Get(key) - if !ok || !f.State.Is("ok_file") { - mergedTorrent.SelectedFiles.Set(key, file) + older.SelectedFiles.IterCb(func(key string, olderFile *File) { + file, ok := mergedTorrent.SelectedFiles.Get(key) + if !ok || (file.State.Is("broken_file") && olderFile.State.Is("ok_file")) { + mergedTorrent.SelectedFiles.Set(key, olderFile) } // get the file again, set the media info - f, ok = mergedTorrent.SelectedFiles.Get(key) - if ok && f.MediaInfo == nil && mediaInfo != nil { - f.MediaInfo = mediaInfo + file, ok = mergedTorrent.SelectedFiles.Get(key) + if ok && file.MediaInfo == nil && olderFile.MediaInfo != nil { + file.MediaInfo = olderFile.MediaInfo } }) diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index e155c42..88cacb9 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -19,6 +19,7 @@ const ( EXPIRED_LINK_TOLERANCE_HOURS = 24 ) +// StartRepairJob is a permanent job that runs every periodically to repair broken torrents func (t *TorrentManager) StartRepairJob() { if !t.Config.EnableRepair() { t.repairLog.Warn("Repair is disabled, skipping repair job") @@ -34,7 +35,6 @@ func (t *TorrentManager) StartRepairJob() { t.repairLog.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() - for { select { case <-repairTicker.C: @@ -192,11 +192,12 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { bwLimitReached := false // check for other broken file torrent.SelectedFiles.IterCb(func(_ string, file *File) { + // we will only check for working files, we ignore broken and deleted files if bwLimitReached || !file.State.Is("ok_file") { return } _, err := t.UnrestrictFile(file) - if utils.IsBWLimitExceeded(err) { + if utils.AreAllTokensExpired(err) { bwLimitReached = true return } @@ -252,7 +253,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection if info != nil && info.Progress == 100 { err = t.checkIfBroken(info, brokenFiles) - if utils.IsBWLimitExceeded(err) { + if utils.AreAllTokensExpired(err) { t.repairLog.Warnf("Your account has reached the bandwidth limit, cannot continue repairing torrent %s", t.GetKey(torrent)) return } @@ -332,14 +333,13 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { expiredCount := 0 rarCount := 0 unassignedCount := 0 - newUnassignedLinks := cmap.New[*realdebrid.Download]() var assignedLinks []string bwLimitReached := false torrent.UnassignedLinks.Clone().Each(func(link string) bool { // unrestrict each unassigned link that was filled out during torrent init unrestrict, err := t.rd.UnrestrictAndVerify(link) - if utils.IsBWLimitExceeded(err) { + if utils.AreAllTokensExpired(err) { bwLimitReached = true return true } @@ -377,7 +377,18 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { // it's possible that it is already repaired t.repairLog.Warnf("Cannot assign %s to any file in torrent %s", unrestrict.Filename, t.GetKey(torrent)) } - newUnassignedLinks.Set(link, unrestrict) + + torrent.SelectedFiles.Set(unrestrict.Filename, &File{ + File: realdebrid.File{ + ID: 0, + Path: unrestrict.Filename, + Bytes: unrestrict.Filesize, + Selected: 0, + }, + Ended: torrent.Added, + Link: unrestrict.Link, + State: NewFileState("ok_file"), + }) } processedCount := assignedCount + unassignedCount + expiredCount @@ -416,20 +427,6 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { return false // end repair } - newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { - torrent.SelectedFiles.Set(unassigned.Filename, &File{ - File: realdebrid.File{ - ID: 0, - Path: unassigned.Filename, - Bytes: unassigned.Filesize, - Selected: 0, - }, - Ended: torrent.Added, - Link: unassigned.Link, - State: NewFileState("ok_file"), - }) - }) - if action == "extract" { videoFiles := []string{} torrent.SelectedFiles.IterCb(func(_ string, file *File) { diff --git a/internal/universal/downloader.go b/internal/universal/downloader.go index 7c10f8b..c70ddbf 100644 --- a/internal/universal/downloader.go +++ b/internal/universal/downloader.go @@ -27,11 +27,14 @@ type Downloader struct { } func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader { - dl := &Downloader{ + return &Downloader{ rd: rd, workerPool: workerPool, } +} +// StartResetBandwidthCountersJob is a permanent job that resets the bandwidth counters at 12AM CET +func (dl *Downloader) StartResetBandwidthCountersJob() { // track bandwidth usage and reset at 12AM CET now := time.Now() tomorrow := now.AddDate(0, 0, 1) @@ -42,19 +45,17 @@ func NewDownloader(rd *realdebrid.RealDebrid, workerPool *ants.Pool) *Downloader nextMidnightInCET := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, cetTZ) duration := nextMidnightInCET.Sub(now) timer := time.NewTimer(duration) - // permanent job for bandwidth reset - workerPool.Submit(func() { + + dl.workerPool.Submit(func() { <-timer.C ticker := time.NewTicker(24 * time.Hour) for { - rd.TokenManager.ResetAllTokens() + dl.rd.TokenManager.ResetAllTokens() dl.RequestedBytes.Store(0) dl.TotalBytes.Store(0) <-ticker.C } }) - - return dl } // DownloadFile handles a GET request for files in torrents @@ -90,7 +91,7 @@ func (dl *Downloader) DownloadFile( } unrestrict, err := torMgr.UnrestrictFile(file) - if utils.IsBWLimitExceeded(err) { + if utils.AreAllTokensExpired(err) { // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) return @@ -164,7 +165,8 @@ func (dl *Downloader) streamFileToResponse( } downloadResp, err := dl.rd.DownloadFile(dlReq) - if utils.IsBWLimitExceeded(err) { + if utils.IsBytesLimitReached(err) { + dl.rd.TokenManager.SetTokenAsExpired(unrestrict.Token, "bandwidth limit exceeded") // log.Errorf("Your account has reached the bandwidth limit, please try again after 12AM CET") http.Error(resp, "File is not available (bandwidth limit reached)", http.StatusBadRequest) return diff --git a/pkg/premium/monitor.go b/pkg/premium/monitor.go index bd21173..159987d 100644 --- a/pkg/premium/monitor.go +++ b/pkg/premium/monitor.go @@ -13,6 +13,7 @@ const ( MINIMUM_SLEEP = 60 // 60 seconds ) +// MonitorPremiumStatus is a permanent job that monitors the premium status of the user func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurglog *logutil.Logger) { var userInfo *realdebrid.User var err error @@ -52,8 +53,7 @@ func MonitorPremiumStatus(workerPool *ants.Pool, rd *realdebrid.RealDebrid, zurg userInfo, err = rd.GetUserInformation() if err != nil { zurglog.Errorf("Failed to get user information: %v", err) - time.Sleep(5 * time.Minute) - continue + time.Sleep(time.Duration(MINIMUM_SLEEP) * time.Second) } } }) diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index b96c82b..80d4fd8 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -38,7 +38,7 @@ func NewRealDebrid(apiClient, unrestrictClient, downloadClient *zurghttp.HTTPCli rd := &RealDebrid{ UnrestrictMap: cmap.New[cmap.ConcurrentMap[string, *Download]](), - TokenManager: NewDownloadTokenManager(downloadTokens), + TokenManager: NewDownloadTokenManager(downloadTokens, log), torrentsCache: []Torrent{}, verifiedLinks: cmap.New[int64](), apiClient: apiClient, @@ -69,8 +69,8 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) { } // check if the link is already unrestricted - tokenMap, ok := rd.UnrestrictMap.Get(token) - if ok && tokenMap.Has(link) { + tokenMap, _ := rd.UnrestrictMap.Get(token) + if tokenMap.Has(link) { download, _ := tokenMap.Get(link) // check if the link is in the verified links cache if expiry, ok := rd.verifiedLinks.Get(download.ID); ok && expiry > time.Now().Unix() { @@ -78,8 +78,8 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) { } err := rd.downloadClient.VerifyLink(download.Download) - if utils.IsBWLimitExceeded(err) { - rd.TokenManager.SetTokenAsExpired(token) + if utils.IsBytesLimitReached(err) { + rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded") continue } if err != nil { @@ -94,12 +94,13 @@ func (rd *RealDebrid) UnrestrictAndVerify(link string) (*Download, error) { if err != nil { return nil, err } + download.Token = token tokenMap.Set(link, download) err = rd.downloadClient.VerifyLink(download.Download) - if utils.IsBWLimitExceeded(err) { - rd.TokenManager.SetTokenAsExpired(token) + if utils.IsBytesLimitReached(err) { + rd.TokenManager.SetTokenAsExpired(token, "bandwidth limit exceeded") continue } if err != nil { @@ -404,3 +405,35 @@ func (rd *RealDebrid) AvailabilityCheck(hashes []string) (AvailabilityResponse, func (rd *RealDebrid) DownloadFile(req *http.Request) (*http.Response, error) { return rd.downloadClient.Do(req) } + +// MonitorExpiredTokens is a permanent job for monitoring expired tokens if they are still expired +func (rd *RealDebrid) MonitorExpiredTokens() { + sleepPeriod := 5 * time.Minute + rd.workerPool.Submit(func() { + for { + expiredTokens := rd.TokenManager.GetExpiredTokens() + for _, token := range expiredTokens { + tokenMap, _ := rd.UnrestrictMap.Get(token) + stillExpired := true + skipAll := false + tokenMap.IterCb(func(key string, download *Download) { + if skipAll { + return + } + err := rd.downloadClient.VerifyLink(download.Download) + if err != nil { + skipAll = utils.IsBytesLimitReached(err) + return + } + stillExpired = false + skipAll = true + rd.verifiedLinks.Set(download.ID, time.Now().Unix()+60*60*24) + }) + if !stillExpired { + rd.TokenManager.SetTokenAsUnexpired(token) + } + } + time.Sleep(sleepPeriod) + } + }) +} diff --git a/pkg/realdebrid/token_manager.go b/pkg/realdebrid/token_manager.go index 63410d7..f9779df 100644 --- a/pkg/realdebrid/token_manager.go +++ b/pkg/realdebrid/token_manager.go @@ -3,6 +3,9 @@ package realdebrid import ( "fmt" "sync" + + "github.com/debridmediamanager/zurg/pkg/logutil" + "github.com/debridmediamanager/zurg/pkg/utils" ) type Token struct { @@ -14,15 +17,16 @@ type DownloadTokenManager struct { tokens []Token current int mu sync.RWMutex + log *logutil.Logger } // NewDownloadTokenManager initializes a new DownloadTokenManager with the given tokens. -func NewDownloadTokenManager(tokenStrings []string) *DownloadTokenManager { +func NewDownloadTokenManager(tokenStrings []string, log *logutil.Logger) *DownloadTokenManager { tokens := make([]Token, len(tokenStrings)) for i, t := range tokenStrings { tokens[i] = Token{value: t, expired: false} } - return &DownloadTokenManager{tokens: tokens, current: 0} + return &DownloadTokenManager{tokens: tokens, current: 0, log: log} } // GetCurrentToken returns the current non-expired token. @@ -38,19 +42,36 @@ func (dtm *DownloadTokenManager) GetCurrentToken() (string, error) { dtm.current = (dtm.current + 1) % len(dtm.tokens) if dtm.current == 0 { - return "", fmt.Errorf("all tokens are bandwidth-limited") + return "", fmt.Errorf("all tokens are expired") } } } // SetTokenAsExpired sets the specified token as expired. -func (dtm *DownloadTokenManager) SetTokenAsExpired(token string) error { +func (dtm *DownloadTokenManager) SetTokenAsExpired(token, reason string) error { dtm.mu.Lock() defer dtm.mu.Unlock() for i, t := range dtm.tokens { if t.value == token { dtm.tokens[i].expired = true + dtm.log.Debugf("Token %s set as expired (reason=%s)", utils.MaskToken(token), reason) + return nil + } + } + + return fmt.Errorf("token not found") +} + +// SetTokenAsUnexpired sets the specified token as unexpired. +func (dtm *DownloadTokenManager) SetTokenAsUnexpired(token string) error { + dtm.mu.Lock() + defer dtm.mu.Unlock() + + for i, t := range dtm.tokens { + if t.value == token { + dtm.tokens[i].expired = false + dtm.log.Debugf("Token %s set as unexpired", utils.MaskToken(token)) return nil } } @@ -68,3 +89,16 @@ func (dtm *DownloadTokenManager) ResetAllTokens() { } dtm.current = 0 } + +func (dtm *DownloadTokenManager) GetExpiredTokens() []string { + dtm.mu.RLock() + defer dtm.mu.RUnlock() + + var tokens []string + for _, t := range dtm.tokens { + if t.expired { + tokens = append(tokens, t.value) + } + } + return tokens +} diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go index a293e4f..19b9106 100644 --- a/pkg/realdebrid/types.go +++ b/pkg/realdebrid/types.go @@ -24,6 +24,7 @@ type Download struct { Download string `json:"download"` // Generated link Streamable int `json:"streamable"` Generated string `json:"-"` // jsonDate + Token string `json:"-"` } func (d *Download) UnmarshalJSON(data []byte) error { diff --git a/pkg/utils/bwlimit.go b/pkg/utils/bwlimit.go index eefd5cc..202529b 100644 --- a/pkg/utils/bwlimit.go +++ b/pkg/utils/bwlimit.go @@ -2,9 +2,13 @@ package utils import "github.com/debridmediamanager/zurg/pkg/http" -func IsBWLimitExceeded(err error) bool { +func IsBytesLimitReached(err error) bool { if dlErr, ok := err.(*http.DownloadErrorResponse); ok && dlErr.Message == "bytes_limit_reached" { return true } return false } + +func AreAllTokensExpired(err error) bool { + return err.Error() == "all tokens are expired" +} diff --git a/pkg/utils/mask_token.go b/pkg/utils/mask_token.go new file mode 100644 index 0000000..5bc27f7 --- /dev/null +++ b/pkg/utils/mask_token.go @@ -0,0 +1,7 @@ +package utils + +import "strings" + +func MaskToken(token string) string { + return strings.Repeat("*", len(token)-4) + token[len(token)-4:] +}