Avout brings Clojure's in-memory model of state to distributed application development by providing a distributed implementation of Clojure's Multiversion Concurrency Control (MVCC) STM along with distributable, durable, and extendable versions of Clojure's Atom and Ref concurrency primitives.

The Avout equivalent of Hello World.

(use 'avout.core)
(def client (connect "127.0.0.1"))

(def r0 (zk-ref client "/r0" 0))
(def r1 (zk-ref client "/r1" []))

(dosync!! client
  (alter!! r0 inc)
  (alter!! r1 conj @r0))

Start by creating a ZooKeeper client with the connect function, then create two ZooKeeper-backed distributed Refs with the zk-ref function. Finally, perform a dosync!! transaction that updates both Refs with alter!!. Using Avout isn't much different from using Clojure's in-memory Atoms and Refs.
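For comparison, here is a sketch of the same update written against Clojure's built-in in-memory Refs. It uses only clojure.core and needs no ZooKeeper client; the names r0-local and r1-local are illustrative and do not come from Avout.

```clojure
;; In-memory counterpart of the example above, using only clojure.core.
(def r0-local (ref 0))
(def r1-local (ref []))

(dosync
  (alter r0-local inc)              ; r0-local becomes 1 inside the transaction
  (alter r1-local conj @r0-local))  ; conj the in-transaction value of r0-local

[@r0-local @r1-local] ; => [1 [1]]
```

The shape of the code is the same; the Avout version adds the client argument and the double-bang function names.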

Avout Atoms and Refs implement Clojure's IRef interface, and therefore support functions that operate on IRefs, including: deref (and its reader-macro, @), set-validator!, add-watch, and remove-watch.

Avout also provides "double-bang" versions of the remaining core Atom and Ref functions (reset!, swap!, dosync, ref-set, alter, commute) for use with distributed Atoms and Refs: reset!!, swap!!, dosync!!, ref-set!!, alter!!, commute!!.

Note: Avout Refs cannot participate in in-memory dosync transactions, but Avout's local-ref provides the equivalent of an in-memory Ref that can participate in dosync!! transactions with distributed Refs.
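As a reminder of what the IRef functions do, the sketch below exercises set-validator!, add-watch, and remove-watch on an ordinary in-memory Atom. According to the text above, the same calls are meant to work on Avout Atoms and Refs; that is not demonstrated here, and the names counter, seen, and :log are illustrative.

```clojure
(def counter (atom 0))
(def seen (atom []))   ; records [old-value new-value] pairs passed to the watch

;; Reject any value that is not a non-negative integer.
(set-validator! counter (fn [v] (and (integer? v) (not (neg? v)))))

;; A watch receives its key, the reference, the old value, and the new value.
(add-watch counter :log
           (fn [_key _ref old-v new-v] (swap! seen conj [old-v new-v])))

(swap! counter inc)                  ; valid change 0 -> 1, the watch fires

(def rejected?
  (try
    (reset! counter -5)              ; invalid value, the validator refuses it
    false
    (catch IllegalStateException _ true)))

(remove-watch counter :log)
(swap! counter inc)                  ; 1 -> 2, no watch is registered any more
```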

Background

Avout enables techniques that require synchronous, coordinated management of distributed state (see also JavaSpaces), complementing approaches that focus on asynchronous, uncoordinated communication between distributed components, e.g. message queues (0MQ, RabbitMQ, HornetQ), event-driven approaches (Netty, Aleph), and actors (Erlang, Akka).

Much has been written [3,4,5] on functional programming and the advantages of designing programs that emphasize pure functions and immutable values, and that minimize or eliminate, wherever possible, mutable state. Of course, it's not always possible to completely eliminate the need for mutable state, and that's where Clojure's precise model of time, identity, and state becomes powerful.

Likewise, when designing distributed applications, it is desirable to create components that are loosely coupled and that communicate with each other asynchronously, but this too is not always possible. There are times when you need coordinated access to state across systems in a distributed application, and this is where Avout comes in.

Below is some background on Avout's design. If your interests don't tend toward the philosophy of Alfred North Whitehead, you can jump directly to the Getting Started guide.

Clojure's Philosophy of State

Rich Hickey has spoken eloquently on mutable state in his talk "Are We There Yet?". To summarize, Rich and Alfred North Whitehead [6] don't believe in mutable state; it's an illusion. Rather, there are only successions of causally-linked immutable values, and time is derived from the perception of these successions. Causally-linked means the future is a function of the past: processes apply pure functions to immutable values to derive new immutable values, and we assign identity to these chains of values and perceive change where there is none.

As Rich has been known to do, he has provided precise, if not familiar, definitions for some familiar words that describe this model of identity, time, and mutable state.

A consequence of this view is that we perceive only snapshots of an unchanging past, not an ever-changing present, and this is exactly how Clojure models time, identity, and mutable state using Atoms, Refs, and Agents.

Atoms & Refs

Atoms provide a way to manage shared, independent state synchronously. Atoms are thread-safe and updates are atomic.

Refs also provide a way to manage shared state synchronously, but changes to multiple Refs can be coordinated within a single transaction.

If you need synchronized updates coordinated across multiple identities, then use Refs. Otherwise, Atoms are more efficient.
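The distinction can be sketched with Clojure's in-memory primitives: an Atom for a value that changes on its own, and two Refs for values that must change together. The names hits, group-a, group-b, and move-member! are illustrative.

```clojure
;; Independent state: a single Atom is enough.
(def hits (atom 0))
(swap! hits inc)

;; Coordinated state: two Refs that must change together.
(def group-a (ref #{"david"}))
(def group-b (ref #{"emma"}))

(defn move-member!
  "Moves member from one group Ref to the other in a single transaction,
  so no reader sees the member in both groups or in neither."
  [from to member]
  (dosync
    (alter from disj member)
    (alter to conj member)))

(move-member! group-a group-b "david")
```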

Reads vs. Writes

Reading is just observation, and perception is massively parallel and uncoordinated. You don't have to coordinate with others to observe the value of an identity: there are no locks, synchronized blocks, or message queues. Just grab the most recent snapshot of the identity's value and go.

Writes, on the other hand, must be coordinated. Since future values are derived by applying functions to an identity's most recent immutable value, writes must be atomic. Two processes that observe the same value for an identity cannot both write new values. One will win, and the other will have to read the winner's value, recalculate its own new value from it, and try its write again.
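The read, compute, and retry cycle for writes can be sketched with compare-and-set! on an in-memory Atom. This illustrates the idea only and makes no claim about how Avout implements it; swap-with-retry! and inc-with-interference are hypothetical names, and the "competing writer" is simulated inside the update function so the outcome is repeatable.

```clojure
(defn swap-with-retry!
  "Applies f to the atom's current value and retries until no other writer
  has changed the atom in between. Returns [new-value attempts]."
  [a f]
  (loop [attempts 1]
    (let [old-v @a
          new-v (f old-v)]
      (if (compare-and-set! a old-v new-v)
        [new-v attempts]
        (recur (inc attempts))))))

(def a (atom 0))
(def interfered? (atom false))

(defn inc-with-interference
  "Increments v. On its first call only, it also simulates a competing
  writer that changes the atom before our write lands."
  [v]
  (when (compare-and-set! interfered? false true)
    (reset! a 10))
  (inc v))

(def result (swap-with-retry! a inc-with-interference))
;; result => [11 2]: the first write lost, the second was computed from 10
```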

Transactions

Clojure uses a Multiversion Concurrency Control (MVCC) STM to manage transactions.

"Transactions ensure that all actions on Refs are atomic, consistent, and isolated. Atomic means that every change to Refs made within a transaction occurs or none do. Consistent means that each new value can be checked with a validator function before allowing the transaction to commit. Isolated means that no transaction sees the effects of any other transaction while it is running. Another feature common to STMs is that, should a transaction have a conflict while running, it is automatically retried." — clojure.org/refs

The above figure illustrates two transactions. In the first, only a single Ref, x, participates. The transaction begins at time t1, reads the last committed value of x, written at time t0, applies a function to the value, and writes a new value at time t2. The transaction ensures that the update is atomic, i.e. that no other process can update the value of x in the time between when this transaction reads the current value and writes a new value. If another process beats it to the write, this transaction will be retried, applying its function to the new value and trying the write again.

In the second transaction, two Refs, y and z, participate. The transaction starts at time t5; the last committed values of y, from time t3, and z, from time t4, are read; new values are derived by applying functions to the old values; and the new values are then written at time t6. If the write of either y or z fails, the entire transaction is retried, until the maximum number of retries has been reached.

Below is an illustration of how Clojure's, and Avout's, STM performs updates to a Ref's state with two contending writers.

  1. Thread-1 creates a Ref, r, at time t0 with a value of 0.
  2. Thread-2 then derefs r and observes the latest value, 0, from t0.
  3. Thread-1 then begins a transaction at t1; the value of r is still 0.
  4. Thread-2 begins its own transaction at t2; the value of r is also still 0.
  5. Thread-1 begins committing at t3, updating r's t0 value with the inc function.
  6. Thread-2 attempts to commit but fails because Thread-1 beat it to the commit, so it starts over.
  7. While Thread-2 is running the transaction again, Thread-1 starts a new transaction but only observes the value of r in it, so it doesn't interfere with Thread-2's commit. Note that observing a Ref's value outside of a transaction doesn't move time forward, but observing it inside of a transaction does, even if you don't change it.

Both figures illustrate that values are associated with points in time and are immutable.
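The contention between two writers described in the steps above can be reproduced with Clojure's in-memory Refs. The sketch below forces the interleaving with promises so the result is repeatable: the second writer reads r, is held until the first writer has committed, and is then retried by the STM. It uses only clojure.core and a plain Thread; all names are illustrative, and it says nothing about the timing of Avout's distributed implementation.

```clojure
(def r (ref 0))
(def attempts (atom 0))         ; how many times the second transaction body ran
(def second-started (promise))  ; delivered once the second writer has read r
(def first-done (promise))      ; delivered once the first writer has committed

(def second-writer
  (Thread.
    (fn []
      (dosync
        (swap! attempts inc)             ; side effect, used only to count retries
        @r                               ; observe r inside the transaction
        (deliver second-started true)
        @first-done                      ; keep the transaction open meanwhile
        (alter r inc)))))                ; conflicts on the first attempt

(.start second-writer)
@second-started                  ; wait until the second transaction is in flight
(dosync (alter r inc))           ; the first writer commits 0 -> 1
(deliver first-done true)
(.join second-writer)            ; the second writer retries and commits 1 -> 2
```

After the join, r holds 2 and the body of the second transaction has run twice, once for the failed attempt and once for the retry.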

Avout Distributed Atoms and Refs

Avout provides a distributed version of Clojure's MVCC STM. Avout transactions provide the same promises of atomicity, consistency, and isolation as Clojure's in-memory versions, and optionally provide durability, depending on the type of Avout Atom or Ref used.

Avout Atoms and Refs use Apache ZooKeeper to help coordinate state change, but not necessarily to hold state. Out of the box, Avout contains one Atom, zk-atom, which also uses ZooKeeper data fields to store its state, and two types of Refs: zk-ref, also backed by ZooKeeper data fields, and local-ref, which is not distributed but provides a mechanism for local Refs to participate in transactions with distributed Refs.

Extendable

Avout lets you create new types of distributed Atoms and Refs using the avout.state.StateContainer and avout.state.VersionedStateContainer protocols, respectively; examples of which are the MongoDB-backed mongo-atom and mongo-ref, which can be found in the plugins directory.

Caching

Avout provides caching, so that multiple derefs of the same Atom/Ref will not need to hit either ZooKeeper or the back-end state-store until the cache has been invalidated by a local or remote update to the Atom or Ref.

zk-atom & zk-ref

Both zk-atoms and zk-refs store their state in a ZooKeeper data field as a byte array; deserialization/serialization is performed using Clojure's Reader/Printer (read-string/pr-str). ZooKeeper limits the size of data stored in data fields to 1 megabyte. Because Clojure's printer/reader is used for serialization, only Clojure data structures that can be printed and read back can be used as values. However, the ability to implement other back-ends with different serialization schemes is a primary goal of Avout.
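A minimal sketch of the kind of round trip this implies, using only clojure.core and java.lang.String. The helper names serialize and deserialize and the UTF-8 encoding are assumptions made for the example, not Avout's actual functions.

```clojure
(defn serialize
  "Prints a value with pr-str and encodes the text as UTF-8 bytes."
  [value]
  (.getBytes ^String (pr-str value) "UTF-8"))

(defn deserialize
  "Decodes UTF-8 bytes and reads the text back into a Clojure value."
  [^bytes data]
  (read-string (String. data "UTF-8")))

(def original {:members #{"david" "emma"} :count 2})
(def round-tripped (deserialize (serialize original)))

;; Values whose printed form cannot be read back, such as arbitrary Java
;; objects, would not survive this round trip.
```

Note that read-string can evaluate code embedded in its input, so data from untrusted writers is usually read with clojure.edn/read-string instead.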

local-ref

The local-ref stores its state in an in-memory Clojure Atom. Because its state is not network accessible, it can only be accessed from a single JVM, just like in-memory Refs, but unlike in-memory Refs it can participate in transactions with distributed Refs. This provides two benefits: 1) it provides a mechanism for keeping a value that only needs to be available locally in sync with distributed state using dosync!!, and 2) since no serialization/deserialization is performed, arbitrary values can be used, including Java objects.

mongo-atom & mongo-ref

In the plugins directory there is a Leiningen project called mongo-avout, which contains an implementation of a MongoDB-backed Atom, mongo-atom, and Ref, mongo-ref. Both support values consisting of any Clojure data structure that the CongoMongo library can store in MongoDB.

Extending Avout Atoms and Refs

The above three types of Atoms and Refs provide examples for implementing other types that use different back-end state-stores and serialization methods. State-stores may, or may not, provide durability. For instance, zk-atom, zk-ref, mongo-atom, and mongo-ref provide durability promises in addition to atomicity, consistency, and isolation, but local-ref does not. Other durable state-stores (e.g. (No)SQL databases, Terracotta, Hadoop HDFS, RESTful services) or non-durable state-stores (e.g. in-memory data structures, RESTful services) can be used as the basis of new types of Atoms and Refs.

New types of Atoms are created by implementing the avout.state.StateContainer protocol.

(defprotocol StateContainer
  (initStateContainer [this])
  (destroyStateContainer [this])
  (getState [this])
  (setState [this value]))

Once you have implemented a StateContainer, create a distributed Atom by passing it to the avout.atoms.distributed-atom function.

(defn custom-atom [client atom-name] 
  (distributed-atom client atom-name custom-state-container))
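As a rough illustration of what a StateContainer implementation might look like, here is a non-durable one that keeps its value in a local Atom. The protocol is re-declared, copied from the listing above, only so the sketch runs without Avout on the classpath; in a real project you would implement avout.state/StateContainer instead. InMemoryStateContainer is a hypothetical name, and how Avout expects getState to behave before any value has been set is not covered by the text, so that case is left open.

```clojure
;; Re-declared from the text so this sketch is self-contained.
(defprotocol StateContainer
  (initStateContainer [this])
  (destroyStateContainer [this])
  (getState [this])
  (setState [this value]))

;; Hypothetical container: store is a clojure.core Atom holding the value.
(deftype InMemoryStateContainer [store]
  StateContainer
  (initStateContainer [this] nil)                     ; nothing to set up
  (destroyStateContainer [this] (reset! store nil))   ; drop the held value
  (getState [this] @store)
  (setState [this value] (reset! store value)))

(def container (InMemoryStateContainer. (atom nil)))
(initStateContainer container)
(setState container {:n 1})
(getState container) ; => {:n 1}
```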

To create new Ref types, implement avout.state.VersionedStateContainer.

(defprotocol VersionedStateContainer
  (initVersionedStateContainer [this])
  (destroyVersionedStateContainer [this])
  (getStateAt [this version])
  (setStateAt [this value version])
  (deleteStateAt [this version]))

Once you have implemented a VersionedStateContainer, create a distributed Ref by passing it to the avout.refs.distributed-ref function.

(defn custom-ref [client ref-name] 
  (distributed-ref client ref-name custom-versioned-state-container))
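A VersionedStateContainer differs in that every value is stored under a version. The sketch below keeps a map from version to value in a local Atom. The protocol is re-declared from the listing above only to keep the example self-contained, InMemoryVersionedStateContainer is a hypothetical name, and the string versions "t0" and "t2" are placeholders, since the text does not say what Avout uses as version identifiers.

```clojure
;; Re-declared from the text so this sketch is self-contained.
(defprotocol VersionedStateContainer
  (initVersionedStateContainer [this])
  (destroyVersionedStateContainer [this])
  (getStateAt [this version])
  (setStateAt [this value version])
  (deleteStateAt [this version]))

;; Hypothetical container: versions is an Atom holding {version value}.
(deftype InMemoryVersionedStateContainer [versions]
  VersionedStateContainer
  (initVersionedStateContainer [this] nil)
  (destroyVersionedStateContainer [this] (reset! versions {}))
  (getStateAt [this version] (get @versions version))
  (setStateAt [this value version] (swap! versions assoc version value))
  (deleteStateAt [this version] (swap! versions dissoc version)))

(def vcontainer (InMemoryVersionedStateContainer. (atom {})))
(setStateAt vcontainer 0 "t0")    ; value 0 as of version "t0"
(setStateAt vcontainer 1 "t2")    ; value 1 as of version "t2"
(deleteStateAt vcontainer "t0")   ; discard the old version
```

Keeping old versions until they are explicitly deleted is what lets a transaction read the value that was current when it started.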

Getting Started

Avout is available on GitHub, https://github.com/liebke/avout. To get started, you'll need to run ZooKeeper, and include Avout as a dependency by adding the following to your project.clj file:

[avout "0.5.3"]

Then load the avout.core namespace, and create a ZooKeeper client that will be passed to distributed Refs and Atoms when they are created and to dosync!! transactions when they are performed.

(use 'avout.core)
(def client (connect "127.0.0.1"))

Now create a distributed Atom. You'll need to pass the ZooKeeper client, a ZooKeeper-compliant name (it must start with a slash), and an optional initial value. Skip the initial value if you want to connect to an existing distributed Atom.

(def a0 (zk-atom client "/a0" 0))

Deref it.

@a0

Then you can use the swap!! and reset!! functions just as you would Clojure's built-in swap! and reset! functions.

(swap!! a0 inc)

Use reset!!.

(reset!! a0 0)

Create distributed Refs backed by ZooKeeper data fields.

(def r0 (zk-ref client "/r0" 0))
(def r1 (zk-ref client "/r1" []))

Alter both Refs within a dosync!! block.

(dosync!! client
  (alter!! r0 inc)
  (alter!! r1 conj @r0))

@r0

@r1

(dosync!! client
  (ref-set!! r0 0)
  (ref-set!! r1 []))

To connect to a remote Ref or Atom without clobbering a current value that may have been set by a remote client, just leave off the initial value when calling zk-ref or zk-atom. For instance, to access the existing r0 and r1 Refs from a remote host:

(def r0 (zk-ref client "/r0"))
(def r1 (zk-ref client "/r1"))

Move a member from one group to another, across two different Ref types (zk and mongo), transactionally, so that the member always exists in exactly one group, never in neither and never in both.

This example will use mongo-ref and so you will need to include avout-mongo as a dependency by adding the following to your project.clj file:

[avout-mongo "0.5.3"]

Define the two groups as Refs (one ZooKeeper-backed, the other MongoDB-backed), each containing a map with one field, :members, which is a set of names. Note: mongo-ref and mongo-atom are part of a separate project called mongo-avout, which lives in the plugins directory; you will need to install it in order to run the following example.

(use 'avout.refs.mongo)

(def group1 (zk-ref client "/group1" {:members #{"david"}}))
(def group2 (mongo-ref client "/group2" {:members #{"emma"}}))

(dosync!! client
  (alter!! group1 update-in [:members] disj "david")
  (alter!! group2 update-in [:members] conj "david"))

Avout Distributed Locks

To get started, you'll need to run ZooKeeper.

The avout.locks namespace contains an implementation of java.util.concurrent.locks.Lock, called ZKDistributedReentrantLock, and an implementation of java.util.concurrent.locks.ReadWriteLock, called ZKDistributedReentrantReadWriteLock.

Examples

First require zookeeper and avout.locks.

(require '[zookeeper :as zk])
(use 'avout.locks)

Then get a ZooKeeper client.

(def client (zk/connect "127.0.0.1"))

Then create a ZKDistributedReentrantLock with the distributed-lock function.

(def lock (distributed-lock client :lock-node "/lock"))

(try (.lock lock)
     ... do something
     (finally (.unlock lock)))

The lock method blocks until the lock is obtained. It is standard practice to call lock within a try block with a finally statement that unlocks it.

You can use the with-lock macro, which is equivalent to Clojure's locking macro, but designed to work with java.util.concurrent.locks.Lock instead of the traditional monitor locks.

(with-lock lock
  ... do something)
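The lock-then-unlock pattern can be tried without ZooKeeper by using java.util.concurrent.locks.ReentrantLock as a single-JVM stand-in for the distributed lock. The macro with-local-lock below is a hypothetical sketch of what a with-lock style macro does; it is not Avout's implementation.

```clojure
(import 'java.util.concurrent.locks.ReentrantLock)

(defmacro with-local-lock
  "Hypothetical stand-in for a with-lock style macro: acquires the lock,
  evaluates body, and always releases the lock afterwards."
  [lock & body]
  `(let [^java.util.concurrent.locks.Lock l# ~lock]
     (.lock l#)
     (try
       ~@body
       (finally (.unlock l#)))))

(def local-lock (ReentrantLock.))

;; The value of the body is returned.
(def result (with-local-lock local-lock (+ 1 2)))

;; The lock is released even when the body throws.
(def failed?
  (try
    (with-local-lock local-lock (throw (ex-info "boom" {})))
    false
    (catch clojure.lang.ExceptionInfo _ true)))
```

In this sketch the lock is acquired just before the try, so the finally clause only runs once the lock is actually held.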

The tryLock method doesn't block while waiting for a lock. Instead, it returns a boolean indicating whether the lock was obtained.

(when (.tryLock lock)
  (try
    ... do something
    (finally (.unlock lock))))

Or use the when-lock and if-lock macros.

(when-lock lock
  (... do something))

(if-lock lock
  (... got lock, do something)
  (... didn't get lock do something else))

The tryLock method can take a timeout duration and units.

(when (.tryLock lock 10 java.util.concurrent.TimeUnit/MILLISECONDS)
  (try
    ... do something
    (finally (.unlock lock))))

The macros when-lock-with-timeout and if-lock-with-timeout are also available.

(when-lock-with-timeout lock 10 java.util.concurrent.TimeUnit/MILLISECONDS
  (... do something))

(if-lock-with-timeout lock 10 java.util.concurrent.TimeUnit/MILLISECONDS
  (... got lock, do something)
  (... didn't get lock do something else))
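The behavior of tryLock, with and without a timeout, can be observed in a single JVM with java.util.concurrent.locks.ReentrantLock standing in for the distributed lock. In the sketch below another thread holds the lock while the main thread tries to take it; try-run and the other names are illustrative.

```clojure
(import '(java.util.concurrent TimeUnit)
        '(java.util.concurrent.locks ReentrantLock))

(def lock-2 (ReentrantLock.))
(def held (promise))      ; delivered once the holder thread owns the lock
(def release (promise))   ; delivered to let the holder thread unlock

(def holder
  (Thread. (fn []
             (.lock lock-2)
             (try
               (deliver held true)
               @release
               (finally (.unlock lock-2))))))

(defn try-run
  "Calls f and returns its result if the lock is free, otherwise :busy.
  The lock is only released when it was actually obtained."
  [^ReentrantLock l f]
  (if (.tryLock l)
    (try (f) (finally (.unlock l)))
    :busy))

(.start holder)
@held
(def while-held (try-run lock-2 (fn [] :ran)))                      ; lock is taken
(def timed-out? (not (.tryLock lock-2 10 TimeUnit/MILLISECONDS)))   ; waits 10 ms, gives up
(deliver release true)
(.join holder)
(def after-release (try-run lock-2 (fn [] :ran)))                   ; lock is free again
```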

ReadWriteLock

Avout also provides a distributed implementation of java.util.concurrent.locks.ReadWriteLock using the distributed-read-write-lock function.

(def rwlock (distributed-read-write-lock client :lock-node "/rwlock"))

ReadWriteLocks provide both a ReadLock and a WriteLock. WriteLocks are just instances of ZKDistributedReentrantLock, and therefore behave as described above. ReadLocks don't block other ReadLocks.
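The read/write semantics are those of java.util.concurrent.locks.ReadWriteLock, so they can be illustrated in a single JVM with ReentrantReadWriteLock as a stand-in. In this sketch one thread holds the read lock while the main thread checks that a second read lock is granted and a write lock is refused; all names are illustrative.

```clojure
(import '(java.util.concurrent.locks ReentrantReadWriteLock))

(def rw (ReentrantReadWriteLock.))
(def reading (promise))        ; delivered once the reader thread holds the read lock
(def done-reading (promise))   ; delivered to let the reader thread unlock

(def reader
  (Thread. (fn []
             (let [rl (.readLock rw)]
               (.lock rl)
               (try
                 (deliver reading true)
                 @done-reading
                 (finally (.unlock rl)))))))

(defn try-once
  "Tries to take the given lock without blocking, releases it again if it
  was obtained, and returns whether it was obtained."
  [l]
  (let [ok (.tryLock l)]
    (when ok (.unlock l))
    ok))

(.start reader)
@reading
(def second-read-ok?   (try-once (.readLock rw)))   ; readers don't block readers
(def write-while-read? (try-once (.writeLock rw)))  ; a reader blocks the writer
(deliver done-reading true)
(.join reader)
(def write-after-read? (try-once (.writeLock rw)))  ; free once the reader is done
```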

Running ZooKeeper

Download Apache ZooKeeper from http://zookeeper.apache.org/releases.html.

Unpack to $ZOOKEEPER_HOME (wherever you would like that to be).

Here's an example conf file for a standalone instance. By default, ZooKeeper will look for it in $ZOOKEEPER_HOME/conf/zoo.cfg.

# The number of milliseconds of each tick
tickTime=2000

# the directory where the snapshot is stored.
dataDir=/var/zookeeper

# the port at which the clients will connect
clientPort=2181

Ensure that the dataDir exists and is writable.

After creating and customizing the conf file, start ZooKeeper:

$ZOOKEEPER_HOME/bin/zkServer.sh start

Roadmap

  1. Provide Avout Java API
  2. Optimize commute!! implementation, currently commute!! just calls alter!!
  3. Implement distributed agents
  4. Experiment with other back-end state-stores, e.g. Terracotta, JavaSpaces
  5. Abstract out ZooKeeper functionality and implement a version of Avout without dependencies or one with other dependencies, e.g. JGroups

Contributing

Although Avout is not part of Clojure-Contrib, it follows the same guidelines for contributing, which includes signing a Clojure Contributor Agreement (CA) before contributions can be accepted.

License

Avout is distributed under the Eclipse Public License, the same as Clojure.

References

  1. Avout on Github
  2. Are We There Yet?
  3. Clojure State
  4. Out of the Tar Pit
  5. Okasaki Publications
  6. Process and Reality
  7. Clojure Refs and Transactions
  8. Software Transactional Memory
  9. Multiversion Concurrency Control
  10. Snapshot Isolation
  11. ZooKeeper Website
  12. ZooKeeper: Wait-free coordination for Internet-scale systems
  13. ZooKeeper Recipes and Solutions

Copyright © 2011 David Edgar Liebke and Relevance, Inc