From c8334ecb3bc2fd3f6f63dea65510f20c1462d1b6 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Mon, 27 Nov 2023 21:50:00 +0100 Subject: [PATCH] Hotfix --- cmd/zurg/main.go | 31 ++++++---- internal/config/load.go | 6 +- internal/config/v1.go | 11 +++- internal/dav/listing.go | 24 ++++---- internal/http/listing.go | 17 +++--- internal/net/router.go | 19 +++--- internal/torrent/manager.go | 113 ++++++++++++++++++++++++++--------- internal/universal/get.go | 15 ++--- internal/universal/head.go | 6 +- pkg/dav/response.go | 43 ++++--------- pkg/dav/util.go | 1 + pkg/http/client.go | 79 +++++++++++++++--------- pkg/realdebrid/api.go | 13 ++-- pkg/realdebrid/unrestrict.go | 97 +++++++----------------------- scan_test.sh | 23 +++++++ 15 files changed, 277 insertions(+), 221 deletions(-) create mode 100755 scan_test.sh diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index 20afd0a..93faa61 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -17,6 +17,8 @@ import ( "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" "github.com/hashicorp/golang-lru/v2/expirable" + + _ "net/http/pprof" ) func main() { @@ -32,40 +34,47 @@ func main() { } // normal startup - log := logutil.NewLogger().Named("zurg") + log := logutil.NewLogger() + zurglog := log.Named("zurg") - config, configErr := config.LoadZurgConfig("./config.yml") + config, configErr := config.LoadZurgConfig("./config.yml", log.Named("config")) if configErr != nil { - log.Errorf("Config failed to load: %v", configErr) + zurglog.Errorf("Config failed to load: %v", configErr) os.Exit(1) } cache := expirable.NewLRU[string, string](1e4, nil, time.Hour) - client := zurghttp.NewHTTPClient(config.GetToken(), 5, config) + apiClient := zurghttp.NewHTTPClient(config.GetToken(), 5, 15, config, log.Named("httpclient")) - rd := realdebrid.NewRealDebrid(config.GetToken(), client, logutil.NewLogger().Named("realdebrid")) + rd := realdebrid.NewRealDebrid(apiClient, log.Named("realdebrid")) p, err := ants.NewPool(config.GetNumOfWorkers()) if err != nil { - log.Errorf("Failed to create worker pool: %v", err) + zurglog.Errorf("Failed to create worker pool: %v", err) os.Exit(1) } defer p.Release() - torrentMgr := torrent.NewTorrentManager(config, rd, p) + torrentMgr := torrent.NewTorrentManager(config, rd, p, log.Named("manager")) - getfile := universal.NewGetFile(client) + downloadClient := zurghttp.NewHTTPClient("", 5, 0, config, log.Named("dlclient")) + + getfile := universal.NewGetFile(downloadClient) + + go func() { + http.ListenAndServe("localhost:6060", nil) + }() mux := http.NewServeMux() - net.Router(mux, getfile, config, torrentMgr, cache) + net.Router(mux, getfile, config, torrentMgr, cache, log.Named("net")) addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort()) server := &http.Server{Addr: addr, Handler: mux} - log.Infof("Starting server on %s", addr) + zurglog.Infof("Starting server on %s", addr) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf("Failed to start server: %v", err) + zurglog.Errorf("Failed to start server: %v", err) os.Exit(1) } } diff --git a/internal/config/load.go b/internal/config/load.go index 79c3cde..6b01b20 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -4,13 +4,11 @@ import ( "fmt" "os" - "github.com/debridmediamanager.com/zurg/pkg/logutil" + "go.uber.org/zap" "gopkg.in/yaml.v3" ) -func LoadZurgConfig(filename string) (ConfigInterface, error) { - log := logutil.NewLogger().Named("config") - +func LoadZurgConfig(filename string, log *zap.SugaredLogger) (ConfigInterface, error) { log.Debug("Loading config file ", filename) content, err := os.ReadFile(filename) if err != nil { diff --git a/internal/config/v1.go b/internal/config/v1.go index 15ecc63..030c0a4 100644 --- a/internal/config/v1.go +++ b/internal/config/v1.go @@ -12,6 +12,10 @@ import ( "gopkg.in/yaml.v3" ) +const ( + ALL_TORRENTS = "__all__" +) + func loadV1Config(content []byte, log *zap.SugaredLogger) (*ZurgConfigV1, error) { var configV1 ZurgConfigV1 if err := yaml.Unmarshal(content, &configV1); err != nil { @@ -26,12 +30,13 @@ func (z *ZurgConfigV1) GetVersion() string { } func (z *ZurgConfigV1) GetDirectories() []string { - rootDirectories := make([]string, len(z.Directories)) + rootDirectories := make([]string, len(z.Directories)+1) i := 0 for directory := range z.Directories { rootDirectories[i] = directory i++ } + rootDirectories[i] = ALL_TORRENTS return rootDirectories } @@ -63,11 +68,15 @@ func (z *ZurgConfigV1) GetGroupMap() map[string][]string { copy(temp, v) result[k] = temp } + result[ALL_TORRENTS] = []string{ALL_TORRENTS} // Add special group for all torrents return result } func (z *ZurgConfigV1) MeetsConditions(directory, torrentName string, torrentIDs, fileNames []string) bool { + if directory == ALL_TORRENTS { + return true + } if _, ok := z.Directories[directory]; !ok { return false } diff --git a/internal/dav/listing.go b/internal/dav/listing.go index 3c1d804..154a0de 100644 --- a/internal/dav/listing.go +++ b/internal/dav/listing.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" "path" - "path/filepath" "sort" "strings" @@ -47,11 +46,14 @@ func HandlePropfindRequest(w http.ResponseWriter, r *http.Request, t *torrent.To func handleListDirectories(w http.ResponseWriter, t *torrent.TorrentManager) error { fmt.Fprint(w, "") // initial response is the directory itself - fmt.Fprint(w, dav.Directory("", "")) + fmt.Fprint(w, dav.BaseDirectory("", "")) directories := t.DirectoryMap.Keys() sort.Strings(directories) for _, directory := range directories { + if strings.HasPrefix(directory, "int__") { + continue + } fmt.Fprint(w, dav.Directory(directory, "")) } @@ -68,21 +70,23 @@ func handleListTorrents(w http.ResponseWriter, requestPath string, t *torrent.To fmt.Fprint(w, "") // initial response is the directory itself - fmt.Fprint(w, dav.Directory(basePath, "")) + fmt.Fprint(w, dav.BaseDirectory(basePath, "")) - var allTorrents []*torrent.Torrent - torrents.IterCb(func(_ string, tor *torrent.Torrent) { + var allTorrents []torrent.Torrent + torrents.IterCb(func(key string, tor *torrent.Torrent) { if tor.AllInProgress() { return } - allTorrents = append(allTorrents, tor) + copy := *tor + copy.AccessKey = key + allTorrents = append(allTorrents, copy) }) sort.Slice(allTorrents, func(i, j int) bool { return allTorrents[i].AccessKey < allTorrents[j].AccessKey }) for _, tor := range allTorrents { - fmt.Fprint(w, dav.Directory(filepath.Join(basePath, tor.AccessKey), tor.LatestAdded)) + fmt.Fprint(w, dav.Directory(tor.AccessKey, tor.LatestAdded)) } fmt.Fprint(w, "") @@ -90,7 +94,7 @@ func handleListTorrents(w http.ResponseWriter, requestPath string, t *torrent.To } func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.TorrentManager) error { - requestPath = strings.TrimPrefix(requestPath, "/") + requestPath = strings.Trim(requestPath, "/") basePath := path.Base(path.Dir(requestPath)) torrents, ok := t.DirectoryMap.Get(basePath) if !ok { @@ -104,7 +108,7 @@ func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.Torre fmt.Fprint(w, "") // initial response is the directory itself - fmt.Fprint(w, dav.Directory(requestPath, tor.LatestAdded)) + fmt.Fprint(w, dav.BaseDirectory(requestPath, tor.LatestAdded)) filenames := tor.SelectedFiles.Keys() sort.Strings(filenames) @@ -113,7 +117,7 @@ func handleListFiles(w http.ResponseWriter, requestPath string, t *torrent.Torre if file == nil || !strings.HasPrefix(file.Link, "http") { continue } - fmt.Fprint(w, dav.File(filepath.Join(requestPath, filename), file.Bytes, file.Ended)) + fmt.Fprint(w, dav.File(filename, file.Bytes, file.Ended)) } fmt.Fprint(w, "") diff --git a/internal/http/listing.go b/internal/http/listing.go index b0e2337..bcc6158 100644 --- a/internal/http/listing.go +++ b/internal/http/listing.go @@ -10,12 +10,10 @@ import ( "strings" "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/logutil" + "go.uber.org/zap" ) -func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager) { - log := logutil.NewLogger().Named("http") - +func HandleDirectoryListing(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, log *zap.SugaredLogger) { requestPath := path.Clean(r.URL.Path) var output *string @@ -56,6 +54,9 @@ func handleRoot(t *torrent.TorrentManager) (*string, error) { directories := t.DirectoryMap.Keys() sort.Strings(directories) for _, directory := range directories { + if strings.HasPrefix(directory, "int__") { + continue + } directoryPath := url.PathEscape(directory) htmlDoc += fmt.Sprintf("
  • %s
  • ", directoryPath, directory) } @@ -72,12 +73,14 @@ func handleListOfTorrents(requestPath string, t *torrent.TorrentManager) (*strin htmlDoc := "
      " - var allTorrents []*torrent.Torrent - torrents.IterCb(func(_ string, tor *torrent.Torrent) { + var allTorrents []torrent.Torrent + torrents.IterCb(func(key string, tor *torrent.Torrent) { if tor.AllInProgress() { return } - allTorrents = append(allTorrents, tor) + copy := *tor + copy.AccessKey = key + allTorrents = append(allTorrents, copy) }) sort.Slice(allTorrents, func(i, j int) bool { return allTorrents[i].AccessKey < allTorrents[j].AccessKey diff --git a/internal/net/router.go b/internal/net/router.go index 7c7d61e..34cab15 100644 --- a/internal/net/router.go +++ b/internal/net/router.go @@ -10,26 +10,24 @@ import ( intHttp "github.com/debridmediamanager.com/zurg/internal/http" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/universal" - "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" + "go.uber.org/zap" ) // Router creates a WebDAV router -func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) { - log := logutil.NewLogger().Named("net") - +func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string], log *zap.SugaredLogger) { mux.HandleFunc("/http/", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: requestPath := path.Clean(r.URL.Path) if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 { - getfile.HandleGetRequest(w, r, t, c, cache) + getfile.HandleGetRequest(w, r, t, c, cache, log) } else { - intHttp.HandleDirectoryListing(w, r, t) + intHttp.HandleDirectoryListing(w, r, t, log) } case http.MethodHead: - universal.HandleHeadRequest(w, r, t, cache) + universal.HandleHeadRequest(w, r, t, cache, log) default: log.Errorf("Request %s %s not supported yet", r.Method, r.URL.Path) @@ -38,16 +36,15 @@ func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInter }) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - davlog := logutil.NewLogger().Named("dav") switch r.Method { case "PROPFIND": - dav.HandlePropfindRequest(w, r, t, davlog) + dav.HandlePropfindRequest(w, r, t, log) case http.MethodDelete: - dav.HandleDeleteRequest(w, r, t, davlog) + dav.HandleDeleteRequest(w, r, t, log) case http.MethodGet: - getfile.HandleGetRequest(w, r, t, c, cache) + getfile.HandleGetRequest(w, r, t, c, cache, log) case http.MethodOptions: w.WriteHeader(http.StatusOK) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index f5b0904..8191e50 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -12,7 +12,6 @@ import ( "time" "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/debridmediamanager.com/zurg/pkg/realdebrid" "github.com/debridmediamanager.com/zurg/pkg/utils" cmap "github.com/orcaman/concurrent-map/v2" @@ -21,8 +20,9 @@ import ( ) const ( - ALL_TORRENTS = "__all__" - DATA_DIR = "data" + INT_ALL = "int__all__" + INT_INFO_CACHE = "int__info__" + DATA_DIR = "data" ) type TorrentManager struct { @@ -33,6 +33,7 @@ type TorrentManager struct { cfg config.ConfigInterface api *realdebrid.RealDebrid antsPool *ants.Pool + unrestrictPool *ants.Pool log *zap.SugaredLogger mu *sync.Mutex } @@ -40,21 +41,30 @@ type TorrentManager struct { // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background // and store them in-memory and cached in files -func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool) *TorrentManager { +func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, log *zap.SugaredLogger) *TorrentManager { t := &TorrentManager{ cfg: cfg, DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), requiredVersion: "18.11.2023", api: api, antsPool: p, - log: logutil.NewLogger().Named("manager"), + log: log, mu: &sync.Mutex{}, } + unrestrictPool, err := ants.NewPool(10) + if err != nil { + t.unrestrictPool = t.antsPool + } else { + t.unrestrictPool = unrestrictPool + } + ensureDir(DATA_DIR) - // create special directory - t.DirectoryMap.Set("__all__", cmap.New[*Torrent]()) // key is AccessKey + // create internal directories + t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey + t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID + // create directory maps for _, directory := range cfg.GetDirectories() { t.DirectoryMap.Set(directory, cmap.New[*Torrent]()) @@ -82,18 +92,18 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p noInfoCount := 0 allCt := 0 - allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) for info := range torrentsChan { allCt++ if info == nil { noInfoCount++ continue } - if torrent, exists := allTorrents.Get(info.AccessKey); exists { + if torrent, exists := allTorrents.Get(info.AccessKey); !exists { + allTorrents.Set(info.AccessKey, info) + } else { mainTorrent := t.mergeToMain(torrent, info) allTorrents.Set(info.AccessKey, mainTorrent) - } else { - allTorrents.Set(info.AccessKey, info) } } @@ -134,6 +144,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p go t.startRefreshJob() t.latestAdded = newTorrents[0].Added // set the latest added to the first torrent's added + t.log.Info("Finished initializing torrent manager") return t } @@ -168,8 +179,14 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.UnrestrictResponse { - ret := t.api.UnrestrictUntilOk(link) - return ret + retChan := make(chan *realdebrid.UnrestrictResponse, 1) + t.unrestrictPool.Submit(func() { + retChan <- t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) + time.Sleep(1 * time.Second) + }) + defer close(retChan) + return <-retChan + // return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) } type torrentsResponse struct { @@ -178,9 +195,9 @@ type torrentsResponse struct { } func (t *TorrentManager) SetChecksum(checksum string) { - t.mu.Lock() + // t.mu.Lock() t.checksum = checksum - t.mu.Unlock() + // t.mu.Unlock() } // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents @@ -251,6 +268,23 @@ func (t *TorrentManager) startRefreshJob() { } t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents)) + // handle deleted torrents in info cache + keep := make(map[string]bool) + for _, torrent := range newTorrents { + keep[torrent.ID] = true + } + var toDelete []string + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + infoCache.IterCb(func(torrentID string, torrent *Torrent) { + if _, ok := keep[torrentID]; !ok { + toDelete = append(toDelete, torrentID) + } + }) + for _, torrentID := range toDelete { + infoCache.Remove(torrentID) + } + // end info cache cleanup + torrentsChan := make(chan *Torrent, len(newTorrents)) var wg sync.WaitGroup for i := range newTorrents { @@ -267,20 +301,20 @@ func (t *TorrentManager) startRefreshJob() { t.log.Infof("Fetched info for %d torrents", len(newTorrents)) noInfoCount := 0 - oldTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) newSet := cmap.New[*Torrent]() for info := range torrentsChan { if info == nil { noInfoCount++ continue } - if torrent, exists := oldTorrents.Get(info.AccessKey); exists { + if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { + oldTorrents.Set(info.AccessKey, info) + newSet.Set(info.AccessKey, info) + } else { mainTorrent := t.mergeToMain(torrent, info) oldTorrents.Set(info.AccessKey, mainTorrent) newSet.Set(info.AccessKey, mainTorrent) - } else { - oldTorrents.Set(info.AccessKey, info) - newSet.Set(info.AccessKey, info) } } @@ -333,15 +367,24 @@ func (t *TorrentManager) startRefreshJob() { t.log.Info("Checking for torrents to repair") t.repairAll() t.log.Info("Finished checking for torrents to repair") + } else { + t.log.Info("Repair is disabled, skipping repair check") } go OnLibraryUpdateHook(updatedPaths, t.cfg, t.log) t.latestAdded = newTorrents[0].Added + t.log.Info("Finished refreshing torrents") } } // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + if infoCache.Has(rdTorrent.ID) { + tor, _ := infoCache.Get(rdTorrent.ID) + return tor + } + var info *realdebrid.TorrentInfo var err error // file cache @@ -399,6 +442,9 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { if len(selectedFiles) > 0 && torrentFromFile == nil { t.writeToFile(info) // only when there are selected files, else it's useless } + + infoCache.Set(rdTorrent.ID, &torrent) + return &torrent } @@ -470,10 +516,11 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([ wg.Add(1) link := link // redeclare to avoid closure on loop variable // Use the existing worker pool to submit tasks - _ = t.antsPool.Submit(func() { + _ = t.unrestrictPool.Submit(func() { defer wg.Done() - resp := t.api.UnrestrictUntilOk(link) + resp := t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) resultsChan <- Result{Response: resp} + time.Sleep(1 * time.Second) }) } @@ -522,7 +569,8 @@ func (t *TorrentManager) repairAll() { return } - allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + var toDelete []string + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) allTorrents.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() { t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey) @@ -531,7 +579,7 @@ func (t *TorrentManager) repairAll() { forRepair := false unselected := 0 torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if file.Link == "repair" { + if file.Link == "repair" && !forRepair { t.log.Debugf("Found a file to repair for torrent %s", torrent.AccessKey) forRepair = true } @@ -545,20 +593,29 @@ func (t *TorrentManager) repairAll() { } if unselected == torrent.SelectedFiles.Count() && unselected > 0 { t.log.Infof("Deleting %s", torrent.AccessKey) - t.Delete(torrent.AccessKey) + toDelete = append(toDelete, torrent.AccessKey) } }) + for _, accessKey := range toDelete { + t.Delete(accessKey) + } } func (t *TorrentManager) Delete(accessKey string) { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) t.log.Infof("Deleting torrent %s", accessKey) - allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) if torrent, ok := allTorrents.Get(accessKey); ok { for _, instance := range torrent.Instances { + infoCache.Remove(instance.ID) t.api.DeleteTorrent(instance.ID) } - allTorrents.Remove(accessKey) } + t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { + if _, ok := torrents.Get(accessKey); ok { + torrents.Remove(accessKey) + } + }) } func (t *TorrentManager) Repair(accessKey string) { @@ -567,7 +624,7 @@ func (t *TorrentManager) Repair(accessKey string) { return } - allTorrents, _ := t.DirectoryMap.Get(ALL_TORRENTS) + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) torrent, _ := allTorrents.Get(accessKey) if torrent == nil { t.log.Warnf("Cannot find torrent %s anymore so we are not repairing it", accessKey) diff --git a/internal/universal/get.go b/internal/universal/get.go index ccddb0a..6243249 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -13,7 +13,6 @@ import ( intHttp "github.com/debridmediamanager.com/zurg/internal/http" intTor "github.com/debridmediamanager.com/zurg/internal/torrent" zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" - "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" "go.uber.org/zap" ) @@ -27,9 +26,7 @@ func NewGetFile(client *zurghttp.HTTPClient) *GetFile { } // HandleGetRequest handles a GET request universally for both WebDAV and HTTP -func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) { - log := logutil.NewLogger().Named("file") - +func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string], log *zap.SugaredLogger) { requestPath := path.Clean(r.URL.Path) isDav := true if strings.Contains(requestPath, "/http") { @@ -45,7 +42,7 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i if isDav { dav.HandlePropfindRequest(w, r, t, log) } else { - intHttp.HandleDirectoryListing(w, r, t) + intHttp.HandleDirectoryListing(w, r, t, log) } return } @@ -93,10 +90,10 @@ func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *i resp := t.UnrestrictUntilOk(link) if resp == nil { - log.Warnf("File %s is no longer available", filepath.Base(file.Path)) + // log.Warnf("File %s is no longer available, link %s", filepath.Base(file.Path), link) file.Link = "repair" if c.EnableRepair() { - log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) + // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) t.SetChecksum("") // force a recheck } http.Error(w, "File is not available", http.StatusNotFound) @@ -147,7 +144,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re log.Warnf("Cannot download file %s: %v", file.Path, err) file.Link = "repair" if cfg.EnableRepair() { - log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) + // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) torMgr.SetChecksum("") // force a recheck } } @@ -161,7 +158,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.Re log.Warnf("Received a %s status code for file %s", resp.Status, file.Path) file.Link = "repair" if cfg.EnableRepair() { - log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) + // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) torMgr.SetChecksum("") // force a recheck } } diff --git a/internal/universal/head.go b/internal/universal/head.go index d3f7abf..2ba6d19 100644 --- a/internal/universal/head.go +++ b/internal/universal/head.go @@ -8,17 +8,15 @@ import ( "strings" "github.com/debridmediamanager.com/zurg/internal/torrent" - "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" + "go.uber.org/zap" ) const ( SPLIT_TOKEN = "$" ) -func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) { - log := logutil.NewLogger().Named("head") - +func HandleHeadRequest(w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, cache *expirable.LRU[string, string], log *zap.SugaredLogger) { requestPath := path.Clean(r.URL.Path) requestPath = strings.Replace(requestPath, "/http", "", 1) if requestPath == "/favicon.ico" { diff --git a/pkg/dav/response.go b/pkg/dav/response.go index 7d47de2..283b257 100644 --- a/pkg/dav/response.go +++ b/pkg/dav/response.go @@ -1,39 +1,22 @@ package dav -import "fmt" - -func DavDirectory(path, added string) Response { - return Response{ - Href: "/" + customPathEscape(path), - Propstat: PropStat{ - Prop: Prop{ - ResourceType: ResourceType{Value: ""}, - LastModified: added, - }, - Status: "HTTP/1.1 200 OK", - }, - } -} - -func DavFile(path string, fileSize int64, added string, link string) Response { - return Response{ - Href: "/" + customPathEscape(path), - Propstat: PropStat{ - Prop: Prop{ - ContentLength: fileSize, - LastModified: added, - }, - Status: "HTTP/1.1 200 OK", - }, - } -} +import ( + "fmt" + "path/filepath" +) // optimized versions, no more marshalling -func Directory(path, added string) string { +func BaseDirectory(path, added string) string { return fmt.Sprintf("/%s%sHTTP/1.1 200 OK", customPathEscape(path), added) } -func File(path string, fileSize int64, added string) string { - return fmt.Sprintf("/%s%d%sHTTP/1.1 200 OK", customPathEscape(path), fileSize, added) +func Directory(path, added string) string { + path = filepath.Base(path) + return fmt.Sprintf("%s%sHTTP/1.1 200 OK", customPathEscape(path), added) +} + +func File(path string, fileSize int64, added string) string { + path = filepath.Base(path) + return fmt.Sprintf("%s%d%sHTTP/1.1 200 OK", customPathEscape(path), fileSize, added) } diff --git a/pkg/dav/util.go b/pkg/dav/util.go index b83fa8d..bacc107 100644 --- a/pkg/dav/util.go +++ b/pkg/dav/util.go @@ -18,5 +18,6 @@ func customPathEscape(input string) string { escapedPath = strings.Replace(escapedPath, ">", "%3E", -1) // for > escapedPath = strings.Replace(escapedPath, "\"", "%22", -1) // for " escapedPath = strings.Replace(escapedPath, "'", "%27", -1) // for ' + escapedPath = strings.Replace(escapedPath, ":", "%3A", -1) // for : return escapedPath } diff --git a/pkg/http/client.go b/pkg/http/client.go index 8311a10..f254136 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -2,33 +2,34 @@ package http import ( "context" + "math" "net" "net/http" "strings" "time" "github.com/debridmediamanager.com/zurg/internal/config" - "github.com/debridmediamanager.com/zurg/pkg/logutil" cmap "github.com/orcaman/concurrent-map/v2" "go.uber.org/zap" ) type HTTPClient struct { - Client *http.Client - MaxRetries int - Backoff func(attempt int) time.Duration - CheckRespStatus func(resp *http.Response, err error) bool - BearerToken string - log *zap.SugaredLogger - config config.ConfigInterface - IPv6 cmap.ConcurrentMap[string, net.IP] + Client *http.Client + MaxRetries int + Backoff func(attempt int) time.Duration + ShouldRetry func(resp *http.Response, err error) int + BearerToken string + log *zap.SugaredLogger + config config.ConfigInterface + IPv6 cmap.ConcurrentMap[string, net.IP] } func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { + req.Close = true if r.config != nil && strings.Contains(req.Host, "download.real-debrid.") { prefHost := r.config.GetRandomPreferredHost() if prefHost != "" { - req.Host = prefHost + req.URL.Host = prefHost } } if r.BearerToken != "" { @@ -72,35 +73,57 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { var resp *http.Response var err error - for attempt := 0; attempt < r.MaxRetries; attempt++ { + attempt := 0 + for { resp, err = r.Client.Do(req) - if !r.CheckRespStatus(resp, err) { + if val := r.ShouldRetry(resp, err); val == -1 { return resp, err + } else { + if val == 0 { + time.Sleep(8 * time.Second) // extra delay + } else { + attempt += val + if attempt > r.MaxRetries { + return resp, err + } + time.Sleep(r.Backoff(attempt)) + } } - time.Sleep(r.Backoff(attempt)) } - return resp, err } -func NewHTTPClient(token string, maxRetries int, cfg config.ConfigInterface) *HTTPClient { +func NewHTTPClient(token string, maxRetries int, timeoutSecs int, cfg config.ConfigInterface, log *zap.SugaredLogger) *HTTPClient { return &HTTPClient{ BearerToken: token, - Client: &http.Client{}, - MaxRetries: maxRetries, + Client: &http.Client{ + Timeout: time.Duration(timeoutSecs) * time.Second, + }, + MaxRetries: maxRetries, Backoff: func(attempt int) time.Duration { - return time.Duration(attempt) * time.Second - }, - CheckRespStatus: func(resp *http.Response, err error) bool { - if err != nil { - return true + maxDuration := 60 + backoff := int(math.Pow(2, float64(attempt))) + if backoff > maxDuration { + backoff = maxDuration } - if resp.StatusCode == 429 { - return true - } - // no need to retry) - return false + return time.Duration(backoff) * time.Second }, - log: logutil.NewLogger().Named("client"), + ShouldRetry: func(resp *http.Response, err error) int { + if resp != nil { + if resp.StatusCode == 429 || resp.StatusCode == 400 { + return 0 // retry but don't increment attempt + } + return -1 // don't retry + } else if err != nil { + errStr := err.Error() + if strings.Contains(errStr, "EOF") || strings.Contains(errStr, "connection reset") || strings.Contains(errStr, "no such host") { + return 0 // retry but don't increment attempt + } else { + return 1 + } + } + return 1 // retry and increment attempt + }, + log: log, config: cfg, IPv6: cmap.New[net.IP](), } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index c4a656c..5b4ac8a 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -19,7 +19,7 @@ type RealDebrid struct { client *zurghttp.HTTPClient } -func NewRealDebrid(accessToken string, client *zurghttp.HTTPClient, log *zap.SugaredLogger) *RealDebrid { +func NewRealDebrid(client *zurghttp.HTTPClient, log *zap.SugaredLogger) *RealDebrid { return &RealDebrid{ log: log, client: client, @@ -69,7 +69,7 @@ func (rd *RealDebrid) GetTorrents(customLimit int) ([]Torrent, int, error) { page := 1 limit := customLimit if limit == 0 { - limit = 2500 + limit = 1000 } totalCount := 0 @@ -263,7 +263,7 @@ func (rd *RealDebrid) GetActiveTorrentCount() (*ActiveTorrentCountResponse, erro return &response, nil } -func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) { +func (rd *RealDebrid) UnrestrictLink(link string, checkFirstByte bool) (*UnrestrictResponse, error) { data := url.Values{} data.Set("link", link) @@ -283,6 +283,11 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) { } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + rd.log.Errorf("Unrestrict link request returned status code %d for link %s", resp.StatusCode, link) + // return nil, fmt.Errorf("unrestrict link request returned status code %d so likely it has expired", resp.StatusCode) + } + body, err := io.ReadAll(resp.Body) if err != nil { // rd.log.Errorf("Error when reading the body of unrestrict link response: %v", err) @@ -296,7 +301,7 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) { return nil, fmt.Errorf("undecodable response so likely it has expired") } - if !rd.canFetchFirstByte(response.Download) { + if checkFirstByte && !rd.canFetchFirstByte(response.Download) { return nil, fmt.Errorf("can't fetch first byte") } diff --git a/pkg/realdebrid/unrestrict.go b/pkg/realdebrid/unrestrict.go index 7042d88..270716f 100644 --- a/pkg/realdebrid/unrestrict.go +++ b/pkg/realdebrid/unrestrict.go @@ -3,91 +3,40 @@ package realdebrid import ( "net/http" "strings" - "time" ) -func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse { +func (rd *RealDebrid) UnrestrictUntilOk(link string, serveFromRclone bool) *UnrestrictResponse { if !strings.HasPrefix(link, "http") { return nil } - unrestrictFn := func(link string) (*UnrestrictResponse, error) { - return rd.UnrestrictLink(link) + resp, _ := rd.UnrestrictLink(link, serveFromRclone) + if resp != nil { + return resp } - return retryUntilOk(func() (*UnrestrictResponse, error) { - return unrestrictFn(link) - }) + return nil } func (rd *RealDebrid) canFetchFirstByte(url string) bool { - const maxAttempts = 3 + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return false + } - for i := 0; i < maxAttempts; i++ { - // Create a new HTTP request - req, err := http.NewRequest("GET", url, nil) - if err != nil { - continue + req.Header.Set("Range", "bytes=0-0") + + resp, err := rd.client.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + + // If server supports partial content + if resp.StatusCode == http.StatusPartialContent || resp.StatusCode == http.StatusOK { + buffer := make([]byte, 1) + _, err = resp.Body.Read(buffer) + if err == nil { + return true } - - // Set the Range header to request only the first byte - req.Header.Set("Range", "bytes=0-0") - - // TODO set a proper client - resp, err := rd.client.Do(req) - if err != nil { - time.Sleep(1 * time.Second) // Add a delay before the next retry - continue - } - defer resp.Body.Close() - - // If server supports partial content - if resp.StatusCode == http.StatusPartialContent { - buffer := make([]byte, 1) - _, err := resp.Body.Read(buffer) - if err == nil { - return true - } - } else if resp.StatusCode == http.StatusOK { - // If server doesn't support partial content, try reading the first byte and immediately close - buffer := make([]byte, 1) - _, err = resp.Body.Read(buffer) - if err == nil { - return true - } - } - time.Sleep(500 * time.Millisecond) // Add a delay before the next retry } return false } - -func retryUntilOk[T any](fn func() (T, error)) T { - // const initialDelay = 1 * time.Second - // const maxDelay = 128 * time.Second - const maxRetries = 2 // Maximum retries for non-429 errors - - var result T - var err error - var retryCount int - - for { - result, err = fn() - if err == nil { - return result - } - - if strings.Contains(err.Error(), "first byte") || strings.Contains(err.Error(), "expired") { - return result - } - if !strings.Contains(err.Error(), "429") { - retryCount++ - if retryCount >= maxRetries { - // If we've reached the maximum retries for errors other than 429, return the last result. - return result - } - } - - // Calculate delay with exponential backoff - // delay := time.Duration(math.Min(float64(initialDelay)*math.Pow(2, float64(retryCount)), float64(maxDelay))) - delay := 500 * time.Millisecond - time.Sleep(delay) - } -} diff --git a/scan_test.sh b/scan_test.sh new file mode 100755 index 0000000..4b85da0 --- /dev/null +++ b/scan_test.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +process_file() { + echo "Processing $1" + # echo -n "First 100 bytes of $file: " + # | od -An -t x1 + # dd bs=1 count=100 if="$file" 2>/dev/null + dd bs=1 count=100 if="$1" >/dev/null 2>&1 +} + +export -f process_file + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 directory" + exit 1 +fi + +if [ ! -d "$1" ]; then + echo "Directory $1 does not exist." + exit 1 +fi + +find "$1" -type f -print0 | xargs -0 -n1 -P20 -I{} bash -c 'process_file "$@"' _ {}