package torrent

import (
	"encoding/json" // FIX: was missing — json.Marshal/Unmarshal are used by the torrent file cache below
	"fmt"
	"io"
	"math"
	"net/url"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/debridmediamanager/zurg/internal/config"
	"github.com/debridmediamanager/zurg/pkg/dav"
	"github.com/debridmediamanager/zurg/pkg/realdebrid"
	"github.com/debridmediamanager/zurg/pkg/utils"
	"github.com/dgraph-io/ristretto"
	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/panjf2000/ants/v2"
	"github.com/scylladb/go-set"
	"github.com/scylladb/go-set/strset"
	"go.uber.org/zap"
)

// Internal (non user-facing) directory names inside DirectoryMap.
const (
	INT_ALL        = "int__all__"  // every torrent, keyed by AccessKey
	INT_INFO_CACHE = "int__info__" // per-torrent info cache, keyed by RD torrent ID
)

// TorrentManager keeps an in-memory view of the Real-Debrid library,
// plus caches of unrestricted downloads and rendered WebDAV/HTML responses.
type TorrentManager struct {
	Config        config.ConfigInterface
	Api           *realdebrid.RealDebrid
	DirectoryMap  cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
	DownloadCache cmap.ConcurrentMap[string, *realdebrid.Download]
	ResponseCache *ristretto.Cache

	accessKeySet    *strset.Set   // access keys seen during the last refresh
	latestState     *LibraryState // last observed library checksum
	requiredVersion string        // on-disk cache entries with a different version are discarded
	workerPool      *ants.Pool
	repairWorker    *ants.Pool // single-threaded pool so repairs never run concurrently
	log             *zap.SugaredLogger
}

// NewTorrentManager creates a new torrent manager.
// It fetches all torrents and their info in the background
// and stores them in-memory and cached in files.
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager {
	initialState := EmptyState() // FIX: typo "initialSate"
	t := &TorrentManager{
		Config:          cfg,
		Api:             api,
		ResponseCache:   cache,
		accessKeySet:    set.NewStringSet(),
		latestState:     &initialState,
		requiredVersion: "03.12.2023",
		workerPool:      p,
		log:             log,
	}
	// create internal directories
	t.DirectoryMap = cmap.New[cmap.ConcurrentMap[string, *Torrent]]()
	t.DirectoryMap.Set(INT_ALL, cmap.New[*Torrent]())        // key is AccessKey
	t.DirectoryMap.Set(INT_INFO_CACHE, cmap.New[*Torrent]()) // key is Torrent ID
	// create directory maps
	for _, directory := range cfg.GetDirectories() {
		t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
	}
	// Fetch downloads in the background, paging until the reported total is reached.
	t.DownloadCache = cmap.New[*realdebrid.Download]()
	if t.Config.EnableDownloadCache() {
		_ = t.workerPool.Submit(func() {
			page := 1
			offset := 0
			for {
				downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
				if err != nil {
					// NOTE(review): Fatalf exits the whole process from a background
					// goroutine — confirm this is intended for a cache warm-up failure.
					t.log.Fatalf("Cannot get downloads: %v\n", err)
				}
				for i := range downloads {
					if !t.DownloadCache.Has(downloads[i].Link) {
						// &downloads[i] addresses the slice element, not a loop variable
						t.DownloadCache.Set(downloads[i].Link, &downloads[i])
					}
				}
				offset += len(downloads)
				page++
				if offset >= totalDownloads {
					t.log.Infof("Fetched %d downloads", t.DownloadCache.Count())
					break
				}
			}
		})
	}
	t.RefreshTorrents()
	if t.Config.EnableRepair() {
		repairWorker, err := ants.NewPool(1)
		if err != nil {
			log.Fatalf("Failed to create repair worker: %v", err)
		}
		t.repairWorker = repairWorker
		t.RepairAll() // initial repair
	} else {
		t.log.Info("Repair is disabled, skipping repair check")
	}
	t.log.Info("Finished initializing torrent manager")
	_ = t.workerPool.Submit(func() {
		t.startRefreshJob()
	})
	return t
}

