From 6b45fd3842bc536912f7cd2143241b3be54c9539 Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Sat, 1 Mar 2025 00:14:26 +0100 Subject: [PATCH] Add (and process at the top) SDK specific exceptions to distinguish errors (#29) * Added a couple needed new exceptions for an SDK to report specific errors, * Catch the exceptions in the server to rewrite them as gRPC errors, * Edited `DASSdkManager` to catch the guava cache exception one gets if the SDK throws an exception, so that the SDK exception propagates. Otherwise, whichever exception propagates. Therefore I removed the `.setError` code to be consistent with the idea that errors are reported through gRPC errors. > [!CAUTION] > I believe we never hit `REGISTRATION_FAILED` in psql, since that one is made up from `.error` instead of a `gRPC` failure. Maybe for backward compatibility we can instead leave it? Manually tried against that version of multicorn-das: https://github.com/raw-labs/multicorn-das/pull/27 which modifies error handling during registration. With errors triggered in `DASMock` or `DASMockTable`: If for example DASMock throws a `DASSdkUnauthenticatedException` in its initializer. That mimics a DAS instance that performs some authentication early on, verifies the password, etc. ```scala class DASMock(options: Map[String, String]) extends DASSdk with StrictLogging { options.keys.foreach(key => logger.info(s"Option: $key = ${options(key)}")) private val dasMockStorage = new DASMockStorage("column1") throw new DASSdkUnauthenticatedException("Wrong password") // HERE ``` The effect is: ``` ERROR: Wrong password DETAIL: {"code": "UNAUTHENTICATED", "message": "Wrong password", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAUTHENTICATED\n\tdetails = \"Wrong password\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {created_time:\"2025-02-28T14:35:21.707372+01:00\", grpc_status:16, grpc_message:\"Wrong password\"}\"\n>"} ``` Similar with an invalid option. ```scala throw new DASSdkInvalidArgumentException("Invalid options: missing an important key") ``` ``` ERROR: Invalid options: missing an important key DETAIL: {"code": "INVALID_ARGUMENT", "message": "Invalid options: missing an important key", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails = \"Invalid options: missing an important key\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {created_time:\"2025-02-28T14:39:34.398676+01:00\", grpc_status:3, grpc_message:\"Invalid options: missing an important key\"}\"\n>"} ``` If the error is unexpected (e.g. a bug in the DAS initializer, `NullPointerException`). The error is `INTERNAL`. ``` ERROR: gRPC error calling remote DAS server DETAIL: {"code": "INTERNAL", "message": "gRPC error calling remote DAS server", "das_name": null, "das_type": null, "das_url": null, "table_name": null, "cause": "<_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {created_time:\"2025-02-28T14:38:26.791684+01:00\", grpc_status:13, grpc_message:\"\"}\"\n>"} ``` And if the DAS server is down, after 30 attempts: ``` ERROR: Server unavailable DETAIL: {"code": "UNAVAILABLE", "message": "Server unavailable", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:50051: Failed to connect to remote host: connect: Connection refused (61)\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:50051: Failed to connect to remote host: connect: Connection refused (61)\", grpc_status:14, created_time:\"2025-02-28T14:41:27.460325+01:00\"}\"\n>"} ``` If the error occurs after registration, e.g. when running `.execute` of a `DASTable`. ```scala throw new DASSdkInvalidArgumentException("Invalid predicate") ``` ``` ERROR: Invalid predicate DETAIL: {"code": "INVALID_ARGUMENT", "message": "Invalid predicate", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails = \"Invalid predicate\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"Invalid predicate\", grpc_status:3, created_time:\"2025-02-28T14:43:35.637285+01:00\"}\"\n>"} ``` ```scala throw new DASSdkUnauthenticatedException("Who are you?") ``` ``` WARNING: gRPC attempting registration and retry... ERROR: Who are you? DETAIL: {"code": "UNAUTHENTICATED", "message": "Who are you?", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNAUTHENTICATED\n\tdetails = \"Who are you?\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"Who are you?\", grpc_status:16, created_time:\"2025-02-28T14:44:37.911759+01:00\"}\"\n>"} ``` ```scala throw new DASSdkPermissionDeniedException("Forbidden") ``` ``` ERROR: Forbidden DETAIL: {"code": "PERMISSION_DENIED", "message": "Forbidden", "das_name": "localhost:50051", "das_type": "mock", "das_url": "localhost:50051", "table_name": "slow", "cause": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.PERMISSION_DENIED\n\tdetails = \"Forbidden\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"Forbidden\", grpc_status:7, created_time:\"2025-02-28T14:46:08.608081+01:00\"}\"\n>"} ``` --------- Co-authored-by: Miguel Branco --- .../sdk/DASSdkPermissionDeniedException.java | 27 +++++++++++++ .../sdk/DASSdkUnauthenticatedException.java | 27 +++++++++++++ .../grpc/RegistrationServiceGrpcImpl.scala | 40 +++++++++++++++---- .../server/grpc/TableServiceGrpcImpl.scala | 9 ++++- .../das/server/manager/DASSdkManager.scala | 17 ++++---- 5 files changed, 102 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/rawlabs/das/sdk/DASSdkPermissionDeniedException.java create mode 100644 src/main/java/com/rawlabs/das/sdk/DASSdkUnauthenticatedException.java diff --git a/src/main/java/com/rawlabs/das/sdk/DASSdkPermissionDeniedException.java b/src/main/java/com/rawlabs/das/sdk/DASSdkPermissionDeniedException.java new file mode 100644 index 0000000..20ad8e3 --- /dev/null +++ b/src/main/java/com/rawlabs/das/sdk/DASSdkPermissionDeniedException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.sdk; + +/** + * DASSdkPermissionDeniedException is thrown by DAS SDK methods when a permission denied error is to + * be reported to a user (e.g. missing required permissions, etc.). + */ +public class DASSdkPermissionDeniedException extends RuntimeException { + public DASSdkPermissionDeniedException(String message) { + super(message); + } + + public DASSdkPermissionDeniedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/rawlabs/das/sdk/DASSdkUnauthenticatedException.java b/src/main/java/com/rawlabs/das/sdk/DASSdkUnauthenticatedException.java new file mode 100644 index 0000000..a3f0e07 --- /dev/null +++ b/src/main/java/com/rawlabs/das/sdk/DASSdkUnauthenticatedException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.sdk; + +/** + * DASSdkUnauthenticatedException is thrown by DAS SDK methods when an unauthenticated error is to + * be reported to a user (e.g. missing authentication token, invalid credentials, etc.). + */ +public class DASSdkUnauthenticatedException extends RuntimeException { + public DASSdkUnauthenticatedException(String message) { + super(message); + } + + public DASSdkUnauthenticatedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/scala/com/rawlabs/das/server/grpc/RegistrationServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/RegistrationServiceGrpcImpl.scala index a857b78..177540d 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/RegistrationServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/RegistrationServiceGrpcImpl.scala @@ -14,11 +14,18 @@ package com.rawlabs.das.server.grpc import scala.jdk.CollectionConverters._ +import com.rawlabs.das.sdk.{ + DASSdkInvalidArgumentException, + DASSdkPermissionDeniedException, + DASSdkUnauthenticatedException, + DASSdkUnsupportedException +} import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.protocol.das.v1.common.DASId import com.rawlabs.protocol.das.v1.services._ import com.typesafe.scalalogging.StrictLogging +import io.grpc.Status import io.grpc.stub.StreamObserver /** @@ -38,13 +45,32 @@ class RegistrationServiceGrpcImpl(dasSdkManager: DASSdkManager) */ override def register(request: RegisterRequest, responseObserver: StreamObserver[RegisterResponse]): Unit = { logger.debug(s"Registering DAS with type: ${request.getDefinition.getType}") - val dasId = dasSdkManager.registerDAS( - request.getDefinition.getType, - request.getDefinition.getOptionsMap.asScala.toMap, - maybeDasId = if (request.hasId) Some(request.getId) else None) - responseObserver.onNext(dasId) - responseObserver.onCompleted() - logger.debug(s"DAS registered successfully with ID: $dasId") + try { + val dasId = dasSdkManager.registerDAS( + request.getDefinition.getType, + request.getDefinition.getOptionsMap.asScala.toMap, + maybeDasId = if (request.hasId) Some(request.getId) else None) + responseObserver.onNext(dasId) + responseObserver.onCompleted() + logger.debug(s"DAS registered successfully with ID: $dasId") + } catch { + case ex: DASSdkInvalidArgumentException => + logger.error("DASSdk invalid argument error", ex) + responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkPermissionDeniedException => + logger.error("DASSdk permission denied error", ex) + responseObserver.onError(Status.PERMISSION_DENIED.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkUnauthenticatedException => + logger.error("DASSdk unauthenticated error", ex) + responseObserver.onError(Status.UNAUTHENTICATED.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkUnsupportedException => + logger.error("DASSdk unsupported feature", ex) + responseObserver.onError(Status.UNIMPLEMENTED.withDescription(ex.getMessage).asRuntimeException()) + case t: Throwable => + logger.error("DASSdk unexpected error", t) + responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException()) + + } } /** diff --git a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala index 9a32713..c1ad835 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala @@ -425,10 +425,15 @@ class TableServiceGrpcImpl( case ex: DASSdkInvalidArgumentException => logger.error("DASSdk invalid argument error", ex) responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkPermissionDeniedException => + logger.error("DASSdk permission denied error", ex) + responseObserver.onError(Status.PERMISSION_DENIED.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkUnauthenticatedException => + logger.error("DASSdk unauthenticated error", ex) + responseObserver.onError(Status.UNAUTHENTICATED.withDescription(ex.getMessage).asRuntimeException()) case ex: DASSdkUnsupportedException => logger.error("DASSdk unsupported feature", ex) - responseObserver.onError( - Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(ex).asRuntimeException()) + responseObserver.onError(Status.UNIMPLEMENTED.withDescription(ex.getMessage).asRuntimeException()) case t: Throwable => logger.error("DASSdk unexpected error", t) responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException()) diff --git a/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala b/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala index c3f833a..43d86b4 100644 --- a/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala +++ b/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala @@ -19,12 +19,9 @@ import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ import scala.util.control.NonFatal -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.RemovalNotification -import com.rawlabs.das.sdk.DASSdk -import com.rawlabs.das.sdk.DASSdkBuilder -import com.rawlabs.das.sdk.DASSettings +import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalNotification} +import com.google.common.util.concurrent.UncheckedExecutionException +import com.rawlabs.das.sdk._ import com.rawlabs.protocol.das.v1.common.DASId import com.rawlabs.protocol.das.v1.services.RegisterResponse import com.typesafe.scalalogging.StrictLogging @@ -103,13 +100,15 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded RegisterResponse.newBuilder().setId(dasId).build() } catch { - case NonFatal(e) => - logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e) + case e: UncheckedExecutionException => + // `dasSdkCache.get` throws that exception when an unchecked exception occurs while loading + // a missing key. Strip the Guava wrapping and rethrow the original exception. + logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e.getCause) // Remove the broken config since we failed to build the DAS dasSdkConfigCacheLock.synchronized { dasSdkConfigCache.remove(dasId) } - RegisterResponse.newBuilder().setError(e.getMessage).build() + throw e.getCause } }