Remove fixer concept
This commit is contained in:
@@ -1,141 +0,0 @@
|
|||||||
package torrent
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/debridmediamanager/zurg/pkg/realdebrid"
|
|
||||||
cmap "github.com/orcaman/concurrent-map/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
// fixers are commands that will be run on the next refresh
|
|
||||||
// they are stored in a file so that they can be run on startup
|
|
||||||
// they follow the format of:
|
|
||||||
// key: <id_trigger> value: <command>
|
|
||||||
// id_trigger: this means a specific torrent id's completion
|
|
||||||
// commands: delete | repair
|
|
||||||
|
|
||||||
func (t *TorrentManager) registerFixer(torrentId, command string) {
|
|
||||||
t.log.Debugf("Adding fixer command: %s %s", torrentId, command)
|
|
||||||
t.fixers.Set(torrentId, command)
|
|
||||||
t.writeFixersToFile()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TorrentManager) processFixers(instances []realdebrid.Torrent) {
|
|
||||||
t.log.Debugf("Processing fixers (%d left: %v)", t.fixers.Count(), t.fixers.Keys())
|
|
||||||
var toDelete []string
|
|
||||||
var toRedownload []*Torrent
|
|
||||||
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
|
||||||
for _, instance := range instances {
|
|
||||||
if !t.fixers.Has(instance.ID) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
oldTorrentId := instance.ID
|
|
||||||
command, _ := t.fixers.Pop(oldTorrentId) // delete the fixer if it's done
|
|
||||||
switch command {
|
|
||||||
|
|
||||||
case "replaced": // id is old torrent id
|
|
||||||
t.log.Debugf("Deleting old id=%s because it's redundant to fixed torrent %s ", oldTorrentId, instance.Name)
|
|
||||||
toDelete = append(toDelete, oldTorrentId)
|
|
||||||
continue
|
|
||||||
|
|
||||||
case "download_failed": // id is failed fixer id
|
|
||||||
t.log.Debugf("Deleting failed fixer id=%s of torrent %s", oldTorrentId, instance.Name)
|
|
||||||
toDelete = append(toDelete, oldTorrentId)
|
|
||||||
continue
|
|
||||||
|
|
||||||
case "repaired": // this torrent contains broken files
|
|
||||||
if instance.Progress != 100 {
|
|
||||||
t.fixers.Set(oldTorrentId, command) // requeue the fixer, it's not done yet
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fixedTorrent := t.getMoreInfo(instance)
|
|
||||||
t.log.Debugf("Repairing torrent %s again now that fixer id=%s is done", t.GetKey(fixedTorrent), oldTorrentId)
|
|
||||||
repairMe, _ := allTorrents.Get(t.GetKey(fixedTorrent))
|
|
||||||
toRedownload = append(toRedownload, repairMe)
|
|
||||||
toDelete = append(toDelete, oldTorrentId)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, id := range toDelete {
|
|
||||||
t.api.DeleteTorrent(id)
|
|
||||||
t.deleteInfoFile(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, torrent := range toRedownload {
|
|
||||||
t.redownloadTorrent(torrent, []string{})
|
|
||||||
}
|
|
||||||
|
|
||||||
t.writeFixersToFile()
|
|
||||||
|
|
||||||
t.log.Debugf("Finished processing fixers")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TorrentManager) removeExpiredFixers(instances []realdebrid.Torrent) {
|
|
||||||
fixers := t.fixers.Keys()
|
|
||||||
for _, fixerID := range fixers {
|
|
||||||
found := false
|
|
||||||
for _, instance := range instances {
|
|
||||||
if instance.ID == fixerID {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.log.Debugf("Removing expired fixer id=%s", fixerID)
|
|
||||||
t.fixers.Remove(fixerID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TorrentManager) writeFixersToFile() {
|
|
||||||
filePath := "data/fixers.json"
|
|
||||||
file, err := os.Create(filePath)
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warnf("Cannot create fixer file %s: %v", filePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
fileData, err := t.fixers.MarshalJSON()
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warnf("Cannot marshal fixers: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = file.Write(fileData)
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warnf("Cannot write to fixer file %s: %v", filePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TorrentManager) readFixersFromFile() (ret cmap.ConcurrentMap[string, string]) {
|
|
||||||
ret = cmap.New[string]()
|
|
||||||
filePath := "data/fixers.json"
|
|
||||||
file, err := os.Open(filePath)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.log.Warnf("Cannot open fixer file %s: %v", filePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
fileData, err := io.ReadAll(file)
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warnf("Cannot read fixer file %s: %v", filePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ret.UnmarshalJSON(fileData)
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warnf("Cannot unmarshal fixer file %s: %v", filePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
@@ -40,11 +40,11 @@ type TorrentManager struct {
|
|||||||
|
|
||||||
latestState *LibraryState
|
latestState *LibraryState
|
||||||
|
|
||||||
fixers cmap.ConcurrentMap[string, string] // trigger -> [command, id]
|
|
||||||
repairTrigger chan *Torrent
|
repairTrigger chan *Torrent
|
||||||
repairSet mapset.Set[*Torrent]
|
repairSet mapset.Set[*Torrent]
|
||||||
repairRunning bool
|
repairRunning bool
|
||||||
repairRunningMu sync.Mutex
|
repairRunningMu sync.Mutex
|
||||||
|
trashBin mapset.Set[string]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTorrentManager creates a new torrent manager
|
// NewTorrentManager creates a new torrent manager
|
||||||
@@ -71,10 +71,10 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
|||||||
latestState: &LibraryState{log: log},
|
latestState: &LibraryState{log: log},
|
||||||
}
|
}
|
||||||
|
|
||||||
t.fixers = t.readFixersFromFile()
|
t.trashBin = mapset.NewSet[string]()
|
||||||
t.initializeDirectories()
|
t.initializeDirectories()
|
||||||
t.workerPool.Submit(func() {
|
t.workerPool.Submit(func() {
|
||||||
t.refreshTorrents(true)
|
t.refreshTorrents()
|
||||||
t.setNewLatestState(t.getCurrentState())
|
t.setNewLatestState(t.getCurrentState())
|
||||||
t.StartRefreshJob()
|
t.StartRefreshJob()
|
||||||
t.StartRepairJob()
|
t.StartRepairJob()
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import (
|
|||||||
cmap "github.com/orcaman/concurrent-map/v2"
|
cmap "github.com/orcaman/concurrent-map/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
func (t *TorrentManager) refreshTorrents() []string {
|
||||||
instances, _, err := t.api.GetTorrents(false)
|
instances, _, err := t.api.GetTorrents(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot get torrents: %v", err)
|
t.log.Warnf("Cannot get torrents: %v", err)
|
||||||
@@ -26,6 +26,11 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for i := range instances {
|
for i := range instances {
|
||||||
|
if t.trashBin.Contains(instances[i].ID) {
|
||||||
|
t.api.DeleteTorrent(instances[i].ID)
|
||||||
|
t.log.Infof("Skipping trashed torrent %s", instances[i].Name)
|
||||||
|
torChan <- nil
|
||||||
|
}
|
||||||
idx := i
|
idx := i
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
_ = t.workerPool.Submit(func() {
|
_ = t.workerPool.Submit(func() {
|
||||||
@@ -128,15 +133,6 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
|||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
if t.Config.EnableRepair() {
|
|
||||||
if isInitialRun {
|
|
||||||
t.removeExpiredFixers(instances)
|
|
||||||
}
|
|
||||||
t.workerPool.Submit(func() {
|
|
||||||
t.processFixers(instances)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return updatedPaths
|
return updatedPaths
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -157,7 +153,7 @@ func (t *TorrentManager) StartRefreshJob() {
|
|||||||
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
|
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
|
||||||
t.setNewLatestState(checksum)
|
t.setNewLatestState(checksum)
|
||||||
|
|
||||||
updatedPaths := t.refreshTorrents(false)
|
updatedPaths := t.refreshTorrents()
|
||||||
t.log.Info("Finished refreshing torrents")
|
t.log.Info("Finished refreshing torrents")
|
||||||
|
|
||||||
t.TriggerHookOnLibraryUpdate(updatedPaths)
|
t.TriggerHookOnLibraryUpdate(updatedPaths)
|
||||||
@@ -294,36 +290,26 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent {
|
|||||||
Rename: newer.Rename,
|
Rename: newer.Rename,
|
||||||
Hash: newer.Hash,
|
Hash: newer.Hash,
|
||||||
Added: newer.Added,
|
Added: newer.Added,
|
||||||
|
Components: mergedComponents,
|
||||||
Components: mergedComponents,
|
|
||||||
UnrepairableReason: newer.UnrepairableReason,
|
|
||||||
|
|
||||||
State: older.State,
|
State: older.State,
|
||||||
}
|
}
|
||||||
|
|
||||||
// unrepairable reason
|
// unrepairable reason
|
||||||
if mainTorrent.UnrepairableReason != "" && older.UnrepairableReason != "" && mainTorrent.UnrepairableReason != older.UnrepairableReason {
|
reasons := mapset.NewSet[string]()
|
||||||
mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", mainTorrent.UnrepairableReason, older.UnrepairableReason)
|
reasons.Add(older.UnrepairableReason)
|
||||||
} else if older.UnrepairableReason != "" {
|
reasons.Add(newer.UnrepairableReason)
|
||||||
mainTorrent.UnrepairableReason = older.UnrepairableReason
|
mainTorrent.UnrepairableReason = strings.Join(reasons.ToSlice(), ", ")
|
||||||
}
|
|
||||||
|
|
||||||
// the link can have the following values
|
|
||||||
// 1. https://*** - the file is available
|
|
||||||
// 3. empty - the file is not available
|
|
||||||
mainTorrent.SelectedFiles = cmap.New[*File]()
|
mainTorrent.SelectedFiles = cmap.New[*File]()
|
||||||
newer.SelectedFiles.IterCb(func(key string, newerFile *File) {
|
|
||||||
mainTorrent.SelectedFiles.Set(key, newerFile)
|
|
||||||
})
|
|
||||||
older.SelectedFiles.IterCb(func(key string, olderFile *File) {
|
older.SelectedFiles.IterCb(func(key string, olderFile *File) {
|
||||||
if !mainTorrent.SelectedFiles.Has(key) {
|
mainTorrent.SelectedFiles.Set(key, olderFile)
|
||||||
mainTorrent.SelectedFiles.Set(key, olderFile)
|
})
|
||||||
} else if olderFile.State.Is("deleted_file") {
|
newer.SelectedFiles.IterCb(func(key string, newerFile *File) {
|
||||||
newerFile, _ := mainTorrent.SelectedFiles.Get(key)
|
if f, ok := mainTorrent.SelectedFiles.Get(key); ok && f.State.Is("deleted_file") {
|
||||||
if err := newerFile.State.Event(context.Background(), "delete_file"); err != nil {
|
return
|
||||||
t.log.Errorf("Cannot delete file %s: %v", key, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
mainTorrent.SelectedFiles.Set(key, newerFile)
|
||||||
})
|
})
|
||||||
t.CheckDeletedStatus(&mainTorrent)
|
t.CheckDeletedStatus(&mainTorrent)
|
||||||
|
|
||||||
@@ -393,3 +379,8 @@ func (t *TorrentManager) IsPlayable(filePath string) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TorrentManager) trash(torrentId string) {
|
||||||
|
t.log.Debugf("Trash: %s", torrentId)
|
||||||
|
t.trashBin.Add(torrentId)
|
||||||
|
}
|
||||||
|
|||||||
@@ -176,6 +176,13 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
t.log.Errorf("Cannot repair torrent %s: %v", torrent.Hash, err)
|
t.log.Errorf("Cannot repair torrent %s: %v", torrent.Hash, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// delete old torrents
|
||||||
|
for id := range torrent.Components {
|
||||||
|
if id == info.ID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.api.DeleteTorrent(id)
|
||||||
|
}
|
||||||
t.log.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent))
|
t.log.Infof("Successfully repaired torrent %s by redownloading all files", t.GetKey(torrent))
|
||||||
return
|
return
|
||||||
} else if info != nil && info.Progress != 100 {
|
} else if info != nil && info.Progress != 100 {
|
||||||
@@ -205,11 +212,6 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
|
|
||||||
t.log.Infof("Repairing by downloading %d batches of the %d broken files of torrent %s", int(math.Ceil(float64(len(brokenFiles))/130)), len(brokenFiles), t.GetKey(torrent))
|
t.log.Infof("Repairing by downloading %d batches of the %d broken files of torrent %s", int(math.Ceil(float64(len(brokenFiles))/130)), len(brokenFiles), t.GetKey(torrent))
|
||||||
|
|
||||||
oldTorrentIDs := []string{}
|
|
||||||
for id := range torrent.Components {
|
|
||||||
oldTorrentIDs = append(oldTorrentIDs, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
newlyDownloadedIds := make([]string, 0)
|
newlyDownloadedIds := make([]string, 0)
|
||||||
group := make([]*File, 0)
|
group := make([]*File, 0)
|
||||||
batchNum := 1
|
batchNum := 1
|
||||||
@@ -223,7 +225,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
|
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
|
||||||
for _, newId := range newlyDownloadedIds {
|
for _, newId := range newlyDownloadedIds {
|
||||||
t.registerFixer(newId, "download_failed")
|
t.trash(newId)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -240,15 +242,13 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
|
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
|
||||||
for _, newId := range newlyDownloadedIds {
|
for _, newId := range newlyDownloadedIds {
|
||||||
t.registerFixer(newId, "download_failed")
|
t.trash(newId)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, oldId := range oldTorrentIDs {
|
/// TODO: should we delete the old torrents that were replaced?
|
||||||
t.registerFixer(oldId, "replaced")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
||||||
@@ -350,14 +350,9 @@ func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {
|
|||||||
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) {
|
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) {
|
||||||
// broken files means broken links
|
// broken files means broken links
|
||||||
// if brokenFiles is not provided, we will redownload all files
|
// if brokenFiles is not provided, we will redownload all files
|
||||||
oldTorrentIDs := make([]string, 0)
|
|
||||||
finalSelection := strings.Join(selection, ",")
|
finalSelection := strings.Join(selection, ",")
|
||||||
selectionCount := len(selection)
|
selectionCount := len(selection)
|
||||||
if selectionCount == 0 {
|
if selectionCount == 0 {
|
||||||
// only delete the old torrent if we are redownloading all files
|
|
||||||
for id := range torrent.Components {
|
|
||||||
oldTorrentIDs = append(oldTorrentIDs, id)
|
|
||||||
}
|
|
||||||
tmpSelection := ""
|
tmpSelection := ""
|
||||||
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
||||||
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files
|
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files
|
||||||
@@ -399,7 +394,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
|
|||||||
// select files
|
// select files
|
||||||
err = t.api.SelectTorrentFiles(newTorrentID, finalSelection)
|
err = t.api.SelectTorrentFiles(newTorrentID, finalSelection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.registerFixer(newTorrentID, "download_failed")
|
t.trash(newTorrentID)
|
||||||
return nil, fmt.Errorf("cannot start redownloading torrent %s (id=%s): %v", t.GetKey(torrent), newTorrentID, err)
|
return nil, fmt.Errorf("cannot start redownloading torrent %s (id=%s): %v", t.GetKey(torrent), newTorrentID, err)
|
||||||
}
|
}
|
||||||
// sleep for 2 second to let RD process the magnet
|
// sleep for 2 second to let RD process the magnet
|
||||||
@@ -408,7 +403,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
|
|||||||
// see if the torrent is ready
|
// see if the torrent is ready
|
||||||
info, err = t.api.GetTorrentInfo(newTorrentID)
|
info, err = t.api.GetTorrentInfo(newTorrentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.registerFixer(newTorrentID, "download_failed")
|
t.trash(newTorrentID)
|
||||||
return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
|
return nil, fmt.Errorf("cannot get info on redownloaded torrent %s (id=%s) : %v", t.GetKey(torrent), newTorrentID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -434,20 +429,17 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !isOkStatus {
|
if !isOkStatus {
|
||||||
t.registerFixer(info.ID, "download_failed")
|
t.trash(info.ID)
|
||||||
return nil, fmt.Errorf("the redownloaded torrent %s is in a non-OK state: %s", t.GetKey(torrent), info.Status)
|
return nil, fmt.Errorf("the redownloaded torrent %s is in a non-OK state: %s", t.GetKey(torrent), info.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if incorrect number of links
|
// check if incorrect number of links
|
||||||
if info.Progress == 100 && len(info.Links) != selectionCount {
|
if info.Progress == 100 && len(info.Links) != selectionCount {
|
||||||
t.registerFixer(newTorrentID, "download_failed")
|
t.trash(newTorrentID)
|
||||||
return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), selectionCount)
|
return nil, fmt.Errorf("torrent %s only got %d links but we need %d", t.GetKey(torrent), len(info.Links), selectionCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.log.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
|
t.log.Infof("Redownloading torrent %s successful (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)
|
||||||
for _, id := range oldTorrentIDs {
|
|
||||||
t.registerFixer(id, "replaced")
|
|
||||||
}
|
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user