From 595040ad7e67cdaf449c1f72764b51aaa1eb17ca Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Fri, 24 Nov 2023 21:35:22 +0100 Subject: [PATCH] Use a proper client for fetch byte --- cmd/zurg/main.go | 62 +++++---------------------- internal/config/types.go | 6 +++ internal/net/router.go | 10 ++--- internal/universal/get.go | 51 ++++++++++++++-------- pkg/realdebrid/api.go | 2 +- pkg/realdebrid/unrestrict.go | 82 ++++++++++++++++++------------------ 6 files changed, 97 insertions(+), 116 deletions(-) diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index eae5ff3..4fed888 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -1,17 +1,15 @@ package main import ( - "context" "fmt" "net/http" "os" - "os/signal" - "syscall" "time" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" + "github.com/debridmediamanager.com/zurg/internal/universal" "github.com/debridmediamanager.com/zurg/internal/version" "github.com/panjf2000/ants/v2" @@ -44,7 +42,8 @@ func main() { cache := expirable.NewLRU[string, string](1e4, nil, time.Hour) - client := zurghttp.NewHTTPClient(config.GetToken(), 3, nil) + client := zurghttp.NewHTTPClient(config.GetToken(), 5, nil) + rd := realdebrid.NewRealDebrid(config.GetToken(), client, logutil.NewLogger().Named("realdebrid")) p, err := ants.NewPool(config.GetNumOfWorkers()) @@ -56,58 +55,17 @@ func main() { torrentMgr := torrent.NewTorrentManager(config, rd, p) + getfile := universal.NewGetFile(client) + mux := http.NewServeMux() - net.Router(mux, config, torrentMgr, cache) + net.Router(mux, getfile, config, torrentMgr, cache) addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort()) server := &http.Server{Addr: addr, Handler: mux} - shutdown := make(chan os.Signal, 1) - signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) - - go func() { - log.Infof("Starting server on %s", addr) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf("Failed to start server: %v", err) - os.Exit(1) - } - }() - - // log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU()) - // client := zurghttp.NewHTTPClient(config.GetToken(), 3, config) - // chunkMgr, err := chunk.NewManager( - // "", // in-memory chunks - // 10485760, // 10MB chunk size - // max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead - // max(runtime.NumCPU()/2, 1), // 4 check threads - // max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead - // runtime.NumCPU()*2, // total chunks kept in memory - // torrentMgr, - // client) - // if nil != err { - // log.Panicf("Failed to initialize chunk manager: %v", err) - // } - - // fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs")) - // host := fuse.NewFileSystemHost(fs) - // go func() { - // log.Infof("Mounting on %s", config.GetMountPoint()) - // if err := zfs.Mount(host, config); err != nil { - // log.Panicf("Failed to mount: %v", err) - // } - // }() - - <-shutdown - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if err := server.Shutdown(ctx); err != nil { - log.Errorf("Server shutdown error: %v\n", err) + log.Infof("Starting server on %s", addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf("Failed to start server: %v", err) + os.Exit(1) } - // if err := zfs.Unmount(host); err != nil { - // log.Errorf("Unmount error: %v\n", err) - // } - - log.Info("BYE") } diff --git a/internal/config/types.go b/internal/config/types.go index 84f3403..eca4ad5 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -18,6 +18,7 @@ type ConfigInterface interface { EnableRetainFolderNameExtension() bool EnableRetainRDTorrentName() bool GetRandomPreferredHost() string + ShouldServeFromRclone() bool } type ZurgConfig struct { @@ -34,6 +35,7 @@ type ZurgConfig struct { RetainFolderNameExtension bool `yaml:"retain_folder_name_extension"` RetainRDTorrentName bool `yaml:"retain_rd_torrent_name"` PreferredHosts []string `yaml:"preferred_hosts"` + ServeFromRclone bool `yaml:"serve_from_rclone"` } func (z *ZurgConfig) GetToken() string { @@ -105,3 +107,7 @@ func (z *ZurgConfig) GetRandomPreferredHost() string { randomIndex := rand.Intn(len(z.PreferredHosts)) return z.PreferredHosts[randomIndex] } + +func (z *ZurgConfig) ShouldServeFromRclone() bool { + return z.ServeFromRclone +} diff --git a/internal/net/router.go b/internal/net/router.go index a663303..7c7d61e 100644 --- a/internal/net/router.go +++ b/internal/net/router.go @@ -7,7 +7,7 @@ import ( "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/dav" - zurghttp "github.com/debridmediamanager.com/zurg/internal/http" + intHttp "github.com/debridmediamanager.com/zurg/internal/http" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/universal" "github.com/debridmediamanager.com/zurg/pkg/logutil" @@ -15,7 +15,7 @@ import ( ) // Router creates a WebDAV router -func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) { +func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) { log := logutil.NewLogger().Named("net") mux.HandleFunc("/http/", func(w http.ResponseWriter, r *http.Request) { @@ -23,9 +23,9 @@ func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentMana case http.MethodGet: requestPath := path.Clean(r.URL.Path) if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 { - universal.HandleGetRequest(w, r, t, c, cache) + getfile.HandleGetRequest(w, r, t, c, cache) } else { - zurghttp.HandleDirectoryListing(w, r, t) + intHttp.HandleDirectoryListing(w, r, t) } case http.MethodHead: @@ -47,7 +47,7 @@ func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentMana dav.HandleDeleteRequest(w, r, t, davlog) case http.MethodGet: - universal.HandleGetRequest(w, r, t, c, cache) + getfile.HandleGetRequest(w, r, t, c, cache) case http.MethodOptions: w.WriteHeader(http.StatusOK) diff --git a/internal/universal/get.go b/internal/universal/get.go index e3ba402..c502ce2 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -17,8 +17,16 @@ import ( "go.uber.org/zap" ) +type GetFile struct { + client *zurghttp.HTTPClient +} + +func NewGetFile(client *zurghttp.HTTPClient) *GetFile { + return &GetFile{client: client} +} + // HandleGetRequest handles a GET request universally for both WebDAV and HTTP -func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) { +func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) { log := logutil.NewLogger().Named("file") requestPath := path.Clean(r.URL.Path) @@ -65,15 +73,19 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM return } - if data, exists := cache.Get(requestPath); exists { - streamFileToResponse(file, data, w, r, t, c, log) + if url, exists := cache.Get(requestPath); exists { + if c.ShouldServeFromRclone() { + http.Redirect(w, r, url, http.StatusFound) + } else { + gf.streamFileToResponse(file, url, w, r, t, c, log) + } return } if !strings.HasPrefix(file.Link, "http") { // This is a dead file, serve an alternate file log.Warnf("File %s is not yet available, zurg is repairing the torrent", filename) - streamErrorVideo("https://www.youtube.com/watch?v=bGTqwt6vdcY", w, r, t, c, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=bGTqwt6vdcY", w, r, t, c, log) return } link := file.Link @@ -83,7 +95,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM log.Warnf("File %s is no longer available", file.Path) file.Link = "repair" t.SetChecksum("") // force a recheck - streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) } else { if resp.Filename != filename { // this is possible if there's only 1 streamable file in the torrent @@ -92,25 +104,29 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM expectedExt := filepath.Ext(filename) if actualExt != expectedExt && resp.Streamable != 1 { log.Warnf("File was changed and is not streamable: %s and %s", filename, resp.Filename) - streamErrorVideo("https://www.youtube.com/watch?v=t9VgOriBHwE", w, r, t, c, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=t9VgOriBHwE", w, r, t, c, log) return } else { log.Warnf("Filename mismatch: %s and %s", filename, resp.Filename) } } cache.Add(requestPath, resp.Download) - streamFileToResponse(file, resp.Download, w, r, t, c, log) + if c.ShouldServeFromRclone() { + http.Redirect(w, r, resp.Download, http.StatusFound) + } else { + gf.streamFileToResponse(file, resp.Download, w, r, t, c, log) + } } } -func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { +func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { if file != nil { log.Errorf("Error creating new request for file %s: %v", file.Path, err) } - streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log) return } @@ -119,17 +135,14 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, req.Header.Add("Range", r.Header.Get("Range")) } - // Create a custom HTTP client - client := zurghttp.NewHTTPClient(cfg.GetToken(), 10, cfg) - - resp, err := client.Do(req) + resp, err := gf.client.Do(req) if err != nil { if file != nil { log.Warnf("Cannot download file %s: %v", file.Path, err) file.Link = "repair" torMgr.SetChecksum("") // force a recheck } - streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log) return } defer resp.Body.Close() @@ -140,7 +153,7 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, file.Link = "repair" torMgr.SetChecksum("") // force a recheck } - streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log) + gf.playErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log) return } @@ -154,11 +167,15 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, io.CopyBuffer(w, resp.Body, buf) } -func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) { +func (gf *GetFile) playErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) { resp := t.UnrestrictUntilOk(link) if resp == nil { http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError) return } - streamFileToResponse(nil, resp.Download, w, r, t, c, log) + if c.ShouldServeFromRclone() { + http.Redirect(w, r, resp.Download, http.StatusFound) + return + } + gf.streamFileToResponse(nil, resp.Download, w, r, t, c, log) } diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 52fb58c..c4a656c 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -296,7 +296,7 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) { return nil, fmt.Errorf("undecodable response so likely it has expired") } - if !canFetchFirstByte(response.Download) { + if !rd.canFetchFirstByte(response.Download) { return nil, fmt.Errorf("can't fetch first byte") } diff --git a/pkg/realdebrid/unrestrict.go b/pkg/realdebrid/unrestrict.go index dddf7fc..c3c08b9 100644 --- a/pkg/realdebrid/unrestrict.go +++ b/pkg/realdebrid/unrestrict.go @@ -18,6 +18,47 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse { }) } +func (rd *RealDebrid) canFetchFirstByte(url string) bool { + const maxAttempts = 3 + + for i := 0; i < maxAttempts; i++ { + // Create a new HTTP request + req, err := http.NewRequest("GET", url, nil) + if err != nil { + continue + } + + // Set the Range header to request only the first byte + req.Header.Set("Range", "bytes=0-0") + + // TODO set a proper client + resp, err := http.DefaultClient.Do(req) + if err != nil { + time.Sleep(1 * time.Second) // Add a delay before the next retry + continue + } + defer resp.Body.Close() + + // If server supports partial content + if resp.StatusCode == http.StatusPartialContent { + buffer := make([]byte, 1) + _, err := resp.Body.Read(buffer) + if err == nil { + return true + } + } else if resp.StatusCode == http.StatusOK { + // If server doesn't support partial content, try reading the first byte and immediately close + buffer := make([]byte, 1) + _, err = resp.Body.Read(buffer) + if err == nil { + return true + } + } + time.Sleep(500 * time.Millisecond) // Add a delay before the next retry + } + return false +} + func retryUntilOk[T any](fn func() (T, error)) T { // const initialDelay = 1 * time.Second // const maxDelay = 128 * time.Second @@ -50,44 +91,3 @@ func retryUntilOk[T any](fn func() (T, error)) T { time.Sleep(delay) } } - -func canFetchFirstByte(url string) bool { - const maxAttempts = 3 - - for i := 0; i < maxAttempts; i++ { - // Create a new HTTP request - req, err := http.NewRequest("GET", url, nil) - if err != nil { - continue - } - - // Set the Range header to request only the first byte - req.Header.Set("Range", "bytes=0-0") - - // Execute the request - resp, err := http.DefaultClient.Do(req) - if err != nil { - time.Sleep(1 * time.Second) // Add a delay before the next retry - continue - } - defer resp.Body.Close() - - // If server supports partial content - if resp.StatusCode == http.StatusPartialContent { - buffer := make([]byte, 1) - _, err := resp.Body.Read(buffer) - if err == nil { - return true - } - } else if resp.StatusCode == http.StatusOK { - // If server doesn't support partial content, try reading the first byte and immediately close - buffer := make([]byte, 1) - _, err = resp.Body.Read(buffer) - if err == nil { - return true - } - } - time.Sleep(500 * time.Millisecond) // Add a delay before the next retry - } - return false -}