## Wednesday, May 24, 2017

### Paper summary: Making sense of Performance in Data Analytics Frameworks (NSDI 15)

What constitutes the bottlenecks for big data processing frameworks? If CPU is a bottleneck, it is easy to fix: add more machines to the computation. Of course for any analytics job, there is some amount of coordination needed across machines. Otherwise, you are just mapping and transforming, but not reducing and aggregating information. And this is where the network and the disk as bottleneck comes into play. The reason you don't get linear speedup by adding more machines to an analytics job is the network and disk bottlenecks. And a lot of research and effort is focused on trying to optimize and alleviate the network and disk bottlenecks.

OK this sounds easy, and it looks like we understand the bottlenecks in big data analytics. But this paper argues that there is a need to put more work into understanding the performance of big data analytics framework, and shows that at least for Spark on the benchmarks and workloads they tried (see Table 1), there are some counter intuitive results. For Spark, the network is not much of a bottleneck: Network optimizations can only reduce job completion time by a median of at most 2%. The disk is more of a bottleneck than the network: Optimizing/eliminating disk accesses can reduce job completion time by a median of at most 19%. But most interestingly, the paper shows that CPU is often the bottleneck for Spark, so engineers should be careful about trading off I/O time for CPU time using more sophisticated serialization and compression techniques.

This is a bit too much to digest at once, so let's start with the observations about disk versus network bottlenecks in Spark. Since shuffled data is always written to disk and read from the disk, the disk constitutes more of a bottleneck than the network in Spark. (Here is a great Spark refresher. Yes RDDs help, but pipelining is possible only within a stage. Across the stages, shuffling is needed, and the intermediate shuffled data is always written to and read from the disk.)

To elaborate more on this point, the paper says: "One reason network performance has little effect on job completion time is that the data transferred over the network is a subset of data transferred to disk, so jobs bottleneck on the disk before bottlenecking on the network [even using a 1Gbps network]." While prior work has found much larger improvements from optimizing network performance, the paper argues that prior work focused mostly on workloads where shuffle data is equal to the amount of input data, which is not representative of typical workloads (where shuffle data is around one third of input data). Moreover the paper argues, prior work used incomplete metrics, conflating the CPU and network utilization. (More on this later below, where we discuss the blocked time analysis introduced in this paper.)

OK, now for the CPU being the bottleneck, isn't that what we want? If the CPU becomes the bottleneck (and not the network and the disk), we can add more machines to it to improve processing time. (Of course there is a side effect that this will in turn create more need for network and disk usage to consolidate the extra machines. But adding more machines is still an easy route to take until adding machines start to harm.) But I guess there is good CPU utilization, and not-so-good CPU utilization, and the paper takes issue with the latter. If you have already a lot overhead/waste associated with your CPU processing, it will be easier to speed up your framework by adding more machines, but that doesn't necessarily make your framework an efficient framework as it is argued in "Scalability, but at what COST?".

So I guess, the main criticisms in this paper for Spark is that Spark is not utilizing the CPU efficiently and leaves a lot of performance on the table.  Given the simplicity of the computation in some workloads, the authors were surprised to find the computation to be CPU bound. The paper blames this CPU over-utilization on the following factors. One reason is that Spark frameworks often store compressed data (in increasingly sophisticated formats, e.g. Parquet), trading CPU time for I/O time. They found that if they instead ran queries on uncompressed data, most queries became I/O bound. A second reason that CPU time is large is an artifact of the decision to write Spark in Scala, which is based on Java: "after being read from disk, data must be deserialized from a byte buffer to a Java object". They find that for some queries considered, as much as half of the CPU time is spent deserializing and decompressing data. Scala is high-level language and has overheads; for one query that they re-wrote in C++ instead of Scala, they found that the CPU time reduced by a factor of more than 2x.

It seems like Spark is paying a lot of performance penalty for their selection of Scala as the programming language. It turns out the programming language selection was also a factor behind the stragglers: Using their blocked time analysis technique, the authors identify the two leading causes of Spark stragglers as Java's garbage collection and time to transfer data to and from disk. The paper also mentions that optimizing stragglers can only reduce job completion time by a median of at most 10%, and in 75% of queries, they can identify the cause of more than 60% of stragglers.

## Blocked time analysis

A major contribution of the paper is to introduce "blocked time analysis" methodology to enable deeper analysis of end-to-end performance in data analytics frameworks.

It is too complicated to infer job bottlenecks by just looking at log of parallel tasks. Instead the paper argues, we should go with the resources perspective, and try to infer how much faster would the job complete if tasks were never blocked on the network. The blocked time analysis method instruments the application to measure performance, uses simulations to find improved completion time while taking new scheduling opportunities into account.

## Conclusions

In sum, this paper raised more questions than it answered for me, but that is not necessarily a bad thing. I am OK with being confused, and I am capable of holding ambivalent thoughts in my brain. These are minimal (necessary but not sufficient) requirements for being a researcher. I would rather have unanswered questions than unquestioned answers. (Aha, of course, that was a Feynman quote. "I would rather have questions that can't be answered than answers that can't be questioned." --Richard Feynman)

This analysis was done for Spark. The paper makes the analysis tools and traces available online so that others can replicate the results. The paper does not claim that these results are broadly representative and apply to other big data analytics frameworks.

Frank McSherry and University of Cambridge Computing Lab take issue with generalizability of the results, and run some experiments on the timely dataflow framework. Here are their post1 and post2 on that.

The results do not generalize for machine learning frameworks, where network is still the significant bottleneck, and optimizing the network can give up to 75% gains in performance.

## Tuesday, May 9, 2017

### Paper review: Prioritizing attention in fast data

This paper appeared in CIDR17 and is authored by Peter Bailis, Edward Gan, Kexin Rong, and Sahaana Suri at Stanford InfoLab.

Human attention is scarce, data is abundant. The paper argues, this is how we fight back:

• prioritize output: return fewer results
• prioritize iteration: perform feedback driven development and give useful details and allow user to tune the analysis pipeline
• prioritize computation: aggressively filter and sample, tradeoff accuracy/completeness with performance where it has low impact, and use incremental data structures

The slogan for the system is: MacroBase is a search engine for fast data. MacroBase employs a customizable combination of high-performance streaming analytics operators for feature extraction, classification, and explanation.

MacroBase has a dataflow architecture (Storm, Spark Streaming, Heron). The paper argues it is better to focus on what dataflow operators to provide than to try to design from-scratch a new system (which won't be much faster/efficient than existing dataflow systems anyhow).

The architecture of MacroBase is simple:
ingestion&ETL -> feature transform -> classification -> data explanation

MacroBase focuses attention on dataflow operators to prioritize computation. This is done by applying classic systems techniques: predicate pushdown, incremental memorization, partial materialization, cardinality estimation, approximate query processing (top K sketch).

Users are engaged at three different interface levels with MacroBase.
1) Basic: web based point and click UI
2) Intermediate: custom pipeline configuring using Java
3) Advanced: custom dataflow operator design using Java/C++

Users highlight key performance metrics (e.g., power drain, latency) and metadata attributes (e.g., hostname, device ID), and MacroBase reports explanations of abnormal behavior. For example, MacroBase may report that queries running on host 5 are 10 times more likely to experience high latency than the rest of the cluster.

As a broader theme, the paper argues there is opportunity in marrying systems-oriented performance optimization and the machine learning literature. Another big message from the paper is the importance of building combined and optimized end-to-end systems.

MacroBase is currently doing mostly anomaly/outlier detection, and it is not doing any deeper machine learning training. There are plans to make the system distributed. Given that it is based on a dataflow system, there are many plausible ways to achieve distribution of MacroBase.

## Saturday, May 6, 2017

### Paper Review: Serverless computation with OpenLambda

