Wednesday, June 3, 2015

Book Review: "Rework", 2010, Jason Fried & David Heinemeier Hansson

This is a book from the founders of 37signals. The book is full of simple commonsense advice for business and work. Yet these advice are also very fresh. How come?? Since we have been trying/inventing/pushing complex complicated rules & approaches for business and work in lieu of simple straightforward approaches, the simple commonsense advice in the book do come across as surprisingly fresh.

Just to give one example, for culture, the book says: "You can't build it, it occurs. Whatever you value and do (actions speak louder than words), it will become the culture of your company/workers overtime." Commonsense? Yes. But only after you take a step back and think, you say "Huh. That is obvious."

Another after-the-matter obvious advice: Try to underdo your competition, just do fewer things but better. You should stand for something. A strong stand is how you attract superfans. Covering everything means you don't have focus. Provide the main functionality as simple as possible, avoid the bells and whistles, and be unashamedly proud about this.

Maybe the insight behind this advice is you should necessarily have a transformational idea. And when you have that, the auxiliary stuff do not matter that much, and may even distract from your transformational idea.

This next advice is very interesting and stands out in controversial way. The book says: Don't accept outside money for your startup. You will find enough money yourself, if you implement the less is more principle. If you can't fund it yourself, and get it prototype yourself, it is not worth doing it. You are not thinking less is more. When you get outside money, you are bound to make the sponsors/vendors happy and lose your flexibility and freedom for the following reasons: you give up control; cashing out begins to trump building a quality business; spending other people's money is addictive; it's usually a bad deal; you start building what investors what instead of what customers want; raising money is incredibly addictive.

The book got my respect when it said writing is very important. The book says: Hire the guy who is a better writer. Clear writing is a sign of clear thinking. Great writers know how to communicate, empathize, and know what to omit and what to stress.

For the writing style, the books says: When you are writing, write with your own voice. This is also a matter of honesty. Honesty is the smart policy in business. So why come of like dishonest when you write artificially. The important thing is to communicate, and you should just focus on that.

The book has so many other commonsense, simple, yet refreshing and radical advice. The book is a very good very easy read. It is written in a conversational style, and you feel like you are having a discussion with Jason and David. And, it seems like the book got very good reviews from business and thought leaders.

Wednesday, May 6, 2015

How to run effective paper reading groups

Every year, I offer a distributed systems reading group seminar, where we discuss recent interesting research papers. (Here is the list of papers we covered this Spring.)

Reading group seminars are a lot of fun when everything clicks. But they can easily turn into soul-draining boring meetings when a couple of things go wrong. Here are some common bad meeting patterns: (1) the presenter goes on and on with a dry presentation, (2) without a common background, the participants bombard the presenter with a lot of questions just to get the context of the work and a lot of time is wasted just to get started on the paper, (3) the audience drifts away (some fall into their laptop screens, some start to fiddle with their phones), and (4) in the discussion phase an awkward silence sets in and crickets chirp.

I have been trying to tinker with the format of my reading group meetings to avoid those problems and improve the chances that everything clicks together. And over time I have been learning from trial and error what works and what doesn't. Based on what I learned, here is the most recent format I propose.

There are 3 components to a successful reading group meeting: the participants, the presenter, and the discussion process.

1. The participants should come prepared.

Here are the tricks I found to be most effective for ensuring that the participants come to the group meeting having read the paper and primed for fruitful discussion.

Cover only 1 paper per meeting. Trying to cover 2 papers in one meeting does not work. It is too much work to ask the participants to read 2 papers. Also 2 papers per meeting dilutes the focus and intensity.

Utilize the conference presentation video, if applicable. If the paper has a conference presentation video that really helps. Watching that 20 minute pitch about the paper makes it much easier to read the paper than attempting a cold reading of the paper. The OSDI and SOSP conferences make all the paper presentation videos openly available, which have been of great help for our seminar group.

Solicit questions from the participants before the meeting. In order to get the participants to start thinking critically about the paper, I ask them to write their questions in Piazza 1-2 days before the paper presentation. These questions also help the presenter to focus the presentation and get prepared for the discussion.

2. The presenter should come prepared.

Believe it or not, the presenter may come to the meeting with a superficial understanding of the paper. This makes things really awkward. In order to ensure that the presenter comes ready with a deep understanding of the paper, these are what we adopted.

Ask the presenter to submit a 2-3 page review of the paper. Preparing a powerpoint presentation only creates a superficial understanding, or even a false sense of security that the presenter understands (and can communicate) the paper. In order for the presenter to understand the paper better at a deeper conceptual level, I ask the presenter to write a 2-3 page review of the paper.

The presenter prints the copies of his review and brings this to the meeting for review before his presentation. All the participants review the hardcopy of the review-report in the first 15 minutes of the meeting. So in the first 15 minutes, it is all silence. The presenter is at the podium, getting adjusted, while we review his report. I got this idea from reading about how Bezos runs meetings.

Writing the report helps the presenter to focus on the message and really understand the paper. Reading the report critically prepare the participants to benefit more from the presentation. Their minds start thinking, posing questions about the content, and they listen to the presentation more actively as a result.

Restrict the presenter to a 10-15 slide presentation maximum. The presenter should perform no more than a 10-15 slide presentation. The slides should be mostly for visuals: figures, tables, graphs.

3. The discussion process should be primed.

The discussion phase needs some priming to achieve a fruitful discussion, a critical analysis of the paper, as well as brainstorming for creative extensions to the paper.

Spare enough time for question answering. We go with a separate dedicated question answering phase in addition to clarification questions that may come during the presentation. After the presentation, the presenter starts answering questions with first those that were submitted 1-2 day in advance on Piazza. The audience members who submitted the questions read the questions loudly, the presenter answers. And then we have more questions from the floor and more comments from the floor.

Elicit hands-on participation by writing mock paper reviews. In order to keep participants engaged deeply and elicit critical thinking, I recently got the idea to perform mock paper reviews in the group meetings. I plan to implement this idea for our upcoming reading group meetings.

After the question-answer phase, I will ask participants to form groups of 3, and mock-review the paper. I will ask the groups to commit to either a very-bad/critical review of the paper or a very-good/championing review of the paper. The reason for pushing to these extremities to force ourselves to come up with strong arguments, and not give wishy-washy reviews. The groups will write the review collaboratively using Google docs. In a group, one participant may research about related work, and write that part of the review, while another may write about motivation/application aspect of the paper, and the other about the technical/methods aspects.

Another group can form to write about related research questions to the current paper, in order to come up with interesting (and secondarily actionable) directions for future work. Again the group needs to be aggressive in its effort and brainstorm to come up with "novel", albeit speculative/little-far-fetched, research ideas.

