Thursday, December 14, 2017

TLA+/Pluscal solution for modeling of 2-phase commit transactions

I had posted about the second TLA+ project I assigned for the distributed systems class the other day. Here is the solution to the project. The solution is pretty simple and straightforward. (This was built by modifying the /P2TCommit.tla/ at Lamport's site to add a TM and a BTM agent).

Initial definitions

Lines 12-13 set the initial RMstates for each replica and the initial TMstate.

Lines 15-18 define the canCommit and canAbort conditions. If all RMs are "prepared" (they are ready for a commit decision), the TM uses this to set tmState="commit". CanCommit is also true if there is an RM that already has "committed" state. This is possible if the TM already made the "commit" decision and an RM went ahead with it and transitioned to "committed" from "prepared". Then if the TM failed, and BTM is to make a decision, the BTM sets tmState="commit" in order to keep Consistency.

If there exists an RM with "abort" state or an RM with a "failed" state, canAbort becomes truthified, provided that there does not exist an RM with "committed" state. This Line 18 is there to prevent inconsistency with the Line 16 condition. If there is a failed node, it is possible to Abort, but not if a previous Commit was signaled and made an RM="committed".

RM modeling

Lines 37-38 show the actions RM take. Since "working" and "prepared" states are nonterminal states, the RM keeps trying an action until a terminal state is reached. The RM may call one of the macros Prepare, Decide, and Fail.

Prepare first checks that the RM is in "working" state, and if so transitions it to the "prepared" state. (In the 2-phase commit implementations, generally this happens upon a prepare-request sent by the TM, but this modeling keeps things simple and abstracts that away.)

Decide changes the RM state to "committed" if  tmState="commit", and changes it to "aborted" if tmState="abort". Note that, when Decide is called RM is either in "working" or "prepared" state. As the canCommit condition shows above tmState goes to "commit" only if all RMs are in "prepared" state, so the allowed transitions for RM is prepared->committed, prepared->aborted, and working->aborted.

Fail changes the RM state to "failed".
Don't worry about all RMs go to failed, the model checker is going to check all possible scenarios including all up, one down (at any of the dozens of possible points in computation), two down (at any of the dozens of possible points in computation).

TM modeling

The TM model is simple. The TM checks if it canCommit or canAbort and updates tmState accordingly. TM can also fail which makes the tmState="hidden". To keep things simple yet interesting, I only made the TM fail after it makes a decision. But if you notice, I put a label before the fail action. That makes these two updates nonatomic: That is, the tmState will be available for RMs to read for a duration, before the TM fails.

BTM modeling

The BTM model is very similar to the TM model. The BTM checks for tmState="hidden" before it interjects, but otherwise it makes the same decision. Of course there will be some complications, maybe an RM changed state seeing TMs decision, or maybe an RM failed, since the TMs decision.

For simplicity we assume the BTM does not fail.


Termination checks if all processes stop executing eventually. TypeOK is just a sanity check.

Consistency checks that there are no 2 RMs such that one says "committed" and other says "aborted". That was what the 2-phase commit was trying to prevent in the first place.

Model checking

Model checking with TM failure (with the BTM code commented out) led to a Termination violation as expected. That is if TM fails before the tmState is read by all RMs, the prepared RMs will block and won't be able to terminate.

When we uncomment the BTM code, the BTM is able to take over, and the Termination is also satisfied. Of course we didn't get to this flawless model all at once. The model checker found some counterexamples to my flawed models, and I fixed them. I guess this post would be more instructive if I had documented some of the mistakes I made instead of  showing only the final state of the code. Well, next time.

But that was easy

How were we able to solve this problem? Isn't this supposed to be a complicated problem in the database literature? There were a lot of attempts to make 2-phase commit fault-tolerant, fix it with a backup TM. Lamport and Gray paper on Transaction Commit says that all of those attempts had bugs in corner cases:
A non-blocking commit protocol is one in which the failure of a single process does not prevent the other processes from deciding if the transaction is committed or aborted. They are often called Three-Phase Commit protocols. Several have been proposed, and a few have been implemented [3, 4, 19]. They have usually attempted to “fix” the Two-Phase Commit protocol by choosing another TM if the first TM fails. However, we know of none that provides a complete algorithm proven to satisfy a clearly stated correctness condition. For example, the discussion of non-blocking commit in the classic text of Bernstein, Hadzilacos, and Goodman [3] fails to explain what a process should do if it receives messages from two different processes, both claiming to be the current TM. Guaranteeing that this situation cannot arise is a problem that is as difficult as implementing a transaction commit protocol.

The reason we succeeded easily is because we cheated: we assumed clean failures and perfect failure detectors. If we didn't have perfect failure detectors, this would be a mess: A node may act as if another is dead, when the other node is alive and act as if this one is dead. This asymmetry of information is the root of all evil in distributed systems.

In a future post, I plan to make the failure detectors more fuzzy in this 2PC with backup TM code and show what type of problems can arise.

Before I end this post, let me answer this hanging question: How do we deal with partial failures and imperfect failure detectors? We use Paxos to cut the Gordian Knot and act as a definitive judge to get the processes agree on the current state/configuration of the system. This is exactly what Lamport and Gray show in their Consensus on Transaction Commit to make 2-phase commit fault-tolerant. I will discuss that work also in a future blog post.

Tuesday, December 12, 2017

Paper writing woes

I am busy submitting papers for the ICDCS deadline. Today was a long day filled with just polishing the writing of the papers. At this point all I can think of is the dorodangos.

And this Peugot commercial. For no particular reason. None at all.

Sunday, December 10, 2017

Reasoning compositionally about security

Prof. Andrew Myers from Cornell visited our department at UB couple weeks ago, and gave a talk. I had taken some notes during the talk, and figured I should organize them a little and post here.

I was delighted to see Andrew had a blog. I just wish he posted more frequently. I especially liked the upgoer-five-editor composed "What I do" post, the "GashlyCode-Tinies" post, and the "Deserialization considered harmful" posts.

I like it when researchers and professors blog. Blogging gives us a different venue with an informal format to present our opinions. This enables us to provide intuitive and simpler explanations, opine open-endedly without needing to provide proof/validation, and even show our humorous and human-sides. Blogging is ideal for professing, I think.

Anyways back to Andrew's talk. The talk was about a couple recent papers
"Secure Information Flow Verification with Mutable Dependent Types" and "Verification of a practical hardware security architecture through static information flow analysis"

Compositional security enforcement

An idea that worked for compositional security enforcement in software is to  "control the flow of information throughout a computing system". The secret flow shouldn't get into public flow and leak outside. In other words, like in Ghostbusters, you must not let the secret and public information streams cross inside a component. The components that possess this property compose and by composing them together you get end-to-end security in your system. (I am not in the security field, but I had heard about this idea applied to achieve Android system security.)

Andrew had worked on this idea for software systems, and recently wondered if we can use this idea also for achieving hardware security. This is because even if you have security at the software level, if the hardware level leaks, you didn't achieve anything. And there were very interesting exploits in recent years using side-channel attacks and timing attacks to leak information from the hardware (i.e., data cache, instruction cache, computation unit, memory controller).

Secure HDLs

The idea is to develop a secure hardware description language (HDL) that uses the information flow type ideas described above to ensure that hardware is secure at design time. Chip design already uses Verilog as an HDL and synthesize chips from Verilog programs. (Chip design is still a relatively constrained domain that synthesis from high-level code is possible.) So Andrew's team add security annotations to Verilog to provide SecVerilog.

SecVerilog is essentially Verilog plus some dependent security labels. The idea is to design a chip system that doesn't leak, by modeling/verifying it in Secverilog. The security model is that the attacker sees contents of public hardware state (high/low) at each clock tick.

Using SecVerilog Andrew's team produced a formally verified MIPS processor. The static analysis overhead of SecVerilog was extremely low: it was 2 seconds for the complete MIPS processor.

Friday, December 8, 2017

TLA+/Pluscal modeling of 2-phase commit transactions

For the second project in my distributed systems class Fall 17, I assigned modeling of the two-phase transaction commit protocol. I ask students to model what happens when the initiator/transaction manager (TM) fails, how would a backup (TM) take over, and what type of problems could arise.

Here is the description of the project. I will post the solution later on.

2 phase commit

In a distributed system, a transaction is performed by a collection of processes called resource managers (RMs), each executing on a different node. The transaction ends when the transaction manager (TM) issues a request either to commit or to abort the transaction. For the transaction to be committed, each participating RM must be willing to commit it. Otherwise, the transaction must be aborted. The fundamental requirement is that all RMs must eventually agree on whether the transaction is committed or aborted.

Here is a model of 2-phase commit. (This was built by modifying the /P2TCommit.tla/ at Lamport's site to add a TM agent). I decided to stay with the shared memory model rather than the message passing model to keep the project simple. The interesting scenarios is still possible under the shared memory model.

Here are the constraints on the shared memory communication. A RM node can only read tmState and read/update its own rmState. It cannot read other RM's rmState. A TM node can read all RM nodes' rmState and read/update tmState. A BTM node can read all RM nodes' rmState and read/update tmState.

2 phase commit modeling and validation

If no faults occur, the 2-phase commit algorithm is correct. In the presence of a crash fault, however, problems can arise. In the questions below, we will use TLA+/PlusCal to explore what problems may arise, and how to properly design the protocol to overcome those problems.

Part 1.

  • Fill in the reducted PlusCal code. (The /macro Fail/ models RM failure.)
  • Add /Consistency/ and /Termination/ properties.
  • Model check /Consistency/ and /Termination/ with no failures (RMMAYFAIL=FALSE and TMMAYFAIL=FALSE). You should see no errors.
  • Model check with RM failure (RMMAYFAIL=TRUE and TMMAYFAIL=FALSE). You should see no errors. 

Part 2.

  • Model check with RMMAYFAIL=FALSE and TMMAYFAIL=TRUE. (No need to recover a failed TM.)
  • Write in the comments section, after the "==================" line, your findings/observations. Comment whether the /Termination/ property is violated by a TM failure. 

Part 3.

  • Add a backup TM process to take over if primary crashes. (Assume the BTM cannot fail. TMMAYFAIL can only affect TM not BTM.)
  • Test satisfaction of /Consistency/ and /Termination/ properties with no TM or RM failures. Make sure BackupTM terminates, so the /Termination/ property is also satisfied as well as /Consistency/ property. 
  • Model check with both TM and RM failure allowed (RMMAYFAIL=TRUE and TMMAYFAIL=TRUE). Write down your observations. (No need to recover a failed RM or TM.)

Here is the synchronized consensus problem I assigned as the first project in the class.

Here is some previous discussion/context about why I started assigning TLA+/PlusCal modeling projects in distributed systems classes.

Tuesday, December 5, 2017

Paper Summary. The Case for Learned Index Structures

This paper was put on Arxiv yesterday and is authored by Tim Kraska, Alex Beutel, Ed Chi, Jeff Dean, Neoklis Polyzotis.

The paper aims to demonstrate that "machine learned models have the potential to provide significant benefits over state-of-the-art database indexes".

If this research bears more fruit, we may look back and say, the indexes were first to fall, and gradually other database components (sorting algorithms, query optimization, joins) were replaced with neural networks (NNs).

In any case this is a promising direction for research, and the paper is really thought provoking.


Databases started as general, one-size fits all blackboxes. Over time, this view got refined to "standardized sizes" to OLAP databases and OLTP databases.

Databases use indexes to access data quickly. B-Trees and Hash-maps are common techniques to implement indexes. But along with the blackbox view, the databases treat the data as opaque, and apply these indexes blindly without making any assumptions about the data. However, it is obvious that not knowing about the data distribution leaves performance on the table. Consider this thought experiment. If the keys are from the range of 0 to 500m, it is faster to just use the key as index, rather than using a hash. This observation can be extended to other data distributions, if we know the cumulative distributed function (CDF) of the data. We can generalize by saying "CDF*key*record-size" gives us the approximate position of the record the key refers to.

Ok, so, by knowing about the data distribution, we can achieve performance gains. But now we lost reusability when we go full whitebox. We can't afford to go full whitebox, inspecting the data, and designing the indexes from scratch every time.

The paper shows that by using NNs to learn the data distribution we can have a graybox approach to index design and reap performance benefits by designing the indexing to be data-aware.

The case for applying NNs to indexing is shown over the following three index-types:

  • B-trees, which are used for handling range queries
  • hash-maps, which are used for point-lookup queries
  • bloom-filters, which are used for set inclusion checks

I will only summarize the section on how to replace the B-tree structure. For the hash maps, the learned structure is a straightforward function based on CDF of the data.


B-trees provide a hierarchical efficient index.

Why is it even conceivable to replace B-tree with an NN model? Conceptually, b-tree maps a key to a page. We can have a model that also performs key to position mapping, and for the error range, we can do a variant of binary search (or expanded ring search) to locate the page.

OK, then, how do we know min_error and max-error? We train the model with the data we have. The data is static, the NN makes a prediction and then learns from these errors. (Even simple logistic regression may work for simple distributions.)

What potential benefits can we reap by replacing B-tree with a model:

  • smaller indexes: less main-memory or L1 cache storage
  • faster lookup: as a result of smaller indexes
  • more parallelism (TPU), instead of hierarchical if statements as in B-tree.

The key insight here is to trade computation for memory, banking on the trend that computation is getting cheaper (and if you can do it on TPU/GPU you reap more benefits). The evaluation of the paper doesn't even go into using TPUs for this.

The paper includes several strategies to improve the performance of the learned index, including using a recursive model index, hierarchical models, and hybrid models. For evaluation results, please refer to the paper.

Monday, December 4, 2017

Enabling API Virtualization on Android for Platform Openness

This morning I was at Taeyeon Ki's dissertation proposal. He presented his work on enabling openness (hackability/tinkerability) in Android platform-side.

He defined openness as being able to participate in innovation and distribution, and gave SDN and FUSE are examples of openness.

He argued that mobile vendors control their platforms tightly, and this prevents tinkering the platform and obstructs innovation. Even though you can innovate at the app side, the OS/platform side is closed for tinkering. He posed the question: how can we enable anyone to easily develop distribute new platform-level functionality?

To answer this question, he presented his work in two projects: Reptor and Mimic.


Reptor is a bytecode instrumentation tool enabling api virtualization on Android. Reptor does this by intercepting calls and redirecting them to a new implementation of the method called.

Intersecting calls to methods is not straightforward. Doing just method name substitution cannot handle calls to class level features: class hierarchy, interface, abstract class, etc. So instead of replacing the method, the Reptor tool replaces entire class. Class level replacement approach works well with callbacks in Android as it ensures that typecasting is correctly handled.

One use case for Reptor is to change some platform-level components used in an app with a more suitable one depending on the region. China and Korea do not have Google Play store, they run their own app-stores for Android. So in China and Korea Google Maps (which is an integral component of Play Store) is not available. Apps there use a downgraded static web map mostly. But with Reptor, it is possible to wrap those applications and redirect the maps calls to Amazon map. Taeyeon showed a demo of this on the Airbnb app.

Reptor has been dummy-tested with 1200 popular apps from Google Play. It has been tested for 32 benchmarked apps with more in depth tests, where runtime checks performed. For this in-depth tests for showing runtime equivalence the Mimic tool described below has been used. The tests showed that Reptor achieves minimal overhead in memory, and performance.

While run-time testing via Mimic is helpful for checking if Reptor substitution at the application of one platform-component with another is "correct" with respect to the platform-level classes, better definition of "correctness" and better framing/formalization of the problem will help this work.


Mimic is an automated UI behavior comparison testing system that provides runtime behavioral compatibility check between original and instrumented app. (I think this is under submission or to appear state.)

When I heard this description, I combined this and Reptor together in my mind, and all I could think of was that the combination can be great for automating click farms for smartphone apps!

Taeyeon had something more useful and honest in mind. There are a dozen different android versions in use. A typical app posts several updates a month and the  developers struggle to test their apps with all these different environments. Mimic combined with Reptor can help perform these tests in an automated way? (Well, still sounds a lot like click farm to me :-)

Taeyon explained their follow-the-leader testing model, where Mimic replicates the user behavior at the leader to the followers.

Saturday, December 2, 2017


Open Computing Language (OpenCL) is a framework for writing programs that execute across heterogeneous platforms consisting of central processing units (CPUs), graphics processing units (GPUs), digital signal processors (DSPs), or field-programmable gate arrays (FPGAs). Heterogeneous computing refers to systems that use more than one kind of processor or cores for high performance or energy efficiency.

OpenCL views a computing system as consisting of a number of compute devices (GPUs, CPUs, FPGAs) attached to a host processor (a CPU). It defines a C-like language for writing programs. Functions executed on an OpenCL device are called /kernels/. A single compute device typically consists of several compute units, which in turn comprise multiple processing elements (PEs). A single kernel execution can run on all or many of the PEs in parallel.

In addition to its C-like programming language, OpenCL defines an API that allows programs running on the host to launch kernels on the compute devices and manage device memory. Programs in the OpenCL language are intended to be compiled at run-time, so that OpenCL-using applications are portable between implementations for various host devices.

OpenCL is an open standard maintained by the non-profit technology consortium Khronos Group. Conformant implementations are available from Altera, AMD, Apple, ARM, Creative, IBM, Imagination, Intel, NVIDIA, Qualcomm, Samsung, Vivante, Xilinx, and ZiiLABS. Although OpenCL provides an alternative to CUDA, it has some support from  NVDIA.

OpenCL is supported by Android, FreeBSD, Arch Linux, Linux, macOS, Windows operating systems.

CUDA to OpenCL translators

A prototype implementation exists for CUDA to OpenCL translator.

A 2015 paper, Bridging OpenCL and CUDA: a comparative analysis and translation, also provides CUDA-OpenCL translation as well as OpenCL-CUDA translation.

These translators, however, do not provide industry-grade and ruggedized implementations.

OpenCL for ML

OpenCL support is still underwhelming for deep learning, but it is getting better.

Recently an OpenCL port of Caffee was made available. This Caffe port was shown/evaluated for AMD chipsets, but it should also apply for ARM platforms that support OpenCL.

Thursday, November 30, 2017

What I learn from teaching

After I teach a class I am debilitated/incapacitated for about an hour. I can't do anything productive in that time. This is because teaching demands a lot of mental power (at least for me). I am sure, being an introvert doesn't help either; I need some recovery time after too much exposure. When I am lecturing though, I let go (at least after the first 5-10 minutes or so), and I don't think about myself or being introvert. After the initial warm up period, I get excited about what I am teaching, and all I can think of is the message I am trying to convey. This is actually good. When I find myself lost in the moment (or maybe that is getting into the teaching zone), I know I am doing good. I had written about this earlier on presenting: you should center on the message not on yourself. Oh wow, actually I wrote twice on that topic.

So, in that down hour after teaching, what do I do? I have regret-flashbacks about some parts of the lecture I delivered. I have remorse about how I botched some parts of the lecture and how I couldn't answer a student's question better. The reason I botch up a part of the lecture is often because I haven't internalized that part yet. The reason I wasn't able to make a smooth transition from one part to another part is often because I didn't have a better understanding/insight about how those parts fit. It is likely I was missing some context about it. Or maybe more likely that I knew the context, but I didn't internalize it well enough. There are many different levels of knowing, and the more reliable/resilient/dependable level of knowing is knowing by experience. Teaching is a way to have hands-on training in your research area. You learn through your regrets about how you couldn't explain it better. So "teaching is an integral part of a faculty/researcher's job" is a cliche for a reason: it is very true and important.

Richard Feynman wrote about teaching in several memoirs. This one is especially relevant: "If you're teaching a class, you can think about the elementary things that you know very well. These things are kind of fun and delightful. It doesn't do any harm to think them over again. Is there a better way to present them? Are there any new problems associated with them? Are there any new thoughts you can make about them? The elementary things are easy to think about; if you can't think of a new thought, no harm done; what you thought about it before is good enough for the class. If you do think of something new, you're rather pleased that you have a new way of looking at it."

And here Feynman is saying it is better not to be too methodical when teaching. I can listen to Feynman all day.

Here is relevant post of mine on teaching.
Here are some of my other posts labeled teaching.

Monday, November 27, 2017

Paper summary. Blazes: Coordination analysis and placement for distributed programs

This paper is by Peter Alvaro, Neil Conway, Joseph M. Hellerstein, and David Maier. It appears in October 2017 issue of ACM Transactions on Database Systems. A preliminary conference version appeared in ICDE 2014. 

This paper builds a theory of dataflow/stream-processing programs, which cover the Spark, Storm, Heron, Tensorflow, Naiad, TimelyDataflow work.

The paper introduces compositional operators of labels, and shows how to infer the coordination points in dataflow programs. When reading below, pay attention to the labeling section, the labels "CR, CW, OR, OW", and the section on reduction rules on labels.

To figure out these coordination points, the Blazes framework relies on annotations of the dataflow programs to be supplied as an input. This is demonstrated in terms of a Twitter Storm application and a Bloom application.

The paper has many gems.  It says at the conclusion section in passing that when designing systems, we should pay attention to coordination locality, not just data-access locality. (Maybe often the two are related/correlated, and this leads people to confuse them.) It also makes practical lessons about how to organize your dataflow programs: move replication upstream, caching downstream. I wish the paper elaborated more on these points at the conclusion.

I am not done with the paper yet. It is a 31 page paper, packed with information. I gave it a first pass. I think I will be reading this paper again and keep thinking on it.

In my summary below I borrow from the paper figures and a lot of text with small edits.


The key idea in Blazes is that even when components are order-sensitive, it is often possible to avoid the cost of global ordering without sacrificing consistency. In many cases, Blazes can ensure consistent outcomes via *sealing* which indicates when partitions of a stream have stopped changing. (Is "sealing" like "watermark" in the Google streaming paper?)

Some contributions of the Blazes work are:

  • The paper shows how to analyze the composition of consistency properties via a term-rewriting technique over dataflow paths.
  • The paper demonstrates how the semantics of components can be modeled using a system of programmer-supplied annotations (graybox approach). 
  • The paper discusses two alternative coordination strategies, ordering and sealing, and shows how to automatically generate application-aware coordination code that uses the cheaper sealing technique in many cases.

Dataflow consistency

Look at these anomalies. Just look at them. This figure should have come with a trigger warning for distributed systems developers.

But it is possible to go to the first-principles and cut the Gordian knot. If we could ensure that every component in a dataflow produces a deterministic set of outputs regardless of the order in which its inputs are delivered, then we could prevent all of these anomalies.

There are two approaches to doing that.

*Consistency via Coordination*:  A sufficient strategy for preventing the anomalies (or arbitrary components) is to remove the nondeterminism in delivery order by coordinating the (otherwise asynchronous) processing of inputs by using a coordination system such as ZooKeeper. This is a general (independent of semantics of computation) approach but imposes throughput and latency costs.

*Consistency via Component Properties*: A dataflow component is *confluent* if it produces the same set of outputs for all orderings of its inputs. At any time, the output of a confluent component (and any redundant copies of that component) is a subset of the unique, final output.

In an ideal world, we would build dataflows out of strictly confluent components. Unfortunately this is an infeasible assumption. When we add an order-sensitive component to an otherwise confluent dataflow, how can we avoid falling back on a global coordination strategy? This is the question the paper investigates.

Remark: Note that confluent is a stronger property than convergent. Convergent replicated components are guaranteed to eventually reach the same state, but this final state may not be uniquely determined by component inputs. As Figure 5 indicates, convergent components allow cross-instance nondeterminism, which can occur when reading "snapshots" of the convergent state while it is still changing. Consider what happens when the outputs of a convergent component (e.g., GETs posed to a key/value store)  flow into a replicated stateful component (e.g., a replicated cache). If the caches record different stream contents, then the result is replica divergence.

Annotated dataflow graphs

The *CR* annotation indicates that a path through a component is confluent and stateless; that is, it produces deterministic output regardless of its input order, and the path does not modify the component's state.

*CW* denotes a path that is confluent and stateful. That is, calls to the API may modify the internal state of the component, but the final state is determined only by the input contents and not their order.

The annotations *OR_gate* and *OW_gate* denote non-confluent paths that are stateless or stateful, respectively.

The *Seal_key* annotation means that the stream is punctuated on the subset key of the stream's attributes.

The label *Async* corresponds to streams with deterministic contents whose order may differ on different executions or different stream instances. Streams labeled *NDRead* may exhibit cross-run nondeterminism, having different contents in different runs, or cross-instance nondeterminism within a single run if the system is replicated. Finally, streams labeled *Taint* may exhibit persistent nondeterministic contents, or replica divergence in replicated systems.

Reduction relation

Combination rules model what happens when multiple input streams contribute to a single output stream in a dataflow. Taint is the strongest label: if any of the paths contributing to an output stream can exhibit permanent nondeterministic contents, then so can that output stream. Async is the weakest, and will be dominated by any labels with which it combines.

Coordination selection

Blazes will repair nonconvergent and nonconfluent dataflows by constraining how messages are delivered to individual components. When possible, Blazes will recognize the compatibility between sealed streams and component semantics, synthesizing a seal-based strategy that avoids global coordination. Otherwise, it will enforce a total order on message delivery, say using ZooKeeper.

*Sealing Strategies.* To determine that the complete partition contents are available, the consumer must a) participate in a protocol with each producer to ensure that the local per-producer partition is complete, and b) perform a unanimous voting protocol to ensure that it has received partition data from each producer. The voting protocol is a local form of coordination, limited to the "stake holders" contributing to individual partitions. When there is only one producer instance per partition, Blazes need not synthesize a voting protocol. Once the consumer has determined that the contents of a partition are (henceforth) immutable, it may process the partition without any further synchronization.

