Introduce components
This commit is contained in:
@@ -21,7 +21,7 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
||||
return nil
|
||||
}
|
||||
t.log.Infof("Fetched %d torrents", len(instances))
|
||||
infoChan := make(chan *Torrent, len(instances))
|
||||
torChan := make(chan *Torrent, len(instances))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := range instances {
|
||||
@@ -29,60 +29,56 @@ func (t *TorrentManager) refreshTorrents(isInitialRun bool) []string {
|
||||
wg.Add(1)
|
||||
_ = t.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
infoChan <- t.getMoreInfo(instances[idx])
|
||||
torChan <- t.getMoreInfo(instances[idx])
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(infoChan)
|
||||
close(torChan)
|
||||
t.log.Infof("Fetched info for %d torrents", len(instances))
|
||||
|
||||
var updatedPaths []string
|
||||
noInfoCount := 0
|
||||
allTorrents, _ := t.DirectoryMap.Get(INT_ALL)
|
||||
freshAccessKeys := mapset.NewSet[string]()
|
||||
deletedIDs := t.allIDs.Clone()
|
||||
for info := range infoChan {
|
||||
if info == nil {
|
||||
for torrent := range torChan {
|
||||
if torrent == nil {
|
||||
noInfoCount++
|
||||
continue
|
||||
}
|
||||
|
||||
infoID, _ := info.DownloadedIDs.Clone().Pop()
|
||||
deletedIDs.Remove(infoID)
|
||||
accessKey := t.GetKey(info)
|
||||
// there's only 1 component torrent at this point, let's get it
|
||||
var tInfo *realdebrid.TorrentInfo
|
||||
for _, tInfo = range torrent.Components {
|
||||
break
|
||||
}
|
||||
accessKey := t.GetKey(torrent)
|
||||
freshAccessKeys.Add(accessKey)
|
||||
|
||||
// update allTorrents
|
||||
isNewID := false
|
||||
mainTorrent, exists := allTorrents.Get(accessKey)
|
||||
if !exists {
|
||||
allTorrents.Set(accessKey, info)
|
||||
} else {
|
||||
if !mainTorrent.DownloadedIDs.Contains(infoID) {
|
||||
merged := t.mergeToMain(mainTorrent, info)
|
||||
allTorrents.Set(accessKey, merged)
|
||||
}
|
||||
allTorrents.Set(accessKey, torrent)
|
||||
mainTorrent = torrent
|
||||
isNewID = true
|
||||
} else if _, ok := mainTorrent.Components[tInfo.ID]; !ok {
|
||||
merged := t.mergeToMain(mainTorrent, torrent)
|
||||
allTorrents.Set(accessKey, merged)
|
||||
mainTorrent = merged
|
||||
isNewID = true
|
||||
}
|
||||
|
||||
// check for newly finished torrents for assigning to directories
|
||||
isDone := info.DownloadedIDs.Cardinality() > 0 && info.InProgressIDs.IsEmpty()
|
||||
if isDone && !t.allIDs.Contains(infoID) {
|
||||
var directories []string
|
||||
mainTor, _ := allTorrents.Get(accessKey)
|
||||
t.assignedDirectoryCb(mainTor, func(directory string) {
|
||||
if isNewID && tInfo.Progress == 100 {
|
||||
// assign to directory
|
||||
t.assignedDirectoryCb(mainTorrent, func(directory string) {
|
||||
listing, _ := t.DirectoryMap.Get(directory)
|
||||
listing.Set(accessKey, mainTor)
|
||||
listing.Set(accessKey, mainTorrent)
|
||||
|
||||
updatedPaths = append(updatedPaths, fmt.Sprintf("%s/%s", directory, accessKey))
|
||||
// this is just for the logs
|
||||
if directory != config.ALL_TORRENTS {
|
||||
directories = append(directories, directory)
|
||||
}
|
||||
})
|
||||
t.allIDs.Add(infoID)
|
||||
}
|
||||
}
|
||||
t.allIDs.RemoveAll(deletedIDs.ToSlice()...)
|
||||
t.log.Infof("Compiled into %d torrents, %d were missing info", allTorrents.Count(), noInfoCount)
|
||||
|
||||
// removed torrents
|
||||
@@ -138,24 +134,14 @@ func (t *TorrentManager) StartRefreshJob() {
|
||||
|
||||
// getMoreInfo gets original name, size and files for a torrent
|
||||
func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
||||
|
||||
if cachedTor, exists := infoCache.Get(rdTorrent.ID); exists &&
|
||||
cachedTor.SelectedFiles.Count() == len(rdTorrent.Links) {
|
||||
|
||||
return cachedTor
|
||||
|
||||
} else if diskTor := t.readTorrentFromFile(rdTorrent.ID); diskTor != nil && !diskTor.AllInProgress() {
|
||||
|
||||
infoCache.Set(rdTorrent.ID, diskTor)
|
||||
t.ResetSelectedFiles(diskTor)
|
||||
return diskTor
|
||||
}
|
||||
|
||||
info, err := t.api.GetTorrentInfo(rdTorrent.ID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err)
|
||||
return nil
|
||||
info := t.readInfoFromFile(rdTorrent.ID)
|
||||
if info == nil {
|
||||
var err error
|
||||
info, err = t.api.GetTorrentInfo(rdTorrent.ID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
torrent := Torrent{
|
||||
@@ -204,15 +190,10 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
torrent.SelectedFiles.Set(filename, file)
|
||||
}
|
||||
}
|
||||
torrent.DownloadedIDs = mapset.NewSet[string]()
|
||||
torrent.InProgressIDs = mapset.NewSet[string]()
|
||||
if rdTorrent.Progress == 100 {
|
||||
torrent.DownloadedIDs.Add(info.ID)
|
||||
// save to cache if it's not in progress anymore
|
||||
infoCache.Set(rdTorrent.ID, &torrent)
|
||||
t.saveTorrentChangesToDisk(&torrent, nil)
|
||||
} else {
|
||||
torrent.InProgressIDs.Add(info.ID)
|
||||
torrent.Components = map[string]*realdebrid.TorrentInfo{rdTorrent.ID: info}
|
||||
|
||||
if info.Progress == 100 {
|
||||
t.writeInfoToFile(info)
|
||||
}
|
||||
|
||||
return &torrent
|
||||
@@ -247,6 +228,14 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent {
|
||||
older = toMerge
|
||||
}
|
||||
|
||||
mergedComponents := map[string]*realdebrid.TorrentInfo{}
|
||||
for k, v := range older.Components {
|
||||
mergedComponents[k] = v
|
||||
}
|
||||
for k, v := range newer.Components {
|
||||
mergedComponents[k] = v
|
||||
}
|
||||
|
||||
// build the main torrent
|
||||
mainTorrent := Torrent{
|
||||
Name: newer.Name,
|
||||
@@ -255,8 +244,7 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent {
|
||||
Hash: newer.Hash,
|
||||
Added: newer.Added,
|
||||
|
||||
DownloadedIDs: newer.DownloadedIDs.Union(older.DownloadedIDs),
|
||||
InProgressIDs: newer.InProgressIDs.Union(older.InProgressIDs),
|
||||
Components: mergedComponents,
|
||||
UnassignedLinks: newer.UnassignedLinks.Union(older.UnassignedLinks),
|
||||
UnrepairableReason: newer.UnrepairableReason,
|
||||
}
|
||||
@@ -268,12 +256,6 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent {
|
||||
mainTorrent.UnrepairableReason = older.UnrepairableReason
|
||||
}
|
||||
|
||||
// update in progress ids
|
||||
mainTorrent.DownloadedIDs.Each(func(id string) bool {
|
||||
mainTorrent.InProgressIDs.Remove(id)
|
||||
return false
|
||||
})
|
||||
|
||||
// the link can have the following values
|
||||
// 1. https://*** - the file is available
|
||||
// 3. empty - the file is not available
|
||||
@@ -310,7 +292,10 @@ func (t *TorrentManager) mergeToMain(existing, toMerge *Torrent) *Torrent {
|
||||
}
|
||||
|
||||
func (t *TorrentManager) assignedDirectoryCb(tor *Torrent, cb func(string)) {
|
||||
torrentIDs := tor.DownloadedIDs.Union(tor.InProgressIDs).ToSlice()
|
||||
torrentIDs := []string{}
|
||||
for id := range tor.Components {
|
||||
torrentIDs = append(torrentIDs, id)
|
||||
}
|
||||
// get filenames needed for directory conditions
|
||||
var filenames []string
|
||||
var fileSizes []int64
|
||||
|
||||
Reference in New Issue
Block a user