First and foremost , Yes you need to install Pig and don't worry it is extremely easy.There are many blogs and documents available online for Pig installation . Refer to one and proceed . The below URL from Apache might be useful .Good Luck !
http://pig.apache.org/docs/r0.10.0/start.html#Pig+Setup
Look at this picture below , some of you might have already seen the picture and will be like "yes i have already seen this picture already". I am not here trying to reinvent the whole wheel but rather here to collate the information scattered everywhere and to explain what it is and things it does.
To read more , go to the website below .
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Overview
#2 above in the diagram states that pig has 2 execution modes or exectypes:
Interactive Mode.
Batch Mode.
http://pig.apache.org/docs/r0.10.0/start.html#Pig+Setup
Look at this picture below , some of you might have already seen the picture and will be like "yes i have already seen this picture already". I am not here trying to reinvent the whole wheel but rather here to collate the information scattered everywhere and to explain what it is and things it does.
#1 above in the diagram states that "Pig is a dataflow language and Pig Latin is used to express it ". Does not make sense ? Let me take an extract from Apache's website
A Pig Latin statement is an operator that takes a relation as input and produces another relation as output. (This definition applies to all Pig Latin operators except LOAD and STORE which read data from and write data to the file system.) Pig Latin statements can span multiple lines and must end with a semi-colon ( ; ). Pig Latin statements are generally organized in the following manner:
- A LOAD statement reads data from the file system - This is reading the data stored in the hard-drive for pig to be able to read or process or display in the terminal .
- A series of "transformation" statements process the data - Let us say this a business Logic being performed .
- A STORE statement writes output to the file system; or, a DUMP statement displays output to the screen - This is writing the data back to hard-disk .
To read more , go to the website below .
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Overview
#2 above in the diagram states that pig has 2 execution modes or exectypes:
- Local Mode - This means you are running pig only on one machine and not in distributed environment with clusters.Local mode are started using the -x flag ,the syntax of which is given below.
pig -x local
- Mapreduce Mode - This signifies that you need to have access to a Hadoop cluster and HDFS installation. Hadoop should already be running on your machine . How do we know that ? Type jps as given below
hadoop@ubuntu-server64:~/lab/install/hadoop-1.0.4/bin$ jps
2837 Jps
2255 SecondaryNameNode
1848 DataNode
2344 JobTracker
2590 TaskTracker
1606 NameNode
hadoop@ubuntu-server64:~/lab/install/hadoop-1.0.4/bin$
Please note that all the above services might not be running. The datanode and Tasktracer generally run on the slave machines and so if you are not able to see them while executing the command jps , dont worry . That is just allright .
Mapreduce mode is the default mode; you can,but don't need to, specify it using the -x flag Pig can also start in local and mapreduce mode using the java command which we will not discuss it here.
The discussion on Pig does not progress towards logical conclusion if we do not discuss about running PIG in various modes.
Running PIG !!
Pig Latin and Pig Commands are executed in 2 modes
1. Interactive Mode.
2. Batch Mode.
1. Interactive Mode.
2. Batch Mode.
Interactive Mode.
Pig can be run in interactive mode using the Grunt shell. As shown above enter the PIG's grunt shell in either local or Mapreduce mode .Hope you have not forgotten it already . If you have , refer the above .
grunt> transaction = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);
grunt> dump transaction;
Note : If you want to work with these statements refer to the sample data i have provided in my previous blog on Pig.Batch Mode.
A group of pig latin statements can be grouped together and can be run together . This is more like packaging or creating procedures in PL/SQL .Or you can think of it as bunch of statements written together to perform a task.
Here is a piece of code .
copy the below in a text file . Save it with a name , say "trans.pig" in your local directory wherever pig is installed . For example /user/data/trans.pig
transactions = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);
txn_100plus = FILTER transactions BY amt > 100.00;
txn_grpd = GROUP txn_100plus BY cat;
txn_cnt_bycat = FOREACH txn_grpd GENERATE group , COUNT(txn_100plus);
STORE txn_cnt_bycat INTO '/home/hadoop'
Now how do we execute it ? Here goes the details
Local Mode:
$ pig -x local trans.pig
Mapreduce Mode:
$ pig trans.pig
or
$ pig -x mapreduce trans.pig
Comments in code :
Before we start our journey in exploring the code , it is always good to know how to comment a piece of code or a bunch of line. We used to learn this as a first thing in our college classes as during the practical test all we had to do was "at the least" to make the code error free and what better way then commenting the erroneous code.
Getting to the point , there 2 ways to comment a code in pig
1. single-line comments using --
2. multi-line comments using /* …. */
Oracle pl/sql developers!! does this look exactly the same . Yes it is the same.
Let me give an example :
/* I am commenting this 3 lines below
transactions = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);
txn_grpd = GROUP txn_100plus BY cat;
txn_cnt_bycat = FOREACH txn_grpd GENERATE group , COUNT(txn_100plus) commented till here */
STORE txn_cnt_bycat INTO '/home/hadoop' -- This part is commented again
First thing whenever we start learning a new language is to understand the datatypes , lets quickly walk through it
There are 2 data types in Pig
1. Scalar
2. Complex
1. Scalar
Scalar datatype is very similar to other languages and these are 6 different types .
int : They store 4 byte signed integer . Example : 102.
long : They store 8 byte signed integer . Example : 500000000003.
float : They store 4 byte value. In some calculation float loses its precision . This is obvious from its limited storage capacity .Example : 3.14 4.
double : They store 8 byte value . Example : 2.71828 or exponent format : 6.626e-34 chararray : A string or character array. Example : 'Hello' or the value for Ctrl-A is expressed as \u0001.5.
bytearray : A blob or array of bytes , There is no way to specify a bytearray.
Dont worry too much if you are not able to understand 100% . That is fine , the interesting concepts are below .
Complex data types are given below :
Map : This is a key value pair (k,v) . The key is a chararray used as an index to find the element referred to as Value. Because PIG does not know the type of value , it will assume it as a bytearray .However the actual value might be of some other data type.We can CAST the value to a different data type , if we know the data type of the value.By default there is no requirement that all values in a map must be of the same type.It is legitimate to have a map with two keys name and age, where the value for name is a chararry and value for age is an int.Map is formed using square brackets [] . For example, ['name'#'bob', 'age'#55] will create a map with two keys, “name” and “age”.
Tuple : A tuple is a fixed-length, ordered collection of Pig data elements.Tuples are divided into fields, with each field containing one data element. These elements can be of any type—they do not all need to be the same type. A tuple is analogous to a row in SQL,with the fields being SQL columns. Tuple constants use parentheses to indicate the tuple and commas to delimit fields in the tuple. For example, ('bob', 55) describes a tuple constant with two fields.
Bag : A bag is an unordered collection of tuples. Because it has no order, it is not possible to reference tuples in a bag by position.Bag constants are constructed using braces, with tuples in the bag separated by commas.For example, {('bob', 55), ('sally', 52), ('john', 25)} constructs a bag with three tuples, each with two fields.
Please NOTE : Bag is the one type in Pig that is not required to fit into memory. Because bags are used to store collections when grouping(ahh, you might not understand it here, hold your breadth explainatio), bags can become quite large.Pig has the ability to spill bags to disk when necessary, keeping only partial sections of the bag in memory. The size of the bag is limited to the amount of local disk available for spilling the bag.
Enough of theory isn't it , let's now work with some practical examples .Before we process , here is the data snapshot on which we will be performing our operations .The data has 3 fields as give below .
x: Name
y: Number
z: Number
Let us call this file as "foo.txt" and save this file in the default pig directory.
Tuple : A tuple is a fixed-length, ordered collection of Pig data elements.Tuples are divided into fields, with each field containing one data element. These elements can be of any type—they do not all need to be the same type. A tuple is analogous to a row in SQL,with the fields being SQL columns. Tuple constants use parentheses to indicate the tuple and commas to delimit fields in the tuple. For example, ('bob', 55) describes a tuple constant with two fields.
Bag : A bag is an unordered collection of tuples. Because it has no order, it is not possible to reference tuples in a bag by position.Bag constants are constructed using braces, with tuples in the bag separated by commas.For example, {('bob', 55), ('sally', 52), ('john', 25)} constructs a bag with three tuples, each with two fields.
Please NOTE : Bag is the one type in Pig that is not required to fit into memory. Because bags are used to store collections when grouping(ahh, you might not understand it here, hold your breadth explainatio), bags can become quite large.Pig has the ability to spill bags to disk when necessary, keeping only partial sections of the bag in memory. The size of the bag is limited to the amount of local disk available for spilling the bag.
Enough of theory isn't it , let's now work with some practical examples .Before we process , here is the data snapshot on which we will be performing our operations .The data has 3 fields as give below .
x: Name
y: Number
z: Number
Let us call this file as "foo.txt" and save this file in the default pig directory.
Bill 10 20
Mark 5 5
Larry 2 2
Bill 5 5
Let us now login into the grunt shell. grunt> ls
hdfs://localhost:8020/user/hadoop/FIRST_NAME <dir>
hdfs://localhost:8020/user/hadoop/NYSE_daily.txt<r 1> 1471331
hdfs://localhost:8020/user/hadoop/NYSE_dividends.txt<r 1> 17695
hdfs://localhost:8020/user/hadoop/TEMP_FIRST_NAME <dir>
hdfs://localhost:8020/user/hadoop/foo.txt<r 1> 41
hdfs://localhost:8020/user/hadoop/out1 <dir>
Execute the below command to load the data from foo.txt to a relation A grunt> A = load 'foo.txt' as (x:chararray, y:int, z:int);
grunt> dump A;
/* The Command follows Map Reduce .There are lot of information between dump and resultset given below . I am skipping them */
(Bill,10,20)
(Mark,5,5)
(Larry,2,2)
(Bill,5,5)
Please note : The result set above is a tuple ( or it is similar to a row in SQL ).How do i access field level data(data for only x or y) from the above relation A ?
If it were to be SQL , we would have written select x from <table_name> but in this case
grunt> B = foreach A generate x;
grunt> dump B
(Bill)
(Mark)
(Larry)
(Bill)
Most important concept here is to understand that each step of a pig statement generates a relation . The above statement FOREACH suggest that the entire set of records of the relation A be parsed one by one until the last record and field x be separated from it . This is then dumped into the terminal.Note : commands or statements are not case sensitive in pig . For Example load and LOAD are the same but relations are case sensitive . "A" above will not be the same as "a".
grunt> C = group A by x;
grunt> dump C
(Bill,{(Bill,10,20),(Bill,5,5)})
(Mark,{(Mark,5,5)})
(Larry,{(Larry,2,2)})
grunt> describe C;
C: {group: chararray,A: {(x: chararray,y: int,z: int)}}
The result returned here is tuple with a bag inside it . Remember tuple is denoted by () and bag by {}.Look at the description of C . it clearly mentions that the grouping has been done only to consolidate data in a single bag .
If we look at the data foo.txt , Bill appears twice and hence when we dump C , Bill as a group has 2 Bill inside it
(Bill,{(Bill,10,20),(Bill,5,5)})
This means that grouping in pig is entirely different from grouping in SQL .
Grouping in SQL needs an aggregate function whereas in pig it just brings together tuples with similar key or group by values .
Going a little off-topic so as to explain how is SQL grouping different from Pig grouping .
Here is my explanation ! Watch carefully
Let us consider the below table in SQL .Let us call it employee table
Name Dept Salary
Bill 10 100
Mark 10 200
Larry 20 500
King 20 50
George 30 1000
In sql, A grouping would look like below select dept,sum(salary) from employee group by dept
The result-set after firing the above query will look as given below
Dept Salary
10 300
20 550
30 1000
Let us now load the same data to a pig relation and assume this is a comma separated value employee = load 'employee.csv' USING PigStorage(',') as (name:chararray, dept:int, salary:int);
dept_groupby = group employee by dept;
dump dept_groupby;
(10,{(bill,10,100),(Mark,10,200)})
(20,{(Larry,20,500),(King,20,50)})
(30,{(George,20,1000,)})
so it means that there is no aggregation happening , it is just grouping a set a tuple or records together . Hence pig's grouping and sql grouping are two different operation .
C = foreach dept_groupby generate employee.dept,SUM(employee.salary);
dump C;
o/p
({(10),(10)},300)
({(20),(20)},550)
({(30)},1000)
Looks a little different and difficult as well for someone who might be working his whole life on high level languages such as SQL but didn't we tell you that this is a DATA FLOW language. You have to take one step at a time , remember No skipping any step here . Get back to basics .
Now the next question that follows is , what are the different types of relational operators available in pig ? Let us explore them one by one .
Foreach :
Foreach takes each record from the relation , applies some expression ( sometimes it doesn't apply any expression at all ) . From these expressions it generates new records to send down
the pipeline to the next operator . Look at the example below .
Foreach :
Foreach takes each record from the relation , applies some expression ( sometimes it doesn't apply any expression at all ) . From these expressions it generates new records to send down
the pipeline to the next operator . Look at the example below .
grunt> cat foo.txt
Bill 10 20
Mark 5 5
Larry 2 2
Bill 5 5
grunt> A = LOAD 'foo.txt' as ( name,x:int,y:int);
grunt> B = foreach A generate name;
grunt> dump B;
(Bill)
(Mark)
(Larry)
(Bill)
It is simple , isn't it . The Dump B generates only name field from the foo.txt data . This was a basic example lets make it a little complex .
C = foreach A generate x-y;
OR
D = foreach A generate $2-$1;
(10)
(0)
(0)
(0)
we can either use the field variables that has been defined or use $0,$1 .. where $0 signifies the first field , $1 second field and so on.Some more foreach related operations .
beginning = foreach A generate ..x; -- produces name and x . This means display all the data till the begining of x.
middle = foreach prices generate name..y; -- produces name and y . This means display all the data from name to y . In our case it is just 2 column(field) data.
end = foreach prices generate x..; -- produces x and y . This means display all the data from x till the end.In our case it is x and y.
UDFs in foreach :
User Defined Functions (UDFs) can be invoked in foreach.These are called evaluation
functions, or eval funcs. Because they are part of a foreach statement, these UDFs take
one record at a time and produce one output. The important thing to note here is that although the eval functions take one argument , that one argument can be bag and hence indirectly we can pass more than one values to the function . Likewise for output of the function .
A = load 'foo.txt' as (name:chararray, x:int, y:int);
D = foreach A generate UPPER(name) as name ,x;
E = group D by name;
F = foreach E generate group , sum(D.x);
o/p
014-01-23 02:26:30,254 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve sum using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Whoops !!!! why is it that i see error inspite of doing everything right ?? Any guesses ??The Reason is very simple and yet so very hard to remember . I failed almost for 30 mins before i could figure out what is the problem . UDF's are CASE SENSITIVE .Make sum to SUM
F = foreach E generate group , SUM(D.x);
o/p
(BILL,15)
(MARK,5)
(LARRY,2)
Hell Yeah , it works now !!Filter :
The filter statement allows you to select which records you want to retain in your data pipeline. You have all types of operators that can be used in filter such as == , != , >= etc . This is same as any other languages.
Now some important points
1. To use these operators with tuples , both tuples must either have same schema or no schema .
2. None of these operators can be applied with bags .
3. Pig Latin follows the operator precedence that is standard in most programming languages. Remember BODMAS rule is maths . Yes it is just that .
A SQL user/developers now knows that , this is nothing but "WHERE" in SQL queries . Now we ask , is there something like regular expression matching ?
Yes there is , look at the example below .
grunt> describe F;
F: {group: chararray,long}
grunt> G = filter F by $0 matches 'BI.*';
O/P
(BILL,15)
The describe on the Relation F above gives me a result which clearly shows that the fields has not been named and hence i have used $0 which gives me the first field .The ".*" above tells the engine to match everything after BI and hence the result is (BILL,15)
Group :
Having mentioned multiple times above and also having explained once already . Group by shares its syntax with SQL .
grunt> C = group A by name;
grunt> dump C
(Bill,{(Bill,10,20),(Bill,5,5)})
(Mark,{(Mark,5,5)})
(Larry,{(Larry,2,2)})
grunt> describe C;
C: {group: chararray,A: {(name: chararray,y: int,z: int)}}
Look at what is happening here , Pig group only bring's together set of tuples sharing the same key as in this case all Bill's records is in one bag now .
The difference with SQL is that , SQL needs an aggregator operation to be performed while grouping such as SUM,COUNT,AVG,MIN,MAX etc wheras in PIG ,GROUP literally mean the word . It just groups the data together .
Few pointers :
1. You can also group on multiple keys.
2. You can also use all to group together all of the records in your pipeline.The record coming out of group all has the chararray literal all as a key
grunt> I = group A all ;
grunt> describe I;
I: {group: chararray,A: {(name: chararray,x: int,y: int)}}
grunt> dump I;
o/p
(all,{(Bill,10,20),(Mark,5,5),(Larry,2,2),(Bill,5,5)})
3. group is the operator that usually will force a reduce phase.If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce, this will force it to pass through map, shuffle, and reduce phases again . 4. group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group
After having discussed all these , Let us take a step back and understand why is PIG better for map-reduce .
In Map-Reduce we often get skewed results , meaning reducers might get different load of data at the end of Map,shuffle and sort . The simple reason being some keys might have more values and some less
Just because you have specified that your job have 100 reducers, there is no reason to expect that the number of values per key will be distributed evenly.
For example,
suppose you have an index of web pages and you group by the base URL. Certain values
such as yahoo.com(login.yahoo.com,in.answers.yahoo.com)are going to have far more entries than say indiatimes.com, which means that some reducers get far more data than others.
Now as we know that our MapReduce job is not finished (and any subsequent ones cannot start) until all our reducers have finished, this skew will significantly slow our processing. In some cases it might also not be impossible for one reducer to manage so much data. what do we do ?
Pig has a number of ways that it tries to manage this skew to balance out the load across reducers. The one that applies to grouping is Hadoop’s combiner. Let's talk a little more about combiners.
Combiners:
Combiner gives applications a chance to apply their reducer logic early on . As the map phase writes output, it is serialized and placed into an in-memory buffer.When this buffer fills, MapReduce will sort the buffer and then run the combiner if the application has provided an implementation for it.The resulting output is then written to local disk, to be picked up by the shuffle phase and sent to the reducers.MapReduce might choose not to run the combiner if it determines it will be more efficient not to.
If you don't understand where a combiner will be placed in MapReduce , here is another diagram for you .A picture is worth a thousand words , isn't it ?
Which means that combiner is doing the task of a reducer even before reducer comes into the picture .
Now here is what you have been waiting for !
Pig’s operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly.
Order by :
I believe we all understand Order by but let me explain very briefly as to what it is .Order statement sorts your data for you, producing a total order of your output data. Remember Sorting by maps, tuples, or bags produces errors
For all data types, nulls are taken to be smaller than all possible values
for that type, and thus will always appear first (or last when desc is used).
grunt> A = load 'foo.txt' as (name:chararray, x:int, y:int);
grunt> B = order A by name;
grunt> dump B;
o/p
(Bill,10,20)
(Bill,5,5)
(Larry,2,2)
(Mark,5,5)
Distinct :
Distinct statement removes duplicate records .It work only on entire records , not on individual files.
grunt> A = load 'foo.txt' as (name:chararray, x:int, y:int);
grunt> B = foreach A generate name;
grunt> C = distinct B;
o/p
(Bill)
(Mark)
(Larry)
Limit :
Sometimes we want to see only a limited set of results for many purpose . Sometimes to see if we are getting the output or not .Limit helps us achieve that .
grunt> D = limit C 2;
grunt> dump D;
o/p
(Bill)
(Mark)
A fact with limit with Pig is that inspite of we limiting the records ( in this example 2 ) . It will still read the entire dataset but just returns 2 records as result .Therefore it is not for performance .
Except for Order , None of the relation operator in pig guarantees to return the same result-set again if you execute the same query multiple times. Limit is no exception to the rule . In our example above ,if we do a limit againit might return different value these time. For example .
grunt> D = limit C 2;
grunt> dump D;
o/p
(Larry)
(Mark)
Sample :
Sample offers a simple way to get a sample of your data. It reads through all of your data but returns only a percentage of rows. What percentage it returns is expressed as a double value, between 0 and 1. So, in the following example, 0.1 indicates 10%:
The percentage will not be an exact match,but close.
grunt> E = sample A 0.4;
The sample A by 0.1 is rewritten to filter A by random() <= 0.1.Parallel
One of Pig’s core claims is that it provides a language for parallel data processing.so Pig prefers that you tell it how parallel to be. To do this, it provides the parallel clause.The parallel clause can be attached to any relational operator in Pig Latin. However,it controls only reduce-side parallelism, so it makes sense only for operators that force a reduce phase.These are: group*, order, distinct, join*, limit, cogroup*, and cross.Operators marked with an asterisk have multiple implementations, some of which force a reduce and some which do not
In this example, parallel will cause the MapReduce job spawned by Pig to have 10grunt> E = sample A 0.4;
reducers. parallel clauses apply only to the statement to which they are attached; they
do not carry through the script.
If, however, you do not want to set parallel separately for every reduce-invoking operator
in your script, you can set a script-wide value using the set command
grunt> set default_parallel 10;
I will talk about advanced operators in my next blog .Watch this space .