sc
# From Luca Canali's blog: http://externaltable.blogspot.com/2016/11/ipythonjupyter-sql-magic-functions-for.html
from pyspark.sql.types import *
from IPython.core.magic import register_line_cell_magic
# Configuration parameters
# These module-level settings are read by the SQL magics each time they run.
max_show_lines = 50 # Limit on the number of lines to show with %sql_show and %sql_display
# Passed as the argument of DataFrame.explain() by %sql_explain
# (presumably True selects Spark's extended plan output - confirm against the Spark docs).
detailed_explain = True
@register_line_cell_magic
def sql(line, cell=None):
    """Return a Spark DataFrame for lazy evaluation of the sql.

    Use: %sql or %%sql

    The statement is taken from the cell body when invoked as a cell magic
    (``cell`` is not None) and from the rest of the line otherwise. It is
    echoed to stdout and then passed to ``sqlContext.sql``, whose result is
    returned as-is.
    """
    statement = line if cell is None else cell
    # Single-argument print(...) produces the same output on Python 2 and
    # Python 3, whereas the bare ``print val`` statement only parses on 2.
    print(statement)
    return sqlContext.sql(statement)
@register_line_cell_magic
def sql_show(line, cell=None):
    """Run the sql and show at most max_show_lines rows of the result.

    Use: %sql_show or %%sql_show

    Returns whatever ``DataFrame.show`` returns.
    """
    # Cell-magic invocations carry the statement in ``cell``; line-magic
    # invocations carry it in ``line``.
    if cell is None:
        statement = line
    else:
        statement = cell
    result = sqlContext.sql(statement)
    return result.show(max_show_lines)
@register_line_cell_magic
def sql_display(line, cell=None):
    """Run the sql and return the result as a Pandas DataFrame.

    Use: %sql_display or %%sql_display

    The Spark result is limited to max_show_lines rows before it is
    converted with ``toPandas()``, so it can be displayed nicely or
    processed further.
    """
    # Cell-magic invocations carry the statement in ``cell``; line-magic
    # invocations carry it in ``line``.
    if cell is None:
        statement = line
    else:
        statement = cell
    limited = sqlContext.sql(statement).limit(max_show_lines)
    return limited.toPandas()
@register_line_cell_magic
def sql_explain(line, cell=None):
    """Show the execution plan of the sql.

    Use: %sql_explain or %%sql_explain

    The module-level ``detailed_explain`` flag is forwarded to
    ``DataFrame.explain``; its return value is returned unchanged.
    """
    # Cell-magic invocations carry the statement in ``cell``; line-magic
    # invocations carry it in ``line``.
    if cell is None:
        statement = line
    else:
        statement = cell
    plan_source = sqlContext.sql(statement)
    return plan_source.explain(detailed_explain)
# Build a SQLContext on top of the existing SparkContext ``sc``.
# NOTE(review): neither ``SQLContext`` nor ``sc`` is defined or imported in
# the visible part of this file - presumably both are provided by the PySpark
# shell / notebook kernel; confirm. The code visible here goes on to use
# ``sqlContext`` rather than ``sqlCtx``.
sqlCtx = SQLContext(sc)

import csv

# Sanity check: dump the raw CSV to stdout, one comma-joined row per line,
# before handing the same file to Spark.
# NOTE(review): 'rb' is the mode the Python 2 csv module expects; on Python 3
# the file would have to be opened in text mode with newline='' instead.
with open("/Users/dennisgannon/Desktop/hvac.csv", 'rb') as csvfile:
    spamreader = csv.reader(csvfile)  # , delimiter=',', quotechar='|')
    for row in spamreader:
        # Single-argument print(...) produces the same output on Python 2
        # and 3, whereas the bare print statement only parses on Python 2.
        print(', '.join(row))
# Read the CSV as an RDD of raw text lines.
hvacText = sc.textFile("/Users/dennisgannon/Desktop/hvac.csv")

# Column layout of the "hvac" table: two string columns, two integer
# temperature columns and a string building id. Every field is created
# with False as its third argument.
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False),
])
hvacSchema  # bare expression: shows the schema when run as a notebook cell

# Split each line on commas, drop the row whose first field is "Date"
# (the header), and convert the fields to the types the schema declares.
hvac = (
    hvacText
    .map(lambda text: text.split(","))
    .filter(lambda fields: fields[0] != "Date")
    .map(lambda fields: (
        str(fields[0]),
        str(fields[1]),
        int(fields[2]),
        int(fields[3]),
        str(fields[4]),
    ))
)
hvac.collect()  # materialises the parsed rows (displayed when run as a notebook cell)

# Wrap the typed rows in a DataFrame and register it under the name "hvac"
# so the SQL statements below can reference it.
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)
hvacdf.registerTempTable("hvac")
hvacdf  # bare expression: shows the DataFrame when run as a notebook cell
%%sql_show
SELECT buildingID , (targettemp - actualtemp) AS temp_diff, date FROM hvac WHERE date = "3/23/2016"
# Same query as the %%sql_show cell above, run through sqlContext directly
# so the resulting DataFrame can be kept and registered as a table.
x = sqlContext.sql(
    ' SELECT buildingID , (targettemp - actualtemp) AS temp_diff, date'
    ' FROM hvac WHERE date = "3/23/2016" '
)
# Simpler variant kept from the original for experimentation:
#x = sqlContext.sql(' SELECT buildingID FROM hvac ')
x.collect()  # materialises the result (displayed when run as a notebook cell)

# Expose the per-building temperature differences to later SQL as "tempdiffs".
x.registerTempTable("tempdiffs")
%%sql_show
SElECT buildingID, temp_diff from tempdiffs where temp_diff < 0