This paper provides a great accessible review and evaluation of the AWS Lambda architecture. It is by Scott Hendrickson, Stephen Sturdevant, Tyler Harter, Venkateshwaran Venkataramani†, Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and it appeared at Hot Cloud 16

Virtual machines virtualized and shared the hardware so multiple VMs can colocate on the same machine. This allowed consolidation of machines, prevented the server sprawl problem, and reduced costs as well as improving manageability.

The containers virtualized and shared the operating system, and avoided the overheads of VMs. They provided fast startup times for application servers. By "fast" we mean about 25 seconds of preparation time.

In both VMs and containers, there is a "server" waiting for a client to serve to. Applications are defined as collection of servers and services.

"Serverless" takes the virtualization a step ahead. They virtualize and share the runtime, and now the unit of deployment is a function. Applications are now defined as a set of functions (i.e., lambda handlers) with access to a common data store.

Lambda handlers from different customers share common pools of runtimes managed by the cloud provider, so developers need not worry about server management. Handlers are typically written in interpreted languages such as JavaScript or Python. By sharing the runtime environment across functions, the code specific to a particular application will typically be small, and hence it is inexpensive to send the handler code to any worker in a cluster.

## Performance evaluation on AWS Lambda

Handlers can execute on any worker; in AWS, startup time for a handler on a new worker is approximately 1-2 seconds. Upon a load burst, a load balancer can start a Lambda handler on a new worker to service a queued RPC call without incurring excessive latencies. Figure 2 shows that 100 lambda workers are generated in a short time to serve 100 outstanding RPC requests.

Figure 5 shows more details about the lambda handler initialization. There is a delay for unpausing a lambda function (1ms), if you start from scratch that delay is actually several 100ms.

On the systems research side, one problem to investigate is building better execution engines. Under light load, Lambdas are significantly slower than containers as Figure 4 shows.

Lambdas are great for performance tuning. You can see which functions are accessed how many times because that is how billing is provided. This helps you to tune the performance of your applications.

On the topic of billing, in AWS, the cost of an invocation is proportional to the memory cap (not the actual memory consumed) multiplied by the actual execution time, as rounded up to the nearest 100ms. But many RPCs are shorter than 100ms, so they cost several times more than if charging were more fine-grained.

The authors are working on building an opensource lambda computing platform.

## Discussion and future directions

So what kind of functions are suitable for "lambdatization"?

Currently RPC calls from web apps are being lambdatized. But as the paper observed, maybe those are too small, they last less than 100 ms, the unit of billing. A common use case scenario is when an app puts an image to S3, this triggers a call to a lambda handler that processes this image and creates a thumbnail. That is a better fit.

It is also best if the input to the lambda function is not very big, wasting/duplicating work by transferring large amounts of data. Lambda is for computation. The computation should compensate the cost of network use. Lambda excels in handling bursty traffic by autoscaling extremely quickly.

Picking up on this last observation, I think a very beneficial way to employ lambdatization is for addressing bottlenecks of the big data processing application/platform that surface at runtime. In a sense, I am advocating to use lambdatization to virtualize even the application at the unit of functions! For this we can ask the developer to provide tags to label some functions as lambda-offloadable. Then we can use a preprocessor and a shim layer to deploy these functions as lambda functions, so they can be auto-scaled based on the feedback/directives from the underlying distributed systems middleware.

## Sunday, April 30, 2017

### Setting up a new Mac laptop

I install brew, and then install cask (using brew).

Then I install mactex and emacs using cask.

I install Dropbox, so my research directories get to sync.

These days Opera is my browser of choice.

I use f.lux and caffeine to get my monitor to behave.

I can't live without some keyboard customizations. I open keyboard preferences, and get function keybindings to work without requiring the pressing of fn key. While there I map the CAPSLOCK to Control key. I use alt-left/right to traverse between desktops.

My Emacs customization takes some time. I use the modularized emacs24-starter kit at http://eschulte.github.io/emacs24-starter-kit/. It is great, it has good defaults, but it would be much better if it has better documentation and instructions.

Since I install emacs using cask rather than from binary, I get to enjoy good package-manager support using ELPA. To see the available packages type M-x package-show-package-list. I install auctex, exec-path-from-shell, ipython, magit, python-mode, tomatinho, visual-fill-column.

