Skip to content

Commit

Permalink
Methods to create new document or join one
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha authored and jsparber committed Feb 11, 2025
1 parent f69982e commit 5431c79
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 779 deletions.
1,040 changes: 309 additions & 731 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion aardvark-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ aardvark-doc = { path = "../aardvark-doc" }
aardvark-node = { path = "../aardvark-node" }
gettext-rs = { version = "0.7", features = ["gettext-system"] }
gtk = { version = "0.9", package = "gtk4", features = ["gnome_47"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
sourceview = { package = "sourceview5", version = "0.9" }
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dependencies.adw]
package = "libadwaita"
Expand Down
2 changes: 1 addition & 1 deletion aardvark-app/src/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ mod imp {

// TODO: wait for the document to be ready before displaying the buffer
// TODO: The user needs to provide a document id
buffer.set_document(Document::new(&self.service.get().unwrap(), "some id"));
buffer.set_document(Document::new(&self.service.get().unwrap(), None));
}
}

Expand Down
8 changes: 4 additions & 4 deletions aardvark-doc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ version = "0.1.0"
edition = "2021"

[dependencies]
async-channel = "2.3.1"
loro = "1.3.1"
thiserror = "2.0.11"
aardvark-node = { path = "../aardvark-node" }
anyhow = "1.0.94"
async-channel = "2.3.1"
glib = "0.20"
loro = "1.3.1"
p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164", default-features = false }
thiserror = "2.0.11"
tokio = { version = "1.42.0", features = ["macros", "test-util"] }
tracing = "0.1"
p2panda-core = { version = "0.2.0", default-features = false }
21 changes: 17 additions & 4 deletions aardvark-doc/src/document.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cell::{Cell, OnceCell, RefCell};
use std::sync::OnceLock;
use std::str::FromStr;

