Spark & Python: Working with RDDs (I)

Published Jul 02, 2015. Last updated Mar 21, 2017.

Instructions

The tutorials in my Spark & Python series can be read individually, although they tell a more or less linear 'story' when followed in sequence. They all use the same dataset to solve a related set of tasks.

It is not the only way, but a good way of following these Spark tutorials is to first clone the GitHub repo and then start your own IPython notebook in pySpark mode. For example, if we have a standalone Spark installation running on localhost, with a maximum of 6 GB per node assigned to IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Notice that the path to the pyspark command will depend on your specific installation. As a requirement, you need to have Spark installed on the same machine where you are going to start the IPython notebook server.

For more Spark options see here. As a general rule, an option described in the form spark.executor.memory is passed as SPARK_EXECUTOR_MEMORY when calling IPython/pySpark.
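The naming rule above can be sketched as a tiny helper. The function name to_env_name is hypothetical (it is not part of Spark) and only mirrors the convention described here; check the Spark documentation before assuming that a given property is honored as an environment variable.

```python
def to_env_name(spark_property):
    """Turn a dotted property name into upper case with underscores."""
    return spark_property.replace(".", "_").upper()


env_name = to_env_name("spark.executor.memory")
print(env_name)  # SPARK_EXECUTOR_MEMORY
```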

Datasets

We will be using datasets from the KDD Cup 1999.

References

The reference book for these and other Spark related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.

The KDD Cup 1999 competition dataset is described in detail here.

RDD Creation

In this section, we will introduce two different ways of getting data into the basic Spark data structure, the Resilient Distributed Dataset or RDD. An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Getting the Data Files

In this notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

Creating an RDD from a File

The most common way of creating an RDD is to load it from a file. Notice that Spark's textFile can handle compressed files directly.

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Now we have our data file loaded into the raw_data RDD.

Without getting into Spark transformations and actions, the most basic thing we can do to check that we got our RDD contents right is to count() the number of lines loaded from the file into the RDD.

raw_data.count()
494021
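As a local sanity check outside Spark, the same kind of line count can be reproduced with Python's standard gzip module. This is only a sketch on a made-up three-line file (the shortened records are illustrative, not real KDD rows), and it says nothing about how Spark reads the file internally.

```python
import gzip
import os
import tempfile

# Made-up, shortened records; the real file has 494021 lines.
sample = (b"0,tcp,http,SF,normal.\n"
          b"0,icmp,ecr_i,SF,smurf.\n"
          b"0,tcp,http,SF,normal.\n")

# Write the sample as a Gzip file in a temporary directory.
path = os.path.join(tempfile.mkdtemp(), "sample.gz")
with gzip.open(path, "wb") as f:
    f.write(sample)

# Read the compressed file back and count its lines.
with gzip.open(path, "rb") as f:
    line_count = sum(1 for _ in f)

print(line_count)  # 3
```

Running the same loop over the downloaded kddcup.data_10_percent.gz should give the number reported by count().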

We can also check the first few entries in our data.

raw_data.take(5)
[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In further sections, we will use this raw data to learn about the different Spark transformations and actions. Don't worry too much about the meaning of those elements. We will go deeply into them in further sections and tutorials.

Creating an RDD Using parallelize

Another way of creating an RDD is to parallelize an already existing list.

a = range(100)
    
data = sc.parallelize(a)

As we did before, we can count() the number of elements in the RDD.

data.count()
100

As before, we can access the first few elements in our RDD.

data.take(5)
[0, 1, 2, 3, 4]
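To get an intuition for what parallelize does with a list, here is a plain-Python sketch that splits the same 100 numbers into chunks and then answers count and take from those chunks. The helper split_into_partitions and the choice of 4 partitions are assumptions made for illustration; Spark decides the real partitioning itself.

```python
def split_into_partitions(items, num_partitions):
    """Hypothetical helper: cut items into contiguous chunks."""
    items = list(items)
    size = -(-len(items) // num_partitions)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


partitions = split_into_partitions(range(100), 4)

# count-like: add up the sizes of the partitions.
total = sum(len(p) for p in partitions)

# take(5)-like: flatten the partitions in order and keep the first five.
first_five = [x for p in partitions for x in p][:5]

print(total)
print(first_five)
```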

RDD Basic Operations

This section will introduce three basic but essential Spark operations. Two of them are the transformations map and filter. The other is the action collect. At the same time we will introduce the concept of persistence in Spark.

Getting the Data and Creating the RDD

As we did in our first section, we will again use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

The filter Transformation

This transformation can be applied to RDDs in order to keep just the elements that satisfy a certain condition. More concretely, a function is evaluated on every element in the original RDD. The resulting RDD will contain just those elements for which the function returns True.

For example, imagine we want to count how many normal. interactions we have in our dataset. We can filter our raw_data RDD as follows.

normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

Now we can count how many elements we have in the new RDD.

  from time import time
  t0 = time()
  normal_count = normal_raw_data.count()
  tt = time() - t0
  print "There are {} 'normal' interactions".format(normal_count)
  print "Count completed in {} seconds".format(round(tt,3))

There are 97278 'normal' interactions  
Count completed in 5.951 seconds

Remember from the first section that we have a total of 494021 samples in our 10 percent dataset. Here we can see that 97278 of them contain the normal tag word.

Notice also that we have measured the elapsed time for counting the elements in the RDD. We have done this to point out that actual (distributed) computations in Spark take place when we execute actions, not transformations. In this case count is the action we execute on the RDD. We can apply as many transformations as we want to our RDD and no computation will take place until we call the first action, which in this case takes a few seconds to complete.
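This deferred execution can be imitated in plain Python with a generator expression. The sketch below is only an analogy, not Spark code: the shortened records are made up, and the calls list exists just to record when the filter function actually runs.

```python
calls = []  # records every line the predicate is evaluated on


def is_normal(line):
    calls.append(line)
    return "normal." in line


lines = [
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,tcp,http,SF,normal.",
]

# "Transformation": building the generator evaluates nothing yet.
lazy_normal = (line for line in lines if is_normal(line))
calls_before_action = len(calls)

# "Action": consuming the generator triggers the actual work.
normal_count = sum(1 for _ in lazy_normal)
calls_after_action = len(calls)

print(calls_before_action)
print(normal_count)
print(calls_after_action)
```

The predicate has run zero times before the count and once per line after it, which is the same pattern the timing above reveals for filter and count.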

The map Transformation

By using the map transformation in Spark, we can apply a function to every element in our RDD. Python's lambdas are especially expressive for this purpose.

In this case we want to read our data file as a CSV formatted one. We can do this by applying a lambda function to each element in the RDD as follows.

  from pprint import pprint
  csv_data = raw_data.map(lambda x: x.split(","))
  t0 = time()
  head_rows = csv_data.take(5)
  tt = time() - t0
  print "Parse completed in {} seconds".format(round(tt,3))
  pprint(head_rows[0])

Parse completed in 1.715 seconds  
    [u'0',
     u'tcp',
     u'http',
     u'SF',
     u'181',
     u'5450',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'1',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'8',
     u'8',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'1.00',
     u'0.00',
     u'0.00',
     u'9',
     u'9',
     u'1.00',
     u'0.00',
     u'0.11',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'normal.']   

Again, all the work happens once we call the first Spark action (i.e. take in this case). What if we take a lot of elements instead of just the first few of them?

    t0 = time()
    head_rows = csv_data.take(100000)
    tt = time() - t0
    print "Parse completed in {} seconds".format(round(tt,3))

Parse completed in 8.629 seconds

We can see that it takes longer. The map function is applied now in a distributed way to a lot of elements on the RDD, hence the longer execution time.
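The relation between the number of requested elements and the amount of parsing can be imitated locally with a generator and itertools.islice. This is a plain-Python analogy under simplifying assumptions (a single machine, made-up records, a hypothetical take helper); how Spark schedules take across partitions is more involved.

```python
from itertools import islice

parsed = []  # records every line that actually gets split


def parse(line):
    parsed.append(line)
    return line.split(",")


# Made-up, shortened records.
lines = ["{},tcp,http,SF,normal.".format(i) for i in range(1000)]


def take(n):
    """Hypothetical local stand-in for take: parse lazily, stop after n rows."""
    del parsed[:]
    rows = list(islice((parse(line) for line in lines), n))
    return rows, len(parsed)


rows_5, parsed_for_5 = take(5)
rows_500, parsed_for_500 = take(500)

print(parsed_for_5)
print(parsed_for_500)
```

Only as many lines are split as are requested, so asking for more rows means more parsing work.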

Using map with Predefined Functions

Of course we can use predefined functions with map and not just lambda. Imagine we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. normal) and the value is the whole list of elements that represents the row in the CSV formatted file. We could proceed as follows.

def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])
(u'normal.',
     [u'0',
      u'tcp',
      u'http',
      u'SF',
      u'181',
      u'5450',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'1',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'8',
      u'8',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'1.00',
      u'0.00',
      u'0.00',
      u'9',
      u'9',
      u'1.00',
      u'0.00',
      u'0.11',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'normal.'])

That was easy, wasn't it?

In the section about working with key-value pairs we will use this type of RDDs to do data aggregations (e.g. count by key).
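Because parse_interaction is an ordinary Python function, it can be tried on a single line without Spark. The sketch below repeats the function and applies it to the first record shown earlier; the variable names other than parse_interaction are illustrative.

```python
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]  # the tag is the 42nd (last) field
    return (tag, elems)


# First record of the dataset, as printed by raw_data.take(5).
line = ("0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
        "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,"
        "1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.")

tag, elems = parse_interaction(line)

print(tag)         # normal.
print(len(elems))  # 42
print(elems[1])    # tcp
print(elems[2])    # http
```

Testing the function locally like this is a cheap way to catch an index error before handing it to map.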

The collect Action

So far we have used the actions count and take. Another basic action we need to learn is collect. Basically, it brings all the elements in the RDD into memory for us to work with. For this reason it has to be used with care, especially when working with large RDDs.

An example using our raw data.

t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))
Data collected in 17.927 seconds

That took longer than any other action we used before, of course. Every Spark worker node that has a fragment of the RDD has to be coordinated in order to retrieve its part, and then everything has to be reduced together.

As a last example combining all the previous ones, we want to collect all the normal interactions as key-value pairs.

# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)
Data collected in 12.485 seconds
There are 97278 'normal' interactions

This count matches the previous count for normal interactions. The new procedure is more time consuming because we retrieve all the data with collect and then use Python's len on the resulting list. Before, we were just counting the total number of elements in the RDD by using count.
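The difference between the two procedures can be pictured with a few hand-made partitions. This is a conceptual sketch in plain Python, with made-up tags standing in for records and lists standing in for worker nodes; it does not measure anything about a real cluster.

```python
# Hypothetical partitions of an RDD, as they might sit on three workers.
partitions = [
    ["normal.", "smurf.", "normal."],
    ["neptune.", "normal."],
    ["smurf."],
]

# count-like: each worker reports one integer and the driver adds them up.
partial_counts = [len(p) for p in partitions]
count_result = sum(partial_counts)

# collect-like: every element travels to the driver, then len() runs locally.
collected = [x for p in partitions for x in p]
collect_result = len(collected)

print(partial_counts)
print(count_result)
print(collect_result)
```

Both routes give the same number, but the second one moves every element instead of one integer per partition.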

Sampling RDDs

So far we have introduced RDD creation together with some basic transformations such as map and filter and some actions such as count, take, and collect.

This section will show how to sample RDDs. Regarding transformations, sample will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the takeSample action.

Getting the Data and Creating the RDD

In this case we will use the complete dataset provided for the KDD Cup 1999, containing nearly five million network interactions. The file is provided as a Gzip file that we will download locally.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

Now we can use this file to create our RDD.

data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

Sampling RDDs

In Spark, there are two sampling operations: the transformation sample and the action takeSample. By using a transformation we can tell Spark to apply successive transformations to a sample of a given RDD. By using an action we retrieve a given sample and have it in local memory to be used by any other standard library (e.g. Scikit-learn).

The sample Transformation

The sample transformation takes up to three parameters. The first is whether the sampling is done with replacement or not. The second is the sample size as a fraction. Finally, we can optionally provide a random seed.

raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print "Sample size is {} of {}".format(sample_size, total_size)
Sample size is 489957 of 4898431
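Note that the sample size above is close to, but not exactly, one tenth of the total. A plain-Python sketch of fraction-based sampling shows why, assuming each element is kept independently with probability equal to the fraction (my understanding of sampling without replacement here, not something this tutorial verifies). The helper sample_fraction is hypothetical.

```python
import random


def sample_fraction(items, fraction, seed):
    """Hypothetical local stand-in: keep each item with probability fraction."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


population = range(100000)
sampled = sample_fraction(population, 0.1, 1234)

# Close to 10000, but generally not exactly 10000.
print(len(sampled))
```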

But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will prove more powerful once we start doing aggregations and key-value pair operations, and will be especially useful when using Spark's machine learning library MLlib.

In the meantime, imagine we want to have an approximation of the proportion of normal. interactions in our dataset. We could do this by counting the total number of tags as we did in previous notebooks. However we want a quicker response and we don't need the exact answer but just an approximation. We can do it as follows.

from time import time

# transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print "The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199
Count done in 44.523 seconds

Let's compare this with calculating the ratio without sampling.

# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print "The ratio of 'normal' interactions is {}".format(round(normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199  
Count done in 91.09 seconds  

We can see a gain in time. The more transformations we apply after the sampling the bigger this gain. This is because without sampling all the transformations are applied to the complete set of data.
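The idea that a sample gives nearly the same ratio as the full data can be checked locally on synthetic data. In this sketch the tags are generated at random with roughly 20 percent 'normal.' (made-up data, not the KDD file), and the sample keeps each element with probability 0.1.

```python
import random

rng = random.Random(1234)

# Synthetic tags: roughly 20 percent 'normal.', the rest 'attack.'.
tags = ["normal." if rng.random() < 0.2 else "attack." for _ in range(100000)]

# Exact ratio over the full data.
full_ratio = sum(1 for t in tags if t == "normal.") / float(len(tags))

# Approximate ratio over a sample of about 10 percent.
sample = [t for t in tags if rng.random() < 0.1]
sample_ratio = sum(1 for t in sample if t == "normal.") / float(len(sample))

print(round(full_ratio, 3))
print(round(sample_ratio, 3))
```

The two ratios should agree to about two decimal places while the sampled computation touches roughly a tenth of the elements.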

The takeSample Action

If what we need is to grab a sample of raw data from our RDD into local memory in order to be used by other non-Spark libraries, takeSample can be used.

The syntax is very similar, but in this case we specify the number of items instead of the sample size as a fraction of the complete data size.

t0 = time()
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / 400000.0
print "The ratio of 'normal' interactions is {}".format(normal_ratio)
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.1988025  
Count done in 76.166 seconds  

The process was very similar to the previous one. We obtained a sample of 400,000 items (a little under 10 percent of the data), and then filtered and split it.

However, it took longer, even with a slightly smaller sample. The reason is that Spark only distributed the execution of the sampling process. The filtering and splitting of the results were done locally on a single node.
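The contrast between a fraction and a fixed number of items can be seen with Python's random.sample, which also draws a fixed-size sample without replacement. This is a local analogy only; the population and the size of 400 are arbitrary choices for illustration.

```python
import random

rng = random.Random(1234)
population = list(range(100000))

# Fixed-size sample without replacement: exactly 400 items.
fixed_sample = rng.sample(population, 400)

print(len(fixed_sample))  # 400
```

Unlike fraction-based sampling, the result size here is exact, and the result is an ordinary local list rather than an RDD.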

Set Operations on RDDs

Spark supports many of the operations we have in mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets. It is important to note that these operations require that the RDDs being operated on are of the same type.

Set operations are quite straightforward to understand, as they work as expected. The only consideration comes from the fact that RDDs are not real sets, and therefore operations such as the union of RDDs do not remove duplicates. In this notebook we will have a brief look at subtract, distinct, and cartesian.
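The point about duplicates is easy to see with plain Python lists standing in for RDDs. This is an analogy with made-up values, not Spark code.

```python
a = ["tcp", "udp", "tcp"]
b = ["udp", "icmp"]

# union-like: plain concatenation, duplicates survive.
union_like = a + b

# distinct-like: keep one copy of each value (sorted here only for display).
distinct_like = sorted(set(union_like))

print(union_like)
print(distinct_like)
```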

Getting the Data and Creating the RDD

As we did in our first section, we will use again the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Getting Attack Interactions Using subtract

For illustrative purposes, imagine we already have our RDD with non attack (normal) interactions from some previous analysis.

normal_raw_data = raw_data.filter(lambda x: "normal." in x)

We can obtain attack interactions by subtracting normal ones from the original unfiltered RDD as follows.

attack_raw_data = raw_data.subtract(normal_raw_data)

Let's do some counts to check our results.

from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print "All count in {} secs".format(round(tt,3))
All count in 5.261 secs
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print "Normal count in {} secs".format(round(tt,3))

Normal count in 5.571 secs

# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print "Attack count in {} secs".format(round(tt,3))

Attack count in 12.075 secs

print "There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count)
There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions

So now we have two RDDs, one with normal interactions and another one with attacks.
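The same split can be imitated on a handful of lines in plain Python, which also makes it easy to confirm that the two parts add up to the whole. The shortened records are made up for illustration, and the list comprehension is only a local analogy of subtract.

```python
# Made-up, shortened records.
lines = [
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,tcp,private,S0,neptune.",
    "0,tcp,http,SF,normal.",
]

normal = [x for x in lines if "normal." in x]

# subtract-like: keep every line that does not appear among the normal ones.
normal_set = set(normal)
attacks = [x for x in lines if x not in normal_set]

print(len(normal))
print(len(attacks))

# The counts reported above are consistent in the same way.
counts_add_up = (97278 + 396743 == 494021)
print(counts_add_up)  # True
```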

Protocol and Service Combinations Using cartesian

We can compute the Cartesian product between two RDDs by using the cartesian transformation. It returns all possible pairs of elements between two RDDs. In our case we will use it to generate all the possible combinations between service and protocol in our network interactions.

First of all we need to isolate each collection of values in two separate RDDs. For that we will use distinct on the CSV-parsed dataset. From the dataset description we know that protocol is the second column and service is the third (the tag is the last column, not the first as it appears on that page).

So first, let's get the protocols.

csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

[u'udp', u'icmp', u'tcp']

Now we do the same for services.

services = csv_data.map(lambda x: x[2]).distinct()
services.collect()
[u'domain',
u'http_443',
u'Z39_50',
u'smtp',
u'urp_i',
u'private',
u'echo',
u'shell',
u'red_i',
u'eco_i',
u'sunrpc',
u'ftp_data',
u'urh_i',
u'pm_dump',
u'pop_3',
u'pop_2',
u'systat',
u'ftp',
u'uucp',
u'whois',
u'netbios_dgm',
u'efs',
u'remote_job',
u'daytime',
u'ntp_u',
u'finger',
u'ldap',
u'netbios_ns',
u'kshell',
u'iso_tsap',
u'ecr_i',
u'nntp',
u'printer',
u'domain_u',
u'uucp_path',
u'courier',
u'exec',
u'time',
u'netstat',
u'telnet',
u'gopher',
u'rje',
u'sql_net',
u'link',
u'auth',
u'netbios_ssn',
u'csnet_ns',
u'X11',
u'IRC',
u'tftp_u',
u'login',
u'supdup',
u'name',
u'nnsp',
u'mtp',
u'http',
u'bgp',
u'ctf',
u'hostnames',
u'klogin',
u'vmnet',
u'tim_i',
u'discard',
u'imap4',
u'other',
u'ssh']

A longer list in this case.

Now we can do the cartesian product.

product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))
There are 198 combinations of protocol X service

Obviously, for such small RDDs it doesn't really make sense to use Spark's Cartesian product. We could perfectly well have collected the values after using distinct and done the Cartesian product locally. Moreover, distinct and cartesian are expensive operations, so they must be used with care when the datasets involved are large.
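A local version of that product could look like the following sketch, which uses itertools.product from the standard library. The protocols are the three collected above; the services list is shortened to four of the 66 values to keep the example small.

```python
from itertools import product

protocols = ["udp", "icmp", "tcp"]
services = ["domain", "http", "smtp", "ftp"]  # a few of the 66 services

# All (protocol, service) pairs, computed locally.
pairs = list(product(protocols, services))

print(len(pairs))  # 12
print(pairs[0])
```

With all 66 services the same call would give 3 x 66 = 198 pairs, matching the count reported by Spark.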
