Wednesday, July 5, 2017

Paper Summary: Two decades of recommender systems at Amazon.com

This is a short article that appeared as a retrospective piece for the 2003 "Item-to-Item Collaborative filtering" paper as it was awarded a test-of-time award. This article is by Brent Smith and Greg Linden. 

I am not a machine-learning/data-mining guy, so initially I was worried I wouldn't understand or enjoy the article. But this was a very fun article to read, so I am writing a summary.

Item-based collaborative filtering is an elegant algorithm that changed the landscape of collaborative filtering, which had been user-based until then. User-based means "first search across other users to find people with similar interests (such as similar purchase patterns), then look at what items those similar users found that you haven't found yet". Item-based is built on the idea that "people who buy one item are unusually likely to buy the other." So, for every item i1, we want to find every item i2 that was purchased with unusually high frequency by people who bought i1.

The beauty of the approach is that most of the computation is done offline. Once the related-items table is built, we can generate recommendations quickly as a series of lookups. Moreover, since the number of items sold is much smaller than the number of users, the approach scales better as the number of users grows.
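To make the offline-table-plus-online-lookup idea concrete, here is a minimal sketch in Python (my own illustration with made-up data structures, not Amazon's code); it only counts raw co-occurrences and postpones the "unusually likely" scoring discussed below.

from collections import defaultdict
from itertools import combinations

def build_related_items(orders):
    """Offline step: count, for every pair of items, how many
    customers bought both. `orders` maps customer -> set of items."""
    cooccur = defaultdict(lambda: defaultdict(int))
    for items in orders.values():
        for i1, i2 in combinations(sorted(items), 2):
            cooccur[i1][i2] += 1
            cooccur[i2][i1] += 1
    return cooccur

def recommend(cooccur, item, k=5):
    """Online step: a recommendation is just a table lookup."""
    related = cooccur.get(item, {})
    return sorted(related, key=related.get, reverse=True)[:k]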

This was implemented for Amazon.com for recommending related products (mostly books at that time). Since 2003, item-based collaborative filtering has been adopted by YouTube and Netflix, among others.

Defining related items

This section was tricky and fun. Statistics is not a very intuitive area, at least for me. While reading this section I came up with proposals to fix things, thought they would work, and was wrong. Twice.

To define related, we should define what it means for Y to be unusually likely to be bought by X's buyers. And to figure this out, we should first figure out the reverse: how many of X's buyers would we expect to buy Y if the two items were unrelated?

The straightforward way to estimate the number of customers, Nxy, who have bought both X and Y would be to assume X's buyers had the same probability, P(Y) = |Y_buyers|/|all_buyers|, of buying Y as the general population, and to use |X_buyers| * P(Y) as the estimate, Exy, of the expected number of customers who bought both X and Y. In fact, the original 2003 algorithm used this ratio.

But this estimate is misleading, because for almost any two items X and Y, customers who bought X will be much more likely to buy Y than the general population. "Heavy buyers" are to blame for this: we have a biased sample. For any item X, the set of customers who bought X contains many heavy buyers by definition, and they will have bought Y more often than the general population.

Figure 1 shows how to account for this effect.


Now, knowing Exy, we can use it to evaluate whether Nxy, the observed number of customers who bought both X and Y, is higher or lower than we would expect by chance. For example, Nxy-Exy gives an estimate of the number of non-random co-occurrences, and [Nxy-Exy]/Exy gives the percent difference from the expected random co-occurrence.

In another surprise, neither of those works quite well. The first is biased toward popular Ys, and the second makes it too easy for low-selling items to get high scores. The chi-square score, $[N_{xy}-E_{xy}]/\sqrt{E_{xy}}$, strikes the balance.
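As a sketch of the scoring (my own illustration, not code from the article), assuming we already have the per-item buyer counts and the pairwise co-occurrence counts from the offline pass above:

import math

def related_score(n_xy, n_x_buyers, n_y_buyers, n_all_buyers):
    """Score how unusually often Y is bought by X's buyers.
    E_xy is the count expected if X and Y were unrelated; note this
    simple estimate ignores the heavy-buyer correction of Figure 1."""
    p_y = n_y_buyers / n_all_buyers
    e_xy = n_x_buyers * p_y
    return (n_xy - e_xy) / math.sqrt(e_xy)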

Extensions

The article discusses many possible extensions. Using feedback data about user clicks on recommendations, it is possible to further tune the recommender. One should also take into account the time, causality, and compatibility of purchases. One should also account for aging: aging of the purchase history, and aging of the recommendations as the user ages.

Worth noting was the observation that some items carry more weight. They found that a single book purchase can say more about a customer's interests than an arbitrary product purchase, letting them recommend dozens of highly relevant items.

For the future, the article envisions intelligent interactive services where shopping is as easy as a conversation, and the recommender system knows you as well as your spouse or a close friend.

Thursday, June 15, 2017

Paper Summary: DeepXplore, Automated Whitebox Testing of Deep Learning Systems

This paper was put on arXiv in May 2017, and is authored by Kexin Pei, Yinzhi Cao, Junfeng Yang, and Suman Jana at Columbia and Lehigh Universities.

The paper proposes a framework to automatically generate inputs that trigger/cover different parts of a Deep Neural Network (DNN) for inference and identify incorrect behaviors.

It is easy to see the motivation for high-coverage testing of DNNs. We use DNN inference for safety-critical tasks such as self-driving cars. A DNN gives us results, but we don't know how it works or how well it works. DNN inference is opaque, and we have no guarantee that it will not mess up spectacularly on an input slightly different from the ones it succeeded on. There are too many corner cases to consider for input-based testing, and rote testing will not be able to cover all bases.

DeepXplore goes about DNN inference testing in an intelligent manner. It shows that finding inputs that trigger differential behaviors while achieving high neuron coverage for DL algorithms can be represented as a joint optimization problem and solved efficiently using gradient-based optimization techniques. (Gradients of DNNs with respect to inputs can be calculated accurately and can be used to solve this joint optimization problem efficiently.)

DeepXplore also leverages multiple DL systems with similar functionality as cross-referencing oracles, and thus avoids manual checking for erroneous behaviors. For example, feed the same driving video to the DNNs of Uber, Google, and Waymo, and compare the outputs. Majority voting determines the correct behavior: DeepXplore counts on a majority of the independently trained DNNs not being susceptible to the same bug. This is similar to N-version programming for building resilience against software bugs.

Here is the DeepXplore workflow for generating test images. DeepXplore takes unlabeled test inputs as seeds and generates new test inputs that cover a large number of different neurons (i.e., activate them to a value above a customizable threshold) while causing the tested DNNs to behave differently.

Figure 6 shows how "gradient ascent" can be employed in this joint optimization problem. This is a walk uphill toward less certain scoring, so it is gradient ascent rather than gradient descent. Starting from a seed input, DeepXplore performs a gradient-guided search in the input space of two similar DNNs that are supposed to handle the same task, until it uncovers test inputs that lie between the decision boundaries of the two DNNs. Such test inputs will be classified differently by the two DNNs.
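Here is a minimal sketch of one gradient-ascent step of that joint objective (my own simplification using modern TensorFlow's GradientTape rather than the paper's TF 1.0.x code; the models, layer name, and loss weights are assumptions, and the real DeepXplore objective handles more than two DNNs and applies domain-specific constraints to keep inputs realistic):

import tensorflow as tf

def deepxplore_step(model_a, model_b, x, target_class,
                    neuron_layer, neuron_idx,
                    lambda1=1.0, lambda2=0.1, step=0.01):
    # sub-model exposing an intermediate layer of model_a, so we can
    # reward activating a not-yet-covered neuron (neuron coverage term)
    sub = tf.keras.Model(model_a.input,
                         model_a.get_layer(neuron_layer).output)
    x = tf.Variable(x, dtype=tf.float32)      # x is a batched seed input
    with tf.GradientTape() as tape:
        out_a = model_a(x)[:, target_class]   # confidence of DNN A
        out_b = model_b(x)[:, target_class]   # confidence of DNN B
        act = tf.reduce_mean(sub(x)[..., neuron_idx])
        # push the two DNNs apart while raising the neuron's activation
        objective = lambda1 * tf.reduce_sum(out_a - out_b) + lambda2 * act
    grad = tape.gradient(objective, x)
    # walk uphill in input space: gradient *ascent* on the objective
    return (x + step * tf.sign(grad)).numpy()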

The team implemented DeepXplore using the Tensorflow 1.0.1 and Keras 2.0.3 DL frameworks. They used Tensorflow's implementation of gradient computations in the joint optimization process. Tensorflow also supports creating subDNNs by marking any arbitrary neuron's output as the subDNN's output while keeping the input the same as the original DNN's input. They used this feature to intercept and record the output of neurons in the intermediate layers of a DNN and compute the corresponding gradients with respect to the DNN's input. All the experiments were run on a Linux laptop with 16GB RAM. I guess since this is inference rather than training, a laptop sufficed for the experiments.

A criticism of the paper could be this. Yes, DeepXplore catches a bad classification of an image; that is good and useful. But the self-driving application probably already has built-in tolerance to occasional misclassifications. For example, temporal continuity can help: if the previous and next images classify the road correctly, an interim misclassification would not be very harmful. Moreover, application-specific invariants can also act as a safety net, e.g., do not steer very sharply, and use a Kalman filter. It would be interesting to do evaluations in an end-to-end application setting as well.

UPDATE (6/17/2018): I have received clarification from Yinzhi Cao, one of the authors, about these points. Here are his comments:

First, our light effect (or other changes) can be added constantly over the time domain, and thus DeepXplore should be able to fool the decision engine all the time.  That is, the previous images and next images will also lead to incorrect decisions.

Second, DeepXplore can ask the decision engine to gradually switch the steering so that a Kalman filter may not help.  For example, the switching from left to right or vice versa is not that sudden so that a Kalman filter cannot rule out the decision.

Wednesday, June 14, 2017

Scalability, but at what COST

This paper is by Frank McSherry, Michael Isard, and Derek G. Murray and appeared in HotOS 2015. The authors are all listed as unaffiliated because this is around the time the Microsoft Research Silicon Valley lab, where they used to work, was closed. Michael and Derek are at Google working on the TensorFlow framework, but Frank McSherry is still at large and unaffiliated. Frank has a great blog, where you will learn more than you ever wanted to know about dataflow, Rust, differential privacy, and the art of influencing people and making friends.

COST, defined per system and per problem, is the Configuration that Outperforms a Single Thread: the hardware configuration a system requires before it outperforms a competent single-threaded implementation. The authors show that many big data systems have a surprisingly large COST, often hundreds of cores.

Let's repeat that: some single-threaded implementations were found to be more than an order of magnitude faster than published results (at SOSP/OSDI!) for systems using hundreds of cores.

The paper's goal is to shed light on this issue so that "future research is directed toward distributed systems whose scalability comes from advances in system design rather than poor baselines and low expectations." (That has gotta be one of the snarkiest lines in a computer science paper. Well, discounting those from Dijkstra, that is.)


What do better baselines mean? They mean using better graph layouts and better algorithms for performance. The paper gives the label propagation algorithm as an example. It argues that label propagation is used for graph connectivity not because it is a good algorithm, but because it fits within the "think like a vertex" computational model, whose implementations scale well. The paper claims the appealing scaling properties are largely due to the algorithm's sub-optimality: label propagation simply does more work than better algorithms.
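To make the "better algorithm" point concrete, here is a minimal single-threaded union-find for connected components (my own Python sketch; McSherry's actual baselines are written in Rust and also exploit smarter graph layouts):

def connected_components(edges, n):
    """Single-threaded union-find: near-linear work overall, unlike
    label propagation, which may rescan every edge many times."""
    parent = list(range(n))

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for u, v in edges:
        ru, rv = find(u), find(v)
        if ru != rv:
            parent[ru] = rv                # union the two components
    return [find(x) for x in range(n)]     # component label per vertex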

Yes, and on the other hand, I can also see the appeal of the Giraph "think like a vertex" approach (or, for that matter, the MapReduce and Spark approaches). Giraph optimizes for simplicity and ease of development. If you make it simple and easy to use, people will be happy to use it, adapt it, and throw cluster resources at it when needed. One may argue this is a good tradeoff: instead of making people think harder and making programming harder for them, make it easy but wasteful on computing resources. After all, humans are much more expensive than computers, and scalability in terms of human cost is also a factor for practical/industrial systems. A similar argument for blockchains has been made here, arguing that social scalability is more important than computational efficiency or even computational scalability.

Of course this can be a false dichotomy: there are (and will be) systems/frameworks that provide both scalability in terms of human cost (by being easy to develop with) and computational efficiency. And we should strive to design such systems.

The analysis and evaluation are done in the context of graph algorithms: PageRank and connected components. For embarrassingly parallel algorithms, such as SGD, this analysis and these results would not apply.


Here are Hacker News discussions on this paper.
https://news.ycombinator.com/item?id=11855594
https://news.ycombinator.com/item?id=8901983

Tuesday, June 13, 2017

Paper Summary: Neurosurgeon, collaborative intelligence between the cloud and mobile edge

This paper is by Yiping Kang, Johann Hauswald, Cao Gao, Austin Rovinski, Trevor Mudge, Jason Mars, and Lingjia Tang from University of Michigan, and appeared at ASPLOS 17.

In Deep Learning (DL), you have a long, computation-intensive training phase where you micro-fiddle/fudge the model parameters until you get the desired accuracy. Then you deploy these optimized model parameters (i.e., the Deep Neural Network [DNN]) for inference with real-world inputs. The paper is about this inference/serving layer of DL.

In the serving layer, the input goes through the DNN with the tuned model parameters, activating some subset of neurons at each layer and finally activating the correct neuron[s] at the output layer. This can still be a computation-intensive process, as the model has millions of parameters and you apply matrix multiplication layer after layer. So this serving layer still has many juicy problems to work on.
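For instance, inference through a fully connected network boils down to the following loop (a toy numpy sketch with made-up layer sizes, just to show the per-layer matrix multiplications):

import numpy as np

def relu(v):
    return np.maximum(v, 0.0)

def forward(x, layers):
    """Inference: one matrix multiplication (plus nonlinearity) per layer.
    `layers` is a list of (weight matrix, bias vector) pairs."""
    for W, b in layers[:-1]:
        x = relu(W @ x + b)
    W, b = layers[-1]
    return W @ x + b               # logits; argmax gives the predicted class

rng = np.random.default_rng(0)
layers = [(rng.standard_normal((256, 784)), np.zeros(256)),
          (rng.standard_normal((10, 256)), np.zeros(10))]
print(np.argmax(forward(rng.standard_normal(784), layers)))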

A very relevant problem is that executing inference on the mobile can be slow because of the computational and energy limitations of the mobile device. Executing at the cloud backend server is fast, but how do you get the input there? Uploading the input to the cloud can be slow, especially if the input is a large image and the connection is slow. So there is a tradeoff.

In Section 4, the paper shows how beneficial a proper DL inference partitioning can be. For image processing/computer vision (CV), e.g., AlexNet, partitioning at a middle layer is optimal for both latency and energy. Since the input image is large (512Mb is used), uploading it to the cloud is both time and energy consuming. However, if you execute the convolutional layers followed by the pooling on the mobile, you reduce the size of the intermediate output data, and it becomes time and energy efficient to upload that to the cloud. The rest of the computation, carried out on the cloud, consists of processing the fully connected layers, which are computation intensive. If we were to execute them on the mobile as well, we would be waiting for the mobile CPU/GPU to finish execution, whereas uploading the intermediate output to the cloud and executing the remaining layers there finishes earlier.


The paper also finds that, for Automatic Speech Recognition (ASR) and Natural Language Processing (NLP) applications, usually the best approach is to execute everything at the mobile.

Enter Neurosurgeon

Are we done here, then? Why do we need a Neurosurgeon tool if a static lookup can do the trick? At this point, the paper makes another argument. You can't just use this one-time static observation per application class (CV, ASR, NLP) and be done with it. The best partition point for a DNN depends on the DNN's topology, which manifests itself in the computation and data size variations of each layer. Moreover, connectivity conditions change, so you need to monitor and adjust your decision based on the current network quality.

(The paper also argues that changing cloud backend conditions are a factor, but I am not convinced by the datacenter-can-get-busy/overloaded argument. The evaluation experiments for that part are done synthetically.)

The proposed system to address this problem, Neurosurgeon, consists of a deployment phase and a runtime system that manages the partitioned execution of an inference application. Figure 10 shows the design of Neurosurgeon.


As part of the deployment stage, Neurosurgeon runs once per mobile and server platform to produce performance prediction models. This step is application and NN independent: it profiles different NN layer types on the mobile and server platforms and fits regression models as a function of the configurable layer parameters.

The runtime stage is where Neurosurgeon uses the layer performance prediction models produced in the deployment stage to dynamically choose the best DNN partition point. Neurosurgeon analyzes the target DNN's constituent layers and uses the prediction models to estimate, for each layer, the latency on mobile and cloud, and the power consumption on the mobile. As Algorithm 1 shows, choosing the partition point is then a simple evaluation of these estimates.
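Here is a latency-only sketch of that selection loop (my own illustration; the field names and per-layer predictions are assumptions, and Neurosurgeon also has an energy-optimizing mode):

def best_partition(layers, input_bytes, bandwidth_Bps):
    """Try every split k: run layers[:k] on the mobile, upload the
    intermediate output, run layers[k:] on the cloud. Each layer dict
    carries predicted values: 'mobile_ms', 'cloud_ms', 'out_bytes'."""
    best_k, best_ms = 0, float("inf")
    for k in range(len(layers) + 1):       # k = 0 means everything on the cloud
        mobile_ms = sum(l["mobile_ms"] for l in layers[:k])
        cloud_ms = sum(l["cloud_ms"] for l in layers[k:])
        upload_bytes = layers[k - 1]["out_bytes"] if k > 0 else input_bytes
        upload_ms = 0.0 if k == len(layers) else 1000.0 * upload_bytes / bandwidth_Bps
        total_ms = mobile_ms + upload_ms + cloud_ms
        if total_ms < best_ms:
            best_k, best_ms = k, total_ms
    return best_k, best_ms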



Figures 11 and 12 show results for latency and energy-efficiency improvements achieved using Neurosurgeon.




What about Maui?

Maui is a general smartphone-to-cloud offloading framework and appeared in MobiSys 2010. Maui is control-centric: its partition points are at the procedure/method invocation level, whereas Neurosurgeon is data-centric, partitioning at the NN layer level. While Maui requires runtime profiling of the application, Neurosurgeon makes decisions based on the DNN topology and the one-time deployment-stage observations, without requiring any runtime profiling.

Figure 13 shows the comparison results. The paper says Maui makes incorrect offloading choices for more complicated scenarios (e.g., VGG, FACE, DIG, and ASR), because Maui relies on past invocations of a certain DNN layer type to predict the latency and data size of future invocations of that layer type, leading to mispredictions. But why not make Maui's control points per-layer method invocations? Instead of making Maui's control points per layer type, if we made them per individual layer invocation, things might improve for Maui.

Wednesday, May 24, 2017

Paper summary: Making sense of Performance in Data Analytics Frameworks (NSDI 15)

What constitutes the bottlenecks for big data processing frameworks? If the CPU is the bottleneck, it is easy to fix: add more machines to the computation. Of course, for any analytics job, some amount of coordination is needed across machines; otherwise, you are just mapping and transforming, but not reducing and aggregating information. And this is where the network and the disk come into play as bottlenecks. The reason you don't get linear speedup by adding more machines to an analytics job is the network and disk bottlenecks, and a lot of research and effort is focused on trying to optimize and alleviate them.

OK, this sounds easy, and it looks like we understand the bottlenecks in big data analytics. But this paper argues that more work is needed to understand the performance of big data analytics frameworks, and shows that, at least for Spark on the benchmarks and workloads they tried (see Table 1), there are some counterintuitive results. For Spark, the network is not much of a bottleneck: network optimizations can only reduce job completion time by a median of at most 2%. The disk is more of a bottleneck than the network: optimizing/eliminating disk accesses can reduce job completion time by a median of at most 19%. But most interestingly, the paper shows that the CPU is often the bottleneck for Spark, so engineers should be careful about trading off I/O time for CPU time using more sophisticated serialization and compression techniques.


This is a bit too much to digest at once, so let's start with the observations about disk versus network bottlenecks in Spark. Since shuffled data is always written to disk and read from the disk, the disk constitutes more of a bottleneck than the network in Spark. (Here is a great Spark refresher. Yes RDDs help, but pipelining is possible only within a stage. Across the stages, shuffling is needed, and the intermediate shuffled data is always written to and read from the disk.)

To elaborate more on this point, the paper says: "One reason network performance has little effect on job completion time is that the data transferred over the network is a subset of data transferred to disk, so jobs bottleneck on the disk before bottlenecking on the network [even using a 1Gbps network]." While prior work has found much larger improvements from optimizing network performance, the paper argues that prior work focused mostly on workloads where shuffle data is equal to the amount of input data, which is not representative of typical workloads (where shuffle data is around one third of input data). Moreover the paper argues, prior work used incomplete metrics, conflating the CPU and network utilization. (More on this later below, where we discuss the blocked time analysis introduced in this paper.)

OK, now for the CPU being the bottleneck: isn't that what we want? If the CPU becomes the bottleneck (and not the network and the disk), we can add more machines to improve processing time. (Of course, there is a side effect: this will in turn create more need for network and disk usage to coordinate the extra machines. But adding more machines is still an easy route to take, until adding machines starts to hurt.) But I guess there is good CPU utilization and not-so-good CPU utilization, and the paper takes issue with the latter. If there is already a lot of overhead/waste associated with your CPU processing, it will be easy to speed up your framework by adding more machines, but that doesn't necessarily make your framework efficient, as argued in "Scalability, but at what COST?".

So I guess the main criticism in this paper is that Spark does not utilize the CPU efficiently and leaves a lot of performance on the table. Given the simplicity of the computation in some workloads, the authors were surprised to find the computation to be CPU bound. The paper blames this CPU over-utilization on the following factors. One reason is that Spark workloads often store compressed data (in increasingly sophisticated formats, e.g., Parquet), trading CPU time for I/O time. The authors found that if they instead ran queries on uncompressed data, most queries became I/O bound. A second reason that CPU time is large is an artifact of the decision to write Spark in Scala, which is based on Java: "after being read from disk, data must be deserialized from a byte buffer to a Java object". They find that, for some queries considered, as much as half of the CPU time is spent deserializing and decompressing data. Scala is a high-level language and has overheads; for one query that they re-wrote in C++ instead of Scala, they found that the CPU time was reduced by a factor of more than 2x.
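The CPU-versus-I/O tradeoff is easy to see even in a toy experiment (my own illustration, unrelated to the paper's Spark setup): serialization and compression shrink the bytes that hit the disk and network, but they burn CPU on both ends.

import json, time, zlib

# Serialize and compress a made-up dataset, timing each step.
rows = [{"id": i, "payload": "x" * 100} for i in range(200_000)]

t0 = time.time(); raw = json.dumps(rows).encode(); t1 = time.time()
comp = zlib.compress(raw); t2 = time.time()

print(f"serialized {len(raw)/1e6:.1f} MB in {t1-t0:.2f}s, "
      f"compressed to {len(comp)/1e6:.1f} MB in {t2-t1:.2f}s")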

It seems Spark pays a significant performance penalty for the choice of Scala as its programming language. It turns out the language choice was also a factor behind the stragglers: using their blocked time analysis technique, the authors identify the two leading causes of Spark stragglers as Java's garbage collection and time to transfer data to and from disk. The paper also mentions that optimizing stragglers can only reduce job completion time by a median of at most 10%, and that in 75% of queries they can identify the cause of more than 60% of stragglers.

Blocked time analysis

A major contribution of the paper is to introduce "blocked time analysis" methodology to enable deeper analysis of end-to-end performance in data analytics frameworks.


It is too complicated to infer job bottlenecks by just looking at the logs of parallel tasks. Instead, the paper argues, we should take the resource perspective and try to infer how much faster the job would complete if tasks were never blocked on the network (or the disk). The blocked time analysis method instruments the application to measure, per task, the time spent blocked on each resource, and then uses simulation to find the improved completion time, taking new scheduling opportunities into account.
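Here is a toy sketch of the idea (my own simplification; the real analysis respects Spark's stage boundaries and scheduler behavior, which this greedy replay ignores):

import heapq

def replay(tasks, num_slots, resource=None):
    """Greedily replay tasks onto num_slots executors, optionally
    removing the time each task spent blocked on `resource`. Each task
    dict carries 'duration_ms' and 'blocked_ms' = {'network': ..., 'disk': ...}."""
    slots = [0.0] * num_slots              # next-free time per slot
    heapq.heapify(slots)
    finish = 0.0
    for t in tasks:                        # tasks in original launch order
        run = t["duration_ms"]
        if resource:
            run -= t["blocked_ms"].get(resource, 0.0)
        start = heapq.heappop(slots)
        end = start + run
        heapq.heappush(slots, end)
        finish = max(finish, end)
    return finish

# Upper bound on speedup from a perfect network:
# 1 - replay(tasks, 8, "network") / replay(tasks, 8)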


Conclusions

In sum, this paper raised more questions than it answered for me, but that is not necessarily a bad thing. I am OK with being confused, and I am capable of holding ambivalent thoughts in my brain. These are minimal (necessary but not sufficient) requirements for being a researcher. I would rather have unanswered questions than unquestioned answers. (Aha, of course, that was a Feynman quote. "I would rather have questions that can't be answered than answers that can't be questioned." --Richard Feynman)

This analysis was done for Spark. The paper makes the analysis tools and traces available online so that others can replicate the results. The paper does not claim that these results are broadly representative and apply to other big data analytics frameworks.

Frank McSherry and the University of Cambridge Computer Laboratory take issue with the generalizability of the results, and ran some experiments on the timely dataflow framework. Here are their post1 and post2 on that.

The results do not generalize to machine learning frameworks, where the network is still a significant bottleneck, and optimizing the network can give up to 75% gains in performance.

Tuesday, May 9, 2017

Paper review: Prioritizing attention in fast data

This paper appeared in CIDR17 and is authored by Peter Bailis, Edward Gan, Kexin Rong, and Sahaana Suri at Stanford InfoLab.

Human attention is scarce, data is abundant. The paper argues, this is how we fight back:

  • prioritize output: return fewer results
  • prioritize iteration: perform feedback-driven development, give useful details, and allow the user to tune the analysis pipeline
  • prioritize computation: aggressively filter and sample, trade off accuracy/completeness against performance where it has low impact, and use incremental data structures

The slogan for the system is: MacroBase is a search engine for fast data. MacroBase employs a customizable combination of high-performance streaming analytics operators for feature extraction, classification, and explanation.

MacroBase has a dataflow architecture (like Storm, Spark Streaming, and Heron). The paper argues it is better to focus on which dataflow operators to provide than to try to design a new system from scratch (which won't be much faster/more efficient than existing dataflow systems anyhow).


The architecture of MacroBase is simple:
ingestion&ETL -> feature transform -> classification -> data explanation

MacroBase focuses attention on dataflow operators to prioritize computation. This is done by applying classic systems techniques: predicate pushdown, incremental memoization, partial materialization, cardinality estimation, and approximate query processing (top-K sketches).


Users are engaged at three different interface levels with MacroBase.
1) Basic: web based point and click UI
2) Intermediate: custom pipeline configuring using Java
3) Advanced: custom dataflow operator design using Java/C++

Users highlight key performance metrics (e.g., power drain, latency) and metadata attributes (e.g., hostname, device ID), and MacroBase reports explanations of abnormal behavior. For example, MacroBase may report that queries running on host 5 are 10 times more likely to experience high latency than the rest of the cluster.
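That kind of explanation boils down to a relative-risk computation over attribute values. Here is a toy sketch of it (my own illustration, not MacroBase's actual operator, which uses sketches and incremental data structures to do this over streams):

def risk_ratio(events, outlier_pred, attr, value):
    """How much more likely are events with attr == value to be outliers
    than events without it (a toy, batch version of relative risk)."""
    with_v = [e for e in events if e[attr] == value]
    without_v = [e for e in events if e[attr] != value]
    p_with = sum(map(outlier_pred, with_v)) / max(len(with_v), 1)
    p_without = sum(map(outlier_pred, without_v)) / max(len(without_v), 1)
    return p_with / max(p_without, 1e-9)

# events = [{"hostname": "host5", "latency_ms": 950}, ...]
# risk_ratio(events, lambda e: e["latency_ms"] > 500, "hostname", "host5")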

As a broader theme, the paper argues there is opportunity in marrying systems-oriented performance optimization and the machine learning literature. Another big message from the paper is the importance of building combined and optimized end-to-end systems.

MacroBase is currently doing mostly anomaly/outlier detection, and it is not doing any deeper machine learning training. There are plans to make the system distributed. Given that it is based on a dataflow system, there are many plausible ways to achieve distribution of MacroBase.

Saturday, May 6, 2017

Paper Review: Serverless computation with OpenLambda

This paper provides a great, accessible review and evaluation of the AWS Lambda architecture. It is by Scott Hendrickson, Stephen Sturdevant, Tyler Harter, Venkateshwaran Venkataramani, Andrea C. Arpaci-Dusseau, and Remzi H. Arpaci-Dusseau, and it appeared at HotCloud '16.


Virtual machines virtualized and shared the hardware so multiple VMs can colocate on the same machine. This allowed consolidation of machines, prevented the server sprawl problem, and reduced costs as well as improving manageability.

The containers virtualized and shared the operating system, and avoided the overheads of VMs. They provided fast startup times for application servers. By "fast" we mean about 25 seconds of preparation time.

In both VMs and containers, there is a "server" waiting for clients to serve. Applications are defined as a collection of servers and services.

"Serverless" takes the virtualization a step ahead. They virtualize and share the runtime, and now the unit of deployment is a function. Applications are now defined as a set of functions (i.e., lambda handlers) with access to a common data store.

Lambda handlers from different customers share common pools of runtimes managed by the cloud provider, so developers need not worry about server management. Handlers are typically written in interpreted languages such as JavaScript or Python. By sharing the runtime environment across functions, the code specific to a particular application will typically be small, and hence it is inexpensive to send the handler code to any worker in a cluster.
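For concreteness, here is what a handler looks like in AWS Lambda's Python runtime (the event fields are made up for illustration):

import json

def lambda_handler(event, context):
    """Entry point invoked by the runtime; there is no server code.
    `event` carries the request payload, `context` carries metadata
    such as the remaining execution time."""
    name = event.get("name", "world")
    return {"statusCode": 200,
            "body": json.dumps({"message": f"hello, {name}"})}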

Performance evaluation on AWS Lambda 

Handlers can execute on any worker; in AWS, startup time for a handler on a new worker is approximately 1-2 seconds. Upon a load burst, a load balancer can start a Lambda handler on a new worker to service a queued RPC call without incurring excessive latencies. Figure 2 shows that 100 lambda workers are generated in a short time to serve 100 outstanding RPC requests.



Figure 5 shows more details about Lambda handler initialization. There is a 1 ms delay for unpausing a Lambda function; if you start from scratch, that delay is actually several hundred milliseconds.


On the systems research side, one problem to investigate is building better execution engines. Under light load, Lambdas are significantly slower than containers as Figure 4 shows.


Lambdas are great for performance tuning. You can see how many times each function is invoked, because that is how billing is done. This helps you tune the performance of your applications.

On the topic of billing: in AWS, the cost of an invocation is proportional to the memory cap (not the actual memory consumed) multiplied by the actual execution time, rounded up to the nearest 100ms. But many RPCs are shorter than 100ms, so they cost several times more than they would if charging were more fine-grained.
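In code, that billing rule looks roughly like this (a sketch; the prices are illustrative placeholders, and the 100ms rounding reflects AWS's billing granularity at the time of the paper):

import math

def lambda_cost(duration_ms, mem_mb, gb_s_price=0.00001667, per_req=2e-7):
    """Rough AWS-Lambda-style bill for one invocation: duration is
    rounded up to the next 100 ms and multiplied by the *provisioned*
    memory cap, not the memory actually used."""
    billed_s = math.ceil(duration_ms / 100.0) * 0.1
    return per_req + billed_s * (mem_mb / 1024.0) * gb_s_price

# A 5 ms RPC under a 512 MB cap is billed the same as a full 100 ms:
# lambda_cost(5, 512) == lambda_cost(100, 512)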

The authors are working on building an open-source lambda computing platform.

Discussion and future directions

So what kind of functions are suitable for "lambdatization"?

Currently, RPC calls from web apps are being lambdatized. But as the paper observes, maybe those are too small: they last less than 100ms, the unit of billing. A common use case is when an app puts an image to S3, which triggers a call to a lambda handler that processes the image and creates a thumbnail. That is a better fit.

It is also best if the input to the lambda function is not very big, to avoid wasting/duplicating work by transferring large amounts of data. Lambda is for computation; the computation should compensate for the cost of network use. Lambda excels at handling bursty traffic by autoscaling extremely quickly.

Picking up on this last observation, I think a very beneficial way to employ lambdatization is to address the bottlenecks of a big data processing application/platform that surface at runtime. In a sense, I am advocating using lambdatization to virtualize even the application at the unit of functions! For this, we can ask the developer to provide tags to label some functions as lambda-offloadable. Then we can use a preprocessor and a shim layer to deploy these functions as lambda functions, so they can be auto-scaled based on the feedback/directives from the underlying distributed systems middleware.

Sunday, April 30, 2017

Setting up a new Mac laptop

I install brew, and then install cask (using brew).

Then I install mactex and emacs using cask.

I install Dropbox, so my research directories get to sync.

These days Opera is my browser of choice.

I use f.lux and caffeine to get my monitor to behave.

I can't live without some keyboard customizations. I open keyboard preferences, and get function keybindings to work without requiring the pressing of fn key. While there I map the CAPSLOCK to Control key. I use alt-left/right to traverse between desktops.

My Emacs customization takes some time. I use the modularized emacs24-starter-kit at http://eschulte.github.io/emacs24-starter-kit/. It is great and has good defaults, but it would be much better if it had better documentation and instructions.

Since I install emacs using cask rather than from binary, I get to enjoy good package-manager support using ELPA. To see the available packages type M-x package-show-package-list. I install auctex, exec-path-from-shell, ipython, magit, python-mode, tomatinho, visual-fill-column.

I use a wheat background, and Chalkboard font. I use highlight line mode as well with lightsteelblue1 color.
(custom-set-faces
 '(default ((t (:inherit nil :background "wheat" :height 160 :width normal :foundry "nil" :family "Chalkboard")))))
For highlighting certain patterns at files, I use global-hi-lock mode as well.

Finally since I use org-mode a lot, I have several special keybindings for org-mode. Some of them are:
(global-set-key (kbd "<f5>") 'org-schedule)
(global-set-key (kbd "<f8>") 'org-deadline)
(global-set-key (kbd "<f10>") 'org-store-link)
(global-set-key (kbd "<f11>") 'org-insert-link)
(global-set-key (kbd "<f12>") 'org-agenda)

Let me know if you have some other important apps or tricks that you can't live without.

Thursday, April 27, 2017

DARPA: Delivering Disruption

What do DARPA and Facebook have in common?
Move fast and break things!

A couple of weeks ago, Stefanie Tompkins, DARPA's acting deputy director, visited the University at Buffalo to give a talk and meet researchers. She talked about DARPA and funding opportunities at DARPA. I had the mental image of DARPA as a slow-moving, hard-to-maneuver aircraft carrier, but her talk painted a very different picture and partially changed my perception of DARPA.

DARPA, then ARPA (Advanced Research Projects Agency), was created in 1958 as a response to Sputnik 1. DARPA's mission is to prevent strategic surprise, to avoid another Sputnik moment. And as you already know, the best way to predict the future is to invent it. So DARPA is perpetually pushing the envelope to invent the next strategic surprise and disruptive technology itself.

DARPA has a large number of successes under its belt. The Internet has its roots in ARPANET. GPS grew out of basic research on atomic clocks funded by DARPA. DARPA is also credited with several inventions in stealth, radar arrays, UAVs, IR night vision, microelectronics, and materials science. DARPA funded several high-impact grand challenges on autonomous vehicles starting back in 2004, which helped kick-start today's self-driving car technologies. DARPA also funded other grand challenges, including the Network Challenge (2009) and the Cyber Grand Challenge (2016).

To drive disruption and off-scale impact, DARPA recruits program managers from universities (20% of PMs) and industry. The program managers rotate quickly; they serve 3-5 years. Within that time, they use the budget allocated to them to create/fund DARPA programs on high-impact research and development. DARPA programs may range from $10,000,000 to $400,000,000 over 2 to 5 years.

DARPA has 6 technical offices.


The Heilmeier questions have a significant place in DARPA culture. Even the program managers need to answer these questions when creating new programs.

Another part of DARPA culture is the infamous quadchart. "Quadcharts which cram in figures, assertions, and required logos/branding/acronyms are a proud and sacred tradition" (RT @tedherman).

While Stefanie's presentation managed to convince me that DARPA is an agile institution, the quadcharts make me a skeptic :-)

Monday, April 24, 2017

Book Review: The Story of Your Life (by Ted Chiang)

The "Story of Your Life" is the science-fiction story which the Arrival film was based on. It was a very good read. Back in November, before the movie was released, I read it in less than 2 hours, in one breath.

[Spoilers ahead.]

The first couple of pages of the book have some weird, mixed usage of past and future tenses. Right on that first page, you get the idea that something is not right and that this is going to be an interesting read. And as you keep reading, the story drops more clues on you, and you feel both smart and awed when you first piece together that Gary was indeed the linguist Louise's first husband, and that the daughter has not died yet in the climbing accident.

Ted Chiang gives a lot of credit to the readers' intelligence. I liked that a lot. I also liked that I had to google and learn several things while reading the story. I googled to learn about "Fermat's principle", "teleology", "ideograms", and some related linguistic terms.

I was pretty shaken after finishing the story. It contributes to the Freewill and Fate debate in philosophy and theology from a science-fiction perspective. (Time is an illusion, and with an altered perception, you can experience time in an orthogonal axis, and freewill becomes irrelevant/pointless.) The story is at the same time very emotional because the parentship thread is very well woven into the story.

Ted Chiang writes a very tight story. No, actually, he calculates, computes, and weaves the story. Amused by this first story, I read a couple of other stories by him. I also liked "The Merchant and the Alchemist's Gate", which is written like a Sufi story, again exploring the concepts of fate and free will. Ted seems to be obsessed with these concepts and must have been thinking very deeply about them. This story also makes subtle and thoughtful "what if" cases about these concepts, and it too has a strong technical side interwoven with a moving emotional side.

Wikipedia says Ted Chiang has been working as a technical writer in the software industry. I would be interested in reading the manuals he writes.

Related links:
Hacker News Discussion 
How I wrote Arrival 
Wolfram's blog on the symbolic language created for the movie 
Timequake by Kurt Vonnegut was also an excellent book playing with the freewill and fate ideas.

Friday, April 21, 2017

Paper summary: Federated Learning

On Thursday, April 6, Google announced Federated Learning. The announcement didn't make quite a splash, but I think this is potentially transformative. Instead of uploading all the data from the smartphones to the cloud for training the model in the datacenter, federated learning enables in-situ training on the smartphones themselves. The datacenter is still involved, but only to aggregate the smartphone-updated local models in order to construct the new/improved global model.

This is a win-win-win situation. As a smartphone user, your privacy is preserved since your data remains on your device, but you still get the benefits of machine learning on your smartphone. Google gets what it needs: it perpetually learns from cumulative user experience and improves its software/applications. Google collects insights without collecting data (and some of these insights may still be transferable to advertising income). Secondly, Google also outsources the training to the users' smartphones: it makes the smartphones work on updating the model, rather than using servers in its datacenters. This may seem like penny-pinching, but if you consider the sheer number of Android smartphones out there (more than 1.5 billion according to numbers by the end of 2015), the computation savings are huge. Notice that Google is winning twice, while you win only once. :-)

After skimming the Google announcement, I was excited, because I had predicted this in January 2016. When I read the TensorFlow whitepaper, I was struck by its peculiar emphasis on heterogeneous device support, especially for smartphones. I predicted that Google was up to something more than just inference/serving layer support on smartphones. I got the mechanism wrong, though, since I didn't have machine learning experience then.

The Google announcement links to some papers, and I read the most relevant paper carefully. Below is my summary of the paper: Communication-Efficient Learning of Deep Networks from Decentralized Data.

Applications that are ideal fit for federated learning

The paper pitches smartphone applications as the ideal domain for federated learning.
For smartphones, the upload transfer is a bottleneck. Instead of uploading the data to the cloud for training, let the smartphone train and update the model and transmit that back. This makes sense when the model update is smaller than the data. There is another Google paper that provides tricks for further optimizing/compressing the size of the learned model on the smartphone before transmitting it back to the cloud. Since the upload rate is about one third of the download rate, such techniques are beneficial.

A second big benefit in the smartphone applications domain is preserving the private data of the smartphone user. Finally, in the smartphone applications domain, the labels on the data are available or inferable from user interaction (albeit in an application-dependent way).

Concrete examples of these applications are "image classification: predicting which images are most likely to be viewed and shared", and "language modeling: improving voice recognition and text entry on the smartphone keyboard." Google is already using federated learning in the Gboard keyboard on Android for improving text entry. On-device training uses a miniature version of TensorFlow.

Related work

There has been work on iteratively averaging locally trained models, but they are inside datacenter, not at the edge. See: Sixin Zhang, Anna E Choromanska, and Yann LeCun. Deep learning with elastic averaging SGD. In NIPS. 2015. and also this one: Ryan McDonald, Keith Hall, and Gideon Mann. Distributed training strategies for the structured perceptron. In NAACL HLT, 2010.

There has been work motivated by the privacy perspective, but with limited empirical results. And there has been work in the convex setting, on distributed consensus algorithms (not the distributed systems version of the problem, but the machine learning version).

The contributions of the paper

The paper introduces the FedAvg algorithm for federated learning. The algorithm is not an intellectually challenging innovation, as it prescribes a small variation to the traditional SGD training. So the paper is careful not to claim too much credit for the algorithm. Instead the paper distributes its contributions to items 1 and 3 below, and downplays 2 in comparison: "1) the identification of the problem of training on decentralized data from mobile devices as an important research direction; 2) the selection of a straightforward and practical algorithm that can be applied to this setting; and 3) an extensive empirical evaluation of the proposed approach."

The algorithm

The algorithm uses a synchronous update scheme that proceeds in rounds of communication. There is a fixed set of K clients, each with a fixed local dataset. At the beginning of each round, a random fraction C of workers is selected, and the server sends the current global algorithm state to each of these workers (e.g., the current model parameters). Only a fraction of workers are selected for efficiency, as the experiments show diminishing returns for adding more workers beyond a certain point. Each selected worker then performs local computation based on the global state and its local dataset, and sends an update to the server. The server then applies these updates to its global state, and the process repeats.

So here are the high-level steps in the algorithm:

  1. Workers are sent the model by the server
  2. Workers compute an updated model based on their local data
  3. The updated models are sent from the workers to the server
  4. The server aggregates these models (by averaging) to construct the new global model

These steps are almost the same as in traditional ML/DL learning with a parameter server and workers, but there are some differences. Difference in step 1: not all workers, but a subset of workers, are chosen. Differences in step 2: workers are also the producers of the data, they work on the data they produce, and they may do multiple local iterations of the model update. Difference in step 3: the model, not the gradients, is transmitted back.

For step 3, I don't know why the model is sent rather than the gradients. The paper presents a derivation to argue that the two are equivalent, but then does not provide any justification for transmitting the model instead of the gradients. There are no explanations or experiments comparing the two approaches.

Here is the algorithm. (The paper calls the workers "clients".)


The amount of computation is controlled by three key parameters: C, the fraction of workers that perform computation in each round; E, the number of training passes each worker makes over its local dataset in each round; and B, the local minibatch size used for the worker updates. B = infinity indicates that the full local dataset is treated as a single minibatch.

And this is the biggest difference in the algorithm from traditional synchronous SGD:
in each synchronous round, the workers can perform multiple rounds of local model updates before uploading the model back to the cloud. Since the local data is small, computing/iterating over it is fast. And since communication already has high latency, the workers may as well do many local computation rounds to make it worthwhile.
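Here is a minimal numpy sketch of FedAvg as described above (my own illustration; the client interface, i.e. minibatches(), gradient(), and num_examples, is assumed, not from the paper):

import numpy as np

def fedavg(global_w, clients, C=0.1, E=5, lr=0.01, rounds=100):
    """Each round: a random fraction C of clients runs E local epochs of
    SGD starting from the global model; the server then averages the
    returned models, weighted by local dataset size."""
    for _ in range(rounds):
        m = max(1, int(C * len(clients)))
        chosen = np.random.choice(len(clients), m, replace=False)
        new_ws, sizes = [], []
        for k in chosen:
            w = global_w.copy()
            for _ in range(E):                          # local passes
                for x, y in clients[k].minibatches():   # assumed iterator
                    w -= lr * clients[k].gradient(w, x, y)  # assumed gradient fn
            new_ws.append(w)
            sizes.append(clients[k].num_examples)
        total = sum(sizes)
        # the *models* (not gradients) are averaged into the new global model
        global_w = sum(n / total * w for n, w in zip(sizes, new_ws))
    return global_w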

The paper makes a big deal of the unbalanced data distribution and especially the non-iid (non-independent and identically distributed) data at the workers. However, there is nothing special in the FedAvg algorithm to address the non-iid data challenge. It works because SGD tolerates noise, and most likely having many workers participate in each round helps a lot. Even with C=0.1, the number of smartphones used for training can be amazingly high. In the experimental results, 100 workers are used for the MNIST image classification application, and 1146 workers are used for the Shakespeare dataset language modeling application. This is way more workers than are used in traditional ML, certainly for the MNIST and Shakespeare datasets. So the sheer number of workers helps compensate for the non-iid challenge.

Questions

Can federated learning bootstrap from a zero-trained model?
Before reading the paper, I thought "maybe the smartphones are not initialized with random parameters, but with partially or mostly trained parameters". The experiments suggest otherwise: the smartphones are started from an untrained model. However, Figure 1 in the paper shows that the smartphones (i.e., workers) should be started from the same common initialization, because independent initialization does not converge.

How long is a round?
The experiments investigate how many rounds are needed to converge, but there is no discussion in the paper of the length of a round. In my estimation, the length of a round should be around 5 minutes or so. In practical use, Google employs as workers the smartphones that are plugged into a wall outlet and connected to Wi-Fi. (Android phones provide this information to Google, and a subset of them is randomly selected as workers. In our PhoneLab project, we also uploaded data from phones only when they were plugged in.) Since this is synchronous SGD, the round progresses at the rate of the slowest worker, so they must be using backup workers (maybe 5% or 10%) to combat the straggler problem.

FedAvg uses synchronous SGD, but would it also work with asynchronous rounds?
Unfortunately there are no experiments that investigate this question.

Experimental results

The experiments use the MNIST dataset for image classification, CIFAR-10 for image classification, and the Shakespeare dataset for language modeling, i.e., prediction of the next character (alphabet character, that is).

The paper says this: "Our goal is to use additional computation in order to decrease the number of rounds of communication needed to train a model. The speedups we achieve are due primarily to adding more computation on each client, once a minimum level of parallelism over clients is used." They achieve this by increasing E (or decreasing B) once C is saturated (which tends to happen at C=0.1 in their setup).


Even the pathologically biased distribution of local datasets works well, because the number of clients participating in each round is high and provides smoothing. "Both standard SGD and FedAvg with only one client per round (C = 0), demonstrate significant oscillations in accuracy, whereas averaging over more clients smooths this out."

Tuesday, February 21, 2017

1 million pageviews



My blog has recently reached 1 million pageviews. This warrants a short retrospective.

I started posting regularly in September 2010. I wanted to get into the cloud computing domain, so I needed to accumulate background on cloud computing work. I decided that as I read papers on cloud computing, I would post a summary to this blog. I thought that if I could explain what I learned from the papers in my own words, I would internalize those lessons better. And if others read those summaries and benefit, that is an extra plus.

"Writing is nature's way of telling you how sloppy your thinking is." In fact, I learned a lot writing those paper reviews. Writing the reviews gave me a deeper understanding of the work done, beyond what I could achieve by passively reading them. Putting them on web was also a nice choice, because I could refer my students to some of these summaries when needed. And it turned out that I referred to those summaries myself very frequently to jog my memory. Since I have encoded the writing with my understanding, reading through my summary would get me refreshed about the important lessons from that work. Since my summaries were made available on the web, all I needed to do was google search for muratbuffalo and paper name.

(A side remark about my research journey: my research area in the first couple of years of my PhD was distributed algorithms and self-stabilization. Then, starting in 2002, wireless sensor networks became my research area. I applied stabilizing distributed algorithms to in-network querying and tracking in wireless sensor networks. Around 2009, I started transitioning to crowdsourced sensing and collaboration using smartphones. And starting in 2010, I transitioned to large-scale cloud computing systems. Distributed systems has been the common theme throughout. Here is a link to my research statement as of the end of 2016.)

Over time I included posts about my conference trips, book reviews, rants, and research advice for students. Posting research advice (on reading, writing, presenting) is also beneficial because I can refer my students to it. And occasionally I receive emails from remote corners of the world about how some of these posts helped or inspired someone, and that makes me very happy for an entire day.

Some of the big hits 


The bursty traffic all came from Hacker News. The regular traffic came from many sources: Google searches, blog posts, twitter links.

Google tells me I can earn up to $18.50 per month by placing ads on my blog using AdSense. No thanks, for now.

Here are the top 10 posts in my blog as of now. Looks like anything mentioning Facebook is a big hit. Deep learning is also very hot. Glad to see our hybrid logical clocks work also up there. And glad to see interest for TLA+.

Saturday, February 18, 2017

Bowling your way to the top

"Oh, this is very American!" I said, when I finally understood how Bowling scoring works.

Bowling scoring is nonlinear

In a bowling game, there are 10 rounds (frames). There are 10 pins, and you get two shots in each round to knock down as many as you can.

Even if you are a novice, if you are eager and put effort into it, you can knock down 6 pins in each round. That gives you a score of 6*10=60.

If you knock down 7 pins at each round, you get a score of 70.
8 pins, you get a score of 80.
9 pins, you get a score of 90.

Here is where things start to go nonlinear and you get accelerated returns. If you knock down all 10 pins with your two shots, this is called a spare. Your score for that round is not just 10: the pins you knock down with your next shot are added to it as well. So if you had a spare in round k and then knocked down 7 pins with your first shot of round k+1 (and none with the second), you get 10+7=17 for round k and 7 for round k+1, for a total of 24 points from these two rounds. If we were scoring linearly, you would only get 10+7=17.

If you knock down all 10 pins with your first shot in a round, this is called a strike. Your score for that round is not just 10: the pins you knock down with your next *two* shots are added to it. If you had a strike in round k, a strike in round k+1, and then 4 and 3 pins in round k+2, you get 10+10+4=24 for round k, 10+4+3=17 for round k+1, and 7 for round k+2, a total of 48 points from these three rounds. If we were scoring linearly, you would only get 10+10+7=27 from these 3 rounds.

In the first game I played, I was knocking about 7 pins each round, so I thought, I should be in pretty good shape. Wrong. I got 4th place. The first two guys were experienced, they managed to hit sequences of strikes and spares, and their score grew very fast. My third place friend had managed to hit a series of spares towards the end of the game, and has beaten my score before I could understand my score was being beaten. I thought I was comfortably ahead.

It is more important to hit occasional strikes and spares than hitting a constantly comfortable 7 average.

So let's get back to where we left off, the transition from linear to nonlinear scoring.
All 8 pins in every round, and you get a total score of 80.
All 9s, and you get a total score of 90.
All spares (say, 9 pins and then 1 every round), and you get a total score of 190, instead of 100.
All strikes, and you get a total score of 300, instead of 100.

And that last one is called a perfect game. 
Here is a short video of a perfect game.
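If you want to check these totals, the scoring rules fit in a few lines of Python (my own sketch; `rolls` is the flat list of pins knocked down per throw, including the bonus throws of the 10th frame):

def bowling_score(rolls):
    """Score a complete game. A spare adds the next roll to the frame;
    a strike adds the next two rolls."""
    score, i = 0, 0
    for _ in range(10):                      # ten frames
        if rolls[i] == 10:                   # strike
            score += 10 + rolls[i + 1] + rolls[i + 2]
            i += 1
        elif rolls[i] + rolls[i + 1] == 10:  # spare
            score += 10 + rolls[i + 2]
            i += 2
        else:                                # open frame
            score += rolls[i] + rolls[i + 1]
            i += 2
    return score

print(bowling_score([7, 0] * 10))        # a flat 7 per round: 70
print(bowling_score([9, 1] * 10 + [9]))  # all spares (9 then 1): 190
print(bowling_score([10] * 12))          # all strikes, the perfect game: 300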

OK so what?

Why am I wasting my time and your time telling you about bowling scoring?

If you were born and raised in the US, you might have yawned reading through the text above. You might take the scoring for granted. In fact, when I realized the scoring works in a "funny" way, I asked my American friends to explain it. They didn't have much previous practice explaining the scoring. One of them said, after stalling for some time, "Hmm, I realize this is the first time I am explaining bowling scoring to someone." And this guy had played in a bowling league for a couple of years :-)

After a couple more rounds of explaining/questioning with a second friend, when I finally understood what was going on, I blurted out: "Oh, this is very American!", which surprised my friend.

If you take this scoring for granted, all I will tell you is this: "Three points for a win" is a relatively recent adoption in soccer scoring. Before that, it was 0 points for a loss, 1 point for a draw, and 2 points for a win. And the games were so boring.

Teams would go for a draw, because the prospect of gaining one extra point by putting effort into attacking was not worth risking their defensive stance, which could make them lose the game and get no points. The transition to three points for a win started only after 1980 and took until around 2000 in some countries. And it led to a significant increase in the average number of goals scored in games.

This is not about bowling, is it?

Yes, you see, free markets are inherently nonlinear-scoring markets. Nonlinear scoring applies especially to the current information technology markets, where "the best performers are able to capture a very large share of the rewards, and the remaining competitors are left with very little". In such a winner-take-all economy, you run the risk of being overlooked if your products are mediocre. You need to hit some strikes.

This is also true in academia. Yes, you need to show that you are publishing productively, and there is some pebble counting. But in order for those pebbles to count, you need some occasional gems in between. You need to hit some strikes.

It is more important to hit occasional strikes and spares than hitting a constantly comfortable 7 average.

You need to think big, aim big, and go for a strike, so you can achieve nonlinear returns occasionally. 

Other related links

1. Wait a minute? Didn't I tell you a couple days ago "worse is better"?
Yes, I did. But this is how I concluded that post: "Worse is better takes a simplistic/minimalist approach. Simple and minimal can be powerful, if it is not done too ugly. Is worse always better? No. As I said earlier systems design is all about tradeoffs. It is important to analyze and decide in advance what the priorities are."

In fact, a worse-is-better system hits a strike in a priority dimension, such as being minimalist and going viral. On the other hand, a do-the-right-thing system may get stuck hitting a consistently comfortable 7 average in all dimensions.

2. This nonlinear-return idea also reminds me of high-intensity interval training (HIIT). Tim Ferriss had a very interesting interview with Prof. Martin Gibala on this. The idea in HIIT is that you get accelerated returns for the short, intense effort you put into your training.

Thursday, February 16, 2017

Mesos: A platform for fine-grained resource sharing in the data center

This paper appeared in NSDI'11 and introduced the Mesos job management and scheduling platform, which proved to be very influential in the big data processing ecosystem. Mesos has seen a large following because it is simple and minimalist. This reminds me of the "worse is better" approach to system design. This is an important point, and I will ruminate on it after I explain the Mesos platform.

The problem 

We need to make multiple frameworks coexist and share the computing resources in a cluster. Yes, we have submachine scheduling abstractions: first virtual machines and then containers. But we still need a coordinator/arbiter to manage/schedule the jobs submitted from these frameworks, to make sure that we don't underutilize or overload/overtax the resources in the cluster.

Offer-based scheduling

Earlier, I talked about Borg, which addressed this cluster management problem. While Borg (and later Kubernetes) takes a request-based scheduling approach, Mesos chooses to provide an offer-based scheduling approach.

In request-based scheduling, the frameworks communicate their scheduling needs to the scheduler/controller, and the scheduler/controller decides where to place the tasks and launches them. This can arguably make the design of the controller overcomplicated. The scheduler/controller may need to understand too many details about multiple frameworks in order to serve their requests adequately. This may not scale well as the number of frameworks to support grows. (And we have an abundance of big data processing frameworks.)

In stark contrast, Mesos delegates the control over scheduling to the frameworks. The Mesos master (i.e., the controller) provides resource offers to the frameworks,  and the frameworks decide which resources to accept and which tasks to run on them.

In other words, Mesos takes a small-government, libertarian approach to cluster management :-)
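
To make the offer-based model concrete, here is a toy Python sketch (all names are made up for illustration; this is not the Mesos API): the master hands its free resources to each registered framework as an offer, and each framework decides how much of the offer to accept and what to do with it.

    # Toy illustration of offer-based scheduling; the names are made up, not Mesos APIs.
    class Framework:
        def __init__(self, name, cpus_per_task, num_tasks):
            self.name, self.cpus_per_task, self.pending = name, cpus_per_task, num_tasks

        def consider_offer(self, offered_cpus):
            """Framework-side logic: decide how much of the offer to accept."""
            tasks = min(self.pending, offered_cpus // self.cpus_per_task)
            self.pending -= tasks
            return tasks * self.cpus_per_task     # cpus accepted; the rest is declined

    class ToyMaster:
        def __init__(self, total_cpus):
            self.free_cpus, self.frameworks = total_cpus, []

        def register(self, framework):
            self.frameworks.append(framework)

        def offer_round(self):
            """Master-side logic: offer the free resources to each framework in turn."""
            for fw in self.frameworks:
                accepted = fw.consider_offer(self.free_cpus)
                self.free_cpus -= accepted
                print(f"{fw.name} accepted {accepted} cpus, {self.free_cpus} cpus left")

    master = ToyMaster(total_cpus=16)
    master.register(Framework("hadoop", cpus_per_task=2, num_tasks=5))
    master.register(Framework("mpi", cpus_per_task=4, num_tasks=3))
    master.offer_round()

In real Mesos the offers are per-agent bundles of resources and the frameworks reply with task descriptions, but the division of labor is the same: the master only offers, the frameworks decide.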

Since Mesos is minimalist, it is simple and nicely decoupled from the various frameworks it serves. This made Mesos go viral and achieve high adoption. But systems design is an exercise in choosing which tradeoffs you make. Let's study the drawbacks. (I am putting on my critical hat; I will talk about the benefits of Mesos again toward the end of this post.)

Long-running tasks and big tasks strain this offer-based scheduling model. Some frameworks may schedule tasks that overstay their welcome and take advantage of the too-trusting and hands-off Mesos. This would be unfair to other client frameworks. (Of course, the Mesos master may take into account "organizational policies such as fair sharing" when extending offers to the frameworks, and can even kill long-running tasks.)

Moreover, since Mesos is hands-off, it does not provide fault-tolerance support for long-running tasks, which are more likely to experience failures the longer they run. Mesos punts the ball to the client frameworks, which need to carry that burden, and doing this for each client framework may lead to redundant/wasted effort. Fortunately, other helper systems like Marathon emerged to address this issue and provide support for long-running tasks.

Even assuming that the client frameworks are not greedy and are on their best cooperating behavior, they may not have enough information about the other tasks/clients of Mesos to make optimal scheduling decisions. The paper mentions: "While this decentralized scheduling model may not always lead to globally optimal scheduling, we have found that it performs surprisingly well in practice, allowing frameworks to meet goals such as data locality nearly perfectly."

Related to this problem, another very simple idea makes a cameo appearance in the paper: "We used delay scheduling to achieve data locality by waiting for slots on the nodes that contain task input data. In addition, our approach allowed us to reuse Hadoop's existing logic for re-scheduling of failed tasks and for speculative execution (straggler mitigation)."

Is that too simple a technique? Well, it is hard to argue with results. The delay scheduling paper has received 1000+ citations since 2010: "Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling," EuroSys'10, 2010.
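
The gist of delay scheduling fits in a few lines. Here is an illustrative sketch (my own simplification, not code from the delay scheduling paper): a job passes on non-local offers up to a bounded number of times, and only then settles for a non-local slot.

    # Illustrative sketch of delay scheduling (my simplification, not the paper's code).
    def should_launch(job, offered_node, max_skips):
        """Return True if the job should launch a task on offered_node now."""
        if offered_node in job["preferred_nodes"]:   # data-local slot: take it immediately
            job["skips"] = 0
            return True
        if job["skips"] >= max_skips:                # waited long enough: give up on locality
            job["skips"] = 0
            return True
        job["skips"] += 1                            # skip this offer and hope for a local one
        return False

    job = {"preferred_nodes": {"node3", "node7"}, "skips": 0}
    for node in ["node1", "node2", "node3"]:
        print(node, should_launch(job, node, max_skips=2))   # False, False, True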

Mesos architecture

Mesos means middle or intermediate, from the Greek mesos. Nice name.


The Mesos master is guarded by ZooKeeper, so a hot standby can step in and take over if the Mesos master fails. The Mesos master manages the resources by talking to the Mesos slaves/workers on the machines in the cluster. This is similar to how the Borgmaster manages resources by talking to the Borglets on the machines.

So where is the scheduler in this architecture? This responsibility is punted to the client frameworks. As we mentioned above, the Mesos master provides offers to the client frameworks, and it is up to the client framework to accept an offer.


Here is how things work from the client framework's perspective. Each framework intending to use Mesos needs to implement two components: a scheduler that registers with the Mesos master to be offered resources, and an executor that is launched on Mesos worker nodes to run the framework’s tasks.


This table shows the callbacks and actions to implement when writing the scheduler and executor components. (For Python users, there is pyMesos to help you write the scheduler and executor components in Python.)

In the resourceOffer callback, the scheduler should implement the functionality to select which of the offered resources to reject and which to use, and to pass Mesos a description of the tasks it wants to launch on them. What if an offer becomes unavailable in the meantime? The Mesos master will warn the scheduler via the offerRescinded callback that the offer has been rescinded, and it is the client framework's responsibility to handle this and reschedule the job using the next offers from the Mesos master.

The implementation of the scheduler gets more and more involved if the framework would like to keep track of the tasks for a submitted job and provide this information to the users of the framework. The scheduler gets callbacks on statusUpdate of the tasks, but it needs to piece together and track which job these tasks correspond to. For example, the scheduler gets a callback when a task is finished, and then it is the scheduler's responsibility to check and mark a job as completed when all its tasks are finished.
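
Here is a rough sketch of that bookkeeping in plain Python (the method names echo the callbacks discussed above, but this is not the actual pyMesos interface, just an illustration of the tracking logic):

    # A rough sketch of the job/task bookkeeping; not the actual pyMesos interface.
    class MyScheduler:
        def __init__(self):
            self.job_of_task = {}   # task_id -> job_id
            self.remaining = {}     # job_id -> number of unfinished tasks

        def submit_job(self, job_id, task_ids):
            self.remaining[job_id] = len(task_ids)
            for task_id in task_ids:
                self.job_of_task[task_id] = job_id

        def resourceOffer(self, offers):
            """Select which offers to use and describe the tasks to launch on them."""
            pass   # framework-specific placement logic goes here

        def statusUpdate(self, task_id, state):
            """Piece per-task updates back together into per-job progress."""
            if state == "TASK_FINISHED":
                job_id = self.job_of_task.pop(task_id)
                self.remaining[job_id] -= 1
                if self.remaining[job_id] == 0:
                    print(f"job {job_id} completed")

    sched = MyScheduler()
    sched.submit_job("job-1", ["t1", "t2"])
    sched.statusUpdate("t1", "TASK_FINISHED")
    sched.statusUpdate("t2", "TASK_FINISHED")   # prints: job job-1 completed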

This scheduler/executor abstraction can also get leaky. The paper mentions this about the Hadoop port, which came to a total of 1500 lines of code: "We also needed to change how map output data is served to reduce tasks. Hadoop normally writes map output files to the local filesystem, then serves these to reduce tasks using an HTTP server included in the TaskTracker. However, the TaskTracker within Mesos runs as an executor, which may be terminated if it is not running tasks. This would make map output files unavailable to reduce tasks. We solved this problem by providing a shared file server on each node in the cluster to serve local files. Such a service is useful beyond Hadoop, to other frameworks that write data locally on each node."
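
As an aside, a per-node shared file server of the kind they describe can be as simple as the following sketch (a toy stand-in using Python's standard library, not the component Mesos/Hadoop actually shipped; the directory path is hypothetical):

    # Toy stand-in for a per-node file server (not the actual Mesos/Hadoop component):
    # serve a local directory over HTTP so other nodes can fetch the map output files.
    import http.server, os, socketserver

    os.chdir("/srv/map-output")   # hypothetical directory holding this node's map outputs
    with socketserver.TCPServer(("", 8080), http.server.SimpleHTTPRequestHandler) as httpd:
        httpd.serve_forever()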

If you are thin like Mesos, you can (should?) add on weight later when it is warranted.

A side remark: What is it with the "fine-grained" in the title?

The title is a humble and conservative one: "Mesos: A platform for fine-grained resource sharing in the data center". The paper seems insecure about this issue and keeps referring back to it to emphasize that Mesos works best with fine-grained short tasks. This gets peculiar for a careful reader.

Well, I think I know the reason for this peculiarity. The authors may have been burned about this before by an over-critical reviewer (it is always Reviewer 2!), so they are trying to preemptively dismantle the same criticism before it can be aired again. This is a very solid and important paper, but I wouldn't be surprised if even a paper of this caliber had been rejected earlier and this version were its second (or even third) submission. Reviewer 2 might have told the authors, not too subtly, that the paper was claiming too much credit (which is always a big pet peeve of Reviewer 2), and the current version of the paper is written defensively to guard against this criticism.

Oh, the joys of academia. I wouldn't be surprised if Reviewer 2 also found the paper low on novelty and more suitable for industrial research than for academic research.

Worse-is-better

Yes, Mesos is too eager to punt the ball to the clients. But this is not necessarily a bug; it can be a feature. Mesos is thin and gives your frameworks control over how to schedule things. Mesos doesn't get in your way and provides your frameworks low-level control over scheduling and management decisions.

Mesos reminds me of the "worse-is-better" approach to system design. (OK, read that link, it is important. I will wait.) Since Mesos is minimalist and simple, it is a viral platform. (Much like MapReduce and Hadoop were.)

Borg/Kubernetes aim to do "the right thing." They provide a blackbox cluster manager with a lot of features: optimal scheduling, fault-tolerance, etc. This is great if you fit into the workloads they cater to, which cover most web-services workloads. But this approach may actually get in your way if you would like low-level control over scheduling/management decisions.

I read the "worse is better" when I was a fresh graduate student in 1999 working on the theory side of distributed algorithms and self-stabilization. I was a Dijkstra fan, and this article was a real eye opener for me. It made me to question my faith :-)

Worse is better takes a simplistic/minimalist approach. Simple and minimal can be powerful, if it is not done too ugly. Is worse always better? No. As I said earlier systems design is all about tradeoffs. It is important to analyze and decide in advance what the priorities are.

I feel like I will pick up on this thread at another time.

Saturday, February 11, 2017

Large-scale cluster management at Google with Borg

This paper from Google appeared at EuroSys'15. The paper presents Borg, the cluster management system Google has used since 2005. The paper includes a section at the end about the good and bad lessons learned from using Borg, and how these led to the development of the Kubernetes container-management system, which powers the Google Cloud Platform and App Engine.

Borg architecture



This is the Borg. Resistance is futile.

A median Borg cell is 10K machines. And all those machines in a cell are served by a logically centralized control: the Borgmaster.

Where is the bottleneck in the centralized Borg architecture? The paper says it is still unclear whether this architecture would hit a practical scalability limit. Anytime Borg was given a scalability target, they managed to achieve it by applying basic techniques: caching, loose synchronization, and aggregation.

What helped the most in achieving scalability was decoupling the scheduler component from the Borgmaster. The scheduler is loosely synchronized with the Borgmaster: it operates on a cached copy of the cell state and acts as a counsel/advisor to the Borgmaster. If the scheduler makes a decision that is not feasible (because it is based on outdated state: a machine failed, a resource is gone, etc.), the Borgmaster will not take that advice and will ask the scheduler to reschedule the job, this time hopefully with better, up-to-date state.
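
Here is a toy sketch of that interaction (my illustration, not Borg code): the scheduler proposes a placement from a possibly stale cached copy of the cell state, and the master commits the decision only if it is still feasible, otherwise asking for a reschedule with fresh state.

    # Toy sketch of optimistic scheduling against a cached cell state (my illustration,
    # not Borg code). The master only commits a decision if it is still feasible.
    import copy

    class Master:
        def __init__(self, free_cpus):
            self.free_cpus = dict(free_cpus)       # authoritative cell state

        def snapshot(self):
            return copy.deepcopy(self.free_cpus)   # what the scheduler caches

        def commit(self, machine, cpus):
            if self.free_cpus.get(machine, 0) >= cpus:
                self.free_cpus[machine] -= cpus
                return True
            return False                           # stale advice: ask the scheduler to retry

    def propose(cached_state, cpus_needed):
        """Scheduler-side: pick the machine that looks best in the cached view."""
        candidates = {m: c for m, c in cached_state.items() if c >= cpus_needed}
        return max(candidates, key=candidates.get) if candidates else None

    master = Master({"m1": 8, "m2": 4})
    cached = master.snapshot()                 # scheduler's cached view of the cell
    master.free_cpus["m1"] = 1                 # the cell changes behind the scheduler's back
    choice = propose(cached, cpus_needed=4)    # picks m1 based on the stale cache
    if not master.commit(choice, 4):           # the master rejects the stale advice
        cached = master.snapshot()             # refresh and reschedule with up-to-date state
        print("rescheduling on", propose(cached, 4))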

To provide high availability, the Borgmaster is Paxos-replicated over 5 machines. Replicas serve read-only RPC calls to reduce the workload on the Borgmaster leader. In addition to the Paxos log, there are also periodic checkpoints/snapshots to restore the Borgmaster's state to an arbitrary point in the past. A Fauxmaster can also use this functionality for debugging the Borgmaster and for evaluating scheduling performance.

A Borglet is the local Borg agent on every machine in a cell. (In Mesos this corresponds to the Mesos slave, or in the new terminology, the Mesos agent.) Each Borgmaster replica runs a stateless link shard to handle the communication with some subset of the Borglets. The link shard aggregates and compresses this information and reports only diffs to the state machines, to reduce the update load on the elected master.
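
A toy sketch of the diff-reporting idea (again my illustration, not Borg code): only the entries that changed since the last report get forwarded upward.

    # Illustrative sketch of diff reporting (not Borg code): forward only the entries
    # that changed since the last report, to cut the update load on the master.
    def diff_report(previous, current):
        return {k: v for k, v in current.items() if previous.get(k) != v}

    prev = {"task1": "RUNNING", "task2": "RUNNING"}
    curr = {"task1": "RUNNING", "task2": "FINISHED", "task3": "RUNNING"}
    print(diff_report(prev, curr))   # {'task2': 'FINISHED', 'task3': 'RUNNING'}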

Jobs and tasks



A job consists of many tasks (which run the same binary program). 50% of machines run 9+ tasks, and a 90th-percentile machine has ~25 tasks and runs ~4500 threads.

Google's Borg workload consists of 2 main categories. Production jobs are long-running services that serve short user requests and require low latency. Batch jobs, on the other hand, are less sensitive to performance fluctuations. The workload has dynamic surges: batch jobs come and go, and production jobs have a diurnal pattern. (A representative Borg workload trace is publicly available.) Borg needs to handle this dynamic demand while providing as high a utilization of the cluster machines as possible.

It turns out tight-packing scheduling is not optimal for high utilization, because it is too strict and fails to accommodate bursty loads and misestimations from Borg clients. Instead, a hybrid packing is used, which provides 5% better packing efficiency than the tight-packing/best-fit policy. Borg uses priorities for tasks. If a machine runs out of resources to accommodate its assigned tasks (e.g., due to a burst in demand), lower-priority tasks on that machine are killed and added to the scheduler's pending queue for re-placement.
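
A minimal sketch of this preemption policy (illustrative only; Borg's actual policy is more involved): when a machine is over capacity, evict the lowest-priority tasks until the remaining ones fit, and hand the evicted ones back to the pending queue.

    # Illustrative sketch, not Borg's actual policy: when a machine is over capacity,
    # evict the lowest-priority tasks until the remaining ones fit.
    def relieve_overload(tasks, capacity):
        """tasks: list of (priority, cpus) currently assigned to the machine."""
        tasks = sorted(tasks, key=lambda t: t[0])        # lowest priority first
        preempted = []
        while tasks and sum(cpus for _, cpus in tasks) > capacity:
            preempted.append(tasks.pop(0))               # kill the lowest-priority task
        return tasks, preempted

    running, pending = relieve_overload([(0, 2), (2, 4), (1, 3)], capacity=7)
    print(running)   # [(1, 3), (2, 4)] keep running
    print(pending)   # [(0, 2)] goes back to the scheduler's pending queue for re-placement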

Users operate on jobs by issuing remote procedure calls (RPCs) to Borg, most commonly from a command-line tool or from other Borg jobs. To help users manage their jobs, Borg provides a declarative job specification language and job monitoring/management tools. Borg uses the concept of an allocation set for a job, which corresponds to the concept of a pod in Kubernetes.

Task startup latency at a machine is about 25 seconds, 20 seconds of which is package installation time. To reduce the latency from package installation, Borg tries to schedule tasks where the packages are already available. In addition, Borg employs tree- and torrent-like protocols to distribute packages to machines in parallel. Finally, Borg also tries to schedule tasks to reduce the correlation of failures for a given job.

Almost every task contains a built-in HTTP server that publishes health and performance info. Borg monitors the health-check URL and restarts tasks that fail to respond.
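
A hedged sketch of what such health checking boils down to (my illustration with a hypothetical task endpoint; Borg's internal mechanism is of course not this Python loop): poll each task's health URL and trigger a restart when it stops responding.

    # Hedged sketch of a health-check monitor (my illustration with a hypothetical
    # task endpoint; this is not Borg's internal mechanism).
    import urllib.request

    def healthy(url, timeout=2):
        try:
            return urllib.request.urlopen(url, timeout=timeout).status == 200
        except OSError:
            return False

    def monitor(tasks, restart):
        for name, url in tasks.items():
            if not healthy(url):
                restart(name)   # e.g., reschedule the task on this or another machine

    monitor({"task1": "http://10.0.0.5:8080/healthz"},   # hypothetical health-check URL
            restart=lambda name: print("restarting", name))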

Wednesday, January 18, 2017

Deep Learning With Dynamic Computation Graphs (ICLR 2017)

This is a paper by Google that is under submission to ICLR 2017. Here is the OpenReview link for the paper. The paper pdf as well as paper reviews are openly available there. What a concept!

This paper was of interest to me because I wanted to learn about dynamic computation graphs. Unfortunately, almost all machine learning/deep learning (ML/DL) frameworks operate on static computation graphs and can't handle dynamic computation graphs. (Dynet and Chainer are exceptions.)

Using dynamic computation graphs allows dealing with recurrent neural networks (RNNs) better, among other use cases. (Here is a great article about RNNs and LSTMs. Another good writeup on RNNs is here.) TensorFlow already supports RNNs, but only by adding padding to ensure that all input data are of the same size, i.e., the maximum size in the dataset/domain. Even then, this support is good only for linear RNNs; it is not good for tree RNNs, which are suitable for more advanced natural language processing.
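
For concreteness, the padding workaround amounts to something like this sketch (toy code, not TensorFlow's actual RNN padding utilities): every sequence in a batch is padded to the length of the longest one so a static graph can process the whole batch uniformly.

    # Toy sketch of the padding workaround (not TensorFlow's actual RNN utilities):
    # pad every sequence to the length of the longest one.
    def pad_batch(sequences, pad_value=0):
        max_len = max(len(s) for s in sequences)
        return [s + [pad_value] * (max_len - len(s)) for s in sequences]

    print(pad_batch([[3, 1], [7, 2, 5, 9], [4]]))
    # [[3, 1, 0, 0], [7, 2, 5, 9], [4, 0, 0, 0]]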

This was a very tough paper to read. It was definitely above my level as a beginner. The paper assumed a lot of background from the reader: familiarity with TensorFlow execution and operators, some programming languages background, and familiarity with RNNs. The dynamic batching idea introduced in the paper is complex, but it is explained briefly (and maybe a bit poorly?) in one page. Even when I gave the paper all my attention and tried to form several hypotheses about the dynamic batching idea, I was unable to make progress. In the end, I got help from a friend who is an expert in deep learning.

I skipped reading the second part of the paper, which introduces a combinator library for NNs. The library is relevant because it was instrumental in implementing the dynamic batching idea introduced in the first part of the paper. This second part looked interesting, but the functional programming language concepts discussed were hard for me to follow.

The dynamic batching idea

This paper introduces the dynamic batching idea to emulate dynamic computation graphs (DCGs) of arbitrary shapes and sizes over TensorFlow, which only supports static computation graphs.

Batching is important because GPUs crave batching, especially when dealing with text data where each item is small. (Images are already large enough to keep the GPU busy, but that is not so for text data.)

However, the challenge for batching when using DCGs is that the graph of operations is not static and can be different for every input. The dynamic batching algorithm fixes batching for DCGs. Given a set of computation graphs as input, each of which has a different size and topology, the dynamic batching algorithm rewrites the graphs by batching together all instances of the same operation that occur at the same depth in the graph. (Google is really into graph rewriting.)

The dynamic batching algorithm takes as input a batch of multiple input graphs and treats them as a single disconnected graph. Source nodes are constant tensors, and non-source nodes are operations. Scheduling is performed using a greedy algorithm: (I omit some of the more detailed steps in the paper.)
  • Assign a depth, d, to each node in the graph. Nodes with no dependencies (constants) are assigned depth zero. Nodes with only dependencies of depth zero are assigned depth one, and so on.
  • Batch together all nodes invoking the same operation at the same depth into a single node.
  • Concatenate all outputs which have the same depth and tensor type. The order of concatenation corresponds to the order in which the dynamic batching operations were enumerated.
Each dynamic operation is instantiated once in the static dataflow graph. The inputs to each operation are tf.gather ops, and the outputs are fed into tf.concat ops. These TensorFlow ops are then placed within a tf.while_loop. Each iteration of the loop will evaluate all of the operations at a particular depth. The loop maintains state variables for each tensor type t, and feeds the output of concat for tensor type t and iteration d into the input of the gathers at tensor type t and iteration d+1.
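
Here is a toy Python sketch of the batching step itself (my reconstruction of the idea, not the paper's implementation, and with none of the TensorFlow gather/concat/while_loop machinery): assign each node a depth, then group all nodes that invoke the same operation at the same depth into one batched invocation.

    # Toy sketch of the batching step (my reconstruction, not the paper's implementation):
    # assign depths, then group nodes by (depth, operation).
    from collections import defaultdict

    def dynamic_batch(nodes):
        """nodes: dict name -> (op, [input names]); constants have no inputs."""
        depth = {}
        def get_depth(name):
            if name not in depth:
                op, inputs = nodes[name]
                depth[name] = 0 if not inputs else 1 + max(get_depth(i) for i in inputs)
            return depth[name]

        batches = defaultdict(list)
        for name in nodes:
            op, _ = nodes[name]
            batches[(get_depth(name), op)].append(name)   # one batched call per (depth, op)
        return dict(batches)

    # Two small trees treated as a single disconnected graph.
    graph = {
        "a": ("const", []), "b": ("const", []), "c": ("const", []),
        "x": ("add", ["a", "b"]), "y": ("add", ["b", "c"]),
        "z": ("mul", ["x", "y"]),
    }
    for (d, op), names in sorted(dynamic_batch(graph).items()):
        print(f"depth {d}, op {op}: {names}")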

Experimental results

The test results emphasize the importance of batching, especially on GPUs, where it can enable speedups of up to 120x. The speedup ratio denotes the ratio between the per-tree time for dynamic batching on random shapes ("full dynamic") and manual batching with a batch size of 1.

Dynamic batching instantiates each operation only once and invokes it once for each depth, so the number of kernel invocations is proportional to the tree depth, log(n) for a balanced tree, rather than n, where n is the tree size. Dynamic batching thus achieves substantial speedups even at batch size 1, because it batches operations at the same depth within a single tree.

Limitations

Dynamic batching works on a single machine; it is not distributed. Dynamic batching requires an all-to-all broadcast, so it doesn't scale to distributed machines.

This Google paper doesn't cite or discuss Dynet and Chainer, but Dynet and Chainer are single-machine ML/DL frameworks that support dynamic computation graphs. On one hand, Dynet & Chainer are most likely not good at batching, so the dynamic batching method here is a contribution. On the other hand, since Dynet & Chainer support dynamic computation graphs natively (rather than by emulating them on static computation graphs the way dynamic batching does), they are most likely more expressive than what dynamic batching can achieve. In fact, another limitation of the dynamic batching approach is that it requires all operations that might be used to be specified in advance. Each input/output may have a different type, but all types must be fixed and fully specified in advance.