In [1]:
# SparkContext injected by the PySpark kernel; echo it to confirm the session is live.
sc
Out[1]:
<pyspark.context.SparkContext at 0x100478710>
In [18]:
from pyspark.sql.types import *
from IPython.core.magic import register_line_cell_magic
In [19]:
# Configuration parameters (tune here; read by the %sql_* magics below)
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True     # Passed to DataFrame.explain(): True requests the extended plan
In [44]:
@register_line_cell_magic
def sql(line, cell=None):
    """Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql.

    The query text comes from the cell body when used as a cell magic
    (%%sql), otherwise from the rest of the line (%sql).
    Nothing is executed until an action (show/collect/...) is called.
    """
    val = cell if cell is not None else line
    # Removed leftover debug `print val`: it was Python-2-only syntax and
    # contradicted the lazy-evaluation contract by emitting output.
    return sqlContext.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    """Run the given SQL and print up to max_show_lines rows.

    Use: %sql_show <query> on one line, or %%sql_show as a cell magic.
    """
    query = line if cell is None else cell
    result = sqlContext.sql(query)
    return result.show(max_show_lines)

@register_line_cell_magic
def sql_display(line, cell=None):
    """Run the given SQL and return up to max_show_lines rows as a pandas
    DataFrame for pretty display or further processing.

    Use: %sql_display or %%sql_display.
    """
    query = line if cell is None else cell
    limited = sqlContext.sql(query).limit(max_show_lines)
    return limited.toPandas()

@register_line_cell_magic
def sql_explain(line, cell=None):
    """Display the query execution plan (extended when detailed_explain is
    True). Use: %sql_explain or %%sql_explain.
    """
    query = line if cell is None else cell
    return sqlContext.sql(query).explain(detailed_explain)
In [21]:
# NOTE(review): `sqlCtx` is created here but never used — every later cell
# uses the shell-provided `sqlContext` instead. Also, SQLContext is not
# imported in this notebook; presumably it comes from the PySpark shell
# namespace — confirm, or this cell fails on a fresh kernel.
sqlCtx = SQLContext(sc)
In [23]:
import csv
with open("/Users/dennisgannon/Desktop/hvac.csv", 'rb') as csvfile:
    spamreader = csv.reader(csvfile) #, delimiter=',', quotechar='|')
    for row in spamreader:
        print ', '.join(row)
3/23/2016, 11:45, 67, 54, headquarters
3/23/2016, 11:51, 67, 77, lab1
3/23/2016, 11:20, 67, 33, coldroom
In [24]:
# Load the raw CSV as an RDD of text lines (one element per line).
hvacText = sc.textFile("/Users/dennisgannon/Desktop/hvac.csv")
In [25]:
# Explicit schema for the HVAC readings; every field is non-nullable (False).
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False),
])
In [26]:
hvacSchema
Out[26]:
StructType(List(StructField(date,StringType,false),StructField(time,StringType,false),StructField(targettemp,IntegerType,false),StructField(actualtemp,IntegerType,false),StructField(buildingID,StringType,false)))
In [27]:
def _parse_hvac_fields(fields):
    # fields: [date, time, targettemp, actualtemp, buildingID] as raw strings.
    return (str(fields[0]), str(fields[1]), int(fields[2]), int(fields[3]), str(fields[4]))

hvac = (hvacText
        .map(lambda rec: rec.split(","))
        .filter(lambda fields: fields[0] != "Date")   # skip a header row if one is present
        .map(_parse_hvac_fields))
In [28]:
# Materialize the parsed RDD to verify the tuple conversion (tiny data, safe to collect).
hvac.collect()
Out[28]:
[('3/23/2016', '11:45', 67, 54, 'headquarters'),
 ('3/23/2016', '11:51', 67, 77, 'lab1'),
 ('3/23/2016', '11:20', 67, 33, 'coldroom')]
In [29]:
# Build a DataFrame from the parsed tuples using the explicit schema defined above.
hvacdf = sqlContext.createDataFrame(hvac,hvacSchema)
In [30]:
 hvacdf.registerTempTable("hvac")
In [31]:
# Echo the DataFrame to confirm the declared column names and types.
hvacdf
Out[31]:
DataFrame[date: string, time: string, targettemp: int, actualtemp: int, buildingID: string]
In [47]:
%%sql_show
SELECT buildingID , (targettemp - actualtemp) AS temp_diff, date FROM hvac WHERE date = "3/23/2016"
+------------+---------+---------+
|  buildingID|temp_diff|     date|
+------------+---------+---------+
|headquarters|       13|3/23/2016|
|        lab1|      -10|3/23/2016|
|    coldroom|       34|3/23/2016|
+------------+---------+---------+

In [33]:
# Lazily define the temperature-difference query; nothing runs until an action is called.
x = sqlContext.sql(' SELECT buildingID , (targettemp - actualtemp) AS temp_diff, date FROM hvac WHERE date = "3/23/2016" ')
In [34]:
#x = sqlContext.sql(' SELECT buildingID FROM hvac ')
In [35]:
# Execute the query and pull all result rows back to the driver as Row objects.
x.collect()
Out[35]:
[Row(buildingID=u'headquarters', temp_diff=13, date=u'3/23/2016'),
 Row(buildingID=u'lab1', temp_diff=-10, date=u'3/23/2016'),
 Row(buildingID=u'coldroom', temp_diff=34, date=u'3/23/2016')]
In [49]:
# Register the query result so it can itself be queried as table "tempdiffs".
x.registerTempTable("tempdiffs")
In [53]:
%%sql_show
SELECT buildingID, temp_diff FROM tempdiffs WHERE temp_diff < 0
+----------+---------+
|buildingID|temp_diff|
+----------+---------+
|      lab1|      -10|
+----------+---------+

In [25]:
 
In [ ]: