Enabling Hadoop Batch Processing Systems to Consume Streaming Data

At Yahoo!, the ability to analyze and process enormous amounts of data is increasingly important. It’s a foundational layer for improving our consumer experiences and for sharing audience insights with advertisers.

In the last few years, I have been a part of a project to design, build, and run a low-latency, large-scale, distributed event data collection system at Yahoo!. When we started off, the goal seemed relatively unambitious, to collect web-access event data across all of the web-servers across all the data centers and bring it to a central location for processing. This perception soon changed after we realized that this involved around 20000 machines and over 20 data centers across the world amounting to over 40 billion events per day that helped fill-up over 10 TB of disk space. To add to the mix, the data had to be available within 15 minutes with an expected completeness of 99% across trans-oceanic fiber optic cable.

We decided to collect the data in a streaming fashion. This enabled us to feed the data at very low latencies to stream processing applications. However, there was an existing batch processing application that required all the data for the entire day to be available with near 100% completeness.

In order to achieve this, the data was collected in a streaming fashion and put into files that contained events belonging to that particular time period, the default being a minute and hence called minute files. Once the data was collected for the minute, the minute files were closed and the data was made available to the consuming application using a queue. Now, this worked reasonably well when there was one application but had problems when the consuming application wanted to reprocess or perform partial updates. In addition to this, the queue essentially kept the consuming application state making the collection and processing systems tightly coupled. This made is increasingly hard when it came to supporting multiple applications because the state for each batch for each application needed to be kept.

This made it even more interesting when the Hadoop initiative at Yahoo! began. All batch processing application were now running on the grid while the data was still consumed by legacy applications. How would we feed the old and the new systems with the same data without duplication?

What we needed was a loosely coupled or completely decoupled method of communicating the files to be processed to downstream batch processing applications. The solution we came up with was a simple but elegant one called a List of Files (LoF) repository.

The LoF repository contains an entry for each minute file collected and its associated attributes such as the start time, the end time, the size in bytes, the number of events, the collection pipeline instance name, and other relevant data. An API to access this data called the LoF API was provided to be able to query the repository for a set of files that satisfied certain attribute constraints. For example, a query might request “all the files that belong to the period 12:00 to 12:05 collected from the sports web servers”. The repository did not need to keep state of which files this application had processed or maintain any queue. This allowed the application to maintain its state and multiple applications were as simple as the single application case. To simplify the usage the API was made available in the form of a RESTful web-service.

Different applications had different completeness requirements from the data collected. For example, a low-latency behavioral targeting application would typically be happy with 95% of the data within 1 minute of the data, while a revenue realization or tracking application would need 100% of the data within 15 minutes. In order to support this, the API returned a completeness metric along with the list of files returned to indicate the percentage of data the list represented. The application could use this information to commence processing based on its own completeness requirements.

Given the distributed nature of the web-servers, data was often delayed or unavailable due to network outages or temporary host unavailability. This meant that applications requiring higher levels of completeness were routinely delayed beyond their SLAs. To solve this we provided a simple timestamp based cursor facility to enable incremental processing. The cursor was essentially returned with the list to indicate the timestamp at which the list was generated. The subsequent query would provide the previously returned cursor along with the subsequent query to indicate the time of the last fetch and the query would return all the files later that that timestamp.

This is what the web-service request looks like:

The response to this is of the form:

collector1.yahoo.com 1234388520 1234 /col1/1200.gz
collector2.yahoo.com 1234388520 3232 /col2/1200.gz

A subsequent request to get incremental data would use the cursor timestamp returned to fetch additional files as follows:

which would get a response similar to:

collector1.yahoo.com 1234388580 3232 /col1/1201.gz

I would like to conclude by saying that the LoF API has enabled the same data to be made available to different application with varying completeness and latency requirements in a simple and elegant manner. Moreover, it has enabled the collection system, which uses a stream-based paradigm, to easily feed multiple largely batch-oriented systems in a relatively seamless manner. Keeping the design simple enabled us to solve a reasonably complex problem.

Akon Dey
Akon Dey

Architect, Event Data Collection System at Yahoo!