Skip to content

Commit

Permalink
update: add function Add&Done, support graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
huija committed Apr 21, 2022
1 parent b4f29dd commit e09bb53
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 176 deletions.
56 changes: 56 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,59 @@ func SetConfig(key string, c Config) error {
configMap[key] = c
return nil
}

// ConfigKey for this repo
const ConfigKey = "tao"

// taoConfig implements Config
type taoConfig struct {
Log *Log `json:"log"`
HideBanner bool `json:"hide_banner"`
}

var defaultTao = &taoConfig{
Log: &Log{
Level: DEBUG,
Type: Console | File,
CallDepth: 3,
Path: "./test.log",
Disable: false,
},
}

// Default config
func (t *taoConfig) Default() Config {
return defaultTao
}

// ValidSelf with some default values
func (t *taoConfig) ValidSelf() {
if t.Log == nil {
t.Log = defaultTao.Log
} else {
if t.Log.Level < DEBUG || t.Log.Level > FATAL {
t.Log.Level = defaultTao.Log.Level
}
if t.Log.Type == 0 {
t.Log.Type = defaultTao.Log.Type
}
if t.Log.CallDepth <= 0 {
t.Log.CallDepth = defaultTao.Log.CallDepth
}
if t.Log.Type&File != 0 {
if t.Log.Path == "" {
t.Log.Path = defaultTao.Log.Path
}
}
}
}

// ToTask transform itself to Task
func (t *taoConfig) ToTask() Task {
return nil
}

// RunAfter defines pre task names
func (t *taoConfig) RunAfter() []string {
return nil
}
27 changes: 25 additions & 2 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tao

import (
"context"
"encoding/json"
"gopkg.in/yaml.v3"
"io"
Expand All @@ -23,8 +24,19 @@ import (
"os"
"path"
"strings"
"time"
)

// banner of tao
const banner = `
___________
\__ ___/____ ____
| | \__ \ / _ \
| | / __ \( <_> )
|____| (____ /\____/
\/
`

// ConfigType of config file
type ConfigType uint8

Expand Down Expand Up @@ -104,6 +116,9 @@ func SetConfigBytesAll(data []byte, configType ConfigType) (err error) {
return
}

// t global config of tao
var t *taoConfig

// taoInit can only be called once before tao.Run
func taoInit() error {
// transfer config bytes to object
Expand Down Expand Up @@ -166,8 +181,16 @@ func taoInit() error {
}
}

tao = NewPipeline(ConfigKey)

// init universe after tao
return universeInit()
}

func universeInit() error {
if tao.universe.State() != Runnable {
return NewError(TaskRunTwice, "universe: init twice")
}
// universe run
timeout, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return tao.universe.Run(timeout, nil)
}
18 changes: 11 additions & 7 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func NewPipeline(name string, options ...PipelineOption) Pipeline {

// Name of Pipeline
func (p *pipeline) Name() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.name
}

