NoSql Database Cluster Configuration for high Performance

posted in: Projects | 0

This post focuses on current commodity hardware. Its optimal setting for various types of use cases and how your choice of NoSql database can be configured to use it to achieve high performance.

Hardware Performance

Disk Drive Performance

For disk drives, most time consuming operation is seek times. From wikipedia, seek time of 7200 RPM drives is about 4 ms. So in worst case, disk drives can read about 2500 blocks per second. It needs at least 4-5 disk seeks (for database table with over 100K records, it will need at least 4-5 levels of btree) for Primary key Btree index lookup. So realistically, its performance is about 500 records per second with 5 seeks.

For sequential reads/writes (for append only databases or kafka like use case), 7200 RPM disk drive can do about 100 Mbytes per second. Or for 1KB record size it can read/write about 100,000 records per second.

SSD Performance

High performance SSDs today can do about 100,000 IOPs per second of about 4K size. Performance of SSD is almost same as sequential disk accesses.

It may be a good idea to use disk drives for append only files like redo logs or append type workloads like in kafka.

Object Serialization/deserialization Performance

Based on performance analysis done on this blog, it is about 250,000 serialization/de-serialization of 1,000 byte payload using ProtoBufs on Intel Intel Core i3-3240 (Ivy Bridge) 3.40GHz processor. It should linearly scale with more CPUs and cores as this operation is mostly CPU bound.

Typical hardware configuration

For cost and performance, we will use typical hadoop recommended configuration as our baseline.


Based on the price, power consumption, use case and rack size, try to get as many CPUs as possible per machines.

Its always better to scale up to certain limit before thinking about scaling out. Typical hardware with 8, 16 or even 32 CPUs is possible in todays commodity hardware configurations.

Just to put some perspective, typical memory hash map implementations like redis can do ~100,000 get/set per second per CPU. So most of NoSql workloads are going to be memory and disk bound (and not CPU bound) as throughput of disk based NoSqls is never going to be 100,000 queries per second per CPU.

Physical Memory

Try to get as much physical memory as possible within constraints of price, size and power consumption. It is possible to have memories ranging from 72 GB, 128GB, 256GB and 512GB. More the better.

Disk Drives and SSDs

SSDs beat disk drives in all respect except the size. Commodity hardware can have 14 disk drives of 2-4TB sizes, that amount to about 24 TB to 48 TB storage with disk.

Cost and physical slot size requirements stop SSDs to reach to that kind of storage per machine. Typical SSD storage could range from 1TB up to 10TBs.

Choosing hard drives or SSDs

Pure size perspective hard drives make sense. You will need to fill up 48 TBs before considering about sharding. But performance is a big bottleneck with hard drives. With 14 drives, you can have 14*500=7000 record QPS.

SSDs can do about 100,000 IOPS or 20,000 QPS for record (1/5th due to btree lookup). With 2-10 SSDs per hardware, one can expect about 20,000*10=200,000 QPS for records.

You can choose hybrid approach. Based on the use case some collections could be in SSDs and some in hard drives. For Example – If you are building E-commerce site then, use case usually behaves in a way that 80-90% of queries use hot records from memory. For example, there will be lot of queries on iPhone or standard items which could be in the memory. Or queries to get user preferences etc. In this case, it may be OK to use hard drives. For append only use cases like redo logs, kafka use disk drives.


For  random accesses use RAID 0 for performance. Most of NoSql databases provide redundancy, so redundant RAID configurations may not be required. But if you are using disk drives and data size requirements are going to be less than available size then you may consider using redundancy RAID configurations. At least single disk failures may not trigger node rebalancing.

For append only use cases, redo logs, kafka etc don’t use RAID. As these use cases rely heavily on the sequential access speed of disk.



OLPT use cases mostly tend to operate on single record. It is easier from scalability, extensibility and maintenance point of view to have schemas properly normalized. De-normalized schemas create maintenance issues as some future use cases may not be easy to implement if schema is not properly normalized. Classic example is, teacher changing her class timings. For normalized schema just one record in one table needs to change. But for de-normalized schemas, records of all students taking that class might have to be updated.

For OLTP workloads it is better to choose NoSql which supports relational type schemas and joins.

New wave of NewSql databases is picking up where they are focusing on “functionality of SQL/relational databases with scale of NoSql” mantra.

Data warehousing / Analytics

Use case in this case are mostly to insert records in fact tables and be able to join with dimension tables. Key here is dimension tables change very rarely. Sometimes these kinds of dimension tables are called slowly changing dimensions. For Example – Walmart adding support for new product or adding new store.

Due to their insert only nature, their size can go infinitely. Because of this proper sharding of data is a absolute must. Also, due to their append/insert only nature, it is ok to de-normalize schema up to certain extent. Schema de-normalization, doesn’t mean joins are not required. In these payloads as well, you still will need a join support for joining with dimension tables. For Example – To count number of products sold in Walmart store A from Cereal section in bottom rack. In this case there are 3 dimensions used; Store dimension, Section dimension and Rack dimension. Rack and Section dimensions may be hierarchical and may not be easily mapped to some metadata to avoid joins. In this case it may be best to use joins (and we will be better off using database supporting joins) rather than name-value or document databases which don’t support joins.

Bottom line is, data warehousing type of payloads need de-normalized data schema with shardable, horizontally scalable databases, but it may not be a bad idea to select latest type of NewSql databases which are highly scalable and support joins and other relational database constructs. Google released datastore which is highly scalable and also support some basic SQL like queries and joins.

Real time analytics

In this case, data from data warehousing is fed back to OLTP databases (or other databases which can be queried in real time) so that they can use this data to serve user better. For Example – Sites providing with recommendations based on user current activity.

This could be huge amounts of mostly read only data (data might change once a day, this change could be of insert, update or delete type) fed from data warehousing which will be queried by realtime use case. For Example – Say user is watching some movie; based on that user is provided with recommendations. Recommendations engine in this case uses both real time OLTP data (to get current user activities) and data warehousing data to use in recommendations algorithms to come up with best possible recommendations for that user.

NoSQL Databases

Due to scale and size of data, any selected NoSql database should support sharding and replication.

Most of current wave of top NoSql databases are mostly document or name/value and not completely designed to take full advantage of current commodity hardware specs.


use RAID 0 for collection and index files.

MongoDB architecture almost forces use of SSDs which limits maximum storage capacity per node. Which further forces going for sharding when other resources like CPU or memory haven’t reached their peak capacity.

Due to its memory mapped files and table locking for writes, it cannot use commodity hardware stack to its full capacity. Sharding further causes, hardware, operational, maintenance costs sky rocketing along with fragile structure of cluster since even sharding cannot completely solve underlying architectural issues.

On top, their loose (document) schema model comes in a way of future enhancements as due to need for atomicity, schema de-normalization is forced which in turn increases the document size and eventually suffers the performance and other issues crop up due de-normalized schema.

Bad queried can further degrade cluster performance since not frequently used records can clog up physical memory due to memory mapped files.


Cassandra read and writes could be sequential (their architecture document is not clear if reads are sequential). SSTables are written to disk sequentially. They are read periodically sequentially to merge in to  one SSTable.

Even though reads and writes are sequential, overtime due to increasing file count disk fragmentation can occur. They loose speed of sequential accesses which will show up in the performance and throughput.

Based on this information, we can assume that Cassandra node may not use RAID and should not need to use SSD.


Couchbase is document database like MongoDB so de-normalization issues discussed in MongoDB also apply to Couchbase.

Couchbase is classic example of append only database where every insert/update/delete is appended to datafile. And Compaction keeps compacting data files for every bucket.

If the data size is bigger than the physical memory then, random access reads may be required to fetch the record.

Since performance of disk drives and SSD is same for sequential accesses writes can use either SSD or disk drives. For reads SSD will outperform hard drives due to their random access nature.

So based on use case and data size you may have to choose between SSD and disk drive.

  • If physical memory is about same as data size per node then use disk drive. I would say data size bigger by up to 50%, use disk drive.
  • If your use case is such that 80-90% of queries use hot records from memory then use disk drive.
  • Any other case, use SSD, but note that you might have to start sharding earlier than you wanted.


Even though Kafka is not a NoSql database, it shows up in most of the NoSql cluster databases.

Its payload is mostly append only disk drive type with not much CPU needs. It usually needs as many disk drives as possible with no need for RAID.

Redo Log – Resource Journaling

posted in: Projects | 0


Redo Log (look ahead log or commit) journaling is required whenever system supports in-place updating of serialized resource which is backed up by datastore for durability . This is required to capture changes in atomic way so that they can be replayed properly in the event of system crashes or abrupt shutdowns.

Interesting fact is, this feature is not required for append only databases or applications since changes can be serialized to disk in atomic way in one write.

In the past this was mostly used in relational databases. But now, with new wave of applications with requirements of high level of redundancy, durability and horizontal scalability on the commodity hardware is picking up, this feature may be useful to lot of todays applications other than NoSql databases.

Todays hardware configurations are also very different from the past. Commodity hardware can have 10+ of CPUs, 100+ GB RAM and 10-40 TB of hard disk with ~1 GB SSD. If your application needs to store lot of changing data in to the backend, in-place editing of the resource may be more scalable than append only serialization. In that case redo log or resource journaling will be very handy for you.

High level requirements

Redo log journaling not only should manage redo logs but also apply the changes to backend data stores. Applying changes to the backend should guarantee the order and be multi threaded.

Redo log management should include log file rotations and also cleaning up of old log files whose changes are already applied.

High Level design

It has following 4 apis.



It returns transaction id which will be further used during logging, commit or rollback apis.

log (transaction id, byte[] serializedObject)

It keeps serializedObject in local ordered list against the transaction id so that it can be serialized to redo log file in atomic fashion.

commit (transaction id)

It writes list of all serialized objects for given transaction in to the redo log file. After writing, if log file size goes beyond certain size, it creates new one. And if more one than log folders are provided, it creates new log file in other folder.

rollback (transaction id)

It removes list of all serialized objects from the list for given transaction and never writes them to redo logs.

Apply types

This product not only maintains redo log files but also applies to backend store. Following are various apply types.


It applies changes to backend for every transaction. This may not be most suitable from performance point of view in the running system since it writes every transaction not only to redo log file but also to backend store.

Only advantage of this type is, it doesn’t have to flush pending transactions to the backend store during shutdown. So shutdown time is shortened.

Every so often (configurable number)

It is similar to Every, except it syncs transactions to backend after set number of transactions. This is little better from performance point of view since it batches up the transactions to write to backend.

On Log switch

This is most preferred way. It syncs up transactions after log file switch. If it is configured with multiple log files, then it can provide the best performance and throughput since it could be writing redo logs in one disk while syncing them from another disk.


It doesn’t apply changes to datastore. Application generating transactions will take this responsibility.

In WonderDB we use this setting since WOnderDB database syncs up caches back to the data store in regular intervals.

Should support multiple log files in multiple folders

Todays commodity hardware have multiple hard drives (mostly for hadoop like workloads). For high performance, we need to assume logs will be stored in multiple folders (and not just one). So that look ahead logs can be written to one folder in some disk and at the same time, those logs are being read and applied to backend from other disk or folder. This will reduce the contention on disk heads (and disks) since reads and writes will happen on two separate folders possibly from separate disks.

For example, redo logs are configured to write in two folders A and B. Say folder A resides on disk1 and folder B resides on disk 2. Also say we have set max file size for redo logs is 100K. Meaning, after log file reaches 100K or above, new log file will be created and new logs will be written into the new log file on disk2. Now, we can start writing back logs to backend from disk1 or log file1 so that reads and writes will be happening on different disk.

How it works

btree_image.002Redo Transaction Manager as shown in diagram is the implementation of redo log journaling. Applications can use it just by adding its dependency in their maven project and start using it.

As described above it provids 4 apis.

Here is the flow application will take to integrate with the redo logger. First application should have some resource which it needs to serialize to disk or some other data store. It performs following 4 steps to use redo logger to make sure the resource is serialized in atomic/transactional way in to the data store.

  1. Application updates resource
  2. It calls redo transaction logger to log it in to the redo log.
  3. Redo transaction logger logs/serializes it into the redo log. It also manages life cycle of redo log files.
  4. Eventually in some other thread redo transaction applier, which is also part of the redo logger picks up this transaction from redo log and applies it to the actual data files.

For more information on its uses please refer to Getting Started page.

Hash Index

posted in: Projects | 0

btree_image.003Hash index is based on hash map data structure. It has lot of properties like, rehasing, load factor etc. You can read more about it in wikipedia.

We will more focus on usage and design of this data structure in the context of databases, where its size could be bigger than the physical memory and will need to be serialized to disk. Following are various points to consider when using it in databases.

Rehash – Increasing/decreasing size of buckets

Hash maps need to lock the whole data structure during the rehash. This may not be very expensive operation for in memory implementations of hash map. But may be almost impossible in the context of databases since rehash will have to lock the whole table.

Most databases suggest to rebuild the index when performance starts degrading over time due size of items in the index vs number of buckets. Or load factor going below certain threshold value.

Generic implementations of hashCode() and equals() methods

It may not be easy to provide hashCode() and equals() implementations specific to your index class. Usually databases will use their generic implementations which may not be very efficient to your index class. In wonderdb, we are working on a feature to register data type.

Load factor – Calculating buck size

By default Java hash map implementation sets load factor to 0.75. Allowing size of hash map to grow up to certain size before it is rehashed. For example say initial capacity of hash map is 100, then it will be rehashed  when 75th item is stored in to the hash map.

We need different load factor considerations for hash index. Lets take a example on how to calculate load factor for hash indexes.

As shown in the figure above index entries are stored in disk block. It is called leaf node in wonderdb. Say size of disk block is 2048 bytes. Now lets say you want to store long in to the index (8 bytes). Also assume index stores pointer to actual table record, say pointer size is 12 bytes, then it needs 20 bytes to store index item in the disk. Say one disk block (2048 bytes) stores 100 index items.

In this case, for optimized use of disk space it will be better to assume optimal settings will be 100 items per bucket, instead of 1 item per bucket (as in Java hash map implementation). So load factor settings for hash index will be .0075. Or in another way to look at it; to store 750 items, java hash map will need 1000 buckets to achieve 0.75 load factor, whereas hash index will need only 7.5 (or 8) buckets to store 1000 items since we need to assume 100 items per bucket in case of hash index.

BTree+ Vs Hash Index

Advantage of hash map data structure vs tree data structure is its access time is constant time (O1). Or it just takes 1 comparison (one instruction) to get to the item assuming it is properly organized. Where as for tree the access time is O(log n). Or it takes 10 compare instructions to get to an item in a tree containing 1024 items.

So for 1M items it takes 20 compares and for 1B it takes 30 compares for the tree. Where as for hash map it will take 1 compare instruction to get to the item. But problem is, we probably wont be able to store billions to items in hash map due to physical memory size constraints.

So for storing billions of items in hash index we need to also take load factor in to account. Say load factor is 0.0075 in case of storing longs, or in other words, if bucket should contain 100 items for optimal disk usage then already it needs to do 100 long 100 = 7 compares within the buckets to get to the item. So already access time is no more O1 but O7.

So to see optimal performance, we need number of items for Btree+ to do at lease 3x of 7 or 21 compares which is ~ 1M items.

So point here is, unless you are expecting millions of items in the index, dont even consider hash index.


We are able to see hash index performance can go up to 2x for a case where BTree+ needed 3 level deep structure.

We see performance of 60K queries per second in case of 32000 items with key size of 1500. We selected key size of 1500 just to make force 3 level deep BTree+. Where as for BTree+ we see close to 35-40K queries per second.

Here was the setup for the test.


It needed 32000 leaf nodes to store 32000 items since it can store only 1 item per node with 1500 size with 2048 block size.

It needed 32000/150 = 234 branch blocks. Branch blocks store pointers to next branch or leaf blocks and can store 150 items per block.

Next level of branch blocks needed 234/150 = 2 blocks.

And root node pointed to 2 next level blocks.

So our tree structure had 4 levels with

– 2 items in root node

– 2 branch blocks pointing to 234 items in next level

– 234 branch to point to 32000 leaf blocks.

– and 32000 leaf blocks.

It needed 1 compare (on root level) + 8 compares (on next level containing 234 pointers) + 15 compares (to get to leaf node) + 1 compare to find item in leaf node = 25 compares.

Hash Index

We had 32000 buckets to make hash index fully optimized to get O1 compare to get to an item.

With this setup we saw 60K queries per second for hash index and 40K queries per second for BTree+. over 50% improvement.



posted in: Projects | 0

btree_image.001BTree data structure is generalization of self balanced tree where in, node can contain more than one child node. This generalization is useful in storing it on to the disk files. In this case node can be mapped to disk block and can easily be read and written to the disk.

Btree+ is further generalization of Btree where in indexes and records are stored in leaf nodes. Branch nodes only contain pointers to other leaf and branch nodes. This optimization is useful in further performance improvements in increasing the throughput since branch nodes don’t need to be locked except during rebalancing. Data is usually inserted or deleted in leaf nodes hence only leaf nodes need to be locked during CRUD operations.

Further in BTree+ leaf nodes can contain pointers to previous and next leaf node for index range traversals.

Node Types

Every node contains ordered list of items in ascending or descending order depending on how it is created.


Branch nodes contain next level ordered list of pointers to other nodes. Order list is based on max key of every node. For default block size of 2048 bytes, it stores pointers to 150 next level branch or leaf nodes. It also contains copy of maximum key value, (last key of the ultimate leaf node it is pointing to). This key is used to tree traversal to get to proper leaf node during query processing.

For example, say branch node B contains pointers to 2 leaf nodes X and Y. Max key of X node is say 100 and max key of Y is say 200 then node B will store these 2 nodes in order X and Y. Also, in this case max value of node B will be 200.


