Protocol v1 ben #19
Conversation
src/main/resources/reference.conf
server {
  port = 50051 # the port the server listens on
  monitoring-port = 8080 # http port for monitoring
  max-chunk-size = 1000 # the maximum number of rows that can be returned in a single chunk
What is this one, and why is it different from batch-size below?
Is it necessary, or could we do time-based chunking only?
We have to be careful not to "over-fetch" from the source systems.
So now the reference.conf has two settings:
server {
batch-latency = 100 millis # how long we wait for more rows before we send a batch
}
cache {
batch-size = 1000 # how many rows of data are produced per producerInterval tick
}
And within the protocol we also get a max buffer size as part of execute requests. Setting server.batch-latency impacts how long one waits to fill the max buffer the client specified. The cache.batch-size is the number of rows we pick per producer tick from the DAS. I agree that if DAS => cache queue => gRPC were properly pipelined, we'd read as many rows as needed to fill the client's buffer, or until the batch-latency was hit.
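To make the two flush conditions concrete, here is a minimal self-contained sketch of size-or-latency batching as described above. The names (maxBufferSize, batchLatencyMillis) are illustrative, not the actual server code; the clock is injected so the behavior is easy to test.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a buffer flushed either when it reaches the client's
// max buffer size or when the batch latency elapses, whichever comes first.
final class Batcher[A](maxBufferSize: Int, batchLatencyMillis: Long, now: () => Long) {
  private val buf = ArrayBuffer.empty[A]
  private var deadline = now() + batchLatencyMillis

  /** Offer a row; returns Some(batch) when a flush condition is met. */
  def offer(row: A): Option[Vector[A]] = {
    buf += row
    if (buf.size >= maxBufferSize || now() >= deadline) Some(flush()) else None
  }

  private def flush(): Vector[A] = {
    val out = buf.toVector
    buf.clear()
    deadline = now() + batchLatencyMillis
    out
  }
}
```

A properly pipelined DAS => cache queue => gRPC path would amount to driving such a batcher from the producer side, emitting to the client whenever `offer` yields a batch.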
@@ -281,7 +281,7 @@ object ExpressionEvaluator {
      evalEquals(x, y)
    }
-   case _ => false
+   case _ => true
Predicates that aren't implemented are considered true, since they need to be evaluated in Postgres.
I don't agree with this part. (And in any case, this comment belongs in the source code, not in the PR.)
But this is an ExpressionEvaluator component, so I'd expect it to act as one. It has nothing to do with Postgres: it is a standalone component that in no way depends on or relates to how Postgres or the FDW happen to use it.
If a predicate is not implemented, we should either say we don't support it (return an Option, throw an exception, etc.) or something along those lines. At the interaction point between components we can assume that "not implemented" carries an explicit semantic, but that's for the composing component/layer to do, not the implementation. Providing a semantic like this one crosscuts two components and concerns that are unrelated to each other.
I patched the ExpressionEvaluator to throw when an expression isn't supported. The layer above, QualEvaluator, is the one that logs the warning, ignores the failure, and interprets the predicate as true (i.e. it skips the predicate as if it weren't supported). It's also the one that interprets null as false. The QualEvaluator was changed to evaluate predicates one by one instead of as a single "and", so that failing ones can be skipped.
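A minimal sketch of the layering just described, with illustrative names and a toy expression grammar (the real evaluator works on protocol types, not strings): the evaluator throws on unsupported expressions, and the qual layer catches, treats unsupported as true (deferring to Postgres), and treats NULL as false.

```scala
// Hypothetical sketch; names and the string-based grammar are illustrative.
final case class UnsupportedExpressionException(msg: String) extends Exception(msg)

object ExpressionEvaluatorSketch {
  // Returns Some(boolean), or None for SQL NULL; throws when unsupported.
  def eval(expr: String, row: Map[String, Int]): Option[Boolean] = expr match {
    case s"$col = $v" => row.get(col).map(_ == v.trim.toInt) // None = NULL column
    case other        => throw UnsupportedExpressionException(other)
  }
}

object QualEvaluatorSketch {
  // Evaluate predicates one by one so a failing one can be skipped:
  // unsupported => true (Postgres will re-check), NULL => false.
  def rowMatches(preds: Seq[String], row: Map[String, Int]): Boolean =
    preds.forall { p =>
      try ExpressionEvaluatorSketch.eval(p, row).getOrElse(false)
      catch { case UnsupportedExpressionException(_) => true }
    }
}
```

The point of the split is that the "unsupported means true" semantic lives only in the composing layer, keeping the evaluator itself honest.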
@@ -28,12 +30,14 @@ object QualSelectivityAnalyzer {
  /**
   * @param oldQuals Existing qualifiers
   * @param newQuals New qualifiers
-  * @return None if `newQuals` is NOT strictly more selective than `oldQuals` Some(difference) if it is, where
-  *         'difference' is the subset of `newQuals` that imposes stricter constraints than already in `oldQuals`.
+  * @return None if `newQuals` is NOT as selective as `oldQuals`, Some(difference) if it is, where 'difference' is the
In the original implementation the logic would discard a cache if the newQuals weren't strictly more selective. The exact same query that had produced a cache wasn't reusing its own cache.
@@ -149,7 +147,12 @@ private class CacheManagerBehavior[T](

  // We'll keep track of child data source actors by cacheId
  private var dataSourceMap = Map.empty[UUID, ActorRef[ChronicleDataSource.ChronicleDataSourceCommand]]

  private val dataSourceEventAdapter: ActorRef[ChronicleDataSource.DataSourceLifecycleEvent] =
Apparently a MessageAdapter is a singleton per message type it handles. In the original implementation, a MessageAdapter that handles the messages and passes them to the CacheManager was created per data source and would encapsulate the cacheId to pass along to the CacheManager.
But since internally it's a singleton, the same message handler ended up being used for two different data sources. For example, if both hit the grace-period, we'd get the same cacheId twice in both messages to the CacheManager.
Here it's created once, and the messages carry the cacheId as a parameter to propagate it.
Ok, didn't know.
@@ -371,16 +375,13 @@ private class CacheManagerBehavior[T](

  ctx.spawn(
    ChronicleDataSource[T](
      cacheId = cacheId,
Pass the cacheId to the data source, since it eventually has to be sent in its messages.
@@ -416,7 +416,10 @@ private class CacheManagerBehavior[T](
  val reader = archivedStore.newReader()

  Source.fromIterator(() => reader).watchTermination() { (mat, doneF) =>
-   doneF.onComplete(_ => reader.close())(executionContext)
+   doneF.onComplete { _ =>
Missing store close that led to a stack trace.
@@ -395,10 +396,9 @@ private class CacheManagerBehavior[T](
  dsRef
    .ask[ChronicleDataSource.SubscribeResponse](replyTo => ChronicleDataSource.RequestSubscribe(replyTo))
    .map {
-     case ChronicleDataSource.Subscribed(consumerId) =>
+     case ChronicleDataSource.Subscribed(consumerId, tailer) =>
Creating the tailer outside, instead of passing the storage and letting it create the tailer inside. That fixes the stack-trace warning showing an object being garbage collected before being closed.
- final case class DataProductionComplete(sizeInBytes: Long) extends DataSourceLifecycleEvent
  final case class DataProductionError(msg: String) extends DataSourceLifecycleEvent
  final case object DataProductionVoluntaryStop extends DataSourceLifecycleEvent
+ final case class DataProductionComplete(cacheId: UUID, sizeInBytes: Long) extends DataSourceLifecycleEvent
The cacheId is now part of the message, so that the singleton message handler knows what to send to the CacheManager.
@@ -322,6 +327,10 @@ private class ChronicleDataSourceBehavior[T](
  case ConsumerTerminated(cid) =>
    activeConsumers -= 1
    log.info(s"Consumer $cid terminated; activeConsumers=$activeConsumers")
+   tailerMap.remove(cid).foreach { tailer =>
Closing the tailer that was left open after the consumer terminated (fixes the stack-trace warning).
@@ -0,0 +1,111 @@
/*
Unused now. Will delete.
private def quoteIdentifier(ident: String): String = {
  // naive approach
  s""""$ident""""
Yes, needs a regex and adding quotes.
I fixed a number of things in PostgresDAS (for example, it was ignoring the schema). About the ident: we wrap it in double quotes because we're internally using the Postgres identifier, the one that can be wrapped in double quotes. I added logic to escape double quotes if they're ever part of the identifier.
Fields and tables are advertised such that double-quoting is safe. Clients send identifiers that follow those (the DAS protocol isn't case-insensitive). I think it works like that.
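For illustration, a minimal sketch of the fix discussed above: wrap the identifier in double quotes and escape any embedded double quote by doubling it, per Postgres quoting rules. The function name mirrors the snippet; the escaping line is a sketch of the fix, not the exact patch.

```scala
// Hypothetical sketch of Postgres-style identifier quoting with escaping.
object IdentQuoting {
  def quoteIdentifier(ident: String): String =
    "\"" + ident.replace("\"", "\"\"") + "\"" // embedded " becomes ""
}
```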
// (Can be used to pass custom parameters/settings in the future?)
- DASServer.main(Array())
+ test("Run the main code with mock services")(DASServer.main(Array()))
Will this not hang the CI and leave it running?
Why make it a test? It's not testing anything (?)
Yes, it was left by accident as I run live tests like that. I rewrote it as a test, as you used to have it, because DASServer.main wouldn't run within test.
It's missing the service declaration file. I'll switch it back to an App and add the service file too.
src/test/scala/com/rawlabs/das/postgresql/PostgresqlBackend.scala
@@ -297,9 +297,15 @@ object ExpressionEvaluator {
  // Strings
  case (StringVal(s1), StringVal(s2)) =>
    s1 < s2

  // (Add date/time if needed)
  case _ => false
The same comment applies here and elsewhere.
Options may be cumbersome (but cleaner), so exceptions would be easier.
That said, o1pro can write the Option code if we go that route.
Either way, this semantic is not to be assumed in this component, as it mixes two different meanings.
Yes, it's using exceptions now.
val days = localDate.toEpochDay
Some(BigDecimal(days))
} catch {
  case _: Throwable => None
Throwable is the wrong choice here (and below), as it is way too broad and captures interruption exceptions, out-of-memory errors, and other things that have nothing to do with date conversions. At the very least use NonFatal.
Catching DateTimeException now. If we ever got wrong numbers from our Value, LocalDateTime and company would throw.
val micros = totalSeconds * 1_000_000L + (t.getNano.toLong / 1000L)
Some(BigDecimal(micros))
} catch {
  case _: Throwable => None
Same here.
val micros = epochSec * 1_000_000L + (ts.getNano.toLong / 1000L)
Some(BigDecimal(micros))
} catch {
  case _: Throwable => None
Same here.