Thursday, 22 May 2014

Hadoop MapReduce Output to multiple Cassandra Column Family


Introduction

Hadoop MapReduce is the de-facto standard for Big Data analytics, used for analyzing very large datasets, on the scale of petabytes, with the power of distributed computing. As with most enterprise applications, we need a persistence layer that can provide data-source and data-storage capabilities to MapReduce jobs.

Cassandra is a highly available, distributed NoSQL/key-value data warehouse system designed for economical storage and management of very large datasets. Its strength lies in its peer-to-peer architecture, which leaves no single point of failure and thus provides high availability.

Hadoop and Cassandra therefore make a compatible pair: both have distributed architectures, and Cassandra's high availability minimizes the possibility of losing information.

Hadoop supports multiple output formats for persisting MapReduce output, such as TextOutputFormat (for persisting as a simple text file) and SequenceFileOutputFormat (for persisting as a serialized binary file). It also supports NoSQL/key-value stores as data targets by exposing an abstract class, OutputFormat, which can be implemented to persist reducer output in such a database.
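As a quick reference, the contract a custom output format has to satisfy consists of three abstract methods of org.apache.hadoop.mapreduce.OutputFormat<K, V>, shown here for orientation only:
    // returns the writer that receives every reducer output key-value pair
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;

    // validates the output specification of the job, e.g. required configuration
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;

    // returns the committer that finalizes (or discards) task output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;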

Cassandra, however, provides an OutputFormat for writing to only a single column family (the equivalent of an RDBMS table). Moreover, the column family name has to be fixed statically when the job is configured. It therefore does not support writing output to multiple column families.

Real-World Use-Case for Need of Multiple Output Tables

Scenario Description

There is a log-analysis application responsible for analyzing the sales data of a very popular shopping store to find sales patterns across multiple product segments such as Furniture, Electronics and Books. The sales patterns can help identify periods of high sales as well as the buying patterns and preferences of customers, which supports targeted advertising for a product segment and better customer service. The product-segment-based sales information is fed into decision-support systems.
However, the patterns for different product segments need to be treated differently, as each product segment is mutually exclusive and is governed by a different set of factors and dynamics.

Thus, we need to keep the data for different product segments in different logical tables because:
  1. We analyze each product segment's data independently of the other product segments.
  2. If there were only one table, we would have to query it to extract a particular product segment's data, which increases overhead because indexing is limited in scope in NoSQL databases.

Problem Statement

With respect to the use case discussed above, we need a technique for writing each product segment's data to the corresponding product-segment column family in Cassandra. Generalizing the problem, we need a solution that enables us to output a key-value pair to a particular column family among multiple column families, where the target column family is determined at run time on the basis of some filter condition.

As discussed above, Cassandra currently provides only one OutputFormat, ColumnFamilyOutputFormat. It does not fit our use case because it writes to a single output column family.

Solution Approaches

For the problem discussed above, there are two possible approaches:
  • Independent filtering layer (separate MapReduce jobs): Insert a filtering layer between the Hadoop MapReduce analytics job and the Cassandra output tables, consisting of a number of MapReduce jobs where each job corresponds to a single product segment.
So, for product segments = {furniture, books}, MapReduce jobs = {MRfurniture, MRbooks}.

Each job, for instance MRfurniture, is responsible for mapping the analytics data (the output of ProductSegmentAnalysisReducer, stored in HDFS or in a Cassandra table) to the furniture product segment and persisting it into the Cassandra column family named Furniture_Sales. It runs as an independent batch job after the Hadoop analytics job completes.

This approach adds unnecessary complexity, since it introduces one more independent layer, and it also increases latency, because each product-segment mapping job takes extra time to complete.
  • A customized OutputFormat for writing into multiple Cassandra column families: This approach requires implementing a customized OutputFormat that filters the data by product segment (or any other filter condition) during execution of the Hadoop job itself and then persists the data into the corresponding Cassandra logical tables.
It is simple and comparatively efficient, flexible and pluggable.

Selected Approach: Custom OutputFormat in Java

Since both Hadoop and Cassandra are implemented in Java, we will implement a new OutputFormat in Java, MultipleColumnFamilyOutputFormat, which extends the abstract class OutputFormat provided by the Hadoop API. It enables writing each output key-value pair to a specific column family that is specified, or logically determined, at run time.
A few Cassandra concepts are needed to understand this approach:
  • Data model of Cassandra: It is based on Google's BigTable data model and powered by an Amazon Dynamo-like infrastructure, providing a structured key-value store. The Keyspace is the container for your application data, similar to a database or schema in a relational database. Inside the keyspace are one or more column family objects, which are analogous to relational tables. Column families contain key-value pairs in which a key can map to multiple related values, and those values correspond to columns. Columns can be logically grouped, and each row in a column family can have a different number and set of columns.
  • Mutation: An insert or update in a Cassandra column family. A mutation consists of two values, a key and a value: if the key does not already exist, the value is inserted; otherwise the existing value is updated.
  • Range: A representation of the range of tokens that a node is responsible for on the DHT (Distributed Hash Table) ring. The nodes of a Cassandra cluster are arranged in the form of a ring.
  • RingCache: It keeps track of the range of tokens that each node in the ring is responsible for. This allows us to group mutations by the endpoints they should be sent to; the targeted endpoint essentially acts as the primary replica for the rows affected by the mutations.
The Java implementation of this approach follows the Hadoop architecture closely and mirrors the structure and data flow of the OutputFormat already implemented in Hadoop for Cassandra, ColumnFamilyOutputFormat.
It involves implementing the following Java classes, in the order listed:
  • Record Writer Component
    • ColumnFamilyRowKeyPair.java
    • RangeClient.java
    • MultipleColumnFamilyRecordWriter.java
  • Configuration Properties Management Component
    • MultipleColumnFamilyConfigHelper.java
  • Output Format Component
    • MultipleColumnFamilyOutputFormat.java

Solution Implementation

Record Writer Component

This component is responsible for writing each key-value pair to the specified column family using the Thrift API.

ColumnFamilyRowKeyPair.java

It is a serializable container class holding the values of the column family name and the row key.
public class ColumnFamilyRowKeyPair<T1, T2> implements Serializable {
    private T1 left = null;   // column family name
    private T2 right = null;  // row key

    // Constructor
    public ColumnFamilyRowKeyPair(T1 left, T2 right) {
        this.left = left;
        this.right = right;
    }

    // Getters and setters
}
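The rest of the pipeline needs this pair as a ByteBuffer (it becomes the reducer output key). Below is a minimal sketch of a pair of helpers based on standard Java serialization; the helper names toByteBuffer/fromByteBuffer are illustrative assumptions, not part of the original implementation (imports: java.io.*, java.nio.ByteBuffer, org.apache.cassandra.utils.ByteBufferUtil).
    // Sketch: serialize the pair into a ByteBuffer using standard Java serialization.
    public static ByteBuffer toByteBuffer(ColumnFamilyRowKeyPair<?, ?> pair) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(pair);               // requires T1 and T2 to be Serializable as well
        oos.close();
        return ByteBuffer.wrap(bos.toByteArray());
    }

    // Sketch: rebuild the pair from the ByteBuffer produced above.
    @SuppressWarnings("unchecked")
    public static <T1, T2> ColumnFamilyRowKeyPair<T1, T2> fromByteBuffer(ByteBuffer buf)
            throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
        return (ColumnFamilyRowKeyPair<T1, T2>) ois.readObject();
    }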

RangeClient.java

It acts as a client that runs as a daemon thread and connects to the list of endpoints (nodes) responsible for a particular range of tokens, i.e. row keys.

A mutation whose key falls in the token range handled by a client is inserted into that client's blocking queue. java.util.concurrent.ArrayBlockingQueue provides a bounded, thread-safe queue that can be shared by multiple threads in a classic producer-consumer scenario; it is backed by an array and orders elements FIFO.
Its offer(E e, long timeout, TimeUnit unit) method inserts the specified element e into the queue, waiting up to the specified timeout if necessary for space to become available. It returns true if successful, or false if the waiting time elapses before space becomes available.
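Inside RangeClient, the queue, the shutdown flag and the failure flag used by the following snippets could be declared roughly as below; this is a sketch, and the queue capacity is an assumed value.
    // Bounded producer-consumer queue between the record writer (producer)
    // and this client thread (consumer); the capacity is an assumed value.
    private final ArrayBlockingQueue<Pair<ByteBuffer, Mutation>> arrayBlockingQueue =
            new ArrayBlockingQueue<Pair<ByteBuffer, Mutation>>(32 * 1024);

    // Cleared when the record writer is closed, so run() can drain the queue and exit.
    private volatile boolean run = true;

    // Set by run() when every endpoint has failed; checked by put() so the
    // producer stops queuing work for a dead client.
    private volatile IOException lastException = null;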

The input parameter of put() is a pair consisting of the ColumnFamilyRowKeyPair serialized into a byte sequence and the mutation corresponding to it.
Method put(Pair<ByteBuffer, Mutation> value) : void
{
    while (true)
    {
        if (lastException != null)
            throw lastException;   // the client thread has died and closed its Thrift connection

        // keep retrying until there is space in the bounded queue
        if (arrayBlockingQueue.offer(value, 100, TimeUnit.MILLISECONDS))
            break;
    }
}
On the other side, the client thread keeps reading the head element from the queue (waiting if no element is present in the queue).
Method run() : void
{
    while (run || !arrayBlockingQueue.isEmpty())   // 'run' is cleared when the record writer is closed
    {
        Pair<ByteBuffer, Mutation> mutationPair;
        try
        {
            mutationPair = arrayBlockingQueue.take();
        }
        catch (InterruptedException e)
        {
            continue;   // re-check the loop condition
        }
The client then builds a batch, which maps each row key to its column family and the associated mutations:
        Map<ByteBuffer, Map<String, List<Mutation>>> batch = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
Next, each mutationPair is parsed to retrieve the column family name and the row key in bytes:
        while (mutationPair != null)
        {
            byte[] byteArrTemp = ByteBufferUtil.getArray(mutationPair.left);
            // byteArrTemp is deserialized into the ColumnFamilyRowKeyPair object, cfRkPair
            String colFamilyName = cfRkPair.getLeft();
            ByteBuffer keybuff = ByteBufferUtil.bytes(cfRkPair.getRight());

            Map<String, List<Mutation>> subBatch = batch.get(keybuff);
            if (subBatch == null)
            {
                subBatch = Collections.singletonMap(colFamilyName, (List<Mutation>) new ArrayList<Mutation>());
                batch.put(keybuff, subBatch);
            }
            subBatch.get(colFamilyName).add(mutationPair.right);
Once the batch threshold (the maximum number of rows in a batch) is reached, the inner loop exits and the batch is sent to the Thrift client. The poll() method retrieves and removes the head of the queue, or returns null if the queue is empty.
            if (batch.size() >= batchThreshold)
                break;
            mutationPair = arrayBlockingQueue.poll();
        }
If a Thrift exception occurs (for example, the client cannot reach its current endpoint), the client iterates over the other endpoints to establish a new Thrift connection. This failover conforms to the high-availability design of a Cassandra cluster.
        Iterator<InetAddress> iter = endpoints.iterator();
        while (true)
        {
            try
            {
                // send the batch to the last-used endpoint
                thriftClient.batch_mutate(batch, consistencyLevel);
                break;
            }
            catch (Exception e)
            {
                if (!iter.hasNext())
                {
                    lastException = new IOException(e);
                    return;   // no endpoints left: record the failure and stop the thread
                }
                // fail over to the next endpoint
                InetAddress address = iter.next();
                thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf));
                thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf);
            }
        }
    }
}
RangeClient can be implemented as an inner class of MultipleColumnFamilyRecordWriter or as a separate class.
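For orientation, below is a minimal sketch of how the RangeClient constructor might establish the initial Thrift connection; the field names follow the snippets above, and the sketch is an illustrative assumption rather than the exact implementation.
    // Sketch: construct a RangeClient for the endpoints owning one token range.
    public RangeClient(List<InetAddress> endpoints)
    {
        super("client-" + endpoints);   // RangeClient extends Thread
        this.endpoints = endpoints;
        try
        {
            // connect to the first endpoint; run() fails over to the others if needed
            InetAddress address = endpoints.iterator().next();
            thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf));
            thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf);
        }
        catch (Exception e)
        {
            throw new RuntimeException("Could not connect to endpoints " + endpoints, e);
        }
    }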

MultipleColumnFamilyRecordWriter.java

The main purpose of this class is to initialize and manage the RangeClient threads. It extends RecordWriter, a generic abstract class provided by the Hadoop MapReduce API.

Its main function is to implement the logic for writing a key-value pair to a particular Cassandra table, where the table name is decided only at run time.
The most significant overridden method is write(ByteBuffer cfKeybuff, List<Mutation> value), which the framework obtains through the customized OutputFormat class we will implement in the next part and calls each time there is reducer output to be written. As shown, the input is a ByteBuffer (the serialized byte stream of the ColumnFamilyRowKeyPair class) paired with an ArrayList of the corresponding Mutation instances. After deserializing the ColumnFamilyRowKeyPair, the Range for the given row key is determined.
void write(ByteBuffer cfKeybuff, List<Mutation> value) throws IOException
{
    // cfKeybuff is deserialized into a ColumnFamilyRowKeyPair object, cfRkPair
    ByteBuffer keybuff = ByteBufferUtil.bytes(cfRkPair.getRight());
    Range range = ringCache.getRange(keybuff);
Next, a RangeClient is assigned to that range and the associated mutations are handed to that client for committing. Internally, the RangeClient connects to the endpoints assigned to it and commits the changes.
    RangeClient client = clients.get(range);
    if (client == null)
    {
        // haven't seen keys for this range yet: create a new client
        client = new RangeClient(ringCache.getEndpoint(range));
        client.start();
        clients.put(range, client);
    }

    for (Mutation mutation : value)
        client.put(new Pair<ByteBuffer, Mutation>(cfKeybuff, mutation));
}
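The RecordWriter contract also requires a close(TaskAttemptContext) method that flushes all per-range clients when the task finishes. A minimal sketch, assuming RangeClient exposes a stopNicely() helper that lets its thread drain the queue and exit:
    public void close(TaskAttemptContext context) throws IOException, InterruptedException
    {
        for (RangeClient client : clients.values())
        {
            client.stopNicely();   // assumed helper: clears 'run' so run() drains the queue and exits
            client.join();         // wait for the remaining mutations to be committed
        }
        // surface any failure recorded by a client thread
        for (RangeClient client : clients.values())
            if (client.lastException != null)
                throw client.lastException;
    }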

Configuration Properties Management Component

MultipleColumnFamilyConfigHelper.java

This class is a simple helper, defined along the lines of the existing Cassandra architecture. It is similar in functionality to the existing ConfigHelper class and therefore inherits its basic functionality by extending it.
It holds the essential Hadoop and Cassandra configuration parameters required for connecting to the Hadoop cluster and the Cassandra instance. All other components refer to it when retrieving configuration values, which keeps the configuration consistent across components.
The parameters that need to be defined are listed below.
public class MultipleColumnFamilyConfigHelper extends ConfigHelper
{
    private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace";
    private static final String OUTPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.output.keyspace.username";
    private static final String OUTPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.output.keyspace.passwd";
    private static final String THRIFT_PORT = "cassandra.thrift.port";
    private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
    private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class";
Next, we define getters and setters for these configuration parameters. They differ from the usual bean-style accessors in that they read from and write to a Hadoop Configuration object. For instance, for OUTPUT_KEYSPACE_CONFIG the getter and setter are defined as below.
public static String getOutputKeyspace(Configuration conf)
{
    return conf.get(OUTPUT_KEYSPACE_CONFIG);
}

public static void setOutputKeyspace(Configuration conf, String keyspace)
{
    conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
}
Getters and setters for the rest of the configuration parameters can be defined in a similar fashion.

Output Format Component

MultipleColumnFamilyOutputFormat.java

This class is a wrapper over the Record Writer component and thus provides a single point of contact for Hadoop jobs committing their output to Cassandra tables.

We only need to specify this class as the output format class in the Hadoop job configuration.
Two important methods need to be overridden.
  1. getOutputCommitter() returns the output committer responsible for committing the output stream to HDFS. Since we do not need to write to HDFS, we return an instance of an OutputCommitter whose methods do nothing.
NullOutputCommitter is a skeleton class that extends org.apache.hadoop.mapreduce.OutputCommitter with no-op implementations of the inherited methods (a sketch of it follows the two methods below).
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
    return new NullOutputCommitter();
}

2. getRecordWriter() is the most significant method: it returns the Record Writer component, to which the framework then passes each reducer output key-value pair.
public MultipleColumnFamilyRecordWriter getRecordWriter(TaskAttemptContext context) {
    return new MultipleColumnFamilyRecordWriter(context);
}
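A minimal sketch of the NullOutputCommitter referred to above (all inherited methods are no-ops, because nothing is written to HDFS):
    class NullOutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter
    {
        public void setupJob(JobContext jobContext) { }
        public void setupTask(TaskAttemptContext taskContext) { }
        public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
        public void commitTask(TaskAttemptContext taskContext) { }
        public void abortTask(TaskAttemptContext taskContext) { }
    }
The OutputFormat contract also requires checkOutputSpecs(JobContext). A minimal sketch that simply fails fast if the job was not configured with an output keyspace:
    public void checkOutputSpecs(JobContext context) throws IOException
    {
        Configuration conf = context.getConfiguration();
        if (MultipleColumnFamilyConfigHelper.getOutputKeyspace(conf) == null)
            throw new IOException("Set the output keyspace with MultipleColumnFamilyConfigHelper.setOutputKeyspace()");
    }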

How to Use this Output Format from MapReduce Jobs

We are now ready with a new output format that can write Hadoop output to multiple Cassandra column families (tables).
Continuing our use case of analyzing data by product segment (FURNITURE, ELECTRONICS and BOOKS), the following code snippets show how to write data to Cassandra tables whose names are determined at run time.
  1. Assume there is a mapper, ProductSegmentAnalysisMapper, which has already mapped the input data into multiple product segments.
2. Next is the reducer, ProductSegmentAnalysisReducer, which determines the column family name at run time and then commits the data.
The reduce() function, which takes the mapper output as its input, is:
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException
{
    // Code that computes keyValue and aggregatedValue and determines the
    // associated column family name, columnFamilyName

    /*
     * For writing the output directly to the Cassandra column family
     */
    ColumnFamilyRowKeyPair<String, String> cfRkPair =
            new ColumnFamilyRowKeyPair<String, String>(columnFamilyName, UUID.randomUUID().toString());
    // serialize the ColumnFamilyRowKeyPair object into a ByteBuffer instance, outputRowKey
    // (for example with a helper like the toByteBuffer() sketch shown earlier)

    Map<String, Object> outputMap = new HashMap<String, Object>();
    outputMap.put(columnFamilyName + "_Value", keyValue);
    outputMap.put(columnFamilyName + "_Sales", aggregatedValue);
    context.write(outputRowKey, getMutation(outputMap));
}
private List<Mutation> getMutation(Map<String, Object> outputMap)
{
    List<Mutation> mutations = new ArrayList<Mutation>();
    for (String colName : outputMap.keySet())
    {
        Column col = new Column();
        col.setName(ByteBufferUtil.bytes(colName));

        Object colValue = outputMap.get(colName);
        if (colValue instanceof Double) {
            col.setValue(ByteBufferUtil.bytes((Double) colValue));
        }
        else if (colValue instanceof String) {
            col.setValue(ByteBufferUtil.bytes((String) colValue));
        }
        col.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(col);
        mutations.add(mutation);
    }
    return mutations;
}

3. MapReduce driver: This driver class is responsible for executing the Hadoop job. It sets MultipleColumnFamilyOutputFormat as the sink for the reducer output; finally, the Hadoop job is configured and executed. (A consolidated driver sketch follows step 2 below.)
The basic steps are listed below.
  1. Configure the Hadoop job following the standard procedure. The deviations from a standard job are:
job.setOutputKeyClass(ByteBuffer.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(MultipleColumnFamilyOutputFormat.class);

MultipleColumnFamilyConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
MultipleColumnFamilyConfigHelper.setRpcPort(job.getConfiguration(), THRIFT_PORT);
MultipleColumnFamilyConfigHelper.setInitialAddress(job.getConfiguration(), THRIFT_HOSTNAME);
MultipleColumnFamilyConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

2. The output column families must already exist. We can therefore add an optional step that ensures the required column families are available before the Hadoop job executes.
It verifies the existence of the required column families by querying Cassandra's metadata and creates them if they do not exist. For querying and CRUD operations on Cassandra it uses Hector, a Java client for Cassandra.
CLUSTER_NAME is the same as defined in the cassandra.yaml configuration file.
Cluster cassandraCluster = HFactory.getOrCreateCluster(CLUSTER_NAME,
        new CassandraHostConfigurator(RPC_ADDRESS));

void checkCassandraKeyspaceExist(Cluster cassandraCluster, String ksName)
{
    KeyspaceDefinition ksDef = cassandraCluster.describeKeyspace(ksName);
    if (ksDef == null) {
        int replicationFactor = 1;
        List<ColumnFamilyDefinition> cfDefs = null;
        ksDef = HFactory.createKeyspaceDefinition(ksName, SimpleStrategy.class.getName(),
                replicationFactor, cfDefs);
        String outputMsg = cassandraCluster.addKeyspace(ksDef, true);
        // log the keyspace metadata
    }
}
void checkCassandraColumnFamilyExist(Cluster cassandraCluster, String ksName)
{
    // populate the array requiredCFNameArray with the required column family names
    for (String colFamilyName : requiredCFNameArray) {
        if (!isColFamilyExist(cassandraCluster, ksName, colFamilyName)) {
            createColFamily(cassandraCluster, ksName, colFamilyName);
        }
    }
}
boolean isColFamilyExist(Cluster cassandraCluster, String ksName, String colFamilyName)
{
    for (ColumnFamilyDefinition cfDef : cassandraCluster.describeKeyspace(ksName).getCfDefs()) {
        if (cfDef.getName().equals(colFamilyName))
            return true;
    }
    return false;
}
void createColFamily(Cluster cassandraCluster, String ksName, String colFamilyName)
{
    BasicColumnDefinition valueColDef = new BasicColumnDefinition();
    valueColDef.setName(new StringSerializer().toByteBuffer(colFamilyName + "_Value"));
    valueColDef.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
    valueColDef.setIndexName(colFamilyName + "_Value_idx");
    valueColDef.setIndexType(ColumnIndexType.KEYS);
    // similarly define the other column definitions, e.g. salesColDef for the "_Sales" column

    ColumnFamilyDefinition colFamilyDef = HFactory.createColumnFamilyDefinition(
            ksName, colFamilyName, ComparatorType.UTF8TYPE,
            Arrays.asList((ColumnDefinition) valueColDef, (ColumnDefinition) salesColDef));
    colFamilyDef.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
    colFamilyDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
    String outputMsg = cassandraCluster.addColumnFamily(colFamilyDef, true);
}
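Putting it all together, a minimal sketch of the driver is shown below. The mapper/reducer class names and the constants (KEYSPACE, THRIFT_PORT, THRIFT_HOSTNAME) follow the snippets above; the input path handling and the job name are assumptions for illustration.
    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class ProductSegmentAnalysisDriver
    {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "product-segment-sales-analysis");
            job.setJarByClass(ProductSegmentAnalysisDriver.class);

            job.setMapperClass(ProductSegmentAnalysisMapper.class);
            job.setReducerClass(ProductSegmentAnalysisReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // reducer output goes straight to Cassandra via the custom output format
            job.setOutputKeyClass(ByteBuffer.class);
            job.setOutputValueClass(List.class);
            job.setOutputFormatClass(MultipleColumnFamilyOutputFormat.class);

            MultipleColumnFamilyConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
            MultipleColumnFamilyConfigHelper.setRpcPort(job.getConfiguration(), THRIFT_PORT);
            MultipleColumnFamilyConfigHelper.setInitialAddress(job.getConfiguration(), THRIFT_HOSTNAME);
            MultipleColumnFamilyConfigHelper.setPartitioner(job.getConfiguration(),
                    "org.apache.cassandra.dht.RandomPartitioner");

            FileInputFormat.addInputPath(job, new Path(args[0]));   // assumed: sales logs read from HDFS

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }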

Advantages of using MultipleColumnFamilyOutputFormat

By using this customized OutputFormat, we can expect to:
  • Remove the need for the separate filtering layer discussed earlier.
  • Implicitly improve the performance of the system: where a Hadoop reducer could previously write to only a single Cassandra table in a given time t, it can now write to multiple Cassandra tables in the same time.
  • Provide a component that is easily pluggable into the Hadoop ecosystem, since it follows the basic principles of the Hadoop architecture and only extends/implements the interfaces and abstract classes exposed by Hadoop.
Some other advantages that this approach inherits from Hadoop and Cassandra are:
  • Handling huge amounts of data (on the scale of petabytes).
  • Scalability: Hadoop and Cassandra clusters are scalable, since new nodes can be added.
  • High availability: Cassandra is based on a peer-to-peer architecture.

Conclusion

By implementing the customized OutputFormat, MultipleColumnFamilyOutputFormat, we can leverage the Hadoop MapReduce framework to persist analytics data in a specific Cassandra table (column family) that is determined at run time using any number of filter parameters and logic (such as the product segment parameter in the use case discussed in this post). We thereby remove the need for a separate, independent filtering/querying layer in Hadoop solutions for similar use cases.
This solution is highly beneficial in use cases where the analytics data from MapReduce must be split into mutually exclusive logical data sets (for example, mutually exclusive data sets of monthly and yearly analysis data).


About the Author
The author is currently working as a Software Engineer at Infosys Limited, Bangalore, India. He is part of the research group Infosys Labs (http://www.infosys.com/infosys-labs) and currently works in the Cloud Computing CoE on emerging technologies such as Hadoop and related Big Data technologies.

