Thursday, January 23, 2014

Pig coding made easy.

First and foremost , Yes you need to install Pig and don't worry it is extremely easy.There are many blogs and documents available online for Pig installation . Refer to one and proceed . The below URL from Apache might be useful .Good Luck !

http://pig.apache.org/docs/r0.10.0/start.html#Pig+Setup

Look at this picture below , some of you might have already seen the picture and will be like "yes i have already seen this picture already". I am not here trying to reinvent the whole wheel but rather here to collate the information scattered everywhere and to explain what it is and things it does.


   #1 above in the diagram states that "Pig is a dataflow language and Pig Latin is used to express it ". Does not make sense ? Let me take an extract from Apache's website 

A Pig Latin statement is an operator that takes a relation as input and produces another relation as output. (This definition applies to all Pig Latin operators except LOAD and STORE which read data from and write data to the file system.) Pig Latin statements can span multiple lines and must end with a semi-colon ( ; ). Pig Latin statements are generally organized in the following manner:
  1. A LOAD statement reads data from the file system - This is reading the data stored in the hard-drive for pig to be able to read or process or display in the terminal .
  2. A series of "transformation" statements process the data - Let us say this a business Logic being performed .
  3. A STORE statement writes output to the file system; or, a DUMP statement displays output to the screen - This is writing the data back to hard-disk .
Simple isn't it . If you are still not able to completely comprehend it . Move on , by the time you are done with reading the blog , you will have a clear understanding of what it is.

To read more , go to the website below .
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Overview
   #2 above in the diagram states that pig has 2 execution modes or exectypes:

  • Local Mode - This means you are running pig only on one machine and not in distributed environment with clusters.Local mode are started using the -x flag ,the syntax of which is given below.

     pig -x local  
    
  • Mapreduce Mode - This signifies that you need to have access to a Hadoop cluster and HDFS installation. Hadoop should already be running on your machine . How do we know that ?                                                                                                      Type jps as given below

     hadoop@ubuntu-server64:~/lab/install/hadoop-1.0.4/bin$ jps  
     2837 Jps  
     2255 SecondaryNameNode  
     1848 DataNode  
     2344 JobTracker  
     2590 TaskTracker  
     1606 NameNode  
     hadoop@ubuntu-server64:~/lab/install/hadoop-1.0.4/bin$  
Please note that all the above services might not be running. The datanode and Tasktracer generally run on the slave machines and so if you are not able to see them while executing the command jps , dont worry . That is just allright .
Mapreduce mode is the default mode; you can,but don't need to, specify it using the -x flag Pig can also start in local and mapreduce mode using the java command which we will not discuss it here.
The discussion on Pig does not progress towards logical conclusion if we do not discuss about running PIG in various modes.

Running PIG !!
Pig Latin and Pig Commands are executed in 2 modes 
    1.   Interactive Mode.
    2.   Batch Mode.

Interactive Mode.
Pig can be run in interactive mode using the Grunt shell. As shown above enter the PIG's grunt shell in either local or Mapreduce mode .Hope you have not forgotten it already . If you have , refer the above .
 grunt> transaction = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);   
 grunt> dump transaction;   
Note : If you want to work with these statements refer to the sample data i have provided in my previous blog on Pig.

Batch Mode.
A group of pig latin statements can be grouped together and can be run together . This is more like packaging or creating procedures in PL/SQL .Or you can think of it as bunch of statements written together to perform a task.

Here is a piece of code . 
copy the below in a text file . Save it with a name , say "trans.pig" in your local directory wherever pig is installed . For example /user/data/trans.pig
 transactions = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);  
 txn_100plus = FILTER transactions BY amt > 100.00;  
 txn_grpd = GROUP txn_100plus BY cat;  
 txn_cnt_bycat = FOREACH txn_grpd GENERATE group , COUNT(txn_100plus);  
 STORE txn_cnt_bycat INTO '/home/hadoop'  

Now how do we execute it ? Here goes the details
Local Mode:
 $ pig -x local trans.pig  
Mapreduce Mode:
 $ pig trans.pig  
 or  
 $ pig -x mapreduce trans.pig  

Comments in code :
Before we start our journey in exploring the code , it is always good to know how to comment a piece of code or a bunch of line. We used to learn this as a first thing in our college classes as during the practical test all we had to do was "at the least" to make the code error free and what better way then commenting the erroneous code.

