The first step is to deploy an AWS EMR cluster with the Spark option. Then modify the port settings in the cluster's security group so that port 8192 is exposed, and make sure your SSH key pair is set correctly. When the cluster comes up, log in to the master with

$ ssh -i /path-to-your-keyfile/keyfile.pem hadoop@ipaddress-of-master

Next do the following:

$ sudo pip install ipython[notebook]
$ ipython profile create default
$ echo "c = get_config()" > /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$ echo "c.NotebookApp.ip = '*'" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$ echo "c.NotebookApp.open_browser = False" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$ echo "c.NotebookApp.port = 8192" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$ export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter
$ export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
$ export MASTER=yarn

Next create a file called runjup.sh with one line:

PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter /usr/lib/spark/bin/pyspark --master yarn

(There may be some redundancy here, but this works.) Finally execute

$ nohup ./runjup.sh &

The examples here are based on Wikipedia. If you want to play with the full Wikipedia dump, do

$ hadoop fs -mkdir /user/wiki
$ curl -s -L http://dumps.wikimedia.org/enwiki/20161020/enwiki-20161020-pages-articles-multistream.xml.bz2 \
    | bzip2 -cd | hadoop fs -put - /user/wiki/wikidump-en.xml
The first thing we will do is look at a small sample of Wikipedia access logs from 2008 to 2010.
import numpy as np
import pyspark
The spark context should already be there.
# what one does on the containerized spark-jupyter is
#sc = pyspark.SparkContext('local[*]')
#here it is
sc
txtfile = sc.textFile("s3://support.elasticmapreduce/bigdatademo/sample/wiki")
Let's make sure we have at least 10 partitions, and split each text line into a list on blank-space separators.
txtfile = txtfile.repartition(10)
def parseline(line):
    return np.array([x for x in line.split(' ')])
data = txtfile.map(parseline)
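A quick local check of what parseline produces. The field layout (project, page title, hit count, bytes) is inferred from the sample data, so treat it as an assumption; a plain list is used here in place of the notebook's NumPy array, since the indexing behaves the same.

```python
# Plain-Python stand-in for parseline (the notebook wraps the result in
# np.array, but row[1] and row[2] index identically either way).
def parseline(line):
    return line.split(' ')

# A made-up line in the assumed log format: project, title, hits, bytes.
row = parseline('en Barack_Obama 997 123091')
print(row[1], row[2])  # Barack_Obama 997
```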
We are next going to look for the page references that mention famous folks and see how many hits there are.
def filter_fun(row, titles):
    for title in titles:
        if row[1].find(title) > -1:
            return True
    return False
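A quick local check of the filter logic. Note that the `return False` must come after the loop; an `else: return False` inside the loop would reject any row whose first candidate name didn't match. The sample rows below are made up for illustration.

```python
def filter_fun(row, titles):
    # True if any title appears as a substring of the page name (row[1]).
    for title in titles:
        if row[1].find(title) > -1:
            return True
    return False

sample = ['en', 'Lady_Gaga_discography', '42', '9000']
print(filter_fun(sample, ['Albert_Einstein', 'Lady_Gaga']))  # True
print(filter_fun(sample, ['Elon_Musk']))                     # False
```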
namelist = ['Albert_Einstein', 'Lady_Gaga', 'Barack_Obama', 'Richard_Nixon', 'Steve_Jobs',
            'Bill_Clinton', 'Bill_Gates', 'Michael_Jackson', 'Justin_Bieber', 'Dante_Alighieri',
            'Shakespeare', 'Byron', 'Donald_Trump', 'Hillary_Clinton', 'Werner_Heisenberg',
            'Arnold_Schwarzenegger', 'Elon_Musk', 'Nicolas_Sarkozy', 'Vladimir_Putin',
            'Vladimir_Lenin', 'Karl_Marx', 'Groucho_Marx']
filterd = data.filter(lambda p: filter_fun(p, namelist))
def mapname(row, names):
    for name in names:
        if row[1].find(name) > -1:
            return name
    return 'huh?'
The RDD filterd has only the tuples whose page title contains one of the names in namelist.
We next map that to tuples of the form (name, reference count),
then reduce by the key and sum the counts.
remaped = filterd.map(lambda row: (mapname(row, namelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
remaped.takeOrdered(100, key = lambda x: -x[1])
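The same map, reduceByKey, and takeOrdered pipeline can be sketched locally with a plain dict, which makes the aggregation easy to see. The rows below are made up for illustration.

```python
# Local stand-in for the Spark pipeline: map each row to (name, count),
# sum counts per name, then sort descending (what takeOrdered does with
# key=lambda x: -x[1]).
rows = [
    ['en', 'Barack_Obama', '120', '0'],
    ['en', 'Barack_Obama_Sr.', '15', '0'],   # also matches 'Barack_Obama'
    ['en', 'Lady_Gaga', '99', '0'],
]
namelist = ['Barack_Obama', 'Lady_Gaga']

def mapname(row, names):
    for name in names:
        if row[1].find(name) > -1:
            return name
    return 'huh?'

counts = {}
for row in rows:
    key = mapname(row, namelist)
    counts[key] = counts.get(key, 0) + int(row[2])

top = sorted(counts.items(), key=lambda x: -x[1])
print(top)  # [('Barack_Obama', 135), ('Lady_Gaga', 99)]
```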
What happens if we look for just part of the name?
snamelist = ['Einstein', 'Gaga', 'Obama', 'Nixon', 'Jobs', 'Clinton', 'Gates', 'Jackson',
             'Bieber', 'Dante', 'Shakespeare', 'Byron', 'Trump', 'Heisenberg',
             'Schwarzenegger', 'Musk', 'Sarkozy', 'Putin', 'Lenin', 'Marx']
sfilterd = data.filter(lambda p: filter_fun(p, snamelist))
sremaped = sfilterd.map(lambda row: (mapname(row, snamelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
sremaped.takeOrdered(100, key = lambda x: -x[1])
Let's see how many page references there are here. We can drop pages with unreadable (percent-encoded) titles.
bdata = data.map(lambda row: (row[1], int(row[2])))
cdata = bdata.filter(lambda p: p[0].find('%') < 0)
cdata.take(100)
cdata.count()
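The '%' test above drops titles that contain percent-encoded bytes (non-Latin titles are URL-encoded in these logs, so they show up as runs of %XX escapes). A local sketch with made-up pairs:

```python
# Keep only (title, count) pairs whose title has no '%' escape in it.
pairs = [('Barack_Obama', 120), ('%E3%82%A2', 3), ('Lady_Gaga', 99)]
clean = [p for p in pairs if p[0].find('%') < 0]
print(clean)  # [('Barack_Obama', 120), ('Lady_Gaga', 99)]
```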
The dump is one big XML file, which Spark reads as one record per line.
We can extract the title of each article.
wikidump = sc.textFile("hdfs:///user/wiki/wikidump-en.xml")
wikidump.count()
wikidump.getNumPartitions()
def findtitle(line):
    if line.find('<title>') > -1:
        return True
    else:
        return False
titles = wikidump.filter(lambda p: findtitle(p))
titles.count()
titles.cache()
titles.take(200)
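The filter keeps whole raw lines such as `    <title>Albert Einstein</title>`; to get just the title text you can strip the tags, for example with a regular expression (the helper name here is mine, not from the notebook):

```python
import re

def extract_title(line):
    # Pull the text between <title> and </title>; None if the tags are absent.
    m = re.search(r'<title>(.*?)</title>', line)
    return m.group(1) if m else None

print(extract_title('    <title>Albert Einstein</title>'))  # Albert Einstein
```

In the notebook this would run as `titles.map(extract_title)`.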
To get more from the Wikipedia XML dump you need to group the lines belonging to each page and then parse the XML into something Spark can process.
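One way to do the grouping is a generator that buffers lines between `<page>` and `</page>` and parses each completed block with the standard-library ElementTree. This is a sketch under assumptions (the helper name and the idea of running it per partition are mine, not the notebook's); in Spark it would typically run inside `mapPartitions` so each partition yields complete page records.

```python
import xml.etree.ElementTree as ET

def pages_from_lines(lines):
    # Hypothetical helper: accumulate the lines of one <page>...</page>
    # block, then parse the block into an Element.
    buf = []
    for line in lines:
        if '<page>' in line:
            buf = []
        buf.append(line)
        if '</page>' in line:
            yield ET.fromstring('\n'.join(buf))

# Tiny made-up fragment in the shape of the dump:
sample = [
    '<page>',
    '  <title>Albert Einstein</title>',
    '  <id>736</id>',
    '</page>',
]
for page in pages_from_lines(sample):
    print(page.find('title').text)  # Albert Einstein
```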