Blazes framework

Blazes can be directly applied to existing programming platforms based on distributed stream or dataflow processing, including Twitter Storm and Spark Streaming. Programmers of stream processing engines interact with Blazes in a graybox manner: they provide simple semantic annotations to the blackbox components in their dataflows, and Blazes performs the analysis of all data ow paths through the program.

Case study: Word count example

The Storm word-count dataflow can be reduced to Taint in the following way once we supply an Async label for its input interface.

If, on the other hand, the input stream is sealed on batch, then Blazes instead produces this reduction:

Because a batch is atomic (its contents may be completely determined once a punctuation arrives) and independent (emitting a processed batch never affects any other batches), the topology will produce deterministic contents in its (possibly nondeterministically ordered) outputs, a requirement for Storm’s replay-based fault-tolerance—under all interleavings.

Figure 20 shows that the sealed solution outperforms the ordering based solution significantly as the number of workers increase.


Recently I had summarized a computational model for TensorFlow. It was a preliminary basic attempt at creating an operational semantics for TensorFlow programs. It would be interesting to compare that with the Blazes approach, as they are solving related problems.

The TLA+ approach seems to be more fine granularity and can do refinement checks between different models. But it is preliminary so far. The Blazes approach can handle components. It looks like it is more coarse granularity but can be applied easier.

