The problem here is to predict the grade (Passing or Failing) of City of Chicago restaurant inspections based on the notes made by the inspector.
This uses logistic regression. Suppose you have a set of feature vectors $x_i \in R^n$ for $i$ in $[0,m]$. Associated with each feature vector we have a binary result $y_i$. We are interested in the probability $P(y =1 | x)$, which we write as the function $p(x)$. Because $p(x)$ lies between 0 and 1 it is not expressible as a linear function of $x$, so we can't use regular linear regression. Instead we look at the odds expression $p(x) / (1-p(x))$ and make the guess that its log is linear. In other words,
$$ ln( \frac{p(x)}{1-p(x)}) = b_0 + b \cdot x$$where the offset $b_0$ and the vector $b = [b_1, b_2, ... b_n]$ define a hyperplane for linear regression. Solving this for $p(x)$ we get
$$p(x) = \frac {1}{1+e^{-(b_0 + b \cdot x)}} $$And we predict $y=1$ if $p(x)> 1/2$ (equivalently, if $b_0 + b \cdot x > 0$); otherwise we predict $y=0$. Unfortunately, finding the best $b_0$ and $b$ is not as easy as straight linear regression, but simple Newton-like iterations will converge to good solutions.
We note that the logistic function $\sigma (t)$ is defined as follows:
$$\sigma (t)= \frac {e^t}{e^{t}+1} =\frac {1}{1+e^{-t}}$$It is used frequently in machine learning to map a real number into a probability range $[0,1]$ .
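As a quick sanity check, $\sigma(t)$ is easy to compute directly. This small snippet uses only the Python standard library and also illustrates the decision boundary: $p(x) = 1/2$ exactly when $b_0 + b \cdot x = 0$.

```python
import math

def sigmoid(t):
    """Logistic function: maps any real t into the open interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-t))

print(sigmoid(0))    # exactly 0.5: the decision boundary
print(sigmoid(5))    # close to 1
print(sigmoid(-5))   # close to 0
# Symmetry: sigmoid(t) + sigmoid(-t) == 1
```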
sc
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
def csvParse(s):
    """Parse one line of CSV text into a list of field strings."""
    import csv
    from io import StringIO
    sio = StringIO(s)
    value = next(csv.reader(sio))
    sio.close()
    return value
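The point of using the csv module rather than a plain split(',') is that the violations field contains embedded commas inside quotes. A quick self-contained check (the sample line below is illustrative, not an actual record from the dataset):

```python
import csv
from io import StringIO

def csvParse(s):
    """Parse one line of CSV text into a list of field strings."""
    sio = StringIO(s)
    value = next(csv.reader(sio))
    sio.close()
    return value

# A made-up row: the quoted last field contains a comma that
# a naive s.split(',') would incorrectly break into two fields.
row = csvParse('413707,"LUNA PARK INC",Fail,"24. DISH WASHING FACILITIES, BROKEN"')
print(row)
```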
This notebook will run on Spark on your laptop.
#inspections = spark.sparkContext.textFile('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv')\
# .map(csvParse)
inspections = spark.sparkContext.textFile('/users/dennisgannon/OneDrive/Docs7/Food_Inspections1.csv')\
.map(csvParse)
inspections.count()
inspections.take(10)
schema = StructType([StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("results", StringType(), False),
StructField("violations", StringType(), True)])
df = spark.createDataFrame(inspections.map(lambda l: (int(l[0]), l[2], l[3], l[4])) , schema)
df.registerTempTable('CountResults')
df.show(5)
print("passing = %d"%df[df.results == 'Pass'].count())
print("failing = %d"%df[df.results == 'Fail'].count())
df.count()
df.select('results').distinct().show()
#%%sql -o count_results_df
count_results_df = spark.sql("SELECT results, COUNT(results) AS cnt FROM \
CountResults GROUP BY results").toPandas()
count_results_df
%matplotlib inline
import matplotlib.pyplot as plt
labels = count_results_df['results']
sizes = count_results_df['cnt']
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
plt.axis('equal')
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
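A quick check that the labeling behaves as intended: the three result strings we keep map to 0.0 or 1.0, and anything else (the dataset also contains values such as "Out of Business") maps to -1.0 and is dropped by the where('label >= 0') filter below.

```python
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0

for s in ['Fail', 'Pass', 'Pass w/ Conditions', 'Out of Business']:
    print(s, '->', labelForResults(s))
```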
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
labeledData.take(1)
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
#hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeledData)
model
hashingTF.getNumFeatures()
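Under the hood, HashingTF applies the "hashing trick": each token is hashed to a bucket index modulo the number of features, and the bucket counts form a sparse term-frequency vector (the Spark ML default is $2^{18} = 262144$ buckets). A rough pure-Python sketch of the idea; note Spark uses MurmurHash3 rather than Python's built-in hash, so the actual indices will differ:

```python
def hashing_tf(words, num_features=262144):
    """Toy hashing trick: map each token to a bucket and count occurrences."""
    vec = {}
    for w in words:
        idx = hash(w) % num_features   # Spark ML uses MurmurHash3 here
        vec[idx] = vec.get(idx, 0) + 1
    return vec

# 'dish' appears twice, so one bucket gets count 2.
features = hashing_tf("dish washing facilities broken dish".split())
print(features)
```

Distinct words can collide into the same bucket, which is the price paid for a fixed-size feature vector without a vocabulary dictionary; with 262144 buckets and short violation notes, collisions are rare.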
testData = spark.sparkContext.textFile('/users/dennisgannon/OneDrive/Docs7/Food_Inspections2.csv')\
.map(csvParse) \
.map(lambda l: (int(l[0]), l[2], l[3], l[4]))
testDf = spark.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
predictionsDf = model.transform(testDf)
predictionsDf.registerTempTable('Predictions')
predictionsDf.columns
predictionsDf.take(1)
numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
(prediction = 1 AND (results = 'Pass' OR
results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDf.count()
print("There were", numInspections, "inspections and there were", numSuccesses, "successful predictions")
print("This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate")
from pyspark.sql.types import *
from IPython.core.magic import register_line_cell_magic
# Configuration parameters
max_show_lines = 50 # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True
@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line
    print(val)
    return sqlContext.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line
    return sqlContext.sql(val).show(max_show_lines)

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line
    return sqlContext.sql(val).limit(max_show_lines).toPandas()
#%%sql -q -o true_positive
#SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'
true_negative = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
(prediction = 0 AND results = 'Fail')").toPandas()
#%%sql -q -o false_positive
false_negative = spark.sql("SELECT count(*) AS cnt FROM Predictions \
WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')").toPandas()
#%%sql -q -o true_negative
false_positive = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
prediction = 1 AND results = 'Fail' ").toPandas()
#%%sql -q -o false_negative
true_positive = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')").toPandas()
false_negative['cnt']
%matplotlib inline
import matplotlib.pyplot as plt
labels = ['True positive', 'False positive', 'True negative', 'False negative']
sizes = [true_positive['cnt'], false_positive['cnt'], true_negative['cnt'], false_negative['cnt']]
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%10.1f%%', colors=colors)
plt.axis('equal')
Precision and recall are then defined as:
$$Precision=\frac {tp}{tp+fp}$$$$ Recall = \frac {tp}{tp+fn} $$Precision is the probability that a (randomly selected) positive prediction is correct.
Recall is the probability that a (randomly selected) restaurant with a passing grade is predicted to be passing.
print('so precision = %f'% \
(float(true_positive['cnt'])/(float(true_positive['cnt'])+float(false_positive['cnt']))))
print('and recall = %f'% \
(float(true_positive['cnt'])/(float(true_positive['cnt'])+float(false_negative['cnt']))))
If we look at this another way, we can ask how accurately we find the failing restaurants. This is a bit harder because there are far fewer of them. In this case we are interested in true negatives, so
Precision is the probability that a (randomly selected) negative prediction is correct.
Recall is the probability that a (randomly selected) restaurant with a failing grade is predicted to be failing.
$$Precision=\frac {tn}{tn+fn}$$$$ Recall = \frac {tn}{tn+fp} $$
print('so the precision of failure prediction = %f'% \
    (float(true_negative['cnt'])/(float(true_negative['cnt'])+float(false_negative['cnt']))))
print('and recall is = %f'% \
    (float(true_negative['cnt'])/(float(true_negative['cnt'])+float(false_positive['cnt']))))
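Both calculations follow the same formula once you decide which class counts as "positive"; for the failing class, the roles simply flip. A small helper makes this explicit (the counts below are illustrative, not the notebook's actual results):

```python
def precision_recall(tp, fp, fn):
    """Precision and recall from confusion-matrix counts for the chosen positive class."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return precision, recall

# For the failing class, true negatives play the role of 'tp',
# false negatives of 'fp', and false positives of 'fn'.
p, r = precision_recall(tp=80, fp=20, fn=10)   # illustrative counts only
print('precision = %f, recall = %f' % (p, r))
```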