Proactive repairs
This commit is contained in:
@@ -99,7 +99,7 @@ func (t *TorrentManager) getCurrentState() LibraryState {
|
||||
active = activeResp.torrents[0]
|
||||
case count = <-countChan:
|
||||
case err := <-errChan:
|
||||
t.log.Warnf("Checksum API Error: %v\n", err)
|
||||
t.log.Warnf("Checksum API Error: %v", err)
|
||||
return EmptyState()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func NewTorrentManager(cfg config.ConfigInterface, api *realdebrid.RealDebrid, p
|
||||
for {
|
||||
downloads, totalDownloads, err := t.Api.GetDownloads(page, offset)
|
||||
if err != nil {
|
||||
t.log.Fatalf("Cannot get downloads: %v\n", err)
|
||||
t.log.Fatalf("Cannot get downloads: %v", err)
|
||||
}
|
||||
for i := range downloads {
|
||||
if !t.DownloadCache.Has(downloads[i].Link) {
|
||||
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
func (t *TorrentManager) RefreshTorrents() []string {
|
||||
instances, _, err := t.Api.GetTorrents(0, false)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get torrents: %v\n", err)
|
||||
t.log.Warnf("Cannot get torrents: %v", err)
|
||||
return nil
|
||||
}
|
||||
infoChan := make(chan *Torrent, len(instances))
|
||||
@@ -125,7 +125,7 @@ func (t *TorrentManager) getMoreInfo(rdTorrent realdebrid.Torrent) *Torrent {
|
||||
|
||||
info, err := t.Api.GetTorrentInfo(rdTorrent.ID)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot get info for id=%s: %v\n", rdTorrent.ID, err)
|
||||
t.log.Warnf("Cannot get info for id=%s: %v", rdTorrent.ID, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -16,8 +16,9 @@ const EXPIRED_LINK_TOLERANCE_HOURS = 24
|
||||
|
||||
func (t *TorrentManager) RepairAll() {
|
||||
_ = t.repairWorker.Submit(func() {
|
||||
t.log.Info("Repairing all broken torrents")
|
||||
t.repairAll()
|
||||
t.log.Debug("Finished repairing all torrents")
|
||||
t.log.Info("Finished repairing all torrents")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -44,7 +45,9 @@ func (t *TorrentManager) repairAll() {
|
||||
currentGroup.Add(torrent.Hash)
|
||||
})
|
||||
|
||||
t.log.Debug("Checking if torrents are still cached")
|
||||
var availabilityChecks = make(map[string]bool)
|
||||
uncachedCount := 0
|
||||
for i := range hashGroups {
|
||||
resp, err := t.Api.AvailabilityCheck(hashGroups[i].ToSlice())
|
||||
if err != nil {
|
||||
@@ -55,8 +58,12 @@ func (t *TorrentManager) repairAll() {
|
||||
for hash, hosterHash := range resp {
|
||||
// Check if HosterHash is a map (Variants field is used)
|
||||
availabilityChecks[hash] = len(hosterHash.Variants) > 0
|
||||
if !availabilityChecks[hash] {
|
||||
uncachedCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
t.log.Debugf("Found %d torrents that are no longer cached", uncachedCount)
|
||||
|
||||
var toRepair []*Torrent
|
||||
allTorrents.IterCb(func(_ string, torrent *Torrent) {
|
||||
@@ -69,6 +76,7 @@ func (t *TorrentManager) repairAll() {
|
||||
if _, ok := availabilityChecks[torrent.Hash]; !ok || !availabilityChecks[torrent.Hash] {
|
||||
isCached = false
|
||||
}
|
||||
// todo: also handle file ID checks
|
||||
|
||||
// check 2: for broken files
|
||||
hasBrokenFiles := false
|
||||
@@ -82,7 +90,7 @@ func (t *TorrentManager) repairAll() {
|
||||
toRepair = append(toRepair, torrent)
|
||||
}
|
||||
})
|
||||
t.log.Debugf("Found %d torrents to repair", len(toRepair))
|
||||
t.log.Debugf("Found %d broken torrents to repair in total", len(toRepair))
|
||||
for i := range toRepair {
|
||||
torrent := toRepair[i]
|
||||
t.log.Infof("Repairing %s", torrent.AccessKey)
|
||||
@@ -122,8 +130,11 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
||||
if t.reinsertTorrent(torrent, "") {
|
||||
t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
||||
return
|
||||
} else {
|
||||
} else if !torrent.Unfixable {
|
||||
t.log.Warnf("Failed to repair by reinserting torrent %s, will only redownload broken files...", torrent.AccessKey)
|
||||
} else {
|
||||
t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
t.log.Warnf("Torrent %s is not older than %d hours to be repaired by reinsertion, will only redownload broken files...", torrent.AccessKey, EXPIRED_LINK_TOLERANCE_HOURS)
|
||||
@@ -217,8 +228,7 @@ func (t *TorrentManager) repair(torrent *Torrent) {
|
||||
if t.reinsertTorrent(torrent, brokenFileIDs) {
|
||||
t.log.Infof("Successfully downloaded torrent %s to repair it", torrent.AccessKey)
|
||||
} else {
|
||||
t.log.Warnf("Failed to repair torrent %s", torrent.AccessKey)
|
||||
t.markAsUnplayable(torrent)
|
||||
t.log.Warnf("Cannot repair torrent %s", torrent.AccessKey)
|
||||
}
|
||||
} else {
|
||||
t.log.Warnf("Torrent %s has no broken files to repair", torrent.AccessKey)
|
||||
@@ -247,6 +257,9 @@ func (t *TorrentManager) reinsertTorrent(torrent *Torrent, brokenFiles string) b
|
||||
resp, err := t.Api.AddMagnetHash(torrent.Hash)
|
||||
if err != nil {
|
||||
t.log.Warnf("Cannot redownload torrent: %v", err)
|
||||
if strings.Contains(err.Error(), "infringing_file") {
|
||||
t.markAsUnfixable(torrent)
|
||||
}
|
||||
return false
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -342,10 +355,21 @@ func (t *TorrentManager) canCapacityHandle() bool {
|
||||
|
||||
func (t *TorrentManager) markAsUnplayable(torrent *Torrent) {
|
||||
t.log.Warnf("Marking torrent %s as unplayable", torrent.AccessKey)
|
||||
torrent.Unfixable = true
|
||||
t.markAsUnfixable(torrent)
|
||||
t.DirectoryMap.IterCb(func(directory string, torrents cmap.ConcurrentMap[string, *Torrent]) {
|
||||
torrents.Remove(torrent.AccessKey)
|
||||
})
|
||||
torrents, _ := t.DirectoryMap.Get(config.UNPLAYABLE_TORRENTS)
|
||||
torrents.Set(torrent.AccessKey, torrent)
|
||||
}
|
||||
|
||||
func (t *TorrentManager) markAsUnfixable(torrent *Torrent) {
|
||||
torrent.Unfixable = true
|
||||
infoCache, _ := t.DirectoryMap.Get(INT_INFO_CACHE)
|
||||
torrent.DownloadedIDs.Each(func(id string) bool {
|
||||
info, _ := infoCache.Get(id)
|
||||
info.Unfixable = true
|
||||
t.writeTorrentToFile(id, torrent)
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2,6 +2,9 @@ package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -29,6 +32,20 @@ type HTTPClient struct {
|
||||
log *logutil.Logger
|
||||
}
|
||||
|
||||
// {
|
||||
// "error": "infringing_file",
|
||||
// "error_code": 35
|
||||
// }
|
||||
|
||||
type ErrorResponse struct {
|
||||
Message string `json:"error"`
|
||||
Code int `json:"error_code"`
|
||||
}
|
||||
|
||||
func (e *ErrorResponse) Error() string {
|
||||
return fmt.Sprintf("api response error: %s (code: %d)", e.Message, e.Code)
|
||||
}
|
||||
|
||||
func NewHTTPClient(token string, maxRetries int, timeoutSecs int, cfg config.ConfigInterface, log *logutil.Logger) *HTTPClient {
|
||||
client := HTTPClient{
|
||||
bearerToken: token,
|
||||
@@ -46,22 +63,42 @@ func NewHTTPClient(token string, maxRetries int, timeoutSecs int, cfg config.Con
|
||||
},
|
||||
getRetryIncr: func(resp *http.Response, hasRangeHeader bool, err error) int {
|
||||
if resp != nil {
|
||||
if resp.StatusCode == 429 || resp.StatusCode == 400 || resp.StatusCode == 403 {
|
||||
return 1 // retry but don't increment attempt
|
||||
if resp.StatusCode == 429 {
|
||||
return 1
|
||||
}
|
||||
if resp.StatusCode != http.StatusPartialContent && hasRangeHeader {
|
||||
return 1
|
||||
}
|
||||
return 0 // don't retry
|
||||
} else if err != nil {
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "EOF") || strings.Contains(errStr, "connection reset") || strings.Contains(errStr, "no such host") {
|
||||
return 1 // retry but don't increment attempt
|
||||
} else {
|
||||
return RATE_LIMIT_FACTOR
|
||||
log.Errorf("Client request error: %s", err.Error())
|
||||
if strings.Contains(err.Error(), "api response error") {
|
||||
if apiErr, ok := err.(*ErrorResponse); ok {
|
||||
switch apiErr.Code {
|
||||
case -1: // Internal error
|
||||
return 1
|
||||
case 5: // Slow down
|
||||
return 1
|
||||
case 6: // Ressource unreachable
|
||||
return 1
|
||||
case 17: // Hoster in maintenance
|
||||
return 1
|
||||
case 19: // Hoster temporarily unavailable
|
||||
return 1
|
||||
case 25: // Service unavailable
|
||||
return 1
|
||||
case 34: // Too many requests
|
||||
return 1
|
||||
case 36: // Fair Usage Limit
|
||||
return 1
|
||||
default:
|
||||
return 0 // don't retry
|
||||
}
|
||||
}
|
||||
}
|
||||
return 1
|
||||
}
|
||||
return RATE_LIMIT_FACTOR // retry and increment attempt
|
||||
return RATE_LIMIT_FACTOR
|
||||
},
|
||||
cfg: cfg,
|
||||
ipv6: cmap.New[string](),
|
||||
@@ -116,6 +153,16 @@ func (r *HTTPClient) Do(req *http.Request) (*http.Response, error) {
|
||||
attempt := 0
|
||||
for {
|
||||
resp, err = r.client.Do(req)
|
||||
if resp != nil && resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if body != nil {
|
||||
var errResp ErrorResponse
|
||||
jsonErr := json.Unmarshal(body, &errResp)
|
||||
if jsonErr == nil {
|
||||
err = &errResp
|
||||
}
|
||||
}
|
||||
}
|
||||
if incr := r.getRetryIncr(resp, hasRangeHeader, err); incr > 0 {
|
||||
attempt += incr
|
||||
if attempt > r.maxRetries {
|
||||
|
||||
Reference in New Issue
Block a user