From c03ee025f76568e773d2611551ec1ae6b9eb6bab Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:33:39 -0400 Subject: [PATCH] feat: anchor service (#484) --- Cargo.lock | 1 + Cargo.toml | 1 - anchor-remote/src/cas_remote.rs | 40 ++- api/src/server.rs | 35 ++- api/src/server/event.rs | 4 +- api/src/tests.rs | 87 +++--- core/Cargo.toml | 1 + core/src/key.rs | 124 -------- core/src/lib.rs | 7 +- core/src/node_id.rs | 269 ++++++++++++++++++ cspell.json | 1 + event-svc/src/event/migration.rs | 5 +- event-svc/src/event/order_events.rs | 8 +- event-svc/src/event/ordering_task.rs | 4 +- event-svc/src/event/service.rs | 12 +- event-svc/src/event/store.rs | 8 +- event-svc/src/store/metrics.rs | 8 +- event-svc/src/store/sql/access/event.rs | 15 +- event-svc/src/store/sql/entities/event.rs | 17 +- event-svc/src/store/sql/query.rs | 6 +- event-svc/src/store/sql/test.rs | 5 +- event-svc/src/tests/event.rs | 86 ++++-- event-svc/src/tests/migration.rs | 5 +- event-svc/src/tests/ordering.rs | 19 +- event/src/unvalidated/event.rs | 6 +- event/src/unvalidated/payload/init.rs | 10 +- interest-svc/src/interest/store.rs | 4 +- interest-svc/src/store/metrics.rs | 5 +- interest-svc/src/tests/interest.rs | 72 +++-- ...40809205107_event_source_anchored.down.sql | 5 + ...0240809205107_event_source_anchored.up.sql | 9 + one/src/daemon.rs | 4 +- p2p/src/node.rs | 3 +- recon/src/libp2p/protocol.rs | 13 +- recon/src/libp2p/tests.rs | 9 +- recon/src/protocol.rs | 30 +- recon/src/recon.rs | 20 +- recon/src/recon/btreestore.rs | 11 +- recon/src/recon/tests.rs | 22 +- 39 files changed, 655 insertions(+), 336 deletions(-) delete mode 100644 core/src/key.rs create mode 100644 core/src/node_id.rs create mode 100644 migrations/sqlite/20240809205107_event_source_anchored.down.sql create mode 100644 migrations/sqlite/20240809205107_event_source_anchored.up.sql diff --git a/Cargo.lock b/Cargo.lock index a39dbc156..a174c963d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1540,6 +1540,7 @@ dependencies = [ "multihash-codetable", "multihash-derive 0.9.0", "once_cell", + "rand 0.8.5", "regex", "ring 0.17.8", "serde", diff --git a/Cargo.toml b/Cargo.toml index efc62bb92..6886fee25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,6 @@ bytecheck = "0.6.7" bytes = "1.1" bytesize = "1.1" ceramic-anchor-service = { path = "./anchor-service" } -ceramic-anchor-tx = { path = "./anchor-tx" } ceramic-api = { path = "./api" } ceramic-api-server = { path = "./api-server" } ceramic-arrow-test = { path = "./arrow-test" } diff --git a/anchor-remote/src/cas_remote.rs b/anchor-remote/src/cas_remote.rs index afa74b68b..3c43c64e1 100644 --- a/anchor-remote/src/cas_remote.rs +++ b/anchor-remote/src/cas_remote.rs @@ -17,9 +17,7 @@ use ceramic_anchor_service::{ DetachedTimeEvent, MerkleNode, MerkleNodes, RootTimeEvent, TransactionManager, }; use ceramic_car::CarReader; -use ceramic_core::{ - cid_from_ed25519_key_pair, did_key_from_ed25519_key_pair, Cid, StreamId, StreamIdType, -}; +use ceramic_core::{Cid, NodeId, StreamId}; use ceramic_event::unvalidated::Proof; pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION")); @@ -63,23 +61,15 @@ struct CasAnchorResponse { pub witness_car: Option, } -fn cid_to_stream_id(cid: Cid) -> StreamId { - StreamId { - r#type: StreamIdType::Unloadable, - cid, - } -} - /// Remote CAS transaction manager pub struct RemoteCas { + node_id: NodeId, signing_key: Ed25519KeyPair, url: String, poll_interval: Duration, poll_retry_count: u32, 
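    /// Base64url-encoded JWS protected header, built once in `new` from the node's
    /// did:key and reused when signing CAS requests.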
jws_header_b64: String, http_client: reqwest::Client, - /// The did:key of the node represented as a StreamId - node_stream_id: StreamId, } enum CasResponseParseResult { @@ -122,12 +112,13 @@ impl TransactionManager for RemoteCas { impl RemoteCas { /// Create a new RemoteCas instance pub fn new( + node_id: NodeId, keypair: Ed25519KeyPair, remote_anchor_service_url: String, anchor_poll_interval: Duration, anchor_poll_retry_count: u32, ) -> Self { - let controller = did_key_from_ed25519_key_pair(&keypair); + let controller = node_id.did_key(); let jws_header = Header { kid: format!( "{}#{}", @@ -141,22 +132,21 @@ impl RemoteCas { }; let jws_header_b64 = b64.encode(serde_json::to_vec(&jws_header).expect("invalid jws header")); - let node_stream_id = cid_to_stream_id(cid_from_ed25519_key_pair(&keypair)); Self { + node_id, signing_key: keypair, url: format!("{}/api/v0/requests", remote_anchor_service_url), poll_interval: anchor_poll_interval, poll_retry_count: anchor_poll_retry_count, jws_header_b64, http_client: reqwest::Client::new(), - node_stream_id, } } /// Create an anchor request on the remote CAS pub async fn create_anchor_request(&self, root_cid: Cid) -> Result { let cas_request_body = serde_json::to_string(&CasAnchorRequest { - stream_id: self.node_stream_id.clone(), + stream_id: self.node_id.stream_id(), cid: root_cid.to_string(), timestamp: chrono::Utc::now().to_rfc3339(), ceramic_one_version: AGENT_VERSION.to_owned(), @@ -250,10 +240,10 @@ mod tests { use ring::signature::Ed25519KeyPair; use ceramic_anchor_service::{AnchorService, MockAnchorClient, Store, TransactionManager}; - use ceramic_core::{ed25519_key_pair_from_secret, Cid}; + use ceramic_core::Cid; - fn node_private_key() -> Ed25519KeyPair { - ed25519_key_pair_from_secret( + fn node_id_and_private_key() -> (NodeId, Ed25519KeyPair) { + NodeId::try_from_secret( std::env::var("NODE_PRIVATE_KEY") // The following secret is NOT authenticated with CAS, it is only used for testing. 
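                // Secrets take the multibase form NodeId::try_from_secret expects:
                // "<0x8026+seed>" or "<0x8026+seed>:<0xed01+public>", e.g.
                // "z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd".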
.unwrap_or( @@ -270,8 +260,10 @@ mod tests { async fn test_anchor_batch_with_cas() { let anchor_client = Arc::new(MockAnchorClient::new(10)); let anchor_requests = anchor_client.local_sourced_data_events().await.unwrap(); + let (node_id, keypair) = node_id_and_private_key(); let remote_cas = Arc::new(RemoteCas::new( - node_private_key(), + node_id, + keypair, "https://cas-dev.3boxlabs.com".to_owned(), Duration::from_secs(1), 1, @@ -290,9 +282,11 @@ mod tests { async fn test_create_anchor_request_with_cas() { let mock_root_cid = Cid::from_str("bafyreia776z4jdg5zgycivcpr3q6lcu6llfowkrljkmq3bex2k5hkzat54").unwrap(); + let (node_id, keypair) = node_id_and_private_key(); let remote_cas = RemoteCas::new( - node_private_key(), + node_id, + keypair, "https://cas-dev.3boxlabs.com".to_owned(), Duration::from_secs(1), 1, @@ -316,8 +310,10 @@ mod tests { async fn test_jwt() { let mock_data = serde_ipld_dagcbor::to_vec(b"mock root").unwrap(); let mock_hash = MultihashDigest::digest(&Code::Sha2_256, &mock_data); + let (node_id, keypair) = node_id_and_private_key(); let remote_cas = Arc::new(RemoteCas::new( - node_private_key(), + node_id, + keypair, "https://cas-dev.3boxlabs.com".to_owned(), Duration::from_secs(1), 1, diff --git a/api/src/server.rs b/api/src/server.rs index 23bcd550f..cc99b1bc8 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -34,7 +34,7 @@ use ceramic_api_server::{ ExperimentalInterestsGetResponse, FeedEventsGetResponse, FeedResumeTokenGetResponse, InterestsPostResponse, }; -use ceramic_core::{Cid, EventId, Interest, Network, PeerId, StreamId}; +use ceramic_core::{Cid, EventId, Interest, Network, NodeId, PeerId, StreamId}; use futures::TryFutureExt; use recon::Key; use swagger::{ApiError, ByteArray}; @@ -245,7 +245,11 @@ impl ApiItem { #[async_trait] pub trait EventService: Send + Sync { /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed. 
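    /// `informant` identifies the node that supplied the events, so their source can
    /// be recorded alongside them.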
- async fn insert_many(&self, items: Vec) -> Result>; + async fn insert_many( + &self, + items: Vec, + informant: NodeId, + ) -> Result>; async fn range_with_values( &self, range: Range, @@ -280,8 +284,12 @@ pub trait EventService: Send + Sync { #[async_trait::async_trait] impl EventService for Arc { - async fn insert_many(&self, items: Vec) -> Result> { - self.as_ref().insert_many(items).await + async fn insert_many( + &self, + items: Vec, + informant: NodeId, + ) -> Result> { + self.as_ref().insert_many(items, informant).await } async fn range_with_values( @@ -333,7 +341,7 @@ struct InsertTask { #[derive(Clone)] pub struct Server { - peer_id: PeerId, + node_id: NodeId, network: Network, interest: I, model: Arc, @@ -349,17 +357,17 @@ where I: InterestService, M: EventService + 'static, { - pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc) -> Self { + pub fn new(node_id: NodeId, network: Network, interest: I, model: Arc) -> Self { let (tx, event_rx) = tokio::sync::mpsc::channel::(1024); let event_store = model.clone(); - let handle = Self::start_insert_task(event_store, event_rx); + let handle = Self::start_insert_task(event_store, event_rx, node_id); let insert_task = Arc::new(InsertTask { _handle: handle, tx, }); Server { - peer_id, + node_id, network, interest, model, @@ -377,6 +385,7 @@ where fn start_insert_task( event_store: Arc, mut event_rx: tokio::sync::mpsc::Receiver, + node_id: NodeId, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS)); @@ -389,7 +398,7 @@ where let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE); tokio::select! { _ = interval.tick() => { - Self::process_events(&mut events, &event_store).await; + Self::process_events(&mut events, &event_store, node_id).await; } val = event_rx.recv_many(&mut buf, EVENTS_TO_RECEIVE) => { if val > 0 { @@ -400,7 +409,7 @@ where let shutdown = event_rx.is_closed(); // make sure the events queue doesn't get too deep when we're under heavy load if events.len() >= EVENT_INSERT_QUEUE_SIZE || shutdown { - Self::process_events(&mut events, &event_store).await; + Self::process_events(&mut events, &event_store, node_id).await; } if shutdown { tracing::info!("Shutting down insert task."); @@ -410,7 +419,7 @@ where }) } - async fn process_events(events: &mut Vec, event_store: &Arc) { + async fn process_events(events: &mut Vec, event_store: &Arc, node_id: NodeId) { if events.is_empty() { return; } @@ -421,7 +430,7 @@ where items.push(ApiItem::new(req.id, req.data)); }); tracing::trace!("calling insert many with {} items.", items.len()); - match event_store.insert_many(items).await { + match event_store.insert_many(items, node_id).await { Ok(results) => { tracing::debug!("insert many returned {} results.", results.len()); for result in results { @@ -682,7 +691,7 @@ where // Update interest ranges to include this new subscription. 
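        // An interest key encodes the sep key, this node's peer id, the
        // (start, stop) range, and a not-after timestamp; node_id.peer_id()
        // derives the libp2p peer id from the same ed25519 public key as the
        // node's did:key.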
let interest = Interest::builder() .with_sep_key(&interest.sep) - .with_peer_id(&self.peer_id) + .with_peer_id(&self.node_id.peer_id()) .with_range((start.as_slice(), stop.as_slice())) .with_not_after(0) .build(); diff --git a/api/src/server/event.rs b/api/src/server/event.rs index e98ac4869..5935415ab 100644 --- a/api/src/server/event.rs +++ b/api/src/server/event.rs @@ -53,8 +53,8 @@ where } } } - unvalidated::Event::Unsigned(event) => { - event_id_from_init_payload(&event_cid, network, &event_cid, event.payload()) + unvalidated::Event::Unsigned(init) => { + event_id_from_init_payload(&event_cid, network, &event_cid, init.payload()) } } } diff --git a/api/src/tests.rs b/api/src/tests.rs index 2ede64b9f..c1460b58f 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -16,7 +16,7 @@ use ceramic_api_server::{ InterestsPostResponse, InterestsSortKeySortValuePostResponse, }; use ceramic_core::{Cid, Interest}; -use ceramic_core::{EventId, Network, PeerId, StreamId}; +use ceramic_core::{EventId, Network, NodeId, PeerId, StreamId}; use expect_test::expect; use mockall::{mock, predicate}; use multibase::Base; @@ -111,7 +111,7 @@ mock! { pub EventStoreTest {} #[async_trait] impl EventService for EventStoreTest { - async fn insert_many(&self, items: Vec) -> Result>; + async fn insert_many(&self, items: Vec, node_id: NodeId) -> Result>; async fn range_with_values( &self, range: Range, @@ -168,7 +168,7 @@ pub fn mock_get_unsigned_init_event(mock_store: &mut MockEventStoreTest) { #[test(tokio::test)] async fn create_event() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::Mainnet; let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); @@ -187,15 +187,15 @@ async fn create_event() { mock_event_store .expect_insert_many() - .with(predicate::eq(args)) + .with(predicate::eq(args), predicate::eq(node_id)) .times(1) - .returning(|input| { + .returning(|input, _| { Ok(input .into_iter() .map(|v| EventInsertResult::new_ok(v.key.clone())) .collect()) }); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( models::EventData { @@ -209,7 +209,7 @@ async fn create_event() { } #[test(tokio::test)] async fn create_event_fails() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::Mainnet; let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); @@ -228,9 +228,9 @@ async fn create_event_fails() { mock_event_store .expect_insert_many() - .with(predicate::eq(args)) + .with(predicate::eq(args), predicate::eq(node_id)) .times(1) - .returning(|input| { + .returning(|input, _| { Ok(input .iter() .map(|i| { @@ -241,7 +241,7 @@ async fn create_event_fails() { }) .collect()) }); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( models::EventData { @@ -256,7 +256,7 @@ async fn create_event_fails() { #[test(tokio::test)] async fn register_interest_sort_value() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let model = "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; // cspell:disable-line @@ -283,7 +283,7 @@ async fn register_interest_sort_value() { 
.with(predicate::eq( Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id) + .with_peer_id(&node_id.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(0) .build(), @@ -291,7 +291,7 @@ async fn register_interest_sort_value() { .times(1) .returning(|_| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let interest = models::Interest { sep: "model".to_string(), sep_value: model.to_owned(), @@ -305,7 +305,7 @@ async fn register_interest_sort_value() { #[test(tokio::test)] async fn register_interest_sort_value_bad_request() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let model = "2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; //missing 'k' cspell:disable-line @@ -313,7 +313,7 @@ async fn register_interest_sort_value_bad_request() { // Setup mock expectations let mock_interest = MockAccessInterestStoreTest::new(); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let interest = models::Interest { sep: "model".to_string(), sep_value: model.to_owned(), @@ -327,7 +327,7 @@ async fn register_interest_sort_value_bad_request() { #[test(tokio::test)] async fn register_interest_sort_value_controller() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let model = "z3KWHw5Efh2qLou2FEdz3wB8ZvLgURJP94HeijLVurxtF1Ntv6fkg2G"; // base58 encoded should work cspell:disable-line // we convert to base36 before storing @@ -353,7 +353,7 @@ async fn register_interest_sort_value_controller() { .with(predicate::eq( Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id) + .with_peer_id(&node_id.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(0) .build(), @@ -361,7 +361,7 @@ async fn register_interest_sort_value_controller() { .times(1) .returning(|__| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .interests_sort_key_sort_value_post( "model".to_string(), @@ -378,7 +378,7 @@ async fn register_interest_sort_value_controller() { #[test(tokio::test)] async fn register_interest_value_controller_stream() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let model = "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; // cspell:disable-line let controller = "did:key:zGs1Det7LHNeu7DXT4nvoYrPfj3n6g7d6bj2K4AMXEvg1"; @@ -405,7 +405,7 @@ async fn register_interest_value_controller_stream() { .with(predicate::eq( Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id) + .with_peer_id(&node_id.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(0) .build(), @@ -413,7 +413,7 @@ async fn register_interest_value_controller_stream() { .times(1) .returning(|__| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = 
Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .interests_sort_key_sort_value_post( "model".to_string(), @@ -429,7 +429,10 @@ async fn register_interest_value_controller_stream() { #[test(tokio::test)] async fn get_interests() { - let peer_id = PeerId::from_str("1AaNXU5G2SJQSzCCP23V2TEDierSRBBGLA7aSCYScUTke9").unwrap(); + let node_id = NodeId::try_from_peer_id( + &PeerId::from_str("12D3KooWRyGSRzzEBpHbHyRkGTgCpXuoRMQgYrqk7tFQzM3AFEWp").unwrap(), + ) + .unwrap(); let network = Network::InMemory; let model = "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; // cspell:disable-line let controller = "did:key:zGs1Det7LHNeu7DXT4nvoYrPfj3n6g7d6bj2K4AMXEvg1"; @@ -464,13 +467,13 @@ async fn get_interests() { Ok(vec![ Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id) + .with_peer_id(&node_id.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(0) .build(), Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id) + .with_peer_id(&node_id.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(1) .build(), @@ -478,7 +481,7 @@ async fn get_interests() { }); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .experimental_interests_get(None, &Context) .await @@ -488,10 +491,10 @@ async fn get_interests() { InterestsGet { interests: [ InterestsGetInterestsInner { - data: "zwZSodouDdQxpoGdqDjfm1n8r3v18Eoo4si4AFo1UbGArVey2XHwDBwDshSiPN36DDWaE7MprPpBmNrZkDhrFugryq9nnAVyP6M9oTns8fjB4RqR7oCNEX8HDBZAbVVrpXY2QcWHu5Dy", + data: "z7A21giXcVHK1TYeLxyBRsC1t4PxUBZQVV5nEJAi6wnG1cjoKnLCY4rckApWeBQLumpP9f5Vw71gzid5a7ih2txqAoM4pLGvJa8m6sgQxDF812gLxoAakDP81oda6Aib9raNy9fKYJhYtpVaZm", }, InterestsGetInterestsInner { - data: "zwZSodouDdQxpoGdqDjfm1n8r3v18Eoo4si4AFo1UbGArVey2XHwDBwDshSiPN36DDWaE7MprPpBmNrZkDhrFugryq9nnAVyP6M9oTns8fjB4RqR7oCNEX8HDBZAbVVrpXY2QcWHu5Dz", + data: "z7A21giXcVHK1TYeLxyBRsC1t4PxUBZQVV5nEJAi6wnG1cjoKnLCY4rckApWeBQLumpP9f5Vw71gzid5a7ih2txqAoM4pLGvJa8m6sgQxDF812gLxoAakDP81oda6Aib9raNy9fKYJhYtpVaZn", }, ], }, @@ -500,8 +503,12 @@ async fn get_interests() { } #[test(tokio::test)] async fn get_interests_for_peer() { - let peer_id_a = PeerId::from_str("1AaNXU5G2SJQSzCCP23V2TEDierSRBBGLA7aSCYScUTke9").unwrap(); - let peer_id_b = PeerId::from_str("1AcJjoLqKWAPBRQYsff8ZQPjwinCAFUdTEgZrAZgeZkCu7").unwrap(); + let peer_id_a = + PeerId::from_str("12D3KooWRyGSRzzEBpHbHyRkGTgCpXuoRMQgYrqk7tFQzM3AFEWp").unwrap(); + let node_id_b = NodeId::try_from_peer_id( + &PeerId::from_str("12D3KooWR1M8JiXyfdBKUhCLUmTJGhtNsgxnhvFVD4AU4EioDUwu").unwrap(), + ) + .unwrap(); let network = Network::InMemory; let model = "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; // cspell:disable-line let controller = "did:key:zGs1Det7LHNeu7DXT4nvoYrPfj3n6g7d6bj2K4AMXEvg1"; @@ -548,13 +555,13 @@ async fn get_interests_for_peer() { .build(), Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id_b) + .with_peer_id(&node_id_b.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(0) .build(), Interest::builder() .with_sep_key("model") - .with_peer_id(&peer_id_b) + .with_peer_id(&node_id_b.peer_id()) .with_range((start.as_slice(), end.as_slice())) .with_not_after(1) .build(), @@ -563,7 +570,7 @@ async fn get_interests_for_peer() { let mock_event_store = 
MockEventStoreTest::new(); let server = Server::new( - peer_id_b, + node_id_b, network, mock_interest, Arc::new(mock_event_store), @@ -577,10 +584,10 @@ async fn get_interests_for_peer() { InterestsGet { interests: [ InterestsGetInterestsInner { - data: "zwZSodouDdQxpoGdqDjfm1n8r3v18Eoo4si4AFo1UbGArVey2XHwDBwDshSiPN36DDWaE7MprPpBmNrZkDhrFugryq9nnAVyP6M9oTns8fjB4RqR7oCNEX8HDBZAbVVrpXY2QcWHu5Dy", + data: "z7A21giXcVHK1TYeLxyBRsC1t4PxUBZQVV5nEJAi6wnG1cjoKnLCY4rckApWeBQLumpP9f5Vw71gzid5a7ih2txqAoM4pLGvJa8m6sgQxDF812gLxoAakDP81oda6Aib9raNy9fKYJhYtpVaZm", }, InterestsGetInterestsInner { - data: "zwZSodouDdQxpoGdqDjfm1n8r3v18Eoo4si4AFo1UbGArVey2XHwDBwDshSiPN36DDWaE7MprPpBmNrZkDhrFugryq9nnAVyP6M9oTns8fjB4RqR7oCNEX8HDBZAbVVrpXY2QcWHu5Dz", + data: "z7A21giXcVHK1TYeLxyBRsC1t4PxUBZQVV5nEJAi6wnG1cjoKnLCY4rckApWeBQLumpP9f5Vw71gzid5a7ih2txqAoM4pLGvJa8m6sgQxDF812gLxoAakDP81oda6Aib9raNy9fKYJhYtpVaZn", }, ], }, @@ -590,7 +597,7 @@ async fn get_interests_for_peer() { #[test(tokio::test)] async fn get_events_for_interest_range() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let model = "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9"; // cspell:disable-line let controller = "did:key:zGs1Det7LHNeu7DXT4nvoYrPfj3n6g7d6bj2K4AMXEvg1"; @@ -628,7 +635,7 @@ async fn get_events_for_interest_range() { ) .times(1) .returning(move |_, _, _| Ok(vec![(cid, vec![])])); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .experimental_events_sep_sep_value_get( "model".to_string(), @@ -653,7 +660,7 @@ async fn get_events_for_interest_range() { #[test(tokio::test)] async fn test_events_event_id_get_by_event_id_success() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let event_cid = Cid::from_str("baejbeicqtpe5si4qvbffs2s7vtbk5ccbsfg6owmpidfj3zeluqz4hlnz6m").unwrap(); // cspell:disable-line @@ -677,7 +684,7 @@ async fn test_events_event_id_get_by_event_id_success() { .times(1) .returning(move |_| Ok(Some(event_data.clone()))); let mock_interest = MockAccessInterestStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let result = server.events_event_id_get(event_id_str, &Context).await; let EventsEventIdGetResponse::Success(event) = result.unwrap() else { panic!("Expected EventsEventIdGetResponse::Success but got another variant"); @@ -692,7 +699,7 @@ async fn test_events_event_id_get_by_event_id_success() { #[test(tokio::test)] async fn test_events_event_id_get_by_cid_success() { - let peer_id = PeerId::random(); + let node_id = NodeId::random().unwrap().0; let network = Network::InMemory; let event_cid = Cid::from_str("baejbeihyr3kf77etqdccjfoc33dmko2ijyugn6qk6yucfkioasjssz3bbu").unwrap(); // cspell:disable-line @@ -705,7 +712,7 @@ async fn test_events_event_id_get_by_cid_success() { .times(1) .returning(move |_| Ok(Some(event_data.clone()))); let mock_interest = MockAccessInterestStoreTest::new(); - let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); let result = server .events_event_id_get(event_cid.to_string(), &Context) .await; diff --git a/core/Cargo.toml 
b/core/Cargo.toml index adaf51e6c..a68ad2c8c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,6 +21,7 @@ multibase.workspace = true multihash-codetable.workspace = true multihash-derive.workspace = true once_cell.workspace = true +rand.workspace = true regex.workspace = true ring.workspace = true serde.workspace = true diff --git a/core/src/key.rs b/core/src/key.rs deleted file mode 100644 index b6ce76714..000000000 --- a/core/src/key.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::{fs, path::PathBuf, str::FromStr}; - -use anyhow::{anyhow, Context, Result}; -use cid::multihash::Multihash; -use cid::Cid; -use ring::signature::{Ed25519KeyPair, KeyPair}; - -/// Read an Ed25519 key from a directory and return a key pair -pub async fn read_ed25519_key_from_dir(p2p_key_dir: PathBuf) -> Result { - let key_path = p2p_key_dir.join("id_ed25519_0"); - let content = fs::read_to_string(key_path)?; - let seed = ssh_key::private::PrivateKey::from_str(&content) - .map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))? - .key_data() - .ed25519() - .map_or(Err(anyhow::anyhow!("failed to parse ed25519 key")), |key| { - Ok(key.private.to_bytes()) - })?; - Ed25519KeyPair::from_seed_unchecked(seed.as_ref()) - .map_err(|e| anyhow::anyhow!("failed to create key pair: {}", e)) -} - -/// Create an Ed25519 key pair from a secret. The secret can be formatted in two ways: -/// - Multibase of Secret:Multibase of Public Key -/// (e.g. z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M) -/// In this example, the DID will be did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M. -/// -/// - Multibase of unchecked Secret (i.e. not matched against public key) -/// (e.g. z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd) -pub fn ed25519_key_pair_from_secret(secret: &str) -> Result { - let mut parts = secret.split(':'); - let secret = parts.next().expect("split should never give zero parts"); - let secret_with_prefix: [u8; 34] = multibase::decode(secret) - .context("secret is not multibase encoded")? - .1 - .try_into() - .map_err(|_| { - anyhow!("secret must be 0x8026 followed by 32 bytes of ed25519 private key") - })?; - let secret: [u8; 32] = secret_with_prefix - .strip_prefix(b"\x80\x26") - .context("secret must be 0x8026 followed by 32 bytes of ed25519 private key")? - .try_into()?; - match parts.next() { - None => Ok(Ed25519KeyPair::from_seed_unchecked(&secret) - .map_err(|e| anyhow!("failed to create key pair from secret: {}", e))?), - Some(public_multibase) => { - let public_with_prefix: [u8; 34] = multibase::decode(public_multibase) - .context("public key is not multibase encoded")? - .1 - .try_into() - .map_err(|_| { - anyhow!("public key must be 0xed01 followed by 32 bytes of ed25519 public key") - })?; - let public: [u8; 32] = public_with_prefix - .strip_prefix(b"\xed\x01") - .context("public key must be 0xed01 followed by 32 bytes of ed25519 public key")? 
- .try_into()?; - Ok( - Ed25519KeyPair::from_seed_and_public_key(&secret, &public).map_err(|e| { - anyhow!( - "failed to create key pair from secret and public key: {}", - e - ) - })?, - ) - } - } -} - -/// Create a DID key from an Ed25519 key pair -pub fn did_key_from_ed25519_key_pair(key: &Ed25519KeyPair) -> String { - let public = key.public_key().as_ref(); - let public_with_prefix = [b"\xed\x01", public].concat(); - let public_multibase = multibase::encode(multibase::Base::Base58Btc, public_with_prefix); - format!("did:key:{}", public_multibase) -} - -/// Create a CID from an Ed25519 key pair -pub fn cid_from_ed25519_key_pair(key: &Ed25519KeyPair) -> Cid { - let public = key.public_key().as_ref(); - let hash = Multihash::<64>::wrap(0, public).expect("ed25519 public key is 32 bytes"); - Cid::new_v1(0xed, hash) -} - -/// Create a DID from a Libp2p Peer ID Multihash -pub fn did_from_peer_id(peer_id_mh: &Multihash<64>) -> Result { - if peer_id_mh.code() != 0x00 { - return Err(anyhow!("peer ID multihash is not identity")); - } - if peer_id_mh.size() != 36 { - return Err(anyhow!("peer ID multihash is not 36 bytes")); - } - let libp2p_key = peer_id_mh.digest(); - let ed25519_public_key = libp2p_key - .strip_prefix(b"\x08\x01\x12\x20") - .context("libp2p peer ID must be 0x08011220 followed by 32 bytes of ed25519 public key")?; - let ed25519_public_key_with_prefix = [b"\xed\x01", ed25519_public_key].concat(); - let ed25519_public_key_multibase = - multibase::encode(multibase::Base::Base58Btc, ed25519_public_key_with_prefix); - Ok(format!("did:key:{}", ed25519_public_key_multibase)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_ed25519_key_pair_from_secret() { - let secret = "z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd"; - let key1 = ed25519_key_pair_from_secret(secret).unwrap(); - let secret_and_public = "z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M"; - let key2 = ed25519_key_pair_from_secret(secret_and_public).unwrap(); - assert_eq!( - did_key_from_ed25519_key_pair(&key1), - did_key_from_ed25519_key_pair(&key2) - ); - assert_eq!( - did_key_from_ed25519_key_pair(&key1), - "did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M" - ); - println!("{}", did_key_from_ed25519_key_pair(&key1)); - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 3f0bd2ddf..e72686495 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -5,8 +5,8 @@ mod bytes; pub mod event_id; pub mod interest; mod jwk; -mod key; mod network; +mod node_id; mod range; mod serialize_ext; mod stream_id; @@ -15,11 +15,8 @@ pub use bytes::Bytes; pub use event_id::EventId; pub use interest::{Interest, PeerId}; pub use jwk::Jwk; -pub use key::{ - cid_from_ed25519_key_pair, did_from_peer_id, did_key_from_ed25519_key_pair, - ed25519_key_pair_from_secret, read_ed25519_key_from_dir, -}; pub use network::Network; +pub use node_id::NodeId; pub use range::RangeOpen; pub use serialize_ext::SerializeExt; pub use stream_id::{StreamId, StreamIdType}; diff --git a/core/src/node_id.rs b/core/src/node_id.rs new file mode 100644 index 000000000..674175c45 --- /dev/null +++ b/core/src/node_id.rs @@ -0,0 +1,269 @@ +use std::fmt::Display; +use std::{fs, path::PathBuf, str::FromStr}; + +use crate::{StreamId, StreamIdType}; +use anyhow::{anyhow, Context, Ok, Result}; +use cid::multihash::Multihash; +use cid::Cid; +use libp2p_identity::PeerId; +use rand::Rng; +use ring::signature::{Ed25519KeyPair, KeyPair}; + +const ED25519_MULTICODEC: u64 = 0xed; +const 
ED25519_PUBLIC_KEY_MULTICODEC_PREFIX: &[u8; 2] = b"\xed\x01"; +const ED25519_PRIVATE_KEY_MULTICODEC_PREFIX: &[u8; 2] = b"\x80\x26"; +const ED25519_LIBP2P_PEER_ID_PREFIX: &[u8; 4] = b"\x08\x01\x12\x20"; + +/// NodeId is the public_ed25519_key_bytes of the node +#[derive(Clone, Eq, PartialEq, Copy)] +pub struct NodeId { + public_ed25519_key_bytes: [u8; 32], +} + +impl std::fmt::Debug for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.did_key()) + } +} + +impl NodeId { + /// public_ed25519_key_bytes as a CID + pub fn cid(&self) -> Cid { + let hash = Multihash::<64>::wrap(0, self.public_ed25519_key_bytes.as_slice()) + .expect("ed25519 public key is 32 bytes"); + Cid::new_v1(ED25519_MULTICODEC, hash) + } + /// public_ed25519_key_bytes as a StreamId + pub fn stream_id(&self) -> StreamId { + StreamId { + r#type: StreamIdType::Unloadable, + cid: self.cid(), + } + } + /// public_ed25519_key_bytes as a did:key + pub fn did_key(&self) -> String { + let public_with_prefix = [ + ED25519_PUBLIC_KEY_MULTICODEC_PREFIX, + self.public_ed25519_key_bytes.as_ref(), + ] + .concat(); + let public_multibase = multibase::encode(multibase::Base::Base58Btc, public_with_prefix); + format!("did:key:{}", public_multibase) + } + /// public_ed25519_key_bytes as a PeerID + pub fn peer_id(&self) -> PeerId { + let libp2p_key = [ + ED25519_LIBP2P_PEER_ID_PREFIX, + self.public_ed25519_key_bytes.as_slice(), + ] + .concat(); + // Identity multihash code = 0x00 + let libp2p_key_multihash = Multihash::<64>::wrap(0x00, &libp2p_key) + .expect("self.public_ed25519_key_bytes to be well formed"); + PeerId::from_multihash(libp2p_key_multihash) + .expect("self.public_ed25519_key_bytes to be well formed") + } + /// public_ed25519_key_bytes from a Cid + pub fn try_from_cid(cid: Cid) -> Result { + let mh = cid.hash(); + if mh.code() != 0x00 { + return Err(anyhow!("Cid multihash is not identity")); + } + if mh.size() != 32 { + return Err(anyhow!("CID multihash is not 36 bytes")); + } + Ok(Self { + public_ed25519_key_bytes: mh.digest().try_into()?, + }) + } + /// public_ed25519_key_bytes from a did:key + pub fn try_from_did_key(did_key: &str) -> Result { + let public_key_multibase = did_key + .strip_prefix("did:key:") + .context("DID did not start with did:key")?; + let ed25519_public_key_with_prefix: [u8; 34] = multibase::decode(public_key_multibase)? + .1 + .try_into() + .map_err(|_| anyhow!("Failed to decode public key multibase"))?; + let ed25519_public_key = ed25519_public_key_with_prefix + .strip_prefix(ED25519_PUBLIC_KEY_MULTICODEC_PREFIX) + .context("")?; + let public_ed25519_key_bytes: [u8; 32] = ed25519_public_key.try_into()?; + Ok(Self { + public_ed25519_key_bytes, + }) + } + /// Read an Ed25519 key from a directory and return a NodeID with a key pair + pub fn try_from_dir(key_dir: PathBuf) -> Result<(Self, Ed25519KeyPair)> { + let key_path = key_dir.join("id_ed25519_0"); + let content = fs::read_to_string(key_path)?; + let seed = ssh_key::private::PrivateKey::from_str(&content) + .map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))? 
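+            // id_ed25519_0 is an OpenSSH-format ed25519 private key; pull out the
+            // raw 32-byte seed and rebuild a ring key pair from it.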
+ .key_data() + .ed25519() + .map_or(Err(anyhow::anyhow!("failed to parse ed25519 key")), |key| { + Ok(key.private.to_bytes()) + })?; + let key_pair = Ed25519KeyPair::from_seed_unchecked(seed.as_ref()) + .map_err(|e| anyhow::anyhow!("failed to create key pair: {}", e))?; + let public_ed25519_key_bytes = key_pair.public_key().as_ref().try_into()?; + Ok(( + Self { + public_ed25519_key_bytes, + }, + key_pair, + )) + } + /// Create an Ed25519 key pair from a secret. The secret can be formatted in two ways: + /// - Multibase of Secret:Multibase of Public Key + /// (e.g. z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M) + /// In this example, the DID will be did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M. + /// + /// - Multibase of unchecked Secret (i.e. not matched against public key) + /// (e.g. z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd) + pub fn try_from_secret(secret: &str) -> Result<(Self, Ed25519KeyPair)> { + let mut parts = secret.split(':'); + let secret = parts.next().expect("split should never give zero parts"); + let secret_with_prefix: [u8; 34] = multibase::decode(secret) + .context("secret is not multibase encoded")? + .1 + .try_into() + .map_err(|_| { + anyhow!("secret must be 0x8026 followed by 32 bytes of ed25519 private key") + })?; + let secret: [u8; 32] = secret_with_prefix + .strip_prefix(ED25519_PRIVATE_KEY_MULTICODEC_PREFIX) + .context("secret must be 0x8026 followed by 32 bytes of ed25519 private key")? + .try_into()?; + let key_pair = match parts.next() { + None => Ed25519KeyPair::from_seed_unchecked(&secret) + .map_err(|e| anyhow!("failed to create key pair from secret: {}", e))?, + Some(public_multibase) => { + let public_with_prefix: [u8; 34] = multibase::decode(public_multibase) + .context("public key is not multibase encoded")? + .1 + .try_into() + .map_err(|_| { + anyhow!( + "public key must be 0xed01 followed by 32 bytes of ed25519 public key" + ) + })?; + let public: [u8; 32] = public_with_prefix + .strip_prefix(ED25519_PUBLIC_KEY_MULTICODEC_PREFIX) + .context( + "public key must be 0xed01 followed by 32 bytes of ed25519 public key", + )? + .try_into()?; + + Ed25519KeyPair::from_seed_and_public_key(&secret, &public).map_err(|e| { + anyhow!( + "failed to create key pair from secret and public key: {}", + e + ) + })? 
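+                // Unlike the unchecked single-part path above, from_seed_and_public_key
+                // rejects a public key that does not match the seed.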
+ } + }; + let public_ed25519_key_bytes = key_pair.public_key().as_ref().try_into()?; + Ok(( + Self { + public_ed25519_key_bytes, + }, + key_pair, + )) + } + /// public_ed25519_key_bytes from a PeerId + pub fn try_from_peer_id(peer_id: &PeerId) -> Result { + let peer_id_mh = peer_id.as_ref(); + if peer_id_mh.code() != 0x00 { + return Err(anyhow!("peer ID multihash is not identity")); + } + if peer_id_mh.size() != 36 { + return Err(anyhow!("peer ID multihash is not 36 bytes")); + } + let libp2p_key = peer_id_mh.digest(); + let ed25519_public_key = libp2p_key + .strip_prefix(ED25519_LIBP2P_PEER_ID_PREFIX) + .context( + "libp2p peer ID must be 0x08011220 followed by 32 bytes of ed25519 public key", + )?; + let public_ed25519_key_bytes: [u8; 32] = ed25519_public_key.try_into()?; + Ok(Self { + public_ed25519_key_bytes, + }) + } + /// Create a NodeId using a random Ed25519 key pair + pub fn random() -> Result<(Self, String)> { + // Generate random secret key and corresponding keypair + let random_secret = rand::thread_rng().gen::<[u8; 32]>(); + let key_pair = Ed25519KeyPair::from_seed_unchecked(random_secret.as_ref()) + .map_err(|e| anyhow!("failed to create new key pair: {}", e))?; + // Encode the public key and secret key + let public_ed25519_key_bytes: [u8; 32] = key_pair.public_key().as_ref().try_into()?; + let public_key_with_prefix = [ + ED25519_PUBLIC_KEY_MULTICODEC_PREFIX, + public_ed25519_key_bytes.as_ref(), + ] + .concat(); + let public_key_multibase = + multibase::encode(multibase::Base::Base58Btc, public_key_with_prefix); + let private_key_with_prefix = [ + ED25519_PRIVATE_KEY_MULTICODEC_PREFIX, + random_secret.as_ref(), + ] + .concat(); + let private_key_multibase = + multibase::encode(multibase::Base::Base58Btc, private_key_with_prefix); + Ok(( + Self { + public_ed25519_key_bytes, + }, + format!("{}:{}", private_key_multibase, public_key_multibase), + )) + } +} + +impl Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.did_key()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use expect_test::expect; + + #[test] + fn test_ed25519_key_pair_from_secret() { + let secret = "z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd"; + let (node_id_1, _) = NodeId::try_from_secret(secret).unwrap(); + let secret_and_public = "z3u2WLX8jeyN6sfbDowLGudoZHudxgVkNJfrw2TDTVx4tijd:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M"; + let (node_id_2, _) = NodeId::try_from_secret(secret_and_public).unwrap(); + assert_eq!(node_id_1, node_id_2); + expect![["did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M"]] + .assert_eq(&node_id_1.did_key()); + } + + #[test] + fn test_did_from_peer_id() { + let peer_id = + PeerId::from_str("12D3KooWR1M8JiXyfdBKUhCLUmTJGhtNsgxnhvFVD4AU4EioDUwu").unwrap(); + let node_id = NodeId::try_from_peer_id(&peer_id).unwrap(); + expect![[r#" + "did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M" + "#]] + .assert_debug_eq(&node_id.did_key()); + } + + #[test] + fn test_peer_id_from_did() { + let did = "did:key:z6MkueF19qChpGQJBJXcXjfoM1MYCwC167RMwUiNWXXvEm1M"; + let node_id = NodeId::try_from_did_key(did).unwrap(); + expect![[r#" + PeerId( + "12D3KooWR1M8JiXyfdBKUhCLUmTJGhtNsgxnhvFVD4AU4EioDUwu", + ) + "#]] + .assert_debug_eq(&node_id.peer_id()); + } +} diff --git a/cspell.json b/cspell.json index 8a493a3d9..380352416 100644 --- a/cspell.json +++ b/cspell.json @@ -81,6 +81,7 @@ "termcolor", "unimock", "unixfs", + "unvalidated", "varint", "wordle", "wordlist", diff --git a/event-svc/src/event/migration.rs 
b/event-svc/src/event/migration.rs index 370412b31..226d97319 100644 --- a/event-svc/src/event/migration.rs +++ b/event-svc/src/event/migration.rs @@ -161,7 +161,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { payload.header().controllers()[0].clone(), cid, ); - let event = unvalidated::Event::from(payload); + let event = unvalidated::init::Event::new(payload); + let event: unvalidated::Event = unvalidated::Event::from(Box::new(event)); self.batch .push(event_builder.build(&self.network, event).await?); if self.batch.len() > 1000 { @@ -173,7 +174,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { } async fn write_batch(&mut self) -> Result<()> { self.service - .insert_events(&self.batch, DeliverableRequirement::Lazy) + .insert_events(&self.batch, DeliverableRequirement::Lazy, None) .await .map_err(Error::new_fatal)?; self.event_count += self.batch.len(); diff --git a/event-svc/src/event/order_events.rs b/event-svc/src/event/order_events.rs index 56664e9a6..6843519a4 100644 --- a/event-svc/src/event/order_events.rs +++ b/event-svc/src/event/order_events.rs @@ -153,7 +153,9 @@ mod test { let stream_1 = get_n_events(10).await; let mut to_insert = Vec::with_capacity(10); for event in stream_1.iter().chain(stream_2.iter()) { - let insertable = EventService::parse_discovered_event(event).await.unwrap(); + let insertable = EventService::parse_discovered_event(event, None) + .await + .unwrap(); to_insert.push(insertable); } (stream_1, stream_2, to_insert) @@ -187,7 +189,9 @@ mod test { let mut insertable = Vec::with_capacity(first_vec_count); let mut remaining = Vec::with_capacity(events.len() - first_vec_count); for (i, event) in events.iter().enumerate() { - let new = EventService::parse_discovered_event(event).await.unwrap(); + let new = EventService::parse_discovered_event(event, None) + .await + .unwrap(); if i < first_vec_count { insertable.push(new); } else { diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs index c8b974ba2..0b77b23a7 100644 --- a/event-svc/src/event/ordering_task.rs +++ b/event-svc/src/event/ordering_task.rs @@ -607,7 +607,9 @@ mod test { let mut res = Vec::with_capacity(n); let events = get_n_events(n).await; for event in events { - let event = EventService::parse_discovered_event(&event).await.unwrap(); + let event = EventService::parse_discovered_event(&event, None) + .await + .unwrap(); res.push(event); } res diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index 609e51be4..becc7caa2 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -6,7 +6,7 @@ use super::{ ordering_task::{DeliverableTask, OrderingTask}, }; use async_trait::async_trait; -use ceramic_core::{EventId, Network, SerializeExt}; +use ceramic_core::{EventId, Network, NodeId, SerializeExt}; use ceramic_event::unvalidated; use ceramic_event::unvalidated::Event; use ceramic_flight::{ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; @@ -53,7 +53,7 @@ pub trait BlockStore { async fn block_data(&self, cid: &Cid) -> anyhow::Result>>; } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum DeliverableRequirement { /// Must be ordered immediately and is rejected if not currently deliverable. The appropriate setting /// for API writes as we cannot create an event without its history. @@ -146,6 +146,7 @@ impl EventService { /// In the future, we will need to do more event validation (verify all EventID pieces, hashes, signatures, etc). 
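    /// `informant` is the node that delivered the event: a remote peer for Recon
    /// sync, this node for API writes, or `None` for migrated events.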
pub(crate) async fn parse_discovered_event( item: &ReconItem, + informant: Option, ) -> Result { let (cid, parsed_event) = unvalidated::Event::::decode_car(item.value.as_slice(), false) @@ -155,6 +156,7 @@ impl EventService { item.key.to_owned(), cid, parsed_event, + informant, false, )?) } @@ -175,11 +177,12 @@ impl EventService { pub(crate) async fn validate_events( items: &[ReconItem], + informant: Option, ) -> Result<(Vec, Vec)> { let mut parsed_events = Vec::with_capacity(items.len()); let mut invalid_events = Vec::new(); for event in items { - match Self::parse_discovered_event(event).await { + match Self::parse_discovered_event(event, informant).await { Ok(insertable) => parsed_events.push(insertable), Err(err) => invalid_events.push(InvalidItem::InvalidFormat { key: event.key.clone(), @@ -227,8 +230,9 @@ impl EventService { &self, items: &[ReconItem], source: DeliverableRequirement, + informant: Option, ) -> Result { - let (to_insert, mut invalid) = Self::validate_events(items).await?; + let (to_insert, mut invalid) = Self::validate_events(items, informant).await?; let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs index 5969110a3..6d90bfd0d 100644 --- a/event-svc/src/event/store.rs +++ b/event-svc/src/event/store.rs @@ -1,7 +1,7 @@ use std::ops::Range; use anyhow::anyhow; -use ceramic_core::EventId; +use ceramic_core::{EventId, NodeId}; use cid::Cid; use iroh_bitswap::Block; use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; @@ -44,9 +44,10 @@ impl recon::Store for EventService { async fn insert_many( &self, items: &[ReconItem], + informant: NodeId, ) -> ReconResult> { let res = self - .insert_events(items, DeliverableRequirement::Asap) + .insert_events(items, DeliverableRequirement::Asap, Some(informant)) .await?; Ok(res.into()) @@ -163,6 +164,7 @@ impl ceramic_api::EventService for EventService { async fn insert_many( &self, items: Vec, + informant: NodeId, ) -> anyhow::Result> { let items = items .into_iter() @@ -172,7 +174,7 @@ impl ceramic_api::EventService for EventService { }) .collect::>(); let res = self - .insert_events(&items, DeliverableRequirement::Immediate) + .insert_events(&items, DeliverableRequirement::Immediate, Some(informant)) .await?; Ok(res.into()) diff --git a/event-svc/src/store/metrics.rs b/event-svc/src/store/metrics.rs index 4a89996f2..18367a9a9 100644 --- a/event-svc/src/store/metrics.rs +++ b/event-svc/src/store/metrics.rs @@ -1,7 +1,7 @@ use std::{ops::Range, time::Duration}; use async_trait::async_trait; -use ceramic_core::{Cid, EventId}; +use ceramic_core::{Cid, EventId, NodeId}; use ceramic_metrics::{register, Recorder}; use futures::Future; use prometheus_client::{ @@ -124,11 +124,12 @@ where async fn insert_many( &self, items: Vec, + informant: NodeId, ) -> anyhow::Result> { let new_keys = StoreMetricsMiddleware::::record( &self.metrics, "api_insert_many", - self.store.insert_many(items), + self.store.insert_many(items, informant), ) .await?; @@ -216,11 +217,12 @@ where async fn insert_many( &self, items: &[ReconItem], + informant: NodeId, ) -> ReconResult> { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", - self.store.insert_many(items), + self.store.insert_many(items, informant), ) .await?; diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs index e875bfb29..f540b29b8 100644 --- a/event-svc/src/store/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -5,7 
+5,7 @@ use std::{ }; use anyhow::anyhow; -use ceramic_core::{event_id::InvalidEventId, Cid, EventId}; +use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId}; use ceramic_event::unvalidated; use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction}; use ipld_core::ipld::Ipld; @@ -88,6 +88,8 @@ impl CeramicOneEvent { async fn insert_event( tx: &mut SqliteTransaction<'_>, key: &EventId, + stream_cid: &Cid, + informant: &Option, deliverable: bool, ) -> Result { let id = key.as_bytes(); @@ -114,6 +116,8 @@ impl CeramicOneEvent { .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) .bind(delivered) + .bind(stream_cid.to_bytes()) + .bind(informant.as_ref().map(|n| n.did_key())) .execute(&mut **tx.inner()) .await; @@ -179,7 +183,14 @@ impl CeramicOneEvent { let mut tx = pool.begin_tx().await.map_err(Error::from)?; for item in to_add { - let new_key = Self::insert_event(&mut tx, item.order_key(), item.deliverable()).await?; + let new_key = Self::insert_event( + &mut tx, + item.order_key(), + item.stream_cid(), + item.informant(), + item.deliverable(), + ) + .await?; inserted.push(InsertedEvent::new( item.order_key().clone(), new_key, diff --git a/event-svc/src/store/sql/entities/event.rs b/event-svc/src/store/sql/entities/event.rs index a41e6a003..a72e060f7 100644 --- a/event-svc/src/store/sql/entities/event.rs +++ b/event-svc/src/store/sql/entities/event.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::anyhow; use ceramic_car::{CarHeader, CarReader, CarWriter}; -use ceramic_core::EventId; +use ceramic_core::{EventId, NodeId}; use ceramic_event::unvalidated; use cid::Cid; use ipld_core::ipld::Ipld; @@ -51,6 +51,8 @@ pub struct EventInsertable { deliverable: bool, /// The parsed structure containing the actual Event data. event: Arc>, + /// The ID of the Node that informed us about this event + informant: Option, } impl EventInsertable { @@ -59,6 +61,7 @@ impl EventInsertable { order_key: EventId, event_cid: Cid, event: unvalidated::Event, + informant: Option, deliverable: bool, ) -> Result { let cid = order_key.cid().ok_or_else(|| { @@ -81,14 +84,19 @@ impl EventInsertable { cid, deliverable, event: Arc::new(event), + informant, }) } - /// Get the Recon order key (EventId) of the event. pub fn order_key(&self) -> &EventId { &self.order_key } + /// Get the CID of the init event of the stream + pub fn stream_cid(&self) -> &Cid { + self.event.id() + } + /// Get the CID of the event pub fn cid(&self) -> &Cid { &self.cid @@ -99,6 +107,11 @@ impl EventInsertable { &self.event } + /// Get the Event source. 
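+    /// `None` when no informant was recorded, e.g. for events imported by migration.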
+ pub fn informant(&self) -> &Option { + &self.informant + } + /// Whether this event is deliverable currently pub fn deliverable(&self) -> bool { self.deliverable diff --git a/event-svc/src/store/sql/query.rs b/event-svc/src/store/sql/query.rs index fee22afe5..a50dfb3f0 100644 --- a/event-svc/src/store/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -171,18 +171,18 @@ pub enum SqlBackend { } impl ReconQuery { - /// Requires 10 parameters: the order_key, cid and the 8 hash values + /// Requires 13 parameters: the order_key, cid, 8 hash values, delivered flag, init_cid, and informant pub fn insert_event() -> &'static str { "INSERT INTO ceramic_one_event ( order_key, cid, ahash_0, ahash_1, ahash_2, ahash_3, ahash_4, ahash_5, ahash_6, ahash_7, - delivered + delivered, init_cid, informant ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, - $11 + $11, $12, $13 );" } diff --git a/event-svc/src/store/sql/test.rs b/event-svc/src/store/sql/test.rs index 0eb7a1501..ecda69a42 100644 --- a/event-svc/src/store/sql/test.rs +++ b/event-svc/src/store/sql/test.rs @@ -5,6 +5,7 @@ use ceramic_core::{ EventId, Network, }; use ceramic_event::unvalidated; +use ceramic_event::unvalidated::init; use cid::Cid; use expect_test::expect; use ipld_core::codec::Codec; @@ -47,9 +48,9 @@ fn random_events(num: usize) -> Vec { Code::Sha2_256.digest(&serde_ipld_dagcbor::to_vec(&payload).unwrap()), ); let order_key = event_id_builder().with_event(&cid).build(); - let event = unvalidated::Event::from(payload); + let event = Box::new(init::Event::new(payload)).into(); - events.push(EventInsertable::new(order_key, cid, event, true).unwrap()) + events.push(EventInsertable::new(order_key, cid, event, None, true).unwrap()) } events diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index 300b7a9b9..43cbb2194 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -5,6 +5,7 @@ use crate::EventService; use anyhow::Error; use bytes::Bytes; use ceramic_api::{ApiItem, EventService as ApiEventService}; +use ceramic_core::NodeId; use ceramic_flight::server::ConclusionFeed as _; use ceramic_flight::ConclusionEvent; use ceramic_sql::sqlite::SqlitePool; @@ -65,10 +66,10 @@ where let init_cid = one.key.cid().unwrap(); let min_id = event_id_min(&init_cid, &model); let max_id = event_id_max(&init_cid, &model); - recon::Store::insert_many(&store, &[one.clone()]) + recon::Store::insert_many(&store, &[one.clone()], NodeId::random().unwrap().0) .await .unwrap(); - recon::Store::insert_many(&store, &[two.clone()]) + recon::Store::insert_many(&store, &[two.clone()], NodeId::random().unwrap().0) .await .unwrap(); let values: Vec<(EventId, Vec)> = @@ -104,16 +105,20 @@ where let item = &[ReconItem::new(id, car)]; // first insert reports its a new key - assert!(recon::Store::insert_many(&store, item) - .await - .unwrap() - .included_new_key()); + assert!( + recon::Store::insert_many(&store, item, NodeId::random().unwrap().0) + .await + .unwrap() + .included_new_key() + ); // second insert of same key reports it already existed - assert!(!recon::Store::insert_many(&store, item) - .await - .unwrap() - .included_new_key()); + assert!( + !recon::Store::insert_many(&store, item, NodeId::random().unwrap().0) + .await + .unwrap() + .included_new_key() + ); } test_with_dbs!( @@ -137,14 +142,22 @@ where let TestEventInfo { car: car2, .. 
} = build_event().await; let expected = hex::encode(&car1); - let actual = recon::Store::insert_many(&store, &[ReconItem::new(id.clone(), car1)]) - .await - .unwrap(); + let actual = recon::Store::insert_many( + &store, + &[ReconItem::new(id.clone(), car1)], + NodeId::random().unwrap().0, + ) + .await + .unwrap(); assert_eq!(actual, InsertResult::new(1)); - let res = recon::Store::insert_many(&store, &[ReconItem::new(id.clone(), car2)]) - .await - .unwrap(); + let res = recon::Store::insert_many( + &store, + &[ReconItem::new(id.clone(), car2)], + NodeId::random().unwrap().0, + ) + .await + .unwrap(); assert_eq!(1, res.invalid.len()); let invalid = res.invalid.first().unwrap(); @@ -184,9 +197,13 @@ where .. } = build_event().await; let expected = hex::encode(&store_value); - recon::Store::insert_many(&store, &[ReconItem::new(key.clone(), store_value)]) - .await - .unwrap(); + recon::Store::insert_many( + &store, + &[ReconItem::new(key.clone(), store_value)], + NodeId::random().unwrap().0, + ) + .await + .unwrap(); let value = recon::Store::value_for_key(&store, &key) .await .unwrap() @@ -214,9 +231,13 @@ where .. } = build_event().await; let expected = hex::encode(&store_value); - recon::Store::insert_many(&store, &[ReconItem::new(key.clone(), store_value)]) - .await - .unwrap(); + recon::Store::insert_many( + &store, + &[ReconItem::new(key.clone(), store_value)], + NodeId::random().unwrap().0, + ) + .await + .unwrap(); let value = recon::Store::value_for_key(&store, &key) .await .unwrap() @@ -248,7 +269,10 @@ async fn prep_highwater_tests(store: &dyn ApiEventService) -> (Cid, Cid, Cid) { keys[1].key.cid().unwrap(), keys[2].key.cid().unwrap(), ); - store.insert_many(keys).await.unwrap(); + store + .insert_many(keys, NodeId::random().unwrap().0) + .await + .unwrap(); res } @@ -445,7 +469,10 @@ where .. 
} = build_event().await; let item = ApiItem::new(key, store_value); - store.insert_many(vec![item.clone()]).await.unwrap(); + store + .insert_many(vec![item.clone()], NodeId::random().unwrap().0) + .await + .unwrap(); let res = store.value_for_order_key(&item.key).await.unwrap().unwrap(); assert_eq!(&res, item.value.as_ref()); @@ -472,7 +499,10 @@ where } = build_event().await; let item = ApiItem::new(key, store_value); - store.insert_many(vec![item.clone()]).await.unwrap(); + store + .insert_many(vec![item.clone()], NodeId::random().unwrap().0) + .await + .unwrap(); let res = store .value_for_cid(&item.key.cid().unwrap()) @@ -589,7 +619,11 @@ async fn test_conclusion_events_since() -> Result<(), Box for event in &test_events { service - .insert_events(&[event.clone()], DeliverableRequirement::Immediate) + .insert_events( + &[event.clone()], + DeliverableRequirement::Immediate, + Some(NodeId::random().unwrap().0), + ) .await?; } diff --git a/event-svc/src/tests/migration.rs b/event-svc/src/tests/migration.rs index 08922d16d..b9969d2fe 100644 --- a/event-svc/src/tests/migration.rs +++ b/event-svc/src/tests/migration.rs @@ -154,7 +154,10 @@ fn random_cid() -> cid::Cid { async fn random_unsigned_init_time_event() -> Vec> { let init = random_unsigned_init_event().await; let init_cid = init.to_cid().unwrap(); - vec![init.into(), random_time_event(init_cid).await.into()] + vec![ + unvalidated::Event::Unsigned(Box::new(unvalidated::init::Event::new(init.into()))), + random_time_event(init_cid).await.into(), + ] } // create random time event with a previous signed init event async fn random_signed_init_time_event() -> Vec> { diff --git a/event-svc/src/tests/ordering.rs b/event-svc/src/tests/ordering.rs index cfb85091f..62123306f 100644 --- a/event-svc/src/tests/ordering.rs +++ b/event-svc/src/tests/ordering.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use ceramic_api::{EventDataResult, EventService as ApiEventService, IncludeEventData}; -use ceramic_core::EventId; +use ceramic_core::{EventId, NodeId}; use rand::seq::SliceRandom; use rand::thread_rng; use recon::ReconItem; @@ -21,13 +21,19 @@ async fn setup_service() -> EventService { async fn add_and_assert_new_recon_event(store: &EventService, item: ReconItem) { tracing::trace!("inserted event: {}", item.key.cid().unwrap()); - let new = recon::Store::insert_many(store, &[item]).await.unwrap(); + let new = recon::Store::insert_many(store, &[item], NodeId::random().unwrap().0) + .await + .unwrap(); assert!(new.included_new_key()); } async fn add_and_assert_new_local_event(store: &EventService, item: ReconItem) { let new = store - .insert_events(&[item], DeliverableRequirement::Immediate) + .insert_events( + &[item], + DeliverableRequirement::Immediate, + Some(NodeId::random().unwrap().0), + ) .await .unwrap(); let new = new.store_result.count_new_keys(); @@ -58,7 +64,11 @@ async fn test_missing_prev_history_required_not_inserted() { let data = &events[1]; let new = store - .insert_events(&[data.to_owned()], DeliverableRequirement::Immediate) + .insert_events( + &[data.to_owned()], + DeliverableRequirement::Immediate, + Some(NodeId::random().unwrap().0), + ) .await .unwrap(); assert!(new.store_result.inserted.is_empty()); @@ -94,6 +104,7 @@ async fn test_prev_in_same_write_history_required() { .insert_events( &[init.to_owned(), data.to_owned()], DeliverableRequirement::Immediate, + Some(NodeId::random().unwrap().0), ) .await .unwrap(); diff --git a/event/src/unvalidated/event.rs b/event/src/unvalidated/event.rs index 9bd3d2205..c55417a11 100644 
diff --git a/event/src/unvalidated/event.rs b/event/src/unvalidated/event.rs
index 9bd3d2205..c55417a11 100644
--- a/event/src/unvalidated/event.rs
+++ b/event/src/unvalidated/event.rs
@@ -247,9 +247,9 @@
     }
 }

-impl<D> From<init::Payload<D>> for Event<D> {
-    fn from(payload: init::Payload<D>) -> Self {
-        Self::Unsigned(Box::new(init::Event::new(payload)))
+impl<D> From<Box<init::Event<D>>> for Event<D> {
+    fn from(value: Box<init::Event<D>>) -> Self {
+        Self::Unsigned(value)
     }
 }
diff --git a/event/src/unvalidated/payload/init.rs b/event/src/unvalidated/payload/init.rs
index 764d328a5..8f634894c 100644
--- a/event/src/unvalidated/payload/init.rs
+++ b/event/src/unvalidated/payload/init.rs
@@ -1,9 +1,11 @@
+use cid::Cid;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;

 use ceramic_car::sync::{CarHeader, CarWriter};
-use ceramic_core::{Cid, SerializeExt};
+use ceramic_core::SerializeExt;

-use crate::Bytes;
+use crate::bytes::Bytes;

 /// Represents an event with a payload and its corresponding CID
 #[derive(Debug)]
@@ -31,7 +33,7 @@ impl<D: serde::Serialize> Event<D> {

 impl<D: serde::Serialize> Event<D> {
     /// Create a new init Event
-    pub(crate) fn new(payload: Payload<D>) -> Self {
+    pub fn new(payload: Payload<D>) -> Self {
         Self {
             cid: payload
                 .to_cid()
@@ -78,7 +80,7 @@ impl<D: serde::Serialize> Payload<D> {

 impl<D: serde::Serialize> Payload<D> {
     /// Encode the unsigned init event into CAR bytes.
-    pub fn encode_car(&self) -> Result<Vec<u8>, anyhow::Error> {
+    pub fn encode_car(&self) -> anyhow::Result<Vec<u8>> {
         let (cid, event) = self.to_dag_cbor_block()?;
         let mut car = Vec::new();
         let roots: Vec<Cid> = vec![cid];
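With the conversion change above, an init payload no longer turns into an `Event` implicitly; callers construct and box the unsigned `init::Event` (whose `new` constructor is made public above and computes the CID once) and convert that. A sketch of the new call-site shape, assuming a `payload: init::Payload<Ipld>` is in scope:

    // Build the unsigned init event explicitly, then convert it.
    let event: unvalidated::Event<Ipld> =
        Box::new(unvalidated::init::Event::new(payload)).into();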
diff --git a/interest-svc/src/interest/store.rs b/interest-svc/src/interest/store.rs
index 92b2540ff..b2f3e2184 100644
--- a/interest-svc/src/interest/store.rs
+++ b/interest-svc/src/interest/store.rs
@@ -1,6 +1,6 @@
 use std::ops::Range;

-use ceramic_core::Interest;
+use ceramic_core::{Interest, NodeId};
 use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a};
 use tracing::instrument;

@@ -20,6 +20,8 @@ impl recon::Store for InterestService {
     async fn insert_many(
         &self,
         items: &[ReconItem<Self::Key>],
+        // The recon::Store trait is shared between InterestService and EventService, but only events track a source.
+        _informant: NodeId,
     ) -> ReconResult<InsertResult<Self::Key>> {
         let keys = items.iter().map(|item| &item.key).collect::<Vec<_>>();
         Ok(CeramicOneInterest::insert_many(&self.pool, &keys)
diff --git a/interest-svc/src/store/metrics.rs b/interest-svc/src/store/metrics.rs
index 2f7043d3a..3ad852d9e 100644
--- a/interest-svc/src/store/metrics.rs
+++ b/interest-svc/src/store/metrics.rs
@@ -1,7 +1,7 @@
 use std::{ops::Range, time::Duration};

 use async_trait::async_trait;
-use ceramic_core::Interest;
+use ceramic_core::{Interest, NodeId};
 use ceramic_metrics::{register, Recorder};
 use futures::Future;
 use prometheus_client::{
@@ -166,11 +166,12 @@ where
     async fn insert_many(
         &self,
         items: &[ReconItem<Self::Key>],
+        informant: NodeId,
     ) -> ReconResult<InsertResult<Self::Key>> {
         let res = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
             "insert_many",
-            self.store.insert_many(items),
+            self.store.insert_many(items, informant),
         )
         .await?;
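The metrics middleware is purely pass-through: it times the inner call and forwards the new `informant` argument unchanged, leaving it to the innermost store to interpret (events record it, interests ignore it). The wrapping shape, reduced to a self-contained sketch; the names are illustrative, not the ceramic-metrics API:

    use std::time::Instant;

    // Time an inner future and pass its output through, as record() does above.
    async fn record<T>(op: &str, fut: impl std::future::Future<Output = T>) -> T {
        let start = Instant::now();
        let out = fut.await;
        tracing::debug!(op, elapsed = ?start.elapsed(), "store call finished");
        out
    }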
diff --git a/interest-svc/src/tests/interest.rs b/interest-svc/src/tests/interest.rs
index 9681d3a10..24272f59b 100644
--- a/interest-svc/src/tests/interest.rs
+++ b/interest-svc/src/tests/interest.rs
@@ -3,7 +3,7 @@ use std::{collections::BTreeSet, str::FromStr};
 use ceramic_api::InterestService;
 use ceramic_core::{
     interest::{Builder, WithPeerId},
-    Interest, PeerId,
+    Interest, NodeId, PeerId,
 };
 use expect_test::expect;
 use rand::{thread_rng, Rng};
@@ -116,6 +116,7 @@ where
             random_interest(Some((&[0], &[1])), Some(42)),
             vec![],
         )],
+        NodeId::random().unwrap().0,
     )
     .await
     .unwrap();
@@ -126,6 +127,7 @@ where
             random_interest(Some((&[0], &[1])), Some(24)),
             vec![],
         )],
+        NodeId::random().unwrap().0,
     )
     .await
     .unwrap();
@@ -150,12 +152,20 @@ where
     let interest_0 = random_interest(None, None);
     let interest_1 = random_interest(None, None);

-    recon::Store::insert_many(&store, &[ReconItem::new(interest_0.clone(), Vec::new())])
-        .await
-        .unwrap();
-    recon::Store::insert_many(&store, &[ReconItem::new(interest_1.clone(), Vec::new())])
-        .await
-        .unwrap();
+    recon::Store::insert_many(
+        &store,
+        &[ReconItem::new(interest_0.clone(), Vec::new())],
+        NodeId::random().unwrap().0,
+    )
+    .await
+    .unwrap();
+    recon::Store::insert_many(
+        &store,
+        &[ReconItem::new(interest_1.clone(), Vec::new())],
+        NodeId::random().unwrap().0,
+    )
+    .await
+    .unwrap();
     let ids = recon::Store::range(
         &store,
         &random_interest_min()..&random_interest_max(),
@@ -182,11 +192,17 @@ where
     let interest_1 = random_interest(None, None);

     store
-        .insert_many(&[ReconItem::new(interest_0.clone(), Vec::new())])
+        .insert_many(
+            &[ReconItem::new(interest_0.clone(), Vec::new())],
+            NodeId::random().unwrap().0,
+        )
         .await
         .unwrap();
     store
-        .insert_many(&[ReconItem::new(interest_1.clone(), Vec::new())])
+        .insert_many(
+            &[ReconItem::new(interest_1.clone(), Vec::new())],
+            NodeId::random().unwrap().0,
+        )
         .await
         .unwrap();
     let ids = store
@@ -216,20 +232,24 @@ where
 {
     let interest = random_interest(None, None);
     // do take the first one
-    assert!(
-        &recon::Store::insert_many(&store, &[ReconItem::new(interest.clone(), Vec::new())])
-            .await
-            .unwrap()
-            .included_new_key(),
-    );
+    assert!(&recon::Store::insert_many(
+        &store,
+        &[ReconItem::new(interest.clone(), Vec::new())],
+        NodeId::random().unwrap().0,
+    )
+    .await
+    .unwrap()
+    .included_new_key(),);

     // reject the second insert of same key
-    assert!(
-        !recon::Store::insert_many(&store, &[ReconItem::new(interest.clone(), Vec::new())],)
-            .await
-            .unwrap()
-            .included_new_key()
-    );
+    assert!(!recon::Store::insert_many(
+        &store,
+        &[ReconItem::new(interest.clone(), Vec::new())],
+        NodeId::random().unwrap().0,
+    )
+    .await
+    .unwrap()
+    .included_new_key());
 }

 test_with_dbs!(
@@ -243,9 +263,13 @@ where
     S: recon::Store<Key = Interest, Hash = Sha256a>,
 {
     let key = random_interest(None, None);
-    recon::Store::insert_many(&store, &[ReconItem::new(key.clone(), Vec::new())])
-        .await
-        .unwrap();
+    recon::Store::insert_many(
+        &store,
+        &[ReconItem::new(key.clone(), Vec::new())],
+        NodeId::random().unwrap().0,
+    )
+    .await
+    .unwrap();
     let value = store.value_for_key(&key).await.unwrap();
     let val = value.unwrap();
     let empty: Vec<u8> = vec![];
diff --git a/migrations/sqlite/20240809205107_event_source_anchored.down.sql b/migrations/sqlite/20240809205107_event_source_anchored.down.sql
new file mode 100644
index 000000000..5a2e91dc2
--- /dev/null
+++ b/migrations/sqlite/20240809205107_event_source_anchored.down.sql
@@ -0,0 +1,5 @@
+-- Add down migration script here
+
+-- Remove the newly added init_cid and informant columns
+ALTER TABLE ceramic_one_event DROP COLUMN init_cid;
+ALTER TABLE ceramic_one_event DROP COLUMN informant;
diff --git a/migrations/sqlite/20240809205107_event_source_anchored.up.sql b/migrations/sqlite/20240809205107_event_source_anchored.up.sql
new file mode 100644
index 000000000..7cb0d1fa2
--- /dev/null
+++ b/migrations/sqlite/20240809205107_event_source_anchored.up.sql
@@ -0,0 +1,9 @@
+-- Add up migration script here
+
+-- The CID of the Init Event for the stream
+ALTER TABLE ceramic_one_event ADD COLUMN init_cid BLOB DEFAULT NULL;
+
+-- The ID of the node that sent the event, or our own ID if the event came in via the API
+ALTER TABLE ceramic_one_event ADD COLUMN informant TEXT DEFAULT NULL;
+
+SELECT init_cid, informant FROM "ceramic_one_event" WHERE false;
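The trailing `SELECT ... WHERE false` in the up migration returns no rows; it acts as a cheap guard that fails the migration immediately if either new column is missing, rather than surfacing the problem later at query time. The same guard could be run from Rust with sqlx, e.g. (assuming a `pool: sqlx::SqlitePool` is in scope):

    // Fails fast if init_cid or informant were not added by the migration.
    sqlx::query("SELECT init_cid, informant FROM ceramic_one_event WHERE false")
        .fetch_all(&pool)
        .await?;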
diff --git a/one/src/daemon.rs b/one/src/daemon.rs
index 9c4204be6..f33051a2a 100644
--- a/one/src/daemon.rs
+++ b/one/src/daemon.rs
@@ -1,6 +1,7 @@
 use std::{path::PathBuf, time::Duration};

 use anyhow::{anyhow, Context, Result};
+use ceramic_core::NodeId;
 use ceramic_event_svc::EventService;
 use ceramic_interest_svc::InterestService;
 use ceramic_kubo_rpc::Multiaddr;
@@ -386,8 +387,9 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     };

     // Build HTTP server
+    let (node_id, _) = NodeId::try_from_dir(opts.p2p_key_dir.clone())?;
     let mut ceramic_server = ceramic_api::Server::new(
-        peer_id,
+        node_id,
         network,
         interest_api_store,
         Arc::new(model_api_store),
diff --git a/p2p/src/node.rs b/p2p/src/node.rs
index 0401c777a..ef163db8f 100644
--- a/p2p/src/node.rs
+++ b/p2p/src/node.rs
@@ -1168,7 +1168,7 @@ mod tests {
     use crate::keys::Keypair;
     use async_trait::async_trait;
-    use ceramic_core::RangeOpen;
+    use ceramic_core::{NodeId, RangeOpen};
     use ceramic_event_svc::{store::SqlitePool, EventService};
     use futures::TryStreamExt;
     use rand::prelude::*;
@@ -1258,6 +1258,7 @@ mod tests {
         async fn insert(
             &self,
             _items: Vec<ReconItem<Self::Key>>,
+            _informant: NodeId,
         ) -> ReconResult<InsertResult<Self::Key>> {
             unreachable!()
         }
diff --git a/recon/src/libp2p/protocol.rs b/recon/src/libp2p/protocol.rs
index b4915b15e..3ed49a740 100644
--- a/recon/src/libp2p/protocol.rs
+++ b/recon/src/libp2p/protocol.rs
@@ -5,10 +5,11 @@ use libp2p::swarm::ConnectionId;
 use libp2p_identity::PeerId;
 use tracing::Level;

-use crate::protocol::ProtocolConfig;
+use ceramic_core::NodeId;
+
 use crate::{
     libp2p::stream_set::StreamSet,
-    protocol::{self, Recon},
+    protocol::{self, ProtocolConfig, Recon},
 };

 // Initiate Recon synchronization with a peer over a stream.
@@ -26,14 +27,15 @@ where
 {
     let codec = CborCodec::new();
     let stream = Framed::new(stream, codec);
-    protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
+    let remote_node_id = NodeId::try_from_peer_id(&remote_peer_id)?;
+    protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id))
         .await?;
     Ok(stream_set)
 }
 // Respond to Recon synchronization with a peer over a stream.
 #[tracing::instrument(skip(recon, stream), ret(level = Level::DEBUG))]
 pub async fn respond_synchronize(
-    remote_peer_id: PeerId,
+    remote_peer_id: PeerId, // included for context only
     connection_id: ConnectionId, // included for context only
     stream_set: StreamSet,
     recon: R,
@@ -45,7 +47,8 @@ where
 {
     let codec = CborCodec::new();
     let stream = Framed::new(stream, codec);
-    protocol::respond_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
+    let remote_node_id = NodeId::try_from_peer_id(&remote_peer_id)?;
+    protocol::respond_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id))
         .await?;
     Ok(stream_set)
 }
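Both stream handlers now derive the remote `NodeId` before running the protocol. `NodeId::try_from_peer_id` is fallible, presumably because only PeerIds that embed a compatible (Ed25519) public key can be mapped back to a `NodeId`, so a sync attempt with an incompatible peer now errors up front instead of carrying an unusable identity:

    // Shared prologue of initiate/respond above (sketch).
    let remote_node_id = NodeId::try_from_peer_id(&remote_peer_id)?;
    let config = ProtocolConfig::new_node_id(remote_node_id);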
diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs
index 147130051..8dbe821f8 100644
--- a/recon/src/libp2p/tests.rs
+++ b/recon/src/libp2p/tests.rs
@@ -7,6 +7,7 @@ use crate::{
 };

 use async_trait::async_trait;
+use ceramic_core::NodeId;
 use libp2p::{metrics::Registry, PeerId, Swarm};
 use libp2p_swarm_test::SwarmExt;
 use test_log::test;
@@ -62,10 +63,14 @@ where
     type Key = K;
     type Hash = H;

-    async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> ReconResult<InsertResult<Self::Key>> {
+    async fn insert_many(
+        &self,
+        items: &[ReconItem<Self::Key>],
+        informant: NodeId,
+    ) -> ReconResult<InsertResult<Self::Key>> {
         self.as_error()?;
-        self.inner.insert_many(items).await
+        self.inner.insert_many(items, informant).await
     }

     async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult<HashCount<Self::Hash>> {
diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs
index 170228661..59215a548 100644
--- a/recon/src/protocol.rs
+++ b/recon/src/protocol.rs
@@ -14,10 +14,9 @@ use anyhow::{anyhow, bail, Context, Result};
 use async_stream::try_stream;
 use async_trait::async_trait;
-use ceramic_core::RangeOpen;
+use ceramic_core::{NodeId, RangeOpen};
 use ceramic_metrics::Recorder;
 use futures::{pin_mut, stream::BoxStream, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
-use libp2p::PeerId;
 use serde::{Deserialize, Serialize};
 use tokio::{sync::mpsc, time::Instant};
 use tokio_stream::once;
@@ -63,25 +62,24 @@ pub struct ProtocolConfig {
     /// As we descend the tree and find smaller ranges, this won't apply as we have to flush
     /// before recomputing a range, but it will be used when we're processing large ranges we don't yet have.
     pub insert_batch_size: usize,
-    #[allow(dead_code)]
-    /// The ID of the peer we're syncing with.
-    peer_id: PeerId,
+    /// The ID of the node we're syncing with.
+    node_id: NodeId,
 }

 impl ProtocolConfig {
     /// Create an instance of the config
-    pub fn new(insert_batch_size: usize, peer_id: PeerId) -> Self {
+    pub fn new(insert_batch_size: usize, node_id: NodeId) -> Self {
         Self {
             insert_batch_size,
-            peer_id,
+            node_id,
         }
     }

     /// Uses the constant defaults defined for batch size (100) and max items (1000)
-    pub fn new_peer_id(peer_id: PeerId) -> Self {
+    pub fn new_node_id(node_id: NodeId) -> Self {
         Self {
             insert_batch_size: INSERT_BATCH_SIZE,
-            peer_id,
+            node_id,
         }
     }
 }
@@ -770,16 +768,23 @@ where

         let evs: Vec<_> = self.event_q.drain(..).collect();
-        let batch = self.recon.insert(evs).await.context("persisting all")?;
+        let batch = self
+            .recon
+            .insert(evs, self.config.node_id)
+            .await
+            .context("persisting all")?;
         if !batch.invalid.is_empty() {
             for invalid in &batch.invalid {
                 self.recon.metrics().record(invalid)
             }
             tracing::warn!(
-                invalid_cnt=%batch.invalid.len(), peer_id=%self.config.peer_id,
+                invalid_cnt=%batch.invalid.len(), peer_id=%self.config.node_id.peer_id(),
                 "Recon discovered data it will never allow. Hanging up on peer",
             );
-            bail!("Received unknown data from peer: {}", self.config.peer_id);
+            bail!(
+                "Received unknown data from peer: {}",
+                self.config.node_id.peer_id()
+            );
         }

         // For now we record the metrics from recon, but the service is the one that will track and try to store them.
@@ -815,6 +820,7 @@ pub trait Recon: Clone + Send + Sync + 'static {
     async fn insert(
         &self,
         items: Vec<ReconItem<Self::Key>>,
+        informant: NodeId,
     ) -> ReconResult<InsertResult<Self::Key>>;

     /// Get all keys in the specified range
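`ProtocolConfig` now carries a live `node_id` in place of the previously dead `peer_id` field: the insert path uses it as the informant for persistence and derives a `PeerId` from it (`.peer_id()`) for log and error context. Constructing one, per the two constructors above; this assumes `NodeId` is `Copy`, as its repeated by-value use in the diff suggests:

    // Explicit batch size, or the INSERT_BATCH_SIZE default (100).
    let cfg = ProtocolConfig::new(500, node_id);
    let cfg_default = ProtocolConfig::new_node_id(node_id);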
diff --git a/recon/src/recon.rs b/recon/src/recon.rs
index e2da3e650..f9cc14c9a 100644
--- a/recon/src/recon.rs
+++ b/recon/src/recon.rs
@@ -11,7 +11,7 @@ use std::{

 use anyhow::anyhow;
 use async_trait::async_trait;
-use ceramic_core::{EventId, Interest, PeerId, RangeOpen};
+use ceramic_core::{EventId, Interest, NodeId, PeerId, RangeOpen};
 use serde::{Deserialize, Serialize};
 use tracing::{instrument, trace, Level};

@@ -181,8 +181,8 @@ where
     type Hash = H;

     /// Insert keys into the key space.
-    async fn insert(&self, items: Vec<ReconItem<Self::Key>>) -> Result<InsertResult<Self::Key>> {
-        self.store.insert_many(&items).await
+    async fn insert(&self, items: Vec<ReconItem<Self::Key>>, informant: NodeId) -> Result<InsertResult<Self::Key>> {
+        self.store.insert_many(&items, informant).await
     }

     /// Return all keys in the range between left_fencepost and right_fencepost.
@@ -530,7 +530,11 @@ pub trait Store {
     /// Insert new keys into the key space.
     /// Returns true for each key that did not previously exist, in the
     /// same order as the input iterator.
-    async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertResult<Self::Key>>;
+    async fn insert_many(
+        &self,
+        items: &[ReconItem<Self::Key>],
+        informant: NodeId,
+    ) -> Result<InsertResult<Self::Key>>;

     /// Return the hash of all keys in the range between left_fencepost and right_fencepost.
     /// The upper range bound is exclusive.
@@ -638,8 +642,12 @@ where
     type Key = K;
     type Hash = H;

-    async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertResult<Self::Key>> {
-        self.as_ref().insert_many(items).await
+    async fn insert_many(
+        &self,
+        items: &[ReconItem<Self::Key>],
+        informant: NodeId,
+    ) -> Result<InsertResult<Self::Key>> {
+        self.as_ref().insert_many(items, informant).await
     }

     async fn hash_range(&self, range: Range<&Self::Key>) -> Result<HashCount<Self::Hash>> {
diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs
index 492bbcbf5..c6a441ce6 100644
--- a/recon/src/recon/btreestore.rs
+++ b/recon/src/recon/btreestore.rs
@@ -1,4 +1,5 @@
 use async_trait::async_trait;
+use ceramic_core::NodeId;
 use std::{collections::BTreeMap, ops::Range, sync::Arc};
 use tokio::sync::Mutex;
 use tracing::instrument;
@@ -123,7 +124,7 @@ where
         Ok(Box::new(keys.into_iter()))
     }

-    async fn insert(&self, item: &ReconItem<K>) -> Result<bool> {
+    async fn insert(&self, item: &ReconItem<K>, _informant: NodeId) -> Result<bool> {
         let mut inner = self.inner.lock().await;
         let new = inner
             .keys
@@ -145,11 +146,15 @@ where
     type Hash = H;

     #[instrument(skip(self))]
-    async fn insert_many(&self, items: &[ReconItem<K>]) -> Result<InsertResult<K>> {
+    async fn insert_many(
+        &self,
+        items: &[ReconItem<K>],
+        informant: NodeId,
+    ) -> Result<InsertResult<K>> {
         tracing::trace!("inserting items: {}", items.len());
         let mut new = 0;
         for item in items.iter() {
-            self.insert(item).await?.then(|| new += 1);
+            self.insert(item, informant).await?.then(|| new += 1);
         }
         Ok(InsertResult::new(new))
     }
diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs
index a29ae013f..05cd9d1fa 100644
--- a/recon/src/recon/tests.rs
+++ b/recon/src/recon/tests.rs
@@ -19,9 +19,8 @@ use std::{collections::BTreeSet, sync::Arc};

 use anyhow::Result;
 use async_trait::async_trait;
-use ceramic_core::RangeOpen;
+use ceramic_core::{NodeId, RangeOpen};
 use futures::{ready, Future, Sink, Stream};
-use libp2p::PeerId;
 use pin_project::pin_project;
 use prometheus_client::registry::Registry;
 use serde::{Deserialize, Serialize};
@@ -511,10 +510,13 @@ async fn word_lists() {
     );
     for key in s.split_whitespace().map(|s| s.to_string()) {
         if !s.is_empty() {
-            r.insert(vec![ReconItem::new(
-                key.as_bytes().into(),
-                key.to_uppercase().as_bytes().to_vec(),
-            )])
+            r.insert(
+                vec![ReconItem::new(
+                    key.as_bytes().into(),
+                    key.to_uppercase().as_bytes().to_vec(),
+                )],
+                NodeId::random().unwrap().0,
+            )
             .await
             .unwrap();
         }
@@ -568,12 +570,12 @@ async fn word_lists() {
     let local_handle = tokio::spawn(protocol::initiate_synchronize(
         local,
         local_channel,
-        ProtocolConfig::new(100, PeerId::random()),
+        ProtocolConfig::new(100, NodeId::random().unwrap().0),
     ));
     let remote_handle = tokio::spawn(protocol::respond_synchronize(
         remote,
         remote_channel,
-        ProtocolConfig::new(100, PeerId::random()),
+        ProtocolConfig::new(100, NodeId::random().unwrap().0),
     ));
     // Error if either synchronize method errors
     let (local, remote) = tokio::join!(local_handle, remote_handle);
@@ -1139,12 +1141,12 @@ async fn recon_do_batch_size(
     let cat_fut = protocol::initiate_synchronize(
         cat.clone(),
         cat_channel,
-        ProtocolConfig::new(batch_size, PeerId::random()),
+        ProtocolConfig::new(batch_size, NodeId::random().unwrap().0),
     );
     let dog_fut = protocol::respond_synchronize(
         dog.clone(),
         dog_channel,
-        ProtocolConfig::new(batch_size, PeerId::random()),
+        ProtocolConfig::new(batch_size, NodeId::random().unwrap().0),
     );
     // Drive both synchronize futures on the same thread
     // This is to ensure deterministic behavior.
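The closing comments are the crux of this harness: polling both synchronize futures on one task, instead of spawning each onto the runtime, fixes the order in which the two protocol halves exchange messages and makes the batch-size assertions reproducible. The equivalent shape in isolation:

    // join! polls both futures on the current task (no tokio::spawn),
    // giving a deterministic interleaving of the two protocol halves.
    let (cat_res, dog_res) = tokio::join!(cat_fut, dog_fut);
    cat_res.unwrap();
    dog_res.unwrap();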