Remove immediate bin

This commit is contained in:
Ben Adrian Sarmiento
2024-06-18 23:43:31 +02:00
parent 34a7d6a432
commit acc9b69b5a
8 changed files with 58 additions and 87 deletions

View File

@@ -87,8 +87,13 @@ func MainApp(configPath string) {
)
hosts := repo.GetOptimalHosts(config.GetNumberOfHosts(), config.ShouldForceIPv6())
if len(hosts) == 0 {
zurglog.Fatal("No optimal hosts found. We cannot continue! (check if Real-Debrid is down or they have blocked your IP address)")
}
zurglog.Debugf("Optimal hosts (%d): %v", len(hosts), hosts)
// help message
zurglog.Debug("To reset optimal hosts, run 'zurg network-test' (Using docker compose? 'docker compose exec zurg ./zurg network-test')")
zurglog.Debug("To run network-test with a proxy, set the PROXY environment variable 'PROXY=http://xyz:123 zurg network-test'")
downloadClient := http.NewHTTPClient(
"",

View File

@@ -35,7 +35,6 @@ type RootResponse struct {
PID int `json:"pid"` // Process ID
Sponsor SponsorResponse `json:"sponsor_zurg"` // Sponsorship links
Config config.ZurgConfig `json:"config"`
ImmediateBin []string `json:"immediate_bin"`
OnceDoneBin []string `json:"once_done_bin"`
}
@@ -87,9 +86,8 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
Github: "https://github.com/sponsors/debridmediamanager",
Paypal: "https://paypal.me/yowmamasita",
},
Config: zr.cfg.GetConfig(),
ImmediateBin: zr.torMgr.ImmediateBin.ToSlice(),
OnceDoneBin: zr.torMgr.OnceDoneBin.ToSlice(),
Config: zr.cfg.GetConfig(),
OnceDoneBin: zr.torMgr.OnceDoneBin.ToSlice(),
}
out := `<table border="1px">
@@ -272,11 +270,7 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
<td>%v</td>
</tr>
<tr>
<td>Immediate Bin</td>
<td colspan="2">%v</td>
</tr>
<tr>
<td>Once Done Bin</td>
<td>IDs to be deleted once download completes</td>
<td colspan="2">%v</td>
</tr>
<tr>
@@ -367,7 +361,6 @@ func (zr *Handlers) handleHome(resp http.ResponseWriter, req *http.Request) {
response.Config.ShouldServeFromRclone(),
response.Config.ShouldForceIPv6(),
response.Config.GetOnLibraryUpdate(),
response.ImmediateBin,
response.OnceDoneBin,
)

View File

@@ -31,17 +31,14 @@ func (t *TorrentManager) initializeBins() {
return
}
t.ImmediateBin = mapset.NewSet[string](data["trash_bin"]...)
t.OnceDoneBin = mapset.NewSet[string](data["repair_bin"]...)
t.repairLog.Debugf("Bin immediately: %v", t.ImmediateBin.ToSlice())
t.repairLog.Debugf("Bin once done: %v", t.OnceDoneBin.ToSlice())
t.repairLog.Debugf("These IDs will be deleted after completion: %v", t.OnceDoneBin.ToSlice())
}
func (t *TorrentManager) persistBins() {
data := map[string]interface{}{
"trash_bin": t.ImmediateBin.ToSlice(), // Assuming trashBin is a mapset.Set[string]
"repair_bin": t.OnceDoneBin.ToSlice(), // Assuming repairBin is a mapset.Set[string]
"repair_bin": t.OnceDoneBin.ToSlice(), // Assuming repairBin is a mapset.Set[string]
}
jsonData, err := json.Marshal(data)
@@ -63,12 +60,6 @@ func (t *TorrentManager) persistBins() {
}
}
func (t *TorrentManager) setToBinImmediately(torrentId string) {
t.repairLog.Debugf("id=%s set to delete immediately", torrentId)
t.ImmediateBin.Add(torrentId)
t.persistBins()
}
func (t *TorrentManager) setToBinOnceDone(torrentId string) {
t.repairLog.Debugf("id=%s set to delete once it completes", torrentId)
t.OnceDoneBin.Add(torrentId)
@@ -83,24 +74,19 @@ func (t *TorrentManager) setXToBinOnceYDone(deleteId, completeId string) {
}
func (t *TorrentManager) cleanupBins(freshIDs mapset.Set[string]) {
t.ImmediateBin.Clone().Each(func(entry string) bool {
if !freshIDs.Contains(entry) {
t.ImmediateBin.Remove(entry)
}
return false
})
t.OnceDoneBin.Clone().Each(func(entry string) bool {
// check if the entry is a special case
// check for: delete x once y is done cases
if strings.Contains(entry, "-") {
// format is: id1-id2 or id1-
// if either id1 or id2 is not fresh, remove the entry
ids := strings.Split(entry, "-")
if !freshIDs.Contains(ids[0]) || (ids[1] != "" && !freshIDs.Contains(ids[1])) {
if !freshIDs.ContainsOne(ids[0]) || (ids[1] != "" && !freshIDs.ContainsOne(ids[1])) {
t.OnceDoneBin.Remove(entry)
}
return false
}
if !freshIDs.Contains(entry) {
// check for: delete once done cases
if !freshIDs.ContainsOne(entry) {
t.OnceDoneBin.Remove(entry)
}
return false
@@ -108,41 +94,22 @@ func (t *TorrentManager) cleanupBins(freshIDs mapset.Set[string]) {
t.persistBins()
}
// binImmediatelyErrorCheck checks if the torrent is in the ImmediateBin and deletes it if it is.
// returns true if the torrent was in the bin and was deleted, false otherwise
func (t *TorrentManager) binImmediately(torrentId string) bool {
if t.ImmediateBin.Contains(torrentId) {
if err := t.api.DeleteTorrent(torrentId); err != nil {
t.repairLog.Warnf("Failed to delete torrent %s: %v", torrentId, err)
}
t.ImmediateBin.Remove(torrentId)
t.repairLog.Debugf("Bin: immediate deletion of torrent %s", torrentId)
t.persistBins()
return true
}
return false
}
// binOnceDoneErrorCheck checks if the torrent is in error states and then checks if it should be deleted
func (t *TorrentManager) binOnceDoneErrorCheck(torrentId, status string) bool {
if status == "downloading" || status == "downloaded" || status == "uploading" || status == "queued" || status == "compressing" || status == "waiting_files_selection" {
return false
}
t.repairLog.Infof("Bin: error status=%s, checking if %s should be deleted", status, torrentId)
return t.binOnceDone(torrentId, true)
}
// binOnceDone checks if the torrent is in the OnceDoneBin and deletes it if it is.
// returns true if the torrent was in the bin and was deleted, false otherwise
func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool) bool {
if t.OnceDoneBin.Contains(completedTorrentId) {
if err := t.api.DeleteTorrent(completedTorrentId); err != nil {
t.repairLog.Warnf("Failed to delete torrent %s: %v", completedTorrentId, err)
}
t.deleteInfoFile(completedTorrentId)
if t.OnceDoneBin.ContainsOne(completedTorrentId) {
t.DeleteByID(completedTorrentId)
t.OnceDoneBin.Remove(completedTorrentId)
if errorCheck {
t.repairLog.Errorf("Bin: error deletion of torrent %s", completedTorrentId)
t.repairLog.Infof("Bin: deleting torrent id=%s early because it has encountered an error", completedTorrentId)
} else {
t.repairLog.Debugf("Bin: done deletion of torrent %s", completedTorrentId)
}
@@ -152,7 +119,7 @@ func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool)
// special case: yyy-xxx means if yyy is done, delete xxx
specialCase := fmt.Sprintf("%s-", completedTorrentId)
if !t.OnceDoneBin.Contains(specialCase) {
if !t.OnceDoneBin.ContainsOne(specialCase) {
return false
}
t.deleteInfoFile(completedTorrentId)
@@ -160,16 +127,12 @@ func (t *TorrentManager) binOnceDone(completedTorrentId string, errorCheck bool)
t.OnceDoneBin.Clone().Each(func(entry string) bool {
if strings.Contains(entry, specialCase) {
if errorCheck {
if err := t.api.DeleteTorrent(completedTorrentId); err != nil {
t.repairLog.Warnf("Failed to delete torrent %s: %v", completedTorrentId, err)
}
t.DeleteByID(completedTorrentId)
t.OnceDoneBin.Remove(entry)
t.repairLog.Errorf("Bin: error deletion of torrent %s", completedTorrentId)
t.repairLog.Infof("Bin: deleting torrent id=%s early because it has encountered an error", completedTorrentId)
} else {
idToDelete := strings.Split(entry, "-")[1]
if err := t.api.DeleteTorrent(idToDelete); err != nil {
t.repairLog.Warnf("Failed to delete torrent %s: %v", idToDelete, err)
}
t.DeleteByID(idToDelete)
t.OnceDoneBin.Remove(entry)
t.repairLog.Debugf("Bin: %s completed, done deletion of torrent %s", completedTorrentId, idToDelete)
}

View File

@@ -25,9 +25,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) {
if torrent, ok := allTorrents.Get(accessKey); ok {
if deleteInRD {
torrent.DownloadedIDs.Clone().Each(func(torrentID string) bool {
t.log.Debugf("Deleting torrent %s (id=%s) in RD", accessKey, torrentID)
t.api.DeleteTorrent(torrentID)
t.deleteInfoFile(torrentID)
t.DeleteByID(torrentID)
return false
})
}
@@ -38,3 +36,8 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) {
})
allTorrents.Remove(accessKey)
}
func (t *TorrentManager) DeleteByID(torrentID string) {
t.api.DeleteTorrent(torrentID)
t.deleteInfoFile(torrentID)
}

View File

@@ -53,8 +53,8 @@ type TorrentManager struct {
repairRunning bool
repairRunningMu sync.Mutex
ImmediateBin mapset.Set[string]
OnceDoneBin mapset.Set[string]
OnceDoneBin mapset.Set[string]
DeleteOnCompletionBin cmap.ConcurrentMap[string, string]
}
// NewTorrentManager creates a new torrent manager
@@ -85,8 +85,8 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
latestState: &LibraryState{log: log},
ImmediateBin: mapset.NewSet[string](),
OnceDoneBin: mapset.NewSet[string](),
OnceDoneBin: mapset.NewSet[string](),
DeleteOnCompletionBin: cmap.New[string](),
}
t.initializeBins()
@@ -353,6 +353,7 @@ func (t *TorrentManager) mountNewDownloads() {
}
}
// StartDownloadsJob: permanent job for remounting downloads
func (t *TorrentManager) StartDownloadsJob() {
t.workerPool.Submit(func() {
remountTicker := time.NewTicker(time.Duration(t.Config.GetDownloadsEveryMins()) * time.Minute)
@@ -412,6 +413,7 @@ func copyFile(sourcePath, destPath string) error {
return nil
}
// StartDumpJob: permanent job for dumping torrents
func (t *TorrentManager) StartDumpJob() {
t.workerPool.Submit(func() {
dumpTicker := time.NewTicker(time.Duration(t.Config.GetDumpTorrentsEveryMins()) * time.Minute)
@@ -442,6 +444,7 @@ func (t *TorrentManager) analyzeAllTorrents() {
})
}
// StartMediaAnalysisJob: permanent job for analyzing media info (triggered by the user)
func (t *TorrentManager) StartMediaAnalysisJob() {
t.workerPool.Submit(func() {
for range t.AnalyzeTrigger {

View File

@@ -37,8 +37,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
idx := i
t.workerPool.Submit(func() {
defer wg.Done()
if t.binImmediately(instances[idx].ID) ||
t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) ||
if t.binOnceDoneErrorCheck(instances[idx].ID, instances[idx].Status) ||
instances[idx].Progress != 100 {
mergeChan <- nil
return
@@ -55,7 +54,7 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
allTorrents.Set(accessKey, torrent)
t.writeTorrentToFile(torrent)
t.assignDirectory(torrent, !initialRun)
} else if !mainTorrent.DownloadedIDs.Contains(tInfo.ID) {
} else if !mainTorrent.DownloadedIDs.ContainsOne(tInfo.ID) {
forMerging = torrent
}
@@ -98,20 +97,21 @@ func (t *TorrentManager) refreshTorrents(initialRun bool) {
t.log.Infof("Compiled into %d unique torrents", allTorrents.Count())
// delete info files that are no longer present
// it also runs binOnceDone (needed for cleanup every refresh)
t.getInfoFiles().Each(func(path string) bool {
path = filepath.Base(path)
torrentID := strings.TrimSuffix(path, ".zurginfo")
// if binOnceDone returns true, it means the info file is deleted
// if false, then we check if it's one of the torrents we just fetched
// if not (both are false), then we delete the info file
if !t.binOnceDone(torrentID, false) && !freshIDs.Contains(torrentID) {
if !t.binOnceDone(torrentID, false) && !freshIDs.ContainsOne(torrentID) {
t.deleteInfoFile(torrentID)
}
return false
})
// cleans up DownloadedIDs field of all torrents
t.workerPool.Submit(func() {
// update DownloadedIDs field of torrents
allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
deletedIDs := torrent.DownloadedIDs.Difference(freshIDs)
if deletedIDs.Cardinality() > 0 {
@@ -167,7 +167,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *realdebrid.T
func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent {
torrent := t.readTorrentFromFile("data/" + info.Hash + ".zurgtorrent")
if torrent != nil && torrent.DownloadedIDs.Contains(info.ID) {
if torrent != nil && torrent.DownloadedIDs.ContainsOne(info.ID) {
return torrent
}
@@ -189,7 +189,7 @@ func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent
var selectedFiles []*File
for _, file := range info.Files {
filename := filepath.Base(file.Path)
if allFilenames.Contains(filename) {
if allFilenames.ContainsOne(filename) {
dupeFilenames.Add(filename)
} else {
allFilenames.Add(filename)
@@ -228,7 +228,7 @@ func (t *TorrentManager) convertToTorrent(info *realdebrid.TorrentInfo) *Torrent
for _, file := range selectedFiles {
baseFilename := t.GetPath(file)
// todo better handling of duplicate filenames
if dupeFilenames.Contains(baseFilename) {
if dupeFilenames.ContainsOne(baseFilename) {
extension := filepath.Ext(baseFilename)
filenameNoExt := strings.TrimSuffix(baseFilename, extension)
newName := fmt.Sprintf("%s (%d)%s", filenameNoExt, file.ID, extension)

View File

@@ -29,6 +29,7 @@ func (t *TorrentManager) StartRepairJob() {
t.RepairQueue = mapset.NewSet[*Torrent]()
t.RepairAllTrigger = make(chan struct{})
// periodic repair worker
t.workerPool.Submit(func() {
t.repairLog.Debug("Starting periodic repair job")
repairTicker := time.NewTicker(time.Duration(t.Config.GetRepairEveryMins()) * time.Minute)
@@ -46,7 +47,7 @@ func (t *TorrentManager) StartRepairJob() {
}
})
// there is 1 repair worker, with max 1 blocking task
// repair worker
t.workerPool.Submit(func() {
for {
select {
@@ -90,7 +91,6 @@ func (t *TorrentManager) invokeRepair(torrent *Torrent) {
t.repairRunningMu.Unlock()
// Execute the repair job
time.Sleep(10 * time.Second)
t.executeRepairJob(torrent)
// After repair is done
@@ -120,10 +120,11 @@ func (t *TorrentManager) executeRepairJob(torrent *Torrent) {
var wg sync.WaitGroup
haystack.IterCb(func(_ string, torrent *Torrent) {
wg.Add(1)
// temp worker for finding broken torrents
t.workerPool.Submit(func() {
defer wg.Done()
canExtract := t.Config.GetRarAction() == "extract" && strings.Contains(torrent.UnrepairableReason, "rar")
if torrent.UnrepairableReason != "" || !canExtract {
if !canExtract || torrent.UnrepairableReason != "" {
return
}
// check 1: for broken files
@@ -234,12 +235,12 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
}
// if it's still broken, let's delete the newly downloaded torrent
t.setToBinImmediately(info.ID)
t.DeleteByID(info.ID)
err = fmt.Errorf("links are still broken")
} else if info != nil && info.Progress != 100 {
// it's faster to download just the broken files, so let's delete the newly downloaded torrent
t.setToBinImmediately(info.ID)
t.DeleteByID(info.ID)
err = fmt.Errorf("no longer cached")
}
@@ -278,7 +279,7 @@ func (t *TorrentManager) repair(torrent *Torrent, wg *sync.WaitGroup) {
t.repairLog.Warnf("Cannot repair torrent %s by downloading broken files (error=%v) giving up", t.GetKey(torrent), err)
// delete the newly downloaded torrents because the operation failed
for _, newId := range newlyDownloadedIds {
t.setToBinImmediately(newId)
t.DeleteByID(newId)
}
return
}
@@ -414,10 +415,10 @@ func (t *TorrentManager) assignLinks(torrent *Torrent) bool {
t.repairLog.Warnf("Torrent %s is rar'ed and we cannot repair it", t.GetKey(torrent))
t.markAsUnfixable(torrent, "rar'ed by RD")
t.markAsUnplayable(torrent, "rar'ed by RD")
torrent.State.Event(context.Background(), "mark_as_repaired")
}
torrent.UnassignedLinks = mapset.NewSet[string]()
// torrent.State.Event(context.Background(), "mark_as_repaired")
t.writeTorrentToFile(torrent)
return false // end repair
@@ -478,20 +479,20 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
for {
retries++
if retries > 10 {
t.setToBinImmediately(newTorrentID)
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot start redownloading: too many retries")
}
err = t.api.SelectTorrentFiles(newTorrentID, finalSelection)
if err != nil {
t.setToBinImmediately(newTorrentID)
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot start redownloading: %v", err)
}
time.Sleep(2 * time.Second)
info, err = t.api.GetTorrentInfo(newTorrentID)
if err != nil {
t.setToBinImmediately(newTorrentID)
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("cannot get info on redownloaded : %v", err)
}
@@ -504,13 +505,13 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
if info.Status != "downloading" && info.Status != "downloaded" && info.Status != "uploading" && info.Status != "queued" && info.Status != "compressing" {
t.setToBinImmediately(newTorrentID)
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("non-OK state: %s", info.Status)
}
// check if incorrect number of links
if info.Progress == 100 && len(info.Links) != len(selection) {
t.setToBinImmediately(newTorrentID)
t.DeleteByID(newTorrentID)
return nil, fmt.Errorf("only got %d links but we need %d", len(info.Links), len(selection))
} else if info.Progress != 100 {
t.repairLog.Infof("Downloading torrent %s (id=%s, progress=%d)", t.GetKey(torrent), info.ID, info.Progress)

View File

@@ -12,6 +12,9 @@ func NewTorrentState(initial string) *fsm.FSM {
{Name: "break_torrent", Src: []string{"ok_torrent"}, Dst: "broken_torrent"},
// when repair has been started
{Name: "repair_torrent", Src: []string{"ok_torrent", "broken_torrent"}, Dst: "under_repair_torrent"},
// when converting to torrent
// when merging with another same hash torrent
// when a torrent is rar'ed and not extracting
{Name: "mark_as_repaired", Src: []string{"broken_torrent", "under_repair_torrent"}, Dst: "ok_torrent"},
},
fsm.Callbacks{},