Add configs
This commit is contained in:
@@ -20,6 +20,8 @@ type ConfigInterface interface {
|
||||
GetRandomPreferredHost() string
|
||||
ShouldServeFromRclone() bool
|
||||
ShouldForceIPv6() bool
|
||||
GetUnrestrictWorkers() int
|
||||
GetReleaseUnrestrictAfterMs() int
|
||||
}
|
||||
|
||||
type ZurgConfig struct {
|
||||
@@ -28,6 +30,8 @@ type ZurgConfig struct {
|
||||
Host string `yaml:"host"`
|
||||
Port string `yaml:"port"`
|
||||
NumOfWorkers int `yaml:"concurrent_workers"`
|
||||
UnrestrictWorkers int `yaml:"unrestrict_workers"`
|
||||
ReleaseUnrestrictAfterMs int `yaml:"release_unrestrict_after_ms"`
|
||||
RefreshEverySeconds int `yaml:"check_for_changes_every_secs"`
|
||||
CanRepair bool `yaml:"enable_repair"`
|
||||
OnLibraryUpdate string `yaml:"on_library_update"`
|
||||
@@ -60,7 +64,7 @@ func (z *ZurgConfig) GetPort() string {
|
||||
|
||||
func (z *ZurgConfig) GetNumOfWorkers() int {
|
||||
if z.NumOfWorkers == 0 {
|
||||
return 10
|
||||
return 50
|
||||
}
|
||||
return z.NumOfWorkers
|
||||
}
|
||||
@@ -117,3 +121,17 @@ func (z *ZurgConfig) ShouldServeFromRclone() bool {
|
||||
func (z *ZurgConfig) ShouldForceIPv6() bool {
|
||||
return z.ForceIPv6
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) GetUnrestrictWorkers() int {
|
||||
if z.UnrestrictWorkers == 0 {
|
||||
return 20
|
||||
}
|
||||
return z.UnrestrictWorkers
|
||||
}
|
||||
|
||||
func (z *ZurgConfig) GetReleaseUnrestrictAfterMs() int {
|
||||
if z.ReleaseUnrestrictAfterMs == 0 {
|
||||
return 100
|
||||
}
|
||||
return z.ReleaseUnrestrictAfterMs
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ type TorrentManager struct {
|
||||
latestAdded string
|
||||
requiredVersion string
|
||||
cfg config.ConfigInterface
|
||||
api *realdebrid.RealDebrid
|
||||
Api *realdebrid.RealDebrid
|
||||
antsPool *ants.Pool
|
||||
unrestrictPool *ants.Pool
|
||||
log *zap.SugaredLogger
|
||||
@@ -47,13 +47,13 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
cfg: cfg,
|
||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||
requiredVersion: "18.11.2023",
|
||||
api: api,
|
||||
Api: api,
|
||||
antsPool: p,
|
||||
log: log,
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
|
||||
unrestrictPool, err := ants.NewPool(10)
|
||||
unrestrictPool, err := ants.NewPool(t.cfg.GetUnrestrictWorkers())
|
||||
if err != nil {
|
||||
t.unrestrictPool = t.antsPool
|
||||
} else {
|
||||
@@ -77,7 +77,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
// Fetch downloads
|
||||
go func() {
|
||||
defer initWait.Done()
|
||||
downloads, _, err := t.api.GetDownloads()
|
||||
downloads, _, err := t.Api.GetDownloads()
|
||||
if err != nil {
|
||||
t.log.Fatalf("Cannot get downloads: %v\n", err)
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
var newTorrents []realdebrid.Torrent
|
||||
go func() {
|
||||
defer initWait.Done()
|
||||
newTorrents, _, err = t.api.GetTorrents(0)
|
||||
newTorrents, _, err = t.Api.GetTorrents(0)
|
||||
if err != nil {
|
||||
t.log.Fatalf("Cannot get torrents: %v\n", err)
|
||||
}
|
||||
@@ -207,8 +207,8 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr
|
||||
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
|
||||
retChan := make(chan *realdebrid.Download, 1)
|
||||
t.unrestrictPool.Submit(func() {
|
||||
retChan <- t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
time.Sleep(1 * time.Second)
|
||||
retChan <- t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
time.Sleep(time.Duration(t.cfg.GetReleaseUnrestrictAfterMs()) * time.Millisecond)
|
||||
})
|
||||
defer close(retChan)
|
||||
return <-retChan
|
||||
@@ -234,7 +234,7 @@ func (t *TorrentManager) getChecksum() string {
|
||||
|
||||
// GetTorrents request
|
||||
go func() {
|
||||
torrents, totalCount, err := t.api.GetTorrents(1)
|
||||
torrents, totalCount, err := t.Api.GetTorrents(1)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
@@ -244,7 +244,7 @@ func (t *TorrentManager) getChecksum() string {
|
||||
|
||||
// GetActiveTorrentCount request
|
||||
go func() {
|
||||
count, err := t.api.GetActiveTorrentCount()
|
||||
count, err := t.Api.GetActiveTorrentCount()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
@@ -287,7 +287,7 @@ func (t *TorrentManager) startRefreshJob() {
|
||||
continue
|
||||
}
|
||||
|
||||
newTorrents, _, err := t.api.GetTorrents(0)
|
||||
newTorrents, _, err := t.Api.GetTorrents(0)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get torrents: %v\n", err)
|
||||
continue
|
||||
@@ -422,7 +422,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
torrentFromFile = nil
|
||||
}
|
||||
if info == nil {
|
||||
info, err = t.api.GetTorrentInfo(rdTorrent.ID)
|
||||
info, err = t.Api.GetTorrentInfo(rdTorrent.ID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
|
||||
return nil
|
||||
@@ -544,7 +544,12 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
|
||||
// Use the existing worker pool to submit tasks
|
||||
_ = t.unrestrictPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
resp := t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
if t.DownloadCache.Has(link) {
|
||||
download, _ := t.DownloadCache.Get(link)
|
||||
resultsChan <- Result{Response: download}
|
||||
return
|
||||
}
|
||||
resp := t.Api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
|
||||
resultsChan <- Result{Response: resp}
|
||||
time.Sleep(1 * time.Second)
|
||||
})
|
||||
@@ -634,7 +639,7 @@ func (t *TorrentManager) Delete(accessKey string) {
|
||||
if torrent, ok := allTorrents.Get(accessKey); ok {
|
||||
for _, instance := range torrent.Instances {
|
||||
infoCache.Remove(instance.ID)
|
||||
t.api.DeleteTorrent(instance.ID)
|
||||
t.Api.DeleteTorrent(instance.ID)
|
||||
}
|
||||
}
|
||||
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
@@ -779,7 +784,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
||||
}
|
||||
|
||||
// redownload torrent
|
||||
resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
|
||||
resp, err := t.Api.AddMagnetHash(torrent.Instances[0].Hash)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot redownload torrent: %v", err)
|
||||
return false
|
||||
@@ -788,25 +793,25 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
||||
|
||||
// select files
|
||||
newTorrentID := resp.ID
|
||||
err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
|
||||
err = t.Api.SelectTorrentFiles(newTorrentID, missingFiles)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot start redownloading: %v", err)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
t.Api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
// see if the torrent is ready
|
||||
info, err := t.api.GetTorrentInfo(newTorrentID)
|
||||
info, err := t.Api.GetTorrentInfo(newTorrentID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
t.Api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
|
||||
if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
||||
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
t.Api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -818,7 +823,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
|
||||
missingCount := len(strings.Split(missingFiles, ","))
|
||||
if len(info.Links) != missingCount {
|
||||
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
t.Api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -833,7 +838,7 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
||||
const maxDelay = 60 * time.Second
|
||||
retryCount := 0
|
||||
for {
|
||||
count, err := t.api.GetActiveTorrentCount()
|
||||
count, err := t.Api.GetActiveTorrentCount()
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get active downloads count: %v", err)
|
||||
if retryCount >= maxRetries {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package universal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -79,12 +80,14 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i
|
||||
link := file.Link
|
||||
|
||||
if download, exists := t.DownloadCache.Get(link); exists {
|
||||
if c.ShouldServeFromRclone() {
|
||||
if c.ShouldServeFromRclone() && t.Api.CanFetchFirstByte(download.Download) {
|
||||
redirect(w, r, download.Download, c)
|
||||
} else {
|
||||
gf.streamFileToResponse(file, download.Download, w, r, t, c, log)
|
||||
err := gf.streamCachedLinkToResponse(download.Download, w, r, t, c, log)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
resp := t.UnrestrictUntilOk(link)
|
||||
@@ -121,6 +124,39 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i
|
||||
}
|
||||
}
|
||||
|
||||
func (gf *GetFile) streamCachedLinkToResponse(url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) error {
|
||||
// Create a new request for the file download.
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("file is not available")
|
||||
}
|
||||
|
||||
// copy range header if it exists
|
||||
if r.Header.Get("Range") != "" {
|
||||
req.Header.Add("Range", r.Header.Get("Range"))
|
||||
}
|
||||
|
||||
resp, err := gf.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("file is not available")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||
return fmt.Errorf("file is not available")
|
||||
}
|
||||
|
||||
for k, vv := range resp.Header {
|
||||
for _, v := range vv {
|
||||
w.Header().Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
buf := make([]byte, cfg.GetNetworkBufferSize())
|
||||
io.CopyBuffer(w, resp.Body, buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
|
||||
// Create a new request for the file download.
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
|
||||
Reference in New Issue
Block a user