Skip to content

Commit

Permalink
Write a bunch of doc strings
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Jan 29, 2025
1 parent 8db92b6 commit 7167b84
Showing 1 changed file with 108 additions and 9 deletions.
117 changes: 108 additions & 9 deletions aardvark-doc/src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ use anyhow::Result;
use loro::event::{Diff, DiffEvent};
use loro::{EventTriggerKind, ExportMode, LoroDoc, Subscription};

/// Identifier in container where we store the text in the Loro document.
/// Identifier of container where we handle the text CRDT in a Loro document.
///
/// Loro documents can contain multiple different CRDT types in one document. We can address these
/// with identifiers.
const TEXT_CONTAINER_ID: &str = "document";

pub type EventReceiver = async_channel::Receiver<TextCrdtEvent>;

/// Manages a Conflict-free Replicated Data Type (CRDTs) to resolve parallel edits by multiple
/// authors on the same text document.
///
/// Internally this uses a text CRDT implementation by Loro. This interface serves merely as a
/// wrapper to bring Loro and it's data into the shape we need, without worrying too much about the
/// internal details of Loro.
pub struct TextCrdt {
doc: RefCell<LoroDoc>,
event_rx: EventReceiver,
Expand All @@ -21,6 +30,9 @@ pub struct TextCrdt {
}

impl TextCrdt {
/// Returns new instance managing a text CRDT.
///
/// Use this when creating a new document.
pub fn new(peer_id: u64) -> Self {
let doc = LoroDoc::new();
doc.set_record_timestamp(false);
Expand All @@ -29,6 +41,14 @@ impl TextCrdt {

let text = doc.get_text(TEXT_CONTAINER_ID);

// NOTE(adz): We're introducing a non-tokio channel implementation here as using a tokio
// channel would cause a panic in this setup.
//
// Tokio (rightly) informs us about using a `send_blocking` inside the same thread where
// the async runtime operates, thus potentially blocking it.
//
// This is not optimal but seems to work for now, later we might want to look into running
// the whole CRDT logic in a separate thread.
let (event_tx, event_rx) = async_channel::bounded::<TextCrdtEvent>(64);

let subscription = {
Expand Down Expand Up @@ -60,6 +80,11 @@ impl TextCrdt {
}
}

/// Returns text CRDT instance from a snapshot.
///
/// Use this when restoring an existing, local document (for example when it was stored on your
/// file system) or when receiving a full snapshot from another peer after joining an existing
/// document.
pub fn from_bytes(peer_id: u64, bytes: &[u8]) -> Result<Self> {
let crdt = Self::new(peer_id);
{
Expand All @@ -69,10 +94,39 @@ impl TextCrdt {
Ok(crdt)
}

/// Subscribe to changes to the document.
///
/// This should be used as the "source of truth" for all text operations (local and remote text
/// inserts and removals), affecting all "higher layer" state (text buffer).
///
/// ## Local Changes
///
/// ```text
/// -> User types something
/// -> Text CRDT "insert" or "removal" called
/// -> Commit & create "local" delta event
/// -> Delta Event received via subscription
/// -> Apply delta to text buffer
/// ```
///
/// ## Remote Changes
///
/// ```text
/// -> Received deltas from remote peer (via networking layer)
/// -> Apply encoded delta to Text CRDT
/// -> Commit & create "remote" delta event
/// -> Delta Event received via subscription
/// -> Apply delta to text buffer
/// ```
pub fn subscribe(&mut self) -> EventReceiver {
self.event_rx.clone()
}

/// Inserts text at the given unicode position.
///
/// This text change gets directly committed, causing a local "delta event" which should be
/// used to update "higher layer" state, like the text buffer. Read [`subscribe`] for receiving
/// and handling these events.
pub fn insert(&mut self, index: usize, chunk: &str) -> Result<()> {
let doc = self.doc.get_mut();
let text = doc.get_text(TEXT_CONTAINER_ID);
Expand All @@ -81,6 +135,11 @@ impl TextCrdt {
Ok(())
}

/// Removes range of text at the given unicode position with unicode length.
///
/// This text change gets directly committed, causing a local "delta event" which should be
/// used to update "higher layer" state, like the text buffer. Read [`subscribe`] for receiving
/// and handling these events.
pub fn remove(&mut self, index: usize, len: usize) -> Result<()> {
let doc = self.doc.get_mut();
let text = doc.get_text(TEXT_CONTAINER_ID);
Expand All @@ -89,13 +148,31 @@ impl TextCrdt {
Ok(())
}

/// Applies encoded text deltas received from a remote peer.
///
/// Deltas are encoded according to the Loro specification.
pub fn apply_encoded_delta(&mut self, bytes: &[u8]) -> Result<()> {
let doc = self.doc.get_mut();
doc.import_with(bytes, "delta")?;
Ok(())
}

pub fn apply_delta(&mut self, delta: TextDelta) -> Result<()> {
/// Exports encoded snapshot of current Text CRDT state.
///
/// This can be used to persist the current state of the text CRDT on the file system or during
/// initial sync when a remote peer joins our document. See `[from_bytes]` for the reverse
/// method.
///
/// Snapshots are encoded according to the Loro specification.
pub fn snapshot(&self) -> Vec<u8> {
let doc = self.doc.borrow();
doc.export(ExportMode::Snapshot)
.expect("encoded crdt snapshot")
}

/// Applies local text changes.
#[cfg(test)]
fn apply_delta(&mut self, delta: TextDelta) -> Result<()> {
match delta {
TextDelta::Insert { index, chunk } => {
self.insert(index, &chunk)?;
Expand All @@ -107,12 +184,6 @@ impl TextCrdt {

Ok(())
}

pub fn snapshot(&self) -> Vec<u8> {
let doc = self.doc.borrow();
doc.export(ExportMode::Snapshot)
.expect("encoded crdt snapshot")
}
}

impl fmt::Display for TextCrdt {
Expand All @@ -129,15 +200,32 @@ pub enum TextDelta {
Remove { index: usize, len: usize },
}

/// Events to notify other parts of the application about text changes.
#[derive(Debug)]
pub enum TextCrdtEvent {
/// We've locally inserted or removed text.
///
/// Use this to apply changes to your local text buffer, etc.
Local(TextDelta),

/// Same as `Local` but in encoded form, including additional information like a vector clock,
/// so we can send that delta over the wire to other peers.
///
/// Use this to send "small" text changes directly to other peers, for example via gossip
/// broadcast.
LocalEncoded(Vec<u8>),

/// Remote peer inserted or removed text.
///
/// If a snapshot was received (for example during initial sync), this event might contain
/// multiple deltas.
///
/// Use this to apply remote changes to your local text buffer.
Remote(Vec<TextDelta>),
}

impl TextCrdtEvent {
fn from_deltas(triggered_by: loro::EventTriggerKind, mut deltas: Vec<TextDelta>) -> Self {
fn from_deltas(triggered_by: EventTriggerKind, mut deltas: Vec<TextDelta>) -> Self {
match triggered_by {
EventTriggerKind::Local => {
// Since we're committing inserts and removals directly on local changes, we can assure
Expand All @@ -152,6 +240,8 @@ impl TextCrdtEvent {
}
}

/// Loro supports all sorts of CRDTs (Lists, Maps, Counters, etc.), this method extracts only the
/// deltas related to collaborative text editing of our known text container.
fn extract_text_deltas(diff_event: DiffEvent<'_>) -> Vec<loro::TextDelta> {
diff_event
.events
Expand All @@ -171,6 +261,15 @@ fn extract_text_deltas(diff_event: DiffEvent<'_>) -> Vec<loro::TextDelta> {
.collect()
}

/// Converts relative text deltas to absolute ones.
///
/// Loro's text deltas are represented as QuillJS "Deltas" which encode text inserts and removals
/// relative to position 0 in the document.
///
/// For our purposes we need absolute positions, as our text buffer implementation requires the
/// exact position for every text insertion and removal.
///
/// Read more: https://quilljs.com/docs/delta/
fn absolute_deltas(loro_deltas: Vec<loro::TextDelta>) -> Vec<TextDelta> {
let mut deltas = Vec::new();
let mut index = 0;
Expand Down

0 comments on commit 7167b84

Please sign in to comment.