Prevent race condition with downloaded id

This commit is contained in:
Ben Sarmiento
2024-02-05 03:27:45 +01:00
parent dcdb83349a
commit c8bbfc8650
4 changed files with 38 additions and 26 deletions

View File

@@ -21,8 +21,8 @@ func (t *TorrentManager) fixerAddCommand(trigger, command string) {
t.writeFixersToFile() t.writeFixersToFile()
} }
func (t *TorrentManager) handleFixers(instances []realdebrid.Torrent) { func (t *TorrentManager) processFixers(instances []realdebrid.Torrent) {
t.log.Debugf("Handling fixers - %d left", t.fixers.Count()) t.log.Debugf("Processing fixers (%d left)", t.fixers.Count())
var toDelete []string var toDelete []string
var toRedownload []*Torrent var toRedownload []*Torrent
allTorrents, _ := t.DirectoryMap.Get(INT_ALL) allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
@@ -56,25 +56,34 @@ func (t *TorrentManager) handleFixers(instances []realdebrid.Torrent) {
infoCache.Remove(id) infoCache.Remove(id)
t.deleteTorrentFile(id) t.deleteTorrentFile(id)
} }
for _, torrent := range toRedownload { for _, torrent := range toRedownload {
t.redownloadTorrent(torrent, "") t.redownloadTorrent(torrent, "")
} }
// remove expired fixers
var expired []string
t.fixers.IterCb(func(trigger string, command string) {
if infoCache.Has(trigger) {
return
}
expired = append(expired, trigger)
})
for _, trigger := range expired {
t.log.Debugf("Removing expired fixer id=%s", trigger)
t.fixers.Remove(trigger)
}
t.writeFixersToFile() t.writeFixersToFile()
t.log.Debugf("Finished handling fixers") t.log.Debugf("Finished processing fixers")
}
func (t *TorrentManager) removeExpiredFixers(instances []realdebrid.Torrent) {
fixers := t.fixers.Keys()
if len(fixers) == 0 {
return
}
for _, fixerID := range fixers {
found := false
for _, instance := range instances {
if instance.ID == fixerID {
found = true
break
}
}
if !found {
t.log.Debugf("Removing expired fixer id=%s", fixerID)
t.fixers.Remove(fixerID)
}
}
} }
func (t *TorrentManager) writeFixersToFile() { func (t *TorrentManager) writeFixersToFile() {

View File

@@ -64,7 +64,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
t.fixers = t.readFixersFromFile() t.fixers = t.readFixersFromFile()
t.initializeDirectories() t.initializeDirectories()
t.workerPool.Submit(func() { t.workerPool.Submit(func() {
t.refreshTorrents() t.refreshTorrents(true)
t.setNewLatestState(t.getCurrentState()) t.setNewLatestState(t.getCurrentState())
t.StartRefreshJob() t.StartRefreshJob()
t.StartRepairJob() t.StartRepairJob()

View File

@@ -14,7 +14,7 @@ import (
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
) )
func (t *TorrentManager) refreshTorrents() []string { func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
instances, _, err := t.Api.GetTorrents(0, false) instances, _, err := t.Api.GetTorrents(0, false)
if err != nil { if err != nil {
t.log.Warnf("Cannot get torrents: %v", err) t.log.Warnf("Cannot get torrents: %v", err)
@@ -87,10 +87,15 @@ func (t *TorrentManager) refreshTorrents() []string {
}) })
if t.Config.EnableRepair() { if t.Config.EnableRepair() {
if isInitialRun {
t.removeExpiredFixers(instances)
t.processFixers(instances)
} else {
t.workerPool.Submit(func() { t.workerPool.Submit(func() {
t.handleFixers(instances) t.processFixers(instances)
}) })
} }
}
return updatedPaths return updatedPaths
} }
@@ -112,7 +117,7 @@ func (t *TorrentManager) StartRefreshJob() {
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
t.setNewLatestState(checksum) t.setNewLatestState(checksum)
updatedPaths := t.refreshTorrents() updatedPaths := t.refreshTorrents(false)
t.log.Info("Finished refreshing torrents") t.log.Info("Finished refreshing torrents")
t.TriggerHookOnLibraryUpdate(updatedPaths) t.TriggerHookOnLibraryUpdate(updatedPaths)

View File

@@ -118,8 +118,6 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
} }
}) })
t.log.Infof("Found %d broken torrents to repair in total", toRepair.Cardinality())
var wg sync.WaitGroup var wg sync.WaitGroup
toRepair.Each(func(torrent *Torrent) bool { toRepair.Each(func(torrent *Torrent) bool {
wg.Add(1) wg.Add(1)
@@ -143,8 +141,6 @@ func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
return return
} }
t.log.Infof("Attempting repair for torrent %s", t.GetKey(torrent))
// blocks for approx 45 minutes if active torrents are full // blocks for approx 45 minutes if active torrents are full
if !t.canCapacityHandle() { if !t.canCapacityHandle() {
t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair") t.log.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
@@ -160,7 +156,7 @@ func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
} }
func (t *TorrentManager) repair(torrent *Torrent) { func (t *TorrentManager) repair(torrent *Torrent) {
t.log.Infof("Started repair process for torrent %s", t.GetKey(torrent)) t.log.Infof("Started repair process for torrent %s (ids=%v)", t.GetKey(torrent), torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice())
// handle torrents with incomplete links for selected files // handle torrents with incomplete links for selected files
if !t.assignUnassignedLinks(torrent) { if !t.assignUnassignedLinks(torrent) {
@@ -192,6 +188,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
} }
} }
}) })
torrent.DownloadedIDs.Add(info.ID)
t.saveTorrentChangesToDisk(torrent, nil) t.saveTorrentChangesToDisk(torrent, nil)
t.log.Infof("Successfully repaired torrent %s by redownloading", t.GetKey(torrent)) t.log.Infof("Successfully repaired torrent %s by redownloading", t.GetKey(torrent))
return return
@@ -309,6 +306,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection string) (
if selection == "" { if selection == "" {
// only delete the old torrent if we are redownloading all files // only delete the old torrent if we are redownloading all files
oldTorrentIDs = torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice() oldTorrentIDs = torrent.DownloadedIDs.Union(torrent.InProgressIDs).ToSlice()
t.log.Debugf("Redownloading torrent %s with all files (torrents=%v)", t.GetKey(torrent), oldTorrentIDs)
tmpSelection := "" tmpSelection := ""
torrent.SelectedFiles.IterCb(func(_ string, file *File) { torrent.SelectedFiles.IterCb(func(_ string, file *File) {
tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files