Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RD-12419: Added retry to SqlConnectionPool #498

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,9 @@ lazy val sqlCompiler = (project in file("sql-compiler"))
kiama,
postgresqlDeps,
hikariCP,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.41.3" % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % "0.41.3" % Test
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.41.4" % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % "0.41.4" % Test,
"org.testcontainers" % "toxiproxy" % "1.20.1" % Test
)
)

Expand Down
7 changes: 3 additions & 4 deletions sql-compiler/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
raw.sql.compiler {
error-messages {
missing-relation = "Did you forget to add credentials?"
}
metadata-cache {
size = 1000 # How many individual user metadata caches to keep
duration = 30m # How long to keep user metadata data cache
Expand All @@ -14,5 +11,7 @@ raw.sql.compiler {
idle-timeout = 20m # How long before a connection is considered to be idle (for GC or for checking health)
health-check-period = 5s # How often to check for health of connections
is-valid-seconds = 5 # Controls the JDBC isValid(seconds) setting to use. Apparently 5 is a common value.
retry-interval = 0.5 seconds # How long to wait and reconnect when an operation against FDW fails.
retries = 9 # How many retries when an operation against FDW fails (zero = we try but don't retry upon failure)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,60 +89,61 @@ class SqlCompilerService()(implicit protected val settings: RawSettings) extends
case Left(errors) => GetProgramDescriptionFailure(errors)
case Right(parsedTree) =>
try {
val conn = connectionPool.getConnection(environment.jdbcUrl.get)
try {
val stmt = new NamedParametersPreparedStatement(conn, parsedTree)
val description = stmt.queryMetadata match {
case Right(info) =>
val queryParamInfo = info.parameters
val outputType = pgRowTypeToIterableType(info.outputType)
val parameterInfo = queryParamInfo
.map {
case (name, paramInfo) => SqlTypesUtils.rawTypeFromPgType(paramInfo.pgType).map { rawType =>
// we ignore tipe.nullable and mark all parameters as nullable
val paramType = rawType match {
case RawAnyType() => rawType;
case other => other.cloneNullable
connectionPool.connectAnd(environment.jdbcUrl.get) { conn =>
try {
val stmt = new NamedParametersPreparedStatement(conn, parsedTree)
val description = stmt.queryMetadata match {
case Right(info) =>
val queryParamInfo = info.parameters
val outputType = pgRowTypeToIterableType(info.outputType)
val parameterInfo = queryParamInfo
.map {
case (name, paramInfo) => SqlTypesUtils.rawTypeFromPgType(paramInfo.pgType).map { rawType =>
// we ignore tipe.nullable and mark all parameters as nullable
val paramType = rawType match {
case RawAnyType() => rawType;
case other => other.cloneNullable
}
ParamDescription(
name,
Some(paramType),
paramInfo.default,
comment = paramInfo.comment,
required = paramInfo.default.isEmpty
)
}
ParamDescription(
name,
Some(paramType),
paramInfo.default,
comment = paramInfo.comment,
required = paramInfo.default.isEmpty
)
}
}
.foldLeft(Right(Seq.empty): Either[Seq[String], Seq[ParamDescription]]) {
case (Left(errors), Left(error)) => Left(errors :+ error)
case (_, Left(error)) => Left(Seq(error))
case (Right(params), Right(param)) => Right(params :+ param)
case (errors @ Left(_), _) => errors
case (_, Right(param)) => Right(Seq(param))
}
.foldLeft(Right(Seq.empty): Either[Seq[String], Seq[ParamDescription]]) {
case (Left(errors), Left(error)) => Left(errors :+ error)
case (_, Left(error)) => Left(Seq(error))
case (Right(params), Right(param)) => Right(params :+ param)
case (errors @ Left(_), _) => errors
case (_, Right(param)) => Right(Seq(param))
}
(outputType, parameterInfo) match {
case (Right(iterableType), Right(ps)) =>
// Regardless if there are parameters, we declare a main function with the output type.
// This permits the publish endpoints from the UI (https://raw-labs.atlassian.net/browse/RD-10359)
val ok = ProgramDescription(
Map.empty,
Some(DeclDescription(Some(ps.toVector), Some(iterableType), None)),
None
)
GetProgramDescriptionSuccess(ok)
case _ =>
val errorMessages =
outputType.left.getOrElse(Seq.empty) ++ parameterInfo.left.getOrElse(Seq.empty)
GetProgramDescriptionFailure(treeErrors(parsedTree, errorMessages).toList)
}
(outputType, parameterInfo) match {
case (Right(iterableType), Right(ps)) =>
// Regardless if there are parameters, we declare a main function with the output type.
// This permits the publish endpoints from the UI (https://raw-labs.atlassian.net/browse/RD-10359)
val ok = ProgramDescription(
Map.empty,
Some(DeclDescription(Some(ps.toVector), Some(iterableType), None)),
None
)
GetProgramDescriptionSuccess(ok)
case _ =>
val errorMessages =
outputType.left.getOrElse(Seq.empty) ++ parameterInfo.left.getOrElse(Seq.empty)
GetProgramDescriptionFailure(treeErrors(parsedTree, errorMessages).toList)
}
case Left(errors) => GetProgramDescriptionFailure(errors)
case Left(errors) => GetProgramDescriptionFailure(errors)
}
RawUtils.withSuppressNonFatalException(stmt.close())
description
} catch {
case e: NamedParametersPreparedStatementException => GetProgramDescriptionFailure(e.errors)
} finally {
RawUtils.withSuppressNonFatalException(conn.close())
}
RawUtils.withSuppressNonFatalException(stmt.close())
description
} catch {
case e: NamedParametersPreparedStatementException => GetProgramDescriptionFailure(e.errors)
} finally {
RawUtils.withSuppressNonFatalException(conn.close())
}
} catch {
case ex: SQLException if isConnectionFailure(ex) =>
Expand Down Expand Up @@ -340,21 +341,22 @@ class SqlCompilerService()(implicit protected val settings: RawSettings) extends
.getOrElse(HoverResponse(None))
case use: SqlParamUseNode =>
try {
val conn = connectionPool.getConnection(environment.jdbcUrl.get)
try {
val pstmt = new NamedParametersPreparedStatement(conn, tree)
connectionPool.connectAnd(environment.jdbcUrl.get) { conn =>
try {
pstmt.parameterInfo(use.name) match {
case Right(typeInfo) => HoverResponse(Some(TypeCompletion(use.name, typeInfo.pgType.typeName)))
case Left(_) => HoverResponse(None)
val pstmt = new NamedParametersPreparedStatement(conn, tree)
try {
pstmt.parameterInfo(use.name) match {
case Right(typeInfo) => HoverResponse(Some(TypeCompletion(use.name, typeInfo.pgType.typeName)))
case Left(_) => HoverResponse(None)
}
} finally {
RawUtils.withSuppressNonFatalException(pstmt.close())
}
} catch {
case _: NamedParametersPreparedStatementException => HoverResponse(None)
} finally {
RawUtils.withSuppressNonFatalException(pstmt.close())
RawUtils.withSuppressNonFatalException(conn.close())
}
} catch {
case _: NamedParametersPreparedStatementException => HoverResponse(None)
} finally {
RawUtils.withSuppressNonFatalException(conn.close())
}
} catch {
case ex: SQLException if isConnectionFailure(ex) =>
Expand Down Expand Up @@ -400,21 +402,22 @@ class SqlCompilerService()(implicit protected val settings: RawSettings) extends
case Left(errors) => ValidateResponse(errors)
case Right(parsedTree) =>
try {
val conn = connectionPool.getConnection(environment.jdbcUrl.get)
try {
val stmt = new NamedParametersPreparedStatement(conn, parsedTree)
connectionPool.connectAnd(environment.jdbcUrl.get) { conn =>
try {
stmt.queryMetadata match {
case Right(_) => ValidateResponse(List.empty)
case Left(errors) => ValidateResponse(errors)
val stmt = new NamedParametersPreparedStatement(conn, parsedTree)
try {
stmt.queryMetadata match {
case Right(_) => ValidateResponse(List.empty)
case Left(errors) => ValidateResponse(errors)
}
} finally {
RawUtils.withSuppressNonFatalException(stmt.close())
}
} catch {
case e: NamedParametersPreparedStatementException => ValidateResponse(e.errors)
} finally {
RawUtils.withSuppressNonFatalException(stmt.close())
RawUtils.withSuppressNonFatalException(conn.close())
}
} catch {
case e: NamedParametersPreparedStatementException => ValidateResponse(e.errors)
} finally {
RawUtils.withSuppressNonFatalException(conn.close())
}
} catch {
case ex: SQLException if isConnectionFailure(ex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.rawlabs.utils.core.{RawService, RawSettings, RawUtils}
import java.sql.{Connection, SQLException}
import java.time.Instant
import java.util.concurrent.{Executors, TimeUnit}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.control.NonFatal

Expand Down Expand Up @@ -50,6 +51,11 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
private val healthCheckPeriod =
settings.getDuration("raw.sql.compiler.pool.health-check-period", TimeUnit.MILLISECONDS)

// Waiting time when FDW is unreachable.
private val retryInterval = settings.getDuration("raw.sql.compiler.pool.retry-interval")
// How many retries when FDW is unreachable
private val retries = settings.getInt("raw.sql.compiler.pool.retries")

// The JDBC isValid(<seconds>) value to use.
private val isValidSeconds = settings.getInt("raw.sql.compiler.pool.is-valid-seconds")

Expand Down Expand Up @@ -155,19 +161,12 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
// No connection available...

// Check if we must release connections to make space.
var retries = 10
while (getTotalActiveConnections() >= maxConnections && retries >= 0) {
if (getTotalActiveConnections() >= maxConnections) {
if (!releaseOldestConnection()) {
logger.warn(s"Could not release oldest connection; retry #$retries")
// We could not successfully release any connection, so bail out.
logger.warn(s"Could not release oldest connection")
throw new SQLException("no connections available", "08000")
}
retries -= 1
// (msb) Why do I do this? I don't know; I'm assuming by sleeping we increase the change of successful release.
Thread.sleep(10)
}

// We could not successfully release any connection, so bail out.
if (getTotalActiveConnections() >= maxConnections) {
throw new SQLException("no connections available", "08000")
}

// Create a new connection.
Expand Down Expand Up @@ -273,6 +272,33 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
}
}

// Connect to the provided JDBC URL and calls the `handler` function on the connection.
// That API internally implements a retry logic when a failure occurs (when connecting
// or running `handler`).
def connectAnd[T](jdbcUrl: String)(handler: Connection => T): T = {
@tailrec
def retryConnection(retries: Int): T = {
try {
val conn = getConnection(jdbcUrl)
try {
handler(conn)
} finally {
conn.close()
}
} catch {
case t: Throwable =>
logger.warn(s"Couldn't connect (attempts left: $retries)", t)
if (retries > 0) {
Thread.sleep(retryInterval)
retryConnection(retries - 1)
} else {
throw t
}
}
}
retryConnection(retries)
}

override def doStop(): Unit = {
connectionPoolLock.synchronized {
connectionCache.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ class UserMetadataCache(jdbcUrl: String, connectionPool: SqlConnectionPool, maxS
val loader = new CacheLoader[Seq[SqlIdentifier], Seq[IdentifierInfo]]() {
override def load(idns: Seq[SqlIdentifier]): Seq[IdentifierInfo] = {
try {
val con = connectionPool.getConnection(jdbcUrl)
try {
val query = idns.size match {
case 3 => WordSearchWithThreeItems
case 2 => WordSearchWithTwoItems
case 1 => WordSearchWithOneItem
connectionPool.connectAnd(jdbcUrl) { con =>
try {
val query = idns.size match {
case 3 => WordSearchWithThreeItems
case 2 => WordSearchWithTwoItems
case 1 => WordSearchWithOneItem
}
val tokens = idns.map(idn => if (idn.quoted) idn.value else idn.value.toLowerCase)
query.run(con, tokens)
} finally {
con.close()
}
val tokens = idns.map(idn => if (idn.quoted) idn.value else idn.value.toLowerCase)
query.run(con, tokens)
} finally {
con.close()
}
} catch {
case ex: SQLException if isConnectionFailure(ex) =>
Expand Down Expand Up @@ -115,16 +116,17 @@ class UserMetadataCache(jdbcUrl: String, connectionPool: SqlConnectionPool, maxS
val loader = new CacheLoader[Seq[SqlIdentifier], Seq[IdentifierInfo]]() {
override def load(idns: Seq[SqlIdentifier]): Seq[IdentifierInfo] = {
try {
val con = connectionPool.getConnection(jdbcUrl)
try {
val query = idns.size match {
case 2 => DotSearchWithTwoItems
case 1 => DotSearchWithOneItem
connectionPool.connectAnd(jdbcUrl) { con =>
try {
val query = idns.size match {
case 2 => DotSearchWithTwoItems
case 1 => DotSearchWithOneItem
}
val tokens = idns.map(idn => if (idn.quoted) idn.value else idn.value.toLowerCase)
query.run(con, tokens)
} finally {
con.close()
}
val tokens = idns.map(idn => if (idn.quoted) idn.value else idn.value.toLowerCase)
query.run(con, tokens)
} finally {
con.close()
}
} catch {
case ex: SQLException if isConnectionFailure(ex) =>
Expand Down
Loading