Project dependency

Add replikativ to your project dependencies; the current coordinates are shown on its Clojars project page.

You can just load replikativ as a library.

(ns replikativ.ormap-demo
  (:require [superv.async :refer [<?? S]]
            [kabel.peer :refer [start stop]]
            [konserve
             [filestore :refer [new-fs-store]]
             [memory :refer [new-mem-store]]]
            [replikativ
             [peer :refer [server-peer]]
             [stage :refer [connect! create-stage!]]]
            [replikativ.crdt.ormap.stage :as ors]
            [replikativ.crdt.ormap.realize :as real]))

Global namespace

The datatypes for state management are organized in a global namespace which allows you to address and share state with other peers and applications. Each datatype is addressed by a user and a globally unique uuid:

(def user "mail:prototype@your-domain.com") ;; will be used to authenticate you (not yet)
(def ormap-id #uuid "7d274663-9396-4247-910b-409ae35fe98d") ;; application specific datatype address
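If you start a new application, you will need a datatype id of your own. A minimal sketch, assuming a JVM Clojure REPL; `my-user`, `my-ormap-id` and `my-address` are illustrative names and not part of replikativ:

```clojure
;; Illustrative names only; replace the mail address with your own.
(def my-user "mail:prototype@your-domain.com")

;; Generate a fresh random UUID once, then paste the printed literal into your
;; source so that all peers of your application share the same id.
(def my-ormap-id (java.util.UUID/randomUUID))
(println (pr-str my-ormap-id)) ;; prints a #uuid "..." literal

;; The [user id] pair is the address passed to the stage calls further down.
(def my-address [my-user my-ormap-id])
```

Generating the id at every start would give each run a different address, so the literal should be fixed in your code as done with ormap-id above.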

Peer setup

Next you need to set up a peer. You can either integrate it with your HTTP server through a Ring middleware or start a dedicated peer directly.

(def store-a (<?? S (new-fs-store "/tmp/test"))) ;; durable store
(def peer-a (<?? S (server-peer S store-a "ws://127.0.0.1:9090"))) ;; network and file IO
(<?? S (start peer-a))

Finally you can interact with the peer through a stage. The stage provides the API to the peer for applications.

(def stage-a (<?? S (create-stage! user peer-a))) ;; API for peer

Now let's provide the datatype for our application:

(<?? S (ors/create-ormap! stage-a :id ormap-id))

For test purposes we set up another peer similarly.

(def store-b (<?? S (new-mem-store))) ;; store for testing
(def peer-b (<?? S (server-peer S store-b "ws://127.0.0.1:9091")))
(<?? S (start peer-b))
(def stage-b (<?? S (create-stage! user peer-b)))
(<?? S (ors/create-ormap! stage-b :id ormap-id))

Now you are set up :).

State changes

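[['assoc [:name "Peter"]]] encodes a user-defined function application: a vector of [function-symbol parameters] pairs that is later applied to some associative local state. Note that the key you pass to assoc! should be the same key the transaction writes (or one that can be mapped to it one-to-one).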

(<?? S (ors/assoc! stage-b [user ormap-id] :name [['assoc [:name "Peter"]]]))
(<?? S (ors/get stage-b [user ormap-id] :name))

Now let us wire the peers up:

(<?? S (connect! stage-a "ws://127.0.0.1:9091")) 

Do they converge?

(<?? S (ors/get stage-a [user ormap-id] :name)) 
;; accordingly we can provide a dissoc operation on removal
(<?? S (ors/dissoc! stage-a [user ormap-id] :name [['dissoc [:name]]])) ;; params as a vector, so eval functions can destructure [k]

Feel free to play around :) When you are done, you can release the network connections by stopping the peers (see Shutdown).

Streaming

Direct state access as done above can be cumbersome, because you usually want to react to all state changes in your application logic. By mutating replikativ and reacting to state changes from the whole system, you can let the state flow in a circle through your local application and gain a functional pipeline.

Let’s stream the OR-Map changes into an associative data structure like a map in an atom. To do so we define runtime functions which interpret the streamed transactions with the eval-fn map:

(def val-atom (atom {}))
(def eval-fn {'assoc (fn [S old [k v]]
                       (swap! old assoc k v)
                       old)
              'dissoc (fn [S old [k]]
                        (swap! old dissoc k)
                        old)})

These are in effect a reduction over the identity applying the functions in the order provided by the datatype (details follow in the next section).
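The reduction can be pictured in plain Clojure without any peer. This is only a sketch of the idea, not replikativ's implementation: `apply-txs`, `sketch-eval-fn` and `sketch-atom` are hypothetical names, and the supervisor argument is passed as nil because the functions here ignore it.

```clojure
;; Same shape as the eval-fn map above: symbol -> (fn [S identity params]).
(def sketch-eval-fn
  {'assoc  (fn [_S old [k v]] (swap! old assoc k v) old)
   'dissoc (fn [_S old [k]]   (swap! old dissoc k) old)})

(defn apply-txs
  "Hypothetical helper: folds the transactions into the identity by looking up
  each function symbol in eval-fn and calling it with the parameters."
  [eval-fn identity-ref txs]
  (reduce (fn [ident [f-sym params]]
            ((get eval-fn f-sym) nil ident params)) ;; nil stands in for S
          identity-ref
          txs))

(def sketch-atom (atom {}))

(apply-txs sketch-eval-fn sketch-atom
           [['assoc [:name "Peter"]]
            ['assoc [:city "Berlin"]]
            ['dissoc [:city]]])

@sketch-atom ;; => {:name "Peter"}
```

Each function returns the identity it was given, which is what lets the next transaction be applied to the same atom.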

Now we start a stream of function applications on the identity of the val-atom.

(def stream (real/stream-into-identity! stage-a [user ormap-id] eval-fn val-atom))

That way you can decide locally when you want to access the data and how to interpret it for your desired identity type. Instead of an atom, this can just as well be Datomic or a database of your choice. If you provide an optional :applied-log key, the identity will be treated as durable and each change will be applied only once, even across system restarts.

(@val-atom :name) ;; when you have added :name above => "Peter"
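Because the identity here is an ordinary atom, one way to react to streamed changes is the standard `add-watch` from clojure.core. A minimal sketch on a stand-alone atom; `demo-atom` and `seen` are illustrative names, and with the setup above you would watch val-atom instead:

```clojure
(def demo-atom (atom {}))
(def seen (atom [])) ;; collects every new state, for demonstration only

;; The watch function is called with the key, the reference, the old state
;; and the new state after each change to the atom.
(add-watch demo-atom :log-changes
           (fn [_key _ref old-state new-state]
             (when (not= old-state new-state)
               (swap! seen conj new-state))))

(swap! demo-atom assoc :name "Peter")
@seen ;; => [{:name "Peter"}]

;; Remove the watch when you no longer need it.
(remove-watch demo-atom :log-changes)
```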

Order of events

It is important to note that the streaming functions you provide follow the semantics of the datatypes you use. In an OR-Map, for instance, no order of assoc or dissoc transactions is provided. The only guarantee is that you see a dissoc after its corresponding assoc.
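What this means for your eval functions can be tried out on plain maps. The sketch below is an illustration only: `run-txs` is a hypothetical pure stand-in for the streaming machinery, and replikativ's own handling of concurrent writes to the same key is not modelled here.

```clojure
(defn run-txs
  "Hypothetical helper: applies [op params] transactions to an empty map."
  [txs]
  (reduce (fn [m [op [k v]]]
            (case op
              assoc  (assoc m k v)
              dissoc (dissoc m k)))
          {}
          txs))

(def tx-name   ['assoc [:name "Peter"]])
(def tx-city   ['assoc [:city "Berlin"]])
(def tx-rename ['assoc [:name "Paul"]])
(def tx-remove ['dissoc [:name]])

;; Transactions on different keys commute, so their order does not matter.
(= (run-txs [tx-name tx-city]) (run-txs [tx-city tx-name])) ;; => true

;; Two writes to the same key do not commute; do not rely on their order.
(= (run-txs [tx-name tx-rename]) (run-txs [tx-rename tx-name])) ;; => false

;; A dissoc is seen after its assoc, so the key ends up removed.
(run-txs [tx-name tx-remove]) ;; => {}
```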

To get order with strong consistency semantics, e.g. if you cannot use a default conflict resolution mechanism of a CRDT, you can use CDVCS.

Shutdown

(<?? S (stop peer-a))
(<?? S (stop peer-b))

The ClojureScript API is the same, except that you cannot have blocking IO and cannot open a websocket server in the browser (but we already have WebRTC in mind ;) ):