Rework torrent manager handling of update

This commit is contained in:
Ben Sarmiento
2023-12-02 01:32:11 +01:00
parent 67f509248e
commit 2f7e3b0ca9
7 changed files with 228 additions and 232 deletions

5
go.mod
View File

@@ -11,7 +11,10 @@ require github.com/orcaman/concurrent-map/v2 v2.0.1
require github.com/panjf2000/ants/v2 v2.8.2
require github.com/julienschmidt/httprouter v1.3.0 // indirect
require (
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/scylladb/go-set v1.0.2 // indirect
)
require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect

3
go.sum
View File

@@ -10,6 +10,7 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -25,6 +26,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=

View File

@@ -23,6 +23,7 @@ type ConfigInterface interface {
GetRateLimitSleepSeconds() int
GetRealDebridTimeout() int
GetRetriesUntilFailed() int
EnableDownloadCache() bool
}
type ZurgConfig struct {
@@ -44,6 +45,7 @@ type ZurgConfig struct {
ForceIPv6 bool `yaml:"force_ipv6"`
RealDebridTimeout int `yaml:"realdebrid_timeout_secs"`
RetriesUntilFailed int `yaml:"retries_until_failed"`
UseDownloadCache bool `yaml:"use_download_cache"`
}
func (z *ZurgConfig) GetToken() string {
@@ -144,3 +146,7 @@ func (z *ZurgConfig) GetRetriesUntilFailed() int {
}
return z.RetriesUntilFailed
}
func (z *ZurgConfig) EnableDownloadCache() bool {
return z.UseDownloadCache
}

View File

@@ -14,7 +14,7 @@ func HandleDeleteTorrent(directory, torrentName string, t *torrent.TorrentManage
if !torrents.Has(torrentName) {
return fmt.Errorf("cannot find torrent %s", torrentName)
}
t.Delete(torrentName)
t.Delete(torrentName, true, true)
return nil
}
@@ -23,15 +23,15 @@ func HandleDeleteFile(directory, torrentName, fileName string, t *torrent.Torren
if !ok {
return fmt.Errorf("cannot find directory %s", directory)
}
tor, ok := torrents.Get(torrentName)
torrent, ok := torrents.Get(torrentName)
if !ok {
return fmt.Errorf("cannot find torrent %s", torrentName)
}
file, ok := tor.SelectedFiles.Get(fileName)
file, ok := torrent.SelectedFiles.Get(fileName)
if !ok {
return fmt.Errorf("cannot find file %s", fileName)
}
file.Link = "unselect"
t.ScheduleForRefresh()
t.UpdateTorrentResponseCache(torrent)
return nil
}

View File

@@ -20,6 +20,8 @@ import (
"github.com/dgraph-io/ristretto"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/panjf2000/ants/v2"
"github.com/scylladb/go-set"
"github.com/scylladb/go-set/strset"
"go.uber.org/zap"
)
@@ -34,6 +36,7 @@ type TorrentManager struct {
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
ResponseCache *ristretto.Cache
accessKeySet *strset.Set
latestState *LibraryState
requiredVersion string
workerPool *ants.Pool
@@ -45,12 +48,11 @@ type TorrentManager struct {
// and store them in-memory and cached in files
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager {
initialSate := EmptyState()
t := &TorrentManager{
Config: cfg,
Api: api,
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
ResponseCache: cache,
accessKeySet: set.NewStringSet(),
latestState: &initialSate,
requiredVersion: "18.11.2023",
workerPool: p,
@@ -58,6 +60,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
}
// create internal directories
t.DirectoryMap = cmap.New[cmap.ConcurrentMap[string, *Torrent]]()
t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey
t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID
// create directory maps
@@ -67,89 +70,31 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
// Fetch downloads
t.DownloadCache = cmap.New[*realdebrid.Download]()
// _ = t.workerPool.Submit(func() {
// page := 1
// offset := 0
// for {
// downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
// if err != nil {
// t.log.Fatalf("Cannot get downloads: %v\n", err)
// }
// for i := range downloads {
// if !t.DownloadCache.Has(downloads[i].Link) {
// t.DownloadCache.Set(downloads[i].Link, &downloads[i])
// }
// }
// offset += len(downloads)
// page++
// if offset >= totalDownloads {
// break
// }
// }
// })
var newTorrents []realdebrid.Torrent
var err error
newTorrents, _, err = t.Api.GetTorrents(0)
if err != nil {
t.log.Fatalf("Cannot get torrents: %v\n", err)
}
t.log.Infof("Fetched %d downloads", t.DownloadCache.Count())
torrentsChan := make(chan *Torrent, len(newTorrents))
var wg sync.WaitGroup
for i := range newTorrents {
wg.Add(1)
idx := i // capture the loop variable
if t.Config.EnableDownloadCache() {
_ = t.workerPool.Submit(func() {
defer wg.Done()
torrentsChan <- t.getMoreInfo(newTorrents[idx])
page := 1
offset := 0
for {
downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
if err != nil {
t.log.Fatalf("Cannot get downloads: %v\n", err)
}
for i := range downloads {
if !t.DownloadCache.Has(downloads[i].Link) {
t.DownloadCache.Set(downloads[i].Link, &downloads[i])
}
}
offset += len(downloads)
page++
if offset >= totalDownloads {
t.log.Infof("Fetched %d downloads", t.DownloadCache.Count())
break
}
}
})
}
wg.Wait()
close(torrentsChan)
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
noInfoCount := 0
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
for tor := range torrentsChan {
if tor == nil {
noInfoCount++
continue
}
if torrent, exists := allTorrents.Get(tor.AccessKey); !exists {
allTorrents.Set(tor.AccessKey, tor)
} else {
mainTorrent := t.mergeToMain(torrent, tor)
allTorrents.Set(tor.AccessKey, mainTorrent)
}
}
allTorrents.IterCb(func(_ string, torrent *Torrent) {
dav, html := t.buildTorrentResponses(torrent)
t.AssignedDirectoryCb(torrent, func(directory string) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
// torrent responses
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".html", &newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".dav", &newDav, 1)
})
})
t.updateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
t.SetNewLatestState(t.getCurrentState())
if t.Config.EnableRepair() {
t.log.Info("Checking for torrents to repair")
t.repairAll()
t.log.Info("Finished checking for torrents to repair")
}
t.RefreshTorrents()
_ = t.workerPool.Submit(func() {
t.startRefreshJob()
@@ -161,16 +106,21 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
}
func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent {
// Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map
// the link can have the following values
// 1. https://*** - the file is available
// 2. repair - the file is available but we need to repair it
// 3. repairing - the file is being repaired
// 4. unselect - the file is deleted
torrentToMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) {
// see if it already exists in the main torrent
if _, ok := mainTorrent.SelectedFiles.Get(filepath); !ok {
if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" {
// if it doesn't exist in the main torrent, add it
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
} else {
// if it exists, compare the LatestAdded property and the link
} else if originalFile.Link != "unselect" {
if mainTorrent.LatestAdded < torrentToMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") {
// if it exists, compare the LatestAdded property and the link
// if torrentToMerge is more recent and its file has a link, update the main torrent's file
// unless it's removed
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
}
// else do nothing, the main torrent's file is more recent or has a valid link
@@ -190,13 +140,13 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr
// proxy
func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download {
t.log.Debugf("Unrestricting %s", link)
retChan := make(chan *realdebrid.Download, 1)
defer close(retChan)
t.workerPool.Submit(func() {
retChan <- t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone())
})
defer close(retChan)
return <-retChan
// return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone())
}
func (t *TorrentManager) SetNewLatestState(checksum LibraryState) {
@@ -219,6 +169,9 @@ func (t *TorrentManager) getCurrentState() LibraryState {
torrentsChan := make(chan torrentsResp, 1)
countChan := make(chan int, 1)
errChan := make(chan error, 2) // accommodate errors from both goroutines
defer close(torrentsChan)
defer close(countChan)
defer close(errChan)
_ = t.workerPool.Submit(func() {
torrents, totalCount, err := t.Api.GetTorrents(1)
@@ -276,94 +229,78 @@ func (t *TorrentManager) startRefreshJob() {
if t.latestState.equal(checksum) {
continue
}
t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount)
newTorrents, _, err := t.Api.GetTorrents(0)
t.RefreshTorrents()
t.log.Info("Finished refreshing torrents")
}
}
func (t *TorrentManager) RefreshTorrents() {
instances, _, err := t.Api.GetTorrents(0)
if err != nil {
t.log.Warnf("Cannot get torrents: %v\n", err)
continue
return
}
t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents))
instanceCount := len(instances)
// todo: inefficient
// handle deleted torrents in info cache
keep := make(map[string]bool)
for _, torrent := range newTorrents {
keep[torrent.ID] = true
}
var toDelete []string
// but only do it if there is an info cache filled
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
infoCache.IterCb(func(torrentID string, torrent *Torrent) {
if _, ok := keep[torrentID]; !ok {
toDelete = append(toDelete, torrentID)
if infoCache.Count() > 0 {
t.cleanInfoCache(instances)
}
})
for _, torrentID := range toDelete {
infoCache.Remove(torrentID)
}
// end info cache cleanup
torrentsChan := make(chan *Torrent, len(newTorrents))
infoChan := make(chan *Torrent, instanceCount)
var wg sync.WaitGroup
for i := range newTorrents {
for i := range instances {
wg.Add(1)
idx := i // capture the loop variable
_ = t.workerPool.Submit(func() {
defer wg.Done()
torrentsChan <- t.getMoreInfo(newTorrents[idx])
infoChan <- t.getMoreInfo(instances[idx])
})
}
wg.Wait()
close(torrentsChan)
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
close(infoChan)
t.log.Infof("Fetched info for %d torrents", instanceCount)
noInfoCount := 0
freshKeys := set.NewStringSet()
oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
newSet := cmap.New[*Torrent]()
for info := range torrentsChan {
noInfoCount := 0
for info := range infoChan {
if info == nil {
noInfoCount++
continue
}
freshKeys.Add(info.AccessKey)
if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
oldTorrents.Set(info.AccessKey, info)
newSet.Set(info.AccessKey, info)
} else {
mainTorrent := t.mergeToMain(torrent, info)
oldTorrents.Set(info.AccessKey, mainTorrent)
newSet.Set(info.AccessKey, mainTorrent)
}
}
var updatedPaths []string
newSet.IterCb(func(_ string, torrent *Torrent) {
dav, html := t.buildTorrentResponses(torrent)
t.AssignedDirectoryCb(torrent, func(directory string) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
if torrent.LatestAdded > t.latestState.FirstTorrent.Added {
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey))
}
// torrent responses
cacheKey := directory + "/" + torrent.AccessKey
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(cacheKey+".html", &newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(cacheKey+".dav", &newDav, 1)
})
})
// delete torrents that no longer exist
oldAccessKeys := oldTorrents.Keys()
for _, oldAccessKey := range oldAccessKeys {
if _, ok := newSet.Get(oldAccessKey); !ok {
// removed
strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(oldAccessKey)
torrents.Remove(accessKey)
})
t.log.Infof("Deleted torrent: %s\n", oldAccessKey)
}
}
t.updateDirectoryResponsesCache()
t.log.Infof("Deleted torrent: %s\n", accessKey)
return true
})
// new
strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool {
torrent, _ := oldTorrents.Get(accessKey)
t.UpdateTorrentResponseCache(torrent)
t.accessKeySet.Add(accessKey)
return true
})
// now we can build the directory responses
t.checkForOtherDeletedTorrents()
t.UpdateDirectoryResponsesCache()
t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
t.SetNewLatestState(t.getCurrentState())
@@ -375,11 +312,28 @@ func (t *TorrentManager) startRefreshJob() {
} else {
t.log.Info("Repair is disabled, skipping repair check")
}
_ = t.workerPool.Submit(func() {
OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
})
t.log.Info("Finished refreshing torrents")
// todo: work on hook
// _ = t.workerPool.Submit(func() {
// OnLibraryUpdateHook(updatedPaths, t.Config, t.log)
// })
}
func (t *TorrentManager) cleanInfoCache(torrents []realdebrid.Torrent) {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
keep := make(map[string]bool)
for _, torrent := range torrents {
keep[torrent.ID] = true
}
var toDelete []string
infoCache.IterCb(func(torrentID string, torrent *Torrent) {
if _, ok := keep[torrentID]; !ok {
toDelete = append(toDelete, torrentID)
}
})
for _, torrentID := range toDelete {
infoCache.Remove(torrentID)
t.deleteTorrentFile(torrentID)
}
}
@@ -484,6 +438,7 @@ func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) err
if err := dataEncoder.Encode(torrent); err != nil {
return fmt.Errorf("failed encoding torrent: %w", err)
}
t.log.Debugf("Saved torrent %s to file", torrent.ID)
return nil
}
@@ -510,6 +465,14 @@ func (t *TorrentManager) readTorrentFromFile(torrentID string) *realdebrid.Torre
return &torrent
}
func (t *TorrentManager) deleteTorrentFile(torrentID string) {
filePath := "data/" + torrentID + ".bin"
err := os.Remove(filePath)
if err != nil {
t.log.Warnf("Cannot delete file %s: %v", filePath, err)
}
}
func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([]*File, bool) {
type Result struct {
Response *realdebrid.Download
@@ -537,7 +500,7 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
wg.Wait()
close(resultsChan)
isChaotic := false
chaoticFileCount := 0
for result := range resultsChan {
if result.Response == nil {
continue
@@ -564,22 +527,22 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([
Link: result.Response.Link,
})
} else {
isChaotic = true
chaoticFileCount++
}
}
}
return selectedFiles, isChaotic
return selectedFiles, chaoticFileCount == len(links)
}
func (t *TorrentManager) repairAll() {
proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full
if !proceed {
t.log.Error("Reached the max number of active torrents, cannot start repair")
// TODO delete oldest in progress torrent
return
}
var toDelete []string
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
allTorrents.IterCb(func(_ string, torrent *Torrent) {
if torrent.AnyInProgress() {
@@ -587,46 +550,65 @@ func (t *TorrentManager) repairAll() {
return
}
forRepair := false
unselected := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.Link == "repair" && !forRepair {
file.Link = "repairing"
t.log.Debugf("Found a file to repair for torrent %s", torrent.AccessKey)
forRepair = true
}
if file.Link == "unselect" {
unselected++
}
})
if forRepair {
t.log.Infof("Repairing %s", torrent.AccessKey)
t.Repair(torrent.AccessKey)
}
})
}
func (t *TorrentManager) checkForOtherDeletedTorrents() {
var toDelete []string
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
allTorrents.IterCb(func(_ string, torrent *Torrent) {
unselected := 0
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
if file.Link == "unselect" {
unselected++
}
})
if unselected == torrent.SelectedFiles.Count() && unselected > 0 {
t.log.Infof("Deleting %s", torrent.AccessKey)
toDelete = append(toDelete, torrent.AccessKey)
} else {
// save to file
for i := range torrent.Instances {
t.writeTorrentToFile(&torrent.Instances[i])
}
}
})
for _, accessKey := range toDelete {
t.Delete(accessKey)
t.Delete(accessKey, true, false)
}
}
func (t *TorrentManager) Delete(accessKey string) {
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirectoryResponses bool) {
t.log.Infof("Deleting torrent %s", accessKey)
if deleteInRD {
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
if torrent, ok := allTorrents.Get(accessKey); ok {
for _, instance := range torrent.Instances {
infoCache.Remove(instance.ID)
t.Api.DeleteTorrent(instance.ID)
}
}
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if _, ok := torrents.Get(accessKey); ok {
}
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
if ok := torrents.Has(accessKey); ok {
torrents.Remove(accessKey)
pathKey := fmt.Sprintf("%s/%s", directory, accessKey)
t.ResponseCache.Del(pathKey)
}
})
if updateDirectoryResponses {
t.UpdateDirectoryResponsesCache()
}
}
func (t *TorrentManager) Repair(accessKey string) {
@@ -675,11 +657,7 @@ func (t *TorrentManager) Repair(accessKey string) {
selectedFiles, isChaotic = t.organizeChaos(links, selectedFiles)
if isChaotic {
t.log.Warnf("Torrent %s is always returning an unplayable rar file (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey)
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(torrent.AccessKey)
})
t.ScheduleForRefresh()
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
t.Delete(torrent.AccessKey, false, true)
return
} else if streamableCount == 1 {
t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey)
@@ -687,7 +665,7 @@ func (t *TorrentManager) Repair(accessKey string) {
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
torrents.Remove(torrent.AccessKey)
})
t.ScheduleForRefresh()
t.Delete(torrent.AccessKey, false, true)
return
}
// t.log.Debugf("Identified the expired files of torrent id=%s", info.ID)
@@ -857,7 +835,21 @@ func (t *TorrentManager) canCapacityHandle() bool {
}
}
func (t *TorrentManager) updateDirectoryResponsesCache() {
func (t *TorrentManager) UpdateTorrentResponseCache(torrent *Torrent) {
dav, html := t.buildTorrentResponses(torrent)
t.AssignedDirectoryCb(torrent, func(directory string) {
torrents, _ := t.DirectoryMap.Get(directory)
torrents.Set(torrent.AccessKey, torrent)
pathKey := fmt.Sprintf("%s/%s", directory, torrent.AccessKey)
// torrent responses
newHtml := strings.ReplaceAll(html, "$dir", directory)
t.ResponseCache.Set(pathKey+".html", &newHtml, 1)
newDav := strings.ReplaceAll(dav, "$dir", directory)
t.ResponseCache.Set(pathKey+".dav", &newDav, 1)
})
}
func (t *TorrentManager) UpdateDirectoryResponsesCache() {
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
allKeys := torrents.Keys()
sort.Strings(allKeys)

View File

@@ -68,10 +68,7 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res
if unrestrict == nil {
// log.Warnf("File %s is no longer available, link %s", filepath.Base(file.Path), link)
file.Link = "repair"
if cfg.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.ScheduleForRefresh() // force a recheck
}
torMgr.UpdateTorrentResponseCache(torrent)
http.Error(resp, "File is not available", http.StatusNotFound)
return
} else {
@@ -92,7 +89,7 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res
if cfg.ShouldServeFromRclone() {
redirect(resp, req, unrestrict.Download, cfg)
} else {
gf.streamFileToResponse(file, unrestrict.Download, resp, req, torMgr, cfg, log)
gf.streamFileToResponse(torrent, file, unrestrict.Download, resp, req, torMgr, cfg, log)
}
return
}
@@ -131,7 +128,7 @@ func (gf *GetFile) streamCachedLinkToResponse(url string, resp http.ResponseWrit
return nil
}
func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.File, url string, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) {
// Create a new request for the file download.
dlReq, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
@@ -152,10 +149,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http
if file != nil {
log.Warnf("Cannot download file %s: %v", file.Path, err)
file.Link = "repair"
if cfg.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.ScheduleForRefresh() // force a recheck
}
torMgr.UpdateTorrentResponseCache(torrent)
}
http.Error(resp, "File is not available", http.StatusNotFound)
return
@@ -166,10 +160,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http
if file != nil {
log.Warnf("Received a %s status code for file %s", download.Status, file.Path)
file.Link = "repair"
if cfg.EnableRepair() {
// log.Debugf("File %s is marked for repair", filepath.Base(file.Path))
torMgr.ScheduleForRefresh() // force a recheck
}
torMgr.UpdateTorrentResponseCache(torrent)
}
http.Error(resp, "File is not available", http.StatusNotFound)
return

View File

@@ -69,6 +69,7 @@ func RunTest() {
}
wg.Wait()
close(semaphore)
close(infoChan)
fmt.Printf("Network test complete.\n\n")