Resolve race condition issues

This commit is contained in:
Ben Sarmiento
2023-11-04 14:13:24 +01:00
parent b2e957cb4c
commit 4136310622
8 changed files with 136 additions and 26 deletions

View File

@@ -1,12 +1,17 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"os"
"os/signal"
"syscall"
"time" "time"
"github.com/debridmediamanager.com/zurg/internal/config" "github.com/debridmediamanager.com/zurg/internal/config"
"github.com/debridmediamanager.com/zurg/internal/mount"
"github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/net"
"github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/debridmediamanager.com/zurg/internal/torrent"
"github.com/hashicorp/golang-lru/v2/expirable" "github.com/hashicorp/golang-lru/v2/expirable"
@@ -26,9 +31,31 @@ func main() {
net.Router(mux, config, t, cache) net.Router(mux, config, t, cache)
addr := fmt.Sprintf(":%s", config.GetPort()) addr := fmt.Sprintf(":%s", config.GetPort())
server := &http.Server{Addr: addr, Handler: mux}
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
go func() {
log.Printf("Starting server on %s\n", addr) log.Printf("Starting server on %s\n", addr)
err := http.ListenAndServe(addr, mux) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err != nil {
log.Panicf("Failed to start server: %v", err) log.Panicf("Failed to start server: %v", err)
} }
}()
// Start the mount in a goroutine.
go func() {
if err := mount.Mount(config.GetMountPath()); err != nil {
log.Panicf("Failed to mount: %v", err)
}
}()
<-shutdown
log.Println("zurg signing off...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(ctx)
} }

8
go.mod
View File

@@ -6,3 +6,11 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require (
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 // indirect
github.com/mdlayher/sdnotify v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/sys v0.4.0 // indirect
)

10
go.sum
View File

@@ -1,5 +1,15 @@
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ=
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/mdlayher/sdnotify v1.0.0 h1:Ma9XeLVN/l0qpyx1tNeMSeTjCPH6NtuD6/N9XdTlQ3c=
github.com/mdlayher/sdnotify v1.0.0/go.mod h1:HQUmpM4XgYkhDLtd+Uad8ZFK1T9D5+pNxnXQjCeJlGE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -19,6 +19,7 @@ type ConfigInterface interface {
MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool
GetOnLibraryUpdate() string GetOnLibraryUpdate() string
GetNetworkBufferSize() int GetNetworkBufferSize() int
GetMountPath() string
} }
func LoadZurgConfig(filename string) (ConfigInterface, error) { func LoadZurgConfig(filename string) (ConfigInterface, error) {

View File

@@ -10,6 +10,7 @@ type ZurgConfig struct {
CanRepair bool `yaml:"enable_repair"` CanRepair bool `yaml:"enable_repair"`
OnLibraryUpdate string `yaml:"on_library_update"` OnLibraryUpdate string `yaml:"on_library_update"`
NetworkBufferSize int `yaml:"network_buffer_size"` NetworkBufferSize int `yaml:"network_buffer_size"`
MountPath string `yaml:"mount_path"`
} }
func (z *ZurgConfig) GetToken() string { func (z *ZurgConfig) GetToken() string {
@@ -55,3 +56,7 @@ func (z *ZurgConfig) GetNetworkBufferSize() int {
} }
return z.NetworkBufferSize return z.NetworkBufferSize
} }
func (z *ZurgConfig) GetMountPath() string {
return z.MountPath
}

38
internal/mount/mount.go Normal file
View File

@@ -0,0 +1,38 @@
package mount
import (
"errors"
"log"
"os"
"bazil.org/fuse"
"github.com/mdlayher/sdnotify"
)
func Mount(mountpoint string) error {
n, err := sdnotify.New()
if err != nil && !errors.Is(err, os.ErrNotExist) {
log.Fatalf("failed to open systemd notifier: %v", err)
}
err = n.Notify(
sdnotify.Statusf("service started successfully"),
sdnotify.Ready,
)
if err != nil {
log.Fatalf("failed to send ready notification: %v", err)
}
return Unmount(mountpoint, n)
}
func Unmount(mountpoint string, n *sdnotify.Notifier) error {
log.Println("Unmounting...")
err := n.Notify(
sdnotify.Statusf("service stopped successfully"),
sdnotify.Ready,
)
if err != nil {
log.Fatalf("failed to send stop notification: %v", err)
}
fuse.Unmount(mountpoint)
return nil
}

View File

