Chapter 9
Streaming Data to the Cloud
“Panta rhei [everything flows].”
—Heraclitus
While batch analysis of big data collections is important, real-time or near-real-time analysis of data is becoming increasingly critical. Consider, for example, data from instruments that control complex systems such as the sensors onboard an autonomous vehicle or an energy power grid: here, the data analysis is critical to driving the system. In some cases, the value of the results diminishes rapidly as they get older. For example, trending topics and hashtags in a Twitter stream may be uninteresting after the fact. In other cases, such as some large science experiments, the volume of data that arrives each second is so large that it cannot be retained, and real-time analysis or data reduction is the only way to handle it.
We refer to the activity of analyzing data coming from unbounded streams as data stream analytics. While many people think this is a new topic, it dates back to basic research on complex event processing in the 1990s at places including Stanford, Caltech, and Cambridge [87]. That research created part of the intellectual foundation for today's systems. In the paragraphs that follow, we describe some recent approaches to data stream analytics from the open source community and public cloud providers. These approaches include Spark Streaming spark.apache.org/streaming, derived from the Spark parallel data analysis system described in the next chapter; Twitter's Storm system storm.apache.org, redesigned by Twitter as Heron [173]; Apache Flink flink.apache.org, from the German Stratosphere project; and Google's Cloud Dataflow [58], becoming Apache Beam beam.apache.org, which runs on top of Flink, Spark, and Google's Cloud.
University projects include Borealis from Brandeis, Brown, and MIT, and the Neptune and Granules projects at Colorado State. Other commercially developed systems include Amazon Kinesis aws.amazon.com/kinesis, Azure Event Hubs [31], and IBM Stream Analytics [25].
The analysis of instrument data streams sometimes needs to move closer to the source. Tools are emerging to perform preanalysis, with the goal of identifying the data subsets that should be sent to the cloud for deeper analysis. For example, the Apache Edgent edge-analytics tools edgent.apache.org are designed to run in small systems such as the Raspberry Pi. Kamburugamuve and Fox [165] survey many of these stream-processing technologies, covering issues not discussed here.
In the rest of this chapter, we use examples to motivate the need for data
stream analytics in science, discuss challenges that data stream analytics systems
must address, and describe the features and illustrate the use of a range of open
source and cloud provider systems.
9.1 Scientific Stream Examples
Many interesting examples of data stream processing in science have been examined
in recent workshops [130]. We review a few representative cases here.
9.1.1 Wide-area Geophysical Sensor Networks
Many geophysical phenomena, from climate change to earthquakes and inland flooding, require vast data gathering and analytics if we are to both understand the underlying scientific processes and mitigate damage to life and property.
Increasingly, scientists are operating sensor networks that provide large quantities
of streaming data from many geospatial locations. We give three examples here.
Studies of ground motion conducted by the Southern California Earthquake Center scec.org can involve thousands of sensors that record continuously for months at high sample rates [208]. One purpose for collecting such data is to improve earthquake models, in which case data delivery rates are not of great concern. Another purpose is real-time earthquake detection, in which ground movement data are collected, analyzed to determine likely location and hazard level, and alerts generated that may, for example, be used to stop trains [174]. In this case, delivery and processing speed are vital.
The Geodesy Advancing Geosciences and EarthScope (GAGE) Global Positioning System (GPS) [220] network manages data streams from approximately
2,000 continuously operating GPS receivers spanning an area covering the Arctic, North America, and the Caribbean. These data are used for studies of seismic, hydrological, and other phenomena, and if available with low latency can also be used for earthquake and tsunami warnings.
The U.S. National Science Foundation-funded Ocean Observatories Initiative (OOI) [102, 34] operates an integrated set of science-driven platforms and sensor systems at multiple locations worldwide, including cabled seafloor devices, tethered buoys, and mobile assets. 1,227 instruments of 75 different types collect more than 200 different kinds of data regarding physical, chemical, geological, and biological properties and processes, from the seafloor to the air-sea interface. Once acquired, raw data (consisting mostly of tables of raw instrument values: counts, volts, etc.) are transmitted to one of three operations centers and hence to mirrored data repositories at east and west coast cyberinfrastructure sites, where numerous other derived data are computed. The resulting data are used to study issues such as climate change, ecosystem variability, ocean acidification, and carbon cycling.
Rapid data delivery is important because scientists want to use near-real-time data
to detect and monitor events as they are happening.
9.1.2 Urban Informatics
Cities are complex, dynamic systems that are growin g rapidly and becoming
increasingly dense. The global urban population in 2014 accounted for 54% of the
total global population, up from 34% in 1960. In the U.S., 62.7% of the people live
in cities, although cities constitute only 3.5% of the land area [
98
]. Cities consume
large quantities of water and power, and contribute significantly to greenhouse
gas emissio ns. Maki ng urban centers safe, healthy, sustainable, and ecient is of
critical global importance.
Understanding how cities work and how they respond to changing environmental conditions is now part of the emerging discipline called urban informatics. This new discipline brings together data analytics experts, sociologists, economists, environmental health specialists, city planners, and safety experts. In order to capture and understand the dynamics of a city, many municipalities have begun to install instrumentation that helps monitor energy use, air and water quality, the transportation system, crime rates, and weather conditions. The aim of this real-time data gathering and analysis is to help municipalities avert citywide or neighborhood crises and more intelligently plan future expansion.
Array of Things. A project in the city of Chicago called the Array of Things arrayofthings.github.io is leading the way in defining how data can be collected and analyzed from instruments placed within cities. The project is led by Charlie Catlett and Peter Beckman from the Urban Center for Computation and Data, a joint initiative of the University of Chicago and Argonne National Laboratory. The Array of Things team has designed the hardware and software for a sensor pod that can be placed on utility poles throughout a city, gather local data, and push the data back as a stream to a recording and analysis point in the cloud. As shown in figure 9.1 on the following page, the sensor package contains numerous instruments, a data-processing engine, and a communications package. Because the sensor package is intended to be installed on poles and infrequently accessed, reliability is an important feature. Consequently, the system contains a sophisticated reliability management subsystem that monitors the instruments, computing, and communications, and reboots failed systems if needed and possible.
Sensors in the sensor pod can measure temperature and humidity; physical shock and vibration; magnetic fields; infrared, visible, and ultraviolet light; sound; and atmospheric contents such as carbon monoxide, hydrogen sulfide, nitrogen dioxide, ozone, sulfur dioxide, and air particles. The pod also contains a camera, but it cannot
transmit identifiable images of individual people. The data from these instruments
are uploaded approximately every 30 minutes as a JSON record, which takes roughly the form shown below.
{
  "NodeID": string,
  "timestamp": "2016-11-10 00:00:20.143000",
  "sensor": string,
  "parameter": string,
  "value": string
}
Note that a data stream from the pod may have more than one sensor and that individual sensors can take multiple different measurements. These measurements are distinguished by the parameter keyword. We use this structure in an example later in this chapter.
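As a small illustration of this structure, the following sketch groups measurements from such a stream by (sensor, parameter) pair. The two sample records are hypothetical, but they follow the JSON form above.

```python
import json
from collections import defaultdict

# Hypothetical sample records following the JSON structure above;
# one pod stream can mix multiple sensors and parameters.
raw_records = [
    '{"NodeID": "001e061", "timestamp": "2016-11-10 00:00:20.143000", '
    '"sensor": "Chemsense", "parameter": "no2", "value": "67.3"}',
    '{"NodeID": "001e061", "timestamp": "2016-11-10 00:00:20.143000", '
    '"sensor": "TSYS01", "parameter": "temperature", "value": "8.9"}',
]

# Group (timestamp, value) pairs by the (sensor, parameter) pair that
# distinguishes the individual measurements.
measurements = defaultdict(list)
for raw in raw_records:
    rec = json.loads(raw)
    key = (rec["sensor"], rec["parameter"])
    measurements[key].append((rec["timestamp"], float(rec["value"])))

print(sorted(measurements.keys()))
```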
9.1.3 Large-scale Science Data Flows
At the other end of the application spectrum, massively parallel simulation models running on supercomputers can generate vast amounts of data. Every few simulated time steps, the program may generate large (50 GB or more) datasets distributed over thousands of processing elements. While you can save a few such datasets to
Figure 9.1: Array of Things sensor pod contents (top) and pole mount (bottom).
a file system, it is often preferable to create a stream of these big objects and let
another analysis system consume them online, without writing them to disk.
The ADIOS [184] HPC I/O library supports this mode of interaction. It provides a simple and uniform API to the application programmer, while allowing the backend to be adapted to a variety of storage or networking layers and taking full advantage of the parallel I/O capabilities of the host system. One such backend leverages a networking layer, EVPath [115], that provides the flow and control needed to handle such a massive stream. Another backend target for ADIOS is DataSpaces [112], a system for creating shared data structures between applications across distributed systems. DataSpaces accomplishes this by mapping n-dimensional array objects to one dimension by using a distributed hash table
and Hilbert space-filling curves. Together, these components provide a variety of streaming abstractions that can be used to move data from an HPC application to a range of HPC data analysis and visualization tools.
9.2 Basic Design Challenges of Streaming Systems
Designers of streaming systems face a number of basic challenges, including correctness and consistency. Data in an unbounded stream are unbounded in time. But if you want to present results from the analytics, you cannot wait until the end of time. So instead you present results at the end of a reasonable time window. For example, you may specify a daily summary based on a complete checkpoint of events for that day. But what if you want results more frequently, say, every second? If the processing is distributed and the window of time is short, you may not have a way to know the global state of the system, and some events may be missed or counted twice. In this case the reports may not be consistent.

Strongly consistent event systems guarantee that each event is processed once and only once. In contrast, a weakly consistent system may give you only approximate results. You can verify these results, if needed, by performing a batch run on each daily checkpoint file, but this verification step involves additional work and delays. Streaming system designs that combine a streaming engine with a separate batch system are examples of lambda architecture [190]. The goal of many of the systems described below is to combine the batch computing capability with the streaming semantics so that a separate batch system is not necessary.
A second concern is the semantics of time and windows. Many event sources provide a time stamp when an event is created and pushed into the stream. However, the event will not be processed until a later time. Thus, we need to distinguish between event time and processing time. To further complicate things, events may be processed out of event-time order. These factors raise the question of how to reason about event time in windows defined by processing time.
At least four types of time windows exist. Fixed time windows divide the incoming stream into logical segments, each corresponding to a specified interval of processing time. The intervals do not overlap. Sliding windows allow for the windows to overlap: for example, windows of size 10 seconds that start every five seconds. Per-session windows divide the stream into sessions of activity related to some key in the data. For example, mouse clicks from a particular user may be bundled into a sequence of sessions of clicks nearby in time. Global windows can encapsulate an entire bounded stream. Associated with windows there must be a mechanism to trigger an analysis of the content of the window
and publish the summary. Each of the systems that we discuss below supports some windowing mechanisms. Two articles by Tyler Akidau [57] provide a good discussion of windows and related issues.
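To make the first two window types concrete, here is a small sketch, independent of any particular streaming system, that slices a list of (event-time, value) pairs into fixed and sliding windows (integer times, in seconds, for simplicity).

```python
def fixed_windows(events, width):
    """Partition (time, value) events into non-overlapping windows of a
    given width, keyed by the window start time."""
    windows = {}
    for t, v in events:
        start = (t // width) * width
        windows.setdefault(start, []).append(v)
    return windows

def sliding_windows(events, width, slide):
    """Assign each event to every overlapping window of the given width
    that starts on a multiple of slide."""
    windows = {}
    for t, v in events:
        # Every window start s with s <= t < s + width and s % slide == 0.
        first = ((t - width) // slide + 1) * slide
        for start in range(max(0, first), t + 1, slide):
            windows.setdefault(start, []).append(v)
    return windows

events = [(0, 1.0), (4, 2.0), (7, 3.0), (12, 4.0)]
print(fixed_windows(events, 10))       # two non-overlapping 10-second windows
print(sliding_windows(events, 10, 5))  # 10-second windows starting every 5 seconds
```

Note that the event at time 7 lands in two overlapping sliding windows (those starting at 0 and 5) but in only one fixed window.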
Another design issue concerns how work is distributed over processors or containers and how parallelism is achieved. As we will see, the systems described here all adopt similar approaches to parallelism.
Operations on streams often resemble SQL-like relational operators, but there are important differences. In particular, a join operation is not well defined when streams are unbounded. The natural solution involves dividing streams by windows in time and performing the join over each window. Vijayakumar and Plale have studied this topic extensively [255]. Barja et al. [66] describe a Complex Event Detection and Response system in which SQL-like temporal queries have well-defined semantics.
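A minimal illustration of a windowed join: two streams of (time, key, value) tuples are joined on key, but only within the same fixed time window, so the otherwise unbounded join becomes a sequence of bounded joins. The tuple layout here is hypothetical.

```python
from collections import defaultdict

def windowed_join(left, right, width):
    """Join two streams of (time, key, value) tuples on key, restricted
    to pairs that fall in the same fixed window of the given width."""
    buckets = defaultdict(lambda: (defaultdict(list), defaultdict(list)))
    for t, k, v in left:
        buckets[t // width][0][k].append(v)
    for t, k, v in right:
        buckets[t // width][1][k].append(v)
    out = []
    for w, (lvals, rvals) in sorted(buckets.items()):
        for k in lvals:
            for lv in lvals[k]:
                for rv in rvals.get(k, []):
                    out.append((w * width, k, lv, rv))
    return out

left = [(1, "a", 10), (12, "a", 20)]
right = [(3, "a", 0.5), (14, "b", 0.7)]
print(windowed_join(left, right, 10))  # only (1, "a") and (3, "a") share a window and key
```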
9.3 Amazon Kinesis and Firehose
Amazon provides an impressive event-streaming software stack called Kinesis, comprising the following three services:
1. Kinesis Streams provides ordered, replayable real-time streaming data.
2. Kinesis Firehose, designed for extreme scale, can load data directly into S3 or other Amazon services.
3. Kinesis Analytics provides SQL-based tools for real-time analysis of streaming data from Kinesis Streams or Firehose.
9.3.1 Kinesis Streams Architecture
Each Kines is stream is composed of one or more shards. Think of a s tream as
a rope composed of many strands. Each strand is a shard, and the data that
move through the stream are spread across the individual shard s that make up
the stream. Data producers write to the shards and consumers read from the
shards. Each shard can support writes from producers at up to 1,000 records
per second, up to a maximum data write total of 1 MB per second. However, no
individual record can be bigger th an 1 MB. On the other hand , reads by data
consumers can be at most five transactions per second, with a total throughput of
2 MB/sec. While these bounds may seem rather limiting, you can have streams
with thous and s of shards. For scientists, the biggest limitation may be the 1 MB
limit on the size of each event, which means that big events must be split up and spread over multiple shards. We return to this detail later.
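One way to live within this limit is to break a large event into numbered chunks, each sent as its own record, and have the consumer reassemble them. The chunk header fields below are our own invention, not part of the Kinesis API; this is only a sketch.

```python
def chunk_event(payload: bytes, event_id: str, chunk_size: int = 1_000_000):
    """Split a large binary event into numbered chunks, each small enough
    for one Kinesis record; the header lets a consumer reassemble them."""
    n = (len(payload) + chunk_size - 1) // chunk_size
    return [
        {
            "EventId": event_id,   # hypothetical header fields, not a Kinesis API
            "Part": i,
            "Parts": n,
            "Data": payload[i * chunk_size : (i + 1) * chunk_size],
        }
        for i in range(n)
    ]

chunks = chunk_event(b"x" * 2_500_000, "run42")
print([(c["Part"], len(c["Data"])) for c in chunks])
```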
To create a stream, go to the Amazon console, click on the word stream, give the stream a name, and select the desired number of shards, as shown in figure 9.2.
Figure 9.2: Creating a single stream with one shard from the Amazon console.
Let's start with an example of sending a record to the stream cbookstream2. Assume we want to send a series of time-stamped temperature readings from a temperature sensor. Every record has to have a stream name, a binary encoded data component, and a way to identify the shard. The Boto3 SDK that we use here provides an interface that is consistent with that used for other Amazon services. We identify the shard by giving the record a partition key as a string: in this case, as we have just one shard, the string 'a'. This key is hashed to an integer that is used to select a shard. (If there is only one shard, all records are mapped to shard 0.) In our example, the binary data component of our record is up to us. We create a string that encodes a JSON record and then convert the string to a binary array. The following code sends our record to the stream. (We use the datetime format for the timestamp because that is the format used by Streams for its timestamps.)
import datetime
import json

import boto3
import pytz

client = boto3.client('kinesis')
tz = pytz.timezone('America/Los_Angeles')
ts = datetime.datetime.now(tz)
item = {'id': 'sensor1', 'val': 73, 'label': 'temperature',
        'localtime': str(ts)}
data = json.dumps(item)
client.put_record(
    StreamName='cbookstream2',
    Data=bytearray(data, 'utf-8'),
    PartitionKey='a'
)
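When many readings must be sent, it helps to batch them with the client's put_records call, which accepts up to 500 records per request, so as to stay within the per-shard write limits quoted above. The helper below only builds the batches; the commented-out line shows how each batch would be submitted, and the reading values are illustrative.

```python
import json

def batch_records(items, partition_key="a", max_batch=500):
    """Group JSON-serializable items into batches shaped for the Kinesis
    put_records call (at most max_batch records per request)."""
    batch = []
    for item in items:
        batch.append({"Data": json.dumps(item).encode(),
                      "PartitionKey": partition_key})
        if len(batch) == max_batch:
            yield batch
            batch = []
    if batch:
        yield batch

readings = [{"id": "sensor1", "val": 70 + i} for i in range(1200)]
batches = list(batch_records(readings))
print([len(b) for b in batches])  # [500, 500, 200]
# Each batch would then be sent with:
#   client.put_records(Records=batch, StreamName='cbookstream2')
```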
Reading a stream takes a little more work. Every record loaded into a shard has a sequence number. To read the records, you need to provide a shard iterator,
which can be created in several ways. One way is to specify a timestamp so that you read only the records that arrive after the specified time. Another way is to request that the iterator be positioned at the latest point in the stream so that you only get the new records that come after that point. Or you can create an iterator from a sequence number. To do so, you also need the shardID. You can obtain both the shardID and a starting sequence number for that shard by calling describe_stream(StreamName). Given this information, you can create an iterator as follows.
client = boto3.client('kinesis')
iter = client.get_shard_iterator(
    StreamName='cbookstream2',
    ShardId='shardID',
    ShardIteratorType='AT_SEQUENCE_NUMBER',
    StartingSequenceNumber='seqno'
)
If you want an iterator that starts only after the latest record, so as to obtain only new records, you can set the ShardIteratorType equal to "LATEST" and omit the sequence number. (Two other iterator constructor methods also can be used, but we do not discuss them here.)
Having created an iterator, we can then ask for all records collected from that point on by using the get_records() function. This function returns at most 10 MB in one call and can support only 2 MB per second. To avoid the 10 MB limit, you can limit the number of records returned; but if you are near the limit, it is better to add new shards or, alternatively, to use a split-shard function.
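The split-shard operation just mentioned is exposed in Boto3 as split_shard, which needs a NewStartingHashKey inside the hash-key range of the shard being split (the range is reported by describe_stream). A common choice is the midpoint; the hash range below is hypothetical, and the call itself is shown commented out.

```python
# Hash-key range of the shard to split, as reported by describe_stream.
# Hypothetical values: a single shard covering the full 128-bit range.
starting_hash = 0
ending_hash = 2**128 - 1

# Midpoint of the range: records hashing above this go to the new shard.
new_starting_hash_key = str((starting_hash + ending_hash) // 2 + 1)
print(new_starting_hash_key)

# The split itself would then be requested with:
#   client.split_shard(StreamName='cbookstream2',
#                      ShardToSplit='shardId-000000000000',
#                      NewStartingHashKey=new_starting_hash_key)
```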
The best strategy is to pull records in time-defined windows and then do the analysis for each window. The get_records() function returns a list of records and metadata that includes a "next shard iterator" that can be used to position the function for the next batch of records. For example, taking the result from the previous code, a typical processing loop would look like the following.
import time

iterator = iter['ShardIterator']
while True:
    time.sleep(5.0)
    resp = client.get_records(ShardIterator=iterator)
    iterator = resp['NextShardIterator']
    analyzeData(resp['Records'])
This code pulls a block of records that have been waiting in the shard for the last five seconds (not counting the time to do the data analysis). For example, suppose we want to measure the time that elapses from when we create a
record, as described above, until the time the record arrives and is timestamped in the stream. We could write it as follows.
from dateutil.parser import parse

def analyzeData(resp):
    # resp is the response['Records'] field
    for rec in resp:
        data = rec['Data']
        arrivetime = rec['ApproximateArrivalTimestamp']
        print('Arrival time = ' + str(arrivetime))
        item = json.loads(data)
        print('Local time = ' + str(parse(item['localtime'])))
        delay = arrivetime - parse(item['localtime'])
        secs = delay.total_seconds()
        print('Message delay to stream = ' + str(secs) + ' seconds')
9.3.2 Kinesis and Amazon SQS
It is instructive to compare Kinesis Streams and Kinesis Firehose with the Amazon Simple Queue Service (SQS) introduced in section 2.3.6 on page 34. SQS is based on the semantics of queues. Message (event) producers add items to SQS queues; SQS clients can retrieve messages from queues for processing. Messages in a queue are not removed by clients, but are instead retained for a period of time (typically 24 hours) or until the entire queue is explicitly flushed. Each message in a queue has a sequence number; knowing this number, the client can fetch that message and all subsequent messages (up to a limit) with a single call to the stream API. Thus, a client can replay an analysis of a queue at any time, and different clients can process the same queue in the same or different ways.
In contrast, Kinesis Firehose is designed to handle large streams of data that are automatically delivered to S3 or Amazon Redshift. Firehose is batch oriented: it buffers incoming data into buffers of up to 128 MB and dumps the buffer to your S3 blobs at specific intervals that you select, from every minute to every 15 minutes. You can also specify that the data be compressed and/or encrypted. Consequently Firehose is designed not for real-time analysis but for near-real-time large-scale analysis.
9.4 Kinesis, Spark, and the Array of Things
To illustrate the use of Kinesis together with Spark in a streaming context, we study the data from 40 different instrument streams coming through Kinesis. Spark has a streaming subsystem called Spark Streaming. The key concept is that of
a D-Stream, which takes windows of streamed data and blocks them into Spark RDDs for analysis. We use these concepts here to implement a basic anomaly prediction algorithm. More specifically, we build a Jupyter notebook that tracks all the streams but focuses on two where sudden dramatic changes in behavior have been seen. Our goal is a system that automatically flags these anomalies. To make this easier to do in a notebook, we use regular Spark plus Kinesis, rather than the Spark streaming subsystem. We create a pseudo D-Stream by pulling blocks of events from Kinesis and converting each block into a Spark RDD.

The data that we work with here are based on a 24-hour sample of event streams from 40 instruments deployed as part of the Array of Things project described in section 9.1.2 on page 163. We make these data available for download from the book website; many more are available from arrayofthings.github.io.
We develop a program to read the dataset and push the events as fast as possible to Kinesis. While we do not present all code details here (we want to focus on the core ideas), the details are in notebook 17. (The book website has links to the code and data for this example in the Extras tab.) We first create an instance of Jupyter running with Spark. (The example can run on a laptop.) We connect to Kinesis exactly as described in the previous section and start pulling event records and accumulating them into RDD windows.
We use a simple algorithm to detect anomalies: we record the data stream values for each stream of interest; and as we are recording them, we compute a simple predicted value for the next value. If the next value from the stream turns out to be extremely different from our prediction, then we flag an anomaly.
To record the data streams, we create an instance of a class Datarecorder. This has one main method, record_newlist(newlist), which takes a list of event records of the form (timestamp, value) and appends it to self.datalist, which is the record of the stream. In addition to this record, we use the recent history of the stream to signal unexpected changes in the stream, such as anomalies or other major changes in behavior. A challenge is that some of the signals can be noisy, so we must look for ways to filter the noise to see the significant changes in behavior.
To filter the noise in a stream x_i, i = 0, 1, ..., we can use an exponential smoothing technique that blends the previous values from the stream to create an artificial stream s_i that tends to a geometric average of past values, as follows. Let s_0 = x_0, and use the following recurrence to define the future values:

s_n = (1 - α) x_n + α s_{n-1},

where α is a number between 0 and 1. Expanding the recurrence, we see that:
s_n = (1 - α) x_n + α (1 - α) x_{n-1} + α^2 s_{n-2}

s_n = (1 - α) Σ_{i=0}^{n-1} α^i x_{n-i} + α^n x_0
(One can easily verify that in the case of a constant stream with x_i = x_0 for all i, then s_i = x_i for all i.) If the stream makes a radical change in value, however, the smoothed value lags behind. We can use this lag to identify anomalies and other points of behavior change. But the problem then becomes how to measure a profound change in behavior in a manner that is distinct from noise. To do that, we can compute the standard deviation and look for departures from our smoothed value that exceed the standard deviation.
We cannot readily compute the standard deviation of stream values that may dramatically change over time, but we can compute the standard deviation in a recent window. We create a buffer, buf_i, i = 1..M, to record the most recent M stream values. We can compute the standard deviation σ in this window as follows.

μ = (1/M) Σ_{i=1}^{M} buf_i

σ = sqrt( (1/M) Σ_{i=1}^{M} (buf_i - μ)^2 )
Based on this computation, we can look for values of x_i that lie outside the interval [s_i - kσ, s_i + kσ], where k is some value greater than 1. We use k = 1.3 here. While 2σ would show us some truly significant outliers, we have found that the more modest 1.3σ worked well. This computation takes place in the Datarecorder record_newlist(newlist) method. We also keep a record of s_i, s_i - kσ, and s_i + kσ that we can plot after we complete the analysis.
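The logic just described can be condensed into a short sketch (our own paraphrase of the Datarecorder idea, not the notebook's exact code): exponential smoothing for the prediction, plus a standard deviation computed over a rolling buffer of the last M values.

```python
from collections import deque
from math import sqrt

class AnomalyDetector:
    """Sketch of the smoothing-plus-deviation test described above."""
    def __init__(self, alpha=0.5, window=20, k=1.3):
        self.alpha, self.k = alpha, k
        self.buf = deque(maxlen=window)   # last M raw values
        self.s = None                     # smoothed value s_i

    def observe(self, x):
        """Return True if x falls outside [s - k*sigma, s + k*sigma]."""
        if self.s is None:
            self.s = x
        anomaly = False
        if len(self.buf) == self.buf.maxlen:
            mu = sum(self.buf) / len(self.buf)
            sigma = sqrt(sum((b - mu) ** 2 for b in self.buf) / len(self.buf))
            anomaly = abs(x - self.s) > self.k * sigma
        # Update the smoothed estimate and rolling buffer after the test.
        self.s = (1 - self.alpha) * x + self.alpha * self.s
        self.buf.append(x)
        return anomaly

d = AnomalyDetector()
quiet = [d.observe(10.0) for _ in range(30)]
print(any(quiet))      # steady stream: nothing flagged
print(d.observe(50.0)) # sudden jump: flagged
```

With a steady stream the smoothed value tracks the data and nothing is flagged; a sudden jump falls outside the [s - kσ, s + kσ] band and is reported.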
We look at two of the 40 streams by creating a dictionary of the data recorders for each stream of interest. One is a chemistry sensor that tracks atmospheric NO2, a common pollutant; the other is an ambient temperature sensor. We create a function update_recorders(newlist), shown on the next page, which takes a list of the following form, and uses the dictionary to select the correct recorder, passing the timestamp and value list to the recorder function.
[[sensor-name, [[time-stamp, value], [time-stamp, value], ... ]],
 [sensor-name, [[time-stamp, value], [time-stamp, value], ... ]],
 ...
]
myrecorder = {}
myrecorder['Chemsense_no2'] = Datarecorder('Chemsense_no2')
myrecorder['TSYS01_temperature'] = Datarecorder('TSYS01_temperature')

def update_recorders(newlist):
    if newlist:
        for x in newlist:
            myrecorder[x[0]].record_newlist(x[1])
Our Spark-Kinesis pipeline has four stages. The first stage creates the RDD in our pseudo D-Stream:

gather_list(iteratorlist) takes the list of Kinesis stream iterators and pulls all the events since the last call to this function from the stream. It then updates iteratorlist with the iterator advanced to the spot that follows the last one we pulled. Each event is a binary-encoded JSON object, so it is converted into a full JSON object and then to a list.

filter_fun(row, sensor, parameter) is used to select the elements of the RDD that correspond to a specific (sensor, parameter) pair.
The main loop of the program emulates Spark streaming. Approximately every 20 seconds, we gather all available events from the Kinesis stream and then create a Spark RDD for them called data. Each event is a list of the following form.

[sensor-name, [timestamp, value]]

The first step in the pipeline filters out all events except those that we want to keep. The second step groups the events by the sensor-name key in the tuple, producing a list with two elements, as follows.

[['Chemsense-no2', [python-iterator over (time-stamp, value) tuples]],
 ['TSY01-temperature', [python-iterator over (time-stamp, value) tuples]]
]
The third step uses map to convert the Python iterators into explicit lists, using a simple function doiter(row). We then collect these lists into a single newlist, which we pass to the recorders to record and look for events of interest.
for i in range(150):
    gathered = gather_list(iterlist)
    data = sc.parallelize(gathered, 2)
    newlist = data.filter(lambda p: filter_fun(p, 'Chemsense', 'no2')
                          or filter_fun(p, 'TSYS01', 'temperature')) \
                  .groupByKey() \
                  .map(lambda p: doiter(p)) \
                  .collect()
    update_recorders(newlist)
    print('********** end of gather %s ***************' % i)
    time.sleep(20.0)
Note that the data in the dataset cover 24 hours of real data, during which time the instruments send an event approximately every 25 seconds: approximately two events per minute for a total of 3,450 for the full day. Altogether there are 172,800 events in the dataset, and the total size is approximately 14 MB. We push all data to Kinesis in about 120*20 seconds = 40 minutes using one shard. We could do it much faster with two shards.
The output is rather dull until we get to timestamp 17:05:06, when we see the following anomalies reported.
[updating list for TSYS01_temperature
********** end of gather 84 ***************
updating list for Chemsense_no2
anomaly at time 2016-11-10 17:05:06.980000?
anomaly at time 2016-11-10 17:05:57.049000
... lines deleted ...
anomaly at time 2016-11-10 17:13:26.838000
anomaly at time 2016-11-10 17:13:51.875000
updating list for TSYS01_temperature
anomaly at time 2016-11-10 17:15:57.064000
... lines deleted ...
anomaly at time 2016-11-10 17:29:17.405000
anomaly at time 2016-11-10 17:29:42.460000
anomaly at time 2016-11-10 17:30:07.507000
... more time passes ...
updating list for TSYS01_temperature
anomaly at time 2016-11-10 22:23:07.533000
anomaly at time 2016-11-10 22:23:32.592000
anomaly at time 2016-11-10 22:24:22.673000
... ending at ...
anomaly at time 2016-11-10 23:14:47.974000
anomaly at time 2016-11-10 23:17:18.233000
********** end of gather 114 ***************
Each recorder keeps track of the data, the smoothed prediction, and the 3σ-wide safety window. The plot function on the Datarecorder shows the history. We can clearly see the periods of strange behavior, as shown in figure 9.3.

Figure 9.3: Plot of the data streams from the sensors. The high-frequency blue line is the raw data. The red line is the smoothed prediction line. The other two lines show 1.5σ above and below the prediction, respectively. When the blue line escapes the 3σ window, the anomaly is signaled.
9.5 Streaming Data with Azure
Azure has a set of services that are devoted to real-time, large-scale stream analytics. The system is designed to handle millions of events per second and to correlate across multiple streams of data. Internet of Things applications are particularly well handled here. The two primary components of the Azure stream analytics services are the Azure Event Hubs service and the Stream Analytics engine.

Event Hubs is where your instruments send their events. It is similar in this respect to Kinesis. The analytics portal is where you put your event-processing logic, which is expressed as a streaming dialect of SQL. As illustrated in figure 9.4 on the next page, the Stream Analytics portal allows you to select an input stream
from an Event Hub or from blob storage and use it as input to the query system. You can then direct the output to blob storage. As illustrated in the figure, the output of the query may go directly to the portal console.
Figure 9.4: View of an Azure stream analytics query. In this example we are looking for telephone fraud. The input stream is a set of call logs. The query is looking for a call from the same number from different locations at nearly the same time.
The best way to create an Event Hub is via the Azure portal. Once that is
accomplished, sending events to the Event Hub is easy. The Event Hubs service
identifies hub instances by a namespace, an Event Hub name, an access key name,
and an access key. These values are all available from the portal. You can create
as many hub instances in a namespace as you need. The maximum size of an event
is 256 KB, so if your data are larger, you must break them into 256 KB blocks
and send them in batches. The Event Hub has a sequence number so that you can
keep track of the individual parts of your big message.
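A minimal Python sketch of that chunking might look like the following. The envelope fields msg_id, seq, and total are our own invented convention for reassembly, not part of the Event Hubs API:

```python
import json

MAX_EVENT_BYTES = 256 * 1024  # Event Hub per-event size limit

def chunk_message(payload, msg_id, max_bytes=MAX_EVENT_BYTES):
    """Split a large string payload into numbered blocks small enough
    to send as individual events; the sequence number lets the
    receiver reassemble them in order."""
    # Reserve some room for the JSON envelope around each block.
    block_size = max_bytes - 1024
    blocks = [payload[i:i + block_size]
              for i in range(0, len(payload), block_size)]
    return [json.dumps({'msg_id': msg_id, 'seq': n,
                        'total': len(blocks), 'data': block})
            for n, block in enumerate(blocks)]
```

Each returned string can then be sent as a separate event, and the receiver sorts on seq to rebuild the original message.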
Suppose your events are scientific records with the form category, title,
abstract. You set up a connection to the Event Hub and send an event triple doc
as follows, labeling each element of the event triple appropriately.
import json
from azure.servicebus import ServiceBusService

servns = 'myhubnamespace'
key_name = 'RootManageSharedAccessKey'  # default from Azure portal
key_value = 'longkey from portal'
sbs = ServiceBusService(service_namespace=servns,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)
event_data = {'category': doc[0], 'title': doc[1], 'abstract': doc[2]}
sbs.send_event('event hub name', json.dumps(event_data))
Events from the Event Hub are typically routed to the Azure Stream Analytics
engine. As shown in figure 9.5, you can save the output from the engine to b lob
storage; route it to the Azure Queu e Services, where your own microservices can
subscribe to events and process them; or have the Stream Analytics service queries
invoke the Azure Machine Learning services.
Figure 9.5: Events move from sources to the Event Hub and then to the Stream Analytics
engine. From there they may go to blob storage, the Azure queue service, or the Azure
Machine Learning service.
To show how the machine learning system works with stream analytics, we need
to look more closely at the SQL language dialect that is part of the analytics services.
Language extensions have been added to accommodate a library of mathematical
and logical functions, as well as windowing semantics. Similar concepts are part
of the query language used by Amazon Kinesis Analytics. For example, suppose
we have a stream of temperature reading events, each containing a sensor name
and a temperature value, and you want to know the standard deviation of the
temperature in a window of 20 minutes for each sensor. We can ask this question
as follows. The GROUP BY operator gathers all the events in the window with the
same sensor name, and the STDEV operator computes the standard deviation for
each sensor's temperature values. The term tumbling window indicates that
windows are fixed size, non-overlapping, and contiguous.
SELECT sensorname, STDEV(temperature)
FROM input
GROUP BY sensorname, TumblingWindow(minute, 20)
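The tumbling-window semantics can be modeled in ordinary Python. The sketch below groups timestamped readings into fixed, non-overlapping windows and computes the per-sensor standard deviation in each, mirroring what the query above computes (an illustration of the semantics, not the Stream Analytics engine):

```python
from collections import defaultdict
import statistics

def tumbling_stdev(events, minutes=20):
    """events: iterable of (timestamp_sec, sensorname, temperature).
    Returns {(window_index, sensorname): stdev} for windows holding
    at least two readings."""
    width = minutes * 60
    windows = defaultdict(list)
    for ts, name, temp in events:
        # int(ts // width) identifies the fixed window the event falls in.
        windows[(int(ts // width), name)].append(temp)
    return {key: statistics.stdev(vals)
            for key, vals in windows.items() if len(vals) > 1}
```

Because the windows are contiguous and non-overlapping, every event lands in exactly one window, which is what distinguishes tumbling from sliding windows.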
Azure Stream Analytics allows us to invoke a machine learning function in
the same way. For example, assume that we have an Azure Machine Learning
(ML) web service that can take the abstract for a scientific document and predict
a pair of classifications: a best guess and a second best guess. We can now take
the stream of scientific document events and directly invoke the classifier with the
following Stream Analytics query.
WITH subquery AS (
    SELECT category, title, classify(category, title, abstract)
        AS result FROM streaminput
)
SELECT category, result.[Scored Label], result.[second], title
INTO myblobstore
FROM subquery
The classify() web service ignores the category and title and uses machine
learning libraries to analyze the text of the abstract and returns a result with two
fields, Scored Label and the second-choice score second. The result is stored as
a CSV file in an Azure blob. In the next chapter we describe how Azure ML can
be used to construct such a web service.
You might well wonder how well Azure Stream Analytics can keep up with
the flow of input events, given that it must invoke a separate web service for each
query evaluation. The short answer is "very well." To demonstrate, we sent a batch
of 1,939 events to an Event Hub from a Jupyter notebook. Table 9.1 presents the
total time taken to process the entire batch. We also measure the time taken to
invoke the Azure ML service directly for all 1,939 events, with one invocation per
event. We also measure the time taken when we invoke the Azure ML service with
a bulk request, whereby all events are sent in one web service call.

We see that bulk processing of events can have a substantial impact on
performance. When requests are sent one at a time (the second row in the table), 1,939
events take 1,042 seconds, or approximately 0.5 seconds to make a single call to
Table 9.1: Time taken by Azure Stream Analytics to process 1,939 input events.
Metric Time (seconds)
Total time for Azure Event Hub invocation 162
Total time for direct Azure ML invocations 1,042
Total time for bulk Azure ML invocation 3.75
Average time between event arrivals to Event Hub 0.0825
Average time for web service call in Stream Analytics 0.0827
the Azure ML web service and return the result. However, if we send the events in
one large block request (the third row), it takes only 3.75 seconds, an average of
0.002 seconds per event. As the Stream Analytics system automatically groups
requests to the web service, it performs only 578 distinct invocations for the 1,939
events, as revealed by the automatic monitoring shown in figure 9.6. This grouping
allows it to process each event in an average of 0.0827 seconds (the fifth line in the
table), which matches the arrival rate of events (the fourth line).
Figure 9.6: Metrics captured for our experiment by Azure Stream Analytics. Each set of
three bars shows in turn output events, input events, and web service invocations.
We also experimented with doubling the event arrival rate by using two machines
to send the complete set of events simultaneously, and saw no significant impact on
performance. The Stream Analytics system kept up and required only 598 Azure
ML function invocations, even though twice as many events were processed. You
might want to try an interesting exercise: use many input event sources to test
the limits of the Azure Stream Analytics system's scalability.
9.6 Kafka, Storm, and Heron Streams
Apache Kafka kafka.apache.org is an open source message system that incorporates
publish-subscribe messaging and streaming. It is designed to run on a cluster
of servers and is highly scalable. Streams in Kafka are streams of records that are
segregated into topics. Each record has a key, a value, and a timestamp.
Kafka streaming can be simple—a single-stream client consuming events from
one or more topics—or more complex, based on sets of producers and consumers
organized into graphs, called topologies. We do not go into the details of Kafka
here. Instead, we provide a brief overview of two systems that are similarly based
on executing a directed graph of tasks in a dataflow style.
One of the earliest such systems was Storm, created by Nathan Marz and
released as open source by Twitter in late 2011. Storm was written in a dialect of
Lisp called Clojure that runs on the Java virtual machine. In 2015, Twitter rewrote
Storm to create the API-compatible Heron [173] twitter.github.io/heron. Since
Heron implements the same programming model and API as Storm, we discuss
Storm first and then say a few words about the Heron design.
Storm (as well as Heron) runs topologies, directed acyclic graphs whose nodes are
spouts (data sources) and bolts (data transformation and processing). Figure 9.7
shows an example Storm topology.
Figure 9.7: An example Storm topology. On the left is the abstract topology as defined
by the program; on the right is the unrolled parallel topology as executed at runtime.
Storm has two programming models: classic and Trident, with the latter built on
the first. When using the Storm programming model, you express your application
logic by extending the basic spout and bolt classes and then using a topology
builder to tie everything together. Figure 9.8 shows a basic template for a bolt,
expressed in Java as that is Storm's main programming language interface. Three
methods are required: prepare(), execute(), and declareOutputFields().
The prepare() method is a special constructor that is called when the actual
instance is deployed on the remote machine. It is supplied with context about the
configuration and topology as well as a special object called the OutputCollector,
which is used to connect the bolt output to the output stream defined by the
topology. This method is used to instantiate your own data structures.
The basic data model for Storm/Heron is a stream of tuples. A tuple is just
that: a set of items, each of which needs only to be serializable. Some fields in
a tuple have names that are used for communicating information between bolts.
The declareOutputFields() method is used to declare the name of the fields in
a stream. We discuss this method further later.
The heart of the bolt is the execute() method, which is invoked once for each
new tuple that is sent to the bolt. This method contains the bolt's computational
core and is also where results from the bolt process are sent to its output streams.
Other classes and styles of bolts are also provided. For example, one specialized
bolt class provides for sliding and tumbling windows.
Spout classes are similar in structure; the most interesting are those that connect
to event providers such as Kafka or Event Hubs.
Having defined a set of bolts and spouts, you develop a program by using the
topology builder class to build the abstract topology and define how the parallelism
should be deployed. The key methods are setBolt() and setSpout(). Each takes
three arguments: the name of the spout or bolt instance, an instance of your spout
or bolt class, and an integer (the parallelism number) that specifies how many
tasks to assign to execute the instance. A task is a single thread that is assigned
to a spout or bolt instance.
The code below shows how to create the topology of figure 9.7.
We see that there are two tasks for the spout, four for bolt A, three for bolt B,
and two for bolt C. Note that the two tasks for the spout are sent to four tasks for
bolt A. We use a stream grouping function to partition the two output streams
over the four tasks: specifically, shuffle grouping, which distributes them randomly.
When mapping the four output streams from bolt A to the three tasks of bolt B,
we use a field grouping based on a field name to ensure that all tuples with the
same field name are mapped to the same task.
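The two grouping strategies are easy to model. This Python sketch (our own illustration, not Storm code) assigns tuples to task indices the way shuffle and fields groupings do:

```python
import random

def shuffle_grouping(stream, n_tasks, seed=0):
    """Assign each tuple to a task at random (Storm's shuffle grouping).
    Returns a list of (task_index, tuple) pairs."""
    rng = random.Random(seed)
    return [(rng.randrange(n_tasks), t) for t in stream]

def fields_grouping(stream, n_tasks, field=0):
    """Assign each tuple by hashing one field, so all tuples that share
    that field's value always land on the same task (fields grouping)."""
    return [(hash(t[field]) % n_tasks, t) for t in stream]
```

Shuffle grouping balances load; fields grouping sacrifices some balance to guarantee that state for a given key (here, a given field value) lives on a single task.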
public class MyBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        /*
         * Execute is called when a new tuple has been delivered.
         * Do your real work here. For example, create
         * a list of words from the tuple and then emit
         * those words to the default output stream.
         */
        String[] words = tuple.getString(0).split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        /*
         * The declarer is how we declare our output fields in
         * the default output stream. You can have more than
         * one output stream, using declareStream(). The emit()
         * in execute needs to identify the stream for each
         * output value.
         */
        declarer.declare(new Fields("word"));
    }
}
Figure 9.8: A basic Storm/Heron bolt template, with three methods: prepare(),
execute(), and declareOutputFields().
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("Spout", new MySpout(), 2);
builder.setBolt("BoltA", new MyBoltA(), 4).shuffleGrouping("Spout");
builder.setBolt("BoltB", new MyBoltB(), 3).fieldsGrouping("BoltA",
                                               new Fields("word"));
builder.setBolt("BoltC", new MyBoltC(), 2).shuffleGrouping("Spout");

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", config,
                       builder.createTopology());
Having described the Storm/Heron API, we provide some information on
the Heron implementation. In Heron, a set of container instances is deployed
to manage the execution of the topology, as shown in figure 9.9. The topology
master coordinates the execution of the topology on a set of other containers, each
of which contains a stream manager and Heron instance processes that execute
the tasks for the bolts and spouts. The communication between the bolts and
spouts is mediated by the stream manager, and all the stream managers are
connected together in an overlay network. (The topology master makes sure they
are all in communication.) Heron provides considerable performance improvements
over Storm. One improvement is better flow control of data from spouts when
the bolts are falling behind. For more detail on Heron, see the full paper [173].
Some of the best Storm tutorial materials can be found in Michael Noll's blog
www.michael-noll.com .
Figure 9.9: Heron architecture detail.
9.7 Google Dataflow and Apache Beam
Apache Beam beam.apache.org, the open source release of the Google Cloud
Dataflow system [58], is the most recent entry to the zoo of data stream analytics
solutions that we discuss here. An important motivation for Beam (we write Beam
rather than Google Cloud Dataflow from now on, to save space) is to treat the
batch and streaming cases uniformly. The important concepts are as follows.
1. Pipelines, which encapsulate computation

2. PCollections, which represent data as they move through a pipeline

3. Transforms, the computational transformations that operate on PCollections
and produce PCollections

4. Sources and Sinks, from which data are read and to which data are written,
respectively
A PCollection can comprise either a large but fixed-size set of elements or a
potentially unbounded stream. The elements in any PCollection are all of the same
type, but that type may be any serializable Java type. The creator of a PCollection
often appends a timestamp to each element at creation time, particularly when
dealing with unbounded collections. One important type of PCollection that is
often used is the key-value PCollection, KV<K, V>, where K and V are the key and
value types, respectively. Note that PCollections are immutable: you cannot change
them, but you can apply transforms to translate them into new PCollections.
Without going into the details of how you initialize a pipeline, we discuss here
how to create a PCollection of type PCollection<String> of strings from a file.
(The API uses the Java programming language, but we hope that it is still readable
by the Python programmer.)
Pipeline p = Pipeline.create(options);
PCollection<String> pc =
    p.apply(TextIO.Read.from("/home/me/mybigtextfile.txt"));
We have used the pipeline operator apply(), which allows us to invoke the
special transform TextIO to read the file. Now we create a sequence of PCollections
using the apply() method of the PCollection class. The library has five basic
transform types, most of which take a built-in or user-defined function object as
an argument and apply the function object to each element of the PCollection to
create a new PCollection:
ParDo applies the function argument to each element of the input PCollection.
The computations are performed by the worker tasks allocated to this activity,
in what is basic embarrassingly parallel map parallelism.

GroupByKey, when applied to a KV<K,V> type of PCollection, groups all
elements with the same key into a single list, so that the resulting PCollection
is of type KV<K, Iterable<V>>. In other words, this is the shuffle phase of
a MapReduce.

Combine applies an operation that reduces a PCollection to a PCollection
with a single element. If the PCollection is windowed, the result is a PCollection
with the combined result for each window. Another type of combining
is for key-grouped PCollections.

Flatten combines PCollections of the same type into a single PCollection.

Windowing and Triggers are used to define mechanisms for window
operations.
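To make these semantics concrete, here is a plain-Python model of the first four transforms operating on ordinary lists. This is an illustration of the semantics only; real Beam PCollections are distributed and lazily evaluated:

```python
from collections import defaultdict

def par_do(pcollection, fn):
    # ParDo: apply fn to each element; fn may emit zero or more outputs.
    return [out for elem in pcollection for out in fn(elem)]

def group_by_key(pcollection):
    # GroupByKey: (k, v) pairs -> (k, [v, ...]); the shuffle of MapReduce.
    groups = defaultdict(list)
    for k, v in pcollection:
        groups[k].append(v)
    return list(groups.items())

def combine(pcollection, fn):
    # Combine: reduce the whole collection to a single element.
    return [fn(pcollection)]

def flatten(*pcollections):
    # Flatten: merge same-typed collections into one.
    return [e for pc in pcollections for e in pc]
```

For example, par_do with a function that emits each element twice doubles the collection, while group_by_key turns a list of key-value pairs into per-key lists.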
To illustrate some of these features, we consider an environmental sensor
example in which each event consists of an instrument type, location, and numerical
reading. We compute the average temperature for sensors of type tempsensor
for each location using a sliding window. For the sake of illustration, we use an
imaginary pub-sub system to get the events from the instrument stream. We
suppose that the events are delivered to our system in the form of a Java object
from the class InstEvent declared in Beam as follows.
@DefaultCoder(AvroCoder.class)
static class InstEvent {
    @Nullable String instType;
    @Nullable String location;
    @Nullable Double reading;
    public InstEvent( .... )
    public String getInstType() { ... }
    public String getLocation() { ... }
    public Double getReading() { ... }
}
This class definition illustrates what a custom serializable type looks like in
Beam. We can now create our stream from our fictitious pub-sub system with the
following lines.
PCollection<InstEvent> input =
    pipeline.apply(PubsubIO.Read
        .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
        .subscription(options.getPubsubSubscription()));
We next filter out all but the “tempsensor” events. While we are at it, let’s
convert the stream so that the output is a stream of key-value pairs corresponding
to (location, reading). To do that, we need a special function to feed to the ParDo
operator, as follows.
static class FilterAndConvert extends DoFn<InstEvent,
                                           KV<String, Double>> {
    @Override
    public void processElement(ProcessContext c) {
        InstEvent ev = c.element();
        if (ev.getInstType().equals("tempsensor"))
            c.output(KV.of(ev.getLocation(), ev.getReading()));
    }
}
Now we can apply a FilterAndConvert operator to our input stream. We
also create a sliding window of events of duration five minutes, created every two
minutes. Note that the window is measured in terms of the timestamps on the
events, rather than the processing time.
PCollection<KV<String, Double>> reslt = input
    .apply(ParDo.of(new FilterAndConvert()))
    .apply(Window.<KV<String, Double>>into(SlidingWindows
        .of(Duration.standardMinutes(5))
        .every(Duration.standardMinutes(2))));
Our stream reslt is now a KV<String, Double> type, and we can apply a
GroupByKey and Combine operation to reduce this to a KV<String, Double>,
with each location key mapping to the average temperature. Beam provides
several variations of this simple MapReduce operation. One is perfect for this case:
Mean.perKey(), which combines both steps in a single transformation.
PCollection<KV<String, Double>> avetemps =
    reslt.apply(Mean.<String, Double>perKey());
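Conceptually, Mean.perKey() is just a GroupByKey followed by a per-key average. A plain-Python model of that combined behavior (an illustration only, not Beam code):

```python
def mean_per_key(pcollection):
    """Model of a per-key mean: accumulate a (sum, count) pair for each
    key, then divide - the group-and-combine steps fused together."""
    sums = {}
    for k, v in pcollection:
        total, count = sums.get(k, (0.0, 0))
        sums[k] = (total + v, count + 1)
    return {k: total / count for k, (total, count) in sums.items()}
```

Fusing the two steps matters in a distributed setting: partial (sum, count) pairs can be combined on each worker before the shuffle, so far less data crosses the network than with a naive group-then-average.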
We can now take the set of average temperatures for each window and send
them to an output file.
PCollection<String> outstrings = avetemps
    .apply(ParDo.of(new KVToString()))
    .apply(TextIO.Write.named("WritingToText")
        .to("/my/path/to/temps")
        .withSuffix(".txt"));
We must define the function class KVToString() in a manner similar to the
FilterAndConvert class above. We call attention to two points. First, we have used
an implicit trigger that generates the means and output at the end of the window.
Second, because the windows overlap, events end up in more than one window.
Beam has several other types of triggers. For example, you can have a data-driven
trigger that looks at the data as it is coming and fires when some condition
you have set is met. Another type is based on a concept introduced by Google
Dataflow called the watermark. A watermark is based on event time and is used
to emit results when the system estimates that it has seen all the data in a given
window. You can use a variety of methods to define triggers, based on different
ways of specifying the watermark. We refer you to the Google Dataflow documents
for details cloud.google.com/dataflow/ .
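The watermark idea can be illustrated with a small Python model, a toy construction of our own rather than the Beam API: buffer events into fixed event-time windows and emit a window only once the watermark has passed its end.

```python
from collections import defaultdict

class WatermarkTrigger:
    """Toy event-time trigger. Events carry their own timestamps and
    may arrive out of order; a window fires only when the watermark
    (our estimate that all earlier data has arrived) passes its end."""

    def __init__(self, width):
        self.width = width                 # window width in event time
        self.windows = defaultdict(list)   # window index -> buffered values

    def on_event(self, event_time, value):
        # Assign by event time, not arrival time.
        self.windows[event_time // self.width].append(value)

    def advance_watermark(self, watermark):
        # Fire every buffered window whose end the watermark has passed.
        fired = {}
        for w in sorted(self.windows):
            if (w + 1) * self.width <= watermark:
                fired[w] = self.windows.pop(w)
        return fired
```

Because assignment uses event time, an event that arrives late but before the watermark still lands in its correct window, which is exactly the out-of-order case the watermark mechanism is designed to handle.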
9.8 Apache Flink
Finally we describe the open source Apache Flink stream processing framework.
Flink can be used in its own right; in addition, a component called the Apache
Flink Runner can be used to execute Beam pipelines. (Many of the same core
concepts exist in Flink and Beam.)

As with the other systems described in this chapter, Flink takes input streams
from one or more sources, which it connects by a directed graph to a set of
sinks, and runs on the Java virtual machine. It supports both Java and Scala
APIs. (There is also an incomplete Flink Python API, with similarities to Spark
Streaming.) To illustrate the use of this API, we show a Flink implementation of
our instrument filter from the Beam example. The Flink Kinesis Producer is still
a work in progress, so we tested this code by reading a stream from a CSV file.
Since the Flink data types do not include the Python dictionary/JSON types, we
use here a simple tuple format. Each line in the input stream has this structure:
instrument-type string, location string, the word "value", float
For example:
tempsensor, pine street and second, value, 72.3
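In Python, parsing one such line into the four-tuple used below might look like this (a sketch, which assumes the fields themselves contain no commas):

```python
def parse_record(line):
    """Parse one CSV line of the form
    'instrument-type, location, value, float' into a four-tuple,
    converting the final field to a float."""
    inst, loc, label, value = [field.strip() for field in line.split(',')]
    return (inst, loc, label, float(value))
```

This mirrors the (STRING, STRING, STRING, FLOAT) record type that the Flink pipeline below consumes.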
After reading from the file (or Kinesis shard), the records in the stream data
are now four-tuples of type (STRING, STRING, STRING, FLOAT). The core of the
Flink version of the temperature sensor averager is as follows.
class MeanReducer(ReduceFunction):
    def reduce(self, x, y):
        return (x[0], x[1], x[2], x[3] + y[3], x[4] + y[4])

env = get_environment()
data = env.add_source(FlinkKinesisProducer( ? ) ? )

results = data \
    .filter(lambda x: x[0] == 'tempsensor') \
    .map(lambda x: (x[0], x[1], x[2], x[3], 1.0)) \
    .group_by(1) \
    .reduce(MeanReducer()) \
    .map(lambda x: 'location: ' + x[1] + ' average temp %f' % (x[3] / x[4]))
The filter operation is identical to the Spark Streaming case. After filtering
the data, we turn each record into a five-tuple by appending 1.0 to the end of the
four-tuple. The group_by and reduce calls use the MeanReducer function. The
group_by is a signal to shuffle these so that they are keyed by the field in position 1,
which corresponds to the location string. We then apply the reduction to each of
the grouped tuple sets. This operation is the same as the reduceByKey function
in the Spark Streaming example. The final map converts each element to a string
that gives the average temperature for each location.
Not shown in this example are Flink’s windowing operators (similar to Beam’s)
and its underlying execution architecture. In a manner similar to the other systems
described here, Flink parallelizes the stream and tasks during execution. For
example, we can view our temperature sensor example as a set of tasks that may
be executed in parallel, as shown in figure 9.10.

The Flink distributed execution engine is based on a standard master-worker
model. A Flink source program is compiled into an execution dataflow graph
and sent to a job manager node by a client system. The job manager executes
the stream and transformations on remote Java virtual machines that run a task
manager. The task manager partitions its available resources into task slots, to
which the individual tasks defined by the graph execution nodes are assigned. The
job manager and task managers manage the data communication streams between
the graph nodes. The Apache Flink documentation [2] explains this execution
model, Flink windowing, and other details of the programming model.
Figure 9.10: Flink logical task view and parallel execution view.
9.9 Summary
We have examined and illustrated the use of five different systems: Spark Streaming
with Kinesis, Azure Stream Analytics, Storm/Heron, Google Dataflow/Beam, and
Flink. Each has been used in critical production deployments and proven successful
for its intended applications. All share some of the same concepts and create
pipelines in similar ways. There are also differences: for example, Storm/Heron
explicitly constructs graphs from nodes and edges, whereas the others use a
functional style of pipeline composition.
Conceptually the greatest difference arises when comparing Spark Streaming
with the others and, in particular, with Beam. Akidau and Perry make a compelling
argument [59] for the superiority of the Beam model over Spark Streaming.
They point out that Spark is a batch system to which a streaming mode has
been attached, whereas Beam was designed from the ground up to be streaming
with obvious batch capabilities. Spark windowing is based on the RDD in the
DStream, which is clearly not as flexible as Beam windows. A more significant
point concerns Beam’s recognition that event time and processing time are not the
same. This difference becomes critical in dealing with out-of-order events, which
clearly can arise in widely distributed situations. Beam’s introduction of event-time
windows, triggers, and watermarks is a major contribution that clarifies important
correctness issues when events are out of order, while still allowing the generation
of approximate results in a timely manner.
The open source streaming tools that we have described here do not cover all
of the scientific cases that we mentioned early in the chapter. For example, the
ADIOS application requires the processing of events that are each of a size well
beyond the limits of Spark Streaming, Storm/Heron, and Beam. In other scientific
applications, the volume and rate of data generation are so large that we cannot
keep data at rest for long. For example, the data to be produced by the Square
Kilometer Array will be far too large to contemplate keeping [211]; therefore, the
data will have to be processed immediately to produce a reduced stream. Another
important aspect of large-scale streaming scientific data analysis is computational
steering: the need for a human or smart processes to analyze the data stream for
quality or relevance and then make rapid adjustments to the source instruments or
simulations. Such requirements place further demands on streaming systems.
Science does not have the deep pockets of Google or Amazon when it comes
to IT resources. Budgets are dominated by massive experimental facilities and
supercomputers, and projects tend to produce custom designed solutions. And
each experimental domain is sufficiently unique that few common tools beyond
MPI exist. This is a world of bespoke software systems.

One can argue that the Twitter, Google, and Apache projects discussed here
are also custom built for the problems that each is designed to solve. But each
project has many project committers, often supported by companies to ensure
that the software works for different problems. This situation does not mean that
the cloud and open source tools are of no value to the world of “big science.” The
Google and Amazon clouds have both been used to process data from the Large
Hadron Collider. We expect many of the cloud streaming technologies described
here to eventually be adapted to scientific solutions. Likewise, we expect tools
developed in scientific laboratories to migrate to the cloud.
9.10 Resources
There are excellent online tutorials on Amazon Kinesis aws.amazon.com/kinesis
and Azure Stream Analytics docs.microsoft.com/en-us/azure/stream-analytics.
Amazon’s examples include demonstrations of log analysis, mobile data capture,
and game data feeds. The Azure demos include examples of how to do sentiment
analysis of Twitter streams as well as fraud analysis. Google Dataflow streaming
also has good examples [23], including Wikipedia session analysis, traffic routes,
and maximum flows from traffic sensors.