Chapter 8
Data Analytics in the Cloud
“Science is what we understand well enough to explain to a computer.
Art is everything else we do.”
—Donald Knuth
What we know today as public clouds were originally created as internal data
centers to support services such as e-commerce, e-mail, and web search, each of
which involved the acquisition of large data collections. To optimize these services,
companies started performing massive amounts of analysis on those data. After
these data centers morphed into public clouds, the methods developed for these
analysis tasks were increasingly made available as services and open source software.
Universities and other companies have also contributed to a growing collection
of excellent open source tools. Together this collection represents an enormous
ecosystem of software supported by a large community of contributors.
The topic of data analytics in the cloud is huge and rapidly evolving, and could
easily fill an entire volume itself. Furthermore, as we observed in chapter 7, science
and engineering are themselves rapidly evolving towards a new data-driven fourth
paradigm of data-intensive discovery and design [153]. We survey some of the most
significant approaches to cloud data analytics and, as we have done throughout
this book, leave you with examples of experiments that you can try on your own.
We begin with the first major cloud data analysis tool, Hadoop, and describe
its evolution to incorporate Apache YARN. Rather than describe the traditional
Hadoop MapReduce tool, we focus on the more modern and flexible Spark system.
Both Amazon and Microsoft have integrated versions of YARN into their standard
service offerings. We describe how to use Spark with the Amazon version of YARN,
Amazon Elastic MapReduce, which we use to analyze Wikipedia data. We also
present an example of the use of the Azure version of YARN, HDInsight.
We then turn to the topic of analytics on truly massive data collections. We
introduce Azure Data Lake and illustrate the use of both Azure Data Lake Analytics
and Amazon's similar Athena analytics platform. Finally, we describe a tool from
Google called Cloud Datalab, which we use to explore National Oceanographic
and Atmospheric Administration (NOAA) data.
8.1 Hadoop and YARN
We have already introduced Hadoop and the MapReduce concept in chapter 7.
Now is the time to go beyond an introduction. When Hadoop was introduced, it
was widely seen as the tool to use to solve many large data analysis problems. It
was not necessarily efficient, but it worked well for extremely large data collections
distributed over large clusters of servers.
The Hadoop Distributed File System (HDFS) is a key Hadoop building
block. Written in Java, HDFS is completely portable and based on standard
network TCP sockets. When deployed, it has a single NameNode, used to track
data location, and a cluster of DataNodes, used to hold the distributed data
structures. Individual files are broken into 64 MB blocks, which are distributed
across the DataNodes and also replicated to make the system more fault tolerant.
As illustrated in figure 8.1 on the following page, the NameNode keeps track of
the location of each file block and the replicas.
HDFS is not a POSIX file system: it is write-once, read-many, and only
eventually consistent. However, command line tools make it usable in a manner
similar to a standard Unix file system. For example, the following commands create
a “directory” in HDFS, pull a copy of Wikipedia from a website, push those data
to HDFS (where they are blocked, replicated and stored), and list the directory.
$ hadoop fs -mkdir /user/wiki
$ curl -s -L http://dumps.wikimedia.org/enwiki/...multistream.xml.bz2 \
    | bzip2 -cd | hadoop fs -put - /user/wiki/wikidump-en.xml
$ hadoop fs -ls /user/wiki
Found 1 items
-rw-r--r--  hadoop  59189269956  21:29  /user/wiki/wikidump-en.xml
Hadoop and HDFS were originally created to support only Hadoop MapReduce
tasks. However, the ecosystem rapidly grew to include other tools. In addition, the
original Hadoop MapReduce tool could not support important application classes
Figure 8.1: Hadoop Distributed File System with four DataNodes and two files broken
into blocks and distributed. The NameNode keeps track of the blocks and replicas.
such as those requiring an iterative application of MapReduce [81] or the reuse of
distributed data structures.
Apache YARN (Yet Another Resource Negotiator) represents the evolution of
the Hadoop ecosystem into a full distributed job management system. It has a
resource manager and scheduler that communicates with node manager processes
in each worker node. Applications connect to the resource manager, which then
spins up an application manager for that application instance. As illustrated in
figure 8.2 on the next page, the application manager interacts with the resource
manager to obtain “containers” for its worker nodes on the cluster of servers. This
model allows multiple applications to run on the system concurrently.
YARN is similar in many respects to the Mesos system described in chapter 7.
The primary difference is that YARN is designed to schedule MapReduce-style
jobs, whereas Mesos is designed to support a more general class of computations,
including containers and microservices. Both systems are widely used.
8.2 Spark
Spark’s design addresses limitations in the original Hadoop MapReduce computing
paradigm. In Hadoop’s linear dataflow structure, programs read input data from
disk, map a function across the data, reduce the results of the map, and store
reduction results on disk. Spark supports a more general graph execution model
Figure 8.2: YARN distributed resource manager architecture.
that allows for iterative MapReduce as well as more efficient data reuse. Spark
is also interactive and much faster than pure Hadoop. It runs on both YARN
and Mesos, as well as on a laptop and in a Docker container. In the following
paragraphs we provide a gentle introduction to Spark and present some data
analytics examples that involve its use.
A central Spark construct is the Resilient Distributed Dataset (RDD), a
data collection that is distributed across servers and mapped to disk or memory,
providing a restricted form of distributed shared memory. Spark is implemented
in Scala, a statically typed object-functional language with an interactive REPL.
Spark has a library of Scala parallel operators, similar to the Map and Reduce
operations used in Hadoop, that perform transformations on RDDs. (The library
also has a nice Python binding.) More precisely, Spark has two types of operations:
transformations that map RDDs into new RDDs and actions that return values
to the main program, usually the read-eval-print loop, such as Jupyter.
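For example, assuming a SparkContext sc such as the one a PySpark notebook provides, the distinction between the two looks like this minimal sketch:

# A transformation (map) is lazy: it defines a new RDD but computes nothing yet.
rdd = sc.parallelize(range(10), 2)      # distribute 0..9 over two partitions
squares = rdd.map(lambda x: x * x)      # transformation: RDD -> RDD
# An action (collect) triggers execution and returns values to the driver.
print(squares.collect())                # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]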
8.2.1 A Simple Spark Program
We introduce the use of Spark by using it to implement a trivial program that
computes an approximation to π via this identity:

\[ \lim_{n \to \infty} \sum_{i=1}^{n} \frac{1}{i^2} = \frac{\pi^2}{6} \tag{8.1} \]
Our program, shown in figure 8.3 on the following page and in notebook 11,
uses a map operation to compute 1/i² for each of n values of i, and a reduce operation
to sum the results of those computations. The program creates a one-dimensional
array of integers that we then convert to an RDD partitioned into two pieces. (The
array is not big, but we ran this example on a dual-core Mac Mini.)
Figure 8.3: Computing π with Spark.
In Spark, the partitions are distributed to the workers. Parallelism is achieved
by applying the computational parts of Spark operators on each partition in parallel,
using multiple threads per worker. For actions, such as a reduce,
most of the work is done on each partition and then across partitions as needed.
The Spark Python library exploits Python's ability to create anonymous functions
using the lambda operator. Code is generated for these functions, and they can
then be shipped by Spark's work scheduler to the workers for execution on each
RDD partition. In this case we use a simple MapReduce computation.
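A minimal sketch of a program of this shape, assuming a SparkContext sc (as provided in a PySpark notebook); the variable names are ours rather than those of notebook 11:

import numpy as np

n = 100000
# Distribute the integers 1..n as an RDD split into two partitions.
values = sc.parallelize(np.arange(1, n + 1), 2)
# Map: compute 1/i**2 for each i.  Reduce: sum the partial results.
total = values.map(lambda i: 1.0 / (i * i)).reduce(lambda a, b: a + b)
print(np.sqrt(6.0 * total))   # converges to pi as n grows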
8.2.2 A More Interesting Spark Program: K-means Clustering
We now consider the more interesting example of k-means clustering [150]. Suppose
you have 10,000 points on a plane, and you want to find k new points that are the
centroids of k clusters that partition the set. In other words, each point is to be
assigned to the set corresponding to the centroid to which it is closest. Our Spark
solution is in notebook 12.
We use an array kPoints to hold the k centroids. We initialize this array with
random values and then apply an iterative MapReduce algorithm, repeating these
two steps until the centroids have not moved far from their previous position:
1. For each point, find the index of the centroid to which it is nearest and assign
   the point to the cluster associated with that nearest centroid.
2. For each cluster, compute the centroid of all points in that cluster, and
   replace the previous centroid in kPoints with that new centroid.
We first define the following function to compute, for a given point p, the index
of the centroid in kPoints to which p is nearest.
def closestPoint(p, kPoints):
    # Return the index of the centroid in kPoints nearest to point p.
    # (Assumes NumPy has been imported as np.)
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(kPoints)):
        tempDist = np.sum((p - kPoints[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex
The locations of our 10,000 points are in an array called data. We use the
following map expression to create, as a new RDD, the set of tuples (j, (p, 1)) in
which j is the index of the closest centroid for each p in data.

data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
This use of tuples of form (p, 1) is a common MapReduce idiom. We want to
compute for each j the sum of all tuples (p, 1), to obtain tuples of the form
(j, (Σp, Σ1)). To do this, we use a reduceByKey operation as follows.

reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
The sum of the 1s is just the count of the j tuples in our set, so we can compute
the centroid by dividing the sum of the ps by this count. This is an RDD of size
k, and we can collect it and use it as the kPoints for the next iteration. The full
code is now in an iteration as follows.
tempDist = 1.0
while tempDist > convergeDist:
    newPoints = data \
        .map(lambda p: (closestPoint(p, kPoints), (p, 1))) \
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
        .map(lambda x: (x[0], x[1][0] / x[1][1])) \
        .collect()
    tempDist = sum(np.sum((kPoints[i] - y) ** 2)
                   for (i, y) in newPoints)
    for (i, y) in newPoints:
        kPoints[i] = y
To summarize: We have a map, followed by a reduce-by-key, followed by another
map, and finally a collect that brings the values of newPoints back to the read-
eval-print loop. Each Spark operation is executed on the cluster of cores on which
the RDD is located. In fact, what the Python program is doing is compiling a
graph that is then executed by the Spark engine.
We note that this example was designed to illustrate some of the standard
Spark idioms, and it is not the best k-means algorithm. Spark has a machine
learning library with a much better implementation.
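For comparison, here is a minimal sketch of clustering the same data RDD with that library (pyspark.mllib); the values of k and maxIterations are illustrative, not taken from notebook 12:

from pyspark.mllib.clustering import KMeans

# Train k-means on the existing RDD of points; parameters are illustrative.
model = KMeans.train(data, k=10, maxIterations=20,
                     initializationMode="k-means||")
print(model.clusterCenters)          # the learned centroids
print(model.predict(data.first()))   # cluster index assigned to the first point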
8.2.3 Spark in a Container
Running a containerized version of Spark on your laptop is straightforward. You
can also easily run Spark on a remote VM with many cores: as long as the VM
has Docker installed, you can follow the same procedure described in section 6.2
on page 87 but with a different version of Jupyter.
docker run -e GEN_CERT=yes -d -p 8888:8888 \
    -v /tmp/docmnt:/home/jovyan/work/docmnt \
    jupyter/all-spark-notebook start-notebook.sh \
    --NotebookApp.password='sha1:....'
When developing the k-means example, provided as notebook 12, we used a
container with a host drive /vol1/dockerdata, with 10 GB of memory and cores
0, 1, 2, and 3. We created this container as follows.
docker run -e GEN_CERT=yes -d -p 8888:8888 --cpuset-cpus 0-3 -m 10G \
    -v /tmp/docmnt:/home/jovyan/work/docmnt \
    jupyter/all-spark-notebook start-notebook.sh \
    --NotebookApp.password='sha1:....'
8.2.4 SQL in Spark
Python and Spark can also execute SQL commands [64]. We illustrate this
capability in notebook 13 with a simple example. We have a comma-separated
value (CSV) file, hvac.csv, with a header and three data lines, as follows.
Date, Time, desired temp, actual temp, buildingID
3/23/2016, 11:45, 67, 54, headquarters
3/23/2016, 11:51, 67, 77, lab1
3/23/2016, 11:20, 67, 33, coldroom
We load this file into Spark and create an RDD by applying the textFile
operator to the Spark context object. We convert the text file RDD into an RDD
of tuples by stripping off the header and mapping the rest to typed tuples. We
create an SQL context object and schema type, and then an SQL DataFrame [45]
(see section 10.1 on page 192), hvacDF.
from pyspark.sql import SQLContext
from pyspark.sql.types import *

hvacText = sc.textFile("/pathto/file/hvac.csv")
hvac = hvacText.map(lambda s: s.split(",")) \
               .filter(lambda s: s[0] != "Date") \
               .map(lambda s: (str(s[0]), str(s[1]),
                               int(s[2]), int(s[3]), str(s[4])))
sqlCtx = SQLContext(sc)
hvacSchema = StructType([StructField("date", StringType(), False),
                         StructField("time", StringType(), False),
                         StructField("targettemp", IntegerType(), False),
                         StructField("actualtemp", IntegerType(), False),
                         StructField("buildingID", StringType(), False)])
hvacDF = sqlCtx.createDataFrame(hvac, hvacSchema)
# Register the DataFrame as the table 'hvac' so the SQL examples below
# can query it by name.
hvacDF.registerTempTable('hvac')
142
Chapter 8. Data Analytics in the Cloud
We are now ready to execute SQL operations on our data. For example, we
can use the sql() method to extract a new SQL RDD DataFrame consisting of
the buildingID column with the following command.

x = sqlCtx.sql('SELECT buildingID from hvac')
Yet more fun is to use Jupyter and IPython magic operators, which allow
you to define small extensions to the language. Using an operator developed by
Luca Canali, we can create a magic operator that allows us to enter the SQL
command in a more natural way and have the results printed as a table. Using this
new magic operator %%sql_show, we can create a table consisting of the buildingID,
the date, and the difference between the desired and actual temperatures.
%%sql_show
SELECT buildingID,
       (targettemp - actualtemp) AS temp_diff,
       date FROM hvac
WHERE date = "3/23/2016"

+------------+---------+---------+
|  buildingID|temp_diff|     date|
+------------+---------+---------+
|headquarters|       13|3/23/2016|
|        lab1|      -10|3/23/2016|
|    coldroom|       34|3/23/2016|
+------------+---------+---------+
We provide details on the SQL magic operators and a link to Canali's blog in
notebook 13.
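The essence of such a magic is small. The following rough sketch (not Canali's actual code) registers a cell magic that runs the cell body through sqlCtx and prints the result as a table:

from IPython.core.magic import register_cell_magic

@register_cell_magic
def sql_show(line, cell):
    # Execute the cell body as a Spark SQL query and print the result.
    sqlCtx.sql(cell).show()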
8.3 Amazon Elastic MapReduce
Deploying Hadoop on a cluster used to require a team of systems professionals.
Fortunately, products from Amazon, Microsoft, Google, IBM, and others have
largely automated this task. Amazon’s
Elastic MapReduce (EMR)
service
makes creating a YARN cluster trivial. All you need to do is select your favorite
combination of tools from their preconfigured lists, specify the instance type and
number of worker nodes that you want, set up your usual security rules, and click
Create cluster . In about two minutes you are up and running.
The configuration that we found most attractive combines Spark, YARN, and
an interactive web-based notebook tool called Zeppelin (zeppelin.apache.org).
Zeppelin is similar to Jupyter, has an impressive user interface and graphics
capabilities, and is highly integrated with Spark. For consistency, however, we
stick with Jupyter in our presentation here.
When the EMR cluster comes up, it is already running one instance of Spark
on YARN as well as HDFS. Getting Jupyter installed and running with YARN
takes a few extra commands, as shown in notebook 14.
To illustrate the use of Spark on EMR, we count how often famous people's
names occur in Wikipedia. The program is in figure 8.4. We first load a small
sample of Wikipedia access logs, from 2008 to 2010, stored in S3. Since this file
contains only about four million records, our text-file RDD initially has just one
partition. Thus, we next repartition the RDD into 10 segments, so that we can
better exploit the parallelism in the Spark operators in subsequent steps. Each
line of the text file comprises an identifier, the name of the page accessed, and an
associated access count, separated by blank characters. To make these data easier
to work with, we transform each line into an array by splitting on blank characters.
To count the number of hits for each person, we define a function mapname that
returns the name of the person in the page title, and execute a map to replace
each row with a new pair consisting of the name and the value in the count field
for that row. We then reduce by the person name and add the hit counts. To look
at the list ranked by hit count, we execute the pipeline defined by RDD remapped
with a version of the take() function as follows.
remapped.takeOrdered(20, key=lambda x: -x[1])

[('Lady_Gaga', 4427),
 ('Bill_Clinton', 4221),
 ('Michael_Jackson', 3310),
 ('Barack_Obama', 2518),
 ('Justin_Bieber', 2234),
 ('Albert_Einstein', 1609),
 ('Byron', 964),
 ('Karl_Marx', 892),
 ('Arnold_Schwarzenegger', 820),
 ('Bill_Gates', 799),
 ('Steve_Jobs', 613),
 ('Vladimir_Putin', 563),
 ('Richard_Nixon', 509),
 ('Vladimir_Lenin', 283),
 ('Donald_Trump', 272),
 ('Nicolas_Sarkozy', 171),
 ('Hillary_Clinton', 162),
 ('Groucho_Marx', 152),
 ('Werner_Heisenberg', 92),
 ('Elon_Musk', 21)]
# Define list of famous names
namelist = ['Albert_Einstein', 'Lady_Gaga', 'Barack_Obama',
            'Richard_Nixon', 'Steve_Jobs', 'Bill_Clinton', 'Bill_Gates',
            'Michael_Jackson', 'Justin_Bieber', 'Vladimir_Putin',
            'Byron', 'Donald_Trump', 'Hillary_Clinton', 'Nicolas_Sarkozy',
            'Werner_Heisenberg', 'Arnold_Schwarzenegger', 'Elon_Musk',
            'Vladimir_Lenin', 'Karl_Marx', 'Groucho_Marx']

# Transform a line into an array by splitting on blank characters
def parseline(line):
    return np.array([x for x in line.split(' ')])

# Keep only rows whose page title contains one of the famous names
def filter_fun(row, titles):
    for title in titles:
        if row[1].find(title) > -1:
            return True
    return False

# Return name of person in page title
def mapname(row, names):
    for name in names:
        if row[1].find(name) > -1:
            return name
    return 'huh?'

# ------ Load and process data -------------------------------------
# Load Wikipedia data from S3
rawdata = sc.textFile(
    "s3://support.elasticmapreduce/bigdatademo/sample/wiki")
# Repartition initial RDD into 10 segments, for parallelism
rawdata = rawdata.repartition(10)
# Split each line into an array
data = rawdata.map(parseline)
# Filter out lines without a famous name
filterd = data.filter(lambda p: filter_fun(p, namelist))
# Map: Replace each row with a (name, count) pair.
# Reduce by name: Add counts
remapped = filterd.map(lambda row: (mapname(row, namelist), int(row[2]))) \
                  .reduceByKey(lambda v1, v2: v1 + v2)
Figure 8.4: Our program for counting famous people in Wikipedia.
We discover that pop stars are more popular than future presidential candidates,
at least during 2008 to 2010. We choose not to pursue this research further.
To conclude this example, we load the full Wikipedia dump and make a list of
all of the main pages. We have stored the full Wikipedia dump file in HDFS using
the code presented in section 8.1 on page 136. The dump is a 64 GB file, each line
of which is one line of XML. We can load it directly from HDFS
as follows using the hdfs:/// prefix. The file has over 900 million lines. When
loaded, it comprises 441 partitions in the RDD.
wikidump = sc.textFile("hdfs:///user/wiki/wikidump-en.xml")
wikidump.count()
927769981
wikidump.getNumPartitions()
441
To determine the title of each Wikipedia entry, we filter for the lines containing
the XML tag <title>. We look at the first 12 of the more than 17 million entries.
def findtitle(line):
    if line.find('<title>') > -1:
        return True
    else:
        return False

titles = wikidump.filter(lambda p: findtitle(p))
titles.count()
17008269
titles.take(12)

[u' <title>AccessibleComputing</title>',
 u' <title>Anarchism</title>',
 u' <title>AfghanistanHistory</title>',
 u' <title>AfghanistanGeography</title>',
 u' <title>AfghanistanPeople</title>',
 u' <title>AfghanistanCommunications</title>',
 u' <title>AfghanistanTransportations</title>',
 u' <title>AfghanistanMilitary</title>',
 u' <title>AfghanistanTransnationalIssues</title>',
 u' <title>AssistiveTechnology</title>',
 u' <title>AmoeboidTaxa</title>',
 u' <title>Autism</title>']
To do more with these data, you need to assemble each of the 17 million XML
records for each page into single items. An exercise for the ambitious reader!
8.4 Azure HDInsight and Data Lake
Microsoft has long had a MapReduce framework called Cosmos that is used as
the main data analytics engine for many internal projects such as the Bing search
engine. Cosmos is based on a directed graph execution model and is programmed
in an SQL-like language called SCOPE. While Cosmos was a candidate for release
as a general MapReduce tool for Azure users, the company decided that the
Hadoop/YARN ecosystem was of such great interest to their customers that they
would keep Cosmos internal and support YARN as the public offering.
Called HDInsight on Azure, this service supports Spark, Hive, HBase, Storm,
Kafka, and Hadoop MapReduce, and is backed by a guarantee of 99.9% availability.
Programming tools include integration with Visual Studio, Python, R, Java, and
Scala as well as .NET languages. HDInsight is based on the Hortonworks Data
Platform distribution integrated with Azure security. All of the usual Hadoop and
YARN components are there, including HDFS as well as tools that integrate other
Microsoft business analytics tools such as Excel and SQL Server.
Creating an HDInsight cluster with Spark and Jupyter already installed is easy.
As with many Azure services, there is a preconfigured template that you can use.
The complete instructions are online [194]. Clicking on the Deploy to Azure link
in the documents takes you to your Azure account and sets up the script. You
only need to fill in a few standard account details, such as a name for your cluster,
the login id, and the password for the SSH and master nodes. After a few minutes
the cluster is up. Going to the Azure portal, look for your new cluster and click
on the name. You should see an image like that in figure 8.5 on the next page.
Clicking on the HDInsight Cluster Dashboard icon provides you with live status
data about the cluster, such as memory, network, and CPU use. Clicking on
the Jupyter icon first authenticates you and then takes you to the Jupyter home,
where you have a choice of PySpark or Scala notebooks. Both directories contain
excellent tutorial examples.
HDInsight uses a version of HDFS that is implemented on top of standard
Azure Blob storage. You can load a file with a command like the following.

newRDD = spark.sparkContext.textFile('wasb:///mycontainer/file.txt')
Rather than go through another Spark example here, we defer further discussion
to chapter 10, where we use Spark in an HDInsight machine learning example.
Figure 8.5: Azure portal view of an HDInsight cluster.
8.4.1 Azure Data Lake Store
HDInsight is part of something larger called Azure Data Lake, as shown in
figure 8.6 on the following page. Data Lake also includes the Azure Data Lake
Store, a data warehouse for petascale data collections. This store can be thought
of as a giant extension of HDFS in the cloud. It does not have the 500 TB storage
limits of Azure blobs, and is designed for massive throughput. It supports both
structured and unstructured data. The access protocol for the Data Lake Store,
WebHDFS, is the REST API for HDFS. Consequently you can access the Data
Lake Store from anywhere, and with the same commands that you use to access
HDFS. A Python SDK [7] allows access to the store from Jupyter or via a command
line tool using familiar HDFS commands, as shown below.
> python azure/datalake/store/cli.py
azure> help
Documented commands (type help <topic>):
========================================
cat    chmod  close  du  get     help  ls    mv     quit  rmdir
touch  chgrp  chown  df  exists  head  info  mkdir  put   rm
tail
azure>
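The same operations can also be scripted from Python. A brief sketch using the azure-datalake-store package; the store name, credentials, and path below are placeholders rather than values from this chapter:

from azure.datalake.store import core, lib

# Authenticate (tenant, username, and password are placeholders) and
# connect to a Data Lake Store account.
token = lib.auth(tenant_id='mytenant', username='myuser', password='mypassword')
adls = core.AzureDLFileSystem(token, store_name='mydatalakestore')

print(adls.ls('/'))                    # list the root directory, as ls does above
with adls.open('/data/mydata.csv', 'rb') as f:
    print(f.readline())                # read the first line of a stored file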
148
Chapter 8. Data Analytics in the Cloud
Figure 8.6: Conceptual view of the components that make up the Azure Data Lake.
8.4.2 Data Lake Analytics
Azure Data Lake Analytics consists of HDInsight and associated tools (Spark,
Hadoop, etc.), plus a data analytics tool called U-SQL for scripting large analytics
tasks. U-SQL combines SQL queries and declarative program functions written in
C#, Python, or R. U-SQL is intended for massively parallel analysis of terabyte-
to petabyte-size data collections. When you write a U-SQL script, you are actually
building a graph. As shown in figure 8.7 on the next page, the program is a graph
of queries in which regular declarative program functions can be embedded.
When you run a U-SQL job, you specify the degrees of parallelism that you
wish to exploit for each of the query tasks in the computation. The result is a
directed acyclic graph in which some nodes are parallelized according to your
suggestions. You do not explicitly allocate VMs or containers, and you are charged
for the total execution time of the graph nodes.
8.5 Amazon Athena Analytics
Athena, a recent addition to the Amazon analytics toolbox, is designed to allow
users to query data in Amazon S3 without having to launch VM instances. Like
Data Lake Analytics, Athena is an example of the concept of serverless computing
that we introduced briefly in chapter 4.
Figure 8.7: A U-SQL program defines a graph of queries. In the first step an object
searchlog is extracted from a file. In step two the object rs1 is created from searchlog
by extracting items with region "en-gb". In the final step the object rs1 is converted to a
CSV file and saved.
Athena is accessed via the Amazon portal, which provides an interactive query
tool based on standard SQL. You first put your data in S3 in one of several standard
forms, including text files, CSV files, Apache web logs, JSON files, and a column-
oriented format such as Apache Parquet. (Parquet files store the columns
of a data table in contiguous locations, suitable for efficient compression and
decompression in distributed file systems like HDFS. They are easily generated
and consumed by any of the Hadoop ecosystem of tools, including Spark.)
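For example, a Spark DataFrame such as the hvacDF built in section 8.2.4 can be written to Parquet and read back with a single call in each direction; the S3 path here is a placeholder:

# Write the DataFrame as Parquet, then read it back and inspect its schema.
hvacDF.write.parquet('s3://my-bucket/hvac.parquet')
hvac2 = sqlCtx.read.parquet('s3://my-bucket/hvac.parquet')
hvac2.printSchema()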
The Athena query editor allows you to define the schema for your data and
the method to be used to extract the data from the S3 source. Athena treats data
in S3 as read-only, and all transformations are made in the internal Athena engine
and storage. Once you have your data defined, you can explore them with the
query editor and visualize results with other tools such as Amazon QuickSight.
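Although Athena is used mainly through the portal, queries can also be submitted programmatically. A rough sketch with the boto3 Athena client; the database, table, and result bucket names are placeholders:

import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Submit a query; Athena writes the results to the S3 location given here.
resp = athena.start_query_execution(
    QueryString="SELECT buildingID, actualtemp FROM hvac LIMIT 10",
    QueryExecutionContext={'Database': 'mydatabase'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results/'})

print(resp['QueryExecutionId'])   # poll get_query_execution() for completion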
Athena is designed to make performing interactive analytics on large data
collections easy. Since the service is new, we do not provide examples of how to
use it for research data; we will do so in the future.
8.6 Google Cloud Datalab
The Google Cloud provides a data analytics tool called BigQuery, which is in
many ways similar to Athena. A big difference is that while Athena is based on the
standard S3 blob store, BigQuery builds on a special data warehouse. Google hosts
interesting public datasets in the BigQuery warehouse, including the following.
The names on all U.S. social security cards for births after 1879. (The table
rows contain only the year of birth, state, first name, gender, and count as
long as it is greater than five. No social security numbers are included.)
New York City taxi trips from 2009 to 2015.
All stories and comments from Hacker News.
U.S. Department of Health weekly records of diseases reported from each
city and state from 1888 to 2013.
Public data from the HathiTrust and the Internet Book Archive.
Global Summary of the Day (GSOD) weather from the National Oceano-
graphic and Atmospheric Administration (NOAA) for 9,000 weather stations
between 1929 and 2016.
Google's latest addition to BigQuery is a Jupyter-based tool called Datalab.
(At the time of this writing, this is still a "beta" product, so we do not know what
its future holds.) You can run Datalab either on their portal or on your own
laptop. To run Datalab on your laptop, you need to have Docker installed. Once
Docker is running and you have created a Google cloud account and created a
project, you can launch Datalab with a simple docker command as illustrated in
their quick-start guide [24]. When the container is up and running, you can view
it at http://localhost:8081. What you see at first is shown in figure 8.8.
Figure 8.8: Datalab home screen. The view you see is the initial notebook hierarchy. Inside
docs is a directory called notebooks that contains many great tutorials and samples.
We use two examples to illustrate the use of Datalab and the public data
collections. Further details are available in notebook 15 and notebook 16.
8.6.1 Rubella in Washington and Indiana
The Google BigQuery archive contains a Centers for Disease Control and Prevention
(CDC) dataset concerning diseases reported by state and city over a long period.
An interesting case is rubella, a virus also known as the German measles. Today
this disease has been eliminated in the U.S. through vaccination programs, except
for those people who catch it in other countries where it still exists. But in the
1960s it was a major problem, with an estimated 12 million cases in the U.S. from
1964 through 1965 and a significant number of newborn deaths and birth defects.
The vaccine was introduced in 1969, and by 1975 the disease was almost gone.
The SQL script in figure 8.9 on the following page is based on an example in the
Google BigQuery demos (cloud.google.com/bigquery). It looks for occurrences of
rubella in two states over 2,000 miles apart, Washington and Indiana, in the years
1970 and 1971. This code also illustrates the SQL "magic" operators built into
Datalab, which here are used to create a callable module called rubella.
We can invoke this query in a Python statement that captures its result as a
Pandas DataFrame and pulls apart the time stamp fields and data values:
rubel = bq.Query(rubella).to_dataframe()
rubelIN = rubel[rubel['cdc_reports_state'] == 'IN']. \
    sort_values(by=['cdc_reports_epi_week'])
rubelWA = rubel[rubel['cdc_reports_state'] == 'WA']. \
    sort_values(by=['cdc_reports_epi_week'])
epiweekIN = rubelIN['cdc_reports_epi_week']
epiweekWA = rubelWA['cdc_reports_epi_week']
rubelINval = rubelIN['cdc_reports_total_cases']
rubelWAval = rubelWA['cdc_reports_total_cases']
At this point a small adjustment must be made to the time stamps. The CDC
reports times in epidemic weeks, and there are 52 weeks in a year. So the time
stamps for the first and last weeks of 1970 are 197000 and 197051, respectively;
the next week (the first week of 1971) is then 197100. To obtain timestamps that
appear contiguous, we make a small "time slip" as follows.
realweekI = np.empty([len(epiweekIN)])
realweekI[:] = epiweekIN[:] - 197000
realweekI[51:] = realweekI[51:] - 48
Applying the same adjustment to epiweekWA gives us something that we can
graph. Figure 8.10 shows the progress of rubella in Washington and Indiana over
two years. Note that outbreaks occur at about the same time in both states and
that by late 1971 the disease is nearly gone. (Continuing the plot over 1972 and
1973 shows that subsequent flare-ups diminish rapidly in size.)
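The plot itself is ordinary Matplotlib. A sketch of how figure 8.10 can be produced, assuming realweekWA has been computed from epiweekWA by the same time slip:

import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))
plt.plot(realweekI, rubelINval, 'b-', label='Indiana')
plt.plot(realweekWA, rubelWAval, 'r-', label='Washington')
plt.xlabel('Week (1970-1971)')
plt.ylabel('Reported rubella cases')
plt.legend()
plt.show()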
%%sql --module rubella
SELECT *
FROM (
  SELECT
    *, MIN(zrank) OVER (PARTITION BY cdc_reports_epi_week) AS zminrank
  FROM (
    SELECT
      *, RANK() OVER (PARTITION BY cdc_reports_state ORDER BY
                      cdc_reports_epi_week) AS zrank
    FROM (
      SELECT
        cdc_reports.epi_week AS cdc_reports_epi_week,
        cdc_reports.state AS cdc_reports_state,
        COALESCE(CAST(SUM((FLOAT(cdc_reports.cases))) AS FLOAT), 0)
          AS cdc_reports_total_cases
      FROM
        [lookerdata:cdc.project_tycho_reports] AS cdc_reports
      WHERE
        (cdc_reports.disease = 'RUBELLA')
        AND (FLOOR(cdc_reports.epi_week/100) = 1970 OR
             FLOOR(cdc_reports.epi_week/100) = 1971)
        AND (cdc_reports.state = 'IN'
             OR cdc_reports.state = 'WA')
      GROUP EACH BY
        1,
        2) ww ) aa ) xx
WHERE
  zminrank <= 500
LIMIT
  30000
Figure 8.9: Looking for rubella in CDC reports for Indiana and Washington.
8.6.2 Looking for Weather Station Anomalies
From NOAA we have the Global Summary of the Day (GSOD) weather for 9,000
weather stations between 1929 and 2016. While not all stations were operating
during that entire period, there is still a wealth of weather data here. To illustrate,
we write a query to look for the hottest locations in Washington for 2015. This was
a particularly warm year that brought unusual droughts and fires to the state. Our
query, shown below, joins the dataset gsod2015, the table of 2015 data, with the
station table to determine the station names. We order the results descending by
temperature. Table 8.1 shows the top 10 results.
Figure 8.10: Progress of rubella in Washington (red) and Indiana (blue) from 1970 through
1971. The x axis is the week and the y axis the number of cases.
%%sql
SELECT maxval, (maxval-32)*5/9 celsius, mo, da, state, stn, name
FROM (
  SELECT
    maxval, mo, da, state, stn, name
  FROM
    [bigquery-public-data:noaa_gsod.gsod2015] a
  JOIN
    [bigquery-public-data:noaa_gsod.stations] b
  ON
    a.stn = b.usaf
    AND a.wban = b.wban
  WHERE
    state = "WA"
    AND maxval < 1000
    AND country = 'US')
ORDER BY
  maxval DESC
Table 8.1: Hottest reported temperatures in the state of Washington in 2015.
# Max F Max C Mo Day State Stn Stn name
1 113 45 06 29 WA 727846 Walla Walla Rgnl
2 113 45 06 28 WA 727846 Walla Walla Rgnl
3 111.9 44.4 06 28 WA 727827 Moses Lake/Grant Co
4 111.9 44.4 06 29 WA 727827 Moses Lake/Grant Co
5 111.2 44 09 24 WA 720272 Skagit Rgnl
6 111.2 44 09 25 WA 720272 Skagit Rgnl
7 111 43.9 06 29 WA 720845 Tri Cities
8 111 43.9 06 28 WA 720845 Tri Cities
9 111 43.9 06 27 WA 720845 Tri Cities
10 109.9 43.3 06 28 WA 720890 Omak
The results are, for the most part, what we would expect. Walla Walla, Moses
Lake, and Tri-Cities are in the eastern part of the state; summer was particularly
hot there in 2015. But Skagit Rgnl is in the Skagit Valley near Puget Sound. Why
was it 111°F there in September? If it was so hot there, what was the weather
like in the nearby locations? To find out which stations were nearby, we can look
at the stations on a map. The query is simple, but it took some trial and error
because the latitude and longitude for one station, SPOKANE NEXRAD, were incorrect
in the database, placing it somewhere in Mongolia.
%%sql --module stationsx
DEFINE QUERY locations
SELECT FLOAT(lat/1000.0) AS lat, FLOAT(lon/1000.0) AS lon, name
FROM [bigquery-public-data:noaa_gsod.stations]
WHERE state = "WA" AND name != "SPOKANE NEXRAD"
We can then invoke the Cloud Datalab mapping functions, as shown in fig-
ure 8.11 on the next page. We find that one station, called PADILLA BAY
RESERVE, is only a few miles away; the next closest is BELLINGHAM INTL.
We can now compare the weather for 2015 at these three locations. First, we use a
simple query to get the station IDs.
%%sql
SELECT
  usaf, name
FROM [bigquery-public-data:noaa_gsod.stations]
WHERE
  name = "BELLINGHAM INTL" OR name = "PADILLA BAY RESERVE"
  OR name = "SKAGIT RGNL"
Figure 8.11: Mapping the weather stations in the northwest part of Washington.
With these results in hand, we can build a parameterized BigQuery expression:

qry = "SELECT max AS temperature, \
       TIMESTAMP(STRING(year) + '-' + STRING(mo) + '-' + STRING(da)) \
       AS timestamp FROM [bigquery-public-data:noaa_gsod.gsod2015] \
       WHERE stn = '%s' and max < 500 \
       ORDER BY year DESC, mo DESC, da DESC"

stationlist = ['720272', '727930', '727976']

dflist = [bq.Query(qry % station).to_dataframe() \
          for station in stationlist]
We can now use the following code to graph the weather for our three stations,
producing the result in figure 8.12 on the following page.

from pylab import rcParams
rcParams['figure.figsize'] = 20, 5
# plot here is matplotlib.pyplot, as imported in the notebook.
with plot.style.context('fivethirtyeight'):
    for df in dflist:
        plot.plot(df['timestamp'], df['temperature'], linewidth=2)
    plot.show()
Figure 8.12: Maximum daily temperatures for Skagit (blue), Padilla Bay (red), and
Bellingham (orange).
We can clearly see the anomaly for Skagit in September. We also spot another
problem in March, where the instruments seemed not to be recording. Other than
these, the readings are closely aligned.
These simple examples only scratch the surface of what can be done with
Datalab. You can, for example, use Datalab with the TensorFlow machine learning
library, and the charting capabilities are far more extensive than demonstrated
here. Also, you can easily upload your own data to the warehouse for analysis.
8.7 Summary
Big data analytics may be the most widely used cloud computational capability.
All companies, big or small, that run online services analyze their log data in order
to understand their users and learn how to optimize their services. Moreover, the
availability of large cloud-hosted scientific data collections continues to grow in
fields from astronomy and cosmology to Earth science and genomics. And the
original Hadoop MapReduce tools first used to perform large-scale data analytics
in the cloud have now morphed into a vast suite of open source systems that are
constantly being improved and extended, providing enormous power for the user
but also introducing considerable complexity.
We have examined here only a few of the data analytics services available in the
cloud. We introduced the architecture of YARN and then focused on Spark, which
we chose because of its power and flexibility. We illustrated how Amazon Elastic
MapReduce can be an excellent platform for hosting Spark. Because EMR is based
on YARN, we were able to demonstrate how Azure HDInsight is similar to EMR
in power. However, we did not cover the many additional services and tools that
are part of the YARN ecosystem: tools such as HBase [138], a NoSQL database,
and Phoenix (phoenix.apache.org), the relational layer over HBase; Hive, a data
warehouse and query tool; Pig [214], a scripting tool for orchestrating Hadoop
MapReduce jobs; or Oozie [214], which provides workflow management for Hadoop,
Pig, and Hive.
We have also taken a brief look at Amazon Athena and Azure Data Lake,
services that support analysis of data in large warehouse repositories. Athena
provides a portal-based tool for rapid data analysis over any data stored in Amazon
S3; and Data Lake Analytics provides U-SQL, a tool for writing a graph-based
parallel analysis program over petascale data in the Azure Data Lake Store. Both
systems are based on serverless computing paradigms. To execute their queries,
you do not need to deploy and manage VMs: the cloud infrastructure manages
everything for you. We consider such issues in more detail in the next chapter.
8.8 Resources
Many excellent books cover various aspects of the topics discussed in this chapter.
In particular, we recommend the following.
Hadoop 2 Quick-Start Guide: Learn the Essentials of Big Data Computing
in the Apache Hadoop 2 Ecosystem [114] provides a good introduction to
YARN and the full Apache Hadoop ecosystem.
Python Data Science Handbook: Essential Tools for Working with Data [253]
provides an excellent deep dive into Python data topics, including the Pandas
data analysis library and the Python machine learning library.
Advanced Analytics with Spark: Patterns for Learning from Data at Scale [229]
provides an in-depth discussion of Spark. We have covered Spark from the
Python programmer’s perspective, but Spark’s native language is Scala. To
appreciate the full power of Spark, you need to study this book.
Two additional Python data analysis books are Python for Data Analysis: Data
Wrangling with Pandas, NumPy, and IPython [193] and Python Data Analytics:
Data Analysis and Science using Pandas, Matplotlib and the Python Programming
Language [209].
For those wanting to apply data analysis methods to real data, many public
datasets are available to researchers in public and private clouds. The following
are some datasets on the Google Cloud (cloud.google.com/public-datasets):
GDELT Book Corpus: 3.5 million digitized books
Open Images Data: 9 million URLs to annotated images
NOAA GSOD Weather Data: weather from 1929 to 2016 from 9,000 stations
USA Disease Surveillance: reports of nationally notifiable diseases from all
U.S. cities
NYC taxi and limousine trip records from 2009 to 2015
Amazon (aws.amazon.com/public-datasets) also provides many datasets, including
the following:
Landsat and SpaceNet: Satellite imagery of all land on Earth as well as
commercial satellite images
IRS 990 tax filings from 2011 to the present
The Common Crawl Corpus, composed of over 5 billion web pages
NEXRAD weather radar data
Global Database of Events, Language, and Tone (GDELT) records monitoring
the world’s broadcast, print, and web news
The private Open Science Data Cloud (opensciencedatacloud.org/publicdata) also
has data available to researchers, including the following.
City of Chicago public datasets
EMDataBank: A resource for 3-D electron microscopy
FlyBase: A highly valued database for Drosophila fruit fly genomics
General Social Survey: Answers to a range of demographic, behavioral, and
attitudinal questions
Million Song Dataset: Data and metadata for a million popular music tracks
In addition, various genomics and other life science datasets are available on all
three clouds, including the following.
The 1,000 Genomes dataset
The Illumina Platinum Genomes, a collection of high-quality genomic datasets
for community use
The Cancer Genome Atlas (TCGA), a collection of cancer genomics data
The 3,000 Rice Genomes collection from 89 countries.