PySpark Cluster Analysis on Weather Data¶
CIML Summer Institute¶
UC San Diego
Resources:
- Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
- PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
- PySpark Cheat Sheet PDF
Setup¶
In [1]:
# Start Spark session
import pyspark
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'PySpark Cluster Analysis')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.version)
print(pyspark.version)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 20:10:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3.2.1
<module 'pyspark.version' from '/opt/spark/3.2.1/hadoop/3.2/python/lib/pyspark.zip/pyspark/version.py'>
In [2]:
# Import modules
import matplotlib.pyplot as plt
import pandas as pd
# Show plots in notebook
%matplotlib inline
Read in data¶
Data source: http://hpwren.ucsd.edu
In [3]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
# Specify schema
schema = StructType([
    StructField("rowID", IntegerType(), True),
    StructField("hpwren_timestamp", TimestampType(), True),
    StructField("air_pressure", DoubleType(), True),
    StructField("air_temp", DoubleType(), True),
    StructField("avg_wind_direction", DoubleType(), True),
    StructField("avg_wind_speed", DoubleType(), True),
    StructField("max_wind_direction", DoubleType(), True),
    StructField("max_wind_speed", DoubleType(), True),
    StructField("min_wind_direction", DoubleType(), True),
    StructField("min_wind_speed", DoubleType(), True),
    StructField("rain_accumulation", DoubleType(), True),
    StructField("rain_duration", DoubleType(), True),
    StructField("relative_humidity", DoubleType(), True)
])
In [4]:
# Read in data and put in Spark DataFrame
from os.path import expanduser
HOME = expanduser("~")
# Set input file
data_path = HOME + '/data/'
# ==> YOUR CODE HERE
df = spark.read.csv(inputfile, header=True, schema=schema).cache()
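A minimal sketch of the missing line above, assuming the sensor readings live in a file called minute_weather.csv (a hypothetical name; substitute the actual file in your data directory):
# Hypothetical file name -- adjust to match the file in your data directory
inputfile = data_path + 'minute_weather.csv'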
Explore data¶
Print schema¶
In [5]:
# ==> YOUR CODE HERE
root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
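One way to produce the schema listing above:
df.printSchema()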
Count rows¶
In [6]:
# ==> YOUR CODE HERE
Out[6]:
1587257
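The row count shown above can be obtained with a single action on the cached DataFrame:
df.count()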
Show summary statistics¶
In [7]:
# Use describe(). Can convert to pandas for nicer output.
# ==> YOUR CODE HERE
Out[7]:
| | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| rowID | 1587257 | 793628.0 | 458201.7724491034 | 0 | 1587256 |
| air_pressure | 1587257 | 916.8301266904355 | 3.0515931266797334 | 905.0 | 929.5 |
| air_temp | 1587257 | 61.85144042833238 | 11.833623786835483 | 31.64 | 99.5 |
| avg_wind_direction | 1586824 | 161.96537927331576 | 95.20811970204007 | 0.0 | 359.0 |
| avg_wind_speed | 1586824 | 2.774272067979729 | 2.060757793563034 | 0.0 | 32.3 |
| max_wind_direction | 1586824 | 163.40304784903682 | 92.3672342806411 | 0.0 | 359.0 |
| max_wind_speed | 1586824 | 3.3998134008569094 | 2.4231674336171545 | 0.1 | 36.0 |
| min_wind_direction | 1586824 | 166.82637078844283 | 97.4627462007766 | 0.0 | 359.0 |
| min_wind_speed | 1586824 | 2.1331304542923206 | 1.7453450849327021 | 0.0 | 32.0 |
| rain_accumulation | 1587256 | 0.0018548362708977345 | 0.9609715682155956 | 0.0 | 655.01 |
| rain_duration | 1587256 | 0.5361460281139274 | 81.1476564505365 | 0.0 | 63305.0 |
| relative_humidity | 1587257 | 47.60836877707868 | 26.21454390649868 | 0.7 | 93.0 |
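A sketch of one way to generate the summary table above, converting the Spark result to pandas and transposing it for readability:
df.describe().toPandas().transpose()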
Prepare data¶
Drop nulls¶
In [8]:
# Drop NAs, then get count of rows. Save the results in a new dataframe.
# ==> YOUR CODE HERE
Out[8]:
1586823
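A sketch of the drop-nulls step, using the workingDF name that the feature-assembly cell below expects:
workingDF = df.na.drop()
workingDF.count()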
Create feature vector¶
In [9]:
from pyspark.ml.feature import VectorAssembler
featuresUsed = ['air_pressure', 'air_temp', 'avg_wind_direction', 'avg_wind_speed', 'max_wind_direction',
'max_wind_speed','relative_humidity']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)
In [10]:
# Show first row of assembled data
# ==> YOUR CODE HERE
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
|rowID| hpwren_timestamp|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity| features_unscaled|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
| 1|2011-09-10 00:01:49| 912.3| 63.86| 161.0| 0.8| 215.0| 1.5| 43.0| 0.2| 0.0| 0.0| 39.9|[912.3,63.86,161....|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
only showing top 1 row
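The single-row display above can be produced with show():
assembled.show(1)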
Scale data¶
In [11]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
In [12]:
# Show first row of scaled data
# ==> YOUR CODE HERE
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
|rowID| hpwren_timestamp|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity| features_unscaled| features|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
| 1|2011-09-10 00:01:49| 912.3| 63.86| 161.0| 0.8| 215.0| 1.5| 43.0| 0.2| 0.0| 0.0| 39.9|[912.3,63.86,161....|[-1.4845928293047...|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+--------------------+
only showing top 1 row
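Similarly, the scaled data can be inspected with:
scaledData.show(1)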
Perform cluster analysis¶
Generate elbow plot to determine value(s) for k¶
In [13]:
%%time
import utils
# Only need to run this once to find value(s) to try for k
# Set to False if you already know the value for k
create_elbow_plot = True
# Get elbow plot using a subset of the data
if create_elbow_plot:
    sampledData = scaledData.filter((scaledData.rowID % 20) == 0).select("features").cache()
    k_attempts = range(5, 15)
    print('Trying k from {} to {} with {} samples\n'.format(list(k_attempts)[0],
                                                            list(k_attempts)[-1],
                                                            sampledData.count()))
    wsseList = utils.elbow(sampledData, k_attempts)
    utils.elbow_plot(wsseList, k_attempts)
Trying k from 5 to 14 with 79342 samples

Training for cluster size 5 ......................WSSE = 132090.8626590807
Training for cluster size 6 ......................WSSE = 127840.72179653986
Training for cluster size 7 ......................WSSE = 123273.3903919039
Training for cluster size 8 ......................WSSE = 117557.05041159592
Training for cluster size 9 ......................WSSE = 114697.17601240409
Training for cluster size 10 ......................WSSE = 110832.24108164778
Training for cluster size 11 ......................WSSE = 106975.96733059295
Training for cluster size 12 ......................WSSE = 106209.23046228987
Training for cluster size 13 ......................WSSE = 103336.86818217761
Training for cluster size 14 ......................WSSE = 101171.40176944171
CPU times: user 171 ms, sys: 45.1 ms, total: 216 ms
Wall time: 59.4 s
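The elbow helpers come from the course's utils module, which is not shown here. As a rough sketch of what such a helper might do (an assumption, not the actual implementation), it could fit k-means for each candidate k and collect the within-set sum of squared errors from the training summary:
from pyspark.ml.clustering import KMeans

def elbow_sketch(df, k_values, seed=1):
    # Fit k-means for each k and record the WSSE (trainingCost) on df,
    # which is expected to have a 'features' vector column.
    wsse = []
    for k in k_values:
        model = KMeans(k=k, seed=seed).fit(df)
        wsse.append(model.summary.trainingCost)
    return wsse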
Perform Clustering Using K-Means¶
In [14]:
scaledData.printSchema()
root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- features_unscaled: vector (nullable = true)
 |-- features: vector (nullable = true)
In [15]:
from pyspark.ml.clustering import KMeans
scaledDataFeat = scaledData.select("features").cache()
# Set number of clusters
nClusters = 11
kmeans = KMeans(k=nClusters, seed=1)
# Fit model to scaledDataFeat. Save fitted model as 'model'
# ==> YOUR CODE HERE
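The missing fit step is a single call:
model = kmeans.fit(scaledDataFeat)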
In [16]:
# Get model's cluster centers
# ==> YOUR CODE HERE
# Show cluster centers
pd.DataFrame(centers,columns=featuresUsed)
Out[16]:
| air_pressure | air_temp | avg_wind_direction | avg_wind_speed | max_wind_direction | max_wind_speed | relative_humidity | |
|---|---|---|---|---|---|---|---|
| 0 | 1.171256 | -0.251789 | -1.154951 | 2.166273 | -1.054745 | 2.289235 | -1.134940 |
| 1 | 0.463010 | -0.944084 | 0.752266 | -0.534278 | 0.665727 | -0.513820 | 1.030146 |
| 2 | 0.327127 | -1.015366 | -1.211716 | -0.573832 | -1.046480 | -0.582451 | 0.945063 |
| 3 | -1.117210 | -0.900354 | 0.441681 | 1.869681 | 0.534087 | 1.828972 | 0.945397 |
| 4 | -0.214792 | 0.610966 | 0.411501 | 0.616994 | 0.501985 | 0.562956 | -0.167618 |
| 5 | -0.898883 | -1.109780 | 0.384500 | 0.087716 | 0.463018 | 0.083325 | 1.331430 |
| 6 | 0.133075 | 0.947372 | -1.334976 | -0.571825 | -1.182443 | -0.580240 | -0.870452 |
| 7 | -0.071531 | 0.662120 | 0.538240 | -0.656654 | -0.002678 | -0.656781 | -0.532039 |
| 8 | 0.222609 | 0.792225 | 1.328018 | -0.639614 | 1.599720 | -0.590233 | -0.715729 |
| 9 | -0.652842 | 0.288582 | -1.113623 | -0.605836 | -0.978505 | -0.631862 | 0.103645 |
| 10 | 1.484501 | -0.161324 | -1.075705 | 0.022640 | -0.971772 | 0.049008 | -1.000615 |
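The centers variable used in the cell above comes straight from the fitted model:
centers = model.clusterCenters()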
In [17]:
# Show cluster sizes
model.summary.clusterSizes
Out[17]:
[74856, 162242, 89804, 120564, 215438, 166702, 152800, 253180, 131530, 100447, 119260]
Generate cluster profile plots¶
In [18]:
centersNamed = utils.pd_centers(featuresUsed,centers)
print(centersNamed.columns.values)
['air_pressure' 'air_temp' 'avg_wind_direction' 'avg_wind_speed' 'max_wind_direction' 'max_wind_speed' 'relative_humidity' 'prediction']
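utils.pd_centers is also part of the course utilities; a plausible sketch (an assumption based on the column names printed above, not the actual implementation) builds a pandas DataFrame of the centers with a prediction column holding each cluster's index:
import numpy as np

def pd_centers_sketch(featuresUsed, centers):
    colNames = list(featuresUsed) + ['prediction']
    # One row per cluster center, tagged with its cluster index
    Z = [np.append(center, index) for index, center in enumerate(centers)]
    return pd.DataFrame(Z, columns=colNames)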
Profiles for All Clusters¶
This is a parallel coordinates plot showing the cluster centers for all clusters.
The data samples were standardized to zero mean and unit standard deviation before k-means was applied, so each cluster-center value can be read as a number of standard deviations from the overall mean.
In [19]:
numClusters = len(centersNamed.index)
colors_used = utils.parallel_plot(centersNamed, numClusters)
Clusters Capturing Dry Days¶
Clusters with lower-than-average relative_humidity values capture dry days.
In [20]:
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] < -0.5],
numClusters, colors=colors_used);
Clusters Capturing Humid Days¶
Clusters with higher-than-average relative_humidity values capture humid days.
In [21]:
# ==> YOUR CODE HERE
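One possible completion, mirroring the dry-days cell and assuming the same 0.5 cutoff on the standardized values:
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] > 0.5],
                    numClusters, colors=colors_used);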
Clusters Capturing Hot Days¶
Use air_temp
In [22]:
# ==> YOUR CODE HERE
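A possible completion, again assuming a 0.5 cutoff on the standardized air_temp values:
utils.parallel_plot(centersNamed[centersNamed['air_temp'] > 0.5],
                    numClusters, colors=colors_used);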
Clusters Capturing Windy Days¶
Use max_wind_speed
In [23]:
# ==> YOUR CODE HERE
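A possible completion, assuming a 0.5 cutoff on the standardized max_wind_speed values:
utils.parallel_plot(centersNamed[centersNamed['max_wind_speed'] > 0.5],
                    numClusters, colors=colors_used);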
Save Model¶
In [24]:
# Specify file name
# ==> YOUR CODE HERE
model.write().overwrite().save(model_file)
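The model_file name in the cell above is left as an exercise; a hypothetical choice, saving under the home directory:
# Hypothetical output path
model_file = HOME + '/kmeans_model'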
Stop Spark session¶
In [25]:
# ==> YOUR CODE HERE
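Stopping the session is a single call:
spark.stop()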