diff --git a/internal/torrent/delete.go b/internal/torrent/delete.go new file mode 100644 index 0000000..98e4150 --- /dev/null +++ b/internal/torrent/delete.go @@ -0,0 +1,41 @@ +package torrent + +import cmap "github.com/orcaman/concurrent-map/v2" + +func (t *TorrentManager) CheckDeletedState(torrent *Torrent) bool { + var unselectedIDs []int + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if file.Link == "unselect" { + unselectedIDs = append(unselectedIDs, file.ID) + } + }) + if len(unselectedIDs) == torrent.SelectedFiles.Count() && len(unselectedIDs) > 0 { + return true + } else if len(unselectedIDs) > 0 { + torrent.DownloadedIDs.Each(func(id string) bool { + t.writeTorrentToFile(id, torrent) + return true + }) + } + return false +} + +func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { + if deleteInRD { + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + if torrent, ok := allTorrents.Get(accessKey); ok { + torrent.DownloadedIDs.Each(func(id string) bool { + t.log.Infof("Deleting torrent %s %s in RD", id, accessKey) + t.Api.DeleteTorrent(id) + infoCache.Remove(id) + t.deleteTorrentFile(id) + return true + }) + } + } + t.log.Infof("Removing torrent %s from zurg database", accessKey) + t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { + torrents.Remove(accessKey) + }) +} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index ea8a2b1..9ab505f 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -1,14 +1,9 @@ package torrent import ( - "fmt" "io" - "math" "os" - "path/filepath" "strings" - "sync" - "time" "github.com/debridmediamanager/zurg/internal/config" "github.com/debridmediamanager/zurg/pkg/realdebrid" @@ -110,196 +105,9 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p return t } -func (t *TorrentManager) RefreshTorrents() { - instances, _, err := t.Api.GetTorrents(0) - if err != nil { - t.log.Warnf("Cannot get torrents: %v\n", err) - return - } - infoChan := make(chan *Torrent, len(instances)) - var wg sync.WaitGroup - for i := range instances { - wg.Add(1) - idx := i // capture the loop variable - _ = t.workerPool.Submit(func() { - defer wg.Done() - infoChan <- t.getMoreInfo(instances[idx]) - }) - } - wg.Wait() - close(infoChan) - t.log.Infof("Fetched info for %d torrents", len(instances)) - - freshKeys := set.NewStringSet() - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - noInfoCount := 0 - for info := range infoChan { - if info == nil { - noInfoCount++ - continue - } - freshKeys.Add(info.AccessKey) - if torrent, exists := allTorrents.Get(info.AccessKey); !exists { - allTorrents.Set(info.AccessKey, info) - } else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() { - mainTorrent := t.mergeToMain(torrent, info) - allTorrents.Set(info.AccessKey, &mainTorrent) - } - } - t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) - - // removed - strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { - t.Delete(accessKey, false) - return true - }) - // new - var updatedPaths []string - strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { - t.accessKeySet.Add(accessKey) - tor, _ := allTorrents.Get(accessKey) - t.AssignedDirectoryCb(tor, func(directory string) { - if strings.HasPrefix(directory, "int__") { - return - } - torrents, _ := t.DirectoryMap.Get(directory) - torrents.Set(accessKey, tor) - updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey)) - }) - return true - }) - - t.SetNewLatestState(t.getCurrentState()) - - t.TriggerHookOnLibraryUpdate(updatedPaths) -} - -// getMoreInfo gets original name, size and files for a torrent -func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { - return tor - } - torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) - if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { - infoCache.Set(rdTorrent.ID, torrentFromFile) - return torrentFromFile - } - - info, err := t.Api.GetTorrentInfo(rdTorrent.ID) - if err != nil { - t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) - return nil - } - // SelectedFiles is a subset of Files with only the selected ones - // it also has a Link field, which can be empty - // if it is empty, it means the file is no longer available - // Files+Links together are the same as SelectedFiles - var selectedFiles []*File - // if some Links are empty, we need to repair it - for _, file := range info.Files { - if file.Selected == 0 { - continue - } - selectedFiles = append(selectedFiles, &File{ - File: file, - Added: info.Added, - Ended: info.Ended, - Link: "", // no link yet - }) - } - if len(selectedFiles) > len(info.Links) && info.Progress == 100 { - t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links)) - for i, file := range selectedFiles { - file.Link = "repair" - i++ - } - } else if len(selectedFiles) == len(info.Links) { - // all links are still intact! good! - for i, file := range selectedFiles { - file.Link = info.Links[i] - i++ - } - } - - torrent := Torrent{ - AccessKey: t.computeAccessKey(info.Name, info.OriginalName), - LatestAdded: info.Added, - Hash: info.Hash, - } - torrent.SelectedFiles = cmap.New[*File]() - for _, file := range selectedFiles { - // todo better handling of duplicate filenames - if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { - oldName := filepath.Base(file.Path) - ext := filepath.Ext(oldName) - filename := strings.TrimSuffix(oldName, ext) - newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) - torrent.SelectedFiles.Set(newName, file) - } else { - torrent.SelectedFiles.Set(filepath.Base(file.Path), file) - } - } - torrent.DownloadedIDs = strset.New() - torrent.InProgressIDs = strset.New() - if info.Progress == 100 { - torrent.DownloadedIDs.Add(info.ID) - } else { - torrent.InProgressIDs.Add(info.ID) - } - - err = t.writeTorrentToFile(rdTorrent.ID, &torrent) - if err != nil { - t.log.Warnf("Cannot write torrent to file: %v", err) - } - infoCache.Set(rdTorrent.ID, &torrent) - - return &torrent -} - -func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { - mainTorrent := Torrent{} - - mainTorrent.AccessKey = existing.AccessKey - mainTorrent.Hash = existing.Hash - mainTorrent.DownloadedIDs = strset.New() - mainTorrent.InProgressIDs = strset.New() - - // this function triggers only when we have a new DownloadedID - strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool { - mainTorrent.DownloadedIDs.Add(id) - mainTorrent.InProgressIDs.Remove(id) - return true - }) - - // the link can have the following values - // 1. https://*** - the file is available - // 2. repair - the file is available but we need to repair it - // 3. repairing - the file is being repaired - // 4. unselect - the file is deleted - mainTorrent.SelectedFiles = existing.SelectedFiles - toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { - // see if it already exists in the main torrent - if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { - // if it doesn't exist in the main torrent, add it - mainTorrent.SelectedFiles.Set(filepath, fileToMerge) - } else if originalFile.Link != "unselect" { - // if it exists, compare the LatestAdded property and the link - if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { - // if torrentToMerge is more recent and its file has a link, update the main torrent's file - mainTorrent.SelectedFiles.Set(filepath, fileToMerge) - } - // else do nothing, the main torrent's file is more recent or has a valid link - } - }) - - if existing.LatestAdded < toMerge.LatestAdded { - mainTorrent.LatestAdded = toMerge.LatestAdded - } else { - mainTorrent.LatestAdded = existing.LatestAdded - } - - return mainTorrent +// proxy +func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { + return t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone()) } func (t *TorrentManager) TriggerHookOnLibraryUpdate(updatedPaths []string) { @@ -309,98 +117,22 @@ func (t *TorrentManager) TriggerHookOnLibraryUpdate(updatedPaths []string) { }) } -// proxy -func (t *TorrentManager) UnrestrictUntilOk(link string) *realdebrid.Download { - return t.Api.UnrestrictUntilOk(link, t.Config.ShouldServeFromRclone()) -} - -func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { - t.latestState.DownloadingCount = checksum.DownloadingCount - t.latestState.FirstTorrent = checksum.FirstTorrent - t.latestState.TotalCount = checksum.TotalCount -} - -type torrentsResp struct { - torrents []realdebrid.Torrent - totalCount int -} - -// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents -func (t *TorrentManager) getCurrentState() LibraryState { - torrentsChan := make(chan torrentsResp, 1) - countChan := make(chan int, 1) - errChan := make(chan error, 2) // accommodate errors from both goroutines - defer close(torrentsChan) - defer close(countChan) - defer close(errChan) - - _ = t.workerPool.Submit(func() { - torrents, totalCount, err := t.Api.GetTorrents(1) - if err != nil { - errChan <- err - return +func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) { + torrentIDs := strset.Union(tor.DownloadedIDs, tor.InProgressIDs).List() + // get filenames needed for directory conditions + filenames := tor.SelectedFiles.Keys() + // Map torrents to directories + switch t.Config.GetVersion() { + case "v1": + configV1 := t.Config.(*config.ZurgConfigV1) + for _, directories := range configV1.GetGroupMap() { + for _, directory := range directories { + if t.Config.MeetsConditions(directory, tor.AccessKey, torrentIDs, filenames) { + cb(directory) + break + } + } } - torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount} - }) - - _ = t.workerPool.Submit(func() { - count, err := t.Api.GetActiveTorrentCount() - if err != nil { - errChan <- err - return - } - countChan <- count.DownloadingCount - }) - - // Existing goroutines for GetTorrents and GetActiveTorrentCount - var torrents []realdebrid.Torrent - var totalCount, count int - - for i := 0; i < 2; i++ { - select { - case resp := <-torrentsChan: - torrents = resp.torrents - totalCount = resp.totalCount - case count = <-countChan: - case err := <-errChan: - t.log.Warnf("Checksum API Error: %v\n", err) - return EmptyState() - } - } - - if len(torrents) == 0 { - t.log.Error("Huh, no torrents returned") - return EmptyState() - } - - return LibraryState{ - TotalCount: totalCount, - FirstTorrent: &torrents[0], - DownloadingCount: count, - } -} - -// startRefreshJob periodically refreshes the torrents -func (t *TorrentManager) startRefreshJob() { - t.log.Info("Starting periodic refresh") - for { - <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) - - checksum := t.getCurrentState() - if t.latestState.equal(checksum) { - continue - } - t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) - - t.RefreshTorrents() - t.log.Info("Finished refreshing torrents") - - if t.Config.EnableRepair() { - t.RepairAll() - } else { - t.log.Info("Repair is disabled, skipping repair check") - } - } } @@ -418,11 +150,12 @@ func (t *TorrentManager) computeAccessKey(name, originalName string) string { } } -func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) error { +func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) { filePath := "data/" + instanceID + ".json" file, err := os.Create(filePath) if err != nil { - return fmt.Errorf("failed creating file: %w", err) + t.log.Warnf("Cannot create file %s: %v", filePath, err) + return } defer file.Close() @@ -430,15 +163,16 @@ func (t *TorrentManager) writeTorrentToFile(instanceID string, torrent *Torrent) jsonData, err := json.Marshal(torrent) if err != nil { - return fmt.Errorf("failed marshaling torrent: %w", err) + t.log.Warnf("Cannot marshal torrent: %v", err) + return } if _, err := file.Write(jsonData); err != nil { - return fmt.Errorf("failed writing to file: %w", err) + t.log.Warnf("Cannot write to file %s: %v", filePath, err) + return } t.log.Debugf("Saved torrent %s to file", instanceID) - return nil } func (t *TorrentManager) readTorrentFromFile(torrentID string) *Torrent { @@ -475,276 +209,3 @@ func (t *TorrentManager) deleteTorrentFile(torrentID string) { t.log.Warnf("Cannot delete file %s: %v", filePath, err) } } - -func (t *TorrentManager) repairAll() { - proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full - if !proceed { - t.log.Error("Reached the max number of active torrents, cannot start repair") - // TODO delete oldest in progress torrent - return - } - - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - var toRepair []*Torrent - allTorrents.IterCb(func(_ string, torrent *Torrent) { - if torrent.AnyInProgress() { - t.log.Debugf("Skipping %s for repairs because it is in progress", torrent.AccessKey) - return - } - forRepair := false - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if file.Link == "repair" { - file.Link = "repairing" - forRepair = true - } - }) - if forRepair { - toRepair = append(toRepair, torrent) - } - }) - t.log.Debugf("Found %d torrents to repair", len(toRepair)) - for i := range toRepair { - t.log.Infof("Repairing %s", toRepair[i].AccessKey) - t.repair(toRepair[i]) - } -} - -func (t *TorrentManager) CheckDeletedState(torrent *Torrent) bool { - var unselectedIDs []int - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if file.Link == "unselect" { - unselectedIDs = append(unselectedIDs, file.ID) - } - }) - if len(unselectedIDs) == torrent.SelectedFiles.Count() && len(unselectedIDs) > 0 { - return true - } else if len(unselectedIDs) > 0 { - torrent.DownloadedIDs.Each(func(id string) bool { - t.writeTorrentToFile(id, torrent) - return true - }) - } - return false -} - -func (t *TorrentManager) Delete(accessKey string, deleteInRD bool) { - if deleteInRD { - allTorrents, _ := t.DirectoryMap.Get(INT_ALL) - infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) - if torrent, ok := allTorrents.Get(accessKey); ok { - torrent.DownloadedIDs.Each(func(id string) bool { - t.log.Infof("Deleting torrent %s %s in RD", id, accessKey) - t.Api.DeleteTorrent(id) - infoCache.Remove(id) - t.deleteTorrentFile(id) - return true - }) - } - } - t.log.Infof("Removing torrent %s from zurg database", accessKey) - t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) { - torrents.Remove(accessKey) - }) -} - -func (t *TorrentManager) repair(torrent *Torrent) { - if torrent.AllInProgress() { - t.log.Infof("Torrent %s is in progress, skipping repair until download is done", torrent.AccessKey) - return - } - - proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full - if !proceed { - t.log.Error("Reached the max number of active torrents, cannot continue with the repair") - return - } - - // first solution: reinsert with same selection - if t.reinsertTorrent(torrent, "") { - t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) - return - } - - // second solution: add only the missing files - var missingFiles []File - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - if !strings.HasPrefix(file.Link, "http") { - missingFiles = append(missingFiles, *file) - } - }) - - // if we download a single file, it will be named differently - // so we need to download 1 extra file to preserve the name - // this is only relevant if we enable retain_rd_torrent_name - if len(missingFiles) == 1 && torrent.SelectedFiles.Count() > 1 { - // add the first file link encountered with a prefix of http - for _, file := range torrent.SelectedFiles.Items() { - if strings.HasPrefix(file.Link, "http") { - missingFiles = append(missingFiles, *file) - break - } - } - } - if len(missingFiles) > 0 { - t.log.Infof("Redownloading in multiple batches the %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) - // if not, last resort: add only the missing files but do it in 2 batches - half := len(missingFiles) / 2 - missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") - missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") - if missingFiles1 != "" { - t.reinsertTorrent(torrent, missingFiles1) - } - if missingFiles2 != "" { - t.reinsertTorrent(torrent, missingFiles2) - } - } else { - t.log.Warnf("Torrent %s has no missing files to repair", torrent.AccessKey) - } -} - -func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { - // if missingFiles is not provided, missing files means missing links - if missingFiles == "" { - tmpSelection := "" - torrent.SelectedFiles.IterCb(func(_ string, file *File) { - tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files - }) - if tmpSelection == "" { - return true // nothing to repair - } else { - missingFiles = tmpSelection[:len(tmpSelection)-1] - } - } - - // redownload torrent - resp, err := t.Api.AddMagnetHash(torrent.Hash) - if err != nil { - t.log.Warnf("Cannot redownload torrent: %v", err) - return false - } - time.Sleep(1 * time.Second) - - // select files - newTorrentID := resp.ID - err = t.Api.SelectTorrentFiles(newTorrentID, missingFiles) - if err != nil { - t.log.Warnf("Cannot start redownloading: %v", err) - t.Api.DeleteTorrent(newTorrentID) - return false - } - time.Sleep(10 * time.Second) - - // see if the torrent is ready - info, err := t.Api.GetTorrentInfo(newTorrentID) - if err != nil { - t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) - t.Api.DeleteTorrent(newTorrentID) - return false - } - - if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { - t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) - t.Api.DeleteTorrent(newTorrentID) - return false - } - - if info.Progress != 100 { - t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) - return true - } - - missingCount := len(strings.Split(missingFiles, ",")) - if len(info.Links) != missingCount { - t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) - t.Api.DeleteTorrent(newTorrentID) - return false - } - - t.log.Infof("Repair successful id=%s", newTorrentID) - return true -} - -func (t *TorrentManager) canCapacityHandle() bool { - // max waiting time is 45 minutes - const maxRetries = 50 - const baseDelay = 1 * time.Second - const maxDelay = 60 * time.Second - retryCount := 0 - for { - count, err := t.Api.GetActiveTorrentCount() - if err != nil { - t.log.Warnf("Cannot get active downloads count: %v", err) - if retryCount >= maxRetries { - t.log.Error("Max retries reached. Exiting.") - return false - } - delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay - if delay > maxDelay { - delay = maxDelay - } - time.Sleep(delay) - retryCount++ - continue - } - - if count.DownloadingCount < count.MaxNumberOfTorrents { - // t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) - return true - } - - delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay - if delay > maxDelay { - delay = maxDelay - } - t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay) - - if retryCount >= maxRetries { - t.log.Error("Max retries reached, exiting") - return false - } - - time.Sleep(delay) - retryCount++ - } -} - -func (t *TorrentManager) RepairAll() { - _ = t.repairWorker.Submit(func() { - t.log.Info("Checking for torrents to repair") - t.repairAll() - t.log.Info("Finished checking for torrents to repair") - }) -} - -func (t *TorrentManager) Repair(torrent *Torrent) { - _ = t.repairWorker.Submit(func() { - t.log.Info("Repairing torrent %s", torrent.AccessKey) - t.repair(torrent) - t.log.Info("Finished repairing torrent %s", torrent.AccessKey) - }) - - var updatedPaths []string - t.AssignedDirectoryCb(torrent, func(directory string) { - updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) - }) - t.TriggerHookOnLibraryUpdate(updatedPaths) -} - -func (t *TorrentManager) AssignedDirectoryCb(tor *Torrent, cb func(string)) { - torrentIDs := strset.Union(tor.DownloadedIDs, tor.InProgressIDs).List() - // get filenames needed for directory conditions - filenames := tor.SelectedFiles.Keys() - // Map torrents to directories - switch t.Config.GetVersion() { - case "v1": - configV1 := t.Config.(*config.ZurgConfigV1) - for _, directories := range configV1.GetGroupMap() { - for _, directory := range directories { - if t.Config.MeetsConditions(directory, tor.AccessKey, torrentIDs, filenames) { - cb(directory) - break - } - } - } - } -} diff --git a/internal/torrent/refresh.go b/internal/torrent/refresh.go new file mode 100644 index 0000000..c80e45d --- /dev/null +++ b/internal/torrent/refresh.go @@ -0,0 +1,291 @@ +package torrent + +import ( + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/debridmediamanager/zurg/pkg/realdebrid" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/scylladb/go-set" + "github.com/scylladb/go-set/strset" +) + +func (t *TorrentManager) RefreshTorrents() []string { + instances, _, err := t.Api.GetTorrents(0) + if err != nil { + t.log.Warnf("Cannot get torrents: %v\n", err) + return nil + } + infoChan := make(chan *Torrent, len(instances)) + var wg sync.WaitGroup + for i := range instances { + wg.Add(1) + idx := i // capture the loop variable + _ = t.workerPool.Submit(func() { + defer wg.Done() + infoChan <- t.getMoreInfo(instances[idx]) + }) + } + wg.Wait() + close(infoChan) + t.log.Infof("Fetched info for %d torrents", len(instances)) + + freshKeys := set.NewStringSet() + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + noInfoCount := 0 + for info := range infoChan { + if info == nil { + noInfoCount++ + continue + } + freshKeys.Add(info.AccessKey) + if torrent, exists := allTorrents.Get(info.AccessKey); !exists { + allTorrents.Set(info.AccessKey, info) + } else if !strset.Difference(info.DownloadedIDs, torrent.DownloadedIDs).IsEmpty() { + mainTorrent := t.mergeToMain(torrent, info) + allTorrents.Set(info.AccessKey, &mainTorrent) + } + } + t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount) + + // removed + strset.Difference(t.accessKeySet, freshKeys).Each(func(accessKey string) bool { + t.Delete(accessKey, false) + return true + }) + // new + var updatedPaths []string + strset.Difference(freshKeys, t.accessKeySet).Each(func(accessKey string) bool { + t.accessKeySet.Add(accessKey) + tor, _ := allTorrents.Get(accessKey) + t.assignedDirectoryCb(tor, func(directory string) { + if strings.HasPrefix(directory, "int__") { + return + } + torrents, _ := t.DirectoryMap.Get(directory) + torrents.Set(accessKey, tor) + updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey)) + }) + return true + }) + + t.SetNewLatestState(t.getCurrentState()) + + return updatedPaths +} + +func (t *TorrentManager) SetNewLatestState(checksum LibraryState) { + t.latestState.DownloadingCount = checksum.DownloadingCount + t.latestState.FirstTorrent = checksum.FirstTorrent + t.latestState.TotalCount = checksum.TotalCount +} + +// startRefreshJob periodically refreshes the torrents +func (t *TorrentManager) startRefreshJob() { + t.log.Info("Starting periodic refresh") + for { + <-time.After(time.Duration(t.Config.GetRefreshEverySeconds()) * time.Second) + + checksum := t.getCurrentState() + if t.latestState.equal(checksum) { + continue + } + t.log.Infof("Detected changes! Refreshing %d torrents", checksum.TotalCount) + + updatedPaths := t.RefreshTorrents() + t.log.Info("Finished refreshing torrents") + + t.TriggerHookOnLibraryUpdate(updatedPaths) + + if t.Config.EnableRepair() { + t.RepairAll() + } else { + t.log.Info("Repair is disabled, skipping repair check") + } + } +} + +// getMoreInfo gets original name, size and files for a torrent +func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent { + infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE) + if tor, exists := infoCache.Get(rdTorrent.ID); exists && tor.SelectedFiles.Count() == len(rdTorrent.Links) { + return tor + } + torrentFromFile := t.readTorrentFromFile(rdTorrent.ID) + if torrentFromFile != nil && torrentFromFile.SelectedFiles.Count() == len(rdTorrent.Links) { + infoCache.Set(rdTorrent.ID, torrentFromFile) + return torrentFromFile + } + + info, err := t.Api.GetTorrentInfo(rdTorrent.ID) + if err != nil { + t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err) + return nil + } + + torrent := Torrent{ + AccessKey: t.computeAccessKey(info.Name, info.OriginalName), + LatestAdded: info.Added, + Hash: info.Hash, + } + // SelectedFiles is a subset of Files with only the selected ones + // it also has a Link field, which can be empty + // if it is empty, it means the file is no longer available + // Files+Links together are the same as SelectedFiles + var selectedFiles []*File + // if some Links are empty, we need to repair it + for _, file := range info.Files { + if file.Selected == 0 { + continue + } + selectedFiles = append(selectedFiles, &File{ + File: file, + Added: info.Added, + Ended: info.Ended, + Link: "", // no link yet + }) + } + if len(selectedFiles) > len(info.Links) && info.Progress == 100 { + t.log.Warnf("Torrent id=%s is partly expired, it has %d selected files but only %d links", info.ID, len(selectedFiles), len(info.Links)) + torrent.ForRepair = true + } else if len(selectedFiles) == len(info.Links) { + // all links are still intact! good! + for i, file := range selectedFiles { + file.Link = info.Links[i] + i++ + } + } + torrent.SelectedFiles = cmap.New[*File]() + for _, file := range selectedFiles { + // todo better handling of duplicate filenames + if torrent.SelectedFiles.Has(filepath.Base(file.Path)) { + oldName := filepath.Base(file.Path) + ext := filepath.Ext(oldName) + filename := strings.TrimSuffix(oldName, ext) + newName := fmt.Sprintf("%s (%d)%s", filename, file.ID, ext) + torrent.SelectedFiles.Set(newName, file) + } else { + torrent.SelectedFiles.Set(filepath.Base(file.Path), file) + } + } + torrent.DownloadedIDs = strset.New() + torrent.InProgressIDs = strset.New() + if info.Progress == 100 { + torrent.DownloadedIDs.Add(info.ID) + } else { + torrent.InProgressIDs.Add(info.ID) + } + + t.writeTorrentToFile(rdTorrent.ID, &torrent) + infoCache.Set(rdTorrent.ID, &torrent) + + return &torrent +} + +func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent { + mainTorrent := Torrent{} + + mainTorrent.AccessKey = existing.AccessKey + mainTorrent.Hash = existing.Hash + mainTorrent.DownloadedIDs = strset.New() + mainTorrent.InProgressIDs = strset.New() + + // this function triggers only when we have a new DownloadedID + strset.Difference(toMerge.DownloadedIDs, existing.DownloadedIDs).Each(func(id string) bool { + mainTorrent.DownloadedIDs.Add(id) + mainTorrent.InProgressIDs.Remove(id) + return true + }) + + // the link can have the following values + // 1. https://*** - the file is available + // 2. repair - the file is available but we need to repair it + // 3. repairing - the file is being repaired + // 4. unselect - the file is deleted + mainTorrent.SelectedFiles = existing.SelectedFiles + toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) { + // see if it already exists in the main torrent + if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" { + // if it doesn't exist in the main torrent, add it + mainTorrent.SelectedFiles.Set(filepath, fileToMerge) + } else if originalFile.Link != "unselect" { + // if it exists, compare the LatestAdded property and the link + if existing.LatestAdded < toMerge.LatestAdded && strings.HasPrefix(fileToMerge.Link, "http") { + // if torrentToMerge is more recent and its file has a link, update the main torrent's file + mainTorrent.SelectedFiles.Set(filepath, fileToMerge) + } + // else do nothing, the main torrent's file is more recent or has a valid link + } + }) + + if existing.LatestAdded < toMerge.LatestAdded { + mainTorrent.LatestAdded = toMerge.LatestAdded + } else { + mainTorrent.LatestAdded = existing.LatestAdded + } + + return mainTorrent +} + +type torrentsResp struct { + torrents []realdebrid.Torrent + totalCount int +} + +// generates a checksum based on the number of torrents, the first torrent id and the number of active torrents +func (t *TorrentManager) getCurrentState() LibraryState { + torrentsChan := make(chan torrentsResp, 1) + countChan := make(chan int, 1) + errChan := make(chan error, 2) // accommodate errors from both goroutines + defer close(torrentsChan) + defer close(countChan) + defer close(errChan) + + _ = t.workerPool.Submit(func() { + torrents, totalCount, err := t.Api.GetTorrents(1) + if err != nil { + errChan <- err + return + } + torrentsChan <- torrentsResp{torrents: torrents, totalCount: totalCount} + }) + + _ = t.workerPool.Submit(func() { + count, err := t.Api.GetActiveTorrentCount() + if err != nil { + errChan <- err + return + } + countChan <- count.DownloadingCount + }) + + // Existing goroutines for GetTorrents and GetActiveTorrentCount + var torrents []realdebrid.Torrent + var totalCount, count int + + for i := 0; i < 2; i++ { + select { + case resp := <-torrentsChan: + torrents = resp.torrents + totalCount = resp.totalCount + case count = <-countChan: + case err := <-errChan: + t.log.Warnf("Checksum API Error: %v\n", err) + return EmptyState() + } + } + + if len(torrents) == 0 { + t.log.Error("Huh, no torrents returned") + return EmptyState() + } + + return LibraryState{ + TotalCount: totalCount, + FirstTorrent: &torrents[0], + DownloadingCount: count, + } +} diff --git a/internal/torrent/repair.go b/internal/torrent/repair.go new file mode 100644 index 0000000..3a1d989 --- /dev/null +++ b/internal/torrent/repair.go @@ -0,0 +1,210 @@ +package torrent + +import ( + "fmt" + "math" + "strings" + "time" +) + +func (t *TorrentManager) RepairAll() { + _ = t.repairWorker.Submit(func() { + t.log.Info("Checking for torrents to repair") + t.repairAll() + t.log.Info("Finished checking for torrents to repair") + }) +} + +func (t *TorrentManager) repairAll() { + allTorrents, _ := t.DirectoryMap.Get(INT_ALL) + var toRepair []*Torrent + allTorrents.IterCb(func(_ string, torrent *Torrent) { + if torrent.AnyInProgress() && torrent.ForRepair { + t.log.Warnf("Skipping %s for repairs because it is in progress", torrent.AccessKey) + return + } else if torrent.ForRepair { + toRepair = append(toRepair, torrent) + } + }) + t.log.Debugf("Found %d torrents to repair", len(toRepair)) + for i := range toRepair { + t.log.Infof("Repairing %s", toRepair[i].AccessKey) + t.repair(toRepair[i]) + } +} + +func (t *TorrentManager) Repair(torrent *Torrent) { + _ = t.repairWorker.Submit(func() { + t.log.Info("Repairing torrent %s", torrent.AccessKey) + t.repair(torrent) + t.log.Info("Finished repairing torrent %s", torrent.AccessKey) + }) + + var updatedPaths []string + t.assignedDirectoryCb(torrent, func(directory string) { + updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, torrent.AccessKey)) + }) + t.TriggerHookOnLibraryUpdate(updatedPaths) +} + +func (t *TorrentManager) repair(torrent *Torrent) { + if torrent.AllInProgress() { + t.log.Infof("Torrent %s is in progress, skipping repair until download is done", torrent.AccessKey) + return + } + + proceed := t.canCapacityHandle() // blocks for approx 45 minutes if active torrents are full + if !proceed { + t.log.Error("Reached the max number of active torrents, cannot continue with the repair") + return + } + + // first solution: reinsert with same selection + if t.reinsertTorrent(torrent, "") { + t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey) + return + } + + // second solution: add only the missing files + var missingFiles []File + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + if !strings.HasPrefix(file.Link, "http") { + missingFiles = append(missingFiles, *file) + } + }) + + // if we download a single file, it will be named differently + // so we need to download 1 extra file to preserve the name + // this is only relevant if we enable retain_rd_torrent_name + if len(missingFiles) == 1 && torrent.SelectedFiles.Count() > 1 { + // add the first file link encountered with a prefix of http + for _, file := range torrent.SelectedFiles.Items() { + if strings.HasPrefix(file.Link, "http") { + missingFiles = append(missingFiles, *file) + break + } + } + } + if len(missingFiles) > 0 { + t.log.Infof("Redownloading in multiple batches the %d missing files for torrent %s", len(missingFiles), torrent.AccessKey) + // if not, last resort: add only the missing files but do it in 2 batches + half := len(missingFiles) / 2 + missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",") + missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",") + if missingFiles1 != "" { + t.reinsertTorrent(torrent, missingFiles1) + } + if missingFiles2 != "" { + t.reinsertTorrent(torrent, missingFiles2) + } + } else { + t.log.Warnf("Torrent %s has no missing files to repair", torrent.AccessKey) + } +} + +func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool { + // missing files means missing links + // if missingFiles is not provided + if missingFiles == "" { + tmpSelection := "" + torrent.SelectedFiles.IterCb(func(_ string, file *File) { + tmpSelection += fmt.Sprintf("%d,", file.ID) // select all files + }) + if tmpSelection == "" { + return true // nothing to repair + } else { + missingFiles = tmpSelection[:len(tmpSelection)-1] + } + } + + // redownload torrent + resp, err := t.Api.AddMagnetHash(torrent.Hash) + if err != nil { + t.log.Warnf("Cannot redownload torrent: %v", err) + return false + } + time.Sleep(1 * time.Second) + + // select files + newTorrentID := resp.ID + err = t.Api.SelectTorrentFiles(newTorrentID, missingFiles) + if err != nil { + t.log.Warnf("Cannot start redownloading: %v", err) + t.Api.DeleteTorrent(newTorrentID) + return false + } + time.Sleep(10 * time.Second) + + // see if the torrent is ready + info, err := t.Api.GetTorrentInfo(newTorrentID) + if err != nil { + t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err) + t.Api.DeleteTorrent(newTorrentID) + return false + } + + if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" { + t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status) + t.Api.DeleteTorrent(newTorrentID) + return false + } + + if info.Progress != 100 { + t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID) + return true + } + + missingCount := len(strings.Split(missingFiles, ",")) + if len(info.Links) != missingCount { + t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount) + t.Api.DeleteTorrent(newTorrentID) + return false + } + + t.log.Infof("Repair successful id=%s", newTorrentID) + return true +} + +func (t *TorrentManager) canCapacityHandle() bool { + // max waiting time is 45 minutes + const maxRetries = 50 + const baseDelay = 1 * time.Second + const maxDelay = 60 * time.Second + retryCount := 0 + for { + count, err := t.Api.GetActiveTorrentCount() + if err != nil { + t.log.Warnf("Cannot get active downloads count: %v", err) + if retryCount >= maxRetries { + t.log.Error("Max retries reached. Exiting.") + return false + } + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + time.Sleep(delay) + retryCount++ + continue + } + + if count.DownloadingCount < count.MaxNumberOfTorrents { + // t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount) + return true + } + + delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + t.log.Infof("We have reached the max number of active torrents, waiting for %s seconds before retrying", delay) + + if retryCount >= maxRetries { + t.log.Error("Max retries reached. Exiting.") + return false + } + + time.Sleep(delay) + retryCount++ + } +} diff --git a/internal/torrent/types.go b/internal/torrent/types.go index 9fbacd3..1545229 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -17,6 +17,7 @@ type Torrent struct { Hash string `json:"Hash"` SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` LatestAdded string `json:"LatestAdded"` + ForRepair bool `json:"ForRepair"` DownloadedIDs *strset.Set `json:"DownloadedIDs"` InProgressIDs *strset.Set `json:"InProgressIDs"` diff --git a/internal/universal/get.go b/internal/universal/get.go index 024574a..dd4d264 100644 --- a/internal/universal/get.go +++ b/internal/universal/get.go @@ -11,6 +11,7 @@ import ( "github.com/debridmediamanager/zurg/internal/config" intTor "github.com/debridmediamanager/zurg/internal/torrent" zurghttp "github.com/debridmediamanager/zurg/pkg/http" + "github.com/debridmediamanager/zurg/pkg/realdebrid" "go.uber.org/zap" ) @@ -89,7 +90,7 @@ func (gf *GetFile) HandleGetRequest(directory, torrentName, fileName string, res if cfg.ShouldServeFromRclone() { redirect(resp, req, unrestrict.Download, cfg) } else { - gf.streamFileToResponse(torrent, file, unrestrict.Download, resp, req, torMgr, cfg, log) + gf.streamFileToResponse(torrent, file, unrestrict, resp, req, torMgr, cfg, log) } return } @@ -128,9 +129,9 @@ func (gf *GetFile) streamCachedLinkToResponse(url string, resp http.ResponseWrit return nil } -func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.File, url string, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { +func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.File, unrestrict *realdebrid.Download, resp http.ResponseWriter, req *http.Request, torMgr *intTor.TorrentManager, cfg config.ConfigInterface, log *zap.SugaredLogger) { // Create a new request for the file download. - dlReq, err := http.NewRequest(http.MethodGet, url, nil) + dlReq, err := http.NewRequest(http.MethodGet, unrestrict.Download, nil) if err != nil { if file != nil { log.Errorf("Error creating new request for file %s: %v", file.Path, err) @@ -146,7 +147,7 @@ func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.Fi download, err := gf.client.Do(dlReq) if err != nil { - if file != nil { + if file != nil && unrestrict.Streamable == 1 { log.Warnf("Cannot download file %s: %v", file.Path, err) file.Link = "repairing" torMgr.Repair(torrent) @@ -157,7 +158,7 @@ func (gf *GetFile) streamFileToResponse(torrent *intTor.Torrent, file *intTor.Fi defer download.Body.Close() if download.StatusCode != http.StatusOK && download.StatusCode != http.StatusPartialContent { - if file != nil { + if file != nil && unrestrict.Streamable == 1 { log.Warnf("Received a %s status code for file %s", download.Status, file.Path) file.Link = "repairing" torMgr.Repair(torrent) diff --git a/internal/universal/head.go b/internal/universal/head.go index 3b11268..8e645d8 100644 --- a/internal/universal/head.go +++ b/internal/universal/head.go @@ -58,12 +58,6 @@ func getContentMimeType(filePath string) string { return "audio/mpeg" case ".rar": return "application/x-rar-compressed" - case ".zip": - return "application/zip" - case ".7z": - return "application/x-7z-compressed" - case ".srt": - return "text/plain" default: return "application/octet-stream" }