PySpark Cluster Analysis on Weather Data¶
CIML Summer Institute¶
UC San Diego¶
Resources:
- Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
- PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
- PySpark Cheat Sheet PDF
Setup¶
In [ ]:
# Start Spark session
import pyspark
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'PySpark Cluster Analysis')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.version)
print(pyspark.__version__)
In [ ]:
# Import modules
import matplotlib.pyplot as plt
import pandas as pd
# Show plots in notebook
%matplotlib inline
Read in data¶
Data source: http://hpwren.ucsd.edu
In [ ]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
# Specify schema
schema = StructType([
    StructField("rowID", IntegerType(), True),
    StructField("hpwren_timestamp", TimestampType(), True),
    StructField("air_pressure", DoubleType(), True),
    StructField("air_temp", DoubleType(), True),
    StructField("avg_wind_direction", DoubleType(), True),
    StructField("avg_wind_speed", DoubleType(), True),
    StructField("max_wind_direction", DoubleType(), True),
    StructField("max_wind_speed", DoubleType(), True),
    StructField("min_wind_direction", DoubleType(), True),
    StructField("min_wind_speed", DoubleType(), True),
    StructField("rain_accumulation", DoubleType(), True),
    StructField("rain_duration", DoubleType(), True),
    StructField("relative_humidity", DoubleType(), True)
])
In [ ]:
# Read in data and put in Spark DataFrame
from os.path import expanduser
HOME = expanduser("~")
# Set input file
data_path = HOME + '/data/'
# Set inputfile to the full path of the weather CSV under data_path
# ==> YOUR CODE HERE
df = spark.read.csv(inputfile, header=True, schema=schema).cache()
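A possible way to set the input file (a sketch; the file name below is an assumption, so use whichever CSV is actually staged under data_path):
In [ ]:
# Hypothetical file name for the HPWREN minute weather data
inputfile = data_path + 'minute_weather.csv'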
Explore data¶
Print schema¶
In [ ]:
# ==> YOUR CODE HERE
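One possible solution sketch:
In [ ]:
# Print the column names and types defined by the supplied schema
df.printSchema()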
Count rows¶
In [ ]:
# ==> YOUR CODE HERE
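One possible solution sketch:
In [ ]:
# Count the number of rows in the DataFrame
df.count()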
Show summary statistics¶
In [ ]:
# Use describe(). Can convert to pandas for nicer output.
# ==> YOUR CODE HERE
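One possible solution sketch, converting to pandas for easier reading:
In [ ]:
# Summary statistics; transposing the pandas frame puts one feature per row
df.describe().toPandas().transpose()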
Prepare data¶
Drop nulls¶
In [ ]:
# Drop NAs, then get count of rows. Save the results in a new dataframe.
# ==> YOUR CODE HERE
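One possible solution sketch (the later cells assume the cleaned DataFrame is named workingDF):
In [ ]:
# Drop rows containing nulls, then count what remains
workingDF = df.na.drop()
workingDF.count()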
Create feature vector¶
In [ ]:
from pyspark.ml.feature import VectorAssembler
featuresUsed = ['air_pressure', 'air_temp', 'avg_wind_direction', 'avg_wind_speed',
                'max_wind_direction', 'max_wind_speed', 'relative_humidity']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)
In [ ]:
# Show first row of assembled data
# ==> YOUR CODE HERE
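One possible solution sketch:
In [ ]:
# Look at the first row, focusing on the new vector column
assembled.select("features_unscaled").show(1, truncate=False)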
Scale data¶
In [ ]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
In [ ]:
# Show first row of scaled data
# ==> YOUR CODE HERE
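One possible solution sketch:
In [ ]:
# Look at the first row of the scaled feature vectors
scaledData.select("features").show(1, truncate=False)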
Perform cluster analysis¶
Generate elbow plot to determine value(s) for k¶
In [ ]:
%%time
import utils
# Only need to run this once to find value(s) to try for k
# Set to False if you already know the value for k
create_elbow_plot = True
# Get elbow plot using a subset of the data
if create_elbow_plot:
    sampledData = scaledData.filter((scaledData.rowID % 20) == 0).select("features").cache()
    k_attempts = range(5, 15)
    print('Trying k from {} to {} with {} samples\n'.format(list(k_attempts)[0],
                                                            list(k_attempts)[-1],
                                                            sampledData.count()))
    wsseList = utils.elbow(sampledData, k_attempts)
    utils.elbow_plot(wsseList, k_attempts)
Perform Clustering Using K-Means¶
In [ ]:
scaledData.printSchema()
In [ ]:
from pyspark.ml.clustering import KMeans
scaledDataFeat = scaledData.select("features").cache()
# Set number of clusters
nClusters = 11
kmeans = KMeans(k=nClusters, seed=1)
# Fit model to scaledDataFeat. Save fitted model as 'model'
# ==> YOUR CODE HERE
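One possible solution sketch:
In [ ]:
# Fit k-means to the scaled feature vectors and keep the fitted model
model = kmeans.fit(scaledDataFeat)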
In [ ]:
# Get model's cluster centers
# ==> YOUR CODE HERE
# Show cluster centers
pd.DataFrame(centers, columns=featuresUsed)
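A possible way to obtain the centers referenced in the cell above:
In [ ]:
# Cluster centers as a list of arrays, one per cluster
centers = model.clusterCenters()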
In [ ]:
# Show cluster sizes
model.summary.clusterSizes
Generate cluster profile plots¶
In [ ]:
centersNamed = utils.pd_centers(featuresUsed, centers)
print(centersNamed.columns.values)
Profiles for All Clusters¶
This is a parallel plot to show the cluster centers for all clusters.
Data samples were standardized to zero mean and unit standard deviation before k-means was applied, so the values of the cluster centers can be interpreted as standard deviations from the mean.
In [ ]:
numClusters = len(centersNamed.index)
colors_used = utils.parallel_plot(centersNamed, numClusters)
Clusters Capturing Dry Days¶
Clusters with lower-than-average relative_humidity values capture dry days
In [ ]:
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] < -0.5],
                    numClusters, colors=colors_used);
Clusters Capturing Humid Days¶
Clusters with higher-than-average relative_humidity values capture humid days
In [ ]:
# ==> YOUR CODE HERE
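A possible sketch, mirroring the dry-day plot above (the 0.5 cutoff is an assumption, not a prescribed threshold):
In [ ]:
# Clusters whose center relative_humidity is well above the mean
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] > 0.5],
                    numClusters, colors=colors_used);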
Clusters Capturing Hot Days¶
Use air_temp
In [ ]:
# ==> YOUR CODE HERE
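A possible sketch (again, the 0.5 cutoff is an assumption):
In [ ]:
# Clusters whose center air_temp is well above the mean
utils.parallel_plot(centersNamed[centersNamed['air_temp'] > 0.5],
                    numClusters, colors=colors_used);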
Clusters Capturing Windy Days¶
Use max_wind_speed
In [ ]:
# ==> YOUR CODE HERE
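A possible sketch:
In [ ]:
# Clusters whose center max_wind_speed is well above the mean
utils.parallel_plot(centersNamed[centersNamed['max_wind_speed'] > 0.5],
                    numClusters, colors=colors_used);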
Save Model¶
In [ ]:
# Specify file name
# ==> YOUR CODE HERE
model.write().overwrite().save(model_file)
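A possible way to set the file name (the path below is an assumption; point model_file wherever you want the model saved):
In [ ]:
# Hypothetical save location for the fitted k-means model
model_file = HOME + '/kmeans_weather_model'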
Stop Spark session¶
In [ ]:
# ==> YOUR CODE HERE
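One possible solution sketch:
In [ ]:
# Stop the Spark session and release its resources
spark.stop()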
In [ ]: