Use a proper client for fetch byte

This commit is contained in:
Ben Sarmiento
2023-11-24 21:35:22 +01:00
parent 1e8c50a350
commit 595040ad7e
6 changed files with 97 additions and 116 deletions

View File

@@ -1,17 +1,15 @@
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/net"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/internal/universal"
"github.com/debridmediamanager.com/zurg/internal/version"
"github.com/panjf2000/ants/v2"
@@ -44,7 +42,8 @@ func main() {
cache := expirable.NewLRU[string, string](1e4, nil, time.Hour)
client := zurghttp.NewHTTPClient(config.GetToken(), 3, nil)
client := zurghttp.NewHTTPClient(config.GetToken(), 5, nil)
rd := realdebrid.NewRealDebrid(config.GetToken(), client, logutil.NewLogger().Named("realdebrid"))
p, err := ants.NewPool(config.GetNumOfWorkers())
@@ -56,58 +55,17 @@ func main() {
torrentMgr := torrent.NewTorrentManager(config, rd, p)
getfile := universal.NewGetFile(client)
mux := http.NewServeMux()
net.Router(mux, config, torrentMgr, cache)
net.Router(mux, getfile, config, torrentMgr, cache)
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
server := &http.Server{Addr: addr, Handler: mux}
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
go func() {
log.Infof("Starting server on %s", addr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("Failed to start server: %v", err)
os.Exit(1)
}
}()
// log.Debugf("Initializing chunk manager, cores: %d", runtime.NumCPU())
// client := zurghttp.NewHTTPClient(config.GetToken(), 3, config)
// chunkMgr, err := chunk.NewManager(
// "", // in-memory chunks
// 10485760, // 10MB chunk size
// max(runtime.NumCPU()/2, 1), // 8 cores/2 = 4 chunks to load ahead
// max(runtime.NumCPU()/2, 1), // 4 check threads
// max(runtime.NumCPU()-1, 1), // number of chunks that should be read ahead
// runtime.NumCPU()*2, // total chunks kept in memory
// torrentMgr,
// client)
// if nil != err {
// log.Panicf("Failed to initialize chunk manager: %v", err)
// }
// fs := zfs.NewZurgFS(torrentMgr, config, chunkMgr, logutil.NewLogger().Named("zfs"))
// host := fuse.NewFileSystemHost(fs)
// go func() {
// log.Infof("Mounting on %s", config.GetMountPoint())
// if err := zfs.Mount(host, config); err != nil {
// log.Panicf("Failed to mount: %v", err)
// }
// }()
<-shutdown
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Errorf("Server shutdown error: %v\n", err)
}
// if err := zfs.Unmount(host); err != nil {
// log.Errorf("Unmount error: %v\n", err)
// }
log.Info("BYE")
}

View File

