diff --git a/go.mod b/go.mod index 4de5e7c..7072d5f 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,10 @@ require github.com/orcaman/concurrent-map/v2 v2.0.1 require github.com/panjf2000/ants/v2 v2.8.2 -require github.com/julienschmidt/httprouter v1.3.0 // indirect +require ( + github.com/julienschmidt/httprouter v1.3.0 // indirect + github.com/scylladb/go-set v1.0.2 // indirect +) require ( github.com/cespare/xxhash/v2 v2.1.1 // indirect diff --git a/go.sum b/go.sum index 8953f85..644f9eb 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,7 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -25,6 +26,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE= +github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= diff --git a/internal/config/types.go b/internal/config/types.go index 8ce122f..869e8ab 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -23,6 +23,7 @@ type ConfigInterface interface { GetRateLimitSleepSeconds() int GetRealDebridTimeout() int GetRetriesUntilFailed() int + EnableDownloadCache() bool } type ZurgConfig struct { @@ -44,6 +45,7 @@ type ZurgConfig struct { ForceIPv6 bool `yaml:"force_ipv6"` RealDebridTimeout int `yaml:"realdebrid_timeout_secs"` RetriesUntilFailed int `yaml:"retries_until_failed"` + UseDownloadCache bool `yaml:"use_download_cache"` } func (z *ZurgConfig) GetToken() string { @@ -144,3 +146,7 @@ func (z *ZurgConfig) GetRetriesUntilFailed() int { } return z.RetriesUntilFailed } + +func (z *ZurgConfig) EnableDownloadCache() bool { + return z.UseDownloadCache +} diff --git a/internal/dav/delete.go b/internal/dav/delete.go index c1e834e..d5ac49b 100644 --- a/internal/dav/delete.go +++ b/internal/dav/delete.go @@ -14,7 +14,7 @@ func HandleDeleteTorrent(directory, torrentName string, t *torrent.TorrentManage if !torrents.Has(torrentName) { return fmt.Errorf("cannot find torrent %s", torrentName) } - t.Delete(torrentName) + t.Delete(torrentName, true, true) return nil } @@ -23,15 +23,15 @@ func HandleDeleteFile(directory, torrentName, fileName string, t *torrent.Torren if !ok { return fmt.Errorf("cannot find directory %s", directory) } - tor, ok := torrents.Get(torrentName) + torrent, ok := torrents.Get(torrentName) if !ok { return fmt.Errorf("cannot find torrent %s", torrentName) } - file, ok := tor.SelectedFiles.Get(fileName) + file, ok := torrent.SelectedFiles.Get(fileName) if !ok { return fmt.Errorf("cannot find file %s", fileName) } file.Link = "unselect" - t.ScheduleForRefresh() + t.UpdateTorrentResponseCache(torrent) return nil } diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index d2791b6..4cabe16 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -20,6 +20,8 @@ import ( "github.com/dgraph-io/ristretto" cmap "github.com/orcaman/concurrent-map/v2" "github.com/panjf2000/ants/v2" + "github.com/scylladb/go-set" + "github.com/scylladb/go-set/strset" "go.uber.org/zap" ) @@ -34,6 +36,7 @@ type TorrentManager struct { DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download] ResponseCache *ristretto.Cache + accessKeySet *strset.Set latestState *LibraryState requiredVersion string workerPool *ants.Pool @@ -45,12 +48,11 @@ type TorrentManager struct { // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager { initialSate := EmptyState() - t := &TorrentManager{ Config: cfg, Api: api, - DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](), ResponseCache: cache, + accessKeySet: set.NewStringSet(), latestState: &initialSate, requiredVersion: "18.11.2023", workerPool: p, @@ -58,6 +60,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p } // create internal directories + t.DirectoryMap = cmap.New[cmap.ConcurrentMap[string, *Torrent]]() t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]()) // key is AccessKey t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID // create directory maps @@ -67,89 +70,31 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p // Fetch downloads t.DownloadCache = cmap.New[*realdebrid.Download]() - // _ = t.workerPool.Submit(func() { - // page := 1 - // offset := 0 - // for { - // downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) - // if err != nil { - // t.log.Fatalf("Cannot get downloads: %v\n", err) - // } - // for i := range downloads { - // if !t.DownloadCache.Has(downloads[i].Link) { - // t.DownloadCache.Set(downloads[i].Link, &downloads[i]) - // } - // } - // offset += len(downloads) - // page++ - // if offset >= totalDownloads { - // break - // } - // } - // }) - - var newTorrents []realdebrid.Torrent - var err error - newTorrents, _, err = t.Api.GetTorrents(0) - if err != nil { - t.log.Fatalf("Cannot get torrents: %v\n", err) - } - - t.log.Infof("Fetched %d downloads", t.DownloadCache.Count()) - - torrentsChan := make(chan *Torrent, len(newTorrents)) - var wg sync.WaitGroup - for i := range newTorrents { - wg.Add(1) - idx := i // capture the loop variable + if t.Config.EnableDownloadCache() { _ = t.workerPool.Submit(func() { - defer wg.Done() - torrentsChan <- t.getMoreInfo(newTorrents[idx]) + page := 1 + offset := 0 + for { + downloads, totalDownloads, err := t.Api.GetDownloads(page, offset) + if err != nil { + t.log.Fatalf("Cannot get downloads: %v\n", err) + } + for i := range downloads { + if !t.DownloadCache.Has(downloads[i].Link) { + t.DownloadCache.Set(downloads[i].Link, &downloads[i]) + } + } + offset += len(downloads) + page++ + if offset >= totalDownloads { + t.log.Infof("Fetched %d downloads", t.DownloadCache.Count()) + break + } + } }) } - wg.Wait() - close(torrentsChan) - t.log.Infof("Fetched info for %d torrents", len(newTorrents)) - noInfoCount := 0 - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - for tor := range torrentsChan { - if tor == nil { - noInfoCount++ - continue - } - if torrent, exists := allTorrents.Get(tor.AccessKey); !exists { - allTorrents.Set(tor.AccessKey, tor) - } else { - mainTorrent := t.mergeToMain(torrent, tor) - allTorrents.Set(tor.AccessKey, mainTorrent) - } - } - - allTorrents.IterCb(func(_ string, torrent *Torrent) { - dav, html := t.buildTorrentResponses(torrent) - t.AssignedDirectoryCb(torrent, func(directory string) { - torrents, _ := t.DirectoryMap.Get(directory) - torrents.Set(torrent.AccessKey, torrent) - // torrent responses - newHtml := strings.ReplaceAll(html, "$dir", directory) - t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".html", &newHtml, 1) - newDav := strings.ReplaceAll(dav, "$dir", directory) - t.ResponseCache.Set(directory+"/"+torrent.AccessKey+".dav", &newDav, 1) - }) - }) - - t.updateDirectoryResponsesCache() - - t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) - - t.SetNewLatestState(t.getCurrentState()) - - if t.Config.EnableRepair() { - t.log.Info("Checking for torrents to repair") - t.repairAll() - t.log.Info("Finished checking for torrents to repair") - } + t.RefreshTorrents() _ = t.workerPool.Submit(func() { t.startRefreshJob() @@ -161,16 +106,21 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p } func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent { - // Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map + // the link can have the following values + // 1. https://*** - the file is available + // 2. repair - the file is available but we need to repair it + // 3. repairing - the file is being repaired + // 4. unselect - the file is deleted torrentToMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { // see if it already exists in the main torrent - if _, ok := mainTorrent.SelectedFiles.Get(filepath); !ok { + if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { // if it doesn't exist in the main torrent, add it mainTorrent.SelectedFiles.Set(filepath, fileToMerge) - } else { - // if it exists, compare the LatestAdded property and the link + } else if originalFile.Link != "unselect" { if mainTorrent.LatestAdded < torrentToMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { + // if it exists, compare the LatestAdded property and the link // if torrentToMerge is more recent and its file has a link, update the main torrent's file + // unless it's removed mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } // else do nothing, the main torrent's file is more recent or has a valid link @@ -190,13 +140,13 @@ func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torr // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { + t.log.Debugf("Unrestricting %s", link) retChan := make(chan *realdebrid.Download, 1) + defer close(retChan) t.workerPool.Submit(func() { retChan <- t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone()) }) - defer close(retChan) return <-retChan - // return t.api.UnrestrictUntilOk(link, t.cfg.ShouldServeFromRclone()) } func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { @@ -219,6 +169,9 @@ func (t *TorrentManager) getCurrentState() LibraryState { torrentsChan := make(chan torrentsResp, 1) countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines + defer close(torrentsChan) + defer close(countChan) + defer close(errChan) _ = t.workerPool.Submit(func() { torrents, totalCount, err := t.Api.GetTorrents(1) @@ -276,113 +229,114 @@ func (t *TorrentManager) startRefreshJob() { if t.latestState.equal(checksum) { continue } + t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) - newTorrents, _, err := t.Api.GetTorrents(0) - if err != nil { - t.log.Warnf("Cannot get torrents: %v\n", err) - continue - } - t.log.Infof("Detected changes! Refreshing %d torrents", len(newTorrents)) - - // handle deleted torrents in info cache - keep := make(map[string]bool) - for _, torrent := range newTorrents { - keep[torrent.ID] = true - } - var toDelete []string - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - infoCache.IterCb(func(torrentID string, torrent *Torrent) { - if _, ok := keep[torrentID]; !ok { - toDelete = append(toDelete, torrentID) - } - }) - for _, torrentID := range toDelete { - infoCache.Remove(torrentID) - } - // end info cache cleanup - - torrentsChan := make(chan *Torrent, len(newTorrents)) - var wg sync.WaitGroup - for i := range newTorrents { - wg.Add(1) - idx := i // capture the loop variable - _ = t.workerPool.Submit(func() { - defer wg.Done() - torrentsChan <- t.getMoreInfo(newTorrents[idx]) - }) - } - wg.Wait() - close(torrentsChan) - t.log.Infof("Fetched info for %d torrents", len(newTorrents)) - - noInfoCount := 0 - oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) - newSet := cmap.New[*Torrent]() - for info := range torrentsChan { - if info == nil { - noInfoCount++ - continue - } - if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { - oldTorrents.Set(info.AccessKey, info) - newSet.Set(info.AccessKey, info) - } else { - mainTorrent := t.mergeToMain(torrent, info) - oldTorrents.Set(info.AccessKey, mainTorrent) - newSet.Set(info.AccessKey, mainTorrent) - } - } - - var updatedPaths []string - - newSet.IterCb(func(_ string, torrent *Torrent) { - dav, html := t.buildTorrentResponses(torrent) - t.AssignedDirectoryCb(torrent, func(directory string) { - torrents, _ := t.DirectoryMap.Get(directory) - torrents.Set(torrent.AccessKey, torrent) - if torrent.LatestAdded > t.latestState.FirstTorrent.Added { - updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) - } - // torrent responses - cacheKey := directory + "/" + torrent.AccessKey - newHtml := strings.ReplaceAll(html, "$dir", directory) - t.ResponseCache.Set(cacheKey+".html", &newHtml, 1) - newDav := strings.ReplaceAll(dav, "$dir", directory) - t.ResponseCache.Set(cacheKey+".dav", &newDav, 1) - }) - }) - - // delete torrents that no longer exist - oldAccessKeys := oldTorrents.Keys() - for _, oldAccessKey := range oldAccessKeys { - if _, ok := newSet.Get(oldAccessKey); !ok { - t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { - torrents.Remove(oldAccessKey) - }) - t.log.Infof("Deleted torrent: %s\n", oldAccessKey) - } - } - t.updateDirectoryResponsesCache() - - t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) - - t.SetNewLatestState(t.getCurrentState()) - - if t.Config.EnableRepair() { - t.log.Info("Checking for torrents to repair") - t.repairAll() - t.log.Info("Finished checking for torrents to repair") - } else { - t.log.Info("Repair is disabled, skipping repair check") - } - _ = t.workerPool.Submit(func() { - OnLibraryUpdateHook(updatedPaths, t.Config, t.log) - }) + t.RefreshTorrents() t.log.Info("Finished refreshing torrents") } } +func (t *TorrentManager) RefreshTorrents() { + instances, _, err := t.Api.GetTorrents(0) + if err != nil { + t.log.Warnf("Cannot get torrents: %v\n", err) + return + } + instanceCount := len(instances) + + // todo: inefficient + // handle deleted torrents in info cache + // but only do it if there is an info cache filled + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + if infoCache.Count() > 0 { + t.cleanInfoCache(instances) + } + + infoChan := make(chan *Torrent, instanceCount) + var wg sync.WaitGroup + for i := range instances { + wg.Add(1) + idx := i // capture the loop variable + _ = t.workerPool.Submit(func() { + defer wg.Done() + infoChan <- t.getMoreInfo(instances[idx]) + }) + } + wg.Wait() + close(infoChan) + t.log.Infof("Fetched info for %d torrents", instanceCount) + + freshKeys := set.NewStringSet() + oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) + noInfoCount := 0 + for info := range infoChan { + if info == nil { + noInfoCount++ + continue + } + freshKeys.Add(info.AccessKey) + if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { + oldTorrents.Set(info.AccessKey, info) + } else { + mainTorrent := t.mergeToMain(torrent, info) + oldTorrents.Set(info.AccessKey, mainTorrent) + } + } + // removed + strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { + t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { + torrents.Remove(accessKey) + }) + t.log.Infof("Deleted torrent: %s\n", accessKey) + return true + }) + // new + strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { + torrent, _ := oldTorrents.Get(accessKey) + t.UpdateTorrentResponseCache(torrent) + t.accessKeySet.Add(accessKey) + return true + }) + // now we can build the directory responses + t.checkForOtherDeletedTorrents() + t.UpdateDirectoryResponsesCache() + t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) + + t.SetNewLatestState(t.getCurrentState()) + + if t.Config.EnableRepair() { + t.log.Info("Checking for torrents to repair") + t.repairAll() + t.log.Info("Finished checking for torrents to repair") + } else { + t.log.Info("Repair is disabled, skipping repair check") + } + + // todo: work on hook + // _ = t.workerPool.Submit(func() { + // OnLibraryUpdateHook(updatedPaths, t.Config, t.log) + // }) +} + +func (t *TorrentManager) cleanInfoCache(torrents []realdebrid.Torrent) { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + keep := make(map[string]bool) + for _, torrent := range torrents { + keep[torrent.ID] = true + } + var toDelete []string + infoCache.IterCb(func(torrentID string, torrent *Torrent) { + if _, ok := keep[torrentID]; !ok { + toDelete = append(toDelete, torrentID) + } + }) + for _, torrentID := range toDelete { + infoCache.Remove(torrentID) + t.deleteTorrentFile(torrentID) + } +} + // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) @@ -484,6 +438,7 @@ func (t *TorrentManager) writeTorrentToFile(torrent *realdebrid.TorrentInfo) err if err := dataEncoder.Encode(torrent); err != nil { return fmt.Errorf("failed encoding torrent: %w", err) } + t.log.Debugf("Saved torrent %s to file", torrent.ID) return nil } @@ -510,6 +465,14 @@ func (t *TorrentManager) readTorrentFromFile(torrentID string) *realdebrid.Torre return &torrent } +func (t *TorrentManager) deleteTorrentFile(torrentID string) { + filePath := "data/" + torrentID + ".bin" + err := os.Remove(filePath) + if err != nil { + t.log.Warnf("Cannot delete file %s: %v", filePath, err) + } +} + func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([]*File, bool) { type Result struct { Response *realdebrid.Download @@ -537,7 +500,7 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([ wg.Wait() close(resultsChan) - isChaotic := false + chaoticFileCount := 0 for result := range resultsChan { if result.Response == nil { continue @@ -564,22 +527,22 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([ Link: result.Response.Link, }) } else { - isChaotic = true + chaoticFileCount++ } } } - return selectedFiles, isChaotic + return selectedFiles, chaoticFileCount == len(links) } func (t *TorrentManager) repairAll() { proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full if !proceed { t.log.Error("Reached the max number of active torrents, cannot start repair") + // TODO delete oldest in progress torrent return } - var toDelete []string allTorrents, _ := t.DirectoryMap.Get(INT_ALL) allTorrents.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() { @@ -587,46 +550,65 @@ func (t *TorrentManager) repairAll() { return } forRepair := false - unselected := 0 torrent.SelectedFiles.IterCb(func(_ string, file *File) { if file.Link == "repair" && !forRepair { file.Link = "repairing" t.log.Debugf("Found a file to repair for torrent %s", torrent.AccessKey) forRepair = true } - if file.Link == "unselect" { - unselected++ - } }) if forRepair { t.log.Infof("Repairing %s", torrent.AccessKey) t.Repair(torrent.AccessKey) } + }) +} + +func (t *TorrentManager) checkForOtherDeletedTorrents() { + var toDelete []string + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + allTorrents.IterCb(func(_ string, torrent *Torrent) { + unselected := 0 + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.Link == "unselect" { + unselected++ + } + }) if unselected == torrent.SelectedFiles.Count() && unselected > 0 { t.log.Infof("Deleting %s", torrent.AccessKey) toDelete = append(toDelete, torrent.AccessKey) + } else { + // save to file + for i := range torrent.Instances { + t.writeTorrentToFile(&torrent.Instances[i]) + } } }) for _, accessKey := range toDelete { - t.Delete(accessKey) + t.Delete(accessKey, true, false) } } -func (t *TorrentManager) Delete(accessKey string) { - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) +func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirectoryResponses bool) { t.log.Infof("Deleting torrent %s", accessKey) - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - if torrent, ok := allTorrents.Get(accessKey); ok { - for _, instance := range torrent.Instances { - infoCache.Remove(instance.ID) - t.Api.DeleteTorrent(instance.ID) + if deleteInRD { + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + if torrent, ok := allTorrents.Get(accessKey); ok { + for _, instance := range torrent.Instances { + t.Api.DeleteTorrent(instance.ID) + } } } - t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { - if _, ok := torrents.Get(accessKey); ok { + t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { + if ok := torrents.Has(accessKey); ok { torrents.Remove(accessKey) + pathKey := fmt.Sprintf("%s/%s", directory, accessKey) + t.ResponseCache.Del(pathKey) } }) + if updateDirectoryResponses { + t.UpdateDirectoryResponsesCache() + } } func (t *TorrentManager) Repair(accessKey string) { @@ -675,11 +657,7 @@ func (t *TorrentManager) Repair(accessKey string) { selectedFiles, isChaotic = t.organizeChaos(links, selectedFiles) if isChaotic { t.log.Warnf("Torrent %s is always returning an unplayable rar file (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) - t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { - torrents.Remove(torrent.AccessKey) - }) - t.ScheduleForRefresh() - // t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash) + t.Delete(torrent.AccessKey, false, true) return } else if streamableCount == 1 { t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) @@ -687,7 +665,7 @@ func (t *TorrentManager) Repair(accessKey string) { t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { torrents.Remove(torrent.AccessKey) }) - t.ScheduleForRefresh() + t.Delete(torrent.AccessKey, false, true) return } // t.log.Debugf("Identified the expired files of torrent id=%s", info.ID) @@ -857,7 +835,21 @@ func (t *TorrentManager) canCapacityHandle() bool { } } -func (t *TorrentManager) updateDirectoryResponsesCache() { +func (t *TorrentManager) UpdateTorrentResponseCache(torrent *Torrent) { + dav, html := t.buildTorrentResponses(torrent) + t.AssignedDirectoryCb(torrent, func(directory string) { + torrents, _ := t.DirectoryMap.Get(directory) + torrents.Set(torrent.AccessKey, torrent) + pathKey := fmt.Sprintf("%s/%s", directory, torrent.AccessKey) + // torrent responses + newHtml := strings.ReplaceAll(html, "$dir", directory) + t.ResponseCache.Set(pathKey+".html", &newHtml, 1) + newDav := strings.ReplaceAll(dav, "$dir", directory) + t.ResponseCache.Set(pathKey+".dav", &newDav, 1) + }) +} + +func (t *TorrentManager) UpdateDirectoryResponsesCache() { t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { allKeys := torrents.Keys() sort.Strings(allKeys) diff --git a/internal/universal/get.go b/internal/universal/get.go index e2a84ad..389238a 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -68,10 +68,7 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res if unrestrict == nil { // log.Warnf("File %s is no longer available, link %s", filepath.Base(file.Path), link) file.Link = "repair" - if cfg.EnableRepair() { - // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.ScheduleForRefresh() // force a recheck - } + torMgr.UpdateTorrentResponseCache(torrent) http.Error(resp, "File is not available", http.StatusNotFound) return } else { @@ -92,7 +89,7 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res if cfg.ShouldServeFromRclone() { redirect(resp, req, unrestrict.Download, cfg) } else { - gf.streamFileToResponse(file, unrestrict.Download, resp, req, torMgr, cfg, log) + gf.streamFileToResponse(torrent, file, unrestrict.Download, resp, req, torMgr, cfg, log) } return } @@ -131,7 +128,7 @@ func (gf *GetFile) streamCachedLinkToResponse(url string, resp http.ResponseWrit return nil } -func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { +func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.File, url string, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. dlReq, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { @@ -152,10 +149,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http if file != nil { log.Warnf("Cannot download file %s: %v", file.Path, err) file.Link = "repair" - if cfg.EnableRepair() { - // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.ScheduleForRefresh() // force a recheck - } + torMgr.UpdateTorrentResponseCache(torrent) } http.Error(resp, "File is not available", http.StatusNotFound) return @@ -166,10 +160,7 @@ func (gf *GetFile) streamFileToResponse(file *intTor.File, url string, resp http if file != nil { log.Warnf("Received a %s status code for file %s", download.Status, file.Path) file.Link = "repair" - if cfg.EnableRepair() { - // log.Debugf("File %s is marked for repair", filepath.Base(file.Path)) - torMgr.ScheduleForRefresh() // force a recheck - } + torMgr.UpdateTorrentResponseCache(torrent) } http.Error(resp, "File is not available", http.StatusNotFound) return diff --git a/pkg/realdebrid/network.go b/pkg/realdebrid/network.go index 1ae8a37..c490d73 100644 --- a/pkg/realdebrid/network.go +++ b/pkg/realdebrid/network.go @@ -69,6 +69,7 @@ func RunTest() { } wg.Wait() + close(semaphore) close(infoChan) fmt.Printf("Network test complete.\n\n")