From a8e5744481652dc3674ec9c66e1fc0a8ddad6590 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sat, 2 Dec 2023 17:37:18 +0100 Subject: [PATCH] Implement proper checks for new torrents --- internal/app.go | 11 ++ internal/dav/listing.go | 8 +- internal/http/listing.go | 8 +- internal/torrent/manager.go | 255 ++++++++++++++++++++---------------- internal/torrent/types.go | 64 ++++++--- 5 files changed, 207 insertions(+), 139 deletions(-) diff --git a/internal/app.go b/internal/app.go index 4164df6..a1bcaf7 100644 --- a/internal/app.go +++ b/internal/app.go @@ -16,12 +16,16 @@ import ( "github.com/dgraph-io/ristretto" "github.com/julienschmidt/httprouter" "github.com/panjf2000/ants/v2" + + _ "net/http/pprof" // Register pprof ) func MainApp(configPath string) { log := logutil.NewLogger() zurglog := log.Named("zurg") + zurglog.Debugf("PID: %d", os.Getpid()) + config, configErr := config.LoadZurgConfig(configPath, log.Named("config")) if configErr != nil { zurglog.Errorf("Config failed to load: %v", configErr) @@ -61,6 +65,13 @@ func MainApp(configPath string) { handler.RedirectFixedPath = true router.ApplyRouteTable(handler, getfile, torrentMgr, config, rd, log.Named("router")) + go func() { + if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed { + zurglog.Errorf("Failed to start pprof: %v", err) + os.Exit(1) + } + }() + addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort()) zurglog.Infof("Starting server on %s", addr) if err := netHttp.ListenAndServe(addr, handler); err != nil && err != netHttp.ErrServerClosed { diff --git a/internal/dav/listing.go b/internal/dav/listing.go index 2079b5a..21328e7 100644 --- a/internal/dav/listing.go +++ b/internal/dav/listing.go @@ -47,8 +47,8 @@ func HandleListTorrents(directory string, t *torrent.TorrentManager, log *zap.Su davDoc += "" return &davDoc, nil } else { - davDoc := resp.(*string) - return davDoc, nil + davDoc := resp.(string) + return &davDoc, nil } } @@ -77,7 +77,7 @@ func HandleListFiles(directory, torrentName string, t *torrent.TorrentManager, l davDoc += "" return &davDoc, nil } else { - davDoc := resp.(*string) - return davDoc, nil + davDoc := resp.(string) + return &davDoc, nil } } diff --git a/internal/http/listing.go b/internal/http/listing.go index 024e86a..6ef26ce 100644 --- a/internal/http/listing.go +++ b/internal/http/listing.go @@ -50,8 +50,8 @@ func HandleListTorrents(directory string, t *torrent.TorrentManager, log *zap.Su } return &htmlDoc, nil } else { - htmlDoc := resp.(*string) - return htmlDoc, nil + htmlDoc := resp.(string) + return &htmlDoc, nil } } @@ -80,7 +80,7 @@ func HandleListFiles(directory, torrentName string, t *torrent.TorrentManager, l } return &htmlDoc, nil } else { - htmlDoc := resp.(*string) - return htmlDoc, nil + htmlDoc := resp.(string) + return &htmlDoc, nil } } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index c903549..537e9c0 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -55,7 +55,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p ResponseCache: cache, accessKeySet: set.NewStringSet(), latestState: &initialSate, - requiredVersion: "02.12.2023", + requiredVersion: "03.12.2023", workerPool: p, log: log, } @@ -118,14 +118,12 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p } func (t *TorrentManager) RefreshTorrents() { - // get all torrent info instances, _, err := t.Api.GetTorrents(0) if err != nil { t.log.Warnf("Cannot get torrents: %v\n", err) return } - instanceCount := len(instances) - infoChan := make(chan *Torrent, instanceCount) + infoChan := make(chan *Torrent, len(instances)) var wg sync.WaitGroup for i := range instances { wg.Add(1) @@ -137,7 +135,7 @@ func (t *TorrentManager) RefreshTorrents() { } wg.Wait() close(infoChan) - t.log.Infof("Fetched info for %d torrents", instanceCount) + t.log.Infof("Fetched info for %d torrents", len(instances)) freshKeys := set.NewStringSet() oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) @@ -150,26 +148,32 @@ func (t *TorrentManager) RefreshTorrents() { freshKeys.Add(info.AccessKey) if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { oldTorrents.Set(info.AccessKey, info) - } else { + } else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() { mainTorrent := t.mergeToMain(torrent, info) - oldTorrents.Set(info.AccessKey, mainTorrent) + oldTorrents.Set(info.AccessKey, &mainTorrent) } } t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) + + somthingChanged := false // removed strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { + somthingChanged = true t.Delete(accessKey, false, false) return true }) // new strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { + somthingChanged = true torrent, _ := oldTorrents.Get(accessKey) t.UpdateTorrentResponseCache(torrent) t.accessKeySet.Add(accessKey) return true }) // now we can build the directory responses - t.UpdateDirectoryResponsesCache() + if somthingChanged { + t.UpdateDirectoryResponsesCache() + } t.SetNewLatestState(t.getCurrentState()) @@ -179,34 +183,129 @@ func (t *TorrentManager) RefreshTorrents() { // }) } -func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent { +// getMoreInfo gets original name, size and files for a torrent +func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { + return tor + } + torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) + if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { + infoCache.Set(rdTorrent.ID, torrentFromFile) + return torrentFromFile + } + + info, err := t.Api.GetTorrentInfo(rdTorrent.ID) + if err != nil { + t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) + return nil + } + // SelectedFiles is a subset of Files with only the selected ones + // it also has a Link field, which can be empty + // if it is empty, it means the file is no longer available + // Files+Links together are the same as SelectedFiles + var selectedFiles []*File + // if some Links are empty, we need to repair it + for _, file := range info.Files { + if file.Selected == 0 { + continue + } + selectedFiles = append(selectedFiles, &File{ + File: file, + Added: info.Added, + Ended: info.Ended, + Link: "", // no link yet + }) + } + if len(selectedFiles) > len(info.Links) && info.Progress == 100 { + t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links)) + for i, file := range selectedFiles { + file.Link = "repair" + i++ + } + } else if len(selectedFiles) == len(info.Links) { + // all links are still intact! good! + for i, file := range selectedFiles { + file.Link = info.Links[i] + i++ + } + } + + torrent := Torrent{ + AccessKey: t.computeAccessKey(info.Name, info.OriginalName), + LatestAdded: info.Added, + Hash: info.Hash, + } + torrent.SelectedFiles = cmap.New[*File]() + for _, file := range selectedFiles { + // todo better handling of duplicate filenames + if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { + oldName := filepath.Base(file.Path) + ext := filepath.Ext(oldName) + filename := strings.TrimSuffix(oldName, ext) + newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) + torrent.SelectedFiles.Set(newName, file) + } else { + torrent.SelectedFiles.Set(filepath.Base(file.Path), file) + } + } + torrent.DownloadedIDs = strset.New() + torrent.InProgressIDs = strset.New() + if info.Progress == 100 { + torrent.DownloadedIDs.Add(info.ID) + } else { + torrent.InProgressIDs.Add(info.ID) + } + + infoCache.Set(rdTorrent.ID, &torrent) + err = t.writeTorrentToFile(rdTorrent.ID, &torrent) + if err != nil { + t.log.Warnf("Cannot write torrent to file: %v", err) + } + + return &torrent +} + +func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { + mainTorrent := Torrent{} + + mainTorrent.AccessKey = existing.AccessKey + mainTorrent.Hash = existing.Hash + mainTorrent.DownloadedIDs = strset.New() + mainTorrent.InProgressIDs = strset.New() + + // this function triggers only when we have a new DownloadedID + strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool { + mainTorrent.DownloadedIDs.Add(id) + mainTorrent.InProgressIDs.Remove(id) + return true + }) + // the link can have the following values // 1. https://*** - the file is available // 2. repair - the file is available but we need to repair it // 3. repairing - the file is being repaired // 4. unselect - the file is deleted - torrentToMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { + mainTorrent.SelectedFiles = existing.SelectedFiles + toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { // see if it already exists in the main torrent if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { // if it doesn't exist in the main torrent, add it mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } else if originalFile.Link != "unselect" { - if mainTorrent.LatestAdded < torrentToMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { - // if it exists, compare the LatestAdded property and the link + // if it exists, compare the LatestAdded property and the link + if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { // if torrentToMerge is more recent and its file has a link, update the main torrent's file - // unless it's removed mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } // else do nothing, the main torrent's file is more recent or has a valid link } }) - // Merge Instances - mainTorrent.Instances = append(mainTorrent.Instances, torrentToMerge.Instances...) - - // LatestAdded - if mainTorrent.LatestAdded < torrentToMerge.LatestAdded { - mainTorrent.LatestAdded = torrentToMerge.LatestAdded + if existing.LatestAdded < toMerge.LatestAdded { + mainTorrent.LatestAdded = toMerge.LatestAdded + } else { + mainTorrent.LatestAdded = existing.LatestAdded } return mainTorrent @@ -313,81 +412,6 @@ func (t *TorrentManager) startRefreshJob() { } } -// getMoreInfo gets original name, size and files for a torrent -func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { - return tor - } - torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) - if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { - return torrentFromFile - } - - info, err := t.Api.GetTorrentInfo(rdTorrent.ID) - if err != nil { - t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) - return nil - } - // SelectedFiles is a subset of Files with only the selected ones - // it also has a Link field, which can be empty - // if it is empty, it means the file is no longer available - // Files+Links together are the same as SelectedFiles - var selectedFiles []*File - // if some Links are empty, we need to repair it - for _, file := range info.Files { - if file.Selected == 0 { - continue - } - selectedFiles = append(selectedFiles, &File{ - File: file, - Added: info.Added, - Ended: info.Ended, - Link: "", // no link yet - }) - } - if len(selectedFiles) > len(info.Links) && info.Progress == 100 { - t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links)) - for i, file := range selectedFiles { - file.Link = "repair" - i++ - } - } else if len(selectedFiles) == len(info.Links) { - // all links are still intact! good! - for i, file := range selectedFiles { - file.Link = info.Links[i] - i++ - } - } - - torrent := Torrent{ - AccessKey: t.computeAccessKey(info.Name, info.OriginalName), - LatestAdded: info.Added, - Instances: []*realdebrid.TorrentInfo{info}, - } - torrent.SelectedFiles = cmap.New[*File]() - for _, file := range selectedFiles { - // todo better handling of duplicate filenames - if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { - oldName := filepath.Base(file.Path) - ext := filepath.Ext(oldName) - filename := strings.TrimSuffix(oldName, ext) - newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) - torrent.SelectedFiles.Set(newName, file) - } else { - torrent.SelectedFiles.Set(filepath.Base(file.Path), file) - } - } - - infoCache.Set(rdTorrent.ID, &torrent) - err = t.writeTorrentToFile(rdTorrent.ID, &torrent) - if err != nil { - t.log.Warnf("Cannot write torrent to file: %v", err) - } - - return &torrent -} - func (t *TorrentManager) computeAccessKey(name, originalName string) string { if t.Config.EnableRetainRDTorrentName() { return name @@ -443,6 +467,9 @@ func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } + if strset.Union(torrent.DownloadedIDs, torrent.InProgressIDs).IsEmpty() { + t.log.Fatal("Torrent has no downloaded or in progress ids") + } if torrent.Version != t.requiredVersion { return nil } @@ -562,9 +589,10 @@ func (t *TorrentManager) CheckDeletedState(torrent *Torrent) bool { if len(unselectedIDs) == torrent.SelectedFiles.Count() && len(unselectedIDs) > 0 { return true } else if len(unselectedIDs) > 0 { - for i := range torrent.Instances { - t.writeTorrentToFile(torrent.Instances[i].ID, torrent) - } + torrent.DownloadedIDs.Each(func(id string) bool { + t.writeTorrentToFile(id, torrent) + return true + }) } return false } @@ -574,12 +602,13 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto allTorrents, _ := t.DirectoryMap.Get(INT_ALL) infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if torrent, ok := allTorrents.Get(accessKey); ok { - for _, instance := range torrent.Instances { - t.log.Infof("Deleting torrent %s %s in RD", instance.ID, accessKey) - t.Api.DeleteTorrent(instance.ID) - infoCache.Remove(instance.ID) - t.deleteTorrentFile(instance.ID) - } + torrent.DownloadedIDs.Each(func(id string) bool { + t.log.Infof("Deleting torrent %s %s in RD", id, accessKey) + t.Api.DeleteTorrent(id) + infoCache.Remove(id) + t.deleteTorrentFile(id) + return true + }) } } t.log.Infof("Removing torrent %s from zurg database", accessKey) @@ -596,7 +625,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto } func (t *TorrentManager) repair(torrent *Torrent) { - if torrent.AnyInProgress() { + if torrent.AllInProgress() { t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) return } @@ -633,7 +662,6 @@ func (t *TorrentManager) repair(torrent *Torrent) { return } else if streamableCount == 1 { t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) - t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Instances[0].Hash) t.Delete(torrent.AccessKey, false, true) return } @@ -713,7 +741,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) } // redownload torrent - resp, err := t.Api.AddMagnetHash(torrent.Instances[0].Hash) + resp, err := t.Api.AddMagnetHash(torrent.Hash) if err != nil { t.log.Warnf("Cannot redownload torrent: %v", err) return false @@ -829,9 +857,9 @@ func (t *TorrentManager) UpdateTorrentResponseCache(torrent *Torrent) { pathKey := fmt.Sprintf("%s/%s", directory, torrent.AccessKey) // torrent responses newHtml := strings.ReplaceAll(html, "$dir", directory) - t.ResponseCache.Set(pathKey+".html", &newHtml, 1) + t.ResponseCache.Set(pathKey+".html", newHtml, 1) newDav := strings.ReplaceAll(dav, "$dir", directory) - t.ResponseCache.Set(pathKey+".dav", &newDav, 1) + t.ResponseCache.Set(pathKey+".dav", newDav, 1) }) } @@ -843,7 +871,7 @@ func (t *TorrentManager) UpdateDirectoryResponsesCache() { htmlRet := "" for _, accessKey := range allKeys { if tor, ok := torrents.Get(accessKey); ok { - if tor.AnyInProgress() { + if tor.AllInProgress() { continue } davRet += dav.Directory(tor.AccessKey, tor.LatestAdded) @@ -853,9 +881,9 @@ func (t *TorrentManager) UpdateDirectoryResponsesCache() { cacheKey := directory davRet = "" + dav.BaseDirectory(directory, "") + dav.BaseDirectory(directory, "") + davRet + "" - t.ResponseCache.Set(cacheKey+".dav", &davRet, 1) + t.ResponseCache.Set(cacheKey+".dav", davRet, 1) htmlRet = "
    " + htmlRet - t.ResponseCache.Set(cacheKey+".html", &htmlRet, 1) + t.ResponseCache.Set(cacheKey+".html", htmlRet, 1) }) } @@ -883,10 +911,7 @@ func (t *TorrentManager) buildTorrentResponses(tor *Torrent) (string, string) { } func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) { - var torrentIDs []string - for _, instance := range tor.Instances { - torrentIDs = append(torrentIDs, instance.ID) - } + torrentIDs := strset.Union(tor.DownloadedIDs, tor.InProgressIDs).List() // get filenames needed for directory conditions filenames := tor.SelectedFiles.Keys() // Map torrents to directories diff --git a/internal/torrent/types.go b/internal/torrent/types.go index fedc78d..0919675 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -6,32 +6,51 @@ import ( "github.com/debridmediamanager/zurg/pkg/realdebrid" jsoniter "github.com/json-iterator/go" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/scylladb/go-set/strset" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary type Torrent struct { AccessKey string `json:"AccessKey"` + Hash string `json:"Hash"` SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` LatestAdded string `json:"LatestAdded"` - Version string `json:"Version"` + DownloadedIDs *strset.Set `json:"DownloadedIDs"` + InProgressIDs *strset.Set `json:"InProgressIDs"` - Instances []*realdebrid.TorrentInfo `json:"Instances"` + Version string `json:"Version"` // only used for files } func (t *Torrent) MarshalJSON() ([]byte, error) { type Alias Torrent temp := &struct { SelectedFilesJson oldjson.RawMessage `json:"SelectedFiles"` + DownloadedIDsJson oldjson.RawMessage `json:"DownloadedIDs"` + InProgressIDsJson oldjson.RawMessage `json:"InProgressIDs"` *Alias }{ Alias: (*Alias)(t), } + selectedFilesJson, err := t.SelectedFiles.MarshalJSON() if err != nil { return nil, err } temp.SelectedFilesJson = selectedFilesJson + + downloadedIDsJson, err := json.Marshal(t.DownloadedIDs.List()) + if err != nil { + return nil, err + } + temp.DownloadedIDsJson = downloadedIDsJson + + inProgressIDsJson, err := json.Marshal(t.InProgressIDs.List()) + if err != nil { + return nil, err + } + temp.InProgressIDsJson = inProgressIDsJson + return json.Marshal(temp) } @@ -39,6 +58,8 @@ func (t *Torrent) UnmarshalJSON(data []byte) error { type Alias Torrent temp := &struct { SelectedFilesJson oldjson.RawMessage `json:"SelectedFiles"` + DownloadedIDsJson oldjson.RawMessage `json:"DownloadedIDs"` + InProgressIDsJson oldjson.RawMessage `json:"InProgressIDs"` *Alias }{ Alias: (*Alias)(t), @@ -46,32 +67,43 @@ func (t *Torrent) UnmarshalJSON(data []byte) error { if err := json.Unmarshal(data, temp); err != nil { return err } + + t.SelectedFiles = cmap.New[*File]() if len(temp.SelectedFilesJson) > 0 { - t.SelectedFiles = cmap.New[*File]() if err := t.SelectedFiles.UnmarshalJSON(temp.SelectedFilesJson); err != nil { return err } } + + if len(temp.DownloadedIDsJson) > 0 { + var downloadedIDs []string + if err := json.Unmarshal(temp.DownloadedIDsJson, &downloadedIDs); err != nil { + return err + } + t.DownloadedIDs = strset.New(downloadedIDs...) + } else { + t.DownloadedIDs = strset.New() + } + + if len(temp.InProgressIDsJson) > 0 { + var inProgressIDs []string + if err := json.Unmarshal(temp.InProgressIDsJson, &inProgressIDs); err != nil { + return err + } + t.InProgressIDs = strset.New(inProgressIDs...) + } else { + t.InProgressIDs = strset.New() + } + return nil } func (t *Torrent) AnyInProgress() bool { - for _, instance := range t.Instances { - if instance.Progress < 100 { - return true - } - } - return false + return !t.InProgressIDs.IsEmpty() } func (t *Torrent) AllInProgress() bool { - count := 0 - for _, instance := range t.Instances { - if instance.Progress < 100 { - count++ - } - } - return count == len(t.Instances) + return t.DownloadedIDs.IsEmpty() } type File struct {