Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ErrorConfig to hold errors contacting locations. #484

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/src/main/protobuf/raw/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message LocationConfig {
DropboxUsernamePasswordConfig dropboxUsernamePassword = 14;
HttpHeadersConfig httpHeaders = 15;
SecretConfig secret = 99;
ErrorConfig error = 9999;
}
}

Expand Down Expand Up @@ -145,4 +146,8 @@ message HttpHeadersConfig {
message SecretConfig {
string name = 1;
string value = 2;
}

message ErrorConfig {
string message = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ object LocationDescription extends StrictLogging {
val delay = parser.getDuration("delay").toMillis
urlToLocationDescription(delegateUri, programEnvironment).right.map(MockPathLocationDescription(delay, _))
} catch {
case _: ConfigException => Left(s"not a mock location: $url")
case _: ConfigException => Left("not a mock location")
}
}
case "s3" =>
Expand Down Expand Up @@ -624,6 +624,8 @@ object LocationDescription extends StrictLogging {
objectKey
)
)
case Some(l) if l.hasError => Left(l.getError.getMessage)
case Some(_) => Left("not a S3 credential")
case None =>
// Anonymous access.
Right(S3PathLocationDescription(bucketName, None, None, None, objectKey))
Expand All @@ -636,6 +638,7 @@ object LocationDescription extends StrictLogging {
// In Dropbox, the host is the name of the credential
val DROPBOX_REGEX(name, path) = url
if (name == null) {
logger.warn("missing 'name' in Dropbox location")
return Left("missing Dropbox credential")
}
programEnvironment.locationConfigs.get(name) match {
Expand All @@ -651,7 +654,28 @@ object LocationDescription extends StrictLogging {
path
)
)
case None => Left("missing Dropbox credential")
case Some(l) if l.hasHttpHeaders =>
if (l.getHttpHeaders.getHeadersMap.containsKey("Authorization")) {
val splitted = l.getHttpHeaders.getHeadersMap.get("Authorization").split("Bearer ")
if (splitted.length == 2) {
Right(
DropboxAccessTokenLocationDescription(
splitted(1),
path
)
)
} else {
Left("invalid Dropbox credential")
}
} else {
logger.warn("missing Dropbox 'Authorization'")
Left("missing Dropbox credential")
}
case Some(l) if l.hasError => Left(l.getError.getMessage)
case Some(_) => Left("not a Dropbox credential")
case None =>
logger.warn("missing Dropbox credential")
Left("missing Dropbox credential")
}
case _ => Left(s"unsupported protocol: $protocol")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class MySQLInferAndReadEntry extends SugarEntryExtension {
new MySqlTableLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword, table)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a MySQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -394,6 +395,7 @@ class MySQLInferAndQueryEntry extends SugarEntryExtension {
new MySqlServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a MySQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class OracleInferAndReadEntry extends SugarEntryExtension {
new OracleTableLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword, schema, table)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -423,6 +424,7 @@ class OracleInferAndQueryEntry extends SugarEntryExtension {
new OracleServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class PostgreSQLInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a PostgreSQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -433,6 +434,7 @@ class PostgreSQLInferAndQueryEntry extends SugarEntryExtension {
new PostgresqlServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class SQLServerInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -435,6 +436,7 @@ class SQLServerInferAndQueryEntry extends SugarEntryExtension {
new SqlServerServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class SnowflakeInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a Snowflake server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -501,6 +502,7 @@ class SnowflakeInferAndQueryEntry extends SugarEntryExtension {
l1.getParametersMap,
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a Snowflake server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ public LocationConfig getLocationConfig(String name) {
if (maybeLocationConfig.isEmpty()) {
throw new RawTruffleRuntimeException("unknown credential: " + name);
}
return maybeLocationConfig.get();
LocationConfig locationConfig = maybeLocationConfig.get();
if (locationConfig.hasError()) {
throw new RawTruffleRuntimeException(locationConfig.getError().getMessage());
}
return locationConfig;
}

@CompilerDirectives.TruffleBoundary
Expand Down
17 changes: 12 additions & 5 deletions sql-client/src/main/scala/raw/client/sql/SqlConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ import java.util.concurrent.Executor
class SqlConnection(connectionPool: SqlConnectionPool, conn: Connection) extends java.sql.Connection {

override def close(): Unit = {
// We do not ACTUALLY close the connection; instead, we just release the borrow.
connectionPool.releaseConnection(
this,
isAlive = false // We are not sure if the connection is alive or not, e.g. it could be closed because it failed.
)
if (isClosed) {
// If the connection closed in the meantime (e.g. due to a crash), we cannot release it back to the pool.
connectionPool.actuallyRemoveConnection(this)
} else {
// If the connection seems "sane", then we do not ACTUALLY close the connection.
// Instead, we just release the borrow.
connectionPool.releaseConnection(
this,
isAlive =
false // We are not sure if the connection is alive or not, e.g. it could be closed because it failed.
)
}
}

// This is called by the connection pool when we *actually* want to close the connection.
Expand Down
47 changes: 26 additions & 21 deletions sql-client/src/main/scala/raw/client/sql/SqlConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
logger.debug(s"Checking the connection health for $conn (state: ${connectionUrls(conn)})")
// Found one connection to check.
try {
if (conn.isValid(isValidSeconds)) {
if (!conn.isClosed() && conn.isValid(isValidSeconds)) {
logger.debug(s"Connection $conn is healthy")
// All good, so release borrow.
// This will update the last check is alive time.
Expand Down Expand Up @@ -225,27 +225,32 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
}
}

private def actuallyRemoveConnection(conn: SqlConnection): Boolean = {
logger.debug(s"Actually removing connection $conn")
def actuallyRemoveConnection(conn: SqlConnection): Boolean = {
connectionPoolLock.synchronized {
val jdbcUrl = connectionUrls(conn)
try {
// First try to actually close the connection (note the use of actuallyClose), then clean up all the state.
conn.actuallyClose()
connectionState.remove(conn)
connectionUrls.remove(conn)
connectionCache.get(jdbcUrl) match {
case Some(conns) =>
val nconns = conns - conn
if (nconns.isEmpty) connectionCache.remove(jdbcUrl)
else connectionCache.put(jdbcUrl, nconns)
case None => // Nothing to do.
}
true
} catch {
case NonFatal(t) =>
// We failed to actually close the connection.
logger.warn(s"Failed to actually close the connection $conn", t)
connectionUrls.get(conn) match {
case Some(jdbcUrl) =>
logger.debug(s"Actually removing connection $conn")
try {
// First try to actually close the connection (note the use of actuallyClose), then clean up all the state.
conn.actuallyClose()
connectionState.remove(conn)
connectionUrls.remove(conn)
connectionCache.get(jdbcUrl) match {
case Some(conns) =>
val nconns = conns - conn
if (nconns.isEmpty) connectionCache.remove(jdbcUrl)
else connectionCache.put(jdbcUrl, nconns)
case None => // Nothing to do.
}
true
} catch {
case NonFatal(t) =>
// We failed to actually close the connection.
logger.warn(s"Failed to actually close the connection $conn", t)
false
}
case None =>
// Connection didn't exist/was removed already.
false
}
}
Expand Down
Loading