Small repairs on logic
This commit is contained in:
@@ -88,6 +88,7 @@ func MainApp(configPath string) {
|
||||
|
||||
hosts := repo.GetOptimalHosts(config.GetNumberOfHosts(), config.ShouldForceIPv6())
|
||||
zurglog.Debugf("Optimal hosts (%d): %v", len(hosts), hosts)
|
||||
zurglog.Debug("To reset optimal hosts, run 'zurg network-test' (Using docker compose? 'docker compose exec zurg ./zurg network-test')")
|
||||
|
||||
downloadClient := http.NewHTTPClient(
|
||||
"",
|
||||
|
||||
@@ -14,16 +14,12 @@ const BINS_FILE = "data/bins.json"
|
||||
func (t *TorrentManager) initializeBins() {
|
||||
if _, err := os.Stat(BINS_FILE); os.IsNotExist(err) {
|
||||
t.repairLog.Info("data/bins.json does not exist. Initializing empty bins.")
|
||||
t.ImmediateBin = mapset.NewSet[string]()
|
||||
t.OnceDoneBin = mapset.NewSet[string]()
|
||||
return
|
||||
}
|
||||
|
||||
fileData, err := os.ReadFile(BINS_FILE)
|
||||
if err != nil {
|
||||
t.repairLog.Errorf("Failed to read bins.json file: %v", err)
|
||||
t.ImmediateBin = mapset.NewSet[string]()
|
||||
t.OnceDoneBin = mapset.NewSet[string]()
|
||||
t.repairLog.Errorf("Failed to read bins.json file: %v Initializing empty bins.", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -31,9 +27,7 @@ func (t *TorrentManager) initializeBins() {
|
||||
|
||||
err = json.Unmarshal(fileData, &data)
|
||||
if err != nil {
|
||||
t.repairLog.Errorf("Failed to unmarshal bin data: %v", err)
|
||||
t.ImmediateBin = mapset.NewSet[string]()
|
||||
t.OnceDoneBin = mapset.NewSet[string]()
|
||||
t.repairLog.Errorf("Failed to unmarshal bin data: %v Initializing empty bins.", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,6 @@ type TorrentManager struct {
|
||||
AnalyzeTrigger chan struct{}
|
||||
|
||||
latestState *LibraryState
|
||||
inProgressHashes mapset.Set[string]
|
||||
|
||||
repairChan chan *Torrent
|
||||
RepairQueue mapset.Set[*Torrent]
|
||||
@@ -85,6 +84,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
||||
AnalyzeTrigger: make(chan struct{}, 1),
|
||||
|
||||
latestState: &LibraryState{log: log},
|
||||
|
||||
ImmediateBin: mapset.NewSet[string](),
|
||||
OnceDoneBin: mapset.NewSet[string](),
|
||||
}
|
||||
|
||||
t.initializeBins()
|
||||
@@ -236,6 +238,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) {
|
||||
unrestrict := t.UnrestrictFileUntilOk(file, true)
|
||||
if unrestrict == nil {
|
||||
file.State.Event(context.Background(), "break_file")
|
||||
t.EnqueueForRepair(torrent)
|
||||
changesApplied = true
|
||||
return
|
||||
}
|
||||
|
||||
@@ -16,12 +16,7 @@ import (
|
||||
"gopkg.in/vansante/go-ffprobe.v2"
|
||||
)
|
||||
|
||||
func inProgressStatus(status string) bool {
|
||||
return status == "downloading" || status == "uploading" || status == "queued" || status == "compressing"
|
||||
}
|
||||
|
||||
func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
||||
t.inProgressHashes = mapset.NewSet[string]()
|
||||
instances, _, err := t.api.GetTorrents(false)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get torrents: %v", err)
|
||||
@@ -45,9 +40,6 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
||||
if t.binImmediately(instances[idx].ID) ||
|
||||
t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) ||
|
||||
instances[idx].Progress != 100 {
|
||||
if inProgressStatus(instances[idx].Status) {
|
||||
t.inProgressHashes.Add(instances[idx].Hash)
|
||||
}
|
||||
mergeChan <- nil
|
||||
return
|
||||
}
|
||||
@@ -84,7 +76,6 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
|
||||
if torrent == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
accessKey := t.GetKey(torrent)
|
||||
existing, ok := allTorrents.Get(accessKey)
|
||||
if !ok {
|
||||
|
||||
@@ -164,7 +164,7 @@ func (t *TorrentManager) executeRepairJob(torrent *Torrent) {
|
||||
func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) {
|
||||
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil {
|
||||
// t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
|
||||
return
|
||||
}
|
||||
@@ -213,18 +213,18 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
|
||||
return
|
||||
}
|
||||
|
||||
oldDownloadedIDs := torrent.DownloadedIDs.Clone()
|
||||
|
||||
// first step: redownload the whole torrent
|
||||
|
||||
t.repairLog.Debugf("Torrent %s has %d broken files (out of %d), repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
|
||||
t.repairLog.Debugf("Torrent %s has %d broken files (out of %d); repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count())
|
||||
|
||||
info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection
|
||||
if info != nil && info.Progress == 100 {
|
||||
if !t.isStillBroken(info, brokenFiles) {
|
||||
// successful repair
|
||||
torrent.State.Event(context.Background(), "mark_as_repaired")
|
||||
t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent))
|
||||
// delete the torrents it replaced
|
||||
torrent.DownloadedIDs.Clone().Each(func(torrentID string) bool {
|
||||
oldDownloadedIDs.Each(func(torrentID string) bool {
|
||||
if torrentID != info.ID {
|
||||
t.setXToBinOnceYDone(torrentID, info.ID)
|
||||
}
|
||||
@@ -248,7 +248,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
|
||||
}
|
||||
|
||||
if torrent.UnrepairableReason != "" {
|
||||
t.repairLog.Debugf("Torrent %s has been marked as unfixable during redownloading whole torrent (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
|
||||
t.repairLog.Debugf("Torrent %s has been marked as unfixable after redownloading torrent %s; ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -365,17 +365,19 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
|
||||
t.writeTorrentToFile(torrent)
|
||||
}
|
||||
|
||||
if assignedCount == 0 && rarCount == 1 {
|
||||
action := t.Config.GetRarAction()
|
||||
if assignedCount != 0 || rarCount != 1 {
|
||||
return true // continue repair
|
||||
}
|
||||
|
||||
action := t.Config.GetRarAction()
|
||||
if action == "delete" {
|
||||
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent))
|
||||
t.Delete(t.GetKey(torrent), true)
|
||||
return false
|
||||
return false // end repair
|
||||
}
|
||||
|
||||
newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) {
|
||||
newFile := &File{
|
||||
torrent.SelectedFiles.Set(unassigned.Filename, &File{
|
||||
File: realdebrid.File{
|
||||
ID: 0,
|
||||
Path: unassigned.Filename,
|
||||
@@ -385,8 +387,7 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
|
||||
Ended: torrent.Added,
|
||||
Link: unassigned.Link,
|
||||
State: NewFileState("ok_file"),
|
||||
}
|
||||
torrent.SelectedFiles.Set(unassigned.Filename, newFile)
|
||||
})
|
||||
})
|
||||
|
||||
if action == "extract" {
|
||||
@@ -403,6 +404,7 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
|
||||
}
|
||||
})
|
||||
if len(videoFiles) > 0 {
|
||||
t.repairLog.Debugf("Extracting %d video files from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent))
|
||||
info, _ := t.redownloadTorrent(torrent, videoFiles)
|
||||
if info != nil {
|
||||
t.setToBinOnceDone(info.ID)
|
||||
@@ -415,13 +417,10 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
|
||||
}
|
||||
|
||||
torrent.UnassignedLinks = mapset.NewSet[string]()
|
||||
torrent.State.Event(context.Background(), "mark_as_repaired")
|
||||
// torrent.State.Event(context.Background(), "mark_as_repaired")
|
||||
t.writeTorrentToFile(torrent)
|
||||
|
||||
return false // end repair
|
||||
}
|
||||
|
||||
return true // continue repair
|
||||
}
|
||||
|
||||
func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) {
|
||||
@@ -630,7 +629,7 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles
|
||||
brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1])
|
||||
}
|
||||
|
||||
// check if the broken files can now be unrestricted
|
||||
// check if the broken files can now be unrestricted and downloaded
|
||||
for _, oldFile := range brokenFiles {
|
||||
for idx, newFile := range selectedFiles {
|
||||
if oldFile.ID == newFile.ID && t.UnrestrictFileUntilOk(selectedFiles[idx], true) == nil {
|
||||
|
||||
@@ -8,7 +8,9 @@ func NewTorrentState(initial string) *fsm.FSM {
|
||||
return fsm.NewFSM(
|
||||
initial,
|
||||
fsm.Events{
|
||||
// when enqueueing a torrent for repair
|
||||
{Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"},
|
||||
// when repair has been started
|
||||
{Name: "repair_torrent", Src: []string{"ok_torrent", "broken_torrent"}, Dst: "under_repair_torrent"},
|
||||
{Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"},
|
||||
},
|
||||
|
||||
@@ -59,7 +59,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
|
||||
result := <-allResults
|
||||
if result.err != nil {
|
||||
rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err)
|
||||
continue
|
||||
return nil, 0, result.err
|
||||
}
|
||||
bIdx := (result.page - 1) % maxParallelThreads
|
||||
batches[bIdx] = []Torrent{}
|
||||
@@ -71,7 +71,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) {
|
||||
cIdxEnd := cachedCount - 1 - cIdx
|
||||
for tIdx, torrent := range batch { // 250 torrents
|
||||
tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, result.total)
|
||||
if torrent.ID == cached.ID && tIdxEnd == cIdxEnd {
|
||||
if torrent.ID == cached.ID && torrent.Progress == cached.Progress && tIdxEnd == cIdxEnd {
|
||||
allTorrents = append(allTorrents, batch[:tIdx]...)
|
||||
allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...)
|
||||
rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total)
|
||||
|
||||
@@ -68,6 +68,20 @@ func (i *Torrent) UnmarshalJSON(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Torrent) MarshalJSON() ([]byte, error) {
|
||||
type Alias Torrent
|
||||
aux := &struct {
|
||||
Progress float64 `json:"progress"`
|
||||
Added string `json:"added"`
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(i),
|
||||
Progress: float64(i.Progress), // Convert int to float64 for JSON representation
|
||||
Added: i.Added,
|
||||
}
|
||||
return json.Marshal(aux)
|
||||
}
|
||||
|
||||
type TorrentInfo struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"filename"`
|
||||
|
||||
Reference in New Issue
Block a user