PySpark Cluster Analysis on Weather Data¶

CIML Summer Institute¶

UC San Diego


Resources:

  • Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
  • PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
  • PySpark Cheat Sheet PDF

Setup¶

In [1]:
# Start Spark session
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'PySpark Cluster Analysis')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

print(spark.version)
print(pyspark.__version__)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 20:10:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3.2.1
3.2.1
In [2]:
# Import modules
import matplotlib.pyplot as plt
import pandas as pd

# Show plots in notebook
%matplotlib inline

Read in data¶

Data source: http://hpwren.ucsd.edu

In [3]:
from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType

# Specify schema
schema = StructType([
    StructField("rowID", IntegerType(), True),
    StructField("hpwren_timestamp", TimestampType(), True),
    StructField("air_pressure", DoubleType(), True),
    StructField("air_temp", DoubleType(), True),
    StructField("avg_wind_direction", DoubleType(), True),
    StructField("avg_wind_speed", DoubleType(), True),
    StructField("max_wind_direction", DoubleType(), True),
    StructField("max_wind_speed", DoubleType(), True),
    StructField("min_wind_direction", DoubleType(), True),
    StructField("min_wind_speed", DoubleType(), True),
    StructField("rain_accumulation", DoubleType(), True),
    StructField("rain_duration", DoubleType(), True),
    StructField("relative_humidity", DoubleType(), True)
])
In [4]:
# Read in data and put in Spark DataFrame

from os.path import expanduser
HOME = expanduser("~")

# Set input file: define 'inputfile' as the path to the weather CSV under data_path
data_path = HOME + '/data/'
# ==> YOUR CODE HERE

df = spark.read.csv(inputfile, header=True, schema=schema).cache()
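One possible completion, assuming the measurements are in a single CSV file under data_path (the filename 'minute_weather.csv' below is illustrative; substitute the file in your own data directory):

# 'minute_weather.csv' is an assumed filename -- use the file in your data directory
inputfile = data_path + 'minute_weather.csv'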

Explore data¶

Print schema¶

In [5]:
# ==> YOUR CODE HERE
root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
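The schema output above can be produced with a single call on the DataFrame:

df.printSchema()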

Count rows¶

In [6]:
# ==> YOUR CODE HERE
Out[6]:
1587257
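The row count above can be obtained with:

df.count()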

Show summary statistics¶

In [7]:
# Use describe().  Can convert to pandas for nicer output.
# ==> YOUR CODE HERE
Out[7]:
                     count     mean                   stddev               min    max
rowID                1587257   793628.0               458201.7724491034    0      1587256
air_pressure         1587257   916.8301266904355      3.0515931266797334   905.0  929.5
air_temp             1587257   61.85144042833238      11.833623786835483   31.64  99.5
avg_wind_direction   1586824   161.96537927331576     95.20811970204007    0.0    359.0
avg_wind_speed       1586824   2.774272067979729      2.060757793563034    0.0    32.3
max_wind_direction   1586824   163.40304784903682     92.3672342806411     0.0    359.0
max_wind_speed       1586824   3.3998134008569094     2.4231674336171545   0.1    36.0
min_wind_direction   1586824   166.82637078844283     97.4627462007766     0.0    359.0
min_wind_speed       1586824   2.1331304542923206     1.7453450849327021   0.0    32.0
rain_accumulation    1587256   0.0018548362708977345  0.9609715682155956   0.0    655.01
rain_duration        1587256   0.5361460281139274     81.1476564505365     0.0    63305.0
relative_humidity    1587257   47.60836877707868      26.21454390649868    0.7    93.0
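One way to produce a summary like the one above (the exact layout depends on how you display it) is to convert the describe() result to pandas and transpose it so features appear as rows:

df.describe().toPandas().transpose()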

Prepare data¶

Drop nulls¶

In [8]:
# Drop NAs, then get count of rows.  Save the results in a new dataframe.
# ==> YOUR CODE HERE
Out[8]:
1586823
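A possible solution; the name workingDF matches the feature-assembly cell below:

# Drop rows containing any null values, then count what remains
workingDF = df.na.drop()
workingDF.count()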

Create feature vector¶

In [9]:
from pyspark.ml.feature import VectorAssembler

featuresUsed = ['air_pressure', 'air_temp', 'avg_wind_direction', 'avg_wind_speed', 'max_wind_direction', 
        'max_wind_speed','relative_humidity']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)
In [10]:
# Show first row of assembled data
# ==> YOUR CODE HERE
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
|rowID|   hpwren_timestamp|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity|   features_unscaled|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
|    1|2011-09-10 00:01:49|       912.3|   63.86|             161.0|           0.8|             215.0|           1.5|              43.0|           0.2|              0.0|          0.0|             39.9|[912.3,63.86,161....|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
only showing top 1 row
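The row above can be displayed with, for example:

assembled.show(1)

The same call on scaledData produces the corresponding output after scaling.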

Scale data¶

In [11]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
In [12]:
# Show first row of scaled data
# ==> YOUR CODE HERE
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
|rowID|   hpwren_timestamp|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity|   features_unscaled|            features|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
|    1|2011-09-10 00:01:49|       912.3|   63.86|             161.0|           0.8|             215.0|           1.5|              43.0|           0.2|              0.0|          0.0|             39.9|[912.3,63.86,161....|[-1.4845928293047...|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
only showing top 1 row

Perform cluster analysis¶

Generate elbow plot to determine value(s) for k¶

In [13]:
%%time
import utils

# Only need to run this once to find value(s) to try for k
# Set to False if already know value for k
create_elbow_plot = True 

# Get elbow plot using subset of data
if create_elbow_plot == True:
    sampledData = scaledData.filter((scaledData.rowID % 20) == 0).select("features").cache() 
    k_attempts = range(5,15)
    print('Trying k from {} to {} with {} samples\n'.format(list(k_attempts)[0],
                                                          list(k_attempts)[-1], 
                                                          sampledData.count()))
    wsseList = utils.elbow(sampledData, k_attempts)
    utils.elbow_plot(wsseList, k_attempts)
Trying k from 5 to 14 with 79342 samples

Training for cluster size 5 
......................WSSE = 132090.8626590807 
Training for cluster size 6 
......................WSSE = 127840.72179653986 
Training for cluster size 7 
......................WSSE = 123273.3903919039 
Training for cluster size 8 
......................WSSE = 117557.05041159592 
Training for cluster size 9 
......................WSSE = 114697.17601240409 
Training for cluster size 10 
......................WSSE = 110832.24108164778 
Training for cluster size 11 
......................WSSE = 106975.96733059295 
Training for cluster size 12 
......................WSSE = 106209.23046228987 
Training for cluster size 13 
......................WSSE = 103336.86818217761 
Training for cluster size 14 
......................WSSE = 101171.40176944171 
CPU times: user 171 ms, sys: 45.1 ms, total: 216 ms
Wall time: 59.4 s
[Elbow plot: WSSE versus number of clusters k]
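utils.elbow and utils.elbow_plot are course-provided helpers whose internals are not shown here. A minimal sketch of how the same WSSE values could be computed directly with KMeans, assuming the model's trainingCost is taken as the WSSE measure:

from pyspark.ml.clustering import KMeans

def compute_wsse(data, k_values, seed=1):
    # Fit one k-means model per candidate k and record its within-set sum of squared errors
    wsse = []
    for k in k_values:
        km_model = KMeans(k=k, seed=seed).fit(data)
        wsse.append(km_model.summary.trainingCost)
    return wsse

# Example usage: compute WSSE for the sampled data and plot it against k
wsse_values = compute_wsse(sampledData, k_attempts)
plt.plot(list(k_attempts), wsse_values, marker='o')
plt.xlabel('k')
plt.ylabel('WSSE')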

Perform Clustering Using K-Means¶

In [14]:
scaledData.printSchema()
root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- features_unscaled: vector (nullable = true)
 |-- features: vector (nullable = true)

In [15]:
from pyspark.ml.clustering import KMeans

scaledDataFeat = scaledData.select("features").cache()

# Set number of clusters
nClusters = 11

kmeans = KMeans(k=nClusters, seed=1)

# Fit model to scaledDataFeat.  Save fitted model as 'model'
# ==> YOUR CODE HERE
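A possible completion, saving the fitted model as model (the name used by the following cells):

model = kmeans.fit(scaledDataFeat)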
In [16]:
# Get model's cluster centers
# ==> YOUR CODE HERE

# Show cluster centers
pd.DataFrame(centers,columns=featuresUsed)
Out[16]:
air_pressure air_temp avg_wind_direction avg_wind_speed max_wind_direction max_wind_speed relative_humidity
0 1.171256 -0.251789 -1.154951 2.166273 -1.054745 2.289235 -1.134940
1 0.463010 -0.944084 0.752266 -0.534278 0.665727 -0.513820 1.030146
2 0.327127 -1.015366 -1.211716 -0.573832 -1.046480 -0.582451 0.945063
3 -1.117210 -0.900354 0.441681 1.869681 0.534087 1.828972 0.945397
4 -0.214792 0.610966 0.411501 0.616994 0.501985 0.562956 -0.167618
5 -0.898883 -1.109780 0.384500 0.087716 0.463018 0.083325 1.331430
6 0.133075 0.947372 -1.334976 -0.571825 -1.182443 -0.580240 -0.870452
7 -0.071531 0.662120 0.538240 -0.656654 -0.002678 -0.656781 -0.532039
8 0.222609 0.792225 1.328018 -0.639614 1.599720 -0.590233 -0.715729
9 -0.652842 0.288582 -1.113623 -0.605836 -0.978505 -0.631862 0.103645
10 1.484501 -0.161324 -1.075705 0.022640 -0.971772 0.049008 -1.000615
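A possible completion for the cell above, storing the centers in a variable named centers (as used by pd.DataFrame here and utils.pd_centers below):

centers = model.clusterCenters()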
In [17]:
# Show cluster sizes 

model.summary.clusterSizes
Out[17]:
[74856,
 162242,
 89804,
 120564,
 215438,
 166702,
 152800,
 253180,
 131530,
 100447,
 119260]

Generate cluster profile plots¶

In [18]:
centersNamed = utils.pd_centers(featuresUsed,centers)
print(centersNamed.columns.values)
['air_pressure' 'air_temp' 'avg_wind_direction' 'avg_wind_speed'
 'max_wind_direction' 'max_wind_speed' 'relative_humidity' 'prediction']

Profiles for All Clusters¶

This parallel-coordinates plot shows the cluster centers for all clusters.
Because the features were standardized to zero mean and unit standard deviation before k-means was applied, the cluster-center values can be interpreted as standard deviations from the feature means.

In [19]:
numClusters = len(centersNamed.index)
colors_used = utils.parallel_plot(centersNamed, numClusters)
[Parallel plot of cluster centers for all clusters]

Clusters Capturing Dry Days¶

Clusters with lower-than-average relative_humidity values capture dry days

In [20]:
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] < -0.5], 
                   numClusters, colors=colors_used);
[Parallel plot of clusters with lower-than-average relative_humidity]

Clusters Capturing Humid Days¶

Clusters with higher-than-average relative_humidity values capture humid days

In [21]:
# ==> YOUR CODE HERE
[Parallel plot of clusters with higher-than-average relative_humidity]
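A possible way to produce the plot above, mirroring the dry-days cell (the 0.5-standard-deviation threshold is a choice, not prescribed):

utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] > 0.5],
                    numClusters, colors=colors_used);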

Clusters Capturing Hot Days¶

Clusters with higher-than-average air_temp values capture hot days (use air_temp)

In [22]:
# ==> YOUR CODE HERE
[Parallel plot of clusters with higher-than-average air_temp]
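A possible solution, selecting clusters whose centers have higher-than-average air_temp (again, the 0.5 threshold is a choice):

utils.parallel_plot(centersNamed[centersNamed['air_temp'] > 0.5],
                    numClusters, colors=colors_used);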

Clusters Capturing Windy Days¶

Clusters with higher-than-average max_wind_speed values capture windy days (use max_wind_speed)

In [23]:
# ==> YOUR CODE HERE
[Parallel plot of clusters with higher-than-average max_wind_speed]
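A possible solution, selecting clusters whose centers have higher-than-average max_wind_speed:

utils.parallel_plot(centersNamed[centersNamed['max_wind_speed'] > 0.5],
                    numClusters, colors=colors_used);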

Save Model¶

In [24]:
# Specify file name
# ==> YOUR CODE HERE

model.write().overwrite().save(model_file)
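A possible way to set the file name; 'weather_kmeans_model' is an illustrative name, and any writable location works:

# Assumed output path under the home directory -- adjust as needed
model_file = HOME + '/weather_kmeans_model'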

Stop Spark session¶

In [25]:
# ==> YOUR CODE HERE
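The session can be stopped with:

spark.stop()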
In [ ]: