From 413631062245dc62f1f03ad9ef1a1f72ba0d3362 Mon Sep 17 00:00:00 2001 From: Ben Sarmiento Date: Sat, 4 Nov 2023 14:13:24 +0100 Subject: [PATCH] Resolve race condition issues --- cmd/zurg/main.go | 37 ++++++++++++++++++++---- go.mod | 8 ++++++ go.sum | 10 +++++++ internal/config/load.go | 1 + internal/config/types.go | 5 ++++ internal/mount/mount.go | 38 +++++++++++++++++++++++++ internal/torrent/manager.go | 56 ++++++++++++++++++++++++------------- internal/torrent/types.go | 7 ++++- 8 files changed, 136 insertions(+), 26 deletions(-) create mode 100644 internal/mount/mount.go diff --git a/cmd/zurg/main.go b/cmd/zurg/main.go index 57652b2..79cedcb 100644 --- a/cmd/zurg/main.go +++ b/cmd/zurg/main.go @@ -1,12 +1,17 @@ package main import ( + "context" "fmt" "log" "net/http" + "os" + "os/signal" + "syscall" "time" "github.com/debridmediamanager.com/zurg/internal/config" + "github.com/debridmediamanager.com/zurg/internal/mount" "github.com/debridmediamanager.com/zurg/internal/net" "github.com/debridmediamanager.com/zurg/internal/torrent" "github.com/hashicorp/golang-lru/v2/expirable" @@ -26,9 +31,31 @@ func main() { net.Router(mux, config, t, cache) addr := fmt.Sprintf(":%s", config.GetPort()) - log.Printf("Starting server on %s\n", addr) - err := http.ListenAndServe(addr, mux) - if err != nil { - log.Panicf("Failed to start server: %v", err) - } + server := &http.Server{Addr: addr, Handler: mux} + + shutdown := make(chan os.Signal, 1) + signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) + + go func() { + log.Printf("Starting server on %s\n", addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Panicf("Failed to start server: %v", err) + } + }() + + // Start the mount in a goroutine. + go func() { + if err := mount.Mount(config.GetMountPath()); err != nil { + log.Panicf("Failed to mount: %v", err) + } + }() + + <-shutdown + log.Println("zurg signing off...") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + server.Shutdown(ctx) + } diff --git a/go.mod b/go.mod index 6c4297d..87e6b19 100644 --- a/go.mod +++ b/go.mod @@ -6,3 +6,11 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 gopkg.in/yaml.v3 v3.0.1 ) + +require ( + bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 // indirect + github.com/mdlayher/sdnotify v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/sys v0.4.0 // indirect +) diff --git a/go.sum b/go.sum index c282774..b4cea36 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,15 @@ +bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ= +bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/mdlayher/sdnotify v1.0.0 h1:Ma9XeLVN/l0qpyx1tNeMSeTjCPH6NtuD6/N9XdTlQ3c= +github.com/mdlayher/sdnotify v1.0.0/go.mod h1:HQUmpM4XgYkhDLtd+Uad8ZFK1T9D5+pNxnXQjCeJlGE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/config/load.go b/internal/config/load.go index 02391b2..bd60cb6 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -19,6 +19,7 @@ type ConfigInterface interface { MeetsConditions(directory, torrentID, torrentName string, fileNames []string) bool GetOnLibraryUpdate() string GetNetworkBufferSize() int + GetMountPath() string } func LoadZurgConfig(filename string) (ConfigInterface, error) { diff --git a/internal/config/types.go b/internal/config/types.go index 330cd20..9fafcd1 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -10,6 +10,7 @@ type ZurgConfig struct { CanRepair bool `yaml:"enable_repair"` OnLibraryUpdate string `yaml:"on_library_update"` NetworkBufferSize int `yaml:"network_buffer_size"` + MountPath string `yaml:"mount_path"` } func (z *ZurgConfig) GetToken() string { @@ -55,3 +56,7 @@ func (z *ZurgConfig) GetNetworkBufferSize() int { } return z.NetworkBufferSize } + +func (z *ZurgConfig) GetMountPath() string { + return z.MountPath +} diff --git a/internal/mount/mount.go b/internal/mount/mount.go new file mode 100644 index 0000000..50596c6 --- /dev/null +++ b/internal/mount/mount.go @@ -0,0 +1,38 @@ +package mount + +import ( + "errors" + "log" + "os" + + "bazil.org/fuse" + "github.com/mdlayher/sdnotify" +) + +func Mount(mountpoint string) error { + n, err := sdnotify.New() + if err != nil && !errors.Is(err, os.ErrNotExist) { + log.Fatalf("failed to open systemd notifier: %v", err) + } + err = n.Notify( + sdnotify.Statusf("service started successfully"), + sdnotify.Ready, + ) + if err != nil { + log.Fatalf("failed to send ready notification: %v", err) + } + return Unmount(mountpoint, n) +} + +func Unmount(mountpoint string, n *sdnotify.Notifier) error { + log.Println("Unmounting...") + err := n.Notify( + sdnotify.Statusf("service stopped successfully"), + sdnotify.Ready, + ) + if err != nil { + log.Fatalf("failed to send stop notification: %v", err) + } + fuse.Unmount(mountpoint) + return nil +} diff --git a/internal/torrent/manager.go b/internal/torrent/manager.go index 271bd97..4106d4a 100644 --- a/internal/torrent/manager.go +++ b/internal/torrent/manager.go @@ -16,33 +16,41 @@ import ( ) type TorrentManager struct { - requiredVersion string - torrents []Torrent - inProgress []string - checksum string - config config.ConfigInterface - cache *expirable.LRU[string, string] - workerPool chan bool - TorrentDirectoriesMap map[string][]string - processedTorrents map[string][]string + requiredVersion string + torrents []Torrent + inProgress []string + checksum string + config config.ConfigInterface + cache *expirable.LRU[string, string] + workerPool chan bool + directoryMap map[string][]string + processedTorrents map[string][]string + mu *sync.Mutex } // NewTorrentManager creates a new torrent manager // it will fetch all torrents and their info in the background -// and store them in-memory +// and store them in-memory; it is called only once at startup func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[string, string]) *TorrentManager { t := &TorrentManager{ - requiredVersion: "28.10.2023", - config: config, - cache: cache, - workerPool: make(chan bool, config.GetNumOfWorkers()), - TorrentDirectoriesMap: make(map[string][]string), - processedTorrents: make(map[string][]string), + requiredVersion: "4.11.2023", + config: config, + cache: cache, + workerPool: make(chan bool, config.GetNumOfWorkers()), + directoryMap: make(map[string][]string), + processedTorrents: make(map[string][]string), + mu: &sync.Mutex{}, } // Initialize torrents for the first time + log.Println("Initializing torrents") + t.mu.Lock() + log.Println("Fetching torrents") t.torrents = t.getFreshListFromAPI() t.checksum = t.getChecksum() + t.mu.Unlock() + log.Println("Finished fetching torrents") + // log.Println("First checksum", t.checksum) var wg sync.WaitGroup @@ -72,7 +80,7 @@ func NewTorrentManager(config config.ConfigInterface, cache *expirable.LRU[strin func (t *TorrentManager) GetByDirectory(directory string) []Torrent { var torrents []Torrent for i := range t.torrents { - for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { + for _, dir := range t.directoryMap[t.torrents[i].Name] { if dir == directory { torrents = append(torrents, t.torrents[i]) } @@ -195,8 +203,11 @@ func (t *TorrentManager) startRefreshJob() { wg.Wait() // apply side effects + t.mu.Lock() t.torrents = newTorrents t.checksum = t.getChecksum() + t.mu.Unlock() + // log.Println("Checksum changed", t.checksum) if t.config.EnableRepair() { go t.repairAll(&wg) @@ -223,6 +234,8 @@ func (t *TorrentManager) getFreshListFromAPI() []Torrent { torrentV2 := Torrent{ Torrent: torrent, SelectedFiles: nil, + ForRepair: false, + lock: &sync.Mutex{}, } torrentsV2 = append(torrentsV2, torrentV2) @@ -361,7 +374,7 @@ func (t *TorrentManager) mapToDirectories() { if configV1.MeetsConditions(directory, t.torrents[i].ID, t.torrents[i].Name, filenames) { found := false // check if it is already mapped to this directory - for _, dir := range t.TorrentDirectoriesMap[t.torrents[i].Name] { + for _, dir := range t.directoryMap[t.torrents[i].Name] { if dir == directory { found = true break // it is already mapped to this directory @@ -369,12 +382,16 @@ func (t *TorrentManager) mapToDirectories() { } if !found { counter[directory]++ - t.TorrentDirectoriesMap[t.torrents[i].Name] = append(t.TorrentDirectoriesMap[t.torrents[i].Name], directory) + t.mu.Lock() + t.directoryMap[t.torrents[i].Name] = append(t.directoryMap[t.torrents[i].Name], directory) + t.mu.Unlock() break // we found a directory for this torrent, so we can stop looking for more } } } + t.mu.Lock() t.processedTorrents[t.torrents[i].Name] = append(t.processedTorrents[t.torrents[i].Name], group) + t.mu.Unlock() } sum := 0 for _, count := range counter { @@ -439,7 +456,6 @@ func (t *TorrentManager) readFromFile(torrentID string) *Torrent { dataDecoder := gob.NewDecoder(file) err = dataDecoder.Decode(&torrent) if err != nil { - log.Fatalf("Failed decoding file: %s", err) return nil } if torrent.Version != t.requiredVersion { diff --git a/internal/torrent/types.go b/internal/torrent/types.go index 1b73984..37b0156 100644 --- a/internal/torrent/types.go +++ b/internal/torrent/types.go @@ -1,12 +1,17 @@ package torrent -import "github.com/debridmediamanager.com/zurg/pkg/realdebrid" +import ( + "sync" + + "github.com/debridmediamanager.com/zurg/pkg/realdebrid" +) type Torrent struct { Version string realdebrid.Torrent SelectedFiles []File ForRepair bool + lock *sync.Mutex } type File struct {