use anyhow::Result;
use glib::prelude::*;
Expand All @@ -19,7 +20,7 @@ mod imp {
pub struct Document {
#[property(name = "text", get = Self::text, type = String)]
crdt_doc: RefCell<Option<TextCrdt>>,
#[property(get, construct_only)]
#[property(get, construct_only, set = Self::set_id)]
id: OnceCell<String>,
#[property(get, set)]
ready: Cell<bool>,
Expand All @@ -42,6 +43,12 @@ mod imp {
.to_string()
}

pub fn set_id(&self, id: Option<String>) {
if let Some(id) = id {
self.id.set(id).expect("Document id can only be set once");
}
}

pub fn splice_text(&self, index: i32, delete_len: i32, chunk: &str) -> Result<()> {
let mut doc_borrow = self.crdt_doc.borrow_mut();
let doc = doc_borrow.as_mut().expect("crdt_doc to be set");
Expand Down Expand Up @@ -93,9 +100,15 @@ mod imp {

fn constructed(&self) {
let service = self.service.get().unwrap();
let (network_tx, mut rx) = service.document(Hash::new(b"some id"));
let public_key = service.public_key();
let (network_tx, mut rx) = if let Some(id) = self.id.get() {
service.join_document(Hash::from_str(id).expect("Invalid document id"))
} else{
let (document_id, network_tx, rx) = service.create_document();
self.set_id(Some(document_id.to_hex()));
(network_tx, rx)
};

let public_key = service.public_key();
let crdt_doc = TextCrdt::new({
// Take first 8 bytes of public key (32 bytes) to determine a unique "peer id"
// which is used to keep authors apart inside the text crdt.
Expand Down Expand Up @@ -163,7 +176,7 @@ glib::wrapper! {
pub struct Document(ObjectSubclass<imp::Document>);
}
impl Document {
pub fn new(service: &Service, id: &str) -> Self {
pub fn new(service: &Service, id: Option<&str>) -> Self {
glib::Object::builder()
.property("service", service)
.property("id", id)
Expand Down
20 changes: 18 additions & 2 deletions aardvark-doc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,24 @@ impl Service {
self.imp().network.shutdown();
}

pub(crate) fn document(&self, id: Hash) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
self.imp().network.get_or_create_document(id)
pub(crate) fn create_document(&self) -> (Hash, mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
let (document_id, tx, rx) = self
.imp()
.network
.create_document()
.expect("to create document");
info!("created new document: {}", document_id.to_hex());
(document_id, tx, rx)
}

pub(crate) fn join_document(
&self,
document_id: Hash,
) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
self.imp()
.network
.join_document(document_id)
.expect("to join document")
}

pub(crate) fn public_key(&self) -> PublicKey {
Expand Down
14 changes: 7 additions & 7 deletions aardvark-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ edition = "2021"
anyhow = "1.0.94"
async-trait = "0.1.83"
ciborium = "0.2.2"
p2panda-core = "0.2.0"
p2panda-discovery = { version = "0.2.0", features = ["mdns"] }
p2panda-net = "0.2.0"
p2panda-store = "0.2.0"
p2panda-stream = "0.2.0"
p2panda-sync = { version = "0.2.0", features = ["log-sync"] }
p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164" }
p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164", features = ["mdns"] }
p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164" }
p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164" }
p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164" }
p2panda-sync = { git = "https://github.com/p2panda/p2panda", rev = "6286e7b31e82a5c97cfe0de0cf020aba93d7a164", features = ["log-sync"] }
serde = { version = "1.0.215", features = ["derive"] }
tokio = { version = "1.42.0", features = ["rt", "sync"] }
tokio-stream = "0.1.17"
tracing = "0.1"
tracing = "0.1"
57 changes: 39 additions & 18 deletions aardvark-node/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use anyhow::Result;
use p2panda_core::{Extension, Hash, PrivateKey, PruneFlag};
use p2panda_core::{Hash, PrivateKey, PruneFlag};
use p2panda_discovery::mdns::LocalDiscovery;
use p2panda_net::config::GossipConfig;
use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork};
Expand Down Expand Up @@ -107,12 +107,37 @@ impl Network {
});
}

pub fn get_or_create_document(
pub fn create_document(
&self,
) -> Result<(Hash, mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>)> {
let mut operations_store = self.inner.operations_store.clone();
let private_key = self.inner.private_key.get().expect("private key to be set");

let (header, _body) = self.inner.runtime.block_on(async {
create_operation(&mut operations_store, private_key, None, None, false).await
})?;

let document: TextDocument = header.extension().expect("document id from our own logs");
let document_id = (&document).into();

let (tx, rx) = self.subscribe(document)?;

Ok((document_id, tx, rx))
}

pub fn join_document(
&self,
document_id: Hash,
) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
let document: TextDocument = document_id.into();
) -> Result<(mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>)> {
let document = document_id.into();
let (tx, rx) = self.subscribe(document)?;
Ok((tx, rx))
}

fn subscribe(
&self,
document: TextDocument,
) -> Result<(mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>)> {
let (to_network, mut from_app) = mpsc::channel::<Vec<u8>>(512);
let (to_app, from_network) = mpsc::channel(512);

Expand All @@ -123,18 +148,13 @@ impl Network {
.network
.get_or_init(|| async { unreachable!("network not running") })
.await;
let (topic_tx, topic_rx, ready) = network

let (document_tx, document_rx, _gossip_ready) = network
.subscribe(document.clone())
.await
.expect("subscribe to topic");

tokio::task::spawn(async move {
let _ = ready.await;
debug!("network joined!");
});

let document_id_clone = document.clone();
let stream = ReceiverStream::new(topic_rx);
let stream = ReceiverStream::new(document_rx);
let stream = stream.filter_map(|event| match event {
FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes) {
Ok(result) => Some(result),
Expand Down Expand Up @@ -168,10 +188,11 @@ impl Network {
});

let documents_store = network_inner.documents_store.clone();
let document_clone = document.clone();
let _result: JoinHandle<Result<()>> = tokio::task::spawn(async move {
// Process the operations and forward application messages to app layer.
while let Some(operation) = stream.next().await {
let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default();
let prune_flag: PruneFlag = operation.header.extension().unwrap_or_default();
debug!(
"received operation from {}, seq_num={}, prune_flag={}",
operation.header.public_key,
Expand All @@ -181,7 +202,7 @@ impl Network {

// When we discover a new author we need to add them to our "document store".
documents_store
.add_author(document_id_clone.clone(), operation.header.public_key)
.add_author(document_clone.clone(), operation.header.public_key)
.await?;

// Forward the payload up to the app.
Expand All @@ -202,7 +223,7 @@ impl Network {
let private_key = network_inner
.private_key
.get()
.expect("no private key set")
.expect("private key to be set")
.clone();
// Task for handling events coming from the application layer.
let _result: JoinHandle<Result<()>> = tokio::task::spawn(async move {
Expand All @@ -214,7 +235,7 @@ impl Network {
let (header, body) = create_operation(
&mut operations_store,
&private_key,
document.clone(),
Some(document.clone()),
Some(&bytes),
prune_flag,
)
Expand All @@ -230,7 +251,7 @@ impl Network {
let encoded_gossip_operation = encode_gossip_operation(header, body)?;

// Broadcast operation on gossip overlay.
topic_tx
document_tx
.send(ToNetwork::Message {
bytes: encoded_gossip_operation,
})
Expand All @@ -241,6 +262,6 @@ impl Network {
});
});

(to_network, from_network)
Ok((to_network, from_network))
}
}
44 changes: 33 additions & 11 deletions aardvark-node/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use crate::topic::TextDocument;

/// Custom extensions for p2panda header.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AardvarkExtensions {
/// If flag is true we can remove all previous operations in this log.
///
Expand All @@ -31,33 +31,54 @@ pub struct AardvarkExtensions {
/// Can be `None` if this operation indicates that we are creating a new document. In this case
/// we take the hash of the header itself to derive the document id.
#[serde(rename = "d")]
pub document_id: TextDocument,
pub document: Option<TextDocument>,
}

impl Extension<PruneFlag> for AardvarkExtensions {
fn extract(&self) -> Option<PruneFlag> {
Some(self.prune_flag.clone())
fn extract(header: &Header<Self>) -> Option<PruneFlag> {
header
.extensions
.as_ref()
.map(|extensions| extensions.prune_flag.clone())
}
}

impl Extension<TextDocument> for AardvarkExtensions {
fn extract(&self) -> Option<TextDocument> {
Some(self.document_id.clone())
fn extract(header: &Header<Self>) -> Option<TextDocument> {
// If this is the first operation in the append-only log we use the hash of the header
// itself to determine the document id, otherwise use the one mentioned in the header by
// subsequent operations.
match header.seq_num {
0 => Some(header.hash().into()),
_ => header
.extensions
.as_ref()
.map(|extensions| extensions.document.clone())
.flatten(),
}
}
}

/// Creates, signs and stores new operation in the author's append-only log.
///
/// We maintain one log per author and document. If no document is specified we create a new
/// operation in a new log. The resulting hash of the header can be used to identify that new
/// document.
pub async fn create_operation(
store: &mut MemoryStore<TextDocument, AardvarkExtensions>,
private_key: &PrivateKey,
document_id: TextDocument,
document: Option<TextDocument>,
body: Option<&[u8]>,
prune_flag: bool,
) -> Result<(Header<AardvarkExtensions>, Option<Body>)> {
let body = body.map(Body::new);

let public_key = private_key.public_key();

let latest_operation = store.latest_operation(&public_key, &document_id).await?;
let latest_operation = match document {
Some(ref document) => store.latest_operation(&public_key, &document).await?,
None => None,
};

let (seq_num, backlink) = match latest_operation {
Some((header, _)) => (header.seq_num + 1, Some(header.hash())),
Expand All @@ -70,7 +91,7 @@ pub async fn create_operation(

let extensions = AardvarkExtensions {
prune_flag: PruneFlag::new(prune_flag),
document_id: document_id.clone(),
document: document.clone(),
};

let mut header = Header {
Expand All @@ -87,13 +108,14 @@ pub async fn create_operation(
};
header.sign(private_key);

let prune_flag: PruneFlag = header.extract().unwrap_or_default();
let log_id: TextDocument = header.extension().expect("document id from our own logs");
let prune_flag: PruneFlag = header.extension().unwrap_or_default();
ingest_operation(
store,
header.clone(),
body.clone(),
header.to_bytes(),
&document_id,
&log_id,
prune_flag.is_set(),
)
.await?;
Expand Down

0 comments on commit 5431c79

Please sign in to comment.