PySpark Scaling Demo¶

CIML Summer Institute¶

UC San Diego¶


Setup¶

In [1]:
# Initialize Spark

import pyspark
from pyspark.sql import SparkSession, Row

# Change N in local[N] to change the number of cores used
# Note the change in execution time
conf = pyspark.SparkConf().setAll([('spark.master', 'local[1]'),
                                   ('spark.app.name', 'Spark Demo'),
                                   ])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

print(pyspark.version.__version__)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/22 17:29:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3.5.5
In [2]:
print(spark.sparkContext.defaultParallelism)
1

Record execution start time¶

In [3]:
import time
start_time = time.time()

Read data¶

In [4]:
# Read data into Spark DataFrame 
# Data source: https://jmcauley.ucsd.edu/data/amazon/ 

from os.path import expanduser
HOME = expanduser("~")

data_path = HOME + '/data/'

dataFileName = data_path + "BookReviews_5M.txt"
textDF = spark.read.text(dataFileName)

Process data¶

Count number of rows¶

In [5]:
%%time

textDF.count()
[Stage 0:====================================================>      (8 + 1) / 9]
CPU times: user 4.56 ms, sys: 191 μs, total: 4.75 ms
Wall time: 3.05 s
                                                                                
Out[5]:
5000000

Show first few rows¶

In [6]:
textDF.show()
+--------------------+
|               value|
+--------------------+
|This was the firs...|
|Also after going ...|
|As with all of Ms...|
|I've not read any...|
|This romance nove...|
|Carolina Garcia A...|
|Not only can she ...|
|Once again Garcia...|
|The timing is jus...|
|Engaging. Dark. R...|
|Set amid the back...|
|This novel is a d...|
|If readers are ad...|
| Reviewed by Phyllis|
|      APOOO BookClub|
|A guilty pleasure...|
|In the tradition ...|
|Beryl Unger, top ...|
|What follows is a...|
|The book flap say...|
+--------------------+
only showing top 20 rows

Filter for reviews with the word 'science'¶

In [7]:
%%time

from pyspark.sql.functions import col

filteredDF = textDF.filter(col('value').contains('science'))
CPU times: user 1.29 ms, sys: 38 μs, total: 1.33 ms
Wall time: 30.8 ms
In [8]:
# Uncomment to show entire untruncated review 
# filteredDF.show(truncate=False)
filteredDF.show()
+--------------------+
|               value|
+--------------------+
|If so then it des...|
|This book was an ...|
|A "Welcome to the...|
|Despite painful g...|
|The storyline its...|
|Recommended if yo...|
|I don't really ge...|
|As a reader and r...|
|Sony E-readers:  ...|
|I had rather hope...|
|In the early 1980...|
|You just have to ...|
|Genres in fiction...|
|Although this see...|
|This is one of th...|
|I'm not normally ...|
|From the first pa...|
|Predators of Dark...|
|Predators Of Dark...|
|In short: Highly ...|
+--------------------+
only showing top 20 rows

In [9]:
%%time 

filteredDF.count()
[Stage 5:====================================================>      (8 + 1) / 9]
CPU times: user 2.37 ms, sys: 3.75 ms, total: 6.13 ms
Wall time: 3.31 s
                                                                                
Out[9]:
1403

Print time since execution start¶

In [10]:
print(time.time() - start_time)
8.840343475341797

Stop Spark Session¶

In [11]:
spark.stop()