Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DASSdkInvalidArgumentException + handle it in table gRPC operations #28

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions src/main/java/com/rawlabs/das/sdk/DASSdkException.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
package com.rawlabs.das.sdk;

/**
* Top-level Exception. Message contains information that WILL BE shared with the end-user, so
* ensure it does not leak sensitive information.
* DASSdkInvalidArgumentException is thrown by DAS SDK methods when an invalid argument error is to
* be reported to a user (e.g. string too long during an INSERT, missing mandatory predicate, etc.).
*/
public class DASException extends RuntimeException {
public DASException(String message) {
public class DASSdkInvalidArgumentException extends RuntimeException {
public DASSdkInvalidArgumentException(String message) {
super(message);
}

public DASException(String message, Throwable cause) {
public DASSdkInvalidArgumentException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

package com.rawlabs.das.sdk;

public class DASSdkUnsupportedException extends DASSdkException {
public class DASSdkUnsupportedException extends RuntimeException {
public DASSdkUnsupportedException() {
super("unsupported operation");
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rawlabs/das/sdk/DASSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DASSettings {
private final Config config;

/** Exception representing a settings configuration problem. */
public static class SettingsException extends DASException {
public static class SettingsException extends RuntimeException {
public SettingsException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
226 changes: 86 additions & 140 deletions src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@

package com.rawlabs.das.server.grpc

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.{Failure, Success}

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.stream.{KillSwitches, Materializer, UniqueKillSwitch}

import com.rawlabs.das.sdk.{DASExecuteResult, DASSdk, DASSdkUnsupportedException, DASTable}
import com.rawlabs.das.sdk._
import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache}
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.typesafe.scalalogging.StrictLogging

import _root_.scala.concurrent.ExecutionContext
import _root_.scala.concurrent.duration.{DurationInt, FiniteDuration}
import _root_.scala.jdk.CollectionConverters._
import _root_.scala.jdk.OptionConverters._
import _root_.scala.util.{Failure, Success}
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import io.grpc.{Status, StatusRuntimeException}

Expand Down Expand Up @@ -136,23 +135,16 @@ class TableServiceGrpcImpl(
responseObserver: StreamObserver[ExplainTableResponse]): Unit = {
logger.debug(s"Explaining query for Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
val explanation =
table.explain(
request.getQuery.getQualsList,
request.getQuery.getColumnsList,
request.getQuery.getSortKeysList,
if (request.getQuery.hasLimit) java.lang.Long.valueOf(request.getQuery.getLimit) else null)
val response = ExplainTableResponse.newBuilder().addAllStmts(explanation).build()
responseObserver.onNext(response)
responseObserver.onCompleted()
logger.debug("Query explanation sent successfully.")
} catch {
case t: Throwable =>
logger.error("Error explaining query", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error explaining query").withCause(t).asRuntimeException())
}
val explanation =
table.explain(
request.getQuery.getQualsList,
request.getQuery.getColumnsList,
request.getQuery.getSortKeysList,
if (request.getQuery.hasLimit) java.lang.Long.valueOf(request.getQuery.getLimit) else null)
val response = ExplainTableResponse.newBuilder().addAllStmts(explanation).build()
responseObserver.onNext(response)
responseObserver.onCompleted()
logger.debug("Query explanation sent successfully.")
}
}

Expand Down Expand Up @@ -224,48 +216,40 @@ class TableServiceGrpcImpl(
}
}

try {
val key = QueryCacheKey(request)
// Check if we have a cached result for this query
val source: Source[Rows, NotUsed] = resultCache.get(key) match {
case Some(iterator) =>
// We do. Use the iterator to build the Source.
logger.debug(s"Using cached result for $request.")
Source.fromIterator(() => iterator)
case None =>
// We don't. Run the query and build a Source that populates a new cache entry.
// We tap the source to cache the results as they are streamed to the client.
// A callback is added to the source to mark the cache entry as done when the stream completes.
logger.debug(s"Cache miss for $request.")
val source = runQuery()
val cachedResult = resultCache.newBuffer(key)
val tappingSource: Source[Rows, NotUsed] = source.map { chunk =>
cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full.
chunk
}
val withCallBack = tappingSource.watchTermination() { (_, doneF) =>
doneF.onComplete {
case Success(_) =>
// Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP.
cachedResult.register()
case Failure(ex) =>
// If the stream fails, we don't cache the result.
logger.warn(s"Failed streaming for $request", ex)
}(ec)
}
withCallBack.mapMaterializedValue(_ => NotUsed)
}
// Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer.
val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs)
// Store the kill switch so that we can cancel the stream if needed.
killSwitchRef.set(Some(ks))
} catch {
case t: Throwable =>
logger.error("Error executing query", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error executing query").withCause(t).asRuntimeException())
val key = QueryCacheKey(request)
// Check if we have a cached result for this query
val source: Source[Rows, NotUsed] = resultCache.get(key) match {
case Some(iterator) =>
// We do. Use the iterator to build the Source.
logger.debug(s"Using cached result for $request.")
Source.fromIterator(() => iterator)
case None =>
// We don't. Run the query and build a Source that populates a new cache entry.
// We tap the source to cache the results as they are streamed to the client.
// A callback is added to the source to mark the cache entry as done when the stream completes.
logger.debug(s"Cache miss for $request.")
val source = runQuery()
val cachedResult = resultCache.newBuffer(key)
val tappingSource: Source[Rows, NotUsed] = source.map { chunk =>
cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full.
chunk
}
val withCallBack = tappingSource.watchTermination() { (_, doneF) =>
doneF.onComplete {
case Success(_) =>
// Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP.
cachedResult.register()
case Failure(ex) =>
// If the stream fails, we don't cache the result.
logger.warn(s"Failed streaming for $request", ex)
}(ec)
}
withCallBack.mapMaterializedValue(_ => NotUsed)
}

// Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer.
val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs)
// Store the kill switch so that we can cancel the stream if needed.
killSwitchRef.set(Some(ks))
}
}

Expand Down Expand Up @@ -315,11 +299,9 @@ class TableServiceGrpcImpl(
logger.error(s"Error during streaming for planID=${request.getPlanId}.", ex)
maybeServerCallObs match {
case Some(sco) if !sco.isCancelled =>
sco.onError(
new StatusRuntimeException(Status.INTERNAL.withDescription(s"Error during streaming: ${ex.getMessage}")))
sco.onError(new StatusRuntimeException(Status.INTERNAL.withCause(ex)))
case _ =>
responseObserver.onError(
new StatusRuntimeException(Status.INTERNAL.withDescription(s"Error during streaming: ${ex.getMessage}")))
responseObserver.onError(new StatusRuntimeException(Status.INTERNAL.withCause(ex)))
// If cancelled, no need to call onError (client is gone).
}
}(ec)
Expand All @@ -339,20 +321,10 @@ class TableServiceGrpcImpl(
responseObserver: StreamObserver[GetTableUniqueColumnResponse]): Unit = {
logger.debug(s"Fetching unique columns for Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
val response = GetTableUniqueColumnResponse.newBuilder().setColumn(table.uniqueColumn).build()
responseObserver.onNext(response)
responseObserver.onCompleted()
logger.debug("Unique column information sent successfully.")
} catch {
case t: DASSdkUnsupportedException =>
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Unsupported operation").withCause(t).asRuntimeException())
case t: Throwable =>
logger.error("Error fetching unique column", t)
responseObserver.onError(
Status.INTERNAL.withDescription("Error fetching unique column").withCause(t).asRuntimeException())
}
val response = GetTableUniqueColumnResponse.newBuilder().setColumn(table.uniqueColumn).build()
responseObserver.onNext(response)
responseObserver.onCompleted()
logger.debug("Unique column information sent successfully.")
}
}

Expand Down Expand Up @@ -384,20 +356,10 @@ class TableServiceGrpcImpl(
override def insertTable(request: InsertTableRequest, responseObserver: StreamObserver[InsertTableResponse]): Unit = {
logger.debug(s"Inserting row into Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
val row = table.insert(request.getRow)
responseObserver.onNext(InsertTableResponse.newBuilder().setRow(row).build())
responseObserver.onCompleted()
logger.debug("Row inserted successfully.")
} catch {
case t: DASSdkUnsupportedException =>
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException())
case t: Throwable =>
logger.error("Error inserting row", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error inserting row").withCause(t).asRuntimeException())
}
val row = table.insert(request.getRow)
responseObserver.onNext(InsertTableResponse.newBuilder().setRow(row).build())
responseObserver.onCompleted()
logger.debug("Row inserted successfully.")
}
}

Expand All @@ -412,20 +374,10 @@ class TableServiceGrpcImpl(
responseObserver: StreamObserver[BulkInsertTableResponse]): Unit = {
logger.debug(s"Performing bulk insert into Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
val rows = table.bulkInsert(request.getRowsList)
responseObserver.onNext(BulkInsertTableResponse.newBuilder().addAllRows(rows).build())
responseObserver.onCompleted()
logger.debug("Bulk insert completed successfully.")
} catch {
case t: DASSdkUnsupportedException =>
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException())
case t: Throwable =>
logger.error("Error inserting rows", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error inserting rows").withCause(t).asRuntimeException())
}
val rows = table.bulkInsert(request.getRowsList)
responseObserver.onNext(BulkInsertTableResponse.newBuilder().addAllRows(rows).build())
responseObserver.onCompleted()
logger.debug("Bulk insert completed successfully.")
}
}

Expand All @@ -438,20 +390,10 @@ class TableServiceGrpcImpl(
override def updateTable(request: UpdateTableRequest, responseObserver: StreamObserver[UpdateTableResponse]): Unit = {
logger.debug(s"Updating rows in Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
val newRow = table.update(request.getRowId, request.getNewRow)
responseObserver.onNext(UpdateTableResponse.newBuilder().setRow(newRow).build())
responseObserver.onCompleted()
logger.debug("Rows updated successfully.")
} catch {
case t: DASSdkUnsupportedException =>
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException())
case t: Throwable =>
logger.error("Error updating row", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error updating row").withCause(t).asRuntimeException())
}
val newRow = table.update(request.getRowId, request.getNewRow)
responseObserver.onNext(UpdateTableResponse.newBuilder().setRow(newRow).build())
responseObserver.onCompleted()
logger.debug("Rows updated successfully.")
}
}

Expand All @@ -464,20 +406,10 @@ class TableServiceGrpcImpl(
override def deleteTable(request: DeleteTableRequest, responseObserver: StreamObserver[DeleteTableResponse]): Unit = {
logger.debug(s"Deleting rows from Table ID: ${request.getTableId.getName}")
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
try {
table.delete(request.getRowId)
responseObserver.onNext(DeleteTableResponse.getDefaultInstance)
responseObserver.onCompleted()
logger.debug("Rows deleted successfully.")
} catch {
case t: DASSdkUnsupportedException =>
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(t).asRuntimeException())
case t: Throwable =>
logger.error("Error deleting row", t)
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Error deleting row").withCause(t).asRuntimeException())
}
table.delete(request.getRowId)
responseObserver.onNext(DeleteTableResponse.getDefaultInstance)
responseObserver.onCompleted()
logger.debug("Rows deleted successfully.")
}
}

Expand All @@ -486,7 +418,21 @@ class TableServiceGrpcImpl(
case None =>
// We use 'NOT_FOUND' so that the client doesn't confuse that error with a user-visible error.
responseObserver.onError(Status.NOT_FOUND.withDescription("DAS not found").asRuntimeException())
case Some(das) => f(das)
case Some(das) =>
try {
f(das)
} catch {
case ex: DASSdkInvalidArgumentException =>
logger.error("DASSdk invalid argument error", ex)
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(ex.getMessage).asRuntimeException())
case ex: DASSdkUnsupportedException =>
logger.error("DASSdk unsupported feature", ex)
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription("Unsupported operation").withCause(ex).asRuntimeException())
case t: Throwable =>
logger.error("DASSdk unexpected error", t)
responseObserver.onError(Status.INTERNAL.withCause(t).asRuntimeException())
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/com/rawlabs/das/mock/DASMockEventTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import java.time.LocalDate

import scala.collection.mutable

import com.rawlabs.das.sdk.DASExecuteResult
import com.rawlabs.das.sdk.scala.DASTable
import com.rawlabs.das.sdk.{DASExecuteResult, DASSdkInvalidArgumentException}
import com.rawlabs.protocol.das.v1.query.{Operator, Qual, SortKey}
import com.rawlabs.protocol.das.v1.tables.{Column, Row}
import com.rawlabs.protocol.das.v1.types.{Value, ValueDate, ValueString}
Expand Down Expand Up @@ -124,7 +124,7 @@ class DASMockEventTable extends DASTable with StrictLogging {

private def checkAndMkDate(v: ValueDate) = {
// Introduce an artificial limitation to only support dates > 2000
if (v.getYear <= 2000) throw new IllegalArgumentException(s"Invalid date: $v (only > 2000 supported)")
if (v.getYear <= 2000) throw new DASSdkInvalidArgumentException(s"Invalid date: $v (only > 2000 supported)")
LocalDate.of(v.getYear, v.getMonth, v.getDay)
}

Expand Down