From 8636a0569da48d7b54664be46114db8ba44e937a Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Thu, 23 May 2024 21:45:20 +0200 Subject: [PATCH] Repair works now --- internal/torrent/latestState.go | 7 ++--- internal/torrent/refresh.go | 50 ++++++++++++++------------------- internal/torrent/repair.go | 13 +++------ internal/torrent/states.go | 6 +--- 4 files changed, 29 insertions(+), 47 deletions(-) diff --git a/internal/torrent/latestState.go b/internal/torrent/latestState.go index 3bb4a31..a558ad4 100644 --- a/internal/torrent/latestState.go +++ b/internal/torrent/latestState.go @@ -11,16 +11,15 @@ type LibraryState struct { func (ls *LibraryState) Eq(a LibraryState) bool { if ls.TotalCount == 0 || ls.FirstTorrentId == "" { - ls.log.Debugf("Checksum is empty") return false } else if a.TotalCount != ls.TotalCount { - ls.log.Debugf("Checksum total count mismatch: %d != %d", a.TotalCount, ls.TotalCount) + ls.log.Debugf("Detected changes! Total count mismatch: was %d now %d", ls.TotalCount, a.TotalCount) return false } else if a.ActiveCount != ls.ActiveCount { - ls.log.Debugf("Checksum active count mismatch: %d != %d", a.ActiveCount, ls.ActiveCount) + ls.log.Debugf("Detected changes! Active count mismatch: was %d now %d", ls.ActiveCount, a.ActiveCount) return false } else if a.FirstTorrentId != ls.FirstTorrentId { - ls.log.Debugf("Checksum first torrent id mismatch: %s != %s", a.FirstTorrentId, ls.FirstTorrentId) + ls.log.Debugf("Detected changes! First torrent id mismatch: was %s now %s", ls.FirstTorrentId, a.FirstTorrentId) return false } return true diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index 9598060..e693476 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -21,7 +21,6 @@ func (t *TorrentManager) refreshTorrents() []string { t.log.Warnf("Cannot get torrents: %v", err) return nil } - t.log.Infof("Fetched %d torrents", len(instances)) var wg sync.WaitGroup var mergeChan = make(chan *Torrent, len(instances)) @@ -34,7 +33,8 @@ func (t *TorrentManager) refreshTorrents() []string { freshIDs := mapset.NewSet[string]() freshAccessKeys := mapset.NewSet[string]() - counter := 0 + t.log.Infof("Getting info of %d torrents", len(instances)) + for i := range instances { wg.Add(1) idx := i @@ -43,14 +43,12 @@ func (t *TorrentManager) refreshTorrents() []string { if t.trashBin.Contains(instances[idx].ID) { t.api.DeleteTorrent(instances[idx].ID) t.log.Debugf("Skipping trashed torrent %s (id=%s)", instances[idx].Name, instances[idx].ID) - counter++ mergeChan <- nil return } if instances[idx].Progress != 100 { t.log.Debugf("Skipping incomplete torrent %s (id=%s)", instances[idx].Name, instances[idx].ID) - counter++ mergeChan <- nil return } @@ -78,22 +76,18 @@ func (t *TorrentManager) refreshTorrents() []string { forMerging = torrent } - counter++ mergeChan <- forMerging }) } wg.Wait() close(mergeChan) - t.log.Infof("Finished fetching info for %d torrents, proceeding to merge", counter) for torrent := range mergeChan { if torrent == nil { continue } - t.log.Debugf("Merging %s", t.GetKey(torrent)) - accessKey := t.GetKey(torrent) existing, ok := allTorrents.Get(accessKey) if !ok { @@ -117,12 +111,8 @@ func (t *TorrentManager) refreshTorrents() []string { updatedPaths.Add(fmt.Sprintf("%s/%s", directory, accessKey)) }) - - t.log.Debugf("Merging %s done!", t.GetKey(existing)) } - t.log.Infof("Fetched info for %d torrents", len(instances)) - noInfoCount := 0 // removed torrents @@ -135,18 +125,18 @@ func (t *TorrentManager) refreshTorrents() []string { t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) // data directory cleanup - existingHashes := mapset.NewSet[string]() - t.getTorrentFiles().Each(func(path string) bool { - path = filepath.Base(path) - hash := strings.TrimSuffix(path, ".torrent_zurg") - existingHashes.Add(hash) - return false - }) - existingHashes.Difference(freshHashes).Each(func(hash string) bool { - t.log.Infof("Deleting stale torrent file %s", hash) - t.deleteTorrentFile(hash) - return false - }) + // existingHashes := mapset.NewSet[string]() + // t.getTorrentFiles().Each(func(path string) bool { + // path = filepath.Base(path) + // hash := strings.TrimSuffix(path, ".torrent_zurg") + // existingHashes.Add(hash) + // return false + // }) + // existingHashes.Difference(freshHashes).Each(func(hash string) bool { + // t.log.Infof("Deleting stale torrent file %s", hash) + // t.deleteTorrentFile(hash) + // return false + // }) existingIDs := mapset.NewSet[string]() t.getInfoFiles().Each(func(path string) bool { path = filepath.Base(path) @@ -177,7 +167,6 @@ func (t *TorrentManager) StartRefreshJob() { if t.latestState.Eq(checksum) { continue } - t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) t.setNewLatestState(checksum) updatedPaths := t.refreshTorrents() @@ -311,7 +300,7 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent { }) // base of the merged torrent - mergedTorrent := Torrent{ + mergedTorrent := &Torrent{ Name: older.Name, OriginalName: older.OriginalName, Rename: older.Rename, @@ -370,10 +359,13 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent { } }) - // todo - t.log.Debugf("Merging %s (%d comps) - selected files: %d ; unassigned: %d ; broken: %d ; ok %d ; wtf %d", t.GetKey(&mergedTorrent), numComponents, brokenCount+okCount+wtfCount, mergedTorrent.UnassignedLinks.Cardinality(), brokenCount, okCount, wtfCount) + if brokenCount == 0 && okCount > 0 && mergedTorrent.State.Can("mark_as_repaired") { + if err := mergedTorrent.State.Event(context.Background(), "mark_as_repaired"); err != nil { + t.log.Errorf("Cannot repair torrent %s: %v", t, t.GetKey(mergedTorrent), err) + } + } - return &mergedTorrent + return mergedTorrent } func (t *TorrentManager) assignDirectory(tor *Torrent, cb func(string)) { diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 4f0766a..09df0c0 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -483,17 +483,12 @@ func (t *TorrentManager) canCapacityHandle() bool { } func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) { - if torrent.State.Is("unplayable_torrent") { - return - } - t.log.Warnf("Marking torrent %s as unplayable - %s", t.GetKey(torrent), reason) - if err := torrent.State.Event(context.Background(), "mark_as_unplayable_torrent"); err != nil { - t.log.Errorf("Failed to mark torrent %s as unplayable: %v", t.GetKey(torrent), err) - return - } - t.writeTorrentToFile(torrent) + t.log.Warnf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason) // reassign to unplayable torrents directory t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { + if strings.HasPrefix(directory, "int__") { + return + } torrents.Remove(t.GetKey(torrent)) }) torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS) diff --git a/internal/torrent/states.go b/internal/torrent/states.go index a516869..3586afb 100644 --- a/internal/torrent/states.go +++ b/internal/torrent/states.go @@ -5,16 +5,12 @@ import ( ) func NewTorrentState(initial string) *fsm.FSM { - // ok_torrent 1 - // broken_torrent 1 - // unplayable_torrent 1 return fsm.NewFSM( initial, fsm.Events{ - {Name: "break_torrent", Src: []string{"ok_torrent", "unplayable_torrent"}, Dst: "broken_torrent"}, + {Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"}, {Name: "repair_torrent", Src: []string{"broken_torrent"}, Dst: "under_repair_torrent"}, {Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"}, - {Name: "mark_as_unplayable_torrent", Src: []string{"ok_torrent"}, Dst: "unplayable_torrent"}, }, fsm.Callbacks{}, )