use a new thread safe map
This commit is contained in:
@@ -15,15 +15,13 @@ import (
|
||||
"github.com/debridmediamanager.com/zurg/internal/config"
|
||||
"github.com/debridmediamanager.com/zurg/pkg/logutil"
|
||||
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
|
||||
"github.com/elliotchance/orderedmap/v2"
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type TorrentManager struct {
|
||||
config config.ConfigInterface
|
||||
DirectoryMap *orderedmap.OrderedMap[string, *orderedmap.OrderedMap[string, *Torrent]]
|
||||
TorrentMap *orderedmap.OrderedMap[string, *Torrent] // accessKey -> Torrent
|
||||
repairMap *orderedmap.OrderedMap[string, time.Time] // accessKey -> time last repaired
|
||||
cfg config.ConfigInterface
|
||||
DirectoryMap cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, *Torrent]] // directory -> accessKey -> Torrent
|
||||
requiredVersion string
|
||||
checksum string
|
||||
api *realdebrid.RealDebrid
|
||||
@@ -35,19 +33,24 @@ type TorrentManager struct {
|
||||
// NewTorrentManager creates a new torrent manager
|
||||
// it will fetch all torrents and their info in the background
|
||||
// and store them in-memory and cached in files
|
||||
func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager {
|
||||
func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid) *TorrentManager {
|
||||
t := &TorrentManager{
|
||||
config: config,
|
||||
DirectoryMap: orderedmap.NewOrderedMap[string, *orderedmap.OrderedMap[string, *Torrent]](),
|
||||
TorrentMap: orderedmap.NewOrderedMap[string, *Torrent](),
|
||||
repairMap: orderedmap.NewOrderedMap[string, time.Time](),
|
||||
cfg: cfg,
|
||||
DirectoryMap: cmap.New[cmap.ConcurrentMap[string, *Torrent]](),
|
||||
requiredVersion: "10.11.2023",
|
||||
api: api,
|
||||
workerPool: make(chan bool, config.GetNumOfWorkers()),
|
||||
workerPool: make(chan bool, cfg.GetNumOfWorkers()),
|
||||
mu: &sync.Mutex{},
|
||||
log: logutil.NewLogger().Named("manager"),
|
||||
}
|
||||
|
||||
// create special directory
|
||||
t.DirectoryMap.Set("__all__", cmap.New[*Torrent]()) // key is AccessKey
|
||||
// create directory maps
|
||||
for _, directory := range cfg.GetDirectories() {
|
||||
t.DirectoryMap.Set(directory, cmap.New[*Torrent]())
|
||||
}
|
||||
|
||||
newTorrents, _, err := t.api.GetTorrents(0)
|
||||
if err != nil {
|
||||
t.log.Fatalf("Cannot get torrents: %v\n", err)
|
||||
@@ -60,72 +63,109 @@ func NewTorrentManager(config config.ConfigInterface, api *realdebrid.RealDebrid
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
t.workerPool <- true
|
||||
// TODO wrap getMoreInfo and limit the execution time!
|
||||
torrentsChan <- t.getMoreInfo(newTorrents[idx])
|
||||
<-t.workerPool
|
||||
}(i)
|
||||
}
|
||||
t.log.Infof("Received %d torrents", len(newTorrents))
|
||||
wg.Wait()
|
||||
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
|
||||
close(torrentsChan)
|
||||
count := 0
|
||||
for newTorrent := range torrentsChan {
|
||||
if newTorrent == nil {
|
||||
count++
|
||||
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
|
||||
|
||||
noInfoCount := 0
|
||||
allCt := 0
|
||||
allTorrents, _ := t.DirectoryMap.Get("__all__")
|
||||
for info := range torrentsChan {
|
||||
allCt++
|
||||
if info == nil {
|
||||
noInfoCount++
|
||||
continue
|
||||
}
|
||||
torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
|
||||
if torrent != nil {
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
|
||||
t.mu.Unlock()
|
||||
if torrent, exists := allTorrents.Get(info.AccessKey); exists {
|
||||
mainTorrent := t.mergeToMain(torrent, info)
|
||||
allTorrents.Set(info.AccessKey, mainTorrent)
|
||||
} else {
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
|
||||
t.mu.Unlock()
|
||||
allTorrents.Set(info.AccessKey, info)
|
||||
}
|
||||
}
|
||||
t.log.Infof("Compiled all torrents to %d unique movies and shows, %d were missing info", t.TorrentMap.Len(), count)
|
||||
|
||||
anotherCt := 0
|
||||
allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
|
||||
anotherCt++
|
||||
// get IDs
|
||||
var torrentIDs []string
|
||||
for _, instance := range torrent.Instances {
|
||||
torrentIDs = append(torrentIDs, instance.ID)
|
||||
}
|
||||
|
||||
// get filenames
|
||||
var filenames []string
|
||||
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
||||
filenames = append(filenames, file.Path)
|
||||
})
|
||||
|
||||
// Map torrents to directories
|
||||
switch t.cfg.GetVersion() {
|
||||
case "v1":
|
||||
configV1 := t.cfg.(*config.ZurgConfigV1)
|
||||
for _, directories := range configV1.GetGroupMap() {
|
||||
for _, directory := range directories {
|
||||
if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
torrents, _ := t.DirectoryMap.Get(directory)
|
||||
torrents.Set(accessKey, torrent)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
t.checksum = t.getChecksum()
|
||||
|
||||
if t.config.EnableRepair() {
|
||||
go t.repairAll()
|
||||
}
|
||||
// go t.startRefreshJob()
|
||||
// if t.config.EnableRepair() {
|
||||
// go t.repairAll()
|
||||
// }
|
||||
go t.startRefreshJob()
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *TorrentManager) mergeToMain(t1, t2 *Torrent) *Torrent {
|
||||
merged := t1
|
||||
mainTorrent := t1
|
||||
|
||||
// Merge SelectedFiles
|
||||
// side note: iteration works!
|
||||
for el := t2.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
if _, ok := merged.SelectedFiles.Get(el.Key); !ok {
|
||||
merged.SelectedFiles.Set(el.Key, el.Value)
|
||||
// Merge SelectedFiles - itercb accesses a different copy of the selectedfiles map
|
||||
t2.SelectedFiles.IterCb(func(key string, file *File) {
|
||||
// see if it already exists in the main torrent
|
||||
if mainFile, ok := mainTorrent.SelectedFiles.Get(key); !ok {
|
||||
mainTorrent.SelectedFiles.Set(key, file)
|
||||
} else if file.Link != "" && mainFile.Link == "" {
|
||||
// if it exists, but the link is empty, then we can update it
|
||||
mainTorrent.SelectedFiles.Set(key, file)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Merge Instances
|
||||
merged.Instances = append(t1.Instances, t2.Instances...)
|
||||
mainTorrent.Instances = append(t1.Instances, t2.Instances...)
|
||||
|
||||
// LatestAdded
|
||||
if t1.LatestAdded < t2.LatestAdded {
|
||||
merged.LatestAdded = t2.LatestAdded
|
||||
mainTorrent.LatestAdded = t2.LatestAdded
|
||||
}
|
||||
|
||||
// InProgress - if one of the instances is in progress, then the whole torrent is in progress
|
||||
for _, instance := range merged.Instances {
|
||||
mainTorrent.InProgress = false
|
||||
for _, instance := range mainTorrent.Instances {
|
||||
if instance.Progress != 100 {
|
||||
merged.InProgress = true
|
||||
mainTorrent.InProgress = true
|
||||
}
|
||||
if instance.ForRepair {
|
||||
merged.ForRepair = true
|
||||
mainTorrent.ForRepair = true
|
||||
}
|
||||
}
|
||||
|
||||
return merged
|
||||
return mainTorrent
|
||||
}
|
||||
|
||||
// proxy
|
||||
@@ -195,7 +235,7 @@ func (t *TorrentManager) getChecksum() string {
|
||||
func (t *TorrentManager) startRefreshJob() {
|
||||
t.log.Info("Starting periodic refresh")
|
||||
for {
|
||||
<-time.After(time.Duration(t.config.GetRefreshEverySeconds()) * time.Second)
|
||||
<-time.After(time.Duration(t.cfg.GetRefreshEverySeconds()) * time.Second)
|
||||
|
||||
checksum := t.getChecksum()
|
||||
if checksum == t.checksum {
|
||||
@@ -220,57 +260,85 @@ func (t *TorrentManager) startRefreshJob() {
|
||||
<-t.workerPool
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
close(torrentsChan)
|
||||
t.log.Infof("Fetched info for %d torrents", len(newTorrents))
|
||||
|
||||
// side note: iteration works!
|
||||
noInfoCount := 0
|
||||
allTorrents, _ := t.DirectoryMap.Get("__all__")
|
||||
var retain []string
|
||||
for info := range torrentsChan {
|
||||
if info == nil {
|
||||
noInfoCount++
|
||||
continue
|
||||
}
|
||||
retain = append(retain, info.AccessKey)
|
||||
if torrent, exists := allTorrents.Get(info.AccessKey); exists {
|
||||
mainTorrent := t.mergeToMain(torrent, info)
|
||||
allTorrents.Set(info.AccessKey, mainTorrent)
|
||||
} else {
|
||||
allTorrents.Set(info.AccessKey, info)
|
||||
}
|
||||
}
|
||||
|
||||
allTorrents.IterCb(func(accessKey string, torrent *Torrent) {
|
||||
// get IDs
|
||||
var torrentIDs []string
|
||||
for _, instance := range torrent.Instances {
|
||||
torrentIDs = append(torrentIDs, instance.ID)
|
||||
}
|
||||
|
||||
// get filenames
|
||||
var filenames []string
|
||||
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
||||
filenames = append(filenames, file.Path)
|
||||
})
|
||||
|
||||
// Map torrents to directories
|
||||
switch t.cfg.GetVersion() {
|
||||
case "v1":
|
||||
configV1 := t.cfg.(*config.ZurgConfigV1)
|
||||
for _, directories := range configV1.GetGroupMap() {
|
||||
for _, directory := range directories {
|
||||
if t.cfg.MeetsConditions(directory, torrent.AccessKey, torrentIDs, filenames) {
|
||||
torrents, _ := t.DirectoryMap.Get(directory)
|
||||
torrents.Set(accessKey, torrent)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// delete torrents that no longer exist
|
||||
var toDelete []string
|
||||
for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
|
||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||
found := false
|
||||
for _, newTorrent := range newTorrents {
|
||||
if newTorrent.ID == el.Value.AccessKey {
|
||||
for _, accessKey := range retain {
|
||||
if torrent.AccessKey == accessKey {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
toDelete = append(toDelete, el.Key)
|
||||
toDelete = append(toDelete, torrent.AccessKey)
|
||||
}
|
||||
}
|
||||
})
|
||||
for _, accessKey := range toDelete {
|
||||
t.TorrentMap.Delete(accessKey)
|
||||
for el := t.DirectoryMap.Front(); el != nil; el = el.Next() {
|
||||
torrents := el.Value
|
||||
for el2 := torrents.Front(); el2 != nil; el2 = el2.Next() {
|
||||
if el2.Key == accessKey {
|
||||
torrents.Delete(accessKey)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
t.DirectoryMap.IterCb(func(_ string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
torrents.Remove(accessKey)
|
||||
})
|
||||
}
|
||||
// end delete torrents that no longer exist
|
||||
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
wg.Wait()
|
||||
close(torrentsChan)
|
||||
for newTorrent := range torrentsChan {
|
||||
if newTorrent == nil {
|
||||
continue
|
||||
}
|
||||
torrent, _ := t.TorrentMap.Get(newTorrent.AccessKey)
|
||||
if torrent != nil {
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(newTorrent.AccessKey, t.mergeToMain(torrent, newTorrent))
|
||||
t.mu.Unlock()
|
||||
} else {
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(newTorrent.AccessKey, newTorrent)
|
||||
t.mu.Unlock()
|
||||
}
|
||||
}
|
||||
t.checksum = t.getChecksum()
|
||||
|
||||
if t.config.EnableRepair() {
|
||||
go t.repairAll()
|
||||
}
|
||||
go OnLibraryUpdateHook(t.config)
|
||||
// if t.config.EnableRepair() {
|
||||
// go t.repairAll()
|
||||
// }
|
||||
go OnLibraryUpdateHook(t.cfg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +367,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
// it also has a Link field, which can be empty
|
||||
// if it is empty, it means the file is no longer available
|
||||
// Files+Links together are the same as SelectedFiles
|
||||
selectedFiles := orderedmap.NewOrderedMap[string, *File]()
|
||||
selectedFiles := cmap.New[*File]()
|
||||
streamableCount := 0
|
||||
// if some Links are empty, we need to repair it
|
||||
forRepair := false
|
||||
@@ -317,66 +385,50 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
ZurgFS: hashStringToFh(file.Path + info.Hash),
|
||||
})
|
||||
}
|
||||
if selectedFiles.Len() > len(info.Links) && info.Progress == 100 {
|
||||
if selectedFiles.Count() > len(info.Links) && info.Progress == 100 {
|
||||
// chaotic file means RD will not output the desired file selection
|
||||
// e.g. even if we select just a single mkv, it will output a rar
|
||||
var isChaotic bool
|
||||
selectedFiles, isChaotic = t.organizeChaos(info.Links, selectedFiles)
|
||||
if isChaotic {
|
||||
t.log.Warnf("Torrent id=%s %s is unrepairable, it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name)
|
||||
t.log.Warnf("Torrent id=%s %s is unplayable; it is always returning a rar file (it will no longer show up in your directories)", info.ID, info.Name)
|
||||
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
|
||||
return nil
|
||||
} else {
|
||||
if streamableCount > 1 {
|
||||
if streamableCount > 1 && t.cfg.EnableRepair() {
|
||||
// case for repair 1: it's missing some links (or all links)
|
||||
// if we download it as is, we might get the same file over and over again
|
||||
// so we need to redownload it with other files selected
|
||||
// that is why we check if there are other streamable files
|
||||
t.log.Infof("Torrent id=%s %s marked for repair", info.ID, info.Name)
|
||||
forRepair = true
|
||||
} else {
|
||||
t.log.Warnf("Torrent id=%s %s is unrepairable, the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
|
||||
} else if streamableCount == 1 {
|
||||
t.log.Warnf("Torrent id=%s %s is unplayable; the lone streamable link has expired (it will no longer show up in your directories)", info.ID, info.Name)
|
||||
// t.log.Debugf("You can try fixing it yourself magnet:?xt=urn:btih:%s", info.Hash)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
} else if selectedFiles.Len() == len(info.Links) {
|
||||
} else if selectedFiles.Count() == len(info.Links) {
|
||||
// all links are still intact! good!
|
||||
// side note: iteration works!
|
||||
i := 0
|
||||
for el := selectedFiles.Front(); el != nil; el = el.Next() {
|
||||
selectedFiles.IterCb(func(_ string, file *File) {
|
||||
if i < len(info.Links) {
|
||||
file := el.Value
|
||||
file.Link = info.Links[i] // verified working!
|
||||
selectedFiles.Set(el.Key, file)
|
||||
i++
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
info.ForRepair = forRepair
|
||||
torrent := Torrent{
|
||||
AccessKey: t.getName(info.Name, info.OriginalName),
|
||||
SelectedFiles: selectedFiles,
|
||||
Directories: t.getDirectories(info),
|
||||
LatestAdded: info.Added,
|
||||
InProgress: info.Progress != 100,
|
||||
Instances: []realdebrid.TorrentInfo{*info},
|
||||
}
|
||||
for _, directory := range torrent.Directories {
|
||||
if _, ok := t.DirectoryMap.Get(directory); !ok {
|
||||
newMap := orderedmap.NewOrderedMap[string, *Torrent]()
|
||||
t.mu.Lock()
|
||||
t.DirectoryMap.Set(directory, newMap)
|
||||
t.mu.Unlock()
|
||||
} else {
|
||||
torrents, _ := t.DirectoryMap.Get(directory)
|
||||
t.mu.Lock()
|
||||
torrents.Set(torrent.AccessKey, &torrent)
|
||||
t.mu.Unlock()
|
||||
}
|
||||
}
|
||||
if selectedFiles.Len() > 0 && torrentFromFile == nil {
|
||||
if selectedFiles.Count() > 0 && torrentFromFile == nil {
|
||||
t.writeToFile(info) // only when there are selected files, else it's useless
|
||||
}
|
||||
return &torrent
|
||||
@@ -390,7 +442,7 @@ func hashStringToFh(s string) (fh uint64) {
|
||||
|
||||
func (t *TorrentManager) getName(name, originalName string) string {
|
||||
// drop the extension from the name
|
||||
if t.config.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
|
||||
if t.cfg.EnableRetainFolderNameExtension() && strings.Contains(name, originalName) {
|
||||
return name
|
||||
} else {
|
||||
ret := strings.TrimSuffix(originalName, ".mp4")
|
||||
@@ -399,38 +451,6 @@ func (t *TorrentManager) getName(name, originalName string) string {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TorrentManager) getDirectories(torrent *realdebrid.TorrentInfo) []string {
|
||||
var ret []string
|
||||
// Map torrents to directories
|
||||
switch t.config.GetVersion() {
|
||||
case "v1":
|
||||
configV1 := t.config.(*config.ZurgConfigV1)
|
||||
groupMap := configV1.GetGroupMap()
|
||||
// for every group, iterate over every torrent
|
||||
// and then sprinkle/distribute the torrents to the directories of the group
|
||||
for _, directories := range groupMap {
|
||||
for _, directory := range directories {
|
||||
var filenames []string
|
||||
for _, file := range torrent.Files {
|
||||
if file.Selected == 0 {
|
||||
continue
|
||||
}
|
||||
filenames = append(filenames, file.Path)
|
||||
}
|
||||
accessKey := t.getName(torrent.Name, torrent.OriginalName)
|
||||
if configV1.MeetsConditions(directory, torrent.ID, accessKey, filenames) {
|
||||
ret = append(ret, directory)
|
||||
break // we found a directory for this torrent for this group, so we can stop looking for more
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
t.log.Error("Unknown config version")
|
||||
}
|
||||
// t.log.Debugf("Torrent %s is in directories %v", t.getName(torrent.Name, torrent.OriginalName), ret)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (t *TorrentManager) writeToFile(torrent *realdebrid.TorrentInfo) error {
|
||||
filePath := "data/" + torrent.ID + ".bin"
|
||||
file, err := os.Create(filePath)
|
||||
@@ -473,7 +493,7 @@ func (t *TorrentManager) readFromFile(torrentID string) *realdebrid.TorrentInfo
|
||||
return &torrent
|
||||
}
|
||||
|
||||
func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap.OrderedMap[string, *File]) (*orderedmap.OrderedMap[string, *File], bool) {
|
||||
func (t *TorrentManager) organizeChaos(links []string, selectedFiles cmap.ConcurrentMap[string, *File]) (cmap.ConcurrentMap[string, *File], bool) {
|
||||
type Result struct {
|
||||
Response *realdebrid.UnrestrictResponse
|
||||
}
|
||||
@@ -503,13 +523,12 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap
|
||||
continue
|
||||
}
|
||||
found := false
|
||||
// side note: iteration works!
|
||||
for el := selectedFiles.Front(); el != nil; el = el.Next() {
|
||||
if file, _ := selectedFiles.Get(el.Key); strings.Contains(file.Path, result.Response.Filename) {
|
||||
selectedFiles.IterCb(func(_ string, file *File) {
|
||||
if strings.Contains(file.Path, result.Response.Filename) {
|
||||
file.Link = result.Response.Link
|
||||
found = true
|
||||
}
|
||||
}
|
||||
})
|
||||
if !found {
|
||||
if result.Response.Streamable == 1 {
|
||||
selectedFiles.Set(filepath.Base(result.Response.Filename), &File{
|
||||
@@ -532,219 +551,219 @@ func (t *TorrentManager) organizeChaos(links []string, selectedFiles *orderedmap
|
||||
return selectedFiles, isChaotic
|
||||
}
|
||||
|
||||
func (t *TorrentManager) repairAll() {
|
||||
t.log.Info("Checking for torrents to repair")
|
||||
// side note: iteration works!
|
||||
for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
|
||||
torrent := el.Value
|
||||
// do not repair if in progress
|
||||
if torrent.InProgress {
|
||||
continue
|
||||
}
|
||||
// func (t *TorrentManager) repairAll() {
|
||||
// t.log.Info("Checking for torrents to repair")
|
||||
// // side note: iteration works!
|
||||
// for el := t.TorrentMap.Front(); el != nil; el = el.Next() {
|
||||
// torrent := el.Value
|
||||
// // do not repair if in progress
|
||||
// if torrent.InProgress {
|
||||
// continue
|
||||
// }
|
||||
|
||||
// do not repair if all files have links
|
||||
forRepair := false
|
||||
for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() {
|
||||
file := el2.Value
|
||||
if file.Link == "" {
|
||||
forRepair = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !forRepair {
|
||||
// if it was marked for repair, unmark it
|
||||
torrent.ForRepair = false
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(torrent.AccessKey, torrent)
|
||||
t.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
// // do not repair if all files have links
|
||||
// forRepair := false
|
||||
// for el2 := torrent.SelectedFiles.Front(); el2 != nil; el2 = el2.Next() {
|
||||
// file := el2.Value
|
||||
// if file.Link == "" {
|
||||
// forRepair = true
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if !forRepair {
|
||||
// // if it was marked for repair, unmark it
|
||||
// torrent.ForRepair = false
|
||||
// t.mu.Lock()
|
||||
// t.TorrentMap.Set(torrent.AccessKey, torrent)
|
||||
// t.mu.Unlock()
|
||||
// continue
|
||||
// }
|
||||
|
||||
// when getting info, we mark it for repair if it's missing some links
|
||||
if torrent.ForRepair {
|
||||
t.log.Infof("Found torrent for repair: %s", torrent.AccessKey)
|
||||
t.Repair(torrent.AccessKey)
|
||||
break // only repair the first one for repair and then move on
|
||||
}
|
||||
}
|
||||
}
|
||||
// // when getting info, we mark it for repair if it's missing some links
|
||||
// if torrent.ForRepair {
|
||||
// t.log.Infof("Found torrent for repair: %s", torrent.AccessKey)
|
||||
// t.Repair(torrent.AccessKey)
|
||||
// break // only repair the first one for repair and then move on
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
func (t *TorrentManager) Repair(accessKey string) {
|
||||
if lastRepair, ok := t.repairMap.Get(accessKey); ok {
|
||||
if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs
|
||||
return
|
||||
}
|
||||
}
|
||||
t.mu.Lock()
|
||||
t.repairMap.Set(accessKey, time.Now())
|
||||
t.mu.Unlock()
|
||||
// func (t *TorrentManager) Repair(accessKey string) {
|
||||
// if lastRepair, ok := t.repairMap.Get(accessKey); ok {
|
||||
// if time.Since(lastRepair) < time.Duration(24*time.Hour) { // magic number: 24 hrs
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// t.mu.Lock()
|
||||
// t.repairMap.Set(accessKey, time.Now())
|
||||
// t.mu.Unlock()
|
||||
|
||||
if !t.config.EnableRepair() {
|
||||
t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
|
||||
return
|
||||
}
|
||||
// if !t.config.EnableRepair() {
|
||||
// t.log.Warn("Repair is disabled; if you do not have other zurg instances running, you should enable repair")
|
||||
// return
|
||||
// }
|
||||
|
||||
torrent, _ := t.TorrentMap.Get(accessKey)
|
||||
if torrent == nil {
|
||||
t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
|
||||
return
|
||||
}
|
||||
if torrent.InProgress {
|
||||
t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
|
||||
return
|
||||
}
|
||||
// torrent, _ := t.TorrentMap.Get(accessKey)
|
||||
// if torrent == nil {
|
||||
// t.log.Warnf("Cannot find torrent %s anymore to repair it", accessKey)
|
||||
// return
|
||||
// }
|
||||
// if torrent.InProgress {
|
||||
// t.log.Infof("Torrent %s is in progress, cannot repair", torrent.AccessKey)
|
||||
// return
|
||||
// }
|
||||
|
||||
// check if we can still add more downloads
|
||||
proceed := t.canCapacityHandle()
|
||||
if !proceed {
|
||||
t.log.Error("Cannot add more torrents, ignoring repair request")
|
||||
return
|
||||
}
|
||||
// // check if we can still add more downloads
|
||||
// proceed := t.canCapacityHandle()
|
||||
// if !proceed {
|
||||
// t.log.Error("Cannot add more torrents, ignoring repair request")
|
||||
// return
|
||||
// }
|
||||
|
||||
// make the file messy
|
||||
var links []string
|
||||
for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
file := el.Value
|
||||
if file.Link != "" {
|
||||
links = append(links, file.Link)
|
||||
}
|
||||
file.Link = ""
|
||||
}
|
||||
selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles)
|
||||
torrent.SelectedFiles = selectedFiles
|
||||
t.mu.Lock()
|
||||
t.TorrentMap.Set(torrent.AccessKey, torrent)
|
||||
t.mu.Unlock()
|
||||
// // make the file messy
|
||||
// var links []string
|
||||
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
// file := el.Value
|
||||
// if file.Link != "" {
|
||||
// links = append(links, file.Link)
|
||||
// }
|
||||
// file.Link = ""
|
||||
// }
|
||||
// selectedFiles, _ := t.organizeChaos(links, torrent.SelectedFiles)
|
||||
// torrent.SelectedFiles = selectedFiles
|
||||
// t.mu.Lock()
|
||||
// t.TorrentMap.Set(torrent.AccessKey, torrent)
|
||||
// t.mu.Unlock()
|
||||
|
||||
// first solution: add the same selection, maybe it can be fixed by reinsertion?
|
||||
if t.reinsertTorrent(torrent, "") {
|
||||
t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
||||
return
|
||||
}
|
||||
// if all the selected files are missing but there are other streamable files
|
||||
var missingFiles []File
|
||||
for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
file := el.Value
|
||||
if file.Link == "" {
|
||||
missingFiles = append(missingFiles, *file)
|
||||
}
|
||||
}
|
||||
if len(missingFiles) > 0 {
|
||||
t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey)
|
||||
// if not, last resort: add only the missing files but do it in 2 batches
|
||||
half := len(missingFiles) / 2
|
||||
missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
|
||||
missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
|
||||
if missingFiles1 != "" {
|
||||
t.reinsertTorrent(torrent, missingFiles1)
|
||||
}
|
||||
if missingFiles2 != "" {
|
||||
t.reinsertTorrent(torrent, missingFiles2)
|
||||
}
|
||||
}
|
||||
}
|
||||
// // first solution: add the same selection, maybe it can be fixed by reinsertion?
|
||||
// if t.reinsertTorrent(torrent, "") {
|
||||
// t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
||||
// return
|
||||
// }
|
||||
// // if all the selected files are missing but there are other streamable files
|
||||
// var missingFiles []File
|
||||
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
// file := el.Value
|
||||
// if file.Link == "" {
|
||||
// missingFiles = append(missingFiles, *file)
|
||||
// }
|
||||
// }
|
||||
// if len(missingFiles) > 0 {
|
||||
// t.log.Infof("Redownloading %d missing files for torrent %s", len(missingFiles), torrent.AccessKey)
|
||||
// // if not, last resort: add only the missing files but do it in 2 batches
|
||||
// half := len(missingFiles) / 2
|
||||
// missingFiles1 := strings.Join(getFileIDs(missingFiles[:half]), ",")
|
||||
// missingFiles2 := strings.Join(getFileIDs(missingFiles[half:]), ",")
|
||||
// if missingFiles1 != "" {
|
||||
// t.reinsertTorrent(torrent, missingFiles1)
|
||||
// }
|
||||
// if missingFiles2 != "" {
|
||||
// t.reinsertTorrent(torrent, missingFiles2)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool {
|
||||
// if missingFiles is not provided, look for missing files
|
||||
if missingFiles == "" {
|
||||
var tmpSelection string
|
||||
for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
file := el.Value
|
||||
tmpSelection += fmt.Sprintf("%d,", file.ID)
|
||||
}
|
||||
if tmpSelection == "" {
|
||||
return false
|
||||
}
|
||||
if len(tmpSelection) > 0 {
|
||||
missingFiles = tmpSelection[:len(tmpSelection)-1]
|
||||
}
|
||||
}
|
||||
// func (t *TorrentManager) reinsertTorrent(torrent *Torrent, missingFiles string) bool {
|
||||
// // if missingFiles is not provided, look for missing files
|
||||
// if missingFiles == "" {
|
||||
// var tmpSelection string
|
||||
// for el := torrent.SelectedFiles.Front(); el != nil; el = el.Next() {
|
||||
// file := el.Value
|
||||
// tmpSelection += fmt.Sprintf("%d,", file.ID)
|
||||
// }
|
||||
// if tmpSelection == "" {
|
||||
// return false
|
||||
// }
|
||||
// if len(tmpSelection) > 0 {
|
||||
// missingFiles = tmpSelection[:len(tmpSelection)-1]
|
||||
// }
|
||||
// }
|
||||
|
||||
// redownload torrent
|
||||
resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot redownload torrent: %v", err)
|
||||
return false
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
// // redownload torrent
|
||||
// resp, err := t.api.AddMagnetHash(torrent.Instances[0].Hash)
|
||||
// if err != nil {
|
||||
// t.log.Warnf("Cannot redownload torrent: %v", err)
|
||||
// return false
|
||||
// }
|
||||
// time.Sleep(1 * time.Second)
|
||||
|
||||
// select files
|
||||
newTorrentID := resp.ID
|
||||
err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot start redownloading: %v", err)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
// // select files
|
||||
// newTorrentID := resp.ID
|
||||
// err = t.api.SelectTorrentFiles(newTorrentID, missingFiles)
|
||||
// if err != nil {
|
||||
// t.log.Warnf("Cannot start redownloading: %v", err)
|
||||
// t.api.DeleteTorrent(newTorrentID)
|
||||
// return false
|
||||
// }
|
||||
// time.Sleep(10 * time.Second)
|
||||
|
||||
// see if the torrent is ready
|
||||
info, err := t.api.GetTorrentInfo(newTorrentID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
// // see if the torrent is ready
|
||||
// info, err := t.api.GetTorrentInfo(newTorrentID)
|
||||
// if err != nil {
|
||||
// t.log.Warnf("Cannot get info on redownloaded torrent id=%s : %v", newTorrentID, err)
|
||||
// t.api.DeleteTorrent(newTorrentID)
|
||||
// return false
|
||||
// }
|
||||
|
||||
if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
||||
t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
// if info.Status == "magnet_error" || info.Status == "error" || info.Status == "virus" || info.Status == "dead" {
|
||||
// t.log.Warnf("The redownloaded torrent id=%s is in error state: %s", newTorrentID, info.Status)
|
||||
// t.api.DeleteTorrent(newTorrentID)
|
||||
// return false
|
||||
// }
|
||||
|
||||
if info.Progress != 100 {
|
||||
t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
|
||||
return true
|
||||
}
|
||||
// if info.Progress != 100 {
|
||||
// t.log.Infof("Torrent id=%s is not cached anymore so we have to wait until completion (this should fix the issue already)", info.ID)
|
||||
// return true
|
||||
// }
|
||||
|
||||
missingCount := len(strings.Split(missingFiles, ","))
|
||||
if len(info.Links) != missingCount {
|
||||
t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
|
||||
t.api.DeleteTorrent(newTorrentID)
|
||||
return false
|
||||
}
|
||||
// missingCount := len(strings.Split(missingFiles, ","))
|
||||
// if len(info.Links) != missingCount {
|
||||
// t.log.Infof("It did not fix the issue for id=%s, only got %d files but we need %d, undoing", info.ID, len(info.Links), missingCount)
|
||||
// t.api.DeleteTorrent(newTorrentID)
|
||||
// return false
|
||||
// }
|
||||
|
||||
t.log.Infof("Repair successful id=%s", newTorrentID)
|
||||
return true
|
||||
}
|
||||
// t.log.Infof("Repair successful id=%s", newTorrentID)
|
||||
// return true
|
||||
// }
|
||||
|
||||
func (t *TorrentManager) canCapacityHandle() bool {
|
||||
// max waiting time is 45 minutes
|
||||
const maxRetries = 50
|
||||
const baseDelay = 1 * time.Second
|
||||
const maxDelay = 60 * time.Second
|
||||
retryCount := 0
|
||||
for {
|
||||
count, err := t.api.GetActiveTorrentCount()
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get active downloads count: %v", err)
|
||||
if retryCount >= maxRetries {
|
||||
t.log.Error("Max retries reached. Exiting.")
|
||||
return false
|
||||
}
|
||||
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
||||
if delay > maxDelay {
|
||||
delay = maxDelay
|
||||
}
|
||||
time.Sleep(delay)
|
||||
retryCount++
|
||||
continue
|
||||
}
|
||||
// func (t *TorrentManager) canCapacityHandle() bool {
|
||||
// // max waiting time is 45 minutes
|
||||
// const maxRetries = 50
|
||||
// const baseDelay = 1 * time.Second
|
||||
// const maxDelay = 60 * time.Second
|
||||
// retryCount := 0
|
||||
// for {
|
||||
// count, err := t.api.GetActiveTorrentCount()
|
||||
// if err != nil {
|
||||
// t.log.Warnf("Cannot get active downloads count: %v", err)
|
||||
// if retryCount >= maxRetries {
|
||||
// t.log.Error("Max retries reached. Exiting.")
|
||||
// return false
|
||||
// }
|
||||
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
||||
// if delay > maxDelay {
|
||||
// delay = maxDelay
|
||||
// }
|
||||
// time.Sleep(delay)
|
||||
// retryCount++
|
||||
// continue
|
||||
// }
|
||||
|
||||
if count.DownloadingCount < count.MaxNumberOfTorrents {
|
||||
t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
|
||||
return true
|
||||
}
|
||||
// if count.DownloadingCount < count.MaxNumberOfTorrents {
|
||||
// t.log.Infof("We can still add a new torrent, we have capacity for %d more", count.MaxNumberOfTorrents-count.DownloadingCount)
|
||||
// return true
|
||||
// }
|
||||
|
||||
if retryCount >= maxRetries {
|
||||
t.log.Error("Max retries reached, exiting")
|
||||
return false
|
||||
}
|
||||
delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
||||
if delay > maxDelay {
|
||||
delay = maxDelay
|
||||
}
|
||||
time.Sleep(delay)
|
||||
retryCount++
|
||||
}
|
||||
}
|
||||
// if retryCount >= maxRetries {
|
||||
// t.log.Error("Max retries reached, exiting")
|
||||
// return false
|
||||
// }
|
||||
// delay := time.Duration(math.Pow(2, float64(retryCount))) * baseDelay
|
||||
// if delay > maxDelay {
|
||||
// delay = maxDelay
|
||||
// }
|
||||
// time.Sleep(delay)
|
||||
// retryCount++
|
||||
// }
|
||||
// }
|
||||
|
||||
Reference in New Issue
Block a user