Repair works now
This commit is contained in:
@@ -11,16 +11,15 @@ type LibraryState struct {
|
||||
|
||||
func (ls *LibraryState) Eq(a LibraryState) bool {
|
||||
if ls.TotalCount == 0 || ls.FirstTorrentId == "" {
|
||||
ls.log.Debugf("Checksum is empty")
|
||||
return false
|
||||
} else if a.TotalCount != ls.TotalCount {
|
||||
ls.log.Debugf("Checksum total count mismatch: %d != %d", a.TotalCount, ls.TotalCount)
|
||||
ls.log.Debugf("Detected changes! Total count mismatch: was %d now %d", ls.TotalCount, a.TotalCount)
|
||||
return false
|
||||
} else if a.ActiveCount != ls.ActiveCount {
|
||||
ls.log.Debugf("Checksum active count mismatch: %d != %d", a.ActiveCount, ls.ActiveCount)
|
||||
ls.log.Debugf("Detected changes! Active count mismatch: was %d now %d", ls.ActiveCount, a.ActiveCount)
|
||||
return false
|
||||
} else if a.FirstTorrentId != ls.FirstTorrentId {
|
||||
ls.log.Debugf("Checksum first torrent id mismatch: %s != %s", a.FirstTorrentId, ls.FirstTorrentId)
|
||||
ls.log.Debugf("Detected changes! First torrent id mismatch: was %s now %s", ls.FirstTorrentId, a.FirstTorrentId)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
@@ -21,7 +21,6 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
t.log.Warnf("Cannot get torrents: %v", err)
|
||||
return nil
|
||||
}
|
||||
t.log.Infof("Fetched %d torrents", len(instances))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mergeChan = make(chan *Torrent, len(instances))
|
||||
@@ -34,7 +33,8 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
freshIDs := mapset.NewSet[string]()
|
||||
freshAccessKeys := mapset.NewSet[string]()
|
||||
|
||||
counter := 0
|
||||
t.log.Infof("Getting info of %d torrents", len(instances))
|
||||
|
||||
for i := range instances {
|
||||
wg.Add(1)
|
||||
idx := i
|
||||
@@ -43,14 +43,12 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
if t.trashBin.Contains(instances[idx].ID) {
|
||||
t.api.DeleteTorrent(instances[idx].ID)
|
||||
t.log.Debugf("Skipping trashed torrent %s (id=%s)", instances[idx].Name, instances[idx].ID)
|
||||
counter++
|
||||
mergeChan <- nil
|
||||
return
|
||||
}
|
||||
|
||||
if instances[idx].Progress != 100 {
|
||||
t.log.Debugf("Skipping incomplete torrent %s (id=%s)", instances[idx].Name, instances[idx].ID)
|
||||
counter++
|
||||
mergeChan <- nil
|
||||
return
|
||||
}
|
||||
@@ -78,22 +76,18 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
forMerging = torrent
|
||||
}
|
||||
|
||||
counter++
|
||||
mergeChan <- forMerging
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(mergeChan)
|
||||
t.log.Infof("Finished fetching info for %d torrents, proceeding to merge", counter)
|
||||
|
||||
for torrent := range mergeChan {
|
||||
if torrent == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
t.log.Debugf("Merging %s", t.GetKey(torrent))
|
||||
|
||||
accessKey := t.GetKey(torrent)
|
||||
existing, ok := allTorrents.Get(accessKey)
|
||||
if !ok {
|
||||
@@ -117,12 +111,8 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
|
||||
updatedPaths.Add(fmt.Sprintf("%s/%s", directory, accessKey))
|
||||
})
|
||||
|
||||
t.log.Debugf("Merging %s done!", t.GetKey(existing))
|
||||
}
|
||||
|
||||
t.log.Infof("Fetched info for %d torrents", len(instances))
|
||||
|
||||
noInfoCount := 0
|
||||
|
||||
// removed torrents
|
||||
@@ -135,18 +125,18 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
// data directory cleanup
|
||||
existingHashes := mapset.NewSet[string]()
|
||||
t.getTorrentFiles().Each(func(path string) bool {
|
||||
path = filepath.Base(path)
|
||||
hash := strings.TrimSuffix(path, ".torrent_zurg")
|
||||
existingHashes.Add(hash)
|
||||
return false
|
||||
})
|
||||
existingHashes.Difference(freshHashes).Each(func(hash string) bool {
|
||||
t.log.Infof("Deleting stale torrent file %s", hash)
|
||||
t.deleteTorrentFile(hash)
|
||||
return false
|
||||
})
|
||||
// existingHashes := mapset.NewSet[string]()
|
||||
// t.getTorrentFiles().Each(func(path string) bool {
|
||||
// path = filepath.Base(path)
|
||||
// hash := strings.TrimSuffix(path, ".torrent_zurg")
|
||||
// existingHashes.Add(hash)
|
||||
// return false
|
||||
// })
|
||||
// existingHashes.Difference(freshHashes).Each(func(hash string) bool {
|
||||
// t.log.Infof("Deleting stale torrent file %s", hash)
|
||||
// t.deleteTorrentFile(hash)
|
||||
// return false
|
||||
// })
|
||||
existingIDs := mapset.NewSet[string]()
|
||||
t.getInfoFiles().Each(func(path string) bool {
|
||||
path = filepath.Base(path)
|
||||
@@ -177,7 +167,6 @@ func (t *TorrentManager) StartRefreshJob() {
|
||||
if t.latestState.Eq(checksum) {
|
||||
continue
|
||||
}
|
||||
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
|
||||
t.setNewLatestState(checksum)
|
||||
|
||||
updatedPaths := t.refreshTorrents()
|
||||
@@ -311,7 +300,7 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent {
|
||||
})
|
||||
|
||||
// base of the merged torrent
|
||||
mergedTorrent := Torrent{
|
||||
mergedTorrent := &Torrent{
|
||||
Name: older.Name,
|
||||
OriginalName: older.OriginalName,
|
||||
Rename: older.Rename,
|
||||
@@ -370,10 +359,13 @@ func (t *TorrentManager) mergeTorrents(existing, toMerge *Torrent) *Torrent {
|
||||
}
|
||||
})
|
||||
|
||||
// todo
|
||||
t.log.Debugf("Merging %s (%d comps) - selected files: %d ; unassigned: %d ; broken: %d ; ok %d ; wtf %d", t.GetKey(&mergedTorrent), numComponents, brokenCount+okCount+wtfCount, mergedTorrent.UnassignedLinks.Cardinality(), brokenCount, okCount, wtfCount)
|
||||
if brokenCount == 0 && okCount > 0 && mergedTorrent.State.Can("mark_as_repaired") {
|
||||
if err := mergedTorrent.State.Event(context.Background(), "mark_as_repaired"); err != nil {
|
||||
t.log.Errorf("Cannot repair torrent %s: %v", t, t.GetKey(mergedTorrent), err)
|
||||
}
|
||||
}
|
||||
|
||||
return &mergedTorrent
|
||||
return mergedTorrent
|
||||
}
|
||||
|
||||
func (t *TorrentManager) assignDirectory(tor *Torrent, cb func(string)) {
|
||||
|
||||
@@ -483,17 +483,12 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) markAsUnplayable(torrent *Torrent, reason string) {
|
||||
if torrent.State.Is("unplayable_torrent") {
|
||||
return
|
||||
}
|
||||
t.log.Warnf("Marking torrent %s as unplayable - %s", t.GetKey(torrent), reason)
|
||||
if err := torrent.State.Event(context.Background(), "mark_as_unplayable_torrent"); err != nil {
|
||||
t.log.Errorf("Failed to mark torrent %s as unplayable: %v", t.GetKey(torrent), err)
|
||||
return
|
||||
}
|
||||
t.writeTorrentToFile(torrent)
|
||||
t.log.Warnf("Torrent %s is unplayable (reason: %s), moving to unplayable directory", t.GetKey(torrent), reason)
|
||||
// reassign to unplayable torrents directory
|
||||
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
if strings.HasPrefix(directory, "int__") {
|
||||
return
|
||||
}
|
||||
torrents.Remove(t.GetKey(torrent))
|
||||
})
|
||||
torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
|
||||
|
||||
@@ -5,16 +5,12 @@ import (
|
||||
)
|
||||
|
||||
func NewTorrentState(initial string) *fsm.FSM {
|
||||
// ok_torrent 1
|
||||
// broken_torrent 1
|
||||
// unplayable_torrent 1
|
||||
return fsm.NewFSM(
|
||||
initial,
|
||||
fsm.Events{
|
||||
{Name: "break_torrent", Src: []string{"ok_torrent", "unplayable_torrent"}, Dst: "broken_torrent"},
|
||||
{Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"},
|
||||
{Name: "repair_torrent", Src: []string{"broken_torrent"}, Dst: "under_repair_torrent"},
|
||||
{Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"},
|
||||
{Name: "mark_as_unplayable_torrent", Src: []string{"ok_torrent"}, Dst: "unplayable_torrent"},
|
||||
},
|
||||
fsm.Callbacks{},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user