Optimizations
This commit is contained in:
@@ -27,6 +27,7 @@ import (
|
||||
func MainApp(configPath string) {
|
||||
utils.EnsureDirExists("logs") // log files
|
||||
utils.EnsureDirExists("data") // cache files (info, bins, etc.)
|
||||
utils.EnsureDirExists("data/info")
|
||||
utils.EnsureDirExists("dump") // "zurgtorrent" files
|
||||
|
||||
logPath := fmt.Sprintf("logs/zurg-%s-%s.log", time.Now().Format(time.DateOnly), time.Now().Format(time.TimeOnly))
|
||||
|
||||
@@ -68,17 +68,24 @@ func (t *TorrentManager) persistBins() {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) setToBinImmediately(torrentId string) {
|
||||
t.log.Debugf("Set to delete immediately: %s", torrentId)
|
||||
t.log.Debugf("id=%s set to delete immediately", torrentId)
|
||||
t.ImmediateBin.Add(torrentId)
|
||||
t.persistBins()
|
||||
}
|
||||
|
||||
func (t *TorrentManager) setToBinOnceDone(torrentId string) {
|
||||
t.log.Debugf("Set to delete once completed: %s", torrentId)
|
||||
t.log.Debugf("id=%s set to delete once it completes", torrentId)
|
||||
t.OnceDoneBin.Add(torrentId)
|
||||
t.persistBins()
|
||||
}
|
||||
|
||||
func (t *TorrentManager) setXToBinOnceYDone(deleteId, completeId string) {
|
||||
t.log.Debugf("id=%s set to delete once id=%s completes", deleteId, completeId)
|
||||
t.OnceDoneBin.Add(fmt.Sprintf("%s-", completeId))
|
||||
t.OnceDoneBin.Add(fmt.Sprintf("%s-%s", completeId, deleteId))
|
||||
t.persistBins()
|
||||
}
|
||||
|
||||
func (t *TorrentManager) binImmediately(torrentId string) bool {
|
||||
if t.ImmediateBin.Contains(torrentId) {
|
||||
if err := t.api.DeleteTorrent(torrentId); err != nil {
|
||||
@@ -86,42 +93,52 @@ func (t *TorrentManager) binImmediately(torrentId string) bool {
|
||||
return false
|
||||
}
|
||||
t.ImmediateBin.Remove(torrentId)
|
||||
t.persistBins()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *TorrentManager) binOnceDoneErrorCheck(torrentId, status string) bool {
|
||||
okStatuses := mapset.NewSet("downloading", "downloaded", "uploading", "queued", "compressing")
|
||||
if !okStatuses.Contains(status) {
|
||||
return t.binOnceDone(torrentId)
|
||||
if status == "downloading" || status == "downloaded" || status == "uploading" || status == "queued" || status == "compressing" {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
return t.binOnceDone(torrentId)
|
||||
}
|
||||
|
||||
func (t *TorrentManager) binOnceDone(torrentId string) bool {
|
||||
found := false
|
||||
binnedIDs := t.OnceDoneBin.ToSlice()
|
||||
// special case: xxx-yyy means if xxx is done, delete yyy
|
||||
if t.OnceDoneBin.Contains(torrentId) {
|
||||
if err := t.api.DeleteTorrent(torrentId); err != nil {
|
||||
t.log.Errorf("Failed to delete torrent %s: %v", torrentId, err)
|
||||
return false
|
||||
}
|
||||
t.OnceDoneBin.Remove(torrentId)
|
||||
t.persistBins()
|
||||
return true
|
||||
}
|
||||
|
||||
// special case: yyy-xxx means if yyy is done, delete xxx
|
||||
specialCase := fmt.Sprintf("%s-", torrentId)
|
||||
for _, entry := range binnedIDs {
|
||||
if !t.OnceDoneBin.Contains(specialCase) {
|
||||
return false
|
||||
}
|
||||
|
||||
hasError := false
|
||||
t.OnceDoneBin.Each(func(entry string) bool {
|
||||
if strings.Contains(entry, specialCase) {
|
||||
idToDelete := strings.Split(entry, "-")[1]
|
||||
if err := t.api.DeleteTorrent(idToDelete); err != nil {
|
||||
t.log.Errorf("Failed to delete torrent %s: %v", idToDelete, err)
|
||||
continue
|
||||
hasError = true
|
||||
return true
|
||||
}
|
||||
t.OnceDoneBin.Remove(entry)
|
||||
found = true
|
||||
} else if entry == torrentId {
|
||||
if err := t.api.DeleteTorrent(torrentId); err != nil {
|
||||
t.log.Errorf("Failed to delete torrent %s: %v", torrentId, err)
|
||||
return false
|
||||
}
|
||||
t.OnceDoneBin.Remove(torrentId)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
if !hasError {
|
||||
t.OnceDoneBin.Remove(specialCase)
|
||||
}
|
||||
|
||||
return found
|
||||
t.persistBins()
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -94,6 +94,22 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, r
|
||||
|
||||
t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
// load *.zurgtorrent files
|
||||
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
||||
t.getTorrentFiles("data").Each(func(filePath string) bool {
|
||||
torrent := t.readTorrentFromFile(filePath)
|
||||
if torrent != nil {
|
||||
accessKey := t.GetKey(torrent)
|
||||
allTorrents.Set(accessKey, torrent)
|
||||
t.assignDirectory(torrent, func(directory string) {
|
||||
listing, _ := t.DirectoryMap.Get(directory)
|
||||
listing.Set(accessKey, torrent)
|
||||
})
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
t.refreshTorrents()
|
||||
})
|
||||
t.workerPool.Submit(func() {
|
||||
@@ -253,8 +269,8 @@ func (t *TorrentManager) readTorrentFromFile(filePath string) *Torrent {
|
||||
return torrent
|
||||
}
|
||||
|
||||
func (t *TorrentManager) deleteTorrentFile(hash string) {
|
||||
filePath := "data/" + hash + ".zurgtorrent"
|
||||
func (t *TorrentManager) deleteTorrentFile(filename string) {
|
||||
filePath := "data/" + filename + ".zurgtorrent"
|
||||
_ = os.Remove(filePath)
|
||||
}
|
||||
|
||||
@@ -263,7 +279,7 @@ func (t *TorrentManager) deleteTorrentFile(hash string) {
|
||||
/// info functions
|
||||
|
||||
func (t *TorrentManager) getInfoFiles() mapset.Set[string] {
|
||||
files, err := filepath.Glob("data/*.zurginfo")
|
||||
files, err := filepath.Glob("data/info/*.zurginfo")
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get files in data directory: %v", err)
|
||||
return nil
|
||||
@@ -272,7 +288,7 @@ func (t *TorrentManager) getInfoFiles() mapset.Set[string] {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) writeInfoToFile(info *realdebrid.TorrentInfo) {
|
||||
filePath := "data/" + info.ID + ".zurginfo"
|
||||
filePath := "data/info/" + info.ID + ".zurginfo"
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot create info file %s: %v", filePath, err)
|
||||
@@ -295,7 +311,7 @@ func (t *TorrentManager) writeInfoToFile(info *realdebrid.TorrentInfo) {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) readInfoFromFile(torrentID string) *realdebrid.TorrentInfo {
|
||||
filePath := "data/" + torrentID + ".zurginfo"
|
||||
filePath := "data/info/" + torrentID + ".zurginfo"
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@@ -316,7 +332,7 @@ func (t *TorrentManager) readInfoFromFile(torrentID string) *realdebrid.TorrentI
|
||||
}
|
||||
|
||||
func (t *TorrentManager) deleteInfoFile(torrentID string) {
|
||||
filePath := "data/" + torrentID + ".zurginfo"
|
||||
filePath := "data/info/" + torrentID + ".zurginfo"
|
||||
_ = os.Remove(filePath)
|
||||
}
|
||||
|
||||
|
||||
@@ -32,24 +32,6 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
freshIDs := mapset.NewSet[string]()
|
||||
freshAccessKeys := mapset.NewSet[string]()
|
||||
|
||||
cachedAccessKeys := mapset.NewSet[string]()
|
||||
t.getTorrentFiles("data").Each(func(filePath string) bool {
|
||||
torrent := t.readTorrentFromFile(filePath)
|
||||
if torrent != nil {
|
||||
accessKey := t.GetKey(torrent)
|
||||
allTorrents.Set(accessKey, torrent)
|
||||
t.assignDirectory(torrent, func(directory string) {
|
||||
listing, _ := t.DirectoryMap.Get(directory)
|
||||
listing.Set(accessKey, torrent)
|
||||
|
||||
updatedPaths.Add(fmt.Sprintf("%s/%s", directory, accessKey))
|
||||
})
|
||||
filename := filepath.Base(filePath)
|
||||
cachedAccessKeys.Add(strings.TrimSuffix(filename, ".zurgtorrent"))
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
for i := range instances {
|
||||
wg.Add(1)
|
||||
idx := i
|
||||
@@ -80,15 +62,12 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
|
||||
updatedPaths.Add(fmt.Sprintf("%s/%s", directory, accessKey))
|
||||
})
|
||||
|
||||
t.writeTorrentToFile(torrent)
|
||||
} else if !mainTorrent.DownloadedIDs.Contains(tInfo.ID) {
|
||||
forMerging = torrent
|
||||
}
|
||||
|
||||
// write to file if it is a new torrent
|
||||
if forMerging == nil && !cachedAccessKeys.Contains(accessKey) {
|
||||
t.writeTorrentToFile(torrent)
|
||||
}
|
||||
|
||||
mergeChan <- forMerging
|
||||
})
|
||||
}
|
||||
@@ -137,20 +116,15 @@ func (t *TorrentManager) refreshTorrents() []string {
|
||||
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
existingIDs := mapset.NewSet[string]()
|
||||
// delete info files that are no longer present
|
||||
t.getInfoFiles().Each(func(path string) bool {
|
||||
path = filepath.Base(path)
|
||||
torrentID := strings.TrimSuffix(path, ".zurginfo")
|
||||
if !t.binOnceDone(torrentID) {
|
||||
existingIDs.Add(torrentID)
|
||||
if !t.binOnceDone(torrentID) && !freshIDs.Contains(torrentID) {
|
||||
t.deleteInfoFile(torrentID)
|
||||
}
|
||||
return false
|
||||
})
|
||||
existingIDs.Difference(freshIDs).Each(func(id string) bool {
|
||||
// t.log.Infof("Deleting stale info file %s", id)
|
||||
t.deleteInfoFile(id)
|
||||
return false
|
||||
})
|
||||
|
||||
return updatedPaths.ToSlice()
|
||||
}
|
||||
|
||||
@@ -146,22 +146,19 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) Repair(torrent *Torrent, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
// blocks for approx 45 minutes if active torrents are full
|
||||
if !t.canCapacityHandle() {
|
||||
t.repairLog.Error("Blocked for too long due to limit of active torrents, cannot continue with the repair")
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
|
||||
// assign to a worker
|
||||
_ = t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil {
|
||||
t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
|
||||
return
|
||||
}
|
||||
t.repair(torrent)
|
||||
})
|
||||
if err := torrent.State.Event(context.Background(), "repair_torrent"); err != nil {
|
||||
t.repairLog.Errorf("Failed to mark torrent %s as under repair: %v", t.GetKey(torrent), err)
|
||||
return
|
||||
}
|
||||
t.repair(torrent)
|
||||
}
|
||||
|
||||
// repairman
|
||||
@@ -196,7 +193,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
||||
// delete the torrents it replaced
|
||||
torrent.DownloadedIDs.Each(func(torrentID string) bool {
|
||||
if torrentID != info.ID {
|
||||
t.setToBinOnceDone(fmt.Sprintf("%s-%s", info.ID, torrentID))
|
||||
t.setXToBinOnceYDone(torrentID, info.ID)
|
||||
}
|
||||
return false
|
||||
})
|
||||
@@ -206,7 +203,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
||||
// once info.ID is done, we can delete the old torrent
|
||||
torrent.DownloadedIDs.Each(func(torrentID string) bool {
|
||||
if torrentID != info.ID {
|
||||
t.setToBinOnceDone(fmt.Sprintf("%s-%s", info.ID, torrentID))
|
||||
t.setXToBinOnceYDone(torrentID, info.ID)
|
||||
}
|
||||
return false
|
||||
})
|
||||
@@ -441,8 +438,7 @@ func (t *TorrentManager) redownloadTorrent(torrent *Torrent, selection []string)
|
||||
}
|
||||
|
||||
// documented status: magnet_error, magnet_conversion, waiting_files_selection, queued, downloading, downloaded, error, virus, compressing, uploading, dead
|
||||
okStatuses := mapset.NewSet("downloading", "downloaded", "uploading", "queued", "compressing")
|
||||
if !okStatuses.Contains(info.Status) {
|
||||
if info.Status != "downloading" && info.Status != "downloaded" && info.Status != "uploading" && info.Status != "queued" && info.Status != "compressing" {
|
||||
t.setToBinImmediately(newTorrentID)
|
||||
return nil, fmt.Errorf("non-OK state: %s", info.Status)
|
||||
}
|
||||
|
||||
@@ -17,9 +17,6 @@ func NewTorrentState(initial string) *fsm.FSM {
|
||||
}
|
||||
|
||||
func NewFileState(initial string) *fsm.FSM {
|
||||
// ok_file 13
|
||||
// broken_file 5
|
||||
// deleted_file 3
|
||||
return fsm.NewFSM(
|
||||
initial,
|
||||
fsm.Events{
|
||||
|
||||
Reference in New Issue
Block a user