Comparing Pig Latin and SQL for Constructing Data Processing Pipelines

I have been asked by users who are going to construct a data pipeline whether they should use Pig Latin or SQL.

For those of you who are not familiar with Pig, it is a platform for analyzing large data sets. It is built on Hadoop and provides ease of programming, optimization opportunities and extensibility. Pig Latin is the relational data-flow language and is one of the core aspects of Pig.

In this blog I use "data pipeline" to mean an application that takes data from one or more sources, cleanses it, does some initial
transformation on it that all the data readers will need, and then stores it in a data warehouse. As SQL is known by almost everyone, it is
often chosen as the language in which to write these data pipelines.

We are comparing Pig Latin over Hadoop to SQL over a relational database.

SQL's ubiquity is convenient. However, I believe that Pig Latin is a more natural choice for constructing data pipelines, for several reasons:

  1. Pig Latin is procedural, whereas SQL is declarative.
  2. Pig Latin allows pipeline developers to decide where to checkpoint data in the pipeline.
  3. Pig Latin allows the developer to select specific operator implementations directly rather than relying on the optimizer.
  4. Pig Latin supports splits in the pipeline.
  5. Pig Latin allows developers to insert their own code almost anywhere in the data pipeline.

I will consider each of these points in turn.

Pig Latin is Procedural

Since Pig Latin is procedural, it fits very naturally in the pipeline paradigm. SQL on the other hand is declarative. Consider, for example,
a simple pipeline, where data from sources users and clicks is to be joined and filtered, and then joined to data from a third source geoinfo,
aggregated, and finally stored into a table ValuableClicksPerDMA. In SQL this could be written as:

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
    select name, ipaddr
    from users join clicks on (users.name = clicks.user)
    where value > 0
) as userclicks using (ipaddr)
group by dma;

The Pig Latin for this will look like:

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

Notice how SQL forces the pipeline to be written inside-out, with operations that need to happen first happening in the from clause sub-query.
Of course this can be resolved with the use of intermediate or temporary tables. Then the pipeline becomes a disjointed set of SQL
queries where ordering is only apparent by looking at a master script (written in some other language) that sews all the SQL together. Also,
depending on how the database handles temporary tables, there may be cleanup issues to deal with. In contrast, Pig Latin shows users exactly
the data flow,
without forcing them to either think inside out or construct a set of temporary tables and manage how those tables are used between
different SQL queries.

The pipeline given above is obviously simple and contrived. It consists of only two very simple steps. In practice data pipelines at large
organizations are often quite complex. If each Pig Latin script spans ten steps, then the number of scripts to manage in source control,
code maintenance, and the workflow specification drops by an order of magnitude.

Checkpointing Data

Experienced data pipeline developers will object to the point above about Pig Latin not needing temporary tables. They will note that storing
data in between operations has the advantage of checkpointing data in the pipeline. That way, when a failure occurs, the whole pipeline does
not have to be rerun. This is true. Pig Latin allows users to store data at
any point in the pipeline without disrupting the pipeline execution. The advantage that Pig Latin provides is that pipeline developers decide where appropriate
checkpoints are in their pipeline rather than being forced to checkpoint wherever the semantics of SQL imposes it. So, if for the above pipeline
there was a need to store data after the second join (UserGeo) and before the group by (ByDMA), the script could be changed to:

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
store UserGeo into 'UserGeoIntermediate';
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

This would result in no additional MapReduce jobs. Pig would store the intermediate data after the second join and continue with the
grouping as before.
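
As an illustration of how such a checkpoint might be used after a failure, a hypothetical restart script could pick up from the stored data
instead of repeating both joins. This is only a sketch: it assumes the checkpoint was written with Pig's default storage, and the field name
geo_ipaddr is invented here to give the second ipaddr column a unique name.

```pig
-- Hypothetical restart script: resume from the checkpoint rather than rerunning the joins.
-- The stored columns are the fields of Users, ValuableClicks, and Geoinfo, in that order.
UserGeo              = load 'UserGeoIntermediate' as (name, age, ipaddr, user, url, value, geo_ipaddr, dma);
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';
```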

Faith in the Optimizer

By definition, a declarative language allows the developer to specify what must be done, not how it is done. Thus in SQL users can specify
that data from two tables must be joined, but not what join implementation to use. Developers are forced to have faith that the optimizer
will make the right choice for them. Some databases work around this by allowing hints to be given to the optimizer, but even then the
implementation is not required to follow those hints.

While for many SQL applications the query writer may not have enough knowledge of the data or enough expertise to specify an appropriate join
algorithm, this is not usually the case for data pipelines. Data flowing through data pipelines does not tend to vary significantly from run to
run, in terms either of volume or key distribution. In addition data pipeline developers are usually sophisticated enough to choose the
correct algorithm. For these reasons allowing developers to explicitly choose an implementation, and be guaranteed that their choice will be
honored, is quite useful in data pipelines.

Pig Latin allows users to specify an implementation or aspects of an implementation to be used in executing a script in several ways. For
joins and grouping operations users can specify an implementation to use, and Pig guarantees that it will use that implementation. Currently
Pig supports four different join
implementations and two grouping implementations. It also allows users to specify parallelism of operations inside a Pig Latin script, and
does not require that every operator in the script have the same parallelization factor. This is important because data sizes often grow and
shrink as data flows through the pipeline.
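
A sketch of what this can look like for the example pipeline, using the syntax of recent Pig releases (older releases may quote the
implementation name differently). The choice of join strategies and the parallel values are illustrative assumptions about the data, not
recommendations.

```pig
Users      = load 'users' as (name, age, ipaddr);
Clicks     = load 'clicks' as (user, url, value);
Geoinfo    = load 'geoinfo' as (ipaddr, dma);

-- Assumption: a few users account for most clicks, so request the skewed join
-- and run its reduce phase with 40 reducers.
UserClicks = join Clicks by user, Users by name using 'skewed' parallel 40;

-- Assumption: geoinfo is small enough to fit in memory, so request the
-- fragment-replicate join. The last relation listed is the one held in memory.
UserGeo    = join UserClicks by ipaddr, Geoinfo by ipaddr using 'replicated';

-- The grouped output is assumed to be much smaller, so fewer reducers are requested.
ByDMA      = group UserGeo by dma parallel 10;
```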

Splits in Pipelines

Another common feature of data pipelines is that they are often directed acyclic graphs (DAGs) and not linear pipelines. SQL, however, is
oriented around queries that produce a single result. Thus SQL handles trees (such as joins) naturally, but has no built-in mechanism for
splitting a data processing stream and applying different operators to each sub-stream. A very common use case we have seen at Yahoo! is a
desire to read one data set in a pipeline, group it by multiple different grouping keys, and store each as separate output. Since disk reads
and writes (both scan time and intermediate results) usually dominate
processing of large data sets, reducing the number of times data must be written to and read from disk is crucial to good performance.

Take for example a user data set where there is a desire to analyze the data set both in geographic and demographic dimensions. The Pig
Latin to do this analysis looks like:

Users         = load 'users' as (name, age, gender, zip);
Purchases     = load 'purchases' as (user, purchase_price);
UserPurchases = join Users by name, Purchases by user;
GeoGroup      = group UserPurchases by zip;
GeoPurchase   = foreach GeoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableGeos  = filter GeoPurchase by sum > 1000000;
store ValuableGeos into 'byzip';
DemoGroup     = group UserPurchases by (age, gender);
DemoPurchases = foreach DemoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableDemos = filter DemoPurchases by sum > 100000000;
store ValuableDemos into 'byagegender';

This Pig Latin script describes a DAG rather than a linear pipeline. It starts with two inputs which are brought into one stream (via join)
which is then split into two streams. Pig will do this in two MapReduce jobs (one for the join and one for both group bys and their filters)
rather than requiring that the join be either run twice or materialized as an intermediate result as traditional SQL would.
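
When the sub-streams should each receive a different subset of the records, Pig Latin also has an explicit split operator. A minimal sketch,
reusing UserPurchases from the script above; the age boundary and the output names are made up for illustration.

```pig
-- Route each joined record to one of two sub-streams based on age.
-- Records with a null age match neither condition and are dropped.
split UserPurchases into Minors if age < 18, Adults if age >= 18;

MinorGroup = group Minors by zip;
MinorSpend = foreach MinorGroup generate group, SUM(Minors.purchase_price) as sum;
store MinorSpend into 'minors_byzip';

AdultGroup = group Adults by gender;
AdultSpend = foreach AdultGroup generate group, SUM(Adults.purchase_price) as sum;
store AdultSpend into 'adults_bygender';
```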

Inserting Developer Code

Pig Latin's ability to include user code at any point in the pipeline is useful for pipeline development. This is accomplished through user
defined functions (UDFs) and streaming. UDFs allow users to specify how data is loaded, how it is stored, and how it is processed.
Streaming allows users to include executables at any point in the data flow.

Allowing developers to specify how data is loaded is useful because in most data pipelines data sources are not database tables. If SQL is
used, data must first be imported into the database, and then the cleansing and transformation process can begin. There are many ETL tools
on the market to handle this import process for databases. Pig allows developers to write a function in Java to read data directly from the
source. This eliminates the need for a second tool which must be purchased, learned, and used and allows the data pipeline to combine the
loading and initial cleansing and transformation steps.
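
In a script, a custom load function is attached with the using clause of load. In the sketch below the jar name, the class
com.example.pig.ClickLogLoader, and the input path are all hypothetical placeholders for a loader the pipeline developer would write.

```pig
-- Make the (hypothetical) jar containing the load function available to Pig.
register pipeline-udfs.jar;

-- Read the raw source directly; the loader parses each record into fields.
Clicks         = load 'raw/clicks.log' using com.example.pig.ClickLogLoader() as (user, url, value);

-- Cleansing can start in the same script, with no separate import step.
ValuableClicks = filter Clicks by value > 0;
```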

Pipelines also often include user defined column transformation functions and user defined aggregations. Pig Latin
supports writing both of these types of functions in Java. We plan to extend that to a number of scripting languages in
the near future, thus enabling users to easily write UDFs in the language of their choice.
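
Calling such functions from Pig Latin could look like the following sketch. NormalizeUrl (a column transformation) and Median (an
aggregation) are hypothetical Java UDFs assumed to be packaged in pipeline-udfs.jar; they are not part of Pig.

```pig
register pipeline-udfs.jar;
define NormalizeUrl com.example.pig.NormalizeUrl();
define Median       com.example.pig.Median();

Clicks      = load 'clicks' as (user, url, value);
-- Column transformation: applied to one field of every record.
Cleaned     = foreach Clicks generate user, NormalizeUrl(url) as url, value;
-- Aggregation: applied to the bag of values collected for each user.
ByUser      = group Cleaned by user;
MedianValue = foreach ByUser generate group, Median(Cleaned.value);
```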

If the user defined code will not fit well into a UDF, streaming allows pipelines to place an executable in the pipeline at any point. This
can also be used to include legacy functionality that cannot be modified.
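
A sketch of streaming through an external program; the script name score_clicks.pl and the extra score column it is assumed to emit are
hypothetical.

```pig
-- Declare the executable and ship it to the cluster nodes with the job.
define scorer `perl score_clicks.pl` ship('score_clicks.pl');

Clicks = load 'clicks' as (user, url, value);
-- Each record is written to the program's stdin; its stdout becomes the new relation.
Scored = stream Clicks through scorer as (user, url, value, score);
store Scored into 'scored_clicks';
```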

To conclude, I hope you will agree with me that these advantages of an intuitive, procedural programming model, control of where data is
checkpointed in the pipeline, the ability to completely control how data is processed, support for general DAGs, and the ability to include
user code wherever necessary make Pig Latin a better choice for developing data pipelines on Hadoop.

Alan Gates, Architect

Pig Development Team, Yahoo!