PySpark Cluster Analysis on Weather Data¶

CIML Summer Institute¶

UC San Diego¶


Resources:

  • Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
  • PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
  • PySpark Cheat Sheet PDF

Setup¶

In [ ]:
# Start Spark session
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'PySpark Cluster Analysis')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

print(spark.version)
print(pyspark.__version__)
In [ ]:
# Import modules
import matplotlib.pyplot as plt
import pandas as pd

# Show plots in notebook
%matplotlib inline

Read in data¶

Data source: http://hpwren.ucsd.edu

In [ ]:
from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType

# Specify schema
schema = StructType([
    StructField("rowID", IntegerType(), True),
    StructField("hpwren_timestamp", TimestampType(), True),
    StructField("air_pressure", DoubleType(), True),
    StructField("air_temp", DoubleType(), True),
    StructField("avg_wind_direction", DoubleType(), True),
    StructField("avg_wind_speed", DoubleType(), True),
    StructField("max_wind_direction", DoubleType(), True),
    StructField("max_wind_speed", DoubleType(), True),
    StructField("min_wind_direction", DoubleType(), True),
    StructField("min_wind_speed", DoubleType(), True),
    StructField("rain_accumulation", DoubleType(), True),
    StructField("rain_duration", DoubleType(), True),
    StructField("relative_humidity", DoubleType(), True)
])
In [ ]:
# Read in data and put in Spark DataFrame

from os.path import expanduser
HOME = expanduser("~")

# Set input file path as 'inputfile'
data_path = HOME + '/data/'
# ==> YOUR CODE HERE

df = spark.read.csv(inputfile, header=True, schema=schema).cache()
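
One way the missing line above might look; the CSV file name is an assumption here (minute_weather.csv), so adjust it to whatever actually sits under data_path:

In [ ]:
# Hypothetical file name -- replace with the actual CSV in data_path
inputfile = data_path + 'minute_weather.csv'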

Explore data¶

Print schema¶

In [ ]:
# ==> YOUR CODE HERE
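
A possible sketch for the cell above, using the df read in earlier:

In [ ]:
# Print the DataFrame schema
df.printSchema()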

Count rows¶

In [ ]:
# ==> YOUR CODE HERE
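
A possible sketch:

In [ ]:
# Count the number of rows in the DataFrame
df.count()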

Show summary statistics¶

In [ ]:
# Use describe(). You can convert the result to pandas for nicer output.
# ==> YOUR CODE HERE
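
One way this might be done, converting the describe() output to pandas and transposing it so each feature is a row:

In [ ]:
# Summary statistics, converted to pandas for readability
df.describe().toPandas().transpose()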

Prepare data¶

Drop nulls¶

In [ ]:
# Drop rows with NAs, then count the remaining rows. Save the result in a new DataFrame called workingDF.
# ==> YOUR CODE HERE
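
A sketch of one approach; the name workingDF is what the VectorAssembler cell below expects:

In [ ]:
# Drop rows containing nulls and count what remains
workingDF = df.na.drop()
workingDF.count()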

Create feature vector¶

In [ ]:
from pyspark.ml.feature import VectorAssembler

featuresUsed = ['air_pressure', 'air_temp', 'avg_wind_direction', 'avg_wind_speed',
                'max_wind_direction', 'max_wind_speed', 'relative_humidity']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)
In [ ]:
# Show first row of assembled data
# ==> YOUR CODE HERE
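
A possible sketch:

In [ ]:
# Show the first row, including the assembled (unscaled) feature vector
assembled.show(1, truncate=False)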

Scale data¶

In [ ]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
In [ ]:
# Show first row of scaled data
# ==> YOUR CODE HERE
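
A possible sketch:

In [ ]:
# Show the first row with the scaled feature vector
scaledData.select("features").show(1, truncate=False)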

Perform cluster analysis¶

Generate elbow plot to determine value(s) for k¶

In [ ]:
%%time
import utils

# This only needs to be run once to find candidate value(s) for k.
# Set to False if you already know the value for k.
create_elbow_plot = True

# Get elbow plot using a subset of the data
if create_elbow_plot:
    sampledData = scaledData.filter((scaledData.rowID % 20) == 0).select("features").cache() 
    k_attempts = range(5,15)
    print('Trying k from {} to {} with {} samples\n'.format(list(k_attempts)[0],
                                                          list(k_attempts)[-1], 
                                                          sampledData.count()))
    wsseList = utils.elbow(sampledData, k_attempts)
    utils.elbow_plot(wsseList, k_attempts)

Perform Clustering Using K-Means¶

In [ ]:
scaledData.printSchema()
In [ ]:
from pyspark.ml.clustering import KMeans

scaledDataFeat = scaledData.select("features").cache()

# Set number of clusters
nClusters = 11

kmeans = KMeans(k=nClusters, seed=1)

# Fit the k-means model to scaledDataFeat. Save the fitted model as 'model'.
# ==> YOUR CODE HERE
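
A sketch of the fit step, saving the fitted model under the name 'model' as the cells below expect:

In [ ]:
# Fit the k-means model on the scaled feature vectors
model = kmeans.fit(scaledDataFeat)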
In [ ]:
# Get the model's cluster centers and save them as 'centers'
# ==> YOUR CODE HERE

# Show cluster centers
pd.DataFrame(centers, columns=featuresUsed)
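
For reference, one way the 'centers' used in the cell above might be obtained (clusterCenters() returns a list of NumPy arrays, one per cluster):

In [ ]:
# Get the fitted model's cluster centers
centers = model.clusterCenters()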
In [ ]:
# Show cluster sizes 

model.summary.clusterSizes

Generate cluster profile plots¶

In [ ]:
centersNamed = utils.pd_centers(featuresUsed, centers)
print(centersNamed.columns.values)

Profiles for All Clusters¶

This parallel plot shows the cluster centers for all clusters.
The features were standardized to zero mean and unit standard deviation before k-means was applied, so cluster-center values can be read as standard deviations from the feature mean.

In [ ]:
numClusters = len(centersNamed.index)
colors_used = utils.parallel_plot(centersNamed, numClusters)

Clusters Capturing Dry Days¶

Clusters with lower-than-average relative_humidity values capture dry days.

In [ ]:
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] < -0.5], 
                   numClusters, colors=colors_used);

Clusters Capturing Humid Days¶

Clusters with higher-than-average relative_humidity values capture humid days.

In [ ]:
# ==> YOUR CODE HERE
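
A sketch that mirrors the dry-days plot above, keeping cluster centers whose standardized relative_humidity is above 0.5 (the threshold is an assumption, chosen to match the -0.5 used for dry days):

In [ ]:
# Plot clusters whose centers have higher-than-average relative humidity
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] > 0.5],
                    numClusters, colors=colors_used);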

Clusters Capturing Hot Days¶

Use air_temp: clusters with higher-than-average air_temp values capture hot days.

In [ ]:
# ==> YOUR CODE HERE
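
A sketch following the same pattern, filtering on air_temp (the threshold is again an assumption):

In [ ]:
# Plot clusters whose centers have higher-than-average air temperature
utils.parallel_plot(centersNamed[centersNamed['air_temp'] > 0.5],
                    numClusters, colors=colors_used);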

Clusters Capturing Windy Days¶

Use max_wind_speed: clusters with higher-than-average max_wind_speed values capture windy days.

In [ ]:
# ==> YOUR CODE HERE
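
A sketch filtering on max_wind_speed (threshold an assumption):

In [ ]:
# Plot clusters whose centers have higher-than-average maximum wind speed
utils.parallel_plot(centersNamed[centersNamed['max_wind_speed'] > 0.5],
                    numClusters, colors=colors_used);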

Save Model¶

In [ ]:
# Specify the file path for the saved model as 'model_file'
# ==> YOUR CODE HERE

model.write().overwrite().save(model_file)
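
One way model_file might be set for the save call above; the path and name here are only placeholders:

In [ ]:
# Hypothetical output location for the saved k-means model
model_file = HOME + '/weather_kmeans_model'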

Stop Spark session¶

In [ ]:
# ==> YOUR CODE HERE
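
A minimal sketch:

In [ ]:
# Stop the Spark session and release its resources
spark.stop()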