@@ -23,26 +23,34 @@ type TorrentManager struct {
config config.ConfigInterface config config.ConfigInterface
cache *expirable.LRU[string, string] cache *expirable.LRU[string, string]
workerPool chan bool workerPool chan bool
TorrentDirectoriesMap map[string][]string directoryMap map[string][]string
processedTorrents map[string][]string processedTorrents map[string][]string
mu *sync.Mutex
} }
// NewTorrentManager creates a new torrent manager // NewTorrentManager creates a new torrent manager
// it will fetch all torrents and their info in the background // it will fetch all torrents and their info in the background
// and store them in-memory // and store them in-memory; it is called only once at startup
func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager { func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager {
t := &TorrentManager{ t := &TorrentManager{
requiredVersion: "28.10.2023", requiredVersion: "4.11.2023",
config: config, config: config,
cache: cache, cache: cache,
workerPool: make(chan bool, config.GetNumOfWorkers()), workerPool: make(chan bool, config.GetNumOfWorkers()),
TorrentDirectoriesMap: make(map[string][]string), directoryMap: make(map[string][]string),
processedTorrents: make(map[string][]string), processedTorrents: make(map[string][]string),
mu: &sync.Mutex{},
} }
// Initialize torrents for the first time // Initialize torrents for the first time
log.Println("Initializing torrents")
t.mu.Lock()
log.Println("Fetching torrents")
t.torrents = t.getFreshListFromAPI() t.torrents = t.getFreshListFromAPI()
t.checksum = t.getChecksum() t.checksum = t.getChecksum()
t.mu.Unlock()
log.Println("Finished fetching torrents")
// log.Println("First checksum", t.checksum) // log.Println("First checksum", t.checksum)
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -72,7 +80,7 @@ func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[strin
func (t *TorrentManager) GetByDirectory(directory string) []Torrent { func (t *TorrentManager) GetByDirectory(directory string) []Torrent {
var torrents []Torrent var torrents []Torrent
for i := range t.torrents { for i := range t.torrents {
for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { for _, dir := range t.directoryMap[t.torrents[i].Name] {
if dir == directory { if dir == directory {
torrents = append(torrents, t.torrents[i]) torrents = append(torrents, t.torrents[i])
} }
@@ -195,8 +203,11 @@ func (t *TorrentManager) startRefreshJob() {
wg.Wait() wg.Wait()
// apply side effects // apply side effects
t.mu.Lock()
t.torrents = newTorrents t.torrents = newTorrents
t.checksum = t.getChecksum() t.checksum = t.getChecksum()
t.mu.Unlock()
// log.Println("Checksum changed", t.checksum) // log.Println("Checksum changed", t.checksum)
if t.config.EnableRepair() { if t.config.EnableRepair() {
go t.repairAll(&wg) go t.repairAll(&wg)
@@ -223,6 +234,8 @@ func (t *TorrentManager) getFreshListFromAPI() []Torrent {
torrentV2 := Torrent{ torrentV2 := Torrent{
Torrent: torrent, Torrent: torrent,
SelectedFiles: nil, SelectedFiles: nil,
ForRepair: false,
lock: &sync.Mutex{},
} }
torrentsV2 = append(torrentsV2, torrentV2) torrentsV2 = append(torrentsV2, torrentV2)
@@ -361,7 +374,7 @@ func (t *TorrentManager) mapToDirectories() {
if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) { if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) {
found := false found := false
// check if it is already mapped to this directory // check if it is already mapped to this directory
for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { for _, dir := range t.directoryMap[t.torrents[i].Name] {
if dir == directory { if dir == directory {
found = true found = true
break // it is already mapped to this directory break // it is already mapped to this directory
@@ -369,12 +382,16 @@ func (t *TorrentManager) mapToDirectories() {
} }
if !found { if !found {
counter[directory]++ counter[directory]++
t.TorrentDirectoriesMap[t.torrents[i].Name] = append(t.TorrentDirectoriesMap[t.torrents[i].Name], directory) t.mu.Lock()
t.directoryMap[t.torrents[i].Name] = append(t.directoryMap[t.torrents[i].Name], directory)
t.mu.Unlock()
break // we found a directory for this torrent, so we can stop looking for more break // we found a directory for this torrent, so we can stop looking for more
} }
} }
} }
t.mu.Lock()
t.processedTorrents[t.torrents[i].Name] = append(t.processedTorrents[t.torrents[i].Name], group) t.processedTorrents[t.torrents[i].Name] = append(t.processedTorrents[t.torrents[i].Name], group)
t.mu.Unlock()
} }
sum := 0 sum := 0
for _, count := range counter { for _, count := range counter {
@@ -439,7 +456,6 @@ func (t *TorrentManager) readFromFile(torrentID string) *Torrent {
dataDecoder := gob.NewDecoder(file) dataDecoder := gob.NewDecoder(file)
err = dataDecoder.Decode(&torrent) err = dataDecoder.Decode(&torrent)
if err != nil { if err != nil {
log.Fatalf("Failed decoding file: %s", err)
return nil return nil
} }
if torrent.Version != t.requiredVersion { if torrent.Version != t.requiredVersion {

View File

@@ -1,12 +1,17 @@
package torrent package torrent
import "github.com/debridmediamanager.com/zurg/pkg/realdebrid" import (
"sync"
"github.com/debridmediamanager.com/zurg/pkg/realdebrid"
)
type Torrent struct { type Torrent struct {
Version string Version string
realdebrid.Torrent realdebrid.Torrent
SelectedFiles []File SelectedFiles []File
ForRepair bool ForRepair bool
lock *sync.Mutex
} }
type File struct { type File struct {