Chapter 7
Scaling Deployments
“I fixed my eyes on the largest cloud, as if, when it passed out of my
sight, I might have the good luck to pass with it.”
—Sylvia Plath, The Bell Jar
Public clouds are built from the ground up to allow customers to scale their
deployed services to fit their needs. The most common motivation for scaling in
industry is to allow services to support thousands or millions of concurrent users.
For example, an online media streaming site may initially be deployed on a single
VM instance. As its business grows, it may need to scale to use 100s or even
10,000s of servers at peak times, and then scale back when business is slow.
As we discuss in chapter 14, such multiuser scaling can also be of interest to some research users. However, the more urgent need facing scientists and engineers is often to run a bigger simulation or to process more data. To do so, they need to be able to obtain access to 10 or 100 or 1,000 servers quickly and easily, and then run parallel programs efficiently across those servers. In this chapter, we describe how different forms of parallel computation can be mapped to the cloud, and some of the many software tools that clouds make available for this purpose.

We return to several of these tools in part III of the book, when we look at data analytics, streaming, and machine learning, each of which increasingly requires large-scale computing.
7.1 Paradigms of Parallel Computing in the Cloud
We consider in this chapter five commonly used parallel computing paradigms for scaling deployments in the cloud.

The first is highly synchronous single program multiple data (SPMD) computing. This paradigm dominates supercomputer applications and is often the first that scientific programmers want to try when they meet the cloud. We will show how to use SPMD programming models in the cloud, pointing out where care must be taken to get good performance.

The second paradigm is many task parallelism, in which a large queue or queues of tasks may be executed in any order, with the results of each task stored in a database or in files, or used to define a new task that is added to a queue. Such computations were termed embarrassingly parallel by early parallel computing researchers [131]; we prefer happily parallel. It adapts well to clouds.
The third paradigm is bulk synchronous parallelism (BSP) [139]. Originally proposed as a model for analyzing parallel algorithms, BSP is based on processes or threads of execution that repeatedly perform independent computations, exchange data, and then synchronize at a barrier. (When a process reaches a barrier in its program, it stops and cannot proceed until all other threads/processes in the synchronization group reach the same point.) A contemporary example of the BSP model is the MapReduce style made famous by Google and now widely used in its Hadoop and Spark realizations.
The fourth paradigm is the graph execution model, in which computation is represented by a directed, usually acyclic, graph of tasks. Execution begins at a source of the graph. Each node is scheduled for execution when all incoming edges to that node come from task nodes that have completed. Graphs can be constructed by hand or alternatively generated by a compiler from a more traditional-looking program that describes the graph either implicitly or explicitly. We describe graph execution systems in chapters 8 and 9, including the data analytics tool Spark and the Spark Streaming, Apache Flink, Storm, and Google Dataflow systems. The graph execution model is also used in machine learning tools, such as the Google TensorFlow and Microsoft Cognitive Toolkit systems that we discuss in chapter 10. In many of these cases, the execution of a graph node involves a parallel operation on one or more distributed data structures. If the graph is just a linear sequence of nodes, then execution becomes an instance of BSP concurrency.
The fifth paradigm we consider is microservices and actors. In the actor model of parallel programming, computing is performed by many actors that communicate via messages. Each actor has its own internal private memory and goes into action when it receives a message. Based on the message, it can change its internal state and then send messages to other actors. You can implement an actor as a simple web service. If you limit the service to being largely stateless, you are now in the realm of microservices, a dominant design paradigm for large cloud applications. Microservices are often implemented as container instances, and they depend on robust container management infrastructure such as Mesos [154] and Kubernetes [79], which we describe in later sections.
We discuss in the following how each of these paradigms can be implemented in
cloud environments. We look at the impact that GPU accelerators are having on
science in the cloud and examine how we can build traditional high-performance
computing (HPC) clusters. We then move on to more cloud-centric systems that
have evolved for scalable data analytics. This last group includes Mesos and
Kubernetes for managing large collections of microservices, as well as MapReduce-
style computing systems such as Hadoop and Spark.
7.2 SPMD and HPC-style Parallelism
Scientists and engineers frequently ask, “Can I do traditional high-performance computing (HPC) in the cloud?” The answer used to be “not really”: indeed, a 2011 U.S. Department of Energy report [223] and a 2012 National Aeronautics and Space Administration (NASA) report [196] both concluded that cloud did not meet expectations for HPC applications. However, things have changed as cloud providers have deployed both high-speed node types and specialized interprocessor communication hardware. It is now feasible to achieve excellent performance for HPC applications on cloud systems for modest numbers of processing cores, as we now discuss.
7.2.1 Message Passing Interface in the Cloud
The Message Passing Interface (MPI) remains the standard for large-scale parallel computing in science and engineering. The specification allows for portable and modular parallel programs; implementations allow communicating processes to exchange messages frequently and at high speed. Amazon aws.amazon.com/hpc and Azure azure.microsoft.com/en-us/solutions/big-compute both offer special HPC-style cluster hardware in their data centers that can achieve high performance for MPI applications. Cycle Computing reported back in 2015 that “we have seen comparable performance [for MPI applications on EC2 vs. supercomputers] up to 256 cores for most workloads, and even up to 1024 for certain workloads that aren’t as tightly-coupled.” [156] The key to getting good performance is configuring advanced networking, as we discuss below when we describe how to create virtual clusters on Amazon and Azure.
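The examples later in this section are written in C, but the same message-passing model is available from Python through the mpi4py package, which is handy for quick experiments on a virtual cluster. The following minimal sketch (our illustration, not code from the book's repository) passes a Python object from rank 0 to rank 1; it assumes mpi4py is installed on the cluster image.

# hello_mpi.py: minimal mpi4py sketch; run with, e.g., mpirun -np 2 python hello_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # Rank 0 sends a Python object to rank 1
    comm.send({'greeting': 'hello from rank 0'}, dest=1, tag=0)
elif rank == 1:
    msg = comm.recv(source=0, tag=0)
    print('rank 1 received:', msg)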
7.2.2 GPUs in the Cloud
Graphics processing units have been used as accelerators in supercomputing for about 10 years. Many top supercomputing applications now rely on compute nodes with GPUs to give massive accelerations to dense linear algebra calculations. Coincidentally, this type of calculation is central to the computations needed to train deep neural networks (NNs), which lie at the nexus of interest in machine learning by the technology industry. Consequently it is no surprise that GPUs are making their way into cloud servers.

The most common GPUs used in current systems are from NVIDIA and are programmed with a single instruction multiple data (SIMD) model in a system called CUDA. SIMD operators are, in effect, array operations where each instruction applies to entire arrays of data. For example, a state-of-the-art system in 2017, the NVIDIA K80, has 4,992 CUDA cores with a dual-GPU design delivering up to 2.91 teraflops (10^12 floating point operations per second) double-precision performance and 8.73 teraflops in single precision. Amazon, Microsoft, and IBM have made K80-supported servers available in their public clouds. The Amazon EC2 P2 instances incorporate up to eight NVIDIA Tesla K80 accelerators, each running a pair of NVIDIA GK210 GPUs. The Azure NC24r series has four K80s with 224 GB memory, 1.5 TB solid state disk (SSD), and high-speed networking. You do not need to configure a particularly large cluster of these machine instances to create a petaflop (10^15 floating point operations/second) virtual computer.
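To make the SIMD idea concrete, whole-array operations in NumPy have the same flavor: a single statement applies an operation to every element of an array, and on a GPU the same pattern is carried out by thousands of CUDA cores at once. A trivial illustration (ordinary NumPy on a CPU, offered only as an analogy, not GPU code):

import numpy as np

a = np.random.rand(1_000_000)
b = np.random.rand(1_000_000)

# One logical "instruction" applied to entire arrays:
# an elementwise multiply-add across a million elements.
c = 2.0 * a * b + 1.0
print(c[:5])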
MPI cloud computing for proton therapy. This example illustrates how specialized node types (in this case, GPU-equipped nodes with 10 gigabit/s interconnect) allow cloud computing to deliver transformational computing power for a time-sensitive medical application. It also involves the use of MPI for inter-instance communication, Apache Mesos for acquiring and configuring virtual clusters, and Globus for data movement between hospitals and cloud.

Chard and colleagues have used cloud computing to reconstruct three-dimensional proton computed tomography (pCT) images in support of proton cancer treatment [91]: see figure 7.1 on the following page. In a typical clinical usage modality, protons are first used in an online position verification scan and are then targeted at cancerous cells. The results of the verification scan must be returned rapidly.
Figure 7.1: Proton computed tomography. Protons pass left to right through sensor planes and traverse the target before stopping in the detector at the far right.
A single reconstruction can require the analysis of around two billion 50-byte proton histories, resulting in an input dataset of about 100 GB. The reconstruction process is complex, involving multiple processes and multiple stages. First, each participating process reads a subset of proton histories into memory and performs some preliminary calculations to remove abnormal histories. Then, filtered back projection is used to estimate an initial reconstruction solution, from which most likely paths (MLPs) are estimated for the proton through the target. The voxels of the MLP for each proton identify the nonzero coefficients in a set of nonlinear equations that must then be iteratively solved to construct the image. This solution phase takes the bulk of the time and can be accelerated by using GPUs and by caching the MLP paths (up to 2 TB for a 2-billion-history image) to avoid recomputation.
Chard et al. use a high-performance, MPI-based parallel reconstruction code developed by Karonis and colleagues that, when run on a standalone cluster with 60 GPU-equipped compute nodes, can reconstruct two billion histories in seven minutes [167]. They configure this code to run on an Amazon virtual cluster of GPU-equipped compute instances, selected to meet requirements for compute performance and memory. They deploy on each instance a VM image configured to run the pCT software plus associated dependencies (e.g., MPI for inter-instance communication).

The motivation for developing this pCT reconstruction service is to enable rapid turnaround processing for pCT data from many hospitals, as required for clinical use of proton therapy. Thus, the pCT service also incorporates a scheduler component, responsible for accepting requests from client hospitals, acquiring and configuring appropriately sized virtual clusters (the size depending on the number of proton histories to be processed), and dispatching reconstruction tasks to those clusters. They use the Apache Mesos scheduler (section 7.6.6 on page 124) for this task and the Globus transfer service (section 3.6 on page 51) for data movement.
This pCT service can compute reconstructions for $10 per image when using Amazon spot instances (see page 80 for a discussion of spot instances). This is a revolutionary new capability, considering that the alternative is for each hospital with a proton therapy system to acquire, install, and operate a dedicated HPC cluster.
7.2.3 Deploying an HPC Cluster on Amazon
A useful tool for building HPC clusters on the Amazon cloud is Amazon's CloudFormation service, which enables the automated deployment of complex collections of related services, such as multiple EC2 instances, load balancers, special subnetworks connecting these components, and security groups that apply across the collection. A specific deployment is described by a template that defines all needed parameters. Once required parameter values are set in the template, you can launch multiple identical instances of the described deployment. You can both create new templates and modify existing templates.

We do not go into those complexities here but instead consider one particular case: CfnCluster (CloudFormation Cluster). This tool is a set of Python scripts that you can install and run on your Linux, Mac, or Windows computer to invoke CloudFormation, as follows, to build a private, custom HPC cluster.
sudo pip install cfncluster
cfncluster configure
The configuration script asks you various questions, including the following.
1. A name for your cluster template (e.g., “mycluster”)
2. Your Amazon access keys
3. The region in which you want your cluster to be deployed
4. The name for your virtual private cloud (VPC)
5. The name for the key pair that you want to use
6. Virtual Private Cloud (VPC) and subnet IDs
The following command causes a basic cluster to be launched, in the manner
illustrated in figure 7.2 on the following page. (As you have not yet specified the
number or type of compute nodes to start, or what image to run on those nodes,
defaults are used.) This process typically takes about 10 minutes.
cfncluster create mycluster
Figure 7.2: CloudFormation steps involved in launching a private HPC cloud from a
CfnCluster template.
The “create” command returns a Ganglia URL. Ganglia is a well-known and frequently used cluster monitoring tool. Following that link takes you to a Ganglia view of your HPC cluster. The default settings for a new cluster are autoscale compute nodes and the gridEngine scheduler. This combination works well if you are running a lot of non-MPI batch jobs. With autoscale, compute nodes are shut down when not in use, and new nodes are started when load increases.
Now let us deploy a new cluster with better compute nodes and a better
scheduler. The following command deletes your current deployment.
cfncluster delete mycluster
To create a new and improved deployment that you can use for MPI programs, find the directory ~/.cfncluster on your PC and edit the file config. Look for a section called [cluster mycluster], and add or edit the following four lines in that section.
compute_instance_type = c3.xlarge
initial_queue_size = 4
maintain_initial_size = true
scheduler = slurm
The choice of compute instance type here is important. The c3.xlarge instance type supports what Amazon calls enhanced networking, which means that it runs on hardware and with software that support Single-Root I/O Virtualization (SR-IOV). As we discuss in more detail in section 13.3.2 on page 286, this technology reduces latency and increases bandwidth by allowing VMs to access the physical network interface card (NIC) directly.
You do not need to specify a VM image because the default contains all libraries needed for HPC MPI-style computing. You have also stated that you want all of your compute nodes to stay around and not be managed by autoscale, and that you want Slurm to be the scheduler. We have found that these options make interactive MPI-based computing much easier. When you now issue the create command, as follows, CloudFormation deploys an HPC cluster with these properties.
cfncluster create mycluster
Now let us run an MPI program on the new cluster. You first log in to the cluster's head node, using the key pair that you used to create the cluster. On a PC you can use PuTTY, and on a Mac you can use ssh from the command line. The user is ec2-user. First you need to set up some path information. At the command prompt of the head node, issue the following three commands.
export PATH=/usr/lib64/mpich/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib64/mpich/lib
export I_MPI_PMI_LIBRARY=/opt/slurm/lib/libpmi.so
You next need to know the local IP addresses of your compute nodes. Create a
file called ip-print.c, and put the following C code in it.
#include <stdio.h>
#include <unistd.h>   /* for gethostname */

int main(int argc, char **argv)
{
    char hostname[1024];
    gethostname(hostname, 1024);
    printf("%s\n", hostname);
    return 0;
}
You can then use the Slurm command srun to run copies of this program across the entire cluster. For example, if your cluster has 16 nodes, run:

mpicc ip-print.c
srun -n 16 /home/ec2-user/a.out > machines
The output file machines should then contain multiple IP addresses of the form 10.0.1.x, where x is a number between 1 and 255, one for each of your compute nodes. (Another way to find these private IP addresses is to go to the EC2 console and look at the details for each running server labeled Compute. You need to do this for each instance, so if you have a lot of instances, the above method is faster.)
Now you can run the simple MPI program in figure 7.3 on the next page to test the cluster. Call this ring_simple.c. This program starts with MPI node 0 sending the number -1 to MPI node 1. MPI node 1 sends 0 to node 2, node 2 sends 1 to node 3, and so on. Compile and run this with the commands below.

mpicc ring_simple.c
mpirun -np 7 -machinefile ./machines /home/ec2-user/a.out
Assuming seven processes running on four nodes, you should get results like
the following.
Warning: Permanently added the RSA host key for IP address '10.0.1.77' to
the list of known hosts.
Warning: Permanently added the RSA host key for IP address '10.0.1.78' to
the list of known hosts.
Warning: Permanently added the RSA host key for IP address '10.0.1.76' to
the list of known hosts.
Warning: Permanently added the RSA host key for IP address '10.0.1.75' to
the list of known hosts.
Received number -1 from process 0 on node ip-10-0-1-77
Received number 0 from process 1 on node ip-10-0-1-75
Received number 1 from process 2 on node ip-10-0-1-78
Received number 2 from process 3 on node ip-10-0-1-76
Received number 3 from process 4 on node ip-10-0-1-77
Received number 4 from process 5 on node ip-10-0-1-75
Received number 5 from process 6 on node ip-10-0-1-78
Observe that the mpirun command distributed the MPI nodes uniformly across the cluster. You are ready to do some real HPC computing on your virtual private MPI cluster. You might, for example, measure message transit times. We converted our C program into one in which our nodes are connected in a ring, and measured the time for messages to go around this ring ten million times. We found that the average time to send a message and have it received at the destination was about 70 microseconds. While this is not a standard way to measure message latencies, the result is consistent with that seen in other cloud cluster implementations. It is at least 10 times slower than conventional supercomputers, but note that the underlying protocol used here is IP and that we do not use any special cluster network such as InfiniBand. The full code is available in GitHub and is linked from the book website in the Extras tab.
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   /* for gethostname */

int main(int argc, char **argv) {
    // Initialize the MPI environment
    MPI_Init(NULL, NULL);

    // Find out rank and size
    int rank, world_size, number;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    char hostname[1024];
    gethostname(hostname, 1024);

    // We assume at least two processes for this task
    if (world_size < 2) {
        fprintf(stderr, "World size must be >1 for %s\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (rank == 0) {
        // If we are rank 0, set number to -1 and send it to process 1
        number = -1;
        MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    }
    else if (rank > 0 && rank < world_size) {
        MPI_Recv(&number, 1, MPI_INT, rank-1, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        printf("Received number %d from process %d on node %s\n",
               number, rank-1, hostname);
        number = number + 1;
        if (rank+1 < world_size)
            MPI_Send(&number, 1, MPI_INT, rank+1, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}
Figure 7.3: The MPI program ring_simple.c that we use for performance experiments.
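If you would rather estimate latency from Python than modify the C code, a simple mpi4py ping-pong between two ranks gives a comparable rough number. This sketch is our own illustration under that assumption, not the program used to produce the 70-microsecond figure quoted above.

# pingpong.py: run with mpirun -np 2 python pingpong.py
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
reps = 100000
buf = bytearray(8)          # small fixed-size message

comm.Barrier()
start = time.time()
for _ in range(reps):
    if rank == 0:
        comm.Send(buf, dest=1, tag=0)
        comm.Recv(buf, source=1, tag=0)
    elif rank == 1:
        comm.Recv(buf, source=0, tag=0)
        comm.Send(buf, dest=0, tag=0)
elapsed = time.time() - start

if rank == 0:
    # Each iteration is two one-way messages
    print("estimated one-way latency: %.1f microseconds"
          % (elapsed / (2 * reps) * 1e6))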
7.2.4 Deploying an HPC Cluster on Azure
We describe two approaches to building an HPC cluster on Azure. The first is to use Azure's service deployment orchestration service, Quick Start. Like Amazon CloudFormation, it is based on templates. The templates are stored in GitHub and can be invoked directly from the GitHub page. For example, figure 7.4 shows the template start page for creating a Slurm cluster.
Figure 7.4: Azure Slurm start page [8].
When you click Deploy to Azure, you are taken to the Azure login page and then directly to the web form for completing the Slurm cluster deployment. At this point you enter the names for the new resource group that defines your cluster, the number and type of compute nodes, and a few details about the network. We refer you to the Microsoft tutorial [200] for more details on how to build a Slurm cluster and launch jobs on that cluster.
A second approach to HPC computing on Azure is Azure Batch, which supports the management of large pools of VMs that can handle large batch jobs, such as many task-parallel jobs (described in the next section) or large MPI computations. As illustrated in figure 7.5 on the next page, four major steps are involved in using this service.

1. Upload your application binaries and input data to Azure storage.
2. Define the pool of compute VMs that you want to use, specifying the desired VM size and OS image.
3. Define a Job, a container for tasks executed in your VM pool.
4. Create tasks that are loaded into the Job and executed in the VM pool.
Tasks are usually just command line scripts to be executed on each node.
Typically the first tasks are executed on all nodes and involve loading your input
data and applications, and performing other needed initialization of the individual
VMs. The next task can be your MPI job. You can specify that this task not start
until the previous task finishes. The final task can be one that moves your output
data back to the Azure blob storage.
Figure 7.5: Steps in creating and executing an Azure Batch job.
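The four steps above can also be scripted with the azure-batch Python SDK. The sketch below is only illustrative: the account URL, key, pool and job names, VM size, and OS image are placeholders, and class and argument names differ somewhat across SDK versions, so treat it as a rough outline rather than a recipe.

# Rough outline of Azure Batch steps 2-4 with the azure-batch Python SDK.
# All identifiers (account, key, pool id, image, VM size) are placeholders.
import azure.batch as batch
import azure.batch.models as batchmodels
from azure.batch.batch_auth import SharedKeyCredentials

creds = SharedKeyCredentials("mybatchaccount", "ACCOUNT_KEY")
client = batch.BatchServiceClient(
    creds, batch_url="https://mybatchaccount.westus.batch.azure.com")

# Step 2: define a pool of compute VMs (size, count, and OS image)
pool = batchmodels.PoolAddParameter(
    id="mpipool",
    vm_size="STANDARD_D2_V2",
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical", offer="UbuntuServer",
            sku="18.04-LTS", version="latest"),
        node_agent_sku_id="batch.node.ubuntu 18.04"),
    target_dedicated_nodes=4)
client.pool.add(pool)

# Step 3: define a job bound to that pool
job = batchmodels.JobAddParameter(
    id="mpijob",
    pool_info=batchmodels.PoolInformation(pool_id="mpipool"))
client.job.add(job)

# Step 4: add a task (here, a trivial command line) to the job
task = batchmodels.TaskAddParameter(
    id="task1",
    command_line="/bin/bash -c 'echo hello from Azure Batch'")
client.task.add(job_id="mpijob", task=task)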
7.2.5 Scaling Further
We indicated above that tightly coupled HPC applications can scale to modest-sized clusters. Why can't they scale further? There are good reasons. Supercomputer architectures are designed with a heavy emphasis on the performance of the processor interconnection network. They use specialized hardware features and communication protocols to achieve message latencies in the sub-microsecond range. On the other hand, cloud data centers were originally built with networks based on the standard Internet TCP/IP protocols over Ethernet switches, with an emphasis on bandwidth from the cluster to the outside user rather than between servers.

A quantity called the network bisection bandwidth is a measure of how much data traffic can flow from one half of the supercomputer to the other half in a specified period of time. The networks used in supercomputers have an extremely high bisection bandwidth. The first cloud data centers had low bisection bandwidth. More recently, most public cloud vendors have redesigned their networks to take advantage of software-defined networking and better network topologies, with the result that Google claims today to have a bisection bandwidth of more than 1 petabit/s (10^15 b/s), which, as they point out, is "enough for 100,000 servers to exchange information at 10 Gb/s each" [250]. In addition, the HPC subclusters deployed in Microsoft and Amazon data centers are now supplemented by more sophisticated InfiniBand as well as custom networks. Thus, it is becoming increasingly feasible to run latency-sensitive applications on clouds.
Nevertheless, one needs to consider the nature of the service level agreement (SLA) that cloud providers make with their users. Supercomputers commit to a specific processor type, network bandwidth, bisection width, and latency, allowing the user to predict an application's performance profile with a fair degree of certainty. With cloud data centers, however, the specific resources used to constitute an HPC subcluster in the cloud are less transparent. In other words, the SLA is more complex, and performance is less likely to scale predictably to large deployments than on a supercomputer.
7.3 Many Task Parallelism
Suppose you need to analyze many data samples. Each analysis task can be performed independently of all the other tasks. This style of computation is simple. You place all data samples in a queue in the cloud, and then start a large number of worker VMs or containers. We refer to this as many task parallelism, but it is also known as bag of tasks parallelism and manager worker parallelism. Each worker repeatedly pulls a sample from the queue, processes the data, and stores the result in a shared table: see figure 7.6.
Figure 7.6: Simple many task execution model.
A good example application is genomics, in which many DNA sequences must often be processed. Wilkening et al. have analyzed the suitability of the cloud for this task [260]. Seattle Children's Hospital used the same approach for the NCBI BLAST code on Azure [33]. The cloud-based Globus Genomics, described in detail in section 14.4 on page 303, provides many genomics pipelines [187]. We present a detailed implementation of a many task example later in this chapter.

This strategy requires an efficient mechanism that multiple workers can each use to pull a unique task from the queue and push an item into a table. We also need a way to start and stop tasks. We describe below a simple Python framework for many task parallelism that uses a resource manager to coordinate worker activities.
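The essence of any such framework is a worker loop. The following local sketch uses Python's standard library queue and threads as a stand-in for the cloud message queue and NoSQL table, with a placeholder analyze function; the cloud version that we implement later in this chapter replaces these pieces with SQS and DynamoDB calls.

import queue
import threading

tasks = queue.Queue()
results = {}                      # stand-in for the shared results table
results_lock = threading.Lock()

def analyze(sample):
    # Placeholder for the real per-sample analysis
    return sum(sample)

def worker():
    while True:
        try:
            item_id, sample = tasks.get_nowait()
        except queue.Empty:
            return                # queue drained; this worker exits
        value = analyze(sample)
        with results_lock:
            results[item_id] = value
        tasks.task_done()

# Fill the queue with (id, sample) tasks, then start a pool of workers
for i in range(100):
    tasks.put((i, [i, i + 1, i + 2]))

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results), "samples processed")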
7.4 MapReduce and Bulk Synchronous Parallelism
A slightly more sophisticated approach to parallelism is based on a concept called bulk synchronous parallelism (BSP). This is important when worker tasks must periodically synchronize and exchange data with each other. The point of synchronization in a BSP computation is called a barrier, because no computation is allowed to proceed until all computations reach the synchronization point.
MapReduce is a special case of BSP computing. The concept is simple. Say you have a sequence of data objects x_i for i = 1..n and you want to apply a function f(x) to each element. Assume the result is a value in an associative ring like the real numbers, for which we can compose objects, and we want to compute the sum ∑_{i=1}^{n} f(x_i). We map f over the data and then reduce by performing the sum. While this concept dates from Lisp programming in the 1960s, it was popularized for big data analysis in a 2004 paper by Google engineers Dean and Ghemawat [108] and became ubiquitous when Yahoo! released an open source implementation of MapReduce known as Hadoop.
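In Python the same idea fits in a few lines, which is a useful mental model for what Hadoop and Spark do at scale over partitioned, distributed data:

from functools import reduce

data = range(1, 11)           # the x_i
f = lambda x: x * x           # the function applied in the map phase

# map applies f to every element; reduce combines the results with +
total = reduce(lambda a, b: a + b, map(f, data))
print(total)                  # 385, the sum of the f(x_i)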
A MapReduce computation starts with a distributed data collection partitioned into non-overlapping blocks. It then maps a supplied function over each block. Next it applies the reduction. This is where the barrier comes in. It combines the results of the map operator in a treelike way, perhaps through several stages, until it produces the result, as illustrated in figure 7.7 on the following page.
Even before Hadoop was released, people were applying MapReduce to scientific problems in biology [129, 246, 203], image analysis [188], and more [135]. Hadoop is a popular implementation of MapReduce that can run on any cloud and cluster. Hadoop is now built on the Apache YARN resource manager, which we describe in chapter 8.
Figure 7.7: Simple MapReduce execution.
7.5 Graph Dataflow Execution and Spark
The MapReduce model can be thought of as the execution of a directed acyclic graph of tasks, but one can easily see how to generalize the idea. Define the computation as a graph of functions that are executed in a dataflow form until certain evaluation points are reached. Each function node in the graph represents a parallel invocation of the function on distributed data structures, as shown in figure 7.8 on the next page. At this level, the execution is essentially BSP-style, and the evaluation points are barrier synchronizations.

The dataflow graph is defined in a high-level control program that is then compiled into the parallel execution graph. Control flow starts with the control program; and when the graph is evaluated, control passes to a distributed execution engine. At the control points, the distributed data structures are converted back into structures that can be accessed by the control program. For example, if the algorithm involves an iteration, then the loop control is in the control program, and this is the barrier point for the parallel part of the program.
Figure 7.8: Dataflow graph as defined by the program (above) and after the parallelism is unrolled during execution (below).

Spark spark.apache.org is a popular example of this style of computation. In Spark the control flow program is written in SQL, Scala, or Python and, consequently, you can easily execute Spark programs from a Jupyter notebook. We provide many examples of such computations in this book. Spark was originally developed in the Berkeley AMPLab and is now supported by Databricks databricks.com, among others. Databricks provides an online development environment and platform for deploying Spark-enabled clusters on Amazon.

Spark is also part of Microsoft's HDInsight toolkit and is supported on Amazon Elastic MapReduce. Microsoft documentation [21] describes how to deploy Spark on Azure with Linux or Windows and from a Jupyter notebook. Because Spark is used primarily for data analytics, we defer detailed examples to chapter 8.
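As a small taste of the dataflow style before that detailed treatment, the following PySpark sketch distributes a range of numbers across workers, maps a function over the partitions, and reduces the results at a barrier; assuming a working Spark installation, the same code runs in local mode or on a cluster.

from pyspark import SparkContext

sc = SparkContext("local[4]", "dataflow-sketch")   # "local[4]" = 4 local cores

data = sc.parallelize(range(1, 1001))              # distributed dataset
total = (data.map(lambda x: x * x)                 # parallel map on partitions
             .reduce(lambda a, b: a + b))          # barrier: combine results
print(total)
sc.stop()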
7.6 Agents and Microservices
The cloud is designed to host applications that are realized as scalable services, such as a web server or the backend of a mobile app. Such applications accept connections from remote clients and, based on the client request, perform some computation and return a response. If the number of clients that are concurrently requesting service grows too large for one server to handle, the system should automatically spawn new instances of the server to share the load. Another scenario is an application that processes events from remote sensors, with the goal of informing a control system on how to respond, as when geosensors detecting ground motion tremors occurring in a significant pattern sound an earthquake warning. In this case the cloud part of the application may involve multiple components: sensor signal decoders, pattern analysis integrators, database searches, and alarm system interfaces. Another example is a large-scale data analytics system processing large amounts of text data: for example, a search index for the web.
This style of parallel programming is like an asynchronous swarm of communicating processes or services distributed over a virtual network in the cloud, as illustrated in figure 7.9. The individual processes may be stateless, such as a simple web service, or stateful, as in the actor programming model. Several great examples exist [56], but one that has been used in the public cloud is the Orleans system from Microsoft Research [72].
Figure 7.9: Conceptual view of a swarm of communicating microservices or actors.
Recently this asynchronous swarm style has been rebranded as a new paradigm, under the name microservices. The problem addressed by microservices is that of how to design and build a large, heterogeneous application that is secure, maintainable, fault tolerant, and scalable. This question is particularly important for large online services that need to support thousands of concurrent users. The app must remain up twenty-four hours a day while being maintained and upgraded. (This task is closely related to a software engineering methodology called DevOps, which integrates product delivery, quality testing, feature development, and maintenance releases in order to improve reliability and security and provide faster development and deployment cycles. From the programmer's perspective this is a "you built it, so now you run it" philosophy. Updates to parts of the system occur while the system is running, so the developers are tightly integrated with the IT professionals who manage the system.)
The microservice solution to this challenge is to partition the application into small, independent service components communicating with simple, lightweight mechanisms. The microservice design rules dictate that each microservice must be able to be managed, replicated, scaled, upgraded, and deployed independently of other microservices. Each microservice must have a single function and operate in a bounded context; that is, it has limited responsibility and limited dependence on other services. When possible one should reuse existing trusted services such as databases, caches, and directories. All microservices should be designed for constant failure and recovery. The communication mechanisms used by microservice systems are varied: they include REST web service calls, RPC mechanisms such as Google's gRPC, and the Advanced Message Queuing Protocol (AMQP). The microservice philosophy has been adopted in various forms by many companies, including Netflix, Google, Microsoft, Spotify, and Amazon.
7.6.1 Microservices and Container Resource Managers
We next describe the basic ideas behind building microservices using clusters of containers. To build a substantial microservice application, you need to build on a platform that can manage large collections of distributed, communicating services. The microservices described below are instances of containers, and we use a resource manager to launch instances, stop them, create new versions, and scale their number up and down. Thanks to the open source movement and the public clouds, we have an excellent collection of tools for building and managing containerized microservices. We focus here on the Amazon ECS container service, Google Kubernetes, Apache Mesos, and Mesosphere on Azure.
7.6.2 Managing Identity in a Swarm
One issue that must be considered when you design an application around microservices is how you delegate your authority to a swarm of containerized, nearly stateless services. For example, if some microservices need to access a queue of events that you own, and others need to interact with a database that you created, then you need to pass part of your authority to those services so that they may make invocations on your behalf. We discussed this issue in section 3.2 on page 38, where we described how to manage your Amazon key pair.

One solution is to pass these values as runtime parameters through a secure channel to your remotely running application or microservice. This solution has two problems, however. First, microservices are designed to be shut down when not needed and scaled up in number when the demand is high. You would need to automate the process of fetching the credentials for each microservice reboot. Second, by passing these credentials, you endow your remote service with all of your authority. This is your microservice, so you are entitled to do so, but you may prefer to pass only a limited authority: for example, grant access only to the task queue but not the database.
The public cloud providers have a better solution: role-based security. What this means is that you can create special secure entities, called roles, that authorize individuals, applications, or services to access various cloud resources on your behalf. As we illustrate below, you can add a reference to a role in a container's deployment metadata so that each time the container is instantiated, the role is applied. (We discuss role-based access control in more detail in section 15.2 on page 319.)
7.6.3 A Simple Microservices Example
As in previous chapters, we present a single example and show how different resource managers can be used to implement that example. Several types of scientific applications can benefit from a microservice architecture. One common characteristic is that they run continuously and respond to external events. In chapter 9, we describe a detailed example of such an application: the analysis of events from online instruments and experiments. In this chapter, we consider the following simpler example.
Scientific document classification. When scientists send technical papers to scientific journals, the abstracts of these papers often make their way onto the Internet as a stream of news items, to which one can subscribe via RSS feeds. A major source of high-quality streamed science data is arXiv arxiv.org, a collection of more than one million open-access documents. Other sources include the Public Library of Science (PLOS ONE), Science, and Nature, as well as online news sources. We have downloaded a small collection of records from arXiv, each containing a paper title, an abstract, and, for some, a scientific topic as determined by a curator. In the following sections we describe how to build a simple online science document classifier. Our goal is to build a system that pulls document abstracts from the various feeds and then uses one set of microservices to classify those abstracts into the major topics of physics, biology, math, finance, and computer science, and a second set to classify them into subtopic areas, as illustrated in figure 7.10 on the next page.

The initial version that we describe here is more modest. In the first phase of this system, initial document classification, we feed the documents from a Jupyter notebook into a cloud-based message queue. The classifier microservices pull documents from the queue, perform the analysis, and push results into a NoSQL table, as shown in figure 7.11. This is now a simple many task system.
Figure 7.10: Online scientific document classifier example, showing two levels and subcategories for biology and computer science.
Figure 7.11: Document classifier version 1, showing the multiple predictor microservices.
7.6.4 Amazon EC2 Container Service
The Amazon EC2 Container Service (ECS) is a system to manage clusters of servers devoted to launching and managing microservices based on Docker containers. It comprises three basic components, as follows.

1. One or more sets of EC2 instances, each a logical unit called a cluster. You always have at least one Default cluster, but you can add more.
2. Task definitions, which specify information about the containers in your application, such as how many containers are part of your task, what resources they are to use, how they are linked, and which host ports they are to use.
3. Amazon-hosted Docker image repositories. Storing your images here may make them faster to load when needed, but you can also use the public Docker Hub repository discussed in chapter 6.
First, a note of potential confusion. Amazon refers to the EC2 VM instances in a cluster as container instances. These are not Docker containers. Later we add Docker containers to those VMs; but, using ECS terminology, those containers will be called tasks and services.
Before creating your cluster, you create two roles in the Amazon Identity and Access Management (IAM) system to address the identity management issues that we just discussed. The IAM link in the Security subarea of the AWS management console takes you to the IAM Dashboard. On the left, select Roles. From there, you can select Create New Roles. Name it containerservice, and then select the role type. You need two roles: one for the container service (which actually refers to the VMs in your cluster) and one for the actual Docker services that we deploy. Scroll down the list of role types and look for Amazon EC2 Container Service Role and Amazon Container Service Task Role. Select the Container Service Role for containerservice. Save this, and now create a second role and call it mymicroservices. This time, select the Amazon Container Service Task Role. Now when you go back to the dashboard, it should look like figure 7.12.
Figure 7.12: AWS IAM Console, showing the two new roles.
On the panel on the left is the link Roles. Select your containerservice role, and click Roles. You should now be able to attach various access policies to your role. The attach policy button exposes a list of over 400 access policies that you can attach. Add three policies: AmazonBigtableServiceFullAccess, AmazonEC2ContainerServiceforEC2Role, and AmazonEC2ContainerServiceRole. Next, add two policies to the mymicroservices role, for the Amazon Simple Queue Service (SQS) and DynamoDB: AmazonSQSFullAccess and AmazonDynamoDBFullAccess. Finally, at the top of the page on which the policies are listed, you should see the Role ARN (Amazon Resource Name), looking something like arn:aws:iam::01234567890123:role/mymicroservices. Copy and save it.
Creating a cluster is now easy. From the Amazon ECS console, simply click create cluster, and then give it a name. You next say what EC2 instance type you want and provide the number of instances. If you need ssh access to the cluster nodes, include a cryptographic key pair; however, this is not needed for managing containers. You can use all the defaults on this page except that when you get to Container Instance IAM role, you should see the "containerservice" role. Select this, and select create. You should soon see the cluster listed on the cluster console, with the container instances running.
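If you prefer to verify the cluster programmatically rather than in the console, a short boto3 sketch such as the following lists your clusters and reports how many container instances have registered; the cluster name cloudbook matches the one used in the service example below.

import boto3

ecs = boto3.client('ecs')

# List all cluster ARNs in this account and region
print(ecs.list_clusters()['clusterArns'])

# Inspect our cluster: its status and number of registered container instances
desc = ecs.describe_clusters(clusters=['cloudbook'])
for cluster in desc['clusters']:
    print(cluster['clusterName'], cluster['status'],
          cluster['registeredContainerInstancesCount'])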
We next describe the steps needed to create a task definition and launch a service. We present the highlights here; full details are in notebook 9. We use the following example code to illustrate the approach. The call to register_task_definition creates a task definition in a family named predict with a standard network mode, and the Role ARN from above as taskRoleArn.
import boto3

client = boto3.client('ecs')
response = client.register_task_definition(
    family='predict',
    networkMode='bridge',
    taskRoleArn='arn:aws:iam::01233456789123:role/mymicroservices',
    containerDefinitions=[
        {
            'name': 'predict',
            'image': 'cloudbook/predict',
            'cpu': 20,
            'memoryReservation': 400,
            'essential': True,
        },
    ],
)
The rest of this code defines the container. It names the task definition predict, specifies that the image is cloudbook/predict from the public Docker Hub, and indicates a need for 20 units of computing (out of 1,024 available on a core) and 400 MB of memory. This is about as simple as a task definition can get. The notebook shows a more sophisticated task definition that manages port mappings.

Given a task definition, you can then call the create_service function, as in the following, which creates a service called predictor that is to run with eight microservices on the cloudbook cluster. Note the task definition name, predict:5. You often end up modifying various aspects of a task definition. Each time you execute the register_task_definition call, it creates a new revision of that task with a version tag. The suffix :5 indicates that this is the fifth version.
response = client.create_service(
    cluster='cloudbook',
    serviceName='predictor',
    taskDefinition='predict:5',
    desiredCount=8,
    deploymentConfiguration={
        'maximumPercent': 100,
        'minimumHealthyPercent': 50
    }
)
Our invocation requires that at least 50% of the requested instances be granted. The first time we created this service, it took about a minute to download the 2 GB Docker image from the public Hub and load it into our cluster VMs. In subsequent runs, it took only a few seconds to start the service, because the image was local. If you now look at the cluster in the EC2 console, you see the state as shown in figure 7.13 on the next page.
Next we describe the cloudbook/predict service and how it works with the message queue. The Amazon SQS service is simple to use. Figure 7.14 shows a slightly abbreviated version of the prediction microservice. (Missing are the details of how to set up the DynamoDB table, covered in chapter 3, and the machine learning module, to be described briefly in chapter 10.) The full code is accessible from the book website in the Extras tab.

We use the queue service's message attribute system to pass the article's title, abstract, and source in each message. (The arXiv data's source field provides enough information to train the predictor, but we do not use it for that purpose; instead, we append it to the prediction so that the predictor's accuracy can be evaluated by looking at the data stored in the table.)
Figure 7.13: Eight instances of the predictor and two instances of the table service running.
The code to send the messages to the queue is also simple. We provide the complete code in notebook 10. We first load the data into three arrays and then send 100 messages using the Amazon SQS method send_message, as follows.
queue = sqs.get_queue_by_name(QueueName='bookque')
abstracts, sites, titles = load_docs("path-to-documents",
                                     "sciml_data_arxiv")
for i in range(1330, 1430):
    queue.send_message(MessageBody='boto3', MessageAttributes={
        'Title':    {'StringValue': titles[i],
                     'DataType': 'String'},
        'Source':   {'StringValue': sites[i],
                     'DataType': 'String'},
        'Abstract': {'StringValue': abstracts[i],
                     'DataType': 'String'}
        })
Table 7.1 shows the results of this action when using eight instances of the predictor microservice. As a further refinement, we could, as shown in figure 7.15 on page 120, partition the work performed by our microservice across two microservices: one to pull a task from the queue and analyze it, and one to store a result. We can then mix and match different input queues and output services: for example, pulling from the Amazon queue, analyzing on Jetstream, and storing in Google Bigtable. We provide a version with this form on the book website in the Extras tab.
import boto3, time
from socket import gethostname
from predictor import predictor

hostnm = gethostname()

# Create an instance of the ML predictor code
pred = predictor()

# Create an instance of the Amazon DynamoDB table as "table" (see chapter 3);
# the details are omitted in this abbreviated listing.

sqs = boto3.resource('sqs', region_name='us-west-2')
queue = sqs.get_queue_by_name(QueueName='bookque')

i = 0
while True:
    for message in queue.receive_messages(
            MessageAttributeNames=['Title', 'Abstract', 'Source']):
        timestamp = time.time()
        if message.message_attributes is not None:
            title = message.message_attributes \
                .get('Title').get('StringValue')
            abstract = message.message_attributes \
                .get('Abstract').get('StringValue')
            source = message.message_attributes \
                .get('Source').get('StringValue')
            predicted = pred.predict(abstract, source)
            metadata_item = \
                {'PartitionKey': hostnm, 'RowKey': str(i),
                 'date': str(timestamp), 'answer': source,
                 'predicted': str(predicted), 'title': title}
            table.put_item(Item=metadata_item)
            message.delete()
            i = i + 1
Figure 7.14: Abbreviated code for the prediction microservice.
Table 7.1: View of the DynamoDB table after processing 100 messages. The Answer column references the arXiv source, which can be used to verify the prediction.

PartitionKey    RowKey  Answer          Date    Predicted  Title
e0bfabe3d880    0       gr-qc           148...  Physics    Superconducting dark eng ...
e0bfabe3d880    1       physics.optics  148...  Physics    Directional out-coupling of il ...
e0bfabe3d880    2       q-bio.PI        148...  Bio        A guide through a family of p ...
e0bfabe3d880    4       math.PR         148...  Math       Critical population and error ...
e0bfabe3d880    5       physics.comp    148...  Physics    Coupling all-atom molecular ...
e0bfabe3d880    7       hep-th          148...  Physics    Nonsingular Cosmology from ...
Figure 7.15: Version 2 of the document classifier in figure 7.11 partitions the microservice into a major topic selector service and a web service that handles the table storage.
Because putting a record into the table is much cheaper than running the data analysis predictor, we can increase throughput by replicating the predictor component. For example, in our Amazon implementation we ran on a two-server cluster, with one table service per server and multiple predictors per server. We have configured the table service to wait on a fixed TCP/IP port; hence there can be no more than one table service per server. Each predictor service simply posts its result to its local host at that port, providing a simple form of service discovery. With 20 predictors and two table services, the system can classify documents as fast as we can add them to the queue. (To scale further, we might want multiple table services per server, requiring a more sophisticated service discovery method.)
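To make this hand-off concrete, the predictor's side of that convention might look like the following sketch; the port number, URL path, and payload fields here are illustrative assumptions, not the code of our actual implementation.

import requests

# Hypothetical fixed port and path on which the co-located table service listens
TABLE_SERVICE_URL = "http://localhost:8080/store"

def store_result(row_key, predicted, title, source):
    # Post one classification result to the local table service.
    record = {"RowKey": str(row_key), "predicted": predicted,
              "title": title, "answer": source}
    resp = requests.post(TABLE_SERVICE_URL, json=record, timeout=5)
    resp.raise_for_status()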
7.6.5 Google's Kubernetes
Google has run its major services using the microservice design for years. Recently, Google released a version of its underlying resource manager as open source, under the name Kubernetes. This service can be both installed on a third-party cloud and accessed within the Google Cloud. Creating a Kubernetes cluster on the Google Cloud is easy. Select the "Container Engine." On the "container clusters" page, there is a link that allows you to create a cluster. (With the free account you cannot make a large cluster: you are limited to about four dual-core servers.) Fill in the form and submit it, and you soon have a new cluster. Clicking on the icon >_ in the blue banner at the top of the form creates an instance of a "Cloud Shell" that is automatically authenticated to your account. Next, you must authenticate your cloud shell with your new cluster. Select your container cluster and click on the connect button to the right to obtain the code to paste into the cloud shell. The result should now look like figure 7.16.
Figure 7.16: Kubernetes console and cloud shell connected to a small cluster.
You interact with Kubernetes, which is now running on your small cluster, via command line calls entered into the cloud shell. Kubernetes has a different and somewhat more interesting architecture than other container management tools. The basic unit of scheduling in Kubernetes is the pod, a set of one or more Docker-style containers together with a set of resources that are shared by the containers in that pod. When launched, a pod resides on a single server or VM. This approach has several advantages for the containers in that pod. Because the containers in a pod all run on the same VM, they all share the same IP and port space and thus find each other through conventional means such as localhost. They can also share storage volumes that are local to the pod.
To start, let's run the notebook in a simple single-container pod. Use the Kubernetes control command kubectl to run the following statements.
# Launch Jupyter and expose its port 8888
kubectl run jupyter --image=jupyter/scipy-notebook --port=8888
# To make it visible externally, attach a load balancer
kubectl expose deployment jupyter --type=LoadBalancer
# Get the service description
kubectl describe services jupyter
You can then get the IP address for Jupyter from the "LoadBalancer Ingress:" field of the service description produced by the third command. (If that address is not immediately visible, rerun the command.)
To duplicate the example in the preceding section, we need a queue service and a way to talk to it. Like Amazon, Google has an excellent message queue service, Google Cloud Pub/Sub, that supports both push and pull subscribers. However, rather than use this service here, we choose to demonstrate how to distribute the computation over the Internet by putting the queue service on a different cloud. Specifically, we deploy an instance of the open source queue service RabbitMQ rabbitmq.com on a VM running on Jetstream. We then use a Python package called Celery to communicate with the queue service.

Celery is a distributed remote procedure call system for Python programs. The Celery view of the world is that you have a set of worker processes running on remote machines and a client process that invokes functions that are executed on the remote machines. The workers and clients coordinate through a message broker running on Jetstream. Because Celery is a remote procedure call system, we define its behavior by creating functions that are annotated with a Celery object's task decorator.

When creating a Celery object, we must provide a name and a reference to the protocol being used, which in this case is the Advanced Message Queuing Protocol (AMQP). We can then run our predictor as follows; the command line argument provides a link to the specific RabbitMQ server.
> celery worker -A predictor -b 'amqp://guest@brokerIPaddr'
The code for our solution, shown in figure 7.17 on the following page, includes the needed components for interacting with the Google Cloud Datastore service that we described in chapter 3. The resources needed to run this example can be found on the book website in the Extras tab.
We can create as many instances of this microservice as we wish. If we create
multiple instances, they share the load of handling the prediction requests. To
invoke the remote procedure call from a client program, we use an
apply_async
function call. This involves creating a stub version of the function to define its
parameters. For example, the following is a single invocation of our predictor.
from celery import Celery

app = Celery('predictor', broker='amqp://guest@brokerIPaddr',
             backend='amqp')

@app.task
def predict(statement):
    return ["stub call"]

res = predict.apply_async(["this is a science document ... "])
print(res.get())
We have included here a reference to the RabbitMQ broker so that we can run this in a Jupyter notebook. The apply_async call returns immediately and
from celery import Celery
from socket import gethostname
from predictor import predictor
from gcloud import datastore
import time

clientds = datastore.Client()
key = clientds.key('booktable')
hostnm = gethostname()

app = Celery('predictor', broker='amqp://guest@brokerIPaddr',
             backend='amqp')
pred = predictor()

# Define the function that we will call remotely
@app.task
def predict(abstract, title, source):
    prediction = pred.predict(abstract, source)
    entity = datastore.Entity(key=key)
    entity['partition'] = hostnm
    entity['date'] = str(time.time())
    entity['title'] = title
    entity['prediction'] = str(prediction)
    clientds.put(entity)
    return [prediction]
Figure 7.17: An implementation of the document classifier that uses Kubernetes.
returns a future object. To get the actual returned value we have to wait, and we can do that with a call to get(). If we want to send a request to predict the classification of thousands of documents, we can do this as follows.
res = []
for doc in documents:
    res.append(predict.apply_async([doc]))
# Now wait for them all to be done
predictions = [result.get() for result in res]
This loop dispatches a remote procedure call for each document to the message broker. All our workers then participate in satisfying the requests. The result returned is a list of future objects. We can resolve them as they arrive, as shown.
Next we must ask Kubernetes to create and manage a collection of workers. First we must package the worker as a Docker container cloudbook/predictor and push it to the Docker Hub. As we did with ECS, the Amazon container service, we must create a task description, such as the following.
apiVersion: batch/v1
kind: Job
metadata:
  name: predict-job
spec:
  parallelism: 6
  template:
    metadata:
      name: job-wq
    spec:
      containers:
      - name: c
        image: cloudbook/predictor
        args: ["amqp://guest@brokerIPaddr"]
      restartPolicy: OnFailure
We package this document in a file predict-job.json. Notice that this task
description contains a mix of the elements of the Amazon ECS task descriptor and
the Amazon ECS service arguments. We can now ask Kubernetes to launch the
worker set with the following command. Once the workers are up and running, they are
ready to respond to requests; Kubernetes handles restarts in case of failures.
kubectl create -f predict-job.json
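Once the job is launched, we may want to check programmatically that the worker pods have started. The following is a minimal sketch, assuming the official kubernetes Python client package is installed and that kubectl is already configured for the cluster; Kubernetes labels each pod created by a job with job-name, which lets us select only our predict-job workers.

# A minimal sketch: list the pods created for predict-job and report
# each pod's current phase (Pending, Running, Succeeded, ...).
from kubernetes import client, config

config.load_kube_config()          # read credentials from ~/.kube/config
core = client.CoreV1Api()

pods = core.list_namespaced_pod(namespace="default",
                                label_selector="job-name=predict-job")
for pod in pods.items:
    print(pod.metadata.name, pod.status.phase)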
7.6.6 Mesos and Mesosphere
Mesosphere (from Mesosphere.com) is a data center operating system (DCOS)
based on the original Berkeley Mesos system for managing clusters. We describe
here how to install and use Mesosphere on Microsoft's Azure cloud. Mesosphere
has four major components:
1. The Apache Mesos distributed system kernel.
2. The Marathon init system, which monitors applications and services and,
like Amazon ECS and Kubernetes, automatically heals any failures.
3. Mesos-DNS, a service discovery utility.
4. ZooKeeper, a high-performance coordination service to manage the installed
DCOS services.
When Mesosphere is deployed, it has a master node, a backup master, and a
set of workers that run the service containers. Azure supports the Mesosphere
components listed above as well as another container management service called
Docker Swarm. Azure also provides a set of DCOS command line tools. For example,
to see the full set of applications that Marathon is managing, we can issue the
command dcos marathon app list, with the result shown in figure 7.18.
> dcos marathon app list
ID              MEM   CPUS  TASKS  HEALTH  DEPLOYMENT  CONTAINER  CMD
/nginx            16   0.1   1/1    ---     ---         DOCKER    None
/rabbitsend3     512   0.1   0/0    ---     ---         DOCKER    None
/spark          1024   1     1/1    1/1     ---         DOCKER    /sbin/init.sh
/storm-default  1024   1     2/2    2/2     ---         DOCKER    ./bin/run-with-marathon.sh
/zeppelin       2048   1     1/1    1/1     ---         DOCKER    sed ...

Figure 7.18: DCOS command line example.
Mesosphere also provides excellent interactive service management consoles.
When you bring up Mesos on Azure through the Azure Container Service, the
console presents a view of your service health, current CPU and memory allocations,
and current failure rate. If you next select the services view and then the Marathon
console, you see the details of the applications that Marathon is managing, as
shown in figure 7.19. From a previous session, we currently have running one
instance of NGINX, one instance of Spark, two instances of Storm, and one
instance of Zeppelin.
Figure 7.19: View of the Marathon console.
The process of launching applications composed of Docker containers is similar
to that used in Amazon ECS and Kubernetes. We start with a task description
JSON file such as the following, which specifies the type of container, the Docker
image to use, the number of instances required, and other information.
{
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "cloudbook/predictor"
    }
  },
  "id": "worker",
  "instances": 1,
  "cpus": 0.2,
  "mem": 512
}
Next, send this descriptor to Marathon with the following command.
dcos marathon app add config.json
Notice that this task descriptor specifies only one instance. You can issue the
following command to increase this instance count to nine.
dcos marathon app update worker env='{"instances": "9"}'
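The same scaling operation can also be performed against Marathon's REST API, which is convenient from a script or notebook. The following is a minimal sketch, assuming the Python requests package and a hypothetical Marathon endpoint at http://masterIPaddr:8080; in an Azure DCOS deployment you would typically reach this endpoint through an SSH tunnel to the master.

# A minimal sketch: scale the "worker" app to nine instances by updating
# its app definition through Marathon's REST API.
import requests

marathon = "http://masterIPaddr:8080"   # placeholder address

resp = requests.put(marathon + "/v2/apps/worker",
                    json={"instances": 9})
print(resp.status_code, resp.text)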
We now return to our document classifier example. We completed a version of
the full classifier based on the concepts in figure 7.10. The actual implementation
splits the predictor service into two parts: the core document classifier and a
service that pushes the data into an Azure table. Once again, we use RabbitMQ
as a message broker, as shown in figure 7.20 on the following page.

In this case the first stage of classification performs the initial partition into
the main subject headings; depending on the results, it then pushes the document
into a queue specific to the main subject. We assigned one or more instances of the
classifier to each topic queue, with numbers determined by the size of that main
topic queue. (For example, physics has more documents than any other subject,
and thus we assigned that topic more instances.) The subarea classifiers are based
on the same code base as the main topic classifiers. They specialize their behavior
by loading trained data models specific to that subject and its subareas. A detailed
analysis of system performance as a function of the number of classifier instances is
available online [133]. In brief, using nine servers and computing speedup relative
to the time for a single classifier to compute the analysis, the system achieved a
speedup of about 8.5 using 16 classifier instances.
Figure 7.20: Diagram of the full subject area classifier.
Figure 7.21: DCOS system status showing the load on each server in the cluster.
The Azure DCOS container service status tool provides a useful view of system
status. Figure 7.21 shows the load on each server when eight classifier services are
running. Notice that one server has two instances while two servers are empty.
This apparent imbalance is a feature, not a bug: at one point during the run, two
services crashed, causing DCOS to migrate the failed instances to other nodes.
7.7 HTCondor
The HTCondor research.cs.wisc.edu/htcondor high-throughput computing
system is a particularly mature technology for scientific computing in the cloud.
The following are examples of HTCondor applications.
The Globus Genomics system [187] uses HTCondor to schedule large numbers
of bioinformatics pipelines on the Amazon cloud. We describe this system in more
detail in section 14.4 on page 303, as an example of how to build scalable SaaS.

GeoDeepDive geodeepdive.org, part of the NSF EarthCube project, is an
infrastructure for text and data mining that uses HTCondor for large analyses of
massive text collections. Pegasus [109] is a workflow system for managing large
scientific computations on top of HTCondor.

A collaboration between Google and Fermilab's HEPCloud project used
HTCondor to process data from a high energy physics experiment. The Google
Cloud Preemptible Virtual Machines provided 160,000 virtual cores and 320 TB of
memory at a cost of only $1,400 per hour. With the data stored in Google Cloud
Storage, the tasks spawned by
HTCondor on the VMs read data using gcsfuse, a mechanism to mount storage
buckets as Linux file systems. The output was returned to Fermilab over the
U.S. Department of Energy's ESnet network.
7.8 Summary
The development of the digital computer led computation to join experiment and
theory as a third major paradigm of scientific discovery and engineering design.
Supercomputers became essential lab instruments. In the 40 years that followed,
the power of supercomputers grew by a factor of over one billion. But now
science is rapidly evolving into a fourth paradigm [153] of data-driven discovery.

The technology companies whose business depends on understanding their
online customers are the same companies that have built the cloud to handle data
analytics workloads. As a result, data science and machine learning are now among
the most in-demand technical specialties [215]. To meet the needs of "big data"
analysis, cloud builders are rapidly evolving cloud data center architectures down
paths pioneered by the supercomputer vendors.
Taking advantage of large-scale parallelism was a first step to using the cloud
for analytics. Bulk synchronous data parallelism via Hadoop and graph-driven
execution are standard approaches to extracting enough concurrency to pull
knowledge from mountains of data. Running and managing massive online systems
also requires new approaches to parallelism that have made it easier to deploy and
maintain thousands of processes. Containers and microservice architectures have
become the basis for new cloud software engineering paradigms.
As public clouds evolved, their customers demanded better and simpler ways of
using the cloud to compute at ever larger scales. In response, public cloud vendors
introduced the services discussed in this chapter. Some customers need traditional
HPC capabilities to run legacy HPC MPI codes. As we have illustrated, Amazon
and Microsoft now have services that customers can use to build medium-scale
HPC clusters on demand. While still far from the performance level of the latest
supercomputers, these cloud solutions can be deployed quickly, used for a specific
task, and then shut down, all for a tiny fraction of the cost.
Other cloud customers need less tightly coupled computing. Amazon, Azure,
and Google have excellent scale-out container management services that are easy
to use and manage. As the importance of machine learning has grown in the tech
industry, so too has the need to train massive deep neural networks. This need has
driven the growth of GPU-based computing in the cloud. First used internally by
some cloud vendors for deep learning, this technology has now emerged as a new
class of multi-GPU server instances available to anybody. Combining multi-GPU
servers with the ability to build a custom HPC cluster makes it possible to deploy
truly powerful on-demand computing platforms.

In the chapters that follow we return to the theme of scale as it relates to data
analysis, machine learning, and event processing.
7.9 Resources
We recommend the excellent tutorial by Sebastien Goasguen [140] on how to deploy
and manage Kubernetes from Python. Microsoft recently released the Azure Batch
Shipyard [6] open source tools for managing clusters of containers.

In addition to notebook 9 and notebook 10, introduced in the text, the source
code for the C program in the Amazon CfnCluster example from section 7.2.3 on
page 100 is on the book website in the Extras tab, as are the source code and
data for the Amazon science document classification example, and the source code
for the simple Docker example of section 7.6.5 on page 120.

For those interested in the evolution of cloud data center networks, we recommend
Greenberg et al.'s review of the challenges inherent in building networks for
cloud data centers [145] and two fascinating papers by Google engineers describing
the techniques used to organize Google's data center networks [160, 236].