Module 7: Managing a Hadoop Cluster

Introduction

Hadoop can be deployed at a variety of scales, and the requirements at each scale differ. Hadoop has a large number of tunable parameters that influence its operation. Furthermore, a number of other technologies can be deployed alongside Hadoop for additional capabilities. This module describes how to configure clusters to meet varying needs in terms of size, processing power, reliability, and availability.

Goals for this Module:

  • Understand differences in requirements for different sizes of Hadoop clusters
  • Learn how to configure Hadoop for a variety of deployment scopes

Outline

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Basic Setup
    1. Java Requirements
    2. Operating System
    3. Downloading and Installing Hadoop
  5. Important Directories
  6. Selecting Machines
  7. Cluster Configurations
    1. Small Clusters: 2-10 Nodes
    2. Medium Clusters: 10-40 Nodes
    3. Large Clusters: Multiple Racks
  8. Performance Monitoring
    1. Ganglia
    2. Nagios
  9. Additional Tips
  10. References & Resources

Basic Setup

This section discusses the general platform requirements for Hadoop.

Java Requirements

Hadoop is a Java-based system. Recent versions of Hadoop require Sun Java 1.6.

Compiling Java programs to run on Hadoop can be done with any number of commonly-used Java compilers. Sun's compiler is fine, as is ecj, the Eclipse Compiler for Java. A bug in gcj, the GNU Compiler for Java, causes incompatibility between generated classes and Hadoop; it should not be used.

Operating System

As Hadoop is written in Java, it is mostly portable between different operating systems. Developers can and do run Hadoop under Windows. The various scripts used to manage Hadoop clusters are written in a UNIX shell scripting language that assumes sh- or bash-like behavior. Thus running Hadoop under Windows requires cygwin to be installed. The Hadoop documentation stresses that a Windows/cygwin installation is for development only. The vast majority of server deployments today are on Linux. (Other POSIX-style operating systems such as BSD may also work. Some Hadoop users have reported successfully running the system on Solaris.) The instructions on this page assume a command syntax and system design similar to Linux, but can be readily adapted to other systems.

Downloading and Installing Hadoop

Hadoop is available for download from the project homepage at http://hadoop.apache.org/core/releases.html. Here you will find several versions of Hadoop available.

The versioning strategy used is major.minor.revision. Increments to the major version number represent large differences in operation or interface and possibly significant incompatible changes. At the time of this writing (September 2008), there have been no major upgrades; all Hadoop versions have their major version set to 0. The minor version represents a large set of feature improvements and enhancements. Hadoop instances with different minor versions may use different versions of the HDFS file formats and protocols, requiring a DFS upgrade to migrate from one to the next. Revisions are used to provide bug fixes. Within a minor version, the most recent revision contains the most stable patches.

Within the releases page, two or three versions of Hadoop will be readily available, corresponding to the highest revision number in the most recent two or three minor version increments. The stable version is the highest revision number in the second most recent minor version. Production clusters should use this version. The most recent minor version may include improved performance or new features, but may also introduce regressions that will be fixed in ensuing revisions.

At the time of this writing, 0.18.0 is the most recent version, with 0.17.2 being the "stable" release. These example instructions assume that version 0.18.0 is being used; the directions will not change significantly for any other version, except by substituting the new version number where appropriate.
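
The selection rule described above (the stable release is the highest revision of the second most recent minor version) can be sketched in a few lines of Python. This is only an illustration: the function names and the list of available versions are hypothetical, and the real releases page should always be checked directly.

```python
def parse_version(text):
    """Split 'major.minor.revision' into a tuple of ints."""
    major, minor, revision = (int(part) for part in text.split("."))
    return (major, minor, revision)


def stable_release(versions):
    """Return the highest revision of the second most recent minor version."""
    latest = {}  # (major, minor) -> highest revision seen so far
    for major, minor, revision in map(parse_version, versions):
        key = (major, minor)
        latest[key] = max(latest.get(key, -1), revision)
    release_lines = sorted(latest, reverse=True)  # newest minor version first
    if len(release_lines) < 2:
        raise ValueError("need at least two minor versions to pick a stable one")
    major, minor = release_lines[1]
    return f"{major}.{minor}.{latest[(major, minor)]}"


# Hypothetical listing of what a releases page might offer.
available = ["0.16.4", "0.17.0", "0.17.1", "0.17.2", "0.18.0"]
print(stable_release(available))
```

With the listing above, the function picks the same release that this section calls "stable".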

To install Hadoop, first download and install prerequisite software. This includes Java 6 or higher. Distributed operation requires ssh and sshd. Windows users must install and configure cygwin as well. Then download a Hadoop version using a web browser, wget, or curl, and then unzip the package:

$ gunzip hadoop-0.18.0.tar.gz
$ tar vxf hadoop-0.18.0.tar

Within the hadoop-0.18.0/ directory which results, there will be several subdirectories. The most interesting of these are bin/, where scripts to run the cluster are located, and conf/ where the cluster's configuration is stored.

Enter the conf/ directory and modify hadoop-env.sh. The JAVA_HOME variable must be set to the base directory of your Java installation. It is recommended that you install Java in the same location on all machines in the cluster, so this file can be replicated to each machine without modification.

The hadoop-site.xml file must also be modified to contain a number of configuration settings. The sections below address the settings which should be included here.

If you are interested in setting up a development installation, running Hadoop on a single machine, the Hadoop documentation includes getting started instructions which will configure Hadoop for standalone or "pseudo-distributed" operation.

Standalone installations run all of Hadoop and your application inside a single Java process. The distributed file system is not used; files are read from and written to the local file system. Such a setup can be helpful for debugging Hadoop applications.

Pseudo-distributed operation refers to the use of several separate processes representing the different daemons (NameNode, DataNode, JobTracker, TaskTracker) and a separate task process to perform a Hadoop job, but with all processes running on a single machine. A pseudo-distributed instance will have a functioning NameNode/DataNode managing a "DFS" of sorts. Files in HDFS are in a separate namespace from the local file system, and are stored as block objects in a Hadoop-managed directory. However, it is not truly distributed, as no processing or data storage is performed on remote nodes. A pseudo-distributed instance can be extended into a fully distributed cluster by adding more machines to function as Task/DataNodes, but more configuration settings are usually required to deploy a Hadoop cluster for multiple users.

The rest of this document deals with configuring Hadoop clusters of multiple nodes, intended for use by one or more developers.

After conf/hadoop-site.xml is configured according to one of the models in the getting started instructions, the sections below, or your own settings, two more files must be written.

The conf/masters file contains the hostname of the SecondaryNameNode. This should be changed from "localhost" to the fully-qualified domain name of the node to run the SecondaryNameNode service. It does not need to contain the hostname of the JobTracker/NameNode machine; that service is instantiated on whichever node is used to run bin/start-all.sh, regardless of the masters file. The conf/slaves file should contain the hostname of every machine in the cluster which should start TaskTracker and DataNode daemons. One hostname should be written per line in each of these files, e.g.:

slave01
slave02
slave03
...

The master node does not usually also function as a slave node, except in installations across only 1 or 2 machines.

If the nodes on your cluster do not support passwordless ssh, you should configure this now:

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

This will enable passwordless ssh login to the local machine. (You can verify that this works by executing ssh localhost.) The ~/.ssh/id_dsa.pub and authorized_keys files should be replicated on all machines in the cluster.

At this point, the configuration must be replicated across all nodes in the cluster. Small clusters may use rsync or copy the configuration directory to each node. Larger clusters should use a configuration management system such as bcfg2, smartfrog, or puppet. NFS should be avoided as much as is possible, as it is a scalability bottleneck. DataNodes should never share block storage or other high-bandwidth responsibilities over NFS, and should avoid sharing configuration information over NFS if possible.
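
For a small cluster, the rsync approach can be scripted. The following is a minimal sketch that only builds the command lines; it assumes the configuration directory lives at the same path on every node, that rsync is installed everywhere, and that passwordless ssh is already working. The helper names and the installation path are hypothetical, and skipping "#" lines in the slaves file is an assumption of this sketch.

```python
def read_hosts(slaves_text):
    """Return hostnames from slaves-file text, skipping blank and '#' lines."""
    hosts = []
    for line in slaves_text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            hosts.append(line)
    return hosts


def rsync_commands(hosts, conf_dir):
    """Build one rsync argv per host that mirrors conf_dir to the same path."""
    src = conf_dir.rstrip("/") + "/"  # trailing slash: copy directory contents
    return [["rsync", "-az", src, f"{host}:{src}"] for host in hosts]


slaves_text = "slave01\nslave02\nslave03\n"
conf_dir = "/home/hadoop/hadoop-0.18.0/conf"  # hypothetical install location

for argv in rsync_commands(read_hosts(slaves_text), conf_dir):
    # Print instead of executing; to run for real, pass argv to
    # subprocess.run(argv, check=True).
    print(" ".join(argv))
```

Printing the commands first makes it easy to review exactly what will be copied before anything is overwritten on the worker nodes.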

Various directories should be created on each node. The NameNode requires the NameNode metadata directory:

$ mkdir -p /home/hadoop/dfs/name

And every node needs the Hadoop tmp directory and DataNode directory created. Rather than logging in to each node and performing the steps multiple times manually, the file bin/slaves.sh allows a command to be executed on all nodes in the slaves file. For example, we can create these directories by executing the following commands on the NameNode:

$ mkdir -p /tmp/hadoop  # make the NameNode's tmp dir
$ export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
$ export HADOOP_SLAVES=${HADOOP_CONF_DIR}/slaves
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /tmp/hadoop"
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /home/hadoop/dfs/data"

The environment variables $HADOOP_CONF_DIR and $HADOOP_SLAVES are used by the bin/slaves.sh script to find the slave machines list. The provided command is then executed over ssh. If you need particular ssh options, the contents of the $HADOOP_SSH_OPTS variable are passed to ssh as arguments.

We then format HDFS by executing the following command on the NameNode:

$ bin/hadoop namenode -format

And finally, start the cluster:

$ bin/start-all.sh

Now it is time to load in data and start processing it with Hadoop! Good luck!

The remainder of this document discusses various trade-offs in cluster configurations for different sizes, and reviews the settings which may be placed in the hadoop-site.xml file.

Important Directories

One of the basic tasks involved in setting up a Hadoop cluster is determining where the various Hadoop-related directories will be located. Where they go is up to you; in some cases, the default locations are inadvisable and should be changed. This section identifies these directories.

Directory | Description | Default location | Suggested location
HADOOP_LOG_DIR | Output location for log files from daemons | ${HADOOP_HOME}/logs | /var/log/hadoop
hadoop.tmp.dir | A base for other temporary directories | /tmp/hadoop-${user.name} | /tmp/hadoop
dfs.name.dir | Where the NameNode metadata should be stored | ${hadoop.tmp.dir}/dfs/name | /home/hadoop/dfs/name
dfs.data.dir | Where DataNodes store their blocks | ${hadoop.tmp.dir}/dfs/data | /home/hadoop/dfs/data
mapred.system.dir | The in-HDFS path to shared MapReduce system files | ${hadoop.tmp.dir}/mapred/system | /hadoop/mapred/system

This table is not exhaustive; several other directories are listed in conf/hadoop-default.xml. The remaining directories, however, are initialized by default to reside under hadoop.tmp.dir, and are unlikely to be a concern.

It is critically important in a real cluster that dfs.name.dir and dfs.data.dir be moved out from hadoop.tmp.dir. A real cluster should never consider these directories temporary, as they are where all persistent HDFS data resides. Production clusters should have two paths listed for dfs.name.dir which are on two different physical file systems, to ensure that cluster metadata is preserved in the event of hardware failure.
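
A quick sanity check of these two rules can be automated. The sketch below parses a hadoop-site.xml string and warns when dfs.name.dir or dfs.data.dir is unset or sits under hadoop.tmp.dir, or when dfs.name.dir lists fewer than two paths. The function names and the sample configuration are hypothetical, and the check cannot tell whether two paths really live on different physical file systems; that still has to be verified by hand.

```python
import xml.etree.ElementTree as ET

DEFAULT_TMP = "/tmp/hadoop-${user.name}"  # default hadoop.tmp.dir from the table


def load_properties(xml_text):
    """Return {name: value} for every <property> in a hadoop-site.xml string."""
    root = ET.fromstring(xml_text)
    return {
        prop.findtext("name").strip(): (prop.findtext("value") or "").strip()
        for prop in root.findall("property")
    }


def check_storage_dirs(props):
    """Return a list of warnings about dfs.name.dir and dfs.data.dir."""
    warnings = []
    tmp = props.get("hadoop.tmp.dir", DEFAULT_TMP).rstrip("/")
    for key in ("dfs.name.dir", "dfs.data.dir"):
        if key not in props:
            warnings.append(f"{key} is unset, so it defaults to a path under hadoop.tmp.dir")
            continue
        for path in (p.strip() for p in props[key].split(",")):
            if path == tmp or path.startswith(tmp + "/"):
                warnings.append(f"{key} path {path} is under hadoop.tmp.dir")
    name_dirs = [p for p in props.get("dfs.name.dir", "").split(",") if p.strip()]
    if len(name_dirs) < 2:
        warnings.append("dfs.name.dir lists fewer than two paths")
    return warnings


# Hypothetical configuration with two deliberate problems.
SITE_XML = """
<configuration>
  <property><name>hadoop.tmp.dir</name><value>/tmp/hadoop</value></property>
  <property><name>dfs.name.dir</name><value>/home/hadoop/dfs/name</value></property>
  <property><name>dfs.data.dir</name><value>/tmp/hadoop/dfs/data</value></property>
</configuration>
"""

for warning in check_storage_dirs(load_properties(SITE_XML)):
    print(warning)
```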

A multi-user configuration should also definitely adjust mapred.system.dir. Hadoop's default installation is designed to work for standalone operation, which does not use HDFS. Thus it conflates HDFS and local file system paths. When enabling HDFS, however, MapReduce will store shared information about jobs in mapred.system.dir on the DFS. If this path includes the current username (as the default hadoop.tmp.dir does), this will prevent proper operation. The current username on the submitting node will be that of the user who actually submits the job, e.g., "alex". All other nodes will have the current username set to the username used to launch Hadoop itself (e.g., "hadoop"). If these do not match, the TaskTrackers will be unable to find the job information and run the MapReduce job.

For this reason, it is also advisable to remove ${user.name} from the general hadoop.tmp.dir.
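
The mismatch is easy to see by expanding the path once per user. This is a toy model of the substitution only, not Hadoop's own configuration code; the helper names are hypothetical, and the usernames are the ones used as examples above.

```python
def expand(path_template, user):
    """Substitute ${user.name} the way each process would for its own user."""
    return path_template.replace("${user.name}", user)


def views_agree(path_template, submitting_user, daemon_user):
    """True if the submitting client and the daemons resolve the same path."""
    return expand(path_template, submitting_user) == expand(path_template, daemon_user)


# Default mapred.system.dir with the default hadoop.tmp.dir written out.
default_dir = "/tmp/hadoop-${user.name}/mapred/system"
# Suggested location from the table in this section.
suggested_dir = "/hadoop/mapred/system"

print(expand(default_dir, "alex"))    # path seen by the submitting user
print(expand(default_dir, "hadoop"))  # path seen by the TaskTrackers
print(views_agree(default_dir, "alex", "hadoop"))
print(views_agree(suggested_dir, "alex", "hadoop"))
```

Any path without ${user.name} resolves identically for every user, which is why the suggested locations avoid it.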

While most of the directories listed above (all the ones with names in "foo.bar.baz" form) can be relocated via the conf/hadoop-site.xml file, the HADOOP_LOG_DIR directory is specified in conf/hadoop-env.sh as an environment variable. Relocating this directory requires editing this script.

Selecting Machines

Before diving into the details of configuring nodes, we include a brief word on choosing hardware for a cluster. While the processing demands of different organizations will dictate a different machine configuration for optimum efficiency, there are commonalities associated with most Hadoop-based tasks.

Hadoop is designed to take advantage of whatever hardware is available. Modest "beige box" PCs can be used to run small Hadoop setups for experimentation and debugging. Providing greater computational resources will, to a point, result in increased performance by your Hadoop cluster. Many existing Hadoop deployments include Xeon processors in the 1.8-2.0GHz range. Hadoop jobs written in Java can consume between 1 and 2 GB of RAM per core. If you use HadoopStreaming to write your jobs in a scripting language such as Python, more memory may be advisable. Due to the I/O-bound nature of Hadoop, adding higher-clocked CPUs may not be the most efficient use of resources, unless the intent is to run HadoopStreaming. Big data clusters, of course, can use as many large and fast hard drives as are available. However, too many disks in a single machine will result in many disks not being used in parallel. It is better to have three machines with 4 hard disks each than one machine with 12 drives. The former configuration will be able to write to more drives in parallel and will provide greater throughput. Finally, gigabit Ethernet connections between machines will greatly improve performance over a cluster connected via a slower network interface.

It should be noted that the lower limit on minimum requirements for running Hadoop is well below the specifications for modern desktop or server class machines. However, multiple pages on the Hadoop wiki suggest similar specifications to those posted here for high-performance cluster design. (See [1][2].)

Cluster Configurations

This section provides cluster configuration advice and specific settings for clusters of varying sizes. These sizes were picked to demonstrate basic categories of clusters; your own installation may be a hybrid of different aspects of these profiles. Here we suggest various properties which should be included in the conf/hadoop-site.xml file to most effectively use a cluster of a given size, as well as other system configuration elements. The next section describes how to finish the installation after implementing the configurations described here. You should read through each of these configurations in order, as configuration suggestions for larger deployments are based on the preceding ones.

Small Clusters: 2-10 Nodes

Setting up a small cluster for development purposes is a very straightforward task. When using two nodes, one node will act as both NameNode/JobTracker and a DataNode/TaskTracker; the other node is only a DataNode/TaskTracker. Clusters of three or more machines typically use a dedicated NameNode/JobTracker, and all other nodes are workers.

A relatively minimalist configuration in conf/hadoop-site.xml will suffice for this installation:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>head.server.node.com:9001</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://head.server.node.com:9000</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/hadoop/dfs/data</value>
    <final>true</final>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/hadoop/dfs/name</value>
    <final>true</final>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop</value>
    <final>true</final>
  </property>
  <property>
    <name>mapred.system.dir</name>
    <value>/hadoop/mapred/system</value>
    <final>true</final>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>

Clusters closer to the 8-10 node range may want to set dfs.replication to 3. Values higher than 3 are usually not necessary. Individual files which are heavily utilized by a large number of nodes may have their particular replication factor manually adjusted upward independent of the cluster default.

Medium Clusters: 10-40 Nodes

This category is for clusters that occupy the majority of a single rack. Additional considerations for high availability and reliability come into play at this level.

The single point of failure in a Hadoop cluster is the NameNode. While the loss of any other machine (intermittently or permanently) does not result in data loss, NameNode loss results in cluster unavailability. The permanent loss of NameNode data would render the cluster's HDFS inoperable.

Therefore, another step should be taken in this configuration to back up the NameNode metadata. One machine in the cluster should be designated as the NameNode's backup. This machine does not run the normal Hadoop daemons (i.e., the DataNode and TaskTracker). Instead, it exposes a directory via NFS which is only mounted on the NameNode (e.g., /mnt/namenode-backup/). The cluster's hadoop-site.xml file should then instruct the NameNode to write to this directory as well:

<property>
  <name>dfs.name.dir</name>
  <value>/home/hadoop/dfs/name,/mnt/namenode-backup</value>
  <final>true</final>
</property>

The NameNode will write its metadata to each directory in the comma-separated list of dfs.name.dir. If /mnt/namenode-backup is NFS-mounted from the backup machine, this will ensure that a redundant copy of HDFS metadata is available. The backup node should serve /mnt/namenode-backup from /home/hadoop/dfs/name on its own drive. This way, if the NameNode hardware completely dies, the backup machine can be brought up as the NameNode with no reconfiguration of the backup machine's software. To switch the NameNode and backup nodes, the backup machine should have its IP address changed to the original NameNode's IP address, and the server daemons should be started on that machine. The IP address must be changed to allow the DataNodes to recognize it as the "original" NameNode for HDFS. (Individual DataNodes will cache the DNS entry associated with the NameNode, so just changing the hostname is insufficient; the name reassignment must be performed at the IP address level.)

The backup machine still has Hadoop installed and configured on it in the same way as every other node in the cluster, but it is not listed in the slaves file, so normal daemons are not started there.

One function that the backup machine can be used for is to serve as the SecondaryNameNode. Note that this is not a failover NameNode process. The SecondaryNameNode process connects to the NameNode and takes periodic snapshots of its metadata (though not in real time). The NameNode metadata consists of a snapshot of the file system called the fsimage and a series of deltas to this snapshot called the editlog. With these two files, the current state of the system can be determined exactly. The SecondaryNameNode merges the fsimage and editlog into a new fsimage file that is a more compact representation of the file system state. Because this process can be memory intensive, running it on the backup machine (instead of on the NameNode itself) can be advantageous.
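
The relationship between the fsimage, the editlog, and a checkpoint can be illustrated with a toy model. This sketch is not HDFS's on-disk format or its actual merge code: the snapshot is a plain dictionary, and the "create" and "delete" operations are hypothetical stand-ins for real edit records.

```python
def apply_edits(fsimage, editlog):
    """Replay a list of (operation, path) deltas on top of a snapshot."""
    state = dict(fsimage)  # copy, so the old snapshot stays untouched
    for operation, path in editlog:
        if operation == "create":
            state[path] = "file"
        elif operation == "delete":
            state.pop(path, None)
        else:
            raise ValueError(f"unknown operation: {operation}")
    return state


def checkpoint(fsimage, editlog):
    """Merge snapshot and deltas into a new snapshot and an empty edit log."""
    return apply_edits(fsimage, editlog), []


fsimage = {"/data/a.txt": "file", "/data/b.txt": "file"}
editlog = [("create", "/data/c.txt"), ("delete", "/data/a.txt")]

# The current state is always snapshot + deltas.
current = apply_edits(fsimage, editlog)

# After a checkpoint, the same state is held by the snapshot alone.
new_fsimage, new_editlog = checkpoint(fsimage, editlog)
print(sorted(new_fsimage))
print(apply_edits(new_fsimage, new_editlog) == current)
```

The merged snapshot describes the same file system state as before, but without a growing list of deltas to replay, which is the compaction the SecondaryNameNode performs.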

To configure the SecondaryNameNode daemon to run on the backup machine instead of on the master machine, edit the conf/masters file so that it contains the name of the backup machine. The bin/start-dfs.sh and bin/start-mapred.sh (and by extension, bin/start-all.sh) scripts will actually always start the master daemons (NameNode and JobTracker) on the local machine. The slaves file is used for starting DataNodes and TaskTrackers. The masters file is used for starting the SecondaryNameNode. This filename is used despite the fact that the master node may not be listed in the file itself.

A cluster of this size may also require nodes to be periodically decommissioned. As noted in Module 2, several machines cannot be turned off simultaneously, or data loss may occur. Nodes must be decommissioned on a schedule that leaves time for the blocks they hold to be replicated elsewhere. To prepare for this eventuality in advance, an excludes file should be added to the conf/hadoop-site.xml:

<property>
  <name>dfs.hosts.exclude</name>
  <value>/home/hadoop/excludes</value>
  <final>true</final>
</property>
<property>
  <name>mapred.hosts.exclude</name>
  <value>/home/hadoop/excludes</value>
  <final>true</final>
</property>

These properties should provide the full path to the excludes file (the actual location of the file is up to you). You should then create an empty file with this name:

$ touch /home/hadoop/excludes

While the dfs.hosts.exclude property allows the definition of a list of machines which are explicitly barred from connecting to the NameNode (and similarly, mapred.hosts.exclude for the JobTracker), a large cluster may want to explicitly manage a list of machines which are approved to connect to a given JobTracker or NameNode.

The dfs.hosts and mapred.hosts properties allow an administrator to supply a file containing an approved list of hostnames. If a machine is not in this list, it will be denied access to the cluster. This can be used to enforce policies regarding which teams of developers have access to which MapReduce sub-clusters. These are configured in exactly the same way as the excludes file.

Of course, at this scale and above, 3 replicas of each block are advisable; the hadoop-site.xml file should contain:

<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>

By default, HDFS does not preserve any free space on the DataNodes; the DataNode service will continue to accept blocks until all free space on the disk is exhausted, which may cause problems. The following setting will require each DataNode to keep at least 1 GB of space on the drive free before it writes more blocks, which helps preserve system stability:

<property>
  <name>dfs.datanode.du.reserved</name>
  <value>1073741824</value>
  <final>true</final>
</property>

Another parameter to watch is the heap size associated with each task. By default, Hadoop caps the heap of each task process at 200 MB, which is too small for most data processing tasks. This cap is set as a parameter passed to the child Java process. It is common to override this with a higher cap by specifying:

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>

This will provide each child task with 512 MB of heap. It is not unreasonable in some cases to specify -Xmx1024m instead. In the interest of providing only what is actually required, it may be better to leave this set to 512 MB by default, and allow applications to manually configure a full GB of RAM per task themselves.
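
When choosing a heap cap, it helps to multiply it by the number of tasks that can run on a node at once. The sketch below does that arithmetic; the helper names and the slot counts are hypothetical, and the result covers Java heap only, so actual memory use per task process will be somewhat higher.

```python
import re


def heap_mb(java_opts):
    """Extract the -Xmx heap cap from a JVM option string, in megabytes."""
    match = re.search(r"-Xmx(\d+)([mMgG])", java_opts)
    if match is None:
        raise ValueError("no -Xmx setting with an m or g suffix found")
    size = int(match.group(1))
    return size * 1024 if match.group(2).lower() == "g" else size


def worst_case_task_heap_mb(java_opts, map_slots, reduce_slots):
    """Heap used if every map and reduce slot on a node is busy and full."""
    return heap_mb(java_opts) * (map_slots + reduce_slots)


# Hypothetical node running up to 4 map tasks and 4 reduce tasks at once.
for opts in ("-Xmx512m", "-Xmx1024m"):
    total = worst_case_task_heap_mb(opts, map_slots=4, reduce_slots=4)
    print(opts, "->", total, "MB of task heap in the worst case")
```

Comparing the result against the physical RAM of a worker node shows quickly whether a larger default cap would risk pushing the machine into swap.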

Using multiple drives per machine

While small clusters often have only one hard drive per machine, more high-performance configurations may include two or more disks per node. Slight configuration changes are required to make Hadoop take advantage of additional disks.

DataNodes can be configured to write blocks out to multiple disks via the dfs.data.dir property. It can take on a comma-separated list of directories. Each block is written to one of these directories. E.g., assuming that there are four disks, mounted on /d1, /d2, /d3, and /d4, the following (or something like it) should be in the configuration for each DataNode:

<property>
  <name>dfs.data.dir</name>
  <value>/d1/dfs/data,/d2/dfs/data,/d3/dfs/data,/d4/dfs/data</value>
  <final>true</final>
</property>

MapReduce performance can also be improved by distributing the temporary data generated by MapReduce tasks across multiple disks on each machine:

<property>
  <name>mapred.local.dir</name>
  <value>/d1/mapred/local,/d2/mapred/local,/d3/mapred/local,/d4/mapred/local</value>
  <final>true</final>
</property>

Finally, if there are multiple drives available in the NameNode, they can be used to provide additional redundant copies of the NameNode metadata in the event of the failure of one drive. Unlike the above two properties, where one drive out of many is selected to write a piece of data, the NameNode writes to each comma-separated path in dfs.name.dir. If too many drives are listed here it may adversely affect the performance of the NameNode, as the probability of blocking on one or more I/O operations increases with the number of devices involved, but it is imperative that the sole copy of the metadata does not reside on a single drive.

Large Clusters: Multiple Racks

Configuring multiple racks of machines for Hadoop requires further advance planning. The possibility of rack failure now exists, and operational racks should be able to continue even if entire other racks are disabled. Naive setups may result in large cross-rack data transfers which adversely affect performance. Furthermore, in a large cluster, the amount of metadata under the care of the NameNode increases. This section proposes configuring several properties to help Hadoop operate at very large scale, but the numbers used in this section are just guidelines. There is no single magic number which works for all deployments, and individual tuning will be necessary. These will, however, provide a starting point and alert you to settings which will be important.

The NameNode is responsible for managing metadata associated with each block in the HDFS. As the amount of information in the cluster scales into the tens or hundreds of TB, this metadata can grow to be quite sizable. The NameNode machine needs to keep the blockmap in RAM to work efficiently. Therefore, at large scale, this machine will require more RAM than other machines in the cluster. The amount of metadata can also be dropped almost in half by doubling the block size:

<property>
  <name>dfs.block.size</name>
  <value>134217728</value>
</property>

This changes the block size from 64MB (the default) to 128MB, which decreases pressure on the NameNode's memory. On the other hand, this potentially decreases the amount of parallelism that can be achieved, as the number of blocks per file decreases. This means fewer hosts may have sections of a file to offer to MapReduce tasks without contending for disk access. The larger the individual files involved (or the more files involved in the average MapReduce job), the less of an issue this is.
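
The effect on the number of blocks the NameNode must track can be estimated with simple arithmetic. The sketch below counts blocks for a list of file sizes; the file sizes are hypothetical, and the count ignores replication, which multiplies the number of block replicas but not the number of distinct blocks.

```python
MB = 1024 * 1024


def block_count(file_sizes, block_size):
    """Total number of blocks needed to store files of the given byte sizes."""
    # Ceiling division: a partly filled final block still counts as a block.
    return sum(-(-size // block_size) for size in file_sizes)


# Hypothetical data set: one thousand files of 1 GB each.
large_files = [1024 * MB] * 1000
print(block_count(large_files, 64 * MB), "blocks at 64 MB")
print(block_count(large_files, 128 * MB), "blocks at 128 MB")

# Smaller files benefit less, which is why the saving is only "almost" half.
mixed_files = [10 * MB, 100 * MB, 200 * MB]
print(block_count(mixed_files, 64 * MB), "blocks at 64 MB")
print(block_count(mixed_files, 128 * MB), "blocks at 128 MB")
```

A file smaller than one block occupies a single block at either setting, so data sets dominated by small files see little reduction in metadata.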

In the medium configuration, the NameNode wrote HDFS metadata through to another machine on the rack via NFS. It also used that same machine to checkpoint the NameNode metadata and compact it in the SecondaryNameNode process. Using this same setup will result in the cluster being dependent on a single rack's continued operation. The NFS-mounted write-through backup should be placed in a different rack from the NameNode, to ensure that the metadata for the file system survives the failure of an individual rack. For the same reason, the SecondaryNameNode should be instantiated on a separate rack as well.

With multiple racks of servers, RPC timeouts may become more frequent. The NameNode takes a continual census of DataNodes and their health via heartbeat messages sent every few seconds. A similar timeout mechanism exists on the MapReduce side with the JobTracker. With many racks of machines, the nodes may force one another to time out because the master node is not handling them fast enough. The following options increase the number of threads on the master machine dedicated to handling RPCs from slave nodes:

<property>
  <name>dfs.namenode.handler.count</name>
  <value>40</value>
</property>
<property>
  <name>mapred.job.tracker.handler.count</name>
  <value>40</value>
</property>

These settings were used in clusters of several hundred nodes. They should be scaled up accordingly with larger deployments.

The following settings provide additional starting points for optimization. These are based on the reported configurations of actual clusters from 250 to 2000 nodes.

Property | Range | Description
io.file.buffer.size | 32768-131072 | Read/write buffer size in bytes used in SequenceFiles (should be in multiples of the hardware page size)
io.sort.factor | 50-200 | Number of streams to merge concurrently when sorting files during shuffling
io.sort.mb | 50-200 | Amount of memory in MB to use while sorting data
mapred.reduce.parallel.copies | 20-50 | Number of concurrent connections a reducer should use when fetching its input from mappers
tasktracker.http.threads | 40-50 | Number of threads each TaskTracker uses to provide intermediate map output to reducers
mapred.tasktracker.map.tasks.maximum | 1/2 * (cores/node) to 2 * (cores/node) | Number of map tasks to deploy on each machine
mapred.tasktracker.reduce.tasks.maximum | 1/2 * (cores/node) to 2 * (cores/node) | Number of reduce tasks to deploy on each machine
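
For the last two rows of the table, the suggested range follows directly from the core count of a worker node. A minimal sketch of that calculation is shown here; the function name is hypothetical, and rounding the lower bound down with a floor of one task is an assumption of this sketch.

```python
def slot_range(cores_per_node):
    """Return (low, high) task slots per node: 1/2 to 2 times the core count."""
    low = max(1, cores_per_node // 2)  # assumed: never fewer than one slot
    high = 2 * cores_per_node
    return low, high


for cores in (2, 4, 8):
    low, high = slot_range(cores)
    print(f"{cores} cores: between {low} and {high} map (or reduce) tasks per node")
```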

Rack awareness

In a multi-rack configuration, it is important to ensure that replicas of blocks are placed on multiple racks to minimize the possibility of data loss. Thus, a rack-aware placement policy should be used. A basic rack awareness script is provided in Module 2. The guidelines there suggest how to set up a basic rack awareness policy; due to the heterogeneity of network topologies, a definitive general-purpose solution cannot be provided here.

This tutorial targets Hadoop version 0.18.0. While most of the interfaces described will work on other, older versions of Hadoop, rack-awareness underwent a major overhaul in version 0.17. Thus, the following does not apply to version 0.16 and before.

One major consequence of the upgrade is that while rack-aware block replica placement has existed in Hadoop for some time, rack-aware task placement was only added in version 0.17. Before that, if Hadoop MapReduce could not place a task on the same node as the block of data which the task was scheduled to process, it picked an arbitrary different node on which to schedule the task. Starting with 0.17.0, tasks will be placed (when possible) on the same rack as at least one replica of an input data block for a job, which should further minimize the amount of inter-rack data transfers required to perform a job.

Hadoop includes an interface called DNSToSwitchMapping which allows arbitrary Java code to be used to map servers onto a rack topology. The configuration key topology.node.switch.mapping.impl can be used to specify a class which implements this interface. More straightforward than writing a Java class for this purpose, however, is to use the default mapper, which executes a user-specified script (or other command) that returns the rack id for each node of the cluster. These rack ids are then aggregated by the master node.

Note that the rack mapping script used by this system is incompatible with the 0.16 method of using dfs.network.script. Whereas dfs.network.script runs on each DataNode, a new script specified by topology.script.file.name is run by the master node only. To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml.
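
A rack mapping script can be any executable, so one possible shape is a small Python program. The sketch below assumes the script is invoked with one or more node addresses as command-line arguments and must print one rack id per argument; the address prefixes and rack names are hypothetical and need to be replaced with your own network layout.

```python
#!/usr/bin/env python
import sys

# Hypothetical mapping from IP address prefix to rack id.
RACKS = {
    "10.1.1.": "/rack1",
    "10.1.2.": "/rack2",
}

# Fallback for addresses that match no known prefix.
DEFAULT_RACK = "/default-rack"


def rack_for(address):
    """Return the rack id for one node address."""
    for prefix, rack in RACKS.items():
        if address.startswith(prefix):
            return rack
    return DEFAULT_RACK


def main(addresses):
    """Print the rack id of every address, in order, separated by spaces."""
    print(" ".join(rack_for(address) for address in addresses))


if __name__ == "__main__":
    main(sys.argv[1:])
```

The file would be made executable and its full path given as the value of topology.script.file.name. Running it by hand with a few addresses is a simple way to confirm the mapping before restarting the cluster.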

Cluster contention

If you are configuring a large number of machines, it is likely that you have a large number of users who wish to submit jobs to execute on it. Hadoop's job scheduling algorithm is based on a simple FIFO scheduler. Using this in a large deployment without external controls or policies agreed upon by all users can lead to lots of contention for the JobTracker, causing short jobs to be delayed by other long-running tasks and frustrating users.

An advanced technique to combat this problem is to configure a single HDFS cluster which spans all available machines, and configure several separate MapReduce clusters with their own JobTrackers and pools of TaskTrackers. All MapReduce clusters are configured to use the same DFS and the same NameNode; but separate groups of machines have a different machine acting as JobTracker (i.e., subclusters have different settings for mapred.job.tracker). Breaking machines up into several smaller clusters, each of which contains 20-40 TaskTrackers, provides users with lower contention for the system. Users may be assigned to different clusters by policy, or they can use the JobTracker status web pages (a web page exposed on port 50030 of each JobTracker) to determine which is underutilized.

Multiple strategies exist for assigning machines to these MapReduce clusters. It is considered best practice to stripe the TaskTrackers associated with each JobTracker across all racks. This maximizes the availability of each cluster (as they are all resistant to individual rack failure), and works with the HDFS replica placement policy to ensure that each MapReduce cluster can find rack-local replicas of all files used in any MapReduce jobs.
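
Striping can be planned with a simple round-robin pass over the machines in each rack. The sketch below is one possible way to do this, not a tool that ships with Hadoop; the function name, host names, rack names, and JobTracker addresses are all hypothetical. Each host in the resulting plan would then have mapred.job.tracker set to the JobTracker it was assigned to.

```python
from itertools import cycle


def stripe(nodes_by_rack, jobtrackers):
    """Assign TaskTracker hosts to JobTrackers, rotating within every rack."""
    rotation = cycle(jobtrackers)
    assignment = {jt: [] for jt in jobtrackers}
    for rack in sorted(nodes_by_rack):
        for host in sorted(nodes_by_rack[rack]):
            # The rotation continues across racks, which keeps the
            # clusters balanced even when rack sizes are uneven.
            assignment[next(rotation)].append((rack, host))
    return assignment


# Hypothetical two-rack inventory and two JobTrackers.
racks = {
    "/rack1": ["r1n1", "r1n2", "r1n3", "r1n4"],
    "/rack2": ["r2n1", "r2n2", "r2n3", "r2n4"],
}
plan = stripe(racks, ["jt-a:9001", "jt-b:9001"])
for jobtracker, hosts in sorted(plan.items()):
    print(jobtracker, hosts)
```

As long as every rack holds at least as many machines as there are JobTrackers, each MapReduce cluster ends up with TaskTrackers on every rack.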

Performance Monitoring

Multiple tools exist to monitor large clusters for performance and troubleshooting. This section briefly highlights two such tools.

Ganglia

Ganglia is a performance monitoring framework for distributed systems. Ganglia provides a distributed service which collects metrics on individual machines and forwards them to an aggregator which can report back to an administrator on the global state of a cluster.

Ganglia is designed to be integrated into other applications to collect statistics about their operation. Hadoop includes a performance monitoring framework which can use Ganglia as its backend. Instructions for enabling Ganglia metrics in Hadoop are available on the Hadoop wiki and are also summarized below.

After installing and configuring Ganglia on your cluster, to direct Hadoop to output its metric reports to Ganglia, create a file named hadoop-metrics.properties in the $HADOOP_HOME/conf directory. The file should have the following contents:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=localhost:8649

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=localhost:8649

This assumes that gmond is running on each machine in the cluster. Instructions on the Hadoop wiki note that (in the experience of the wiki article author) this may result in all nodes reporting their results as "localhost" instead of under their individual hostnames. If this problem affects your cluster, the wiki proposes an alternate configuration in which all Hadoop instances speak directly with gmetad:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=@GMETAD@:8650

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=@GMETAD@:8650

Here, @GMETAD@ is the hostname of the server on which the gmetad service is running. If deploying Ganglia and Hadoop on a very large number of machines, the impact of this configuration (vs. the standard Ganglia configuration where individual services talk to gmond on localhost) should be evaluated.
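
When the same file has to be produced for many machines, it can help to generate it rather than edit it by hand. The following is a minimal sketch that renders the properties shown above for a given Ganglia endpoint; the function name and the host gmetad.example.com are hypothetical.

```python
def ganglia_metrics_properties(server, period=10, contexts=("dfs", "mapred")):
    """Render hadoop-metrics.properties text for the given Ganglia endpoint."""
    blocks = []
    for ctx in contexts:
        blocks.append(
            "%s.class=org.apache.hadoop.metrics.ganglia.GangliaContext\n"
            "%s.period=%d\n"
            "%s.servers=%s\n" % (ctx, ctx, period, ctx, server)
        )
    # Separate the per-context blocks with a blank line.
    return "\n".join(blocks)


# Hypothetical gmetad host; substitute the value used for @GMETAD@.
text = ganglia_metrics_properties("gmetad.example.com:8650")
print(text)
```

Passing "localhost:8649" instead reproduces the first configuration, in which each Hadoop instance reports to its local gmond.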

Nagios

While Ganglia will monitor Hadoop-specific metrics, general information about the health of the cluster should be monitored with an additional tool.

Nagios is a machine and service monitoring system designed for large clusters. Nagios will provide useful diagnostic information for tuning your cluster, including network, disk, and CPU utilization across machines.

Additional Tips

A few additional pieces of advice:

  • Create a separate user named "hadoop" to run your instances; this will separate the Hadoop processes from any users on the system. Do not run Hadoop as root.
  • If Hadoop is installed in /home/hadoop/hadoop-0.18.0, create a symbolic link from /home/hadoop/hadoop to /home/hadoop/hadoop-0.18.0. When upgrading to a newer version in the future, only the link needs to be repointed; other scripts that depend on the hadoop/bin directory continue to work unchanged.
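
The symbolic link in the second tip can be created and later repointed with a few lines of Python. This is a minimal sketch for POSIX systems; the helper name point_link is hypothetical, and the calls that touch /home/hadoop are left as comments so that running the sketch changes nothing.

```python
import os


def point_link(link_path, target_dir):
    """Point link_path at target_dir, replacing any existing symlink."""
    tmp = link_path + ".tmp"
    if os.path.lexists(tmp):
        os.remove(tmp)  # clear a leftover from an interrupted run
    os.symlink(target_dir, tmp)
    # Renaming over the old link swaps it in a single step on POSIX,
    # so scripts never see a missing hadoop directory.
    os.replace(tmp, link_path)


# Initial install:
#   point_link("/home/hadoop/hadoop", "/home/hadoop/hadoop-0.18.0")
# After unpacking a newer release (directory name is hypothetical):
#   point_link("/home/hadoop/hadoop", "/home/hadoop/hadoop-0.18.1")
```

Building the new link under a temporary name and renaming it into place avoids the short window in which the link would otherwise not exist.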

References & Resources

Hadoop Getting Started - Single-node configuration instructions
Hadoop Cluster Setup - Official Hadoop configuration instructions
Michael Noll's Hadoop configuration tutorials for single and multiple node configurations.

 

"Hadoop Tutorial from Yahoo!" by Yahoo! Inc. is licensed under a Creative Commons Attribution 3.0 Unported License.