Skip to content

Commit

Permalink
[admin-tool] Modify the response of updateAdminTopicMetadata DIFFSIZE…
Browse files Browse the repository at this point in the history
…OVERRIDE (#1530)

This PR modifies the response of /updateAdminTopicMetadata to return more meaningful response.

Before, when we send request to update metadata within /AdminTopicMetadata or updating executionId
at store level, we return cluster name and store name (if applicable), and not returning the metadata itself.
With this PR, we now return applicable metadata within the endpoint response.
  • Loading branch information
minhmo1620 authored Feb 14, 2025
1 parent 1e2b47d commit 3dbbaed
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ service ClusterAdminOpsGrpcService {

// AdminTopicMetadata
rpc getAdminTopicMetadata(AdminTopicMetadataGrpcRequest) returns (AdminTopicMetadataGrpcResponse) {}
rpc updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest) returns (UpdateAdminTopicMetadataGrpcResponse) {}
rpc updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest) returns (AdminTopicMetadataGrpcResponse) {}
}


Expand Down Expand Up @@ -48,18 +48,13 @@ message AdminTopicMetadataGrpcResponse {
AdminTopicGrpcMetadata metadata = 1;
}

message UpdateAdminTopicMetadataGrpcResponse {
string clusterName = 1;
optional string storeName = 2;
}

message UpdateAdminTopicMetadataGrpcRequest {
AdminTopicGrpcMetadata metadata = 1;
}

