Showing posts with label Cassandra. Show all posts

Monday, 20 October 2014

HBase vs Cassandra

Apache HBase vs Apache Cassandra

This comparative study was done by me and Larry Thomas in May, 2012. Cassandra stuff was prepared by Larry Thomas.
This information is NOT intended to be a tutorial for either Apache Cassandra or Apache HBase. We tried our best to provide the most accurate information. Please comment or email me if you find anything that needs correcting. I would be happy to keep this list accurate and up to date.


Point
HBase
Cassandra

Foundations

HBase is based on BigTable (Google)
Cassandra is based on Dynamo (Amazon).   Initially developed at Facebook by former Amazon engineers.  This is one reason why Cassandra supports multiple data centers.  Rackspace is a big contributor to Cassandra due to multi data center support.
Infrastructure
HBase uses the Hadoop Infrastructure (Zookeeper, NameNode, HDFS).  Organizations that will deploy Hadoop anyway may be comfortable with leveraging Hadoop knowledge by using HBase
Cassandra started and evolved separately from Hadoop, and its infrastructure and operational knowledge requirements are different from Hadoop's.  However, for analytics, many Cassandra deployments use Cassandra + Storm (which uses Zookeeper), and/or Cassandra + Hadoop.
Infrastructure Simplicity and SPOF
The HBase-Hadoop Infrastructure has several "moving parts" consisting of Zookeeper, Name Node, HBase Master, and Data Nodes.  Zookeeper is clustered and naturally fault tolerant.  Name Node needs to be clustered to be fault tolerant.
Cassandra uses a single Node-type.  All nodes are equal and perform all functions.   Any Node can act as a coordinator, ensuring no SPOF.   Adding Storm or Hadoop, of course, adds complexity to the infrastructure.
Read Intensive Use Cases
HBase is optimized for reads, supported by single-write master, and resulting strict consistency model, as well as use of Ordered Partitioning which supports row-scans.  HBase is well suited for doing Range based scans.
Cassandra has excellent single-row read performance as long as eventual consistency semantics are sufficient for the use-case.  Cassandra quorum reads, which are required for strict consistency, will naturally be slower than HBase reads.  Cassandra does not support Range based row-scans, which may be limiting in certain use-cases.  Cassandra is well suited for supporting single-row queries, or selecting multiple rows based on a Column-Value index.
Multi-Data Center Support and Disaster Recovery
HBase provides for asynchronous replication of an HBase Cluster across a WAN.   HBase clusters cannot be set up to achieve zero RPO, but in steady-state HBase should be roughly failover-equivalent  to any other DBMS that relies on asynchronous replication over a WAN.   Fall-back processes and procedures (e.g. after failover) are TBD.
Cassandra Random Partitioning provides for row-replication of a single row across a WAN, either asynchronous (write.ONE,  write.LOCAL_QUORUM),  or synchronous (write.QUORUM,  write.ALL).  Cassandra clusters can therefore be set up to achieve zero RPO, but each write will require at least one wan-ACK back to the coordinator to achieve this capability.
Write.ONE Durability
Writes are replicated in a pipeline fashion: the first-data-node for the region persists the write, and then sends the write to the next Natural Endpoint, and so-on in a pipeline fashion. HBase’s commit log "acks" a write only after *all* of the nodes in the pipeline have written the data to their OS buffers.  The first Region Server in the pipeline must also have persisted the write to its WAL.
Cassandra's coordinators will send parallel write-requests to all Natural Endpoints.  The coordinator will "ack" the write after exactly one Natural Endpoint has "acked" the write, which means that node has also persisted the write to its WAL.   The writes may or may not have committed to any other Natural Endpoint.
Ordered Partitioning
HBase only supports Ordered Partitioning.  This means that Rows for a CF are stored in RowKey order in HFiles, where each HFile contains a "block" or "shard" of all the rows in a CF.  HFiles are distributed across all data-nodes in the Cluster.
Cassandra officially supports Ordered Partitioning, but no production user of Cassandra uses Ordered Partitioning due to the "hot spots" it creates and the operational difficulties such hot-spots cause.  Random Partitioning is the only recommended Cassandra partitioning scheme, and rows are distributed across all nodes in the cluster.



RowKey Range Scans
Because of ordered partitioning,  HBase queries can be formulated with partial start and end row-keys, and can locate rows inclusive-of, or exclusive-of, these partial row-keys.  The start and end row-keys in a range-scan need not even exist in HBase.
Because of random partitioning,  partial rowkeys cannot be used with Cassandra.  RowKeys must be known exactly.  Counting rows in a CF is complicated.   It is highly recommended that for these types of use-cases,  data should be stored in columns in Cassandra, not in rows.
Linear Scalability for large tables and range scans
Due to Ordered Partitioning, HBase will easily scale horizontally while still supporting rowkey range scans.
If data is stored in columns in Cassandra to support range scans, the practical limitation of a row size in Cassandra is 10's of Megabytes.  Rows larger than that cause problems with compaction overhead and time.
Atomic Compare and Set
HBase supports Atomic Compare and Set. HBase supports transactions within a Row.
Cassandra does not support Atomic Compare and Set.   Counters require dedicated counter column-families which, because of eventual consistency, require that all replicas in all natural end-points be read and updated with ACK.  However, hinted-handoff mechanisms can make even these built-in counters suspect for accuracy.   FIFO queues are difficult (if not impossible) to implement with Cassandra.
Read Load Balancing - single Row
HBase does not support Read Load Balancing against a single row.  A single row is served by exactly one region server at a time.  Other replicas are used only in case of a node failure.  Scalability is primarily supported by Partitioning which statistically distributes reads of different rows across multiple data nodes.
Cassandra will support Read Load Balancing against a single row.  However,  this is primarily supported by Read.ONE, and eventual consistency must be taken into consideration.  Scalability is primarily supported by Partitioning which distributes reads of different rows across multiple data nodes. 
Bloom Filters
Bloom Filters can be used in HBase as another form of Indexing.  They work on the basis of RowKey or RowKey+ColumnName to reduce the number of data-blocks that HBase has to read to satisfy a query.  Bloom Filters may exhibit false positives (reading too much data), but never false negatives (reading not enough data).
Cassandra uses bloom filters for key lookup.
Triggers
Triggers are supported by the CoProcessor capability in HBase.  They allow HBase to observe the get/put/delete events on a table (CF), and then execute the trigger-logic.    Triggers are coded as java classes.
Cassandra does not support co-processor-like functionality (as far as we know)
Secondary Indexes
HBase does not natively support secondary indexes, but one use-case of Triggers is that a trigger on a "put" can automatically keep a secondary index up-to-date, and therefore not put the burden on the application (client).
Cassandra supports secondary indexes on column families where the column name is known.  (Not on dynamic columns).
Simple Aggregation
HBase CoProcessors support out-of-the-box simple aggregations in HBase:   SUM, MIN, MAX, AVG,  STD.   Other aggregations can be built by defining java-classes to perform the aggregation.
Aggregations in Cassandra are not supported by the Cassandra nodes - client must provide aggregations.  When the aggregation requirement spans multiple rows, Random Partitioning makes aggregations very difficult for the client.   Recommendation is to use Storm or Hadoop for aggregations.
HIVE Integration
HIVE can access HBase tables directly (uses de-serialization under the hood that is aware of the HBase file format).
Work in Progress (https://issues.apache.org/jira/browse/CASSANDRA-4131)
PIG Integration
PIG has native support for writing into/reading from HBase.
Cassandra 0.7.4+
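
The Ordered Partitioning and RowKey Range Scans rows above can be illustrated with a small sketch. This is not HBase or Cassandra code: the row keys, the node count of 4, and the use of MD5 as the placement hash are all assumptions made for illustration. It only shows why keys kept in sorted order allow a scan between partial start and stop keys, while hash-based placement scatters neighbouring keys.

```python
import bisect
import hashlib


def ordered_range_scan(sorted_keys, start, stop):
    """Return keys in [start, stop) from a list kept in row-key order."""
    lo = bisect.bisect_left(sorted_keys, start)
    hi = bisect.bisect_left(sorted_keys, stop)
    return sorted_keys[lo:hi]


def random_partition(key, node_count):
    """Pick a node from a hash of the key (hypothetical placement rule)."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % node_count


# Hypothetical row keys, stored in sorted order as an ordered partitioner would.
keys = sorted(["user#1001", "user#1002", "user#2001", "order#17", "order#42"])

# The partial start and stop keys do not need to exist in the table.
users = ordered_range_scan(keys, "user#", "user$")

# With hash placement, adjacent keys may land on unrelated nodes,
# so a prefix scan would have to contact every node.
placement = {key: random_partition(key, 4) for key in keys}
```

Here `users` holds the three `user#` keys in order, found with two binary searches. Under hash placement the same query has no contiguous region to read from, which is the limitation described in the Cassandra column.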









Point | HBase | Cassandra
CAP Theorem Focus | Consistency, Availability | Availability, Partition-Tolerance
Consistency | Strong | Eventual (Strong is Optional)
Single Write Master | Yes | No (R + W > N to get Strong Consistency)
Optimized For | Reads | Writes
Main Data Structure | CF, RowKey, Name Value Pair Set | CF, RowKey, Name Value Pair Set
Dynamic Columns | Yes | Yes
Column Names as Data | Yes | Yes
Static Columns | No | Yes
RowKey Slices | Yes | No
Static Column Value Indexes | No | Yes
Sorted Column Names | Yes | Yes
Cell Versioning Support | Yes | No
Bloom Filters | Yes | Yes (only on Key)
CoProcessors | Yes | No
Triggers | Yes (Part of Coprocessor) | No
Push Down Predicates | Yes (Part of Coprocessor) | No
Atomic Compare and Set | Yes | No
Explicit Row Locks | Yes | No
Row Key Caching | Yes | Yes
Partitioning Strategy | Ordered Partitioning | Random Partitioning recommended
Rebalancing | Automatic | Not Needed with Random Partitioning
Availability | N-Replicas across Nodes | N-Replicas across Nodes
Data Node Failure | Graceful Degradation | Graceful Degradation
Data Node Failure - Replication | N-Replicas Preserved | (N-1) Replicas Preserved + Hinted Handoff
Data Node Restoration | Same as Node Addition | Requires Node Repair Admin-action
Data Node Addition | Rebalancing Automatic | Rebalancing Requires Token-Assignment Adjustment
Data Node Management | Simple (Roll In, Roll Out) | Human Admin Action Required
Cluster Admin Nodes | Zookeeper, NameNode, HMaster | All Nodes are Equal
SPOF | Now, all the Admin Nodes are Fault Tolerant | All Nodes are Equal
Write.ANY | No, but Replicas are Node Agnostic | Yes (Writes Never Fail if this option is used)
Write.ONE | Standard, HA, Strong Consistency | Yes (often used), HA, Weak Consistency
Write.QUORUM | No (not required) | Yes (often used with Read.QUORUM for Strong Consistency)
Write.ALL | Yes (performance penalty) | Yes (performance penalty, not HA)
Asynchronous WAN Replication | Yes, but it needs testing on corner cases. | Yes (Replicas can span data centers)
Synchronous WAN Replication | No | Yes with Write.QUORUM or Write.EACH_QUORUM
Compression Support | Yes | Yes
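
The consistency rows in this table rest on a simple overlap rule for tunable consistency: a read is guaranteed to see the latest acknowledged write when the number of replicas read (R) plus the number of replicas written (W) exceeds the replication factor (N). The sketch below only encodes that arithmetic; the function names are hypothetical and nothing here calls Cassandra.

```python
def quorum(replicas):
    """Smallest majority of the replica count."""
    return replicas // 2 + 1


def is_strongly_consistent(replicas, read_acks, write_acks):
    """True when every read set must overlap every write set (R + W > N)."""
    return read_acks + write_acks > replicas


n = 3
q = quorum(n)

quorum_both = is_strongly_consistent(n, q, q)   # Read.QUORUM + Write.QUORUM
one_both = is_strongly_consistent(n, 1, 1)      # Read.ONE + Write.ONE
one_and_all = is_strongly_consistent(n, 1, n)   # Read.ONE + Write.ALL
```

With three replicas, quorum reads paired with quorum writes satisfy the rule, Read.ONE with Write.ONE does not, and Read.ONE with Write.ALL does at the cost of availability on writes.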

Thursday, 10 July 2014

Cassandra Monitoring


Guide to Cassandra Thread Pools


This guide provides a description of the different thread pools and how to monitor them. Includes what to alert on, common issues and solutions.

Concepts

Cassandra is based on a Staged Event Driven Architecture (SEDA).  This separates different tasks into stages that are connected by a messaging service.  Like tasks are grouped into a stage that has a queue and a thread pool (ScheduledThreadPoolExecutor more specifically for the Java folks).  Some stages skip the messaging service and queue tasks immediately on a different stage if it exists on the same node.  Each of these queues can back up if a stage is being overrun.  This is a common indication of an issue or performance bottleneck. To demonstrate, take for example a read request:

Note: For the sake of this example the data exists on a different node than the one that received the initial request, but in some scenarios this could all be on a single node.  There are also multiple paths a read may take.  This is just for demonstration purposes.
  1. NODE1> The request is queued from node1 on the ReadStage of node2 with the task holding a reference to a callback
  2. NODE2> The ReadStage will pull the task off the queue once one of its 32 threads becomes available
  3. NODE2> The task will execute and compute the resulting Row, which will be sent off to the RequestResponse stage of node1
  4. NODE1> One of the four threads in the RequestResponseStage will process the task, and pass the resulting Row to the callback referenced from step 1 which will return to the client.
  5. NODE1> It will also possibly kick off an async read repair task on node1 to be processed in the ReadRepairStage once one of its four threads becomes available
The ReadRepairStage and how long it takes to process its work is not in the feedback loop from the client.  This means that if reads arrive at a higher rate than read repairs can be processed, the queue will grow until it is full without the client application ever noticing.  Once the queue is full (the limit varies between stages, more below), enqueueing a task will block.
In some stages the processing of these tasks is not critical.  These tasks are marked as “DROPPABLE”, meaning the first thing the stage does when executing one is check whether it has exceeded a timeout measured from when it was created.  If the timeout has passed, the stage throws the task away instead of processing it.
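
The stage model described above (a bounded queue, a fixed thread pool, and droppable tasks that are checked against a timeout when they are picked up) can be sketched in a few lines. This is a toy model, not Cassandra's implementation: the `Stage` class, its parameters, and the way a stale task is simulated by backdating its creation time are all assumptions made for illustration.

```python
import queue
import threading
import time


class Stage:
    """A toy stage: a bounded queue drained by a fixed pool of worker threads."""

    def __init__(self, name, threads, max_pending, timeout_s):
        self.name = name
        self.timeout_s = timeout_s
        self.tasks = queue.Queue(maxsize=max_pending)
        self.completed = 0
        self.dropped = 0
        self._lock = threading.Lock()
        for _ in range(threads):
            threading.Thread(target=self._worker, daemon=True).start()

    def submit(self, fn, droppable=False, created=None):
        # put() blocks while the queue is full, like a blocked enqueue.
        created = time.monotonic() if created is None else created
        self.tasks.put((fn, droppable, created))

    def _worker(self):
        while True:
            fn, droppable, created = self.tasks.get()
            try:
                expired = time.monotonic() - created > self.timeout_s
                if droppable and expired:
                    # Too old to be useful: throw it away without running it.
                    with self._lock:
                        self.dropped += 1
                else:
                    fn()
                    with self._lock:
                        self.completed += 1
            finally:
                self.tasks.task_done()


results = []
stage = Stage("ReadRepairStage", threads=4, max_pending=16, timeout_s=1.0)
stage.submit(lambda: results.append("fresh"), droppable=True)
# Backdate the creation time to simulate a task that waited too long in the queue.
stage.submit(lambda: results.append("stale"), droppable=True,
             created=time.monotonic() - 5.0)
stage.tasks.join()  # wait until both tasks have been handled
```

After `join()` returns, the fresh task has run and the stale one has been counted as dropped without executing. The client that submitted the work is never told, which is why dropped counts have to be monitored separately.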

Accessing metrics

TPStats

For manual debugging this is the output given by
nodetool tpstats

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
ReadStage                         0         0         113702         0                 0
RequestResponseStage              0         0              0         0                 0
MutationStage                     0         0         164503         0                 0
ReadRepairStage                   0         0              0         0                 0
ReplicateOnWriteStage             0         0              0         0                 0
GossipStage                       0         0              0         0                 0
AntiEntropyStage                  0         0              0         0                 0
MigrationStage                    0         0              0         0                 0
MemoryMeter                       0         0             35         0                 0
MemtablePostFlusher               0         0           1427         0                 0
FlushWriter                       0         0             44         0                 0
MiscStage                         0         0              0         0                 0
PendingRangeCalculator            0         0              1         0                 0
commitlog_archiver                0         0              0         0                 0
InternalResponseStage             0         0              0         0                 0
HintedHandoff                     0         0              0         0                 0

Message type           Dropped
RANGE_SLICE                  0
READ_REPAIR                  0
PAGED_RANGE                  0
BINARY                       0
READ                         0
MUTATION                     0
_TRACE                       0
REQUEST_RESPONSE             0
COUNTER_MUTATION             0
The pool values (Active, Pending, etc.) are described in the table below. The second table lists dropped tasks by message type; more information on that follows below.
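
For scripted checks, the thread-pool table can be turned into a dictionary. The sketch below assumes the column layout shown in the output above (pool name followed by five integer columns) and that the text has already been captured, for example from running nodetool tpstats; the function name and the shortened sample are hypothetical.

```python
def parse_tpstats_pools(text):
    """Parse the thread-pool table of tpstats-style output into a dict."""
    pools = {}
    in_pools = False
    for line in text.splitlines():
        if line.startswith("Pool Name"):
            in_pools = True
            continue
        if in_pools:
            parts = line.split()
            if len(parts) != 6:
                break  # blank line or the start of the dropped-message table
            name, *numbers = parts
            active, pending, completed, blocked, all_time = map(int, numbers)
            pools[name] = {
                "active": active,
                "pending": pending,
                "completed": completed,
                "blocked": blocked,
                "all_time_blocked": all_time,
            }
    return pools


# Shortened sample in the same layout as the output above.
SAMPLE = """\
Pool Name                    Active   Pending      Completed   Blocked  All time blocked
ReadStage                         0         0         113702         0                 0
MutationStage                     0         0         164503         0                 0

Message type           Dropped
READ                         0
"""

pools = parse_tpstats_pools(SAMPLE)
```

The parser stops at the first line that does not have six whitespace-separated fields, so the dropped-message table is left for separate handling.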

JMX

These are all available via JMX as well in:
org.apache.cassandra.request:type=*
and
org.apache.cassandra.internal:type=*
with the attributes (relative to tpstats) being:
MBean Attribute | tpstats name | Description
ActiveCount | Active | Number of tasks pulled off the queue with a thread currently processing them.
PendingTasks | Pending | Number of tasks in the queue waiting for a thread.
CompletedTasks | Completed | Number of tasks completed.
CurrentlyBlockedTasks | Blocked | When a pool reaches its max thread count (configurable or set per stage, more below) it will begin queuing until the max size is reached.  When this is reached it will block until there is room in the queue.
TotalBlockedTasks | All time blocked | Total number of tasks that have been blocked.

 Metrics Reporters

Cassandra also has pluggable metrics reporting capabilities.

Droppable Messages

The second table in nodetool tpstats lists messages that were DROPPABLE. Each of these was picked up for execution only after the timeout set for its message type had passed, so it was thrown away. In JMX these are accessible via org.apache.cassandra.net:MessagingService or org.apache.cassandra.metrics:DroppedMessage. The following table provides a little information on each type of message.
Message Type | Stage | Notes
BINARY | n/a | This is deprecated and no longer has any use.
_TRACE | n/a (special) | Used for recording traces (nodetool settraceprobability). Has a special executor (1 thread, 1000 queue depth) that throws away messages on insertion instead of within the execute.
MUTATION | MutationStage | If a write message is processed after its timeout (write_request_timeout_in_ms), it either sent a failure to the client, or it met its requested consistency level and will rely on hinted handoff and read repairs to apply the mutation.
COUNTER_MUTATION | MutationStage | If a write message is processed after its timeout (write_request_timeout_in_ms), it either sent a failure to the client, or it met its requested consistency level and will rely on hinted handoff and read repairs to apply the mutation.
READ_REPAIR | MutationStage | Times out after write_request_timeout_in_ms.
READ | ReadStage | Times out after read_request_timeout_in_ms. No point in servicing reads after that point since an error would already have been returned to the client.
RANGE_SLICE | ReadStage | Times out after range_request_timeout_in_ms.
PAGED_RANGE | ReadStage | Times out after request_timeout_in_ms.
REQUEST_RESPONSE | RequestResponseStage | Times out after request_timeout_in_ms. Response was completed and sent back but not before the timeout.
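
Because dropped messages never surface to the client, a periodic check on the dropped table is a reasonable starting point. The sketch below assumes the two-column layout of the dropped-message table in nodetool tpstats output; the function names and the sample counts are hypothetical.

```python
def parse_dropped(text):
    """Parse the 'Message type / Dropped' table of tpstats-style output."""
    dropped = {}
    in_dropped = False
    for line in text.splitlines():
        parts = line.split()
        if parts[:2] == ["Message", "type"]:
            in_dropped = True
            continue
        if in_dropped and len(parts) == 2 and parts[1].isdigit():
            dropped[parts[0]] = int(parts[1])
    return dropped


def nonzero_dropped(dropped):
    """Keep only the message types that have dropped at least one task."""
    return {name: count for name, count in dropped.items() if count > 0}


# Hypothetical sample: the counts are invented for illustration.
SAMPLE = """\
Message type           Dropped
READ                         0
MUTATION                    12
_TRACE                       0
"""

dropped = parse_dropped(SAMPLE)
problems = nonzero_dropped(dropped)
```

In this invented sample only MUTATION would be flagged, which per the table above points at the MutationStage and write_request_timeout_in_ms.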

Stage Details

The sections below list a majority of the exposed stages as of 2.0.7, but they tend to be a little volatile, so don’t be surprised if some do not exist.  The heading of each section correlates with the name of the stage from nodetool tpstats. The notes on the stages mostly refer to 2.x, with some references to 1.2. Versions 1.1 and earlier were not considered at all in the making of this document.
Alerts, given as boolean expressions, are intended as a starting point and should be tailored to specific use cases and environments if they produce many false positives.
When a value is listed as “number of processors”, it is retrieved first from the system property “cassandra.available_processors”, which can be set by adding
-Dcassandra.available_processors
to JVM_OPTS. If the property is not set, it falls back to Runtime.availableProcessors.
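
The alert expressions listed for each stage below have the form pending > N || blocked > 0. A minimal sketch of evaluating them against collected stage values follows; the `should_alert` helper and the sample numbers are hypothetical, and the default threshold of 15 is simply the starting point this guide uses for most stages.

```python
def should_alert(pending, blocked, pending_threshold=15):
    """Evaluate the `pending > N || blocked > 0` expression for one stage."""
    return pending > pending_threshold or blocked > 0


# Hypothetical per-stage values, e.g. collected from tpstats or JMX.
stats = {
    "ReadStage": {"pending": 0, "blocked": 0},
    "MutationStage": {"pending": 40, "blocked": 0},
    "FlushWriter": {"pending": 2, "blocked": 1},
}

alerting = sorted(
    name for name, s in stats.items()
    if should_alert(s["pending"], s["blocked"])
)
```

Stages with a different threshold, such as MemoryMeter where the limit is the column family count, can pass their own `pending_threshold`.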


ReadStage

Performing a local read. Also includes deserializing data from row cache.  If there are pending values this can cause increased read latency.  This can spike due to disk problems, poor tuning, or overloading your cluster.  In many cases (not disk failure) this is resolved by adding nodes or tuning the system.
JMX beans: org.apache.cassandra.request.ReadStage
org.apache.cassandra.metrics.ThreadPools.request.ReadStage
Number of threads: concurrent_reads (default: 32)
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

RequestResponseStage

When a response to a request is received this is the stage used to execute any callbacks that were created with the original request
JMX beans: org.apache.cassandra.request.RequestResponseStage
org.apache.cassandra.metrics.ThreadPools.request.RequestResponseStage
Number of threads: number of processors
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

MutationStage

Performing local writes, including:
  • insert/updates
  • Schema merges
  • commit log replays
  • hints in progress
Similar to ReadStage, an increase in pending tasks here can be caused by disk issues, overloading a system, or poor tuning. If messages are backed up in this stage, you can add nodes, tune hardware and configuration, or update the data model and use case.
JMX beans: org.apache.cassandra.request.MutationStage
org.apache.cassandra.metrics.ThreadPools.request.MutationStage
Number of threads: concurrent_writes (default: 32)
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

ReadRepairStage

Performing read repairs. The chance of them occurring is configurable per column family with read_repair_chance. This stage is more likely to back up when using CL.ONE for reads (and, to a lesser extent, other non-CL.ALL queries) and when using multiple data centers. The repair is kicked off asynchronously, outside of the query's feedback loop, as in the read example above. Note that this is not very likely to be a problem, since it does not happen on all queries and is fast provided there is good connectivity between replicas. The repair being droppable also means that it will be thrown away after write_request_timeout_in_ms, which further mitigates this. If pending grows, try lowering the rate for CFs with a high read volume:
ALTER TABLE column_family WITH read_repair_chance = 0.01;
JMX beans: org.apache.cassandra.request.ReadRepairStage
org.apache.cassandra.metrics.ThreadPools.request.ReadRepairStage
Number of threads: number of processors
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

ReplicateOnWriteStage

Renamed CounterMutation in 2.1. Counters also change dramatically after 2.0, so consider this section obsolete for later versions. Performs counter writes on non-coordinator nodes and replicates after a local write. This includes a read, so it can be pretty expensive. Will back up if the rate of writes exceeds the rate at which the mutations can occur. Particularly possible with CL.ONE and high counter increment workloads.
JMX beans: org.apache.cassandra.request.ReplicateOnWriteStage
org.apache.cassandra.metrics.ThreadPools.request.ReplicateOnWriteStage
Number of threads: concurrent_replicates (default: 32)
Max pending tasks: 1024 x number of processors
Alerts: pending > 15 || blocked > 0

GossipStage

Post 2.0.3 there should no longer be an issue with pending tasks. Instead, monitor the logs for this message:
Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)
Before that change, in particular in older versions of 1.2, having a lot of nodes (100+) while using vnodes could cause a lot of CPU-intensive work that made the stage fall behind.
This has also been known to be caused by out-of-sync schemas. Check that NTP is working correctly and attempt nodetool resetlocalschema or the more drastic deleting of the system column family folder.
JMX beans: org.apache.cassandra.internal.GossipStage
org.apache.cassandra.metrics.ThreadPools.internal.GossipStage
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

AntiEntropyStage

Repairing consistency. Handles repair messages such as Merkle tree transfer (from Validation compaction) and streaming.
JMX beans: org.apache.cassandra.internal.AntiEntropyStage
org.apache.cassandra.metrics.ThreadPools.internal.AntiEntropyStage
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

MigrationStage

Making schema changes
JMX beans: org.apache.cassandra.internal.MigrationStage
org.apache.cassandra.metrics.ThreadPools.internal.MigrationStage
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

MemtablePostFlusher

Operations after flushing the memtable. Discards commit log files once all of the data in them has been written to sstables. Flushes non-cf backed secondary indexes.
JMX beans: org.apache.cassandra.internal.MemtablePostFlusher
org.apache.cassandra.metrics.ThreadPools.internal.MemtablePostFlusher
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

FlushWriter

Sorts and writes memtables to disk. The vast majority of the time, this backing up comes from overrunning disk capability. The sorting can cause issues as well, however. When sorting is the problem, it is usually accompanied by high load but a small number of actual flushes (seen in cfstats). This can come from huge rows with large column names, e.g. something inserting many large values into a CQL collection. If overrunning disk capabilities, it is recommended to add nodes or tune the configuration.
JMX beans: org.apache.cassandra.internal.FlushWriter
org.apache.cassandra.metrics.ThreadPools.internal.FlushWriter
Number of threads: memtable_flush_writers (1 per data directory)
Max pending tasks: memtable_flush_queue_size (default: 4)
Alerts: pending > 15 || blocked > 0

MiscStage

Snapshotting, replicating data after node remove completed.
JMX beans: org.apache.cassandra.internal.MiscStage
org.apache.cassandra.metrics.ThreadPools.internal.MiscStage
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

InternalResponseStage

Responding to non-client initiated messages, including bootstrapping and schema checking
JMX beans: org.apache.cassandra.internal.InternalResponseStage
org.apache.cassandra.metrics.ThreadPools.internal.InternalResponseStage
Number of threads: number of processors
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

HintedHandoff

Sending missed mutations to other nodes. A backlog here is usually a symptom of a problem elsewhere, so don't treat it as the root issue. You can use nodetool disablehandoff to prevent further hints from being saved. You can also use JMX (nodetool truncatehints) to clear the handoffs for specific endpoints, or all of them, with the org.apache.cassandra.db:HintedHandoffManager operations. This must be followed up with repairs!
JMX beans: org.apache.cassandra.internal.HintedHandoff
org.apache.cassandra.metrics.ThreadPools.internal.HintedHandoff
Number of threads: max_hints_delivery_threads (default: 1)
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

MemoryMeter

Measures memory usage and live ratio of a memtable. Pending should not grow beyond the number of memtables (the queue is front-ended by a set to prevent duplicates). Measuring can take minutes for large memtables.
JMX beans: org.apache.cassandra.internal.MemoryMeter
org.apache.cassandra.metrics.ThreadPools.internal.MemoryMeter
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > column_family_count || blocked > 0

PendingRangeCalculator

Calculates the token ranges based on bootstrapping and leaving nodes. Instead of blocking, this stage discards tasks when one is already in progress, so there is no value in monitoring it.
JMX beans: org.apache.cassandra.internal.PendingRangeCalculator
org.apache.cassandra.metrics.ThreadPools.internal.PendingRangeCalculator
Number of threads: 1
Max pending tasks: 1

commitlog_archiver

(Renamed CommitLogArchiver in 2.1) Executes a copy command (or a user-defined command) on commit log files for recovery. Read more at DataStax Dev Blog
JMX beans: org.apache.cassandra.internal.commitlog_archiver
org.apache.cassandra.metrics.ThreadPools.internal.commitlog_archiver
Number of threads: 1
Max pending tasks: 2^31 - 1
Alerts: pending > 15 || blocked > 0

AntiEntropySessions

Number of active repairs that are in progress. Will not show up until after a repair has been run so any monitoring utilities should be able to handle it not existing.
JMX beans: org.apache.cassandra.internal.AntiEntropySessions
org.apache.cassandra.metrics.ThreadPools.internal.AntiEntropySessions
Number of threads: 4
Max pending tasks: 2^31 - 1
Alerts: pending > 1 || blocked > 0
