Skip to content

Commit

Permalink
refactor: rename _stream tables to _feed
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 27, 2025
1 parent 1366e21 commit ab147b7
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 28 deletions.
12 changes: 6 additions & 6 deletions flight/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async fn conclusion_push_down_predicate() -> Result<()> {
}

#[test(tokio::test)]
async fn conclusion_events_stream() -> Result<()> {
async fn conclusion_events_feed() -> Result<()> {
let mut feed = MockFeed::new();
feed.expect_max_highwater_mark()
.once()
Expand All @@ -280,7 +280,7 @@ async fn conclusion_events_stream() -> Result<()> {
event_type,
data::varchar as data,
array_cid_string(previous) as previous
FROM conclusion_events_stream LIMIT 2"#
FROM conclusion_events_feed LIMIT 2"#
.to_string(),
None,
)
Expand Down Expand Up @@ -352,7 +352,7 @@ async fn event_states_simple() -> Result<()> {
Ok(())
}
#[test(tokio::test)]
async fn event_states_stream() -> Result<()> {
async fn event_states_feed() -> Result<()> {
let mut feed = MockFeed::new();
feed.expect_max_highwater_mark()
.once()
Expand All @@ -374,7 +374,7 @@ async fn event_states_stream() -> Result<()> {
cid_string(event_cid) as event_cid,
event_type,
data::varchar as data
FROM event_states_stream LIMIT 3"#
FROM event_states_feed LIMIT 3"#
.to_string(),
None,
)
Expand All @@ -393,7 +393,7 @@ async fn event_states_stream() -> Result<()> {
}

#[test(tokio::test)]
async fn event_states_stream_projection() -> Result<()> {
async fn event_states_feed_projection() -> Result<()> {
// This ensures that a simple projection of the stream table works.
// Any extra functions or casts should not be used so we ensure we are excersicing the schema
// of the stream table directly.
Expand All @@ -412,7 +412,7 @@ async fn event_states_stream_projection() -> Result<()> {
r#"
SELECT
event_type
FROM event_states_stream LIMIT 3"#
FROM event_states_feed LIMIT 3"#
.to_string(),
None,
)
Expand Down
10 changes: 5 additions & 5 deletions pipeline/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ use crate::{
concluder::ConcluderHandle,
metrics::Metrics,
schemas,
since::{rows_since, StreamTable, StreamTableSource},
since::{rows_since, FeedTable, FeedTableSource},
PipelineContext, Result, SessionContextRef,
};
// Use the SubscribeSinceMsg so its clear its a message for this actor
pub use crate::since::SubscribeSinceMsg;

const EVENT_STATES_TABLE: &str = "ceramic.v0.event_states";
const EVENT_STATES_STREAM_TABLE: &str = "ceramic.v0.event_states_stream";
const EVENT_STATES_FEED_TABLE: &str = "ceramic.v0.event_states_feed";
const EVENT_STATES_MEM_TABLE: &str = "ceramic._internal.event_states_mem";
const EVENT_STATES_PERSISTENT_TABLE: &str = "ceramic._internal.event_states_persistent";

Expand Down Expand Up @@ -154,8 +154,8 @@ impl Aggregator {

ctx.session()
.register_table(
EVENT_STATES_STREAM_TABLE,
Arc::new(StreamTable::new(handle.clone())),
EVENT_STATES_FEED_TABLE,
Arc::new(FeedTable::new(handle.clone())),
)
.expect("should be able to register table");

Expand Down Expand Up @@ -582,7 +582,7 @@ async fn process_conclusion_events_batch(
}

#[async_trait]
impl StreamTableSource for AggregatorHandle {
impl FeedTableSource for AggregatorHandle {
fn schema(&self) -> SchemaRef {
schemas::event_states()
}
Expand Down
14 changes: 7 additions & 7 deletions pipeline/src/concluder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use datafusion::{
};
use futures::TryStreamExt as _;
use shutdown::{Shutdown, ShutdownSignal};
use table::FeedTable;
use table::ConclusionFeedTable;
use tokio::{
select,
sync::broadcast,
Expand All @@ -34,7 +34,7 @@ use tracing::{debug, error, info, warn};
use crate::{
metrics::Metrics,
schemas,
since::{rows_since, StreamTable, StreamTableSource},
since::{rows_since, FeedTable, FeedTableSource},
ConclusionFeedSource, PipelineContext, Result, SessionContextRef,
};

Expand All @@ -47,7 +47,7 @@ pub use event::{
pub use table::ConclusionFeed;

const CONCLUSION_EVENTS_TABLE: &str = "ceramic.v0.conclusion_events";
const CONCLUSION_EVENTS_STREAM_TABLE: &str = "ceramic.v0.conclusion_events_stream";
const CONCLUSION_EVENTS_FEED_TABLE: &str = "ceramic.v0.conclusion_events_feed";

/// Concluder is responsible for making conclusions about raw events and publishing
/// conclusion_events.
Expand Down Expand Up @@ -95,7 +95,7 @@ impl Concluder {
ConclusionFeedSource::Direct(conclusion_feed) => {
ctx.session().register_table(
CONCLUSION_EVENTS_TABLE,
Arc::new(FeedTable::new(conclusion_feed.clone())),
Arc::new(ConclusionFeedTable::new(conclusion_feed.clone())),
)?;
conclusion_feed.max_highwater_mark().await?
}
Expand Down Expand Up @@ -129,8 +129,8 @@ impl Concluder {
};
ctx.session()
.register_table(
CONCLUSION_EVENTS_STREAM_TABLE,
Arc::new(StreamTable::new(handle.clone())),
CONCLUSION_EVENTS_FEED_TABLE,
Arc::new(FeedTable::new(handle.clone())),
)
.expect("should be able to register table");

Expand Down Expand Up @@ -322,7 +322,7 @@ impl Handler<EventsSinceMsg> for Concluder {
}

#[async_trait]
impl StreamTableSource for ConcluderHandle {
impl FeedTableSource for ConcluderHandle {
fn schema(&self) -> SchemaRef {
crate::schemas::conclusion_events()
}
Expand Down
6 changes: 3 additions & 3 deletions pipeline/src/concluder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ impl<T: ConclusionFeed> ConclusionFeed for Arc<T> {
// Implements the [`TableProvider`] trait producing a [`FeedExec`] instance when the table is
// scanned, which in turn calls into the [`ConclusionFeed`] to get the actual events.
#[derive(Debug)]
pub struct FeedTable<T> {
pub struct ConclusionFeedTable<T> {
feed: Arc<T>,
schema: SchemaRef,
}

impl<T> FeedTable<T> {
impl<T> ConclusionFeedTable<T> {
pub fn new(feed: Arc<T>) -> Self {
Self {
feed,
Expand Down Expand Up @@ -92,7 +92,7 @@ impl<T> FeedTable<T> {
}
}
#[async_trait::async_trait]
impl<T: ConclusionFeed + std::fmt::Debug + 'static> TableProvider for FeedTable<T> {
impl<T: ConclusionFeed + std::fmt::Debug + 'static> TableProvider for ConclusionFeedTable<T> {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
10 changes: 5 additions & 5 deletions pipeline/src/since/stream.rs → pipeline/src/since/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::TryStreamExt as _;
//
// TODO add error handling
#[async_trait]
pub trait StreamTableSource: Clone + std::fmt::Debug + Sync + Send + 'static {
pub trait FeedTableSource: Clone + std::fmt::Debug + Sync + Send + 'static {
fn schema(&self) -> SchemaRef;
// Subscribe to all new data for this table in increasing "index" order since offset.
// All received RecordBatches must contain and be ordered by an "index" u64 column.
Expand All @@ -46,10 +46,10 @@ pub trait StreamTableSource: Clone + std::fmt::Debug + Sync + Send + 'static {
/// It is assumed that the table contains an "index" column and new data arrives in increasing
/// "index" order.
#[derive(Debug)]
pub struct StreamTable<S> {
pub struct FeedTable<S> {
source: S,
}
impl<S> StreamTable<S> {
impl<S> FeedTable<S> {
pub fn new(source: S) -> Self {
Self { source }
}
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<S> StreamTable<S> {
}

#[async_trait]
impl<S: StreamTableSource> TableProvider for StreamTable<S> {
impl<S: FeedTableSource> TableProvider for FeedTable<S> {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -151,7 +151,7 @@ pub struct StreamExec<S> {
properties: PlanProperties,
}

impl<S: StreamTableSource> ExecutionPlan for StreamExec<S> {
impl<S: FeedTableSource> ExecutionPlan for StreamExec<S> {
fn name(&self) -> &str {
"StreamExec"
}
Expand Down
4 changes: 2 additions & 2 deletions pipeline/src/since/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
//!
//! Actor handles may implement [`StreamTableSource`] and register a [`StreamTable`] on the [`datafusion::execution::context::SessionContext`] in order to provide query access to the stream.
mod feed;
mod metrics;
mod stream;

pub use stream::{StreamTable, StreamTableSource};
pub use feed::{FeedTable, FeedTableSource};

use arrow::{
array::{RecordBatch, UInt64Array},
Expand Down

0 comments on commit ab147b7

Please sign in to comment.