-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtango.go
153 lines (130 loc) · 3.01 KB
/
tango.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package tango
import (
"errors"
"github.com/charmbracelet/log"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Tango is a staged pipeline: messages read from a producer channel are
// handed to the first stage, and each stage's result is passed on to the
// stage that follows it.
type Tango struct {
	// stages holds the pipeline steps in the order they run.
	stages []Stage
	// wg is the wait group associated with this pipeline.
	wg sync.WaitGroup
	// producerChannel is the source of messages fed to the first stage.
	producerChannel chan any
	// osSignChannel is allocated by NewTango with room for one signal.
	osSignChannel chan os.Signal
	// onAccomplished is the optional callback registered via OnProcessed.
	onAccomplished AccomplishedCallback
	// closed is the shutdown flag maintained by Start.
	closed atomic.Bool
}
// NewTango returns a Tango with an unbuffered producer channel and a
// one-slot OS signal channel. The wait group and the closed flag are left
// at their zero values (an empty wait group and false, respectively).
func NewTango() *Tango {
	t := new(Tango)
	t.producerChannel = make(chan any)
	t.osSignChannel = make(chan os.Signal, 1)
	return t
}
// SetProducerChannel replaces the channel from which Start reads the
// messages it feeds into the first stage. Start refuses to run when this
// channel is nil.
func (t *Tango) SetProducerChannel(producerChannel chan any) {
	t.producerChannel = producerChannel
}
// SetStages installs stages as the pipeline, in execution order.
//
// The slice is retained (not copied) and its elements are modified in place:
// the final element is flagged as the last stage and the flag is cleared on
// every other element. Clearing matters when the slice, or a prefix of it,
// was handed to SetStages before; a stale flag would otherwise make an
// intermediate stage report results as if it were the last one.
//
// SetStages panics when stages is empty.
func (t *Tango) SetStages(stages []Stage) {
	if len(stages) == 0 {
		panic("cant set empty stages")
	}
	last := len(stages) - 1
	for i := range stages {
		stages[i].isTheLast = i == last
	}
	t.stages = stages
}
// AppendStage adds stage to the end of the pipeline and marks it as the
// last stage. The stage that was previously last, if any, loses that mark.
func (t *Tango) AppendStage(stage Stage) {
	if n := len(t.stages); n > 0 {
		// The former tail is no longer the end of the pipeline.
		t.stages[n-1].isTheLast = false
	}
	stage.isTheLast = true
	t.stages = append(t.stages, stage)
}
// OnProcessed registers the callback that Start invokes with the result of
// the last stage, or with an error when a stage fails. Passing nil leaves
// the pipeline without a callback.
func (t *Tango) OnProcessed(callback AccomplishedCallback) {
	t.onAccomplished = callback
}
// Start runs the pipeline: one worker goroutine per stage plus a forwarder
// that moves messages from the producer channel into the first stage. It
// blocks until SIGINT or SIGTERM is received or a stage returns an error,
// then tells every goroutine to stop and waits a bounded time for them.
//
// Start returns an error when the producer channel is nil or no stages are
// configured, the error of the first failing stage when a stage fails, and
// nil when it was stopped by a signal. A failing stage no longer terminates
// the process; the failure is logged and handed back to the caller.
func (t *Tango) Start() error {
	if t.producerChannel == nil {
		return errors.New("producer channel is nil. Cant start stages without producer")
	}
	if len(t.stages) == 0 {
		return errors.New("no stages found. cant start without stages")
	}

	// Upper bound on how long Start waits for its goroutines after shutdown
	// begins; a stage function is caller code and may never return.
	const shutdownGrace = time.Second

	// done is closed exactly once, by Start itself, to broadcast shutdown.
	done := make(chan struct{})
	// failed carries the first stage error; later ones are dropped.
	failed := make(chan error, 1)

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// A per-call wait group keeps repeated Start calls independent.
	var workers sync.WaitGroup
	for i := range t.stages {
		workers.Add(1)
		go func(index int) {
			defer workers.Done()
			t.runStage(index, done, failed)
		}(i)
	}
	workers.Add(1)
	go func() {
		defer workers.Done()
		t.forwardProducer(done)
	}()

	var stageErr error
	select {
	case <-sigCh:
	case stageErr = <-failed:
	}

	t.closed.Store(true)
	close(done)

	finished := make(chan struct{})
	go func() {
		workers.Wait()
		close(finished)
	}()
	select {
	case <-finished:
	case <-time.After(shutdownGrace):
		log.Warn("stages did not stop within the shutdown grace period")
	}
	return stageErr
}

// runStage processes messages for the stage at index until done is closed,
// the stage's channel is closed, or the stage function returns an error.
func (t *Tango) runStage(index int, done <-chan struct{}, failed chan<- error) {
	stage := &t.stages[index]
	for {
		select {
		case <-done:
			return
		case msg, ok := <-stage.Channel:
			if !ok {
				// The channel was closed by its sender; nothing more to do.
				// The receiving side must not close it again.
				return
			}
			result, err := stage.Function(msg)
			if err != nil {
				if t.onAccomplished != nil {
					t.onAccomplished(nil, errors.New("failed to process message"))
				}
				log.Errorf("Error in Stage %d: %v\n", index+1, err)
				// Non-blocking: only the first failure is reported to Start.
				select {
				case failed <- err:
				default:
				}
				return
			}
			if stage.isTheLast && t.onAccomplished != nil {
				t.onAccomplished(result, nil)
			}
			if index+1 < len(t.stages) {
				select {
				case t.stages[index+1].Channel <- result:
				case <-done:
					return
				}
			}
		}
	}
}

// forwardProducer copies messages from the producer channel into the first
// stage until done is closed or the producer channel is closed.
func (t *Tango) forwardProducer(done <-chan struct{}) {
	producer := t.producerChannel
	for {
		select {
		case <-done:
			return
		case msg, ok := <-producer:
			if !ok {
				return
			}
			// Block while the first stage is busy (backpressure), but stay
			// responsive to shutdown.
			select {
			case t.stages[0].Channel <- msg:
			case <-done:
				return
			}
		}
	}
}