Refactor file links and merges
This commit is contained in:
@@ -54,7 +54,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, w
|
|||||||
deleteOnceDone: mapset.NewSet[string](),
|
deleteOnceDone: mapset.NewSet[string](),
|
||||||
allAccessKeys: mapset.NewSet[string](),
|
allAccessKeys: mapset.NewSet[string](),
|
||||||
latestState: &LibraryState{},
|
latestState: &LibraryState{},
|
||||||
requiredVersion: "27.01.2024",
|
requiredVersion: "0.9.3-hotfix.3",
|
||||||
workerPool: workerPool,
|
workerPool: workerPool,
|
||||||
repairPool: repairPool,
|
repairPool: repairPool,
|
||||||
log: log,
|
log: log,
|
||||||
|
|||||||
@@ -231,7 +231,6 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
|||||||
} else {
|
} else {
|
||||||
torrent.InProgressIDs.Add(info.ID)
|
torrent.InProgressIDs.Add(info.ID)
|
||||||
}
|
}
|
||||||
torrent.BrokenLinks = mapset.NewSet[string]()
|
|
||||||
|
|
||||||
infoCache.Set(rdTorrent.ID, &torrent)
|
infoCache.Set(rdTorrent.ID, &torrent)
|
||||||
t.saveTorrentChangesToDisk(&torrent, nil)
|
t.saveTorrentChangesToDisk(&torrent, nil)
|
||||||
@@ -240,61 +239,63 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent {
|
func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) Torrent {
|
||||||
|
var newer, older *Torrent
|
||||||
|
if existing.Added < toMerge.Added {
|
||||||
|
newer = toMerge
|
||||||
|
older = existing
|
||||||
|
} else {
|
||||||
|
newer = existing
|
||||||
|
older = toMerge
|
||||||
|
}
|
||||||
|
|
||||||
|
// build the main torrent
|
||||||
mainTorrent := Torrent{
|
mainTorrent := Torrent{
|
||||||
Name: existing.Name,
|
Name: newer.Name,
|
||||||
OriginalName: existing.OriginalName,
|
OriginalName: newer.OriginalName,
|
||||||
Rename: existing.Rename,
|
Rename: newer.Rename,
|
||||||
Hash: existing.Hash,
|
Hash: newer.Hash,
|
||||||
DownloadedIDs: mapset.NewSet[string](),
|
Added: newer.Added,
|
||||||
InProgressIDs: mapset.NewSet[string](),
|
|
||||||
// UnassignedLinks: mapset.NewSet[string](),
|
DownloadedIDs: newer.DownloadedIDs.Union(older.DownloadedIDs),
|
||||||
UnassignedLinks: existing.UnassignedLinks.Union(toMerge.UnassignedLinks),
|
InProgressIDs: newer.InProgressIDs.Union(older.InProgressIDs),
|
||||||
BrokenLinks: existing.BrokenLinks.Union(toMerge.BrokenLinks),
|
UnassignedLinks: newer.UnassignedLinks.Union(older.UnassignedLinks),
|
||||||
|
SelectedFiles: newer.SelectedFiles,
|
||||||
}
|
}
|
||||||
|
|
||||||
// unrepairable reason
|
// unrepairable reason
|
||||||
if existing.UnrepairableReason != "" && toMerge.UnrepairableReason != "" && existing.UnrepairableReason != toMerge.UnrepairableReason {
|
if newer.UnrepairableReason != "" && older.UnrepairableReason != "" && newer.UnrepairableReason != older.UnrepairableReason {
|
||||||
mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", existing.UnrepairableReason, toMerge.UnrepairableReason)
|
mainTorrent.UnrepairableReason = fmt.Sprintf("%s, %s", newer.UnrepairableReason, older.UnrepairableReason)
|
||||||
} else if existing.UnrepairableReason != "" {
|
} else if newer.UnrepairableReason != "" {
|
||||||
mainTorrent.UnrepairableReason = existing.UnrepairableReason
|
mainTorrent.UnrepairableReason = newer.UnrepairableReason
|
||||||
} else if toMerge.UnrepairableReason != "" {
|
} else if older.UnrepairableReason != "" {
|
||||||
mainTorrent.UnrepairableReason = toMerge.UnrepairableReason
|
mainTorrent.UnrepairableReason = older.UnrepairableReason
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function triggers only when we have a new DownloadedID
|
// update in progress ids
|
||||||
toMerge.DownloadedIDs.Difference(existing.DownloadedIDs).Each(func(id string) bool {
|
mainTorrent.DownloadedIDs.Each(func(id string) bool {
|
||||||
mainTorrent.DownloadedIDs.Add(id)
|
|
||||||
mainTorrent.InProgressIDs.Remove(id)
|
mainTorrent.InProgressIDs.Remove(id)
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
// the link can have the following values
|
// the link can have the following values
|
||||||
// 1. https://*** - the file is available
|
// 1. https://*** - the file is available
|
||||||
// 2. repair - the link is there but we need to repair it
|
// 2. unselect - the file is deleted
|
||||||
// 3. unselect - the file is deleted
|
// 3. empty - the file is not available
|
||||||
// 4. empty - the file is not available
|
mainTorrent.SelectedFiles.IterCb(func(key string, file *File) {
|
||||||
mainTorrent.SelectedFiles = existing.SelectedFiles
|
if file.Link == "" {
|
||||||
toMerge.SelectedFiles.IterCb(func(filepath string, fileToMerge *File) {
|
file, ok := older.SelectedFiles.Get(key)
|
||||||
// see if it already exists in the main torrent
|
if ok {
|
||||||
if originalFile, ok := mainTorrent.SelectedFiles.Get(filepath); !ok || fileToMerge.Link == "unselect" {
|
mainTorrent.SelectedFiles.Set(key, file)
|
||||||
// if it doesn't exist in the main torrent, add it
|
|
||||||
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
|
|
||||||
} else if originalFile.Link != "unselect" {
|
|
||||||
// if it exists, compare the Added property and the link
|
|
||||||
if existing.Added < toMerge.Added {
|
|
||||||
// && strings.HasPrefix(fileToMerge.Link, "http")
|
|
||||||
// if torrentToMerge is more recent and its file has a link, update the main torrent's file
|
|
||||||
mainTorrent.SelectedFiles.Set(filepath, fileToMerge)
|
|
||||||
}
|
}
|
||||||
// else do nothing, the main torrent's file is more recent or has a valid link
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
older.SelectedFiles.IterCb(func(key string, file *File) {
|
||||||
if existing.Added < toMerge.Added {
|
if !mainTorrent.SelectedFiles.Has(key) {
|
||||||
mainTorrent.Added = toMerge.Added
|
mainTorrent.SelectedFiles.Set(key, file)
|
||||||
} else {
|
} else if file.Link == "unselect" {
|
||||||
mainTorrent.Added = existing.Added
|
mainTorrent.SelectedFiles.Set(key, file)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return mainTorrent
|
return mainTorrent
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
|
|||||||
allTorrents, _ = t.DirectoryMap.Get(INT_ALL)
|
allTorrents, _ = t.DirectoryMap.Get(INT_ALL)
|
||||||
} else {
|
} else {
|
||||||
allTorrents = cmap.New[*Torrent]()
|
allTorrents = cmap.New[*Torrent]()
|
||||||
allTorrents.Set(t.GetKey(torrent), torrent)
|
allTorrents.Set("", torrent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// collect all torrents that need to be repaired
|
// collect all torrents that need to be repaired
|
||||||
@@ -89,29 +89,6 @@ func (t *TorrentManager) repairAll(torrent *Torrent) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// save the broken files to the file cache
|
|
||||||
// broken files are also added when trying to open a file
|
|
||||||
if torrent.BrokenLinks.Cardinality() > 0 {
|
|
||||||
t.saveTorrentChangesToDisk(torrent, func(info *Torrent) {
|
|
||||||
hasBrokenFiles := false
|
|
||||||
info.SelectedFiles.IterCb(func(_ string, file *File) {
|
|
||||||
torrent.BrokenLinks.Each(func(brokenLink string) bool {
|
|
||||||
if file.Link == brokenLink {
|
|
||||||
hasBrokenFiles = true
|
|
||||||
file.Link = ""
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if hasBrokenFiles {
|
|
||||||
info.BrokenLinks = torrent.BrokenLinks
|
|
||||||
} else {
|
|
||||||
info.BrokenLinks = mapset.NewSet[string]()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// check 2: for broken files
|
// check 2: for broken files
|
||||||
brokenFileIDs := mapset.NewSet[int]()
|
brokenFileIDs := mapset.NewSet[int]()
|
||||||
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
torrent.SelectedFiles.IterCb(func(_ string, file *File) {
|
||||||
@@ -204,10 +181,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
|||||||
file.Link = info.Links[ix]
|
file.Link = info.Links[ix]
|
||||||
ix++
|
ix++
|
||||||
})
|
})
|
||||||
torrent.BrokenLinks = mapset.NewSet[string]()
|
t.saveTorrentChangesToDisk(torrent, nil)
|
||||||
t.saveTorrentChangesToDisk(torrent, func(info *Torrent) {
|
|
||||||
info.BrokenLinks = mapset.NewSet[string]()
|
|
||||||
})
|
|
||||||
t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent))
|
t.log.Infof("Successfully repaired torrent %s using repair_method#1", t.GetKey(torrent))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -554,10 +528,7 @@ func (t *TorrentManager) handleFixers(fixer realdebrid.Torrent) *Torrent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
torrent.BrokenLinks = mapset.NewSet[string]()
|
t.saveTorrentChangesToDisk(torrent, nil)
|
||||||
t.saveTorrentChangesToDisk(torrent, func(info *Torrent) {
|
|
||||||
info.BrokenLinks = mapset.NewSet[string]()
|
|
||||||
})
|
|
||||||
t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent))
|
t.log.Infof("Successfully repaired torrent %s using repair_method#2", t.GetKey(torrent))
|
||||||
} else {
|
} else {
|
||||||
t.log.Warnf("repair_method#2: Fixer is done but torrent %s is still broken; let's keep the fixer", t.GetKey(torrent))
|
t.log.Warnf("repair_method#2: Fixer is done but torrent %s is still broken; let's keep the fixer", t.GetKey(torrent))
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ type Torrent struct {
|
|||||||
Rename string `json:"Rename"` // modified over time
|
Rename string `json:"Rename"` // modified over time
|
||||||
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
|
SelectedFiles cmap.ConcurrentMap[string, *File] `json:"-"` // modified over time
|
||||||
UnrepairableReason string `json:"Unfixable"` // modified over time
|
UnrepairableReason string `json:"Unfixable"` // modified over time
|
||||||
BrokenLinks mapset.Set[string] `json:"BrokenLinks"` // only relevant on repair
|
|
||||||
|
|
||||||
Version string `json:"Version"` // only used for files
|
Version string `json:"Version"` // only used for files
|
||||||
}
|
}
|
||||||
@@ -36,7 +35,6 @@ func (t *Torrent) MarshalJSON() ([]byte, error) {
|
|||||||
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
|
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
|
||||||
InProgressIDsJson stdjson.RawMessage `json:"InProgressIDs"`
|
InProgressIDsJson stdjson.RawMessage `json:"InProgressIDs"`
|
||||||
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
|
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
|
||||||
BrokenLinksJson stdjson.RawMessage `json:"BrokenLinks"`
|
|
||||||
*Alias
|
*Alias
|
||||||
}{
|
}{
|
||||||
Alias: (*Alias)(t),
|
Alias: (*Alias)(t),
|
||||||
@@ -69,13 +67,6 @@ func (t *Torrent) MarshalJSON() ([]byte, error) {
|
|||||||
temp.UnassignedLinksJson = []byte(unassignedLinksStr)
|
temp.UnassignedLinksJson = []byte(unassignedLinksStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.BrokenLinks.IsEmpty() {
|
|
||||||
temp.BrokenLinksJson = []byte(`""`)
|
|
||||||
} else {
|
|
||||||
brokenLinksStr := `"` + strings.Join(t.BrokenLinks.ToSlice(), ",") + `"`
|
|
||||||
temp.BrokenLinksJson = []byte(brokenLinksStr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return json.Marshal(temp)
|
return json.Marshal(temp)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +77,6 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
|
|||||||
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
|
DownloadedIDsJson stdjson.RawMessage `json:"DownloadedIDs"`
|
||||||
InProgressIDsJson stdjson.RawMessage `json:"InProgressIDs"`
|
InProgressIDsJson stdjson.RawMessage `json:"InProgressIDs"`
|
||||||
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
|
UnassignedLinksJson stdjson.RawMessage `json:"UnassignedLinks"`
|
||||||
BrokenLinksJson stdjson.RawMessage `json:"BrokenLinks"`
|
|
||||||
*Alias
|
*Alias
|
||||||
}{
|
}{
|
||||||
Alias: (*Alias)(t),
|
Alias: (*Alias)(t),
|
||||||
@@ -123,13 +113,6 @@ func (t *Torrent) UnmarshalJSON(data []byte) error {
|
|||||||
t.UnassignedLinks = mapset.NewSet[string]()
|
t.UnassignedLinks = mapset.NewSet[string]()
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(temp.BrokenLinksJson) > 2 {
|
|
||||||
brokenLinks := strings.Split(strings.ReplaceAll(string(temp.BrokenLinksJson), `"`, ""), ",")
|
|
||||||
t.BrokenLinks = mapset.NewSet[string](brokenLinks...)
|
|
||||||
} else {
|
|
||||||
t.BrokenLinks = mapset.NewSet[string]()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -49,17 +49,17 @@ func (dl *Downloader) DownloadFile(directory, torrentName, fileName string, resp
|
|||||||
http.Error(resp, "File is not available", http.StatusNotFound)
|
http.Error(resp, "File is not available", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
link := file.Link
|
|
||||||
|
|
||||||
unrestrict := torMgr.UnrestrictUntilOk(link)
|
log.Debugf("Opening file %s from torrent %s (%s)", fileName, torMgr.GetKey(torrent), file.Link)
|
||||||
|
|
||||||
|
unrestrict := torMgr.UnrestrictUntilOk(file.Link)
|
||||||
if unrestrict == nil {
|
if unrestrict == nil {
|
||||||
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, link)
|
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, file.Link)
|
||||||
torrent.BrokenLinks.Add(file.Link)
|
file.Link = ""
|
||||||
// file.Link = "repair"
|
|
||||||
if cfg.EnableRepair() {
|
if cfg.EnableRepair() {
|
||||||
torMgr.TriggerRepair(torrent)
|
torMgr.TriggerRepair(torrent)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, link)
|
log.Debugf("Repair is disabled, skipping repair for unavailable file %s (link=%s)", fileName, file.Link)
|
||||||
}
|
}
|
||||||
http.Error(resp, "File is not available", http.StatusNotFound)
|
http.Error(resp, "File is not available", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
@@ -100,6 +100,8 @@ func (dl *Downloader) DownloadLink(fileName, link string, resp http.ResponseWrit
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debugf("Opening file %s (%s)", fileName, link)
|
||||||
|
|
||||||
unrestrict := torMgr.UnrestrictUntilOk(link)
|
unrestrict := torMgr.UnrestrictUntilOk(link)
|
||||||
if unrestrict == nil {
|
if unrestrict == nil {
|
||||||
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, link)
|
log.Warnf("File %s cannot be unrestricted (link=%s)", fileName, link)
|
||||||
@@ -155,17 +157,16 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
|
|||||||
}
|
}
|
||||||
|
|
||||||
if torrent != nil {
|
if torrent != nil {
|
||||||
log.Debugf("Opening file %s from torrent %s (%s)%s", unrestrict.Download, torMgr.GetKey(torrent), unrestrict.Link, rangeLog)
|
log.Debugf("Downloading unrestricted link %s from torrent %s (%s)%s", unrestrict.Download, torMgr.GetKey(torrent), unrestrict.Link, rangeLog)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Opening file %s (%s)%s", unrestrict.Download, unrestrict.Link, rangeLog)
|
log.Debugf("Downloading unrestricted link %s (%s)%s", unrestrict.Download, unrestrict.Link, rangeLog)
|
||||||
}
|
}
|
||||||
|
|
||||||
download, err := dl.client.Do(dlReq)
|
download, err := dl.client.Do(dlReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err)
|
log.Warnf("Cannot download file %s: %v", unrestrict.Download, err)
|
||||||
if file != nil && unrestrict.Streamable == 1 {
|
if file != nil && unrestrict.Streamable == 1 {
|
||||||
torrent.BrokenLinks.Add(file.Link)
|
file.Link = ""
|
||||||
// file.Link = "repair"
|
|
||||||
if cfg.EnableRepair() && torrent != nil {
|
if cfg.EnableRepair() && torrent != nil {
|
||||||
torMgr.TriggerRepair(torrent)
|
torMgr.TriggerRepair(torrent)
|
||||||
} else {
|
} else {
|
||||||
@@ -181,8 +182,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
|
|||||||
if download.StatusCode/100 != 2 {
|
if download.StatusCode/100 != 2 {
|
||||||
if file != nil && unrestrict.Streamable == 1 {
|
if file != nil && unrestrict.Streamable == 1 {
|
||||||
log.Warnf("Received a %s status code for file %s", download.Status, file.Path)
|
log.Warnf("Received a %s status code for file %s", download.Status, file.Path)
|
||||||
torrent.BrokenLinks.Add(file.Link)
|
file.Link = ""
|
||||||
// file.Link = "repair"
|
|
||||||
if cfg.EnableRepair() && torrent != nil {
|
if cfg.EnableRepair() && torrent != nil {
|
||||||
torMgr.TriggerRepair(torrent)
|
torMgr.TriggerRepair(torrent)
|
||||||
} else {
|
} else {
|
||||||
@@ -199,7 +199,7 @@ func (dl *Downloader) streamFileToResponse(torrent *intTor.Torrent, file *intTor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Started serving file %s%s", unrestrict.Filename, rangeLog)
|
log.Debugf("Serving file %s%s", unrestrict.Download, rangeLog)
|
||||||
|
|
||||||
buf := make([]byte, cfg.GetNetworkBufferSize())
|
buf := make([]byte, cfg.GetNetworkBufferSize())
|
||||||
io.CopyBuffer(resp, download.Body, buf)
|
io.CopyBuffer(resp, download.Body, buf)
|
||||||
|
|||||||
Reference in New Issue
Block a user