From d4f30d05e219ac930c5b3cfdb132e83c0560a29a Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Wed, 26 Feb 2025 16:43:44 +0100 Subject: [PATCH] Added DASSdkInvalidArgumentException + handle it in table gRPC operations (#28) --- .../com/rawlabs/das/sdk/DASSdkException.java | 23 -- ...va => DASSdkInvalidArgumentException.java} | 10 +- .../das/sdk/DASSdkUnsupportedException.java | 2 +- .../java/com/rawlabs/das/sdk/DASSettings.java | 2 +- .../server/grpc/TableServiceGrpcImpl.scala | 226 +++++++----------- .../rawlabs/das/mock/DASMockEventTable.scala | 4 +- 6 files changed, 95 insertions(+), 172 deletions(-) delete mode 100644 src/main/java/com/rawlabs/das/sdk/DASSdkException.java rename src/main/java/com/rawlabs/das/sdk/{DASException.java => DASSdkInvalidArgumentException.java} (53%) diff --git a/src/main/java/com/rawlabs/das/sdk/DASSdkException.java b/src/main/java/com/rawlabs/das/sdk/DASSdkException.java deleted file mode 100644 index 6133b81..0000000 --- a/src/main/java/com/rawlabs/das/sdk/DASSdkException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2024 RAW Labs S.A. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.txt. - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0, included in the file - * licenses/APL.txt. - */ - -package com.rawlabs.das.sdk; - -public class DASSdkException extends DASException { - public DASSdkException(String message, Throwable cause) { - super(message, cause); - } - - public DASSdkException(String message) { - super(message); - } -} diff --git a/src/main/java/com/rawlabs/das/sdk/DASException.java b/src/main/java/com/rawlabs/das/sdk/DASSdkInvalidArgumentException.java similarity index 53% rename from src/main/java/com/rawlabs/das/sdk/DASException.java rename to src/main/java/com/rawlabs/das/sdk/DASSdkInvalidArgumentException.java index deb101a..691f4e7 100644 --- a/src/main/java/com/rawlabs/das/sdk/DASException.java +++ b/src/main/java/com/rawlabs/das/sdk/DASSdkInvalidArgumentException.java @@ -13,15 +13,15 @@ package com.rawlabs.das.sdk; /** - * Top-level Exception. Message contains information that WILL BE shared with the end-user, so - * ensure it does not leak sensitive information. + * DASSdkInvalidArgumentException is thrown by DAS SDK methods when an invalid argument error is to + * be reported to a user (e.g. string too long during an INSERT, missing mandatory predicate, etc.). */ -public class DASException extends RuntimeException { - public DASException(String message) { +public class DASSdkInvalidArgumentException extends RuntimeException { + public DASSdkInvalidArgumentException(String message) { super(message); } - public DASException(String message, Throwable cause) { + public DASSdkInvalidArgumentException(String message, Throwable cause) { super(message, cause); } } diff --git a/src/main/java/com/rawlabs/das/sdk/DASSdkUnsupportedException.java b/src/main/java/com/rawlabs/das/sdk/DASSdkUnsupportedException.java index dd27ec3..1f3e490 100644 --- a/src/main/java/com/rawlabs/das/sdk/DASSdkUnsupportedException.java +++ b/src/main/java/com/rawlabs/das/sdk/DASSdkUnsupportedException.java @@ -12,7 +12,7 @@ package com.rawlabs.das.sdk; -public class DASSdkUnsupportedException extends DASSdkException { +public class DASSdkUnsupportedException extends RuntimeException { public DASSdkUnsupportedException() { super("unsupported operation"); } diff --git a/src/main/java/com/rawlabs/das/sdk/DASSettings.java b/src/main/java/com/rawlabs/das/sdk/DASSettings.java index 079e85b..adc30fc 100644 --- a/src/main/java/com/rawlabs/das/sdk/DASSettings.java +++ b/src/main/java/com/rawlabs/das/sdk/DASSettings.java @@ -35,7 +35,7 @@ public class DASSettings { private final Config config; /** Exception representing a settings configuration problem. */ - public static class SettingsException extends DASException { + public static class SettingsException extends RuntimeException { public SettingsException(String message, Throwable cause) { super(message, cause); } diff --git a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala index a9616a0..9a32713 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala @@ -12,17 +12,11 @@ package com.rawlabs.das.server.grpc -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.{DurationInt, FiniteDuration} -import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters._ -import scala.util.{Failure, Success} - import org.apache.pekko.NotUsed import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source} import org.apache.pekko.stream.{KillSwitches, Materializer, UniqueKillSwitch} -import com.rawlabs.das.sdk.{DASExecuteResult, DASSdk, DASSdkUnsupportedException, DASTable} +import com.rawlabs.das.sdk._ import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache} import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.protocol.das.v1.common.DASId @@ -30,6 +24,11 @@ import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.typesafe.scalalogging.StrictLogging +import _root_.scala.concurrent.ExecutionContext +import _root_.scala.concurrent.duration.{DurationInt, FiniteDuration} +import _root_.scala.jdk.CollectionConverters._ +import _root_.scala.jdk.OptionConverters._ +import _root_.scala.util.{Failure, Success} import io.grpc.stub.{ServerCallStreamObserver, StreamObserver} import io.grpc.{Status, StatusRuntimeException} @@ -136,23 +135,16 @@ class TableServiceGrpcImpl( responseObserver: StreamObserver[ExplainTableResponse]): Unit = { logger.debug(s"Explaining query for Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - val explanation = - table.explain( - request.getQuery.getQualsList, - request.getQuery.getColumnsList, - request.getQuery.getSortKeysList, - if (request.getQuery.hasLimit) java.lang.Long.valueOf(request.getQuery.getLimit) else null) - val response = ExplainTableResponse.newBuilder().addAllStmts(explanation).build() - responseObserver.onNext(response) - responseObserver.onCompleted() - logger.debug("Query explanation sent successfully.") - } catch { - case t: Throwable => - logger.error("Error explaining query", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error explaining query").withCause(t).asRuntimeException()) - } + val explanation = + table.explain( + request.getQuery.getQualsList, + request.getQuery.getColumnsList, + request.getQuery.getSortKeysList, + if (request.getQuery.hasLimit) java.lang.Long.valueOf(request.getQuery.getLimit) else null) + val response = ExplainTableResponse.newBuilder().addAllStmts(explanation).build() + responseObserver.onNext(response) + responseObserver.onCompleted() + logger.debug("Query explanation sent successfully.") } } @@ -224,48 +216,40 @@ class TableServiceGrpcImpl( } } - try { - val key = QueryCacheKey(request) - // Check if we have a cached result for this query - val source: Source[Rows, NotUsed] = resultCache.get(key) match { - case Some(iterator) => - // We do. Use the iterator to build the Source. - logger.debug(s"Using cached result for $request.") - Source.fromIterator(() => iterator) - case None => - // We don't. Run the query and build a Source that populates a new cache entry. - // We tap the source to cache the results as they are streamed to the client. - // A callback is added to the source to mark the cache entry as done when the stream completes. - logger.debug(s"Cache miss for $request.") - val source = runQuery() - val cachedResult = resultCache.newBuffer(key) - val tappingSource: Source[Rows, NotUsed] = source.map { chunk => - cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full. - chunk - } - val withCallBack = tappingSource.watchTermination() { (_, doneF) => - doneF.onComplete { - case Success(_) => - // Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP. - cachedResult.register() - case Failure(ex) => - // If the stream fails, we don't cache the result. - logger.warn(s"Failed streaming for $request", ex) - }(ec) - } - withCallBack.mapMaterializedValue(_ => NotUsed) - } - // Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer. - val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs) - // Store the kill switch so that we can cancel the stream if needed. - killSwitchRef.set(Some(ks)) - } catch { - case t: Throwable => - logger.error("Error executing query", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error executing query").withCause(t).asRuntimeException()) + val key = QueryCacheKey(request) + // Check if we have a cached result for this query + val source: Source[Rows, NotUsed] = resultCache.get(key) match { + case Some(iterator) => + // We do. Use the iterator to build the Source. + logger.debug(s"Using cached result for $request.") + Source.fromIterator(() => iterator) + case None => + // We don't. Run the query and build a Source that populates a new cache entry. + // We tap the source to cache the results as they are streamed to the client. + // A callback is added to the source to mark the cache entry as done when the stream completes. + logger.debug(s"Cache miss for $request.") + val source = runQuery() + val cachedResult = resultCache.newBuffer(key) + val tappingSource: Source[Rows, NotUsed] = source.map { chunk => + cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full. + chunk + } + val withCallBack = tappingSource.watchTermination() { (_, doneF) => + doneF.onComplete { + case Success(_) => + // Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP. + cachedResult.register() + case Failure(ex) => + // If the stream fails, we don't cache the result. + logger.warn(s"Failed streaming for $request", ex) + }(ec) + } + withCallBack.mapMaterializedValue(_ => NotUsed) } - + // Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer. + val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs) + // Store the kill switch so that we can cancel the stream if needed. + killSwitchRef.set(Some(ks)) } } @@ -315,11 +299,9 @@ class TableServiceGrpcImpl( logger.error(s"Error during streaming for planID=${request.getPlanId}.", ex) maybeServerCallObs match { case Some(sco) if !sco.isCancelled => - sco.onError( - new StatusRuntimeException(Status.INTERNAL.withDescription(s"Error during streaming: ${ex.getMessage}"))) + sco.onError(new StatusRuntimeException(Status.INTERNAL.withCause(ex))) case _ => - responseObserver.onError( - new StatusRuntimeException(Status.INTERNAL.withDescription(s"Error during streaming: ${ex.getMessage}"))) + responseObserver.onError(new StatusRuntimeException(Status.INTERNAL.withCause(ex))) // If cancelled, no need to call onError (client is gone). } }(ec) @@ -339,20 +321,10 @@ class TableServiceGrpcImpl( responseObserver: StreamObserver[GetTableUniqueColumnResponse]): Unit = { logger.debug(s"Fetching unique columns for Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - val response = GetTableUniqueColumnResponse.newBuilder().setColumn(table.uniqueColumn).build() - responseObserver.onNext(response) - responseObserver.onCompleted() - logger.debug("Unique column information sent successfully.") - } catch { - case t: DASSdkUnsupportedException => - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Unsupported operation").withCause(t).asRuntimeException()) - case t: Throwable => - logger.error("Error fetching unique column", t) - responseObserver.onError( - Status.INTERNAL.withDescription("Error fetching unique column").withCause(t).asRuntimeException()) - } + val response = GetTableUniqueColumnResponse.newBuilder().setColumn(table.uniqueColumn).build() + responseObserver.onNext(response) + responseObserver.onCompleted() + logger.debug("Unique column information sent successfully.") } } @@ -384,20 +356,10 @@ class TableServiceGrpcImpl( override def insertTable(request: InsertTableRequest, responseObserver: StreamObserver[InsertTableResponse]): Unit = { logger.debug(s"Inserting row into Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - val row = table.insert(request.getRow) - responseObserver.onNext(InsertTableResponse.newBuilder().setRow(row).build()) - responseObserver.onCompleted() - logger.debug("Row inserted successfully.") - } catch { - case t: DASSdkUnsupportedException => - responseObserver.onError( - Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException()) - case t: Throwable => - logger.error("Error inserting row", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error inserting row").withCause(t).asRuntimeException()) - } + val row = table.insert(request.getRow) + responseObserver.onNext(InsertTableResponse.newBuilder().setRow(row).build()) + responseObserver.onCompleted() + logger.debug("Row inserted successfully.") } } @@ -412,20 +374,10 @@ class TableServiceGrpcImpl( responseObserver: StreamObserver[BulkInsertTableResponse]): Unit = { logger.debug(s"Performing bulk insert into Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - val rows = table.bulkInsert(request.getRowsList) - responseObserver.onNext(BulkInsertTableResponse.newBuilder().addAllRows(rows).build()) - responseObserver.onCompleted() - logger.debug("Bulk insert completed successfully.") - } catch { - case t: DASSdkUnsupportedException => - responseObserver.onError( - Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException()) - case t: Throwable => - logger.error("Error inserting rows", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error inserting rows").withCause(t).asRuntimeException()) - } + val rows = table.bulkInsert(request.getRowsList) + responseObserver.onNext(BulkInsertTableResponse.newBuilder().addAllRows(rows).build()) + responseObserver.onCompleted() + logger.debug("Bulk insert completed successfully.") } } @@ -438,20 +390,10 @@ class TableServiceGrpcImpl( override def updateTable(request: UpdateTableRequest, responseObserver: StreamObserver[UpdateTableResponse]): Unit = { logger.debug(s"Updating rows in Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - val newRow = table.update(request.getRowId, request.getNewRow) - responseObserver.onNext(UpdateTableResponse.newBuilder().setRow(newRow).build()) - responseObserver.onCompleted() - logger.debug("Rows updated successfully.") - } catch { - case t: DASSdkUnsupportedException => - responseObserver.onError( - Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException()) - case t: Throwable => - logger.error("Error updating row", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error updating row").withCause(t).asRuntimeException()) - } + val newRow = table.update(request.getRowId, request.getNewRow) + responseObserver.onNext(UpdateTableResponse.newBuilder().setRow(newRow).build()) + responseObserver.onCompleted() + logger.debug("Rows updated successfully.") } } @@ -464,20 +406,10 @@ class TableServiceGrpcImpl( override def deleteTable(request: DeleteTableRequest, responseObserver: StreamObserver[DeleteTableResponse]): Unit = { logger.debug(s"Deleting rows from Table ID: ${request.getTableId.getName}") withTable(request.getDasId, request.getTableId, responseObserver) { table => - try { - table.delete(request.getRowId) - responseObserver.onNext(DeleteTableResponse.getDefaultInstance) - responseObserver.onCompleted() - logger.debug("Rows deleted successfully.") - } catch { - case t: DASSdkUnsupportedException => - responseObserver.onError( - Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException()) - case t: Throwable => - logger.error("Error deleting row", t) - responseObserver.onError( - Status.INVALID_ARGUMENT.withDescription("Error deleting row").withCause(t).asRuntimeException()) - } + table.delete(request.getRowId) + responseObserver.onNext(DeleteTableResponse.getDefaultInstance) + responseObserver.onCompleted() + logger.debug("Rows deleted successfully.") } } @@ -486,7 +418,21 @@ class TableServiceGrpcImpl( case None => // We use 'NOT_FOUND' so that the client doesn't confuse that error with a user-visible error. responseObserver.onError(Status.NOT_FOUND.withDescription("DAS not found").asRuntimeException()) - case Some(das) => f(das) + case Some(das) => + try { + f(das) + } catch { + case ex: DASSdkInvalidArgumentException => + logger.error("DASSdk invalid argument error", ex) + responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException()) + case ex: DASSdkUnsupportedException => + logger.error("DASSdk unsupported feature", ex) + responseObserver.onError( + Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(ex).asRuntimeException()) + case t: Throwable => + logger.error("DASSdk unexpected error", t) + responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException()) + } } } diff --git a/src/test/scala/com/rawlabs/das/mock/DASMockEventTable.scala b/src/test/scala/com/rawlabs/das/mock/DASMockEventTable.scala index 940104d..d212aa5 100644 --- a/src/test/scala/com/rawlabs/das/mock/DASMockEventTable.scala +++ b/src/test/scala/com/rawlabs/das/mock/DASMockEventTable.scala @@ -16,8 +16,8 @@ import java.time.LocalDate import scala.collection.mutable -import com.rawlabs.das.sdk.DASExecuteResult import com.rawlabs.das.sdk.scala.DASTable +import com.rawlabs.das.sdk.{DASExecuteResult, DASSdkInvalidArgumentException} import com.rawlabs.protocol.das.v1.query.{Operator, Qual, SortKey} import com.rawlabs.protocol.das.v1.tables.{Column, Row} import com.rawlabs.protocol.das.v1.types.{Value, ValueDate, ValueString} @@ -124,7 +124,7 @@ class DASMockEventTable extends DASTable with StrictLogging { private def checkAndMkDate(v: ValueDate) = { // Introduce an artificial limitation to only support dates > 2000 - if (v.getYear <= 2000) throw new IllegalArgumentException(s"Invalid date: $v (only > 2000 supported)") + if (v.getYear <= 2000) throw new DASSdkInvalidArgumentException(s"Invalid date: $v (only > 2000 supported)") LocalDate.of(v.getYear, v.getMonth, v.getDay) }