Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: conclusion poll loop no longer stops #648

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 24 additions & 67 deletions pipeline/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,51 +626,11 @@ mod tests {

use crate::{
cid_string::CidString, concluder::mock::MockConcluder, conclusion_events_to_record_batch,
pipeline_ctx, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime,
pipeline_ctx, tests::TestContext, ConclusionData, ConclusionEvent, ConclusionInit,
ConclusionTime,
};

// A context that ensures all tasks for the actor have terminated.
// Without the assurance we cannot be sure that the mock actor drop logic has run its
// assertions.
//
// This struct implements drop logic that panics if proper cleanup wasn't performed.
// However double panics (i.e. panic while panicing) make test output hard to read.
// Therefore if we are already panicing the drop logic does not panic.
// This makes it so that the panic in the cleanup logic doesn't hide the real reason for the
// test failure. However in order for this to work tests must fail by panicing not returning a
// result.
#[must_use]
struct TestContext {
shutdown: Shutdown,
handles: Vec<JoinHandle<()>>,
aggregator: AggregatorHandle,
is_shutdown: bool,
}

impl TestContext {
async fn shutdown(mut self) -> anyhow::Result<()> {
self.shutdown.shutdown();
while let Some(h) = self.handles.pop() {
// if the task paniced this will report and error.
h.await?;
}
self.is_shutdown = true;
Ok(())
}
}
impl Drop for TestContext {
fn drop(&mut self) {
if !self.is_shutdown {
if std::thread::panicking() {
// Do not double panic. Test is going to fail anyways.
} else {
panic!("TestContext shutdown must be called");
}
}
}
}

async fn init() -> anyhow::Result<TestContext> {
async fn init() -> anyhow::Result<TestContext<AggregatorHandle>> {
let mut mock_concluder = MockConcluder::new();
mock_concluder
.expect_handle_subscribe_since()
Expand All @@ -684,15 +644,17 @@ mod tests {
init_with_concluder(mock_concluder).await
}

async fn init_with_concluder(mock_concluder: MockConcluder) -> anyhow::Result<TestContext> {
async fn init_with_concluder(
mock_concluder: MockConcluder,
) -> anyhow::Result<TestContext<AggregatorHandle>> {
let object_store = Arc::new(InMemory::new());
init_with_object_store(mock_concluder, object_store, None).await
}
async fn init_with_object_store(
mock_concluder: MockConcluder,
object_store: Arc<dyn ObjectStore>,
max_cached_rows: Option<usize>,
) -> anyhow::Result<TestContext> {
) -> anyhow::Result<TestContext<AggregatorHandle>> {
let metrics = Metrics::register(&mut Registry::default());
let shutdown = Shutdown::new();
let pipeline_ctx = pipeline_ctx(object_store.clone()).await?;
Expand All @@ -705,17 +667,12 @@ mod tests {
shutdown.clone(),
)
.await?;
Ok(TestContext {
shutdown,
handles,
aggregator,
is_shutdown: false,
})
Ok(TestContext::new(shutdown, handles, aggregator))
}

async fn do_test(conclusion_events: RecordBatch) -> anyhow::Result<impl std::fmt::Display> {
let ctx = init().await?;
let r = do_pass(ctx.aggregator.clone(), None, Some(conclusion_events)).await;
let r = do_pass(ctx.actor_handle.clone(), None, Some(conclusion_events)).await;
ctx.shutdown().await?;
r
}
Expand Down Expand Up @@ -838,7 +795,7 @@ mod tests {
.unwrap();

let mut subscription = ctx
.aggregator
.actor_handle
.send(SubscribeSinceMsg {
projection: Some(vec![0, 2, 3]),
offset: None,
Expand All @@ -847,7 +804,7 @@ mod tests {
.await
.unwrap()
.unwrap();
ctx.aggregator
ctx.actor_handle
.send(NewConclusionEventsMsg {
events: conclusion_events,
})
Expand Down Expand Up @@ -1008,7 +965,7 @@ mod tests {
// events for each pass.
let ctx = init().await.unwrap();
let event_states = do_pass(
ctx.aggregator.clone(),
ctx.actor_handle.clone(),
None,
Some(
conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData {
Expand Down Expand Up @@ -1044,7 +1001,7 @@ mod tests {
| 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string());
let event_states = do_pass(
ctx.aggregator.clone(),
ctx.actor_handle.clone(),
Some(0),
Some(
conclusion_events_to_record_batch(&[ConclusionEvent::Time(ConclusionTime {
Expand Down Expand Up @@ -1082,7 +1039,7 @@ mod tests {
| 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string());
let event_states = do_pass(
ctx.aggregator.clone(),
ctx.actor_handle.clone(),
Some(1),
Some(conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData {
index: 2,
Expand Down Expand Up @@ -1119,7 +1076,7 @@ mod tests {
async fn multiple_passes() {
let ctx = init().await.unwrap();
let event_states = do_pass(
ctx.aggregator.clone(),
ctx.actor_handle.clone(),
None,
Some(
conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData {
Expand Down Expand Up @@ -1155,7 +1112,7 @@ mod tests {
| 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string());
let event_states = do_pass(
ctx.aggregator.clone(),
ctx.actor_handle.clone(),
Some(0),
Some(conclusion_events_to_record_batch(&[
ConclusionEvent::Time(ConclusionTime {
Expand Down Expand Up @@ -1292,7 +1249,7 @@ mod tests {
});

let ctx = init_with_concluder(mock_concluder).await.unwrap();
let event_states = do_pass(ctx.aggregator.clone(), None, None).await.unwrap();
let event_states = do_pass(ctx.actor_handle.clone(), None, None).await.unwrap();
expect![[r#"
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+
| index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data |
Expand Down Expand Up @@ -1381,7 +1338,7 @@ mod tests {
)
.await
.unwrap();
let event_states = do_pass(ctx.aggregator.clone(), None, None).await.unwrap();
let event_states = do_pass(ctx.actor_handle.clone(), None, None).await.unwrap();
expect![[r#"
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+
| index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data |
Expand Down Expand Up @@ -1442,7 +1399,7 @@ mod tests {
.unwrap();

let mut subscription = ctx
.aggregator
.actor_handle
.send(SubscribeSinceMsg {
projection: None,
offset: None,
Expand Down Expand Up @@ -1548,7 +1505,7 @@ mod tests {
]).unwrap();

let mut subscription = ctx
.aggregator
.actor_handle
.send(SubscribeSinceMsg {
projection: None,
offset: None,
Expand All @@ -1557,7 +1514,7 @@ mod tests {
.await
.unwrap()
.unwrap();
ctx.aggregator
ctx.actor_handle
.send(NewConclusionEventsMsg {
events: conclusion_events,
})
Expand All @@ -1581,7 +1538,7 @@ mod tests {
async fn stream_state() {
let ctx = init().await.unwrap();

ctx.aggregator
ctx.actor_handle
.send(NewConclusionEventsMsg {
events: conclusion_events_to_record_batch(&[
ConclusionEvent::Data(ConclusionData {
Expand Down Expand Up @@ -1651,7 +1608,7 @@ mod tests {
.unwrap();

let state = ctx
.aggregator
.actor_handle
.send(StreamStateMsg {
id: StreamId {
r#type: StreamIdType::Model,
Expand Down Expand Up @@ -1688,7 +1645,7 @@ mod tests {
let ctx = init().await.unwrap();

let state = ctx
.aggregator
.actor_handle
.send(StreamStateMsg {
id: StreamId {
r#type: StreamIdType::Model,
Expand Down
Loading
Loading