Skip to content

Commit

Permalink
feat: anchor service (#484)
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 authored Sep 12, 2024
1 parent 6caebb2 commit c03ee02
Show file tree
Hide file tree
Showing 39 changed files with 655 additions and 336 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ bytecheck = "0.6.7"
bytes = "1.1"
bytesize = "1.1"
ceramic-anchor-service = { path = "./anchor-service" }
ceramic-anchor-tx = { path = "./anchor-tx" }
ceramic-api = { path = "./api" }
ceramic-api-server = { path = "./api-server" }
ceramic-arrow-test = { path = "./arrow-test" }
Expand Down
40 changes: 18 additions & 22 deletions anchor-remote/src/cas_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use ceramic_anchor_service::{
DetachedTimeEvent, MerkleNode, MerkleNodes, RootTimeEvent, TransactionManager,
};
use ceramic_car::CarReader;
use ceramic_core::{
cid_from_ed25519_key_pair, did_key_from_ed25519_key_pair, Cid, StreamId, StreamIdType,
};
use ceramic_core::{Cid, NodeId, StreamId};
use ceramic_event::unvalidated::Proof;

pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -63,23 +61,15 @@ struct CasAnchorResponse {
pub witness_car: Option<String>,
}

fn cid_to_stream_id(cid: Cid) -> StreamId {
StreamId {
r#type: StreamIdType::Unloadable,
cid,
}
}

/// Remote CAS transaction manager
pub struct RemoteCas {
node_id: NodeId,
signing_key: Ed25519KeyPair,
url: String,
poll_interval: Duration,
poll_retry_count: u32,
jws_header_b64: String,
http_client: reqwest::Client,
/// The did:key of the node represented as a StreamId
node_stream_id: StreamId,
}

enum CasResponseParseResult {
Expand Down Expand Up @@ -122,12 +112,13 @@ impl TransactionManager for RemoteCas {
impl RemoteCas {
/// Create a new RemoteCas instance
pub fn new(
node_id: NodeId,
keypair: Ed25519KeyPair,
remote_anchor_service_url: String,
anchor_poll_interval: Duration,
anchor_poll_retry_count: u32,
) -> Self {
let controller = did_key_from_ed25519_key_pair(&keypair);
let controller = node_id.did_key();
let jws_header = Header {
kid: format!(
"{}#{}",
Expand All @@ -141,22 +132,21 @@ impl RemoteCas {
};
let jws_header_b64 =
b64.encode(serde_json::to_vec(&jws_header).expect("invalid jws header"));
let node_stream_id = cid_to_stream_id(cid_from_ed25519_key_pair(&keypair));
Self {
node_id,
signing_key: keypair,
url: format!("{}/api/v0/requests", remote_anchor_service_url),
poll_interval: anchor_poll_interval,
poll_retry_count: anchor_poll_retry_count,
jws_header_b64,
http_client: reqwest::Client::new(),
node_stream_id,
}
}

/// Create an anchor request on the remote CAS
pub async fn create_anchor_request(&self, root_cid: Cid) -> Result<String> {
let cas_request_body = serde_json::to_string(&CasAnchorRequest {
stream_id: self.node_stream_id.clone(),
stream_id: self.node_id.stream_id(),
cid: root_cid.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
ceramic_one_version: AGENT_VERSION.to_owned(),
Expand Down Expand Up @@ -250,10 +240,10 @@ mod tests {
use ring::signature::Ed25519KeyPair;

use ceramic_anchor_service::{AnchorService, MockAnchorClient, Store, TransactionManager};
use ceramic_core::{ed25519_key_pair_from_secret, Cid};
use ceramic_core::Cid;

fn node_private_key() -> Ed25519KeyPair {
ed25519_key_pair_from_secret(
fn node_id_and_private_key() -> (NodeId, Ed25519KeyPair) {
NodeId::try_from_secret(
std::env::var("NODE_PRIVATE_KEY")
// The following secret is NOT authenticated with CAS, it is only used for testing.
.unwrap_or(
Expand All @@ -270,8 +260,10 @@ mod tests {
async fn test_anchor_batch_with_cas() {
let anchor_client = Arc::new(MockAnchorClient::new(10));
let anchor_requests = anchor_client.local_sourced_data_events().await.unwrap();
let (node_id, keypair) = node_id_and_private_key();
let remote_cas = Arc::new(RemoteCas::new(
node_private_key(),
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand All @@ -290,9 +282,11 @@ mod tests {
async fn test_create_anchor_request_with_cas() {
let mock_root_cid =
Cid::from_str("bafyreia776z4jdg5zgycivcpr3q6lcu6llfowkrljkmq3bex2k5hkzat54").unwrap();
let (node_id, keypair) = node_id_and_private_key();

let remote_cas = RemoteCas::new(
node_private_key(),
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand All @@ -316,8 +310,10 @@ mod tests {
async fn test_jwt() {
let mock_data = serde_ipld_dagcbor::to_vec(b"mock root").unwrap();
let mock_hash = MultihashDigest::digest(&Code::Sha2_256, &mock_data);
let (node_id, keypair) = node_id_and_private_key();
let remote_cas = Arc::new(RemoteCas::new(
node_private_key(),
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand Down
35 changes: 22 additions & 13 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use ceramic_api_server::{
ExperimentalInterestsGetResponse, FeedEventsGetResponse, FeedResumeTokenGetResponse,
InterestsPostResponse,
};
use ceramic_core::{Cid, EventId, Interest, Network, PeerId, StreamId};
use ceramic_core::{Cid, EventId, Interest, Network, NodeId, PeerId, StreamId};
use futures::TryFutureExt;
use recon::Key;
use swagger::{ApiError, ByteArray};
Expand Down Expand Up @@ -245,7 +245,11 @@ impl ApiItem {
#[async_trait]
pub trait EventService: Send + Sync {
/// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>>;
async fn insert_many(
&self,
items: Vec<ApiItem>,
informant: NodeId,
) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
Expand Down Expand Up @@ -280,8 +284,12 @@ pub trait EventService: Send + Sync {

#[async_trait::async_trait]
impl<S: EventService> EventService for Arc<S> {
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items).await
async fn insert_many(
&self,
items: Vec<ApiItem>,
informant: NodeId,
) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items, informant).await
}

async fn range_with_values(
Expand Down Expand Up @@ -333,7 +341,7 @@ struct InsertTask {

#[derive(Clone)]
pub struct Server<C, I, M> {
peer_id: PeerId,
node_id: NodeId,
network: Network,
interest: I,
model: Arc<M>,
Expand All @@ -349,17 +357,17 @@ where
I: InterestService,
M: EventService + 'static,
{
pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc<M>) -> Self {
pub fn new(node_id: NodeId, network: Network, interest: I, model: Arc<M>) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
let event_store = model.clone();

let handle = Self::start_insert_task(event_store, event_rx);
let handle = Self::start_insert_task(event_store, event_rx, node_id);
let insert_task = Arc::new(InsertTask {
_handle: handle,
tx,
});
Server {
peer_id,
node_id,
network,
interest,
model,
Expand All @@ -377,6 +385,7 @@ where
fn start_insert_task(
event_store: Arc<M>,
mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
node_id: NodeId,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
Expand All @@ -389,7 +398,7 @@ where
let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE);
tokio::select! {
_ = interval.tick() => {
Self::process_events(&mut events, &event_store).await;
Self::process_events(&mut events, &event_store, node_id).await;
}
val = event_rx.recv_many(&mut buf, EVENTS_TO_RECEIVE) => {
if val > 0 {
Expand All @@ -400,7 +409,7 @@ where
let shutdown = event_rx.is_closed();
// make sure the events queue doesn't get too deep when we're under heavy load
if events.len() >= EVENT_INSERT_QUEUE_SIZE || shutdown {
Self::process_events(&mut events, &event_store).await;
Self::process_events(&mut events, &event_store, node_id).await;
}
if shutdown {
tracing::info!("Shutting down insert task.");
Expand All @@ -410,7 +419,7 @@ where
})
}

async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>) {
async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>, node_id: NodeId) {
if events.is_empty() {
return;
}
Expand All @@ -421,7 +430,7 @@ where
items.push(ApiItem::new(req.id, req.data));
});
tracing::trace!("calling insert many with {} items.", items.len());
match event_store.insert_many(items).await {
match event_store.insert_many(items, node_id).await {
Ok(results) => {
tracing::debug!("insert many returned {} results.", results.len());
for result in results {
Expand Down Expand Up @@ -682,7 +691,7 @@ where
// Update interest ranges to include this new subscription.
let interest = Interest::builder()
.with_sep_key(&interest.sep)
.with_peer_id(&self.peer_id)
.with_peer_id(&self.node_id.peer_id())
.with_range((start.as_slice(), stop.as_slice()))
.with_not_after(0)
.build();
Expand Down
4 changes: 2 additions & 2 deletions api/src/server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ where
}
}
}
unvalidated::Event::Unsigned(event) => {
event_id_from_init_payload(&event_cid, network, &event_cid, event.payload())
unvalidated::Event::Unsigned(init) => {
event_id_from_init_payload(&event_cid, network, &event_cid, init.payload())
}
}
}
Expand Down
Loading

0 comments on commit c03ee02

Please sign in to comment.