This commit is contained in:
Ben Sarmiento
2023-11-27 21:50:00 +01:00
parent a7623de410
commit c8334ecb3b
15 changed files with 277 additions and 221 deletions

View File

@@ -4,13 +4,11 @@ import (
"fmt"
"os"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
func LoadZurgConfig(filename string) (ConfigInterface, error) {
log := logutil.NewLogger().Named("config")
func LoadZurgConfig(filename string, log *zap.SugaredLogger) (ConfigInterface, error) {
log.Debug("Loading config file ", filename)
content, err := os.ReadFile(filename)
if err != nil {

View File

@@ -12,6 +12,10 @@ import (
"gopkg.in/yaml.v3"
)
const (
ALL_TORRENTS = "__all__"
)
func loadV1Config(content []byte, log *zap.SugaredLogger) (*ZurgConfigV1, error) {
var configV1 ZurgConfigV1
if err := yaml.Unmarshal(content, &configV1); err != nil {
@@ -26,12 +30,13 @@ func (z *ZurgConfigV1) GetVersion() string {
}
func (z *ZurgConfigV1) GetDirectories() []string {
rootDirectories := make([]string, len(z.Directories))
rootDirectories := make([]string, len(z.Directories)+1)
i := 0
for directory := range z.Directories {
rootDirectories[i] = directory
i++
}
rootDirectories[i] = ALL_TORRENTS
return rootDirectories
}
@@ -63,11 +68,15 @@ func (z *ZurgConfigV1) GetGroupMap() map[string][]string {
copy(temp, v)
result[k] = temp
}
result[ALL_TORRENTS] = []string{ALL_TORRENTS} // Add special group for all torrents
return result
}
func (z *ZurgConfigV1) MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool {
if directory == ALL_TORRENTS {
return true
}
if _, ok := z.Directories[directory]; !ok {
return false
}

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"path"
"path/filepath"
"sort"
"strings"
@@ -47,11 +46,14 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To
func handleListDirectories(w http.ResponseWriter, t *torrent.TorrentManager) error {
fmt.Fprint(w, "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">")
// initial response is the directory itself
fmt.Fprint(w, dav.Directory("", ""))
fmt.Fprint(w, dav.BaseDirectory("", ""))
directories := t.DirectoryMap.Keys()
sort.Strings(directories)
for _, directory := range directories {
if strings.HasPrefix(directory, "int__") {
continue
}
fmt.Fprint(w, dav.Directory(directory, ""))
}
@@ -68,21 +70,23 @@ func handleListTorrents(w http.ResponseWriter, requestPath string, t *torrent.To
fmt.Fprint(w, "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">")
// initial response is the directory itself
fmt.Fprint(w, dav.Directory(basePath, ""))
fmt.Fprint(w, dav.BaseDirectory(basePath, ""))
var allTorrents []*torrent.Torrent
torrents.IterCb(func(_ string, tor *torrent.Torrent) {
var allTorrents []torrent.Torrent
torrents.IterCb(func(key string, tor *torrent.Torrent) {
if tor.AllInProgress() {
return
}
allTorrents = append(allTorrents, tor)
copy := *tor
copy.AccessKey = key
allTorrents = append(allTorrents, copy)
})
sort.Slice(allTorrents, func(i, j int) bool {
return allTorrents[i].AccessKey < allTorrents[j].AccessKey
})
for _, tor := range allTorrents {
fmt.Fprint(w, dav.Directory(filepath.Join(basePath, tor.AccessKey), tor.LatestAdded))
fmt.Fprint(w, dav.Directory(tor.AccessKey, tor.LatestAdded))
}
fmt.Fprint(w, "</d:multistatus>")
@@ -90,7 +94,7 @@ func handleListTorrents(w http.ResponseWriter, requestPath string, t *torrent.To
}
func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.TorrentManager) error {
requestPath = strings.TrimPrefix(requestPath, "/")
requestPath = strings.Trim(requestPath, "/")
basePath := path.Base(path.Dir(requestPath))
torrents, ok := t.DirectoryMap.Get(basePath)
if !ok {
@@ -104,7 +108,7 @@ func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.Torre
fmt.Fprint(w, "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">")
// initial response is the directory itself
fmt.Fprint(w, dav.Directory(requestPath, tor.LatestAdded))
fmt.Fprint(w, dav.BaseDirectory(requestPath, tor.LatestAdded))
filenames := tor.SelectedFiles.Keys()
sort.Strings(filenames)
@@ -113,7 +117,7 @@ func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.Torre
if file == nil || !strings.HasPrefix(file.Link, "http") {
continue
}
fmt.Fprint(w, dav.File(filepath.Join(requestPath, filename), file.Bytes, file.Ended))
fmt.Fprint(w, dav.File(filename, file.Bytes, file.Ended))
}
fmt.Fprint(w, "</d:multistatus>")

View File

@@ -10,12 +10,10 @@ import (
"strings"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"go.uber.org/zap"
)
func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager) {
log := logutil.NewLogger().Named("http")
func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, log *zap.SugaredLogger) {
requestPath := path.Clean(r.URL.Path)
var output *string
@@ -56,6 +54,9 @@ func handleRoot(t *torrent.TorrentManager) (*string, error) {
directories := t.DirectoryMap.Keys()
sort.Strings(directories)
for _, directory := range directories {
if strings.HasPrefix(directory, "int__") {
continue
}
directoryPath := url.PathEscape(directory)
htmlDoc += fmt.Sprintf("<li><a href=\"/http/%s/\">%s</a></li>", directoryPath, directory)
}
@@ -72,12 +73,14 @@ func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) (*strin
htmlDoc := "<ol>"
var allTorrents []*torrent.Torrent
torrents.IterCb(func(_ string, tor *torrent.Torrent) {
var allTorrents []torrent.Torrent
torrents.IterCb(func(key string, tor *torrent.Torrent) {
if tor.AllInProgress() {
return
}
allTorrents = append(allTorrents, tor)
copy := *tor
copy.AccessKey = key
allTorrents = append(allTorrents, copy)
})
sort.Slice(allTorrents, func(i, j int) bool {
return allTorrents[i].AccessKey < allTorrents[j].AccessKey

View File

@@ -10,26 +10,24 @@ import (
intHttp "github.com/debridmediamanager.com/zurg/internal/http"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/internal/universal"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
)
// Router creates a WebDAV router
func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) {
log := logutil.NewLogger().Named("net")
func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string], log *zap.SugaredLogger) {
mux.HandleFunc("/http/", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
requestPath := path.Clean(r.URL.Path)
if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 {
getfile.HandleGetRequest(w, r, t, c, cache)
getfile.HandleGetRequest(w, r, t, c, cache, log)
} else {
intHttp.HandleDirectoryListing(w, r, t)
intHttp.HandleDirectoryListing(w, r, t, log)
}
case http.MethodHead:
universal.HandleHeadRequest(w, r, t, cache)
universal.HandleHeadRequest(w, r, t, cache, log)
default:
log.Errorf("Request %s %s not supported yet", r.Method, r.URL.Path)
@@ -38,16 +36,15 @@ func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInter
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
davlog := logutil.NewLogger().Named("dav")
switch r.Method {
case "PROPFIND":
dav.HandlePropfindRequest(w, r, t, davlog)
dav.HandlePropfindRequest(w, r, t, log)
case http.MethodDelete:
dav.HandleDeleteRequest(w, r, t, davlog)
dav.HandleDeleteRequest(w, r, t, log)
case http.MethodGet:
getfile.HandleGetRequest(w, r, t, c, cache)
getfile.HandleGetRequest(w, r, t, c, cache, log)
case http.MethodOptions:
w.WriteHeader(http.StatusOK)

View File

@@ -12,7 +12,6 @@ import (
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
"github.com/debridmediamanager.com/zurg/pkg/utils"
cmap "github.com/orcaman/concurrent-map/v2"
@@ -21,8 +20,9 @@ import (
)
const (
ALL_TORRENTS = "__all__"
DATA_DIR = "data"
INT_ALL = "int__all__"
INT_INFO_CACHE = "int__info__"
DATA_DIR = "data"
)
type TorrentManager struct {
@@ -33,6 +33,7 @@ type TorrentManager struct {
cfg config.ConfigInterface
api *realdebrid.RealDebrid
antsPool *ants.Pool
unrestrictPool *ants.Pool
log *zap.SugaredLogger
mu *sync.Mutex
}
@@ -40,21 +41,30 @@ type TorrentManager struct {
// NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background
// and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool) *TorrentManager {
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, log *zap.SugaredLogger) *TorrentManager {
t := &TorrentManager{
cfg: cfg,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
requiredVersion: "18.11.2023",
api: api,
antsPool: p,
log: logutil.NewLogger().Named("manager"),
log: log,
mu: &sync.Mutex{},
}
unrestrictPool, err := ants.NewPool(10)
if err != nil {
t.unrestrictPool = t.antsPool
} else {
t.unrestrictPool = unrestrictPool
}
ensureDir(DATA_DIR)
// create special directory
t.DirectoryMap.Set("__all__", cmap.New[*Torrent]()) // key is AccessKey
// create internal directories
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey
t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID
// create directory maps
for _, directory := range cfg.GetDirectories() {
t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
@@ -82,18 +92,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
noInfoCount := 0
allCt := 0
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
for info := range torrentsChan {
allCt++
if info == nil {
noInfoCount++
continue
}
if torrent, exists := allTorrents.Get(info.AccessKey); exists {
if torrent, exists := allTorrents.Get(info.AccessKey); !exists {
allTorrents.Set(info.AccessKey, info)
} else {
mainTorrent := t.mergeToMain(torrent, info)
allTorrents.Set(info.AccessKey, mainTorrent)
} else {
allTorrents.Set(info.AccessKey, info)
}
}
@@ -134,6 +144,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
go t.startRefreshJob()
t.latestAdded = newTorrents[0].Added // set the latest added to the first torrent's added
t.log.Info("Finished initializing torrent manager")
return t
}
@@ -168,8 +179,14 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr
// proxy
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse {
ret := t.api.UnrestrictUntilOk(link)
return ret
retChan := make(chan *realdebrid.UnrestrictResponse, 1)
t.unrestrictPool.Submit(func() {
retChan <- t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
time.Sleep(1 * time.Second)
})
defer close(retChan)
return <-retChan
// return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
}
type torrentsResponse struct {
@@ -178,9 +195,9 @@ type torrentsResponse struct {
}
func (t *TorrentManager) SetChecksum(checksum string) {
t.mu.Lock()
// t.mu.Lock()
t.checksum = checksum
t.mu.Unlock()
// t.mu.Unlock()
}
// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents
@@ -251,6 +268,23 @@ func (t *TorrentManager) startRefreshJob() {
}
t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents))
// handle deleted torrents in info cache
keep := make(map[string]bool)
for _, torrent := range newTorrents {
keep[torrent.ID] = true
}
var toDelete []string
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
infoCache.IterCb(func(torrentID string, torrent *Torrent) {
if _, ok := keep[torrentID]; !ok {
toDelete = append(toDelete, torrentID)
}
})
for _, torrentID := range toDelete {
infoCache.Remove(torrentID)
}
// end info cache cleanup
torrentsChan := make(chan *Torrent, len(newTorrents))
var wg sync.WaitGroup
for i := range newTorrents {
@@ -267,20 +301,20 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
noInfoCount := 0
oldTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
newSet := cmap.New[*Torrent]()
for info := range torrentsChan {
if info == nil {
noInfoCount++
continue
}
if torrent, exists := oldTorrents.Get(info.AccessKey); exists {
if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
oldTorrents.Set(info.AccessKey, info)
newSet.Set(info.AccessKey, info)
} else {
mainTorrent := t.mergeToMain(torrent, info)
oldTorrents.Set(info.AccessKey, mainTorrent)
newSet.Set(info.AccessKey, mainTorrent)
} else {
oldTorrents.Set(info.AccessKey, info)
newSet.Set(info.AccessKey, info)
}
}
@@ -333,15 +367,24 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Info("Checking for torrents to repair")
t.repairAll()
t.log.Info("Finished checking for torrents to repair")
} else {
t.log.Info("Repair is disabled, skipping repair check")
}
go OnLibraryUpdateHook(updatedPaths, t.cfg, t.log)
t.latestAdded = newTorrents[0].Added
t.log.Info("Finished refreshing torrents")
}
}
// getMoreInfo gets original name, size and files for a torrent
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if infoCache.Has(rdTorrent.ID) {
tor, _ := infoCache.Get(rdTorrent.ID)
return tor
}
var info *realdebrid.TorrentInfo
var err error
// file cache
@@ -399,6 +442,9 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
if len(selectedFiles) > 0 && torrentFromFile == nil {
t.writeToFile(info) // only when there are selected files, else it's useless
}
infoCache.Set(rdTorrent.ID, &torrent)
return &torrent
}
@@ -470,10 +516,11 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
wg.Add(1)
link := link // redeclare to avoid closure on loop variable
// Use the existing worker pool to submit tasks
_ = t.antsPool.Submit(func() {
_ = t.unrestrictPool.Submit(func() {
defer wg.Done()
resp := t.api.UnrestrictUntilOk(link)
resp := t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
resultsChan <- Result{Response: resp}
time.Sleep(1 * time.Second)
})
}
@@ -522,7 +569,8 @@ func (t *TorrentManager) repairAll() {
return
}
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
var toDelete []string
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
allTorrents.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() {
t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey)
@@ -531,7 +579,7 @@ func (t *TorrentManager) repairAll() {
forRepair := false
unselected := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.Link == "repair" {
if file.Link == "repair" && !forRepair {
t.log.Debugf("Found a file to repair for torrent %s", torrent.AccessKey)
forRepair = true
}
@@ -545,20 +593,29 @@ func (t *TorrentManager) repairAll() {
}
if unselected == torrent.SelectedFiles.Count() && unselected > 0 {
t.log.Infof("Deleting %s", torrent.AccessKey)
t.Delete(torrent.AccessKey)
toDelete = append(toDelete, torrent.AccessKey)
}
})
for _, accessKey := range toDelete {
t.Delete(accessKey)
}
}
func (t *TorrentManager) Delete(accessKey string) {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
t.log.Infof("Deleting torrent %s", accessKey)
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
if torrent, ok := allTorrents.Get(accessKey); ok {
for _, instance := range torrent.Instances {
infoCache.Remove(instance.ID)
t.api.DeleteTorrent(instance.ID)
}
allTorrents.Remove(accessKey)
}
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if _, ok := torrents.Get(accessKey); ok {
torrents.Remove(accessKey)
}
})
}
func (t *TorrentManager) Repair(accessKey string) {
@@ -567,7 +624,7 @@ func (t *TorrentManager) Repair(accessKey string) {
return
}
allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS)
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
torrent, _ := allTorrents.Get(accessKey)
if torrent == nil {
t.log.Warnf("Cannot find torrent %s anymore so we are not repairing it", accessKey)

View File

@@ -13,7 +13,6 @@ import (
intHttp "github.com/debridmediamanager.com/zurg/internal/http"
intTor "github.com/debridmediamanager.com/zurg/internal/torrent"
zurghttp "github.com/debridmediamanager.com/zurg/pkg/http"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
)
@@ -27,9 +26,7 @@ func NewGetFile(client *zurghttp.HTTPClient) *GetFile {
}
// HandleGetRequest handles a GET request universally for both WebDAV and HTTP
func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) {
log := logutil.NewLogger().Named("file")
func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string], log *zap.SugaredLogger) {
requestPath := path.Clean(r.URL.Path)
isDav := true
if strings.Contains(requestPath, "/http") {
@@ -45,7 +42,7 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i
if isDav {
dav.HandlePropfindRequest(w, r, t, log)
} else {
intHttp.HandleDirectoryListing(w, r, t)
intHttp.HandleDirectoryListing(w, r, t, log)
}
return
}
@@ -93,10 +90,10 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i
resp := t.UnrestrictUntilOk(link)
if resp == nil {
log.Warnf("File %s is no longer available", filepath.Base(file.Path))
// log.Warnf("File %s is no longer available, link %s", filepath.Base(file.Path), link)
file.Link = "repair"
if c.EnableRepair() {
log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
t.SetChecksum("") // force a recheck
}
http.Error(w, "File is not available", http.StatusNotFound)
@@ -147,7 +144,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re
log.Warnf("Cannot download file %s: %v", file.Path, err)
file.Link = "repair"
if cfg.EnableRepair() {
log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.SetChecksum("") // force a recheck
}
}
@@ -161,7 +158,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re
log.Warnf("Received a %s status code for file %s", resp.Status, file.Path)
file.Link = "repair"
if cfg.EnableRepair() {
log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.SetChecksum("") // force a recheck
}
}

View File

@@ -8,17 +8,15 @@ import (
"strings"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
)
const (
SPLIT_TOKEN = "$"
)
func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) {
log := logutil.NewLogger().Named("head")
func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, cache *expirable.LRU[string, string], log *zap.SugaredLogger) {
requestPath := path.Clean(r.URL.Path)
requestPath = strings.Replace(requestPath, "/http", "", 1)
if requestPath == "/favicon.ico" {