Skip to content

Commit

Permalink
Finished support for config dir and reload
Browse files Browse the repository at this point in the history
  • Loading branch information
tlm committed Oct 15, 2018
1 parent 7132d9a commit 840b885
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 58 deletions.
35 changes: 23 additions & 12 deletions cmd/disttrust.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

log "github.com/sirupsen/logrus"
Expand All @@ -13,7 +14,8 @@ import (
"github.com/tlmiller/disttrust/cmd/config"
"github.com/tlmiller/disttrust/conductor"
"github.com/tlmiller/disttrust/provider"
_ "github.com/tlmiller/disttrust/server"
"github.com/tlmiller/disttrust/server"
"github.com/tlmiller/disttrust/server/healthz"
)

var (
Expand Down Expand Up @@ -47,6 +49,9 @@ func preRun(cmd *cobra.Command, args []string) {

func Run(cmd *cobra.Command, args []string) {
providers := provider.DefaultStore()
var apiServ *server.ApiServer
apiServSetup := sync.Once{}
healthApi := healthz.New()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGHUP)
Expand All @@ -57,6 +62,14 @@ func Run(cmd *cobra.Command, args []string) {
log.Fatalf("making config: %v", err)
}

apiServSetup.Do(func() {
if userConfig.Api.Address != "" {
apiServ = server.NewApiServer(userConfig.Api.Address)
healthApi.InstallHandler(apiServ.Mux)
go apiServ.Serve()
}
})

log.Debug("building provider store from config")
providers, err = config.ToProviderStore(userConfig.Providers, providers)
if err != nil {
Expand All @@ -70,17 +83,12 @@ func Run(cmd *cobra.Command, args []string) {
}

manager := conductor.NewConductor()
_ = manager.AddMembers(members...)

//var apiServ *server.ApiServer
//if userConfig.Api.Address != "" {
// apiServ = server.NewApiServer(userConfig.Api.Address)
// for _, s := range mstatuses {
// apiServ.AddHealthzChecks(s)
// }
// go apiServ.Serve()
//}

mstatuses := manager.AddMembers(members...)
checks := make([]healthz.Checker, len(mstatuses))
for i, ms := range mstatuses {
checks[i] = ms
}
healthApi.SetChecks(checks...)
manager.Play()

sig := <-sigCh
Expand All @@ -93,4 +101,7 @@ func Run(cmd *cobra.Command, args []string) {
break
}
}
if apiServ != nil {
apiServ.Stop()
}
}
21 changes: 3 additions & 18 deletions conductor/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ type Conductor struct {
members []*MemberStatus
memCount int32
stopCh chan struct{}
watchCh chan *MemberStatus
waitGroup sync.WaitGroup
}

func NewConductor() *Conductor {
return &Conductor{
healthErr: nil,
stopCh: make(chan struct{}),
watchCh: make(chan *MemberStatus),
waitGroup: sync.WaitGroup{},
}
}
Expand Down Expand Up @@ -60,7 +58,9 @@ func (c *Conductor) Play() *Conductor {
case err := <-gmstatus.member.DoneCh():
log.Info("member stopped")
gmstatus.setState(false, err)
c.watchCh <- gmstatus
if _, err := gmstatus.State(); err != nil {
log.Errorf("member failed: %v", err)
}
c.waitGroup.Done()
break Outer
case <-c.stopCh:
Expand All @@ -69,25 +69,10 @@ func (c *Conductor) Play() *Conductor {
}
}()
}
go c.Watch()
return c
}

func (c *Conductor) Stop() {
close(c.stopCh)
c.waitGroup.Wait()
}

func (c *Conductor) Watch() {
for {
select {
case mstatus := <-c.watchCh:
if _, err := mstatus.State(); err != nil {
log := logrus.WithFields(logrus.Fields{
"member": mstatus.member.Name(),
})
log.Errorf("member failed: %v", err)
}
}
}
}
9 changes: 3 additions & 6 deletions server/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ package server

import (
"net/http"

"github.com/tlmiller/disttrust/server/healthz"
)

type ApiServer struct {
healthzChecks []healthz.Checker
server http.Server
stopCh chan struct{}
Mux *http.ServeMux
server http.Server
}

func NewApiServer(address string) *ApiServer {
return &ApiServer{
Mux: http.NewServeMux(),
server: http.Server{
Addr: address,
},
stopCh: make(chan struct{}),
}
}
9 changes: 0 additions & 9 deletions server/healthz.go

This file was deleted.

2 changes: 2 additions & 0 deletions server/healthz/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ type Checker interface {
Check() error
Name() string
}

type ChecksFetcher func() []Checker
8 changes: 4 additions & 4 deletions server/healthz/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http"
)

func handleRootHealthz(checks ...Checker) http.HandlerFunc {
func handleRootHealthz(fetcher ChecksFetcher) http.HandlerFunc {
return http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet && req.Method != http.MethodHead {
res.WriteHeader(http.StatusMethodNotAllowed)
Expand All @@ -15,7 +15,7 @@ func handleRootHealthz(checks ...Checker) http.HandlerFunc {

failed := false
var verboseOut bytes.Buffer
for _, check := range checks {
for _, check := range fetcher() {
if err := check.Check(); err != nil {
fmt.Fprintf(&verboseOut, "[-]%v failed\n", check.Name())
failed = true
Expand All @@ -41,6 +41,6 @@ func handleRootHealthz(checks ...Checker) http.HandlerFunc {
})
}

func InstallHandler(mux *http.ServeMux, checks ...Checker) {
mux.Handle("/healthz", handleRootHealthz(checks...))
func (h *Healthz) InstallHandler(mux *http.ServeMux) {
mux.Handle("/healthz", handleRootHealthz(h.Checks))
}
29 changes: 29 additions & 0 deletions server/healthz/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package healthz

import (
"sync"
)

type Healthz struct {
checks []Checker
lock sync.Mutex
}

func (h *Healthz) Checks() []Checker {
h.lock.Lock()
defer h.lock.Unlock()
return h.checks
}

func New() *Healthz {
return &Healthz{
checks: []Checker{},
lock: sync.Mutex{},
}
}

func (h *Healthz) SetChecks(checks ...Checker) {
h.lock.Lock()
defer h.lock.Unlock()
h.checks = checks
}
14 changes: 5 additions & 9 deletions server/serve.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package server

import (
"net/http"

"github.com/tlmiller/disttrust/server/healthz"
)

func (a *ApiServer) Serve() {
mux := http.NewServeMux()
healthz.InstallHandler(mux, a.healthzChecks...)
a.server.Handler = mux
a.server.Handler = a.Mux
a.server.ListenAndServe()
}

func (a *ApiServer) Stop() {
a.server.Close()
}

0 comments on commit 840b885

Please sign in to comment.