Dump torrents job
This commit is contained in:
@@ -30,6 +30,8 @@ type ConfigInterface interface {
|
||||
GetRateLimitSleepSecs() int
|
||||
ShouldDeleteRarFiles() bool
|
||||
GetDownloadsEveryMins() int
|
||||
GetDownloadsLimit() int
|
||||
GetDumpTorrentsEveryMins() int
|
||||
GetPlayableExtensions() []string
|
||||
GetTorrentsCount() int
|
||||
}
|
||||
@@ -42,6 +44,8 @@ type ZurgConfig struct {
|
||||
CanRepair bool `yaml:"enable_repair" json:"enable_repair"`
|
||||
DeleteRarFiles bool `yaml:"auto_delete_rar_torrents" json:"auto_delete_rar_torrents"`
|
||||
DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"`
|
||||
DownloadsLimit int `yaml:"downloads_limit" json:"downloads_limit"`
|
||||
DumpTorrentsEveryMins int `yaml:"dump_torrents_every_mins" json:"dump_torrents_every_mins"`
|
||||
DownloadTimeoutSecs int `yaml:"download_timeout_secs" json:"download_timeout_secs"`
|
||||
ForceIPv6 bool `yaml:"force_ipv6" json:"force_ipv6"`
|
||||
Host string `yaml:"host" json:"host"`
|
||||
@@ -132,6 +136,20 @@ func (z *ZurgConfig) GetDownloadsEveryMins() int {
|
||||
return z.DownloadsEveryMins
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) GetDownloadsLimit() int {
|
||||
if z.DownloadsLimit == 0 {
|
||||
return 50000
|
||||
}
|
||||
return z.DownloadsLimit
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) GetDumpTorrentsEveryMins() int {
|
||||
if z.DumpTorrentsEveryMins == 0 {
|
||||
return 60
|
||||
}
|
||||
return z.DumpTorrentsEveryMins
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) EnableRepair() bool {
|
||||
return z.CanRepair
|
||||
}
|
||||
|
||||
@@ -163,7 +163,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<td>%s</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td rowspan="22">Config</td>
|
||||
<td rowspan="23">Config</td>
|
||||
<td>Version</td>
|
||||
<td>%s</td>
|
||||
</tr>
|
||||
@@ -223,6 +223,14 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<td>Refresh Download Mount Every...</td>
|
||||
<td>%d mins</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Get downloads limit</td>
|
||||
<td>%d items</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Dump Torrents Every...</td>
|
||||
<td>%d mins</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Rate Limit Sleep for...</td>
|
||||
<td>%d secs</td>
|
||||
@@ -273,6 +281,9 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
<form method="post" action="/remount/downloads">
|
||||
<input type="submit" value="Remount downloads" />
|
||||
</form>
|
||||
<form method="post" action="/dump/torrents">
|
||||
<input type="submit" value="Dump torrents" />
|
||||
</form>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
@@ -323,6 +334,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
|
||||
response.Config.GetTorrentsCount(),
|
||||
response.Config.GetDownloadTimeoutSecs(),
|
||||
response.Config.GetDownloadsEveryMins(),
|
||||
response.Config.GetDownloadsLimit(),
|
||||
response.Config.GetDumpTorrentsEveryMins(),
|
||||
response.Config.GetRateLimitSleepSecs(),
|
||||
response.Config.GetRetriesUntilFailed(),
|
||||
response.Config.GetNetworkBufferSize(),
|
||||
@@ -367,6 +380,13 @@ func (zr *Handlers) handleRemountDownloads(resp http.ResponseWriter, req *http.R
|
||||
fmt.Fprint(resp, "Remounting downloads...")
|
||||
}
|
||||
|
||||
func (zr *Handlers) handleDumpTorrents(resp http.ResponseWriter, req *http.Request) {
|
||||
resp.Header().Set("Refresh", "2; url=/")
|
||||
zr.torMgr.DumpTrigger <- struct{}{}
|
||||
zr.log.Infof("Triggered dump of torrents")
|
||||
fmt.Fprint(resp, "Dumping torrents...")
|
||||
}
|
||||
|
||||
func bToMb(b uint64) uint64 {
|
||||
return b / 1024 / 1024
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t
|
||||
router.Post("/reboot/refresh", hs.handleRebootRefreshWorker)
|
||||
router.Post("/reboot/repair", hs.handleRebootRepairWorker)
|
||||
router.Post("/remount/downloads", hs.handleRemountDownloads)
|
||||
router.Post("/dump/torrents", hs.handleDumpTorrents)
|
||||
// version
|
||||
router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile)
|
||||
router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile)
|
||||
|
||||
@@ -41,6 +41,7 @@ type TorrentManager struct {
|
||||
RefreshKillSwitch chan struct{}
|
||||
RepairKillSwitch chan struct{}
|
||||
RemountTrigger chan struct{}
|
||||
DumpTrigger chan struct{}
|
||||
|
||||
latestState *LibraryState
|
||||
|
||||
@@ -75,29 +76,37 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
||||
RefreshKillSwitch: make(chan struct{}, 1),
|
||||
RepairKillSwitch: make(chan struct{}, 1),
|
||||
RemountTrigger: make(chan struct{}, 1),
|
||||
DumpTrigger: make(chan struct{}, 1),
|
||||
|
||||
latestState: &LibraryState{log: log},
|
||||
}
|
||||
|
||||
t.initializeBins()
|
||||
t.initializeDirectoryMaps()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
t.refreshTorrents()
|
||||
wg.Done()
|
||||
t.setNewLatestState(t.getCurrentState())
|
||||
t.StartRefreshJob()
|
||||
t.StartRepairJob()
|
||||
t.TriggerRepair(nil)
|
||||
})
|
||||
t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
t.mountNewDownloads()
|
||||
wg.Done()
|
||||
t.StartDownloadsJob()
|
||||
})
|
||||
|
||||
t.workerPool.Submit(func() {
|
||||
wg.Wait()
|
||||
t.StartRefreshJob()
|
||||
t.StartDownloadsJob()
|
||||
t.StartRepairJob()
|
||||
t.StartDumpJob()
|
||||
|
||||
t.setNewLatestState(t.getCurrentState())
|
||||
|
||||
t.TriggerRepair(nil)
|
||||
|
||||
t.log.Info("Applying media info details to all torrents")
|
||||
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||
@@ -348,6 +357,63 @@ func (t *TorrentManager) StartDownloadsJob() {
|
||||
})
|
||||
}
|
||||
|
||||
func (t *TorrentManager) dumpTorrents() {
|
||||
files := t.getTorrentFiles("data")
|
||||
for file := range files.Iter() {
|
||||
destPath := "dump/" + filepath.Base(file)
|
||||
if err := copyFile(file, destPath); err != nil {
|
||||
t.log.Warnf("Cannot copy file %s to %s: %v", file, destPath, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func copyFile(sourcePath, destPath string) error {
|
||||
source, err := os.Open(sourcePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer source.Close()
|
||||
|
||||
destination, err := os.Create(destPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer destination.Close()
|
||||
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := source.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if _, err := destination.Write(buf[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TorrentManager) StartDumpJob() {
|
||||
_ = t.workerPool.Submit(func() {
|
||||
dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute)
|
||||
defer dumpTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-dumpTicker.C:
|
||||
t.dumpTorrents()
|
||||
case <-t.DumpTrigger:
|
||||
t.dumpTorrents()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (t *TorrentManager) initializeDirectoryMaps() {
|
||||
// create internal directories
|
||||
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey()
|
||||
|
||||
@@ -362,7 +362,7 @@ func (rd *RealDebrid) GetDownloads() []Download {
|
||||
return nil
|
||||
}
|
||||
|
||||
const maxItems = 50000
|
||||
maxItems := rd.cfg.GetDownloadsLimit()
|
||||
|
||||
// reset allDownloads
|
||||
allDownloads := []Download{}
|
||||
|
||||
Reference in New Issue
Block a user