A more efficient repair procedure

This commit is contained in:
Ben Sarmiento
2023-12-02 02:43:47 +01:00
parent b5a923fd17
commit 1cb80e5047
2 changed files with 115 additions and 125 deletions

View File

@@ -40,6 +40,7 @@ type TorrentManager struct {
latestState *LibraryState
requiredVersion string
workerPool *ants.Pool
repairWorker *ants.Pool
log *zap.SugaredLogger
}
@@ -48,6 +49,7 @@ type TorrentManager struct {
// and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager {
initialSate := EmptyState()
t := &TorrentManager{
Config: cfg,
Api: api,
@@ -95,16 +97,89 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
}
t.RefreshTorrents()
if t.Config.EnableRepair() {
repairWorker, err := ants.NewPool(1)
if err != nil {
log.Fatalf("Failed to create repair worker: %v", err)
}
t.repairWorker = repairWorker
t.RepairAll() // initial repair
} else {
t.log.Info("Repair is disabled, skipping repair check")
}
t.log.Info("Finished initializing torrent manager")
_ = t.workerPool.Submit(func() {
t.startRefreshJob()
})
t.log.Info("Finished initializing torrent manager")
return t
}
func (t *TorrentManager) RefreshTorrents() {
// get all torrent info
instances, _, err := t.Api.GetTorrents(0)
if err != nil {
t.log.Warnf("Cannot get torrents: %v\n", err)
return
}
instanceCount := len(instances)
infoChan := make(chan *Torrent, instanceCount)
var wg sync.WaitGroup
for i := range instances {
wg.Add(1)
idx := i // capture the loop variable
_ = t.workerPool.Submit(func() {
defer wg.Done()
infoChan <- t.getMoreInfo(instances[idx])
})
}
wg.Wait()
close(infoChan)
t.log.Infof("Fetched info for %d torrents", instanceCount)
freshKeys := set.NewStringSet()
oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
noInfoCount := 0
for info := range infoChan {
if info == nil {
noInfoCount++
continue
}
freshKeys.Add(info.AccessKey)
if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
oldTorrents.Set(info.AccessKey, info)
} else {
mainTorrent := t.mergeToMain(torrent, info)
oldTorrents.Set(info.AccessKey, mainTorrent)
}
}
// removed
strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool {
t.Delete(accessKey, false, false)
return true
})
// new
strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool {
torrent, _ := oldTorrents.Get(accessKey)
t.UpdateTorrentResponseCache(torrent)
t.accessKeySet.Add(accessKey)
return true
})
t.checkForOtherDeletedTorrents()
// now we can build the directory responses
t.UpdateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
t.SetNewLatestState(t.getCurrentState())
// todo: work on hook
// _ = t.workerPool.Submit(func() {
// OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
// })
}
func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent {
// the link can have the following values
// 1. https://*** - the file is available
@@ -155,10 +230,6 @@ func (t *TorrentManager) SetNewLatestState(checksum LibraryState) {
t.latestState.TotalCount = checksum.TotalCount
}
func (t *TorrentManager) ScheduleForRefresh() {
t.SetNewLatestState(EmptyState())
}
type torrentsResp struct {
torrents []realdebrid.Torrent
totalCount int
@@ -232,108 +303,14 @@ func (t *TorrentManager) startRefreshJob() {
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
t.RefreshTorrents()
t.log.Info("Finished refreshing torrents")
}
}
func (t *TorrentManager) RefreshTorrents() {
// get all torrent info
instances, _, err := t.Api.GetTorrents(0)
if err != nil {
t.log.Warnf("Cannot get torrents: %v\n", err)
return
}
instanceCount := len(instances)
infoChan := make(chan *Torrent, instanceCount)
var wg sync.WaitGroup
for i := range instances {
wg.Add(1)
idx := i // capture the loop variable
_ = t.workerPool.Submit(func() {
defer wg.Done()
infoChan <- t.getMoreInfo(instances[idx])
})
}
wg.Wait()
close(infoChan)
t.log.Infof("Fetched info for %d torrents", instanceCount)
// todo: inefficient
// handle deleted torrents in info cache
// but only do it if there is an info cache filled
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if infoCache.Count() > 0 {
t.cleanInfoCache(instances)
}
freshKeys := set.NewStringSet()
oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
noInfoCount := 0
for info := range infoChan {
if info == nil {
noInfoCount++
continue
}
freshKeys.Add(info.AccessKey)
if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
oldTorrents.Set(info.AccessKey, info)
if t.Config.EnableRepair() {
t.RepairAll()
} else {
mainTorrent := t.mergeToMain(torrent, info)
oldTorrents.Set(info.AccessKey, mainTorrent)
t.log.Info("Repair is disabled, skipping repair check")
}
}
// removed
strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(accessKey)
})
t.log.Infof("Deleted torrent: %s\n", accessKey)
return true
})
// new
strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool {
torrent, _ := oldTorrents.Get(accessKey)
t.UpdateTorrentResponseCache(torrent)
t.accessKeySet.Add(accessKey)
return true
})
t.checkForOtherDeletedTorrents()
// now we can build the directory responses
t.UpdateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
t.SetNewLatestState(t.getCurrentState())
if t.Config.EnableRepair() {
t.log.Info("Checking for torrents to repair")
t.repairAll()
t.log.Info("Finished checking for torrents to repair")
} else {
t.log.Info("Repair is disabled, skipping repair check")
}
// todo: work on hook
// _ = t.workerPool.Submit(func() {
// OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
// })
}
func (t *TorrentManager) cleanInfoCache(torrents []realdebrid.Torrent) {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
keep := make(map[string]bool)
for _, torrent := range torrents {
keep[torrent.ID] = true
}
var toDelete []string
infoCache.IterCb(func(torrentID string, torrent *Torrent) {
if _, ok := keep[torrentID]; !ok {
toDelete = append(toDelete, torrentID)
}
})
for _, torrentID := range toDelete {
infoCache.Remove(torrentID)
t.deleteTorrentFile(torrentID)
}
}
@@ -394,7 +371,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
}
torrent := Torrent{
AccessKey: t.getName(info.Name, info.OriginalName),
AccessKey: t.computeAccessKey(info.Name, info.OriginalName),
LatestAdded: info.Added,
Instances: []realdebrid.TorrentInfo{*info},
}
@@ -412,7 +389,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
return &torrent
}
func (t *TorrentManager) getName(name, originalName string) string {
func (t *TorrentManager) computeAccessKey(name, originalName string) string {
if t.Config.EnableRetainRDTorrentName() {
return name
}
@@ -552,7 +529,7 @@ func (t *TorrentManager) repairAll() {
}
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
var toRepair []string
var toRepair []*Torrent
allTorrents.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() {
t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey)
@@ -566,13 +543,13 @@ func (t *TorrentManager) repairAll() {
}
})
if forRepair {
toRepair = append(toRepair, torrent.AccessKey)
toRepair = append(toRepair, torrent)
}
})
t.log.Debugf("Found %d torrents to repair", len(toRepair))
for _, accessKey := range toRepair {
t.log.Infof("Repairing %s", accessKey)
t.Repair(accessKey)
for i := range toRepair {
t.log.Infof("Repairing %s", toRepair[i].AccessKey)
t.repair(toRepair[i])
}
}
@@ -603,14 +580,18 @@ func (t *TorrentManager) checkForOtherDeletedTorrents() {
func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirectoryResponses bool) {
if deleteInRD {
t.log.Infof("Deleting torrent %s in RD", accessKey)
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
if torrent, ok := allTorrents.Get(accessKey); ok {
for _, instance := range torrent.Instances {
t.log.Infof("Deleting torrent %s %s in RD", instance.ID, accessKey)
t.Api.DeleteTorrent(instance.ID)
infoCache.Remove(instance.ID)
t.deleteTorrentFile(instance.ID)
}
}
}
t.log.Infof("Removing torrent %s from zurg database", accessKey)
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if ok := torrents.Has(accessKey); ok {
torrents.Remove(accessKey)
@@ -623,14 +604,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto
}
}
func (t *TorrentManager) Repair(accessKey string) {
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
torrent, _ := allTorrents.Get(accessKey)
if torrent == nil {
t.log.Warnf("Cannot find torrent %s anymore so we are not repairing it", accessKey)
return
}
func (t *TorrentManager) repair(torrent *Torrent) {
if torrent.AnyInProgress() {
t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
return
@@ -669,9 +643,6 @@ func (t *TorrentManager) Repair(accessKey string) {
} else if streamableCount == 1 {
t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey)
t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Instances[0].Hash)
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(torrent.AccessKey)
})
t.Delete(torrent.AccessKey, false, true)
return
}
@@ -924,3 +895,19 @@ func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) {
}
}
}
func (t *TorrentManager) RepairAll() {
_ = t.repairWorker.Submit(func() {
t.log.Info("Checking for torrents to repair")
t.repairAll()
t.log.Info("Finished checking for torrents to repair")
})
}
func (t *TorrentManager) Repair(torrent *Torrent) {
_ = t.repairWorker.Submit(func() {
t.log.Info("repairing torrent %s", torrent.AccessKey)
t.repair(torrent)
t.log.Info("Finished repairing torrent %s", torrent.AccessKey)
})
}