This commit is contained in:
Ben Sarmiento
2024-05-23 22:20:19 +02:00
parent 8636a0569d
commit beba993364
3 changed files with 78 additions and 36 deletions

View File

@@ -41,10 +41,12 @@ type TorrentManager struct {
latestState *LibraryState
repairTrigger chan *Torrent
repairSet mapset.Set[*Torrent]
repairQueue mapset.Set[*Torrent]
repairRunning bool
repairRunningMu sync.Mutex
trashBin mapset.Set[string]
trashBin mapset.Set[string]
repairBin mapset.Set[string] // same as trash bin, but only if the torrent has been downloaded
}
// NewTorrentManager creates a new torrent manager

View File

@@ -3,6 +3,7 @@ package torrent
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
@@ -33,8 +34,6 @@ func (t *TorrentManager) refreshTorrents() []string {
freshIDs := mapset.NewSet[string]()
freshAccessKeys := mapset.NewSet[string]()
t.log.Infof("Getting info of %d torrents", len(instances))
for i := range instances {
wg.Add(1)
idx := i
@@ -156,7 +155,7 @@ func (t *TorrentManager) refreshTorrents() []string {
// StartRefreshJob periodically refreshes the torrents
func (t *TorrentManager) StartRefreshJob() {
go func() {
t.log.Info("Starting periodic refresh job")
t.log.Debug("Starting periodic refresh job")
refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second)
defer refreshTicker.Stop()
@@ -417,3 +416,35 @@ func (t *TorrentManager) trash(torrentId string) {
t.log.Debugf("Trash: %s", torrentId)
t.trashBin.Add(torrentId)
}
func (t *TorrentManager) trashOnceCompleted(torrentId string) {
t.log.Debugf("Trash once completed: %s", torrentId)
t.repairBin.Add(torrentId)
}
func (t *TorrentManager) saveToBinFile() {
data := map[string]interface{}{
"trash_bin": t.trashBin.ToSlice(), // Assuming trashBin is a mapset.Set[string]
"repair_bin": t.repairBin.ToSlice(), // Assuming repairBin is a mapset.Set[string]
}
jsonData, err := json.Marshal(data)
if err != nil {
t.log.Errorf("Failed to marshal bin data: %v", err)
return
}
file, err := os.Create("trash_bin")
if err != nil {
t.log.Errorf("Failed to create trash_bin file: %v", err)
return
}
defer file.Close()
_, err = file.Write(jsonData)
if err != nil {
t.log.Errorf("Failed to write to trash_bin file: %v", err)
} else {
t.log.Debug("Successfully saved bins to file")
}
}

View File

@@ -24,10 +24,10 @@ func (t *TorrentManager) StartRepairJob() {
return
}
t.repairTrigger = make(chan *Torrent)
t.repairSet = mapset.NewSet[*Torrent]()
t.repairQueue = mapset.NewSet[*Torrent]()
// there is 1 repair worker, with max 1 blocking task
go func() {
t.log.Info("Starting periodic repair job")
t.log.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
defer repairTicker.Stop()
@@ -50,7 +50,7 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Lock()
if t.repairRunning {
t.repairRunningMu.Unlock()
t.repairSet.Add(torrent)
t.repairQueue.Add(torrent)
// don't do anything if repair is already running
return
}
@@ -66,9 +66,9 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunning = false
t.repairRunningMu.Unlock()
// before we let go, let's check repairSet
// before we let go, let's check repairQueue
t.workerPool.Submit(func() {
queuedTorrent, exists := t.repairSet.Pop()
queuedTorrent, exists := t.repairQueue.Pop()
if exists {
t.TriggerRepair(queuedTorrent)
}
@@ -85,7 +85,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
var haystack cmap.ConcurrentMap[string, *Torrent]
if torrent == nil {
haystack, _ = t.DirectoryMap.Get(INT_ALL)
t.log.Info("Periodic repair started; searching for broken torrents")
t.log.Debug("Periodic repair started; searching for broken torrents")
} else {
haystack = cmap.New[*Torrent]()
haystack.Set("", torrent)
@@ -94,32 +94,38 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
// collect all torrents that need to be repaired
toRepair := mapset.NewSet[*Torrent]()
var wg sync.WaitGroup
haystack.IterCb(func(_ string, torrent *Torrent) {
if torrent.UnrepairableReason != "" {
return
}
// check 1: for broken files
brokenFileCount := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.State.Is("broken_file") {
brokenFileCount++
wg.Add(1)
_ = t.workerPool.Submit(func() {
defer wg.Done()
if torrent.UnrepairableReason != "" {
return
}
// check 1: for broken files
brokenFileCount := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.State.Is("broken_file") {
brokenFileCount++
}
})
if brokenFileCount > 0 {
t.log.Debugf("Torrent %s has %d/%d broken files, adding to repair list", t.GetKey(torrent), brokenFileCount, torrent.SelectedFiles.Count())
toRepair.Add(torrent)
return
}
// check 2: for unassigned links (this means the torrent has started to deteriorate)
unassignedCount := torrent.UnassignedLinks.Cardinality()
if unassignedCount > 0 {
t.log.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount)
toRepair.Add(torrent)
return
}
})
if brokenFileCount > 0 {
t.log.Debugf("Torrent %s has %d broken files, adding to repair list", t.GetKey(torrent), brokenFileCount)
toRepair.Add(torrent)
return
}
// check 2: for unassigned links (this means the torrent has started to deteriorate)
unassignedCount := torrent.UnassignedLinks.Cardinality()
if unassignedCount > 0 {
t.log.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount)
toRepair.Add(torrent)
return
}
})
var wg sync.WaitGroup
wg.Wait()
toRepair.Each(func(torrent *Torrent) bool {
wg.Add(1)
t.Repair(torrent, &wg)
@@ -127,7 +133,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
})
wg.Wait()
t.log.Infof("Finished repair sequence for %d broken torrents", toRepair.Cardinality())
t.log.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality())
}
func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
@@ -206,7 +212,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
return
}
t.log.Infof("Repairing by downloading %d batches of the %d broken files of torrent %s", int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles), t.GetKey(torrent))
t.log.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles), t.GetKey(torrent))
newlyDownloadedIds := make([]string, 0)
batchNum := 1
@@ -233,7 +239,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
t.log.Debugf("Downloading last batch of broken files of torrent %s", t.GetKey(torrent))
if len(group) > 0 {
_, err := t.redownloadTorrent(torrent, group)
redownloadedInfo, err := t.redownloadTorrent(torrent, group)
if err != nil {
t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error())
for _, newId := range newlyDownloadedIds {
@@ -241,9 +247,12 @@ func (t *TorrentManager) repair(torrent *Torrent) {
}
return
}
newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID)
}
/// TODO: should we delete the old torrents that were replaced?
for _, newId := range newlyDownloadedIds {
t.trashOnceCompleted(newId)
}
}
func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {