Monday 14 July 2014

Tuning Cassandra cluster configuration and improving performance


If you are considering to migrate to NO-SQL solution and want to use some data store which is open-source, massively scalable, fault tolerant, peer to peer and column based, there is a high probability that you will choose Apache Cassandra. According to my understanding, Apache Cassandra edges over Apache HBase when the data node topology required is something which is more robust and peer to peer. But again, HBase provides a tight integration with Hadoop ecosystem, which in a way or other can also be achieved by Apache Cassandra.
Anyways, yaml is always available to list down the configurable parameters but just to have a checklist, couple of important factors which should be take care when configuring Cassandra cluster for production environment (cloud/non-cloud based):

Q: Do we need high availability?
A: Yes, then we require to have multiple nodes across multiple data centres. We require to configure below:

  • Endpoint Snitch: You may want to specify which cassandra nodes are in the same data centre/same rack, write in ip address, data centres to be there in cluster. We can specify octet based (RackInferring), explicit property file mentioning node-DC mapping (PropertySnitch), or even EC2 region based(EC2Snitch)
  • Replication Factor: the copies of the data we want in our cluster, less or equals to number of nodes. 1 means no copies (just original data).
  • Partitioner: controls how data is distributed across nodes. If you want to facilitate range slice queries or want to provide indexing using Lucene/Solr (over Cassandra) then you will require OrderPreserving. There is a probability of Hot Spot in your cluster when using OrderPreserving. RandomPartitioning takes care of this hot spot problem as it performs a MD5 hash over "data key" get a "location key" which spans over a range of 0..2^127.
  • Seeds: You would like to feed in ip address of other nodes in the cluster.
  • ReplicaPlacement: You may want to control where data replicas are placed (something to add up to HA), OldNetworkTopology/RackAware places first one in other DC and others (if required) in other racks of the same DC. NetworkTopology places replicas (if required by ReplicationFactor) on locations each DC and Rack inside it.
Q: Do you want some new nodes to be added dynamically to the existing cluster?
A: Yes, then you will require to set Autobootstrap to true. For consistent hashing, Initial token must evenly divide the keyspace. Specifying Initial_Token you can dictate the token responsibility of the new node in the cluster. You can calculate the initial token for kth node in a node cluster by,
(k-1)*((2^127)/n)