Expand All @@ -102,6 +100,10 @@ func (p *pipeline) Register(task *PipeTask) error {
if tName == "" {
return NewError(ParamInvalid, "pipeline: Register task name is empty")
}

if p.state == Running {
return NewError(TaskRunning, "pipeline: pipeline is running")
}
if _, dup := p.signals[tName]; dup {
return NewError(ParamInvalid, "pipeline: Register called twice for task "+tName)
}
Expand Down Expand Up @@ -144,7 +146,7 @@ func (p *pipeline) Run(ctx context.Context, param Parameter) error {
}()

// init closeChan & results when run
p.closeChan = make(chan func() error, len(p.tasks))
p.closeChan = make(chan func() error, len(p.tasks)+2)
p.results = NewParameter()

if p.postStart != nil {
Expand Down Expand Up @@ -180,6 +182,9 @@ func (p *pipeline) taskRun(ctx context.Context, task *PipeTask, param Parameter,
}
}

// register close before run
p.closeChan <- task.Close

// run & wrap cause
err = task.Run(ctx, param)
if err != nil {
Expand All @@ -195,8 +200,6 @@ func (p *pipeline) taskRun(ctx context.Context, task *PipeTask, param Parameter,
if async {
// signal
close(p.signals[task.Name()])
// close fun
p.closeChan <- task.Close
}
}

Expand Down Expand Up @@ -227,11 +230,11 @@ func (p *pipeline) Close() error {
)

if p.state == Running {
return NewError(TaskRunning, "pipeline: pipeline is running")
return NewError(TaskRunning, "pipeline: pipeline %s is running", p.Name())
}

if p.state == Closed {
return NewError(TaskCloseTwice, "pipeline: Closed called twice for pipeline "+p.name)
return NewError(TaskCloseTwice, "pipeline: Close called twice for pipeline %s", p.Name())
}

// close chan before for range
Expand All @@ -248,6 +251,7 @@ func (p *pipeline) Close() error {
closeSlice = append(closeSlice, p.preStop.Close)
}

// close functions are called in reverse order
for i := len(closeSlice) - 1; i >= 0; i-- {
if e := closeSlice[i](); e != nil {
err = NewErrorWrapped(e.Error(), err)
Expand Down
132 changes: 68 additions & 64 deletions tao.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 huija
// Copyright 2022 huija
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,31 @@ package tao
import (
"context"
"encoding/json"
"os"
"os/signal"
"sync"
"syscall"
)

// Universe of tao
type Universe struct {
sync.WaitGroup

Pipeline
universe Pipeline
}

// The Tao produced One; One produced Two; Two produced Three; Three produced All things.
var tao Pipeline
var tao = &Universe{
Pipeline: NewPipeline(ConfigKey),
universe: NewPipeline("universe"),
}

// Add of tao
var Add = tao.Add

// t global config of tao
var t *taoConfig
// Done of tao
var Done = tao.Done

// Run tao
func Run(ctx context.Context, param Parameter) (err error) {
Expand All @@ -47,7 +65,7 @@ func Run(ctx context.Context, param Parameter) (err error) {
default:
}

// tasks run
// tasks register
for _, c := range configMap {
c.ValidSelf()
err = tao.Register(NewPipeTask(c.ToTask(), c.RunAfter()...))
Expand All @@ -63,71 +81,57 @@ func Run(ctx context.Context, param Parameter) (err error) {
}
Debugf("config data: \n%s", string(cm))

return tao.Run(ctx, param)
}
// graceful shutdown
gracefulShutdown()

// ConfigKey for this repo
const ConfigKey = "tao"

// banner of tao
const banner = `
___________
\__ ___/____ ____
| | \__ \ / _ \
| | / __ \( <_> )
|____| (____ /\____/
\/
`

// taoConfig implements Config
type taoConfig struct {
Log *Log `json:"log"`
HideBanner bool `json:"hide_banner"`
}

var defaultTao = &taoConfig{
Log: &Log{
Level: DEBUG,
Type: Console | File,
CallDepth: 3,
Path: "./test.log",
Disable: false,
},
}
// tao run
err = tao.Run(ctx, param)
if err != nil {
return err
}

// Default config
func (t *taoConfig) Default() Config {
return defaultTao
// tao wait
tao.Wait()
return
}

// ValidSelf with some default values
func (t *taoConfig) ValidSelf() {
if t.Log == nil {
t.Log = defaultTao.Log
} else {
if t.Log.Level < DEBUG || t.Log.Level > FATAL {
t.Log.Level = defaultTao.Log.Level
}
if t.Log.Type == 0 {
t.Log.Type = defaultTao.Log.Type
}
if t.Log.CallDepth <= 0 {
t.Log.CallDepth = defaultTao.Log.CallDepth
}
if t.Log.Type&File != 0 {
if t.Log.Path == "" {
t.Log.Path = defaultTao.Log.Path
// Register to tao universe
func Register(configKey string, fn func() error) error {
switch tao.universe.State() {
case Running, Over, Closed:
return fn()
default:
return tao.universe.Register(NewPipeTask(NewTask(configKey, func(ctx context.Context, param Parameter) (Parameter, error) {
select {
case <-ctx.Done():
return param, NewError(ContextCanceled, "universe: %s init failed", configKey)
default:
return param, fn()
}
}
})))
}
}

// ToTask transform itself to Task
func (t *taoConfig) ToTask() Task {
return nil
}

// RunAfter defines pre task names
func (t *taoConfig) RunAfter() []string {
return nil
func gracefulShutdown() {
sc := make(chan os.Signal, 1)
signal.Notify(sc)
go func() {
for {
sig := <-sc
if _, ok := map[os.Signal]struct{}{
syscall.SIGINT: {},
syscall.SIGQUIT: {},
syscall.SIGTERM: {},
}[sig]; ok {
Debugf("got exiting signal now: %v\n", sig)
if err := tao.Close(); err != nil {
os.Exit(1)
} else {
os.Exit(0)
}
} else {
Debugf("got non-exiting signal: %v\n", sig)
}
}
}()
}
Loading

0 comments on commit e09bb53

Please sign in to comment.