Saturday, November 25, 2017

On dataflow systems, Naiad and Tensorflow

The below definition for dataflow programming is from Wikipedia (with some small edits):
"Traditionally, a program is modeled as a series of operations happening in a specific order; this may be referred to as sequential, procedural, or imperative programming. The program focuses on commands for programming, where data is normally /at rest/.

In contrast, dataflow programming emphasizes the movement of data; it models a program as a series of connections with explicitly defined inputs and outputs, and connect operations. An operation runs as soon as all of its inputs become available. Thus, dataflow languages are inherently parallel and can work well in large, decentralized systems."

Some examples of dataflow frameworks are map-reduce, Spark, Storm & Heron, GraphX, GraphLab, Naiad (now implemented in Rust as Timely-Dataflow), and Tensorflow.

Map-Reduce uses a very simple directed-acyclic-graph (DAG) with only two operations: map and reduce. Spark extends map-reduce to a couple dozen operations in addition to map and reduce, and implements data storage/processing to be in-memory. More specifically, in Spark, a computation is modeled as a directed acyclic graph (DAG), where each vertex denotes a Resilient Distributed Dataset (RDD) and each edge denotes an operation on RDD. RDDs are collection of objects divided in logical partitions that are stored and processed as in-memory, with shuffle/overflow to disk.

