Skip to content

Commit

Permalink
rework app starting & add config reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
mejgun committed Sep 20, 2024
1 parent d173b0e commit 1d71cda
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 83 deletions.
133 changes: 77 additions & 56 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ type flagsT struct {

const (
NoError = iota
ConfigError
LoggerError
ExtractorError
StreamerError
WebServerError
CacheError
SomeError
)

func parseCLIFlags() flagsT {
Expand All @@ -45,52 +40,21 @@ func parseCLIFlags() flagsT {
func main() {
flags := parseCLIFlags()
if flags.version {
stdout(appVersion)
os.Stdout.WriteString(fmt.Sprintf("%s\n", appVersion))
os.Exit(NoError)
}
startApp(flags.config)
}

var stdout = func(s string) { os.Stdout.WriteString(fmt.Sprintf("%s\n", s)) }
var stderr = func(s string) { os.Stderr.WriteString(fmt.Sprintf("ERROR %s\n", s)) }
var checkOrExit = func(err error, name string, errorcode int) {
if err != nil {
stderr(fmt.Sprintf("%s error. %s", name, err))
os.Exit(errorcode)
}
}
var texts = [5]string{
"Config",
"Logger",
"Extractor",
"Cache",
"Streamer",
}

func startApp(conf_file string) {
conf, err := config.Read(conf_file)
checkOrExit(err, texts[0], ConfigError)

log, err := logger.New(conf.Log)
checkOrExit(err, texts[1], LoggerError)

opts := make([]app.Option, 0)
for _, v := range conf.SubConfig {
opt := getNewApp(log, v)
opts = append(opts, opt)
conf, def, opts, log, err := readConfig(conf_file)
if err != nil {
os.Stderr.WriteString(fmt.Sprintf("Config read error: %s\n", err))
os.Exit(SomeError)
}
defapp := getNewApp(log, config.SubConfigT{
ConfigT: config.ConfigT{
Streamer: conf.Streamer,
Extractor: conf.Extractor,
Cache: conf.Cache,
},
Name: "default",
})
app := app.New(
logger.NewLayer(log, "App"),
defapp,
opts)
def, opts)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.LogInfo("Bad request", r.RemoteAddr, r.RequestURI)
Expand All @@ -106,7 +70,7 @@ func startApp(conf_file string) {
s := &http.Server{
Addr: fmt.Sprintf("%s:%d", conf.Host, conf.PortInt),
}
go signalsCatcher(log, app, s)
go signalsCatcher(conf_file, log, app, s)

log.LogInfo("Starting web server", "host", conf.Host, "port", conf.PortInt)
if err = s.ListenAndServe(); err == http.ErrServerClosed {
Expand All @@ -115,17 +79,59 @@ func startApp(conf_file string) {
} else {
log.LogError("HTTP server", err)
log.Close()
os.Exit(WebServerError)
os.Exit(SomeError)
}
}

func signalsCatcher(log logger.T, app *app.T, s *http.Server) {
func readConfig(conf_file string) (config.ConfigT, app.Option, []app.Option,
logger.T, error) {
conf, err := config.Read(conf_file)
if err != nil {
return config.ConfigT{}, app.Option{}, nil, nil, err
}

log, err := logger.New(conf.Log)
if err != nil {
return config.ConfigT{}, app.Option{}, nil, nil, err
}

defapp, err := getNewApp(log, config.SubConfigT{
ConfigT: config.ConfigT{
Streamer: conf.Streamer,
Extractor: conf.Extractor,
Cache: conf.Cache,
},
Name: "default",
})
if err != nil {
return config.ConfigT{}, app.Option{}, nil, nil, err
}

opts := make([]app.Option, 0)
for _, v := range conf.SubConfig {
opt, err := getNewApp(log, v)
if err != nil {
return config.ConfigT{}, app.Option{}, nil, nil, err
}
opts = append(opts, opt)
}
return conf, defapp, opts, log, nil
}

func signalsCatcher(conf_file string, log logger.T, app *app.AppLogic, s *http.Server) {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
for {
switch <-sigint {
case syscall.SIGHUP:
log.LogWarning("Config reloading")
_, def, opts, lognew, err := readConfig(conf_file)
if err != nil {
log.LogError("Config reload error", err)
} else {
app.ReloadConfig(logger.NewLayer(lognew, "App"), def, opts)
log = lognew
}
case syscall.SIGINT:
fallthrough
case syscall.SIGTERM:
Expand All @@ -140,25 +146,40 @@ func signalsCatcher(log logger.T, app *app.T, s *http.Server) {
}
}

func getNewApp(log logger.T, v config.SubConfigT) app.Option {
subcheck := func(err error, name string, errorcode int) {
checkOrExit(err, v.Name+" "+name, errorcode)
func getNewApp(log logger.T, v config.SubConfigT) (app.Option, error) {
texts := [3]string{
"Extractor",
"Cache",
"Streamer",
}

newname := func(name string) string {
return fmt.Sprintf("[%s] %s", v.Name, name)
}
xtr, err := extractor.New(v.Extractor, logger.NewLayer(log, newname(texts[2])))
subcheck(err, texts[2], ExtractorError)
cch, err := cache.New(v.Cache, logger.NewLayer(log, newname(texts[3])))
subcheck(err, texts[3], CacheError)
strm, err := streamer.New(v.Streamer, logger.NewLayer(log, newname(texts[4])), xtr)
subcheck(err, texts[4], StreamerError)
nameerr := func(name string, err error) error {
return fmt.Errorf("%s: %s", newname(name), err)
}
xtr, err := extractor.New(v.Extractor,
logger.NewLayer(log, newname(texts[0])))
if err != nil {
return app.Option{}, nameerr(texts[0], err)
}
cch, err := cache.New(v.Cache,
logger.NewLayer(log, newname(texts[1])))
if err != nil {
return app.Option{}, nameerr(texts[1], err)
}
strm, err := streamer.New(v.Streamer,
logger.NewLayer(log, newname(texts[2])), xtr)
if err != nil {
return app.Option{}, nameerr(texts[2], err)
}
return app.Option{
Name: v.Name,
Sites: v.Sites,
X: xtr,
S: strm,
C: cch,
L: logger.NewLayer(log, fmt.Sprintf("[%s] app", v.Name)),
}
}, nil
}
65 changes: 38 additions & 27 deletions lib/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
defaultVideoFormat = "mp4"
)

type appT struct {
type app struct {
cache cache.T
extractor extractor.T
streamer streamer.T
Expand All @@ -29,11 +29,11 @@ type appT struct {
log logger.T
}

type T struct {
type AppLogic struct {
mu sync.RWMutex
log logger.T
defaultApp appT
appList []appT
defaultApp app
appList []app
}

type Option struct {
Expand All @@ -45,23 +45,25 @@ type Option struct {
L logger.T
}

func New(
log logger.T,
def Option,
opts []Option) *T {
t := T{
log: log,
defaultApp: appT{
log: def.L,
name: "default",
cache: def.C,
extractor: def.X,
streamer: def.S,
},
func New(log logger.T, def Option, opts []Option) *AppLogic {
var t AppLogic
t.set(log, def, opts)
return &t
}

func (t *AppLogic) set(log logger.T, def Option, opts []Option) {
t.log = log
t.defaultApp = app{
log: def.L,
name: "default",
cache: def.C,
extractor: def.X,
streamer: def.S,
}
t.appList = make([]appT, 0)

t.appList = make([]app, 0)
for _, v := range opts {
t.appList = append(t.appList, appT{
t.appList = append(t.appList, app{
cache: v.C,
extractor: v.X,
streamer: v.S,
Expand All @@ -70,10 +72,9 @@ func New(
log: v.L,
})
}
return &t
}

func (t *T) selectApp(rawURL string) appT {
func (t *AppLogic) selectApp(rawURL string) app {
host, err := parseUrlHost(rawURL)
if err == nil {
for _, v := range t.appList {
Expand All @@ -90,7 +91,7 @@ func parseUrlHost(rawURL string) (string, error) {
return u.Host, err
}

func (t *T) Run(w http.ResponseWriter, r *http.Request) {
func (t *AppLogic) Run(w http.ResponseWriter, r *http.Request) {
t.mu.RLock()
defer t.mu.RUnlock()
printExpired := func(links []extractor_config.RequestT) {
Expand Down Expand Up @@ -120,7 +121,7 @@ func (t *T) Run(w http.ResponseWriter, r *http.Request) {
}
}

func (t *appT) play(
func (t *app) play(
w http.ResponseWriter,
r *http.Request,
req extractor_config.RequestT,
Expand All @@ -133,7 +134,7 @@ func (t *appT) play(
}
}

func (t *appT) playError(
func (t *app) playError(
w http.ResponseWriter,
req extractor_config.RequestT,
err error,
Expand All @@ -144,13 +145,13 @@ func (t *appT) playError(
}
}

func (t *appT) cacheCheck(req extractor_config.RequestT, now time.Time) (extractor_config.ResultT, bool, []extractor_config.RequestT) {
func (t *app) cacheCheck(req extractor_config.RequestT, now time.Time) (extractor_config.ResultT, bool, []extractor_config.RequestT) {
expired := t.cache.CleanExpired(now)
res, ok := t.cache.Get(req)
return res, ok, expired
}

func (t *appT) cacheAdd(
func (t *app) cacheAdd(
req extractor_config.RequestT,
res extractor_config.ResultT,
now time.Time,
Expand Down Expand Up @@ -193,8 +194,18 @@ func parseQuery(query string) extractor_config.RequestT {
return req
}

func (t *T) Shutdown() {
func (t *AppLogic) Shutdown() {
t.mu.Lock() // locking app forever
t.log.LogInfo("Exiting")
t.log.Close()
}

func (t *AppLogic) ReloadConfig(log logger.T, def Option, opts []Option) {
t.log.LogDebug("Waiting for clients disconnect to reload app")
t.mu.Lock()
defer t.mu.Unlock()
t.log.LogInfo("Reloading app")
t.log.Close()
t.set(log, def, opts)
t.log.LogInfo("Reloading complete")
}

0 comments on commit 1d71cda

Please sign in to comment.