STL and Holt from R to SparkR
By Shubham Raizada, Walmart Global Tech Blog

Published Dec 31, 2021

To scale our machine learning algorithms, which currently run in R, we recently rewrote the entire data preprocessing and machine learning pipeline in SparkR.
We use the time series models STL and Holt for forecasting. Preprocessing casts the date columns to date types, imputes missing values, standardizes the data, and finally sorts it by the relevant columns to prepare the input for the ML algorithms. Since I was unfamiliar with R, the major challenge was to understand the data manipulations performed in the current setup and to find the corresponding transformations in SparkR. This article shares what I learned in the process.

Getting started

1. Load the SparkR library and initialize a Spark session
library(SparkR, lib.loc = "/usr/local/spark/R/lib")
sparkEnvir <- list(spark.num.executors = '5', spark.executor.cores = '5')
# initialize the Spark context
sc <- sparkR.init(sparkHome = "/usr/local/spark",
                  sparkEnvir = sparkEnvir)
# initialize the SQL context
sqlContext <- sparkRSQL.init(sc)
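Since Spark 2.0, sparkR.init() and sparkRSQL.init() are deprecated in favour of a single sparkR.session() entry point. A minimal sketch of an equivalent setup, assuming Spark 2.x or later installed at the same path; the spark.executor.instances property used here for the executor count is an assumption about the intended setting, not something taken from the original configuration:

```r
library(SparkR, lib.loc = "/usr/local/spark/R/lib")

# Assumed settings: five executors with five cores each
sparkConfig <- list(spark.executor.instances = "5",
                    spark.executor.cores = "5")

# One call creates (or reuses) the Spark session; no separate
# Spark context or SQL context object has to be passed around afterwards
sparkR.session(sparkHome = "/usr/local/spark",
               sparkConfig = sparkConfig)
```

With a session created this way, functions such as read.df() and sql() pick it up implicitly.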
2. Read a CSV file
We can read the CSV file with inferSchema = "true" (i.e. we let Spark infer the data type of each column in the DataFrame), which saves the effort of explicitly casting each column to its data type. If the schema is not inferred, every column is read as a string. We can also provide a custom schema.

# sales_training_file : location of the file 
sales_df <- read.df(sales_training_file, "csv", header = "true", inferSchema = "true", na.strings = "NA", integer64 = "numeric")
# number of rows in the dataframe
nrow(sales_df)
# list the columns of the dataframe
columns(sales_df)
# show the first 5 rows 
head(sales_df, num = 5)
# schema of the dataframe
schema(sales_df)
# visualisation of starting data in the spark dataframe 
str(sales_df)
# 'SparkDataFrame': 4 variables:
# $ _c0            : int 1 1 4 7 5 6
# $ audit_date: POSIXct 2017-02-07 2017-03-07 2017-04-07 2017-05-07 2017-06-07 2017-07-07
# $ sales      : int 17 20 21 32 20 72
# $ price          : num 15.02 9.04 15.10 12.61 13.17 21.11
# rename a column 
sales_df <- withColumnRenamed(sales_df, "_c0", "product_id")
# add a new column "count" with 0 as default value (lit() wraps the literal in a Column)
sales_df_with_count <- withColumn(sales_df, "count", lit(0))
# create custom schema, and pass to load call
customSchema <- structType(
 structField("product_id", type = "integer"),
 structField("audit_date", type = "string"),
 structField("sales", type = "integer"),
 structField("price", type = "double"))
sales_df_custom_schema <- read.df(sales_training_file, "csv", header = "true", schema = customSchema, na.strings = "NA", integer64 = "numeric")
3. Data manipulations
R's data.table and data.frame offer rich functionality for selecting, casting columns to data types, filtering, filling null values, ordering, aggregating, and even joining data frames. With SparkR we can leverage Spark SQL to perform all of these manipulations.
# cast column audit_date to data type string
sales_df$audit_date <- cast(sales_df$audit_date, dataType = "string")
# create a temporary view on sales_df
createOrReplaceTempView(sales_df, "sales_table")
sales_select <- sql("select product_id, audit_date, price from sales_table where price >= 15 order by audit_date")
sales_grouped <- sql("select sum(sales),product_id from sales_table group by product_id")
In addition, we can use the SparkDataFrame methods for the same kinds of transformations.
# fill all null entries of audit_date with "2016-02-07"
sales_df_cleaned <- fillna(sales_df, list("audit_date" = "2016-02-07"))
# update price to 0, if price is less than 0
sales_df_cleaned$price <- ifelse(sales_df_cleaned$price < 0, 0, sales_df_cleaned$price)
# keep only the rows with price greater than 10
sales_df_filtered <- filter(sales_df, sales_df$price > 10)
# group by audit_date and sum 
sales_df_grouped <- groupBy(sales_df, sales_df$audit_date)

sales_df_agg <- agg(sales_df_grouped, tot_sales=sum(sales_df$sales))
4. SparkR dataframe to R dataframe and vice versa
# SparkR data frame to R
sales_R_df <- collect(sales_df)
# R to sparkR dataframe
sales_spark_df <- createDataFrame(sales_R_df)

Note: Be careful with collect(), as it brings the entire dataset to the driver. With large data, if the Spark driver does not have enough memory, it may fail with an out-of-memory (OOM) error.
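One way to limit that risk is to reduce the data in Spark before collecting it. The sketch below assumes a local Spark installation with SparkR on the default library path; the toy data and the row limit of 2 are made up for illustration, standing in for the real sales_df.

```r
library(SparkR)
sparkR.session()

# Toy stand-in for sales_df (values are made up for illustration)
sales_df <- createDataFrame(data.frame(
  product_id = c(1L, 1L, 2L, 2L),
  sales = c(17L, 20L, 21L, 32L)))

# Option 1: pull only a bounded number of rows to the driver
sales_sample_R_df <- collect(limit(sales_df, 2))

# Option 2: aggregate in Spark first, so only one row per product is collected
sales_summary_R_df <- collect(
  agg(groupBy(sales_df, "product_id"), tot_sales = sum(sales_df$sales)))
```

In both cases the result is an ordinary R data.frame whose size is bounded by the limit or by the number of groups, not by the size of the full dataset.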

Using R packages with SparkR

In addition to the basic manipulations on data frames, R has a rich library of packages for statistical analysis. These functions expect an R data.frame or data.table and do not support Spark data frames.
The SparkR User-Defined Function (UDF) API opens up opportunities for big data workloads running on Apache Spark to embrace R’s rich package ecosystem.
The SparkR UDF API transfers data back and forth between the Spark JVM and the R process. Inside the UDF, we work with R data.frames and have access to the entire R ecosystem.
SparkR offers four APIs that apply a user-defined R function to a SparkDataFrame:
dapply()
dapplyCollect()
gapply()
gapplyCollect()
The diagram in the Databricks post linked below [1] illustrates the serialization and deserialization performed during the execution of a UDF. In total, the data is serialized twice and deserialized twice.

https://databricks.com/blog/2018/08/15/100x-faster-bridge-between-spark-and-r-with-user-defined-functions-on-databricks.html
gapply()
The function is applied to each group of the SparkDataFrame and should take exactly two parameters: the grouping key and the R data.frame corresponding to that key. The groups are formed from one or more columns of the SparkDataFrame. The function should return a data.frame. The schema argument specifies the row format of the resulting SparkDataFrame; it must describe the R function's output in terms of Spark data types.
gapplyCollect is a shortcut for calling collect() on the result, and it does not require a schema. Note that gapplyCollect can fail if the combined output of the UDF across all partitions cannot be pulled to the driver and fit in driver memory.
R code

library(energy)

sales_df[, p_value := poisson.mtest(sales,
R = 100)$p.value, by = .(product_id)]
Equivalent SparkR code
sales_result_schema <- structType(
structField("product_id", "integer"),
structField("p_value", "double"))

poisson_sales <- gapply(sales_df, c("product_id"), function(key, x) {
  library(energy)
  # x is the R data.frame for one product_id; return the key with its p-value
  y <- data.frame(key, poisson.mtest(x$sales, R = 100)$p.value)
}, sales_result_schema)
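For comparison, here is a minimal sketch of gapplyCollect, which takes no schema and returns a local R data.frame directly. It assumes a local Spark installation with SparkR on the default library path; the toy data and the mean_sales column are hypothetical and only stand in for a per-group statistic such as the p-value above.

```r
library(SparkR)
sparkR.session()

# Toy stand-in for sales_df (values are made up for illustration)
sales_df <- createDataFrame(data.frame(
  product_id = c(1L, 1L, 2L, 2L),
  sales = c(17, 20, 21, 32)))

# No schema argument: the result is already a local R data.frame,
# so the column names are set inside the function instead
mean_sales_R_df <- gapplyCollect(sales_df, c("product_id"), function(key, x) {
  y <- data.frame(key, mean(x$sales))
  colnames(y) <- c("product_id", "mean_sales")
  y
})
```

Because every group's output ends up on the driver, this form only suits results that are small, such as one row per product.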
dapply()
dapply applies a function to each partition of a SparkDataFrame. The function should take exactly one parameter, to which the R data.frame corresponding to each partition is passed, and should return a data.frame. The schema specifies the row format of the resulting SparkDataFrame and must match the data types of the returned value.
Like dapply, dapplyCollect applies a function to each partition of a SparkDataFrame, but it also collects the result back to the driver. The function should return a data.frame, and no schema needs to be passed. dapplyCollect can fail if the combined output of the UDF across all partitions cannot be pulled to the driver and fit in driver memory.
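A minimal sketch of dapply, assuming a local Spark installation with SparkR on the default library path. The toy data, the revenue column, and the revenue_schema name are hypothetical and not part of the original pipeline; the point is only that the schema has to list every column the function returns, with matching Spark types.

```r
library(SparkR)
sparkR.session()

# Toy stand-in for sales_df (values are made up for illustration)
sales_df <- createDataFrame(data.frame(
  product_id = c(1L, 2L),
  sales = c(17L, 20L),
  price = c(15.02, 9.04)))

# The schema must describe every column of the data.frame returned by the function
revenue_schema <- structType(
  structField("product_id", "integer"),
  structField("sales", "integer"),
  structField("price", "double"),
  structField("revenue", "double"))

sales_revenue_df <- dapply(sales_df, function(x) {
  # x is the R data.frame holding the rows of one partition
  cbind(x, revenue = x$sales * x$price)
}, revenue_schema)

head(sales_revenue_df)
```

Unlike gapply, the function here sees whatever rows happen to share a partition, so it suits row-wise work rather than per-product models.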

Running STL and Holt with SparkR

Transforming STL to SparkR
The R code below executes a custom function stl_forecast(), which internally uses stlf() from library(forecast). The argument to stl_forecast() is an R data.table.
sales_R_df %>%
group_by(product_id) %>%
do(stl_forecast(data.table(.))) %>%
data.table(.) -> dt_stl
For SparkR, we first need to create a schema that will be mapped to the resulting data frame.
df_stl_schema <- structType(
structField("product_id", "integer"),
structField("audit_date", "date"),
structField("stl_forecast", "double"),
structField("stl_forecast_std", "double")
)
Use gapply to execute stl_forecast() on a SparkR data frame.
df_stl <- gapply(sales_df, c("product_id"), function(key, x) {
  library(data.table)
  library(lubridate)
  library(dplyr)
  library(forecast)
  library(TTR)

  # gapply passes an R data.frame; convert it to a data.table and pass it to stl_forecast()
  sales1 <- data.table(x)
  y <- data.frame(key, stl_forecast(sales1))
}, df_stl_schema)
Transforming Holt to SparkR
Similarly, we have a custom method holt_forecast() which expects an R data table.
sales_R_df %>%
group_by(product_id) %>%
do(holt_forecast(data.table(.))) %>%
data.table(.) -> dt_holt
Implementation in SparkR
dt_holt_schema <- structType(
structField("product_id", "integer"),
structField("audit_date", "date"),
structField("holt_unit_forecast", "double"),
structField("holt_unit_forecast_std", "double")
)

dt_holt <- gapply(sales_df, c("product_id"), function(key, x) {
library(data.table)
library(lubridate)
library(dplyr)
library(forecast)
sales <- data.table(x)
y <- data.frame(key,holt_forecast(sales))
}, dt_holt_schema)

Observations

We expected some differences in precision, which can arise from the multiple data transfers between the Spark JVM and the R environment. However, during validation on the generated dataset, we observed that the forecasts produced by the SparkR UDFs exactly match the results generated earlier in R.
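A comparison of this kind can be sketched in plain R once both results are local data.frames. This is not the validation code used in the project: the two data frames below are hypothetical stand-ins (with made-up values) for the output of the original R pipeline and for collect() on the SparkR result, and sort_rows is an illustrative helper.

```r
# Hypothetical stand-ins for the two outputs being compared
r_result <- data.frame(
  product_id = c(1L, 1L, 2L),
  audit_date = as.Date(c("2017-02-07", "2017-03-07", "2017-02-07")),
  stl_forecast = c(17.5, 20.25, 21.0))
spark_result <- r_result[c(3, 1, 2), ]  # same rows, different order

# Spark does not guarantee row order, so sort both sides on the key columns
sort_rows <- function(df) {
  sorted <- df[order(df$product_id, df$audit_date), ]
  rownames(sorted) <- NULL
  sorted
}

# identical() checks for an exact match; all.equal() allows a small numeric tolerance
exact_match <- identical(sort_rows(r_result), sort_rows(spark_result))
close_match <- isTRUE(all.equal(sort_rows(r_result), sort_rows(spark_result)))
```

Checking both is useful: if exact_match fails while close_match holds, the difference is only in floating-point precision.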
[1]: 100x Faster Bridge between Apache Spark and R with User-Defined Functions on Databricks https://databricks.com/blog/2018/08/15/100x-faster-bridge-between-spark-and-r-with-user-defined-functions-on-databricks.html
[2]: SparkR documentation https://spark.apache.org/docs/latest/sparkr.html#sparkdataframe
