Tuesday, January 7, 2014

PIG Overview !!!!

Apache PIG !!

Point to ponder: why is the pig sitting on Hadoop?

What is PIG?


Apache Pig is a platform for analyzing large, very large, sets of data. The next question generally asked is, why Pig? Aren't there other languages that can do the same stuff? Traditionally, large sets of data are analysed using a data warehouse. In today's world, Oracle/DB2 databases are able to process them, although they do take hours, days, weeks and sometimes forever. Sounds like deja vu? If you are a database developer you will concur with me.

On top of that, think of the powerful servers being used to process this data, and think of the cost associated with them. If I were a small organization, would I ever imagine processing large data sets using traditional tools and technologies? Probably not.

One simple reason why Pig is gaining popularity is that it provides an engine for executing data flows in parallel on top of Hadoop, as depicted in the picture, as well as in standalone mode.

Pig is a dataflow programming environment for processing very large files.

Dataflow!! Now what is that?

Pig is sequential in its approach; there are no control statements like IF and ELSE. That means control cannot jump from one statement to another, skipping a few in between. A minimal sketch of such a linear dataflow is given below.
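
As a taste of what a dataflow looks like, here is a minimal sketch of a Pig Latin script; the file name, field names and threshold are my own assumptions, purely for illustration. Every statement simply consumes the relation produced by the one before it:

 -- load raw sales data (hypothetical file and schema, for illustration only)
 sales = LOAD 'sales.csv' USING PigStorage(',') AS (store, product, amt:double);
 -- keep only the rows we care about
 big_sales = FILTER sales BY amt > 100.0;
 -- group and aggregate, one step at a time
 by_store = GROUP big_sales BY store;
 store_total = FOREACH by_store GENERATE group AS store, SUM(big_sales.amt) AS total;
 -- trigger execution and print the result
 DUMP store_total;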

A quick comparison between Pig and SQL:



One major, and very interesting, difference between Pig and most other programming languages is:

Most programming languages are compiled, and compilation performs the steps below:


1.  Lexical analysis
2.  Parsing
3.  Semantic analysis
4.  Optimization
5.  Code generation

When a Pig Latin script is compiled, Pig only performs the first three of these steps: lexical analysis, parsing and semantic analysis. In effect, it simply checks whether the code is syntactically and semantically correct.

For more details on compilers, please refer to the link below:


https://prod-c2g.s3.amazonaws.com/CS143/Spring2013/files/lecture01.pdf?Signature=CT%2B6cKEwsgXlGMlWr2XG3CqW2F4%3D&Expires=1704596893&AWSAccessKeyId=AKIAIINO3Q3NXKJA2PXQ

Analysis of why we should use Pig:

  1. Pigs Eat Anything: Pig can operate on data and metadata. It can operate on data that is relational, nested, or unstructured. And it can easily be extended to operate on data beyond files, including key/value stores, databases, etc.
  2. Pigs Live Anywhere: Pig is intended to be a language for parallel data processing. It is not tied to one particular parallel framework. It was implemented first on Hadoop, but it is not intended to run only on Hadoop.
  3. Pigs Fly: Pig processes data quickly.
  4. Pigs Are Domestic Animals:
    • Pig is designed to be easily controlled and modified by its users.
    • Pig allows easy integration with Java, or with languages that can compile down to Java such as Jython. Pig supports user-defined load and store functions. If you do not understand this now, kindly move on; you will understand them later.
    • Pig has an optimizer that rearranges some operations in Pig Latin scripts to give better performance, combines MapReduce jobs together, etc. However, users can easily turn this optimizer off to prevent it from making changes that do not make sense in their situation (see the sketch just after this list).
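
For instance, on the Pig versions I have used, rule-based optimizations can be switched off when launching a script from the command line; treat the exact flag and values as something to verify against your own Pig release:

 # run a script with all rule-based optimizations disabled (hypothetical script name)
 pig -t All myscript.pig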

Let us now analyze the basic differences between Pig (over Hadoop) and SQL.

First and foremost, Pig Latin should be a natural choice for constructing data pipelines. Now what the hell is a data pipeline??

Wikipedia states that a pipeline is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion; in that case, some amount of buffer storage is often inserted between elements.

In simple terms, we can imagine pipelines as the water distribution system of a city: numerous pipes merge into one big pipe carrying water, and the water is then distributed to every household.

If you are interested in reading more about pipelining, the Wikipedia article on pipelines is a good place to start.


1.   Pig Latin is procedural whereas SQL is declarative: Look at the example below.
SQL: A single SQL statement composes several language elements (SELECT, FROM, WHERE, GROUP BY and so on); you declare only the result you want, not the steps to produce it, hence it is declarative.


Pig:
Sample data:
File name: txn.csv

 00031941,02-20-2011,4003793,110.97,Water Sports,Kitesurfing,Springfield,Illinois,credit  
 00031942,08-12-2011,4004260,037.22,Outdoor Recreation,Skateboarding,Indianapolis ,Indiana,cash  
 00031943,01-18-2011,4004986,188.70,Team Sports,Rugby,Santa Ana,California,credit  
 00031944,02-18-2011,4005434,035.64,Games,Board Games,Charlotte,North Carolina,cash  
 00031945,11-15-2011,4004357,126.87,Exercise & Fitness,Weightlifting Belts,Columbia,South Carolina,credit  
 00031946,02-11-2011,4008482,090.33,Exercise & Fitness,Cardio Machine Accessories,Newark,New Jersey,credit  
 00031947,01-21-2011,4000145,074.79,Team Sports,Soccer,Colorado Springs,Colorado,credit  

Code Snapshot : 
 transaction = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);  

The above data will be loaded into a relation named "transaction". Most of us coming from other technical backgrounds have a tendency to call "transaction" a variable, when it should actually be called a relation.

This relation is somewhat similar to Oracle's notion of a relation. Put simply: a "relation" is a table, the heading being the definition of the structure and the rows being the data.
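
To make the procedural flavour concrete, here is a hedged sketch of how the same txn.csv could be summarised step by step (the relation names and the idea of totalling per category are my own choices for the example); in SQL this would all be a single SELECT ... GROUP BY statement:

 -- group the transactions by category, then total the amounts, one step at a time
 by_cat = GROUP transaction BY cat;
 cat_totals = FOREACH by_cat GENERATE group AS cat, SUM(transaction.amt) AS total_amt;
 sorted_totals = ORDER cat_totals BY total_amt DESC;
 DUMP sorted_totals;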


Also, interestingly, every step in Pig returns a data set (a relation), as shown below:


Below, the data set is reduced to 10 records using LIMIT:
 grunt> transaction = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);  
 grunt> limit_transaction = limit transaction 10; -- limiting the data to 10 records
 grunt> dump limit_transaction;
 2014-01-06 22:55:03,568 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: LIMIT  
 2014-01-06 22:55:03,890 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false  
 2014-01-06 22:55:03,957 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 2  
 2014-01-06 22:55:03,958 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2  
 2014-01-06 22:55:04,063 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job  
 2014-01-06 22:55:04,098 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3  
 2014-01-06 22:55:04,101 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting Parallelism to 1  
 2014-01-06 22:55:04,102 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job5187351683488465485.jar  
 2014-01-06 22:55:08,236 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job5187351683488465485.jar created  
 2014-01-06 22:55:08,265 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job  
 2014-01-06 22:55:08,273 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.  
 2014-01-06 22:55:08,274 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cacche  
 2014-01-06 22:55:08,276 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []  
 2014-01-06 22:55:08,370 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.  
 2014-01-06 22:55:08,838 [JobControl] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  
 2014-01-06 22:55:08,839 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  
 2014-01-06 22:55:08,856 [JobControl] INFO org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library  
 2014-01-06 22:55:08,857 [JobControl] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded  
 2014-01-06 22:55:08,861 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 2  
 2014-01-06 22:55:08,874 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete  
 2014-01-06 22:55:09,694 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201401062108_0002  
 2014-01-06 22:55:09,694 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases limit_transaction,transaction  
 2014-01-06 22:55:09,694 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: transaction[1,14],limit_transaction[2,20] C: R:  
 2014-01-06 22:55:09,694 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201401062108_0002  
 2014-01-06 22:55:27,792 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 25% complete  
 2014-01-06 22:55:36,838 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 33% complete  
 2014-01-06 22:55:42,864 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete  
 2014-01-06 22:55:49,426 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job  
 2014-01-06 22:55:49,428 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3  
 2014-01-06 22:55:49,428 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting Parallelism to 1  
 2014-01-06 22:55:49,429 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job6292515056400790331.jar  
 2014-01-06 22:55:52,856 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job6292515056400790331.jar created  
 2014-01-06 22:55:52,866 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job  
 2014-01-06 22:55:52,867 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.  
 2014-01-06 22:55:52,868 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cacche  
 2014-01-06 22:55:52,868 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []  
 2014-01-06 22:55:52,906 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.  
 2014-01-06 22:55:53,119 [JobControl] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  
 2014-01-06 22:55:53,120 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  
 2014-01-06 22:55:53,122 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1  
 2014-01-06 22:55:53,408 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201401062108_0003  
 2014-01-06 22:55:53,409 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases transaction  
 2014-01-06 22:55:53,409 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: C: R: transaction[-1,-1]  
 2014-01-06 22:55:53,410 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201401062108_0003  
 2014-01-06 22:56:10,004 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 75% complete  
 2014-01-06 22:56:33,334 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete  
 2014-01-06 22:56:33,337 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:  
 HadoopVersion  PigVersion   UserId StartedAt    FinishedAt   Features  
 1.0.4  0.11.0 hadoop 2014-01-06 22:55:04   2014-01-06 22:56:33   LIMIT  
 Success!  
 Job Stats (time in seconds):  
 JobId  Maps  Reduces MaxMapTime   MinMapTIme   AvgMapTime   MedianMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  MedianReducetime    Alias Feature Outputs  
 job_201401062108_0002  2    1    9    9    9    9    15   15   15   15   limit_transaction,transaction  
 job_201401062108_0003  1    1    6    6    6    6    15   15   15   15   transaction       hdfs://localhost:8020/tmp/temp-833806321/tmp715439762,  
 Input(s):  
 Successfully read 20 records (8927 bytes) from: "hdfs://localhost:8020/user/hadoop/retail/txn.csv"  
 Output(s):  
 Successfully stored 10 records (1109 bytes) in: "hdfs://localhost:8020/tmp/temp-833806321/tmp715439762"  
 Counters:  
 Total records written : 10  
 Total bytes written : 1109  
 Spillable Memory Manager spill count : 0  
 Total bags proactively spilled: 0  
 Total records proactively spilled: 0  
 Job DAG:  
 job_201401062108_0002  ->   job_201401062108_0003,  
 job_201401062108_0003  
 2014-01-06 22:56:33,355 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!  
 2014-01-06 22:56:33,358 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.  
 2014-01-06 22:56:33,366 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  
 2014-01-06 22:56:33,367 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  
 (00000000,06-26-2011,4007024,40.33,Exercise & Fitness,Cardio Machine Accessories,Clarksville,Tennessee,credit)  
 (00000001,05-26-2011,4006742,198.44,Exercise & Fitness,Weightlifting Gloves,Long Beach,California,credit)  
 (00000002,06-01-2011,4009775,5.58,Exercise & Fitness,Weightlifting Machine Accessories,Anaheim,California,credit)  
 (00000003,06-05-2011,4002199,198.19,Gymnastics,Gymnastics Rings,Milwaukee,Wisconsin,credit)  
 (00000004,12-17-2011,4002613,98.81,Team Sports,Field Hockey,Nashville ,Tennessee,credit)  
 (00000005,02-14-2011,4007591,193.63,Outdoor Recreation,Camping & Backpacking & Hiking,Chicago,Illinois,credit)  
 (00000006,10-28-2011,4002190,27.89,Puzzles,Jigsaw Puzzles,Charleston,South Carolina,credit)  
 (00000007,07-14-2011,4002964,96.01,Outdoor Play Equipment,Sandboxes,Columbus,Ohio,credit)  
 (00000008,01-17-2011,4007361,10.44,Winter Sports,Snowmobiling,Des Moines,Iowa,credit)  
 (00000009,05-17-2011,4004798,152.46,Jumping,Bungee Jumping,St. Petersburg,Florida,credit)  

2.   Pig Latin allows pipeline developers to decide where to checkpoint data (save the data) in the pipeline: Pig allows storage of data at any point in the pipeline. That way, when a failure occurs, the whole pipeline does not have to be rerun. This is done using the LOAD and STORE commands, which I will talk about in detail later. For example:

 STORE limit_transaction INTO 'retail/txn_10.csv' USING PigStorage ('*');  

This will store the data in the HDFS location:

 hdfs://localhost:8020/user/hadoop/retail/txn_10.csv  

This is how the data will be displayed:
 grunt> cat hdfs://localhost:8020/user/hadoop/retail/txn_10.csv  
 00000000*06-26-2011*4007024*40.33*Exercise & Fitness*Cardio Machine Accessories*Clarksville*Tennessee*credit  
 00000001*05-26-2011*4006742*198.44*Exercise & Fitness*Weightlifting Gloves*Long Beach*California*credit  
 00000002*06-01-2011*4009775*5.58*Exercise & Fitness*Weightlifting Machine Accessories*Anaheim*California*credit  
 00000003*06-05-2011*4002199*198.19*Gymnastics*Gymnastics Rings*Milwaukee*Wisconsin*credit  
 00000004*12-17-2011*4002613*98.81*Team Sports*Field Hockey*Nashville *Tennessee*credit  
 00000005*02-14-2011*4007591*193.63*Outdoor Recreation*Camping & Backpacking & Hiking*Chicago*Illinois*credit  
 00000006*10-28-2011*4002190*27.89*Puzzles*Jigsaw Puzzles*Charleston*South Carolina*credit  
 00000007*07-14-2011*4002964*96.01*Outdoor Play Equipment*Sandboxes*Columbus*Ohio*credit  
 00000008*01-17-2011*4007361*10.44*Winter Sports*Snowmobiling*Des Moines*Iowa*credit  
 00000009*05-17-2011*4004798*152.46*Jumping*Bungee Jumping*St. Petersburg*Florida*credit  
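
To resume the pipeline from this checkpoint instead of re-reading the original file, the stored output can simply be loaded back in. A minimal sketch, assuming the '*' delimiter and the same schema as before:

 checkpointed = LOAD 'retail/txn_10.csv' USING PigStorage('*') AS (txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);
 dump checkpointed;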

3.   Pig Latin allows the developer to select specific operator implementations directly rather than relying on the optimizer: By definition, a declarative language allows the developer to specify what must be done, not how it is done. Thus in SQL, users can specify that data from two tables must be joined, but not which join implementation to use. This means the optimizer is free to choose whatever algorithm it deems best to fetch the data for a given statement.

SQL developers look at the explain plan to see what the optimizer decided, but Pig keeps it simple: it will just do what you ask it to do.

Currently Pig supports four different join implementations and two grouping implementations. It also allows users to specify the parallelism of operations inside a Pig Latin script, and does not require that every operator in the script have the same parallelization factor. This is important because data sizes often grow and shrink as data flows through the pipeline. This will be discussed later in detail; a small taste is sketched below.
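
As a hedged sketch of what that looks like in practice (the customers file is a hypothetical small lookup data set, introduced only for this example):

 -- hypothetical small lookup data set, used only for illustration
 customers = LOAD 'retail/customers.csv' USING PigStorage(',') AS (cust_id, cust_name);
 -- explicitly ask for a fragment-replicated (map-side) join, with the small relation listed last
 joined = JOIN transaction BY cust_id, customers BY cust_id USING 'replicated';
 -- set the degree of parallelism for this particular reduce-side operation
 by_category = GROUP transaction BY cat PARALLEL 5;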

4.   Pig Latin supports splits in the pipeline: As evident below, one relation can feed several branches. Watch the operators and the data splits:
 transaction = LOAD 'retail/txn.csv' USING PigStorage(',') AS(txn_id,txn_dt,cust_id,amt:double,cat,sub_cat,adr1,adr2,trans_type);  
 less_than_100 = FILTER transaction BY amt < 100.00;  
 greater_than_100 = FILTER transaction BY amt > 100.00;  
 equal_to_100 = FILTER transaction BY amt == 100.00;
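
Pig also has a dedicated SPLIT operator that expresses the same branching in a single statement; a small sketch of the equivalent:

 -- one relation split into three branches in a single statement
 SPLIT transaction INTO cheap IF amt < 100.00, pricey IF amt > 100.00, exactly_100 IF amt == 100.00;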

5.   Pig Latin allows developers to insert their own code almost anywhere in the data pipeline: I will discuss this later when I talk about UDFs (don't be perplexed by the term; a UDF is a user-defined function, just like an ordinary function in Oracle, C or Java). A tiny sketch of the shape it takes is given below.
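
Just to set expectations, this is roughly how user code plugs into a script; myudfs.jar and the UpperCase function are purely hypothetical names used for this sketch:

 -- register a jar containing user-written code (hypothetical jar and function names)
 REGISTER myudfs.jar;
 upper_categories = FOREACH transaction GENERATE myudfs.UpperCase(cat), amt;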

Before I proceed to Pig scripting, I would like to briefly talk about the difference between Pig and Hive. For those of you who do not know Hive, please ignore this para.

Read this wonderful blog: http://developer.yahoo.com/blogs/hadoop/pig-hive-yahoo-464.html

Taking a cue from the blog:

Hive and Pig are both used for data processing but have distinct functions.
Data processing often splits into three separate tasks: data collection, data preparation, and data presentation.

The data preparation phase is often known as ETL (Extract, Transform, Load) or the data factory. Raw data is loaded in, cleaned up, conformed to the selected data model, joined with other data sources, and so on. Pig is apt for these operations.

The data presentation phase is usually referred to as the data warehouse. A warehouse stores products ready for consumers; they need only come and select the proper products off the shelves. Hive is a good fit here.

My next blog on Pig will cover Pig scripting in detail.
