diff --git a/go.mod b/go.mod index d588ff3..e5034e3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,9 @@ module github.com/debridmediamanager.com/zurg go 1.21.3 require ( + github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/nutsdb/nutsdb v0.14.1 go.uber.org/zap v1.26.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -12,12 +14,8 @@ require ( github.com/antlabs/stl v0.0.1 // indirect github.com/antlabs/timer v0.0.11 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/elliotchance/orderedmap/v2 v2.2.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/gofrs/flock v0.8.1 // indirect - github.com/nutsdb/nutsdb v0.14.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/tidwall/btree v1.6.0 // indirect github.com/xujiajun/mmap-go v1.0.1 // indirect diff --git a/go.sum b/go.sum index 99f954e..ff76c95 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,6 @@ github.com/antlabs/timer v0.0.11 h1:z75oGFLeTqJHMOcWzUPBKsBbQAz4Ske3AfqJ7bsdcwU= github.com/antlabs/timer v0.0.11/go.mod h1:JNV8J3yGvMKhCavGXgj9HXrVZkfdQyKCcqXBT8RdyuU= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -46,6 +42,8 @@ golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index a1919d9..a7eabc8 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -19,17 +19,15 @@ import ( ) type TorrentManager struct { - TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent - requiredVersion string - rd *realdebrid.RealDebrid - checksum string - config config.ConfigInterface - db *nutsdb.DB - workerPool chan bool - directoryMap map[string][]string - processedTorrents map[string][]string - mu *sync.Mutex - log *zap.SugaredLogger + TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent + requiredVersion string + rd *realdebrid.RealDebrid + checksum string + config config.ConfigInterface + db *nutsdb.DB + workerPool chan bool + mu *sync.Mutex + log *zap.SugaredLogger } // NewTorrentManager creates a new torrent manager @@ -37,16 +35,14 @@ type TorrentManager struct { // and store them in-memory and cached in files func NewTorrentManager(config config.ConfigInterface, db *nutsdb.DB) *TorrentManager { t := &TorrentManager{ - TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](), - requiredVersion: fmt.Sprintf("10.11.2023/retain=%t", config.EnableRetainFolderNameExtension()), - rd: realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid")), - config: config, - db: db, - workerPool: make(chan bool, config.GetNumOfWorkers()), - directoryMap: make(map[string][]string), - processedTorrents: make(map[string][]string), - mu: &sync.Mutex{}, - log: logutil.NewLogger().Named("manager"), + TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](), + requiredVersion: "10.11.2023", + rd: realdebrid.NewRealDebrid(config.GetToken(), logutil.NewLogger().Named("realdebrid")), + config: config, + db: db, + workerPool: make(chan bool, config.GetNumOfWorkers()), + mu: &sync.Mutex{}, + log: logutil.NewLogger().Named("manager"), } // start with a clean slate @@ -68,6 +64,7 @@ func NewTorrentManager(config config.ConfigInterface, db *nutsdb.DB) *TorrentMan <-t.workerPool }(i) } + t.log.Infof("Got %d torrents", len(newTorrents)) wg.Wait() close(torrentsChan) for newTorrent := range torrentsChan { @@ -84,9 +81,9 @@ func NewTorrentManager(config config.ConfigInterface, db *nutsdb.DB) *TorrentMan t.checksum = t.getChecksum() t.mu.Unlock() - // if t.config.EnableRepair() { - // go t.repairAll() - // } + if t.config.EnableRepair() { + go t.repairAll() + } go t.startRefreshJob() return t @@ -96,6 +93,7 @@ func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent { merged := t1 // Merge SelectedFiles + // iteration works! for el := t2.SelectedFiles.Front(); el != nil; el = el.Next() { if _, ok := merged.SelectedFiles.Get(el.Key); !ok { merged.SelectedFiles.Set(el.Key, el.Value) @@ -133,8 +131,8 @@ type torrentsResponse struct { // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents func (t *TorrentManager) getChecksum() string { - torrentsChan := make(chan torrentsResponse) - countChan := make(chan int) + torrentsChan := make(chan torrentsResponse, 1) + countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines // GetTorrents request @@ -199,8 +197,9 @@ func (t *TorrentManager) startRefreshJob() { t.log.Errorf("Cannot get torrents: %v\n", err) continue } + t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents)) - torrentsChan := make(chan *Torrent) + torrentsChan := make(chan *Torrent, len(newTorrents)) var wg sync.WaitGroup for i := range newTorrents { wg.Add(1) @@ -212,20 +211,23 @@ func (t *TorrentManager) startRefreshJob() { }(i) } - // deletes - // for el := t.TorrentMap.Front(); el != nil; el = el.Next() { - // found := false - // for _, newTorrent := range newTorrents { - // if newTorrent.ID == el.Value.AccessKey { - // found = true - // break - // } - // } - // if !found { - // t.log.Infof("Torrent id=%s is no longer found", accessKey) - // t.TorrentMap.Delete(accessKey) - // } - // } + // iteration works! + var toDelete []string + for el := t.TorrentMap.Front(); el != nil; el = el.Next() { + found := false + for _, newTorrent := range newTorrents { + if newTorrent.ID == el.Value.AccessKey { + found = true + break + } + } + if !found { + toDelete = append(toDelete, el.Key) + } + } + for _, accessKey := range toDelete { + t.TorrentMap.Delete(accessKey) + } wg.Wait() close(torrentsChan) @@ -243,9 +245,9 @@ func (t *TorrentManager) startRefreshJob() { t.checksum = t.getChecksum() t.mu.Unlock() - // if t.config.EnableRepair() { - // go t.repairAll() - // } + if t.config.EnableRepair() { + go t.repairAll() + } go OnLibraryUpdateHook(t.config) } } @@ -289,34 +291,37 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { }) } if selectedFiles.Len() > len(info.Links) && info.Progress == 100 { - t.log.Debugf("Some links has expired for %s %s: %d selected but only %d link(s)", info.ID, info.Name, selectedFiles.Len(), len(info.Links)) + t.log.Debugf("%d links has expired for %s %s", selectedFiles.Len()-len(info.Links), info.ID, info.Name) // chaotic file means RD will not output the desired file selection // e.g. even if we select just a single mkv, it will output a rar var isChaotic bool selectedFiles, isChaotic = t.organizeChaos(&rdTorrent, selectedFiles) - if isChaotic && selectedFiles.Len() == 1 { - t.log.Infof("Torrent %s %s is unfixable, it's always returning an unstreamable link, ignoring", info.ID, info.Name) + if isChaotic { + t.log.Errorf("Torrent id=%s %s is unfixable, it's always returning an unstreamable link (it is no longer shown in your directories)", info.ID, info.Name) t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) + return nil } else { if streamableCount > 1 { // case for repair 1: it's missing some links (or all links) // if we download it as is, we might get the same file over and over again // so we need to redownload it with other files selected // that is why we check if there are other streamable files - t.log.Infof("Torrent %s %s marked for repair", info.ID, info.Name) + t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name) forRepair = true } else { - t.log.Infof("Torrent %s %s is unfixable, the lone streamable link has expired, ignoring", info.ID, info.Name) + t.log.Errorf("Torrent id=%s %s is unfixable, the lone streamable link has expired (it is no longer shown in your directories)", info.ID, info.Name) t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) + return nil } } } else if selectedFiles.Len() == len(info.Links) { // all links are still intact! good! + // iteration works! i := 0 for el := selectedFiles.Front(); el != nil; el = el.Next() { if i < len(info.Links) { file := el.Value - file.Link = info.Links[i] + file.Link = info.Links[i] // verified working! selectedFiles.Set(el.Key, file) i++ } @@ -446,11 +451,9 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * } go func() { - t.log.Debugf("Checking %d link(s) for problematic torrent id=%s", len(info.Links), info.ID) wg.Wait() close(sem) close(resultsChan) - t.log.Debugf("Closing results channel for torrent id=%s, checking...", info.ID) }() isChaotic := false @@ -459,9 +462,9 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * continue } found := false + // iteration works! for el := selectedFiles.Front(); el != nil; el = el.Next() { if file, _ := selectedFiles.Get(el.Key); strings.Contains(file.Path, result.Response.Filename) { - t.log.Debugf("Found a file that is in the selection for torrent id=%s: %s", info.ID, result.Response.Filename) file.Link = result.Response.Link found = true } @@ -478,6 +481,8 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * }, Link: result.Response.Link, }) + } else { + isChaotic = true } } } @@ -485,47 +490,48 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * return selectedFiles, isChaotic } -// func (t *TorrentManager) repairAll() { -// for el := t.TorrentMap.Front(); el != nil; el = el.Next() { -// torrent := el.Value -// // do not repair if: in progress -// if torrent.InProgress { -// continue -// } +func (t *TorrentManager) repairAll() { + // iteration works! + for el := t.TorrentMap.Front(); el != nil; el = el.Next() { + torrent := el.Value + // do not repair if in progress + if torrent.InProgress { + continue + } -// var missingFiles []File -// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { -// file, ok := torrent.SelectedFiles.Get(el2.Key) -// if !ok { -// continue -// } -// // check for case of repairs like -// // case 1: missing links -// // case 2: unrestrictable links TODO -// if file.Link == "" { -// missingFiles = append(missingFiles, *file) -// } -// } -// if len(missingFiles) == 0 { -// continue -// } + var missingFiles []File + for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() { + file, ok := torrent.SelectedFiles.Get(el2.Key) + if !ok { + continue + } + // check for case of repairs like + // case 1: missing links + // case 2: unrestrictable links TODO + if file.Link == "" { + missingFiles = append(missingFiles, *file) + } + } + if len(missingFiles) == 0 { + continue + } -// for _, info := range torrent.Instances { -// if info.ForRepair { -// t.log.Infof("There were less links than was expected on %s %s; fixing...", info.ID, info.Name) -// // t.repair(&info, true) -// break // only repair the first one for repair and then move on -// } -// if len(info.Links) == 0 && info.Progress == 100 { -// // If the torrent has no links -// // and already processing repair -// // delete it! -// t.log.Infof("Deleting broken torrent id=%s as it doesn't contain any files", info.ID) -// t.rd.DeleteTorrent(info.ID) -// } -// } -// } -// } + for _, info := range torrent.Instances { + if info.ForRepair { + t.log.Infof("There were less links than was expected on %s %s; fixing...", info.ID, info.Name) + // t.repair(&info, true) + break // only repair the first one for repair and then move on + } + if len(info.Links) == 0 && info.Progress == 100 { + // If the torrent has no links + // and already processing repair + // delete it! + t.log.Infof("Deleting broken torrent id=%s as it doesn't contain any files", info.ID) + t.rd.DeleteTorrent(info.ID) + } + } + } +} // func (t *TorrentManager) repair(info *realdebrid.TorrentInfo, tryReinsertionFirst bool) { // // file.Link == "" should be repaired @@ -583,7 +589,7 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles * // t.log.Info("No other missing files left to reinsert") // } // } else { -// t.log.Infof("Torrent id=%s is unfixable as the only link cached in RD is already broken", info.ID) +// t.log.Infof("Torrent id=%s is unfixable as the only link cached in RD is already broken (it is no longer shown in your directories)", info.ID) // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) // return // } diff --git a/internal/universal/get.go b/internal/universal/get.go index f699433..6f190d0 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -6,13 +6,12 @@ import ( "path" "path/filepath" "strings" - "time" - "github.com/cenkalti/backoff" "github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/dav" intHttp "github.com/debridmediamanager.com/zurg/internal/http" "github.com/debridmediamanager.com/zurg/internal/torrent" + zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" "github.com/debridmediamanager.com/zurg/pkg/logutil" "github.com/hashicorp/golang-lru/v2/expirable" "go.uber.org/zap" @@ -73,6 +72,7 @@ func HandleGetRequest(w http.ResponseWriter, r *http.Request, t *torrent.Torrent resp := t.UnrestrictUntilOk(link) if resp == nil { + log.Errorf("The link cannot be unrestricted, file %s is no longer available", file.Path) // TODO: maybe repair the torrent? streamErrorVideo("https://www.youtube.com/watch?v=gea_FJrtFVA", w, r, t, c, log) return @@ -100,55 +100,36 @@ func streamFileToResponse(url string, w http.ResponseWriter, r *http.Request, t return } - // Copy the headers from the incoming request to the new request. - for k, values := range r.Header { - for _, v := range values { - req.Header.Add(k, v) - } + // copy range header if it exists + if r.Header.Get("Range") != "" { + req.Header.Add("Range", r.Header.Get("Range")) } // Create a custom HTTP client with a timeout. - client := &http.Client{ - Timeout: time.Second * 30, // Full request timeout including dial, request and response. - } + client := zurghttp.NewHTTPClient(c.GetToken(), 10) - // Define a retry policy with exponential backoff. - retryPolicy := backoff.NewExponentialBackOff() - retryPolicy.MaxElapsedTime = time.Minute * 2 // Set the maximum elapsed time for retries. - - // Use an operation with retries. - operation := func() error { - resp, err := client.Do(req) - if err != nil { - log.Errorf("Error downloading file: %v", err) - return err - } - defer resp.Body.Close() - - if resp.StatusCode >= 500 { - log.Errorf("Received a 5XX status code: %d", resp.StatusCode) - return backoff.Permanent(err) // Stop retrying on bad status code. - } - - // Copy the headers from the response to the ResponseWriter. - for k, vv := range resp.Header { - for _, v := range vv { - w.Header().Add(k, v) - } - } - - // Stream the content to the ResponseWriter. - buf := make([]byte, c.GetNetworkBufferSize()) - _, err = io.CopyBuffer(w, resp.Body, buf) - return err - } - - // Perform the operation with the retry policy. - if err := backoff.Retry(operation, retryPolicy); err != nil { - log.Errorf("Failed after retries: %v", err) + resp, err := client.Do(req) + if err != nil { + log.Errorf("Error downloading file %v", err) streamErrorVideo("https://www.youtube.com/watch?v=FSSd8cponAA", w, r, t, c, log) return } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + log.Errorf("Received a nonOK status code %d", resp.StatusCode) + streamErrorVideo("https://www.youtube.com/watch?v=BcseUxviVqE", w, r, t, c, log) + return + } + + for k, vv := range resp.Header { + for _, v := range vv { + w.Header().Add(k, v) + } + } + + buf := make([]byte, c.GetNetworkBufferSize()) + io.CopyBuffer(w, resp.Body, buf) } func streamErrorVideo(link string, w http.ResponseWriter, r *http.Request, t *torrent.TorrentManager, c config.ConfigInterface, log *zap.SugaredLogger) { diff --git a/internal/universal/util.go b/internal/universal/util.go deleted file mode 100644 index 897888b..0000000 --- a/internal/universal/util.go +++ /dev/null @@ -1 +0,0 @@ -package universal diff --git a/pkg/http/client.go b/pkg/http/client.go index 50b65ad..9aae92f 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -29,10 +29,10 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) { return resp, err } -func NewHTTPClient(token string, maxRetries int, timeout time.Duration) *HTTPClient { +func NewHTTPClient(token string, maxRetries int) *HTTPClient { return &HTTPClient{ BearerToken: token, - Client: &http.Client{Timeout: timeout}, + Client: &http.Client{}, MaxRetries: maxRetries, Backoff: func(attempt int) time.Duration { return time.Duration(attempt) * time.Second diff --git a/pkg/realdebrid/api.go b/pkg/realdebrid/api.go index 6eecce7..a13fd78 100644 --- a/pkg/realdebrid/api.go +++ b/pkg/realdebrid/api.go @@ -9,7 +9,6 @@ import ( "net/url" "strconv" "strings" - "time" zurghttp "github.com/debridmediamanager.com/zurg/pkg/http" "go.uber.org/zap" @@ -22,9 +21,7 @@ type RealDebrid struct { func NewRealDebrid(accessToken string, log *zap.SugaredLogger) *RealDebrid { maxRetries := 10 - timeout := 10 * time.Second - client := zurghttp.NewHTTPClient(accessToken, maxRetries, timeout) - log.Debugf("Created an HTTP client with %d max retries and %s timeout", maxRetries, timeout) + client := zurghttp.NewHTTPClient(accessToken, maxRetries) return &RealDebrid{ log: log, client: client,