Q: How to ensure consistency across distributed cassandra nodes?
A: Defined at data access layer (hector/thrift/avro), Consistency Level plays an important role in determining whether the overall cassandra cluster may serve old data or not (if replication has not happened yet). Consistency Level ANY, ONE for write and ONE for read gives better performance but at the cost of eventually consistent data. A higher data consistency can be achieved by QUORUM((N/2)+1), LOCAL_QUORUM (local DC's (RF/2)+1), EACH_QUORUM (each DC's (RF/2)+1), and finally ALL (which blocks till read/writer is served by all).
Just to sum up, if you want strongly consistent data (consistency level for write+consistency level for read >Replication factor. QUORUM provides consistent data and availability of data in case of some node failure. If performance/latency is more important than you can lower the values of consistency level for either READ or WRITE or both.

Q: How to achieve performance SLAs?
A: Read performances can be enhanced by tuning couple of parameters:

  • ColumFamily enables you to have Key and/or Row caching on them. You can specify the number of key/rows, percentage or fraction over the whole data you want to store in cache. Key cache is lighter as they data stored in them is just keys. By default, Cassandra caches 200, 000 keys per CF. Key cache decreases a I/O to index file to figure out the row location of the data corresponding to that key. It's very productive to have key cache. Row Cache holds the entire content of a row in cache, by default it is off. The overhead of enabling row cache or over increasing it is that you may require more JVM heap of Cassandra, may adverse the performance. Also, if data's column size is small, it's good so that we can increase the size of row cache without worrying too much about memory consumption, but if columns are larger than increasing row cache size is tricky. The best way to fine tune it is to watch out "RowCacheHitRatio" exposed per CF on JMX over a sample data run and according to the utilization configure the optimal size. If "RowCacheHitRatio" is too low <20-30 (which will be in case writes are much higher than read or extremely random reads, anyhow will trigger high GC activities), then I will suggest it doesn't make sense to enable row caching. You can use nodetool cfstats to see the performance/hit raio of key/row cache. It takes time to get a good idea about the hit rate because hit rate gradually matures.
  • Read performances can also be enhanced by fine tuning the Concurrent Reads, usually the rule is 4 threads per CPU core in the cluster. The higher the values the number of threads spanned for read, if the machines have faster I/O than the usual commodity versions then we can even increase this number a bit. Increasing it too much will in turn cause high context switches and will not yield higher performance gains.
  • Try to use faster RPC client instead of wrapper over wrapped RPC layer.
  • Read performance also get seriously degraded by contention during SSTable compaction, "NoofLiveSSTables" exposed on JMX (I guess so) gives you a typical indication about how many SSTables are getting used for a CF, or else we can use cfstats sstable count to get that. If this number is quite high or increasing, than your read operation may have contention with sstable compaction. Also, the more SSTables, the more fragmented (internal and external both) your data is. Increasing the configurations likememtable_flush_after_mins (or memtable_operations_in_millons or memtable_throughtput_in_mb {total ram/(1048576*16)}) will decrease memtable flushes which decrease number of SSTables and eventually number of them be compacted. Hence, less conflicts with read operations, which will improve read performance. The larger the value of these variables, the more heap memory you require to bubble up this in memory data structure. Even, the thread priority of the compaction thread inside can be controlled which eventually decreases number of compactions, this is a JVM argument to cassandra.

Write performance can be also be tuned, but do we really require to increase the lightning fast write behaviour cassandra provides? Anyway, following are the some of the ways to achieve it,

  • We can use bulkloading API's (StorageProxy, mutation or Binary verb) to write to Cassandra instead of using one of the wrappers.
  • Decreasing the Write Consistency level will also help as less number of nodes required to be blocked for the write operation.
  • It's preferable to use separate disk drive to write commit log. I guess this avoids I/O contention of Commit Log writes with SSTable reads.
  • You can tune Concurrent_Write to increase it from defaulted 32. Concurrent Writersshould be set a bit higher than Concurrent Reads, ranging between 1x to 1.5x (x=number of Concurrent Reads). You can set it around 12x of cores.
Swap memory may throw important challenges to achieve effective read/write performances in a java application. Operating System swaps pages to and from the disk even when sufficient memory available. During swap space read/write you may observe some difference in performance. You can disable the swap with swapoff command. On linux based OS, you can append vm.swappiness=0 or 5 to file /etc/sysctl.conf to reduce likelihood of OS to use swap space. Cassandra provides a memory_locking_policy parameter in yaml, you require to enable it.  JNA libraries helps you to lock JVM memory making it unevicatable to be swapped.
Performance of Memory Mapped I/O is better than regular I/O, on a 64 bit architecture you can use this performance efficient I/O by setting disk_access_mode  in yaml to mmap.

Q: Do I require to tune JVM for Cassandra cluster setup?
A: Yes, you may want to customize it. Make MAX and MIN to be same (just to avoid full GC during heap growth/heap shrink) It's expensive to grow the heap, just MIN to what you assume should be the maximum cassandra will consume, make MAX a bit higher than that for "just in case" cassandra is hit with more load. As in most cases, its better to do a full GC and grow the heap rather than get an OutOfMemory and crash. Heap size to be allocated to Cassandra, can be calculated to be approximately by, Memtable_throughtput_in_mb*3*(number of hot column families) + I GB + key cache + row cache.Cassandra default to many GC configurations which are perfectly configured for the big data entities. Just to for example, -XX:SurvivorRatio (defaulted to 8), try to retain as many objects in survivor spaces so that they can be reclaimed by GC, if we increase the size of survivor space then copying these long lived objects between survivors will trigger minor GC. Interesting balance, but better to copy more between survivors than to promote more to Tenured. Tenuring distribution can be seen using -XX:+PrintTenuringDistribution.

We have -XX:+UseParNewGC, parallel GC has ergonomics, like tuning young generation, it avoids/decrease frequency of major GCs, it avoids full GC by avoiding/decreasing promotions, maximizes the heap size. Also, we use CMS (ConcurrentMarkSweepGC), this is for tuning old generation, CMS is designed to avoid stop-the-world pauses. Number of concurrent threads in CMS can be controlled by -XX:+ParallelCMSThreads. Pauses dones during remark phases can be reduced by enabling -XX:+CMSParallelRemarkEnabled. Just to put a point if you want to use, by default classes will not be unloaded from the permanent generation when using CMS, this can be enabled by using -XX:+CMSClassUnloadingEnabled. 

On 64 bit machines, on Java SE 1.6 update 14, you can utilize the compressed pointers which results in smaller heap sizes, you have to append -XX:+UseCompressedOops to enable this.

Q: Any hardware changes to boost the performance?
A: Yes sure.
  • You can utilize 10 Gigabit ethernet to increase network throughput.
  • You can increase RAM, RAM can be allocated to Cassandra JVM heap, eventually passes advantage to lot many factors.
  • Multi Channel Memory Architecture (MCMA) technology increases the transfer rate between RAM and Memory Controllers by adding channels, theoretically transfer rate gets multiplied by the number of channels. Modern high end chip sets like, i7-9x series or latest of Xeon chip sets supports triple channel memory architecture. Even quad or eight channel memory chip sets are also present. You can optimise the RAM according to the multi memory channel, you can pair DIMMs as the number of memory channels, for example, if using triple channel memory chip sets, let each server to have triplets of DIMMs. You have to cross check the MCMA support of your server's Processor as well as Motherboard, once you know this, you can pair up your DIMMs according to that supported number to extract the best performance from the RAM.
  • On master or server you typically add more disk as RAID, whereas just to avoid administrative overhead, pipelined writes, you must add more disk to slaves as JBOD (typically 0.8*core number of disk to get optimal performance, but must not be greater than 1*core number of disk).
  • You can use fast seeking disks, Solid State Drives (SSD), SCSI systems are also better in performance. Typically, SATA drivers are recommended over SAS drives.
  • If you are setting filesystem as ext3 (you can disable the noatime, so that administrative overhead can be reduced) Changes require to done in /etc/fstab and do a mount -o remount /. If you have option to use other file system, you can use faster filesystems like ext4 and XFS, which also supports bigger file sizes and bigger volumes.
  • You can club multiple network cards to achieve high performance.

Thursday 10 July 2014

Building The Perfect Cassandra Test Environment

Building The Perfect Cassandra Test Environment
John Berryman — August 31, 2013 | 2 Comments | Filed in: solr Tagged: PlanetCassandra |
A month back, one of our clients asked us to set up 15 individual single-node Cassandra instances, each of which would live in 64MB of RAM and each of which would reside on the same machine. My first response was “Why!?”
Qualities Of An Ideal Cassandra Test Framework
So what are the qualities of an ideal Cassandra test framework?
Light-weight and available — A good test framework will take up as little resources as possible and be accessible right when you want it.
Parity with Production — The test environment should perfectly simulate the production environment. This is a no-brainer. After all what good does it do you to pass a test only to wonder whether or not an error lurks in the differences between the test and production environments?
Stateless — Between running tests, there’s no reason to keep any information around. So why not just throw it all away?
Isolated — Most often there will be several developers on a team, and there’s a good chance they’ll be testing things at the same time. It’s important to keep each developer quarantined from the others.
Fault Resistant — Remember, we’re a little concerned here that Cassandra is going to be a resource hog or otherwise just not work. Being “fault resistant” means striking the right balance so that Cassandra takes up as little resources as possible without actually failing.
Implementing The Ideal Cassandra Test Framework
The first thing to do is to set up the test environment on a per-developer basis. This means changing a few paths. From cassandra.yaml change
data_file_directories:
- /home/jberryman/cassandra/data
commitlog_directory: /home/jberryman/cassandra/commitlog
saved_caches_directory: /home/jberryman/saved_caches
And then in log4j-server.properties change
log4j.appender.R.File=/home/jberryman/cassandra/system.log
Next, it’s a good idea to create a wrapper around whatever client you’re using. This has several benefits. For one thing, creating a wrapper provides a guard against the client changing from under you. This is especially important right now since so many clients are scrambling to be CQL3 compliant. This wrapper is also a great place to stick any safeguards against horking up your production data when you thinkyou’re running a test. Perhaps the easiest way to safeguard against this is to issue the CQL DESCRIBE CLUSTER statement and make sure that the cluster name is “TestCluster”. (If your CQL client doesn’t honor this statement, you can just create a keyspace called “Yes_ThisIsIndeedATestCluster” and test for its existence.) Once the wrapper is complete, it can be used with functional parity on both the test and production cluster.
The simplest way to make Cassandra light weight is to simply declare it so! In cassandra-env.sh, simply change
MAX_HEAP_SIZE="64M"
HEAP_NEWSIZE="12M"
However, just because you have now declared Cassandra to be light weight doesn’t mean that it will JustWork™. Given this little heap space to move in, Cassandra will happily toss you an OutOfMemory error on it’s first SSTable flush or compaction or garbage collection. To guard against this we have a bit of work to do!
The first thing to do is to reduce the number of threads, especially for reading and writing. In cassandra.yaml there are several changes to make:
rpc_server_type: hsha
Here, hsha stands for “half synchronous, half asynchronous.” This makes sure that all thrift clients are handled asynchronously using a small number of threads that do not vary with the amount of thrift clients.
concurrent_reads: 2;
concurrent_writes: 2
rpc_min_threads: 1;
rpc_max_threads: 1
As stated, the first two lines limit the number of reads and writes that can happen at the same time. 2 is the minimum number allowed here. The second two lines limit how many threads are available used for serving requests. Everything to this point serves to make sure that writes and reads can not overpower Cassandra during flushes and compactions. Next up:
concurrent_compactors: 1
If you are using SSD’s then this will limit the number of compactors to 1. If you’re using spinning magnets, then you’re already limited to a single concurrent compactor.
Next we need to make sure that we do everything we can so that compaction is not hindered. One setting here:
compaction_throughput_mb_per_sec: 0
This disables compaction throttling completely so that compaction has full reign over other competing priorities.
Next we turn all the knobs on memory usage as low as possible:
in_memory_compaction_limit_in_mb: 1
This is the minimal limit for allowing compaction to take place in memory. With such a low setting, much of compaction will take place in a 2-pass method that is I/O intensive — but I/O is not the thing we’re worried about!
key_cache_size_in_mb: 0
At the expense of read times, we can do away with key caches. But this may not even be necessary because we can do even better:
reduce_cache_sizes_at: 0
reduce_cache_capacity_to: 0
The first line say “As soon as you’ve used up this much memory, then reduce cache capacity.” And since this is set to 0, cache capacity is reduced just about as soon as Cassandra starts being used. The second line then dictates that the caches should effectively not be used at all.
Finally, on a test cluster, we’re not worried about data durability, so there are plenty of safeguards that we can simply do away with. For one, before starting the test cluster, go ahead and remove everything in the data dir and commitlog directories. Next, in cassandra.yaml set hinted_handoff_enabled: false. When creating a test keyspace, go ahead and set durable_writes = false so that the commit log is never even populated. Finally, when creating test tables, consider settingread_repair_chance = 0 and bloom_filter_fp_chance = 1. Though perhaps these modifications on keyspaces and tables are unnecessary because I was able to get pretty good performance without them.
Testing The Test Framework
Now since all of our changes are in place, let’s fire up Cassandra and see how she performs!
$ rm -fr /home/jberryman/cassandra && bin/cassandra -f
So far good. “Starting listening for CQL clients on localhost/127.0.0.1:9042″ means that we’re alive and ready to service requests. Now it’s time to slam Cassandra:
$ bin/cassandra-stress
total,interval_op_rate,interval_key_rate,latency/95th/99th,elapsed_time
33287,3328,3328,8.0,54.6,277.0,10
85059,5177,5177,7.5,33.3,276.7,20
133153,4809,4809,7.4,34.0,274.8,30
183111,4995,4995,6.9,31.6,165.1,40
233177,5006,5006,6.8,32.0,123.5,51
288998,5582,5582,6.7,26.7,123.5,61
341481,5248,5248,6.7,26.3,129.7,71
391594,5011,5011,6.7,26.7,129.7,81
441645,5005,5005,6.5,29.0,122.5,92
494198,5255,5255,6.3,28.3,122.9,102
539406,4520,4520,6.4,24.4,122.9,112
591272,5186,5186,6.4,26.8,122.9,122
641202,4993,4993,6.6,27.9,122.9,132
696041,5483,5483,6.6,28.2,122.9,143
747078,5103,5103,6.5,26.1,274.4,153
797125,5004,5004,6.4,25.3,274.4,163
839887,4276,4276,6.1,23.9,273.6,173
880678,4079,4079,6.0,22.9,273.6,184
928384,4770,4770,5.8,21.7,273.6,194
979878,5149,5149,5.7,20.2,273.6,204
1000000,2012,2012,5.5,19.4,273.6,208
END
Wow… so not only does it not die, it’s actually pretty darn performant! Looking back at the logs I see a couple warnings:
WARN 17:15:57,030 Heap is 0.5260566963447822 full.  You may need to reduce
memtable and/or cache sizes.  Cassandra is now reducing cache sizes to free up
memory.  Adjust reduce_cache_sizes_at threshold in cassandra.yaml if you don't
want Cassandra to do this automatically
Ah… this has to do with the reduce_cache_sizes_at, reduce_cache_capacity_to bit from earlier. After this warning we hits, we know that caches have been tossed out. Without caches, I wonder how that will affect the read performance. Let’s see!
$ bin/cassandra-stress --operation READ
total,interval_op_rate,interval_key_rate,latency/95th/99th,elapsed_time
34948,3494,3494,8.4,39.9,147.0,10
95108,6016,6016,7.9,19.3,145.2,20
155830,6072,6072,7.8,15.4,144.7,30
213037,5720,5720,7.8,14.6,72.5,40
274021,6098,6098,7.8,13.7,56.8,51
335575,6155,6155,7.7,12.6,56.6,61
396074,6049,6049,7.7,12.6,56.6,71
455660,5958,5958,7.7,12.7,45.8,81
516840,6118,6118,7.7,12.3,45.8,91
576045,5920,5920,7.7,12.3,45.6,102
635237,5919,5919,7.7,12.7,45.6,112
688830,5359,5359,7.7,13.5,45.6,122
740047,5121,5121,7.7,15.1,45.8,132
796249,5620,5620,7.8,14.8,42.4,143
853788,5753,5753,7.9,14.1,37.1,153
906821,5303,5303,7.9,15.1,37.1,163
963981,5716,5716,7.9,14.1,37.1,173
1000000,3601,3601,7.9,13.3,37.1,180
END
Hooray, it works! And it’s still quite performant! I was concerned about the lack of caches killing Cassandra read performance, but it seems to be just fine. Looking back at the log file again, there are several more warnings each look about like this:
WARN 17:16:25,082 Heap is 0.7914885099943694 full.  You may need to reduce
memtable and/or cache sizes.  Cassandra will now flush up to the two largest
memtables to free up memory.  Adjust flush_largest_memtables_at threshold in
cassandra.yaml if you don't want Cassandra to do this automatically
WARN 17:16:25,083 Flushing CFS(Keyspace='Keyspace1', ColumnFamily='Standard1')
to relieve memory pressure
Despite the fact that we’re regularly having these emergency memtable flushes, Cassandra never died!
Popping open jconsole, we can make a couple more observations. The first is that while the unaltered Cassandra process takes up roughly 8GB of memory, this test Cassandra never goes over 64MB. Second, we also see that that the number of threads on the unaltered Cassandra hovers around 120-130 while the test Cassandra remains somewhere between 40 and 50.
Conclusion

So you see, my client’s request was actually quite reasonable and quite a good idea! Now they have a test framework that is able to support 15 developers on a single machine so that each developer has their own isolated test environment. This is a good example of how consultants sometimes learn from the companies they’re consulting.

cassandra Monitoring.


Guide to Cassandra Thread Pools


This guide provides a description of the different thread pools and how to monitor them. Includes what to alert on, common issues and solutions.

Concepts

Cassandra is based off of a Staged Event Driven Architecture (SEDA).  This separates different tasks in stages that are connected by a messaging service.  Each like task is grouped into a stage having a queue and thread pool (ScheduledThreadPoolExecutor more specifically for the Java folks).  Some stages skip the messaging service and queue tasks immediately on a different stage if it exists on the same node.  Each of these queues can be backed up if execution at a stage is being over run.  This is a common indication of an issue or performance bottleneck.To demonstrate take for example a read request:
blog

Note: For the sake of this example the data exists on a different node then the initial request, but in some scenarios this could all be on a single node.  There are also multiple paths a read may take.  This is just for demonstration purposes.
  1. NODE1> The request is queued from node1 on the ReadStage of node2 with the task holding a reference to a callback
  2. NODE2> The ReadStage will pull the task off the queue once one of its 32 threads become available
  3. NODE2> The task will execute to and compute the resulting Row, which will be sent off to the RequestResponse stage of node1
  4. NODE1> One of the four threads in the RequestResponseStage will process the task, and pass the resulting Row to the callback referenced from step 1 which will return to the client.
  5. NODE1> It will also possibly kick off an async read repair task on node1 to be processed in the ReadRepairStage once one of its four threads become available
The ReadRepairStage and how long it takes to process its work is not in the feedback loop from the client.  This means if the rate of reads occurs at a rate higher than the rate of read repairs the queue will grow until it is full without the client application ever noticing.  At the point of the queue being full (varies between stages, more below) the act of enqueueing a task will block.
In some stages the processing of these tasks are not critical.  These tasks are marked as “DROPPABLE” meaning the first thing the stage will do when executing it is to check if its exceeded past a timeout from when it was created.  If the timeout has passed it will throw it away instead of processing.

Accessing metrics

TPStats

For manual debugging this is the output given by
nodetool tpstats

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
ReadStage                         0         0         113702         0                 0
RequestResponseStage              0         0              0         0                 0
MutationStage                     0         0         164503         0                 0
ReadRepairStage                   0         0              0         0                 0
ReplicateOnWriteStage             0         0              0         0                 0
GossipStage                       0         0              0         0                 0
AntiEntropyStage                  0         0              0         0                 0
MigrationStage                    0         0              0         0                 0
MemoryMeter                       0         0             35         0                 0
MemtablePostFlusher               0         0           1427         0                 0
FlushWriter                       0         0             44         0                 0
MiscStage                         0         0              0         0                 0
PendingRangeCalculator            0         0              1         0                 0
commitlog_archiver                0         0              0         0                 0
InternalResponseStage             0         0              0         0                 0
HintedHandoff                     0         0              0         0                 0

Message type           Dropped
RANGE_SLICE                  0
READ_REPAIR                  0
PAGED_RANGE                  0
BINARY                       0
READ                         0
MUTATION                     0
_TRACE                       0
REQUEST_RESPONSE             0
COUNTER_MUTATION             0
The description of the pool values (Active, Pending, etc) is defined in the table below. The second table lists dropped tasks by message type, for more information on that read below.

JMX

These are all available via JMX as well in:
org.apache.cassandra.request:type=*
and
org.apache.cassandra.internal:type=*
with the attributes (relative to tpstats) being:
MBean Attributetpstats nameDescription
ActiveCountActiveNumber of tasks pulled off the queue with a Thread currently processing.
PendingTasksPendingNumber of tasks in queue waiting for a thread
CompletedTasksCompletedNumber of tasks completed
CurrentlyBlockedTasksBlockedWhen a pool reaches its max thread count (configurable or set per stage, more below) it will begin queuing until the max size is reached.  When this is reached it will block until there is room in the queue.
TotalBlockedTasksAll time blockedTotal number of tasks that have been blocked

 Metrics Reporters

Cassandra has pluggable metrics reporting capabilities you can read more about here

Droppable Messages

The second table in nodetool tpstats displays a list of messages that were DROPPABLE. These ran after a given timeout set per message type so was thrown away. In JMX these are accessible via org.apache.cassandra.net:MessagingService or org.apache.cassandra.metrics:DroppedMessage. The following table will provide a little information on each type of message.
Message TypeStageNotes
BINARYn/aThis is deprecated and no longer has any use
_TRACEn/a (special)Used for recording traces (nodetool settraceprobability) Has a special executor (1 thread, 1000 queue depth) that throws away messages on insertion instead of within the execute
MUTATIONMutationStageIf a write message is processed after its timeout (write_request_timeout_in_ms) it either sent a failure to the client or it met its requested consistency level and will relay on hinted handoff and read repairs to do the mutation if it succeeded.
COUNTER_MUTATIONMutationStageIf a write message is processed after its timeout (write_request_timeout_in_ms) it either sent a failure to the client or it met its requested consistency level and will relay on hinted handoff and read repairs to do the mutation if it succeeded.
READ_REPAIRMutationStageTimes out after write_request_timeout_in_ms
READReadStageTimes out after read_request_timeout_in_ms. No point in servicing reads after that point since it would of returned error to client
RANGE_SLICEReadStageTimes out after range_request_timeout_in_ms.
PAGED_RANGEReadStageTimes out after request_timeout_in_ms.
REQUEST_RESPONSERequestResponseStageTimes out after request_timeout_in_ms. Response was completed and sent back but not before the timeout

Stage Details

Below lists a majority of exposed stages as of 2.0.7, but they tend to be a little volatile so if they do not exist don’t be surprised.  The heading of each section is correlates with the name of the stage from nodetool tpstats. The notes on the stages mostly refer to the 2.x data but some references to things in 1.2. 1.1 and previous versions are not considered at all in making of this document.
Alerts; given as a boolean expression, are intended as a starting point and should be tailored to specific use cases and environments in the case of many false positives.
When listed as “number of processors” its value is retrieved first by the system property “cassandra.available_processors” which can be overridden by adding a
-Dcassandra.available_processors
to JVM_OPTS. Falls back to default of Runtime.availableProcessors

Index

ReadStage

Performing a local read. Also includes deserializing data from row cache.  If there are pending values this can cause increased read latency.  This can spike due to disk problems, poor tuning, or over loading your cluster.  In many cases (not disk failure) this is resolved by adding nodes or tuning the system.
JMX beans:org.apache.cassandra.request.ReadStage
org.apache.cassandra.metrics.ThreadPools.request.ReadStage
Number of threads:concurrent_reads (default: 32)
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

RequestResponseStage

When a response to a request is received this is the stage used to execute any callbacks that were created with the original request
JMX beans:org.apache.cassandra.request.RequestResponseStage
org.apache.cassandra.metrics.ThreadPools.request.RequestResponseStage
Number of threads:number of processors
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

MutationStage

Performing a local including:
  • insert/updates
  • Schema merges
  • commit log replays
  • hints in progress
Similar to ReadStage, an increase in pending tasks here can be caused by disk issues, over loading a system, or poor tuning. If messages are backed up in this stage, you can add nodes, tune hardware and configuration, or update the data model and use case.
JMX beans:org.apache.cassandra.request.MutationStage
org.apache.cassandra.metrics.ThreadPools.request.MutationStage
Number of threads:concurrent_writers (default: 32)
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

ReadRepairStage

Performing read repairs. Chance of them occurring is configurable per column family with read_repair_chance. More likely to back up if using CL.ONE (and to lesser possibly other non-CL.ALL queries) for reads and using multiple data centers. It will then be kicked off asynchronously outside of the queries feedback loop, demonstrated in the diagram above. Note that this is not very likely to be a problem since does not happen on all queries and is fast providing good connectivity between replicas. The repair being droppable also means that after write_request_timeout_in_ms it will be thrown away which further mitigates this. If pending grows attempt to lower the rate for high read CFs:
ALTER TABLE column_family WITH read_repair_chance = 0.01;
JMX beans:org.apache.cassandra.request.ReadRepairStage
org.apache.cassandra.metrics.ThreadPools.request.ReadRepairStage
Number of threads:number of processors
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

ReplicateOnWriteStage

CounterMutation in 2.1, also counters changing dramatically so post 2.0 should consider this obsolete. Performs counter writes on non-coordinator nodes and replicates after a local write. This includes a read so can be pretty expensive. Will back up if the rate of writes exceed the rate that the mutations can occur. Particularly possible with CL.ONE and high counter increment workloads.
JMX beans:org.apache.cassandra.request.ReplicateOnWriteStage
org.apache.cassandra.metrics.ThreadPools.request.ReplicateOnWriteStage
Number of threads:concurrent_replicates (default: 32)
Max pending tasks:1024 x number of processors
Alerts:pending > 15 || blocked > 0

GossipStage

Post 2.0.3 there should no longer be issue with pending tasks. Instead monitor logs for a message:
Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)
Before that change, in particular older versions of 1.2, with a lot of nodes (100+) while using vnodes can cause a lot of cpu intensive work that caused the stage to get behind.
Been known to of been caused with out of sync schemas. Check NTP working correctly and attempt nodetool resetlocalschema or the more drastic deleting of system column family folder.
JMX beans:org.apache.cassandra.internal.GossipStage
org.apache.cassandra.metrics.ThreadPools.internal.GossipStage
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

AntiEntropyStage

Repairing consistency. Handle repair messages like merkle tree transfer (from Validation compaction) and streaming.
JMX beans:org.apache.cassandra.internal.AntiEntropyStage
org.apache.cassandra.metrics.ThreadPools.internal.AntiEntropyStage
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

MigrationStage

Making schema changes
JMX beans:org.apache.cassandra.internal.MigrationStage
org.apache.cassandra.metrics.ThreadPools.internal.MigrationStage
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

MemtablePostFlusher

Operations after flushing the memtable. Discard commit log files that have had all data in them in sstables. Flushing non-cf backed secondary indexes.
JMX beans:org.apache.cassandra.internal.MemtablePostFlusher
org.apache.cassandra.metrics.ThreadPools.internal.MemtablePostFlusher
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

FlushWriter

Sort and write memtables to disk. A vast majority of time this backing up is from over running disk capability. The sorting can cause issues as well however. In the case of sorting being a problem, it is usually accompanied with high load but a small amount of actual flushes (seen in cfstats). Can be from huge rows with large column names. i.e. something inserting many large values into a cql collection. If overrunning disk capabilities, it is recommended to add nodes or tune the configuration.
JMX beans:org.apache.cassandra.internal.FlushWriter
org.apache.cassandra.metrics.ThreadPools.internal.FlushWriter
Number of threads:memtable_flush_writers (1 per data directory)
Max pending tasks:memtable_flush_queue_size (default: 4)
Alerts:pending > 15 || blocked > 0

MiscStage

Snapshotting, replicating data after node remove completed.
JMX beans:org.apache.cassandra.internal.MiscStage
org.apache.cassandra.metrics.ThreadPools.internal.MiscStage
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

InternalResponseStage

Responding to non-client initiated messages, including bootstrapping and schema checking
JMX beans:org.apache.cassandra.internal.InternalResponseStage
org.apache.cassandra.metrics.ThreadPools.internal.InternalResponseStage
Number of threads:number of processors
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

HintedHandoff

Sending missed mutations to other nodes. Usually a symptom of a problem elsewhere so dont treat as root issue. Can use nodetool disablehandoff to prevent further saving. Can also use JMX (nodetool truncatehints) to clear the handoffs for specific endpoints or all of them with org.apache.cassandra.db:HintedHandoffManager operations. This must be followed up with repairs!
JMX beans:org.apache.cassandra.internal.HintedHandoff
org.apache.cassandra.metrics.ThreadPools.internal.HintedHandoff
Number of threads:max_hints_delivery_threads (default: 1)
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

MemoryMeter

Measures memory usage and live ratio of a memtable. The pending should not grow beyond size of number of memtables (front ended by set to prevent duplicates). This can take minutes for large memtables.
JMX beans:org.apache.cassandra.internal.MemoryMeter
org.apache.cassandra.metrics.ThreadPools.internal.MemoryMeter
Number of threads:1
Max pending tasks:231-1
Alerts:pending > column_family_count || blocked > 0

PendingRangeCalculator

Calculates the token ranges based on bootstrapping and leaving nodes. Instead of blocking this stage discards tasks when one already in progress so no value in monitoring
JMX beans:org.apache.cassandra.internal.PendingRangeCalculator
org.apache.cassandra.metrics.ThreadPools.internal.PendingRangeCalculator
Number of threads:1
Max pending tasks:1

commitlog_archiver

(Renamed CommitLogArchiver in 2.1) Executes a command to copy (or user defined command) commit log files for recovery. Read more at DataStax Dev Blog
JMX beans:org.apache.cassandra.internal.commitlog_archiver
org.apache.cassandra.metrics.ThreadPools.internal.commitlog_archiver
Number of threads:1
Max pending tasks:231-1
Alerts:pending > 15 || blocked > 0

AntiEntropySessions

Number of active repairs that are in progress. Will not show up until after a repair has been run so any monitoring utilities should be able to handle it not existing.
JMX beans:org.apache.cassandra.internal.AntiEntropySessions
org.apache.cassandra.metrics.ThreadPools.internal.AntiEntropySessions
Number of threads:4
Max pending tasks:231-1
Alerts:pending > 1 || blocked > 0

Angular Tutorial (Update to Angular 7)

As Angular 7 has just been released a few days ago. This tutorial is updated to show you how to create an Angular 7 project and the new fe...