
Protocol v1 ben #19

Merged
merged 47 commits into from
Feb 11, 2025
Conversation

bgaidioz
Contributor

@bgaidioz bgaidioz commented Jan 13, 2025

  • fixed predicates being considered false when not supported (in cache filtering),
  • edited the cache logic to accept a cache that implements the exact same predicates,
  • edited MessageAdapter-related code because it turned out to be a singleton reused for all tasks,
  • added a missing filter on the table name,
  • cleared two warning stack traces indicating bad practices or missing calls to close,
  • made the server read its parameters from config,
  • handled the new cache- and max-size-related message fields in Query,
  • added a test framework for the Postgres DAS.

@bgaidioz bgaidioz force-pushed the protocol-v1-ben branch 2 times, most recently from eea2573 to e7d793b on January 13, 2025 14:29
server {
port = 50051 # the port the server listens on
monitoring-port = 8080 # http port for monitoring
max-chunk-size = 1000 # the maximum number of rows that can be returned in a single chunk
Contributor

What is this one, and why is it different from batch-size below?
Is it necessary, or can we do time-based chunking only?

We have to be careful not to "over-fetch" from the source systems.

Contributor Author

So now the reference.conf has two settings:

server {
   batch-latency = 100 millis # how long we wait for more rows before we send a batch
}
cache {
   batch-size = 1000 # how many rows of data are produced per producerInterval tick
}

And within the protocol we also get a max buffer size as part of execute requests. Setting server.batch-latency impacts how long one waits to fill the max buffer the client specified. The cache.batch-size is the number of rows we pick per producer tick from the DAS. I agree that if DAS => cache queue => gRPC were properly pipelined, we'd be reading as many rows as needed to fill the client's buffer, or until the batch-latency was hit.

@@ -281,7 +281,7 @@ object ExpressionEvaluator {
evalEquals(x, y)
}

case _ => false
case _ => true
Contributor Author

Predicates that aren't implemented are considered true, since they need to be evaluated in Postgres.

Contributor

I don't agree with this part. (And in any case, this comment would have belonged in the source code, not in the PR.)

But this is an ExpressionEvaluator component, so I'd expect it to act as one. This has nothing to do with Postgres: it is its own standalone component, which in no way depends on or relates to how Postgres or the FDW happen to use it.

If the predicates are not implemented, we either say we don't support them (return an option, throw an exception, etc.) or something else along those lines. At the interaction point between components, we can then decide that "not implemented" carries an explicit semantic; but that's for the composing component/layer to do, not the implementation. Providing a semantic like this one here crosscuts two components and concerns that are not related to each other.

Contributor Author

I patched the ExpressionEvaluator to throw when an expression isn't supported. The layer above, QualEvaluator, is the one that logs the warning, ignores the failure, and interprets the predicate as true (meaning it ignores the predicate as if it weren't supported). It's also the one that interprets null as false. The QualEvaluator was changed to evaluate predicates one by one instead of as a single "and", so that it can skip failing ones.
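The layering described above could be sketched like this (illustrative names and predicates, not the actual project API):

```scala
// Illustrative sketch: the evaluator throws on unsupported predicates, and the
// layer above decides that a failure means "keep the row" so Postgres can
// evaluate the predicate itself.
object ExpressionEvaluatorSketch {
  // Evaluates a predicate against a value; throws for unsupported predicates
  // instead of silently returning a Boolean.
  def eval(pred: String, value: Int): Boolean = pred match {
    case "positive" => value > 0
    case "even"     => value % 2 == 0
    case other      => throw new UnsupportedOperationException(s"unsupported predicate: $other")
  }
}

object QualEvaluatorSketch {
  // Predicates are evaluated one by one (not as a single "and"), so an
  // unsupported predicate can be skipped: it is interpreted as true and
  // left for Postgres to evaluate.
  def rowMatches(preds: Seq[String], value: Int): Boolean =
    preds.forall { p =>
      try ExpressionEvaluatorSketch.eval(p, value)
      catch { case _: UnsupportedOperationException => true }
    }
}
```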

@@ -28,12 +30,14 @@ object QualSelectivityAnalyzer {
/**
* @param oldQuals Existing qualifiers
* @param newQuals New qualifiers
* @return None if `newQuals` is NOT strictly more selective than `oldQuals` Some(difference) if it is, where
* 'difference' is the subset of `newQuals` that imposes stricter constraints than already in `oldQuals`.
* @return None if `newQuals` is NOT as selective as `oldQuals`, Some(difference) if it is, where 'difference' is the
Contributor Author

In the original implementation the logic would discard a cache if the newQuals weren't strictly more selective. The exact same query that had produced a cache wasn't reusing its own cache.
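A minimal sketch of the relaxed check, modelling qualifiers as plain sets (the real QualSelectivityAnalyzer is richer than this):

```scala
// Illustrative sketch of the "as selective" acceptance described above.
object SelectivitySketch {
  // None if newQuals is NOT at least as selective as oldQuals; otherwise
  // Some(difference), the extra constraints newQuals imposes. Accepting
  // equality (an empty difference) is what lets the exact same query reuse
  // the cache it produced.
  def coveringDifference(oldQuals: Set[String], newQuals: Set[String]): Option[Set[String]] =
    if (oldQuals.subsetOf(newQuals)) Some(newQuals -- oldQuals) else None
}
```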

@@ -149,7 +147,12 @@ private class CacheManagerBehavior[T](

// We'll keep track of child data source actors by cacheId
private var dataSourceMap = Map.empty[UUID, ActorRef[ChronicleDataSource.ChronicleDataSourceCommand]]

private val dataSourceEventAdapter: ActorRef[ChronicleDataSource.DataSourceLifecycleEvent] =
Contributor Author

Apparently a MessageAdapter is a singleton per type it handles. In the original implementation, a MessageAdapter that handles the messages and passes them to the CacheManager was created per data source, and would encapsulate the cacheId to be passed to the CacheManager.

But since internally it's a singleton, the same message handler was used for two different data sources. For example, if both hit the grace period, we'd get the same cacheId twice, in both messages to the CacheManager.

Here it's created once, and messages carry the cacheId to propagate as a parameter.

Contributor

Ok, didn't know.

@@ -371,16 +375,13 @@ private class CacheManagerBehavior[T](

ctx.spawn(
ChronicleDataSource[T](
cacheId = cacheId,
Contributor Author

Pass the cacheId to the data source since it has to be eventually sent in its messages.

@@ -416,7 +416,10 @@ private class CacheManagerBehavior[T](
val reader = archivedStore.newReader()

Source.fromIterator(() => reader).watchTermination() { (mat, doneF) =>
doneF.onComplete(_ => reader.close())(executionContext)
doneF.onComplete { _ =>
Contributor Author

A missing store close that led to a stack trace.

@@ -395,10 +396,9 @@ private class CacheManagerBehavior[T](
dsRef
.ask[ChronicleDataSource.SubscribeResponse](replyTo => ChronicleDataSource.RequestSubscribe(replyTo))
.map {
case ChronicleDataSource.Subscribed(consumerId) =>
case ChronicleDataSource.Subscribed(consumerId, tailer) =>
Contributor Author

The tailer is now created outside, instead of passing the storage and letting it create the tailer inside. That fixes the stack trace warning about an object being garbage collected before being closed.

final case class DataProductionComplete(sizeInBytes: Long) extends DataSourceLifecycleEvent
final case class DataProductionError(msg: String) extends DataSourceLifecycleEvent
final case object DataProductionVoluntaryStop extends DataSourceLifecycleEvent
final case class DataProductionComplete(cacheId: UUID, sizeInBytes: Long) extends DataSourceLifecycleEvent
Contributor Author

The cacheId is now part of the message, so that the singleton message handler knows what to send to the CacheManager.

@@ -322,6 +327,10 @@ private class ChronicleDataSourceBehavior[T](
case ConsumerTerminated(cid) =>
activeConsumers -= 1
log.info(s"Consumer $cid terminated; activeConsumers=$activeConsumers")
tailerMap.remove(cid).foreach { tailer =>
Contributor Author

Close the tailer that was left open, once the consumer terminates (fixes the stack trace warning).
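The close-on-remove pattern can be sketched with a plain mutable map standing in for the actor's tailerMap (FakeTailer is a made-up stand-in for the Chronicle tailer):

```scala
import scala.collection.mutable

// Illustrative stand-in for the Chronicle tailer resource.
final class FakeTailer extends AutoCloseable {
  var closed = false
  def close(): Unit = closed = true
}

object TailerCleanupSketch {
  val tailerMap: mutable.Map[Long, FakeTailer] = mutable.Map.empty

  // On ConsumerTerminated(cid): remove the consumer's tailer and close it,
  // releasing the resource deterministically instead of relying on GC.
  def onConsumerTerminated(cid: Long): Unit =
    tailerMap.remove(cid).foreach(_.close())
}
```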

@@ -0,0 +1,111 @@
/*
Contributor Author

Unused now. Will delete.


private def quoteIdentifier(ident: String): String = {
// naive approach
s""""$ident""""
Contributor

Yes, needs a regex and adding quotes.

Contributor Author

I fixed a number of things in PostgresDAS (e.g. it was ignoring the schema), and about the ident: we wrap it in double quotes because we're internally using the Postgres identifier, the one that can be wrapped in double quotes. I added the logic to escape double quotes if they're ever part of the identifier.

Fields and tables are advertised such that double-quoting is safe. Clients send identifiers that follow those (the DAS protocol isn't case-insensitive). I think it works like that.
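The escaping described above amounts to the standard Postgres rule of doubling embedded double quotes; a minimal sketch (illustrative name, not the project's actual helper):

```scala
// Wrap the identifier in double quotes and double any embedded double quote,
// per Postgres quoted-identifier rules.
object QuotingSketch {
  def quoteIdentifier(ident: String): String =
    "\"" + ident.replace("\"", "\"\"") + "\""
}
```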


// (Can be used to pass custom parameters/settings in the future?)
DASServer.main(Array())
test("Run the main code with mock services")(DASServer.main(Array()))
Contributor

Will this not hang the CI and leave it running?
Why make it a test? It's not testing anything (?)

Contributor Author

Yes, it was left by accident, as I run live tests like that. I rewrote it as a test, as you used to have, because DASServer.main wouldn't otherwise run within a test.

It's missing the service declaration file. I'll switch it back to an App and add the service file too.

@@ -297,9 +297,15 @@ object ExpressionEvaluator {
// Strings
case (StringVal(s1), StringVal(s2)) =>
s1 < s2

// (Add date/time if needed)
case _ => false
Contributor

Same comment applies elsewhere.
Options may be cumbersome (but cleaner), so exceptions would be easier.
That said, o1pro can write the option code if we go that route.
But this is not to be assumed in this component, as it mixes two different meanings.

Contributor Author

Yes it's using Exceptions now.

val days = localDate.toEpochDay
Some(BigDecimal(days))
} catch {
case _: Throwable => None
Contributor

Throwable is the wrong choice here (and below), as it is way too broad and captures interruption exceptions, out of memory, and other things that have nothing to do with date conversions. At the very least use NonFatal.

Contributor Author

Catching DateTimeException now. If ever we'd get wrong numbers from our Value, LocalDateTime and company would throw.
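The narrower catch can be sketched as follows: only DateTimeException is handled, so interrupts, OutOfMemoryError, and the like still propagate (the method name is illustrative, not the project's actual helper):

```scala
import java.time.{DateTimeException, LocalDate}

// Convert a calendar date to its epoch-day count, returning None only for
// invalid dates rather than swallowing every Throwable.
object DateConversionSketch {
  def dateToEpochDay(year: Int, month: Int, day: Int): Option[BigDecimal] =
    try Some(BigDecimal(LocalDate.of(year, month, day).toEpochDay))
    catch { case _: DateTimeException => None }
}
```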

val micros = totalSeconds * 1_000_000L + (t.getNano.toLong / 1000L)
Some(BigDecimal(micros))
} catch {
case _: Throwable => None
Contributor

Same here.

val micros = epochSec * 1_000_000L + (ts.getNano.toLong / 1000L)
Some(BigDecimal(micros))
} catch {
case _: Throwable => None
Contributor

Same here.


@miguelbranco80 miguelbranco80 marked this pull request as ready for review February 11, 2025 12:20
@miguelbranco80 miguelbranco80 merged commit 068af19 into protocol-v1 Feb 11, 2025
1 of 2 checks passed
@miguelbranco80 miguelbranco80 deleted the protocol-v1-ben branch February 11, 2025 12:20