// RefreshTorrents fetches the full torrent list from Real-Debrid, resolves
// per-torrent info concurrently, merges the results into the in-memory maps
// and rebuilds the response caches when anything changed.
func (t *TorrentManager) RefreshTorrents() {
	instances, _, err := t.Api.GetTorrents(0)
	if err != nil {
		t.log.Warnf("Cannot get torrents: %v\n", err)
		return
	}
	infoChan := make(chan *Torrent, len(instances))
	var wg sync.WaitGroup
	for i := range instances {
		wg.Add(1)
		idx := i // capture the loop variable
		// NOTE(review): if Submit ever fails, wg.Done is never called and
		// wg.Wait blocks forever — confirm the pool is non-blocking/unbounded.
		_ = t.workerPool.Submit(func() {
			defer wg.Done()
			infoChan <- t.getMoreInfo(instances[idx])
		})
	}
	wg.Wait()
	close(infoChan)
	t.log.Infof("Fetched info for %d torrents", len(instances))
	freshKeys := set.NewStringSet()
	oldTorrents, _ := t.DirectoryMap.Get(INT_ALL)
	noInfoCount := 0
	for info := range infoChan {
		if info == nil {
			noInfoCount++
			continue
		}
		freshKeys.Add(info.AccessKey)
		if torrent, exists := oldTorrents.Get(info.AccessKey); !exists {
			oldTorrents.Set(info.AccessKey, info)
		} else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() {
			mainTorrent := t.mergeToMain(torrent, info)
			oldTorrents.Set(info.AccessKey, &mainTorrent)
		}
	}
	t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount)
	somthingChanged := false
	// removed
strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { somthingChanged = true t.Delete(accessKey, false, false) return true }) // new strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { somthingChanged = true torrent, _ := oldTorrents.Get(accessKey) t.UpdateTorrentResponseCache(torrent) t.accessKeySet.Add(accessKey) return true }) // now we can build the directory responses if somthingChanged { t.UpdateDirectoryResponsesCache() } t.SetNewLatestState(t.getCurrentState()) // todo: work on hook // _ = t.workerPool.Submit(func() { // OnLibraryUpdateHook(updatedPaths, t.Config, t.log) // }) } // getMoreInfo gets original name, size and files for a torrent func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { return tor } torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { infoCache.Set(rdTorrent.ID, torrentFromFile) return torrentFromFile } info, err := t.Api.GetTorrentInfo(rdTorrent.ID) if err != nil { t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) return nil } // SelectedFiles is a subset of Files with only the selected ones // it also has a Link field, which can be empty // if it is empty, it means the file is no longer available // Files+Links together are the same as SelectedFiles var selectedFiles []*File // if some Links are empty, we need to repair it for _, file := range info.Files { if file.Selected == 0 { continue } selectedFiles = append(selectedFiles, &File{ File: file, Added: info.Added, Ended: info.Ended, Link: "", // no link yet }) } if len(selectedFiles) > len(info.Links) && info.Progress == 100 { t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, 
len(selectedFiles), len(info.Links)) for i, file := range selectedFiles { file.Link = "repair" i++ } } else if len(selectedFiles) == len(info.Links) { // all links are still intact! good! for i, file := range selectedFiles { file.Link = info.Links[i] i++ } } torrent := Torrent{ AccessKey: t.computeAccessKey(info.Name, info.OriginalName), LatestAdded: info.Added, Hash: info.Hash, } torrent.SelectedFiles = cmap.New[*File]() for _, file := range selectedFiles { // todo better handling of duplicate filenames if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { oldName := filepath.Base(file.Path) ext := filepath.Ext(oldName) filename := strings.TrimSuffix(oldName, ext) newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) torrent.SelectedFiles.Set(newName, file) } else { torrent.SelectedFiles.Set(filepath.Base(file.Path), file) } } torrent.DownloadedIDs = strset.New() torrent.InProgressIDs = strset.New() if info.Progress == 100 { torrent.DownloadedIDs.Add(info.ID) } else { torrent.InProgressIDs.Add(info.ID) } infoCache.Set(rdTorrent.ID, &torrent) err = t.writeTorrentToFile(rdTorrent.ID, &torrent) if err != nil { t.log.Warnf("Cannot write torrent to file: %v", err) } return &torrent } func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { mainTorrent := Torrent{} mainTorrent.AccessKey = existing.AccessKey mainTorrent.Hash = existing.Hash mainTorrent.DownloadedIDs = strset.New() mainTorrent.InProgressIDs = strset.New() // this function triggers only when we have a new DownloadedID strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool { mainTorrent.DownloadedIDs.Add(id) mainTorrent.InProgressIDs.Remove(id) return true }) // the link can have the following values // 1. https://*** - the file is available // 2. repair - the file is available but we need to repair it // 3. repairing - the file is being repaired // 4. 
unselect - the file is deleted mainTorrent.SelectedFiles = existing.SelectedFiles toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { // see if it already exists in the main torrent if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { // if it doesn't exist in the main torrent, add it mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } else if originalFile.Link != "unselect" { // if it exists, compare the LatestAdded property and the link if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { // if torrentToMerge is more recent and its file has a link, update the main torrent's file mainTorrent.SelectedFiles.Set(filepath, fileToMerge) } // else do nothing, the main torrent's file is more recent or has a valid link } }) if existing.LatestAdded < toMerge.LatestAdded { mainTorrent.LatestAdded = toMerge.LatestAdded } else { mainTorrent.LatestAdded = existing.LatestAdded } return mainTorrent } // proxy func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { t.log.Debugf("Unrestricting %s", link) retChan := make(chan *realdebrid.Download, 1) defer close(retChan) t.workerPool.Submit(func() { retChan <- t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone()) }) return <-retChan } func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { t.latestState.DownloadingCount = checksum.DownloadingCount t.latestState.FirstTorrent = checksum.FirstTorrent t.latestState.TotalCount = checksum.TotalCount } type torrentsResp struct { torrents []realdebrid.Torrent totalCount int } // generates a checksum based on the number of torrents, the first torrent id and the number of active torrents func (t *TorrentManager) getCurrentState() LibraryState { torrentsChan := make(chan torrentsResp, 1) countChan := make(chan int, 1) errChan := make(chan error, 2) // accommodate errors from both goroutines defer close(torrentsChan) defer 
close(countChan) defer close(errChan) _ = t.workerPool.Submit(func() { torrents, totalCount, err := t.Api.GetTorrents(1) if err != nil { errChan <- err return } torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount} }) _ = t.workerPool.Submit(func() { count, err := t.Api.GetActiveTorrentCount() if err != nil { errChan <- err return } countChan <- count.DownloadingCount }) // Existing goroutines for GetTorrents and GetActiveTorrentCount var torrents []realdebrid.Torrent var totalCount, count int for i := 0; i < 2; i++ { select { case resp := <-torrentsChan: torrents = resp.torrents totalCount = resp.totalCount case count = <-countChan: case err := <-errChan: t.log.Warnf("Checksum API Error: %v\n", err) return EmptyState() } } if len(torrents) == 0 { t.log.Error("Huh, no torrents returned") return EmptyState() } return LibraryState{ TotalCount: totalCount, FirstTorrent: &torrents[0], DownloadingCount: count, } } // startRefreshJob periodically refreshes the torrents func (t *TorrentManager) startRefreshJob() { t.log.Info("Starting periodic refresh") for { <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) checksum := t.getCurrentState() if t.latestState.equal(checksum) { continue } t.log.Infof("Detected changes! 
Refreshing %d torrents", checksum.TotalCount) t.RefreshTorrents() t.log.Info("Finished refreshing torrents") if t.Config.EnableRepair() { t.RepairAll() } else { t.log.Info("Repair is disabled, skipping repair check") } } } func (t *TorrentManager) computeAccessKey(name, originalName string) string { if t.Config.EnableRetainRDTorrentName() { return name } // drop the extension from the name if t.Config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) { return name } else { ret := strings.TrimSuffix(originalName, ".mp4") ret = strings.TrimSuffix(ret, ".mkv") return ret } } func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) error { filePath := "data/" + instanceID + ".json" file, err := os.Create(filePath) if err != nil { return fmt.Errorf("failed creating file: %w", err) } defer file.Close() torrent.Version = t.requiredVersion jsonData, err := json.Marshal(torrent) if err != nil { return fmt.Errorf("failed marshaling torrent: %w", err) } if _, err := file.Write(jsonData); err != nil { return fmt.Errorf("failed writing to file: %w", err) } t.log.Debugf("Saved torrent %s to file", instanceID) return nil } func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { filePath := "data/" + torrentID + ".json" file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { return nil } return nil } defer file.Close() jsonData, err := io.ReadAll(file) if err != nil { return nil } var torrent *Torrent if err := json.Unmarshal(jsonData, &torrent); err != nil { return nil } if strset.Union(torrent.DownloadedIDs, torrent.InProgressIDs).IsEmpty() { t.log.Fatal("Torrent has no downloaded or in progress ids") } if torrent.Version != t.requiredVersion { return nil } return torrent } func (t *TorrentManager) deleteTorrentFile(torrentID string) { filePath := "data/" + torrentID + ".json" err := os.Remove(filePath) if err != nil { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } } func (t 
*TorrentManager) organizeChaos(links []string, selectedFiles []*File) ([]*File, bool) { type Result struct { Response *realdebrid.Download } resultsChan := make(chan Result, len(links)) var wg sync.WaitGroup for _, link := range links { wg.Add(1) link := link // redeclare to avoid closure on loop variable // Use the existing worker pool to submit tasks _ = t.workerPool.Submit(func() { defer wg.Done() if t.DownloadCache.Has(link) { download, _ := t.DownloadCache.Get(link) resultsChan <- Result{Response: download} return } resp := t.UnrestrictUntilOk(link) resultsChan <- Result{Response: resp} }) } wg.Wait() close(resultsChan) chaoticFileCount := 0 for result := range resultsChan { if result.Response == nil { continue } found := false for _, file := range selectedFiles { if strings.Contains(file.Path, result.Response.Filename) { file.Link = result.Response.Link found = true } } if !found { if result.Response.Streamable == 1 { now := time.Now().Format(time.RFC3339) selectedFiles = append(selectedFiles, &File{ File: realdebrid.File{ ID: math.MaxInt32, Path: result.Response.Filename, Bytes: result.Response.Filesize, Selected: 1, }, Added: now, Ended: now, Link: result.Response.Link, }) } else { chaoticFileCount++ } } } return selectedFiles, chaoticFileCount == len(links) } func (t *TorrentManager) repairAll() { proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full if !proceed { t.log.Error("Reached the max number of active torrents, cannot start repair") // TODO delete oldest in progress torrent return } allTorrents, _ := t.DirectoryMap.Get(INT_ALL) var toRepair []*Torrent allTorrents.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() { t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey) return } forRepair := false torrent.SelectedFiles.IterCb(func(_ string, file *File) { if file.Link == "repair" { file.Link = "repairing" forRepair = true } }) if forRepair { toRepair = 
append(toRepair, torrent) } }) t.log.Debugf("Found %d torrents to repair", len(toRepair)) for i := range toRepair { t.log.Infof("Repairing %s", toRepair[i].AccessKey) t.repair(toRepair[i]) } } func (t *TorrentManager) CheckDeletedState(torrent *Torrent) bool { var unselectedIDs []int torrent.SelectedFiles.IterCb(func(_ string, file *File) { if file.Link == "unselect" { unselectedIDs = append(unselectedIDs, file.ID) } }) if len(unselectedIDs) == torrent.SelectedFiles.Count() && len(unselectedIDs) > 0 { return true } else if len(unselectedIDs) > 0 { torrent.DownloadedIDs.Each(func(id string) bool { t.writeTorrentToFile(id, torrent) return true }) } return false } func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirectoryResponses bool) { if deleteInRD { allTorrents, _ := t.DirectoryMap.Get(INT_ALL) infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if torrent, ok := allTorrents.Get(accessKey); ok { torrent.DownloadedIDs.Each(func(id string) bool { t.log.Infof("Deleting torrent %s %s in RD", id, accessKey) t.Api.DeleteTorrent(id) infoCache.Remove(id) t.deleteTorrentFile(id) return true }) } } t.log.Infof("Removing torrent %s from zurg database", accessKey) t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { if ok := torrents.Has(accessKey); ok { torrents.Remove(accessKey) pathKey := fmt.Sprintf("%s/%s", directory, accessKey) t.ResponseCache.Del(pathKey) } }) if updateDirectoryResponses { t.UpdateDirectoryResponsesCache() } } func (t *TorrentManager) repair(torrent *Torrent) { if torrent.AllInProgress() { t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) return } // make the file messy t.log.Infof("Evaluating whole torrent to find the correct files for torrent: %s", torrent.AccessKey) var selectedFiles []*File var isChaotic bool var links []string streamableCount := 0 torrent.SelectedFiles.IterCb(func(_ string, file *File) { if utils.IsStreamable(file.Path) { streamableCount++ } 
fileCopy := &File{ File: file.File, Added: file.Added, Ended: file.Ended, Link: file.Link, } selectedFiles = append(selectedFiles, fileCopy) if strings.HasPrefix(fileCopy.Link, "http") { links = append(links, fileCopy.Link) } fileCopy.Link = "" // empty the links = chaos! }) // chaotic file means RD will not output the desired file selection // e.g. even if we select just a single mkv, it will output a rar selectedFiles, isChaotic = t.organizeChaos(links, selectedFiles) if isChaotic { t.log.Warnf("Torrent %s is always returning an unplayable rar file (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) t.Delete(torrent.AccessKey, false, true) return } else if streamableCount == 1 { t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) t.Delete(torrent.AccessKey, false, true) return } // t.log.Debugf("Identified the expired files of torrent id=%s", info.ID) for _, newFile := range selectedFiles { if file, exists := torrent.SelectedFiles.Get(filepath.Base(newFile.Path)); exists { file.Link = newFile.Link } else { torrent.SelectedFiles.Set(filepath.Base(newFile.Path), newFile) } } proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full if !proceed { t.log.Error("Reached the max number of active torrents, cannot continue with the repair") return } // first solution: add the same selection, maybe it can be fixed by reinsertion? 
if t.reinsertTorrent(torrent, "") { t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) return } // if all the selected files are missing but there are other streamable files var missingFiles []File torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !strings.HasPrefix(file.Link, "http") { missingFiles = append(missingFiles, *file) } }) // if we download a single file, it will be named differently // so we need to download 1 extra file to preserve the name // this is only relevant if we enable retain_rd_torrent_name if len(missingFiles) == 1 && streamableCount > 1 { // add the first file link encountered with a prefix of http for _, file := range torrent.SelectedFiles.Items() { if strings.HasPrefix(file.Link, "http") { missingFiles = append(missingFiles, *file) break } } } if len(missingFiles) > 0 { t.log.Infof("Redownloading in multiple batches the %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) // if not, last resort: add only the missing files but do it in 2 batches half := len(missingFiles) / 2 missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") if missingFiles1 != "" { t.reinsertTorrent(torrent, missingFiles1) } if missingFiles2 != "" { t.reinsertTorrent(torrent, missingFiles2) } } else { t.log.Warnf("Torrent %s has no missing files to repair", torrent.AccessKey) } } func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { // if missingFiles is not provided, missing files means missing links if missingFiles == "" { tmpSelection := "" torrent.SelectedFiles.IterCb(func(_ string, file *File) { if !strings.HasPrefix(file.Link, "http") { tmpSelection += fmt.Sprintf("%d,", file.ID) } }) if tmpSelection == "" { return false } if len(tmpSelection) > 0 { missingFiles = tmpSelection[:len(tmpSelection)-1] } } // redownload torrent resp, err := t.Api.AddMagnetHash(torrent.Hash) if err != nil { 
t.log.Warnf("Cannot redownload torrent: %v", err) return false } time.Sleep(1 * time.Second) // select files newTorrentID := resp.ID err = t.Api.SelectTorrentFiles(newTorrentID, missingFiles) if err != nil { t.log.Warnf("Cannot start redownloading: %v", err) t.Api.DeleteTorrent(newTorrentID) return false } time.Sleep(10 * time.Second) // see if the torrent is ready info, err := t.Api.GetTorrentInfo(newTorrentID) if err != nil { t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) t.Api.DeleteTorrent(newTorrentID) return false } if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) t.Api.DeleteTorrent(newTorrentID) return false } if info.Progress != 100 { t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) return true } missingCount := len(strings.Split(missingFiles, ",")) if len(info.Links) != missingCount { t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) t.Api.DeleteTorrent(newTorrentID) return false } t.log.Infof("Repair successful id=%s", newTorrentID) return true } func (t *TorrentManager) canCapacityHandle() bool { // max waiting time is 45 minutes const maxRetries = 50 const baseDelay = 1 * time.Second const maxDelay = 60 * time.Second retryCount := 0 for { count, err := t.Api.GetActiveTorrentCount() if err != nil { t.log.Warnf("Cannot get active downloads count: %v", err) if retryCount >= maxRetries { t.log.Error("Max retries reached. 
Exiting.") return false } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } time.Sleep(delay) retryCount++ continue } if count.DownloadingCount < count.MaxNumberOfTorrents { // t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) return true } delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay if delay > maxDelay { delay = maxDelay } t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay) if retryCount >= maxRetries { t.log.Error("Max retries reached, exiting") return false } time.Sleep(delay) retryCount++ } } func (t *TorrentManager) RepairAll() { _ = t.repairWorker.Submit(func() { t.log.Info("Checking for torrents to repair") t.repairAll() t.log.Info("Finished checking for torrents to repair") }) } func (t *TorrentManager) Repair(torrent *Torrent) { _ = t.repairWorker.Submit(func() { t.log.Info("repairing torrent %s", torrent.AccessKey) t.repair(torrent) t.log.Info("Finished repairing torrent %s", torrent.AccessKey) }) t.UpdateTorrentResponseCache(torrent) } func (t *TorrentManager) UpdateTorrentResponseCache(torrent *Torrent) { dav, html := t.buildTorrentResponses(torrent) t.AssignedDirectoryCb(torrent, func(directory string) { torrents, _ := t.DirectoryMap.Get(directory) torrents.Set(torrent.AccessKey, torrent) pathKey := fmt.Sprintf("%s/%s", directory, torrent.AccessKey) // torrent responses newHtml := strings.ReplaceAll(html, "$dir", directory) t.ResponseCache.Set(pathKey+".html", newHtml, 1) newDav := strings.ReplaceAll(dav, "$dir", directory) t.ResponseCache.Set(pathKey+".dav", newDav, 1) }) } func (t *TorrentManager) UpdateDirectoryResponsesCache() { t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { allKeys := torrents.Keys() sort.Strings(allKeys) davRet := "" htmlRet := "" for _, accessKey := range allKeys { 
if tor, ok := torrents.Get(accessKey); ok { if tor.AllInProgress() { continue } davRet += dav.Directory(tor.AccessKey, tor.LatestAdded) htmlRet += fmt.Sprintf("