Skip to content

Commit

Permalink
WIP: Integrate new loro CRDT
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Jan 29, 2025
1 parent c882551 commit 653478c
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aardvark-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ aardvark-node = { path = "../aardvark-node" }
anyhow = "1.0.94"
gettext-rs = { version = "0.7", features = ["gettext-system"] }
gtk = { version = "0.9", package = "gtk4", features = ["gnome_47"] }
p2panda-core = { version = "0.2.0", default-features = false }
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0.128"
sourceview = { package = "sourceview5", version = "0.9" }
Expand Down
121 changes: 74 additions & 47 deletions aardvark-app/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

use std::cell::{OnceCell, RefCell};

use aardvark_doc::{TextCrdt, TextCrdtEvent};
use aardvark_doc::{TextCrdt, TextCrdtEvent, TextDelta};
use aardvark_node::network;
use adw::prelude::*;
use adw::subclass::prelude::*;
use gettextrs::gettext;
use gtk::{gio, glib};
use p2panda_core::PrivateKey;
use tokio::sync::{mpsc, oneshot};

use crate::config::VERSION;
Expand All @@ -38,7 +39,9 @@ mod imp {
#[derive(Debug)]
pub struct AardvarkApplication {
pub window: OnceCell<AardvarkWindow>,
pub document: Document,
// TODO(adz): The CRDT and backend channels will be moved into `aardvark-doc` in the next
// refactoring:
pub document: TextCrdt,
pub tx: mpsc::Sender<Vec<u8>>,
pub rx: RefCell<Option<mpsc::Receiver<Vec<u8>>>>,
#[allow(dead_code)]
Expand All @@ -50,12 +53,20 @@ mod imp {
#[glib::object_subclass]
impl ObjectSubclass for AardvarkApplication {
const NAME: &'static str = "AardvarkApplication";

type Type = super::AardvarkApplication;
type ParentType = adw::Application;

fn new() -> Self {
let document = Document::default();
let (backend_shutdown, tx, rx) = network::run().expect("running p2p backend");
// TODO(adz): We probably want to persist the private key somewhere on the file-system
// or generate a new one on first start.
let private_key = PrivateKey::new();
let public_key = private_key.public_key();
println!("my public key: {}", public_key);

let document = TextCrdt::new(public_key.into());
let (backend_shutdown, tx, rx) =
network::run(private_key).expect("running p2p backend");

AardvarkApplication {
document,
Expand Down Expand Up @@ -149,22 +160,53 @@ impl AardvarkApplication {

glib::spawn_future_local(async move {
while let Some(bytes) = rx.recv().await {
application.ingest_message(bytes);
application.on_remote_message(bytes);
}
});
}

{
let application = self.clone();

let mut document_rx = self.imp().document.subscribe();
let mut network_tx = self.imp().tx.clone();

glib::spawn_future_local(async move {
while let Ok(event) = document_rx.recv().await {
match event {
TextCrdtEvent::LocalEncoded(bytes) => {
if network_tx.send(bytes).await.is_err() {
break;
}
}
TextCrdtEvent::Local(text_delta) => {
// @TODO(adz): Later we want to apply changes to the text buffer
// here. Something along the lines of:
// application.on_deltas_received(vec![text_delta});
}
TextCrdtEvent::Remote(text_deltas) => {
application.on_deltas_received(text_deltas);
}
}
}
});
}

{
let application = self.clone();

// @TODO(adz): At this stage the text buffer was already changed. We should instead
// intercept the event, forward it here to the document, which handles the change,
// fires an event (see TextCrdtEvent::Local) which then finally manipulates the
// text buffer.
window.get_text_buffer().connect_closure(
"text-change",
false,
closure_local!(|_buffer: AardvarkTextBuffer,
position: i32,
del: i32,
text: &str| {
application.update_text(position, del, text);
application.on_local_text_change(position as usize, del as usize, text);
}),
);
}
Expand All @@ -173,55 +215,40 @@ impl AardvarkApplication {
})
}

fn ingest_message(&self, message: Vec<u8>) {
fn on_remote_message(&self, bytes: Vec<u8>) {
let document = &self.imp().document;
let window = self.imp().window.get().unwrap();
let buffer = window.get_text_buffer();

// Apply remote changes to our local text CRDT
if let Err(err) = document.load_incremental(&message) {
eprintln!("failed applying text change from remote peer to automerge document: {err}");
if let Err(err) = document.apply_encoded_delta(&bytes) {
eprintln!("received invalid message: {}", err);
window.add_toast(adw::Toast::new("The network provided bad data!"));
return;
}
}

fn on_local_text_change(&self, position: usize, del: usize, text: &str) {
if del == 0 {
self.imp()
.document
.insert(position, text)
.expect("update document after text insertion");
} else {
self.imp()
.document
.remove(position, del)
.expect("update document after text removal");
}
}

// Get latest changes and apply them to our local text buffer
for patch in document.diff_incremental() {
match &patch.action {
PatchAction::SpliceText { index, value, .. } => {
buffer.splice(*index as i32, 0, value.make_string().as_str());
fn on_deltas_received(&self, text_deltas: Vec<TextDelta>) {
let buffer = self.imp().window.get().unwrap().get_text_buffer();
for delta in text_deltas {
match delta {
TextDelta::Insert { index, chunk } => {
buffer.splice(index as i32, 0, &chunk);
}
PatchAction::DeleteSeq { index, length } => {
buffer.splice(*index as i32, *length as i32, "");
TextDelta::Remove { index, len } => {
buffer.splice(index as i32, len as i32, "");
}
_ => (),
}
}

// Sanity check that the text buffer and CRDT are in the same state
if buffer.full_text() != document.text() {
window.add_toast(adw::Toast::new(
"The CRDT and the text view have different states!",
));
// if the state diverged, use the CRDT as the source of truth
buffer.set_text(&document.text());
}

dbg!(document.text());
}

fn update_text(&self, position: i32, del: i32, text: &str) {
self.imp()
.document
.update(position, del, text)
.expect("update automerge document after text update");

let bytes = self.imp().document.save_incremental();
let tx = self.imp().tx.clone();
glib::spawn_future_local(async move {
tx.send(bytes)
.await
.expect("sending message to networking backend");
});
}
}
8 changes: 6 additions & 2 deletions aardvark-doc/src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ pub struct TextCrdt {
}

impl TextCrdt {
/// Returns new instance managing a text CRDT.
/// Returns new instance managing a text CRDT. Use this when creating a new document.
///
/// Use this when creating a new document.
/// The peer id represents the identity of the author applying local changes (that's
/// essentially us), it needs be strictly unique.
pub fn new(peer_id: u64) -> Self {
let doc = LoroDoc::new();
doc.set_record_timestamp(false);
Expand Down Expand Up @@ -85,6 +86,9 @@ impl TextCrdt {
/// Use this when restoring an existing, local document (for example when it was stored on your
/// file system) or when receiving a full snapshot from another peer after joining an existing
/// document.
///
/// The peer id represents the identity of the author applying local changes (that's
/// essentially us), it needs be strictly unique.
pub fn from_bytes(peer_id: u64, bytes: &[u8]) -> Result<Self, TextCrdtError> {
let crdt = Self::new(peer_id);
{
Expand Down
7 changes: 3 additions & 4 deletions aardvark-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl TopicLogMap<TextDocument, LogId> for TextDocumentStore {
}

#[allow(clippy::type_complexity)]
pub fn run() -> Result<(
pub fn run(
private_key: PrivateKey,
) -> Result<(
oneshot::Sender<()>,
mpsc::Sender<Vec<u8>>,
mpsc::Receiver<Vec<u8>>,
Expand All @@ -100,9 +102,6 @@ pub fn run() -> Result<(
let network_id = Hash::new(b"aardvark <3");
let document_id = TextDocument(Hash::new(b"my first doc <3").into());

let private_key = PrivateKey::new();
println!("my public key: {}", private_key.public_key());

let mut operations_store = MemoryStore::<LogId, AardvarkExtensions>::new();
let documents_store = TextDocumentStore::new();
documents_store
Expand Down

0 comments on commit 653478c

Please sign in to comment.