Spark & R: Loading Data into SparkSQL Data Frames

Published Sep 18, 2015 · Last updated Mar 22, 2017

In this second tutorial (see the first one) we will introduce basic concepts about SparkSQL with R that you can find in the SparkR documentation, applied to the 2013 American Community Survey dataset. We will do two things: read data into a SparkSQL data frame, and have a quick look at the schema and what we have read. We will work with IPython/Jupyter notebooks here. In the first tutorial, we explained how to download the files and start a Jupyter notebook using SparkR. In order to continue with this tutorial, you will need to complete that one first, so that you have the data downloaded locally.

All the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.

Instructions

As we already mentioned, for this series of tutorials/notebooks we use Jupyter with the IRkernel R kernel. You can find
installation instructions for your specific setup here.

A good way of using these notebooks is by first cloning the repo, and then
starting your Jupyter in pySpark mode. For example,
if we have a standalone Spark installation running on our localhost with a
maximum of 6 GB per node assigned to IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.5.0-bin-hadoop2.6/bin/pyspark

Notice that the path to the pyspark command will depend on your specific
installation. As a requirement, you need to have
Spark installed on
the same machine where you are going to start the IPython notebook server.

For more Spark options see here. In general, the rule is that an option
described in the form spark.executor.memory is passed as SPARK_EXECUTOR_MEMORY when
calling IPython/pySpark.
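For instance, a hypothetical variant of the command above, assuming the same standalone master on localhost, would pass both spark.driver.memory and spark.executor.memory through their corresponding environment variables:

```shell
# spark.driver.memory   -> SPARK_DRIVER_MEMORY
# spark.executor.memory -> SPARK_EXECUTOR_MEMORY
MASTER="spark://127.0.0.1:7077" \
SPARK_DRIVER_MEMORY="4G" \
SPARK_EXECUTOR_MEMORY="6G" \
IPYTHON_OPTS="notebook --pylab inline" \
~/spark-1.5.0-bin-hadoop2.6/bin/pyspark
```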

Datasets

2013 American Community Survey dataset

Every year, the US Census Bureau runs the American Community Survey. In this survey, approximately 3.5 million
households are asked detailed questions about who they are and how they live. Many topics are covered, including
ancestry, education, work, transportation, internet use, and residency. You can go directly to
the source
in order to learn more about the data and get files for different years, longer periods, individual states, etc.

In any case, the starting up notebook
will download the 2013 data locally for later use with the rest of the notebooks.

The idea of using this dataset came from its recent announcement on Kaggle
as part of their Kaggle scripts datasets. There you will be able to analyse the dataset on site, while sharing your results with other Kaggle
users. Highly recommended!

Creating a SparkSQL Context

In further notebooks, we will explore our data by loading it into SparkSQL data frames. But first we need to initialise a SparkSQL context. The first thing we need to do is set up some environment variables and library paths, as follows. Remember to replace the value assigned to SPARK_HOME with your Spark home folder.

# Set Spark home and R libs
Sys.setenv(SPARK_HOME='/home/cluster/spark-1.5.0-bin-hadoop2.6')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
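Before loading the library, it can be worth checking that the path we just added actually exists. A quick optional sanity check, using only base R, might look like this:

```r
# Optional sanity check: make sure the SparkR package shipped with Spark
# is really under SPARK_HOME/R/lib before we try to load it
sparkr_lib <- file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib', 'SparkR')
if (!dir.exists(sparkr_lib)) {
    stop('SparkR not found; check the SPARK_HOME value set above')
}
```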

Now we can load the SparkR library as follows.

library(SparkR)
    Attaching package: ‘SparkR’
    
    The following objects are masked from ‘package:stats’:
    
        filter, na.omit
    
    The following objects are masked from ‘package:base’:
    
        intersect, rbind, sample, subset, summary, table, transform

And now we can initialise the Spark context as in the official documentation. In our case we use a standalone Spark cluster with one master and seven workers. If you are running Spark in local mode, just use master='local'. Additionally, we require a Spark package from Databricks to read CSV files (more on this in the next section).

sc <- sparkR.init(master='spark://169.254.206.2:7077', sparkPackages="com.databricks:spark-csv_2.11:1.2.0")
    Launching java with spark-submit command /home/cluster/spark-1.5.0-bin-hadoop2.6/bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpyC8rUQ/backend_port3743359adb78 

And finally we can start the SparkSQL context as follows.

sqlContext <- sparkRSQL.init(sc)

Creating SparkSQL Data Frames

Reading CSV Data Using Databricks CSV Extension

The easiest way to get our CSV data into a SparkSQL data frame is by using the Databricks CSV extension to read SparkSQL data frames directly from CSV files. In any case, remember to set the right path for your data files in the first line; ours is /nfs/data/2013-acs/ss13husa.csv.

housing_a_file_path <- file.path('', 'nfs','data','2013-acs','ss13husa.csv')
housing_b_file_path <- file.path('', 'nfs','data','2013-acs','ss13husb.csv')
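As a side note, file.path simply joins its arguments with the platform's path separator; the leading empty string is what produces the absolute path:

```r
# file.path joins its arguments with '/'; the initial '' yields a leading slash
file.path('', 'nfs', 'data', '2013-acs', 'ss13husa.csv')
# [1] "/nfs/data/2013-acs/ss13husa.csv"
```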

Now let's read the data into a SparkSQL data frame. We need to pass four parameters in addition to the sqlContext:

  • The file path.
  • header='true', since our CSV files have a header row with the column names.
  • inferSchema='true', so that the library infers the schema.
  • The source type (the Databricks package in this case).

system.time(
        housing_a_df <- read.df(sqlContext, 
                            housing_a_file_path, 
                            header='true', 
                            source = "com.databricks.spark.csv", 
                            inferSchema='true')
)
       user  system elapsed 
      0.002   0.000  16.919 

Let's have a look at the inferred schema.

system.time(
        printSchema(housing_a_df)
)
    root
     |-- RT: string (nullable = true)
     |-- SERIALNO: integer (nullable = true)
     |-- DIVISION: integer (nullable = true)
     |-- PUMA: integer (nullable = true)
     |-- REGION: integer (nullable = true)
     |-- ST: integer (nullable = true)
     |-- ADJHSG: integer (nullable = true)
     |-- ADJINC: integer (nullable = true)
     |-- WGTP: integer (nullable = true)
     |-- NP: integer (nullable = true)
     |-- TYPE: integer (nullable = true)
     |-- ACCESS: integer (nullable = true)
     |-- ACR: integer (nullable = true)
     |-- AGS: integer (nullable = true)
     |-- BATH: integer (nullable = true)
     |-- BDSP: integer (nullable = true)
     |-- BLD: integer (nullable = true)
     |-- BROADBND: integer (nullable = true)
     |-- BUS: integer (nullable = true)
     |-- COMPOTHX: integer (nullable = true)
     |-- CONP: integer (nullable = true)
     |-- DIALUP: integer (nullable = true)
     |-- DSL: integer (nullable = true)
     |-- ELEP: integer (nullable = true)
     |-- FIBEROP: integer (nullable = true)
     |-- FS: integer (nullable = true)
     |-- FULP: integer (nullable = true)
     |-- GASP: integer (nullable = true)
     |-- HANDHELD: integer (nullable = true)
     |-- HFL: integer (nullable = true)
     |-- INSP: integer (nullable = true)
     |-- LAPTOP: integer (nullable = true)
     |-- MHP: integer (nullable = true)
     |-- MODEM: integer (nullable = true)
     |-- MRGI: integer (nullable = true)
     |-- MRGP: integer (nullable = true)
     |-- MRGT: integer (nullable = true)
     |-- MRGX: integer (nullable = true)
     |-- OTHSVCEX: integer (nullable = true)
     |-- REFR: integer (nullable = true)
     |-- RMSP: integer (nullable = true)
     |-- RNTM: integer (nullable = true)
     |-- RNTP: integer (nullable = true)
     |-- RWAT: integer (nullable = true)
     |-- RWATPR: integer (nullable = true)
     |-- SATELLITE: integer (nullable = true)
     |-- SINK: integer (nullable = true)
     |-- SMP: integer (nullable = true)
     |-- STOV: integer (nullable = true)
     |-- TEL: integer (nullable = true)
     |-- TEN: integer (nullable = true)
     |-- TOIL: integer (nullable = true)
     |-- VACS: integer (nullable = true)
     |-- VALP: integer (nullable = true)
     |-- VEH: integer (nullable = true)
     |-- WATP: integer (nullable = true)
     |-- YBL: integer (nullable = true)
     |-- FES: integer (nullable = true)
     |-- FFINCP: integer (nullable = true)
     |-- FGRNTP: integer (nullable = true)
     |-- FHINCP: integer (nullable = true)
     |-- FINCP: integer (nullable = true)
     |-- FPARC: integer (nullable = true)
     |-- FSMOCP: integer (nullable = true)
     |-- GRNTP: integer (nullable = true)
     |-- GRPIP: integer (nullable = true)
     |-- HHL: integer (nullable = true)
     |-- HHT: integer (nullable = true)
     |-- HINCP: integer (nullable = true)
     |-- HUGCL: integer (nullable = true)
     |-- HUPAC: integer (nullable = true)
     |-- HUPAOC: integer (nullable = true)
     |-- HUPARC: integer (nullable = true)
     |-- KIT: integer (nullable = true)
     |-- LNGI: integer (nullable = true)
     |-- MULTG: integer (nullable = true)
     |-- MV: integer (nullable = true)
     |-- NOC: integer (nullable = true)
     |-- NPF: integer (nullable = true)
     |-- NPP: integer (nullable = true)
     |-- NR: integer (nullable = true)
     |-- NRC: integer (nullable = true)
     |-- OCPIP: integer (nullable = true)
     |-- PARTNER: integer (nullable = true)
     |-- PLM: integer (nullable = true)
     |-- PSF: integer (nullable = true)
     |-- R18: integer (nullable = true)
     |-- R60: integer (nullable = true)
     |-- R65: integer (nullable = true)
     |-- RESMODE: integer (nullable = true)
     |-- SMOCP: integer (nullable = true)
     |-- SMX: integer (nullable = true)
     |-- SRNT: integer (nullable = true)
     |-- SSMC: integer (nullable = true)
     |-- SVAL: integer (nullable = true)
     |-- TAXP: integer (nullable = true)
     |-- WIF: integer (nullable = true)
     |-- WKEXREL: integer (nullable = true)
     |-- WORKSTAT: integer (nullable = true)
     |-- FACCESSP: integer (nullable = true)
     |-- FACRP: integer (nullable = true)
     |-- FAGSP: integer (nullable = true)
     |-- FBATHP: integer (nullable = true)
     |-- FBDSP: integer (nullable = true)
     |-- FBLDP: integer (nullable = true)
     |-- FBROADBNDP: integer (nullable = true)
     |-- FBUSP: integer (nullable = true)
     |-- FCOMPOTHXP: integer (nullable = true)
     |-- FCONP: integer (nullable = true)
     |-- FDIALUPP: integer (nullable = true)
     |-- FDSLP: integer (nullable = true)
     |-- FELEP: integer (nullable = true)
     |-- FFIBEROPP: integer (nullable = true)
     |-- FFSP: integer (nullable = true)
     |-- FFULP: integer (nullable = true)
     |-- FGASP: integer (nullable = true)
     |-- FHANDHELDP: integer (nullable = true)
     |-- FHFLP: integer (nullable = true)
     |-- FINSP: integer (nullable = true)
     |-- FKITP: integer (nullable = true)
     |-- FLAPTOPP: integer (nullable = true)
     |-- FMHP: integer (nullable = true)
     |-- FMODEMP: integer (nullable = true)
     |-- FMRGIP: integer (nullable = true)
     |-- FMRGP: integer (nullable = true)
     |-- FMRGTP: integer (nullable = true)
     |-- FMRGXP: integer (nullable = true)
     |-- FMVP: integer (nullable = true)
     |-- FOTHSVCEXP: integer (nullable = true)
     |-- FPLMP: integer (nullable = true)
     |-- FREFRP: integer (nullable = true)
     |-- FRMSP: integer (nullable = true)
     |-- FRNTMP: integer (nullable = true)
     |-- FRNTP: integer (nullable = true)
     |-- FRWATP: integer (nullable = true)
     |-- FRWATPRP: integer (nullable = true)
     |-- FSATELLITEP: integer (nullable = true)
     |-- FSINKP: integer (nullable = true)
     |-- FSMP: integer (nullable = true)
     |-- FSMXHP: integer (nullable = true)
     |-- FSMXSP: integer (nullable = true)
     |-- FSTOVP: integer (nullable = true)
     |-- FTAXP: integer (nullable = true)
     |-- FTELP: integer (nullable = true)
     |-- FTENP: integer (nullable = true)
     |-- FTOILP: integer (nullable = true)
     |-- FVACSP: integer (nullable = true)
     |-- FVALP: integer (nullable = true)
     |-- FVEHP: integer (nullable = true)
     |-- FWATP: integer (nullable = true)
     |-- FYBLP: integer (nullable = true)
     |-- wgtp1: integer (nullable = true)
     |-- wgtp2: integer (nullable = true)
     |-- wgtp3: integer (nullable = true)
     |-- wgtp4: integer (nullable = true)
     |-- wgtp5: integer (nullable = true)
     |-- wgtp6: integer (nullable = true)
     |-- wgtp7: integer (nullable = true)
     |-- wgtp8: integer (nullable = true)
     |-- wgtp9: integer (nullable = true)
     |-- wgtp10: integer (nullable = true)
     |-- wgtp11: integer (nullable = true)
     |-- wgtp12: integer (nullable = true)
     |-- wgtp13: integer (nullable = true)
     |-- wgtp14: integer (nullable = true)
     |-- wgtp15: integer (nullable = true)
     |-- wgtp16: integer (nullable = true)
     |-- wgtp17: integer (nullable = true)
     |-- wgtp18: integer (nullable = true)
     |-- wgtp19: integer (nullable = true)
     |-- wgtp20: integer (nullable = true)
     |-- wgtp21: integer (nullable = true)
     |-- wgtp22: integer (nullable = true)
     |-- wgtp23: integer (nullable = true)
     |-- wgtp24: integer (nullable = true)
     |-- wgtp25: integer (nullable = true)
     |-- wgtp26: integer (nullable = true)
     |-- wgtp27: integer (nullable = true)
     |-- wgtp28: integer (nullable = true)
     |-- wgtp29: integer (nullable = true)
     |-- wgtp30: integer (nullable = true)
     |-- wgtp31: integer (nullable = true)
     |-- wgtp32: integer (nullable = true)
     |-- wgtp33: integer (nullable = true)
     |-- wgtp34: integer (nullable = true)
     |-- wgtp35: integer (nullable = true)
     |-- wgtp36: integer (nullable = true)
     |-- wgtp37: integer (nullable = true)
     |-- wgtp38: integer (nullable = true)
     |-- wgtp39: integer (nullable = true)
     |-- wgtp40: integer (nullable = true)
     |-- wgtp41: integer (nullable = true)
     |-- wgtp42: integer (nullable = true)
     |-- wgtp43: integer (nullable = true)
     |-- wgtp44: integer (nullable = true)
     |-- wgtp45: integer (nullable = true)
     |-- wgtp46: integer (nullable = true)
     |-- wgtp47: integer (nullable = true)
     |-- wgtp48: integer (nullable = true)
     |-- wgtp49: integer (nullable = true)
     |-- wgtp50: integer (nullable = true)
     |-- wgtp51: integer (nullable = true)
     |-- wgtp52: integer (nullable = true)
     |-- wgtp53: integer (nullable = true)
     |-- wgtp54: integer (nullable = true)
     |-- wgtp55: integer (nullable = true)
     |-- wgtp56: integer (nullable = true)
     |-- wgtp57: integer (nullable = true)
     |-- wgtp58: integer (nullable = true)
     |-- wgtp59: integer (nullable = true)
     |-- wgtp60: integer (nullable = true)
     |-- wgtp61: integer (nullable = true)
     |-- wgtp62: integer (nullable = true)
     |-- wgtp63: integer (nullable = true)
     |-- wgtp64: integer (nullable = true)
     |-- wgtp65: integer (nullable = true)
     |-- wgtp66: integer (nullable = true)
     |-- wgtp67: integer (nullable = true)
     |-- wgtp68: integer (nullable = true)
     |-- wgtp69: integer (nullable = true)
     |-- wgtp70: integer (nullable = true)
     |-- wgtp71: integer (nullable = true)
     |-- wgtp72: integer (nullable = true)
     |-- wgtp73: integer (nullable = true)
     |-- wgtp74: integer (nullable = true)
     |-- wgtp75: integer (nullable = true)
     |-- wgtp76: integer (nullable = true)
     |-- wgtp77: integer (nullable = true)
     |-- wgtp78: integer (nullable = true)
     |-- wgtp79: integer (nullable = true)
     |-- wgtp80: integer (nullable = true)
       user  system elapsed 
      0.002   0.000   0.062 

Looks good. Let's have a look at the first few rows.

head(housing_a_df)
.  RT SERIALNO DIVISION PUMA REGION ST ADJHSG ADJINC WGTP NP ⋯ wgtp71 wgtp72 wgtp73 wgtp74 wgtp75 wgtp76 wgtp77 wgtp78 wgtp79 wgtp80
1 H 84 6 2600 3 1 1000000 1007549 0 1 0 0 0 0 0 0 0 0 0 0
2 H 154 6 2500 3 1 1000000 1007549 51 4 86 53 59 84 49 15 15 20 50 16
3 H 156 6 1700 3 1 1000000 1007549 449 1 161 530 601 579 341 378 387 421 621 486
4 H 160 6 2200 3 1 1000000 1007549 16 3 31 24 33 7 7 13 18 23 23 5
5 H 231 6 2400 3 1 1000000 1007549 52 1 21 18 37 49 103 38 49 51 46 47
6 H 286 6 900 3 1 1000000 1007549 76 1 128 25 68 66 80 26 66 164 88 24

And let's count how many rows we have in the first dataset. For that we will use nrow, as we do with regular R data frames; SparkR simply provides its own definition of nrow for SparkSQL data frames.

nrow(housing_a_df)
756065
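Note that actions such as nrow may scan the CSV data again each time they run. If we plan to perform several operations on the same data frame, we can ask Spark to keep it in memory. This is just a sketch; whether it actually speeds things up depends on the memory available in your cluster:

```r
# Persist the data frame in memory so subsequent actions
# (counts, summaries) don't need to re-scan the source file
cache(housing_a_df)
```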

Let's read the second housing data frame and count the number of rows.

system.time(
        housing_b_df <- read.df(sqlContext, 
                            housing_b_file_path, 
                            header='true', 
                            source = "com.databricks.spark.csv", 
                            inferSchema='true')
)
       user  system elapsed 
      0.128   0.016   9.666 
print(nrow(housing_b_df))
    [1] 720248

Merging Data Frames

Now we can use rbind(), as we do with regular R data frames, to put both of them together. Again, SparkR redefines many of the common R functions to work with SparkSQL data frames. Let's use rbind as follows.

housing_df <- rbind(housing_a_df, housing_b_df)

And let's count how many rows we have in the complete data frame.

system.time(
        housing_samples <- nrow(housing_df)
)
print(housing_samples)
       user  system elapsed 
      0.001   0.000  17.206 
    [1] 1476313
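Since these are SparkSQL data frames, we can also register the merged frame as a temporary table and query it with plain SQL. A sketch follows; the table name housing and the filter on NP (number of persons, from the schema above) are our own choices for illustration:

```r
# Register the merged data frame as a temporary table...
registerTempTable(housing_df, "housing")
# ...and query it with SQL, e.g. counting households with more than 4 persons
collect(sql(sqlContext, "SELECT COUNT(*) AS n FROM housing WHERE NP > 4"))
```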

Finally, let's get a glimpse of what it is like to explore data using SparkR by using the summary function on the data frame. If we have a look at the documentation using ?summary, we can see that we are redirected to describe, which optionally accepts column names. Let's use it with the whole data frame, that is, all 231 columns and 1476313 rows. Note: we use collect here because the result of describe is given as a DataFrame object and we need to print it in the notebook.

system.time(
        housing_summary <- describe(housing_df)
)
       user  system elapsed 
      0.174   0.016 197.755 
collect(housing_summary)
.  summary RT SERIALNO DIVISION PUMA REGION ST ADJHSG ADJINC WGTP ⋯ wgtp71 wgtp72 wgtp73 wgtp74 wgtp75 wgtp76 wgtp77 wgtp78 wgtp79 wgtp80
1 count 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313 1476313
2 mean NA 746447.7617598707 5.114928202894643 4458.457590632881 2.6305945961323918 27.8048550679971 1000000.0 1007549.0 89.95933585899468 89.95847289836234 89.9587072660066 89.95868220357065 89.95941511048132 89.95922341671448 89.95888473514763 89.95931147392186 89.95866933367111 89.95865239959276 89.95870862073286
3 stddev NA NaN 2.476670855019605 NaN 1.0144176501154056 15.894404191598897 NaN NaN 80.66731786485431 99.43323120400835 100.08749152422924 98.37291356061534 99.52607767089758 98.67766416960376 99.11341616484188 99.05644177173914 98.94559614502197 97.87387141885895 100.18396018153416
4 min H 1 1 100 1 1 1000000 1007549 0 -8 -494 -14 -151 -38 -5 -3 -16 -22 -243
5 max H 1492845 9 70301 4 56 1000000 1007549 1829 2282 2328 2393 2348 2263 2310 2131 2794 2710 2447

Or we can select individual column summaries using select as follows (here is a dictionary of each column's meaning), where VALP is the property value.

collect(select(housing_summary,"VALP"))

| . | VALP               |
|---|--------------------|
| 1 | 859691             |
| 2 | 247682.84302150423 |
| 3 | NaN                |
| 4 | 100                |
| 5 | 4775000            |
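The same pattern extends to several columns at once; for example, selecting the property value together with RNTP, the monthly rent column from the schema above:

```r
# Summaries for property value and monthly rent, side by side
collect(select(housing_summary, "VALP", "RNTP"))
```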

Conclusions

And that's it. In this tutorial we have shown how to load a CSV file into a SparkSQL data frame using SparkR. We also had a look at the data loaded, mainly at the number of samples and the data summary.

We already started to see how the SparkR implementation tries to create a language that sounds familiar to the R user, not just by using a data frame abstraction but also by defining a series of functions equivalent to the regular R ones. However, we have to remember that SparkR data frames are distributed data structures that open the door to scalable data analysis.

And finally, remember that all the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.