I use a wheat background, and Chalkboard font. I use highlight line mode as well with lightsteelblue1 color.
(custom-set-faces
'(default ((t (:inherit nil :background "wheat" :height 160 :width normal :foundry "nil" :family "Chalkboard"))))
For highlighting certain patterns at files, I use global-hi-lock mode as well.

Finally since I use org-mode a lot, I have several special keybindings for org-mode. Some of them are:
(global-set-key (kbd "<f5>") 'org-schedule)
(global-set-key (kbd "<f12>") 'org-agenda)

Let me know if you have some other important apps tricks, that you can't live without.

## Thursday, April 27, 2017

### DARPA: Delivering Disruption

What does DARPA and Facebook have in common?
Move fast and break things!

A couple weeks ago Stefanie Tompkins, DARPA acting deputy director, visited University at Buffalo to give a talk and meet researchers. She talked about DARPA and funding opportunities at DARPA. I had the mental image of DARPA as a slow moving, hard to maneuver aircraft carrier, but her talk painted a very different picture, and partially changed my perception of DARPA.

DARPA, then ARPA (Advanced Research Projects Agency), was created in 1958 as a response to Sputnik 1. DARPA's mission is to prevent strategic surprise, avoid another Sputnik moment. And as you already know, the best way to predict future is to invent it. So DARPA is perpetually pushing the envelope to invent the next strategic surprise and disruptive technology itself.

DARPA has a large number of successes under its belt. Internet has its roots in ARPANET.  GPS grew out of basic research on atomic clocks funded by DARPA. DARPA is also credited with several inventions in stealth, radar arrays, uavs, ir night vision, microelectronics, and materials sciences. DARPA funded several high impact grand challenges on autonomous vehicles starting back in 2004, which helped kick start todays self-driving cars technologies. DARPA also funded other grand challenges, including the network challenge (2009), and cyber grand challenge (2016).

To drive for disruption and off-scale impact, DARPA recruits program managers from universities (20% of PMs) and industry. The program managers rotate quickly, they serve 3-5 years. Within that time, they use the budget allocated to them to create/fund DARPA programs on high impact research and development. DARPA programs may range from $10,000,000 to$400,000,000 over 2 to 5 years.

DARPA has 6 technical offices.

Heilmeier questions have a significant place DARPA culture. Even the program managers need to answer these questions when creating new programs.

Another part of DARPA culture is the infamous quadchart. "Quadcharts which cram in figures, assertions, and required logos/branding/acronyms are a proud and sacred tradition" (RT @tedherman).

While Stefanie's presentation managed to convince me that DARPA is an agile institution, the quadcharts make me a skeptic :-)

## Monday, April 24, 2017

### Book Review: The Story of Your Life (by Ted Chiang)

The "Story of Your Life" is the science-fiction story which the Arrival film was based on. It was a very good read. Back in November, before the movie was released, I read it in less than 2 hours, in one breath.

The first couple pages of the book had some weird/mixed usage of the past future tense. Right at that first page, you get the idea that something is not right and this is going to be an interesting read. And as you keep reading, the story drops on you more clues, and you feel both smart and awed, when you first piece together that Gary was indeed the linguist's (Louise) first husband, and the daughter is not dead yet due to the climbing accident.

Ted Chiang gives a lot of credit to the readers' intelligence. I liked that a lot. I also liked that I had to google and learn several things while reading the story. I googled to learn about "Fermat's principle", "teleology", "ideograms", and some related linguistic terms.

I was pretty shaken after finishing the story. It contributes to the Freewill and Fate debate in philosophy and theology from a science-fiction perspective. (Time is an illusion, and with an altered perception, you can experience time in an orthogonal axis, and freewill becomes irrelevant/pointless.) The story is at the same time very emotional because the parentship thread is very well woven into the story.

Ted Chiang writes a very tight story. No, actually, he calculates, computes, and weaves the story. Amused by this first story, I read couple other stories from him. I also liked "The Alchemist's Gate", which was written like a Sufi story, again exploring the concept of fate, and free will.  Ted seems to be obsessed about these concepts, and must have been thinking very deeply about them. This  story also made subtle and thoughtful "what if" cases about these concepts. This story also had a strong technical side interwoven with a moving emotional side.

Wikipedia says Ted Chiang has been working as a technical writer in the software industry. I would be interested in reading the manuals he writes.

Hacker News Discussion
How I wrote Arrival
Wolfram's blog on the symbolic language created for the movie
Timequake by Kurt Vonnegut was also an excellent book playing with the freewill and fate ideas.

## Friday, April 21, 2017

### Paper summary: Federated Learning

On Thursday, April 6, Google announced Federated Learning.  The announcement didn't make quite a splash, but I think this is potentially transformative. Instead of uploading all the data from the smartphones to the cloud for training the model in the datacenter, federated learning enables in-situ training at the smartphones themselves. The datacenter is still involved but it is involved for just aggregating the smartphone-updated local models in order to construct the new/improved global model.

This is a win-win-win situation. As a smartphone user, your privacy is preserved since your data remains on your device, but you still get the benefits of machine learning on your smartphone. Google gets what it needs: it perpetually learns from cumulative user experience and improves its software/applications. Google collects insights without collecting data (and some of these insights may still be transferable to advertising income). Secondly, Google also outsources the training to the users' smartphones: it makes the smartphones work on updating the model, rather than using servers in its datacenters. This may seem like penny-pinching, but if you consider the sheer number of Android smartphones out there (more than 1.5 billion according to numbers by the end of 2015), the computation savings are huge. Notice that Google is winning twice, while you win only once. :-)

After skimming the Google announcement, I was excited, because I had predicted this on Jan 2016. When I read the TensorFlow whitepaper, I was struck by a peculiar emphasis on heterogenous device support of TensorFlow, especially on the smartphone. I predicted that Google is up to something, more than just inference/serving layer support on smartphones. I got the mechanism for this wrong, since I didn't have machine learning experience then.

The Google announcement links to some papers, and I read the most relevant paper carefully. Below is my summary of the paper: Communication-Efficient Learning of Deep Networks from Decentralized Data.

## Applications that are ideal fit for federated learning

The paper pitches smartphone applications as the ideal domain for federated learning.
For smartphones, the upload transfer is a bottleneck. Instead of uploading the data to the cloud for training, let the smartphone train and update the model and transmit it back. This makes sense when the model-update is smaller than the data. There is another Google paper that provides tricks for further optimizing/compressing the size of the learned-model at the smartphone for transmitting back to the cloud. Since the upload rate is about 1/3rd of the download rate, such techniques are beneficial.

A second big benefit for the smartphone applications domain is preserving the private data of the smartphone user. Finally, in the smartphone applications domain the labels on the data is available or inferable from user interaction (albeit in an application dependent way).

Concrete examples of these applications are "image classification: predicting which images are most likely to be viewed and shared", and "language modeling: improving voice recognition and text entry on smartphone keyboard." Google is already using federated learning in the GBoard Keyboard on Android for improving text entry. On device training uses a miniature version of TensorFlow.

## Related work

There has been work on iteratively averaging locally trained models, but they are inside datacenter, not at the edge. See: Sixin Zhang, Anna E Choromanska, and Yann LeCun. Deep learning with elastic averaging SGD. In NIPS. 2015. and also this one: Ryan McDonald, Keith Hall, and Gideon Mann. Distributed training strategies for the structured perceptron. In NAACL HLT, 2010.

There has been work motivated from privacy perspective, but with limited empirical results. And there has been work in convex setting, for distributed consensus algorithm (not the distributed systems version of the problem but the machine learning version.)

## The contributions of the paper

The paper introduces the FedAvg algorithm for federated learning. The algorithm is not an intellectually challenging innovation, as it prescribes a small variation to the traditional SGD training. So the paper is careful not to claim too much credit for the algorithm. Instead the paper distributes its contributions to items 1 and 3 below, and downplays 2 in comparison: "1) the identification of the problem of training on decentralized data from mobile devices as an important research direction; 2) the selection of a straightforward and practical algorithm that can be applied to this setting; and 3) an extensive empirical evaluation of the proposed approach."

## The algorithm

The algorithm uses a synchronous update scheme that proceeds in rounds of communication. There is a fixed set of K clients, each with a fixed local dataset. At the beginning of each round, a random fraction C of workers is selected, and the server sends the current global algorithm state to each of these workers (e.g., the current model parameters). Only a fraction of workers are selected for efficiency, as the experiments show diminishing returns for adding more workers beyond a certain point. Each selected worker then performs local computation based on the global state and its local dataset, and sends an update to the server. The server then applies these updates to its global state, and the process repeats.

So here are the high-level steps in the algorithm:

1. Workers are sent the model by the server
2. Workers compute an updated model based on their local data
3. The updated models are sent from the workers to the server
4. The server aggregates these models (by averaging) to construct the new global model

These steps are almost the same in a traditional ML/DL learning with a parameter-server and workers. But there are some minor differences. Difference in step 1: Not all workers, but a subset of workers are chosen. Differences in step 2: Workers are also producers of data, they work on the data they produce. Workers may do multiple iterations on the local-model-update. Difference in step 3: The model, not the gradients, is transmitted back.

For step 3, I don't know why the model is sent rather than the gradients. The paper presents a derivation to argue why both are equivalent, but then does not provide any justification for transmitting model instead of the gradients. There is no explanation nor experiments about comparing the two approaches.

Here is the algorithm: (The paper calls workers as clients.)

The amount of computation is controlled by three key parameters: C, the fraction of workers that perform computation on each round; E, then number of training passes each worker makes over its local dataset on each round; and B, the local minibatch size used for the worker updates. B =infinity indicates that the full local dataset is treated as a single minibatch.

And this is the biggest difference in the algorithm from traditional synchronous SGD:
In each synchronous round, the workers can perform multiple rounds of local-model update before uploading the model back to the cloud. Since the local-data is small, computing/iterating over it is fast. And since communication is already high-latency, the workers may as well do many local computation rounds to make this worthwhile.

The paper makes a big deal of the unbalanced data distribution and especially the  non-iid (non- independent & identically distributed) data at the workers. However, there is nothing special in the FedAvg algorithm to address the non-iid data challenge. It works because the SGD tolerates noise. And most likely having many workers participate at each round helps a lot. Even with C=0.1, the number of smartphones used for training can be amazingly high. In the experimental results,   100 workers are used for MNIST image classification application, and  1146 workers are used for Shakespeare dataset language modeling application. This is way more workers used in traditional ML, certainly for the MNIST and Shakespeare datasets. So the sheer number of workers helps compensate the non-iid challenge.

## Questions

Can Federated Learning bootstrap from zero-trained model?
Before reading the paper, I thought "maybe the smartphones are not initialized from random parameters, but with partially or mostly trained parameters". The experiments suggests otherwise: The smartphones are started from an untrained model. However, Figure 1 in the paper shows that the smartphones (i.e., workers) should get started with the same common initialization, because independent initialization does not converge.

How long is a round?
The experiments investigate how many rounds are needed to converge, but there is no explanation in the paper about the length of a round. In my estimation, the length of a round should be around 5 minutes, or so. In practical use, Google employs as workers the smartphones that are plugged to a wall-outlet and connected to a WI-FI. (Android phones provide this information to Google and a subset among them is randomly selected as workers. In our PhoneLab project, we also uploaded data from phones only when they were plugged in.) Since this is synchronous SGD, the round progresses with the rate of the slowest worker. So they must be using backup workers (maybe 5% or 10%) to combat the straggler problem.

FedAvg uses synchronous SGD, but would it also work with asynchronous rounds?
Unfortunately there are no experiments that investigate this question.

## Experimental results

The experiments use the MNIST dataset image classification, CIFAR 10 image classification, and Shakespeare dataset for language modeling and prediction of the next character (alphabet character that is).

The paper says this: "Our goal is to use additional computation in order to decrease the number of rounds of communication needed to train a model. The speedups we achieve are due primarily to adding more computation on each client, once a minimum level of parallelism over clients is used." They achieve this by increasing E (or decreasing B), once C is saturated (which tend to happen for C=0.1 in their setup).

Even the pathologically biased distribution of local datasets  works great because number of C is high, and provides smoothing. "Both standard SGD and FedAvg with only one client per round (C = 0), demonstrate significant oscillations in accuracy, whereas averaging over more clients smooths this out."

## Tuesday, February 21, 2017

### 1 million pageviews

My blog has recently reached 1 million pageviews. This warrants for a short retrospection.

I started the posting regularly on September 2010. I wanted to get into the cloud computing domain, so I needed to accumulate background on cloud computing work. I decided that as I read papers on cloud computing, I will post a summary to this blog. I thought if I could explain what I learned from the papers in my own words, I would internalize those lessons better. And if others read those summaries and benefit, that is an extra plus.

"Writing is nature's way of telling you how sloppy your thinking is." In fact, I learned a lot writing those paper reviews. Writing the reviews gave me a deeper understanding of the work done, beyond what I could achieve by passively reading them. Putting them on web was also a nice choice, because I could refer my students to some of these summaries when needed. And it turned out that I referred to those summaries myself very frequently to jog my memory. Since I have encoded the writing with my understanding, reading through my summary would get me refreshed about the important lessons from that work. Since my summaries were made available on the web, all I needed to do was google search for muratbuffalo and paper name.

(Side remark about my research journey: My research area at the first couple years of my PhD was distributed algorithms and self-stabilization. Then starting on 2002, wireless sensor networks has become my research area. I applied stabilizing distributed algorithms for in-network querying and tracking in wireless sensor networks. Around 2009 I started transitioning to crowdsourced sensing and collaboration using smartphones. And starting from 2010, I transitioned to large-scale cloud computing systems. Distributed systems has been the common theme through out. Here is a link to my research statement as of the end of 2016.)

Over time I included posts about my conference trips, book reviews, rants, and research advice for students. Putting research advice (reading, writing, presenting) is also beneficial because I can refer my students to it. And occasionally I receive emails from remote corners of the world about how some of these posts helped them or inspired them, and that makes me very happy for an entire day.

## Some of the big hits

The bursty traffic all came from Hacker News. The regular traffic came from many sources: Google searches, blog posts, twitter links.

Google tells me I can earn up to $18.50 per month by placing ads on my blog using AdSense. No thanks, for now. Here are the top 10 posts in my blog as of now. Looks like anything mentioning Facebook is a big hit. Deep learning is also very hot. Glad to see our hybrid logical clocks work also up there. And glad to see interest for TLA+. ## Saturday, February 18, 2017 ### Bowling your way to the top "Oh, this is very American!" I said, when I finally understood how Bowling scoring works. ## Bowling scoring is nonlinear In a bowling game, there are 10 rounds. There are 10 pins, and you get 2 shoots in each round to knock as many as you can. Even if you are novice, if you are eager and put effort in it, at each round you can knock down 6 pins. So that gives you a score of 6*10=60. If you knock down 7 pins at each round, you get a score of 70. 8 pins, you get a score of 80. 9 pins, you get a score of 90. Here is where things start to go nonlinear and you get accelerated returns. If you knock down all the 10 pins in your two shoots, this is called a spare. Your score for that round is not just 10, but the point you get from the next round is also added to it. So if you had a spare in round k, and got 7 in the next round k+1, you get 10+7 for round k, and 7 for round k+1, and in total of 17+7=24 points from these two rounds. If we were scoring this linearly, you would only get 10+7=17. If you knock down all the 10 pins in your first shoot in a round, this is called a strike. Your score for that round is not just 10, but the points you get from the next *two* rounds get added to it. If you had a strike in round k, and got 7 in round k+1 and k+2, you get 10+7+7=24 points for round k, 7 for k+1, and 7 for k+2, and a total of 38 points from these 3 rounds. If we were scoring this linearly, you would only get 10+7+7=24 from these 3 rounds. In the first game I played, I was knocking about 7 pins each round, so I thought, I should be in pretty good shape. Wrong. I got 4th place. The first two guys were experienced, they managed to hit sequences of strikes and spares, and their score grew very fast. My third place friend had managed to hit a series of spares towards the end of the game, and has beaten my score before I could understand my score was being beaten. I thought I was comfortably ahead. It is more important to hit occasional strikes and spares than hitting a constantly comfortable 7 average. So let's get back to where we left, the transition from linear to nonlinear scoring. All 8 pins at all rounds, you get a total score of 80. All 9, you get a total score of 90. All spares, you get a total score of 200, instead of 100. All strikes, you get a total score of 300, instead of 100. And that last one is called a perfect game. Here is a short video of a perfect game. ## OK so what? Why am I wasting my time and your time telling you about bowling scoring? If you were born and raised in US, you might have yawned reading through the above text. You might be taking the scoring for granted. In fact, when I realized the scoring works in a "funny" way, I asked my American friends to explain. They didn't have much previous practice explaining the scoring. One of them said, after stalling for some time, "Hmm, I realize this is the first time I am explaining Bowling scoring to someone." And this guy has played in a Bowling league for a couple years :-) After a couple more takes of explaining/questioning with a second friend, when I finally understood what is going on, I blurted: "Oh, this is very American!", which surprised my friend. If you take this scoring for granted, all I will tell you is this: "Three points for a win" is a relatively recent adoption in soccer scoring. Before that, it was 0 points for loss, 1 points for draw, and 2 points for win. And the games were so boring. Teams would go for a draw, because the prospect of gaining one extra point by putting effort into attacking was not worth risking your defensive stance which could make you lose the game and get no points. The transition to three points for a win started only after 1980 taking up to 2000 in some countries. And this led to a significant increase of average goals scored in the games. ## This is not about bowling, isn't it? Yes, you see, free markets are inherently nonlinear scoring markets. Nonlinear scoring applies especially for the current information technology markets, where "the best performers are able to capture a very large share of the rewards, and the remaining competitors are left with very little". In such a winner-take-all economy, you run the risk of being overlooked if your products are mediocre. You need to hit some strikes. This is also true in academia. Yes, you need to show that you are publishing productively, and there is some pebble counting. But in order for those pebbles to count, you need some occasional gems in between. You need to hit some strikes. It is more important to hit occasional strikes and spares than hitting a constantly comfortable 7 average. You need to think big, aim big, and go for a strike, so you can achieve nonlinear returns occasionally. ## Other related links 1. Wait a minute? Didn't I tell you a couple days ago "worse is better"? Yes, I did. But this is how I concluded that post: "Worse is better takes a simplistic/minimalist approach. Simple and minimal can be powerful, if it is not done too ugly. Is worse always better? No. As I said earlier systems design is all about tradeoffs. It is important to analyze and decide in advance what the priorities are." In fact, a worse-is-better system hits a strike in a priority dimension, such as being minimalist and going viral. On the other hand, a do-the-right-thing system may get stuck with hitting constantly comfortable of 7 average in all dimensions. 2. This nonlinear return idea also reminds me of the high-intensity interval training (HIT) idea. Tim Ferris had a very interesting interview with Prof. Martin Gibala on this. The idea in HIT is that you get accelerated returns for the short nonlinear effort you put into your training. ## Thursday, February 16, 2017 ### Mesos: A platform for fine-grained resource sharing in the data center This paper appeared in NSDI 11 and introduced the Mesos job management and scheduling platform which proved to be very influential in the big data processing ecosystem. Mesos has seen a large following because it is simple and minimalist. This reminds me of the "worse is better" approach to system design. This is an important point and I will ruminate about this after I explain you the Mesos platform. ## The problem We need to make multiple frameworks coexist and share the computing resources in a cluster. Yes, we have submachine scheduling abstractions: first the virtual machines and then containers. But we still need a coordinator/arbiter to manage/schedule jobs submitted from these frameworks to make sure that we don't underutilize or overload/overtax the resources in the cluster. ## Offer-based scheduling Earlier, I have talked about Borg which addressed this cluster management problem. While Borg (and later Kubernetes) takes a request-based scheduling approach, Mesos chooses to provide an offer-based scheduling approach. In the request-based scheduling, the frameworks provide their scheduling needs to the scheduler/controller and the scheduler/controller decides where to place the tasks and launches them. This can arguably make the design of the controller overcomplicated. The scheduler/controller may need to understand too many details about multiple frameworks in order to perform their requests adequately. This may not scale well as the number of frameworks to support grows. (And we have an abundance of big data processing frameworks.) In stark contrast, Mesos delegates the control over scheduling to the frameworks. The Mesos master (i.e., the controller) provides resource offers to the frameworks, and the frameworks decide which resources to accept and which tasks to run on them. In other words, Mesos takes a small-government, libertarian approach to cluster management :-) Since Mesos is minimalist, it is simple and nicely decoupled from the various frameworks it serves. This made Mesos go viral and achieve high-adoption. But systems design is an exercise in choosing which tradeoffs you make. Let's study the drawbacks. (I am putting on my critical hat, I will talk about the benefits of Mesos again toward the end of this post.) The long-running tasks, and the big tasks strain this offer-based scheduling model. Some frameworks may schedule tasks that can overstay their welcome, and take advantage of the too trusting and hands-off Mesos. This would be unfair to other client frameworks. (Of course the Mesos master may take into account "organizational policies such as fair sharing" when extending offers to the frameworks, and can even kill long running tasks.) Moreover, since Mesos is hands-off, it does not provide fault-tolerance support for long-running tasks, which are more likely to experience failure in their lifetimes as they run longer. Mesos punts the ball to the client frameworks which will need to carry the burden. And doing this for each client framework may lead to redundant/wasted effort. Fortunately other helper systems like Marathon emerged to address this issue and provide support for long running tasks. Even assuming that the client frameworks are not-greedy and on their best cooperating behavior, they may not have enough information about other tasks/clients of Mesos to make optimal scheduling decisions. The paper mentions that: "While this decentralized scheduling model may not always lead to globally optimal scheduling, we have found that it performs surprisingly well in practice, allowing frameworks to meet goals such as data locality nearly perfectly." Related to this problem, another very simple idea makes a cameo appearance in the paper: "We used delay scheduling to achieve data locality by waiting for slots on the nodes that contain task input data. In addition, our approach allowed us to reuse Hadoop's existing logic for re-scheduling of failed tasks and for speculative execution (straggler mitigation)." Is that too simple a technique? Well, it is hard to argue with results. The delay scheduling paper has received 1000+ citations since 2010. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys 10, 2010. ## Mesos architecture Mesos means middle or intermediate, from Greek misos. Nice name. Mesos master is ZooKeeper guarded, so a hot standby can get in and take over if the Mesos master fails. The Mesos master manages the resources by talking to Mesos slaves/workers on the machines in the cluster. This is similar to how BorgMaster manages resources talking to Borglets on the machines. So where is the scheduler in this architecture? This responsibility is punted to the client frameworks. As we mentioned above, the Mesos master provides offers to the client frameworks, and it is upto the client framework to accept an offer. Here is how things work from the client framework's perspective. Each framework intending to use Mesos needs to implement two components: a scheduler that registers with the Mesos master to be offered resources, and an executor that is launched on Mesos worker nodes to run the framework’s tasks. This table shows the callbacks and actions to implement to write the scheduler and the executor components. (To Python users, there is pyMesos to help you write the scheduler and executor components in Python.) In the resourceOffer callback, the scheduler should implement the functionality to select which of the offered resources to reject and which to use along with how to pass Mesos a description of the tasks it wants to launch on them. What if that offer became unavailable in the meanwhile? The Mesos master will then warn the Scheduler via the offerRescinded callback that the offer has been rescinded, and it is the client framework's responsibility to handle this and reschedule the job using the next offers from the Mesos master. The implementation of the scheduler gets more and more involved if the framework would like to keep track of tasks for a submitted job and provide the users of the framework this information. The scheduler gets callbacks on statusUpdate of the tasks, but it needs to piece together and track which job these tasks correspond to. For example, the scheduler gets a callback when a task is finished, and then it is the responsibility of the scheduler to check and mark a job as completed when all its tasks are finished. This scheduler/executor abstraction can also get leaky. The paper mentions this about the Hadoop port, which came to a total of 1500 lines of code: "We also needed to change how map output data is served to reduce tasks. Hadoop normally writes map output files to the local filesystem, then serves these to reduce tasks using an HTTP server included in the TaskTracker. However, the TaskTracker within Mesos runs as an executor, which may be terminated if it is not running tasks. This would make map output files unavailable to reduce tasks. We solved this problem by providing a shared file server on each node in the cluster to serve local files. Such a service is useful beyond Hadoop, to other frameworks that write data locally on each node." If you are thin like Mesos, you can (should?) add on weight later when it is warranted. ## A side remark: What is it with the "fine-grained" in the title? The title is a humble and conservative title: "Mesos: A platform for fine-grained resource sharing in the data center". The paper seems insecure about this issue, and keeps referring back to this to emphasize that Mesos works best with fine-grained short tasks. This gets peculiar for a careful reader. Well, I think I know the reason for this peculiarity. Probably the authors may have been burned before about this from an over-critical reviewer (it is always Reviewer 2!), and so they are trying to preemptively dismantle the same criticism to be aired again. This is a very solid and important paper, but I wouldn't be surprised even a paper of this caliber may have been rejected earlier and this version may be their second (or even third) submission. Reviewer 2 might have told the authors not too subtly that the paper is claiming too much credit (which is always a big pet peeve of reviewer 2), and the current version of the paper is written defensively to guard against this criticism. Oh, the joys of academia. I wouldn't be surprised if Reviewer 2 also found the paper low on novelty and suitable more for industrial research and not for academic research. ## Worse-is-better Yes, Mesos is too eager to punt the ball to the clients. But this is not necessarily a bug, it can be a feature. Mesos is thin, and gives your frameworks control over how to schedule things. Mesos doesn't step in your way and provides your frameworks low-level control over scheduling and management decisions. Mesos reminds me of the "worse-is-better" approach to system design. (Ok, read that link, it is important. I will wait.) Since Mesos is minimalist and simple it is a viral platform. (Much like MapReduce and Hadoop were.) Borg/Kubernetes aims to do "the-right-thing". They provide a blackbox cluster manager that provides a lot of features, optimal scheduling, fault-tolerance, etc. This is great if you fit into the workloads that they cater to, which covers most of the web-services workloads. But this approach may actually get in your way if you like to have low-layer control on scheduling/management decisions. I read the "worse is better" when I was a fresh graduate student in 1999 working on the theory side of distributed algorithms and self-stabilization. I was a Dijkstra fan, and this article was a real eye opener for me. It made me to question my faith :-) Worse is better takes a simplistic/minimalist approach. Simple and minimal can be powerful, if it is not done too ugly. Is worse always better? No. As I said earlier systems design is all about tradeoffs. It is important to analyze and decide in advance what the priorities are. I feel like I will pick up on this thread at another time. ## Saturday, February 11, 2017 ### Large-scale cluster management at Google with Borg This paper from Google appeared on Eurosys'15. The paper presents Borg, the cluster management system Google used since 2005. The paper includes a section at the end about the good and bad lessons learned from using Borg, and how these led to the development of Kubernetes container-management system which empowers the Google Cloud Platform and App Engine. ## Borg architecture This is the Borg. Resistance is futile. A median Borg cell is 10K machines. And all those machines in a cell are served by a logically centralized control: the Borgmaster. Where is the bottleneck in the centralized Borg architecture? The paper says it is still unclear whether this architecture would hit a practical scalability limit. Anytime Borg was given a scalability target, they managed to achieve it by applying basic techniques: caching, loose-synchronization, and aggregation. What helped the most for achieving scalability was decoupling the scheduler component from the Borgmaster. The scheduler is loosely-synchronized with the Borgmaster: it operates on a cached cached copy of the cell state and acts as a counsel/advisor to the Borgmaster. If the scheduler makes a decision that is not feasible (because it is based of an outdated state: machine failed, resource gone, etc.), the Borgmaster will not take that advice and ask the scheduler to reschedule the job this time hopefully with better up-to-date state. To provide high-availability, the Borgmaster is Paxos-replicated over 5 machines. Replicas serve read-only RPC calls to reduce the workload on the Borgmaster leader. In addition to the Paxos log, there is also periodic checkpoints/snapshots to restore the Borgmaster's state to an arbitrary point in the past. A fauxmaster can also use this functionality in debugging of the Borgmaster and scheduling performance. A Borglet is the local Borg agent on every machine in a cell. (In Mesos this corresponds to the Mesos slave, or in the new terminology the Mesos agent.) Borgmaster replica runs a stateless link shard to handle the communication with some subset of borglets. The link shard aggregates and compresses and reports only diffs to the state machines to reduce update load at the elected master. ## Jobs and tasks A job consists of many tasks (which are same binary programs). 50% of machines run 9+ tasks, and 90%ile machine has ~25 tasks and run ~4500 threads. Google's Borg workload consists of 2 main categories. Production jobs are long running services serving short user requests and they require low-latency. Batch jobs on the other hand are less-sensitive to performance fluctuations. The workload has dynamic surges: batch jobs come and go, and productions jobs have a diurnal pattern. (A representative Borg workload trace is publicly available.) Borg needs to handle this dynamic demand while providing as high utilization of the cluster machines as possible. It turns out tight-packing scheduling is not optimal for high-utilization, because it is too strict and fails to accommodate for bursty loads and misestimations from Borg clients. Instead a hybrid packing is used, which provides 5% better packing efficiency than the tight-packing/best-fit policy. Borg uses priorities for tasks. If a machine runs out of resources to accommodate its assigned tasks (e.g., due to burst in demands), lower priority tasks on that machine are killed and added to the scheduler's pending queue for re-placement. Users operate on jobs by issuing remote procedure calls (RPCs) to Borg, most commonly from a command-line tool or from other Borg jobs. To help users manage their jobs, Borg provides declarative job specification language, and job monitoring/management tools. Borg uses the concept of allocation set for a job, which corresponds to the concept of pod in Kubernetes. Task startup latency at a machine is about 25seconds, 20 sec of which is package installation time. To reduce the latency from package installation, Borg tries to schedule tasks where the packages are already available. In addition, Borg employs tree and torrent-like protocols to distributes packages to machines in parallel. Finally, Borg also tries to schedule tasks to reduce correlation of failures for a given job. Almost every task contains a builtin HTTP server that publishes health and performance info. Borg monitors the health-check URL and restarts tasks that fail to respond. ## Wednesday, January 18, 2017 ### Deep Learning With Dynamic Computation Graphs (ICLR 2017) This is a paper by Google that is under submission to ICLR 2017. Here is the OpenReview link for the paper. The paper pdf as well as paper reviews are openly available there. What a concept! This paper was of interest to me because I wanted to learn about dynamic computation graphs. Unfortunately almost all machine learning/deep learning (ML/DL) frameworks operate on static computation graphs and can't handle dynamic computation graphs. (Dynet and Chainer are exceptions). Using dynamic computation graphs allows dealing with recurrent neural networks (RNNs) better, among other use cases. (Here is a great article about RNNs and LSTMs. Another good writeup on RNNs is here.) TensorFlow already supports RNNs, but by adding padding to ensure that all input data are of the same size, i.e., the maximum size in the dataset/domain. Even then this support is good only for linear RNNs not good for treeRNNs which is suitable for more advanced natural language processing. This was a very tough paper to read. It was definitely above my level as a beginner. The paper assumed a lot of background from the reader. It assumed familiarity with TensorFlow execution and operators, and also some understanding of programming language background and familiarity with RNNs. The dynamic batching idea introduced in the paper is a complex idea but it is explained briefly (and maybe a bit poorly?) in one page. Even when I gave the paper all my attention, and tried to form several hypothesis of dynamic batching idea, I was unable to make progress. At the end, I got help from a friend who is an expert at deep learning. I skipped reading the second part of the paper which introduced a combinator library for NNs. The library is relevant because it was instrumental in implementing the dynamic batching idea introduced in the first part of the paper. This second part looked interesting but the functional programming language concepts discussed was hard for me to follow. ## The dynamic batching idea This paper introduces dynamic batching idea to emulate dynamic computation graphs (DCGs) of arbitrary shapes and sizes over TensorFlow which only supports static computation graphs. Batching is important because GPUs crave for batching, especially when dealing with text data where each item is of small size. (While images are already large enough to fill/busy the GPU, but that is not so for text data.) However, the challenge for batching when using DCGs is that the graph of operations is not static, and can be different for every input. The dynamic batching algorithm fixes batching for DCGs. Given a set of computation graphs as input, each of which has a different size and topology, dynamic batching algorithm will rewrite the graphs by batching together all instances of the same operation that occur at the same depth in the graph. (Google is really into graph rewriting.) The dynamic batching algorithm takes as input a batch of multiple input graphs and treats them as a single disconnected graph. Source nodes are constant tensors, and non-source nodes are operations. Scheduling is performed using a greedy algorithm: (I omit some of the more detailed steps in the paper.) • Assign a depth, d, to each node in the graph. Nodes with no dependencies (constants) are assigned depth zero. Nodes with only dependencies of depth zero, are assigned depth one, and so on. • Batch together all nodes invoking the same operation at the same depth into a single node. • Concatenate all outputs which have the same depth and tensor type. The order of concatenation corresponds to the order in which the dynamic batching operations were enumerated. Each dynamic operation is instantiated once in the static dataflow graph. The inputs to each operation are tf.gather ops, and the outputs are fed into tf.concat ops. These TensorFlow ops are then placed within a tf.while_loop. Each iteration of the loop will evaluate all of the operations at a particular depth. The loop maintains state variables for each tensor type t, and feeds the output of concat for tensor type t and iteration d into the input of the gathers at tensor type t and iteration d+1. ## Experimental results The test results emphasize the importance of batching, especially on GPUs where it can enable speed ups up to 120x. The speedup ratio denotes the ratio between the per-tree time for dynamic batching on random shapes ("full dynamic"), versus manual batching with a batch size of 1. Dynamic batching instantiates each operation only once, and invokes it once for each depth, so the number of kernel invocations is log(n), rather than n, where n is tree size. Dynamic batching thus achieves substantial speedups even at batch size 1, because it batches operations at the same depth within a single tree. ## Limitations Dynamic batching works on a single machine, it is not distributed. Dynamic batching requires an all to all broadcasts, so it doesn't scale to distributed machines. This Google paper doesn't cite or talk about Dynet and Chainer, but Dynet and Chainer are single machine ML/DL frameworks that support dynamic computation graphs. On one hand, Dynet & Chainer are most likely not good at batching, and the dynamic batching method here has contribution. On the other hand, since Dynet & Chainer support dynamic computation graphs natively (rather than by way of emulating it on static computation graphs like dynamic batching does), they are most likely more expressive than the dynamic batching can achieve. In fact, another limitation of the dynamic batching approach is that it requires all operations that might be used to be specified in advance. Each input/output may have a different type but all types must be fixed and fully specified in advance. ## Tuesday, January 17, 2017 ### My first impressions after a week of using TensorFlow Last week I went through the TensorFlow (TF) Tutorials here. I found that I hadn't understood some important points about TensorFlow execution, when I read the TensorFlow paper. I am noting them here fresh to capture my experience as a beginner. (As one gathers more experience with a platform, the baffling introductory concepts starts to occur obvious and trivial.) The biggest realization I had was to see a dichotomy in TensorFlow among two phases. The first phase defines a computation graph (e.g., a neural network to be trained and the operations for doing so). The second phase executes the computation/dataflow graph defined in Phase1 on a set of available devices. This deferred execution model enables optimizations in the execution phase by using global information about the computation graph: graph rewriting can be done to remove redundancies, better scheduling decisions can be made, etc. Another big benefit is in enabling flexibility and ability to explore/experiment in the execution phase through the use of partial executions of subgraphs of the defined computation graph. In the rest of this post, I first talk about Phase1: Graph construction, Phase2: Graph execution, and then I give a very brief overview of TensorFlow distributed execution, and conclude with a discussion on visualizing and debugging in TensorFlow. ## Phase1: Graph construction This first phase where you design the computation graph is where most of your efforts are spent. Essentially the computation graph consists of the neural network (NN) to be trained and operations to train it. Here you lay out the computation/dataflow graph brick by brick using TensorFlow operations and tensors. But what you are designing is just a blueprint, nothing gets built yet. Since you are designing the computation graph, you use placeholders for input and output. Placeholders denote what type of input is expected. For example, x may correspond to your training data, and y_ may be your training labels, and you may define them as follows using the placeholders. x = tf.placeholder(tf.float32, [None, 784]) y_ = tf.placeholder(tf.float32, [None, 10]) This says that x will later get instantiated unspecified number of rows (you use 'None' to tell this to TensorFlow) of 784 float32 vectors. This setup enables us to feed the training data to the NN in batches, and gives you flexibility in the graph execution phase to instantiate multiple workers in parallel with the computational graph/NN and train them in parallel by feeding them different batches of your input data. As a more advanced topic in graph construction, heads up for the variable scopes and sharing of the variables. You can learn more about them here. ## Phase2: Graph execution using sessions After you get the computation graph designed to perfection, you switch to the second phase where the graph execution is done. Graph/subgraph execution is done using sessions. A session encapsulates the runtime environment in which graphs/subgraphs instantiate and execute. When you open a session, you first initialize the variables by calling "tf.global_variables_initializer().run()". Surprise! In Phase1 you had assigned variables initial values, but those did not get assigned/initialized until you got to Phase2 and called "tf.global_variables_initializer". For example, let's say you asked b to be initialized as a vector of size 10 with all zeros "b = tf.Variable(tf.zeros([10]))" in Phase1. That didn't take effect until you opened a session, and called "tf.global_variables_initializer". If you had typed in "print( b.eval() )" in the first part after you wrote "b = tf.Variable(tf.zeros([10]))", you get an error: " ValueError: Cannot evaluate tensor using eval(): No default session is registered. Use with sess.as_default() or pass an explicit session to eval(session=sess) ". This is because b.eval() maps to session.run(b), and you don't have any session in Phase1. On the other hand, if you try print (b.eval()) in Phase2 after you call "tf.global_variables_initializer", the initialization takes effect and you get the output [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]. Each invocation of the Session API is called a step, and TensorFlow supports multiple concurrent steps on the same graph. In other words, the Session API allows multiple calls to Session.run() in parallel to improve throughput. This is basically performing dataflow programming over the symbolic computation graph built in Phase1. In Phase2, you can open sessions and close sessions to your heart's content. Tearing down a session and reopening a session has several benefits. This way you instruct TensorFlow runtime to forget about the previous values assigned to the variables in the computation graph, and start again with a new slate (which can be useful for hyperparameter tuning). When you close a session you release that state, and when you open a session you initialize the graph again and start from scratch. You can even have multiple sessions open concurrently in theory, and that may even be useful for avoiding variable naming clashes. An important concept for Phase2 is partial graph execution. When I read the TensorFlow paper first time, I hadn't understood the importance of partial graph execution, but turns out it is important and useful. The API for executing a graph allows the client to specify the subgraph that should be executed. The client selects zero or more edges to feed input tensors into the dataflow, and one or more edges to fetch output tensors from the dataflow. Then the runtime prunes the graph to contain the necessary set of operations. Partial graph execution is useful in training parts of the NN at a time. However, it is commonly exercised in a more mundane way in basic training of NNs. When you are training the NN, every K iterations you may like to test with the validation/test set. You had defined those in Phase1 when you define the computation graph, but these validation/test evaluation subgraphs are only included and executed every K iterations, when you ask sess.run() to evaluate them. This reduces the overhead in execution. Another example is the tf.summary operators, which I will talk about in visualizing and debugging. The tf.summary operators are defined as peripheral operations to collect logs from computation graph operations. You can think of them as an overlay graph. If you like to execute tf.summary operations, you explicitly mention this in sess.run(). And when you leave that out, tf.summary operations (that overlay graph) is pruned out and don't get executed. Mundane it is but it provides a lot of computation optimization as well as flexibility in execution. This deferred execution model in TensorFlow is very different than the traditional instant-gratification instant-evaluation execution model. But this serves a purpose. The main idea of Phase2 is that, after you have painstakingly constructed the computation graph in Phase1, this is where you try to get as much mileage out of that computation graph. ## Brief overview of TensorFlow distributed execution A TensorFlow cluster is a set of tasks (named processes that can communicate over a network) that each contain one or more devices (such as CPUs or GPUs). Typically a subset of those tasks is assigned as parameter-server (PS) tasks, and others as worker tasks. Tasks are run as (Docker) containers in jobs managed by a cluster scheduling system (Kubernetes). After device placement, a subgraph is created per device. Send/Receive node pairs that communicate across worker processes use remote communication mechanisms such as TCP or RDMA to move data across machine boundaries. Since TensorFlow computation graph is flexible, it is possible to easily allocate subgraphs to devices and machines. Therefore distributed execution is mostly a matter of computation subgraph placement and scheduling. Of course there are many complicating factors: such as heterogeneity of devices, communication overheads, just in time scheduling (to reduce overhead), etc. Google TensorFlow papers mention they perform graph rewriting and inferring of just-in-time scheduling from the computation graphs. I haven't started delving into TensorFlow distributed, and haven't experimented with it yet. After I experiment with it, I will provide a longer write up. ## Visualizing and debugging TF.summary operation provides a way to collect and visualize TensorFlow execution information. TF.summary operators are peripheral operators; they attach to other variables/tensors in the computation graph and they capture their values. Again, remember the two phase dichotomy in TensorFlow. In Phase1, you define and describe these TF.summary for the computational graph, but they don't get executed. They only get executed in Phase2 where you create a session, execute the graph, and explicitly mention to execute tf.summary graph as well. If you use the TF.summary.FileWriter, you can write the values the tf.Summary operations collected during a sess.run() into a log file. Then you can direct the Tensorboard tool to the log file to visualize and see the computational graph, as well as the how the values evolved over time. I didn't get much use from the Tensorboard visualization. Maybe it is because I am a beginner. I don't find the graphs useful even after having a basic understanding of how to read them. Maybe they get useful for very very large computation graphs. The Google TensorFlow whitepaper says that there is also a performance tracing tool called EEG but that is not included in the opensource release. ## Related links By clicking on label "mldl" at the end of the post, you can reach all my posts about machine learning / deep learning (ML/DL). In particular the series below reviews the introductory concepts in ML/DL. Learning Machine Learning: A beginner's journey Linear Regression Logistic Regression Multinomial Logistic Regression ## Thursday, January 12, 2017 ### Google DistBelief paper: Large Scale Distributed Deep Networks This paper introduced the DistBelief deep neural network architecture. The paper is from NIPS 2012. If you consider the pace of progress in deep learning, that is old and it shows. DistBelief doesn't support distributed GPU training which most modern deep networks (including TensorFlow) employ. The scalability and performance of DistBelief has been long surpassed. On the other hand, the paper is a must read if you are interested in distributed deep network platforms. This is the paper that applied the distributed parameter-server idea to Deep Learning. The parameter-server idea is still going strong as it is suitable to serve the convergent iteration nature of machine learning and deep learning tasks. The DistBelief architecture has been used by the Microsoft Adam project, Baidu Deep Image, Apache Hama, and Petuum's Bosen. Google, though, has since switched from the DistBelief parameter-server to TensorFlow's hybrid dataflow architecture, citing the difficulty of customizing/optimizing DistBelief for different machine learning tasks. And of course TensorFlow also brought support for distributed GPU execution for deep learning, which improves performance significantly. I think another significance of this paper is that it established connections between deep-learning and distributed graph processing systems. After understanding the model-parallelism architecture in DistBelief, it is possible to transfer some distributed graph processing expertise (e.g., locality-optimized graph partitioning) to address performance optimization of deep NN platforms. ## The DistBelief architecture DistBelief supports both data and model parallelism. I will use the Stochastic Gradient Descent (SGD) application as the example to explain both cases. Let's talk about the simple case, data parallelism first. ### Data parallelism in DistBelief In the figure there are 3 model replicas. (You can have 10s even 100s of model replicas as the evaluation section of the paper shows in Figures 4 and 5.) Each model replica has a copy of the entire neural network (NN), i.e., the model. The model replicas execute in a data-parallel manner, meaning that each replica works at one shard of the training data, going through its shard in mini-batches to perform SGD. Before processing a mini-batch, each model replica asynchronously fetches from the parameter-server service an update copy of its model parameters$w$. And after processing a mini-batch and computing parameter gradients,$\Delta w$, each model replica asynchronously pushes these gradients to the parameter-server upon which the parameter-server applies these gradients to the current value of the model parameters. It is OK for the model replicas work concurrently in an asynchronous fashion because the$\Delta$gradients are commutative and additive with respect to each other. It is even acceptable for the model replicas to slack a bit in fetching an updated copy of the model parameters$w$. It is possible to reduce the communication overhead of SGD by limiting each model replica to request updated parameters only every nfetch steps and send updated gradient values only every npush steps (where nfetch might not be equal to npush). This slacking may even be advantageous in the beginning of the training when the gradients are steep, however, towards converging to an optima when the gradients become subtle, going like this may cause dithering. Fortunately, this is where Adagrad adaptive learning rate procedure helps. Rather than using a single fixed learning rate on the parameter server, Adagrad uses a separate adaptive learning rate$\eta$for each parameter. In Figure 2 the parameter-server update rule is$w' := w - \eta \Delta w$. An adaptive learning with large learning rate$\eta$during convergence, and small learning rate$\eta\$ closer to the convergence is most suitable.

Although the parameter-server is drawn as a single logical entity, it is itself implemented in a distributed fashion, akin to how distributed key value stores are implemented. In fact the parameter server may even be partitioned over the model replicas so each model replica becomes the primary server of one partition of the parameter-server.

### Model parallelism in DistBelief

OK now to explain model-parallelism, we need to zoom in each model replica. As shown in the Figure, a model-replica does not need to be a single machine. A five layer deep neural network with local connectivity is shown here, partitioned across four machines called model-workers (blue rectangles). Only those nodes with edges that cross partition boundaries (thick lines) will need to have their state transmitted between machines. Even in cases where a node has multiple edges crossing a partition boundary, its state is only sent to the machine on the other side of that boundary once. Within each partition, computation for individual nodes will be parallelized across all available CPU cores.

When the model replica is sharded over multiple machines as in the figure, this is called *model-parallelism*. Typically the model replica, i.e. the NN, is sharded upto 8 model-worker machines. Scalability suffers when we try to partition the model replica among more than 8 model-workers. While we were able to tolerate slack between the model-replicas and the parameter-server, inside the model-replica the model-workers need to act consistently with respect to each other as they perform forward activation propagation and backward gradient propagation.

For this reason, proper partitioning of the model-replica to the model-worker workers is critical for performance. How is the model, i.e., the NN, partitioned over the model-workers? This is where the connection to distributed graph processing occurs. The performance benefits of distributing the model, i.e., the deep NN, across multiple model-worker machines depends on the connectivity structure and computational needs of the model. Obviously, models with local connectivity structures tend to be more amenable to extensive distribution than fully-connected structures, given their lower communication requirements.

The final question that remains is the interaction of the model-workers with the parameter-server. How do the model workers, which constitute a model-replica, update the parameter-server? Since the parameter-server itself is also distributedly implemented (often over the model replicas), each model-worker needs to communicate with just the subset of parameter server shards that hold the model parameters relevant to its partition. For fetching the model from the parameter-server, I presume the model-workers need to coordinate with each other and do this in a somewhat synchronized manner before starting a new mini-batch.

[Remark: Unfortunately the presentation of the paper was unclear. For example there wasn't a clean distinction made between the term "model-replica" and "model-worker". Because of these ambiguities and the complicated design ideas involved, I spent a good portion of a day being confused and irritated with the paper. I initially thought that each model-replica has all the model (correct!), but each model-replica responsible for updating only part of the model in parameter-server (incorrect!).]

## Experiments

The paper evaluated DistBelief for a speech recognition application and for ImageNet classification application.
The speech recognition task used a deep network with five layers: four hidden layer with sigmoidal activations and 2560 nodes each, and a softmax output layer with 8192 nodes. The network was fully-connected layer-to-layer, for a total of approximately 42 million model parameters. Lack of locality in the connectivity structure is the reason why the speech recognition application did not scale for more than 8 model-worker machines inside a model-replica. When partitioning the model on more than 8 model-workers, the network overhead starts to dominate in the fully-connected network structure and there is less work for each machine to perform with more partitions.

For visual object recognition, DistBelief was used for training a larger neural network with locally-connected receptive fields on the ImageNet data set of 16 million images, each of which we scaled to 100x100 pixels. The network had three stages, each composed of filtering, pooling and local contrast normalization, where each node in the filtering layer was connected to a 10x10 patch in the layer below. (I guess this is a similar set up to convolutional NN which become an established method of image recognition more recently. Convolutional NN has good locality especially in the earlier convolutional layers.) Due to locality in the model, i.e., deep NN, this task scales better to partitioning up to 128 model-workers inside a model replica, however, the speedup efficiency is pretty poor: 12x speedup using 81 model-workers.

Using data-parallelism by running multiple model-replicas concurrently, DistBelief was shown to be deployed over 1000s of machines in total.