I am a big proponent of making/producing something, albeit little, as much as possible. (That's why I have this blog.) With this setup we will not only criticize/analyze a work, but will also get to quickly synthesize and produce some by products, in terms of reviews and blog posts. Sort of like a mini-hackathon, shall we say a hack-a-paper session.

Would it be possible to use this format in teaching courses? I guess, for graduate level courses with less than 20 students, this format could also be used. This format is similar to flipped classroom, but with more critical thinking and writing/producing focus. With more than 20 students, I don't think the discussion and question-answering will scale. Also it may be hard to fit all this activities in a 50 minute class; an 80 minute class would work though.

Related links
How I read a research paper
How to present your work
Tell me about your thought process, not just your results
How to write your research paper
How I write
Good Prose and How to write a lot
Advice to graduate students

Wednesday, April 29, 2015

Arrakis: The OS is the control plane

This paper (authored by Simon Peter, Jialin Li, Irene Zhang, Dan R. K. Ports, Doug Woos, Arvind Krishnamurthy, and Thomas Anderson, University of Washington; Timothy Roscoe, ETH Zürich) was awarded a best paper award in OSDI 2014. 

The paper "described and evaluated Arrakis, a new operating system designed to remove the kernel from the I/O data path without compromising process isolation. Unlike a traditional operating system, which mediates all I/O operations to enforce process isolation and resource limits, Arrakis uses device hardware to deliver I/O directly to a customized user-level library. The Arrakis kernel operates in the control plane, configuring the hardware to limit application misbehavior."

The Arrakis paper avoids mentioning containers, but what they propose has a lot of applicability to the containers technology. Containers aim to provide isolation/portability of VM without incurring the overhead of VMs. So containers run an application set on the OS and raw metal with better performance instead of running it on a VM layer. Arrakis is providing OS level technology to improve efficiency for the same goal.

The Arrakis approach is also closely related to the ExoKernel and MicroKernel approach. Containers, ExoKernel, Xen Unikernel, and the Arrakis project form a spectrum from monolithic to microkernel OS. It seems like Tanenbaum will have the last laugh.

Hardware support

Arrakis exploits hardware support provided for Virtual-Machine-level virtualization, and pushes further and implements virtualization at the application (or potentially at the container) level. Arrakis is built on Barrelfish, which already supports standalone user-mode device drivers, akin to found in microkernels. The paper argues that with some modifications the idea can be brought to Linux as well.

This is what Arrakis requires from the hardware:
"Arrakis assumes the network devices provide support for virtualization by presenting themselves as multiple virtual network interface cards (VNICs) and that they can also multiplex/demultiplex packets based on complex filter expressions, directly to queues that can be managed entirely in user space without the need for kernel intervention. Similarly, each storage controller exposes multiple virtual storage interface controllers (VSICs) in our model. Each VSIC provides independent storage command queues (e.g., of SCSI or ATA format) that are multiplexed by the hardware. Associated with each such virtual interface card (VIC) are queues and rate limiters."

"Network cards that support SR-IOV have the key elements of this model: they allow the creation of multiple VNICs that each may have multiple send and receive queues, and support at least rudimentary transmit and receive filters."

"Storage controllers have some parts of the technology needed to provide the interface we describe. For example, RAID adapters have a translation layer that is able to provide virtual disks above physical extents, and SSDs use a flash translation layer for wear-leveling. SCSI host-bus adapters support SR-IOV technology for virtualization and can expose multiple VSICs, and the NVMe standard proposes multiple command queues for scalability."

Tuesday, April 28, 2015

Large-scale cluster management at Google with Borg

This paper is by Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, David Oppenheimer, Eric Tune, and John Wilkes and it appeared recently in EuroSys 2015.

Google's Borg is a cluster manager that admits, schedules, starts, restarts, and monitors all applications that Google runs. Borg runs 100K of jobs across a number of clusters each with 10K of machines.

Borg cells (1000s of machines that belong to a single cluster and are managed as a unit) run a heterogenous workload with two main parts. The first is long-running services that should never go down, and handle quick requests: e.g., Gmail, Google Docs, web search, BigTable. The second is user submitted batch jobs. Each job consists of multiple tasks that all run the same program (binary).

Each task maps to a set of Linux processes running in a container on a machine. The vast majority of the Borg workload does not run inside virtual machines (VMs) in order to avoid the cost of virtualization. Containers are so hot right now.


Each cell's Borgmaster consists of two processes: the main Borgmaster process and a separate scheduler.

The Borgmaster process handles client RPCs to create, edit, view job, and also communicates with the Borglets to monitor/maintain their state. (The Borglet is a machine-local Borg agent that starts, stops, restarts tasks at a machine. The Borgmaster polls each Borglet every few seconds to retrieve the machine's current state and send it any outstanding requests.) The Borgmaster process is logically a single process but is actually Paxos replicated over 5 servers.

When a job is submitted, the Borgmaster records it in Paxos and adds the job's tasks to the pending queue. This is scanned asynchronously by the scheduler, which assigns tasks to machines if there are sufficient available resources that meet the job's constraints. The scheduling algorithm has two parts: feasibility checking, to find machines on which the task could run, and scoring, which picks one of the feasible machines. If the machine selected by the scoring phase doesn't have enough available resources to fit the new task, Borg preempts (kills) lower-priority tasks, from lowest to highest priority, until it does.


"Centralized is not necessarily less scalable than decentralized" is a pet pieve of mine. So, I went all ears when I read this section. The paper said: "We are not sure where the ultimate scalability limit to Borg's centralized architecture will come from; so far, every time we have approached a limit, we've managed to eliminate it."

One early technique they used for scalability of the Borgmaster is to decouple the Borgmaster into a master process and an asynchronous scheduler. A scheduler replica operates on a cached copy of the cell state from the Borgmaster in order to perform a scheduling pass to assign tasks. The master will accept and apply these assignments unless they are inappropriate (e.g., based on out of date state), just like in optimistic concurrency control (OCC). To improve response times, they added separate threads to talk to the Borglets and respond to read-only RPCs.

A single Borgmaster can manage many thousands of machines in a cell, and several cells have arrival rates above 10000 tasks per minute. A busy Borgmaster uses 10–14 CPU cores and up to 50 GiB RAM.

In order to achieve the scalability of the scheduler, Borg employs score caching, grouping & treating tasks in equivalence classes, and performing relaxed randomization (basically sampling on machines). These reduced the scheduling time of a cell's entire workload from scratch from 3 days to a few 100s of seconds. Normally, an online scheduling pass over the pending queue completes in less than half a second.

Related work

There is the Apache Mesos project, which originated from a UC Berkeley class project. Mesos formed the basis for Twitter's Aurora, a Borg-like scheduler for long running services, and Apple's Jarvis, which is used for running Siri services. Facebook has Tupperware, a Borg-like system for scheduling containers on a cluster.

AWS has ECS (EC2 Container Service) for managing jobs running on clusters. ECS has a state management system that runs Paxos to ensure a consistent and highly available view of the cluster state. (similar to the Borgmaster process). Instead of one scheduler, ECS employs distributed schedulers each interacting with the state management system. Each scheduler is responsible for a separate set of workers in order to avoid too many conflicts in scheduling decisions.

Microsoft has the Autopilot system for automating software provisioning, deployment, and system monitoring. Microsoft also uses the Apollo system   for scheduling which tops-off workers opportunistically with short-lived batch jobs to achieve high throughput, with the cost of causing (occasionally) multi-day queueing delays for lower-priority work.

Kubernetes is under active development by many of the same engineers who built Borg. Kubernetes builds/improves on Borg. In Borg, a major headache was caused due to using one IP address per machine. That meant Borg had to schedule ports as a resource coordinate with tasks to resolve port conflicts in the same machine. Thanks to the advent of Linux namespaces, VMs, IPv6, and software-defined networking, Kubernetes can take a more user-friendly approach that eliminates these complications: every pod and service gets its own IP address. Kubernetes is opensource.


Borg is all about scheduling computation but does not get into any data scheduling, transfer scheduling issues. Data (and data transfer) should also be treated as first class citizen in scheduling decisions, as with big data comes big costs and big delays. Wouldn't it be nice to have a data-scheduler/manager system collaborating with Borg help run a more efficient data center?

Thursday, April 23, 2015

Paper Summary: On the use of Clocks to Enforce Consistency in the Cloud

This paper is by Manuel Bravo, Nuno Diegues, Jingna Zeng, Paolo Romano, Luis Rodrigues, and appeared in IEEE Data Engineering Bulletin 2015.

The purpose of this paper is to revisit how the logical and physical clock concepts are applied in the context of developing distributed data store systems for the cloud and review the choice of clocks in relation to consistency/performance tradeoffs.

The use of clocks in weak consistency data stores 

Dynamo employs sloppy quorums and hinted hand-off and uses version vector (a special case of vector clocks) to track causal dependencies within the replication group of each key. A version vector contains one entry for each replica (thus the size of clocks grows linearly with the number of replicas). The purpose of this metadata is to detect conflicting updates and to be used in the conflict reconciliation function.
Here is a link to my Dynamo review.

COPS is a geo-replicated datastore and it assigns a scalar clock to each object. Clients maintain the last clock value of all objects read in the causal past. Updates piggyback their dependencies when being propagated to other data centers. When a data center receives an update propagated by another data center, it only makes it visible when its dependencies are satisfied. COPS provides a partial form of transactions called causally consistent read-only transactions which return versions of the read objects that belong to a causally consistent snapshot. A two-round protocol implements these transactions. In the worst case the list of dependencies can grow and slow down the system.
Here is a link to my COPS review.

The GentleRain protocol aims to reduce the metadata piggybacked on updates propagation and to eliminate dependency checking procedures. The idea is to only allow a data center to make a remote update visible once all partitions (within the data center) have seen all updates up to the remote update time stamp. Thus, a client that reads a version is automatically ensured to read causally consistent versions in subsequent reads without the need of explicitly checking dependencies or being forced to wait until a causally consistent version is ready. In other words, GentleRain shoehorns causality in to physical clocks by delaying updates.

ORBE uses vector clocks, organized as a matrix, to represent dependencies. The vector clock has an entry per partition and data center. Physical clocks are used for generating read snapshot times, and ORBE can complete read-only transactions in one round by relying on physical clocks.

The use of clocks in strong consistency data stores

Clock-SI assumes loosely synchronized clocks that only move forward, and provides Snapshot Isolation consistency, where read-only transactions read from a consistent (possibly multi-versioned) snapshot, and other transactions commit if no object written by them was also written concurrently. To ensure safety against clocks skews, Clock-SI introduces delays to read operations.  Here is a link to my Clock-SI review.

Google Spanner employs TrueTime (which employs GPS and atomic clocks), and provides a strong consistency property: external consistency, which is also known as strict serializability. To ensure safety against clock skews, Spanner also introduces delays to read operations, and also delays commits in update operations to provide strict serializability. Here is a link to my Spanner review.

Finally CockroachDB, an open-source clone of Spanner, employs Hybrid Logical Clocks (HLC) in order to serialize transactions and ensure Snapshot Isolation and Serializable Snapshot Isolation. Hybrid Logical Clocks (HLC) is my recent work in collaboration with Dr. Sandeep Kulkarni. HLC couples physical clock with a scalar logical clock in order to efficiently order causally related transactions whose uncertainty intervals overlap. Here is a link to our Hybrid Logical Clocks (HLC) work.

I am quoting from the "On the use of Clocks to Enforce Consistency in the Cloud" paper about HLC: "Unlike Spanner, CockroachDB does not assume the availability of specialized hardware to ensure narrow bounds on clock synchronization, but relies on conventional NTP-based clock synchronization that frequently imposes clock skews of several tens of milliseconds. HLC is hence particularly beneficial in this case, at it allows for ensuring external consistency across causally related transactions while sparing from the costs of commit waits."


The paper has an intriguing discussion section. It makes the observation that we do not fully understand the trade-offs between logical and physical clocks yet, and mentions that HLC is an interesting and promising approach to investigate these tradeoffs. It gives some comparisons of the above protocols to show that time (in terms of its precision and comprehensiveness) is a resource that can be a factor in the performance and consistency tradeoffs in distributed data stores. The paper also talks about the costs of totally-ordered versus concurrent operations in distributed datastores. I found that this discussion make similar points with my "distributed is not necessarily more scalable than centralized" post.

Use of clocks in distributed datastores for consistency/performance tradeoffs is certainly an interesting and fruitful research area nowadays.

So how does your favorite data store use clocks/version-stamps? How would changing to a different clock scheme affect performance versus consistency tradeoffs in that data store?

Earlier I had discussed about the use of clocks in Granola, and how upgrading to HLC can improve performance and throughput.

Wednesday, April 22, 2015

Paper summary: A Taxonomy of Partitioned Replicated Cloud-based Database Systems

This paper is by Divy Agrawal, Amr El Abbadi, and Kenneth Salem, and appeared in IEEE Data Engineering journal in 2015.

This paper proposes a taxonomy of large scale partitioned replicated transactional databases. Partitioned replicated means, the database is divided in partitions and the partitions are replicated across different sites as in Figure 1. The motivation for partitioning is scalability, and the motivation for replication is to enable high availability even when some of the replicas are down. For geo-replicated databases, sites are maintained at different datacenters/regions, although the paper surveys non-geo-replicated databases as well.

The taxonomy

The taxonomy is based on the relationship between transaction management and replica management. This paper considers transactions that provide one-copy serializability guarantee, where concurrent transactions behave as if they execute sequentially on a single database. For a partitioned database, it is necessary to coordinate the database partitions to enforce transactional (ACID) guarantees. In addition, in order to support replication, the database system must also synchronize the database replicas so that replication can be hidden from the application.

Figure 2 shows the proposed taxonomy. Replicated object systems is a single leaf. Replicated transactions systems further divide in to symmetric and asymmetric replicated systems.

Replicated object systems

Replicated object systems implement transaction management on top of replica management, which ensures that partitions are consistently replicated across sites. A prominent example of this category is Spanner, another is Granola (which is not geo-replicated).

If you like to refresh your knowledge of these systems, here is a link to my Spanner review, and here a link to my Granola review.

A tablet in Spanner is a collection of directories or fragments of directories. Tablets correspond to partitions of the database and they are replicated across sites. Spanner uses Paxos to synchronize the replicas of each partition across sites. Spanner uses a separate instance of Paxos, with a long-lived leader, for each partition.

To implement transactions (including multiple partition transactions) Spanner uses two-phase locking for concurrency control, and two-phase commit. The Paxos leader in each partition is responsible for participating in these protocols on behalf of that partition. It does so in much the same way that it would if its partition were not replicated, except that changes to the partition's state are replicated using Paxos instead of just being stored locally at the leader's server. The leaders serve as the link between the transaction protocols and Paxos by ensuring that both database updates and changes in the state of the transaction are replicated.

Replicated transaction systems

Replicated transaction systems implement replica management on top of transaction management. Transactions run over individual partition replicas, rather than one logical partition as was the case in replicated object systems.

Each site has a local transaction manager that ensures that its local transactions have ACID properties with respect to other local transactions at that site. Each local transaction is responsible for applying the effects of its parent global transaction to the replicas at its own local site.

In symmetric replicated transaction systems, all of the local transactions of a given parent transaction are the same and run concurrently at the different sites. In the UCSB's replicated commit protocol the global transaction will be committed if the local coordinators at a majority of the sites vote to commit it. In return a client that wishes to read data from a partition sends its read request to all replicas of that partition, waits for a majority of the replicas to respond, and chooses the latest version that it receives from that majority. (Similar to the idea in Attiya, Bar-Noy, and Dolev, 1995.)

In an asymmetric replicated system, one local master transaction runs first at a single site. If that local transaction is able to commit, then the remaining local transactions are run at the other sites. These remaining transactions are called update propagation transactions. Typically, the update propagation transactions perform only the updates of the master transaction, and do not perform any reads.

An example of an asymmetric replicated primary copy system is Microsoft's Cloud SQL Server, which is the database management system behind the SQL Azure cloud relational database service. Cloud SQL server is not designed for geo-replication, so all of database replicas ("sites") are located in the same datacenter. Transactions are limited to a single partition unless they are willing to run at the read committed SQL isolation level.

An example of an asymmetric replicated update anywhere system is Google Megastore, where database partitions (called entity groups) are replicated and geographically distributed. In Megastore, a client can initiate a single-partition transaction at any replica of that partition --typically, the client will use a nearby replica. For each partition, Megastore manages a transaction log, which is replicated to all sites using Paxos to ensure that transactions commit in the same order everywhere. Here is a link to my Megastore review.

I guess Yahoo PNUTS could be considered somewhere between primary copy and update anywhere system due to its per-record master scheme.

What's next?

This taxonomy is useful for thinking about the cloud-based transactional database systems in a more systematic way. So where does your favorite transactional distributed database system fit? Are there inherent limitations/strengths to one category over another? Is it possible to have efficient multi-partition transaction capability for asymmetric replicated transaction systems?

Saturday, April 18, 2015

GraphX: Graph processing in a distributed dataflow framework

This paper appeared in OSDI'14, and is authored by Joseph E. Gonzalez, University of California, Berkeley; Reynold S. Xin, University of California, Berkeley, and Databricks; Ankur Dave, Daniel Crankshaw, and Michael J. Franklin, University of California, Berkeley; Ion Stoica, University of California, Berkeley, and Databricks. This link includes video and slides which are useful to understand the paper.

This paper comes from the AMP lab at UC Berkeley. (Nice name! AMP stands for Algorithms, Machines, and People.) This lab brought to us Spark, GraphLab, PowerGraph. And this paper is a logical successor. This paper is about marrying Spark (dataflow systems) with GraphLab (graph-processing systems).


Here is the motivation for this merger. In large-scale computation, we need both dataflow processing and graph processing systems. Graph-processing systems outperform dataflow processing systems by an order of magnitude for iterative computations on graphs (e.g., connected-component analysis, PageRank analysis). Unfortunately, it is very cumbersome to use two different tools and convert data back and forth between the two. The pipeline becomes very inefficient.

The paper sees an opportunity to unify the two tools (using a narrow-waist data/graph representation in the form of mrTriplets) and provide a single system to address the entire analytics pipeline.

GraphX is actually a thin abstraction layer on top of Spark that provides a conversion from graph computation to dataflow operations (Join, Map, GroupBy). During this reduction from graph computation to dataflow patterns, GraphX applies optimizations based on lessons learned in earlier work on efficient graph-processing (e.g., GraphLab).


GraphX introduces a range of optimizations.

As the programming abstraction GraphX introduces a normalized representation of graphs logically as a pair of vertex and edge property collections. This is called the triplet view.
The GroupBy stage gathers messages destined to the same vertex, an intervening Map operation applies the message sum to update the vertex property, and the Join stage scatters the new vertex property to all adjacent vertices.  This allows GraphX to embed graphs in a distributed dataflow framework. Flexible vertex-cut partitioning is used to encode graphs as horizontally partitioned collections and match the state of the art in distributed graph partitioning.

Here vertex mirroring approach substantially reduces communication for two reasons. First, real-world graphs commonly have orders of magnitude more edges than vertices. Second, a single vertex may have many edges in the same partition, enabling substantial reuse of the vertex property.

As another optimization learned from graph-processing systems, GraphX performs active vertices tracking. In graph algorithms, as algorithm converges, the set of active vertices shrink significantly, and this optimization avoids, wasteful work. GraphX tracks active vertices by restricting the graph using the subgraph operator. The vertex predicate is pushed to the edge partitions, where it can be used to filter the triplets.

GraphX programming

While graph-processing systems, and most famously Pregel, advocated a "think like a vertex" approach to programming, the GraphX programming model is closer to thinking about transformations on data. This may require some getting used to for programmers not familiar with dataflow programming and database operations.


Comparison to Naiad

If you are familiar with the Naiad project, you might be thinking: "Well, Naiad solves the unified general purpose dataflow & graph processing problem and throws in stream-processing and dynamic graphs for good measure". (GraphX does not support dynamic graphs.) So, what are the contributions differences in GraphX over Naiad?

I am new to the dataflow systems domain, and don't know enough to give a more authoritative answer. The contributions in GraphX may be mostly in the idea and academic contributions form. I think the idea of representing graph computations back to dataflow systems is nice. Unfortunately the GraphX paper does not compare with Naiad in terms of performance. And, after the OSDI presentation, there were couple questions/complaints about this point.

GitHub page of the GraphX project

GraphX is available as opensource on GitHub.

Thursday, April 16, 2015

All file systems are not created equal: On the complexity of crafting crash-consistent applications

This paper appeared in OSDI'14 and is authored by Thanumalayan Sankaranarayana Pillai, Vijay Chidambaram, Ramnatthan Alagappan, Samer Al-Kiswany, Andrea C. Arpaci-Dusseau, and Remzi H. Arpaci-Dusseau at University of Wisconsin–Madison.

A previous OSDI'14 paper we discussed had said almost every failure is due to bad exception/error-handling. But this paper shows that even when you divine the correct error-handling/recovery code, it may still not work. The layering abstraction leaks, and the filesystem underneath may do funny things in a crash.

The paper considers an important and timely problem, because many important applications, including databases such as SQLite and key-value stores such as LevelDB, are currently implemented on top of file systems instead of directly on raw disks. Such data-management applications must be crash consistent, but achieving this goal atop modern file systems is challenging because the exact guarantees provided by file systems are unclear and underspecified.

The paper defines persistence (a better term would be consistent-persistence) as a combination of two properties: atomicity and ordering (external linearizability). Figure 2 gives an example of how persistence can be violated by a crash.

From Table 1, we observe that persistence properties vary widely among file systems, and even among different configurations of the same file system. The order of persistence of system calls depends upon small details like whether the calls are to the same file or whether the file was renamed. The datajournal configuration of the filesystems are pretty solid, but they incur an overhead in terms of performance as well.

In order to analyze application-level protocols and detect crash vulnerabilities, the authors build ALICE framework. (ALICE is available as opensource here.) ALICE detects 60 vulnerabilities in total for the 11 applications analyzed, with 5 resulting in silent failures, 12 in loss of durability, 25 leading to inaccessible applications, and 17 returning errors while accessing certain data. ALICE is also able to detect previously known vulnerabilities.

The paper is easy to read and follow. And the conference presentation does a good job of explaining the paper in an accessible manner.


Is this paper being too alarmist? If we allow our system to recover to an earlier state instead of the most recent state at crash time, would that enable us to circumvent these crash-consistency problems? (Let's say we define "earlier state" as occuring in the past enough to be successfully flashed to the filesystem state.) Even that approach may fail if the most recent state at the moment of crash overwrites it inconsistently, which would corrupt it. So there is a reason to be alarmed!

But if we use a journaling approach (e.g., an append-only log approach) to writing the critical recovery states, this problem can be avoided. I guess a write-once style storage for critical state can be implemented even at the application-level. But again we pay a cost for fault-tolerance. If you take this to an extreme (to be able to recover everything), you implement the datajournal configuration of the filesystem at the application level.

This paper provides some motivation for the self-stabilization approach. If it is hard to enforce consistency, then always be converging to the consistent states. That is what the stabilization approach prescribes.

Monday, March 30, 2015

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services

This paper appeared in OSDI'14, and is authored by Michael Chow, University of Michigan; David Meisner, Facebook, Inc.; Jason Flinn, University of Michigan; Daniel Peek, Facebook, Inc.; Thomas F. Wenisch, University of Michigan.

The goal of this paper is very similar to that of Google Dapper (you can read my summary of Google Dapper here). Both work try to figure out bottlenecks in performance in high fanout large-scale Internet services. Both work use similar methods, however this work (the mystery machine) tries to accomplish the task relying on less instrumentation than Google Dapper. The novelty of the mystery machine work is that it tries to infer the component call graph implicitly via mining the logs, where as Google Dapper instrumented each call in a meticulous manner and explicitly obtained the entire call graph.

The motivation for this approach is that comprehensive instrumentation as in Google Dapper requires standardization....and I am quoting the rest from the paper:
[Facebook systems] grow organically over time in a culture that favors innovation over standardization (e.g., "move fast and break things" is a well-known Facebook slogan). There is broad diversity in programming languages, communication middleware, execution environments, and scheduling mechanisms. Adding instrumentation retroactively to such an infrastructure is a Herculean task. Further, the end-to-end pipeline includes client software such as Web browsers, and adding detailed instrumentation to all such software is not feasible.

While the paper says it doesn't want to interfere with the instrumentation, of course it has to interfere to establish a minimum standard in the resulting collection of individual software component logs, which they call UberTrace. (Can you find a more Facebooky name than UberTrace---which the paper spells as ÜberTrace, but I spare you here---?)
UberTrace requires that log messages contain at least:
1. A unique request identifier.
2. The executing computer (e.g., the client or a particular server)
3. A timestamp that uses the local clock of the executing computer
4. An event name (e.g., "start of DOM rendering").
5. A task name, where a task is defined to be a distributed thread of control.
In order not to incur a lot of overhead, UberTrace uses a low sampling rate of all requests to Facebook. But this necessitates another requirement on the logging:
UberTrace must ensure that the individual logging systems choose the same set of requests to monitor; otherwise the probability of all logging systems independently choosing to monitor the same request would be vanishingly small, making it infeasible to build a detailed picture of end-to-end latency. Therefore, UberTrace propagates the decision about whether or not to monitor a request from the initial logging component that makes such a decision through all logging systems along the path of the request, ensuring that the request is completely logged. The decision to log a request is made when the request is received at the Facebook Web server; the decision is included as part of the per-request metadata that is read by all subsequent components. UberTrace uses a global identifier to collect the individual log messages, extracts the data items enumerated above, and stores each message as a record in a relational database.

The mystery machine

To infer the call graph from the logs, the mystery machine starts with a call graph hypothesis and refines it gradually as each log trace provides some counterexample. Figure 1 and Figure 2 explain how the mystery machine generates the model via large scale mining of UberTrace.

For the analysis in the paper, they use traces of over 1.3 million requests to the Facebook home page gathered over 30 days. Was the sampling rate enough, statistically meaningful? Figure 3 says yes.

We know that for large scale Internet services, a single request may invoke 100s of (micro)services, and that many services can lead to 80K-100K relationships as shown in Figure 3. But it was still surprising to see that it took 400K traces for the call graph to start to converge to its final form. That must be one heck of a convoluted spaghetti of services.


The mystery machine analysis is performed by running parallel Hadoop jobs.

Figure 5 is why critical path identification is important. Check the ratios on the right side.

How can we use this analysis to improve Facebook's performance?

As Figure 9 showed, some users/requests have "slack" (another technical term this paper introduced). For the users/requests with slack, the server time constitutes only a very small fraction of the critical path, which the network- and client-side latencies dominate.

And there are also a category of users/requests with no slack. For those, the server time dominates the critical path, as the network- and client-side latencies are very low.

This suggests a potential performance improvement by offering differentiated service based on the predicted amount of slack available per connection:
By using predicted slack as a scheduling deadline, we can improve average response time in a manner similar to the earliest deadline first real-time scheduling algorithm. Connections with considerable slack can be given a lower priority without affecting end-to-end latency. However, connections with little slack should see an improvement in end-to-end latency because they are given scheduling priority. Therefore, average latency should improve. We have also shown that prior slack values are a good predictor of future slack [Figure 11]. When new connections are received, historical values can be retrieved and used in scheduling decisions. Since calculating slack is much less complex than servicing the actual Facebook request, it should be feasible to recalculate the slack for each user approximately once per month.

Some limitations of the mystery machine 

This approach assumes that the call graph is acyclic. With their request id based logging, they cannot handle the same event, task pair to appear multiple times for the same request trace.

This approach requires normalizing/synchronizing local clock timestamps across computers. It seems like they are doing offline post-hoc clock synchronization by leveraging the RPC calls. (Does that mean further instrumentation of the RPC calls?)
Since all log timestamps are in relation to local clocks, UberTrace translates them to estimated global clock values by compensating for clock skew. UberTrace looks for the common RPC pattern of communication in which the thread of control in an individual task passes from one computer (called the client to simplify this explanation) to another, executes on the second computer (called the server), and returns to the client. UberTrace calculates the server execution time by subtracting the latest and earliest server timestamps (according to the server's local clock) nested within the client RPC. It then calculates the client-observed execution time by subtracting the client timestamps that immediately succeed and precede the RPC. The difference between the client and server intervals is the estimated network round-trip time (RTT) between the client and server. By assuming that request and response delays are symmetric, UberTrace calculates clock skew such that, after clock-skew adjustment, the first server timestamp in the pattern is exactly 1/2 RTT after the previous client timestamp for the task.
This work also did not consider mobile users; 1.19 billion of 1.39 billion users are mobile users.

Related links

Facebook's software architecture

Scaling Memcache at Facebook

Finding a needle in Haystack: Facebook's photo storage

Google Dapper, a Large-Scale Distributed Systems Tracing Infrastructure

Wednesday, March 25, 2015

Paper review: "Simple Testing Can Prevent Most Critical Failures: An Analysis of Production Failures in Distributed Data-Intensive Systems"

This paper appeared in OSDI'14 and is written by Ding Yuan, Yu Luo, Xin Zhuang, Guilherme Renna Rodrigues, Xu Zhao, Yongle Zhang, Pranay U. Jain, and Michael Stumm, University of Toronto.

While the title mentions "analysis of production failures", the paper is not concerned/interested in root cause analysis of failures, instead, the paper is preoccupied with problems in error handling that lead to failures.

I enjoyed this paper a lot. The paper has interesting counterintuitive results. It opens to discussion the error handling issue, and particularly the Java exception handling. Digesting and interpreting the findings in the paper will require time. To contribute to this process, here is my take on the findings ---after a quick summary of the paper.

The setup and terminology 

The paper studied 198 randomly sampled user-reported failures of 5 opensource data-intensive distributed systems: Cassandra, HBase, Hadoop Distributed File System (HDFS), Hadoop MapReduce, and Redis. Table 1 shows how the sampling is done. (As a superficial observation, it seems like HBase is very buggy where 50% of sampled failures are catastrophic, and Cassandra is well engineered.) Almost all of these systems are written in Java, so Java's exception-style error handling plays a significant role in the findings of the paper.

Definitions: A fault is the initial root cause, which could be a hardware malfunction, a software bug, or a misconfiguration. A fault can produce abnormal behaviors referred to as errors, such as Java exceptions. Some of the errors will have no user-visible side-effects or may be appropriately handled by software; other errors manifest into a failure, where the system malfunction is noticed by end users or operators.

The paper defines catastrophic failure as "failures where most or all users experience an outage or data loss". Unfortunately this is a very vague definition. Table 2 provides some example categories for catastrophic failures considered.


Table 3 shows that single event input failures are relatively low, this is probably because these systems are well-tested with unit tests as they are heavily used in production. On the other hand, the predominating case is where 2 events conspire to trigger the failures.

From Table 4, what jumps up is that starting up services is particularly problematic. Incidentally, in airplanes most fatal accidents occur during the climbing stage.

Table 5 shows that almost all (98%) of the failures are guaranteed to manifest on no more than 3 nodes, and 84% will manifest on no more than 2 nodes. For large-scale distributed systems, 3 nodes being sufficient to manifest almost all failures seems surprisingly low. Of course, this paper looks at data-intensive distributed systems, which may not be representative of general distributed systems. In any case, these numbers don't surprise me as they agree with my experience using TLA+ in verifying distributed algorithms.

Deterministic failures totaling at 74% of all failures is good news. Deterministic failures are low-hanging fruit, they are easier to fix.

Catastrophic failures

Figure 5 shows a break-down of all catastrophic failures by their error handling. Based on this figure, the paper claims that "almost all (92%) of the catastrophic system failures are the result of incorrect handling of non-fatal errors explicitly signaled in software".

But, it seems to me that this provocative statement is due to a broad/vague definition of "incorrect error handling". If you use a broad/vague definition of "incorrect landing", you can say that every catastrophic airplane failure is an incorrect landing problem. Java casts everything into an error exception, then every fault materializes/surfaces as an exception. But, does that mean if we do a good job on exception handling, there will be almost no catastrophic failures? That is an incorrect assumption. Sometimes the correction required needs to be invasive (such as resetting nodes) and the correction also counts as a catastrophic failure.

And how can we do a good job on error handling? The paper does not provide help on that. Correct error-handling is very hard: You need a lot of context and a holistic understanding of the system, and that is probably why error-handling has been done sloppily in the systems studied.

The paper also claims: "in 58% of the catastrophic failures, the underlying faults could easily have been detected through simple testing of error handling code." I can agree with the 35%, as they consist of trivial mistakes in exception handling, such as an empty error handling block. But for including the other 23% labeling them as easily detectable, I think we should exercise caution and not rule out the hindsight bias.

To prevent this 23% failures, the paper suggests 100% statement coverage testing on the error handling logic. To this end, the paper suggests that reverse engineering test cases that trigger them. At the end of the day, this boils to thinking hard to see how this can be triggered. But how do we know when to quit and when we got enough? Without a rule, this can get cumbersome.

Figure10 shows an example of the 23% easily detectable failures. That example still looks tricky to me, even after we exercise the hindsight advantage.

Speculation about the causes for poor error-handling

Maybe one reason for poor error-handling in the studied systems is that features are sexy, but fault-tolerance is not. You see features, but you don't see fault-tolerance. Particularly, if you do fault-tolerance right, nobody notices it.

But, that is a simplistic answer. After all, these systems are commonly used in production, and they are well-tested with the FindBugs tool, unit tests, and fault injection. Then, why do they still suck so badly at their exception-handling code? I speculate that maybe there is a scarcity of "expert generalists", developers that understand the project as a whole and that can write error-handling/fault-tolerance code.

Another reason of course could be that developers may think a particular exception will never arise. (Some comments in the exception handlers in the 5 systems studied hint at that.) That the developers cannot anticipate how a particular exception can be raised doesn't mean it won't be raised. But, maybe we should include this case within the above case that says there is a scarcity of fault-tolerance experts/generalists in these projects.

"Correct" error handling

While the paper pushes for 100% error handling coverage,  it doesn't offer techniques for "correct" error handling. Correct error handling requires root cause analysis, a holistic view of the system, and fault-tolerance expertise.

So there is no silver bullet. Exceptions in Java, and error handling may facilitate some things, but there is no shortcut to fault-tolerance. Fault-tolerance will always have a price.

The price of reliability is the pursuit of the utmost simplicity.
It is a price which the very rich find most hard to pay.
C.A.R. Hoare

Possible future directions

Since 2-input triggered corruptions are predominant, maybe we should consider pairwise testing in addition to unit testing to guard against them. Unit testing is good for enforcing a component's local invariants. Pairwise testing can enforce an interaction invariant between two components, and guard against 2-input triggered corruptions.

I think the main reason it is hard to write "correct error handling" code is that the exception handler doesn't have enough information to know the proper way to correct the state. This is still futuristic, but if we had an eidetic system, then when exceptions hit, we could call the corrector, which can do a root-cause analysis by doing a backwards query on the eidetic system, and after determining the problem could do a forward query to figure out what are precisely the things that need to be fixed/corrected as a result of that fault.

Thursday, March 12, 2015

My crazy proposal for achieving lightweight distributed consensus

Distributed consensus is a hard problem. See some of my previous posts for discussion about impossibility results on distributed consensus.
Paxos taught
Perspectives on the CAP theorem
Attacking Generals problem

The reason distributed consensus is hard is because the parties involved don't have access to the same knowledge (the same point of view) of the system state.  Halpern's work on knowledge and common knowledge in distributed systems provides a useful framework to explain this.  Here is a summary of common knowledge paper by Halpern, and here is a nice talk on that paper.

Of course, we have distributed consensus solutions, Paxos, ZooKeeper/ZAB, that ensure safety always and provide progress whenever the system conditions step outside the impossibility results territory (with respect to synchrony, channel reliability/reachability, and number of up nodes). But these solutions come with a performance cost, as they need serialization of all update requests from a single Paxos leader, and acknowledgement of these updates by a majority quorum of the Paxos replicas. In ZooKeeper non-leader replicas can also serve reads locally, but the updates still need to get serialized by the leader. Some Paxos variants such as Mencius, ePaxos try to relieve this problem, but the bottom-line is they are still subject to the same fundamental performance limitations due to serialization at a single leader.

I have this crazy idea for circumventing the impossibility results of distributed consensus as well as improving the performance of distributed consensus. The idea is to connect the nodes involved in consensus (let's call these nodes coordinators) with a single-collision-domain Ethernet bus in order to solve the asymmetric knowledge problem.

No, this setup does not assume reliable communication. There can be collisions in the Ethernet bus. But the collisions will be total collisions since this is a shared Ethernet bus. When there is a collision, none of the coordinators would deliver the message, including the sender. Since Ethernet on a shared bus is CSMA-CD, the transmitter also detects the collision and does not accept its own message into the consensus log if that is the case.

So, in effect, the shared Ethernet bus performs the serialization of the coordinators' proposals. As a result, a coordinator proposing an operation does not need to collect explicit acknowledgement from any other coordinator let alone from a majority quorum of coordinators. This makes this consensus algorithm very lightweight and fast.

This system is masking fault tolerant to coordinator crashes until at least one coordinator left remaining. (If we are to allow reintegrating coordinators recovering back from crash, things get complicated of course. Then we would need to assume reasonable churn to allow time for recovering coordinators to catch up before they can be integrated. This would also require fast one broadcast consensus on the reconfigurations.)

That is it. That simple. Now comes the algorithmician's apology.

On the theory side, I know I am not suggesting anything novel. The impossibility results withstand; I just changed the system conditions and stepped outside the territory of the impossibility results (that is why I used the term "circumvent"). In fact, I had noticed this idea first in the context of wireless broadcast and wireless sensor networks, when I was a post-doc at Nancy Lynch's group at MIT. We had published papers exploring the concept for wireless broadcast with total and partial collisions.

On the practical side, I know this proposal has downsides. This is not readily applicable as it requires the Ethernet driver to expose collision information to the application layer. This requires setting up an auxiliary Ethernet LAN across the coordinators. And, yes, I know this doesn't scale outside a LAN. (The coordinators should be connected by the single domain Ethernet bus, but the clients to the coordinators may communicate to the coordinators over TCP/IP and need not be in the same LAN. The coordinators can be located across different racks to guard against rack-wide crashes.)

But every system design is an exercise in determining which tradeoffs you make. The important question is: Is this tradeoff worth exploring? Would the performance improvement and simplicity stemming from this setup makes this a reasonable/feasible option for solving the distributed consensus problem at the datacenter level?

Tuesday, March 10, 2015

Eidetic systems

This paper appeared in OSDI'14. The authors are all from University of Michigan: David Devecsery, Michael Chow, Xianzheng Dou, Jason Flinn, and Peter M. Chen.

This paper presents a transformative systems work, in that it introduces a practical eidetic system implementation on a Linux computer/workstation. This paper is a tour de force: It undertakes a huge implementation effort to implement a very useful and novel eidetic memory/system service. The authors should be commended for their audaciousness.

An eidetic computer system can recall any past state that existed on that computer, including all versions of all files, the memory and register state of processes, interprocess communication, and network input. An eidetic computer system can explain the lineage of each byte of current and past state. (This is related to the concept of data provenance, which I mention briefly at the end of my review.)


One use case for an eidetic system is to track where/how erroneous information entered to the system. The paper considers tracking down a faulty bibtex reference as a case study. This is done using a backwards query. After tracking down the faulty bibtex reference you can then perform a forward query on the eidetic system, in order to figure out which documents are contaminated with this faulty information and to fix them.

Another use case for an eidetic system is to do postmortem of a hack attack and whether it leaked any important information. In the evaluation section, the paper uses as another case study the heartbleed attack, which occurred during time the authors were testing/evaluating their eidetic system implementation.

With a good GUI for querying, the eidetic system concept can enhance the Mac OSX Time Machine significantly, with data lineage/provenance, backward querying, and forward querying/correction. This can augment time travel with analytics, and you can have a time machine on steroids. (Accomplishment unlocked: +100 points for serious use of time machine and time travel in writing.)

Design and implementation

The authors develop the eidetic system, Arnold, by modifying Linux kernel to record all nondeterministic data that enters a process: the order, return values, and memory addresses modified by a system call, the timing and values of received signals, and the results of querying the system time. Arnold and accompanying eidetic system tools (for replay, etc.) are available as opensource.
The key technologies that enable Arnold to provide the properties of an eidetic system efficiently are deterministic record and replay, model-based compression, deduplicated file recording, operating system tracking of information flow between processes, and retrospective binary analysis of information flow within processes.
Arnold uses deterministic record and replay, and trades storage for recomputation whenever possible. That is, Arnold only saves nondeterministic choices or new input and can reproduce everything else by recomputation. The major space saving technique Arnold uses is model based compression: Arnold constructs a model for predictable operations and records only instances in which the returned data differs from the model. Another optimization is copy on RAW (read-after-write) recording: "To deduplicate the read file data, Arnold saves a version of a file only on the first read after the file is written. Subsequent reads log only a reference to the saved version, along with the read offset and return code." These techniques enable Arnold to fit 4 years of desktop/workstation eidetic system into 4TB of off-the-shelf hard disk (which costs $150).

Querying and Replaying

Arnold uses the replay groups abstraction to perform storing and replaying efficiently. Replay groups consist of frequently communicating processes which can be replayed independently of any other group. Arnold employs "Pin" binary instrumentation to analyze replayed executions and track the lineage of data within a replay group. Inter process communication is tracked with the help of a dependency graph which keeps track of the communications between different replay groups. Bundling frequently communicating processes into a group ensures that a large number of conversations need not be recorded to the dependency graph. As such selection of replay group (and replay group size) gives rise to a tradeoff between storage efficiency and query efficiency. It would be nice if the paper provided the replay groups it used in Arnold as a table. This information would be useful to understand the replay groups concept better.

Arnold records even user propagated lineage, such as a user reading a webpage and entering text into an editor as a result. (Of course this leads to introducing some false positives, as it needs to be done speculatively.) Tracking this actually required a lot of work: "Understanding GUI output turned out to be tricky, however, because most programs we looked at did not send text to the X server, but instead sent binary glyphs generated by translating the output characters into a particular font. Arnold identifies these glyphs as they are passed to standard X and graphical library functions. It traces the lineage backward from these glyphs using one of the above linkages (e.g., the index linkage)."

Finally, for the querying of Arnold, the paper has this to say. "A backward query proceeds in a tree-like search, fanning out from one or more target states. The search continues until it is stopped by the user or all state has been traced back to external system inputs. As the search fans out, Arnold replays multiple replay groups in parallel. In addition, if no lineage is specified, it may test multiple linkages for the same group in parallel, terminating less restrictive searches if a more restrictive search finds a linkage."

Unfortunately, user-friendly GUI-based tools for querying is not available yet. That would be asking too much from this paper which already packed a lot of contributions into a single publication. The evaluation section gives some results about backward and forward querying performance in Arnold.

Related work on data provenance

Data provenance is a topic which has been studied as part of the database field traditionally. However, recent work on data provenance started considering the problem of capturing provenance for applications performing arbitrary computations (not resricted to a small set of valid transformations in database systems). The paper "A primer on provenance" provides a nice accessible survey of data provenance work.

Future work

This paper presents an eidetic system on a single computer. An obvious future direction is to enable building an eidetic distributed system. By leveraging Arnold, such a system also seems to be in reach now. Our work on hybrid logical clocks can also help here by relating and efficiently tracking causality across distributed nodes running Arnold. Since our hybrid logical clocks can work with loosely synchronized time (a la NTP), and is resilient to uncertainty (it enables efficient tracking of causality without blocking for synchronization uncertainties), it can be adopted for implementing a distributed eidetic system in practice.

A remaining kink for a distributed eidetic system could be the cost of querying. Querying and replay is already slow and hard for a single eidetic system, and it is likely to become more complicated for a distributed system since coordination of replay is needed across the machines involved in the replay.

Thursday, March 5, 2015

Extracting more concurrency from Distributed Transactions

This paper appeared in OSDI'14. The authors are: Shuai Mu, Yang Cui, Yang Zhang, Wyatt Lloyd, Jinyang Li. The paper, presentation slides and video are accessible here.

The paper proposes a concurrency control protocol for distributed transactions, and evaluates its performance comparing with two-phase locking (2PL) and optimistic concurrency control (OCC).

The protocol

The protocol introduced, ROCOCO (ReOrdering COnflicts for COncurrency) is targeted for extracting more concurrency under heavily contended workload than 2PL and OCC can handle. In fact, ROCOCO's improvements over OCC and 2PL comes after the peak throughput point of even ROCOCO. One of the questions after the OSDI presentation was about this. "That region where the system is thrashing is not a region you want to be. Why would you not employ admission control to refuse extra workload that pushes the system past peak performance?"
ROCOCO assumes that a distributed transaction consists of a set of stored procedures called pieces. Each piece accesses one or more data items stored on a single server using user-defined logic. Thus, each piece can be executed atomically with respect to other concurrent pieces by employing proper local concurrency control.

ROCOCO achieves safe interleavings without aborting or blocking transactions using two key techniques: 1) deferred and reordered execution using dependency tracking; and 2) offline safety checking based on the theory of transaction chopping.

ROCOCO's transaction reordering idea is adopted from the ePaxos protocol introduced a couple years ago. This is a neat idea. The first phase is sort of like a dry run for the transaction. Dependencies with other concurrently executing transactions are learned. In the second phase, the dependent transactions are synchronized. They are forced to wait for each other and executed that way.

This approach is basically pipelining the transactions. This is also similar to what the Calvin does with its log-based approach. Pipelining helps for throughput of course, but it also introduces a drawback.

Unlike 2PL and OCC which executes a depended-upon transaction to completion before allowing its dependent/conflicting transactions to execute, ROCOCO is deciding on an order and pipelining the execution of these conflicting transactions in some determined order. However, if the first transaction of these pipeline-executed transactions does not complete for some reason or due to a fault and needs to be rolled back, this also requires rolling-back the remaining transactions in the pipeline that depended on this transaction. This is a problem 2PL and OCC did not have.

This basic reordering protocol is when some of the transaction pieces/fragments are deferrable. For transaction fragments that are immediate (whose outputs are inputs to other pieces in a transaction), the reordering protocol is inapplicable, and the paper uses an offline checker to avoid conflicts in such situations.  The Offline checker works in following steps:
1. It constructs an S-C graph based on transaction chopping. Each edge in the graph is either a Sibling edge (an edge formed for pieces of same transaction instance) or a Conflict edge (an edge formed by pieces which access the same database table and any one of the piece issues a write).
2. Each vertex in the graph is either tagged as immediate (I) or deferrable (D). A conflict edge  can be an I-I edge or a D-D edge.
3. The checker observes all the S-C cycles formed by the graph. SC-cycles represent potential non-serializable interleavings. However, if an SC-cycle contains at least one D-D edge, ROCOCO can reorder the execution of the D-D edge's pieces to break the cycle and ensure serializability. For an unreorderable SC-cycle with all I-I C-edges, the checker proposes to merge those pieces in the cycle belonging to the same transaction into a larger atomic piece. ROCOCO relies on traditional distributed concurrency control methods such as 2PL or OCC to execute merged pieces.


ROCOCO is  implemented as an in-memory key-value store with 20K C++ code  and evaluated using a scaled TPC-C benchmark in comparison to OCC and 2PL. Given that Calvin  is a closely related work (because it also orders transactions and pipelines their execution), it would be good to see a comparison to Calvin, but the paper does not include that.

Some miscellaneous thoughts

In the introduction, the following paragraph is provided as a motivation for ROCOCO.
Unfortunately, contention is not rare in large-scale OLTP applications. For example, consider a transaction where a customer purchases a few items from a shopping website. Concurrent purchases by different customers on the same item create conflicts. Moreover, as the system scales—i.e., the site becomes more popular and has more customers, but maintains a relatively stable set of items—concurrent purchases to the same item are more likely to happen, leading to a greater contention rate.
The OSDI presentation also includes the same example as motivation. I don't think this is the right/best way to argue that contentions will happen, because this is a faux contention, and not an inherent contention. When the number of a sale item is very high (which is almost always the case), why do we care to carefully check the number of items remaining? Conflict-Free Replicated Data Types (CRDT) approach helps here to avoid conflicts easily. (For some nice papers on this see: CRDT1, CRDT2, CRDT3) The coordination avoidance in distributed databases paper also argues for  avoiding coordination when all local commit decisions are globally valid (in other words, when the commit decisions are composable).

Sunday, March 1, 2015

My trick for coordinating Dropbox collaborations

There are high-tech solutions to managing the collaborative paper writing process, such as using version control systems like CVS or git. The downside to these systems is that when you are collaborating on a paper with a low-tech author, it is cumbersome to get them on board with these tools. Some of my collaborators have been from a non-CS background. And even some CS-background collaborators refuse to deal with cryptic version control commands and error messages.

Dropbox provides a simple easy-to-use solution to sharing files which helps  teams to collaborate on projects, including collaborating on a paper. However, Dropbox does not have access control and cannot coordinate concurrent accesses to the same file by multiple writers. This causes overwritten/lost updates and frustrated collaborators.

I have a low-tech solution to this problem. I have used this solution successfully with multiple collaborators over the last 2-3 years. Chances are that, you have also come up with the same idea. If not, feel free to adopt this solution for your collaborations. It helps.

Here is the solution. When I share a folder to collaborate with coauthors, I create a token.txt file in that folder. This file is there to coordinate who edits which file at any given time.

Below is a sample token.txt file contents. The first part explains the guidelines for coordination, the second part lists the files to collaborate on. We generally use one file to correspond to one section of the paper.

Check with token.txt file to see who is editing which file in order to avoid conflicts.
If you want to write:
+ pick a section/file,
+ reopen token.txt to avoid seeing old-state,
+ write your name next to that section/file at token.txt to get the lock/token,
+ quickly hit save on token.txt, and
+ start editing that section/file on your own pace.

When done writing:
+ delete your name for that section/file at token.txt. (I.e., Unlock that file.)

Don't keep unlocked files open in your editor, lest you inadvertently save them over a newer version your collaborator wrote. An editor that auto-saves may also cause that problem.
eval: ...........Ailidani

ALL FILES: This file is always with Murat! Treat this as read-only, don't edit.

The token.txt file acts as a message board to show who is working on which file. Of course there can be concurrent updates to token.txt, but this is less likely because you make a tiny edit and then quickly save this file. Moreover, many editors, including Emacs, notifies you immediately if the token.txt file has been changed during the time you keep it open.

(This is in essence similar to the use of RTS-CTS messages in wireless communications. Instead of having a more costly collision for a long-size data transfer, it is an acceptable tradeoff to have occasional collided/lost RTS messages, because those are very short transfers and are low-cost.)

Yes, this is a lazy low-tech solution and requires some discipline from collaborators, but the effort needed is minimal, and I haven't got any bad feedback from any collaborator on this. During the time I used this method, we never had a problem with concurrent editing on the same file. (I used this with teams of size 2-to-6, and only for collaborating on papers.) One problem, however, we occasionally had was with someone forgetting to remove a lock after he is done working on that file. In this case, we resort to email: "Are you still holding the lock on this file? When do you plan to release it?"

Notice the "ALL FILES" entry at the end. That is when one author may want to lock all the files for a short duration easily. This happens infrequently, but comes handy for restructuring the paper, making fast substitution edits across all files, or spell-checking the entire paper before submission.

You may also notice a entry at the bottom. That is an org-mode file where I keep my TODO items and notes about the paper. It is my lab-notebook for this writing project. I can't function in any project without my file.

PS: Geoffrey Challen suggests: "You could also replace Murat's token.txt file with a Google Drive document."