Monday, October 27, 2014

Clock-SI: Snapshot Isolation for Partitioned Data Stores Using Loosely Synchronized Clocks

This paper appeared in SRDS 2013, and is concerned with the snapshot isolation problem for distributed databases/data stores.

What is snapshot isolation (SI)?

(I took these definitions almost verbatim from the paper.)
SI is a multiversion concurrency control scheme with 3 properties:
1) Each transaction reads from a consistent snapshot, taken at the start of the transaction and identified by a snapshot timestamp. A snapshot is consistent if it includes all writes of transactions committed before the snapshot timestamp, and if it does not include any writes of aborted transactions or transactions committed after the snapshot timestamp.
2) Update transactions commit in a total order. Every commit produces a new database snapshot, identified by the commit timestamp.
3) An update transaction aborts if it introduces a write-write conflict with a concurrent committed transaction. Transaction T1 is concurrent with committed update transaction T2, if T1 took its snapshot before T2 committed and T1 tries to commit after T2 committed.

When a transaction starts, its snapshot timestamp is set to the current value of the database version. All its reads are satisfied from the corresponding snapshot. To support snapshots, multiple versions of each data item are kept, each tagged with a version number equal to the commit timestamp of the transaction that creates the version. The transaction reads the version with the largest version number smaller than its snapshot timestamp. If the transaction is read-only, it always commits without further checks. If the transaction has updates, its writes are buffered in a workspace. When the update transaction requests to commit, a certification check verifies that the transaction writeset does not intersect with the writesets of concurrent committed transactions. If the certification succeeds, the database version is incremented, and the transaction commit timestamp is set to this value.
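
The mechanics above can be made concrete with a minimal single-node sketch. The class and method names (SIDatabase, Transaction, begin, commit) are hypothetical and not from the paper. Because the sketch uses an integer counter as the database version, it treats a version as visible when its number is at most the snapshot timestamp; that is a convention of this sketch.

```python
class SIDatabase:
    """Toy single-node snapshot isolation store (illustrative only)."""

    def __init__(self):
        self.version = 0        # current database version
        self.versions = {}      # key -> list of (commit_ts, value), ascending
        self.committed = []     # list of (commit_ts, set of written keys)

    def begin(self):
        # The snapshot timestamp is the current database version.
        return Transaction(self, self.version)


class Transaction:
    def __init__(self, db, snapshot_ts):
        self.db = db
        self.snapshot_ts = snapshot_ts
        self.workspace = {}     # buffered writes, invisible to others

    def read(self, key):
        if key in self.workspace:
            return self.workspace[key]
        visible = [v for ts, v in self.db.versions.get(key, [])
                   if ts <= self.snapshot_ts]
        return visible[-1] if visible else None

    def write(self, key, value):
        self.workspace[key] = value

    def commit(self):
        if not self.workspace:
            return True         # read-only: commits without further checks
        # Certification: abort on overlap with the writeset of any
        # transaction that committed after this snapshot was taken.
        for commit_ts, keys in self.db.committed:
            if commit_ts > self.snapshot_ts and keys & set(self.workspace):
                return False
        self.db.version += 1
        ts = self.db.version
        for key, value in self.workspace.items():
            self.db.versions.setdefault(key, []).append((ts, value))
        self.db.committed.append((ts, set(self.workspace)))
        return True
```

Two transactions that start from the same snapshot and both write x illustrate the write-write rule: the first to commit succeeds and the second aborts.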

What is the innovation in the Clock-SI paper?

Conventional SI implementations use a centralized timestamp authority for consistent versioning. This is because local clocks on different nodes may differ a lot (NTP synchronization may be off by tens of milliseconds), so they are not suitable for consistent versioning.

Clock-SI, instead, proposes a way to use loosely synchronized clocks to assign snapshot and commit timestamps to transactions. Compared to conventional SI, Clock-SI avoids the single point of failure and the potential performance bottleneck of a centralized timestamp authority. It saves one round-trip message for a read-only transaction (to obtain the snapshot timestamp), and two round-trip messages for an update transaction (to obtain the snapshot timestamp and the commit timestamp). A transaction's snapshot timestamp is the value of the local clock at the partition where it starts. Similarly, the commit timestamp of a local update transaction is obtained by reading the local clock.

If you read Google's Spanner paper, you know that Google Spanner solves this problem by introducing TrueTime, which uses atomic clocks.

How does Clock-SI work?

Clock-SI essentially delays the response to a read in a transaction
1) to account for clock synchronization differences (epsilon) as in Fig1, and
2) to account for the pending commit of an update transaction.

In Fig1, T1's read (with snapshot timestamp t) arrives at time t′ on P2's clock, before P2's clock has reached the value t, and thus t′ < t. The snapshot with timestamp t at P2 is therefore not yet available. Another transaction on P2 could commit at time t′′, between t′ and t, and change the value of x. This new value should be included in T1's snapshot.

In the second case, T2's snapshot is unavailable due to the commit in progress of transaction T1, which is assigned the value of the local clock, say t, as its commit timestamp. T1 updates item x and commits. The commit operation involves a write to stable storage and completes at time t′. Transaction T2 starts between t and t′, and gets assigned a snapshot timestamp t′′, t < t′′ < t′. If T2 issues a read for item x, we cannot return the value written by T1, because we do not yet know if the commit will succeed, but we also cannot return the earlier value, because, if T1's commit succeeds, this older value will not be part of a consistent snapshot at t′′.
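
Both delay conditions can be sketched in a few lines. This is a toy model, not the paper's implementation: the Partition class, its method names, and the simulated integer clock are all assumptions, and instead of blocking, try_read simply reports why the read would have to wait.

```python
class Partition:
    """Toy partition with a simulated local clock (illustrative only)."""

    def __init__(self, clock=0):
        self.clock = clock      # simulated local physical clock
        self.versions = {}      # key -> list of (commit_ts, value)
        self.pending = {}       # key -> commit_ts of an in-progress commit

    def begin_commit(self, key):
        # The commit timestamp is the local clock value when commit starts.
        self.pending[key] = self.clock

    def finish_commit(self, key, value):
        # The write to stable storage has completed; install the version.
        ts = self.pending.pop(key)
        self.versions.setdefault(key, []).append((ts, value))

    def try_read(self, key, snapshot_ts):
        """Return ("ok", value), or ("wait", reason) if the read must be delayed."""
        if snapshot_ts > self.clock:
            return ("wait", "clock")    # case 1: local clock is behind the snapshot
        pending_ts = self.pending.get(key)
        if pending_ts is not None and pending_ts <= snapshot_ts:
            return ("wait", "commit")   # case 2: commit in progress below the snapshot
        visible = [(ts, v) for ts, v in self.versions.get(key, [])
                   if ts <= snapshot_ts]
        if not visible:
            return ("ok", None)
        return ("ok", max(visible, key=lambda pair: pair[0])[1])
```

A real partition would block the read and retry once the clock catches up or the pending commit resolves; returning the reason just makes the two cases easy to see.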


The paper does not include a performance comparison to Spanner. The NTP synchronized clocks in the evaluation experiments have an NTP offset/epsilon less than 0.1 msec, which is actually more precise than Spanner's atomic clock! I guess this is thanks to the Gigabit Ethernet they use in their LAN deployment.

Discussion: Use of Hybrid Logical Clocks (HLC) for the Clock-SI problem

HLC is a hybrid version of logical clocks and physical clocks, introduced by us recently, to combine the advantages of both clocks, while avoiding their disadvantages. Since HLC captures happened-before relationship and uses this extra information in ordering, it does not need to wait out uncertainty regions of physical clock synchronization. Dually, since HLC is related to physical clocks it allows querying with respect to physical time. We had shown HLC's advantages for the consistent snapshot problem in our work.

Here we find that HLC indeed helps with the Clock-SI snapshot isolation problem when it is used instead of physical clocks. In particular, HLC avoids the delay in Figure 1, because it also uses the happened-before information encoded in HLC timestamps.
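
For readers who have not seen HLC, here is a minimal sketch of its update rules as described in the HLC paper (they are not spelled out in this post). The class name, method names, and the injected physical-clock callable are assumptions made for illustration.

```python
class HLC:
    """Hybrid logical clock: l tracks the max physical time seen, c breaks ties."""

    def __init__(self, physical_clock):
        self.pt = physical_clock    # callable returning current physical time
        self.l = 0
        self.c = 0

    def now(self):
        """Timestamp a local or send event."""
        prev = self.l
        self.l = max(prev, self.pt())
        self.c = self.c + 1 if self.l == prev else 0
        return (self.l, self.c)

    def update(self, m_l, m_c):
        """Timestamp the receipt of a message stamped (m_l, m_c)."""
        prev = self.l
        self.l = max(prev, m_l, self.pt())
        if self.l == prev and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == prev:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)
```

If a node whose physical clock reads 5 receives a message stamped (10, 0), its HLC jumps to (10, 1) immediately. In the Figure 1 scenario this suggests that a partition with a lagging clock could advance its HLC on receiving the read rather than wait for its physical clock to reach t.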

Saturday, October 18, 2014

Facebook's software architecture

I had summarized/discussed a couple papers (Haystack, Memcache caching) about Facebook's architecture before.

Facebook uses a simple architecture that gets things done. Papers from Facebook are refreshingly simple, and I like reading these papers.

Two more Facebook papers appeared recently, and I briefly summarize them below.

TAO: Facebook's distributed data store for the social graph (ATC'13)

A single Facebook page may aggregate and filter 100s of items from the social graph. Since Facebook presents each user with customized content (which needs to be filtered with privacy checks) an efficient, highly available, and scalable graph data store is needed to serve this dynamic read-heavy workload.

Before TAO, Facebook's web servers directly accessed MySQL to read or write the social graph, aggressively using memcache as a look-aside cache (as explained in this paper).

The TAO data store implements a graph abstraction directly. This allows TAO to avoid some of the fundamental shortcomings of a look-aside cache architecture. TAO implements an objects and associations model and continues to use MySQL for persistent storage, but mediates access to the database and uses its own graph-aware cache.
To handle multi-region scalability, TAO uses replication based on the per-record master idea. (This multi-region scalability idea was presented earlier in the Facebook memcache scaling paper.)

F4: Facebook's warm BLOB storage system (OSDI'14)

Facebook uses Haystack to store all media data, which we discussed earlier here.

Facebook's new architecture splits the media into two categories:
1) hot/recently-added media, which is still stored in Haystack, and
2) warm media (still not cold), which is now stored in F4 storage and not in Haystack.

This paper discusses the motivation for this split and how this works.

Facebook has big data! (This is one of those rare cases where you can say big data and mean it.) Facebook stores over 400 billion photos.

Facebook found that there is a strong correlation between the age of a BLOB (Binary Large OBject) and its temperature. Newly created BLOBs are requested at a far higher rate than older BLOBs; they are hot! For instance, the request rate for week-old BLOBs is an order of magnitude lower than for less-than-a-day old content for eight of nine examined types. Content less than one day old receives more than 100 times the request rate of one-year old content. The request rate drops by an order of magnitude in less than a week, and for most content types, the request rate drops by 100x in less than 60 days. Similarly, there is a strong correlation between age and the deletion rate: older BLOBs see an order of magnitude lower deletion rate than new BLOBs. This older content is called warm: it does not see frequent access like hot content, but it is not completely frozen either.

They also find that warm content is a large percentage of all objects. They separate the last 9 months of Facebook data into 3 intervals: 9-6 months, 6-3 months, and 3-0 months. In the oldest interval, more than 80% of the objects generated in that interval are warm for all types. For objects created in the most recent interval, more than 89% of objects are warm for all types. That is, warm content is large and it keeps growing.

In light of this analysis, Facebook goes with a split design for BLOB storage. They introduce F4 as a warm BLOB storage system because the request rate for its content is lower than that for content in Haystack and thus is not as hot. Warm is also in contrast with cold storage systems that reliably store data but may take days or hours to retrieve it, which is unacceptably long for user-facing requests. The lower request rate of warm BLOBs enables them to provision a lower maximum throughput for F4 than Haystack, and the low delete rate for warm BLOBs enables them to simplify F4 by not needing to physically reclaim space quickly after deletes.

F4 provides a simple, efficient, and fault tolerant warm storage solution that reduces the effective-replication-factor from 3.6 to 2.8 and then to 2.1. F4 uses erasure coding with parity blocks and striping. Instead of maintaining two additional full replicas, it uses erasure coding to reduce the storage overhead significantly.
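
The 3.6, 2.8, and 2.1 figures can be reproduced with a little arithmetic. The coding parameters below (a RAID-6 overhead of 1.2 in Haystack, Reed-Solomon(10,4) within a datacenter, and XOR coding across datacenters) are taken from the F4 paper as I understand it, not from the summary above, so treat them as assumptions.

```python
from fractions import Fraction

# Haystack: 3 full replicas, each on RAID-6 with an assumed 1.2x overhead.
haystack = 3 * Fraction(12, 10)

# F4 within one datacenter: Reed-Solomon(10, 4) stores 14 blocks per 10 data blocks.
rs_10_4 = Fraction(14, 10)

# First F4 layout: a second full erasure-coded copy in another datacenter.
two_datacenters = 2 * rs_10_4

# Second F4 layout: two volumes in two datacenters plus their XOR in a third,
# i.e. 3 coded volumes for every 2 volumes of data.
xor_geo = rs_10_4 * Fraction(3, 2)

print(float(haystack), float(two_datacenters), float(xor_geo))
```

Exact fractions are used only to avoid floating-point noise in the products.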

The data and index files are the same as in Haystack; the journal file is new. The journal file is a write-ahead journal with tombstones appended for tracking BLOBs that have been deleted. F4 keeps dedicated spare backoff nodes to help with online BLOB reconstruction. This is similar to the use of dedicated gutter nodes for tolerating memcached node failures in the Facebook memcache paper.
F4 has been running in production at Facebook for over 19 months. F4 currently stores over 65PB of logical data and saves over 53PB of storage.


1) Why go with a design that has a big binary divide between hot and warm storage? Would it be possible to use a system that handles hot and warm as gradual degrees in the spectrum? I guess the reason for this design is its simplicity. Maybe it is possible to optimize things by treating BLOBs differentially, but this design is simple and gets things done.

2) What are the major differences in F4 from the Haystack architecture? F4 uses erasure coding for replication: Instead of maintaining 2 other replicas, erasure coding reduces replication overhead significantly. F4 uses write-ahead logging and is aggressively optimized for a read-only workload. F4 has lower throughput needs. (How is this reflected in its architecture?)

Caching is an orthogonal issue handled at another layer using memcache nodes. I wonder if the caching policies treat content cached from Haystack versus F4 differently.

3) Why is energy-efficiency of F4 not described at all? Can we use grouping tricks to get cold machines/clusters in F4 and improve energy-efficiency further as we discussed here?

4) BLOBs have large variation in size. Can this be utilized in F4 to improve access efficiency? (Maybe treat/store very small BLOBs differently, store them together, don't use erasure coding for them. How about very large BLOBs?)

Wednesday, October 15, 2014

Paper Summary: A self-configurable geo-replicated cloud storage system

This paper is a followup work to the Pileus work, which I had covered here. Pileus aimed to help developers find a suitable consistency/latency combination for their application deployment. In Pileus, the configuration of primary and secondary nodes is assumed to be fixed (some storage nodes are designated as primary nodes, which hold the master data, while others are secondary nodes). The developer uses an SLA to state ranked preferences for latency and consistency of the reads that would make most sense for the application, and using this SLA, Pileus provides dynamic tuning of the performance of the application by deciding which read to forward to which replica/master.

This paper introduces a followup system, called Tuba, where the configuration is not fixed, and can be changed on the fly. Tuba extends Pileus to address the problem of finding an optimal configuration of primary and secondary replicas that maximizes the overall utility and minimizes the cost for the application.

Tuba extends Pileus with a configuration service (CS) delivering the following capabilities:
1. performing a reconfiguration periodically for different tablets, and
2. informing clients of the current configuration for different tablets.

Data is organized into tablets as access units, and each tablet is assigned to primary and secondary nodes. Of course different tablets may be assigned to different primary and secondary nodes. In order for the CS to configure a tablet's replicas to maximize the overall utility, the CS must be aware of the way the tablet is being accessed globally. Therefore, all clients in the system periodically send their observed latency and the hit and miss ratios of their SLAs to the CS.

Once a new configuration is decided, one or more of the following operations are performed as the system changes to the new configuration: (i) changing the primary replica, (ii) adding or removing secondary replicas, and (iii) changing the synchronization periods between primary and secondary replicas.

Configuration service (CS)

The CS is a centralized service. To improve utility, the CS selects a new configuration by first generating all configurations that satisfy a list of defined constraints. To permute configurations, it is free to use one of the following operations:
+ Adjust the Synchronization Period
+ Add/Remove Secondary Replica
+ Change Primary Replica
+ Add Primary Replica

For example, for a missed subSLA with strong consistency, two potential new configurations would be: (i) creating a new replica near the client and making it the solo primary replica, or (ii) adding a new primary replica near the client and making the system run in multi-primary mode. As another example, for a social network application that spans Brazil and India, the CS may decide to swap the primary and secondary replica roles to improve utility. During peak times in India, the secondary replica in South Asia becomes the primary replica. Likewise, during peak times in Brazil, the replica in Brazil becomes primary.

The constraints for configurations may include (i) Geo-replication factor, (ii) Location, (iii) Synchronization period, and (iv) Cost constraints. With the cost constraint, the application developers can indicate how much they are willing to pay (in terms of dollars) to switch to a new configuration. For instance, one possible configuration is to put secondary replicas in all available datacenters. While the gained utility for this configuration would be great, the cost of this configuration is likely unacceptably large.

For each configuration possibility that meets the constraints, the CS computes the expected gained utility and the cost of reconfiguration. The CS considers the following costs for a new configuration:
+ Storage: the cost of storing a tablet in a particular site.
+ Read/Write Operation: the cost of performing read/write operations.
+ Synchronization: the cost of synchronizing a secondary replica with a primary one.

Finally, the CS chooses the configuration that offers the highest utility-to-cost ratio, and executes the reconfiguration operations required to transition to the new one.
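
The selection step can be sketched as a filter followed by an argmax. Everything here is hypothetical: the Config fields, the function name, and the way utility and cost are supplied as callables are assumptions for illustration, not Tuba's actual interfaces.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    """Hypothetical replica configuration for one tablet."""
    primary: str
    secondaries: frozenset
    sync_period_s: int


def choose_configuration(candidates, satisfies_constraints, gained_utility, cost):
    """Pick the feasible candidate with the highest utility-to-cost ratio."""
    best, best_ratio = None, float("-inf")
    for config in candidates:
        if not satisfies_constraints(config):
            continue                     # violates a declared constraint
        c = cost(config)
        if c <= 0:
            continue                     # avoid division by zero in this sketch
        ratio = gained_utility(config) / c
        if ratio > best_ratio:
            best, best_ratio = config, ratio
    return best
```

Note that the constraint filter runs before scoring, so a configuration with a great ratio is still rejected if it violates, say, a location or cost constraint.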

Client execution modes

Clients need to avoid two potential safety violations: (i) performing a read operation with strong consistency on a non-primary replica, or (ii) executing a write operation on a non-primary replica. Based on the freshness of a client's view, the client is either in fast or slow mode. A client is in the fast mode for a given tablet if it knows the locations of primary and secondary replicas, and it is guaranteed that the configuration will not change in the near future. Whenever a client suspects that a configuration may have changed, it enters slow mode until it refreshes its local cache.

Fast Mode. When a client is in fast mode, read and single-primary write operations involve a single round-trip to one selected replica. No additional overhead is imposed on these operations. Multi-primary write operations use a three-phase protocol in fast or slow mode.

Slow Mode. Slow mode has no effect on read operations with relaxed consistency. On the other hand, since read operations with strong consistency must always go to a primary replica, the client needs to validate that the responding replica to a strong-consistency read is still a primary replica. If not, the client retries the read operation. Write operations are more involved when a client is in slow mode. Any client in slow mode that wishes to execute a write operation on a tablet needs to take a non-exclusive lock on the tablet's configuration before issuing the write operation. On the other hand, the CS needs to take an exclusive lock on the configuration if it decides to change the set of primary replicas. This lock procedure is required to ensure the linearizability of write operations.


They implemented Tuba to extend Microsoft Azure Storage, with broad consistency choices (as in Bayou), consistency-based SLAs (as in Pileus), and a novel replication configuration service. Their evaluation compares Tuba with a statically configured system. An experiment with clients distributed in datacenters around the world shows that reconfiguration every two hours increases the fraction of reads guaranteeing strong consistency from 33% to 54%. This confirms that automatic reconfiguration can yield substantial benefits which are realizable in practice.

SEA is South East Asia and WEU is West Europe.

Wednesday, October 8, 2014

Consistent snapshot analogies

Last week I taught distributed snapshots in my CSE 586: Distributed Systems class. While I teach snapshots, I invariably find myself longing for analogies to provide some intuition about this concept. The global state captured by a distributed snapshot (say using the Chandy-Lamport marker algorithm) does not correspond to the "state of the system at initiation of the snapshot". Furthermore, it also may not correspond to a "state of the system from initiation to current state during this computation". This is because while the snapshot taking is progressing in the system, the underlying system computation is also proceeding and changing the state of the system progressively. (Distributed snapshot is not allowed to stop/freeze underlying system computation as that reduces availability.)

For those curious about the question, "what good is a snapshot then?": The snapshot captures a reachable state from initiation state, and from the snapshot state the current state of the computation is also reachable. In other words, snapshot is a likely state of the computation, even though it may not have occurred in this particular computation. So, for stable predicate detection and distributed system debugging the snapshot is still valuable.

Going back to my predicament, the analogy I resort to is that of 1000 ants trying to take/construct a picture of the elephant as the elephant is moving. (I had heard this example from Paul Sivilotti while I was a graduate student at Ohio State.) Here the ants correspond to the marker algorithm, and the elephant to the underlying computation that we want to take a snapshot of. Of course the picture the ants construct will be only vaguely elephant-like: it will be a picture of the elephant's outer surface as it progresses in the spacetime continuum. (Achievement Unlocked: Today I used spacetime continuum in serious writing.)

Last week I was using this analogy in class, when a better (at least more modern) analogy occurred to me. Panoramic photographs! When you use your smartphone to take a panorama picture, you are in fact taking a distributed snapshot of your surrounding. Your snapshot is not instantaneous, it needs time to complete: you need to rotate 180 to 360 degrees and probably that takes a good 5-10 seconds. If in the meanwhile something moves, that object will not be reflected in its original form/place/state in your panorama picture.

We may attempt to take the analogy further by categorizing panorama pictures as consistent snapshots and inconsistent snapshots. In an inconsistent snapshot, although the send of a message is not recorded as part of the snapshot, the receive of the message is recorded as part of the snapshot. (You received a message from the future.) So we can say that your panorama picture is inconsistent if the object moves in the opposite direction of the panorama/snapshot. These are examples of inconsistent snapshots.
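
The consistency condition (no receive recorded without its send) is easy to state as a check over a cut. A minimal sketch, where the representation of the cut and of messages is my own assumption:

```python
def is_consistent(cut, messages):
    """Check that no message is received inside the cut but sent outside it.

    cut: dict mapping process name -> number of local events recorded.
    messages: list of (sender, send_index, receiver, recv_index), where the
    indexes are 1-based positions of the send and receive events in each
    process's local event sequence.
    """
    for sender, send_index, receiver, recv_index in messages:
        received_in_cut = recv_index <= cut[receiver]
        sent_in_cut = send_index <= cut[sender]
        if received_in_cut and not sent_in_cut:
            return False    # a message "from the future"
    return True
```

A message that is sent inside the cut but not yet received is fine: it is simply in flight, and the marker algorithm records it as channel state.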

And these are examples of consistent snapshots. (Maybe the last two are debatable as they duplicate some state.)

Finally, this seemingly-consistent inconsistent snapshot (the bearded guy on the leftmost is teleported to reappear as the rightmost person) points to the dangers of ignoring backchannels when taking a snapshot.

Probably it is not worth trying to strain the analogy further, so I will stop. Here are some more funny iPhone panoramic pictures.

Monday, September 29, 2014

Paper Summary: High-availability distributed logging with BookKeeper

This paper is brought to you by the Yahoo Research group that developed ZooKeeper, and it appeared in LADIS'12.

BookKeeper targets the logging problem. More specifically, the distributed logging problem where high-availability is important and where many distributed clients are interested in reading the logs.

Most current applications log to the local disk, but this constitutes a single point of failure (SPOF) and defeats high-availability. A hasty remedy is to write to an NFS partition to store log files remotely. But now the NFS server becomes the SPOF. (We can of course replicate the NFS server, but the performance would suffer.) Another solution is to use NetApp filers that implement RAID. This costs money, and still does not completely solve SPOF.

BookKeeper provides an efficient data store with no SPOF for serving a large number of concurrent single-writer, multiple-reader logs. It stripes log entries across servers, leading to higher throughput. BookKeeper is open source and is used in production systems.

BookKeeper presents two case studies: Hedwig and HDFS Namenode. Hedwig is a scalable topic-based publish-subscribe system. To guarantee the delivery of messages despite partitions and server failures, Hedwig uses logging to persist published messages, which is implemented with BookKeeper. Hedwig is in production use and serves push notifications for Yahoo! properties (e.g., notifications for mobile devices).

The other use case concerns replicating the HDFS Namenode, the component of HDFS (Hadoop Distributed File System) that manages the file system metadata. On each update, the Namenode writes synchronously to a journal to guarantee that the update is durable. But unfortunately the Namenode is a SPOF. To enable efficient journaling and strong durability through replication, BookKeeper is used for implementing a journal manager for HDFS. The implementation is currently part of the HDFS codebase.

BookKeeper design and architecture

BookKeeper has 3 main components:

  • A bookie is a BookKeeper storage server, and each bookie stores ledger fragments. A ledger is written across f+1 bookies for fault-tolerance and striping. 
  • BookKeeper client is used for interacting with bookies. 
  • Ledger abstracts a log file. It is a sequence of entries identified by a sequence number (id). 

BookKeeper assumes that there is only a single client writing to a ledger (clients can employ ZooKeeper coordination for this), and in return it guarantees that, once a ledger is closed, all other clients that read from it read the same sequence of entries.

Here is the happy path for BookKeeper. An application using BookKeeper initially designates a ledger writer.  This ledger writer creates a ledger and appends data to the ledger; only the ledger writer is able to append entries to the ledger. Eventually, after appending an arbitrary number of entries to the ledger, the ledger writer closes it. Once the ledger is closed, its content is immutable. Clients can open closed ledgers for reading and any individual ledger can have multiple readers over time, and even concurrent readers.

The main calls in the API enable applications to:

  • Create a ledger;
  • Add entries to a ledger;
  • Open a ledger for reading;
  • Read entries from a ledger;
  • Close a ledger to prevent further writes;
  • Delete a ledger.

All these calls have both a synchronous and an asynchronous version.

Creating and using a ledger.
When a client creates a ledger, it selects a set of bookies to form an ensemble for the ledger and stores the ensemble information as part of the ledger metadata on ZooKeeper. For each entry the ledger writer adds to the ledger, it replicates the entry across f+1 bookies. A request to add an entry e completes successfully if e has been successfully replicated across f+1 bookies. If a bookie crashes, then the client replaces that bookie. BookKeeper uses ZooKeeper to keep track of configuration changes for a ledger.
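
The ledger lifecycle and striping can be sketched with an in-memory toy. This is not the BookKeeper client API: the class and method names are hypothetical, the metadata that would be stored in ZooKeeper is just a field, and choosing the bookies for each entry round-robin by entry id is an assumption of this sketch.

```python
class ToyLedger:
    """In-memory stand-in for a single-writer ledger (illustrative only)."""

    def __init__(self, ensemble, write_quorum):
        self.ensemble = list(ensemble)
        self.q = write_quorum
        self.bookies = {name: {} for name in self.ensemble}  # bookie -> {entry_id: data}
        self.closed = False
        self.last_entry = -1    # would be written to ZooKeeper metadata on close
        self.next_id = 0

    def add_entry(self, data):
        if self.closed:
            raise RuntimeError("ledger is closed")
        entry_id = self.next_id
        n = len(self.ensemble)
        # Stripe: write the entry to q consecutive bookies, starting at a
        # position that rotates with the entry id.
        for k in range(self.q):
            self.bookies[self.ensemble[(entry_id + k) % n]][entry_id] = data
        self.next_id += 1
        return entry_id

    def close(self):
        # After close the content is immutable and readers agree on it.
        self.closed = True
        self.last_entry = self.next_id - 1

    def read(self, entry_id):
        for name in self.ensemble:
            if entry_id in self.bookies[name]:
                return self.bookies[name][entry_id]
        raise KeyError(entry_id)
```

With an ensemble of three bookies and a write quorum of two, each entry lands on two bookies and consecutive entries spread over all three, which is where the throughput gain from striping comes from.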

Closing a ledger.
When closing a ledger, the ledger writer writes to ZooKeeper the last entry that has been written successfully, as part of the ledger metadata. If a ledger writer crashes prematurely, before it closes its open ledger, a ledger reader would need to do ledger recovery.

Ledger recovery.
When a ledger reader opens a ledger for reading, it first obtains the ledger metadata. If, by checking the state of the ledger, it finds that the ledger has not been closed, the ledger reader triggers a recovery procedure. The first step of recovery for a given ledger consists of having the reader client ask each bookie in the ensemble for the last add confirmed (LAC) field in the last entry that the bookie has processed for the ledger. Since reads are based on entry id, the recovery process can start reading from the highest LAC it receives, and thus it is not necessary to read the entire ledger.

Reading from an open ledger.
BookKeeper also enables clients to read from open ledgers. When clients need to read from an open ledger, they invoke a call to open the ledger that does not try to recover it if it is not closed. To avoid reading partially replicated entries from the ledger, which may not be in the ledger once it is closed, the client asks bookies for their LAC values. Reading entry i ≤ LAC is safe, since the ledger writer has marked it as successfully replicated.
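
The role of LAC in both recovery and open-ledger reads boils down to two small rules. A sketch, assuming the client has already collected the LAC value reported by each bookie into a dict (the function names are hypothetical):

```python
def highest_lac(bookie_lacs):
    """Highest last-add-confirmed value reported by any bookie in the ensemble."""
    return max(bookie_lacs.values())


def recovery_start(bookie_lacs):
    """Recovery can begin reading here instead of at the start of the ledger."""
    return highest_lac(bookie_lacs)


def safe_to_read(entry_id, bookie_lacs):
    """An entry at or below the LAC was marked as successfully replicated."""
    return entry_id <= highest_lac(bookie_lacs)
```

Entries above the LAC may be only partially replicated and may not survive once the ledger is closed, so a reader of an open ledger should not return them.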

Dealing with multiple ledgers

To enable recovery, upon each request to append an entry to a ledger, a bookie appends this entry to the journal and flushes the write to the local disk device. A bookie only acknowledges to the client once it receives a confirmation that the flush operation has completed successfully. Note that the journal is shared across all active ledgers the bookie is currently storing. A bookie also writes entries to the ledger device to serve read requests. Thus, read traffic does not affect the performance of writes to the journal device.

The ledger device stores ledger entries along with an index for each ledger. Bookie has a single file, called entry log, and interleaves entries of different ledgers by appending entries of all ledgers. For each ledger, Bookie also keeps in-memory an index mapping the entry identifier to its position in the entry log.

This design targets workloads dominated by writes, while not neglecting the performance of reads. Requests to add an entry to a ledger return as soon as the entry is flushed to the journal of a bookie, and writes to the ledger device are asynchronous and mostly sequential to enable the writes to this device to keep up with the writes to the journal device. To serve a read request, it is necessary to obtain the position of the entry in the entry log. If the index page is cached, then the read requires one disk seek.
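
The split between the journal device and the ledger device can be sketched as follows. The lists stand in for files, the journal append stands in for an append plus a flush to disk, and all names are hypothetical.

```python
class ToyBookie:
    """Toy bookie storage layout: shared journal, interleaved entry log, index."""

    def __init__(self):
        self.journal = []       # shared across all ledgers; durable before the ack
        self.entry_log = []     # single file interleaving entries of all ledgers
        self.index = {}         # ledger_id -> {entry_id: position in entry_log}
        self.unflushed = []     # acked entries not yet written to the ledger device

    def add(self, ledger_id, entry_id, data):
        # Synchronous path: journal append (stands in for append + flush).
        self.journal.append((ledger_id, entry_id, data))
        self.unflushed.append((ledger_id, entry_id, data))
        return "ack"

    def flush_ledger_device(self):
        # Asynchronous path: mostly sequential appends to the entry log.
        for ledger_id, entry_id, data in self.unflushed:
            self.index.setdefault(ledger_id, {})[entry_id] = len(self.entry_log)
            self.entry_log.append(data)
        self.unflushed.clear()

    def read(self, ledger_id, entry_id):
        # The index lookup gives the position; one seek into the entry log follows.
        return self.entry_log[self.index[ledger_id][entry_id]]
```

Keeping one entry log for all ledgers means writes stay sequential no matter how many ledgers are active; the per-ledger index is what makes reads possible despite the interleaving.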

BookKeeper stores metadata on ZooKeeper.
The ledger metadata includes the ensemble composition of ledgers, write quorum size, ledger status, and the last entry successfully written to a closed ledger. For the metadata store, BookKeeper uses ZooKeeper. "A different, more scalable data store becomes necessary when the number of active ledgers is of the order of tens to hundreds of millions." To track the availability of bookies, BookKeeper relies upon ZooKeeper because it provides ephemeral znodes and watches.


Experiments are conducted using a cluster of identical machines: 2 Quad Core Intel Xeon 2.5Ghz, 16GB of RAM, one 1 Gbit/s network interface, and four SATA drives of 1TB and rotational speed of 7200 RPM. Each machine in the cluster mounts an enterprise class filer via NFS (NetApp FAS3050). This hardware gives a raw performance of 1.2 milliseconds for the latency of add operations and 22.5k adds/sec for 1 kbyte entries when writing to a single bookie. nE-qQ denotes a ledger configuration with ensemble size n and write quorum q.

Using a 3E-2Q configuration, the figure shows throughput and latency for a single client as the maximum number of outstanding operations is increased. Allowing more outstanding operations leads to higher throughput, in particular for 128-byte entries. No batching tricks are employed to improve throughput; the processing of an operation is triggered by the call.

Here 12 clients write simultaneously to a set of bookies, and the aggregate throughput is measured. Compared to the results for a single client writer, the aggregate throughput is substantially higher for shorter entries. For longer entries, throughput is limited by the speed with which bookies are able to write to disk, so adding more bookies to the pool (configurations with 6E) results in increased throughput.


BookKeeper resembles chain replication a little. The chain replication approach is to outsource consensus to Paxos, and only store data, providing high throughput. BookKeeper also does that, but chain replication is not referred to in the paper at all. Of course, chain replication lacks striping, and does not by default provide disjoint read replicas (in addition to write replicas) to improve read throughput.

The Tango paper mentions BookKeeper and states that it has an implementation of BookKeeper in 300 lines. How would you implement BookKeeper in Tango?  What can you speculate about the performance of BookKeeper versus TangoBookKeeper? Could you implement Tango using BookKeeper? How about transactions?

After reading the paper, I was left with this question: What happens if the bookie writes the entry to its journal and acknowledges it, but dies before asynchronously writing this entry to its ledger? Does this cause any problems?

Final remarks
The paper does not talk about consistency of logging, because every consistency concern is delegated to ZooKeeper. I guess we can chalk this up as success points for ZooKeeper. BookKeeper's bottleneck for WAN deployment is ZooKeeper. If ZooKeeper is consulted infrequently, things are OK. But if ZooKeeper is consulted frequently for LAC information in order to read from open ledgers, performance suffers.

Related links:
Flavio's blog post on BookKeeper
Flavio's presentation on BookKeeper

Sunday, September 28, 2014

Paper summary: Tango: Distributed Data Structures over a Shared Log

This paper is from Microsoft Research Silicon Valley (which unfortunately was recently closed), and it appeared in SOSP'13. SOSP'13 provides open access, so here is the pdf for free. The talk video is also on YouTube as part of this SOSP'13 talks playlist. I think this paper didn't get the attention it deserves. It is really a great piece of work.

To facilitate construction of highly available metadata services, Tango provides developers with the abstraction of a replicated in-memory data structure (such as a map or a tree) backed by a shared log.

While ZooKeeper provides developers a fixed data structure (the data tree) for building coordination primitives, Tango enables clients to build different data structures based on the same single shared log. Tango also provides transactions across data structures.

The state of a Tango object exists in two forms. 1) a history: which is an ordered sequence of updates stored durably in the shared log, 2) any number of views: which are full or partial copies of the data structure --such as a tree or a map-- constructed from the log and stored in RAM on clients (i.e., application servers).

A client modifies a Tango object by appending a new update to the history; it accesses the object by first synchronizing its local view with the history. Views are soft state and are instantiated, reconstructed, and updated on clients by playing the shared history forward.

In Tango, the shared log provides: consistency, durability, history. Tango also provides atomicity and isolation for transactions across different objects by multiplexing & storing them on a single shared log.

Corfu shared log abstraction

Tango builds on the Corfu shared log abstraction, which employs flash disks so that reads from the history of the log do not suffer while writes are going on at the tail of the log.

The CORFU interface consists of 4 calls:

  1. Clients can append entries to the shared log, obtaining an offset in return.
  2. They can check the current tail of the log. 
  3. They can read the entry at a particular offset.
  4. Clients can trim a particular offset in the log for garbage collection.
Corfu organizes a cluster of storage nodes into multiple, disjoint replica sets; for example, a 12-node cluster might consist of 4 replica sets of size 3. Each individual storage node exposes a 64-bit write-once address space, mirrored across the replica set. The cluster also contains a dedicated sequencer node, which is essentially a networked counter storing the current tail of the shared log.

To append, a client contacts the sequencer and obtains the next free offset in the global address space of the shared log. It then maps this offset to a local offset on one of the replica sets using a simple deterministic mapping (e.g., modulo function) over the membership of the cluster. The client then completes the append by directly issuing writes to the storage nodes in the replica set using a client-driven variant of Chain Replication.

The sequencer is merely an optimization to find the tail of the log and not required for correctness. The Chain Replication variant used to write to the storage nodes guarantees that a single client will "win" if multiple clients attempt to write to the same offset. When the sequencer goes down, any client can easily recover this state using the slow check operation on the shared log.
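The append path described above can be sketched in a few lines of Python. This is a minimal in-memory illustration, not Corfu's actual code: the class names (StorageNode, Sequencer, CorfuSketch), the modulo/integer-division mapping, and reading from the last node of the chain are all assumptions made for the example.

```python
class StorageNode:
    """One storage node exposing a write-once address space."""

    def __init__(self):
        self.cells = {}

    def write(self, local_offset, entry):
        # Write-once: a second write to the same address is rejected.
        if local_offset in self.cells:
            return False
        self.cells[local_offset] = entry
        return True

    def read(self, local_offset):
        return self.cells.get(local_offset)


class Sequencer:
    """A networked counter in the real system; a plain counter here."""

    def __init__(self):
        self.tail = 0

    def next_offset(self):
        offset = self.tail
        self.tail += 1
        return offset


class CorfuSketch:
    def __init__(self, num_replica_sets=4, replicas_per_set=3):
        self.sequencer = Sequencer()
        self.replica_sets = [
            [StorageNode() for _ in range(replicas_per_set)]
            for _ in range(num_replica_sets)
        ]

    def _locate(self, offset):
        # Assumed mapping: modulo picks the replica set, integer division
        # gives the local offset inside that set.
        n = len(self.replica_sets)
        return self.replica_sets[offset % n], offset // n

    def write_at(self, offset, entry):
        chain, local = self._locate(offset)
        # Client-driven chain: write to the nodes in a fixed order. A client
        # that loses at the first node gives up, so only one client "wins".
        if not chain[0].write(local, entry):
            return False
        for node in chain[1:]:
            node.write(local, entry)
        return True

    def append(self, entry):
        offset = self.sequencer.next_offset()
        if not self.write_at(offset, entry):
            raise RuntimeError("offset already written")
        return offset

    def read(self, offset):
        chain, local = self._locate(offset)
        # In this sketch, reads go to the last node of the chain.
        return chain[-1].read(local)
```

Because every address is write-once, two clients that race for the same offset cannot both succeed, which is why the sequencer can be treated as an optimization rather than a correctness requirement.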

The Tango architecture

There are 3 components to a Tango object. 1) A Tango object contains the view, which is an in-memory representation of the object in some form, such as a list or a map. E.g., for TangoRegister this state is a single integer. 2) Each object implements the mandatory apply upcall which changes the view when the Tango runtime calls it with new entries from the log. By customizing the apply implementation, one client can build a "tree view" while another builds a "set view" reading from the same log. 3) Each object exposes an external interface of object-specific mutator and accessor methods; e.g., the TangoRegister exposes read/write methods.
The object's mutators do not directly change the in-memory state of the object. Instead, each mutator combines its parameters into an opaque buffer --an update record-- and calls the update helper function of the Tango runtime, which appends it to the shared log.

Similarly, the accessors do not immediately read the object's state. Each accessor first calls the query helper before returning an arbitrary function over the state of the object. The query helper plays new update records in the shared log until its current tail and applies them to the object via the apply upcall before returning.
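To make the view / apply / mutator / accessor split concrete, here is a rough Python sketch of a TangoRegister. The real system is not written this way; SharedLog, TangoRuntime, and the method names update_helper and query_helper are stand-ins chosen for illustration, and the shared log is just a Python list.

```python
class SharedLog:
    """Stand-in for the shared log: an append-only list."""

    def __init__(self):
        self.entries = []

    def append(self, entry):
        self.entries.append(entry)
        return len(self.entries) - 1

    def tail(self):
        return len(self.entries)

    def read(self, offset):
        return self.entries[offset]


class TangoRuntime:
    """Per-client runtime that plays the log into the local views."""

    def __init__(self, log):
        self.log = log
        self.objects = {}      # OID -> locally hosted object
        self.next_offset = 0   # first log position not yet played

    def register(self, oid, obj):
        self.objects[oid] = obj

    def update_helper(self, oid, record):
        # Mutators only append; they never touch the view directly.
        return self.log.append((oid, record))

    def query_helper(self):
        # Play new entries up to the current tail via the apply upcall.
        tail = self.log.tail()
        while self.next_offset < tail:
            oid, record = self.log.read(self.next_offset)
            obj = self.objects.get(oid)
            if obj is not None:
                obj.apply(record)
            self.next_offset += 1


class TangoRegister:
    def __init__(self, runtime, oid):
        self.runtime = runtime
        self.oid = oid
        self.value = 0         # the view: a single integer
        runtime.register(oid, self)

    def apply(self, record):   # mandatory upcall
        self.value = record

    def write(self, value):    # mutator
        self.runtime.update_helper(self.oid, value)

    def read(self):            # accessor
        self.runtime.query_helper()
        return self.value
```

Two clients that each create a TangoRuntime over the same SharedLog and register a TangoRegister with the same OID will see each other's writes on their next read, since both views are rebuilt from the same history.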

Storing multiple objects on a single shared log enables strongly consistent operations across them without requiring complex distributed protocols.  The Tango runtime on each client can multiplex the log across objects by storing and checking a unique object ID (OID) on each entry. Such a scheme has the drawback that every client has to play every entry in the shared log, but layered partitioning, as we shall discuss soon, solves this problem. It enables strongly consistent operations across objects without requiring each object to be hosted by each client, and without requiring each client to consume the entire shared log.


Tango implements optimistic concurrency control by appending speculative transaction commit records to the shared log.  Commit records ensure atomicity, since they determine a point in the persistent total ordering at which the changes that occur in a transaction can be made visible at all clients. To provide isolation, each commit record contains a read set: a list of objects read by the transaction along with their versions, where the version is simply the last offset in the shared log that modified the object. A transaction only succeeds if none of its reads are stale when the commit record is encountered (i.e., the objects have not changed since they were read).

To denote a transaction, calls to object accessors and mutators can be bracketed by BeginTX and EndTX calls. BeginTX creates a transaction context in thread-local storage. EndTX appends a commit record to the shared log, plays the log forward until the commit point, and then makes a commit/abort decision.

Each client that encounters the commit record decides --independently but deterministically-- whether it should commit or abort by comparing the versions in the readset with the current versions of the objects. If none of the read objects have changed since they were read, the transaction commits and the objects in the write set are updated with the apply upcall.
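The deterministic commit/abort decision can be illustrated with a small Python function that plays a log forward. The entry layout (dicts with "type", "read_set", "write_set") and the use of -1 as the version of a never-written object are assumptions made for this sketch, not the paper's wire format.

```python
def play(log):
    """Play the log forward; return (state, versions, outcomes)."""
    state = {}
    versions = {}   # object ID -> offset of the last entry that modified it
    outcomes = {}   # offset of a commit record -> True (commit) / False (abort)
    for offset, entry in enumerate(log):
        if entry["type"] == "update":
            state[entry["oid"]] = entry["value"]
            versions[entry["oid"]] = offset
        elif entry["type"] == "commit":
            # Commit only if no object in the read set changed since it was read.
            ok = all(
                versions.get(oid, -1) == version
                for oid, version in entry["read_set"].items()
            )
            outcomes[offset] = ok
            if ok:
                for oid, value in entry["write_set"].items():
                    state[oid] = value
                    versions[oid] = offset
    return state, versions, outcomes


log = [
    {"type": "update", "oid": "x", "value": 1},
    {"type": "commit", "read_set": {"x": 0}, "write_set": {"y": 10}},
    {"type": "update", "oid": "x", "value": 2},
    # This transaction read x at version 0, but x changed at offset 2.
    {"type": "commit", "read_set": {"x": 0}, "write_set": {"y": 20}},
]
state, versions, outcomes = play(log)
```

Since the decision depends only on the log contents, every client that runs play over the same prefix reaches the same outcome without any extra coordination.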

For read-only transactions, the EndTX call does not insert a commit record into the shared log; instead, it just plays the log forward until its current tail before making the commit/abort decision. Tango also supports fast read-only transactions from stale snapshots by having EndTX make the commit/abort decision locally, without interacting with the log.

Write-only transactions require an append on the shared log but can commit immediately without playing the log forward.

Layered partitions

Each client hosts a (possibly overlapping) partition of the global state of the system, but this partitioning scheme is layered over a single shared log.  To efficiently implement layered partitions without requiring each client to play the entire shared log, Tango maps each object to a stream over the shared log.

A stream augments the conventional shared log interface (append and random read) with a streaming readnext call.  Many streams can co-exist on a single shared log; calling readnext on a stream returns the next entry belonging to that stream in the shared log, skipping over entries belonging to other streams. With this interface, clients can selectively consume the shared log by playing the streams of interest to them (i.e., the streams of objects hosted by them).

Each client plays the streams belonging to the objects in its layered partition. But, streams are not necessarily disjoint; a multiappend call allows a physical entry in the log to belong to multiple streams. When transactions cross object boundaries, Tango changes the behavior of its EndTX call to multiappend the commit record to all the streams involved in the write set. Multiappend ensures the following. A transaction that affects multiple objects occupies a single position in the global ordering; in other words, there is only one commit record per transaction in the raw shared log. A client hosting an object sees every transaction that impacts the object, even if it hosts no other objects.

Tango transactions have the following limitation though. Remote reads at the generating client are disallowed in a transaction: a client cannot execute transactions and generate commit records involving remote reads. Calling an accessor on an object that does not have a local view is problematic, since the data does not exist locally; a possible solution, invoking an RPC to a different client with a view of the object, is expensive and complicated. So, if a client wants to do a transaction with reads on an object, the client should subscribe to the stream of that object.

Streaming Corfu

When the client-side library starts up, the application provides it with the list of stream IDs of interest to it. For each such stream, the library finds the last entry in the shared log belonging to that stream by asking the sequencer. The K backpointers in this entry allow it to construct a K-sized suffix of the linked list of offsets comprising the stream. It then issues a read to the offset pointed at by the Kth backpointer to obtain the previous K offsets in the linked list. In this manner, the library can construct the linked list by striding backward on the log, issuing N/K reads to build the list for a stream with N entries.
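The backward striding can be sketched as follows. This is a toy model under stated assumptions: the log is a Python list, K is set to 4 arbitrarily, and the last_offsets dict plays the role of the sequencer in remembering the most recent offsets of each stream.

```python
K = 4  # backpointers per entry (hypothetical value)


def append(log, last_offsets, stream_id, payload):
    """Append an entry carrying backpointers to the previous K entries of its stream."""
    back = list(last_offsets.get(stream_id, []))  # most recent first
    log.append({"stream": stream_id, "payload": payload, "back": back})
    offset = len(log) - 1
    last_offsets[stream_id] = ([offset] + back)[:K]
    return offset


def build_stream(log, last_offset):
    """Return (stream offsets in log order, number of log reads issued)."""
    offsets = [last_offset]
    reads = 0
    cursor = last_offset
    while True:
        back = log[cursor]["back"]   # one read of the shared log
        reads += 1
        offsets.extend(back)
        if len(back) < K:            # fewer than K pointers: start of stream reached
            break
        cursor = back[-1]            # jump to the Kth (oldest) backpointer
    offsets.reverse()
    return offsets, reads


# Two interleaved streams, 8 entries each.
log, last_offsets = [], {}
for i in range(8):
    append(log, last_offsets, "a", i)
    append(log, last_offsets, "b", i)

offsets, reads = build_stream(log, last_offsets["a"][0])
```

In this example stream "a" has N = 8 entries and K = 4, so the list is rebuilt with 2 reads, even though the stream's entries are interleaved with those of stream "b".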


The experimental testbed consists of 36 8-core machines in two racks, with gigabit NICs on each node and 20 Gbps between the top-of-rack switches.  In all the experiments, they run an 18-node Corfu deployment on these nodes in a 9-by-2 configuration (i.e., 9 sets of 2 replicas each), such that each entry is mirrored across racks. The other 18 nodes are used as clients. The Corfu sequencer runs on a powerful, 32-core machine in a separate rack. They use 4KB entries in the Corfu log, with a batch size of 4 at each client.
Figure shows single object serializability. Reads wait for the apply upcalls from the stream. When there are no writes, reads cost little. As more writes occur, reads take more time to catch up. Reads may well take more time than writes in Tango, but this is not shown in the graphs.
Figure shows performance for a primary/backup scenario where two nodes host views of the same object, with all writes directed to one node and all reads to the other. Overall throughput falls sharply as writes are introduced, and then stays constant at around 40K ops/sec as the workload mix changes; however, average read latency goes up as writes dominate, reflecting the extra work the read-only 'backup' node has to do to catch up with the primary.
Figure shows elasticity of linearizable read throughput with multiple views.

Figure shows transactions over layered partitions.

Tango vs. ZooKeeper.
Using Tango, the authors build ZooKeeper (TangoZK, 1K lines), BookKeeper (TangoBK, 300 lines), TreeSets and HashMaps (100 to 300 lines each). The performance of the resulting implementation is very similar to the TangoMap numbers in Figure 10; for example, with 18 clients running independent namespaces, they obtain around 200K txes/sec if transactions do not span namespaces, and nearly 20K txes/sec for transactions that atomically move a file from one namespace to another. The capability to move files across different instances does not exist in ZooKeeper, which supports a limited form of transaction within a single instance (i.e., a multi-op call that atomically executes a batch of operations).

They also implemented the single-writer ledger abstraction of BookKeeper in around 300 lines of Java code (again, not counting Exceptions and callback interfaces). To verify that their ZooKeeper and BookKeeper were full-fledged implementations, they ran the HDFS namenode over them (modifying it only to instantiate their classes instead of the originals) and successfully demonstrated recovery from a namenode reboot as well as fail-over to a backup namenode.


Tango fits within the State Machine Replication (SMR) paradigm, replicating state by imposing a total ordering over all updates. In the vocabulary of SMR, Tango clients can be seen as learners of the total ordering. The storage nodes comprising the shared log play the role of acceptors.

The finding in the Tango paper that a centralized server can be made to run at very high RPC rates matches recent observations by others. The Percolator system runs a centralized timestamp oracle with similar functionality at over 2M requests/sec with batching. Vasudevan et al. (SOCC'12) report achieving 1.6M submillisecond 4-byte reads/sec on a single server with batching. Masstree is a key-value server that provides 6M queries/sec with batching.

Tango's biggest contribution is that it provides multiple consistent object views from the same log. Objects with different in-memory data structures can share the same data on the log. For example, a namespace can be represented by different trees, one ordered on the filename and the other on a directory hierarchy, allowing applications to perform two types of queries efficiently (i.e., "list all files starting with the letter B" vs. "list all files in this directory"). Strongly consistent reads can be scaled simply by instantiating more views of the object on new clients. But is this free? Is this fast?

Tango's soft underbelly is that it uses a pull-based approach of constructing the view from the shared log. Wouldn't a push-based approach be more timely? When a read comes, the pull-based approach may have a lot of catching up to do to the current state before it returns an answer. I guess it may be possible to simulate a push-based approach with periodic pulls, even when no accessor function is invoked.

Tango provides a weird combination of centralized and decentralized. The log is centralized and this is exploited to provide serialization of distributed transactions. On the other hand, not having a master node and using the clients as learners is a very decentralized approach. Instead of one master taking decisions and updating the data structure, all of the clients are playing the log and taking decisions (in a deterministic way ensuring that they all make the same decisions), and updating their data structures. This resembles Lamport's extremely decentralized (to a fault!) implementation of the mutual exclusion which maintains replicated queues of all requests at all processes. (Of course, you can always code one client as master learner/decision-maker for other clients, and circumvent this!)

Tango vs. ZooKeeper.
Tango provides better/higher-level programming support than ZooKeeper. What the Tango paper calls Tango clients are servers that provide services for application-clients. (You may even say a Tango-client roughly corresponds to a "customized-view" ZooKeeper observer.) So, in terms of programmability and expressivity, Tango has the upper hand. I presume using ZooKeeper for large-scale applications may become intractable and may result in spaghetti code, since ZooKeeper provides very minimalistic, low-level primitives for coordination. Tango, on the other hand, lets the developer build higher-level abstractions of their own coordination services at the Tango-clients, and this benefits managing large projects while keeping complexity on a leash.

Comparing the efficiency of Tango and ZooKeeper, it seems like ZooKeeper would be better. In Tango, there are a couple of indirections that are not present in ZooKeeper. In Tango, there is an extra step of contacting the sequencer node to get a ticket/offset number. The Tango replication can correspond to ZooKeeper/Zab replication, so they equal out there. But Tango has another layer of indirection, where the clients need to read and learn from the log. In ZooKeeper, since the leader is also the decision maker, the app-client's learning can be from relatively compact state, whereas in Tango, this will be through replaying a sequence of commands and constructing the state itself. Again, since the Tango-client is like the ZooKeeper observer, that is another level of indirection before going to the app-client in Tango. So in total, two extra levels are present in Tango (contacting the sequencer, and the Tango-client learning) that are not present in ZooKeeper. Tango provides better programmability and expressivity, but this comes at a cost in performance.

If your application is simple (and will remain simple), and can be implemented using ZooKeeper in a straightforward manner, it would be best to use ZooKeeper. Otherwise, by using Tango, you can have a better/extensible/tractable code-base, and potentially write some of your services as Tango-clients, which can even improve the performance.

Final remarks

Tango code is not open source. That is really unfortunate, as it could provide a good alternative to ZooKeeper for some applications that require coordination and transactions across distributed clients.

Since the sequencer is centralized, Tango is not suitable for WAN deployments.

Some questions still remain. The stream sharing assignments seem to be done statically using the layered stream abstraction API. Can we do this on demand and dynamically?

How is the layered stream abstraction implemented at CORFU level over the replica groups? Would it pay to dedicate one group for one popular stream? This would make bulk reading possible from that replica set. (Similar to the columnar storage idea.)

Friday, September 19, 2014

Revisiting the EWDs

Dijkstra was the original hipster. He was blogging before blogging was cool. "For over four decades, he mailed copies of his consecutively numbered technical notes, trip reports, insightful observations, and pungent commentaries, known collectively as EWDs, to several dozen recipients in academia and industry. Thanks to the ubiquity of the photocopier and the wide interest in Dijkstra’s writings, the informal circulation of many of the EWDs eventually reached into the thousands." And, thanks to the efforts of the University of Texas at Austin CS Department, all of these EWDs are conveniently accessible to the public.

I remember when I first discovered the EWDs as a fresh graduate student. I was mesmerized. I read them with a lot of joy. It was as if a new world had opened to me to discover. He had many insightful observations. I recommend all CSE graduate students to read the EWDs to grow their minds.

Now, I don't agree with Dijkstra on everything. He was too much of a perfectionist, and believed in getting things right in one shot. He had this to say on this:
There are very different programming styles. I tend to see them as Mozart versus Beethoven. When Mozart started to write, the composition was finished. He wrote the manuscript and it was 'aus einem Guss' (from one cast). In beautiful handwriting, too. Beethoven was a doubter and a struggler who started writing before he finished the composition and then glued corrections onto the page. In one place he did this nine times. When they peeled them, the last version proved identical to the first one.

In contrast to Dijkstra's position, I believe in rapid prototyping and that perfection comes from iteration.

Of course I still adore all the EWDs and respect Dijkstra all the same. I mean, look at these gems in the Wikiquotes page for Dijkstra:

  • It is not the task of the University to offer what society asks for, but to give what society needs.
  • The required techniques of effective reasoning are pretty formal, but as long as programming is done by people that don't master them, the software crisis will remain with us and will be considered an incurable disease. And you know what incurable diseases do: they invite the quacks and charlatans in, who in this case take the form of Software Engineering gurus.
  • Elegance is not a dispensable luxury but a quality that decides between success and failure.
  • The problems of the real world are primarily those you are left with when you refuse to apply their effective solutions.

Some of his writings can be construed as starting a flamewar (Are "Systems people" really necessary?  :-). But he always had an important point to make. In some of his EWDs, he role-played as the "Chairman of the Board" of the fictitious Mathematics Inc., "a company that commercialized mathematical theorems the same way that software companies commercialized computer programs". He did this to show how ridiculous it is to patent a theorem, algorithm, or code.

And then there is this: "The cruelty of teaching computer science."

This is a 30 page handwritten (beautifully) manifesto against the state of CS teaching then, which unfortunately got worse in the following years. The manifesto finishes with a bang!
Teaching to unsuspecting youngsters the effective use of formal methods is one of the joys of life because it is so extremely rewarding. Within a few months, they find their way in a new world with a justified degree of confidence that is radically novel for them; within a few months, their concept of intellectual culture has acquired a radically novel dimension. To my taste and style, that is what education is about. Universities should not be afraid of teaching radical novelties; on the contrary, it is their calling to welcome the opportunity to do so. Their willingness to do so is our main safeguard against dictatorships, be they of the proletariat, of the scientific establishment, or of the corporate elite.

And about Microsoft's closing of the MS Research at Silicon Valley:

Paper summary: Can a decentralized metadata service layer benefit parallel filesystems?

Parallel filesystems do a good job of providing parallel and scalable data transfer, but, due to consistency concerns, the metadata accesses are still directed to one metadata server (MDS), which becomes a bottleneck. This is a problem for scalability because studies show that over 75% of all filesystem calls require access to file metadata.

This paper proposes to adopt ZooKeeper as a decentralized MDS for parallel filesystems and test whether that improves performance. You can ask, what is decentralized about ZooKeeper, and you would be right about the update requests. But for read requests, ZooKeeper helps by allowing any ZooKeeper server to respond while guaranteeing consistency. (You would still need to do a sync operation if the request needs the read to be freshest and satisfy precedence order.)

If you recall, ZooKeeper uses a filesystem API to enable clients to build higher-level coordination primitives (group membership, locking, barrier sync). This paper is interesting because it takes ZooKeeper and uses it directly as a metadata server for a filesystem leveraging the filesystem API ZooKeeper exposes in a literal manner. FUSE is used to act as a glue between the ZooKeeper as MDS and the underlying physical storage filesystem.

Distributed Union FileSystem (DUFS) architecture

A DUFS client instance does not interact directly with other DUFS clients; any necessary interaction is made through the ZooKeeper service. The figure shows the basic steps required to perform an open() operation on a file using DUFS.

  • A. The open() call is intercepted by FUSE which gives the virtual path of the file to DUFS.
  • B. DUFS queries ZooKeeper to get the Znode based on the filename and to retrieve the file id (FID).
  • C. DUFS uses the deterministic mapping function to find the physical path associated to the FID.
  • D. Finally, DUFS opens the file based on its physical path. The result is returned to the application via FUSE.
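
Steps B through D can be sketched in Python as below. This only illustrates the control flow: the ZooKeeper lookup is replaced by a plain dict, and the hash-based mapping, the directory layout, and the names physical_path and dufs_open are hypothetical, since the summary does not say which deterministic mapping function DUFS uses.

```python
import hashlib


def physical_path(fid, backends):
    """Deterministically map a file id (FID) to a path on one back-end mount."""
    digest = hashlib.sha1(fid.encode()).hexdigest()
    backend = backends[int(digest, 16) % len(backends)]
    return f"{backend}/{digest[:2]}/{fid}"


def dufs_open(virtual_path, metadata, backends):
    # Step B: look up the FID by filename (a ZooKeeper query in DUFS, a dict here).
    fid = metadata[virtual_path]
    # Step C: deterministic mapping from FID to physical path.
    # Step D would call open() on the returned path.
    return physical_path(fid, backends)


metadata = {"/home/a.txt": "fid-42"}
backends = ["/mnt/fs0", "/mnt/fs1"]
first = dufs_open("/home/a.txt", metadata, backends)
second = dufs_open("/home/a.txt", metadata, backends)
```

Because the mapping depends only on the FID, any DUFS client computes the same physical path without talking to another client.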

Alternatively, directory operations take place only at the metadata level, so only ZooKeeper is involved and not the back-end storage. For example, the directory stat() operation is satisfied at ZooKeeper itself (the back-end storage is not contacted) since we maintain the entire directory hierarchy in ZooKeeper.


These tests were performed on a Linux cluster. Each node has a dual Intel Xeon E5335 CPU (8 cores in total) and 6GB memory. A SATA 250GB hard drive is used as the storage device on each node. The nodes are connected with 1GigE.

I am bugged by some of the limitations of the evaluation. In these tests the ZooKeeper servers are colocated with (running on the same node as) the DUFS client. This naturally achieves wonders for read request latencies! But this is not a very reasonable setup. Moreover, the clients are not under the control of DUFS, so it is not a good idea to deploy your ZooKeeper servers on clients which are uncontrolled and can disconnect any time. Finally, this disallows faraway clients. Of course ZooKeeper does not scale to WAN environments, and all the tests are done in a controlled cluster environment.


This paper investigates an interesting idea, that of using ZooKeeper as the MDS of parallel filesystems to provide some scalability to the MDS. Thanks to the advantages of ZooKeeper, this allows improved read access because reads can be served consistently from any ZooKeeper server. And, due to the limitations of ZooKeeper, this fails to address the scalability of update requests (throughput of update operations actually decreases as the number of ZooKeeper replicas increases) and also lacks the scalability needed for WAN deployments. Another limitation of this approach is that the metadata needs to fit into a single ZooKeeper server (and of course also the ZooKeeper replicas), so there is a scalability problem with respect to the filesystem size as well.

We are working on a scalable WAN version of ZooKeeper, and we will use the parallel filesystems as our application to showcase a WAN filesystem leveraging our prototype coordination system.

Friday, September 12, 2014

Distributed system seminar talk: Data grouping framework for energy-efficiency in distributed storage systems

My research group and Tevfik's research group meet jointly for a weekly distributed systems seminar. This gives our students a chance to give talks about current projects and get feedback for improvement in a friendly setting.

In this week's seminar, Luigi presented his research on building energy-efficient file systems. I was initially skeptical about energy-efficiency as a research topic. Academicians like to work on things that they can quantify and improve, so I was thinking that energy-efficiency in distributed storage was an opportunistic research problem, rather than a real-world problem. Turns out, I couldn't have been more wrong: IT companies spend $10 billion every year on energy consumption (this is 3% of the entire expenditure of the US!). $3.5 billion of that $10 billion is due to the storage systems.

Dynamic power management (DPM) is the primary mechanism for energy saving in storage systems. DPM basically means turning the disk off if you're not using it. An idling disk spends energy because it is still rotating, and this mechanical motion burns energy. But turning a disk off is not easy. It takes tens of seconds to stop and start a hard disk, and the energy usage spikes at these transition points. This turns the problem into an optimization problem. When is it beneficial to turn the disk off? How can you create gaps long enough to turn off the disk?
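
One simple way to frame "when is it beneficial" is a break-even calculation: spinning down pays off only if the idle gap is long enough that the energy saved while asleep exceeds the energy spent on the stop/start transition. The sketch below uses a deliberately simplified energy model, and all the numbers in the example are made up for illustration; they are not from the talk.

```python
def break_even_seconds(p_idle_w, p_sleep_w, transition_j, transition_s):
    """Shortest idle gap for which spinning down saves energy.

    Staying idle for a gap T costs p_idle_w * T joules. Spinning down costs
    transition_j for the stop/start plus p_sleep_w for the rest of the gap.
    """
    gap = (transition_j - p_sleep_w * transition_s) / (p_idle_w - p_sleep_w)
    # The gap can never be shorter than the transition itself.
    return max(transition_s, gap)


def should_spin_down(expected_gap_s, **disk):
    return expected_gap_s > break_even_seconds(**disk)


# Hypothetical disk: 8 W idle, 1 W asleep, 150 J and 10 s per stop/start cycle.
disk = dict(p_idle_w=8.0, p_sleep_w=1.0, transition_j=150.0, transition_s=10.0)
threshold = break_even_seconds(**disk)
```

With these made-up numbers the threshold is 20 seconds, so the data-placement techniques below are useful exactly to the extent that they stretch idle gaps past that point.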

The literature discusses the following DPM-enabling techniques for energy-saving in storage systems. Most of these techniques prescribe data access locality improvements.

1) Memory and disk caching: Caching is not only good for providing low latency but also, in some cases, good for saving energy. If we can answer from the cache instead of turning on the disk, we can give the disk more time to sleep. But what should the cache size be? If it is too small, the data won't fit, and this won't provide much/any saving. If it is too large, the cache itself may consume more energy than it saves.

2) Diverting accesses: Data is stored redundantly, so this gives us the opportunity to spin down some redundant disks by diverting the accesses to the already active/hot ones. Unsurprisingly, there is a tradeoff of increased latency in doing so. By limiting concurrency/parallelism you increase latency of replies. (Is energy-efficiency versus latency a fundamental tradeoff in distributed storage?) Maybe, by offering well-drafted SLA agreements to the clients, it is possible to give incentive to the client for trading energy efficiency for slightly increased latency.

3) Popular data clustering: This technique prescribes organizing the disk storage based on the previously observed access locality of data. So if a disk is hot, it is likely to stay hot, and if a disk gets cold, it is likely to stay cold and it can sleep.

I guess there also could be orthogonal techniques if you don't need to serve requests in real-time. For those cases you have the opportunity to batch-schedule accesses.

Luigi is working on a hybrid of these techniques to provide as much energy-efficiency as possible. I wouldn't have thought energy-efficiency for distributed storage could be this interesting. There might even be a couple of distributed algorithms problems here that I would enjoy.

Paper summary: ZooKeeper: Wait-free coordination for Internet-scale systems

ZooKeeper is an Apache project for providing coordination services to distributed systems. ZooKeeper aims to provide a simple kernel (a filesystem API!) for empowering the clients to build more complex coordination primitives. In this post I will provide a summary of the ZooKeeper paper, and talk about some future directions I can see this going.

"Client" denotes a user of the ZooKeeper service, "server" denotes a process providing the ZooKeeper service, and "znode" denotes an in-memory data node (similar to the filesystem inode) in the ZooKeeper. znodes are organized in a hierarchical namespace referred to as the data tree.
There are 2 types of znodes. "Regular": Clients manipulate regular znodes by creating and deleting them explicitly. "Ephemeral": Clients create ephemeral znodes, and they either delete them explicitly, or let the system delete them automatically when the client's session terminates. Additionally, when creating a new znode, a client can set a "Sequential" flag. Nodes created with the sequential flag set have the value of a monotonically increasing counter appended to their names. If n is the new znode and p is the parent znode, then the sequence value of n is never smaller than the value in the name of any other sequential znode ever created under p.

ZooKeeper also implements "watches" on znodes to allow clients to receive timely notifications of changes without requiring polling.

The API ZooKeeper provides to the clients

create(path, data, flags)
delete(path, version)  // operation is conditional on version (if provided)
exists(path, watch)
getData(path, watch)
setData(path, data, version) // operation is conditional on version (if provided)
getChildren(path, watch)

All methods in the API have both a synchronous and an asynchronous version. A client uses the synchronous API when it needs to execute a single ZooKeeper operation and it has no concurrent tasks to execute, so it makes the necessary ZooKeeper call and blocks. The asynchronous API enables a client to have both multiple outstanding ZooKeeper operations and other tasks executed in parallel. ZooKeeper guarantees that the corresponding callbacks for each operation are invoked in order.

Using ZooKeeper to implement coordination primitives

Configuration Management: The configuration is stored in a znode, zc. Processes start up with the full pathname of zc. Starting processes obtain their configuration by reading zc with the watch flag set to true. If the configuration in zc is ever updated, the processes are notified and read the new configuration, again setting the watch flag to true.

Rendezvous: When the master starts it fills in zr with information about addresses and ports it is using. When workers start, they read zr with watch set to true. If zr has not been filled in yet, the worker waits to be notified when zr is updated.

Group Membership: A znode, zg, is created to represent the group. When a process member of the group starts, it creates an ephemeral child znode under zg. If the process fails or ends, the znode that represents it under zg is automatically removed. Processes may put process information in the data of the child znode, e.g., addresses and ports used by the process. Processes may obtain group information by simply listing the children of zg. If a process wants to monitor changes in group membership, the process can set the watch flag to true and refresh the group information (always setting the watch flag to true) when change notifications are received.
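
Here is a minimal sketch of the group membership idea, again with a toy in-memory model rather than a real ZooKeeper client. ToyGroup, join, and close_session are hypothetical names. I model "the process fails or ends" as its session being closed, at which point every ephemeral child it owns disappears.

```python
class ToyGroup:
    """In-memory stand-in for the znode zg and its ephemeral children (hypothetical)."""

    def __init__(self):
        self._children = {}  # child name -> (session id, data)

    def join(self, session_id, name, data):
        # Corresponds to creating an ephemeral child znode under zg.
        self._children[name] = (session_id, data)

    def close_session(self, session_id):
        # Mimics the automatic removal of ephemeral znodes when a session ends.
        self._children = {
            name: (sid, data)
            for name, (sid, data) in self._children.items()
            if sid != session_id
        }

    def members(self):
        # Corresponds to listing the children of zg.
        return sorted(self._children)


zg = ToyGroup()
zg.join(session_id=1, name="worker-a", data="10.0.0.1:9000")
zg.join(session_id=2, name="worker-b", data="10.0.0.2:9000")
zg.close_session(1)  # worker-a crashes or exits
print(zg.members())
```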

Simple locks: To acquire a lock, a client tries to create the designated znode with the EPHEMERAL flag. If the create succeeds, the client holds the lock. Otherwise, the client can read the znode with the watch flag set. The lock is released when the client deletes the znode explicitly, or when the client dies and its session times out, which removes the ephemeral znode. Other clients that are waiting for a lock try again to acquire a lock once they observe the znode being deleted. Because all waiting clients are notified at once and race to create the znode, this scheme suffers from the herd effect.

Simple Locks without Herd Effect: All the clients requesting the lock are lined up and each client obtains the lock in order of request arrival.
To lock: 
1 n = create(l + “/lock-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for watch event
6 goto 2

To unlock:
1 delete(n)
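
Steps 3 and 4 of the lock procedure boil down to a pure function over the list of children, which I sketch below. The function names and the zero-padded znode names are my own illustrative assumptions. Each client watches only the znode just ahead of it, so a release wakes up a single waiter.

```python
def sequence_number(name):
    """Return the counter that the SEQUENTIAL flag appended to the name."""
    return int(name.rsplit("-", 1)[1])


def lock_step(my_node, children):
    """Return None if my_node holds the lock, else the znode to watch."""
    ordered = sorted(children, key=sequence_number)
    index = ordered.index(my_node)
    if index == 0:
        return None  # lowest znode in C: lock acquired
    return ordered[index - 1]  # watch only the znode just before ours


children = ["lock-0000000003", "lock-0000000001", "lock-0000000002"]
print(lock_step("lock-0000000003", children))
```

If the client that owns lock-0000000002 dies before it ever gets the lock, the third client is woken up, re-reads the children, and finds that it must now wait on lock-0000000001. This is why the procedure goes back to step 2 instead of assuming the lock was acquired.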

Read/Write Locks: The lock procedure is changed slightly to include separate read lock and write lock procedures.
Write Lock
1 n = create(l + “/write-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 2

Read Lock
1 n = create(l + “/read-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if no write znodes lower than n in C, exit
4 p = write znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 2
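
The read lock differs from the write lock only in what it waits for: a reader ignores other readers and waits on the closest write znode ahead of it. Below is a minimal sketch of that decision as a pure function. The function names and the znode names are hypothetical, and I assume the "read-" and "write-" prefixes share one sequence counter under the lock znode.

```python
def seq(name):
    """Return the counter that the SEQUENTIAL flag appended to the name."""
    return int(name.rsplit("-", 1)[1])


def read_lock_step(my_node, children):
    """Return None if the read lock is held, else the write znode to watch."""
    writers_ahead = [
        c for c in children
        if c.startswith("write-") and seq(c) < seq(my_node)
    ]
    if not writers_ahead:
        return None  # no write znode lower than ours: lock acquired
    return max(writers_ahead, key=seq)  # write znode ordered just before ours


children = [
    "write-0000000001",
    "read-0000000002",
    "read-0000000003",
    "write-0000000004",
    "read-0000000005",
]
print(read_lock_step("read-0000000003", children))
```

Both read-0000000002 and read-0000000003 wait on write-0000000001, so they acquire the lock together once that writer releases it. That small herd is intended, since readers can share the lock.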

You can build even more powerful coordination primitives using ZooKeeper, and a Python binding is also made available here.

ZooKeeper applications at Yahoo!: The Fetching Service (FS) uses ZooKeeper to recover from master failures, to guarantee availability despite failures, and to decouple clients from servers, so that clients can direct their requests to healthy servers just by reading server status from ZooKeeper. FS uses ZooKeeper mainly to manage configuration metadata. The FS workload is read-heavy, with a read-to-write ratio between 10:1 and 100:1. As another example, Yahoo! Message Broker (YMB), a distributed publish-subscribe system, uses ZooKeeper to manage the distribution of topics (configuration metadata), deal with failures of machines in the system (failure detection and group membership), and control system operation.

Other practical uses of ZooKeeper have been explained nicely here.

ZooKeeper architecture/internals

The replicated database is an in-memory database containing the entire data tree. Each znode in the tree stores a maximum of 1MB of data by default. For recoverability, ZooKeeper efficiently logs updates to disk, and forces writes to be on the disk media before they are applied to the in-memory database.

Every ZooKeeper server services clients. Each client connects to exactly one server to submit its requests. Read requests are serviced from the local replica of each server database.

Requests that change the state of the service, write requests, are processed by an agreement protocol. As part of the agreement protocol write requests are forwarded to a single server, called the leader. The rest of the ZooKeeper servers, called followers, receive message proposals consisting of state changes from the leader and agree upon state changes. This is similar to how Paxos works.

ZooKeeper's atomic broadcast protocol (Zab) uses by default simple majority quorums to decide on a proposal, so Zab and thus ZooKeeper can only work if a majority of servers are correct (i.e., with 2f + 1 servers we can tolerate f failures). Zab guarantees that changes broadcast by a leader are delivered in the order they were sent and all changes from previous leaders are delivered to an established leader before it broadcasts its own changes.
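
The majority-quorum arithmetic is easy to check with a few lines of code. The two helper names below are my own, and the sketch only restates the 2f + 1 rule.

```python
def quorum_size(n):
    """Smallest number of servers that forms a simple majority of n."""
    return n // 2 + 1


def tolerated_failures(n):
    """Largest f such that n >= 2f + 1."""
    return (n - 1) // 2


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

Going from 3 to 4 servers raises the quorum size from 2 to 3 but still tolerates only 1 failure, which is why odd ensemble sizes are the usual choice.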

More specifically, Zab/ZooKeeper provides two basic ordering guarantees:
Linearizable writes: all requests that update the state of ZooKeeper are serializable and respect precedence.
FIFO client order: all requests from a given client are executed in the order that they were sent by the client.

ZooKeeper vs Paxos

ZooKeeper provides the FIFO client order property, but Paxos doesn't. Paxos may violate the FIFO client order property as follows.

Proposer P1 executes Phase 1 for sequence numbers 27 and 28. It proposes values A and B for sequence numbers 27 and 28, respectively, in Phase 2 with ballot number 1. Both proposals are accepted only by acceptor A1. Proposer P2 executes Phase 1 against acceptors A2 and A3, and ends up proposing C in Phase 2 for sequence number 27 with ballot number 2. Finally, proposer P3 executes Phases 1 and 2, and is able to have a quorum of acceptors choose C for sequence number 27, B for sequence number 28, and D for 29.

ZooKeeper argues that such a run is not acceptable because the state change represented by B causally depends upon A, and not C. Consequently, B can only be chosen for sequence number i+1 if A has been chosen for sequence number i, and C cannot be chosen before B, since the state change that B represents cannot commute with C and can only be applied after A.

Client server interaction

When a server completes a write operation, it also sends out and clears notifications relative to any watch that corresponds to that update. Servers process the writes the leader server sends in order and do not process other writes or reads concurrently in order to ensure strict succession of notifications. Note that servers handle notifications locally. Only the server that a client is connected to tracks and triggers notifications for that client.

One drawback of using fast reads (local reads at one server) is not guaranteeing precedence order for read operations. That is, a read operation may return a stale value, even though a more recent update to the same znode has been committed. Not all applications require precedence order, but for applications that do require it, the sync primitive is used. To guarantee that a given read operation returns the latest updated value, a client calls sync before the read operation. Sync flushes the pipes so to speak. The FIFO order guarantee of client operations together with the global guarantee of sync enables the result of the read operation to reflect any changes that happened before the sync was issued.

Read requests are handled locally at each server. Each read request is tagged with a zxid that corresponds to the last transaction seen by the server. ZooKeeper servers process requests from clients in FIFO order; responses include the zxid that the response is relative to. Even heartbeat messages during intervals of no activity include the last zxid seen by the server that the client is connected to. This zxid defines the partial order of the read requests with respect to the write requests. If the client connects to a new server, that new server ensures that its view of the ZooKeeper data is at least as recent as the view of the client by checking the last zxid of the client against its last zxid. If the client has a more recent view than the server, the server does not reestablish the session with the client until the server has caught up.
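
The reconnection rule can be sketched in a few lines. ToyClient and may_reestablish are hypothetical names, and I model zxids as plain integers that only grow. The client remembers the largest zxid it has seen in any response, and a new server accepts the session only if it is at least that recent.

```python
class ToyClient:
    """Tracks the most recent zxid the client has observed (hypothetical)."""

    def __init__(self):
        self.last_zxid = 0

    def observe(self, response_zxid):
        # Responses and heartbeats carry the zxid of the server's state.
        self.last_zxid = max(self.last_zxid, response_zxid)


def may_reestablish(server_last_zxid, client):
    """A server serves the client only if its view is at least as recent."""
    return server_last_zxid >= client.last_zxid


client = ToyClient()
client.observe(41)
client.observe(57)
print(may_reestablish(50, client), may_reestablish(57, client))
```

A server that is still at zxid 50 has to catch up before it takes over this session. Otherwise the client could read state older than what it has already seen.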

To detect client session failures, ZooKeeper uses time-outs. To prevent the session from timing out, the ZooKeeper client library sends a heartbeat after the session has been idle for s/3 ms and switches to a new server if it has not heard from a server for 2s/3 ms, where s is the session timeout in milliseconds.
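
As a small worked example of the s/3 and 2s/3 thresholds, here is a sketch of the decision the client library makes. The function name, its parameters, and the returned labels are my own illustrative assumptions, not the real client's API.

```python
def client_action(idle_ms, silent_ms, session_timeout_ms):
    """Decide what the client library should do next.

    idle_ms:   time since the client last sent anything on the session
    silent_ms: time since the client last heard from its server
    """
    # Integer arithmetic avoids rounding surprises for s/3 and 2s/3.
    if silent_ms * 3 >= 2 * session_timeout_ms:
        return "switch_server"
    if idle_ms * 3 >= session_timeout_ms:
        return "send_heartbeat"
    return "wait"


s = 6000  # session timeout in ms, so s/3 = 2000 and 2s/3 = 4000
print(client_action(idle_ms=2000, silent_ms=500, session_timeout_ms=s))
```

Switching at 2s/3 leaves the client the remaining s/3 to reach another server before the session expires.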


The evaluation is performed on a cluster of 50 servers. For the target workloads, 2:1 to 100:1 read to write ratio, it is shown that ZooKeeper can handle tens to hundreds of thousands of transactions per second. Each client has at least 100 requests outstanding. Each request consists of a read or write of 1K of data.

As you add ZooKeeper servers, the read throughput improves, but the write throughput degrades. This is because atomic broadcast needs to be done via Zab. Also the servers need to ensure that transactions are logged to non-volatile store before sending acknowledgments back to the leader.


ZooKeeper provides a minimalist and flexible coordination system and has found a lot of use in production distributed systems. ZooKeeper scales well as read operations increase, but not as write operations increase. Write throughput also does not scale as more ZooKeeper replicas are added. To alleviate this, observer replicas are used, but they are limited in operation and do not help write operations. Finally, due to the very large latencies involved, ZooKeeper cannot handle deploying its servers across a WAN.

In most places ZooKeeper is punting the ball to the clients. Yes, this is due to minimalistic design and such, but this burdens the clients to solve the transactional update themselves, and we know that this is error-prone. Maybe this is really the way to go. Or maybe this is the soft-belly of ZooKeeper and a big opportunity to provide a new coordination tool.

ZooKeeper is a great start, but we are just at the beginning.

Exercise questions

How does ZooKeeper implement/provide ephemeral nodes?

How can you implement distributed counters without using sequential flag?

How can you implement general purpose transactions on ZooKeeper?
Is ZooKeeper enough to implement general transactions? What is missing?

Why not use ZooKeeper for serializing and reliably storing all data? Why is it a bad idea to use ZooKeeper for maintaining application logs?

Related links

High-availability distributed logging with BookKeeper
Apache Curator project maintains most common ZooKeeper client algorithms

Sunday, August 31, 2014

Sudoku and research

I got addicted to Sudoku... again. When I have a chance to rest, I enjoy the challenge of solving Sudoku puzzles (*cough* at the expert level) on my iPhone. I think this practice gives my mind sharpness and clarity (my wife would roll her eyes so hard if she heard me say this :-).

I was recently thinking of how the process of solving a Sudoku puzzle resembles doing research.

1) Sequencing is important. In Sudoku, you take things step by step. You fill out the obvious cells first. Having filled these, you now have more constraints/clues upon which you can fill in other blocks. You have to solve Sudoku step by step from most obvious to what is made obvious having finished that step.

This is also the case in research. You can't rush things; you should start with the simple steps. First you have to attack/complete what you can do currently, so that more things can become available for you to see. You have to climb the stairs step by step to see more and do more.

2) Writing is important. You take notes on the Sudoku cells to give you clues, e.g., 4 can go into this cell or this cell. These clues eventually lead to constraints and to solutions. Without taking notes, you wouldn't be able to make any progress on hard Sudoku puzzles.

You are all computer scientists.
You know what FINITE AUTOMATA can do.
You know what TURING MACHINES can do.
For example, Finite Automata can add but not multiply.
Turing Machines can compute any computable function.
Turing machines are incredibly more powerful than Finite Automata.
Yet the only difference between a FA and a TM is that
the TM, unlike the FA, has paper and pencil.
Think about it.
It tells you something about the power of writing.
Without writing, you are reduced to a finite automaton.
With writing you have the extraordinary power of a Turing machine.
(From Manuel Blum's advice to graduate students)

Similarly, writing is very important for research. It leads the way for you. You start writing as you start the research work, and before you do the work/experiments. I think I said this many times before, so I will leave this at that. (How I write, How to write your research paper, My advice to graduate students)

3) Perspective is important. In Sudoku, when you are stuck, you change your perspective and look for alternatives, because there is always another easier way to look at the situation and get unstuck.
A change in perspective is worth 80 IQ points.
Alan Kay.

(Again from Manuel Blum's advice to graduate students)
CLAUDE SHANNON once told me that as a kid, he remembered being stuck on a jigsaw puzzle.
His brother, who was passing by, said to him:
"You know: I could tell you something."
That's all his brother said.
Yet that was enough hint to help Claude solve the puzzle.
The great thing about this hint... is that you can always give it to yourself !!!
I advise you, when you're stuck on a hard problem,
to imagine a little birdie or an older version of yourself whispering
"... I could tell you something..." 
I once asked UMESH VAZIRANI how he was able,
as an undergraduate at MIT,
to take 6 courses each and every semester.
He said that he knew he didn't have the time to work out his answers the hard way.
He had to find a shortcut.
You see, Umesh understood that problems often have short clever solutions.

In research, ... yup, you need to learn to change your perspective, and try different points of view.

4) Finally perseverance is important. In Sudoku, you learn patience and perseverance as you consider different things to make some progress. As you put in more time practicing Sudoku puzzles, you start noticing patterns, and you learn to solve those cases faster. You also develop intuition, which makes you get better at sequencing. In research, patience, perseverance, and practice are also essential.
Whatever you do, you got to like doing it....
You got to like it so much that you're willing to think about it, work on it, long after everyone else has moved on.
(Again from Manuel Blum's advice to graduate students)

Heeding my own warning on reverse scooping, I googled for "Sudoku and research" and found this nice post, which has made similar connections.
After doing a few [Sudoku puzzles], it struck me that these puzzles are a good analogy for the way science research is done. Thomas Kuhn in his classic book The Structure of Scientific Revolutions points out that normal scientific research within a paradigm is largely a puzzle solving exercise in which there is an assurance that a solution exists to the problem and that it is only the ingenuity of the scientist that stands between her and a solution. The sudoku problem is like that. We know that a solution of a particular form exists and it is this belief that makes people persevere until they arrive at a solution.