Remove components, use downloaded IDs; support dumps

This commit is contained in:
Ben Sarmiento
2024-05-24 02:24:26 +02:00
parent beba993364
commit 9ecbb5d892
8 changed files with 188 additions and 107 deletions

3
.gitignore vendored
View File

@@ -43,3 +43,6 @@ pkg/anidb/
logs/
*.log
*.zurgtorrent
dump/

View File

@@ -93,6 +93,7 @@ func MainApp(configPath string) {
defer workerPool.Release()
utils.EnsureDirExists("data") // Ensure the data directory exists
utils.EnsureDirExists("dump") // dump is a new directory for "torrent" files
torrentMgr := torrent.NewTorrentManager(
config,
api,
@@ -113,6 +114,7 @@ func MainApp(configPath string) {
log.Named("router"),
)
//// pprof
// go func() {
// if err := netHttp.ListenAndServe(":6060", nil); err != nil && err != netHttp.ErrServerClosed {
// zurglog.Errorf("Failed to start pprof: %v", err)

View File

@@ -1,7 +1,6 @@
package torrent
import (
"github.com/debridmediamanager/zurg/pkg/realdebrid"
cmap "github.com/orcaman/concurrent-map/v2"
)
@@ -27,10 +26,12 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) {
if torrent, ok := allTorrents.Get(accessKey); ok {
hash = torrent.Hash
if deleteInRD {
torrent.Components.IterCb(func(torrentID string, _ *realdebrid.TorrentInfo) {
torrent.DownloadedIDs.Each(func(torrentID string) bool {
t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, torrentID)
t.api.DeleteTorrent(torrentID)
t.deleteInfoFile(torrentID)
return false
})
}
}

View File

@@ -45,8 +45,8 @@ type TorrentManager struct {
repairRunning bool
repairRunningMu sync.Mutex
trashBin mapset.Set[string]
repairBin mapset.Set[string] // same as trash bin, but only if the torrent has been downloaded
immediateBin mapset.Set[string]
onceDoneBin mapset.Set[string]
}
// NewTorrentManager creates a new torrent manager
@@ -73,8 +73,8 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
latestState: &LibraryState{log: log},
}
t.trashBin = mapset.NewSet[string]()
t.initializeDirectories()
t.initializeBins()
t.initializeDirectoryMaps()
t.workerPool.Submit(func() {
t.refreshTorrents()
t.setNewLatestState(t.getCurrentState())
@@ -139,17 +139,17 @@ func (t *TorrentManager) GetPath(file *File) string {
/// torrent functions
func (t *TorrentManager) getTorrentFiles() mapset.Set[string] {
files, err := filepath.Glob("data/*.torrent_zurg")
func (t *TorrentManager) getTorrentFiles(parentDir string) mapset.Set[string] {
files, err := filepath.Glob(parentDir + "/*.zurgtorrent")
if err != nil {
t.log.Warnf("Cannot get files in data directory: %v", err)
t.log.Warnf("Cannot get files in %s directory: %v", parentDir, err)
return nil
}
return mapset.NewSet[string](files...)
}
func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) {
filePath := "data/" + torrent.Hash + ".torrent_zurg"
filePath := "data/" + torrent.Hash + ".zurgtorrent"
file, err := os.Create(filePath)
if err != nil {
t.log.Warnf("Cannot create file %s: %v", filePath, err)
@@ -173,8 +173,7 @@ func (t *TorrentManager) writeTorrentToFile(torrent *Torrent) {
t.log.Debugf("Saved torrent %s (hash=%s) to file", t.GetKey(torrent), torrent.Hash)
}
func (t *TorrentManager) readTorrentFromFile(hash string) *Torrent {
filePath := "data/" + hash + ".torrent_zurg"
func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent {
file, err := os.Open(filePath)
if err != nil {
if os.IsNotExist(err) {
@@ -198,7 +197,7 @@ func (t *TorrentManager) readTorrentFromFile(hash string) *Torrent {
}
func (t *TorrentManager) deleteTorrentFile(hash string) {
filePath := "data/" + hash + ".torrent_zurg"
filePath := "data/" + hash + ".zurgtorrent"
_ = os.Remove(filePath)
}
@@ -310,7 +309,7 @@ func (t *TorrentManager) StartDownloadsJob() {
})
}
func (t *TorrentManager) initializeDirectories() {
func (t *TorrentManager) initializeDirectoryMaps() {
// create internal directories
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is GetAccessKey()
// create directory maps

View File

@@ -30,18 +30,40 @@ func (t *TorrentManager) refreshTorrents() []string {
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
freshHashes := mapset.NewSet[string]()
freshIDs := mapset.NewSet[string]()
freshAccessKeys := mapset.NewSet[string]()
t.getTorrentFiles("dump").Each(func(filePath string) bool {
torrent := t.readTorrentFromFile(filePath)
if torrent != nil {
accessKey := t.GetKey(torrent)
t.log.Debugf("Adding dumped torrent %s", accessKey)
allTorrents.Set(accessKey, torrent)
t.assignDirectory(torrent, func(directory string) {
listing, _ := t.DirectoryMap.Get(directory)
listing.Set(accessKey, torrent)
// note that we're not adding it to updatedPaths
})
freshAccessKeys.Add(accessKey) // to prevent being deleted
}
return false
})
existingHashes := mapset.NewSet[string]()
t.getTorrentFiles("data").Each(func(path string) bool {
path = filepath.Base(path)
hash := strings.TrimSuffix(path, ".zurgtorrent")
existingHashes.Add(hash)
return false
})
for i := range instances {
wg.Add(1)
idx := i
_ = t.workerPool.Submit(func() {
defer wg.Done()
if t.trashBin.Contains(instances[idx].ID) {
t.api.DeleteTorrent(instances[idx].ID)
t.log.Debugf("Skipping trashed torrent %s (id=%s)", instances[idx].Name, instances[idx].ID)
if t.binImmediately(instances[idx].ID) {
// t.log.Debugf("Skipping trashed torrent %s (id=%s)", instances[idx].Name, instances[idx].ID)
mergeChan <- nil
return
}
@@ -52,7 +74,6 @@ func (t *TorrentManager) refreshTorrents() []string {
return
}
freshHashes.Add(instances[idx].Hash)
freshIDs.Add(instances[idx].ID)
tInfo := t.getMoreInfo(instances[idx])
@@ -71,10 +92,15 @@ func (t *TorrentManager) refreshTorrents() []string {
updatedPaths.Add(fmt.Sprintf("%s/%s", directory, accessKey))
})
} else if !mainTorrent.Components.Has(tInfo.ID) {
} else if !mainTorrent.DownloadedIDs.Contains(tInfo.ID) {
forMerging = torrent
}
// write to file if it is a new torrent
if forMerging == nil && !existingHashes.Contains(tInfo.Hash) {
t.writeTorrentToFile(torrent)
}
mergeChan <- forMerging
})
}
@@ -123,24 +149,13 @@ func (t *TorrentManager) refreshTorrents() []string {
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
// data directory cleanup
// existingHashes := mapset.NewSet[string]()
// t.getTorrentFiles().Each(func(path string) bool {
// path = filepath.Base(path)
// hash := strings.TrimSuffix(path, ".torrent_zurg")
// existingHashes.Add(hash)
// return false
// })
// existingHashes.Difference(freshHashes).Each(func(hash string) bool {
// t.log.Infof("Deleting stale torrent file %s", hash)
// t.deleteTorrentFile(hash)
// return false
// })
existingIDs := mapset.NewSet[string]()
t.getInfoFiles().Each(func(path string) bool {
path = filepath.Base(path)
torrentID := strings.TrimSuffix(path, ".info_zurg")
existingIDs.Add(torrentID)
if !t.binOnceDone(torrentID) {
existingIDs.Add(torrentID)
}
return false
})
existingIDs.Difference(freshIDs).Each(func(id string) bool {
@@ -149,6 +164,8 @@ func (t *TorrentManager) refreshTorrents() []string {
return false
})
t.cleanupBins(freshIDs)
return updatedPaths.ToSlice()
}
@@ -195,18 +212,18 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.T
}
func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent {
torrent := t.readTorrentFromFile(info.Hash)
if torrent != nil && torrent.Components.Has(info.ID) {
torrent := t.readTorrentFromFile("data/" + info.Hash + ".zurgtorrent")
if torrent != nil && torrent.DownloadedIDs.Contains(info.ID) {
return torrent
}
torrent = &Torrent{
Name: info.Name,
OriginalName: info.OriginalName,
Added: info.Added,
Hash: info.Hash,
State: NewTorrentState("broken_torrent"),
Components: cmap.New[*realdebrid.TorrentInfo](),
Name: info.Name,
OriginalName: info.OriginalName,
Added: info.Added,
Hash: info.Hash,
State: NewTorrentState("broken_torrent"),
DownloadedIDs: mapset.NewSet[string](),
}
// SelectedFiles is a subset of Files with only the selected ones
@@ -271,7 +288,7 @@ func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent
}
}
torrent.Components.Set(info.ID, info)
torrent.DownloadedIDs.Add(info.ID)
return torrent
}
@@ -286,26 +303,14 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent {
older = toMerge
}
// components
numComponents := 0
mergedComponents := cmap.New[*realdebrid.TorrentInfo]()
older.Components.IterCb(func(torrentID string, info *realdebrid.TorrentInfo) {
numComponents++
mergedComponents.Set(torrentID, info)
})
newer.Components.IterCb(func(torrentID string, info *realdebrid.TorrentInfo) {
numComponents++
mergedComponents.Set(torrentID, info)
})
// base of the merged torrent
mergedTorrent := &Torrent{
Name: older.Name,
OriginalName: older.OriginalName,
Rename: older.Rename,
Hash: older.Hash,
Added: older.Added,
Components: mergedComponents,
Name: older.Name,
OriginalName: older.OriginalName,
Rename: older.Rename,
Hash: older.Hash,
Added: older.Added,
DownloadedIDs: older.DownloadedIDs.Union(newer.DownloadedIDs),
State: older.State,
}
@@ -368,7 +373,7 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent {
}
func (t *TorrentManager) assignDirectory(tor *Torrent, cb func(string)) {
torrentIDs := tor.Components.Keys()
torrentIDs := tor.DownloadedIDs.ToSlice()
// get filenames needed for directory conditions
var filenames []string
var fileSizes []int64
@@ -412,20 +417,78 @@ func (t *TorrentManager) IsPlayable(filePath string) bool {
return false
}
func (t *TorrentManager) trash(torrentId string) {
t.log.Debugf("Trash: %s", torrentId)
t.trashBin.Add(torrentId)
// initializeBins loads the persisted deletion bins from data/bins.json into
// t.immediateBin and t.onceDoneBin.
//
// The file is a JSON object whose "trash_bin" array populates t.immediateBin
// and whose "repair_bin" array populates t.onceDoneBin (the same keys that
// persistBins writes).
//
// Both bins are guaranteed to be non-nil when this function returns: if the
// file is missing, unreadable, or contains invalid JSON, the bins are left as
// empty sets so later Contains/Add/Remove calls cannot hit a nil set.
func (t *TorrentManager) initializeBins() {
	// Start from empty bins so that every early return below leaves usable sets.
	t.immediateBin = mapset.NewSet[string]()
	t.onceDoneBin = mapset.NewSet[string]()

	fileData, err := os.ReadFile("data/bins.json")
	if err != nil {
		if os.IsNotExist(err) {
			t.log.Warn("data/bins.json does not exist. Initializing empty bins.")
		} else {
			t.log.Errorf("Failed to read bins.json file: %v", err)
		}
		return
	}

	data := map[string][]string{}
	if err := json.Unmarshal(fileData, &data); err != nil {
		// Keep the empty bins assigned above instead of leaving them nil.
		t.log.Errorf("Failed to unmarshal bin data: %v", err)
		return
	}

	// A missing key yields a nil slice, which produces an empty set.
	t.immediateBin = mapset.NewSet[string](data["trash_bin"]...)
	t.onceDoneBin = mapset.NewSet[string](data["repair_bin"]...)

	t.log.Debug("Successfully read bins from bins.json")
	t.log.Debugf("Bin immediately: %v", t.immediateBin.ToSlice())
	t.log.Debugf("Bin once done: %v", t.onceDoneBin.ToSlice())
}
// setToBinImmediately records torrentId in the immediate bin and then writes
// the bins to disk via persistBins.
func (t *TorrentManager) setToBinImmediately(torrentId string) {
t.log.Debugf("Set to delete immediately: %s", torrentId)
t.immediateBin.Add(torrentId)
t.persistBins()
}
func (t *TorrentManager) trashOnceCompleted(torrentId string) {
t.log.Debugf("Trash once completed: %s", torrentId)
t.repairBin.Add(torrentId)
func (t *TorrentManager) setToBinOnceDone(torrentId string) {
t.log.Debugf("Set to delete once completed: %s", torrentId)
t.onceDoneBin.Add(torrentId)
t.persistBins()
}
func (t *TorrentManager) saveToBinFile() {
// binImmediately deletes the torrent through the API if its ID is in the
// immediate bin.
//
// It returns true only when the ID was in the bin and the delete call
// succeeded, in which case the ID is also removed from the bin. It returns
// false when the ID is not in the bin, or when the delete call fails (the
// failure is logged and the ID stays in the bin).
func (t *TorrentManager) binImmediately(torrentId string) bool {
	if !t.immediateBin.Contains(torrentId) {
		return false
	}

	err := t.api.DeleteTorrent(torrentId)
	if err != nil {
		t.log.Errorf("Failed to delete torrent %s: %v", torrentId, err)
		return false
	}

	t.immediateBin.Remove(torrentId)
	return true
}
// binOnceDone deletes the torrent through the API if its ID is in the
// once-done bin.
//
// It returns true only when the ID was in the bin and the delete call
// succeeded, in which case the ID is also removed from the bin. It returns
// false when the ID is not in the bin, or when the delete call fails (the
// failure is logged and the ID stays in the bin).
func (t *TorrentManager) binOnceDone(torrentId string) bool {
	if !t.onceDoneBin.Contains(torrentId) {
		return false
	}

	err := t.api.DeleteTorrent(torrentId)
	if err != nil {
		t.log.Errorf("Failed to delete torrent %s: %v", torrentId, err)
		return false
	}

	t.onceDoneBin.Remove(torrentId)
	return true
}
func (t *TorrentManager) persistBins() {
data := map[string]interface{}{
"trash_bin": t.trashBin.ToSlice(), // Assuming trashBin is a mapset.Set[string]
"repair_bin": t.repairBin.ToSlice(), // Assuming repairBin is a mapset.Set[string]
"trash_bin": t.immediateBin.ToSlice(), // Assuming trashBin is a mapset.Set[string]
"repair_bin": t.onceDoneBin.ToSlice(), // Assuming repairBin is a mapset.Set[string]
}
jsonData, err := json.Marshal(data)
@@ -434,17 +497,29 @@ func (t *TorrentManager) saveToBinFile() {
return
}
file, err := os.Create("trash_bin")
file, err := os.Create("data/bins.json")
if err != nil {
t.log.Errorf("Failed to create trash_bin file: %v", err)
t.log.Errorf("Failed to create bins.json file: %v", err)
return
}
defer file.Close()
_, err = file.Write(jsonData)
if err != nil {
t.log.Errorf("Failed to write to trash_bin file: %v", err)
t.log.Errorf("Failed to write to bins.json file: %v", err)
} else {
t.log.Debug("Successfully saved bins to file")
}
}
// cleanupBins drops from both bins every ID that is not present in freshIDs,
// then writes the resulting bins to disk via persistBins.
func (t *TorrentManager) cleanupBins(freshIDs mapset.Set[string]) {
	// Difference returns a new set, so removing from the bin while walking
	// the stale IDs does not disturb the iteration.
	staleImmediate := t.immediateBin.Difference(freshIDs)
	for _, staleID := range staleImmediate.ToSlice() {
		t.immediateBin.Remove(staleID)
	}

	staleOnceDone := t.onceDoneBin.Difference(freshIDs)
	for _, staleID := range staleOnceDone.ToSlice() {
		t.onceDoneBin.Remove(staleID)
	}

	t.persistBins()
}

View File

@@ -156,7 +156,7 @@ func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
}
func (t *TorrentManager) repair(torrent *Torrent) {
torrentIDs := torrent.Components.Keys()
torrentIDs := torrent.DownloadedIDs.ToSlice()
t.log.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrentIDs)
// handle torrents with incomplete links for selected files
@@ -180,10 +180,11 @@ func (t *TorrentManager) repair(torrent *Torrent) {
return
}
// delete old torrents
torrent.Components.IterCb(func(id string, _ *realdebrid.TorrentInfo) {
if id != info.ID {
t.api.DeleteTorrent(id)
torrent.DownloadedIDs.Each(func(torrentID string) bool {
if torrentID != info.ID {
t.setToBinOnceDone(torrentID)
}
return false
})
t.log.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent))
return
@@ -212,7 +213,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
return
}
t.log.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles), t.GetKey(torrent))
t.log.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", t.GetKey(torrent), int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles))
newlyDownloadedIds := make([]string, 0)
batchNum := 1
@@ -227,7 +228,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
for _, newId := range newlyDownloadedIds {
t.trash(newId)
t.setToBinImmediately(newId)
}
return
}
@@ -243,7 +244,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
for _, newId := range newlyDownloadedIds {
t.trash(newId)
t.setToBinImmediately(newId)
}
return
}
@@ -251,7 +252,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
}
for _, newId := range newlyDownloadedIds {
t.trashOnceCompleted(newId)
t.setToBinOnceDone(newId)
}
}
@@ -398,7 +399,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
// select files
err = t.api.SelectTorrentFiles(newTorrentID, finalSelection)
if err != nil {
t.trash(newTorrentID)
t.setToBinImmediately(newTorrentID)
return nil, fmt.Errorf("cannot start redownloading torrent %s (id=%s): %v", t.GetKey(torrent), newTorrentID, err)
}
// sleep for 2 second to let RD process the magnet
@@ -407,7 +408,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
// see if the torrent is ready
info, err = t.api.GetTorrentInfo(newTorrentID)
if err != nil {
t.trash(newTorrentID)
t.setToBinImmediately(newTorrentID)
return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
}
@@ -433,13 +434,13 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
}
if !isOkStatus {
t.trash(info.ID)
t.setToBinImmediately(info.ID)
return nil, fmt.Errorf("the redownloaded torrent %s is in a non-OK state: %s", t.GetKey(torrent), info.Status)
}
// check if incorrect number of links
if info.Progress == 100 && len(info.Links) != len(selection) {
t.trash(newTorrentID)
t.setToBinImmediately(newTorrentID)
return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), len(selection))
}

View File

@@ -4,7 +4,6 @@ import (
stdjson "encoding/json"
"strings"
"github.com/debridmediamanager/zurg/pkg/realdebrid"
mapset "github.com/deckarep/golang-set/v2"
jsoniter "github.com/json-iterator/go"
"github.com/looplab/fsm"
@@ -14,13 +13,13 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Torrent struct {
Name string `json:"Name"`
OriginalName string `json:"OriginalName"`
Hash string `json:"Hash"`
Added string `json:"Added"`
Components cmap.ConcurrentMap[string, *realdebrid.TorrentInfo] `json:"-"`
Name string `json:"Name"`
OriginalName string `json:"OriginalName"`
Hash string `json:"Hash"`
Added string `json:"Added"`
DownloadedIDs mapset.Set[string] `json:"DownloadedIDs"` // used for keeping track of downloaded files
UnassignedLinks mapset.Set[string] `json:"UnassignedLinks"` // when links are not complete, we cannot assign them to a file so we store them here until it's fixed
UnassignedLinks mapset.Set[string] `json:"-"` // when links are not complete, we cannot assign them to a file so we store them here until it's fixed
Rename string `json:"Rename"`
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"`
@@ -33,7 +32,7 @@ type Torrent struct {
func (t *Torrent) MarshalJSON() ([]byte, error) {
type Alias Torrent
temp := &struct {
ComponentsJson stdjson.RawMessage `json:"Components"`
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
SelectedFilesJson stdjson.RawMessage `json:"SelectedFiles"`
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
StateJson stdjson.RawMessage `json:"State"`
@@ -55,11 +54,12 @@ func (t *Torrent) MarshalJSON() ([]byte, error) {
temp.UnassignedLinksJson = []byte(unassignedLinksStr)
}
componentsJson, err := t.Components.MarshalJSON()
if err != nil {
return nil, err
if t.DownloadedIDs.IsEmpty() {
temp.DownloadedIDsJson = []byte(`""`)
} else {
DownloadedIDsStr := `"` + strings.Join(t.DownloadedIDs.ToSlice(), ",") + `"`
temp.DownloadedIDsJson = []byte(DownloadedIDsStr)
}
temp.ComponentsJson = componentsJson
temp.StateJson = []byte(`"` + t.State.Current() + `"`)
@@ -69,7 +69,7 @@ func (t *Torrent) MarshalJSON() ([]byte, error) {
func (t *Torrent) UnmarshalJSON(data []byte) error {
type Alias Torrent
temp := &struct {
ComponentsJson stdjson.RawMessage `json:"Components"`
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
SelectedFilesJson stdjson.RawMessage `json:"SelectedFiles"`
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
StateJson string `json:"State"`
@@ -81,13 +81,6 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
return err
}
t.Components = cmap.New[*realdebrid.TorrentInfo]()
if len(temp.ComponentsJson) > 0 {
if err := t.Components.UnmarshalJSON(temp.ComponentsJson); err != nil {
return err
}
}
t.SelectedFiles = cmap.New[*File]()
if len(temp.SelectedFilesJson) > 0 {
if err := t.SelectedFiles.UnmarshalJSON(temp.SelectedFilesJson); err != nil {
@@ -95,6 +88,13 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
}
}
if len(temp.DownloadedIDsJson) > 2 {
downloadedIDs := strings.Split(strings.ReplaceAll(string(temp.DownloadedIDsJson), `"`, ""), ",")
t.DownloadedIDs = mapset.NewSet[string](downloadedIDs...)
} else {
t.DownloadedIDs = mapset.NewSet[string]()
}
if len(temp.UnassignedLinksJson) > 2 {
unassignedLinks := strings.Split(strings.ReplaceAll(string(temp.UnassignedLinksJson), `"`, ""), ",")
t.UnassignedLinks = mapset.NewSet[string](unassignedLinks...)

View File

@@ -146,7 +146,7 @@ func (dl *Downloader) streamFileToResponse(
// Add the range header if it exists
if req.Header.Get("Range") != "" {
dlReq.Header.Add("Range", req.Header.Get("Range"))
log.Debugf("Range request for file %s: %s", unrestrict.Download, req.Header.Get("Range"))
// log.Debugf("Range request for file %s: %s", unrestrict.Download, req.Header.Get("Range"))
}
// Perform the request