From 17059e6a4a7897b3a36a79136901d8ed308c8c76 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sun, 26 May 2024 03:49:16 +0200 Subject: [PATCH] Dump torrents job --- internal/config/types.go | 18 +++++++++ internal/handlers/home.go | 22 +++++++++- internal/handlers/router.go | 1 + internal/torrent/manager.go | 80 +++++++++++++++++++++++++++++++++---- pkg/realdebrid/api.go | 2 +- 5 files changed, 114 insertions(+), 9 deletions(-) diff --git a/internal/config/types.go b/internal/config/types.go index 1903853..744c242 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -30,6 +30,8 @@ type ConfigInterface interface { GetRateLimitSleepSecs() int ShouldDeleteRarFiles() bool GetDownloadsEveryMins() int + GetDownloadsLimit() int + GetDumpTorrentsEveryMins() int GetPlayableExtensions() []string GetTorrentsCount() int } @@ -42,6 +44,8 @@ type ZurgConfig struct { CanRepair bool `yaml:"enable_repair" json:"enable_repair"` DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"` DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"` + DownloadsLimit int `yaml:"downloads_limit" json:"downloads_limit"` + DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"` DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"` ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"` Host string `yaml:"host" json:"host"` @@ -132,6 +136,20 @@ func (z *ZurgConfig) GetDownloadsEveryMins() int { return z.DownloadsEveryMins } +func (z *ZurgConfig) GetDownloadsLimit() int { + if z.DownloadsLimit == 0 { + return 50000 + } + return z.DownloadsLimit +} + +func (z *ZurgConfig) GetDumpTorrentsEveryMins() int { + if z.DumpTorrentsEveryMins == 0 { + return 60 + } + return z.DumpTorrentsEveryMins +} + func (z *ZurgConfig) EnableRepair() bool { return z.CanRepair } diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 62f0cf1..c684c91 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -163,7 +163,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { %s - Config + Config Version %s @@ -223,6 +223,14 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { Refresh Download Mount Every... %d mins + + Get downloads limit + %d items + + + Dump Torrents Every... + %d mins + Rate Limit Sleep for... %d secs @@ -273,6 +281,9 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
+
+ +
@@ -323,6 +334,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) { response.Config.GetTorrentsCount(), response.Config.GetDownloadTimeoutSecs(), response.Config.GetDownloadsEveryMins(), + response.Config.GetDownloadsLimit(), + response.Config.GetDumpTorrentsEveryMins(), response.Config.GetRateLimitSleepSecs(), response.Config.GetRetriesUntilFailed(), response.Config.GetNetworkBufferSize(), @@ -367,6 +380,13 @@ func (zr *Handlers) handleRemountDownloads(resp http.ResponseWriter, req *http.R fmt.Fprint(resp, "Remounting downloads...") } +func (zr *Handlers) handleDumpTorrents(resp http.ResponseWriter, req *http.Request) { + resp.Header().Set("Refresh", "2; url=/") + zr.torMgr.DumpTrigger <- struct{}{} + zr.log.Infof("Triggered dump of torrents") + fmt.Fprint(resp, "Dumping torrents...") +} + func bToMb(b uint64) uint64 { return b / 1024 / 1024 } diff --git a/internal/handlers/router.go b/internal/handlers/router.go index a4162a5..9e2e475 100644 --- a/internal/handlers/router.go +++ b/internal/handlers/router.go @@ -54,6 +54,7 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t router.Post("/reboot/refresh", hs.handleRebootRefreshWorker) router.Post("/reboot/repair", hs.handleRebootRepairWorker) router.Post("/remount/downloads", hs.handleRemountDownloads) + router.Post("/dump/torrents", hs.handleDumpTorrents) // version router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile) router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 9533f7d..14b4256 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -41,6 +41,7 @@ type TorrentManager struct { RefreshKillSwitch chan struct{} RepairKillSwitch chan struct{} RemountTrigger chan struct{} + DumpTrigger chan struct{} latestState *LibraryState @@ -75,29 +76,37 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w RefreshKillSwitch: make(chan struct{}, 1), RepairKillSwitch: make(chan struct{}, 1), RemountTrigger: make(chan struct{}, 1), + DumpTrigger: make(chan struct{}, 1), latestState: &LibraryState{log: log}, } t.initializeBins() t.initializeDirectoryMaps() + var wg sync.WaitGroup wg.Add(2) + t.workerPool.Submit(func() { + defer wg.Done() t.refreshTorrents() - wg.Done() - t.setNewLatestState(t.getCurrentState()) - t.StartRefreshJob() - t.StartRepairJob() - t.TriggerRepair(nil) }) t.workerPool.Submit(func() { + defer wg.Done() t.mountNewDownloads() - wg.Done() - t.StartDownloadsJob() }) + t.workerPool.Submit(func() { wg.Wait() + t.StartRefreshJob() + t.StartDownloadsJob() + t.StartRepairJob() + t.StartDumpJob() + + t.setNewLatestState(t.getCurrentState()) + + t.TriggerRepair(nil) + t.log.Info("Applying media info details to all torrents") allTorrents, _ := t.DirectoryMap.Get(INT_ALL) allTorrents.IterCb(func(_ string, torrent *Torrent) { @@ -348,6 +357,63 @@ func (t *TorrentManager) StartDownloadsJob() { }) } +func (t *TorrentManager) dumpTorrents() { + files := t.getTorrentFiles("data") + for file := range files.Iter() { + destPath := "dump/" + filepath.Base(file) + if err := copyFile(file, destPath); err != nil { + t.log.Warnf("Cannot copy file %s to %s: %v", file, destPath, err) + } + } +} + +func copyFile(sourcePath, destPath string) error { + source, err := os.Open(sourcePath) + if err != nil { + return err + } + defer source.Close() + + destination, err := os.Create(destPath) + if err != nil { + return err + } + defer destination.Close() + + buf := make([]byte, 4096) + for { + n, err := source.Read(buf) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + break + } + + if _, err := destination.Write(buf[:n]); err != nil { + return err + } + } + + return nil +} + +func (t *TorrentManager) StartDumpJob() { + _ = t.workerPool.Submit(func() { + dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute) + defer dumpTicker.Stop() + + for { + select { + case <-dumpTicker.C: + t.dumpTorrents() + case <-t.DumpTrigger: + t.dumpTorrents() + } + } + }) +} + func (t *TorrentManager) initializeDirectoryMaps() { // create internal directories t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey() diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index c78bb9b..117c2b3 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -362,7 +362,7 @@ func (rd *RealDebrid) GetDownloads() []Download { return nil } - const maxItems = 50000 + maxItems := rd.cfg.GetDownloadsLimit() // reset allDownloads allDownloads := []Download{}