Benchmarking Streaming Computation Engines at Yahoo!
<p><a class="tumblr_blog" href="http://yahooeng.tumblr.com/post/135321837876" target="_blank">yahooeng</a>:</p>
<blockquote>
<p><i>(Yahoo Storm Team in alphabetical order) <a href="https://github.com/redsanket" target="_blank">Sanket Chintapalli</a>, <a href="https://github.com/d2r/" target="_blank">Derek Dagit</a>, <a href="https://github.com/revans2" target="_blank">Bobby Evans</a>, <a href="https://github.com/rfarivar" target="_blank">Reza Farivar</a>, <a href="https://github.com/tgravescs" target="_blank">Tom Graves</a>, Mark Holderbaugh, <a href="https://github.com/zhuoliu" target="_blank">Zhuo Liu</a>, <a href="https://github.com/knusbaum" target="_blank">Kyle Nusbaum</a>, <a href="https://github.com/kishorvpatil/" target="_blank">Kishorkumar Patil</a>, <a href="https://github.com/jerrypeng" target="_blank">Boyang Jerry Peng</a> and <a href="https://github.com/ppoulosk" target="_blank">Paul Poulosky.</a></i><br/></p>
<p><i><b>Executive Summary - Due to a lack of real-world streaming benchmarks, we developed <a href="https://github.com/yahoo/streaming-benchmarks" target="_blank">one</a> to compare Apache Flink, Apache Storm and Apache Spark Streaming. Storm 0.10.0, 0.11.0-SNAPSHOT and Flink 0.10.1 show sub-second latencies at relatively high throughputs, with Storm having the lowest 99th percentile latency. Spark Streaming 1.5.1 supports high throughputs, but at a relatively higher latency.</b></i><br/></p>
<p>At Yahoo, we have invested heavily in a number of open source big data platforms that we use daily to support our business. For streaming workloads, our platform of choice has been Apache Storm, which replaced our internally developed S4 platform. We have been using Storm extensively, and the number of nodes running Storm at Yahoo has now reached about 2,300 (and is still growing).</p>
<p>Since our initial decision to use Storm in 2012, the streaming landscape has changed drastically. There are now several other noteworthy competitors including Apache Flink, Apache Spark (Spark Streaming), Apache Samza, Apache Apex and Google Cloud Dataflow. There is increasing confusion over which package offers the best set of features and which one performs better under which conditions (for instance see <a href="http://www.altaterra.net/blogpost/288668/225612/Which-Stream-Processing-Engine-Storm-Spark-Samza-or-Flink" target="_blank">here</a>, <a href="http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/" target="_blank">here</a>, <a href="https://tsicilian.wordpress.com/2015/02/16/streaming-big-data-storm-spark-and-samza/" target="_blank">here</a>, and <a href="http://www.altaterra.net/blogpost/288668/225578/Open-Source-Packages-and-NoSQL-Now-Conference-in-San-Jose-Part-2?hhSearchTerms=%22flink%22&terms=" target="_blank">here</a>).</p>
<p>To provide the best streaming tools to our internal customers, we wanted to know what Storm is good at and where it needs to improve compared to other systems. We started looking for existing stream processing benchmarks we could use for this evaluation, but all of them were lacking in several fundamental areas; primarily, none of them tested with anything close to a real-world use case. So we decided to write one and released it as open source at <a href="https://github.com/yahoo/streaming-benchmarks" target="_blank">https://github.com/yahoo/streaming-benchmarks</a>. In our initial evaluation we limited the test to three of the most popular and promising platforms (Storm, Flink and Spark), but we welcome contributions of other systems and of expansions to the benchmark&rsquo;s scope.</p>
<h2><b>Benchmark Design</b></h2>
<p>The benchmark is a simple advertisement application. There are a number of advertising campaigns, and a number of advertisements for each campaign. The job of the benchmark is to read various JSON events from Kafka, identify the relevant events, and store a windowed count of relevant events per campaign into Redis. These steps attempt to probe some common operations performed on data streams.</p>
<p>The flow of operations is as follows (and shown in the following figure):</p>
<ol><li>Read an event from Kafka.</li>
<li>Deserialize the JSON string.</li>
<li>Filter out irrelevant events (based on the event_type field).</li>
<li>Take a projection of the relevant fields (ad_id and event_time).</li>
<li>Join each event by ad_id with its associated campaign_id. This information is stored in Redis.</li>
<li>Take a windowed count of events per campaign and store each window in Redis along with a timestamp of the time the window was last updated in Redis. This step must be able to handle late events.<figure class="tmblr-full" data-orig-height="400" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/0f9d705bb81a1b11fb6d52da41af5972/tumblr_inline_nzet5po95h1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/71b4117e1e3915ba349e80e9531ead08/tumblr_inline_pecnpl0ixG1t17fny_540.png" data-orig-height="400" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/0f9d705bb81a1b11fb6d52da41af5972/tumblr_inline_nzet5po95h1spdvr2_540.png"/></figure></li>
</ol><p>The input data has the following schema:</p>
<ul><li>
<i>user_id</i>: UUID</li>
<li>
<i>page_id</i>: UUID</li>
<li>
<i>ad_id</i>: UUID</li>
<li>
<i>ad_type</i>: String in {banner, modal, sponsored-search, mail, mobile}</li>
<li>
<i>event_type</i>: String in {view, click, purchase}</li>
<li>
<i>event_time</i>: Timestamp</li>
<li>
<i>ip_address</i>: String</li>
</ul><p>Producers create events with timestamps marking their creation time. Truncating this timestamp down to the nearest multiple of the window size gives the <i>begin-time</i> of the time window the event belongs in. In Storm and Flink, updates to Redis are written periodically, but frequently enough to meet a chosen SLA. Our SLA was 1 second, so once per second we wrote updated windows to Redis. Spark operated slightly differently due to significant differences in its design; there are more details on that in the Spark section. Along with the data, we record the time at which each window in Redis was last updated.</p>
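<p>For concreteness, a single input event following this schema might look like the one below (all field values are made up for illustration; in particular, the exact encoding of the timestamp is up to the producer):</p>
<pre><code>{
  "user_id":    "10b0e23a-5f2d-4d43-8c9a-7d1e2f3a4b5c",
  "page_id":    "97dd2a0f-1b3c-4e5d-8f9a-0b1c2d3e4f5a",
  "ad_id":      "4e8c1f2b-6a7d-4c3e-9b0a-1d2e3f4a5b6c",
  "ad_type":    "banner",
  "event_type": "view",
  "event_time": "1450200000000",
  "ip_address": "1.2.3.4"
}
</code></pre>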
<p>After each run, a utility reads windows from Redis and compares the windows’ times to their <i>last_updated_at</i> times, yielding a latency data point. Because the last event for a window cannot have been emitted after the window closed but will be very shortly before, the difference between a window’s time and its last_updated_at time minus its duration represents the time it took for the final tuple in a window to go from Kafka to Redis through the application.</p>
<p><i>window.final_event_latency = (window.last_updated_at – window.timestamp) – window.duration</i></p>
<p>This is a bit rough, but the benchmark was not intended to produce fine-grained numbers for these engines; rather, it is meant to give a high-level view of their behavior.</p>
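<p>In code, the window assignment and the latency formula amount to a couple of lines of arithmetic. A minimal sketch (the class, method, and parameter names are ours, and times are assumed to be in milliseconds):</p>
<pre><code>public final class WindowMath {
  // Truncate an event's timestamp down to the nearest multiple of the window size
  // (10,000 ms for the 10-second windows used here) to get the window's begin-time.
  public static long windowBegin(long eventTimeMs, long windowSizeMs) {
    return eventTimeMs - (eventTimeMs % windowSizeMs);
  }

  // window.final_event_latency = (window.last_updated_at - window.timestamp) - window.duration
  public static long finalEventLatencyMs(long lastUpdatedAtMs, long windowTimestampMs,
                                          long windowDurationMs) {
    return (lastUpdatedAtMs - windowTimestampMs) - windowDurationMs;
  }
}
</code></pre>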
<p><b>Benchmark setup:</b></p>
<ul><li>10 second windows</li>
<li>1 second SLA</li>
<li>100 campaigns</li>
<li>10 ads per campaign</li>
<li>5 Kafka nodes with 5 partitions</li>
<li>1 Redis node</li>
<li>10 worker nodes (not including coordination nodes like Storm’s Nimbus)</li>
<li>5-10 Kafka producer nodes</li>
<li>3 ZooKeeper nodes</li>
</ul><p>Since the Redis node in our architecture only performs in-memory lookups using a well-optimized hashing scheme, it did not become a bottleneck. The nodes are homogeneously configured, each with two Intel E5530 processors running at 2.4GHz, giving 8 physical cores (16 hardware threads with hyperthreading) per node. Each node has 24GiB of memory, and the machines are all located within the same rack, connected through a gigabit Ethernet switch. The cluster has a total of 40 nodes available.</p>
<p>We ran multiple instances of the Kafka producers to create the required load, since an individual producer begins to fall behind at around 17,000 events per second. In total, we used anywhere from 20 to 25 nodes in this benchmark.</p>
<p>The use of 10 workers for a topology is near the average number we see being used by topologies internal to Yahoo. Of course, our Storm clusters are larger in size, but they are multi-tenant and run many topologies.</p>
<p>To begin a benchmark run, Kafka was cleared, Redis was populated with initial data (the <i>ad_id</i> to <i>campaign_id</i> mapping), the streaming job was started, and then, after giving the job a bit of time to finish launching, the producers were started with instructions to produce events at a particular rate, yielding the desired aggregate throughput. The system was left to run for 30 minutes before the producers were shut down. A few seconds were allowed for all events to be processed before the streaming job itself was stopped. The benchmark utility was then run to generate a file containing a list of <i>window.last_updated_at – window.timestamp</i> numbers. These files were saved for each throughput we tested and were then used to generate the charts in this document.</p>
<h2><b>Flink</b></h2>
<p>The benchmark for Flink was implemented in Java using Flink’s DataStream API. The Flink DataStream API has many similarities to Storm’s streaming API. For both Flink and Storm, the dataflow can be represented as a directed graph: each vertex is a user-defined operator and each directed edge represents a flow of data. Storm’s API uses spouts and bolts as its operators, while Flink uses map and flatMap, as well as many pre-built operators such as filter, project, and reduce. Flink uses a mechanism called checkpointing to guarantee processing. Unless checkpointing is used in the Flink job, Flink offers at-most-once processing, similar to Storm with acking turned off. For the Flink benchmark we did not use checkpointing. The notable configs we used in Flink are listed below:</p>
<ul><li>taskmanager.heap.mb: 15360</li>
<li>taskmanager.numberOfTaskSlots: 16</li>
</ul><p>The Flink version of the benchmark uses the FlinkKafkaConsumer to read data in from Kafka. The data read from Kafka, which is a JSON-formatted string, is then deserialized and parsed by a custom flatMap operator. Once deserialized, the data is filtered via a custom filter operator. Afterwards, the filtered data is projected using the project operator. From there, the data is joined with data in Redis by a custom flatMap function. Lastly, the final results are calculated from the data and written to Redis.</p>
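<p>A minimal sketch of that pipeline shape is shown below, using the Flink 0.10-era DataStream API. The topic name, host names, Redis key layout, filter condition, choice of JSON parser (org.json) and Redis client (Jedis), and the inlined windowing logic are all illustrative, and the filter and projection are folded into the first flatMap for brevity; the real code, which uses separate operators for each step, lives in the benchmark repository linked above.</p>
<pre><code>import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.json.JSONObject;
import redis.clients.jedis.Jedis;

public class FlinkPipelineSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is deliberately left off, matching the benchmark configuration;
    // env.enableCheckpointing(intervalMs) would turn it on.

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", "zookeeper1:2181");
    kafkaProps.setProperty("bootstrap.servers", "kafka1:9092");
    kafkaProps.setProperty("group.id", "flink-benchmark");

    // Read the raw JSON events from Kafka.
    DataStream<String> raw = env.addSource(
        new FlinkKafkaConsumer082<String>("ad-events", new SimpleStringSchema(), kafkaProps));

    raw
        // Deserialize, filter on event_type, and project down to (ad_id, event_time).
        .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
          @Override
          public void flatMap(String json, Collector<Tuple2<String, String>> out) {
            JSONObject event = new JSONObject(json);
            if ("view".equals(event.getString("event_type"))) {
              out.collect(new Tuple2<String, String>(
                  event.getString("ad_id"), event.getString("event_time")));
            }
          }
        })
        // Join each ad_id with its campaign_id, which is stored in Redis.
        .flatMap(new RichFlatMapFunction<Tuple2<String, String>, Tuple3<String, String, String>>() {
          private transient Jedis redis;
          @Override public void open(Configuration parameters) { redis = new Jedis("redis-host"); }
          @Override public void close() { redis.close(); }
          @Override
          public void flatMap(Tuple2<String, String> e,
                              Collector<Tuple3<String, String, String>> out) {
            String campaignId = redis.get(e.f0);  // ad_id to campaign_id mapping
            if (campaignId != null) {
              out.collect(new Tuple3<String, String, String>(campaignId, e.f0, e.f1));
            }
          }
        })
        // Windowed count per campaign, written to Redis. Heavily simplified: the real
        // benchmark caches counts locally and flushes each window about once per second.
        .flatMap(new RichFlatMapFunction<Tuple3<String, String, String>, String>() {
          private transient Jedis redis;
          @Override public void open(Configuration parameters) { redis = new Jedis("redis-host"); }
          @Override public void close() { redis.close(); }
          @Override
          public void flatMap(Tuple3<String, String, String> e, Collector<String> out) {
            long eventTime = Long.parseLong(e.f2);
            long windowBegin = eventTime - (eventTime % 10_000L);  // 10-second windows
            String key = "campaign:" + e.f0 + ":window:" + windowBegin;
            redis.hincrBy(key, "count", 1L);
            redis.hset(key, "last_updated_at", Long.toString(System.currentTimeMillis()));
          }
        });

    env.execute("flink-ad-benchmark-sketch");
  }
}
</code></pre>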
<p>The rate at which Kafka emitted events into the Flink benchmark was varied from 50,000 events/sec to 170,000 events/sec. For each Kafka emit rate, the percentile latency for a tuple to be completely processed in the Flink benchmark is illustrated in the graph below.
</p>
<figure class="tmblr-full" data-orig-height="365" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/ad625d5307b79ab4fab204c7fde1b733/tumblr_inline_nzet5oeYfc1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/a32a2d127b8a07ec02b12fd18a77ee41/tumblr_inline_pecnpmUhEO1t17fny_540.png" data-orig-height="365" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/ad625d5307b79ab4fab204c7fde1b733/tumblr_inline_nzet5oeYfc1spdvr2_540.png"/></figure><p>The percentile latency for all Kafka emit rates are relatively the same. The percentile latency rises linearly until around the 99th percentile, where the latency appears to increase exponentially. </p>
<h2><b>Spark</b></h2>
<p>For the Spark benchmark, the code was written in Scala. Since the micro-batching methodology of Spark is different from the pure streaming nature of Storm, we needed to rethink parts of the benchmark. The Storm and Flink benchmarks update Redis once a second to meet our SLA, keeping intermediate values in a local cache. As a result, the batch duration in the Spark Streaming version was set to 1 second, at least for smaller amounts of traffic. We had to increase the batch duration for larger throughputs.</p>
<p>The benchmark is written in a typical Spark style using DStreams. DStreams are the streaming equivalent of regular RDDs, and create a separate RDD for every micro batch. Note that in the subsequent discussion, we use the term “RDD” instead of “DStream” to refer to the RDD representation of the DStream in the currently active microbatch. Processing begins with the direct Kafka consumer included with Spark 1.5. Since the Kafka input data in our benchmark is stored in 5 partitions, this Kafka consumer creates a DStream with 5 partitions as well. After that, a number of transformations are applied on the DStreams, including maps and filters. The transformation involving joining data with Redis is a special case. Since we do not want to create a separate connection to Redis for each record, we use a mapPartitions operation that can give control of a whole RDD partition to our code. This way, we create one connection to Redis and use this single connection to query information from Redis for all the events in that RDD partition. The same approach is used later when we update the final results in Redis.</p>
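<p>The mapPartitions step is worth showing. Below is a minimal sketch of the idea in Spark’s Java API (the benchmark itself is written in Scala); the class name, host name, and Redis key layout are illustrative. The point is simply that one Redis connection is opened per RDD partition rather than per record:</p>
<pre><code>import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

import redis.clients.jedis.Jedis;
import scala.Tuple2;

public final class RedisJoinSketch {
  // Joins each ad_id with its campaign_id from Redis, one connection per partition.
  public static JavaDStream<Tuple2<String, String>> joinWithCampaigns(JavaDStream<String> adIds) {
    return adIds.mapPartitions(
        new FlatMapFunction<Iterator<String>, Tuple2<String, String>>() {
          @Override
          public Iterable<Tuple2<String, String>> call(Iterator<String> events) {
            Jedis redis = new Jedis("redis-host");   // one connection for the whole partition
            List<Tuple2<String, String>> out = new ArrayList<>();
            while (events.hasNext()) {
              String adId = events.next();
              String campaignId = redis.get(adId);   // ad_id to campaign_id mapping
              if (campaignId != null) {
                out.add(new Tuple2<>(adId, campaignId));
              }
            }
            redis.close();
            return out;   // Spark 1.x FlatMapFunction returns an Iterable
          }
        });
  }
}
</code></pre>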
<p>It should be noted that our writes to Redis were implemented as a side-effect of the execution of the RDD transformation in order to keep the benchmark simple, so this would not be compatible with exactly-once semantics.</p>
<p>We found that with high enough throughput, Spark was not able to keep up. At 100,000 messages per second the latency greatly increased. We considered adjustments along two control dimensions to help Spark cope with increasing throughput.</p>
<p>The first is the micro-batch duration. This is a control dimension that is not present in a pure streaming system like Storm. Increasing the duration increases latency while reducing overhead, and therefore increases maximum throughput. The challenge is that finding the optimal batch duration, one that minimizes latency while still allowing Spark to handle the throughput, is a time-consuming process. Essentially, we have to set a batch duration, run the benchmark for 30 minutes, check the results, and then decrease or increase the duration.</p>
<p>The second dimension is parallelism. However, increasing parallelism is easier said than done in the case of Spark. For a true streaming system like Storm, one bolt instance can send its results to any number of subsequent bolt instances by using a random shuffle; to scale, one can simply increase the parallelism of the second bolt. In the case of a micro-batch system like Spark, we need to perform a reshuffle operation similar to how intermediate data in a Hadoop MapReduce program is shuffled and merged across the cluster. But the reshuffling itself introduces considerable overhead. Initially, we thought our operations were CPU-bound, and that the benefits of reshuffling to a higher number of partitions would therefore outweigh its cost. Instead, we found the bottleneck to be scheduling, and so reshuffling only added overhead. We suspect that at higher throughput rates or with operations that are CPU-bound, the reverse would be true.</p>
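<p>Concretely, the two control dimensions boil down to a couple of lines in the driver program. A sketch using the Java API (the values shown are placeholders, not a recommendation):</p>
<pre><code>import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class TuningSketch {
  // Control dimension 1: the micro-batch duration is fixed when the streaming context is created.
  public static JavaStreamingContext contextWithBatchDuration(int seconds) {
    SparkConf conf = new SparkConf().setAppName("spark-benchmark-sketch");
    return new JavaStreamingContext(conf, Durations.seconds(seconds));
  }

  // Control dimension 2: parallelism. Repartitioning the 5-partition Kafka stream spreads work
  // across more cores, but the reshuffle itself adds overhead (which dominated in our tests).
  public static JavaDStream<String> widen(JavaDStream<String> events, int partitions) {
    return events.repartition(partitions);
  }
}
</code></pre>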
<p>The final results are interesting. There are essentially three behaviors for a Spark workload, depending on the batch duration. First, if the batch duration is set sufficiently large, the majority of the events will be handled within the current micro-batch. The following figure shows the resulting percentile processing graph for this case (100K events per second, 10-second batch duration).
</p>
<figure class="tmblr-full" data-orig-height="370" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/9f1f17e7ea760b5a1fc65f135b426a46/tumblr_inline_nzet5rxBg81spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/9a8a7ef36080b0c49dbd874dbce82e82/tumblr_inline_pecnpm7Xbp1t17fny_540.png" data-orig-height="370" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/9f1f17e7ea760b5a1fc65f135b426a46/tumblr_inline_nzet5rxBg81spdvr2_540.png"/></figure><p>But whenever 90% of events are processed in the first batch, there is possibility of improving latency. By reducing the batch duration sufficiently, we get into a region where the incoming events are processed within 3 or 4 subsequent batches. This is the second behavior, in which the batch duration puts the system on the verge of falling behind, but is still manageable, and results in better latency. This situation is shown in the following figure for a sample throughput rate (100K events, 3 seconds batch duration).
</p>
<figure class="tmblr-full" data-orig-height="366" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/f6f8e5d5a0e6d621e820bc0cffba9b2a/tumblr_inline_nzet5qLz2X1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/f70a928da49ce4580c85a68e7210929c/tumblr_inline_pecnpma4NJ1t17fny_540.png" data-orig-height="366" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/f6f8e5d5a0e6d621e820bc0cffba9b2a/tumblr_inline_nzet5qLz2X1spdvr2_540.png"/></figure><p>Finally, the third behavior is when Spark streaming falls behind. In this case, the benchmark takes a few minutes after the input data finishes to process all of the events. This situation is shown in the following figure. Under this undesirable operating region, Spark spills lots of data onto disks, and in extreme cases we could end up running out of disk space.</p>
<p>One final note is that we tried the new back pressure feature introduced in Spark 1.5. If the system is in the first operating region, enabling back pressure does nothing. In the second operating region, enabling back pressure results in longer latencies. The third operating region is where back pressure shows the most negative impact. It changes the batch length, but Spark still cannot cope with the throughput and falls behind. This is shown in the next figures. Our experiments showed that the current back pressure implementation did not help our benchmark, and as a result we disabled it.
</p>
<figure class="tmblr-full" data-orig-height="369" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/a6bc6c128e64a117f8d068e78b1ded60/tumblr_inline_nzet5r68Um1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/6fcbbf78a4b9a7db6d5b1a6c49c4240b/tumblr_inline_pecnpnD96v1t17fny_540.png" data-orig-height="369" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/a6bc6c128e64a117f8d068e78b1ded60/tumblr_inline_nzet5r68Um1spdvr2_540.png"/></figure><figure class="tmblr-full" data-orig-height="367" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/6884852b7619285bbd93e17efe7e1832/tumblr_inline_nzet5sebGd1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/dc67479ae9f954720963d94493d682f9/tumblr_inline_pecnpnymmK1t17fny_540.png" data-orig-height="367" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/6884852b7619285bbd93e17efe7e1832/tumblr_inline_nzet5sebGd1spdvr2_540.png"/></figure><p><i><b>Performance without back pressure (top), and with back pressure enabled (bottom). The latencies with the back pressure enabled are worse (70 seconds vs 120 seconds). Note that both of these results are unacceptable for a streaming system as both fall behind the incoming data. Batch duration was set to 2 seconds for each run, with 130,000 throughput.</b></i></p>
<h2><b>Storm</b></h2>
<p>Storm’s benchmark was written using the Java API. We tested both the Apache Storm 0.10.0 release and a 0.11.0 snapshot (commit hash a8d253a). One worker process per host was used, and each worker was given 16 tasks to run in 16 executors, one for each core.</p>
<p>Storm 0.10.0:</p>
<figure class="tmblr-full" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/4bb815ba6ae9e8ca100f753effd78d38/tumblr_inline_nzet5rcFDo1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/33ab569f8d0e4e9c4613aad1aad753b4/tumblr_inline_pecnpoduG61t17fny_540.png" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/4bb815ba6ae9e8ca100f753effd78d38/tumblr_inline_nzet5rcFDo1spdvr2_540.png"/></figure><p>Storm 0.11.0:</p>
<figure class="tmblr-full" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/5f611487a173ab399ed20ca7ad189bc7/tumblr_inline_nzet5oN3y51spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/9255fe4023b25e55db88ace3888c2c9d/tumblr_inline_pecnppMNnm1t17fny_540.png" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/5f611487a173ab399ed20ca7ad189bc7/tumblr_inline_nzet5oN3y51spdvr2_540.png"/></figure><p>Storm compared favorably to both Flink and Spark Streaming. Storm 0.11.0 beat Storm 0.10.0, showing the optimizations that have gone in since the 0.10.0 release. However, at high-throughput both versions of Storm struggled. Storm 0.10.0 was not able to handle throughputs above 135,000 events per second.</p>
<p>Storm 0.11.0 performed similarly until we disabled acking. In the benchmarking topology, acking was used for flow control but not for processing guarantees. In 0.11.0, Storm added a simple back pressure controller, allowing us to avoid the overhead of acking. With acking enabled, 0.11.0 performed terribly at 150,000 events per second: slightly better than 0.10.0, but still far worse than anything else. With acking disabled, Storm even beat Flink for latency at high throughput. However, disabling acking also disables the ability to report and handle tuple failures.</p>
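<p>The relevant Storm settings are plain topology configuration. A sketch of the two setups we compared (the acker and max-spout-pending values shown are illustrative; the worker count matches the 10 worker nodes described above):</p>
<pre><code>import backtype.storm.Config;

public final class StormConfigSketch {
  // Acking enabled: ackers provide flow control via max spout pending, at the cost of
  // extra ack traffic. This reflects the 0.10.0 and "0.11.0 with acking" runs.
  public static Config withAcking() {
    Config conf = new Config();
    conf.setNumWorkers(10);          // one worker per host, as described above
    conf.setNumAckers(10);           // illustrative; enables acking
    conf.setMaxSpoutPending(5000);   // illustrative cap on in-flight tuples
    return conf;
  }

  // Acking disabled: no ack overhead; the 0.11.0 snapshot's simple back pressure
  // controller takes over flow control, but tuple failures can no longer be reported.
  public static Config withoutAcking() {
    Config conf = new Config();
    conf.setNumWorkers(10);
    conf.setNumAckers(0);            // turning ackers off disables acking entirely
    return conf;
  }
}
</code></pre>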
<h2><b>Conclusions and Future Work</b></h2>
<p>It is interesting to compare the behavior of these three systems. Looking at the following figure, we can see that Storm and Flink both respond quite linearly. This is because these two systems process each incoming event as it becomes available. On the other hand, Spark Streaming behaves in a stepwise fashion, a direct result of its micro-batching design.
</p>
<figure class="tmblr-full" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/f2bfdf7856cc5b74ac0b13a2d6747f16/tumblr_inline_nzet5qLI4P1spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/072bb01c8918be0ebfceadb78ecb8368/tumblr_inline_pecnpryDuG1t17fny_540.png" data-orig-height="368" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/f2bfdf7856cc5b74ac0b13a2d6747f16/tumblr_inline_nzet5qLI4P1spdvr2_540.png"/></figure><p>The throughput vs latency graph for the various systems is maybe the most revealing, as it summarizes our findings with this benchmark. Flink and Storm have very similar performance, and Spark Streaming, while it has much higher latency, is expected to be able to handle much higher throughput.
</p>
<figure class="tmblr-full" data-orig-height="365" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/09114914eb9baead2d40b782742c5e72/tumblr_inline_nzet5oLKk51spdvr2_540.png"><img alt="" src="https://66.media.tumblr.com/034b6bfeff359d1e12545a6a38563a25/tumblr_inline_pecnpr7Kxo1t17fny_540.png" data-orig-height="365" data-orig-width="540" data-orig-src="https://66.media.tumblr.com/09114914eb9baead2d40b782742c5e72/tumblr_inline_nzet5oLKk51spdvr2_540.png"/></figure><p>We did not include the results for Storm 0.10.0 and 0.11.0 with acking enabled beyond 135,000 events per second, because they could not keep up with the throughput. The resulting graph had the final point for Storm 0.10.0 in the 45,000 ms range, dwarfing every other line on the graph. The longer the topology ran, the higher the latencies got, indicating that it was losing ground.</p>
<p>All of these benchmarks, except where otherwise noted, were performed using default settings for Storm, Spark, and Flink, and we focused on writing correct, easy-to-understand programs without optimizing each to its full potential. Because of this, each of the six steps was a separate bolt or spout. Flink and Spark both do operator combining automatically, but Storm (without Trident) does not. What this means for Storm is that events go through many more steps and incur higher overhead compared to the other systems.</p>
<p>In addition to further optimizations to Storm, we would like to expand the benchmark in terms of functionality, and to include other stream processing systems like Samza and Apex. We would also like to take into account fault tolerance, processing guarantees, and resource utilization.</p>
<p>The bottom line for us is that Storm did great. Writing topologies is simple, and it’s easy to get latency comparable to or better than Flink’s up to fairly high throughputs. Without acking, Storm even beat Flink at very high throughput, and we expect that with further optimizations like combining bolts, more intelligent routing of tuples, and improved acking, Storm with acking enabled would compete with Flink at very high throughput too.</p>
<p>The competition between near-real-time streaming systems is heating up, and there is no clear winner at this point. Each of the platforms studied here has its advantages and disadvantages. Performance is but one factor among others, such as security or integration with tools and libraries. Active communities for these and other big data processing projects continue to innovate and benefit from each other’s advancements. We look forward to expanding this benchmark and testing newer releases of these systems as they come out.</p>
</blockquote>