Skip to content

Commit f67a5a2

Browse files
committed
Turn bonanza_builder into a worker
Instead of having a gRPC server integrated into bonanza_builder and letting clients connect to it directly, this change alters it so that it behaves like a worker that connects to the scheduler. bonanza_bazel will now submit builds to the scheduler. This makes it easy to spin up pools of bonanza_builder systems.
1 parent 631e7a6 commit f67a5a2

File tree

21 files changed

+649
-336
lines changed

21 files changed

+649
-336
lines changed

cmd/bonanza_builder/BUILD.bazel

+8-3
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,33 @@ go_library(
1010
"//pkg/model/analysis",
1111
"//pkg/model/core",
1212
"//pkg/model/encoding",
13-
"//pkg/proto/build",
1413
"//pkg/proto/configuration/bonanza_builder",
1514
"//pkg/proto/model/analysis",
15+
"//pkg/proto/model/build",
16+
"//pkg/proto/model/command",
1617
"//pkg/proto/remoteexecution",
18+
"//pkg/proto/remoteworker",
1719
"//pkg/proto/storage/dag",
1820
"//pkg/proto/storage/object",
21+
"//pkg/remoteexecution",
22+
"//pkg/remoteworker",
1923
"//pkg/storage/dag",
2024
"//pkg/storage/object",
2125
"//pkg/storage/object/grpc",
2226
"//pkg/storage/object/namespacemapping",
2327
"@com_github_buildbarn_bb_remote_execution//pkg/filesystem",
28+
"@com_github_buildbarn_bb_storage//pkg/clock",
2429
"@com_github_buildbarn_bb_storage//pkg/filesystem",
2530
"@com_github_buildbarn_bb_storage//pkg/filesystem/path",
2631
"@com_github_buildbarn_bb_storage//pkg/global",
27-
"@com_github_buildbarn_bb_storage//pkg/grpc",
2832
"@com_github_buildbarn_bb_storage//pkg/http",
2933
"@com_github_buildbarn_bb_storage//pkg/program",
34+
"@com_github_buildbarn_bb_storage//pkg/random",
3035
"@com_github_buildbarn_bb_storage//pkg/util",
31-
"@org_golang_google_grpc//:grpc",
3236
"@org_golang_google_grpc//codes",
3337
"@org_golang_google_grpc//status",
3438
"@org_golang_google_protobuf//proto",
39+
"@org_golang_google_protobuf//types/known/emptypb",
3540
"@org_golang_x_sync//semaphore",
3641
],
3742
)

cmd/bonanza_builder/main.go

+93-67
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,46 @@ package main
22

33
import (
44
"context"
5-
"crypto/ecdh"
6-
"crypto/x509"
7-
"encoding/pem"
5+
"encoding/json"
86
"fmt"
97
"net/http"
108
"os"
119
"runtime"
10+
"time"
1211

1312
re_filesystem "github.com/buildbarn/bb-remote-execution/pkg/filesystem"
13+
"github.com/buildbarn/bb-storage/pkg/clock"
1414
"github.com/buildbarn/bb-storage/pkg/filesystem"
1515
"github.com/buildbarn/bb-storage/pkg/filesystem/path"
1616
"github.com/buildbarn/bb-storage/pkg/global"
17-
bb_grpc "github.com/buildbarn/bb-storage/pkg/grpc"
1817
bb_http "github.com/buildbarn/bb-storage/pkg/http"
1918
"github.com/buildbarn/bb-storage/pkg/program"
19+
"github.com/buildbarn/bb-storage/pkg/random"
2020
"github.com/buildbarn/bb-storage/pkg/util"
2121
"github.com/buildbarn/bonanza/pkg/evaluation"
2222
model_analysis "github.com/buildbarn/bonanza/pkg/model/analysis"
2323
model_core "github.com/buildbarn/bonanza/pkg/model/core"
2424
"github.com/buildbarn/bonanza/pkg/model/encoding"
25-
build_pb "github.com/buildbarn/bonanza/pkg/proto/build"
2625
"github.com/buildbarn/bonanza/pkg/proto/configuration/bonanza_builder"
2726
model_analysis_pb "github.com/buildbarn/bonanza/pkg/proto/model/analysis"
27+
model_build_pb "github.com/buildbarn/bonanza/pkg/proto/model/build"
28+
model_command_pb "github.com/buildbarn/bonanza/pkg/proto/model/command"
2829
remoteexecution_pb "github.com/buildbarn/bonanza/pkg/proto/remoteexecution"
30+
remoteworker_pb "github.com/buildbarn/bonanza/pkg/proto/remoteworker"
2931
dag_pb "github.com/buildbarn/bonanza/pkg/proto/storage/dag"
3032
object_pb "github.com/buildbarn/bonanza/pkg/proto/storage/object"
33+
remoteexecution "github.com/buildbarn/bonanza/pkg/remoteexecution"
34+
"github.com/buildbarn/bonanza/pkg/remoteworker"
3135
"github.com/buildbarn/bonanza/pkg/storage/dag"
3236
"github.com/buildbarn/bonanza/pkg/storage/object"
3337
object_grpc "github.com/buildbarn/bonanza/pkg/storage/object/grpc"
3438
object_namespacemapping "github.com/buildbarn/bonanza/pkg/storage/object/namespacemapping"
3539

3640
"golang.org/x/sync/semaphore"
37-
"google.golang.org/grpc"
3841
"google.golang.org/grpc/codes"
3942
"google.golang.org/grpc/status"
4043
"google.golang.org/protobuf/proto"
44+
"google.golang.org/protobuf/types/known/emptypb"
4145
)
4246