Getting to the point , there 2 ways to comment a code in pig 

1.   single-line comments using --
2.   multi-line comments using /* …. */

Oracle pl/sql developers!! does this look exactly the same . Yes it is the same.
Let me give an example : 


  /* I am commenting this 3 lines below   
  transactions = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);    
  txn_grpd = GROUP txn_100plus BY cat;   
  txn_cnt_bycat = FOREACH txn_grpd GENERATE group , COUNT(txn_100plus) commented till here */  
  STORE txn_cnt_bycat INTO '/home/hadoop' -- This part is commented again  

First thing whenever we start learning a new language is to understand the datatypes , lets quickly walk through it 

There are 2 data types in Pig 
       1.   Scalar 
  •        2.  Complex

  • Scalar datatype is very similar to other languages and these are 6 different types
    int              : They store 4 byte signed integer . Example : 102.   
    long           : They store 8 byte signed integer . Example : 500000000003.   
    float          : They store 4 byte value. In some calculation float loses its precision . This is obvious                  from its limited storage capacity .Example : 3.14 4.   
    double    They store 8 byte value . Example : 2.71828 or exponent format : 6.626e-34      chararray : A string or character array. Example : 'Hello' or  the value for Ctrl-A is expressed                        as \u0001.5.   
    bytearray A blob or array of bytes , There is no way to specify a bytearray.

    Dont worry too much if you are not able to understand 100% . That is fine , the interesting concepts are below .

    Complex data types are given below :
    Map : This is a key value pair (k,v) . The key is a chararray used as an index to find the element referred to as Value. Because PIG does not know the type of value , it will assume it as a bytearray .However the actual value might be of some other data type.We can CAST the value to a different data type , if we know the data type of the value.By default there is no requirement that all values in a map must be of the same type.It is legitimate to have a map with two keys name and age, where the value for name is a chararry and value for age is an int.Map is formed using square brackets [] . For example, ['name'#'bob', 'age'#55] will create a map with two keys, “name” and “age”.

    Tuple A tuple is a fixed-length, ordered collection of Pig data elements.Tuples are divided into fields, with each field containing one data element. These elements can be of any type—they do not all need to be the same type. A tuple is analogous to a row in SQL,with the fields being SQL columns.                                                                                                                Tuple constants use parentheses to indicate the tuple and commas to delimit fields in the tuple. For example, ('bob', 55) describes a tuple constant with two fields.

    Bag A bag is an unordered collection of tuples. Because it has no order, it is not possible to reference tuples in a bag by position.Bag constants are constructed using braces, with tuples in the bag separated by commas.For example, {('bob', 55), ('sally', 52), ('john', 25)} constructs a bag with three tuples, each with two fields. 

    Please NOTE : Bag is the one type in Pig that is not required to fit into memory.               Because bags are used to store collections when grouping(ahh, you might not understand it here, hold your breadth explainatio), bags can become quite large.Pig has the ability to spill bags to disk when necessary, keeping only partial sections of the bag in memory. The size of the bag is limited to the amount of local disk available for spilling the bag.

    Enough of theory isn't it , let's now work with some practical examples .Before we process , here is the data snapshot on which we will be performing our operations .The data has 3 fields as give below . 
    x: Name 
    y: Number
    z: Number
    Let us call this file as "foo.txt" and save this file in the default pig directory.
     Bill     10     20  
     Mark     5     5  
     Larry     2     2  
     Bill     5     5  
    
    Let us now login into the grunt shell.
     grunt> ls  
     hdfs://localhost:8020/user/hadoop/FIRST_NAME  <dir>  
     hdfs://localhost:8020/user/hadoop/NYSE_daily.txt<r 1>  1471331  
     hdfs://localhost:8020/user/hadoop/NYSE_dividends.txt<r 1>    17695  
     hdfs://localhost:8020/user/hadoop/TEMP_FIRST_NAME    <dir>  
     hdfs://localhost:8020/user/hadoop/foo.txt<r 1> 41  
     hdfs://localhost:8020/user/hadoop/out1 <dir>  
    
    Execute the below command to load the data from foo.txt to a relation A
     grunt> A = load 'foo.txt' as (x:chararray, y:int, z:int);  
     grunt> dump A;  
     /* The Command follows Map Reduce .There are lot of information between dump and resultset given below . I am skipping them */  
     (Bill,10,20)  
     (Mark,5,5)  
     (Larry,2,2)  
     (Bill,5,5)  
    
    Please note : The result set above is a tuple ( or it is similar to a row in SQL ).
    How do i access field level data(data for only x or y) from the above relation A ?
    If it were to be SQL , we would have written select x from <table_name> but in this case 
     grunt> B = foreach A generate x;  
     grunt> dump B  
     (Bill)  
     (Mark)  
     (Larry)  
     (Bill)  
    
    Most important concept here is to understand that each step of a pig statement generates a relation . The above statement FOREACH suggest that the entire set of records of the relation A be parsed one by one until the last record and field x be separated from it . This is then dumped into the terminal.
    Note : commands or statements are not case sensitive in pig . For Example load and LOAD are the same but relations are case sensitive . "A" above will not be the same as "a".
     grunt> C = group A by x;  
     grunt> dump C  
     (Bill,{(Bill,10,20),(Bill,5,5)})  
     (Mark,{(Mark,5,5)})  
     (Larry,{(Larry,2,2)})  
     grunt> describe C;  
     C: {group: chararray,A: {(x: chararray,y: int,z: int)}}  
    
    The result returned here is tuple with a bag inside it . Remember tuple is denoted by () and bag by {}.
    Look at the description of C . it clearly mentions that the grouping has been done only to consolidate data in a single bag .
    If we look at the data foo.txt , Bill appears twice and hence when we dump C , Bill as a group has 2 Bill inside it 
     (Bill,{(Bill,10,20),(Bill,5,5)})  
    
    This means that grouping in pig is entirely different from grouping in SQL . 
    Grouping in SQL needs an aggregate function whereas in pig it just brings together tuples with similar key or group by values .

    Going a little off-topic so as to explain how is SQL grouping different from Pig grouping .

    Here is my explanation ! Watch carefully 
    Let us consider the below table in SQL .Let us call it employee table 
     Name       Dept       Salary  
     Bill       10          100  
     Mark       10          200  
     Larry      20          500  
     King       20          50  
     George     30          1000  
    
    In sql, A grouping would look like below
     select dept,sum(salary) from employee group by dept  
    
    The result-set after firing the above query will look as given below 
     Dept      Salary  
     10          300  
     20          550  
     30          1000  
    
    Let us now load the same data to a pig relation and assume this is a comma separated value
     employee = load 'employee.csv' USING PigStorage(',') as (name:chararray, dept:int, salary:int);  
     dept_groupby = group employee by dept;  
     dump dept_groupby;  
     (10,{(bill,10,100),(Mark,10,200)})  
     (20,{(Larry,20,500),(King,20,50)})  
     (30,{(George,20,1000,)})  
    
    so it means that there is no aggregation happening , it is just grouping a set a tuple or records together . Hence pig's grouping and sql grouping are two different operation .

    Now you might ask , how is it that we aggregate or sum in pig . The group statement is not working ! Here is the answer for you . You sum it up mathematically . Don't understand my statement . follow the example below .
     C = foreach dept_groupby generate employee.dept,SUM(employee.salary);  
     dump C;  
     o/p   
     ({(10),(10)},300)  
     ({(20),(20)},550)  
     ({(30)},1000)  
    
    Looks a little different and difficult as well for someone who might be working his whole life on high level languages such as SQL but didn't we tell you that this is a DATA FLOW language. You have to take one step at a time , remember No skipping any step here . Get back to basics .

    Now the next question that follows is , what are the different types of relational operators available in pig ? Let us explore them one by one .
    Foreach

    Foreach takes each record from the relation , applies some expression ( sometimes it doesn't apply any expression at all  ) . From these expressions it generates new records to send down
    the pipeline to the next operator . Look at the example below .
     grunt> cat foo.txt  
     Bill   10   20  
     Mark    5    5  
     Larry   2    2  
     Bill    5    5  
     grunt> A = LOAD 'foo.txt' as ( name,x:int,y:int);  
     grunt> B = foreach A generate name;  
     grunt> dump B;  
     (Bill)  
     (Mark)  
     (Larry)  
     (Bill)  
    
    It is simple , isn't it . The Dump B generates only name field from the foo.txt data . 
    This was a basic example lets make it a little complex .
     C = foreach A generate x-y;  
    OR 
     D = foreach A generate $2-$1;
     (10)  
     (0)  
     (0)  
     (0)  
    
    we can either use the field variables that has been defined or use $0,$1 .. where $0 signifies the first field , $1 second field and so on.
    Some more foreach related operations .
     beginning = foreach A generate ..x; -- produces name and x . This means display all the data till the begining of x.  
     middle = foreach prices generate name..y; -- produces name and y . This means display all the data from name to y . In our case it is just 2 column(field) data.  
     end = foreach prices generate x..; -- produces x and y . This means display all the data from x till the end.In our case it is x and y.  
    

    UDFs in foreach :

    User Defined Functions (UDFs) can be invoked in foreach.These are called evaluation
    functions, or eval funcs. Because they are part of a foreach statement, these UDFs take
    one record at a time and produce one output. The important thing to note here is that although the eval functions take one argument , that one argument can be bag and hence indirectly we can pass more than one values to the function . Likewise for output of the function .

     A = load 'foo.txt' as (name:chararray, x:int, y:int);  
     D = foreach A generate UPPER(name) as name ,x;  
     E = group D by name;  
     F = foreach E generate group , sum(D.x);  
     o/p  
     014-01-23 02:26:30,254 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve sum using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]  
    
    Whoops !!!! why is it that i see error inspite of doing everything right ?? Any guesses ??

    The Reason is very simple and yet so very hard to remember . I failed almost for 30 mins before i could figure out what is the problem . UDF's are CASE SENSITIVE .Make sum to SUM
     F = foreach E generate group , SUM(D.x);  
     o/p  
     (BILL,15)  
     (MARK,5)  
     (LARRY,2)  
    
    Hell Yeah , it works now !!

    Filter :

    The filter statement allows you to select which records you want to retain in your data pipeline. You have all types of operators that can be used in filter such as == , != , >= etc . This is same as any other languages.

    Now some important points 

    1.   To use these operators with tuples , both tuples must either have same schema or no schema . 
    2.   None of these operators can be applied with bags .
    3.   Pig Latin follows the operator precedence that is standard in most programming languages. Remember BODMAS rule is maths . Yes it is just that .

    A SQL user/developers now knows that , this is nothing but "WHERE" in SQL queries . Now we ask , is there something like regular expression matching ?


    Yes there is , look at the example below .
     grunt> describe F;  
     F: {group: chararray,long}  
     grunt> G = filter F by $0 matches 'BI.*';  
     O/P  
     (BILL,15)  
    
    The describe on the Relation F above gives me a result which clearly shows that the fields has not been named and hence i have used $0 which gives me the first field .

    The ".*" above tells the engine to match everything after BI and hence the result is (BILL,15)

    Group : 

    Having mentioned multiple times above and also having explained once already . Group by shares its syntax with SQL . 
     grunt> C = group A by name;   
      grunt> dump C   
      (Bill,{(Bill,10,20),(Bill,5,5)})   
      (Mark,{(Mark,5,5)})   
      (Larry,{(Larry,2,2)})   
      grunt> describe C;   
      C: {group: chararray,A: {(name: chararray,y: int,z: int)}}   
    
    Look at what is happening here , Pig group only bring's together set of tuples sharing the same key as in this case all Bill's records is in one bag now .
    The difference with SQL is that , SQL needs an aggregator operation to be performed while grouping such as SUM,COUNT,AVG,MIN,MAX etc wheras in PIG ,GROUP literally mean the word . It just groups the data together .

    Few pointers : 
    1.   You can also group on multiple keys. 
    2.   You can also use all to group together all of the records in your pipeline.The record coming         out of group all has the chararray literal all as a key
     grunt> I = group A all ;  
     grunt> describe I;  
     I: {group: chararray,A: {(name: chararray,x: int,y: int)}}  
     grunt> dump I;  
     o/p  
     (all,{(Bill,10,20),(Mark,5,5),(Larry,2,2),(Bill,5,5)})  
    
    3.   group is the operator that usually will force a reduce phase.If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce, this will force it to pass through map, shuffle, and reduce phases again . 
    4.  group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group

    After having discussed all these , Let us take a step back and understand why is PIG better for map-reduce .


    In Map-Reduce we often get skewed results , meaning reducers might get different load of data at the end of Map,shuffle and sort . The simple reason being some keys might have more values and some less 
    Just because you have specified that your job have 100 reducers, there is no reason to expect that the number of values per key will be distributed evenly.

    For example,

    suppose you have an index of web pages and you group by the base URL. Certain values
    such as yahoo.com(login.yahoo.com,in.answers.yahoo.com)are going to have far more entries than say indiatimes.com, which means that some reducers get far more data than others. 

    Now as we know that our MapReduce job is not finished (and any subsequent ones cannot start) until all our reducers have finished, this skew will significantly slow our processing. In some cases it might also not be impossible for one reducer to manage so much data. what do we do ?


    Pig has a number of ways that it tries to manage this skew to balance out the load across reducers. The one that applies to grouping is Hadoop’s combiner. Let's talk a little more about combiners.


    Combiners:
    Combiner gives applications a chance to apply their reducer logic early on . As the map phase writes output, it is serialized and placed into an in-memory buffer.When this buffer fills, MapReduce will sort the buffer and then run the combiner if the application has provided an implementation for it.The resulting output is then written to local disk, to be picked up by the shuffle phase and sent to the reducers.MapReduce might choose not to run the combiner if it determines it will be more efficient not to.

    A typical MapReduce will look like this .


    If you don't understand where a combiner will be placed in MapReduce , here is another diagram for you .A picture is worth a thousand words , isn't it ?

    Which means that combiner is doing the task of a reducer even before reducer comes into the picture .

    Now here is what you have been waiting for !


    Pig’s operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly.



    Order by :

    I believe we all understand Order by but let me explain very briefly as to what it is .Order statement sorts your data for you, producing a total order of your output data. Remember Sorting by maps, tuples, or bags produces errors

    For all data types, nulls are taken to be smaller than all possible values 
    for that type, and thus will always appear first (or last when desc is used).
     grunt> A = load 'foo.txt' as (name:chararray, x:int, y:int);  
     grunt> B = order A by name;  
     grunt> dump B;  
     o/p  
     (Bill,10,20)  
     (Bill,5,5)  
     (Larry,2,2)  
     (Mark,5,5)  
    

    Distinct :

    Distinct statement removes duplicate records .It work only on entire records , not on individual files.
     grunt> A = load 'foo.txt' as (name:chararray, x:int, y:int);  
     grunt> B = foreach A generate name;  
     grunt> C = distinct B;  
     o/p  
     (Bill)  
     (Mark)  
     (Larry)  
    

    Limit :

    Sometimes we want to see only a limited set of results for many purpose . Sometimes to see if we are getting the output or not .Limit helps us achieve that .
     grunt> D = limit C 2;  
     grunt> dump D;  
     o/p  
     (Bill)  
     (Mark)  
    
    A fact with limit with Pig is that inspite of we limiting the records ( in this example 2 ) . It will still read the entire dataset but just returns 2 records as result .Therefore it is not for performance .

    Except for Order , None of the relation operator in pig  guarantees to return the same result-set again if you execute the same query multiple times. Limit is no exception to the rule . In our example above ,if we do a limit againit might return different value these time. For example .
     grunt> D = limit C 2;   
      grunt> dump D;   
      o/p   
      (Larry)   
      (Mark)  
    
    This is how pig has been designed . 

    Sample : 

    Sample offers a simple way to get a sample of your data. It reads through all of your data but returns only a percentage of rows. What percentage it returns is expressed as a double value, between 0 and 1. So, in the following example, 0.1 indicates 10%:



    The percentage will not be an exact match,but close.
     grunt> E = sample A 0.4;  
    
    The sample A by 0.1 is rewritten to filter A by random() <= 0.1.

    Parallel

    One of Pig’s core claims is that it provides a language for parallel data processing.so Pig prefers that you tell it how parallel to be. To do this, it provides the parallel clause.
    The parallel clause can be attached to any relational operator in Pig Latin. However,it controls only reduce-side parallelism, so it makes sense only for operators that force a reduce phase.These are: group*, order, distinct, join*, limit, cogroup*, and cross.Operators marked with an asterisk have multiple implementations, some of which force a reduce and some which do not
     grunt> E = sample A 0.4;  
    
    In this example, parallel will cause the MapReduce job spawned by Pig to have 10
    reducers. parallel clauses apply only to the statement to which they are attached; they
    do not carry through the script.
    If, however, you do not want to set parallel separately for every reduce-invoking operator

    in your script, you can set a script-wide value using the set command
     grunt> set default_parallel 10;  
    
    I will talk about advanced operators in my next blog .Watch this space .

    No comments:

    Post a Comment