diff --git a/internal/app.go b/internal/app.go index 2f5a74b..f1f8734 100644 --- a/internal/app.go +++ b/internal/app.go @@ -88,6 +88,7 @@ func MainApp(configPath string) { hosts := repo.GetOptimalHosts(config.GetNumberOfHosts(), config.ShouldForceIPv6()) zurglog.Debugf("Optimal hosts (%d): %v", len(hosts), hosts) + zurglog.Debug("To reset optimal hosts, run 'zurg network-test' (Using docker compose? 'docker compose exec zurg ./zurg network-test')") downloadClient := http.NewHTTPClient( "", diff --git a/internal/torrent/bins.go b/internal/torrent/bins.go index da5774f..50357df 100644 --- a/internal/torrent/bins.go +++ b/internal/torrent/bins.go @@ -14,16 +14,12 @@ const BINS_FILE = "data/bins.json" func (t *TorrentManager) initializeBins() { if _, err := os.Stat(BINS_FILE); os.IsNotExist(err) { t.repairLog.Info("data/bins.json does not exist. Initializing empty bins.") - t.ImmediateBin = mapset.NewSet[string]() - t.OnceDoneBin = mapset.NewSet[string]() return } fileData, err := os.ReadFile(BINS_FILE) if err != nil { - t.repairLog.Errorf("Failed to read bins.json file: %v", err) - t.ImmediateBin = mapset.NewSet[string]() - t.OnceDoneBin = mapset.NewSet[string]() + t.repairLog.Errorf("Failed to read bins.json file: %v Initializing empty bins.", err) return } @@ -31,9 +27,7 @@ func (t *TorrentManager) initializeBins() { err = json.Unmarshal(fileData, &data) if err != nil { - t.repairLog.Errorf("Failed to unmarshal bin data: %v", err) - t.ImmediateBin = mapset.NewSet[string]() - t.OnceDoneBin = mapset.NewSet[string]() + t.repairLog.Errorf("Failed to unmarshal bin data: %v Initializing empty bins.", err) return } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index c3ca4d9..ec31da2 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -46,8 +46,7 @@ type TorrentManager struct { DumpTrigger chan struct{} AnalyzeTrigger chan struct{} - latestState *LibraryState - inProgressHashes mapset.Set[string] + latestState *LibraryState repairChan chan *Torrent RepairQueue mapset.Set[*Torrent] @@ -85,6 +84,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w AnalyzeTrigger: make(chan struct{}, 1), latestState: &LibraryState{log: log}, + + ImmediateBin: mapset.NewSet[string](), + OnceDoneBin: mapset.NewSet[string](), } t.initializeBins() @@ -236,6 +238,7 @@ func (t *TorrentManager) applyMediaInfoDetails(torrent *Torrent) { unrestrict := t.UnrestrictFileUntilOk(file, true) if unrestrict == nil { file.State.Event(context.Background(), "break_file") + t.EnqueueForRepair(torrent) changesApplied = true return } diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go index ba15973..5ae3789 100644 --- a/internal/torrent/refresh.go +++ b/internal/torrent/refresh.go @@ -16,12 +16,7 @@ import ( "gopkg.in/vansante/go-ffprobe.v2" ) -func inProgressStatus(status string) bool { - return status == "downloading" || status == "uploading" || status == "queued" || status == "compressing" -} - func (t *TorrentManager) refreshTorrents(initialRun bool) { - t.inProgressHashes = mapset.NewSet[string]() instances, _, err := t.api.GetTorrents(false) if err != nil { t.log.Warnf("Cannot get torrents: %v", err) @@ -45,9 +40,6 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { if t.binImmediately(instances[idx].ID) || t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) || instances[idx].Progress != 100 { - if inProgressStatus(instances[idx].Status) { - t.inProgressHashes.Add(instances[idx].Hash) - } mergeChan <- nil return } @@ -84,7 +76,6 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) { if torrent == nil { continue } - accessKey := t.GetKey(torrent) existing, ok := allTorrents.Get(accessKey) if !ok { diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go index 541be58..dc10bcd 100644 --- a/internal/torrent/repair.go +++ b/internal/torrent/repair.go @@ -164,7 +164,7 @@ func (t *TorrentManager) executeRepairJob(torrent *Torrent) { func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { defer wg.Done() - if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil && t.inProgressHashes.Contains(torrent.Hash) { + if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil { // t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err) return } @@ -213,18 +213,18 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { return } + oldDownloadedIDs := torrent.DownloadedIDs.Clone() + // first step: redownload the whole torrent - t.repairLog.Debugf("Torrent %s has %d broken files (out of %d), repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count()) + t.repairLog.Debugf("Torrent %s has %d broken files (out of %d); repairing by redownloading whole torrent", t.GetKey(torrent), len(brokenFiles), torrent.SelectedFiles.Count()) info, err := t.redownloadTorrent(torrent, []string{}) // reinsert the whole torrent, passing empty selection if info != nil && info.Progress == 100 { if !t.isStillBroken(info, brokenFiles) { - // successful repair - torrent.State.Event(context.Background(), "mark_as_repaired") t.repairLog.Infof("Successfully repaired torrent %s by redownloading whole torrent", t.GetKey(torrent)) // delete the torrents it replaced - torrent.DownloadedIDs.Clone().Each(func(torrentID string) bool { + oldDownloadedIDs.Each(func(torrentID string) bool { if torrentID != info.ID { t.setXToBinOnceYDone(torrentID, info.ID) } @@ -248,7 +248,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) { } if torrent.UnrepairableReason != "" { - t.repairLog.Debugf("Torrent %s has been marked as unfixable during redownloading whole torrent (%s), ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason) + t.repairLog.Debugf("Torrent %s has been marked as unfixable after redownloading torrent %s; ending repair process early", t.GetKey(torrent), torrent.UnrepairableReason) return } @@ -365,63 +365,62 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool { t.writeTorrentToFile(torrent) } - if assignedCount == 0 && rarCount == 1 { - action := t.Config.GetRarAction() + if assignedCount != 0 || rarCount != 1 { + return true // continue repair + } - if action == "delete" { - t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) - t.Delete(t.GetKey(torrent), true) - return false - } + action := t.Config.GetRarAction() + if action == "delete" { + t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it, deleting it as configured", t.GetKey(torrent)) + t.Delete(t.GetKey(torrent), true) + return false // end repair + } - newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { - newFile := &File{ - File: realdebrid.File{ - ID: 0, - Path: unassigned.Filename, - Bytes: unassigned.Filesize, - Selected: 0, - }, - Ended: torrent.Added, - Link: unassigned.Link, - State: NewFileState("ok_file"), - } - torrent.SelectedFiles.Set(unassigned.Filename, newFile) + newUnassignedLinks.IterCb(func(_ string, unassigned *realdebrid.Download) { + torrent.SelectedFiles.Set(unassigned.Filename, &File{ + File: realdebrid.File{ + ID: 0, + Path: unassigned.Filename, + Bytes: unassigned.Filesize, + Selected: 0, + }, + Ended: torrent.Added, + Link: unassigned.Link, + State: NewFileState("ok_file"), }) + }) - if action == "extract" { - videoFiles := []string{} - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if utils.IsPlayable(file.Path) { - videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID)) - } else if file.ID != 0 { - t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent)) - info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)}) - if info != nil { - t.setToBinOnceDone(info.ID) - } - } - }) - if len(videoFiles) > 0 { - info, _ := t.redownloadTorrent(torrent, videoFiles) + if action == "extract" { + videoFiles := []string{} + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if utils.IsPlayable(file.Path) { + videoFiles = append(videoFiles, fmt.Sprintf("%d", file.ID)) + } else if file.ID != 0 { + t.repairLog.Debugf("Extracting file %s from rar'ed torrent %s", file.Path, t.GetKey(torrent)) + info, _ := t.redownloadTorrent(torrent, []string{fmt.Sprintf("%d", file.ID)}) if info != nil { t.setToBinOnceDone(info.ID) } } - } else { - t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) - t.markAsUnfixable(torrent, "rar'ed by RD") - t.markAsUnplayable(torrent, "rar'ed by RD") + }) + if len(videoFiles) > 0 { + t.repairLog.Debugf("Extracting %d video files from rar'ed torrent %s", len(videoFiles), t.GetKey(torrent)) + info, _ := t.redownloadTorrent(torrent, videoFiles) + if info != nil { + t.setToBinOnceDone(info.ID) + } } - - torrent.UnassignedLinks = mapset.NewSet[string]() - torrent.State.Event(context.Background(), "mark_as_repaired") - t.writeTorrentToFile(torrent) - - return false // end repair + } else { + t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent)) + t.markAsUnfixable(torrent, "rar'ed by RD") + t.markAsUnplayable(torrent, "rar'ed by RD") } - return true // continue repair + torrent.UnassignedLinks = mapset.NewSet[string]() + // torrent.State.Event(context.Background(), "mark_as_repaired") + t.writeTorrentToFile(torrent) + + return false // end repair } func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string) (*realdebrid.TorrentInfo, error) { @@ -630,7 +629,7 @@ func (t *TorrentManager) isStillBroken(info *realdebrid.TorrentInfo, brokenFiles brokenFiles = append(brokenFiles, selectedFiles[len(selectedFiles)-1]) } - // check if the broken files can now be unrestricted + // check if the broken files can now be unrestricted and downloaded for _, oldFile := range brokenFiles { for idx, newFile := range selectedFiles { if oldFile.ID == newFile.ID && t.UnrestrictFileUntilOk(selectedFiles[idx], true) == nil { diff --git a/internal/torrent/states.go b/internal/torrent/states.go index aa73c51..abad8ad 100644 --- a/internal/torrent/states.go +++ b/internal/torrent/states.go @@ -8,7 +8,9 @@ func NewTorrentState(initial string) *fsm.FSM { return fsm.NewFSM( initial, fsm.Events{ + // when enqueueing a torrent for repair {Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"}, + // when repair has been started {Name: "repair_torrent", Src: []string{"ok_torrent", "broken_torrent"}, Dst: "under_repair_torrent"}, {Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"}, }, diff --git a/pkg/realdebrid/torrents.go b/pkg/realdebrid/torrents.go index d8d43fb..26220ff 100644 --- a/pkg/realdebrid/torrents.go +++ b/pkg/realdebrid/torrents.go @@ -59,7 +59,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { result := <-allResults if result.err != nil { rd.log.Warnf("Ignoring error when fetching torrents pg %d: %v", result.page, result.err) - continue + return nil, 0, result.err } bIdx := (result.page - 1) % maxParallelThreads batches[bIdx] = []Torrent{} @@ -71,7 +71,7 @@ func (rd *RealDebrid) GetTorrents(onlyOne bool) ([]Torrent, int, error) { cIdxEnd := cachedCount - 1 - cIdx for tIdx, torrent := range batch { // 250 torrents tIdxEnd := indexFromEnd(tIdx, page+bIdx, pageSize, result.total) - if torrent.ID == cached.ID && tIdxEnd == cIdxEnd { + if torrent.ID == cached.ID && torrent.Progress == cached.Progress && tIdxEnd == cIdxEnd { allTorrents = append(allTorrents, batch[:tIdx]...) allTorrents = append(allTorrents, rd.torrentsCache[cIdx:]...) rd.log.Debugf("Got %d/%d torrents", len(allTorrents), result.total) diff --git a/pkg/realdebrid/types.go b/pkg/realdebrid/types.go index afa1230..a293e4f 100644 --- a/pkg/realdebrid/types.go +++ b/pkg/realdebrid/types.go @@ -68,6 +68,20 @@ func (i *Torrent) UnmarshalJSON(data []byte) error { return nil } +func (i *Torrent) MarshalJSON() ([]byte, error) { + type Alias Torrent + aux := &struct { + Progress float64 `json:"progress"` + Added string `json:"added"` + *Alias + }{ + Alias: (*Alias)(i), + Progress: float64(i.Progress), // Convert int to float64 for JSON representation + Added: i.Added, + } + return json.Marshal(aux) +} + type TorrentInfo struct { ID string `json:"id"` Name string `json:"filename"`