The previous post in this series covered the next generation of Apache Hadoop MapReduce in a broad sense, particularly its motivation, high-level architecture, goals, requirements, and aspects of its implementation.
In the second post in a series unpacking details of the implementation, we’d like to present the protocol for resource allocation and scheduling that drives application execution on a Next Generation Apache Hadoop MapReduce cluster.
Apache Hadoop must scale reliably and transparently to handle the load of a modern, production cluster on commodity hardware. One of the most painful bottlenecks in the MapReduce framework has been the JobTracker, the daemon responsible not only for tracking and managing machine resources across the cluster, but also for enforcing the execution semantics for all the queued and running MapReduce jobs. The fundamental shift we hope to effect takes these two complex and interrelated concepts and re-factors them into separate entities.
In the proposed new architecture, a global ResourceManager (RM) tracks machine availability and scheduling invariants, while a per-application ApplicationMaster (AM) runs inside the cluster and tracks the program semantics for a given job. An application can be a single MapReduce job, as the JobTracker supports today, a directed acyclic graph (DAG) of MapReduce jobs, or a new framework. Each machine in the cluster runs a per-node daemon, the NodeManager (NM), responsible for enforcing and reporting the resource allocations made by the RM and monitoring the lifecycle of processes spawned on behalf of an application. Each process started by the NM is conceptually a container, or a bundle of resources allocated by the RM.
Next Generation Apache Hadoop MapReduce Architecture
The ResourceManager is the central authority that arbitrates resources among various competing applications in the cluster.
The ResourceManager has two main components:
The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. It also offers no guarantees on restarting containers that fail, whether due to application failure or hardware failure.
The Scheduler performs its scheduling function based on the resource requirements of the applications. Applications express their resource requirements using the abstract notion of a resource request, which incorporates elements such as memory, cpu, disk, network etc.
The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure.
This post focuses on providing details about the Scheduler, the resource-model, the scheduling algorithm etc.
### Resource Model
MapReduce NextGen supports a very generic resource model that allows cluster resources such as CPU, memory, disk and network bandwidth, local-storage etc. to be efficiently scheduled among applications.
The result of a resource request is a container. A container is a conceptual entity that grants an application the privilege to use a certain amount of resources on a given machine to run a component task.
The Scheduler, in v1, models every machine in the system as composed of multiple containers (with a lower-limit on the size of memory, say 512MB or 1 GB).
The ApplicationMaster requests a container along with the resources (amount of memory, cpu capability, etc.) it needs. It can request multiple types of containers, each with different resource needs. In addition, it can optionally specify location preferences such as a specific machine or rack; this allows for first-class support of the MapReduce model of performing the computation close to the data being processed.
Thus, unlike the current implementation of Hadoop MapReduce, the cluster is not artificially segregated into map and reduce slots. Every container is fungible, yielding huge benefits for cluster utilization. A well-known problem with Hadoop MapReduce today is that jobs are bottlenecked on reduce slots, and the lack of fungible resources is a severe limiting factor.
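To make the utilization argument concrete, here is a minimal sketch comparing typed slots with fungible containers on a single node. The node size, slot split and function names are all illustrative assumptions, and for simplicity every task is assumed to need the same amount of memory.

```python
def running_with_slots(map_slots, reduce_slots, pending_maps, pending_reduces):
    """Tasks that can run when slots are typed: a map slot cannot host a reduce."""
    return min(pending_maps, map_slots) + min(pending_reduces, reduce_slots)


def running_with_containers(node_memory_mb, container_mb, pending_maps, pending_reduces):
    """Tasks that can run when every container is interchangeable."""
    capacity = node_memory_mb // container_mb
    return min(pending_maps + pending_reduces, capacity)


# Hypothetical 6 GB node with only reduces left to run:
# either 4 map slots + 2 reduce slots, or six 1 GB containers.
print(running_with_slots(4, 2, pending_maps=0, pending_reduces=6))           # 2
print(running_with_containers(6144, 1024, pending_maps=0, pending_reduces=6))  # 6
```

In the slot model the four map slots sit idle while reduces queue up; with fungible containers the same memory runs all six reduces.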
### Resource Request
As described in the previous section, the ApplicationMaster can ask for resources with specific capabilities (memory, cpu etc.) on specific machines or racks. The ApplicationMaster can also ask for different kinds of resources for differing needs (e.g. maps and reduces have different resource requirements in the MapReduce paradigm). Among its various resource requests, the ApplicationMaster can use request priorities to influence the order in which the Scheduler allocates containers (e.g. run maps before reduces in MapReduce).
All outstanding resource requests are serviced strictly in priority order, as defined by the application.
The ApplicationMaster can ask for any number of containers of a given request type (<priority, capability>).
Thus, the resource request understood by the Scheduler is of the form:
<priority, (hostname/rackname/*), capability, #containers>
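The four-field request above maps naturally onto a small record type. The following is only a sketch: the class and field names, the memory-only capability, and the priority values are assumptions made for illustration, not the actual Hadoop classes.

```python
from dataclasses import dataclass

ANY = "*"  # wildcard location: any host on any rack


@dataclass(frozen=True)
class Capability:
    memory_mb: int  # other dimensions (cpu, disk, network) omitted in this sketch


@dataclass
class ResourceRequest:
    priority: int           # defined by the application
    location: str           # a hostname, a rack name, or ANY
    capability: Capability
    num_containers: int


# Hypothetical requests: one 1 GB container on a host, its rack, and anywhere.
requests = [
    ResourceRequest(priority=1, location="h1001", capability=Capability(1024), num_containers=1),
    ResourceRequest(priority=1, location="r11", capability=Capability(1024), num_containers=1),
    ResourceRequest(priority=1, location=ANY, capability=Capability(1024), num_containers=1),
]

# A request type is identified by its <priority, capability> pair.
request_types = {(r.priority, r.capability) for r in requests}
print(len(request_types))  # 1
```

All three entries share one request type and differ only in location, which is what lets the Scheduler treat them as alternative placements for the same need.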
### Resource Negotiation
The ApplicationMaster is responsible for computing the resource requirements of the application. In a MR application, the input-splits express its locality preferences. The MR ApplicationMaster translates them into resource requests for the Scheduler.
The expected mode of operation is for applications to present all of their resource requests upfront, on start-up, to the extent feasible. Of course, the protocol allows for applications to present updates to the resource requests via deltas.
As described above the resource request is of the form: <priority, (hostname/rackname/*), capability, #containers>.
Thus, for MapReduce applications, the MapReduce ApplicationMaster takes the input-splits and generates resource requests by inverting the split to host mappings, then annotating each request with priorities (map v/s reduce containers) and number of containers required on each host. The resource requests are also aggregated by racks and then by the special any (*) for all containers. All resource requests are subject to change via the delta protocol.
An illustrative example:
Consider a MapReduce job with 2 maps, each requiring 1G of RAM and 1 reduce requiring 2G of RAM.
The corresponding resource requests from the ApplicationMaster to the Scheduler:
Notice how the containers field is aggregated across the different hosts, racks and any (*) for both maps and reduces.
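The aggregation described above can be sketched as follows. This is an illustration under assumptions, not the MapReduce ApplicationMaster's actual code: the function name and tuple layout are invented, the hosts for map1 are hypothetical, and counting each task once per rack is an assumed convention. Only map0's hosts (r11/h1001, r22/h2121, r31/h3118) come from the example discussed in this post.

```python
from collections import Counter

ANY = "*"


def build_requests(task_hosts, host_to_rack, priority, memory_mb):
    """Invert task -> hosts into <priority, location, capability, #containers>."""
    counts = Counter()
    for hosts in task_hosts.values():
        for host in hosts:
            counts[host] += 1
        # Assumption: a task counts once per rack, even with several hosts there.
        for rack in {host_to_rack[h] for h in hosts}:
            counts[rack] += 1
        counts[ANY] += 1  # every task can also be satisfied anywhere
    return [(priority, loc, memory_mb, n) for loc, n in sorted(counts.items())]


host_to_rack = {"h1001": "r11", "h1010": "r11", "h2121": "r22",
                "h3118": "r31", "h4510": "r45"}
map_splits = {
    "map0": ["h1001", "h2121", "h3118"],
    "map1": ["h1010", "h4510"],  # hypothetical hosts for illustration
}

map_requests = build_requests(map_splits, host_to_rack, priority=1, memory_mb=1024)
# Reduces have no input splits, so they only produce a request on ANY.
reduce_requests = build_requests({"reduce0": []}, host_to_rack, priority=2, memory_mb=2048)

for request in map_requests + reduce_requests:
    print(request)
```

Both maps have a replica on rack r11, so that rack's count is 2, and the any (*) entry for maps is 2 because it counts every map exactly once.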
The main advantage of the proposed model is that it is extremely compact, both in the amount of state the ResourceManager needs per application for scheduling and in the amount of information passed between the ApplicationMaster and the ResourceManager. This is crucial for scaling the ResourceManager. The amount of information, per application, in this model is always O(cluster size), whereas in the current Hadoop Map-Reduce JobTracker it is O(number of tasks), which could run into hundreds of thousands of tasks. For large jobs it is sufficient to ask for containers only on racks and not on specific hosts: each rack has many appropriate resources (i.e. input splits for MapReduce applications), so the ApplicationMaster can put whichever containers it receives to good use.
The Scheduler will try to match appropriate machines for the application; it can also provide resources on the same rack or a different rack if the specific machine is unavailable. The ApplicationMaster might, at times, due to vagaries of busy-ness of the cluster, receive resources that are not the most appropriate or receive them too late; it can then reject them by returning them to the Scheduler without being charged for them.
The Scheduler is aware of the network topology of the cluster and uses the racks and any (*) fields as gating factors to ensure that ApplicationMasters do not get inappropriate resources, without needing very detailed task-level information.
In the example above, the Scheduler doesn’t need to know that map0 can run on <r11/h1001, r22/h2121, r31/h3118>.
If a container is available on r22/h2121, the Scheduler allocates it to the application and invalidates requests on * by updating the resource request list as follows:
The ApplicationMaster can then, optionally, invalidate requests for r31 after it decides to run map0 on the allocated container.
Now, if a container is available on r11/h1010, the Scheduler allocates it to the application and invalidates requests on * as follows:
The ApplicationMaster can then, optionally, invalidate requests for r45 after it decides to run map1 on the allocated container.
The important details of the scheduling policy are:
* When the number of required containers reaches 0 on a host, the Scheduler will stop granting further containers on that node to the application at that priority/capability.
* When the number of required containers reaches 0 on a rack, the Scheduler will stop granting further containers on any node on that rack to the application at that priority/capability.
* When the number of required containers for any (*) reaches 0 for an application, the Scheduler stops granting further containers on any node for the application at that priority/capability.
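The gating rules above can be sketched for the simplest case, a container granted on a host that the application asked for by name. This is a simplified illustration for one priority/capability. The function name and the dictionary layout are assumptions, as is the choice to decrement the host, rack and any (*) counts together on each grant.

```python
ANY = "*"


def grant_node_local(outstanding, host, rack):
    """Grant one container on `host` only if host, rack and ANY all have demand.

    `outstanding` maps a location (host, rack or ANY) to the number of
    containers still required at a single priority/capability.
    """
    needed = (host, rack, ANY)
    if any(outstanding.get(loc, 0) <= 0 for loc in needed):
        return False  # one of the counts reached 0: this level gates the grant
    for loc in needed:
        outstanding[loc] -= 1
    return True


# Hypothetical state for a single map that could run on either of two hosts.
outstanding = {"h2121": 1, "r22": 1, "h1001": 1, "r11": 1, ANY: 1}

print(grant_node_local(outstanding, "h2121", "r22"))  # True
print(grant_node_local(outstanding, "h1001", "r11"))  # False
```

The second call is refused even though h1001 and r11 still show a count of 1, because the count for any (*) has already reached 0. The Scheduler needs no task-level knowledge to avoid over-allocating.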
The Scheduler aggregates resource requests from all the running applications to build a global plan to allocate resources. The Scheduler then allocates resources based on application-specific constraints such as appropriate machines and global constraints such as capacities of the application, queue, user etc.
The CapacityScheduler plug-in uses familiar notions of Capacity and Capacity Guarantees as the policy for arbitrating resources among competing applications.
The scheduling algorithm is straightforward:
* Pick the most under-served queue in the system.
* Pick the highest priority application in that queue.
* Serve the application's resource asks appropriately.
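The three steps above can be sketched as a single scheduling pass. This is a toy model rather than the CapacityScheduler's code: the class names are invented, "most under-served" is assumed to mean the lowest ratio of used to guaranteed capacity, and a smaller priority value is assumed to mean higher priority.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class App:
    name: str
    priority: int  # assumption: smaller value means higher priority
    pending: int   # containers still wanted


@dataclass
class Queue:
    name: str
    guaranteed: int  # guaranteed capacity, in containers
    used: int = 0
    apps: List[App] = field(default_factory=list)


def schedule_one(queues: List[Queue]) -> Optional[Tuple[str, str]]:
    """Hand out one container; return (queue name, app name) or None."""
    candidates = [q for q in queues if any(a.pending > 0 for a in q.apps)]
    if not candidates:
        return None
    # 1. Pick the most under-served queue (lowest used/guaranteed ratio).
    queue = min(candidates, key=lambda q: q.used / q.guaranteed)
    # 2. Pick the highest-priority application in that queue.
    app = min((a for a in queue.apps if a.pending > 0), key=lambda a: a.priority)
    # 3. Serve one of its asks.
    app.pending -= 1
    queue.used += 1
    return queue.name, app.name


queues = [
    Queue("prod", guaranteed=8, used=6, apps=[App("c", priority=1, pending=2)]),
    Queue("adhoc", guaranteed=2, used=1,
          apps=[App("a", priority=2, pending=1), App("b", priority=1, pending=1)]),
]
print(schedule_one(queues))  # ('adhoc', 'b')
print(schedule_one(queues))  # ('prod', 'c')
```

After the first grant, adhoc is at its full guarantee, so prod becomes the most under-served queue and receives the next container.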
The FairScheduler plug-in uses fairness as the main policy for arbitrating resources among active applications.
Note: A rogue ApplicationMaster could, in any scenario, request more containers than necessary - the Scheduler uses application-limits, user-limits, queue-limits etc. to protect the cluster from abuse.
### Scheduler API
There is a single API between the Scheduler and the ApplicationMaster:
(List<Container> newContainers, List<ContainerStatus> containerStatuses) allocate(List<ResourceRequest> ask, List<Container> release)
The AM asks for specific resources via a list of ResourceRequests (ask) and releases unnecessary Containers that were allocated by the Scheduler.
The response contains a list of newly allocated Containers and the statuses of application-specific Containers that completed since the previous interaction between the AM and the RM.
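To show the shape of this exchange, here is a toy in-memory stand-in for the Scheduler side of the call. Everything in it is an assumption made for illustration: the class names, the one-container-per-free-host model, and the `report_completed` hook standing in for NodeManager reports. It is not the Hadoop API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class ResourceRequest:
    priority: int
    location: str  # hostname or "*"
    memory_mb: int
    num_containers: int


@dataclass
class Container:
    container_id: int
    host: str
    memory_mb: int


@dataclass
class ContainerStatus:
    container_id: int
    exit_code: int


class ToyScheduler:
    """Hypothetical scheduler: each free host can hold exactly one container."""

    def __init__(self, free_hosts: List[str]):
        self.free_hosts = list(free_hosts)
        self.running: Dict[int, Container] = {}
        self.completed: List[ContainerStatus] = []
        self.next_id = 1

    def report_completed(self, container_id: int, exit_code: int) -> None:
        """Stand-in for a NodeManager reporting that a container finished."""
        container = self.running.pop(container_id)
        self.free_hosts.append(container.host)
        self.completed.append(ContainerStatus(container_id, exit_code))

    def _pick_host(self, location: str) -> Optional[str]:
        if location == "*":
            return self.free_hosts.pop(0) if self.free_hosts else None
        if location in self.free_hosts:
            self.free_hosts.remove(location)
            return location
        return None

    def allocate(self, ask: List[ResourceRequest], release: List[Container]
                 ) -> Tuple[List[Container], List[ContainerStatus]]:
        # Containers handed back by the AM free their hosts again.
        for container in release:
            if self.running.pop(container.container_id, None) is not None:
                self.free_hosts.append(container.host)
        new_containers = []
        for req in sorted(ask, key=lambda r: r.priority):
            for _ in range(req.num_containers):
                host = self._pick_host(req.location)
                if host is None:
                    break
                container = Container(self.next_id, host, req.memory_mb)
                self.next_id += 1
                self.running[container.container_id] = container
                new_containers.append(container)
        # Statuses are delivered once, on the next call after completion.
        statuses, self.completed = self.completed, []
        return new_containers, statuses


scheduler = ToyScheduler(["h1001", "h2121"])
granted, statuses = scheduler.allocate([ResourceRequest(1, "*", 1024, 2)], [])
scheduler.report_completed(granted[0].container_id, exit_code=0)
granted2, statuses2 = scheduler.allocate([], release=[granted[1]])
print(len(granted), statuses2)
```

A single call carries both directions of the conversation: new asks and releases go in, and grants plus completion statuses since the previous call come out.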
### Resource Monitoring
The Scheduler receives periodic information about the resource usage of allocated resources from the NodeManagers. The Scheduler also makes the status of completed Containers available to the appropriate ApplicationMaster.
In the future, obvious enhancements to the Scheduler include using the per-container resource usage information provided by the NodeManagers to over-commit resources.
We believe the proposed ideas for the Scheduler in the Next Generation of Hadoop Map-Reduce are both novel and crucial for the fine-grained scheduling required to meet our stated goals for Apache Hadoop MapReduce. They allow the Scheduler to scale to very large clusters and still provide very fine-grained and efficient scheduling for exploiting data-locality, performance and reliability.
We expect and encourage other cluster schedulers/managers to incorporate our ideas and we look forward to learning from their experiences too.