Final repair fixes
This commit is contained in:
@@ -41,15 +41,22 @@ func MainApp(configPath string) {
|
|||||||
|
|
||||||
premium.MonitorPremiumStatus(rd, zurglog)
|
premium.MonitorPremiumStatus(rd, zurglog)
|
||||||
|
|
||||||
p, err := ants.NewPool(config.GetNumOfWorkers() + 1)
|
workerPool, err := ants.NewPool(config.GetNumOfWorkers() + 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zurglog.Errorf("Failed to create worker pool: %v", err)
|
zurglog.Errorf("Failed to create worker pool: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer p.Release()
|
defer workerPool.Release()
|
||||||
|
|
||||||
|
repairPool, err := ants.NewPool(1, ants.WithMaxBlockingTasks(1))
|
||||||
|
if err != nil {
|
||||||
|
zurglog.Errorf("Failed to create repair pool: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer repairPool.Release()
|
||||||
|
|
||||||
utils.EnsureDirExists("data") // Ensure the data directory exists
|
utils.EnsureDirExists("data") // Ensure the data directory exists
|
||||||
torrentMgr := torrent.NewTorrentManager(config, rd, p, log.Named("manager"))
|
torrentMgr := torrent.NewTorrentManager(config, rd, workerPool, repairPool, log.Named("manager"))
|
||||||
|
|
||||||
downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), 0, true, config, log.Named("dlclient"))
|
downloadClient := http.NewHTTPClient(config.GetToken(), config.GetRetriesUntilFailed(), 0, true, config, log.Named("dlclient"))
|
||||||
downloader := universal.NewDownloader(downloadClient)
|
downloader := universal.NewDownloader(downloadClient)
|
||||||
|
|||||||
@@ -33,13 +33,14 @@ type TorrentManager struct {
|
|||||||
latestState *LibraryState
|
latestState *LibraryState
|
||||||
requiredVersion string
|
requiredVersion string
|
||||||
workerPool *ants.Pool
|
workerPool *ants.Pool
|
||||||
|
repairPool *ants.Pool
|
||||||
log *logutil.Logger
|
log *logutil.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTorrentManager creates a new torrent manager
|
// NewTorrentManager creates a new torrent manager
|
||||||
// it will fetch all torrents and their info in the background
|
// it will fetch all torrents and their info in the background
|
||||||
// and store them in-memory and cached in files
|
// and store them in-memory and cached in files
|
||||||
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, log *logutil.Logger) *TorrentManager {
|
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, workerPool, repairPool *ants.Pool, log *logutil.Logger) *TorrentManager {
|
||||||
t := &TorrentManager{
|
t := &TorrentManager{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
Api: api,
|
Api: api,
|
||||||
@@ -51,7 +52,8 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
|||||||
allAccessKeys: mapset.NewSet[string](),
|
allAccessKeys: mapset.NewSet[string](),
|
||||||
latestState: &LibraryState{},
|
latestState: &LibraryState{},
|
||||||
requiredVersion: "11.01.2024",
|
requiredVersion: "11.01.2024",
|
||||||
workerPool: p,
|
workerPool: workerPool,
|
||||||
|
repairPool: repairPool,
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -149,7 +151,7 @@ func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) {
|
|||||||
tor.SelectedFiles.IterCb(func(key string, file *File) {
|
tor.SelectedFiles.IterCb(func(key string, file *File) {
|
||||||
filenames = append(filenames, filepath.Base(file.Path))
|
filenames = append(filenames, filepath.Base(file.Path))
|
||||||
fileSizes = append(fileSizes, file.Bytes)
|
fileSizes = append(fileSizes, file.Bytes)
|
||||||
if !tor.Unfixable && unplayable && utils.IsStreamable(file.Path) {
|
if !tor.Unrepairable && unplayable && utils.IsStreamable(file.Path) {
|
||||||
unplayable = false
|
unplayable = false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -224,7 +224,7 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent {
|
|||||||
Hash: existing.Hash,
|
Hash: existing.Hash,
|
||||||
DownloadedIDs: mapset.NewSet[string](),
|
DownloadedIDs: mapset.NewSet[string](),
|
||||||
InProgressIDs: mapset.NewSet[string](),
|
InProgressIDs: mapset.NewSet[string](),
|
||||||
Unfixable: existing.Unfixable || toMerge.Unfixable,
|
Unrepairable: existing.Unrepairable || toMerge.Unrepairable,
|
||||||
UnassignedLinks: existing.UnassignedLinks.Union(toMerge.UnassignedLinks),
|
UnassignedLinks: existing.UnassignedLinks.Union(toMerge.UnassignedLinks),
|
||||||
BrokenLinks: existing.BrokenLinks.Union(toMerge.BrokenLinks),
|
BrokenLinks: existing.BrokenLinks.Union(toMerge.BrokenLinks),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,10 +17,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (t *TorrentManager) RepairAll() {
|
func (t *TorrentManager) RepairAll() {
|
||||||
_ = t.workerPool.Submit(func() {
|
_ = t.repairPool.Submit(func() {
|
||||||
t.log.Info("Repairing all broken torrents")
|
t.log.Info("Checking for broken torrents")
|
||||||
t.repairAll()
|
t.repairAll()
|
||||||
t.log.Info("Finished repairing all torrents")
|
t.log.Info("Finished checking for broken torrents")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ func (t *TorrentManager) repairAll() {
|
|||||||
hashGroups = append(hashGroups, currentGroup)
|
hashGroups = append(hashGroups, currentGroup)
|
||||||
|
|
||||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||||
if torrent.AnyInProgress() || torrent.Unfixable {
|
if torrent.AnyInProgress() || torrent.Unrepairable {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if currentGroup.Cardinality() >= maxGroupSize {
|
if currentGroup.Cardinality() >= maxGroupSize {
|
||||||
@@ -69,7 +69,7 @@ func (t *TorrentManager) repairAll() {
|
|||||||
|
|
||||||
var toRepair []*Torrent
|
var toRepair []*Torrent
|
||||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||||
if torrent.AnyInProgress() || torrent.Unfixable {
|
if torrent.AnyInProgress() || torrent.Unrepairable {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,16 +100,15 @@ func (t *TorrentManager) repairAll() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) Repair(torrent *Torrent) {
|
func (t *TorrentManager) Repair(torrent *Torrent) {
|
||||||
|
if torrent.Unrepairable {
|
||||||
|
t.log.Warnf("Torrent %s is unfixable, skipping repair", t.GetKey(torrent))
|
||||||
|
return
|
||||||
|
}
|
||||||
if repairing, ok := t.Repairs.Get(t.GetKey(torrent)); ok && repairing {
|
if repairing, ok := t.Repairs.Get(t.GetKey(torrent)); ok && repairing {
|
||||||
t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent))
|
t.log.Warnf("Torrent %s is already being repaired, skipping repair", t.GetKey(torrent))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.Repairs.Set(t.GetKey(torrent), true)
|
t.Repairs.Set(t.GetKey(torrent), true)
|
||||||
|
|
||||||
if torrent.Unfixable {
|
|
||||||
t.log.Warnf("Torrent %s is unfixable, skipping repair", t.GetKey(torrent))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// save the broken files to the file cache
|
// save the broken files to the file cache
|
||||||
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
||||||
torrent.DownloadedIDs.Each(func(id string) bool {
|
torrent.DownloadedIDs.Each(func(id string) bool {
|
||||||
@@ -391,11 +390,11 @@ func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
|
|||||||
|
|
||||||
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
|
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
|
||||||
t.log.Warnf("Marking torrent %s as unfixable", t.GetKey(torrent))
|
t.log.Warnf("Marking torrent %s as unfixable", t.GetKey(torrent))
|
||||||
torrent.Unfixable = true
|
torrent.Unrepairable = true
|
||||||
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
||||||
torrent.DownloadedIDs.Each(func(id string) bool {
|
torrent.DownloadedIDs.Each(func(id string) bool {
|
||||||
info, _ := infoCache.Get(id)
|
info, _ := infoCache.Get(id)
|
||||||
info.Unfixable = true
|
info.Unrepairable = true
|
||||||
t.writeTorrentToFile(id, info)
|
t.writeTorrentToFile(id, info)
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ type Torrent struct {
|
|||||||
OriginalName string `json:"OriginalName"` // immutable
|
OriginalName string `json:"OriginalName"` // immutable
|
||||||
Rename string `json:"Rename"` // modified over time
|
Rename string `json:"Rename"` // modified over time
|
||||||
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
|
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
|
||||||
Unfixable bool `json:"Unfixable"` // modified over time
|
Unrepairable bool `json:"Unfixable"` // modified over time
|
||||||
BrokenLinks mapset.Set[string] `json:"BrokenLinks"` // only relevant on repair
|
BrokenLinks mapset.Set[string] `json:"BrokenLinks"` // only relevant on repair
|
||||||
|
|
||||||
Version string `json:"Version"` // only used for files
|
Version string `json:"Version"` // only used for files
|
||||||
|
|||||||
Reference in New Issue
Block a user