Open Sourcing TensorFlowOnSpark: Distributed Deep Learning on Big-Data Clusters
<p>By Lee Yang, Jun Shi, Bobbie Chern, and Andy Feng (@afeng76), Yahoo Big ML team</p><p><b>Introduction</b></p><p>Today, we are pleased to offer <a href="https://github.com/yahoo/TensorFlowOnSpark" target="_blank">TensorFlowOnSpark</a> to the community, our latest open source framework for distributed deep learning on big-data clusters.</p><p>Deep learning (DL) has evolved significantly in recent years. At Yahoo, we’ve found that in order to gain insight from massive amounts of data, we need to deploy <i>distributed</i> deep learning. Existing DL frameworks often require us to set up separate clusters for deep learning, forcing us to create multiple programs for a machine learning pipeline (see Figure 1 below). Having separate clusters requires us to transfer large datasets between them, introducing unwanted system complexity and end-to-end learning latency.</p><a href="https://s.yimg.com/ge/default/691231/TFoSFigure1.png" target="_blank"><figure class="tmblr-full" data-orig-height="560" data-orig-width="990" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure1.png"><img src="https://66.media.tumblr.com/7f8dbbfffec78fb039f5d0411ec6cfab/tumblr_inline_p7g6lapgux1t17fny_540.png" alt="Figure 1" data-orig-height="560" data-orig-width="990" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure1.png"/></figure></a><p>Last year we addressed scale-out issues by developing and publishing <a href="http://github.com/yahoo/CaffeOnSpark" target="_blank">CaffeOnSpark</a>, our open source framework that allows distributed deep learning and big-data processing on identical Spark and Hadoop clusters. We use CaffeOnSpark at Yahoo to improve our <a href="https://yahooeng.tumblr.com/post/151148689421/open-sourcing-a-deep-learning-solution-for" target="_blank">NSFW image detection</a>, to automatically identify eSports game highlights from live-streamed videos, and more. With the community’s valuable feedback and contributions, CaffeOnSpark has been upgraded with LSTM support, a new data layer, training and test interleaving, a Python API, and deployment on Docker containers. This has been great for our Caffe users, but what about those who use the deep learning framework <a href="https://www.tensorflow.org/" target="_blank">TensorFlow</a>? We’re taking a page from our own playbook and doing for TensorFlow what we did for Caffe.</p><p>After TensorFlow’s initial publication, Google released an enhanced TensorFlow with distributed deep learning capabilities in April 2016. In October 2016, TensorFlow introduced HDFS support. Outside of the Google cloud, however, users still needed a dedicated cluster for TensorFlow applications. TensorFlow programs could not be deployed on existing big-data clusters, thus increasing the cost and latency for those who wanted to take advantage of this technology at scale.</p><p>To address this limitation, several community projects wired TensorFlow onto Spark clusters. <a href="https://github.com/amplab/SparkNet" target="_blank">SparkNet</a> added the ability to launch TensorFlow networks in Spark executors. Databricks proposed <a href="https://github.com/databricks/tensorframes" target="_blank">TensorFrames</a> to manipulate Apache Spark’s DataFrames with TensorFlow programs.
While these approaches are a step in the right direction, after examining their code we learned that we would be unable to get the TensorFlow processes to communicate with each other directly, that we would not be able to implement asynchronous distributed learning, and that we would have to expend significant effort to migrate existing TensorFlow programs.</p><p><b>TensorFlowOnSpark</b></p><a href="https://s.yimg.com/ge/default/691231/TFoSFigure2.png" target="_blank"><figure class="tmblr-full" data-orig-height="500" data-orig-width="950" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure2.png"><img src="https://66.media.tumblr.com/9bae4abc1c69491d645975b3f88137dc/tumblr_inline_p7g6lbEyCE1t17fny_540.png" alt="Figure 2" data-orig-height="500" data-orig-width="950" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure2.png"/></figure></a><p>Our new framework, TensorFlowOnSpark (TFoS), enables distributed
TensorFlow execution on Spark and Hadoop clusters. As illustrated in
Figure 2 above, TensorFlowOnSpark is designed to work alongside
Spark SQL, MLlib, and other Spark libraries in a single pipeline or
program (e.g., a Python notebook).</p><p>TensorFlowOnSpark supports all types of TensorFlow programs, enabling both asynchronous and synchronous training and inference. It supports model parallelism and data parallelism, as well as TensorFlow tools such as TensorBoard on Spark clusters.</p><p>Any TensorFlow program can be easily modified to work with TensorFlowOnSpark. Typically, changes to fewer than 10 lines of Python code are needed. Many developers at Yahoo who use TensorFlow have easily migrated their TensorFlow programs for execution with TensorFlowOnSpark.</p><p>TensorFlowOnSpark supports direct tensor communication among TensorFlow processes (workers and parameter servers). Process-to-process direct communication enables TensorFlowOnSpark programs to scale easily by adding machines. As illustrated in Figure 3, TensorFlowOnSpark doesn’t involve Spark drivers in tensor communication, and thus achieves scalability similar to that of stand-alone TensorFlow clusters.</p><a href="https://s.yimg.com/ge/default/691231/TFoSFigure3.png" target="_blank"><figure class="tmblr-full" data-orig-height="830" data-orig-width="1242" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure3.png"><img src="https://66.media.tumblr.com/c7406e478beb085693c0e431f5f53c77/tumblr_inline_p7g6lbOEnO1t17fny_540.png" alt="Figure 3" data-orig-height="830" data-orig-width="1242" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure3.png"/></figure></a><p>TensorFlowOnSpark provides two different modes to ingest data for training and inference:</p><ol><li><b>TensorFlow QueueRunners</b>: TensorFlowOnSpark leverages TensorFlow’s <a href="https://www.tensorflow.org/how_tos/reading_data/#reading_from_files" target="_blank">file readers</a> and <a href="https://www.tensorflow.org/how_tos/threading_and_queues/#queuerunner" target="_blank">QueueRunners</a> to read data directly from HDFS files. Spark is not involved in accessing data.</li>
<li><b>Spark Feeding</b>: Spark RDD data is fed to each Spark executor, which subsequently feeds the data into the TensorFlow graph via <a href="https://www.tensorflow.org/how_tos/reading_data/#feeding" target="_blank">feed_dict</a> (a conceptual sketch of this mode appears just after this list).</li></ol>
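<p>To make the Spark Feeding mode concrete, here is a conceptual sketch of what the worker-side code looks like in that mode: records originating from an RDD partition arrive as Python objects, and each batch enters a TensorFlow 1.x graph through <i>feed_dict</i>. This is an illustration only; <i>worker_fn</i>, the <i>batches()</i> helper, and the way records reach the worker are placeholders for the actual TensorFlowOnSpark plumbing, not its exact API.</p>
<pre>
# Conceptual sketch of the "Spark Feeding" ingestion mode (not the exact TFoS API).
import tensorflow as tf

def worker_fn(record_generator):
    """record_generator yields (features, label) tuples coming from an RDD partition."""
    # A typical TensorFlow 1.x graph fed via placeholders.
    x = tf.placeholder(tf.float32, shape=[None, 784], name="features")
    y = tf.placeholder(tf.int64, shape=[None], name="labels")
    logits = tf.layers.dense(x, 10)
    loss = tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits))
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

    def batches(gen, size=32):
        # Illustrative helper: group streamed records into fixed-size batches.
        batch = []
        for rec in gen:
            batch.append(rec)
            if len(batch) == size:
                yield batch
                batch = []

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for batch in batches(record_generator):
            feats = [rec[0] for rec in batch]
            labels = [rec[1] for rec in batch]
            # Spark-supplied data enters the TensorFlow graph here, via feed_dict.
            sess.run(train_op, feed_dict={x: feats, y: labels})
</pre>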
<p>Figure 4 illustrates how synchronous distributed training of the <a href="http://googleresearch.blogspot.com/2016/03/train-your-own-image-classifier-with.html" target="_blank">Inception image classification</a> network scales in TFoS using QueueRunners with a simple configuration: 1 GPU, 1 reader, and a batch size of 32 for each worker. Four TFoS jobs were launched to train for 100,000 steps. When these jobs completed after 2+ days, the top-5 accuracies of these jobs were 0.730, 0.814, 0.854, and 0.879. Reaching a top-5 accuracy of 0.730 took 46 hours for a 1-worker job, 22.5 hours for a 2-worker job, 13 hours for a 4-worker job, and 7.5 hours for an 8-worker job. TFoS thus achieves near-linear scalability for Inception model training. This is very encouraging, although TFoS scalability will vary for different models and hyperparameters.</p><a href="https://s.yimg.com/ge/default/691231/TFoSFigure4.png" target="_blank"><figure class="tmblr-full" data-orig-height="934" data-orig-width="1076" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure4.png"><img src="https://66.media.tumblr.com/bcf6bf5c80a23598f71c59807d461d5a/tumblr_inline_p7g6lcYgPQ1t17fny_540.png" alt="Figure 4" data-orig-height="934" data-orig-width="1076" data-orig-src="https://s.yimg.com/ge/default/691231/TFoSFigure4.png"/></figure></a><p><b>RDMA for Distributed TensorFlow</b></p><p>In Yahoo’s Hadoop clusters, GPU nodes are connected by both Ethernet and InfiniBand. InfiniBand provides faster connectivity and supports direct access to other servers’ memories over RDMA. Current TensorFlow releases, however, only support distributed learning using gRPC over Ethernet. To speed up distributed learning, we have enhanced the TensorFlow C++ layer to enable RDMA over InfiniBand.</p><p>In conjunction with our TFoS release, we are introducing a new protocol for TensorFlow servers in addition to the default <i>"grpc"</i> protocol. Any distributed TensorFlow program can leverage our enhancement by specifying <i>protocol="grpc_rdma"</i> in <i>tf.train.ServerDef()</i> or <i>tf.train.Server()</i>.</p><p>With this new protocol, an RDMA rendezvous manager is created to ensure that tensors are written directly into the memory of remote servers. We minimize tensor buffer creation: tensor buffers are allocated once at the beginning and then reused across all training steps of a TensorFlow job. From our early experimentation with large models like the <a href="http://www.robots.ox.ac.uk/~vgg/research/very_deep/" target="_blank">VGG-19 network</a>, our RDMA implementation has demonstrated a significant speedup in training time compared with the existing gRPC implementation.</p><p>Since RDMA support is a highly requested capability (see TensorFlow issue <a href="https://github.com/tensorflow/tensorflow/issues/2916" target="_blank">#2916</a>), we decided to make our current implementation available as an alpha release to the TensorFlow community. In the coming weeks, we will polish our RDMA implementation further and share detailed benchmark results.</p>
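<p>For illustration, the snippet below shows where the protocol choice plugs in when constructing a distributed TensorFlow server. The host:port values are placeholders, and <i>protocol="grpc_rdma"</i> only takes effect with a TensorFlow build that includes our RDMA enhancement; on a stock build the default <i>"grpc"</i> protocol applies.</p>
<pre>
# Minimal sketch: selecting the RDMA protocol when creating a TensorFlow server.
# The host:port values below are placeholders for a real cluster specification.
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps":     ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
})

server = tf.train.Server(cluster,
                         job_name="worker",
                         task_index=0,
                         protocol="grpc_rdma")  # "grpc" is the default; "grpc_rdma" requires the RDMA-enabled build
server.join()
</pre>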
<p><b>Simple CLI and API</b></p><p>TFoS programs are launched by the standard Apache Spark command, <i>spark-submit</i>. As illustrated below, users can specify the number of Spark executors, the number of GPUs per executor, and the number of parameter servers in the CLI. Users can also state whether they want to use TensorBoard (--tensorboard) and/or RDMA (--rdma).</p><p> spark-submit --master ${MASTER} \ <br/> ${TFoS_HOME}/examples/slim/train_image_classifier.py \ <br/> --model_name inception_v3 \<br/> --train_dir hdfs://default/slim_train \ <br/> --dataset_dir hdfs://default/data/imagenet \<br/> --dataset_name imagenet \<br/> --dataset_split_name train \<br/> --cluster_size ${NUM_EXEC} \<br/> --num_gpus ${NUM_GPU} \<br/> --num_ps_tasks ${NUM_PS} \<br/> --sync_replicas \<br/> --replicas_to_aggregate ${NUM_WORKERS} \<br/> --tensorboard \<br/> --rdma </p><p>TFoS provides a high-level Python API (illustrated in our <a href="https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/TFOS_demo.ipynb" target="_blank">sample Python notebook</a>, and sketched just after the list below):</p><ul><li>TFCluster.reserve() … construct a TensorFlow cluster from Spark executors</li>
<li>TFCluster.start() … launch the TensorFlow program on the executors</li>
<li>TFCluster.train() or TFCluster.inference() … feed RDD data to TensorFlow processes</li>
<li>TFCluster.shutdown() … shut down TensorFlow execution on executors</li></ul>
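<p>Here is a minimal driver-side sketch of how these calls fit together for a Spark Feeding job. Import paths, argument order, and names such as <i>mnist_dist</i> and the HDFS path are illustrative placeholders and may differ between releases; please consult the sample Python notebook and example programs for authoritative usage.</p>
<pre>
# Minimal sketch of composing the TFCluster API from a Spark driver (names and
# signatures are approximate; see the TensorFlowOnSpark examples for exact usage).
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from tensorflowonspark import TFCluster   # import path may vary by release
import mnist_dist                         # user-provided module with the per-executor map_fun

sc = SparkContext(conf=SparkConf().setAppName("mnist_tfos"))
num_executors = 4
num_ps = 1

# Reserve a TensorFlow cluster on Spark executors.
cluster = TFCluster.reserve(sc, num_executors, num_ps,
                            True,                         # launch TensorBoard
                            TFCluster.InputMode.SPARK)    # feed data from Spark RDDs

# Launch the (slightly modified) TensorFlow program on every executor.
tf_args = None  # placeholder for program-specific arguments
cluster.start(mnist_dist.map_fun, tf_args)

# Feed RDD data to the TensorFlow processes, then shut the cluster down.
images_and_labels = sc.textFile("hdfs://default/data/mnist/csv/train")  # placeholder path
cluster.train(images_and_labels)
cluster.shutdown()
</pre>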
<p><b>Open Source</b></p><p>Yahoo is happy to release TensorFlowOnSpark at <a href="https://github.com/yahoo/TensorFlowOnSpark" target="_blank">github.com/yahoo/TensorFlowOnSpark</a> and an RDMA enhancement of TensorFlow at <a href="https://github.com/yahoo/tensorflow/tree/yahoo" target="_blank">github.com/yahoo/tensorflow/tree/yahoo</a>. Multiple <a href="https://github.com/yahoo/TensorFlowOnSpark/tree/master/examples" target="_blank">example programs</a> (including mnist, cifar10, inception, and VGG) are provided to illustrate the simple process of converting TensorFlow programs to TensorFlowOnSpark and of leveraging RDMA. An Amazon Machine Image is also <a href="https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_EC2" target="_blank">available</a> for running TensorFlowOnSpark on AWS EC2.</p><p>Going forward, we will advance TensorFlowOnSpark as we continue to do with CaffeOnSpark. We welcome the community’s continued feedback and contributions to CaffeOnSpark, and are interested in thoughts on ways TensorFlowOnSpark can be enhanced.</p>