Spark on AWS Elastic Map Reduce.

The first step to using this is to deploy an aws emr cluster using the spark option.

Then modify the the port setting in the security profile so that port 8192 is exposed and your ssh key pair is set correctlly. when it comes up login to to the master with

$ssh -i /path-to-your-keyfile/keyfile.pem  hadoop@ipaddress-of-master
next do the following

$sudo pip install ipython[notebook]

$ipython profile create default

$echo "c = get_config()" > /home/hadoop/.ipython/profile_default/ipython_notebook_config.py $echo "c.NotebookApp.ip = '*'" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py $echo "c.NotebookApp.open_browser = False" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py $echo "c.NotebookApp.port = 8192" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py $export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter $export PYSPARK_DRIVER_PYTHON_OPTS="notebook" $export MASTER=yarn </pre> Next create a file called runjup.sh with one line

PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter /usr/lib/spark/bin/pyspark --master yarn
(there may be some redundancy here, but this works) finally execute

$nohup ./runjup.sh &
The examples here are based on Wikipedia. if you want to play with the full wikipedia dump do

$hdadoop fs -mkdir /user/wiki

$curl -s -L http://dumps.wikimedia.org/enwiki/20161020/enwiki-20161020-pages-articles-multistream.xml.bz2 \ | bzip2 -cd | hadoop fs -put - /user/wiki/wikidump-en.xml <\pre>

The first thing we will do is look at a sample of wikipedia access logs from 2008 to 2010. It is a small sample.

In [6]:
import numpy as np
import pyspark

The spark context should already be there.

In [21]:
# what one does on the containerized spark-jupyter is
#sc = pyspark.SparkContext('local[*]')
#here it is
sc
Out[21]:
<pyspark.context.SparkContext at 0x7f1453442850>
In [22]:
txtfile = sc.textFile("s3://support.elasticmapreduce/bigdatademo/sample/wiki")

Lets make suser we have at least 10 partitions. And split the text lines into lists along blank space separators.

In [11]:
txtfile = txtfile.repartition(10)
In [23]:
def parseline(line):
    return np.array([x for x in line.split(' ')])
In [24]:
data = txtfile.map(parseline)

we are next going to look for the page references that mention famous folks and see how may hits there are.

In [25]:
def filter_fun(row, titles):
    for title in titles:
        if row[1].find(title) > -1:
            return True
    else:
        return False
In [26]:
namelist = ['Albert_Einstein','Lady_Gaga','Barack_Obama','Richard_Nixon','Steve_Jobs', 'Bill_Clinton', 'Bill_Gates', 'Michael_Jackson', 
            'Justin_Bieber','Dante_Alighieri' 'Shakespeare', 'Byron', 'Donald_Trump', 'Hillary_Clinton', 'Werner_Heisenberg',
            'Arnold_Schwarzenegger', 'Elon_Musk', 'Nicolas_Sarkozy', 'Vladimir_Putin', 'Vladimir_Lenin', 'Karl_Marx',
            'Groucho_Marx']
In [27]:
filterd = data.filter(lambda p: filter_fun(p, namelist))
In [28]:
def mapname(row, names):
    for name in names:
        if row[1].find(name) > -1:
            return name
    else:
        return 'huh?'

The RDD filtered has only the tuples that contain one of our names in namelist.

we next map that to tuples of the form (name, reference count)

We then reduce that by the key and sum the counts.

