Implement proper checks for new torrents

This commit is contained in:
Ben Sarmiento
2023-12-02 17:37:18 +01:00
parent 910e5a4998
commit a8e5744481
5 changed files with 207 additions and 139 deletions

View File

@@ -16,12 +16,16 @@ import (
"github.com/dgraph-io/ristretto"
"github.com/julienschmidt/httprouter"
"github.com/panjf2000/ants/v2"
_ "net/http/pprof" // Register pprof
)
func MainApp(configPath string) {
log := logutil.NewLogger()
zurglog := log.Named("zurg")
zurglog.Debugf("PID: %d", os.Getpid())
config, configErr := config.LoadZurgConfig(configPath, log.Named("config"))
if configErr != nil {
zurglog.Errorf("Config failed to load: %v", configErr)
@@ -61,6 +65,13 @@ func MainApp(configPath string) {
handler.RedirectFixedPath = true
router.ApplyRouteTable(handler, getfile, torrentMgr, config, rd, log.Named("router"))
go func() {
if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
zurglog.Errorf("Failed to start pprof: %v", err)
os.Exit(1)
}
}()
addr := fmt.Sprintf("%s:%s", config.GetHost(), config.GetPort())
zurglog.Infof("Starting server on %s", addr)
if err := netHttp.ListenAndServe(addr, handler); err != nil && err != netHttp.ErrServerClosed {

View File

@@ -47,8 +47,8 @@ func HandleListTorrents(directory string, t *torrent.TorrentManager, log *zap.Su
davDoc += "</d:multistatus>"
return &davDoc, nil
} else {
davDoc := resp.(*string)
return davDoc, nil
davDoc := resp.(string)
return &davDoc, nil
}
}
@@ -77,7 +77,7 @@ func HandleListFiles(directory, torrentName string, t *torrent.TorrentManager, l
davDoc += "</d:multistatus>"
return &davDoc, nil
} else {
davDoc := resp.(*string)
return davDoc, nil
davDoc := resp.(string)
return &davDoc, nil
}
}

View File

@@ -50,8 +50,8 @@ func HandleListTorrents(directory string, t *torrent.TorrentManager, log *zap.Su
}
return &htmlDoc, nil
} else {
htmlDoc := resp.(*string)
return htmlDoc, nil
htmlDoc := resp.(string)
return &htmlDoc, nil
}
}
@@ -80,7 +80,7 @@ func HandleListFiles(directory, torrentName string, t *torrent.TorrentManager, l
}
return &htmlDoc, nil
} else {
htmlDoc := resp.(*string)
return htmlDoc, nil
htmlDoc := resp.(string)
return &htmlDoc, nil
}
}

View File