Root node is special case of branch or leaf node. This is the entry point into the data structure. If all keys can be stored in one node, then it will be of leaf node. Else, usually root is a branch node.


Leaf nodes contain ordered list of index or record key data. It also contains pointer to next and previous leaf nodes. Current implementation needs pointers to previous nodes but we are working on enhancement to remove that dependence.

Based on the size of the key, it contains as many keys that can fit in leaf block size. Default block size is 2048 but can be changed during the construction of the index by providing STORAGE keyword.

Max key value of leaf is usually last key in the node.

For Example, say leaf node can contain keys 100, 200 and 300 in the order. Also, its max key value in this case will be 300.



During insertion it starts at the root node. And it traverses up to the leaf node (position in the ordered list of keys) where the item can be inserted.

Since every item in all nodes (branch and leaf) are ordered, it uses binary search to locate the item in the node where new item can be inserted. It follows this step recursively from root up to the leaf and inserts that item into the leaf.

Insertion balancing

If newly added item increases the size of leaf node more than the block size (default 2048 bytes), then it splits the node in to 2 leaf nodes and traverses upward to insert pointer of newly added leaf into the parent branch node. If after inserting this new pointer, if branch node needs to be split because of its, size then it recursively continues upward until root node to rebalance. Since rebalancing is changing whole structure of the BTree+, whole BTree+ needs to be locked during this operation.


Similar to insertion, deletion also starts at the root node and traverses up to the leaf where item to be deleted is located. And it removes that item from the leaf node.

Delete rebalancing

If leaf node becomes empty after the deletion of the item then again, it needs to rebalancing to remove its pointer from the parent node and recursively up to the root node if required. During this time, whole BTree+ needs to be locked.


Fortunately update is not required. Update can turn out to be delete and insert operation which will be two separate operations on the BTree+

Tree locking scenarios


Following steps are performed

  1. Read lock on the tree.
  2. Traverse up to the leaf node
  3. Read lock leaf node.
  4. Unlock read lock on tree started on #1.
  5. Return iterator so that range (or PK lookup) query can walk through the range. Iterator properly unlocks and locks leaf nodes while traversing from node to node.
  6. At the end it is query processor (callers) responsibility to unlock the read lock on the leaf node.

Following steps are performed

  1. Read lock on the tree.
  2. Traverse up to the leaf node.
  3. Write lock leaf node.
  4. Try inserting an item. If it needs to split because of of size increase, then
    1. Release write lock on the leaf. (Step #3)
    2. unlock read lock (Step #1).
    3. acquire write lock on the tree, since rebalancing operation might be performed.
    4. Again Traverse up to the leaf node.
    5. Write lock leaf node. In theory this is not required since there is a lock on the tree itself. There are no other threads in this tree at this time.
    6. If rebalance is required. Rebalance the tree
    7. Unlock write lock on the tree
  5. Insert the item.
  6. Unlock write lock on leaf
  7. Unlock read lock on the tree.
  1. Read lock on the tree.
  2. Traverse to the leaf node.
  3. Write lock leaf node.
  4. Remove the item. If it needs to reblance
    1. Release write lock on the item
    2. Release read lock on the tree.
    3. Acquire write lock on the tree
    4. Traverse to the leaf
    5. Remove the item.
    6. Rebalance of required.
    7. Unlock tree
  5. Unlock leaf.
  6. Unlock tree.

Node levels in Wonderdb

Branch nodes store disk pointer to next node (about 10 bytes). So for default 2048 bytes, it stores about 200 items in branch node.

Leaf node on the other hand stores actual key value and the pointer to the actual record contents. So for key size of 100 bytes, it needs 110 bytes (100 for the key and 10 for the pointer to the record). So it stores about 100 keys in the leaf block.

Based on above assumptions,

2 level tree will store 100*200 = 20000 items, 200+1 = 201 blocks = 201*2048 ~ 400KB disk space

3 level tree will store 100*200*200 = 4000000 = 4M items, size of tree will be (200*200)*2048 ~ 80MB

4 level tree will store 100*200*200*200 = 1600000000 = 1.6B items, disk space = 200*200*200*2048 = 1.6 GB

From above calculations you can easily see why whole btree can be present in physical memory if we assume we have 50+ GB physical memory. Machines with 50+GB is considered commodity hardware nowadays.

This calculation is very important in choosing BTree+ vs Hash index if range query is not required. If configured properly, hash index can perform 2-3 times faster than BTree+ which will be huge improvement.