Tuesday, January 31, 2017

Apache Hadoop HDFS: Knowing the Basics

Hadoop HDFS (Hadoop Distributed File System) is a distributed Java-based file system for storing large volumes of data. It is designed:
  • To be a scalable, fault-tolerant, distributed storage system 
  • To be the data management layer of Apache Hadoop
    • Hadoop (data management layer) = HDFS + YARN
      • YARN provides the resource management 
      • HDFS provides the distributed storage for big data
    • HDFS works closely with a wide variety of concurrent data access applications, coordinated by YARN.
  • To span large clusters of commodity servers
    • HDFS will “just work” under a variety of physical and systemic circumstances.
    • HDFS cluster = NameNode + DataNodes
In this article, we will use Apache Hadoop HDFS from the Hortonworks Data Platform (HDP: version 2.4.2) in the discussion.  For the HDFS High Availability (HA) feature, our reference is [2].

HDFS Cluster

An HDFS cluster comprises a NameNode, which manages the cluster metadata, and DataNodes, which store the data.  Prior to Hadoop 2.0.0, the NameNode was a single point of failure (SPOF) in an HDFS cluster. Each cluster had a single NameNode, and if that machine or process became unavailable, the cluster as a whole would be unavailable.

You can follow the instructions here to format and start HDFS on Hortonworks Data Platform. HDFS can be accessed from applications in many different ways. Natively, HDFS provides a FileSystem Java API for applications to use. A C language wrapper for this Java API is also available. In addition, an HTTP browser can also be used to browse the files of an HDFS instance. Work is in progress to expose HDFS through the WebDAV protocol.  For more information, read here.[3,10,11]
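For a quick look at an HDFS instance from the command line, the FileSystem shell covers the common operations (the paths and file name below are illustrative):

$ hdfs dfs -mkdir -p /user/test
$ hdfs dfs -put README.txt /user/test/
$ hdfs dfs -ls /user/test
$ hdfs dfs -cat /user/test/README.txt
$ hdfs dfs -checksum /user/test/README.txt

The -checksum option prints the per-block checksums discussed later under Data Integrity.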

NameNode

High-level summary of the NameNode, which:
  • Provides high availability (HA) using redundant NameNodes[2]
    • NameNode (active)
    • NameNode (standby)
  • Maintains the following two metadata files (or checkpoint files):
    • fsimage file
      • Holds the entire file system namespace,[12] including the mapping of blocks to files and file system properties
    • editlog file
      • Holds every change that occurs to the filesystem metadata
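Both checkpoint files are binary, but they can be inspected offline with the standard viewer tools (the transaction-ID suffixes in the file names below are made up):

$ hdfs oiv -p XML -i fsimage_0000000000000000042 -o fsimage.xml
$ hdfs oev -i edits_0000000000000000001-0000000000000000042 -o edits.xml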

NameNode Web UI 

To smoke test your NameNode server, open the NameNode web UI URL[7,11]
in a browser to determine whether the server is reachable. If successful, you can also select the Utilities menu to "browse the file system".

High Availability

The HDFS High Availability feature (as distinct from the HDFS Federation feature) addresses the SPOF problem by providing the option of running two redundant NameNodes in the same cluster in an active/passive configuration with a hot standby. This allows a fast failover to a new NameNode when a machine crashes, or a graceful, administrator-initiated failover for planned maintenance.

If your individual IDs of NameNodes are nn1 and nn2, you can get their service status using the following command:[3]

$ sudo -u hdfs hdfs haadmin -getServiceState nn1
$ sudo -u hdfs hdfs haadmin -getServiceState nn2
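The same tool can also trigger the graceful, administrator-initiated failover (here from nn1 to nn2):

$ sudo -u hdfs hdfs haadmin -failover nn1 nn2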

Metadata Files

When NameNode starts up, it reads FsImage and EditLog files from disk, merges all the transactions present in the EditLog to the FsImage, and flushes out this new version into a new FsImage on disk. It can then truncate the old EditLog because its transactions have been applied to the persistent FsImage.
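As a toy sketch of this merge-and-truncate sequence, with shell strings standing in for the real binary files (the "path blockId" record layout here is invented purely for illustration):

```shell
# Toy FsImage (namespace snapshot) and EditLog (pending transactions);
# each record is an invented "path blockId" line
fsimage='/a 1'
edits='/b 2
/c 3'

# Startup: fold every EditLog transaction into a new FsImage...
fsimage="$fsimage
$edits"
# ...then truncate the old EditLog, since its transactions are now
# persisted in the merged image
edits=''

echo "$fsimage"
```

The real NameNode does the equivalent with binary files under its configured name directory, as described next.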

Metadata files are stored at: 
  • ${dfs.namenode.name.dir}/edits
  • ${dfs.namenode.name.dir}/fsimage
where the dfs.namenode.name.dir property can be configured in hdfs-site.xml.[8]
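On a running cluster, you can check the effective value of this property with the getconf subcommand:

$ hdfs getconf -confKey dfs.namenode.name.dir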

DataNode

High-level summary of the DataNode:[4]
  • Scalable Storage
    • HDFS cluster storage scales horizontally with the addition of DataNodes
  • Minimal data motion
    • Hadoop moves compute processes to the data on HDFS and not the other way around. 
      • Processing tasks can occur on the physical node where the data resides, which significantly reduces network I/O and provides very high aggregate bandwidth.
  • Data Disk Failure: Heartbeats and Replication
    • Each DataNode sends a Heartbeat message to the NameNode periodically. 
      • If the NameNode detects that a DataNode has stopped sending Heartbeat messages, it marks that DataNode as dead and stops forwarding new I/O requests to it.
    • The NameNode constantly tracks which blocks need to be replicated and initiates replication whenever necessary. The necessity for re-replication may arise due to many reasons: 
      • a DataNode may become unavailable
      • a replica may become corrupted
      • a hard disk on a DataNode may fail
      • the replication factor of a file may be increased
  • Data Rebalancing
    • HDFS automatically moves data from one DataNode to another if the free space on a DataNode falls below a certain threshold
  • Data Integrity: Checksums
    • When a client creates an HDFS file, it computes a checksum of each block of the file and stores these checksums in a separate hidden file in the same HDFS namespace. 
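The heartbeat bookkeeping described under "Data Disk Failure" can be sketched as a toy loop (the node names, timestamps, and timeout are made up; HDFS derives its real dead-node window, about 10.5 minutes by default, from the heartbeat and recheck intervals):

```shell
now=1000        # "current" time in seconds (made up)
timeout=630     # dead-node window (made up; mirrors the HDFS default)
report=""
# each entry is <node>:<last heartbeat time>
for entry in dn1:900 dn2:100; do
  dn=${entry%%:*}
  last=${entry#*:}
  if [ $((now - last)) -ge "$timeout" ]; then
    state=dead   # stop sending it I/O; re-replicate its blocks
  else
    state=alive
  fi
  report="$report$dn=$state "
  echo "$dn is $state"
done
```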


  1. Hadoop Distributed File System (HDFS)
  2. HDFS High Availability Using the Quorum Journal Manager
  3. HDFS Commands Guide (Apache Hadoop) 
    • All HDFS commands are invoked by the bin/hdfs script and can be grouped into:
      • User commands
      • Administrator commands
      • Debug commands
  4. HDFS Architecture (Apache Hadoop) 
  5. Apache Hadoop
  6. HDFS Federation (Hortonworks)
    • In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. 
  7. HDFS Ports (Hortonworks)
  8. Apache Ambari: Knowing the Basics (Xml and More)
  9. hdfs-default.xml (2.7.1)
  10. FileSystem Shell - Apache™ Hadoop
  11. Hadoop NameNode Web Interface
  12. Namespace (HDFS)
    • Consists of directories, files and blocks.
    • It supports all the namespace related file system operations such as create, delete, modify and list files and directories.
  13. Hadoop DistCp Guide
    • Copy file or directories recursively
  14. All Cloud-related articles on Xml and More

Saturday, January 28, 2017

Apache Ambari: Knowing the Basics

Apache Ambari provides an end-to-end management and monitoring application for Apache Hadoop. In a nutshell, it can be used for:
  • Managing most of the administration activities in a Hadoop cluster
    • To install, provision, deploy, manage, and monitor a Hadoop cluster
    • To hide the complexity of the Hadoop cluster management 
    • To provide a very easy and intuitive web UI.
  • Integrating with other external tools for better management via its RESTful APIs
In this article, we will use Apache Ambari from the Hortonworks Data Platform (HDP) in our discussion.

Hortonworks Data Platform

You can deploy Hortonworks Data Platform (HDP) with or without Apache Ambari.  If you choose not to use Ambari, you can follow the instructions here.  However, it is much easier to deploy the Apache Hadoop stack with Ambari (see the instructions here).

After initial installation and deployment, your Apache Hadoop cluster will still grow and change with use over time.  With Apache Ambari, you can easily and quickly add new services or expand the storage and processing capacity of the cluster.

The ecosystem of Ambari consists of three main components:
  • Ambari Web
  • Ambari Server
    • Serves as the collection point for data from across the cluster
  • Ambari Agent
    • Runs on each host in the cluster to allow the Ambari Server to control that host

Ambari Web

Using the Ambari Web UI and REST APIs, you can deploy, operate, manage configuration changes, and monitor services for all nodes in your cluster from a central point.

Ambari Web is a client-side JavaScript application, which calls the Ambari REST API (accessible from the Ambari Server) to access cluster information and perform cluster operations. A relational database is used to store the information about the cluster configuration and topology.
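You can query this REST API directly with curl; admin/admin is the default login and 8080 the default Ambari Server port (adjust both for your installation, and replace the placeholder host):

$ curl -u admin:admin http://<ambari-server-host>:8080/api/v1/clusters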

With Ambari Views, you can customize the Ambari Web UI.  Ambari Views offer a systematic way to plug-in UI capabilities to surface custom visualization, management and monitoring features in Ambari Web.

Ambari Server

Before starting the Ambari Server for the first time, you must run setup once. Setup configures Ambari to talk to the Ambari database, installs the JDK, and lets you customize the user account (default: root) that the Ambari Server daemon runs as.

After setup, all the configuration is stored in: 
  • /etc/ambari-server/conf/ambari.properties
Then you can run the following commands from the Ambari Server host:
  • ambari-server start
    • If you reboot your cluster, you must restart the Ambari Server and all the Ambari Agents manually.
  • ambari-server status
  • ambari-server stop
Once started, you can access Ambari from a web browser using the Ambari Web URL.

The start script /usr/sbin/ambari-server is a shell script that sets environment variables and kicks off a Python script, which in turn launches a Java process (see details here).


You can start ambari in debug mode to get more detailed output via:
ambari-server start --verbose --debug
# or for short
ambari-server start -v -g
Significant files/directories:
  • /var/log/ambari-server/ambari-server.log 
    • To monitor Ambari, run:
      • tail -f /var/log/ambari-server/ambari-server.log
  • /var/lib/ambari-server/resources/ 
    • SQL scripts to initialize the PostgreSQL database

Ambari Agent

Ambari Agents heartbeat to the master every few seconds and receive commands from the master in the heartbeat responses. Heartbeat responses are the only way for the master to send commands to the Agent. Each command is queued in the action queue, which is picked up by the action executioner.

The action executioner picks the right tool (Puppet, Python, etc.) for execution depending on the command type and action type. Thus the actions sent in the heartbeat responses are processed asynchronously at the Agent. The action executioner puts response or progress messages on the message queue, and the Agent sends everything on the message queue to the master in the next heartbeat.

Here are the steps to install the Ambari Agent manually on RHEL/CentOS/Oracle Linux 6:
  1. Install the Ambari Agent on every host in your cluster.
    • yum install ambari-agent
  2. Using a text editor, configure the Ambari Agent by editing the ambari-agent.ini file as shown below:
    • vi /etc/ambari-agent/conf/ambari-agent.ini
      [server]
      hostname=
  3. Start the Agent on every host in your cluster.
    • ambari-agent start
      • The Agent registers with the Server on start.
The Agent should not die if the master suddenly disappears. It should continue to poll at regular intervals and recover as needed when the master comes back up:
The Ambari Agent keeps any information it planned to send to the master across a connection failure and re-sends it after the master comes back up. It may need to re-register if it was previously in the middle of registering.

  • The first thing to do if you run into trouble is to find the logs. Ambari Agent logs can be found at 
    • /var/log/ambari-agent/ambari-agent.log



Thursday, January 19, 2017

ZooKeeper: Knowing the Basics

ZooKeeper can be run in two different modes:[1]
  • Standalone
    • is convenient for evaluation, development, and testing
  • Replicated
    • should be used in production
In this article, we will focus on running ZooKeeper in replicated mode and cover its basics.

ZooKeeper Service 

Apache ZooKeeper can be used in distributed applications (e.g., Yahoo! Message Broker) to enable highly reliable distributed coordination.  One example: you can use ZooKeeper for the high availability of Spark standalone masters.[2] A standalone Spark Master can run with recovery mode enabled via ZooKeeper and recover state among the available set of masters. Another example: HDFS NameNode HA can be enabled so that the NameNode is not a single point of failure.[10] In that case, HDFS relies on ZooKeeper to manage the details of failover.

ZooKeeper Functionalities

ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical name space of data registers (or znodes), much like a file system. Here are the high-level descriptions of its functionalities:
  • Provides semantics similar to Google's Chubby for coordinating distributed systems; as a consistent and highly available key-value store, it makes an ideal cluster configuration store and directory of services
  • Is a centralized coordination service for
    • maintaining configuration data:
      • status information
      • configuration (e.g., security rules)
      • location information
    • naming
    • providing distributed synchronization
    • providing group services
  • Provides the following capabilities
    • Consensus
    • Group management
    • Presence protocols
  • Provides client bindings for a number of languages 
    • In the release
      • ships with C, Java, Perl and Python client bindings
    • From the community
      • check here for a list of client bindings that are available from the community but not yet included in the release

ZooKeeper Architecture

When run in replicated mode, the ZooKeeper service comprises an ensemble of servers.  A ZooKeeper cluster (ensemble) consists of a leader node and followers. The leader is chosen by consensus within the ensemble; if the leader fails, another node is elected leader.

The design of the ensemble requires that all servers know about each other.  ZooKeeper servers maintain an in-memory image of the data tree, along with transaction logs and snapshots in a persistent store.  The downside of an in-memory database is that the size of the database ZooKeeper can manage is limited by memory.

ZooKeeper Servers

To start ZooKeeper you need a configuration file which governs ZooKeeper's behavior. Here is a sample in conf/zoo.cfg:
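A minimal three-server configuration, using the standard parameters from the ZooKeeper Getting Started guide (the zoo1–zoo3 hostnames and the dataDir are placeholders), looks like:

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888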


The entries of the form server.X list the servers that make up the ZooKeeper service. When the server starts up, it knows which server it is by looking for the file myid in the data directory (i.e., dataDir). That file contains the server number, in ASCII.

Note the two port numbers after each server name: "2888" and "3888". Peers use the former port to connect to other peers. Such a connection is necessary so that peers can communicate, for example, to agree upon the order of updates. More specifically, a ZooKeeper server uses this port to connect followers to the leader. When a new leader arises, a follower opens a TCP connection to the leader using this port. Because the default leader election also uses TCP, we currently require another port for leader election. This is the second port in the server entry.

For the details of other configuration parameters, read here.

ZooKeeper Clients

Clients only connect to a single ZooKeeper server. The client maintains a TCP connection through which it sends requests, gets responses, gets watch events, and sends heartbeats. If the TCP connection to the server breaks, the client will connect to a different server.

In ZooKeeper's configuration file, clientPort (e.g., 2181) specifies the port on which to listen for client connections.  There are two ways to connect to the ZooKeeper service:
  • telnet or nc[6] 
  • ZooKeeper Command Line Interface (CLI)[7] 

telnet or nc

You can issue commands to ZooKeeper via telnet or nc, at the client port.  Each command is composed of four letters.  For example, the command ruok tests whether the server is running in a non-error state; the server responds with imok if it is running, and otherwise does not respond at all.

$ echo ruok | nc localhost 2181

For the full list of commands, read here.


ZooKeeper Command Line Interface (CLI)

The ZooKeeper Command Line Interface (CLI)[7] is used to interact with the ZooKeeper ensemble during development. It is useful for debugging and experimenting with different options.

To perform ZooKeeper CLI operations, first start your ZooKeeper server (“bin/zkServer.sh start”) and then the ZooKeeper client (“bin/zkCli.sh”).

$ ./current/zookeeper-client/bin/zkCli.sh
Connecting to localhost:2181

[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]

[zk: localhost:2181(CONNECTED) 1]
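A typical first session creates a znode, lists the root, reads the data back, and deletes the znode (the name /zk_test and its data are arbitrary):

[zk: localhost:2181(CONNECTED) 1] create /zk_test my_data
[zk: localhost:2181(CONNECTED) 2] ls /
[zk: localhost:2181(CONNECTED) 3] get /zk_test
[zk: localhost:2181(CONNECTED) 4] delete /zk_test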

For information on installing the client side libraries, refer to the Bindings section of the ZooKeeper Programmer's Guide.


  1. Getting Started (Zookeeper)
  2. Spark Standalone - Using ZooKeeper for High-Availability of Master
  3. How to run zookeeper's zkCli.sh commands from bash?
  4. Apache Zookeeper Explained: Tutorial, Use Cases and Zookeeper Java API Examples
  5. ZooKeeper Administrator's Guide
  6. [Apache ZooKeeper] command line zkCli.sh Guide
  7. ZooKeeper - CLI
  8. ZooKeeper Client Bindings
  9. ZooKeeper Programmer's Guide
  10. Blueprint Support for HA Clusters
  11. All Cloud-related articles on Xml and More