From beba993364e15802742d78b89a517a26eb0ff22b Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Thu, 23 May 2024 22:20:19 +0200 Subject: [PATCH] Add bins --- internal/torrent/manager.go | 6 ++-- internal/torrent/refresh.go | 37 +++++++++++++++++-- internal/torrent/repair.go | 71 +++++++++++++++++++++---------------- 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 9d37a6b..73d4ab0 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -41,10 +41,12 @@ type TorrentManager struct { latestState *LibraryState repairTrigger chan *Torrent - repairSet mapset.Set[*Torrent] + repairQueue mapset.Set[*Torrent] repairRunning bool repairRunningMu sync.Mutex - trashBin mapset.Set[string] + + trashBin mapset.Set[string] + repairBin mapset.Set[string] // same as trash bin, but only if the torrent has been downloaded } // NewTorrentManager creates a new torrent manager diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index e693476..ce6c43a 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -3,6 +3,7 @@ package torrent import ( "context" "fmt" + "os" "path/filepath" "strings" "sync" @@ -33,8 +34,6 @@ func (t *TorrentManager) refreshTorrents() []string { freshIDs := mapset.NewSet[string]() freshAccessKeys := mapset.NewSet[string]() - t.log.Infof("Getting info of %d torrents", len(instances)) - for i := range instances { wg.Add(1) idx := i @@ -156,7 +155,7 @@ func (t *TorrentManager) refreshTorrents() []string { // StartRefreshJob periodically refreshes the torrents func (t *TorrentManager) StartRefreshJob() { go func() { - t.log.Info("Starting periodic refresh job") + t.log.Debug("Starting periodic refresh job") refreshTicker := time.NewTicker(time.Duration(t.Config.GetRefreshEverySecs()) * time.Second) defer refreshTicker.Stop() @@ -417,3 +416,35 @@ func (t *TorrentManager) trash(torrentId string) { t.log.Debugf("Trash: %s", torrentId) t.trashBin.Add(torrentId) } + +func (t *TorrentManager) trashOnceCompleted(torrentId string) { + t.log.Debugf("Trash once completed: %s", torrentId) + t.repairBin.Add(torrentId) +} + +func (t *TorrentManager) saveToBinFile() { + data := map[string]interface{}{ + "trash_bin": t.trashBin.ToSlice(), // Assuming trashBin is a mapset.Set[string] + "repair_bin": t.repairBin.ToSlice(), // Assuming repairBin is a mapset.Set[string] + } + + jsonData, err := json.Marshal(data) + if err != nil { + t.log.Errorf("Failed to marshal bin data: %v", err) + return + } + + file, err := os.Create("trash_bin") + if err != nil { + t.log.Errorf("Failed to create trash_bin file: %v", err) + return + } + defer file.Close() + + _, err = file.Write(jsonData) + if err != nil { + t.log.Errorf("Failed to write to trash_bin file: %v", err) + } else { + t.log.Debug("Successfully saved bins to file") + } +} diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 09df0c0..c78a88d 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -24,10 +24,10 @@ func (t *TorrentManager) StartRepairJob() { return } t.repairTrigger = make(chan *Torrent) - t.repairSet = mapset.NewSet[*Torrent]() + t.repairQueue = mapset.NewSet[*Torrent]() // there is 1 repair worker, with max 1 blocking task go func() { - t.log.Info("Starting periodic repair job") + t.log.Debug("Starting periodic repair job") repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute) defer repairTicker.Stop() @@ -50,7 +50,7 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunningMu.Lock() if t.repairRunning { t.repairRunningMu.Unlock() - t.repairSet.Add(torrent) + t.repairQueue.Add(torrent) // don't do anything if repair is already running return } @@ -66,9 +66,9 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) { t.repairRunning = false t.repairRunningMu.Unlock() - // before we let go, let's check repairSet + // before we let go, let's check repairQueue t.workerPool.Submit(func() { - queuedTorrent, exists := t.repairSet.Pop() + queuedTorrent, exists := t.repairQueue.Pop() if exists { t.TriggerRepair(queuedTorrent) } @@ -85,7 +85,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { var haystack cmap.ConcurrentMap[string, *Torrent] if torrent == nil { haystack, _ = t.DirectoryMap.Get(INT_ALL) - t.log.Info("Periodic repair started; searching for broken torrents") + t.log.Debug("Periodic repair started; searching for broken torrents") } else { haystack = cmap.New[*Torrent]() haystack.Set("", torrent) @@ -94,32 +94,38 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { // collect all torrents that need to be repaired toRepair := mapset.NewSet[*Torrent]() + var wg sync.WaitGroup haystack.IterCb(func(_ string, torrent *Torrent) { - if torrent.UnrepairableReason != "" { - return - } - // check 1: for broken files - brokenFileCount := 0 - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if file.State.Is("broken_file") { - brokenFileCount++ + wg.Add(1) + _ = t.workerPool.Submit(func() { + defer wg.Done() + if torrent.UnrepairableReason != "" { + return + } + // check 1: for broken files + brokenFileCount := 0 + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.State.Is("broken_file") { + brokenFileCount++ + } + }) + if brokenFileCount > 0 { + t.log.Debugf("Torrent %s has %d/%d broken files, adding to repair list", t.GetKey(torrent), brokenFileCount, torrent.SelectedFiles.Count()) + toRepair.Add(torrent) + return + } + // check 2: for unassigned links (this means the torrent has started to deteriorate) + unassignedCount := torrent.UnassignedLinks.Cardinality() + if unassignedCount > 0 { + t.log.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount) + toRepair.Add(torrent) + return } }) - if brokenFileCount > 0 { - t.log.Debugf("Torrent %s has %d broken files, adding to repair list", t.GetKey(torrent), brokenFileCount) - toRepair.Add(torrent) - return - } - // check 2: for unassigned links (this means the torrent has started to deteriorate) - unassignedCount := torrent.UnassignedLinks.Cardinality() - if unassignedCount > 0 { - t.log.Debugf("Torrent %s has %d unassigned links, adding to repair list", t.GetKey(torrent), unassignedCount) - toRepair.Add(torrent) - return - } }) - var wg sync.WaitGroup + wg.Wait() + toRepair.Each(func(torrent *Torrent) bool { wg.Add(1) t.Repair(torrent, &wg) @@ -127,7 +133,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) { }) wg.Wait() - t.log.Infof("Finished repair sequence for %d broken torrents", toRepair.Cardinality()) + t.log.Infof("Finished periodic repair sequence for %d broken torrent(s)", toRepair.Cardinality()) } func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) { @@ -206,7 +212,7 @@ func (t *TorrentManager) repair(torrent *Torrent) { return } - t.log.Infof("Repairing by downloading %d batches of the %d broken files of torrent %s", int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles), t.GetKey(torrent)) + t.log.Infof("Torrent %s will be repaired by downloading %d batches of the %d broken files", int(math.Ceil(float64(len(brokenFiles))/100)), len(brokenFiles), t.GetKey(torrent)) newlyDownloadedIds := make([]string, 0) batchNum := 1 @@ -233,7 +239,7 @@ func (t *TorrentManager) repair(torrent *Torrent) { t.log.Debugf("Downloading last batch of broken files of torrent %s", t.GetKey(torrent)) if len(group) > 0 { - _, err := t.redownloadTorrent(torrent, group) + redownloadedInfo, err := t.redownloadTorrent(torrent, group) if err != nil { t.log.Warnf("Cannot repair torrent %s by downloading broken files (error=%s) giving up", t.GetKey(torrent), err.Error()) for _, newId := range newlyDownloadedIds { @@ -241,9 +247,12 @@ func (t *TorrentManager) repair(torrent *Torrent) { } return } + newlyDownloadedIds = append(newlyDownloadedIds, redownloadedInfo.ID) } - /// TODO: should we delete the old torrents that were replaced? + for _, newId := range newlyDownloadedIds { + t.trashOnceCompleted(newId) + } } func (t *TorrentManager) assignUnassignedLinks(torrent *Torrent) bool {