@@ -18,6 +18,7 @@ type ConfigInterface interface {
EnableRetainFolderNameExtension() bool
EnableRetainRDTorrentName() bool
GetRandomPreferredHost() string
ShouldServeFromRclone() bool
}
type ZurgConfig struct {
@@ -34,6 +35,7 @@ type ZurgConfig struct {
RetainFolderNameExtension bool `yaml:"retain_folder_name_extension"`
RetainRDTorrentName bool `yaml:"retain_rd_torrent_name"`
PreferredHosts []string `yaml:"preferred_hosts"`
ServeFromRclone bool `yaml:"serve_from_rclone"`
}
func (z *ZurgConfig) GetToken() string {
@@ -105,3 +107,7 @@ func (z *ZurgConfig) GetRandomPreferredHost() string {
randomIndex := rand.Intn(len(z.PreferredHosts))
return z.PreferredHosts[randomIndex]
}
func (z *ZurgConfig) ShouldServeFromRclone() bool {
return z.ServeFromRclone
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/dav"
zurghttp "github.com/debridmediamanager.com/zurg/internal/http"
intHttp "github.com/debridmediamanager.com/zurg/internal/http"
"github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/debridmediamanager.com/zurg/internal/universal"
"github.com/debridmediamanager.com/zurg/pkg/logutil"
@@ -15,7 +15,7 @@ import (
)
// Router creates a WebDAV router
func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) {
func Router(mux *http.ServeMux, getfile *universal.GetFile, c config.ConfigInterface, t *torrent.TorrentManager, cache *expirable.LRU[string, string]) {
log := logutil.NewLogger().Named("net")
mux.HandleFunc("/http/", func(w http.ResponseWriter, r *http.Request) {
@@ -23,9 +23,9 @@ func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentMana
case http.MethodGet:
requestPath := path.Clean(r.URL.Path)
if countNonEmptySegments(strings.Split(requestPath, "/")) > 3 {
universal.HandleGetRequest(w, r, t, c, cache)
getfile.HandleGetRequest(w, r, t, c, cache)
} else {
zurghttp.HandleDirectoryListing(w, r, t)
intHttp.HandleDirectoryListing(w, r, t)
}
case http.MethodHead:
@@ -47,7 +47,7 @@ func Router(mux *http.ServeMux, c config.ConfigInterface, t *torrent.TorrentMana
dav.HandleDeleteRequest(w, r, t, davlog)
case http.MethodGet:
universal.HandleGetRequest(w, r, t, c, cache)
getfile.HandleGetRequest(w, r, t, c, cache)
case http.MethodOptions:
w.WriteHeader(http.StatusOK)

View File

@@ -17,8 +17,16 @@ import (
"go.uber.org/zap"
)
type GetFile struct {
client *zurghttp.HTTPClient
}
func NewGetFile(client *zurghttp.HTTPClient) *GetFile {
return &GetFile{client: client}
}
// HandleGetRequest handles a GET request universally for both WebDAV and HTTP
func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) {
func (gf *GetFile) HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, cache *expirable.LRU[string, string]) {
log := logutil.NewLogger().Named("file")
requestPath := path.Clean(r.URL.Path)
@@ -65,15 +73,19 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
return
}
if data, exists := cache.Get(requestPath); exists {
streamFileToResponse(file, data, w, r, t, c, log)
if url, exists := cache.Get(requestPath); exists {
if c.ShouldServeFromRclone() {
http.Redirect(w, r, url, http.StatusFound)
} else {
gf.streamFileToResponse(file, url, w, r, t, c, log)
}
return
}
if !strings.HasPrefix(file.Link, "http") {
// This is a dead file, serve an alternate file
log.Warnf("File %s is not yet available, zurg is repairing the torrent", filename)
streamErrorVideo("https://www.youtube.com/watch?v=bGTqwt6vdcY", w, r, t, c, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=bGTqwt6vdcY", w, r, t, c, log)
return
}
link := file.Link
@@ -83,7 +95,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
log.Warnf("File %s is no longer available", file.Path)
file.Link = "repair"
t.SetChecksum("") // force a recheck
streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log)
} else {
if resp.Filename != filename {
// this is possible if there's only 1 streamable file in the torrent
@@ -92,25 +104,29 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *intTor.TorrentM
expectedExt := filepath.Ext(filename)
if actualExt != expectedExt && resp.Streamable != 1 {
log.Warnf("File was changed and is not streamable: %s and %s", filename, resp.Filename)
streamErrorVideo("https://www.youtube.com/watch?v=t9VgOriBHwE", w, r, t, c, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=t9VgOriBHwE", w, r, t, c, log)
return
} else {
log.Warnf("Filename mismatch: %s and %s", filename, resp.Filename)
}
}
cache.Add(requestPath, resp.Download)
streamFileToResponse(file, resp.Download, w, r, t, c, log)
if c.ShouldServeFromRclone() {
http.Redirect(w, r, resp.Download, http.StatusFound)
} else {
gf.streamFileToResponse(file, resp.Download, w, r, t, c, log)
}
}
}
func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter, r *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
// Create a new request for the file download.
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
if file != nil {
log.Errorf("Error creating new request for file %s: %v", file.Path, err)
}
streamErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=H3NSrObyAxM", w, r, torMgr, cfg, log)
return
}
@@ -119,17 +135,14 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter,
req.Header.Add("Range", r.Header.Get("Range"))
}
// Create a custom HTTP client
client := zurghttp.NewHTTPClient(cfg.GetToken(), 10, cfg)
resp, err := client.Do(req)
resp, err := gf.client.Do(req)
if err != nil {
if file != nil {
log.Warnf("Cannot download file %s: %v", file.Path, err)
file.Link = "repair"
torMgr.SetChecksum("") // force a recheck
}
streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, torMgr, cfg, log)
return
}
defer resp.Body.Close()
@@ -140,7 +153,7 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter,
file.Link = "repair"
torMgr.SetChecksum("") // force a recheck
}
streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
gf.playErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, torMgr, cfg, log)
return
}
@@ -154,11 +167,15 @@ func streamFileToResponse(file *intTor.File, url string, w http.ResponseWriter,
io.CopyBuffer(w, resp.Body, buf)
}
func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) {
func (gf *GetFile) playErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *intTor.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) {
resp := t.UnrestrictUntilOk(link)
if resp == nil {
http.Error(w, "REAL-DEBRID IS DOWN", http.StatusInternalServerError)
return
}
streamFileToResponse(nil, resp.Download, w, r, t, c, log)
if c.ShouldServeFromRclone() {
http.Redirect(w, r, resp.Download, http.StatusFound)
return
}
gf.streamFileToResponse(nil, resp.Download, w, r, t, c, log)
}

