diff --git a/internal/venice-common/src/main/proto/controller/ClusterAdminOpsGrpcService.proto b/internal/venice-common/src/main/proto/controller/ClusterAdminOpsGrpcService.proto index ad39a88a77..b6c2d2cbd2 100644 --- a/internal/venice-common/src/main/proto/controller/ClusterAdminOpsGrpcService.proto +++ b/internal/venice-common/src/main/proto/controller/ClusterAdminOpsGrpcService.proto @@ -13,7 +13,7 @@ service ClusterAdminOpsGrpcService { // AdminTopicMetadata rpc getAdminTopicMetadata(AdminTopicMetadataGrpcRequest) returns (AdminTopicMetadataGrpcResponse) {} - rpc updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest) returns (UpdateAdminTopicMetadataGrpcResponse) {} + rpc updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest) returns (AdminTopicMetadataGrpcResponse) {} } @@ -48,18 +48,13 @@ message AdminTopicMetadataGrpcResponse { AdminTopicGrpcMetadata metadata = 1; } -message UpdateAdminTopicMetadataGrpcResponse { - string clusterName = 1; - optional string storeName = 2; -} - message UpdateAdminTopicMetadataGrpcRequest { AdminTopicGrpcMetadata metadata = 1; } message AdminTopicGrpcMetadata { string clusterName = 1; - int64 executionId = 2; + optional int64 executionId = 2; optional string storeName = 3; optional int64 offset = 4; optional int64 upstreamOffset = 5; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java index 4cde1654c4..b248752e98 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java @@ -16,7 +16,6 @@ import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; @@ -77,7 +76,7 @@ public void getAdminTopicMetadata( @Override public void updateAdminTopicMetadata( UpdateAdminTopicMetadataGrpcRequest request, - StreamObserver responseObserver) { + StreamObserver responseObserver) { LOGGER.debug("Received updateAdminTopicMetadata request: {}", request); AdminTopicGrpcMetadata metadata = request.getMetadata(); ControllerGrpcServerUtils.handleRequest(ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminTopicMetadataMethod(), () -> { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index fa2a6300e0..12e9fe2e37 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -21,7 +21,6 @@ import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import java.util.Optional; import org.apache.http.HttpStatus; import spark.Route; @@ -93,10 +92,11 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler storeName.ifPresent(adminMetadataBuilder::setStoreName); offset.ifPresent(adminMetadataBuilder::setOffset); upstreamOffset.ifPresent(adminMetadataBuilder::setUpstreamOffset); - UpdateAdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminTopicMetadata( - UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder).build()); - responseObject.setCluster(internalResponse.getClusterName()); - responseObject.setName(internalResponse.hasStoreName() ? internalResponse.getStoreName() : null); + AdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminTopicMetadata( + UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder.build()).build()); + responseObject.setCluster(internalResponse.getMetadata().getClusterName()); + responseObject.setName( + internalResponse.getMetadata().hasStoreName() ? internalResponse.getMetadata().getStoreName() : null); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index ccdcdb097b..ece228b282 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -15,7 +15,6 @@ import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import com.linkedin.venice.utils.Pair; import java.util.Map; import java.util.Optional; @@ -110,7 +109,7 @@ public AdminTopicMetadataGrpcResponse getAdminTopicMetadata(AdminTopicMetadataGr return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build(); } - public UpdateAdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest request) { + public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest request) { AdminTopicGrpcMetadata metadata = request.getMetadata(); String clusterName = metadata.getClusterName(); long executionId = metadata.getExecutionId(); @@ -134,11 +133,19 @@ public UpdateAdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdmin Optional.ofNullable(storeName), Optional.ofNullable(offset), Optional.ofNullable(upstreamOffset)); - UpdateAdminTopicMetadataGrpcResponse.Builder responseBuilder = - UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(clusterName); - if (storeName != null) { - responseBuilder.setStoreName(storeName); - } + + AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = + AdminTopicGrpcMetadata.newBuilder().setClusterName(clusterName).setExecutionId(executionId); + + if (storeName != null) + adminTopicGrpcMetadataBuilder.setStoreName(storeName); + if (offset != null) + adminTopicGrpcMetadataBuilder.setOffset(offset); + if (upstreamOffset != null) + adminTopicGrpcMetadataBuilder.setUpstreamOffset(upstreamOffset); + + AdminTopicMetadataGrpcResponse.Builder responseBuilder = + AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()); return responseBuilder.build(); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java index 34671785b9..6f2864e9f9 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java @@ -39,7 +39,7 @@ public static void validateClusterStoreInfo(ClusterStoreGrpcInfo rpcContext) { public static void validateAdminCommandExecutionRequest(String clusterName, long executionId) { if (StringUtils.isBlank(clusterName)) { - throw new IllegalArgumentException("Cluster name is required for getting admin command execution status"); + throw new IllegalArgumentException("Cluster name is required for admin command execution"); } if (executionId <= 0) { throw new IllegalArgumentException("Admin command execution id with positive value is required"); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java index 056cad9874..dcfe281f51 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; @@ -25,7 +26,6 @@ import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo; import io.grpc.ManagedChannel; import io.grpc.Server; @@ -148,18 +148,33 @@ public void testGetAdminTopicMetadataSuccess() { @Test public void testUpdateAdminTopicMetadataSuccess() { - UpdateAdminTopicMetadataGrpcResponse response = - UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(TEST_CLUSTER).build(); + AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder() + .setClusterName(TEST_CLUSTER) + .setExecutionId(EXECUTION_ID) + .setOffset(100L) + .setUpstreamOffset(-1L); + AdminTopicMetadataGrpcResponse response = + AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build(); doReturn(response).when(requestHandler).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class)); doReturn(true).when(accessManager).isAllowListUser(anyString(), any()); UpdateAdminTopicMetadataGrpcRequest request = UpdateAdminTopicMetadataGrpcRequest.newBuilder() - .setMetadata(AdminTopicGrpcMetadata.newBuilder().setClusterName(TEST_CLUSTER).setExecutionId(EXECUTION_ID)) + .setMetadata( + AdminTopicGrpcMetadata.newBuilder() + .setClusterName(TEST_CLUSTER) + .setExecutionId(EXECUTION_ID) + .setOffset(100L) + .setUpstreamOffset(-1L)) .build(); - UpdateAdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminTopicMetadata(request); + AdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminTopicMetadata(request); assertNotNull(actualResponse); - assertEquals(actualResponse.getClusterName(), TEST_CLUSTER); + assertEquals(actualResponse.getMetadata().getClusterName(), TEST_CLUSTER); + assertEquals(actualResponse.getMetadata().getExecutionId(), EXECUTION_ID); + assertEquals(actualResponse.getMetadata().getOffset(), 100L); + assertEquals(actualResponse.getMetadata().getUpstreamOffset(), -1L); + // Since store name is not provided in the request, no store name will be returned in the response + assertFalse(actualResponse.getMetadata().hasStoreName()); } @Test diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java index 863f50f54b..f8dce51224 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java @@ -26,7 +26,6 @@ import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import com.linkedin.venice.utils.ObjectMapperFactory; import java.security.cert.X509Certificate; import java.util.HashMap; @@ -147,8 +146,12 @@ public void testUpdateAdminTopicMetadataSuccess() throws Exception { when(request.queryParams(EXECUTION_ID)).thenReturn(String.valueOf(TEST_EXECUTION_ID)); when(request.queryParams(STORE_NAME)).thenReturn(TEST_STORE); - UpdateAdminTopicMetadataGrpcResponse grpcResponse = - UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder() + .setClusterName(TEST_CLUSTER) + .setStoreName(TEST_STORE) + .setExecutionId(TEST_EXECUTION_ID); + AdminTopicMetadataGrpcResponse grpcResponse = + AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build(); when(requestHandler.updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class))) .thenReturn(grpcResponse); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java index a109c39c48..0116344150 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java @@ -24,7 +24,6 @@ import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; -import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -198,10 +197,10 @@ public void testUpdateAdminTopicMetadataSuccess() { .build(); UpdateAdminTopicMetadataGrpcRequest request = UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build(); - UpdateAdminTopicMetadataGrpcResponse response = handler.updateAdminTopicMetadata(request); + AdminTopicMetadataGrpcResponse response = handler.updateAdminTopicMetadata(request); assertNotNull(response); - assertEquals(response.getClusterName(), clusterName); - assertFalse(response.hasStoreName()); + assertEquals(response.getMetadata().getClusterName(), clusterName); + assertFalse(response.getMetadata().hasStoreName()); // Store name is provided metadata = AdminTopicGrpcMetadata.newBuilder() @@ -212,22 +211,30 @@ public void testUpdateAdminTopicMetadataSuccess() { request = UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build(); response = handler.updateAdminTopicMetadata(request); assertNotNull(response); - assertEquals(response.getClusterName(), clusterName); + assertEquals(response.getMetadata().getClusterName(), clusterName); } @Test - public void testUpdateAdminTopicMetadataInvalidOffsets() { + public void testUpdateAdminTopicMetadataInvalidInputs() { String clusterName = "test-cluster"; long executionId = 12345L; - AdminTopicGrpcMetadata metadata = AdminTopicGrpcMetadata.newBuilder() + // No execution id + AdminTopicGrpcMetadata metadata = AdminTopicGrpcMetadata.newBuilder().setClusterName(clusterName).build(); + UpdateAdminTopicMetadataGrpcRequest request = + UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build(); + Exception exception = expectThrows(IllegalArgumentException.class, () -> handler.updateAdminTopicMetadata(request)); + assertTrue(exception.getMessage().contains("Admin command execution id with positive value is required")); + + // Either offset or upstream offset is provided + metadata = AdminTopicGrpcMetadata.newBuilder() .setClusterName(clusterName) .setExecutionId(executionId) .setOffset(123L) .build(); - UpdateAdminTopicMetadataGrpcRequest request = + UpdateAdminTopicMetadataGrpcRequest request1 = UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build(); - Exception exception = expectThrows(VeniceException.class, () -> handler.updateAdminTopicMetadata(request)); + exception = expectThrows(VeniceException.class, () -> handler.updateAdminTopicMetadata(request1)); assertTrue( exception.getMessage().contains("Offsets must be provided to update cluster-level admin topic metadata"), "Actual message: " + exception.getMessage());