diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index a34e2b7..c48cc71 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -40,6 +40,7 @@ type TorrentManager struct { latestState *LibraryState requiredVersion string workerPool *ants.Pool + repairWorker *ants.Pool log *zap.SugaredLogger } @@ -48,6 +49,7 @@ type TorrentManager struct { // and store them in-memory and cached in files func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p *ants.Pool, cache *ristretto.Cache, log *zap.SugaredLogger) *TorrentManager { initialSate := EmptyState() + t := &TorrentManager{ Config: cfg, Api: api, @@ -95,16 +97,89 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p } t.RefreshTorrents() + if t.Config.EnableRepair() { + repairWorker, err := ants.NewPool(1) + if err != nil { + log.Fatalf("Failed to create repair worker: %v", err) + } + t.repairWorker = repairWorker + t.RepairAll() // initial repair + } else { + t.log.Info("Repair is disabled, skipping repair check") + } + + t.log.Info("Finished initializing torrent manager") _ = t.workerPool.Submit(func() { t.startRefreshJob() }) - t.log.Info("Finished initializing torrent manager") - return t } +func (t *TorrentManager) RefreshTorrents() { + // get all torrent info + instances, _, err := t.Api.GetTorrents(0) + if err != nil { + t.log.Warnf("Cannot get torrents: %v\n", err) + return + } + instanceCount := len(instances) + infoChan := make(chan *Torrent, instanceCount) + var wg sync.WaitGroup + for i := range instances { + wg.Add(1) + idx := i // capture the loop variable + _ = t.workerPool.Submit(func() { + defer wg.Done() + infoChan <- t.getMoreInfo(instances[idx]) + }) + } + wg.Wait() + close(infoChan) + t.log.Infof("Fetched info for %d torrents", instanceCount) + + freshKeys := set.NewStringSet() + oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) + noInfoCount := 0 + for info := range infoChan { + if info == nil { + noInfoCount++ + continue + } + freshKeys.Add(info.AccessKey) + if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { + oldTorrents.Set(info.AccessKey, info) + } else { + mainTorrent := t.mergeToMain(torrent, info) + oldTorrents.Set(info.AccessKey, mainTorrent) + } + } + // removed + strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { + t.Delete(accessKey, false, false) + return true + }) + // new + strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { + torrent, _ := oldTorrents.Get(accessKey) + t.UpdateTorrentResponseCache(torrent) + t.accessKeySet.Add(accessKey) + return true + }) + t.checkForOtherDeletedTorrents() + // now we can build the directory responses + t.UpdateDirectoryResponsesCache() + t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) + + t.SetNewLatestState(t.getCurrentState()) + + // todo: work on hook + // _ = t.workerPool.Submit(func() { + // OnLibraryUpdateHook(updatedPaths, t.Config, t.log) + // }) +} + func (t *TorrentManager) mergeToMain(mainTorrent, torrentToMerge *Torrent) *Torrent { // the link can have the following values // 1. https://*** - the file is available @@ -155,10 +230,6 @@ func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { t.latestState.TotalCount = checksum.TotalCount } -func (t *TorrentManager) ScheduleForRefresh() { - t.SetNewLatestState(EmptyState()) -} - type torrentsResp struct { torrents []realdebrid.Torrent totalCount int @@ -232,108 +303,14 @@ func (t *TorrentManager) startRefreshJob() { t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) t.RefreshTorrents() - t.log.Info("Finished refreshing torrents") - } -} -func (t *TorrentManager) RefreshTorrents() { - // get all torrent info - instances, _, err := t.Api.GetTorrents(0) - if err != nil { - t.log.Warnf("Cannot get torrents: %v\n", err) - return - } - instanceCount := len(instances) - infoChan := make(chan *Torrent, instanceCount) - var wg sync.WaitGroup - for i := range instances { - wg.Add(1) - idx := i // capture the loop variable - _ = t.workerPool.Submit(func() { - defer wg.Done() - infoChan <- t.getMoreInfo(instances[idx]) - }) - } - wg.Wait() - close(infoChan) - t.log.Infof("Fetched info for %d torrents", instanceCount) - - // todo: inefficient - // handle deleted torrents in info cache - // but only do it if there is an info cache filled - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - if infoCache.Count() > 0 { - t.cleanInfoCache(instances) - } - - freshKeys := set.NewStringSet() - oldTorrents, _ := t.DirectoryMap.Get(INT_ALL) - noInfoCount := 0 - for info := range infoChan { - if info == nil { - noInfoCount++ - continue - } - freshKeys.Add(info.AccessKey) - if torrent, exists := oldTorrents.Get(info.AccessKey); !exists { - oldTorrents.Set(info.AccessKey, info) + if t.Config.EnableRepair() { + t.RepairAll() } else { - mainTorrent := t.mergeToMain(torrent, info) - oldTorrents.Set(info.AccessKey, mainTorrent) + t.log.Info("Repair is disabled, skipping repair check") } - } - // removed - strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { - t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { - torrents.Remove(accessKey) - }) - t.log.Infof("Deleted torrent: %s\n", accessKey) - return true - }) - // new - strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { - torrent, _ := oldTorrents.Get(accessKey) - t.UpdateTorrentResponseCache(torrent) - t.accessKeySet.Add(accessKey) - return true - }) - t.checkForOtherDeletedTorrents() - // now we can build the directory responses - t.UpdateDirectoryResponsesCache() - t.log.Infof("Compiled into %d torrents, %d were missing info", oldTorrents.Count(), noInfoCount) - t.SetNewLatestState(t.getCurrentState()) - - if t.Config.EnableRepair() { - t.log.Info("Checking for torrents to repair") - t.repairAll() - t.log.Info("Finished checking for torrents to repair") - } else { - t.log.Info("Repair is disabled, skipping repair check") - } - - // todo: work on hook - // _ = t.workerPool.Submit(func() { - // OnLibraryUpdateHook(updatedPaths, t.Config, t.log) - // }) -} - -func (t *TorrentManager) cleanInfoCache(torrents []realdebrid.Torrent) { - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - keep := make(map[string]bool) - for _, torrent := range torrents { - keep[torrent.ID] = true - } - var toDelete []string - infoCache.IterCb(func(torrentID string, torrent *Torrent) { - if _, ok := keep[torrentID]; !ok { - toDelete = append(toDelete, torrentID) - } - }) - for _, torrentID := range toDelete { - infoCache.Remove(torrentID) - t.deleteTorrentFile(torrentID) } } @@ -394,7 +371,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { } torrent := Torrent{ - AccessKey: t.getName(info.Name, info.OriginalName), + AccessKey: t.computeAccessKey(info.Name, info.OriginalName), LatestAdded: info.Added, Instances: []realdebrid.TorrentInfo{*info}, } @@ -412,7 +389,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { return &torrent } -func (t *TorrentManager) getName(name, originalName string) string { +func (t *TorrentManager) computeAccessKey(name, originalName string) string { if t.Config.EnableRetainRDTorrentName() { return name } @@ -552,7 +529,7 @@ func (t *TorrentManager) repairAll() { } allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - var toRepair []string + var toRepair []*Torrent allTorrents.IterCb(func(_ string, torrent *Torrent) { if torrent.AnyInProgress() { t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey) @@ -566,13 +543,13 @@ func (t *TorrentManager) repairAll() { } }) if forRepair { - toRepair = append(toRepair, torrent.AccessKey) + toRepair = append(toRepair, torrent) } }) t.log.Debugf("Found %d torrents to repair", len(toRepair)) - for _, accessKey := range toRepair { - t.log.Infof("Repairing %s", accessKey) - t.Repair(accessKey) + for i := range toRepair { + t.log.Infof("Repairing %s", toRepair[i].AccessKey) + t.repair(toRepair[i]) } } @@ -603,14 +580,18 @@ func (t *TorrentManager) checkForOtherDeletedTorrents() { func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirectoryResponses bool) { if deleteInRD { - t.log.Infof("Deleting torrent %s in RD", accessKey) allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) if torrent, ok := allTorrents.Get(accessKey); ok { for _, instance := range torrent.Instances { + t.log.Infof("Deleting torrent %s %s in RD", instance.ID, accessKey) t.Api.DeleteTorrent(instance.ID) + infoCache.Remove(instance.ID) + t.deleteTorrentFile(instance.ID) } } } + t.log.Infof("Removing torrent %s from zurg database", accessKey) t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { if ok := torrents.Has(accessKey); ok { torrents.Remove(accessKey) @@ -623,14 +604,7 @@ func (t *TorrentManager) Delete(accessKey string, deleteInRD bool, updateDirecto } } -func (t *TorrentManager) Repair(accessKey string) { - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - torrent, _ := allTorrents.Get(accessKey) - if torrent == nil { - t.log.Warnf("Cannot find torrent %s anymore so we are not repairing it", accessKey) - return - } - +func (t *TorrentManager) repair(torrent *Torrent) { if torrent.AnyInProgress() { t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey) return @@ -669,9 +643,6 @@ func (t *TorrentManager) Repair(accessKey string) { } else if streamableCount == 1 { t.log.Warnf("Torrent %s only file has expired (it will no longer show up in your directories, zurg suggests you delete it)", torrent.AccessKey) t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", torrent.Instances[0].Hash) - t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) { - torrents.Remove(torrent.AccessKey) - }) t.Delete(torrent.AccessKey, false, true) return } @@ -924,3 +895,19 @@ func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) { } } } + +func (t *TorrentManager) RepairAll() { + _ = t.repairWorker.Submit(func() { + t.log.Info("Checking for torrents to repair") + t.repairAll() + t.log.Info("Finished checking for torrents to repair") + }) +} + +func (t *TorrentManager) Repair(torrent *Torrent) { + _ = t.repairWorker.Submit(func() { + t.log.Info("repairing torrent %s", torrent.AccessKey) + t.repair(torrent) + t.log.Info("Finished repairing torrent %s", torrent.AccessKey) + }) +} diff --git a/internal/universal/get.go b/internal/universal/get.go index 389238a..75e3dfa 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -67,7 +67,8 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res unrestrict := torMgr.UnrestrictUntilOk(link) if unrestrict == nil { // log.Warnf("File %s is no longer available, link %s", filepath.Base(file.Path), link) - file.Link = "repair" + file.Link = "repairing" + torMgr.Repair(torrent) torMgr.UpdateTorrentResponseCache(torrent) http.Error(resp, "File is not available", http.StatusNotFound) return @@ -148,7 +149,8 @@ func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.Fi if err != nil { if file != nil { log.Warnf("Cannot download file %s: %v", file.Path, err) - file.Link = "repair" + file.Link = "repairing" + torMgr.Repair(torrent) torMgr.UpdateTorrentResponseCache(torrent) } http.Error(resp, "File is not available", http.StatusNotFound) @@ -159,7 +161,8 @@ func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.Fi if download.StatusCode != http.StatusOK && download.StatusCode != http.StatusPartialContent { if file != nil { log.Warnf("Received a %s status code for file %s", download.Status, file.Path) - file.Link = "repair" + file.Link = "repairing" + torMgr.Repair(torrent) torMgr.UpdateTorrentResponseCache(torrent) } http.Error(resp, "File is not available", http.StatusNotFound)