From b4f29dd155e2e43dd7a430569caa3d109f3c4b6d Mon Sep 17 00:00:00 2001 From: huija <1150555483@qq.com> Date: Sun, 20 Mar 2022 23:05:08 +0800 Subject: [PATCH 1/2] Update .gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 52ac755..aa8a35b 100644 --- a/.gitignore +++ b/.gitignore @@ -17,5 +17,4 @@ coverage.txt # vendor/ # others -conf/ *.log From e09bb53a35c29972e985b243a69544ff53966fb7 Mon Sep 17 00:00:00 2001 From: huija <1150555483@qq.com> Date: Thu, 21 Apr 2022 19:45:07 +0800 Subject: [PATCH 2/2] update: add function Add&Done, support graceful shutdown --- config.go | 56 ++++++++++++++++++++ init.go | 27 +++++++++- pipeline.go | 18 ++++--- tao.go | 132 ++++++++++++++++++++++++----------------------- tao_test.go | 32 +++++++++++- task.go | 6 +-- universe.go | 50 ------------------ universe_test.go | 48 ----------------- 8 files changed, 193 insertions(+), 176 deletions(-) delete mode 100644 universe.go delete mode 100644 universe_test.go diff --git a/config.go b/config.go index cfbbbf9..4a36464 100644 --- a/config.go +++ b/config.go @@ -58,3 +58,59 @@ func SetConfig(key string, c Config) error { configMap[key] = c return nil } + +// ConfigKey for this repo +const ConfigKey = "tao" + +// taoConfig implements Config +type taoConfig struct { + Log *Log `json:"log"` + HideBanner bool `json:"hide_banner"` +} + +var defaultTao = &taoConfig{ + Log: &Log{ + Level: DEBUG, + Type: Console | File, + CallDepth: 3, + Path: "./test.log", + Disable: false, + }, +} + +// Default config +func (t *taoConfig) Default() Config { + return defaultTao +} + +// ValidSelf with some default values +func (t *taoConfig) ValidSelf() { + if t.Log == nil { + t.Log = defaultTao.Log + } else { + if t.Log.Level < DEBUG || t.Log.Level > FATAL { + t.Log.Level = defaultTao.Log.Level + } + if t.Log.Type == 0 { + t.Log.Type = defaultTao.Log.Type + } + if t.Log.CallDepth <= 0 { + t.Log.CallDepth = defaultTao.Log.CallDepth + } + if t.Log.Type&File != 0 { + if t.Log.Path == "" { + t.Log.Path = defaultTao.Log.Path + } + } + } +} + +// ToTask transform itself to Task +func (t *taoConfig) ToTask() Task { + return nil +} + +// RunAfter defines pre task names +func (t *taoConfig) RunAfter() []string { + return nil +} diff --git a/init.go b/init.go index 0259d9e..a5b14f6 100644 --- a/init.go +++ b/init.go @@ -15,6 +15,7 @@ package tao import ( + "context" "encoding/json" "gopkg.in/yaml.v3" "io" @@ -23,8 +24,19 @@ import ( "os" "path" "strings" + "time" ) +// banner of tao +const banner = ` +___________ +\__ ___/____ ____ + | | \__ \ / _ \ + | | / __ \( <_> ) + |____| (____ /\____/ + \/ +` + // ConfigType of config file type ConfigType uint8 @@ -104,6 +116,9 @@ func SetConfigBytesAll(data []byte, configType ConfigType) (err error) { return } +// t global config of tao +var t *taoConfig + // taoInit can only be called once before tao.Run func taoInit() error { // transfer config bytes to object @@ -166,8 +181,16 @@ func taoInit() error { } } - tao = NewPipeline(ConfigKey) - // init universe after tao return universeInit() } + +func universeInit() error { + if tao.universe.State() != Runnable { + return NewError(TaskRunTwice, "universe: init twice") + } + // universe run + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + return tao.universe.Run(timeout, nil) +} diff --git a/pipeline.go b/pipeline.go index 209a215..7fd2317 100644 --- a/pipeline.go +++ b/pipeline.go @@ -79,8 +79,6 @@ func NewPipeline(name string, options ...PipelineOption) Pipeline { // Name of Pipeline func (p *pipeline) Name() string { - p.mu.RLock() - defer p.mu.RUnlock() return p.name } @@ -102,6 +100,10 @@ func (p *pipeline) Register(task *PipeTask) error { if tName == "" { return NewError(ParamInvalid, "pipeline: Register task name is empty") } + + if p.state == Running { + return NewError(TaskRunning, "pipeline: pipeline is running") + } if _, dup := p.signals[tName]; dup { return NewError(ParamInvalid, "pipeline: Register called twice for task "+tName) } @@ -144,7 +146,7 @@ func (p *pipeline) Run(ctx context.Context, param Parameter) error { }() // init closeChan & results when run - p.closeChan = make(chan func() error, len(p.tasks)) + p.closeChan = make(chan func() error, len(p.tasks)+2) p.results = NewParameter() if p.postStart != nil { @@ -180,6 +182,9 @@ func (p *pipeline) taskRun(ctx context.Context, task *PipeTask, param Parameter, } } + // register close before run + p.closeChan <- task.Close + // run & wrap cause err = task.Run(ctx, param) if err != nil { @@ -195,8 +200,6 @@ func (p *pipeline) taskRun(ctx context.Context, task *PipeTask, param Parameter, if async { // signal close(p.signals[task.Name()]) - // close fun - p.closeChan <- task.Close } } @@ -227,11 +230,11 @@ func (p *pipeline) Close() error { ) if p.state == Running { - return NewError(TaskRunning, "pipeline: pipeline is running") + return NewError(TaskRunning, "pipeline: pipeline %s is running", p.Name()) } if p.state == Closed { - return NewError(TaskCloseTwice, "pipeline: Closed called twice for pipeline "+p.name) + return NewError(TaskCloseTwice, "pipeline: Close called twice for pipeline %s", p.Name()) } // close chan before for range @@ -248,6 +251,7 @@ func (p *pipeline) Close() error { closeSlice = append(closeSlice, p.preStop.Close) } + // close functions are called in reverse order for i := len(closeSlice) - 1; i >= 0; i-- { if e := closeSlice[i](); e != nil { err = NewErrorWrapped(e.Error(), err) diff --git a/tao.go b/tao.go index e1bbd73..565e3b6 100644 --- a/tao.go +++ b/tao.go @@ -1,4 +1,4 @@ -// Copyright 2021 huija +// Copyright 2022 huija // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,13 +17,31 @@ package tao import ( "context" "encoding/json" + "os" + "os/signal" + "sync" + "syscall" ) +// Universe of tao +type Universe struct { + sync.WaitGroup + + Pipeline + universe Pipeline +} + // The Tao produced One; One produced Two; Two produced Three; Three produced All things. -var tao Pipeline +var tao = &Universe{ + Pipeline: NewPipeline(ConfigKey), + universe: NewPipeline("universe"), +} + +// Add of tao +var Add = tao.Add -// t global config of tao -var t *taoConfig +// Done of tao +var Done = tao.Done // Run tao func Run(ctx context.Context, param Parameter) (err error) { @@ -47,7 +65,7 @@ func Run(ctx context.Context, param Parameter) (err error) { default: } - // tasks run + // tasks register for _, c := range configMap { c.ValidSelf() err = tao.Register(NewPipeTask(c.ToTask(), c.RunAfter()...)) @@ -63,71 +81,57 @@ func Run(ctx context.Context, param Parameter) (err error) { } Debugf("config data: \n%s", string(cm)) - return tao.Run(ctx, param) -} + // graceful shutdown + gracefulShutdown() -// ConfigKey for this repo -const ConfigKey = "tao" - -// banner of tao -const banner = ` -___________ -\__ ___/____ ____ - | | \__ \ / _ \ - | | / __ \( <_> ) - |____| (____ /\____/ - \/ -` - -// taoConfig implements Config -type taoConfig struct { - Log *Log `json:"log"` - HideBanner bool `json:"hide_banner"` -} - -var defaultTao = &taoConfig{ - Log: &Log{ - Level: DEBUG, - Type: Console | File, - CallDepth: 3, - Path: "./test.log", - Disable: false, - }, -} + // tao run + err = tao.Run(ctx, param) + if err != nil { + return err + } -// Default config -func (t *taoConfig) Default() Config { - return defaultTao + // tao wait + tao.Wait() + return } -// ValidSelf with some default values -func (t *taoConfig) ValidSelf() { - if t.Log == nil { - t.Log = defaultTao.Log - } else { - if t.Log.Level < DEBUG || t.Log.Level > FATAL { - t.Log.Level = defaultTao.Log.Level - } - if t.Log.Type == 0 { - t.Log.Type = defaultTao.Log.Type - } - if t.Log.CallDepth <= 0 { - t.Log.CallDepth = defaultTao.Log.CallDepth - } - if t.Log.Type&File != 0 { - if t.Log.Path == "" { - t.Log.Path = defaultTao.Log.Path +// Register to tao universe +func Register(configKey string, fn func() error) error { + switch tao.universe.State() { + case Running, Over, Closed: + return fn() + default: + return tao.universe.Register(NewPipeTask(NewTask(configKey, func(ctx context.Context, param Parameter) (Parameter, error) { + select { + case <-ctx.Done(): + return param, NewError(ContextCanceled, "universe: %s init failed", configKey) + default: + return param, fn() } - } + }))) } } -// ToTask transform itself to Task -func (t *taoConfig) ToTask() Task { - return nil -} - -// RunAfter defines pre task names -func (t *taoConfig) RunAfter() []string { - return nil +func gracefulShutdown() { + sc := make(chan os.Signal, 1) + signal.Notify(sc) + go func() { + for { + sig := <-sc + if _, ok := map[os.Signal]struct{}{ + syscall.SIGINT: {}, + syscall.SIGQUIT: {}, + syscall.SIGTERM: {}, + }[sig]; ok { + Debugf("got exiting signal now: %v\n", sig) + if err := tao.Close(); err != nil { + os.Exit(1) + } else { + os.Exit(0) + } + } else { + Debugf("got non-exiting signal: %v\n", sig) + } + } + }() } diff --git a/tao_test.go b/tao_test.go index 131d25d..f8660ee 100644 --- a/tao_test.go +++ b/tao_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 huija +// Copyright 2022 huija // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,10 +16,38 @@ package tao import ( "context" + "encoding/json" "github.com/stretchr/testify/assert" "testing" ) +func TestRegister(t *testing.T) { + err := Register(printConfigKey, func() error { + p := new(printConfig) + // 1. transfer config bytes to object + bytes, err := GetConfigBytes(printConfigKey) + if err != nil { + return err + } + err = json.Unmarshal(bytes, &p) + if err != nil { + return err + } + + p.ValidSelf() + + // 2. set object to tao + return SetConfig(printConfigKey, p) + }) + assert.Nil(t, err) + + err = SetConfig(printConfigKey, nil) + assert.NotNil(t, err) + + err = universeInit() + assert.NotNil(t, err) +} + func TestRun(t *testing.T) { t.Log(new(taoConfig).Default()) t.Log(new(taoConfig).ToTask()) @@ -30,6 +58,8 @@ func TestRun(t *testing.T) { err := Run(cancel, nil) assert.NotNil(t, err) + Add(1) + Done() err = Run(nil, nil) assert.Nil(t, err) } diff --git a/task.go b/task.go index e5cdf0d..b26623c 100644 --- a/task.go +++ b/task.go @@ -85,8 +85,6 @@ func NewTask(name string, fun TaskRun, options ...TaskOption) Task { // Name of Task func (t *task) Name() string { - t.mu.RLock() - defer t.mu.RUnlock() return t.name } @@ -166,11 +164,11 @@ func (t *task) Close() error { defer t.mu.Unlock() if t.state == Running { - return NewError(TaskRunning, "task: task is running") + return NewError(TaskRunning, "task: task %s is running", t.Name()) } if t.state == Closed { - return NewError(TaskCloseTwice, "task: Closed called twice for task "+t.name) + return NewError(TaskCloseTwice, "task: Close called twice for task %s", t.Name()) } t.state = Closed diff --git a/universe.go b/universe.go deleted file mode 100644 index 1649e4b..0000000 --- a/universe.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2022 huija -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tao - -import ( - "context" - "time" -) - -// universe for tao -var universe = NewPipeline("universe") - -func universeInit() error { - if universe.State() != Runnable { - return NewError(TaskRunTwice, "universe: init twice") - } - // universe run - timeout, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - return universe.Run(timeout, nil) -} - -// Register to tao universe -func Register(configKey string, fn func() error) error { - switch universe.State() { - case Running, Over, Closed: - return fn() - default: - return universe.Register(NewPipeTask(NewTask(configKey, func(ctx context.Context, param Parameter) (Parameter, error) { - select { - case <-ctx.Done(): - return param, NewError(ContextCanceled, "universe: %s init failed", configKey) - default: - return param, fn() - } - }))) - } -} diff --git a/universe_test.go b/universe_test.go deleted file mode 100644 index 4b24f4b..0000000 --- a/universe_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2022 huija -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tao - -import ( - "encoding/json" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestRegister(t *testing.T) { - err := Register(printConfigKey, func() error { - p := new(printConfig) - // 1. transfer config bytes to object - bytes, err := GetConfigBytes(printConfigKey) - if err != nil { - return err - } - err = json.Unmarshal(bytes, &p) - if err != nil { - return err - } - - p.ValidSelf() - - // 2. set object to tao - return SetConfig(printConfigKey, p) - }) - assert.Nil(t, err) - - err = SetConfig(printConfigKey, nil) - assert.NotNil(t, err) - - err = universeInit() - assert.NotNil(t, err) -}