Understanding Big Data: Stream Analytics and YARN
Real-time stream processing is growing in importance, as businesses need to react faster to events as they occur. Data that is valuable now may be worthless a few hours later. Use cases include sentiment analysis, monitoring and anomaly detection.
With cheap and infinitely scalable storage and compute infrastructure, more and more data flows into the Hadoop cluster. For the first time, the opportunity is ripe to fully leverage that infrastructure and bring real-time processing as close to the data in HDFS as possible, yet isolated from other workloads. This need has been a driver for Hadoop native streaming platforms and a key reason why other streaming solutions, like Storm, fall short.
This post motivates the critical infrastructure pieces needed to build mission-critical real-time streaming applications on Hadoop, with a focus on end-to-end fault tolerance for the processing platform.
Hadoop YARN – The distributed OS
YARN enabled a new generation of Hadoop applications by opening the cluster to processing paradigms other than MapReduce. Alongside MapReduce, many other applications and platforms now run on YARN, including stream processing, interactive SQL, machine learning and graph processing.
Hadoop 2.x with YARN is becoming the distributed OS for the data center. Adoption is picking up speed, as all major Hadoop distributions have moved to 2.x. YARN applications benefit from:
- Horizontal scalability with commodity hardware
- Distributed file system with “unlimited” storage space
- Central resource management with queues, limits and locality preferences
- Framework for multi-tenancy, fault tolerance and security
Most applications running on YARN rely on external services that today are not native to YARN, such as message brokers, databases and web servers. These services are separate and have their own infrastructure, resources and operational procedures. This results in fragmentation, inefficiency, higher cost and adoption barriers. For example, a single scheduler can improve resource utilization while ensuring isolation, especially for elastic workloads; this is not possible when separate clusters are carved out for each service. Fault tolerance is another aspect affected by this fragmentation, since each external service handles failures in its own way.
Fault Tolerant Processing by example
DataTorrent RTS is a real-time stream-processing platform. We set out in early 2012 to build the first YARN-native application besides MapReduce. All components of our platform and its entire architecture were built around YARN. Other solutions in the stream-processing space either run completely outside Hadoop or are bolted onto YARN after the fact, which comes with multiple downsides.
Today, as we work with customers building applications on top of DataTorrent RTS, we see the need to extend the idea of the “distributed OS” to other peripheral systems that those applications depend on, in order to leverage expertise and existing investments.
One of the key differentiators of DataTorrent RTS is fault tolerance. These capabilities would not be possible without support in YARN:
- Detection of process failures that can be used to implement automatic HA
- Ability to add and replace machines with no downtime
As a YARN-native application, RTS uses these basic building blocks to provide full HA with no data loss, no human intervention and minimal recovery time. This is a critical capability for real-time processing with low end-to-end latency requirements. No data loss implies that the application state is checkpointed. RTS provides this without the user having to write extra code, a capability made possible by tight HDFS integration.
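Conceptually, checkpointing means periodically serializing operator state to durable storage so a restarted container can resume from the last consistent snapshot. Below is a minimal sketch of that idea using plain Hadoop FileSystem APIs; the class name, method and path layout are illustrative assumptions, not the actual RTS mechanism.

```java
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: persist operator state to HDFS at checkpoint boundaries
// so that a restarted container can resume from the last saved state.
public class CheckpointSketch {

  static void checkpoint(Serializable operatorState, String operatorId, long windowId)
      throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // One file per operator and checkpoint window (hypothetical layout).
    Path path = new Path("/checkpoints/" + operatorId + "/" + windowId);
    try (ObjectOutputStream out = new ObjectOutputStream(fs.create(path, true))) {
      out.writeObject(operatorState);
    }
  }
}
```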
Data needs to move into the Hadoop cluster for processing. For stream processing, Kafka is an increasingly popular choice of message bus: it was built for scalability and meets low-latency requirements. Let’s consider Kafka as the message bus delivering data into a DataTorrent application running in the YARN cluster.
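As a rough sketch of the consuming end of such a pipeline, the loop below reads tuples with the standard Kafka consumer API; in RTS this role is played by a Kafka input operator, and the broker addresses, topic and group names here are made up.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EventConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // example brokers
    props.put("group.id", "rts-ingest");                         // hypothetical group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("events"));   // hypothetical topic
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          // Hand the tuple off to downstream processing (stubbed out here).
          System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
      }
    }
  }
}
```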
For mission-critical applications, high availability is essential. In this scenario, the RTS stream-processing application running on YARN is fully fault tolerant: any process failure in the cluster is handled within the framework established by YARN, using the built-in support for recovery in RTS. In contrast, although Kafka is fault tolerant through replicated partitions and leader failover, failures of the broker processes themselves are either not handled at all or must be handled through a mechanism the user provides.
This compromises the end-to-end fault tolerance proposition and leads to an acceptance problem. Not only can failures in the Kafka cluster lead to service interruption in the pipeline, they can also pose problems for sensitive producers that have little tolerance for downtime of Kafka as a downstream buffer. So how can we make use of the great capabilities Kafka offers in a way that can be operationalized? Today’s primary users of Kafka have built their own teams and proprietary infrastructure to address this, but that can become an expensive hobby and is typically not what a customer wants to hear.
Kafka on YARN (KOYA)
Moving Kafka into the YARN cluster is a solution to the problem, especially in our context, where the user is already running a YARN cluster and has made the investment to operationalize it.
Without such integration, the user has to perform a number of manual steps to replace a failed Kafka broker. As long as a replica remains, there is no downtime, but it is desirable that the path to recovery be predefined and automated, to avoid that alert in the middle of the night.
With YARN, the application master detects a process failure and can initiate recovery. In case of machine failure, the process can be replaced on a new machine. Since Kafka is sensitive to disk I/O, the YARN cluster administrator can reserve backup machines.
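To illustrate the building block YARN provides, a simplified application master callback might re-request a container whenever a broker process exits abnormally. This is a hedged sketch against the public YARN client API, not the KOYA or Slider implementation; resource sizes and the priority are placeholders.

```java
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Illustrative callback handler: when a broker container dies unexpectedly,
// ask the ResourceManager for a replacement container.
public class BrokerRecoveryHandler implements AMRMClientAsync.CallbackHandler {

  private final AMRMClientAsync<ContainerRequest> amRmClient;

  public BrokerRecoveryHandler(AMRMClientAsync<ContainerRequest> amRmClient) {
    this.amRmClient = amRmClient;
  }

  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    for (ContainerStatus status : statuses) {
      if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
        // Request a replacement with the same resource profile (sizes are examples).
        Resource capability = Resource.newInstance(2048, 2);
        amRmClient.addContainerRequest(
            new ContainerRequest(capability, null, null, Priority.newInstance(1)));
      }
    }
  }

  @Override
  public void onContainersAllocated(List<Container> containers) {
    // Launch the Kafka broker process in each allocated container (omitted).
  }

  @Override public void onShutdownRequest() { }
  @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { }
  @Override public float getProgress() { return 0; }
  @Override public void onError(Throwable e) { }
}
```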
It makes sense to integrate Kafka with YARN, as existing investments and skills can be leveraged. Kafka running under the YARN umbrella can utilize the centrally managed pool of resources. The process monitoring and recovery features of YARN can be extended to provide complete HA for Kafka servers (Kafka provides replicated partitions, but it does not offer automation for dealing with failed brokers).
DataTorrent announced a new initiative to integrate Kafka and YARN under the KOYA project. KOYA was proposed as KAFKA-1754 and was well received by the community. The goals of KOYA are listed below:
- Automate broker recovery
- Automate deployment of Kafka cluster
- Central status of cluster
- Ease of management
- Support core Kafka as is, without modifications
Slider
We initially got ready to build a new application master for KOYA from scratch. Why not? We did it for RTS and have the expertise required. But considering the goals for KOYA and that Kafka already provides most of the HA features, we evaluated Apache Slider. Slider was built to enable long running services on YARN without making changes to the services themselves. We found Slider sufficient to bring Kafka to YARN as it provides much of the infrastructure required for KOYA.
With KOYA, there is only one pool of resources, with all machines running under YARN. The Slider application master is responsible for keeping the Kafka server containers running, each controlled by a Slider agent, which is written in Python.
Using KOYA, the user can specify the resources for the Kafka servers in a configuration file. Today, YARN supports memory and CPU, with disk as a future consideration (YARN-2139). It is also possible to use other parameters that YARN supports, such as node labels and locality. One feature that KOYA would benefit from and that isn’t available in YARN today is anti-affinity, which is needed to ensure that only a single broker runs on a given machine for optimal performance. The set of candidate machines can be restricted via labels. In the absence of direct anti-affinity support, requesting 51 percent of a node’s available resources is a workaround, since no two such containers can then be placed on the same machine.
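To make the sizing workaround concrete, assume each worker node offers about 8 GB and 8 vcores to YARN (these numbers, like the request below, are purely illustrative and not KOYA code): asking for just over half of a node’s memory means YARN cannot place two broker containers on the same machine.

```java
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class AntiAffinityWorkaround {

  // Assumed per-node capacity exposed to YARN (illustrative numbers).
  static final int NODE_MEMORY_MB = 8192;
  static final int NODE_VCORES = 8;

  static ContainerRequest brokerRequest() {
    // Ask for just over half of a node's memory; the memory dimension alone
    // then prevents two broker containers from being packed onto one machine.
    // Node labels (not shown) could further restrict the candidate machines.
    Resource capability = Resource.newInstance(NODE_MEMORY_MB / 2 + 1, NODE_VCORES / 2);
    return new ContainerRequest(capability, null, null, Priority.newInstance(1));
  }
}
```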
Kafka relies on the local file system for storing its logs (topic partition data). Hence, it is important that a server remains on the same machine across restarts, unless this becomes impossible, for example due to a machine failure. Slider allows users to pin a component to the machine it was first allocated to, and will add improved support for relaxing this affinity constraint without user intervention (SLIDER-799). With the latter, an alternative machine can be used when necessary, and Kafka will restore the partition replication on the new broker.
What’s next?
KOYA is under development as open source and we are looking to take it forward in collaboration with the Kafka and YARN communities. We are targeting Q2 for the first release, and one of our objectives is to provide a dedicated admin web service for the Kafka cluster. We see this as a future part of Kafka that should be integrated as a Slider component, and we plan to work with the Kafka community on it. We have also identified a number of enhancements to Slider that we look forward to incorporating into future releases.
Thomas Weise is principal architect at DataTorrent and has developed and architected distributed systems, middleware and web applications since 1997. Before joining DataTorrent at its inception, he served as principal engineer on the Hadoop Services team at Yahoo! and contributed to several ecosystem projects, including Pig, Hive and HCatalog. Porting MapReduce-oriented infrastructure to Hadoop 2.x also motivated him to explore alternative, more interactive and real-time processing capabilities on the new YARN architecture. Earlier, he worked on enterprise software for network/device management, e-commerce and search engine marketing.
Photo credit: Misha Dontsov / Foter / CC BY