From 653478c6e9f2f20c854f0a62da0f60be99c81022 Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 29 Jan 2025 15:41:32 +0100 Subject: [PATCH] WIP: Integrate new loro CRDT --- Cargo.lock | 1 + aardvark-app/Cargo.toml | 1 + aardvark-app/src/application.rs | 121 +++++++++++++++++++------------- aardvark-doc/src/crdt.rs | 8 ++- aardvark-node/src/network.rs | 7 +- 5 files changed, 85 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0716293..3ba042b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,7 @@ dependencies = [ "gettext-rs", "gtk4", "libadwaita", + "p2panda-core", "serde", "serde_json", "sourceview5", diff --git a/aardvark-app/Cargo.toml b/aardvark-app/Cargo.toml index a67b226..8492ba0 100644 --- a/aardvark-app/Cargo.toml +++ b/aardvark-app/Cargo.toml @@ -9,6 +9,7 @@ aardvark-node = { path = "../aardvark-node" } anyhow = "1.0.94" gettext-rs = { version = "0.7", features = ["gettext-system"] } gtk = { version = "0.9", package = "gtk4", features = ["gnome_47"] } +p2panda-core = { version = "0.2.0", default-features = false } serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.128" sourceview = { package = "sourceview5", version = "0.9" } diff --git a/aardvark-app/src/application.rs b/aardvark-app/src/application.rs index 053c330..a351c2d 100644 --- a/aardvark-app/src/application.rs +++ b/aardvark-app/src/application.rs @@ -20,12 +20,13 @@ use std::cell::{OnceCell, RefCell}; -use aardvark_doc::{TextCrdt, TextCrdtEvent}; +use aardvark_doc::{TextCrdt, TextCrdtEvent, TextDelta}; use aardvark_node::network; use adw::prelude::*; use adw::subclass::prelude::*; use gettextrs::gettext; use gtk::{gio, glib}; +use p2panda_core::PrivateKey; use tokio::sync::{mpsc, oneshot}; use crate::config::VERSION; @@ -38,7 +39,9 @@ mod imp { #[derive(Debug)] pub struct AardvarkApplication { pub window: OnceCell, - pub document: Document, + // TODO(adz): The CRDT and backend channels will be moved into `aardvark-doc` in the next + // refactoring: + pub document: TextCrdt, pub tx: mpsc::Sender>, pub rx: RefCell>>>, #[allow(dead_code)] @@ -50,12 +53,20 @@ mod imp { #[glib::object_subclass] impl ObjectSubclass for AardvarkApplication { const NAME: &'static str = "AardvarkApplication"; + type Type = super::AardvarkApplication; type ParentType = adw::Application; fn new() -> Self { - let document = Document::default(); - let (backend_shutdown, tx, rx) = network::run().expect("running p2p backend"); + // TODO(adz): We probably want to persist the private key somewhere on the file-system + // or generate a new one on first start. + let private_key = PrivateKey::new(); + let public_key = private_key.public_key(); + println!("my public key: {}", public_key); + + let document = TextCrdt::new(public_key.into()); + let (backend_shutdown, tx, rx) = + network::run(private_key).expect("running p2p backend"); AardvarkApplication { document, @@ -149,7 +160,34 @@ impl AardvarkApplication { glib::spawn_future_local(async move { while let Some(bytes) = rx.recv().await { - application.ingest_message(bytes); + application.on_remote_message(bytes); + } + }); + } + + { + let application = self.clone(); + + let mut document_rx = self.imp().document.subscribe(); + let mut network_tx = self.imp().tx.clone(); + + glib::spawn_future_local(async move { + while let Ok(event) = document_rx.recv().await { + match event { + TextCrdtEvent::LocalEncoded(bytes) => { + if network_tx.send(bytes).await.is_err() { + break; + } + } + TextCrdtEvent::Local(text_delta) => { + // @TODO(adz): Later we want to apply changes to the text buffer + // here. Something along the lines of: + // application.on_deltas_received(vec![text_delta}); + } + TextCrdtEvent::Remote(text_deltas) => { + application.on_deltas_received(text_deltas); + } + } } }); } @@ -157,6 +195,10 @@ impl AardvarkApplication { { let application = self.clone(); + // @TODO(adz): At this stage the text buffer was already changed. We should instead + // intercept the event, forward it here to the document, which handles the change, + // fires an event (see TextCrdtEvent::Local) which then finally manipulates the + // text buffer. window.get_text_buffer().connect_closure( "text-change", false, @@ -164,7 +206,7 @@ impl AardvarkApplication { position: i32, del: i32, text: &str| { - application.update_text(position, del, text); + application.on_local_text_change(position as usize, del as usize, text); }), ); } @@ -173,55 +215,40 @@ impl AardvarkApplication { }) } - fn ingest_message(&self, message: Vec) { + fn on_remote_message(&self, bytes: Vec) { let document = &self.imp().document; - let window = self.imp().window.get().unwrap(); - let buffer = window.get_text_buffer(); - // Apply remote changes to our local text CRDT - if let Err(err) = document.load_incremental(&message) { - eprintln!("failed applying text change from remote peer to automerge document: {err}"); + if let Err(err) = document.apply_encoded_delta(&bytes) { + eprintln!("received invalid message: {}", err); window.add_toast(adw::Toast::new("The network provided bad data!")); - return; } + } + + fn on_local_text_change(&self, position: usize, del: usize, text: &str) { + if del == 0 { + self.imp() + .document + .insert(position, text) + .expect("update document after text insertion"); + } else { + self.imp() + .document + .remove(position, del) + .expect("update document after text removal"); + } + } - // Get latest changes and apply them to our local text buffer - for patch in document.diff_incremental() { - match &patch.action { - PatchAction::SpliceText { index, value, .. } => { - buffer.splice(*index as i32, 0, value.make_string().as_str()); + fn on_deltas_received(&self, text_deltas: Vec) { + let buffer = self.imp().window.get().unwrap().get_text_buffer(); + for delta in text_deltas { + match delta { + TextDelta::Insert { index, chunk } => { + buffer.splice(index as i32, 0, &chunk); } - PatchAction::DeleteSeq { index, length } => { - buffer.splice(*index as i32, *length as i32, ""); + TextDelta::Remove { index, len } => { + buffer.splice(index as i32, len as i32, ""); } - _ => (), } } - - // Sanity check that the text buffer and CRDT are in the same state - if buffer.full_text() != document.text() { - window.add_toast(adw::Toast::new( - "The CRDT and the text view have different states!", - )); - // if the state diverged, use the CRDT as the source of truth - buffer.set_text(&document.text()); - } - - dbg!(document.text()); - } - - fn update_text(&self, position: i32, del: i32, text: &str) { - self.imp() - .document - .update(position, del, text) - .expect("update automerge document after text update"); - - let bytes = self.imp().document.save_incremental(); - let tx = self.imp().tx.clone(); - glib::spawn_future_local(async move { - tx.send(bytes) - .await - .expect("sending message to networking backend"); - }); } } diff --git a/aardvark-doc/src/crdt.rs b/aardvark-doc/src/crdt.rs index 4d7eaae..b3af585 100644 --- a/aardvark-doc/src/crdt.rs +++ b/aardvark-doc/src/crdt.rs @@ -30,9 +30,10 @@ pub struct TextCrdt { } impl TextCrdt { - /// Returns new instance managing a text CRDT. + /// Returns new instance managing a text CRDT. Use this when creating a new document. /// - /// Use this when creating a new document. + /// The peer id represents the identity of the author applying local changes (that's + /// essentially us), it needs be strictly unique. pub fn new(peer_id: u64) -> Self { let doc = LoroDoc::new(); doc.set_record_timestamp(false); @@ -85,6 +86,9 @@ impl TextCrdt { /// Use this when restoring an existing, local document (for example when it was stored on your /// file system) or when receiving a full snapshot from another peer after joining an existing /// document. + /// + /// The peer id represents the identity of the author applying local changes (that's + /// essentially us), it needs be strictly unique. pub fn from_bytes(peer_id: u64, bytes: &[u8]) -> Result { let crdt = Self::new(peer_id); { diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 538c124..dbed677 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -80,7 +80,9 @@ impl TopicLogMap for TextDocumentStore { } #[allow(clippy::type_complexity)] -pub fn run() -> Result<( +pub fn run( + private_key: PrivateKey, +) -> Result<( oneshot::Sender<()>, mpsc::Sender>, mpsc::Receiver>, @@ -100,9 +102,6 @@ pub fn run() -> Result<( let network_id = Hash::new(b"aardvark <3"); let document_id = TextDocument(Hash::new(b"my first doc <3").into()); - let private_key = PrivateKey::new(); - println!("my public key: {}", private_key.public_key()); - let mut operations_store = MemoryStore::::new(); let documents_store = TextDocumentStore::new(); documents_store