View File

@@ -296,7 +296,7 @@ func (rd *RealDebrid) UnrestrictLink(link string) (*UnrestrictResponse, error) {
return nil, fmt.Errorf("undecodable response so likely it has expired")
}
if !canFetchFirstByte(response.Download) {
if !rd.canFetchFirstByte(response.Download) {
return nil, fmt.Errorf("can't fetch first byte")
}

View File

@@ -18,6 +18,47 @@ func (rd *RealDebrid) UnrestrictUntilOk(link string) *UnrestrictResponse {
})
}
func (rd *RealDebrid) canFetchFirstByte(url string) bool {
const maxAttempts = 3
for i := 0; i < maxAttempts; i++ {
// Create a new HTTP request
req, err := http.NewRequest("GET", url, nil)
if err != nil {
continue
}
// Set the Range header to request only the first byte
req.Header.Set("Range", "bytes=0-0")
// TODO set a proper client
resp, err := http.DefaultClient.Do(req)
if err != nil {
time.Sleep(1 * time.Second) // Add a delay before the next retry
continue
}
defer resp.Body.Close()
// If server supports partial content
if resp.StatusCode == http.StatusPartialContent {
buffer := make([]byte, 1)
_, err := resp.Body.Read(buffer)
if err == nil {
return true
}
} else if resp.StatusCode == http.StatusOK {
// If server doesn't support partial content, try reading the first byte and immediately close
buffer := make([]byte, 1)
_, err = resp.Body.Read(buffer)
if err == nil {
return true
}
}
time.Sleep(500 * time.Millisecond) // Add a delay before the next retry
}
return false
}
func retryUntilOk[T any](fn func() (T, error)) T {
// const initialDelay = 1 * time.Second
// const maxDelay = 128 * time.Second
@@ -50,44 +91,3 @@ func retryUntilOk[T any](fn func() (T, error)) T {
time.Sleep(delay)
}
}
func canFetchFirstByte(url string) bool {
const maxAttempts = 3
for i := 0; i < maxAttempts; i++ {
// Create a new HTTP request
req, err := http.NewRequest("GET", url, nil)
if err != nil {
continue
}
// Set the Range header to request only the first byte
req.Header.Set("Range", "bytes=0-0")
// Execute the request
resp, err := http.DefaultClient.Do(req)
if err != nil {
time.Sleep(1 * time.Second) // Add a delay before the next retry
continue
}
defer resp.Body.Close()
// If server supports partial content
if resp.StatusCode == http.StatusPartialContent {
buffer := make([]byte, 1)
_, err := resp.Body.Read(buffer)
if err == nil {
return true
}
} else if resp.StatusCode == http.StatusOK {
// If server doesn't support partial content, try reading the first byte and immediately close
buffer := make([]byte, 1)
_, err = resp.Body.Read(buffer)
if err == nil {
return true
}
}
time.Sleep(500 * time.Millisecond) // Add a delay before the next retry
}
return false
}