@@ -55,7 +55,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
ResponseCache: cache,
accessKeySet: set.NewStringSet(),
latestState: &initialSate,
requiredVersion: "02.12.2023",
requiredVersion: "03.12.2023",
workerPool: p,
log: log,
}
@@ -118,14 +118,12 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
}
func (t *TorrentManager) RefreshTorrents() {
// get all torrent info
instances, _, err := t.Api.GetTorrents(0)
if err != nil {
t.log.Warnf("Cannot get torrents: %v\n", err)
return
}
instanceCount := len(instances)
infoChan := make(chan *Torrent, instanceCount)
infoChan := make(chan *Torrent, len(instances))
var wg sync.WaitGroup
for i := range instances {
wg.Add(1)
@@ -137,7 +135,7 @@ func (t *TorrentManager) RefreshTorrents() {
}
wg.Wait()
close(infoChan)
t.log.Infof("Fetched info for %d torrents", instanceCount)
t.log.Infof("Fetched info for %d torrents", len(instances))
freshKeys := set.NewStringSet()
oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
@@ -150,26 +148,32 @@ func (t *TorrentManager) RefreshTorrents() {
freshKeys.Add(info.AccessKey)
if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
oldTorrents.Set(info.AccessKey, info)
} else {
} else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() {
mainTorrent := t.mergeToMain(torrent, info)
oldTorrents.Set(info.AccessKey, mainTorrent)
oldTorrents.Set(info.AccessKey, &mainTorrent)
}
}
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
somthingChanged := false
// removed
strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool {
somthingChanged = true
t.Delete(accessKey, false, false)
return true
})
// new
strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool {
somthingChanged = true
torrent, _ := oldTorrents.Get(accessKey)
t.UpdateTorrentResponseCache(torrent)
t.accessKeySet.Add(accessKey)
return true
})
// now we can build the directory responses
t.UpdateDirectoryResponsesCache()
if somthingChanged {
t.UpdateDirectoryResponsesCache()
}
t.SetNewLatestState(t.getCurrentState())
@@ -179,34 +183,129 @@ func (t *TorrentManager) RefreshTorrents() {
// })
}
func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent {
// getMoreInfo gets original name, size and files for a torrent
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) {
return tor
}
torrentFromFile := t.readTorrentFromFile(rdTorrent.ID)
if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) {
infoCache.Set(rdTorrent.ID, torrentFromFile)
return torrentFromFile
}
info, err := t.Api.GetTorrentInfo(rdTorrent.ID)
if err != nil {
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
return nil
}
// SelectedFiles is a subset of Files with only the selected ones
// it also has a Link field, which can be empty
// if it is empty, it means the file is no longer available
// Files+Links together are the same as SelectedFiles
var selectedFiles []*File
// if some Links are empty, we need to repair it
for _, file := range info.Files {
if file.Selected == 0 {
continue
}
selectedFiles = append(selectedFiles, &File{
File: file,
Added: info.Added,
Ended: info.Ended,
Link: "", // no link yet
})
}
if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links))
for i, file := range selectedFiles {
file.Link = "repair"
i++
}
} else if len(selectedFiles) == len(info.Links) {
// all links are still intact! good!
for i, file := range selectedFiles {
file.Link = info.Links[i]
i++
}
}
torrent := Torrent{
AccessKey: t.computeAccessKey(info.Name, info.OriginalName),
LatestAdded: info.Added,
Hash: info.Hash,
}
torrent.SelectedFiles = cmap.New[*File]()
for _, file := range selectedFiles {
// todo better handling of duplicate filenames
if torrent.SelectedFiles.Has(filepath.Base(file.Path)) {
oldName := filepath.Base(file.Path)
ext := filepath.Ext(oldName)
filename := strings.TrimSuffix(oldName, ext)
newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext)
torrent.SelectedFiles.Set(newName, file)
} else {
torrent.SelectedFiles.Set(filepath.Base(file.Path), file)
}
}
torrent.DownloadedIDs = strset.New()
torrent.InProgressIDs = strset.New()
if info.Progress == 100 {
torrent.DownloadedIDs.Add(info.ID)
} else {
torrent.InProgressIDs.Add(info.ID)
}
infoCache.Set(rdTorrent.ID, &torrent)
err = t.writeTorrentToFile(rdTorrent.ID, &torrent)
if err != nil {
t.log.Warnf("Cannot write torrent to file: %v", err)
}
return &torrent
}
func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent {
mainTorrent := Torrent{}
mainTorrent.AccessKey = existing.AccessKey
mainTorrent.Hash = existing.Hash
mainTorrent.DownloadedIDs = strset.New()
mainTorrent.InProgressIDs = strset.New()
// this function triggers only when we have a new DownloadedID
strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool {
mainTorrent.DownloadedIDs.Add(id)
mainTorrent.InProgressIDs.Remove(id)
return true
})
// the link can have the following values
// 1. https://*** - the file is available
// 2. repair - the file is available but we need to repair it
// 3. repairing - the file is being repaired
// 4. unselect - the file is deleted
torrentToMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) {
mainTorrent.SelectedFiles = existing.SelectedFiles
toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) {
// see if it already exists in the main torrent
if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" {
// if it doesn't exist in the main torrent, add it
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
} else if originalFile.Link != "unselect" {
if mainTorrent.LatestAdded < torrentToMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") {
// if it exists, compare the LatestAdded property and the link
// if it exists, compare the LatestAdded property and the link
if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") {
// if torrentToMerge is more recent and its file has a link, update the main torrent's file
// unless it's removed
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
}
// else do nothing, the main torrent's file is more recent or has a valid link
}
})
// Merge Instances
mainTorrent.Instances = append(mainTorrent.Instances, torrentToMerge.Instances...)
// LatestAdded
if mainTorrent.LatestAdded < torrentToMerge.LatestAdded {
mainTorrent.LatestAdded = torrentToMerge.LatestAdded
if existing.LatestAdded < toMerge.LatestAdded {
mainTorrent.LatestAdded = toMerge.LatestAdded
} else {
mainTorrent.LatestAdded = existing.LatestAdded
}
return mainTorrent
@@ -313,81 +412,6 @@ func (t *TorrentManager) startRefreshJob() {
}
}
// getMoreInfo gets original name, size and files for a torrent
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) {
return tor
}
torrentFromFile := t.readTorrentFromFile(rdTorrent.ID)
if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) {
return torrentFromFile
}
info, err := t.Api.GetTorrentInfo(rdTorrent.ID)
if err != nil {
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
return nil
}
// SelectedFiles is a subset of Files with only the selected ones
// it also has a Link field, which can be empty
// if it is empty, it means the file is no longer available
// Files+Links together are the same as SelectedFiles
var selectedFiles []*File
// if some Links are empty, we need to repair it
for _, file := range info.Files {
if file.Selected == 0 {
continue
}
selectedFiles = append(selectedFiles, &File{
File: file,
Added: info.Added,
Ended: info.Ended,
Link: "", // no link yet
})
}
if len(selectedFiles) > len(info.Links) && info.Progress == 100 {
t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links))
for i, file := range selectedFiles {
file.Link = "repair"
i++
}
} else if len(selectedFiles) == len(info.Links) {
// all links are still intact! good!
for i, file := range selectedFiles {
file.Link = info.Links[i]
i++
}
}
torrent := Torrent{
AccessKey: t.computeAccessKey(info.Name, info.OriginalName),
LatestAdded: info.Added,
Instances: []*realdebrid.TorrentInfo{info},
}
torrent.SelectedFiles = cmap.New[*File]()
for _, file := range selectedFiles {
// todo better handling of duplicate filenames
if torrent.SelectedFiles.Has(filepath.Base(file.Path)) {
oldName := filepath.Base(file.Path)
ext := filepath.Ext(oldName)
filename := strings.TrimSuffix(oldName, ext)
newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext)
torrent.SelectedFiles.Set(newName, file)
} else {
torrent.SelectedFiles.Set(filepath.Base(file.Path), file)
}
}
infoCache.Set(rdTorrent.ID, &torrent)
err = t.writeTorrentToFile(rdTorrent.ID, &torrent)
if err != nil {
t.log.Warnf("Cannot write torrent to file: %v", err)
}
return &torrent
}
func (t *TorrentManager) computeAccessKey(name, originalName string) string {
if t.Config.EnableRetainRDTorrentName() {
return name
@@ -443,6 +467,9 @@ func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent {
if err := json.Unmarshal(jsonData, &torrent); err != nil {
return nil
}
if strset.Union(torrent.DownloadedIDs, torrent.InProgressIDs).IsEmpty() {
t.log.Fatal("Torrent has no downloaded or in progress ids")
}
if torrent.Version != t.requiredVersion {
return nil
}
@@ -562,9 +589,10 @@ func (t *TorrentManager) CheckDeletedState(torrent *Torrent) bool {
if len(unselectedIDs) == torrent.SelectedFiles.Count() && len(unselectedIDs) > 0 {
return true
} else if len(unselectedIDs) > 0 {
for i := range torrent.Instances {
t.writeTorrentToFile(torrent.Instances[i].ID, torrent)
}
torrent.DownloadedIDs.Each(func(id string) bool {
t.writeTorrentToFile(id, torrent)
return true
})
}
return false
}
@@ -574,12 +602,13 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if torrent, ok := allTorrents.Get(accessKey); ok {
for _, instance := range torrent.Instances {
t.log.Infof("Deleting torrent %s %s in RD", instance.ID, accessKey)
t.Api.DeleteTorrent(instance.ID)
infoCache.Remove(instance.ID)
t.deleteTorrentFile(instance.ID)
}
torrent.DownloadedIDs.Each(func(id string) bool {
t.log.Infof("Deleting torrent %s %s in RD", id, accessKey)
t.Api.DeleteTorrent(id)
infoCache.Remove(id)
t.deleteTorrentFile(id)
return true
})
}
}
t.log.Infof("Removing torrent %s from zurg database", accessKey)
@@ -596,7 +625,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto
}
func (t *TorrentManager) repair(torrent *Torrent) {
if torrent.AnyInProgress() {
if torrent.AllInProgress() {
t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
return
}
@@ -633,7 +662,6 @@ func (t *TorrentManager) repair(torrent *Torrent) {
return
} else if streamableCount == 1 {
t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey)
t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Instances[0].Hash)
t.Delete(torrent.AccessKey, false, true)
return
}
@@ -713,7 +741,7 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string)
}
// redownload torrent
resp, err := t.Api.AddMagnetHash(torrent.Instances[0].Hash)
resp, err := t.Api.AddMagnetHash(torrent.Hash)
if err != nil {
t.log.Warnf("Cannot redownload torrent: %v", err)
return false
@@ -829,9 +857,9 @@ func (t *TorrentManager) UpdateTorrentResponseCache(torrent *Torrent) {
pathKey := fmt.Sprintf("%s/%s", directory, torrent.AccessKey)
// torrent responses
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(pathKey+".html", &newHtml, 1)
t.ResponseCache.Set(pathKey+".html", newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(pathKey+".dav", &newDav, 1)
t.ResponseCache.Set(pathKey+".dav", newDav, 1)
})
}
@@ -843,7 +871,7 @@ func (t *TorrentManager) UpdateDirectoryResponsesCache() {
htmlRet := ""
for _, accessKey := range allKeys {
if tor, ok := torrents.Get(accessKey); ok {
if tor.AnyInProgress() {
if tor.AllInProgress() {
continue
}
davRet += dav.Directory(tor.AccessKey, tor.LatestAdded)
@@ -853,9 +881,9 @@ func (t *TorrentManager) UpdateDirectoryResponsesCache() {
cacheKey := directory
davRet = "<?xml version=\"1.0\" encoding=\"utf-8\"?><d:multistatus xmlns:d=\"DAV:\">" + dav.BaseDirectory(directory, "") + dav.BaseDirectory(directory, "") + davRet + "</d:multistatus>"
t.ResponseCache.Set(cacheKey+".dav", &davRet, 1)
t.ResponseCache.Set(cacheKey+".dav", davRet, 1)
htmlRet = "<ol>" + htmlRet
t.ResponseCache.Set(cacheKey+".html", &htmlRet, 1)
t.ResponseCache.Set(cacheKey+".html", htmlRet, 1)
})
}
@@ -883,10 +911,7 @@ func (t *TorrentManager) buildTorrentResponses(tor *Torrent) (string, string) {
}
func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) {
var torrentIDs []string
for _, instance := range tor.Instances {
torrentIDs = append(torrentIDs, instance.ID)
}
torrentIDs := strset.Union(tor.DownloadedIDs, tor.InProgressIDs).List()
// get filenames needed for directory conditions
filenames := tor.SelectedFiles.Keys()
// Map torrents to directories

View File

@@ -6,32 +6,51 @@ import (
"github.com/debridmediamanager/zurg/pkg/realdebrid"
jsoniter "github.com/json-iterator/go"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/scylladb/go-set/strset"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Torrent struct {
AccessKey string `json:"AccessKey"`
Hash string `json:"Hash"`
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"`
LatestAdded string `json:"LatestAdded"`
Version string `json:"Version"`
DownloadedIDs *strset.Set `json:"DownloadedIDs"`
InProgressIDs *strset.Set `json:"InProgressIDs"`
Instances []*realdebrid.TorrentInfo `json:"Instances"`
Version string `json:"Version"` // only used for files
}
func (t *Torrent) MarshalJSON() ([]byte, error) {
type Alias Torrent
temp := &struct {
SelectedFilesJson oldjson.RawMessage `json:"SelectedFiles"`
DownloadedIDsJson oldjson.RawMessage `json:"DownloadedIDs"`
InProgressIDsJson oldjson.RawMessage `json:"InProgressIDs"`
*Alias
}{
Alias: (*Alias)(t),
}
selectedFilesJson, err := t.SelectedFiles.MarshalJSON()
if err != nil {
return nil, err
}
temp.SelectedFilesJson = selectedFilesJson
downloadedIDsJson, err := json.Marshal(t.DownloadedIDs.List())
if err != nil {
return nil, err
}
temp.DownloadedIDsJson = downloadedIDsJson
inProgressIDsJson, err := json.Marshal(t.InProgressIDs.List())
if err != nil {
return nil, err
}
temp.InProgressIDsJson = inProgressIDsJson
return json.Marshal(temp)
}
@@ -39,6 +58,8 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
type Alias Torrent
temp := &struct {
SelectedFilesJson oldjson.RawMessage `json:"SelectedFiles"`
DownloadedIDsJson oldjson.RawMessage `json:"DownloadedIDs"`
InProgressIDsJson oldjson.RawMessage `json:"InProgressIDs"`
*Alias
}{
Alias: (*Alias)(t),
@@ -46,32 +67,43 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, temp); err != nil {
return err
}
t.SelectedFiles = cmap.New[*File]()
if len(temp.SelectedFilesJson) > 0 {
t.SelectedFiles = cmap.New[*File]()
if err := t.SelectedFiles.UnmarshalJSON(temp.SelectedFilesJson); err != nil {
return err
}
}
if len(temp.DownloadedIDsJson) > 0 {
var downloadedIDs []string
if err := json.Unmarshal(temp.DownloadedIDsJson, &downloadedIDs); err != nil {
return err
}
t.DownloadedIDs = strset.New(downloadedIDs...)
} else {
t.DownloadedIDs = strset.New()
}
if len(temp.InProgressIDsJson) > 0 {
var inProgressIDs []string
if err := json.Unmarshal(temp.InProgressIDsJson, &inProgressIDs); err != nil {
return err
}
t.InProgressIDs = strset.New(inProgressIDs...)
} else {
t.InProgressIDs = strset.New()
}
return nil
}
func (t *Torrent) AnyInProgress() bool {
for _, instance := range t.Instances {
if instance.Progress < 100 {
return true
}
}
return false
return !t.InProgressIDs.IsEmpty()
}
func (t *Torrent) AllInProgress() bool {
count := 0
for _, instance := range t.Instances {
if instance.Progress < 100 {
count++
}
}
return count == len(t.Instances)
return t.DownloadedIDs.IsEmpty()
}
type File struct {