message AdminTopicGrpcMetadata {
string clusterName = 1;
int64 executionId = 2;
optional int64 executionId = 2;
optional string storeName = 3;
optional int64 offset = 4;
optional int64 upstreamOffset = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -77,7 +76,7 @@ public void getAdminTopicMetadata(
@Override
public void updateAdminTopicMetadata(
UpdateAdminTopicMetadataGrpcRequest request,
StreamObserver<UpdateAdminTopicMetadataGrpcResponse> responseObserver) {
StreamObserver<AdminTopicMetadataGrpcResponse> responseObserver) {
LOGGER.debug("Received updateAdminTopicMetadata request: {}", request);
AdminTopicGrpcMetadata metadata = request.getMetadata();
ControllerGrpcServerUtils.handleRequest(ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminTopicMetadataMethod(), () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import java.util.Optional;
import org.apache.http.HttpStatus;
import spark.Route;
Expand Down Expand Up @@ -93,10 +92,11 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler
storeName.ifPresent(adminMetadataBuilder::setStoreName);
offset.ifPresent(adminMetadataBuilder::setOffset);
upstreamOffset.ifPresent(adminMetadataBuilder::setUpstreamOffset);
UpdateAdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminTopicMetadata(
UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder).build());
responseObject.setCluster(internalResponse.getClusterName());
responseObject.setName(internalResponse.hasStoreName() ? internalResponse.getStoreName() : null);
AdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminTopicMetadata(
UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder.build()).build());
responseObject.setCluster(internalResponse.getMetadata().getClusterName());
responseObject.setName(
internalResponse.getMetadata().hasStoreName() ? internalResponse.getMetadata().getStoreName() : null);
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(new VeniceException(e), request, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import com.linkedin.venice.utils.Pair;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -110,7 +109,7 @@ public AdminTopicMetadataGrpcResponse getAdminTopicMetadata(AdminTopicMetadataGr
return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build();
}

public UpdateAdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest request) {
public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest request) {
AdminTopicGrpcMetadata metadata = request.getMetadata();
String clusterName = metadata.getClusterName();
long executionId = metadata.getExecutionId();
Expand All @@ -134,11 +133,19 @@ public UpdateAdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdmin
Optional.ofNullable(storeName),
Optional.ofNullable(offset),
Optional.ofNullable(upstreamOffset));
UpdateAdminTopicMetadataGrpcResponse.Builder responseBuilder =
UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(clusterName);
if (storeName != null) {
responseBuilder.setStoreName(storeName);
}

AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder =
AdminTopicGrpcMetadata.newBuilder().setClusterName(clusterName).setExecutionId(executionId);

if (storeName != null)
adminTopicGrpcMetadataBuilder.setStoreName(storeName);
if (offset != null)
adminTopicGrpcMetadataBuilder.setOffset(offset);
if (upstreamOffset != null)
adminTopicGrpcMetadataBuilder.setUpstreamOffset(upstreamOffset);

AdminTopicMetadataGrpcResponse.Builder responseBuilder =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build());
return responseBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static void validateClusterStoreInfo(ClusterStoreGrpcInfo rpcContext) {

public static void validateAdminCommandExecutionRequest(String clusterName, long executionId) {
if (StringUtils.isBlank(clusterName)) {
throw new IllegalArgumentException("Cluster name is required for getting admin command execution status");
throw new IllegalArgumentException("Cluster name is required for admin command execution");
}
if (executionId <= 0) {
throw new IllegalArgumentException("Admin command execution id with positive value is required");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
Expand All @@ -25,7 +26,6 @@
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo;
import io.grpc.ManagedChannel;
import io.grpc.Server;
Expand Down Expand Up @@ -148,18 +148,33 @@ public void testGetAdminTopicMetadataSuccess() {

@Test
public void testUpdateAdminTopicMetadataSuccess() {
UpdateAdminTopicMetadataGrpcResponse response =
UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(TEST_CLUSTER).build();
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(TEST_CLUSTER)
.setExecutionId(EXECUTION_ID)
.setOffset(100L)
.setUpstreamOffset(-1L);
AdminTopicMetadataGrpcResponse response =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();
doReturn(response).when(requestHandler).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
doReturn(true).when(accessManager).isAllowListUser(anyString(), any());

UpdateAdminTopicMetadataGrpcRequest request = UpdateAdminTopicMetadataGrpcRequest.newBuilder()
.setMetadata(AdminTopicGrpcMetadata.newBuilder().setClusterName(TEST_CLUSTER).setExecutionId(EXECUTION_ID))
.setMetadata(
AdminTopicGrpcMetadata.newBuilder()
.setClusterName(TEST_CLUSTER)
.setExecutionId(EXECUTION_ID)
.setOffset(100L)
.setUpstreamOffset(-1L))
.build();

UpdateAdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminTopicMetadata(request);
AdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminTopicMetadata(request);
assertNotNull(actualResponse);
assertEquals(actualResponse.getClusterName(), TEST_CLUSTER);
assertEquals(actualResponse.getMetadata().getClusterName(), TEST_CLUSTER);
assertEquals(actualResponse.getMetadata().getExecutionId(), EXECUTION_ID);
assertEquals(actualResponse.getMetadata().getOffset(), 100L);
assertEquals(actualResponse.getMetadata().getUpstreamOffset(), -1L);
// Since store name is not provided in the request, no store name will be returned in the response
assertFalse(actualResponse.getMetadata().hasStoreName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.security.cert.X509Certificate;
import java.util.HashMap;
Expand Down Expand Up @@ -147,8 +146,12 @@ public void testUpdateAdminTopicMetadataSuccess() throws Exception {
when(request.queryParams(EXECUTION_ID)).thenReturn(String.valueOf(TEST_EXECUTION_ID));
when(request.queryParams(STORE_NAME)).thenReturn(TEST_STORE);

UpdateAdminTopicMetadataGrpcResponse grpcResponse =
UpdateAdminTopicMetadataGrpcResponse.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(TEST_CLUSTER)
.setStoreName(TEST_STORE)
.setExecutionId(TEST_EXECUTION_ID);
AdminTopicMetadataGrpcResponse grpcResponse =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();

when(requestHandler.updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class)))
.thenReturn(grpcResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -198,10 +197,10 @@ public void testUpdateAdminTopicMetadataSuccess() {
.build();
UpdateAdminTopicMetadataGrpcRequest request =
UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build();
UpdateAdminTopicMetadataGrpcResponse response = handler.updateAdminTopicMetadata(request);
AdminTopicMetadataGrpcResponse response = handler.updateAdminTopicMetadata(request);
assertNotNull(response);
assertEquals(response.getClusterName(), clusterName);
assertFalse(response.hasStoreName());
assertEquals(response.getMetadata().getClusterName(), clusterName);
assertFalse(response.getMetadata().hasStoreName());

// Store name is provided
metadata = AdminTopicGrpcMetadata.newBuilder()
Expand All @@ -212,22 +211,30 @@ public void testUpdateAdminTopicMetadataSuccess() {
request = UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build();
response = handler.updateAdminTopicMetadata(request);
assertNotNull(response);
assertEquals(response.getClusterName(), clusterName);
assertEquals(response.getMetadata().getClusterName(), clusterName);
}

@Test
public void testUpdateAdminTopicMetadataInvalidOffsets() {
public void testUpdateAdminTopicMetadataInvalidInputs() {
String clusterName = "test-cluster";
long executionId = 12345L;

AdminTopicGrpcMetadata metadata = AdminTopicGrpcMetadata.newBuilder()
// No execution id
AdminTopicGrpcMetadata metadata = AdminTopicGrpcMetadata.newBuilder().setClusterName(clusterName).build();
UpdateAdminTopicMetadataGrpcRequest request =
UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build();
Exception exception = expectThrows(IllegalArgumentException.class, () -> handler.updateAdminTopicMetadata(request));
assertTrue(exception.getMessage().contains("Admin command execution id with positive value is required"));

// Either offset or upstream offset is provided
metadata = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(clusterName)
.setExecutionId(executionId)
.setOffset(123L)
.build();
UpdateAdminTopicMetadataGrpcRequest request =
UpdateAdminTopicMetadataGrpcRequest request1 =
UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(metadata).build();
Exception exception = expectThrows(VeniceException.class, () -> handler.updateAdminTopicMetadata(request));
exception = expectThrows(VeniceException.class, () -> handler.updateAdminTopicMetadata(request1));
assertTrue(
exception.getMessage().contains("Offsets must be provided to update cluster-level admin topic metadata"),
"Actual message: " + exception.getMessage());
Expand Down

0 comments on commit 3dbbaed

Please sign in to comment.