Twitter's Storm and Heron are dataflow stream-processing frameworks. In Storm (and its subsequent modular implementation Heron), bolts and spouts corresponds to dataflow operations and streams.

I had written summaries about GraphX and GraphLab approaches to dataflow as well.

In this post, I will focus more on Naiad (and timely-dataflow), and try to compare/contrast that with TensorFlow.

Naiad, timely-dataflow, and differential-dataflow

Naiad introduced a dataflow programming framework that allows cycles in the dataflow graph. Naiad's dataflow model supports structured loops allowing feedback in the dataflow, stateful data flow vertices capable of consuming and producing records without global coordination, and notifications for vertices once they have received all records for a given round of input or loop iteration. Here is my summary of Naiad from 2014.

Frank McSherry has been singlehandedly (I don't know, maybe he uses both his hands) implementing Naiad in Rust for the last couple years, and calls the project timely-dataflow. He also has another project that provides implementation of differential dataflow in Naiad using timely-dataflow on Rust.

Differential dataflow (i.e., incremental dataflow) means streaming iterative computation and doing incremental computation only in response to changing data. "If you change one input to your computation, you would prefer not to re-evaluate the entire computation. Rather, changes to the input produces some changes in the output of your first dataflow operator, which are then changes to the inputs of subsequent operators. Eventually these changes either evaporate or spill out the end of your dataflow as changed outputs. As changes flow around the dataflow, they happen at various logical times. These times reflect the epoch of streaming input data, and iteration counters for any loops they may be contained in. Rather than eagerly aggregate these changes, we leave them disaggregated, allowing future computation to use arbitrary subsets of the changes. Although we leave ourselves more work to do for each change, we end up with an orders-of-magnitude reduction in the numbers of changes."

Theres is a technical distinction between differential and incremental. Incremental works with "sequence of arbitrary updates" whereas differential works with  "partial order of arbitrary updates". The former has been around a while, has lots of prior art, etc. Differential is pretty new and while more general is pretty similar in most cases (e.g. streaming join is the same for both of them; streaming join in a loop is only possible in differential).

Naiad versus TensorFlow

TensorFlow is a special instantiation/application of Naiad's timely-dataflow model of cyclic dataflow processing for the domain of machine learning and specifically deep-learning. It is well integrated with tools, Python, GPUs, etc. Here is my summary of TensorFlow if you need a refresher.

I think TensorFlow traded off generality of Naiad and gained much more in return. This is what some people call "addition by subtraction".

Naiad aimed to satisfy real-time querying. TensorFlow is OK with batch training. TensorFlow can still output a result at the end of each minibatch, not a big deal. The real-time querying requirement in Naiad may be an unrealistic and unnecessarily/impractical requirement to shoot for in practical systems. Who knows. Are there practical applications that require tight real-time querying with the most recent updates reflected in the view? (More on this, at the end, on the applications discussion.)

Naiad aimed at precisely correct output. If something changes slightly, Naiad will recompute effected things (incrementally, yay) and give you the correct output. On the other hand, if something changes, TensorFlow will consider the changed things as a new minibatch to train with, as a result of which not much may change. Machine Learning and Deep Learning tolerate uncertainty anyhow. Minibatches are how TensorFlow handles differential/incremental dataflow implicitly. No complicated mechanism is required. If things change, that is another minibatch to train for, and most of the previously computed model parameters may remain unaffected.

(Naiad's differential/incremental processing imposes extra memory requirements. It is unclear to me how much state needs to be held at the stateful-operations about the input-graph so that incremental processing is still possible.)

Finally, Naiad has a join operation a very strong/capable yet a costly operation. TensorFlow does not have a join operation, but that makes TensorFlow operations easier to shard/partition across machines. Maybe TensorFlow doesn't have join operations because its inputs are assumed to be independent. Naiad assumes inputs can be dependent: like parts of a graph, or update to an earlier graph, streaming into Naiad. So Naiad has a join operator but pays the cost for that powerful operator.

Naiad's generality should open it up for more potential applications. Differential dataflow is ideal for graph algorithms that update the graph or operates on a dynamic graph. This should be useful for search engines; when the internet graph and links update, you shouldn't recompute the entire result in batch. I don't know what Google uses most recently, but Google has been using an incremental transactional system leveraging BigTable, called Percolator, to solve this problem, without needing the differential dataflow computing power of Naiad.

Other application areas for Naiad's computing power could be social network applications and recommendation systems. But it seems like social network applications do not need to be very precise, approximate answers are OK with them. And recommendations systems do not need very tight real-time constraints yet.

Friday, November 24, 2017

Two paper summaries on scheduling in heterogenous computing

Today, I have two short paper summaries from HotCloud'17 on heterogenous computing.

Heterogeneous GPU reallocation

This paper appeared in HOTCloud'17, and the authors are James Gleeson and Eyal de Lara, University of Toronto.

It looks like they have developed a GPU virtualization tool called Crane recently. "General purpose GPU (GPGPU) computing in virtualized environments leverages PCI passthrough to achieve GPU performance comparable to bare-metal execution. However, GPU passthrough prevents service administrators from performing virtual machine migration between physical hosts. Crane is a new technique for virtualizing OpenCL-based GPGPU computing that achieves within 5.25% of passthrough GPU performance while supporting VM migration. Crane interposes a virtualization-aware OpenCL library that makes it possible to reclaim and subsequently reassign physical GPUs to a VM without terminating the guest or its applications. Crane also enables continued GPU operation while the VM is undergoing live migration by transparently switching between GPU passthrough operation and API remoting." Here is the video of conference presentation at SYSTOR'17.

The HotCloud paper is a position paper motivating the use of the Crane tool. They say GPU workloads underutilize GPU device memory (using less than 37%). So there is an opportunity to reallocate heterogenous GPUs within existing VMs, upgrading and downgrading them. They employ Crane to support reallocation of heterogenous GPU models within a VM and also by allowing live migration of VM to a new host.

This may be something that can complement the Zorua GPU virtualization approach. 

The HCl scheduler: Going all-in on Heterogeneity

This paper appeared in HOTCloud'17 and is authored by Michael Kaufmann, IBM Research Zurich, Karlsruhe Institute of Technology; Kornilios Kourtis, IBM Research Zurich.

The paper builds a new scheduler HCl that tries to fully model and support resource heterogeneity. HCl operates on 2 annotated DAGs: one representing the Spark application, and one representing the available hardware resources.

HCl uses task runtime estimations for each resource to find the optimal mapping of a task, and estimate future availability of resources. It also tries to estimate the cost of data transfer and includes this as a factor in where to schedule tasks. It uses lookahead scheduling to place parent tasks closer to their children's optimal resource instead of only placing child tasks closer to their parents. Finally, it performs path balancing and tries to balance the execution times of converging paths in a DAG such that the waiting time for tasks at a convergence points is minimized.

This is a complicated scheduling algorithm, and would likely have problems scaling as the size of tasks and heterogenous resources grow. Scheduling needs to be done quickly, and these calculations will take time. The evaluations don't test at scale.

There is not much detail about HCl implementation in the paper, but it looks like HCl is not a Mesos framework, it is a separate scheduler.  It must be more like Borg, where the scheduler gets to see not only resources but also all the tasks.

Tuesday, November 21, 2017

Paper summary: A Computational Model for TensorFlow

This paper appeared in MAPL 17. It is written by Martin Abadi, Michael Isard, and Derek G. Murray at Google Brain. It is a 7-page paper, and the meat of the paper is in Section 3.

I am interested in the paper because it talks about TLA+ modeling of TensorFlow graphs and uses that for creating an operational semantics for TensorFlow programs. In other words, the paper provides a conceptual framework for understanding the behavior of TensorFlow models during training and inference.

As you recall, TensorFlow relies on dataflow graphs with mutable state. This paper describes a simple and elementary semantics for these dataflow graphs using TLA+. The semantics model does not aim to account for implementation choices: it defines what outputs may be produced, without saying exactly how. A framework of this kind does not just have theoretical/academical value; it can be useful to assess correctness of TensorFlow's dataflow graph (symbolic computation graph) rewriting optimizations such as those in XLA.

TensorFlow refresher

In TensorFlow, dataflow graphs support both training and inference. That is, a computation may perform one or more steps of training for a machine-learning model, or it may be the application of a trained model. TensorFlow models are assembled from primitive operations by function composition. The operations are implemented by kernels that can be run on particular types of devices (for instance, CPUs or GPUs).

In addition to edges for communicating tensors, a graph may include control edges that constrain the order of execution. This order can affect performance as well as the observable semantics in the presence of mutable state.

A client typically constructs a graph using a front-end language such as Python. Then the client can make a call to run the graph, specifying which inputs to "feed" and which outputs to "fetch". TensorFlow propagates the input values, repeatedly applying the operations prescribed by the graph, until no more nodes can fire. The order in which nodes fire is constrained by data dependencies and control edges, but is not necessarily unique. The execution ends with values on the graph's output edges. Often, a graph is executed multiple times. Most tensors do not survive past a single execution of the graph. However, mutable state does persist across executions.

Core Computational Model 

This section presents TLA+ modeling of the TensorFlow.

TensorFlow values include tensors and variables. The TLA+ model distinguishes three kinds of edges: tensor edges, variable edges, and control edges.

Operations are of several kinds: Functions, Var(x), Read, Assign. A TensorFlow program consists of a directed acyclic graph G, plus a mapping (a "labelling") L from nodes of G to TensorFlow operations.

A TensorFlow program starts with non-EMPTY input edges, consumes the values on those edges, and repeatedly propagates them, applying operations, until no more nodes can fire. In the course of such an execution, each node fires exactly once, and the execution ends with non-EMPTY output edges. The order in which nodes fire is not necessarily unique: determinism is not always expected or desired. For example, lock-free stochastic gradient descent is a common source of intentional race conditions.

Each change of state in the behavior is caused by the execution (i.e., the firing) of exactly one node in the graph. A condition for whether a node n can cause a change from a state s to a state s′ is that for all its incoming control edges d, InTransit(d) = GO in s and InTransit(d) = EMPTY in s′, and for all its outgoing control edges e, InTransit(e) = GO in s′. Moreover, InTransit(d) must be the same in s and in s′ for all edges d not incident on n, and VarValue(x) must be the same in s and in s′ for all variables x, except in the case where L(n) = Assign-f for some function f.

Figure 3 shows a computation that *assign-adds* x:=x+D and x:=x+E. The *assign-adds* commute for x, but *writes* alone do not commute and leads to a
last-write-wins race condition, as in Figure 4.

A race condition can be avoided by other means, such as implementing mutexes using TensorFlow control primitives.

Further work 

The paper claims that this semantics can be generalized to accommodate cyclic execution graphs in TensorFlow. In that generalization, instead of firing exactly once, each node fires at most once in each "execution context", where an "execution context" identifies an iteration of each (possibly nested) loop that contains the node.

Monday, November 20, 2017

The YouTube clips I show in my distributed systems class

I made it a tradition to start my CSE 4/586: Distributed Systems lectures with a short YouTube video. As the students trickle into their seats, I play a YouTube video related to the lecture of the day.

Video is an underutilized medium in teaching. I don't understand why more professors do not use them. They are visual and they wake the students up, and get them interested in the class. After the students are primed with the video clip about the context of the lecture, they listen the class more attentively. I get overwhelmingly positive feedback from the students over many years about this practice.

Here are some of the videos I show in my class.

This is the video I show in my first class when I introduce the definition of a distributed system. I tell them a murmuration of starlings can be considered a distributed system, because the definition fits: A collection of autonomous nodes communicating to address a problem collectively, with no shared memory, no common physical clock, and geographical separation.  Come to think of it, maybe this game of life video is more relevant, because it clearly lays out the 4 simple local rules that create the amazing emergent behavior.

In the first couple weeks of the class I teach about specifying and reasoning about the safety and progress properties of distributed programs. Some students find this short forage into predicate logic and computation model tedious. So to motivate them I tell them that this understanding of program properties and reasoning about them will get very handy as they learn more complicated algorithms soon. I tell them to trust me on this: /before they can do karate, they need to learn about some tedious wax on and wax off/. Then I show them scenes from Karate Kid, an awesome movie from my childhood. The movie is from 1984, and recently I came to the sobering realization that it is older than my students in the class.

When I teach about the distributed system computational model, I show a short clip from a lecture by Leslie Lamport, a forefather of distributed systems.

When I teach about examples on reasoning about distributed programs, I show them inevitably some unrelated clips. But I believe these clips are extremely helpful to familiarize the students with the CSE field. Some of my favorites are: Jeff Bezos (CEO Amazon) talks about Internet entrepreneurship
and about Xerox PARC and the origins of the PC.

When I introduce logical time in distributed systems, of course, I show a clip from Genius: The life of Albert Einstein.

When I talk to them about mutual exclusion, I show them a video of toy train track mutual exclusion. I follow that up with a traffic intersection mutual exclusion. And when I talk about dining philosophers, I show a simulation of dining philosophers.

For consensus, I don't have anything good to show. Instead I bring a couple students to the front of the class and we re-enact the attacking generals problem with messengers running between the two generals.

For chain replication, I show a video about DNA replication. Maybe unrelated, but super interesting.

For failure detectors, I use this skit from Monty Python. This may be the most appropriate use of a Monty Python skit in a distributed systems class.

When I teach them about fault tolerance, I show them this clip about resilience. When I tell them about the concept of masking of fault-tolerance and robust-yet-fragile concept, I show them about this video. In this earlier post, I explain the concept and the relevance of this clip. 

I have been using some of these videos for 4-5 years now. So, if you have suggestions for alternatives, I will be happy to upgrade my YouTube video curation.

Saturday, November 18, 2017

Paper summary. Dynet: The Dynamic Neural Network Toolkit

The programming model that underlies several popular toolkits such as TensorFlow uses a static declaration approach: they separate declaration and execution of the network architecture.

Static declaration has a number of advantages. After the computation graph is defined, it can be optimized in a number of ways so that the subsequent repeated executions of computation can be performed as quickly as possible. This also simplifies distribution of computation across multiple devices, as in TensorFlow. But static declaration is inconvenient for the following:

  • variably sized inputs
  • variably structured inputs
  • nontrivial inference algorithms
  • variably structured outputs

Of course, it is possible to process variable sized inputs if the computation graphs can represent objects whose size is unspecified at declaration time. Flow control operations such as conditional execution and iteration can be added to the inventory of operations supported by the computation graph. For example, to run an RNN over variable length sequences, Theano offers the scan operation, and TensorFlow offers the dynamic RNN operation.

While it is therefore possible to deal with variable architectures with static declaration in theory, that still poses some difficulties in practice:

  • Difficulty in expressing complex flow-control logic
  • Complexity of the computation graph implementation
  • Difficulty in debugging

These are associated with some serious software engineering risks. As an alternative, DyNet proposes reviving an alternative programming model: dynamic declaration of computation graphs.

Dynamic declaration

The dynamic declaration model in Dynet takes a single-step approach: the user defines the computation graph programmatically as if they were calculating the outputs of their network on a particular training instance. There are no separate steps for definition and execution: the necessary computation graph is created, on the fly, as the loss calculation is executed, and a new graph is created for each training instance. (To avoid the overhead, DyNet strives to provide very lightweight graph construction.)

Dynamic declaration reduces the complexity of the computation graph implementation since it does not need to contain flow control operations or support dynamically sized data. DyNet is designed to allow users to implement their models in their preferred programming language (C++ or Python). A symbolic computation graph is still constructed, but by using the host language (C++ or Python) rather than providing them separately at the computation graph level. Thus, dynamic declaration facilitates the implementation of more complicated network architectures.

What is the innovation in DyNet?

DyNet aims to minimize the computational cost of graph construction in order to allow efficient dynamic computation. This way DyNet aspires to remove barriers to rapid prototyping and implementation of more sophisticated applications of neural nets that are not easy to implement in the static computation paradigm.

DyNet's backend, which is written in C++, is optimized to remove overhead in computation graph construction, and support efficient execution on both CPU and GPU. This is feasible to do. Since flow control and facilities for dealing with variably sized inputs remain in the host language (rather than in the computation graph, as is required by static declaration), the computation graph needs to support fewer operation types, and these tend to be more completely specified (e.g., tensor sizes are always known rather than inferred at execution time).

DyNet programs

DyNet programs follow the following template
1. Create a Model.
2. Add the necessary Parameters and LookupParameters to the model. Create a Trainer object and associate it with the Model.
3. For each input example:
(a) Create a new ComputationGraph, and populate it by building an Expression representing the desired computation for this example.
(b) Calculate the result of that computation forward through the graph by calling the value() or npvalue() functions of the final Expression
(c) If training, calculate an Expression representing the loss function, and use its backward() function to perform back-propagation
(d) Use the Trainer to update the parameters in the Model

In contrast to static declaration libraries such as TensorFlow, in DyNet the "create a graph" step falls within the loop. This has the advantage of allowing the user to flexibly create a new graph structure for each instance and to use flow control syntax (e.g., iteration) from their native programming language.

Here is an example program.

This program shows the process of performing maximum likelihood training for a simple classifier that calculates a vector of scores for each class it will be expected to predict, then returns the ID of the class with the highest score.  Notice that, at line 14: symbolic graph is defined dynamically, at line 15: forward pass is executed, and at line 16: backward pass automatic diff is executed. At line 19, after the training, inference is done. To account for dynamic input/graphs at inference, the graph is reconstructed for each serving input.

Dynet allows dynamic flow control at the inference time easily. This can allow the classifier to avoid wasting processing time when the answer is clear. It is also possible to perform dynamic flow control at training time, and this supports more sophisticated training algorithms using reinforcement learning. These algorithms require interleaving model evaluation and decision making on the basis of that evaluation.

How do we make DyNet distributed

Dynet is currently centralized. There is also support for automatic mini-batching to improve computational efficiency, taking the burden off of users who want to implement mini-batching in their models. For more complicated models that do not support mini-batching, there is support for data-parallel multi-processing, in which asynchronous parameter updates are performed across multiple threads, making it simple to parallelize (on a single machine) any variety of model at training time.

Petuum Inc. is working on extending this parallelism from single machine to multiple machines data-parallel processing, by using Poseidon machine-learning communication framework.

Thursday, November 16, 2017

Spring 18 seminar, "blast from the past" edition

It is a bad sign if the references section of a paper fails to cite any recent papers. That means, either the authors are not aware of any recent work on the area, or the area is not of interest to other researchers.

But it is also a bad sign if the references section of a paper fails to cite any old papers. That means, the authors likely do not know enough about the fundamental/foundational work in the area.

There is a case to be made for working from the fundamentals, the first principles. Elon Musk made that case in his work, and showed that you can make transformative work, even in the commercial technology world, by working from the first principals.

Working from the first principles is also essential for research. It is not uncommon to get your best ideas when preparing for a class. Sometimes reviewing fundamental work in a topic, you notice a gap, some weird under-explained assumption, and go "huh, why is it that way". Or sometimes the students (or outsiders of the field) ask a basic question from the left field, and that may start an investigation. And sometimes, you see the old idea/algorithm as a promising/useful fit in new emerging applications. Recently the flexible quorums extension to Paxos is a good example of working from first principles. Nobody expected that result after 30 years of Paxos.

Back to the seminar

Every Spring, I teach a distributed systems seminar where we cover recent interesting papers in the field. But this Spring, I think I will experiment with a special "blast from the past" edition.

Reading these golden oldies could be a chance to revisit the fundamentals of the field. When reading these papers, we can note about how they aged (which parts aged well, which not) and how the context changed from then to now. We can use Google Scholar to investigate how these papers got cited in the intervening years. We can also consider if some of these algorithms can find new applications in modern systems.

We run our distributed systems seminars to include discussion and group work sessions, so I am sure we will be able to come up with new insights/perspectives about these papers with the advantage of hindsight.

But I am not sure what the selected papers will be yet. Here is my first brain dump on this. It may take several weeks to finalize the list. If you have suggestions, please let me know in the comments. What should be the cutoff date for these papers? 2000 seems to be a reasonable cutoff date. We may even stretch this up to 2007 to include a paper or two.
  1. Lamport. Time, clocks, and the ordering of events in a distributed system, 1978.
  2. Lampson & Sturgis, Crash Recovery in a Distributed Data Storage System, 1979 
  3. Dijkstra. Self-stabilization in spite of distributed control, 1982.
  4. Ben-Or, "Another Advantage of Free Choice: Completely Asynchronous Agreement Protocols" 1983.
  5. Rabin, "Randomized Byzantine Generals" 1983.
  6. Oki & Liskov, Viewstamped replication: A new primary copy method to support highly-available distributed systems, 1988.
  7. Consensus in the presence of partial synchrony, Dwork, Lynch, Stockmeyer, 1988. 
  8. Some UNITY papers from Chandy & Misra.
  9. Birman, Virtual Synchrony paper and debate.
  10. Andrew File System: Scale and performance in a distributed file system, 1988 
  11. Schneider, "Implementing fault-tolerant services using the state machine approach: a tutorial", 1990.
  12. Awerbuch and Peleg, "Sparse Partitions", 1990.
  13. Arora, Gouda. Distributed reset, 1990.  Closure and convergence: A foundation of fault-tolerant computing, 1993. 
  14. Herlihy & Moss, "Transactional Memory: Architectural Support for Lock-Free Data Structures", 1993.
  15. Plan 9 from Bell Labs, 1995 
  16. Lampson, "How to Build a Highly Available System Using Consensus", 1996. 
  17. Chandra, Toueg "Unreliable Failure Detectors for Reliable Distributed Systems", 1996.
  18. Flexible update propagation for weakly consistent replication, 1997. 
  19. Afek & Dolev, "Local stabilizer" 1997.  
  20. Cluster-Based Scalable Network Services, 1997. 
  21. Scalable, distributed data structures for internet service construction, 2000. 
  22. Rosenblum and Garfinkel. Virtual Machine Monitors: Current Technology and Future Trends, 2005. 

Werner Vogels's blog has a "Back to the Basics Reading" label, which includes interesting papers.

Tuesday, November 14, 2017

Book review: The Growth mindset

I had read this book a couple years ago . This was a fulfilling book. It is written by an eminent Stanford psychiatrist Prof. Carol Dweck. You can listen to her speak about the ideas in the book here.

The growth mindset is best understood with its antithesis: the fixed mindset. The fixed mindset says that your abilities and capacity are predetermined at birth. The growth mindset says they can be altered and improved.

So, why is this important? This sounds like just an issue of perspective. But a perspective change (a paradigm shift) is capable of changing a lot of things.

The book argues that fixed mindset thinking leads people to play defensive. The fixed mindset people are scared of failures and embarrassment as they would show to the world their capacity (limited and unchanged). So in order  not to fail and save face they don't attempt things.

In contrast, the growth mindset people (i.e., the people who embraces the growth mindset/perspective) love challenges. They are not scared of failure and embarrassment. They may chalk up failure and embarrassment as learning and improvement opportunities. Of course this does not mean that they can absolve themselves of any responsibilities or guilt about their failings/shortcomings. They need to accept the pain in order to grow: no pain, no gain. This just means that the growth mindset people are not affixed to the fixed capabilities and skills defining their self worth. They know they can grow. The growth mindset is akin to taking an antifragile approach to personality and life.

The fixed versus growth mindset has several implications in education, parenthood, relationship, sports-coaching. One practical advise (you have certainly heard this from media/blogs/newspapers) is about how to praise your kids. Don't praise your kids about how smart they are as this would reinforce the fixed mindset. Praise them about how hard they try, how persistent they are,  how courageous they are to attempt/try new challenges. This way you reinforce the growth mindset in them.

One of the biggest harms we can do to our gifts is to raise them with the fixed mindset. And one of the biggest gifts we can give to our kids is to raise them with the growth mindset. Don't just teach them how to fish. Teach them the concept/notion that they can always become much better fisherman than you were and they currently are.

Sunday, November 12, 2017

Book review. The Undoing Project: A friendship that changed our minds

I have recently read this 2017 book about the collaboration between two psychologists, Amos Tversky and Daniel Kahneman. The book is by Michael Lewis, a truly great storyteller. From his pen, the academic collaboration story of two social scientists becomes a love story and a thriller. I wouldn't ever imagine social science and behavioral economics could be this exciting. So I plan to read more of Michael Lewis's books: Flash Boys, The Big Short, Moneyball, The Blind Side, and Liar's Poker.

Here are some short notes from the book.

Danny Kahneman had a very tough childhood. His family survived (except the father) through Nazi prosecution and World War 2, and were able immigrate to Israel in 1948. He was a gifted child and starred in academia, although through out his life he was always had doubts about his talents and was always unsure of himself.

Amos Tversky was born in Israel and served in the Israel army for many years. He got educated at US for graduate school in psychology. He was very bright, which led to his colleagues coining an IQ test: "The faster you realized Tversky was smarter than you, the smarter you are."

The book contains many amazing quotes from Amos. When he was asked why/how he had become a psychologist, Amos replied: "The big choices we make are practically random. The small choices probably tell us more about who we are. Which field we go into may depend on which high school teacher we happen to meet. Who we marry may  depend on who happens to be around at the right time of life. On the other hand, the small decisions are very systematic. That I became a psychologist is probably not very revealing. What kind of psychologist I am may reflect deep traits."

Amos's early research on similarity of things was also very interesting, and the book explains this very nicely. Similarity is not like distance, because it does not necessarily have symmetry. People assess similarity essentially by making a list of features. The more noticeable features two objects share, the more similar they are. Note that, objects have varying number of noticeable features: New York has more of them than Tel Aviv. As a result, New York is not as much like Tel Aviv as Tel Aviv is like New York. Hence, similarity can be asymmetric. (In other words, similarity is like Bayesian subset comparison of features.) The book has a nice anecdote about how Amos instated that sometimes a lack of a feature is a feature: e.g., a three legged dog.

When Amos and Danny got together as colleagues, they started an epic collaboration streak investigating the irrational ways humans make decisions about risk. Their collaborative research started the behavioral economics field.

Their research shows a prime example of academic courage and creativity. They were able ask very original and daring questions and they were brave enough to pursue those questions and break ground in uncharted territory. An example of the kind of thought experiments they performed is the Asian disease problem which elegantly demonstrates the power of framing.

I will not divulge more about the collaboration of this duo and about the book. I enjoyed the book immensely and I strongly encourage you to read the book to learn about the collaboration story of these two great men. The book gives great insights about how to approach research as well.

Amos passed away in 1996 due to metastatic melanoma. Danny was awarded the 2002 Nobel Memorial Prize in Economic Sciences. You can listen to Danny talk here.

Friday, November 10, 2017

Paper summary. Towards distributed machine learning in shared clusters: a dynamically partitioned approach

This paper (by Peng Sun, Yonggang Wen, Ta Nguyen Binh Duong, and Shengen Yan) has been put on Arxiv on April 2017.

This paper was a little confusing to read. I think it could have been presented better to make its contributions more clear. The paper aims to enable multiple distributed ML frameworks, say TensorFlow, Petuum, MxNet, share the same cluster.

Enterprises have clusters, managed by  a cluster management systems (CMSs). The paper starts with a review of existing CMSs, and mentions shortcomings with each. It is unhappy with application-level scheduling, because there each application reserves and keeps all allocated resources until completion, and this leads to low utilization of the resources as the scheduling is done for the peak/maximum resource needs of the application.

In the task-level scheduling mode, applications use acquired resources to run a single task, release them as soon as the task completes, and petition for new resources to launch uncompleted tasks. The paper cites high scheduling overhead with this approach: each task must wait until receiving suitable resources. (But the SLAQ paper we reviewed has taken this task level scheduling route and didn't find significant scheduling overheads.)

In order to enable distributed ML workloads share a single cluster, Dorm uses two techniques: a dynamically-partitioned cluster management mechanism and an utilization-fairness optimizer.

The solution Dorm proposes is simple. It uses Docker containers to partition a cluster and runs one application per partition. Each application places its tasks on the assigned partition without petitioning for resources. Dorm can then adjust the existing resource allocations (i.e., number of containers in a partition) to keep a high resource utilization.

When adjusting an application's resources, Dorm first saves its state to a reliable storage system (e.g., Lustre filesystem). Then Dorm kills this application and creates/destroys containers on corresponding servers. Finally, Dorm resumes the killed application from the saved state with new resource allocations. This way, distributed ML applications can dynamically scale up/down without recomputing from the first iteration.

But this leaves me with several questions. Is this checkpointing only for the parameter-server state and not the worker states? Would the checkpointing work with TensorFlow which has dataflow graphs and associated state at each worker? Would those worker states matter? Would the states of the channels (i.e., messages in transit) matter? Finally, how is the checkpoint done in a distributed manner? The checkpoints will naturally appear at different points in different workers/containers; does that cause a problem?

The paper reports that Dorm was implemented to work with Petuum, MxNet, TensorFlow and MPI-Caffe. But details lack about how the implementations are done and how the application frameworks are modified to work with Dorm.

Wednesday, November 8, 2017

TLA+/PlusCal modeling of Synchronized Round Consensus Algorithm: Solution

The other day I posed the synchronized round consensus question.

Here is the solution on GitHub, and some brief explanation of the relevant part below.

Single round consensus algorithm

The code above is pretty straightforward. The while loop between lines 36-42 models how a node sends its vote to other nodes one by one. The sender node can fail in this while loop after some of the nodes received the vote. So if we model check with FAILNUM=1, the agreement invariant is violated in this single round algorithm as seen in the error trace below.

The blue highlighted line, state 15, is the last line in the error trace, and the value of the state variables are listed in the window below. If you inspect "up" you can see node 1 is down. Checking "mb" you can see node 2 received node 1's vote, but node 3 did not receive node 1's node. As a result, the decision "d" for node 2 is "1", whereas node 3 decides "2", and both decisions are finalized. So the invariant "Agreement" is violated.

(Note that even if we had 10 nodes, and FAILNUM=8, we could have extended this scenario by failing always the smallest id up node in each round after it delivers the message to the next node in the sequence keeping the "1" vote alive but hidden.)

Another interesting part in the code occurs at lines 43 and 44.
After sending its vote to the other nodes, the node increments its round number, pt, by 1. But then it "awaits" other nodes to catchup, and goes to the next round only after this synchronization await at line 44. Note that the node awaits only for the nodes that are "up". If it waits for a down node to increment its pt+1, it would have to wait forever.

This await at line 44 cuts corners: it assumes shared memory instead of message passing. One way to implement this unrealistic "await" is to use physical time, but even that is a brittle method. In reality it is hard to implement perfectly synchronized rounds. Physical clock synchronization is hard, and since the OS is not a real-time OS, timing assumptions can be violated, say due to garbage collection kicking in, or due to VM/container getting slow, or network contention.

When the round synchronization assumption is broken, this algorithm fails. Trust me, you don't want your consensus algorithm, that the rest of your coordination infrastructure depends on, to fail. That is why consensus algorithms adopted in practice, such as Paxos, Raft, Zab, Viewstamped Replication, do not rely on synchronized rounds, and can tolerate (in the sense that agreement is not violated) extreme asynchrony in the system. When the timing assumptions normalize a bit, those algorithms then achieve progress and solve consensus.

Crash tolerant synchronized round consensus algorithm

To tolerate crash faults, it is clear that the algorithm needs to be extended to delay decision to future rounds where each node can ensure that all the nodes have the same set of values from which to decide.

To this end, Line 32 introduces a top-level while loop to the original model iterate through multiple rounds.

The important question here is to determine which round is safe to decide.

The trivial way to do this is to always iterate FAILNUM+1 rounds before deciding, but that is a wasteful solution. FAILNUM is the maximum number of faults that can occur, and the algorithm should be able to decide in less number of rounds in the common case when faults do not occur. But how do you tell that asymmetrically, only using the node's own perception of the system, which is by definition partial and always slightly stale.

One way to do this is to look at the stability of the set of proposed votes and compare mb, the mailbox contents for this round, with pmb, the mailbox contents in the previous round. If there is a potential for fault, it follows that the algorithm should always go to round 2, to confirm with others. The delaying of the decision should continue until the proposed votes converge to a single value for two consecutive rounds and the cardinality of the mailbox is also important because it witnesses to the fact that there are no faults so the above pathological crash sequence of vote hiding is avoided.


I had written a faulty version of the multi-round algorithm in my first try. I had not taken the cardinality of the set into account and went with straightforward set union. It didn't give any violations of the invariant for N=5 and FAILNUM=3, but the progress part was taking more than an hour on my laptop and I stopped running it. Turns out that version was susceptible to the pathological crash sequence and vote hiding as above. All the nodes decide with "2" but there is this node who just received a "1" vote which was still alive but hidden. So this node goes to next round, but since others have decided, this node will await forever. This is a wacky flavor of consensus, which can still be acceptable maybe if this minority report node kills itself with up:=FALSE. This led me to improve the line 44 condition. Another bug was about a node in a higher round sending messages which gets consumed by a node in a lower round, which leads to nodes getting stuck. To solve this, I had to improve the condition at line 37.

I found out these mistakes when I started writing this blog post. So writing and explaining is an indispensable part of the design process. If TLA+ model checking was more performant, I wouldn't give up prematurely, and I would still know about the solution. If model checking is taking long, it may be best to do it on the cloud, say on an AWS, Azure, or GCE. But the state space explosion is an inherent and dangerous nemesis for model-checking and it will bite. The best precaution is to keep things simple/minimal in the modeling. But that is not always easy.