Skip to content

Commit

Permalink
Add (and process at the top) SDK specific exceptions to distinguish e…
Browse files Browse the repository at this point in the history
…rrors (#29)

* Added a couple needed new exceptions for an SDK to report specific
errors,
* Catch the exceptions in the server to rewrite them as gRPC errors,
* Edited `DASSdkManager` to catch the guava cache exception one gets if
the SDK throws an exception, so that the SDK exception propagates.
Otherwise, whichever exception propagates. Therefore I removed the
`.setError` code to be consistent with the idea that errors are reported
through gRPC errors.
> [!CAUTION]
> I believe we never hit `REGISTRATION_FAILED` in psql, since that one
is made up from `.error` instead of a `gRPC` failure. Maybe for backward
compatibility we can instead leave it?

Manually tried against that version of multicorn-das:
raw-labs/multicorn-das#27 which modifies error
handling during registration. With errors triggered in `DASMock` or
`DASMockTable`:

If for example DASMock throws a `DASSdkUnauthenticatedException` in its
initializer. That mimics a DAS instance that performs some
authentication early
on, verifies the password, etc.
```scala
class DASMock(options: Map[String, String]) extends DASSdk with StrictLogging {

  options.keys.foreach(key => logger.info(s"Option: $key = ${options(key)}"))

  private val dasMockStorage = new DASMockStorage("column1")
  throw new DASSdkUnauthenticatedException("Wrong password") // HERE
```
The effect is:
```
ERROR:  Wrong password
DETAIL:  {"code": "UNAUTHENTICATED", "message": "Wrong password", "das_name":
"localhost:50051", "das_type": "mock", "das_url": "localhost:50051",
"table_name": "slow", "cause": "<_InactiveRpcError of RPC that terminated
with:\n\tstatus = StatusCode.UNAUTHENTICATED\n\tdetails = \"Wrong
password\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer
{created_time:\"2025-02-28T14:35:21.707372+01:00\", grpc_status:16,
grpc_message:\"Wrong password\"}\"\n>"}
```
Similar with an invalid option.
```scala
throw new DASSdkInvalidArgumentException("Invalid options: missing an important key")
```
```
ERROR:  Invalid options: missing an important key
DETAIL:  {"code": "INVALID_ARGUMENT", "message": "Invalid options: missing an
important key", "das_name": "localhost:50051", "das_type": "mock", "das_url":
"localhost:50051", "table_name": "slow", "cause": "<_InactiveRpcError of RPC
that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails =
\"Invalid options: missing an important key\"\n\tdebug_error_string =
\"UNKNOWN:Error received from peer
{created_time:\"2025-02-28T14:39:34.398676+01:00\", grpc_status:3,
grpc_message:\"Invalid options: missing an important key\"}\"\n>"}
```
If the error is unexpected (e.g. a bug in the DAS initializer,
`NullPointerException`). The error is `INTERNAL`.
```
ERROR:  gRPC error calling remote DAS server
DETAIL:  {"code": "INTERNAL", "message": "gRPC error calling remote DAS
server", "das_name": null, "das_type": null, "das_url": null, "table_name":
null, "cause": "<_InactiveRpcError of RPC that terminated with:\n\tstatus =
StatusCode.INTERNAL\n\tdetails = \"\"\n\tdebug_error_string = \"UNKNOWN:Error
received from peer  {created_time:\"2025-02-28T14:38:26.791684+01:00\",
grpc_status:13, grpc_message:\"\"}\"\n>"}
```
And if the DAS server is down, after 30 attempts:
```
ERROR:  Server unavailable
DETAIL:  {"code": "UNAVAILABLE", "message": "Server unavailable", "das_name":
"localhost:50051", "das_type": "mock", "das_url": "localhost:50051",
"table_name": "slow", "cause": "<_InactiveRpcError of RPC that terminated
with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to
all addresses; last error: UNKNOWN: ipv4:127.0.0.1:50051: Failed to connect to
remote host: connect: Connection refused (61)\"\n\tdebug_error_string =
\"UNKNOWN:Error received from peer  {grpc_message:\"failed to connect to all
addresses; last error: UNKNOWN: ipv4:127.0.0.1:50051: Failed to connect to
remote host: connect: Connection refused (61)\", grpc_status:14,
created_time:\"2025-02-28T14:41:27.460325+01:00\"}\"\n>"}
```

If the error occurs after registration, e.g. when running `.execute` of
a `DASTable`.
```scala
    throw new DASSdkInvalidArgumentException("Invalid predicate")
```
```
ERROR:  Invalid predicate
DETAIL:  {"code": "INVALID_ARGUMENT", "message": "Invalid predicate",
"das_name": "localhost:50051", "das_type": "mock", "das_url":
"localhost:50051", "table_name": "slow", "cause": "<_MultiThreadedRendezvous of
RPC that terminated with:\n\tstatus = StatusCode.INVALID_ARGUMENT\n\tdetails =
\"Invalid predicate\"\n\tdebug_error_string = \"UNKNOWN:Error received from
peer  {grpc_message:\"Invalid predicate\", grpc_status:3,
created_time:\"2025-02-28T14:43:35.637285+01:00\"}\"\n>"}
```
```scala
    throw new DASSdkUnauthenticatedException("Who are you?")
```
```
WARNING:  gRPC attempting registration and retry...
ERROR:  Who are you?
DETAIL:  {"code": "UNAUTHENTICATED", "message": "Who are you?", "das_name":
"localhost:50051", "das_type": "mock", "das_url": "localhost:50051",
"table_name": "slow", "cause": "<_MultiThreadedRendezvous of RPC that
terminated with:\n\tstatus = StatusCode.UNAUTHENTICATED\n\tdetails = \"Who are
you?\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer
{grpc_message:\"Who are you?\", grpc_status:16,
created_time:\"2025-02-28T14:44:37.911759+01:00\"}\"\n>"}
```
```scala
    throw new DASSdkPermissionDeniedException("Forbidden")
```
```
ERROR:  Forbidden
DETAIL:  {"code": "PERMISSION_DENIED", "message": "Forbidden", "das_name":
"localhost:50051", "das_type": "mock", "das_url": "localhost:50051",
"table_name": "slow", "cause": "<_MultiThreadedRendezvous of RPC that
terminated with:\n\tstatus = StatusCode.PERMISSION_DENIED\n\tdetails =
\"Forbidden\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer
{grpc_message:\"Forbidden\", grpc_status:7,
created_time:\"2025-02-28T14:46:08.608081+01:00\"}\"\n>"}
```

---------

Co-authored-by: Miguel Branco <miguel@raw-labs.com>
  • Loading branch information
bgaidioz and miguelbranco80 authored Feb 28, 2025
1 parent d4f30d0 commit 6b45fd3
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package com.rawlabs.das.sdk;

/**
* DASSdkPermissionDeniedException is thrown by DAS SDK methods when a permission denied error is to
* be reported to a user (e.g. missing required permissions, etc.).
*/
public class DASSdkPermissionDeniedException extends RuntimeException {
public DASSdkPermissionDeniedException(String message) {
super(message);
}

public DASSdkPermissionDeniedException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package com.rawlabs.das.sdk;

/**
* DASSdkUnauthenticatedException is thrown by DAS SDK methods when an unauthenticated error is to
* be reported to a user (e.g. missing authentication token, invalid credentials, etc.).
*/
public class DASSdkUnauthenticatedException extends RuntimeException {
public DASSdkUnauthenticatedException(String message) {
super(message);
}

public DASSdkUnauthenticatedException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ package com.rawlabs.das.server.grpc

import scala.jdk.CollectionConverters._

import com.rawlabs.das.sdk.{
DASSdkInvalidArgumentException,
DASSdkPermissionDeniedException,
DASSdkUnauthenticatedException,
DASSdkUnsupportedException
}
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.services._
import com.typesafe.scalalogging.StrictLogging

import io.grpc.Status
import io.grpc.stub.StreamObserver

/**
Expand All @@ -38,13 +45,32 @@ class RegistrationServiceGrpcImpl(dasSdkManager: DASSdkManager)
*/
override def register(request: RegisterRequest, responseObserver: StreamObserver[RegisterResponse]): Unit = {
logger.debug(s"Registering DAS with type: ${request.getDefinition.getType}")
val dasId = dasSdkManager.registerDAS(
request.getDefinition.getType,
request.getDefinition.getOptionsMap.asScala.toMap,
maybeDasId = if (request.hasId) Some(request.getId) else None)
responseObserver.onNext(dasId)
responseObserver.onCompleted()
logger.debug(s"DAS registered successfully with ID: $dasId")
try {
val dasId = dasSdkManager.registerDAS(
request.getDefinition.getType,
request.getDefinition.getOptionsMap.asScala.toMap,
maybeDasId = if (request.hasId) Some(request.getId) else None)
responseObserver.onNext(dasId)
responseObserver.onCompleted()
logger.debug(s"DAS registered successfully with ID: $dasId")
} catch {
case ex: DASSdkInvalidArgumentException =>
logger.error("DASSdk invalid argument error", ex)
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkPermissionDeniedException =>
logger.error("DASSdk permission denied error", ex)
responseObserver.onError(Status.PERMISSION_DENIED.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkUnauthenticatedException =>
logger.error("DASSdk unauthenticated error", ex)
responseObserver.onError(Status.UNAUTHENTICATED.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkUnsupportedException =>
logger.error("DASSdk unsupported feature", ex)
responseObserver.onError(Status.UNIMPLEMENTED.withDescription(ex.getMessage).asRuntimeException())
case t: Throwable =>
logger.error("DASSdk unexpected error", t)
responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException())

}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,15 @@ class TableServiceGrpcImpl(
case ex: DASSdkInvalidArgumentException =>
logger.error("DASSdk invalid argument error", ex)
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkPermissionDeniedException =>
logger.error("DASSdk permission denied error", ex)
responseObserver.onError(Status.PERMISSION_DENIED.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkUnauthenticatedException =>
logger.error("DASSdk unauthenticated error", ex)
responseObserver.onError(Status.UNAUTHENTICATED.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkUnsupportedException =>
logger.error("DASSdk unsupported feature", ex)
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(ex).asRuntimeException())
responseObserver.onError(Status.UNIMPLEMENTED.withDescription(ex.getMessage).asRuntimeException())
case t: Throwable =>
logger.error("DASSdk unexpected error", t)
responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.control.NonFatal

import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.RemovalNotification
import com.rawlabs.das.sdk.DASSdk
import com.rawlabs.das.sdk.DASSdkBuilder
import com.rawlabs.das.sdk.DASSettings
import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalNotification}
import com.google.common.util.concurrent.UncheckedExecutionException
import com.rawlabs.das.sdk._
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.services.RegisterResponse
import com.typesafe.scalalogging.StrictLogging
Expand Down Expand Up @@ -103,13 +100,15 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded
RegisterResponse.newBuilder().setId(dasId).build()
} catch {
case NonFatal(e) =>
logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e)
case e: UncheckedExecutionException =>
// `dasSdkCache.get` throws that exception when an unchecked exception occurs while loading
// a missing key. Strip the Guava wrapping and rethrow the original exception.
logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e.getCause)
// Remove the broken config since we failed to build the DAS
dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.remove(dasId)
}
RegisterResponse.newBuilder().setError(e.getMessage).build()
throw e.getCause
}
}

Expand Down

0 comments on commit 6b45fd3

Please sign in to comment.