Finalize repair

This commit is contained in:
Ben Sarmiento
2023-10-26 03:29:13 +02:00
parent cc9616894a
commit 54ab801796
13 changed files with 500 additions and 517 deletions

View File

@@ -32,7 +32,7 @@ type TorrentManager struct {
// and store them in-memory
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
t := &TorrentManager{
requiredVersion: "24.10.2023",
requiredVersion: "26.10.2023",
config: config,
cache: cache,
workerPool: make(chan bool, config.GetNumOfWorkers()),
@@ -68,23 +68,6 @@ func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[strin
return t
}
func (t *TorrentManager) repairAll(wg *sync.WaitGroup) {
wg.Wait()
for _, torrent := range t.torrents {
if torrent.ForRepair {
log.Println("Issues detected on", torrent.Name, "; fixing...")
t.repair(torrent.ID, torrent.SelectedFiles, true)
}
if len(torrent.Links) == 0 {
// If the torrent has no links
// and already processing repair
// delete it!
log.Println("Deleting", torrent.Name, "as it has no links")
realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID)
}
}
}
// GetByDirectory returns all torrents that have a file in the specified directory
func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
var torrents []Torrent
@@ -99,11 +82,10 @@ func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
}
// HideTheFile marks a file as deleted
// func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
// file.Link = ""
// t.writeToFile(torrent)
// t.repair(torrent.ID, []File{*file}, false)
// }
func (t *TorrentManager) HideTheFile(torrent *Torrent, file *File) {
file.Unavailable = true
t.repair(torrent.ID, torrent.SelectedFiles, false)
}
// FindAllTorrentsWithName finds all torrents in a given directory with a given name
func (t *TorrentManager) FindAllTorrentsWithName(directory, torrentName string) []Torrent {
@@ -270,7 +252,7 @@ func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
return
}
log.Println("Getting info for", torrent.ID)
// log.Println("Getting info for", torrent.ID)
info, err := realdebrid.GetTorrentInfo(t.config.GetToken(), torrent.ID)
if err != nil {
log.Printf("Cannot get info: %v\n", err)
@@ -282,9 +264,16 @@ func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
// if it is empty, it means the file is no longer available
// Files+Links together are the same as SelectedFiles
var selectedFiles []File
var streamableFiles []File
// if some Links are empty, we need to repair it
forRepair := false
for _, file := range info.Files {
if isStreamable(file.Path) {
streamableFiles = append(streamableFiles, File{
File: file,
Link: "",
})
}
if file.Selected == 0 {
continue
}
@@ -300,10 +289,14 @@ func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
var isChaotic bool
selectedFiles, isChaotic = t.organizeChaos(info, selectedFiles)
if isChaotic {
log.Println("This torrent is unfixable, ignoring", info.Name, info.ID)
log.Println("This torrent is unfixable, it's always returning an unstreamable link, ignoring", info.Name, info.ID)
} else {
log.Println("Marking for repair", info.Name)
forRepair = true
if len(streamableFiles) > 1 {
log.Println("Marking for repair", info.Name)
forRepair = true
} else {
log.Println("This torrent is unfixable, the lone streamable link has expired, ignoring", info.Name, info.ID)
}
}
} else {
// all links are still intact! good!
@@ -320,6 +313,64 @@ func (t *TorrentManager) addMoreInfo(torrent *Torrent) {
}
}
// mapToDirectories maps torrents to directories
func (t *TorrentManager) mapToDirectories() {
// Map torrents to directories
switch t.config.GetVersion() {
case "v1":
configV1 := t.config.(*config.ZurgConfigV1)
groupMap := configV1.GetGroupMap()
// for every group, iterate over every torrent
// and then sprinkle/distribute the torrents to the directories of the group
for group, directories := range groupMap {
log.Printf("Processing directory group '%s', sequence: %s\n", group, strings.Join(directories, " > "))
counter := make(map[string]int)
for i := range t.torrents {
// don't process torrents that are already mapped if it is not the first run
if _, exists := t.processedTorrents[t.torrents[i].ID]; exists {
continue
}
for _, directory := range directories {
var filenames []string
for _, file := range t.torrents[i].Files {
filenames = append(filenames, file.Path)
}
if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) {
found := false
for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] {
if dir == directory {
found = true
break
}
}
if !found {
counter[directory]++
t.TorrentDirectoriesMap[t.torrents[i].Name] = append(t.TorrentDirectoriesMap[t.torrents[i].Name], directory)
break
}
}
}
}
sum := 0
for _, count := range counter {
sum += count
}
if sum > 0 {
log.Printf("Directory group processed: %s %v %d\n", group, counter, sum)
} else {
log.Println("No new additions to directory group", group)
}
}
default:
log.Println("Unknown config version")
}
for _, torrent := range t.torrents {
t.processedTorrents[torrent.ID] = true
}
log.Println("Finished mapping to directories")
}
// getByID returns a torrent by its ID
func (t *TorrentManager) getByID(torrentID string) *Torrent {
for i := range t.torrents {
@@ -376,6 +427,116 @@ func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
return &torrent
}
func (t *TorrentManager) repairAll(wg *sync.WaitGroup) {
wg.Wait()
for _, torrent := range t.torrents {
if torrent.ForRepair {
log.Println("Issues were detected on", torrent.Name, "; fixing...")
t.repair(torrent.ID, torrent.SelectedFiles, true)
}
if len(torrent.Links) == 0 && torrent.Progress == 100 {
// If the torrent has no links
// and already processing repair
// delete it!
log.Println("Deleting", torrent.Name, "as it has no links")
realdebrid.DeleteTorrent(t.config.GetToken(), torrent.ID)
}
}
}
func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) {
torrent := t.getByID(torrentID)
if torrent == nil {
return
}
// check if it is already "being" repaired
found := false
for _, hash := range t.inProgress {
if hash == torrent.Hash {
found = true
break
}
}
if found {
log.Println("Repair in progress, skipping", torrentID)
return
}
// check if it is already repaired
foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash)
var missingFiles []File
for _, sFile := range selectedFiles {
if sFile.Link == "" || sFile.Unavailable {
found := false
for _, fFile := range foundFiles {
// same file but different link, then yes it has been repaired
if sFile.Path == fFile.Path && sFile.Link != fFile.Link {
found = true
break
}
}
if !found {
missingFiles = append(missingFiles, sFile)
}
}
}
if len(missingFiles) == 0 {
log.Println(torrent.Name, "is already repaired")
return
}
// then we repair it!
log.Println("Repairing torrent", torrentID)
// check if we can still add more downloads
proceed := t.canCapacityHandle()
if !proceed {
log.Println("Cannot add more torrents, exiting")
return
}
// first solution: add the same selection, maybe it can be fixed by reinsertion?
success := false
if tryReinsertionFirst {
success = t.reinsertTorrent(torrent, "", true)
}
if !success {
// if all the selected files are missing but there are other streamable files
var otherStreamableFileIDs []int
for _, file := range torrent.Files {
found := false
for _, selectedFile := range selectedFiles {
if selectedFile.ID == file.ID {
found = true
break
}
}
if !found && isStreamable(file.Path) {
otherStreamableFileIDs = append(otherStreamableFileIDs, file.ID)
}
}
if (len(missingFiles) == len(selectedFiles) || len(missingFiles) == 1) && len(otherStreamableFileIDs) > 0 {
// we will download 1 extra streamable file to force a redownload of the missing files
// or if there's only 1 missing file, we will download 1 more to prevent a rename
missingFilesPlus1 := strings.Join(getFileIDs(missingFiles), ",")
missingFilesPlus1 += fmt.Sprintf(",%d", otherStreamableFileIDs[0])
t.reinsertTorrent(torrent, missingFilesPlus1, false)
} else {
// if not, last resort: add only the missing files but do it in 2 batches
half := len(missingFiles) / 2
missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
if missingFiles1 != "" {
t.reinsertTorrent(torrent, missingFiles1, false)
}
if missingFiles2 != "" {
t.reinsertTorrent(torrent, missingFiles2, false)
}
}
log.Println("Waiting for downloads to finish")
}
}
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string, deleteIfFailed bool) bool {
// if missingFiles is not provided, look for missing files
if missingFiles == "" {
@@ -485,7 +646,7 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles [
}
if !found {
// "chaos" file, we don't know where it belongs
isChaotic = true
isChaotic = !isStreamable(result.Response.Filename)
selectedFiles = append(selectedFiles, File{
File: realdebrid.File{
Path: result.Response.Filename,
@@ -500,133 +661,6 @@ func (t *TorrentManager) organizeChaos(info *realdebrid.Torrent, selectedFiles [
return selectedFiles, isChaotic
}
func (t *TorrentManager) repair(torrentID string, selectedFiles []File, tryReinsertionFirst bool) {
torrent := t.getByID(torrentID)
if torrent == nil {
return
}
// check if it is already "being" repaired
found := false
for _, hash := range t.inProgress {
if hash == torrent.Hash {
found = true
break
}
}
if found {
log.Println("Repair in progress, skipping", torrentID)
return
}
// check if it is already repaired
foundFiles := t.findAllDownloadedFilesFromHash(torrent.Hash)
var missingFiles []File
for _, sFile := range selectedFiles {
if sFile.Link == "" {
found := false
for _, fFile := range foundFiles {
if sFile.Path == fFile.Path {
found = true
break
}
}
if !found {
missingFiles = append(missingFiles, sFile)
}
}
}
if len(missingFiles) == 0 {
log.Println(torrent.Name, "is already repaired")
return
}
// then we repair it!
log.Println("Repairing torrent", torrentID)
// check if we can still add more downloads
proceed := t.canCapacityHandle()
if !proceed {
log.Println("Cannot add more torrents, exiting")
return
}
// first solution: add the same selection, maybe it can be fixed by reinsertion?
success := false
if tryReinsertionFirst {
success = t.reinsertTorrent(torrent, "", true)
}
if !success {
// if not, last resort: add only the missing files and do it in 2 batches
half := len(missingFiles) / 2
missingFiles1 := getFileIDs(missingFiles[:half])
missingFiles2 := getFileIDs(missingFiles[half:])
if missingFiles1 != "" {
t.reinsertTorrent(torrent, missingFiles1, false)
}
if missingFiles2 != "" {
t.reinsertTorrent(torrent, missingFiles2, false)
}
log.Println("Waiting for downloads to finish")
}
}
func (t *TorrentManager) mapToDirectories() {
// Map torrents to directories
switch t.config.GetVersion() {
case "v1":
configV1 := t.config.(*config.ZurgConfigV1)
groupMap := configV1.GetGroupMap()
// for every group, iterate over every torrent
// and then sprinkle/distribute the torrents to the directories of the group
for group, directories := range groupMap {
log.Printf("Processing directory group '%s', sequence: %s\n", group, strings.Join(directories, " > "))
counter := make(map[string]int)
for i := range t.torrents {
// don't process torrents that are already mapped if it is not the first run
if _, exists := t.processedTorrents[t.torrents[i].ID]; exists {
continue
}
for _, directory := range directories {
var filenames []string
for _, file := range t.torrents[i].Files {
filenames = append(filenames, file.Path)
}
if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) {
found := false
for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] {
if dir == directory {
found = true
break
}
}
if !found {
counter[directory]++
t.TorrentDirectoriesMap[t.torrents[i].Name] = append(t.TorrentDirectoriesMap[t.torrents[i].Name], directory)
break
}
}
}
}
sum := 0
for _, count := range counter {
sum += count
}
if sum > 0 {
log.Printf("Directory group processed: %s %v %d\n", group, counter, sum)
} else {
log.Println("No new additions to directory group", group)
}
}
default:
log.Println("Unknown config version")
}
for _, torrent := range t.torrents {
t.processedTorrents[torrent.ID] = true
}
log.Println("Finished mapping to directories")
}
func (t *TorrentManager) canCapacityHandle() bool {
// max waiting time is 45 minutes
const maxRetries = 50
@@ -667,17 +701,3 @@ func (t *TorrentManager) canCapacityHandle() bool {
retryCount++
}
}
func getFileIDs(files []File) string {
var fileIDs string
for _, file := range files {
// this won't include the id=0 files that were "chaos"
if file.File.Selected == 1 && file.ID != 0 && file.Link == "" {
fileIDs += fmt.Sprintf("%d,", file.ID)
}
}
if len(fileIDs) > 0 {
fileIDs = fileIDs[:len(fileIDs)-1]
}
return fileIDs
}