4347
func main() {
@@ -82,125 +86,147 @@ func main() {
8286
return util.StatusWrap(err, "Failed to create execution gRPC client")
8387
}
8488

85-
executionClientPrivateKeyBlock, _ := pem.Decode([]byte(configuration.ExecutionClientPrivateKey))
86-
if executionClientPrivateKeyBlock == nil {
87-
return status.Error(codes.InvalidArgument, "Execution client private key does not contain a PEM block")
89+
executionClientPrivateKey, err := remoteexecution.ParseECDHPrivateKey([]byte(configuration.ExecutionClientPrivateKey))
90+
if err != nil {
91+
return util.StatusWrap(err, "Failed to parse execution client private key")
8892
}
89-
if executionClientPrivateKeyBlock.Type != "PRIVATE KEY" {
90-
return status.Error(codes.InvalidArgument, "Execution client private key PEM block is not of type PRIVATE KEY")
93+
executionClientCertificateChain, err := remoteexecution.ParseCertificateChain([]byte(configuration.ExecutionClientCertificateChain))
94+
if err != nil {
95+
return util.StatusWrap(err, "Failed to parse execution client certificate chain")
9196
}
92-
executionClientPrivateKey, err := x509.ParsePKCS8PrivateKey(executionClientPrivateKeyBlock.Bytes)
97+
98+
remoteWorkerConnection, err := grpcClientFactory.NewClientFromConfiguration(configuration.RemoteWorkerGrpcClient)
9399
if err != nil {
94-
return util.StatusWrap(err, "Failed to parse execution client private key")
100+
return util.StatusWrap(err, "Failed to create remote worker RPC client")
95101
}
96-
executionClientECDHPrivateKey, ok := executionClientPrivateKey.(*ecdh.PrivateKey)
97-
if !ok {
98-
return status.Error(codes.InvalidArgument, "Execution client private key is not an ECDH private key")
102+
remoteWorkerClient := remoteworker_pb.NewOperationQueueClient(remoteWorkerConnection)
103+
104+
platformPrivateKeys, err := remoteworker.ParsePlatformPrivateKeys(configuration.PlatformPrivateKeys)
105+
if err != nil {
106+
return err
99107
}
100-
var executionClientCertificates [][]byte
101-
for certificateBlock, remainder := pem.Decode([]byte(configuration.ExecutionClientCertificateChain)); certificateBlock != nil; certificateBlock, remainder = pem.Decode(remainder) {
102-
if certificateBlock.Type != "CERTIFICATE" {
103-
return status.Error(codes.InvalidArgument, "Execution client certificate PEM block is not of type CERTIFICATE")
104-
}
105-
executionClientCertificates = append(executionClientCertificates, certificateBlock.Bytes)
106-
}
107-
108-
if err := bb_grpc.NewServersFromConfigurationAndServe(
109-
configuration.GrpcServers,
110-
func(s grpc.ServiceRegistrar) {
111-
build_pb.RegisterBuilderServer(s, &builderServer{
112-
objectDownloader: objectDownloader,
113-
dagUploaderClient: dag_pb.NewUploaderClient(storageGRPCClient),
114-
objectContentsWalkerSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())),
115-
httpClient: &http.Client{
116-
Transport: bb_http.NewMetricsRoundTripper(roundTripper, "Builder"),
117-
},
118-
filePool: filePool,
119-
cacheDirectory: cacheDirectory,
120-
executionClient: remoteexecution_pb.NewExecutionClient(executionGRPCClient),
121-
executionClientPrivateKey: executionClientECDHPrivateKey,
122-
executionClientCertificates: executionClientCertificates,
123-
})
108+
clientCertificateAuthorities, err := remoteworker.ParseClientCertificateAuthorities(configuration.ClientCertificateAuthorities)
109+
if err != nil {
110+
return err
111+
}
112+
workerName, err := json.Marshal(configuration.WorkerId)
113+
if err != nil {
114+
return util.StatusWrap(err, "Failed to marshal worker ID")
115+
}
116+
117+
executor := &builderExecutor{
118+
objectDownloader: objectDownloader,
119+
dagUploaderClient: dag_pb.NewUploaderClient(storageGRPCClient),
120+
objectContentsWalkerSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())),
121+
httpClient: &http.Client{
122+
Transport: bb_http.NewMetricsRoundTripper(roundTripper, "Builder"),
124123
},
125-
siblingsGroup,
126-
); err != nil {
127-
return util.StatusWrap(err, "gRPC server failure")
124+
filePool: filePool,
125+
cacheDirectory: cacheDirectory,
126+
executionClient: remoteexecution.NewClient[*model_command_pb.Action, emptypb.Empty, *model_command_pb.Result](
127+
remoteexecution_pb.NewExecutionClient(executionGRPCClient),
128+
executionClientPrivateKey,
129+
executionClientCertificateChain,
130+
),
128131
}
132+
client, err := remoteworker.NewClient(
133+
remoteWorkerClient,
134+
executor,
135+
clock.SystemClock,
136+
random.CryptoThreadSafeGenerator,
137+
platformPrivateKeys,
138+
clientCertificateAuthorities,
139+
configuration.WorkerId,
140+
/* sizeClass = */ 0,
141+
/* isLargestSizeClass = */ true,
142+
)
143+
if err != nil {
144+
return util.StatusWrap(err, "Failed to create remote worker client")
145+
}
146+
remoteworker.LaunchWorkerThread(siblingsGroup, client.Run, string(workerName))
129147

130148
lifecycleState.MarkReadyAndWait(siblingsGroup)
131149
return nil
132150
})
133151
}
134152

