-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathexample.go
76 lines (63 loc) · 2.21 KB
/
example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main
import (
"context"
"fmt"
xnsv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/xns/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
// ExampleWorkflows is the receiver type whose CreateFoo method constructs
// CreateFooWorkflow values. It has no fields.
type ExampleWorkflows struct{}
// CreateFooWorkflow holds the per-execution state of a CreateFoo workflow.
// Field order matters: CreateFoo builds this struct with a positional
// composite literal.
type CreateFooWorkflow struct {
// Embedded workflow input; Execute reads Req and SetFooProgress through it.
*xnsv1.CreateFooWorkflowInput
// progress is the most recent progress value stored by UpdateFooProgress,
// which clamps values below 0 to 0 and values of 100 or more to 100.
progress float32
// status is the current Foo status; Execute waits for it to become
// FOO_STATUS_READY before returning.
status xnsv1.Foo_Status
}
// CreateFoo builds the workflow value for a new CreateFoo execution. The
// returned workflow embeds input, starts with zero progress, and has status
// FOO_STATUS_CREATING. It never returns an error.
func (w *ExampleWorkflows) CreateFoo(ctx workflow.Context, input *xnsv1.CreateFooWorkflowInput) (xnsv1.CreateFooWorkflow, error) {
	wf := &CreateFooWorkflow{
		CreateFooWorkflowInput: input,
		progress:               0,
		status:                 xnsv1.Foo_FOO_STATUS_CREATING,
	}
	return wf, nil
}
// Execute runs the CreateFoo workflow body.
//
// It starts a background workflow coroutine that applies every received
// SetFooProgress signal via UpdateFooProgress, sends a Notify request
// announcing the creation, and then blocks until the status becomes
// FOO_STATUS_READY.
//
// It returns a response carrying the requested name and the final status. An
// error is returned if the notification fails (wrapped) or if waiting for
// readiness fails (returned unchanged so its type is preserved for the caller).
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*xnsv1.CreateFooResponse, error) {
	// Consume progress signals for as long as the workflow runs.
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			// The second value returned by Receive is intentionally discarded.
			signal, _ := wf.SetFooProgress.Receive(ctx)
			// UpdateFooProgress always returns a nil error, and the response
			// is not needed when handling a signal.
			_, _ = wf.UpdateFooProgress(ctx, signal)
		}
	})

	err := xnsv1.Notify(ctx, &xnsv1.NotifyRequest{
		Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
	})
	if err != nil {
		return nil, fmt.Errorf("error sending notification: %w", err)
	}

	// Await reports an error when the wait is interrupted (for example by
	// context cancellation). Do not fall through and report success with a
	// status that never reached READY.
	if err := workflow.Await(ctx, func() bool {
		return wf.status == xnsv1.Foo_FOO_STATUS_READY
	}); err != nil {
		return nil, err
	}

	return &xnsv1.CreateFooResponse{
		Foo: &xnsv1.Foo{
			Name:   wf.Req.GetName(),
			Status: wf.status,
		},
	}, nil
}
// GetFooProgress reports the workflow's current progress and status. It only
// reads workflow state and never returns an error.
func (wf *CreateFooWorkflow) GetFooProgress() (*xnsv1.GetFooProgressResponse, error) {
	resp := &xnsv1.GetFooProgressResponse{
		Progress: wf.progress,
		Status:   wf.status,
	}
	return resp, nil
}
// UpdateFooProgress stores the progress carried by req and derives the
// workflow status from it:
//
//   - below 0: progress is clamped to 0 and the status is CREATING
//   - below 100: progress is kept and the status is CREATING
//   - 100 or more: progress is clamped to 100 and the status is READY
//
// A value that satisfies none of the comparisons is stored as-is and leaves
// the status untouched. The returned response reflects the state after the
// update; the error is always nil.
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *xnsv1.SetFooProgressRequest) (*xnsv1.GetFooProgressResponse, error) {
	next, state := req.GetProgress(), wf.status

	if next < 0 {
		next, state = 0, xnsv1.Foo_FOO_STATUS_CREATING
	} else if next < 100 {
		state = xnsv1.Foo_FOO_STATUS_CREATING
	} else if next >= 100 {
		next, state = 100, xnsv1.Foo_FOO_STATUS_READY
	}

	wf.progress, wf.status = next, state

	resp := &xnsv1.GetFooProgressResponse{
		Progress: wf.progress,
		Status:   wf.status,
	}
	return resp, nil
}
// ExampleActivities is the receiver type for activity implementations and is
// used to register activities with a worker. It currently has no fields, so it
// carries no shared state.
type ExampleActivities struct{}
// Notify implements the Notify activity. It writes the request's message to
// the activity logger at info level and always returns nil.
func (a *ExampleActivities) Notify(ctx context.Context, req *xnsv1.NotifyRequest) error {
	logger := activity.GetLogger(ctx)
	msg := req.GetMessage()
	logger.Info("notification", "message", msg)
	return nil
}