Massively scalable collaborative text editor backend with Rama in 120 LOC
This is part of a series of posts exploring programming with Rama, covering interactive consumer apps, high-scale analytics, background processing, recommendation engines, and much more. This tutorial is self-contained, but for broader information about Rama and how it reduces the cost of building backends so much (up to 100x for large-scale backends), see our website.
Like all Rama applications, the example in this post requires very little code. It’s easily scalable to millions of reads/writes per second, ACID compliant, high performance, and fault-tolerant thanks to Rama incrementally replicating all state. Deploying, updating, and scaling this application are all one-line CLI commands. No other infrastructure besides Rama is needed. Comprehensive monitoring on all aspects of runtime operation is built-in.
In this post, I’ll explore building the backend for a real-time collaborative text editor like Google Docs or Etherpad. The technical challenge of building an application like this is conflict resolution. When multiple people edit the same text at the same time, what should be the result? If a user makes a lot of changes offline, when they come online how should their changes be merged into a document whose state may have diverged significantly?
There are many approaches for solving this problem. I’ll show an implementation of “operational transformations” similar to the one Google Docs uses as described here. Only incremental changes are sent back and forth between server and client, such as “Add text ‘hello’ to offset 128” or “Remove 14 characters starting from offset 201”. When clients send a change to the server, they also say what version of the document the change was applied to. When the server receives a change from an earlier document version, it applies the “operational transformation” algorithm to modify the addition or removal to fit with the latest document version.
Code will be shown in both Clojure and Java, with the total code being about 120 lines for the Clojure implementation and 160 lines for the Java implementation. Most of the code is implementing the “operational transformation” algorithm, which is just plain Clojure or Java functions, and the Rama code handling storage/queries is just 40 lines. You can download and play with the Clojure implementation in this repository or the Java implementation in this repository.
Operational transformations
The idea behind “operational transformations” is simple and can be understood through a few examples. Suppose Alice and Bob are currently editing a document, and Alice adds "to the " at offset 6 when the document is at version 3, like so:

However, when the change gets to the server, the document is actually at version 4 since Bob added "big " to offset 0:

Applying Alice’s change without modification would produce the string “big heto the llo world”, which is completely wrong. Instead, the server can transform Alice’s change based on the single add that happened between versions 3 and 4 by pushing Alice’s change to the right by the length of "big ". So Alice’s change of “Add ‘to the ’ at offset 6” becomes “Add ‘to the ’ at offset 10”, and the document becomes:

Now suppose instead the change Bob made between versions 3 and 4 was adding " again" to the end:

In this case, Alice’s change should not be modified since Bob’s change was to the right of her addition, and applying Alice’s change will produce “hello to the world again”.
Transforming an addition against a missed remove works the same way: if Bob had removed text to the left of Alice’s addition, her addition would be shifted left by the amount of text removed. If Bob removed text to the right, Alice’s addition is unchanged.
Now let’s take a look at what happens if Alice had removed text at an earlier version, which is slightly trickier to handle. Suppose Alice removes 7 characters starting from offset 2 when the document was “hello world” at version 3. Suppose Bob had added "big " to offset 0 between versions 3 and 4:

In this case, Alice’s removal should be shifted right by the length of "big " to remove 7 characters starting from offset 6. Now suppose Bob had instead added "to the " to offset 6:

In this case, text had been added in the middle of where Alice was deleting text. It’s wrong and confusing for Alice’s removal to delete text that wasn’t in her version of the document. The correct way to handle this is to split her removal into two changes: remove 3 characters from offset 13 and then remove 4 characters from offset 2. This produces the following document:

The resulting document isn’t legible, but it’s consistent with the conflicting changes Alice and Bob were making at the same time, without improperly losing any information.
Let’s take a look at one more case. Suppose again that Alice removes 7 characters from offset 2 when the document was “hello world” at version 3. This time, Bob had already removed one character starting from offset 3:

In this case, one of the characters Alice had removed was already removed, so Alice’s removal should be reduced by one to remove 6 characters from offset 2 instead of 7, producing:

There are a few more intricacies to how operational transformations work, but this gets the gist of the idea across. Since this post is really about using Rama to handle the storage and computation for a real-time collaborative editor backend, I’ll keep it simple and only handle adds and removes. The code can easily be extended to handle other kinds of document edits such as formatting changes.
The Rama code will make use of two functions that encapsulate the “operational transformation” logic: one to apply a series of edits to a document, and one to transform an edit made against an earlier version against all the edits that happened up to the latest version. These functions are as follows:
```clojure
(defn transform-edit [edit missed-edits]
  (if (instance? AddText (:action edit))
    (let [new-offset (reduce (fn [offset missed-edit]
                               (if (<= (:offset missed-edit) offset)
                                 (+ offset (add-action-adjustment missed-edit))
                                 offset))
                             (:offset edit)
                             missed-edits)]
      [(assoc edit :offset new-offset)])
    (reduce (fn [removes {:keys [offset action]}]
              (if (instance? AddText action)
                (transform-remove-against-add removes offset action)
                (transform-remove-against-remove removes offset action)))
            [edit]
            missed-edits)))

(defn apply-edits [doc edits]
  (reduce (fn [doc {:keys [offset action]}]
            (if (instance? AddText action)
              (setval (srange offset offset) (:content action) doc)
              (setval (srange offset (+ offset (:amount action))) "" doc)))
          doc
          edits))
```
```java
public static List transformEdit(Map edit, List<Map> missedEdits) {
  int offset = offset(edit);
  if(isAddEdit(edit)) {
    for(Map e: missedEdits) {
      if(offset(e) <= offset) offset += addActionAdjustment(e);
    }
    Map newEdit = new HashMap(edit);
    newEdit.put("offset", offset);
    return Arrays.asList(newEdit);
  } else {
    List removes = Arrays.asList(edit);
    for(Map e: missedEdits) {
      if(isAddEdit(e)) removes = transformRemoveAgainstAdd(removes, e);
      else removes = transformRemoveAgainstRemove(removes, e);
    }
    return removes;
  }
}

private static String applyEdits(String doc, List<Map> edits) {
  for(Map edit: edits) {
    int offset = offset(edit);
    if(isAddEdit(edit)) {
      doc = doc.substring(0, offset) + content(edit) + doc.substring(offset);
    } else {
      doc = doc.substring(0, offset) + doc.substring(offset + amount(edit));
    }
  }
  return doc;
}
```
The “apply edits” function updates a document with a series of edits, and the “transform edit” function implements the described “operational transformation” algorithm. The “transform edit” function returns a list since an operational transformation on a remove edit can split that edit into multiple remove edits, as described in one of the cases above.
The implementation of the helper functions used by “apply edits” and “transform edit” can be found in the Clojure repository or the Java repository.
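To make the splitting case concrete, here’s a minimal standalone sketch of transforming a remove against one missed add. It uses plain ints rather than the Map-based edits above, and the names are illustrative rather than the repositories’ actual helpers:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not the repo's actual helper: transform one remove
// edit (rmOffset, rmAmount) against a missed add of addLength characters
// at addOffset. Returns the {offset, amount} pairs the remove becomes.
static List<int[]> transformRemoveAgainstAddSketch(int rmOffset, int rmAmount,
                                                   int addOffset, int addLength) {
  List<int[]> result = new ArrayList<>();
  if(addOffset <= rmOffset) {
    // Missed add was entirely to the left: shift the remove right
    result.add(new int[] {rmOffset + addLength, rmAmount});
  } else if(addOffset >= rmOffset + rmAmount) {
    // Missed add was entirely to the right: remove is unchanged
    result.add(new int[] {rmOffset, rmAmount});
  } else {
    // Missed add landed inside the removed range: split into two removes
    // that skip over the newly added text, removing the right piece first
    // so the second remove's offsets stay valid
    int leftAmount = addOffset - rmOffset;
    result.add(new int[] {addOffset + addLength, rmAmount - leftAmount});
    result.add(new int[] {rmOffset, leftAmount});
  }
  return result;
}
```

Running this on the earlier example – removing 7 characters at offset 2 against a missed add of "to the " (7 characters) at offset 6 – yields “remove 3 at offset 13” and “remove 4 at offset 2”, matching the transformation described above.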
Backend storage
Indexed datastores in Rama, called PStates (“partitioned state”), are much more powerful and flexible than databases. Whereas databases have fixed data models, PStates can represent an infinite variety of data models because they’re built by composing the simpler primitive of data structures. PStates are distributed, durable, high-performance, and incrementally replicated. Each PState is fine-tuned to what the application needs, and an application makes as many PStates as needed. For this application, we’ll make two PStates: one to track the latest contents of a document, and one to track the sequence of every change made to a document in its history.
Here’s the PState definition for the latest contents of a document:
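```clojure
(declare-pstate topology $$docs {Long String})
```

```java
topology.pstate("$$docs", PState.mapSchema(Long.class, String.class));
```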
This PState is a map from a 64-bit document ID to the string contents of the document. The name of a PState always begins with $$, and this PState is equivalent to a key/value database.
Here’s the PState tracking the history of all edits to a document:
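```clojure
(declare-pstate topology
  $$edits
  {Long (vector-schema Edit {:subindex? true})})
```

```java
topology.pstate("$$edits",
                PState.mapSchema(Long.class,
                                 PState.listSchema(Map.class).subindexed()));
```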
This declares the PState as a map of lists, with the key being the 64-bit document ID and the inner lists containing the edit data. The inner list is declared as “subindexed”, which instructs Rama to store the elements individually on disk rather than reading and writing the whole list as one value. Subindexing enables nested data structures to have billions of elements while still being read and written extremely quickly. This PState can answer many queries in less than one millisecond: get the number of edits for a document, get a single edit at a particular index, or get all edits between two indices.
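For example, here’s what those queries could look like from a Java PState client (client creation is shown later in this post; document ID 123 is just an example):

```java
// Number of edits for document 123, i.e. its current version
Integer numEdits = editsPState.selectOne(Path.key(123L).view(Ops.SIZE));

// A single edit at index 10, fetched as a one-element sublist
List<Map> edit = editsPState.selectOne(Path.key(123L).sublist(10, 11));

// All edits between two indices
List<Map> edits = editsPState.selectOne(Path.key(123L).sublist(3, 8));
```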
Let’s now review the broader concepts of Rama in order to understand how these PStates will be materialized.
Rama concepts
A Rama application is called a “module”. In a module you define all the storage and implement all the logic needed for your backend. All Rama modules are event sourced, so all data enters through a distributed log in the module called a “depot”. Most of the work in implementing a module is coding “ETL topologies” which consume data from one or more depots to materialize any number of PStates. Modules look like this at a conceptual level:

Modules can have any number of depots, topologies, and PStates, and clients interact with a module by appending new data to a depot or querying PStates. Although event sourcing traditionally means that processing is completely asynchronous to the client doing the append, with Rama that’s optional. Because Rama is an integrated system, clients can specify that their appends should only return after all downstream processing and PState updates have completed.
A module deployed to a Rama cluster runs across any number of worker processes across any number of nodes, and a module is scaled by adding more workers. A module is broken up into “tasks” like so:

A “task” is a partition of a module. The number of tasks for a module is specified on deploy. A task contains one partition of every depot and PState for the module as well as a thread and event queue for running events on that task. A running event has access to all depot and PState partitions on that task. Each worker process has a subset of all the tasks for the module.
Coding a topology involves reading and writing to PStates, running business logic, and switching between tasks as necessary.
Implementing the backend
Let’s start implementing the module for the collaborative editor backend. The first step to coding the module is defining the depot:
```clojure
(defmodule CollaborativeDocumentEditorModule
  [setup topologies]
  (declare-depot setup *edit-depot (hash-by :id))
  )
```
```java
public class CollaborativeDocumentEditorModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*edit-depot", Depot.hashBy("id"));
  }
}
```
This declares a Rama module called “CollaborativeDocumentEditorModule” with a depot called *edit-depot which will receive all new edit information. Objects appended to a depot can be any type. The second argument of the depot declaration is called the “depot partitioner” – more on that later.
To keep the example simple, the data appended to the depot will be defrecord objects for the Clojure version and HashMap objects for the Java version. To have a tighter schema on depot records you could instead use Thrift, Protocol Buffers, or a language-native tool for defining the types. Here are the functions that will be used to create depot data:
```clojure
(defrecord AddText [content])
(defrecord RemoveText [amount])

(defrecord Edit [id version offset action])

(defn mk-add-edit [id version offset content]
  (->Edit id version offset (->AddText content)))

(defn mk-remove-edit [id version offset amount]
  (->Edit id version offset (->RemoveText amount)))
```
```java
public static Map makeAddEdit(long id, int version, int offset, String content) {
  Map ret = new HashMap();
  ret.put("type", "add");
  ret.put("id", id);
  ret.put("version", version);
  ret.put("offset", offset);
  ret.put("content", content);
  return ret;
}

public static Map makeRemoveEdit(long id, int version, int offset, int amount) {
  Map ret = new HashMap();
  ret.put("type", "remove");
  ret.put("id", id);
  ret.put("version", version);
  ret.put("offset", offset);
  ret.put("amount", amount);
  return ret;
}
```
Each edit contains a 64-bit document ID identifying which document the edit applies to.
Next, let’s begin defining the topology to consume data from the depot and materialize the PStates. Here’s the declaration of the topology with the PStates:
```clojure
(defmodule CollaborativeDocumentEditorModule
  [setup topologies]
  (declare-depot setup *edit-depot (hash-by :id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$docs {Long String})
    (declare-pstate topology
      $$edits
      {Long (vector-schema Edit {:subindex? true})})
    ))
```
```java
public class CollaborativeDocumentEditorModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*edit-depot", Depot.hashBy("id"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$docs", PState.mapSchema(Long.class, String.class));
    topology.pstate("$$edits",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));
  }
}
```
This defines a stream topology called “core”. Rama has two kinds of topologies, stream and microbatch, which have different properties. In short, streaming is best for interactive applications that need single-digit millisecond update latency, while microbatching has update latency of a few hundred milliseconds and is best for everything else. Streaming is used here because a collaborative editor needs quick feedback from the server as it sends changes back and forth.
Notice that the PStates are defined as part of the topology. Unlike databases, PStates are not global mutable state. A PState is owned by a topology, and only the owning topology can write to it. Writing state in global variables is a horrible thing to do, and databases are just global variables by a different name.
Since a PState can only be written to by its owning topology, PStates are much easier to reason about. Everything about them can be understood just by looking at the topology implementation, all of which exists in the same program and is deployed together. Additionally, the extra step of appending to a depot before processing the record to materialize the PState does not lower performance, as we’ve shown in benchmarks. Rama being an integrated system strips away much of the overhead that traditionally exists.
Let’s now add the code to materialize the PStates:
```clojure
(defmodule CollaborativeDocumentEditorModule
  [setup topologies]
  (declare-depot setup *edit-depot (hash-by :id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$docs {Long String})
    (declare-pstate topology
      $$edits
      {Long (vector-schema Edit {:subindex? true})})
    (<<sources topology
      (source> *edit-depot :> {:keys [*id *version] :as *edit})
      (local-select> [(keypath *id) (view count)]
                     $$edits
                     :> *latest-version)
      (<<if (= *latest-version *version)
        (vector *edit :> *final-edits)
       (else>)
        (local-select> [(keypath *id) (srange *version *latest-version)]
                       $$edits
                       :> *missed-edits)
        (transform-edit *edit *missed-edits :> *final-edits))
      (local-select> [(keypath *id) (nil->val "")]
                     $$docs
                     :> *latest-doc)
      (apply-edits *latest-doc *final-edits :> *new-doc)
      (local-transform> [(keypath *id) (termval *new-doc)] $$docs)
      (local-transform> [(keypath *id) END (termval *final-edits)] $$edits)
      )))
```
```java
public class CollaborativeDocumentEditorModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*edit-depot", Depot.hashBy("id"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$docs", PState.mapSchema(Long.class, String.class));
    topology.pstate("$$edits",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));

    topology.source("*edit-depot").out("*edit")
            .each(Ops.GET, "*edit", "id").out("*id")
            .each(Ops.GET, "*edit", "version").out("*version")
            .localSelect("$$edits", Path.key("*id").view(Ops.SIZE)).out("*latest-version")
            .ifTrue(new Expr(Ops.EQUAL, "*latest-version", "*version"),
              Block.each(Ops.TUPLE, "*edit").out("*final-edits"),
              Block.localSelect("$$edits", Path.key("*id")
                       .sublist("*version", "*latest-version")).out("*missed-edits")
                   .each(CollaborativeDocumentEditorModule::transformEdit,
                         "*edit", "*missed-edits").out("*final-edits"))
            .localSelect("$$docs", Path.key("*id").nullToVal("")).out("*latest-doc")
            .each(CollaborativeDocumentEditorModule::applyEdits,
                  "*latest-doc", "*final-edits").out("*new-doc")
            .localTransform("$$docs", Path.key("*id").termVal("*new-doc"))
            .localTransform("$$edits", Path.key("*id").end().termVal("*final-edits"));
  }
}
```
The code to implement the topology is less than 20 lines, but there’s a lot to unpack here. The business logic is implemented with dataflow. Rama’s dataflow API is exceptionally expressive, able to intermix arbitrary business logic with loops, conditionals, and moving computation between tasks. This post is not going to explore all the details of dataflow as there’s simply too much to cover. Full tutorials for Rama dataflow can be found on our website for the Java API and for the Clojure API.
Let’s go over each line of this topology implementation. The first step is subscribing to the depot:
```clojure
(<<sources topology
  (source> *edit-depot :> {:keys [*id *version] :as *edit})
```
```java
topology.source("*edit-depot").out("*edit")
        .each(Ops.GET, "*edit", "id").out("*id")
        .each(Ops.GET, "*edit", "version").out("*version")
```
This subscribes the topology to the depot *edit-depot and starts a reactive computation on it. Operations in dataflow do not return values. Instead, they emit values that are bound to new variables. In the Clojure API, the inputs and outputs of an operation are separated by the :> keyword. In the Java API, output variables are bound with the .out method.
Whenever data is appended to that depot, the data is emitted into the topology. The Java version binds the emit to the variable *edit and then gets the fields “id” and “version” from the map into the variables *id and *version, while the Clojure version captures the emit as the variable *edit and also destructures its fields into the variables *id and *version. All variables in Rama code begin with a *. The subsequent code runs for every single emit.
Remember that last argument to the depot declaration called the “depot partitioner”? That’s relevant here. Here’s that image of the physical layout of a module again:

The depot partitioner determines on which task the append happens and thereby on which task computation begins for subscribed topologies. In this case, the depot partitioner says to hash by the “id” field of the appended data. The target task is computed by taking the hash and modding it by the total number of tasks. This means data with the same ID always go to the same task, while different IDs are evenly spread across all tasks.
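As an illustration (this is not Rama’s internal code, and docId / numTasks are just stand-in names), the task selection amounts to:

```java
// Hash the "id" field of the appended data and mod by the module's
// total task count to pick the target task
int targetTask = Math.floorMod(Long.hashCode(docId), numTasks);
```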
Rama gives a ton of control over how computation and storage are partitioned, and in this case we’re partitioning by the hash of the document ID since that’s how we want the PStates to be partitioned. This allows us to easily locate the task storing data for any particular document.
The next line fetches the current version of the document:
```clojure
(local-select> [(keypath *id) (view count)]
               $$edits
               :> *latest-version)
```
```java
.localSelect("$$edits", Path.key("*id").view(Ops.SIZE)).out("*latest-version")
```
The $$edits PState contains every edit applied to the document, and the latest version of a document is simply the size of that list. The PState is queried with the “local select” operation with a “path” specifying what to fetch. When a PState is referenced in dataflow code, it always references the partition of the PState that’s located on the task on which the event is currently running.
Paths are a deep topic, and the full documentation for them can be found here. A path is a sequence of “navigators” that specify how to hop through a data structure to target values of interest. A path can target any number of values, and they’re used for both transforms and queries. In this case, the path navigates by the key *id to the list of edits for the document. The next navigator, view, runs a function on that list to get its size. The Clojure version uses Clojure’s count function, and the Java version uses the Ops.SIZE function. The output of the query is bound to the variable *latest-version. This is a fast sub-millisecond query no matter how large the list of edits.
The next few lines run the “operational transformation” algorithm if necessary to produce the edits to be applied to the current version of the document:
```clojure
(<<if (= *latest-version *version)
  (vector *edit :> *final-edits)
 (else>)
  (local-select> [(keypath *id) (srange *version *latest-version)]
                 $$edits
                 :> *missed-edits)
  (transform-edit *edit *missed-edits :> *final-edits))
```
```java
.ifTrue(new Expr(Ops.EQUAL, "*latest-version", "*version"),
  Block.each(Ops.TUPLE, "*edit").out("*final-edits"),
  Block.localSelect("$$edits", Path.key("*id")
           .sublist("*version", "*latest-version")).out("*missed-edits")
       .each(CollaborativeDocumentEditorModule::transformEdit,
             "*edit", "*missed-edits").out("*final-edits"))
```
First, an “if” is run to check whether the version of the edit is the same as the latest version on the backend. If so, the list of edits to be applied to the document is just the edit unchanged. As mentioned before, the operational transformation algorithm can result in multiple edits being produced from a single edit. The Clojure version produces the single-element list by calling vector, and the Java version does so with the Ops.TUPLE function. The list of edits is bound to the variable *final-edits.
The “else” branch of the “if” handles the case where the edit must be transformed against all edits up to the latest version. The “local select” on $$edits fetches all edits from the input edit’s version up to the latest version. The navigator that selects the sublist, srange in Clojure and sublist in Java, takes as arguments a start offset (inclusive) and end offset (exclusive) and navigates to the sublist of all elements between those offsets. This sublist is bound to the variable *missed-edits.
The function shown before implementing operational transformations, transform-edit in Clojure and CollaborativeDocumentEditorModule::transformEdit in Java, is then run on the edit and all the missed edits to produce the new list of edits, which is bound to the variable *final-edits.
Any variables bound in both the “then” and “else” branches of an “if” conditional in Rama will be in scope after the conditional. In this case, *final-edits is available after the conditional. *missed-edits is not available since it is not bound in the “then” branch. This behavior comes from Rama implicitly “unifying” the “then” and “else” branches.
The next bit of code gets the latest document and applies the transformed edits to it:
```clojure
(local-select> [(keypath *id) (nil->val "")]
               $$docs
               :> *latest-doc)
(apply-edits *latest-doc *final-edits :> *new-doc)
```
```java
.localSelect("$$docs", Path.key("*id").nullToVal("")).out("*latest-doc")
.each(CollaborativeDocumentEditorModule::applyEdits,
      "*latest-doc", "*final-edits").out("*new-doc")
```
The “local select” fetches the latest version of the document from the $$docs PState. The second navigator in the path, nil->val in Clojure and nullToVal in Java, handles the case where this is the first ever edit on the document and the document ID does not yet exist in the PState. In that case the key navigation by *id would navigate to null, so this navigator navigates to the empty string instead.
The next line runs the previously defined “apply edits” function to apply the transformed edits, producing the new version of the document into the variable *new-doc.
The next two lines finish this topology:
```clojure
(local-transform> [(keypath *id) (termval *new-doc)] $$docs)
(local-transform> [(keypath *id) END (termval *final-edits)] $$edits)
```
```java
.localTransform("$$docs", Path.key("*id").termVal("*new-doc"))
.localTransform("$$edits", Path.key("*id").end().termVal("*final-edits"));
```
The two PStates are updated with the “local transform” operation. Like “local select”, a “local transform” takes in as input a PState and a “path”. Paths for “local transform” navigate to the values to change and then use special “term” navigators to update them.
The first “local transform” navigates into $$docs by the document ID and uses the “term val” navigator to set the value there to *new-doc. This is exactly the same as doing a “put” into a hash map.
The second “local transform” appends the transformed edits to the end of the list of edits for this document. It navigates to the list by the key *id and then navigates to the “end” of the list. More specifically, the “end” navigator navigates to the empty list right after the overall list. Setting that empty list to a new value appends those elements to the overall list, which is what the final “term val” navigator does.
This entire topology definition executes atomically – the PState queries, operational transformation logic, and PState writes all happen together, and nothing else can run on the task in between. This is a result of Rama colocating computation and storage, which will be explored more in the next section.
The power of colocation
Let’s take a look at the physical layout of a module again:

Every task has a partition of each depot and PState as well as an executor thread for running events on that task. Critically, only one event can run at a time on a task. That means each event has atomic access to all depot and PState partitions on that task. Additionally, those depot and PState partitions are local to the JVM process running the event, so interactions with them are fully in-process (as opposed to the inter-process communication used with databases).
A traditional database handles many read and write requests concurrently, using complex locking strategies and explicit transactions to achieve atomicity. Rama’s approach is different: parallelism is achieved by having many tasks in a module, and atomicity comes from colocation. Rama doesn’t have explicit transactions because transactional behavior is automatic when computation is colocated with storage.
When writing a topology in a module, you have full control over what constitutes a single event. Code runs synchronously on a task unless it explicitly goes asynchronous, such as with partitioners or yields.
This implementation of a collaborative editor backend is a great example of the power of colocation. The topology consists of completely arbitrary code running fine-grained “operational transformation” logic and manipulating multiple PStates, and nothing special needed to be done to get the necessary transactional behavior.
When you use a microbatch topology to implement an ETL, you get even stronger transactional behavior: every microbatch execution is a cross-partition transaction, no matter how complex the logic.
You can read more about Rama’s strong ACID semantics on this page.
Query topology to fetch document and version
The module needs one more small thing to complete the functionality necessary for a real-time collaborative editor backend. When the frontend is loaded, it needs to load the latest document contents along with its version. The contents are stored in the $$docs PState, and the version is the size of the list of edits in the $$edits PState. So we need to read from both of those PStates atomically in one event.
If you were to try to do this with direct PState clients, you would issue one query to the $$edits PState and one query to the $$docs PState. Those queries would run as separate events, and the PStates could be updated in between the queries. This would result in the frontend having incorrect state.
Rama provides a feature called “query topologies” to handle this case. Query topologies are exceptionally powerful, able to implement high-performance, real-time, distributed queries across any or all of the PStates of a module and any or all of the partitions of those PStates. They’re programmed with the exact same dataflow API as used to program ETLs.
For this use case, we only need to query two PStates atomically, which is a simple use of query topologies. The full implementation is:
```clojure
(<<query-topology topologies "doc+version"
  [*id :> *ret]
  (|hash *id)
  (local-select> (keypath *id) $$docs :> *doc)
  (local-select> [(keypath *id) (view count)] $$edits :> *version)
  (hash-map :doc *doc :version *version :> *ret)
  (|origin))
```
```java
topologies.query("doc+version", "*id").out("*ret")
          .hashPartition("*id")
          .localSelect("$$docs", Path.key("*id")).out("*doc")
          .localSelect("$$edits", Path.key("*id").view(Ops.SIZE)).out("*version")
          .each((String doc, Integer version) -> {
            Map ret = new HashMap();
            ret.put("doc", doc);
            ret.put("version", version);
            return ret;
          }, "*doc", "*version").out("*ret")
          .originPartition();
```
Let’s go through this line by line. The first part declares the query topology and its arguments:
```clojure
(<<query-topology topologies "doc+version"
  [*id :> *ret]
```
```java
topologies.query("doc+version", "*id").out("*ret")
```
This declares a query topology named “doc+version” that takes one argument *id as input. It declares the return variable *ret, which will be bound by the end of the topology execution.
The next line gets the query to the task of the module containing the data for that ID:
```clojure
(|hash *id)
```
```java
.hashPartition("*id")
```
This line does a “hash partition” by the value of *id. Partitioners relocate subsequent code to a potentially different task, and a hash partitioner works exactly like the aforementioned depot partitioner. The details of relocating computation, like serializing and deserializing any variables referenced after the partitioner, are handled automatically. The code is linear without any callback functions even though partitioners could be jumping around to different tasks on different nodes.
When the first operation of a query topology is a partitioner, query topology clients are optimized to go directly to that task. You’ll see an example of invoking a query topology in the next section.
The next two lines atomically fetch the document contents and version:
```clojure
(local-select> (keypath *id) $$docs :> *doc)
(local-select> [(keypath *id) (view count)] $$edits :> *version)
```
```java
.localSelect("$$docs", Path.key("*id")).out("*doc")
.localSelect("$$edits", Path.key("*id").view(Ops.SIZE)).out("*version")
```
These two queries are atomic because of colocation, just as explained above. Fetching the latest contents of a document is simply a lookup by key, and fetching the latest version is simply the size of the list of edits.
The next line packages these two values into a single object:
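```clojure
(hash-map :doc *doc :version *version :> *ret)
```

```java
.each((String doc, Integer version) -> {
  Map ret = new HashMap();
  ret.put("doc", doc);
  ret.put("version", version);
  return ret;
}, "*doc", "*version").out("*ret")
```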
This just puts them into a hash map.
Finally, here’s the last line of the query topology:
```clojure
(|origin)
```
```java
.originPartition();
```
The “origin partitioner” relocates computation to the task where the query began execution. All query topologies must invoke the origin partitioner, and it must be the last partitioner invoked.
Let’s now take a look at how a client would interact with this module.
Interacting with the module
Here’s an example of how you would get clients to *edit-depot, $$edits, and the doc+version query topology, such as in your web server:
```clojure
(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))

(def edit-depot
  (foreign-depot
    manager
    "nlb.collaborative-document-editor/CollaborativeDocumentEditorModule"
    "*edit-depot"))

(def edits-pstate
  (foreign-pstate
    manager
    "nlb.collaborative-document-editor/CollaborativeDocumentEditorModule"
    "$$edits"))

(def doc+version
  (foreign-query
    manager
    "nlb.collaborative-document-editor/CollaborativeDocumentEditorModule"
    "doc+version"))
```
```java
Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
Depot editDepot =
  manager.clusterDepot("nlb.CollaborativeDocumentEditorModule", "*edit-depot");
PState editsPState =
  manager.clusterPState("nlb.CollaborativeDocumentEditorModule", "$$edits");
QueryTopologyClient<Map> docPlusVersion =
  manager.clusterQuery("nlb.CollaborativeDocumentEditorModule", "doc+version");
```
A “cluster manager” connects to a Rama cluster by specifying the location of its “Conductor” node. The “Conductor” node is the central node of a Rama cluster, which you can read more about here. From the cluster manager, you can retrieve clients to any depots, PStates, or query topologies for any module. The objects are identified by the module name and their name within the module.
Here’s an example of appending an edit to the depot:
```clojure
(foreign-append! edit-depot (mk-add-edit 123 3 5 "to the "))
```
```java
editDepot.append(makeAddEdit(123, 3, 5, "to the "));
```
This uses the previously defined helper functions to create an add edit for document ID 123 at version 3, adding “to the ” to offset 5.
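As mentioned earlier, the append by default doesn’t return until the stream topology has finished updating the PStates. As a sketch, assuming the AckLevel options from Rama’s client API, that behavior can be relaxed per append:

```java
// Sketch, assuming Rama's AckLevel client option: return once the edit is
// durably appended to the depot, without waiting for the stream topology
// to finish updating the PStates
editDepot.append(makeAddEdit(123, 3, 5, "to the "), AckLevel.APPEND_ACK);
```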
Here’s an example of querying the $$edits PState for a range of edits:
```clojure
(def edits (foreign-select-one [(keypath 123) (srange 3 8)] edits-pstate))
```
```java
List<Map> edits = editsPState.selectOne(Path.key(123L).sublist(3, 8));
```
This queries for the list of edits from version 3 (inclusive) to 8 (exclusive) for document ID 123.
Finally, here’s an example of invoking the query topology to atomically fetch the contents and version for document ID 123:
```clojure
(def ret (foreign-invoke-query doc+version 123))
```
```java
Map ret = docPlusVersion.invoke(123L);
```
This looks no different than invoking any other function, but it’s actually executing remotely on a cluster. This returns a hash map containing the doc and version just as defined in the query topology.
Workflow with frontend
Let’s take a look at how a frontend implementation can interact with our completed backend to be fully reactive. The approach is the same as detailed in this post. Architecturally, besides Rama you would have a web server interfacing between Rama and web browsers.
The application in the browser needs to know as soon as there’s an update to the document contents on the backend. This is easy to do with a Rama reactive query like the following:
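Here’s a sketch of what that could look like with the Java client, assuming a proxy method on the PState client whose callback takes the three arguments described below:

```java
// Sketch: react whenever document 123's version (the size of its edit
// list) changes. The callback receives the new value, a diff, and the
// previous value, as described below.
ProxyState<Integer> versionProxy =
  editsPState.proxy(Path.key(123L).view(Ops.SIZE),
    (newVersion, diff, oldVersion) -> {
      // Fetch the missed edits and transform pending local edits here
    });

// The current value can be read at any time from any thread
int currentVersion = versionProxy.get();
```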
“Proxy” is similar to the “select” calls already shown. It takes in a path specifying a value to navigate to. Unlike “select”, “proxy”:
- Returns a ProxyState which is continuously and incrementally updated as the value in that PState changes. This has a method “get” on it that can be called at any time from any thread to get its current value.
- Can register a callback function that’s invoked as soon as the value changes in the module.
The callback function is invoked less than a millisecond after the value changes in the PState, and it’s given three arguments: the new value (what would be selected if the path were run from scratch), a “diff” object, and the value from the last time the callback function was run. The diff isn’t needed in this case, but it contains fine-grained information about how the value changed. For example, if the path navigated to a set, the diff would contain the specific info of which elements were added and/or removed from the set. Reactive queries in Rama are very potent, and you can read more about them here.
For this use case, the browser just needs to know when the version changes so it can take the following actions:
- Fetch all edits that it missed
- Run the operational transformation algorithm against any pending edits it has buffered but hasn’t sent to the server yet
The browser contains four pieces of information:
- The document contents
- The latest version its contents are based on
- Edits that have been sent to the server but haven’t been acknowledged yet
- Edits that are pending
The basic workflow of the frontend is to:
- Buffer changes locally
- Send one change at a time to the server. When it’s applied, run operational transformation against all pending edits, update the version, and then send the next pending change to the server if there is one.
The only time the full document contents are ever transmitted between server and browser is on initial load, when the query topology is invoked to atomically retrieve the document contents and version. Otherwise, only incremental edit objects are sent back and forth.
Summary
There’s a lot to learn with Rama, but you can see from this example application how much you can accomplish with very little code. For an experienced Rama programmer, a project like this takes only a few hours to fully develop, test, and have ready for deployment. The Rama portion of this is trivial, with most of the work being implementing the operational transformation algorithm.
As mentioned earlier, there’s a GitHub project for the Clojure version and for the Java version containing all the code in this post. Those projects also have tests showing how to unit test modules in Rama’s “in-process cluster” environment.
You can get in touch with us at consult@redplanetlabs.com to schedule a free consultation to talk about your application and/or pair program on it. Rama is free for production clusters for up to two nodes and can be downloaded at this page.