135-
type builderServer struct {
153+
type builderExecutor struct {
136154
objectDownloader object.Downloader[object.GlobalReference]
137155
dagUploaderClient dag_pb.UploaderClient
138156
objectContentsWalkerSemaphore *semaphore.Weighted
139157
httpClient *http.Client
140158
filePool re_filesystem.FilePool
141159
cacheDirectory filesystem.Directory
142-
executionClient remoteexecution_pb.ExecutionClient
143-
executionClientPrivateKey *ecdh.PrivateKey
144-
executionClientCertificates [][]byte
160+
executionClient *remoteexecution.Client[*model_command_pb.Action, emptypb.Empty, *emptypb.Empty, *model_command_pb.Result]
145161
}
146162

147-
func (s *builderServer) PerformBuild(request *build_pb.PerformBuildRequest, server build_pb.Builder_PerformBuildServer) error {
148-
ctx := server.Context()
163+
func (e *builderExecutor) CheckReadiness(ctx context.Context) error {
164+
return nil
165+
}
149166

150-
namespace, err := object.NewNamespace(request.Namespace)
167+
func (e *builderExecutor) Execute(ctx context.Context, action *model_build_pb.Action, executionTimeout time.Duration, executionEvents chan<- proto.Message) (proto.Message, time.Duration, remoteworker_pb.CurrentState_Completed_Result) {
168+
namespace, err := object.NewNamespace(action.Namespace)
151169
if err != nil {
152-
return util.StatusWrap(err, "Invalid namespace")
170+
return &model_build_pb.Result{
171+
Status: status.Convert(util.StatusWrap(err, "Invalid namespace")).Proto(),
172+
}, 0, remoteworker_pb.CurrentState_Completed_FAILED
153173
}
154174
instanceName := namespace.InstanceName
155-
objectDownloader := object_namespacemapping.NewNamespaceAddingDownloader(s.objectDownloader, namespace)
156-
buildSpecificationReference, err := namespace.NewGlobalReference(request.BuildSpecificationReference)
175+
objectDownloader := object_namespacemapping.NewNamespaceAddingDownloader(e.objectDownloader, namespace)
176+
buildSpecificationReference, err := namespace.NewGlobalReference(action.BuildSpecificationReference)
157177
if err != nil {
158-
return util.StatusWrap(err, "Invalid build specification reference")
178+
return &model_build_pb.Result{
179+
Status: status.Convert(util.StatusWrap(err, "Invalid build specification reference")).Proto(),
180+
}, 0, remoteworker_pb.CurrentState_Completed_FAILED
159181
}
160182
buildSpecificationEncoder, err := encoding.NewBinaryEncoderFromProto(
161-
request.BuildSpecificationEncoders,
183+
action.BuildSpecificationEncoders,
162184
uint32(namespace.ReferenceFormat.GetMaximumObjectSizeBytes()),
163185
)
164186
if err != nil {
165-
return util.StatusWrap(err, "Invalid build specification encoders")
187+
return &model_build_pb.Result{
188+
Status: status.Convert(util.StatusWrap(err, "Invalid build specification encoder")).Proto(),
189+
}, 0, remoteworker_pb.CurrentState_Completed_FAILED
166190
}
167191
value, err := evaluation.FullyComputeValue(
168192
ctx,
169193
model_analysis.NewTypedComputer(model_analysis.NewBaseComputer(
170194
objectDownloader,
171195
buildSpecificationReference,
172196
buildSpecificationEncoder,
173-
s.httpClient,
174-
s.filePool,
175-
s.cacheDirectory,
176-
s.executionClient,
177-
s.executionClientPrivateKey,
178-
s.executionClientCertificates,
197+
e.httpClient,
198+
e.filePool,
199+
e.cacheDirectory,
200+
e.executionClient,
179201
)),
180202
model_core.NewSimpleMessage[proto.Message](&model_analysis_pb.BuildResult_Key{}),
181203
func(references []object.LocalReference, objectContentsWalkers []dag.ObjectContentsWalker) error {
182204
for i, reference := range references {
183205
if err := dag.UploadDAG(
184206
ctx,
185-
s.dagUploaderClient,
207+
e.dagUploaderClient,
186208
object.GlobalReference{
187209
InstanceName: instanceName,
188210
LocalReference: reference,
189211
},
190212
objectContentsWalkers[i],
191-
s.objectContentsWalkerSemaphore,
213+
e.objectContentsWalkerSemaphore,
192214
// Assume everything we attempt
193215
// to upload is memory backed.
194216
object.Unlimited,
195217
); err != nil {
196-
return fmt.Errorf("failed to store DAG with reference %s: %w", reference.String(), err)
218+
return fmt.Errorf("failed to store DAG with reference %s: %w", reference.String(), err)
197219
}
198220
}
199221
return nil
200222
},
201223
)
202224
if err != nil {
203-
return err
225+
return &model_build_pb.Result{
226+
Status: status.Convert(err).Proto(),
227+
}, 0, remoteworker_pb.CurrentState_Completed_FAILED
204228
}
205-
return status.Errorf(codes.Internal, "XXX: %s", value)
229+
return &model_build_pb.Result{
230+
Status: status.Newf(codes.Internal, "TODO: %s", value).Proto(),
231+
}, 0, remoteworker_pb.CurrentState_Completed_FAILED
206232
}

cmd/bonanza_worker/main.go

+8-34
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ package main
22

33
import (
44
"context"
5-
"crypto/ecdh"
6-
"crypto/x509"
75
"encoding/json"
8-
"encoding/pem"
96
"fmt"
107
"os"
118
"regexp"
@@ -116,36 +113,13 @@ func main() {
116113
}
117114
maximumWritableFileUploadDelay := runnerConfiguration.MaximumWritableFileUploadDelay.AsDuration()
118115

119-
platformPrivateKeys := make([]*ecdh.PrivateKey, 0, len(runnerConfiguration.PlatformPrivateKeys))
120-
for i, privateKey := range runnerConfiguration.PlatformPrivateKeys {
121-
privateKeyBlock, _ := pem.Decode([]byte(privateKey))
122-
if privateKeyBlock == nil {
123-
return status.Errorf(codes.InvalidArgument, "Platform private key at index %d does not contain a PEM block", i)
124-
}
125-
if privateKeyBlock.Type != "PRIVATE KEY" {
126-
return status.Errorf(codes.InvalidArgument, "PEM block of platform private key at index %d is not of type PRIVATE KEY", i)
127-
}
128-
parsedPrivateKey, err := x509.ParsePKCS8PrivateKey(privateKeyBlock.Bytes)
129-
if err != nil {
130-
return util.StatusWrapf(err, "Failed to parse platform private key at index %d", i)
131-
}
132-
ecdhPrivateKey, ok := parsedPrivateKey.(*ecdh.PrivateKey)
133-
if !ok {
134-
return status.Errorf(codes.InvalidArgument, "Platform private key at index %d is not an ECDH private key", i)
135-
}
136-
platformPrivateKeys = append(platformPrivateKeys, ecdhPrivateKey)
116+
platformPrivateKeys, err := remoteworker.ParsePlatformPrivateKeys(runnerConfiguration.PlatformPrivateKeys)
117+
if err != nil {
118+
return err
137119
}
138-
139-
clientCertificateAuthorities := x509.NewCertPool()
140-
for certificateBlock, remainder := pem.Decode([]byte(runnerConfiguration.ClientCertificateAuthorities)); certificateBlock != nil; certificateBlock, remainder = pem.Decode(remainder) {
141-
if certificateBlock.Type != "CERTIFICATE" {
142-
return status.Error(codes.InvalidArgument, "Client certificate authority is not of type CERTIFICATE")
143-
}
144-
certificate, err := x509.ParseCertificate(certificateBlock.Bytes)
145-
if err != nil {
146-
return util.StatusWrapWithCode(err, codes.InvalidArgument, "Invalid certificate in client certificate authorities")
147-
}
148-
clientCertificateAuthorities.AddCert(certificate)
120+
clientCertificateAuthorities, err := remoteworker.ParseClientCertificateAuthorities(runnerConfiguration.ClientCertificateAuthorities)
121+
if err != nil {
122+
return err
149123
}
150124

151125
hiddenFilesPattern := func(s string) bool { return false }
@@ -193,7 +167,7 @@ func main() {
193167
return util.StatusWrap(err, "Failed to marshal worker ID")
194168
}
195169

196-
buildExecutor := model_command.NewLocalExecutor(
170+
executor := model_command.NewLocalExecutor(
197171
objectDownloader,
198172
dag_pb.NewUploaderClient(storageGRPCClient),
199173
semaphore.NewWeighted(int64(runtime.NumCPU())),
@@ -216,7 +190,7 @@ func main() {
216190

217191
client, err := remoteworker.NewClient(
218192
schedulerClient,
219-
buildExecutor,
193+
executor,
220194
clock.SystemClock,
221195
random.CryptoThreadSafeGenerator,
222196
platformPrivateKeys,
+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIBSzCB/qADAgECAgEBMAUGAytlcDB6MQswCQYDVQQGEwJBVTETMBEGA1UECAwK
3+
U29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMTMw
4+
MQYDVQQDDCpib25hbnphX2JhemVsIGNsaWVudCBjZXJ0aWZpY2F0ZSBhdXRob3Jp
5+
dHkwHhcNMjUwMjI2MDM1OTQyWhcNMjYwMjI2MDM1OTQyWjAAMCowBQYDK2VuAyEA
6+
w7t6QbVj8lPoYaVaXXdrf5bR9MWk69Q9Uk7zhzLFFVCjIzAhMB8GA1UdIwQYMBaA
7+
FO+HGe4qoGcrhpMEko3YDP3y/NKpMAUGAytlcANBACBDHwX3t1TK6XNPcvqsXvfc
8+
8L0TdNSQ6yjK2gYcSid7kR6K0IK/blaL3vohZXnEmcRI/4bFRtQgzluSmhcwFAM=
9+
-----END CERTIFICATE-----
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-----BEGIN PRIVATE KEY-----
2+
MC4CAQAwBQYDK2VuBCIEIMDh16UF9Qs2bafl+trOBKyuhSW1PWndsbyMv3tQfrVC
3+
-----END PRIVATE KEY-----

0 commit comments

Comments
 (0)