
Adding Flow Control to Apache Pig using Python

Published Aug 27, 2015. Last updated Jan 18, 2017.

Introduction

So you like Pig but it's cramping your style? Are you not sure what Pig is about? Are you keen to write some code that writes code for you? If yes, then this is for you.

This tutorial ties together a whole lot of different techniques and technologies. The aim here is to show you a trick to get Pig to behave in a way that's just a little bit more loopy. It's a trick I've used before quite a lot and I've written a couple of utility functions to make it easy. I'll go over the bits and pieces here. This tutorial, on a more general note, is about writing code that writes code. The general technique and concerns outlined here can be applied to other code generating problems.

What Does Pig Do?

Pig is a high-level scripting toolset used for defining and executing complex map-reduce workflows. Let's take a closer look at that sentence...

Pig is a top-level Apache project. It is open source and really quite nifty. Learn more about it here. PigLatin is Pig's language. Pig executes PigLatin scripts. Within a PigLatin script you write a bunch of statements that get converted into a bunch of map-reduce jobs that can get executed in sequence on your Hadoop cluster. It's usually nice to abstract away from writing plain old map-reduce jobs because they can be a total pain in the neck.

If you haven't used Pig before and aren't sure if it's for you, it might be a good idea to check out Hive. Hive and Pig have a lot of overlap in terms of functionality, but have different philosophies. They aren't total competitors because they are often used in conjunction with one another. Hive resembles SQL, while PigLatin resembles... PigLatin. So if you are familiar with SQL then Hive might be easier to learn, but IMHO Pig is a bit more sensible than Hive in how it describes data flow.

What Doesn't Pig Do?

Pig doesn't make any decisions about the flow of program execution. It only allows you to specify the flow of data. In other words, it allows you to say stuff like this:

-----------------------------------------------
-- define some data format goodies
-----------------------------------------------

define CSV_READER org.apache.pig.piggybank.storage.CSVExcelStorage(
                                                            ',',
                                                            'YES_MULTILINE',
                                                            'UNIX'
                                                            );


define CSV_WRITER org.apache.pig.piggybank.storage.CSVExcelStorage(
                                                            ',',
                                                            'YES_MULTILINE',
                                                            'UNIX',
                                                            'SKIP_OUTPUT_HEADER'
                                                            );

-----------------------------------------------
-- load some data
-----------------------------------------------

r_one = LOAD 'one.csv' using CSV_READER
AS (a:chararray,b:chararray,c:chararray);

r_two = LOAD 'two.csv' using CSV_READER
AS (a:chararray,d:chararray,e:chararray);

-----------------------------------------------
-- do some processing
-----------------------------------------------

r_joined = JOIN r_one by a, r_two by a;

r_final = FOREACH r_joined GENERATE 
    r_one::a, b, e;

-----------------------------------------------
-- store the result
-----------------------------------------------

store r_final into 'r_three.csv' using CSV_WRITER;

The script above says where the data should flow. Every statement you see there will get executed exactly once no matter what (unless there is some kind of error).

You can run the script from the command line like so:

pig path/to/my_script.oink

Ok, what if we have a bunch of files and each of them needs to have the same stuff happen to it? Does that mean we would need to copy-paste our PigLatin script and edit each one to have the right paths?

Well, no. Pig allows some really basic substitutions. You can do stuff like this:

r_one = LOAD '$DIR/one.csv' using CSV_READER
AS (a:chararray,b:chararray,c:chararray);

r_two = LOAD '$DIR/two.csv' using CSV_READER
AS (a:chararray,d:chararray,e:chararray);

-----------------------------------------------
-- do some processing
-----------------------------------------------

r_joined = JOIN r_one by a, r_two by a;

r_final = FOREACH r_joined GENERATE 
    r_one::a, b, e;

-----------------------------------------------
-- store the result
-----------------------------------------------

store r_final into '$DIR/r_three.csv' using CSV_WRITER;

Then you can run the script as many times as you like with different values for DIR. Something like:

pig path/to/my_script.oink -p DIR=jan_2015
pig path/to/my_script.oink -p DIR=feb_2015
pig path/to/my_script.oink -p DIR=march_2015

So Pig allows variable substitution, and that is a pretty powerful thing on its own. But it doesn't allow loops or if statements, and that can be somewhat limiting. What if we had to iterate over 60 different values for DIR? This is something Pig doesn't cater for.

Luckily for us, Python can loop just fine. So we could do something like:

def run_pig_script(sFilePath,dPigArgs=None):
    """
    run piggy run
    """
    import subprocess
    lCmd = ["pig",sFilePath]
    # append one "-p KEY=VALUE" pair for each Pig parameter
    for sArg in ['{0}={1}'.format(*t) for t in (dPigArgs or {}).items()]:
        lCmd.append('-p')
        lCmd.append(sArg)
    print(lCmd)
    p = subprocess.Popen(lCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    return stdout,stderr

for sDir in lManyDirectories:
    run_pig_script(sFilePath="path/to/my_script.oink",dPigArgs={'DIR':sDir})

The run_pig_script function makes use of the subprocess module to create a Pig process through use of the Popen function. Popen takes a list of token strings as its first argument and makes a system call from there. So first we create the command list lCmd then start a process. The output of the process (the stuff that would usually get printed to the console window) gets redirected to the stderr and stdout objects.
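One thing run_pig_script does not report is whether the process actually succeeded. Here is a minimal sketch of checking the exit status. The name run_command is a hypothetical helper, and the example launches the Python interpreter instead of pig so that it can be tried on a machine without Pig installed.

```python
import subprocess
import sys

def run_command(lCmd):
    """
    hypothetical helper: run lCmd and return (exit code, stdout, stderr)
    """
    p = subprocess.Popen(lCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    # returncode is set once communicate() has waited for the process to finish
    return p.returncode, stdout, stderr

# stand-in for a failing pig call: a Python process that exits with status 3
iCode, stdout, stderr = run_command([sys.executable, "-c", "import sys; sys.exit(3)"])
if iCode != 0:
    print("command failed with exit code {0}".format(iCode))
```

A non-zero exit code is the conventional signal that a command-line program failed, so a check like this is one way to stop a loop early instead of launching the remaining runs.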

To populate lCmd we use a shorthand notation for loops known as a list comprehension. It's very cool and useful but beyond the scope of this text. Try calling run_pig_script with a few different arguments and see what it prints, and you should quickly get a feel for what Popen expects.
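If you'd like to look at the token list without launching anything, the command-building part can be pulled out on its own. This is only a sketch: build_pig_command is a hypothetical helper name, and OUT is a made-up second parameter used purely to show how several parameters end up in the list.

```python
def build_pig_command(sFilePath, dPigArgs=None):
    """
    hypothetical helper: build the token list without launching anything
    """
    lCmd = ["pig", sFilePath]
    # sorted() only makes the order of the parameters predictable
    for sKey, sValue in sorted((dPigArgs or {}).items()):
        lCmd.extend(["-p", "{0}={1}".format(sKey, sValue)])
    return lCmd

lCmd = build_pig_command("path/to/my_script.oink", {"DIR": "jan_2015", "OUT": "results"})
print(lCmd)
# ['pig', 'path/to/my_script.oink', '-p', 'DIR=jan_2015', '-p', 'OUT=results']
```

Each token is its own list element, which is why the '-p' flag and its 'KEY=VALUE' argument are added separately.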

But what if you really need a loop inside your pig script?

So we have covered executing a PigLatin script many times with different values. But what if we want to make use of many values within a single PigLatin script? For example, what happens if we want to load a variable number of directories within one script? Something like this...

r_jan_1 = LOAD 'jan_1/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_2 = LOAD 'jan_2/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_3 = LOAD 'jan_3/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_4 = LOAD 'jan_4/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
... more stuff
r_jan_16 = LOAD 'jan_16/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);

r_all = UNION r_jan_1, r_jan_2, r_jan_3, r_jan_4, ... r_jan_16;

Writing all that down could become tedious, especially if we are working with an arbitrary number of files each time. Maybe we want a union of all the sales of the month so far. Then we would need to come up with a new script for every day. That sounds pretty horrible and would require a lot of copy-paste, and copy-paste smells bad.

So Here is What We are Going to Do Instead

Have some pythonish pseudo-code:

lStrs = complicated_operation_getting_list_of_strings() #1
sPigPath = generate_pig_script(lStrs)                   #2
run_pig_script(sFilePath = sPigPath)                    #3

So we have 3 steps in the code above: Step 1 is getting the data we need that the pig script is going to rely on. Then, in step 2, we need to take that data and turn it into something Pig will be able to understand. Step 3 then needs to make it run.

Step 1 of the process very much depends on what you are trying to do. Following from the previous example we would likely want complicated_operation_getting_list_of_strings to look like:

def complicated_operation_getting_list_of_strings():
    import datetime
    oNow = datetime.datetime.now()
    sMonth = oNow.strftime('%b').lower()
    return ["{0}_{1}".format(sMonth,i+1) for i in range(oNow.day)]
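Because that function depends on today's date, it is a little awkward to try out. Below is a sketch of the same logic with the date passed in as an argument. The name day_strings_up_to is hypothetical, and note that '%b' follows the active locale, so the month prefix is only 'jan' when the locale produces English abbreviations.

```python
import datetime

def day_strings_up_to(oDate):
    """
    hypothetical variant: one "<month>_<day>" string per day of the month,
    from the 1st up to and including oDate
    """
    # '%b' is the abbreviated month name; it follows the active locale
    sMonth = oDate.strftime('%b').lower()
    return ["{0}_{1}".format(sMonth, i + 1) for i in range(oDate.day)]

lStrs = day_strings_up_to(datetime.date(2015, 1, 4))
print(lStrs)
# with Python's default C locale: ['jan_1', 'jan_2', 'jan_3', 'jan_4']
```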

The rest of this tutorial will be dealing with steps 2 and 3.

Template Systems

Writing code to write code for us! That's pretty futuristic stuff!

Not really...

Ever written a web app? Did you use some kind of framework for this? Did the framework specify (or allow you to specify) some special way of writing HTML so that you could do clever things in your HTML files? Clever things like loops and ifs and variable substitutions? If you answered yes to these questions, you have at least written code that wrote HTML code for you. And if you answered no, then the take-away message here is: writing code that writes code is something that has been done for ages, and there are many systems, libraries and packages that support this kind of thing in many languages. These kinds of tools are generally referred to as template systems.

The template system we'll be using for this is Mako. This is not a Mako tutorial. To learn about Mako, check this out.

An important thing in choosing a template system is to make sure that it doesn't clash with the language you are using it to write. And if it does clash, then you need to find ways to compensate. What I mean by this is: if I am using a template language, then that language has a few well-defined control sequences for doing things like loops and variable substitution. An example from Mako is:

${xSomeVariable}

When you render that line of code then the value of xSomeVariable will get turned into a string. But what if ${stuff} meant something in the language you are trying to generate? Then there is a good chance that mako will find things in your template files that it thinks it needs to deal with and it will either output garbage or raise exceptions.

Mako and PigLatin don't have this problem. So that's pretty convenient.
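For contrast, here is a small sketch of what such a clash looks like. It uses string.Template from Python's standard library purely as a stand-in. It is not the template system used in this tutorial, and it is picked only because its $name placeholders happen to collide with Pig's own $DIR parameters. The variable names are made up for the illustration.

```python
from string import Template

# a line of PigLatin that uses Pig's own $DIR parameter
sPigLine = "r_one = LOAD '$DIR/one.csv' using CSV_READER;"

# string.Template also treats $DIR as one of its placeholders,
# so rendering without a value for DIR raises a KeyError
try:
    Template(sPigLine).substitute(sName="one")
    bClashed = False
except KeyError:
    bClashed = True
print(bClashed)

# the usual way to compensate is escaping: $$ renders as a literal $
sEscaped = Template("r_${sName} = LOAD '$$DIR/${sName}.csv' using CSV_READER;")
print(sEscaped.substitute(sName="one"))
```

The second template renders with $DIR left intact for Pig to fill in later, at the cost of having to remember the escape everywhere.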

Using Python to generate PigLatin

Remember this: sPigPath = generate_pig_script(lStrs)?

Good coders don't mix languages in the same file if they can help it (which is pretty much always). So while it is possible to define your entire PigLatin mako template in the form of a big giant string inside your Python script, we aren't going to do that.

Also, it would be nice if the code we are writing works for more than one template. So instead of:

sPigPath = generate_pig_script(lStrs)   #2

We'll do this:

sPigPath = generate_pig_script(sFilePath,dContext)   #2

We want to pass in the path to our template file, along with a dictionary containing the context variables we'd use to render it this time. For example we could have:

dContext = {
    'lStrs' : complicated_operation_getting_list_of_strings()
}

Ok, so let's write some real code then...

def generate_pig_script(sFilePath,dContext):
    """
    render the template at sFilePath using the context in dContext,
    save the output in a temporary file
    return the path to the generated file
    """
    from mako.template import Template
    import datetime

    #1. fetch the template from the file
    oTemplate = Template(filename=sFilePath)
    
    #2. render it using the context dictionary. This gives us a string
    sOutputScript = oTemplate.render(**dContext)

    #3. put the output into some file...
    sOutPath = "{0}_{1}".format(sFilePath,datetime.datetime.now().isoformat())
    with open(sOutPath,'w') as f:
        f.write(sOutputScript)

    return sOutPath

The comments in the code should be enough to understand its general functioning.
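Note that the function above writes its output next to the template, with a timestamp tacked onto the name. If you would rather keep generated scripts out of your template directory, the standard tempfile module is one option. This is a sketch only: write_script_to_temp_file is a hypothetical helper, and the '.pig' suffix is an assumption rather than something Pig requires.

```python
import os
import tempfile

def write_script_to_temp_file(sOutputScript, sSuffix=".pig"):
    """
    hypothetical helper: save a rendered script in the system temp directory
    and return the path to it
    """
    # mkstemp creates the file and returns an open file descriptor plus its path
    iFd, sOutPath = tempfile.mkstemp(suffix=sSuffix, text=True)
    with os.fdopen(iFd, 'w') as f:
        f.write(sOutputScript)
    return sOutPath

sOutPath = write_script_to_temp_file("r_all = UNION r_jan_1, r_jan_2;\n")
print(sOutPath)
```

Files created with mkstemp are not cleaned up automatically, so delete them with os.remove once Pig has finished if you don't want them piling up.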

Just to complete the picture, let's make an actual template...

Remember this?

r_jan_1 = LOAD 'jan_1/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_2 = LOAD 'jan_2/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_3 = LOAD 'jan_3/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
r_jan_4 = LOAD 'jan_4/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
... more stuff
r_jan_16 = LOAD 'jan_16/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);

r_all = UNION r_jan_1, r_jan_2, r_jan_3, r_jan_4, ... r_jan_16;

Here it is in the form of a mako template:

%for sD in lStrs:

r_${sD} = LOAD '${sD}/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);

%endfor

r_all = UNION ${','.join(['r_{0}'.format(sD) for sD in lStrs])};
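If it isn't obvious what that template turns into, here is a plain-Python sketch of the same loop and join. It doesn't use Mako at all. It only mimics the statements the template is meant to produce (blank lines aside), and render_by_hand is a hypothetical name.

```python
def render_by_hand(lStrs):
    """
    hypothetical helper: build the same statements as the template, without Mako
    """
    lLines = []
    # mirrors the %for ... %endfor block: one LOAD statement per entry
    for sD in lStrs:
        lLines.append(
            "r_{0} = LOAD '{0}/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);".format(sD)
        )
    # mirrors the ${','.join(...)} expression on the UNION line
    lLines.append("r_all = UNION {0};".format(','.join(['r_{0}'.format(sD) for sD in lStrs])))
    return "\n".join(lLines)

print(render_by_hand(['jan_1', 'jan_2']))
# r_jan_1 = LOAD 'jan_1/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
# r_jan_2 = LOAD 'jan_2/sales.csv' USING CSV_READER AS (a,b,c,d,e,f,g);
# r_all = UNION r_jan_1,r_jan_2;
```

Doing it this way works, but it buries the PigLatin inside Python strings, which is exactly the language mixing the template file lets us avoid.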

The full picture

So now we have used Python to generate a PigLatin script and store it in a known location. And we already know how to get Python to launch Pig. So that's it. Pretty straightforward, eh? This tutorial made use of a few different technologies and techniques, and it's impossible not to jump around a little bit, so I've included a little summary here of how to use this technique:

#1 given a working PigLatin script that has a lot of repetition or a variable number of inputs, create a mako template
#2 write a function that creates the context for the mako template. eg:
dContext = {
    'lStrs' : complicated_operation_getting_list_of_strings()
}
#3 render the template
sPigFilePath = generate_pig_script(sMakoFilePath,dContext)
#and finally run the thing...
run_pig_script(sPigFilePath,dPigArgs=None)

Conclusion

We've covered some of the basics of code generation and used Python and the Mako templating system to make Pig more loopy. I've touched on a lot of different technologies and techniques. Pig itself is quite a big deal, and the kinds of problems it is applied to can fill books. The Mako templating engine is a powerful thing in itself and has many use cases other than Pig (I mostly use it in conjunction with Pyramid, for example). Python loops and list comprehensions are worth looking into if any of the weird for-loop stuff didn't make sense. And finally there is the subprocess module, which constitutes quite a rabbit hole on its own.

Discover and read more posts from Sheena