-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathexample.go
98 lines (84 loc) · 3.24 KB
/
example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package example
import (
"context"
"fmt"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)
// Workflows is the receiver for workflow constructors (see CreateFoo). It is
// the value registered with a worker and carries no state of its own.
type Workflows struct{}

// Activities is the receiver for activity implementations (see Notify). It is
// the value registered with a worker and carries no state of its own.
type Activities struct{}

// CreateFooWorkflow holds the state of a single CreateFoo workflow.
type CreateFooWorkflow struct {
	// the generated workflow input is embedded, which exposes the request
	// (Req) and the SetFooProgress signal helper directly on the workflow
	*examplev1.CreateFooWorkflowInput

	// log is the workflow logger captured by the CreateFoo constructor
	log log.Logger
	// progress is the most recent progress recorded by UpdateFooProgress
	progress float32
	// status is the current Foo status; Execute waits for it to become
	// Foo_FOO_STATUS_READY
	status examplev1.Foo_Status
}
// CreateFoo is the workflow constructor: it wraps the generated input in a new
// CreateFooWorkflow, attaches the workflow logger, and starts the workflow in
// the Foo_FOO_STATUS_CREATING status. The returned error is always nil.
func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) {
	wf := new(CreateFooWorkflow)
	wf.CreateFooWorkflowInput = input
	wf.log = workflow.GetLogger(ctx)
	wf.status = examplev1.Foo_FOO_STATUS_CREATING
	return wf, nil
}
// Execute is the entrypoint of an example.v1.Example.CreateFoo workflow. It
// starts a background coroutine that applies incoming SetFooProgress signals,
// runs the Notify activity, and then waits until the status becomes
// Foo_FOO_STATUS_READY before returning the resulting Foo. Failures from the
// activity or from the wait are returned wrapped with context.
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) {
	// consume signals through the generated helper supplied by the workflow
	// input; every signal is routed through the update handler so signals and
	// updates share one code path. The second value returned by Receive and
	// the handler's response are intentionally discarded.
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			req, _ := wf.SetFooProgress.Receive(ctx)
			wf.UpdateFooProgress(ctx, req)
		}
	})

	// announce the creation using the generated Notify activity helper
	notification := &examplev1.NotifyRequest{
		Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
	}
	if err := examplev1.Notify(ctx, notification); err != nil {
		return nil, fmt.Errorf("error sending notification: %w", err)
	}

	// wait here until signals and/or updates have moved the status to READY
	ready := func() bool {
		return wf.status == examplev1.Foo_FOO_STATUS_READY
	}
	if err := workflow.Await(ctx, ready); err != nil {
		return nil, fmt.Errorf("error awaiting ready status: %w", err)
	}

	foo := &examplev1.Foo{
		Name:   wf.Req.GetName(),
		Status: wf.status,
	}
	return &examplev1.CreateFooResponse{Foo: foo}, nil
}
// GetFooProgress is the handler for the GetFooProgress query. It reports the
// workflow's current progress and status without modifying either; the
// returned error is always nil.
func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) {
	resp := &examplev1.GetFooProgressResponse{
		Progress: wf.progress,
		Status:   wf.status,
	}
	return resp, nil
}
// UpdateFooProgress defines the handler for an UpdateFooProgress update;
// Execute also routes SetFooProgress signals through it.
//
// The requested progress is clamped to the range [0, 100]: any value below
// 100 leaves the workflow in Foo_FOO_STATUS_CREATING, while 100 or more marks
// it Foo_FOO_STATUS_READY. A NaN progress value is ignored, so it can never be
// stored on the workflow or reported back to callers.
//
// The response reflects the workflow state after the request has been applied.
// The returned error is always nil.
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
	// validate before storing: state is only written inside a matching case
	progress := req.GetProgress()
	switch {
	case progress >= 100:
		wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
	case progress < 0:
		wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
	case progress < 100:
		wf.progress, wf.status = progress, examplev1.Foo_FOO_STATUS_CREATING
	default:
		// only NaN fails every comparison above; keep the previous state
		// rather than recording a meaningless progress value
	}
	return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}
// Notify is the implementation of the Notify activity. It writes the request
// message to the activity logger at info level and always returns nil.
func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error {
	logger := activity.GetLogger(ctx)
	logger.Info("notification", "message", req.GetMessage())
	return nil
}