Fix issue on downloads

This commit is contained in:
Ben Sarmiento
2024-01-31 22:39:42 +01:00
parent cdb04fcf52
commit 0810e8203f
5 changed files with 240 additions and 192 deletions

View File

@@ -29,6 +29,7 @@ type ConfigInterface interface {
EnableDownloadMount() bool EnableDownloadMount() bool
GetRateLimitSleepSecs() int GetRateLimitSleepSecs() int
ShouldDeleteRarFiles() bool ShouldDeleteRarFiles() bool
GetDownloadsEveryMins() int
} }
type ZurgConfig struct { type ZurgConfig struct {
@@ -43,6 +44,7 @@ type ZurgConfig struct {
NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"` NumOfWorkers int `yaml:"concurrent_workers" json:"concurrent_workers"`
RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"` RefreshEverySecs int `yaml:"check_for_changes_every_secs" json:"check_for_changes_every_secs"`
RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"` RepairEveryMins int `yaml:"repair_every_mins" json:"repair_every_mins"`
DownloadsEveryMins int `yaml:"downloads_every_mins" json:"downloads_every_mins"`
IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"` IgnoreRenames bool `yaml:"ignore_renames" json:"ignore_renames"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name" json:"retain_rd_torrent_name"`
@@ -119,6 +121,13 @@ func (z *ZurgConfig) GetRepairEveryMins() int {
return z.RepairEveryMins return z.RepairEveryMins
} }
func (z *ZurgConfig) GetDownloadsEveryMins() int {
if z.DownloadsEveryMins == 0 {
return 60
}
return z.DownloadsEveryMins
}
func (z *ZurgConfig) EnableRepair() bool { func (z *ZurgConfig) EnableRepair() bool {
return z.CanRepair return z.CanRepair
} }

View File

@@ -158,7 +158,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>%s</td> <td>%s</td>
</tr> </tr>
<tr> <tr>
<td rowspan="20">Config</td> <td rowspan="22">Config</td>
<td>Version</td> <td>Version</td>
<td>%s</td> <td>%s</td>
</tr> </tr>
@@ -194,6 +194,10 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>Can Repair</td> <td>Can Repair</td>
<td>%t</td> <td>%t</td>
</tr> </tr>
<tr>
<td>Repair Every...</td>
<td>%d mins</td>
</tr>
<tr> <tr>
<td>Delete Rar Files</td> <td>Delete Rar Files</td>
<td>%t</td> <td>%t</td>
@@ -210,6 +214,10 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>Use Download Mount</td> <td>Use Download Mount</td>
<td>%t</td> <td>%t</td>
</tr> </tr>
<tr>
<td>Refresh Download Mount Every...</td>
<td>%d mins</td>
</tr>
<tr> <tr>
<td>Rate Limit Sleep for...</td> <td>Rate Limit Sleep for...</td>
<td>%d secs</td> <td>%d secs</td>
@@ -241,10 +249,21 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<tr> <tr>
<td>Utilities</td> <td>Utilities</td>
<td colspan="2"> <td colspan="2">
<button type="button" onclick="window.open('/logs/upload')">Upload logs</button> <form method="get" action="/logs/upload">
<button type="button" onclick="window.open('/reboot/worker')">Reboot worker pool</button> <input type="submit" value="Upload logs" />
<button type="button" onclick="window.open('/reboot/refresh')">Reboot refresh worker</button> </form>
<button type="button" onclick="window.open('/reboot/repair')">Reboot repair worker</button> <form method="post" action="/reboot/worker">
<input type="submit" value="Reboot worker pool" />
</form>
<form method="post" action="/reboot/refresh">
<input type="submit" value="Reboot refresh worker" />
</form>
<form method="post" action="/reboot/repair">
<input type="submit" value="Reboot repair worker" />
</form>
<form method="post" action="/remount/downloads">
<input type="submit" value="Remount downloads" />
</form>
</td> </td>
</tr> </tr>
</table> </table>
@@ -289,10 +308,12 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.EnableRetainRDTorrentName(), response.Config.EnableRetainRDTorrentName(),
response.Config.EnableRetainFolderNameExtension(), response.Config.EnableRetainFolderNameExtension(),
response.Config.EnableRepair(), response.Config.EnableRepair(),
response.Config.GetRepairEveryMins(),
response.Config.ShouldDeleteRarFiles(), response.Config.ShouldDeleteRarFiles(),
response.Config.GetApiTimeoutSecs(), response.Config.GetApiTimeoutSecs(),
response.Config.GetDownloadTimeoutSecs(), response.Config.GetDownloadTimeoutSecs(),
response.Config.EnableDownloadMount(), response.Config.EnableDownloadMount(),
response.Config.GetDownloadsEveryMins(),
response.Config.GetRateLimitSleepSecs(), response.Config.GetRateLimitSleepSecs(),
response.Config.GetRetriesUntilFailed(), response.Config.GetRetriesUntilFailed(),
response.Config.GetNetworkBufferSize(), response.Config.GetNetworkBufferSize(),
@@ -306,24 +327,34 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
} }
func (zr *Handlers) handleRebootWorkerPool(resp http.ResponseWriter, req *http.Request) { func (zr *Handlers) handleRebootWorkerPool(resp http.ResponseWriter, req *http.Request) {
// zr.workerPool.Release() resp.Header().Set("Refresh", "2; url=/")
zr.workerPool.Release()
zr.workerPool.Reboot() zr.workerPool.Reboot()
zr.log.Infof("Rebooted worker pool") zr.log.Infof("Rebooted worker pool")
fmt.Fprint(resp, "Rebooted worker pool, please close this window") fmt.Fprint(resp, "Rebooting worker pool...")
} }
func (zr *Handlers) handleRebootRefreshPool(resp http.ResponseWriter, req *http.Request) { func (zr *Handlers) handleRebootRefreshWorker(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Refresh", "2; url=/")
zr.torMgr.RefreshKillSwitch <- struct{}{} zr.torMgr.RefreshKillSwitch <- struct{}{}
zr.torMgr.StartRefreshJob() zr.torMgr.StartRefreshJob()
zr.log.Infof("Rebooted refresh worker") zr.log.Infof("Rebooted refresh worker")
fmt.Fprint(resp, "Rebooted refresh worker, please close this window") fmt.Fprint(resp, "Rebooting refresh worker...")
} }
func (zr *Handlers) handleRebootRepairPool(resp http.ResponseWriter, req *http.Request) { func (zr *Handlers) handleRebootRepairWorker(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Refresh", "2; url=/")
zr.torMgr.RepairKillSwitch <- struct{}{} zr.torMgr.RepairKillSwitch <- struct{}{}
zr.torMgr.StartRepairJob() zr.torMgr.StartRepairJob()
zr.log.Infof("Rebooted repair worker") zr.log.Infof("Rebooted repair worker")
fmt.Fprint(resp, "Rebooted repair worker, please close this window") fmt.Fprint(resp, "Rebooting repair worker...")
}
func (zr *Handlers) handleRemountDownloads(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("Refresh", "2; url=/")
zr.torMgr.RemountTrigger <- struct{}{}
zr.log.Infof("Triggered remount of downloads")
fmt.Fprint(resp, "Remounting downloads...")
} }
func bToMb(b uint64) uint64 { func bToMb(b uint64) uint64 {

View File

@@ -25,8 +25,6 @@ type Handlers struct {
cfg config.ConfigInterface cfg config.ConfigInterface
api *realdebrid.RealDebrid api *realdebrid.RealDebrid
workerPool *ants.Pool workerPool *ants.Pool
refreshPool *ants.Pool
repairPool *ants.Pool
log *logutil.Logger log *logutil.Logger
} }
@@ -52,9 +50,10 @@ func AttachHandlers(router *chi.Mux, downloader *universal.Downloader, torMgr *t
router.Use(hs.options) router.Use(hs.options)
router.Get("/", hs.handleHome) router.Get("/", hs.handleHome)
router.Get("/reboot/worker", hs.handleRebootWorkerPool) router.Post("/reboot/worker", hs.handleRebootWorkerPool)
router.Get("/reboot/refresh", hs.handleRebootRefreshPool) router.Post("/reboot/refresh", hs.handleRebootRefreshWorker)
router.Get("/reboot/repair", hs.handleRebootRepairPool) router.Post("/reboot/repair", hs.handleRebootRepairWorker)
router.Post("/remount/downloads", hs.handleRemountDownloads)
// version // version
router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile) router.Get(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleVersionFile)
router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile) router.Head(fmt.Sprintf("/{mountType}/%s", version.FILE), hs.handleCheckVersionFile)

View File

@@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"time"
"github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/internal/config"
"github.com/debridmediamanager/zurg/pkg/logutil" "github.com/debridmediamanager/zurg/pkg/logutil"
@@ -33,6 +34,7 @@ type TorrentManager struct {
workerPool *ants.Pool workerPool *ants.Pool
RefreshKillSwitch chan struct{} RefreshKillSwitch chan struct{}
RepairKillSwitch chan struct{} RepairKillSwitch chan struct{}
RemountTrigger chan struct{}
repairTrigger chan *Torrent repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent] repairSet mapset.Set[*Torrent]
repairRunning bool repairRunning bool
@@ -49,9 +51,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
Api: api, Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
DownloadCache: cmap.New[*realdebrid.Download](), DownloadCache: cmap.New[*realdebrid.Download](),
DownloadMap: cmap.New[*realdebrid.Download](),
RefreshKillSwitch: make(chan struct{}, 1), RefreshKillSwitch: make(chan struct{}, 1),
RepairKillSwitch: make(chan struct{}, 1), RepairKillSwitch: make(chan struct{}, 1),
RemountTrigger: make(chan struct{}, 1),
allAccessKeys: mapset.NewSet[string](), allAccessKeys: mapset.NewSet[string](),
latestState: &LibraryState{}, latestState: &LibraryState{},
requiredVersion: "0.9.3-hotfix.4", requiredVersion: "0.9.3-hotfix.4",
@@ -60,11 +62,12 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
} }
t.fixers = t.readFixersFromFile() t.fixers = t.readFixersFromFile()
t.initializeDirectories() t.initializeDirectories()
t.mountDownloads()
t.refreshTorrents() t.refreshTorrents()
t.setNewLatestState(t.getCurrentState()) t.setNewLatestState(t.getCurrentState())
t.StartRefreshJob() t.StartRefreshJob()
t.StartRepairJob() t.StartRepairJob()
t.mountDownloads()
t.StartDownloadsJob()
return t return t
} }
@@ -85,8 +88,10 @@ func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
} }
if ret != nil && ret.Link != "" && ret.Filename != "" { if ret != nil && ret.Link != "" && ret.Filename != "" {
t.DownloadCache.Set(ret.Link, ret) t.DownloadCache.Set(ret.Link, ret)
if t.Config.EnableDownloadMount() {
t.DownloadMap.Set(ret.Filename, ret) t.DownloadMap.Set(ret.Filename, ret)
} }
}
return ret return ret
} }
@@ -179,6 +184,7 @@ func (t *TorrentManager) mountDownloads() {
if !t.Config.EnableDownloadMount() { if !t.Config.EnableDownloadMount() {
return return
} }
t.DownloadMap = cmap.New[*realdebrid.Download]()
_ = t.workerPool.Submit(func() { _ = t.workerPool.Submit(func() {
page := 1 page := 1
offset := 0 offset := 0
@@ -196,7 +202,23 @@ func (t *TorrentManager) mountDownloads() {
break break
} }
} }
t.log.Infof("Compiled into %d downloads", t.DownloadCache.Count()) t.log.Infof("Compiled into %d downloads", t.DownloadMap.Count())
})
}
func (t *TorrentManager) StartDownloadsJob() {
_ = t.workerPool.Submit(func() {
remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute)
defer remountTicker.Stop()
for {
select {
case <-remountTicker.C:
t.mountDownloads()
case <-t.RemountTrigger:
t.mountDownloads()
}
}
}) })
} }

View File

@@ -104,19 +104,6 @@ func (dl *Downloader) DownloadLink(fileName, link string, resp http.ResponseWrit
http.Error(resp, "File is not available", http.StatusInternalServerError) http.Error(resp, "File is not available", http.StatusInternalServerError)
return return
} else { } else {
lFilename := strings.ToLower(fileName)
unrestrictFilename := strings.ToLower(strings.TrimPrefix(unrestrict.Filename, "/"))
if strings.Contains(lFilename, unrestrictFilename) {
// this is possible if there's only 1 streamable file in the torrent
// and then suddenly it's a rar file
actualExt := filepath.Ext(unrestrictFilename)
expectedExt := filepath.Ext(lFilename)
if actualExt != expectedExt && unrestrict.Streamable != 1 {
log.Warnf("File was changed and is not streamable: %s and %s (link=%s)", fileName, unrestrict.Filename, unrestrict.Link)
} else {
log.Warnf("Filename mismatch: %s and %s", fileName, unrestrict.Filename)
}
}
if cfg.ShouldServeFromRclone() { if cfg.ShouldServeFromRclone() {
if cfg.ShouldVerifyDownloadLink() { if cfg.ShouldVerifyDownloadLink() {
if !dl.client.CanFetchFirstByte(unrestrict.Download) { if !dl.client.CanFetchFirstByte(unrestrict.Download) {