In [30]:
remaped = filterd.map(lambda row: (mapname(row, namelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
In [13]:
remaped.takeOrdered(100, key  = lambda x: -x[1])
Out[13]:
[('Lady_Gaga', 4427),
 ('Bill_Clinton', 4221),
 ('Michael_Jackson', 3310),
 ('Barack_Obama', 2518),
 ('Justin_Bieber', 2234),
 ('Albert_Einstein', 1609),
 ('Byron', 964),
 ('Karl_Marx', 892),
 ('Arnold_Schwarzenegger', 820),
 ('Bill_Gates', 799),
 ('Steve_Jobs', 613),
 ('Vladimir_Putin', 563),
 ('Richard_Nixon', 509),
 ('Vladimir_Lenin', 283),
 ('Donald_Trump', 272),
 ('Nicolas_Sarkozy', 171),
 ('Hillary_Clinton', 162),
 ('Groucho_Marx', 152),
 ('Werner_Heisenberg', 92),
 ('Elon_Musk', 21)]
In [ ]:
What happens if we look for just part of the name?    
In [14]:
snamelist = ['Einstein','Gaga','Obama','Nixon','Jobs', 'Clinton', 'Gates', 'Jackson', 
            'Bieber','Dante', 'Shakespeare', 'Byron', 'Trump', 'Heisenberg',
            'Schwarzenegger', 'Musk', 'Sarkozy', 'Putin', 'Lenin', 'Marx',
            'Marx']
In [102]:
sfilterd = data.filter(lambda p: filter_fun(p, snamelist))
sremaped = sfilterd.map(lambda row: (mapname(row, snamelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
sremaped.takeOrdered(100, key  = lambda x: -x[1])
Out[102]:
[('Jackson', 17056),
 ('Dante', 7289),
 ('Clinton', 6923),
 ('Gaga', 5527),
 ('Obama', 3563),
 ('Marx', 2548),
 ('Einstein', 2291),
 ('Bieber', 2284),
 ('Gates', 2124),
 ('Shakespeare', 1867),
 ('Nixon', 1344),
 ('Lenin', 1343),
 ('Trump', 984),
 ('Byron', 964),
 ('Schwarzenegger', 906),
 ('Musk', 861),
 ('Jobs', 830),
 ('Putin', 723),
 ('Sarkozy', 297),
 ('Heisenberg', 149)]

let's see how many page references there are here. we can delete pages with non readable titles.

In [27]:
bdata = data.map(lambda row: (row[1], int(row[2])))
In [28]:
cdata = bdata.filter(lambda p:  p[0].find('%') < 0)
In [29]:
cdata.take(100)
Out[29]:
[(u'Special:WhatLinksHere/MediaWiki:Group-Ombudsmen', 1),
 (u'MediaWiki:Ipbexpiry', 1),
 (u'Wikipedia:Community_Portal', 1),
 (u'Antartika', 2),
 (u'Baku', 2),
 (u'Berkas:Coat_of_Amrs_of_Bashkortostan.svg', 1),
 (u'Berkas:Destroyed_Warsaw,_capital_of_Poland,_January_1945.jpg', 1),
 (u'Berkas:Flag_of_Omsk_Oblast.svg', 1),
 (u'Berkas:Mongolia_Topography.png', 1),
 (u'Berkas:Southeast_asia.svg', 1),
 (u'Berkas:Waterfallweh.jpg', 1),
 (u'Busan', 1),
 (u'Dublin', 1),
 (u'Haeju', 2),
 (u'Hyesan', 1),
 (u'Istimewa:Penggunaan_global/Crystal_Clear_action_run.png', 1),
 (u'Istimewa:Pranala_balik/Berkas:Flag_of_Smolensk_Oblast.png', 1),
 (u'Istimewa:Pranala_balik/Berkas:User_Abigor_global1.jpg', 1),
 (u'Jeollanam-do', 1),
 (u'Kategori:Neugara', 1),
 (u'Lithuania', 1),
 (u'P', 1),
 (u'Sipak_Bhan', 1),
 (u'Tokyo', 1),
 (u'islamabad', 1),
 (u'Gebruiker:Az1568', 1),
 (u'MediaWiki:Import-logentry-upload', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/MediaWiki:Editgroup', 1),
 (u'Spesiaal:Recentchangeslinked/MediaWiki:Exif-componentsconfiguration-3', 1),
 (u'Spesiaal:Recentchangeslinked/Wikibooks:Community_Portal', 1),
 (u'Spesiaal:Skakels_hierheen/MediaWiki:Nosuchactiontext', 1),
 (u'Spesiaal:WhatLinksHere/MediaWiki:Wikititlesuffix', 1),
 (u'user:Renier_Maritz', 1),
 (u'15_Junie', 1),
 (u'Cathy', 1),
 (u'Etruskers', 1),
 (u'Gebruiker:Laurens', 1),
 (u'Gebruikerbespreking:Spacebirdy', 1),
 (u'Hulp:geslag', 1),
 (u'Jamani', 1),
 (u'Kategorie:Gebruiker_lb-2', 1),
 (u'Kategorie:Woorde_in_Galicies', 1),
 (u'Kategorie:Woorde_in_Roemeens', 1),
 (u'November', 1),
 (u'Sjabloon:aie', 1),
 (u'Spesiaal:GlobalUsage/Wikiquote-logo.svg', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/AKE', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Afrikan', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Bulgaars', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/English', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Gebruiker:Szajci', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Japans', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Kategorie:Sjinese_skryftekens', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Kategorie:Woorde_in_Ou_Hoogduits',
  1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Libro_de_Mormon', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Nieu-Noors', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Ou_Hoogduits', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Sjabloon:-gl-', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Sjabloon:betekenisse', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/Stiftung', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/WikiWoordenboek', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/africain', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/balo', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/dalur', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/eau', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/ewe', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/garoto', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/interlingua', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/jade', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/land', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/meisje', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/nafn', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/olho', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/papiamento', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/sech', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/xoves', 1),
 (u'Spesiaal:RecentChangesLinked/AHI', 1),
 (u'Spesiaal:RecentChangesLinked/Afrikanca', 1),
 (u'Spesiaal:RecentChangesLinked/Danicus', 1),
 (u'Spesiaal:RecentChangesLinked/Gebruiker:Emijrp', 1),
 (u'Spesiaal:RecentChangesLinked/Grieks', 1),
 (u'Spesiaal:RecentChangesLinked/Kategorie:Robotte', 1),
 (u'Spesiaal:RecentChangesLinked/Kiafrikana', 1),
 (u'Spesiaal:RecentChangesLinked/Ou_Fries', 1),
 (u'Spesiaal:RecentChangesLinked/Sjabloon:-ur-', 1),
 (u'Spesiaal:RecentChangesLinked/Spaans', 1),
 (u'Spesiaal:RecentChangesLinked/adresse', 1),
 (u'Spesiaal:RecentChangesLinked/aqua', 1),
 (u'Spesiaal:RecentChangesLinked/basis', 1),
 (u'Spesiaal:RecentChangesLinked/cras', 1),
 (u'Spesiaal:RecentChangesLinked/eiginkona', 1),
 (u'Spesiaal:RecentChangesLinked/francese', 1),
 (u'Spesiaal:RecentChangesLinked/intransitive', 1),
 (u'Spesiaal:RecentChangesLinked/kameelperd', 1),
 (u'Spesiaal:RecentChangesLinked/liber', 1),
 (u'Spesiaal:RecentChangesLinked/nom', 1),
 (u'Spesiaal:RecentChangesLinked/septembre', 1),
 (u'Spesiaal:RecentChangesLinked/substantiv', 1),
 (u'Spesiaal:RecentChangesLinked/tylko', 1),
 (u'Spesiaal:RecentChangesLinked/yes', 1)]
In [48]:
cdata.count()
Out[48]:
3240116

Now look at full wikipedia dump from HDFS

these are xml file with one line per file line.
we can extract the titles of each of the listings

In [33]:
wikidump = sc.textFile("hdfs:///user/wiki/wikidump-en.xml")
In [38]:
wikidump.count()
Out[38]:
927769981
In [40]:
wikidump.getNumPartitions()
Out[40]:
441
In [42]:
def findtitle(line):
    if line.find('<title>') > -1:
        return True
    else:
        return False
In [43]:
titles = wikidump.filter(lambda p: findtitle(p))
In [45]:
titles.count()
Out[45]:
17008269
In [46]:
titles.cache()
Out[46]:
PythonRDD[29] at RDD at PythonRDD.scala:48
In [47]:
titles.take(200)
Out[47]:
[u'    <title>AccessibleComputing</title>',
 u'    <title>Anarchism</title>',
 u'    <title>AfghanistanHistory</title>',
 u'    <title>AfghanistanGeography</title>',
 u'    <title>AfghanistanPeople</title>',
 u'    <title>AfghanistanCommunications</title>',
 u'    <title>AfghanistanTransportations</title>',
 u'    <title>AfghanistanMilitary</title>',
 u'    <title>AfghanistanTransnationalIssues</title>',
 u'    <title>AssistiveTechnology</title>',
 u'    <title>AmoeboidTaxa</title>',
 u'    <title>Autism</title>',
 u'    <title>AlbaniaHistory</title>',
 u'    <title>AlbaniaPeople</title>',
 u'    <title>AsWeMayThink</title>',
 u'    <title>AlbaniaGovernment</title>',
 u'    <title>AlbaniaEconomy</title>',
 u'    <title>Albedo</title>',
 u'    <title>AfroAsiaticLanguages</title>',
 u'    <title>ArtificalLanguages</title>',
 u'    <title>AbacuS</title>',
 u'    <title>AbalonE</title>',
 u'    <title>AbbadideS</title>',
 u'    <title>AbbesS</title>',
 u'    <title>AbbevilleFrance</title>',
 u'    <title>AbbeY</title>',
 u'    <title>AbboT</title>',
 u'    <title>Abbreviations</title>',
 u'    <title>AtlasShrugged</title>',
 u'    <title>ArtificialLanguages</title>',
 u'    <title>AtlasShruggedCharacters</title>',
 u'    <title>AtlasShruggedCompanies</title>',
 u'    <title>AyersMusicPublishingCompany</title>',
 u'    <title>AfricanAmericanPeople</title>',
 u'    <title>AdolfHitler</title>',
 u'    <title>AbeceDarians</title>',
 u'    <title>AbeL</title>',
 u'    <title>AbensbergGermany</title>',
 u'    <title>AberdeenSouthDakota</title>',
 u'    <title>ArthurKoestler</title>',
 u'    <title>AynRand</title>',
 u'    <title>AlexanderTheGreat</title>',
 u'    <title>AnchorageAlaska</title>',
 u'    <title>ArgumentForms</title>',
 u'    <title>ArgumentsForTheExistenceOfGod</title>',
 u'    <title>AnarchY</title>',
 u'    <title>AsciiArt</title>',
 u'    <title>AcademyAwards</title>',
 u'    <title>AcademyAwards/BestPicture</title>',
 u'    <title>AustriaLanguage</title>',
 u'    <title>AcademicElitism</title>',
 u'    <title>AxiomOfChoice</title>',
 u'    <title>AmericanFootball</title>',
 u'    <title>AnnaKournikova</title>',
 u'    <title>AndorrA</title>',
 u'    <title>AustroAsiaticLanguages</title>',
 u'    <title>ActresseS</title>',
 u'    <title>A</title>',
 u'    <title>AnarchoCapitalism</title>',
 u'    <title>AnarchoCapitalists</title>',
 u'    <title>ActressesS</title>',
 u'    <title>AnAmericanInParis</title>',
 u'    <title>AutoMorphism</title>',
 u'    <title>ActionFilm</title>',
 u'    <title>Alabama</title>',
 u'    <title>AfricA</title>',
 u'    <title>Achilles</title>',
 u'    <title>AppliedStatistics</title>',
 u'    <title>Abraham Lincoln</title>',
 u'    <title>Aristotle</title>',
 u'    <title>An American in Paris</title>',
 u'    <title>Academy Award for Best Production Design</title>',
 u'    <title>Academy Awards</title>',
 u'    <title>Action Film</title>',
 u'    <title>Actrius</title>',
 u'    <title>Animalia (book)</title>',
 u'    <title>International Atomic Time</title>',
 u'    <title>Altruism</title>',
 u'    <title>AutoRacing</title>',
 u'    <title>Ayn Rand</title>',
 u'    <title>Alain Connes</title>',
 u'    <title>Allan Dwan</title>',
 u'    <title>Algeria/People</title>',
 u'    <title>Algeria/Transnational Issues</title>',
 u'    <title>Algeria</title>',
 u'    <title>List of Atlas Shrugged characters</title>',
 u'    <title>Topics of note in Atlas Shrugged</title>',
 u'    <title>Anthropology</title>',
 u'    <title>Agricultural science</title>',
 u'    <title>Alchemy</title>',
 u'    <title>Air Transport</title>',
 u'    <title>Alien</title>',
 u'    <title>Astronomer</title>',
 u'    <title>Ameboid stage</title>',
 u'    <title>ASCII</title>',
 u'    <title>Ashmore And Cartier Islands</title>',
 u'    <title>Austin (disambiguation)</title>',
 u'    <title>Animation</title>',
 u'    <title>Apollo</title>',
 u'    <title>Andre Agassi</title>',
 u'    <title>Artificial languages</title>',
 u'    <title>Austroasiatic languages</title>',
 u'    <title>Afro-asiatic languages</title>',
 u'    <title>Afroasiatic languages</title>',
 u'    <title>Andorra</title>',
 u'    <title>Andorra/Transnational issues</title>',
 u'    <title>Arithmetic mean</title>',
 u'    <title>American Football Conference</title>',
 u'    <title>Albert Gore</title>',
 u'    <title>AnEnquiryConcerningHumanUnderstanding</title>',
 u'    <title>Animal Farm</title>',
 u'    <title>Amphibian</title>',
 u'    <title>Albert Arnold Gore/Criticisms</title>',
 u'    <title>Alaska</title>',
 u'    <title>Auteur Theory Film</title>',
 u'    <title>Agriculture</title>',
 u'    <title>Aldous Huxley</title>',
 u'    <title>Abstract Algebra</title>',
 u'    <title>Ada</title>',
 u'    <title>Aberdeen (disambiguation)</title>',
 u'    <title>Algae</title>',
 u'    <title>Analysis of variance</title>',
 u'    <title>ANOVA</title>',
 u'    <title>Alkane</title>',
 u'    <title>Appellate procedure in the United States</title>',
 u'    <title>Answer</title>',
 u'    <title>Appellate court</title>',
 u'    <title>Arithmetic and logic unit</title>',
 u'    <title>Actress</title>',
 u'    <title>Arraignment</title>',
 u'    <title>America the Beautiful</title>',
 u'    <title>Assistive technology</title>',
 u'    <title>Accessible computing</title>',
 u'    <title>Abacus</title>',
 u'    <title>Acid</title>',
 u'    <title>Asphalt</title>',
 u'    <title>American National Standards Institute</title>',
 u'    <title>Argument (disambiguation)</title>',
 u'    <title>Apollo 11</title>',
 u'    <title>Apollo 8</title>',
 u'    <title>Astronaut</title>',
 u'    <title>A Modest Proposal</title>',
 u'    <title>Alkali metal</title>',
 u'    <title>Argument form</title>',
 u'    <title>Allotrope</title>',
 u'    <title>Alphabet</title>',
 u'    <title>Atomic number</title>',
 u'    <title>Anatomy</title>',
 u'    <title>Affirming the consequent</title>',
 u'    <title>Andrei Tarkovsky</title>',
 u'    <title>Ambiguity</title>',
 u'    <title>Abel</title>',
 u'    <title>Animal (disambiguation)</title>',
 u'    <title>Aardvark</title>',
 u'    <title>Aardwolf</title>',
 u'    <title>Adobe</title>',
 u'    <title>Adventure</title>',
 u'    <title>Amaltheia</title>',
 u'    <title>Analysis of Variance</title>',
 u'    <title>Asia</title>',
 u'    <title>Aruba</title>',
 u'    <title>Articles of Confederation</title>',
 u'    <title>Archaeology/Broch</title>',
 u'    <title>Asia Minor (disambiguation)</title>',
 u'    <title>Aa River</title>',
 u'    <title>Atlantic Ocean</title>',
 u'    <title>Arthur Schopenhauer</title>',
 u'    <title>Angola</title>',
 u'    <title>Demographics of Angola</title>',
 u'    <title>Politics of Angola</title>',
 u'    <title>Economy of Angola</title>',
 u'    <title>Transport in Angola</title>',
 u'    <title>Angolan Armed Forces</title>',
 u'    <title>Foreign relations of Angola</title>',
 u'    <title>Albert Sidney Johnston</title>',
 u'    <title>Android (robot)</title>',
 u'    <title>Alberta</title>',
 u'    <title>Wikipedia:Adding Wikipedia articles to Nupedia</title>',
 u'    <title>Astronomy/History</title>',
 u'    <title>List of anthropologists</title>',
 u'    <title>Astronomy and Astrophysics/History</title>',
 u'    <title>Actinopterygii</title>',
 u'    <title>Al Gore/Criticisms</title>',
 u'    <title>Albert Einstein</title>',
 u'    <title>Afghanistan</title>',
 u'    <title>Albania</title>',
 u'    <title>Allah</title>',
 u'    <title>Algorithms (journal)</title>',
 u'    <title>Antigua And Barbuda</title>',
 u'    <title>Azerbaijan</title>',
 u'    <title>Amateur astronomy</title>',
 u'    <title>Astronomers and Astrophysicists</title>',
 u'    <title>Aikido</title>',
 u'    <title>Art</title>',
 u'    <title>Albania/History</title>',
 u'    <title>Albania/Transnational Issues</title>',
 u'    <title>Albania/People</title>',
 u'    <title>Albania/Foreign relations</title>',
 u'    <title>Agnostida</title>',
 u'    <title>Abortion</title>']

To get more from the wikipedia xml files you need to seperate the lines for each xml file and then parse the xml to something spark can process.

In [ ]: