NoSql Database Cluster Configuration for high Performance

posted in: Projects | 0

This post focuses on current commodity hardware. Its optimal setting for various types of use cases and how your choice of NoSql database can be configured to use it to achieve high performance.

Hardware Performance

Disk Drive Performance

For disk drives, most time consuming operation is seek times. From wikipedia, seek time of 7200 RPM drives is about 4 ms. So in worst case, disk drives can read about 2500 blocks per second. It needs at least 4-5 disk seeks (for database table with over 100K records, it will need at least 4-5 levels of btree) for Primary key Btree index lookup. So realistically, its performance is about 500 records per second with 5 seeks.

For sequential reads/writes (for append only databases or kafka like use case), 7200 RPM disk drive can do about 100 Mbytes per second. Or for 1KB record size it can read/write about 100,000 records per second.

SSD Performance

High performance SSDs today can do about 100,000 IOPs per second of about 4K size. Performance of SSD is almost same as sequential disk accesses.

It may be a good idea to use disk drives for append only files like redo logs or append type workloads like in kafka.

Object Serialization/deserialization Performance

Based on performance analysis done on this blog, it is about 250,000 serialization/de-serialization of 1,000 byte payload using ProtoBufs on Intel Intel Core i3-3240 (Ivy Bridge) 3.40GHz processor. It should linearly scale with more CPUs and cores as this operation is mostly CPU bound.

Typical hardware configuration

For cost and performance, we will use typical hadoop recommended configuration as our baseline.


Based on the price, power consumption, use case and rack size, try to get as many CPUs as possible per machines.

Its always better to scale up to certain limit before thinking about scaling out. Typical hardware with 8, 16 or even 32 CPUs is possible in todays commodity hardware configurations.

Just to put some perspective, typical memory hash map implementations like redis can do ~100,000 get/set per second per CPU. So most of NoSql workloads are going to be memory and disk bound (and not CPU bound) as throughput of disk based NoSqls is never going to be 100,000 queries per second per CPU.

Physical Memory

Try to get as much physical memory as possible within constraints of price, size and power consumption. It is possible to have memories ranging from 72 GB, 128GB, 256GB and 512GB. More the better.

Disk Drives and SSDs

SSDs beat disk drives in all respect except the size. Commodity hardware can have 14 disk drives of 2-4TB sizes, that amount to about 24 TB to 48 TB storage with disk.

Cost and physical slot size requirements stop SSDs to reach to that kind of storage per machine. Typical SSD storage could range from 1TB up to 10TBs.

Choosing hard drives or SSDs

Pure size perspective hard drives make sense. You will need to fill up 48 TBs before considering about sharding. But performance is a big bottleneck with hard drives. With 14 drives, you can have 14*500=7000 record QPS.

SSDs can do about 100,000 IOPS or 20,000 QPS for record (1/5th due to btree lookup). With 2-10 SSDs per hardware, one can expect about 20,000*10=200,000 QPS for records.

You can choose hybrid approach. Based on the use case some collections could be in SSDs and some in hard drives. For Example – If you are building E-commerce site then, use case usually behaves in a way that 80-90% of queries use hot records from memory. For example, there will be lot of queries on iPhone or standard items which could be in the memory. Or queries to get user preferences etc. In this case, it may be OK to use hard drives. For append only use cases like redo logs, kafka use disk drives.


For  random accesses use RAID 0 for performance. Most of NoSql databases provide redundancy, so redundant RAID configurations may not be required. But if you are using disk drives and data size requirements are going to be less than available size then you may consider using redundancy RAID configurations. At least single disk failures may not trigger node rebalancing.

For append only use cases, redo logs, kafka etc don’t use RAID. As these use cases rely heavily on the sequential access speed of disk.



OLPT use cases mostly tend to operate on single record. It is easier from scalability, extensibility and maintenance point of view to have schemas properly normalized. De-normalized schemas create maintenance issues as some future use cases may not be easy to implement if schema is not properly normalized. Classic example is, teacher changing her class timings. For normalized schema just one record in one table needs to change. But for de-normalized schemas, records of all students taking that class might have to be updated.

For OLTP workloads it is better to choose NoSql which supports relational type schemas and joins.

New wave of NewSql databases is picking up where they are focusing on “functionality of SQL/relational databases with scale of NoSql” mantra.

Data warehousing / Analytics

Use case in this case are mostly to insert records in fact tables and be able to join with dimension tables. Key here is dimension tables change very rarely. Sometimes these kinds of dimension tables are called slowly changing dimensions. For Example – Walmart adding support for new product or adding new store.

Due to their insert only nature, their size can go infinitely. Because of this proper sharding of data is a absolute must. Also, due to their append/insert only nature, it is ok to de-normalize schema up to certain extent. Schema de-normalization, doesn’t mean joins are not required. In these payloads as well, you still will need a join support for joining with dimension tables. For Example – To count number of products sold in Walmart store A from Cereal section in bottom rack. In this case there are 3 dimensions used; Store dimension, Section dimension and Rack dimension. Rack and Section dimensions may be hierarchical and may not be easily mapped to some metadata to avoid joins. In this case it may be best to use joins (and we will be better off using database supporting joins) rather than name-value or document databases which don’t support joins.

Bottom line is, data warehousing type of payloads need de-normalized data schema with shardable, horizontally scalable databases, but it may not be a bad idea to select latest type of NewSql databases which are highly scalable and support joins and other relational database constructs. Google released datastore which is highly scalable and also support some basic SQL like queries and joins.

Real time analytics

In this case, data from data warehousing is fed back to OLTP databases (or other databases which can be queried in real time) so that they can use this data to serve user better. For Example – Sites providing with recommendations based on user current activity.

This could be huge amounts of mostly read only data (data might change once a day, this change could be of insert, update or delete type) fed from data warehousing which will be queried by realtime use case. For Example – Say user is watching some movie; based on that user is provided with recommendations. Recommendations engine in this case uses both real time OLTP data (to get current user activities) and data warehousing data to use in recommendations algorithms to come up with best possible recommendations for that user.

NoSQL Databases

Due to scale and size of data, any selected NoSql database should support sharding and replication.

Most of current wave of top NoSql databases are mostly document or name/value and not completely designed to take full advantage of current commodity hardware specs.


use RAID 0 for collection and index files.

MongoDB architecture almost forces use of SSDs which limits maximum storage capacity per node. Which further forces going for sharding when other resources like CPU or memory haven’t reached their peak capacity.

Due to its memory mapped files and table locking for writes, it cannot use commodity hardware stack to its full capacity. Sharding further causes, hardware, operational, maintenance costs sky rocketing along with fragile structure of cluster since even sharding cannot completely solve underlying architectural issues.

On top, their loose (document) schema model comes in a way of future enhancements as due to need for atomicity, schema de-normalization is forced which in turn increases the document size and eventually suffers the performance and other issues crop up due de-normalized schema.

Bad queried can further degrade cluster performance since not frequently used records can clog up physical memory due to memory mapped files.


Cassandra read and writes could be sequential (their architecture document is not clear if reads are sequential). SSTables are written to disk sequentially. They are read periodically sequentially to merge in to  one SSTable.

Even though reads and writes are sequential, overtime due to increasing file count disk fragmentation can occur. They loose speed of sequential accesses which will show up in the performance and throughput.

Based on this information, we can assume that Cassandra node may not use RAID and should not need to use SSD.


Couchbase is document database like MongoDB so de-normalization issues discussed in MongoDB also apply to Couchbase.

Couchbase is classic example of append only database where every insert/update/delete is appended to datafile. And Compaction keeps compacting data files for every bucket.

If the data size is bigger than the physical memory then, random access reads may be required to fetch the record.

Since performance of disk drives and SSD is same for sequential accesses writes can use either SSD or disk drives. For reads SSD will outperform hard drives due to their random access nature.

So based on use case and data size you may have to choose between SSD and disk drive.

  • If physical memory is about same as data size per node then use disk drive. I would say data size bigger by up to 50%, use disk drive.
  • If your use case is such that 80-90% of queries use hot records from memory then use disk drive.
  • Any other case, use SSD, but note that you might have to start sharding earlier than you wanted.


Even though Kafka is not a NoSql database, it shows up in most of the NoSql cluster databases.

Its payload is mostly append only disk drive type with not much CPU needs. It usually needs as many disk drives as possible with no need for RAID.

What is better? Unsafe, ByteBuffer or Direct ByteBuffer?

posted in: Projects | 0


Unsafe memory allocation is about 10-12% faster than ByteBuffer and Direct ByteBuffer. Since it lives out side of JVM heap, application needs to manage its allocation and deallocation.

There are couple of JAVA runtime options to consider as well which might help boost performance of ByteBuffer.

Conclusion is, consider all 3 options listed below and see which option is better for your specific use case. We are working on open sourcing a wrapper library which will wrap Unsafe buffer to java ByteBuffer so that they can used interchangeably in the code.

In WonderDB, we are going to support all 3 options. Since most of the our cache memory is allocated at the beginning and destroyed at the end, we really have no issues with application requiring to manage memory in case of Unsafe memory allocation.

Our Use Case

WonderDB is a NoSql transactional database built on top of relational database architecture and design concepts. Our caches are fully disk backed and we manage life cycle of data buffers by paging in and out of disk files. Currently we have built our caching engine on top of direct ByteBuffer and now in the process of evaluating other approaches for performance improvements.

Different ways to store bytes in to memory

We are considering following options:

  • ByteBuffer
  • Direct ByteBuffer
  • Unsafe

Most of access to cache in WonderDB is to read and write serialized byte arrays.

I found detailed performance analysis of all above three methods in Alexey’s blog. Unsafe read/write performance for byte arrays is about 10-12% better than ByteBuffer or Direct ByteBuffer.

Even though Unsafe memory allocation is faster, we believe, there are couple of JVM runtime options which should help ByteBuffer allocation.

JVM runtime options to consider


Most of the major operating systems support LargeMemory page settings. This setting improves performance due to following reasons.

  1. increased performance through increased Translation Lookaside Buffer (TLB) hits
  2. pages are locked in memory and are never swapped out which will guarantee whole JVM heap remains in RAM. Same guarantee could not be given for Direct ByteBuffer memory.
  3. contiguous pages are pre-allocated and cannot be used for anything else but for System V shared memory, for example JVM heap.
  4. less bookkeeping work for the kernel in that part of virtual memory due to larger page sizes

Only ByteBuffer can take advantage of this jvm setting. It does not help direct ByteBuffer or Unsafe buffers since they are are allocated off heap and out of control of JVM.

More information about LargeMemory pages can be read here in redhat site.

Please note that LargeMemory pages need to be enabled on the machine before using this option. Please click Oracle site here to understand how to enable Large pages in different operating systems including Linux, Windows and Solaris.

The way this option can be used is, set the size of Large Pages little bigger than the size of java heap size you are considering. This way, whole java heap will be pinned to the memory and os wont page in and out its contents.

-XX:+CompressedOops java option

This option allows pointer compression in 64bit JVM to reduce the heap.


Redo Log – Resource Journaling

posted in: Projects | 0


Redo Log (look ahead log or commit) journaling is required whenever system supports in-place updating of serialized resource which is backed up by datastore for durability . This is required to capture changes in atomic way so that they can be replayed properly in the event of system crashes or abrupt shutdowns.

Interesting fact is, this feature is not required for append only databases or applications since changes can be serialized to disk in atomic way in one write.

In the past this was mostly used in relational databases. But now, with new wave of applications with requirements of high level of redundancy, durability and horizontal scalability on the commodity hardware is picking up, this feature may be useful to lot of todays applications other than NoSql databases.

Todays hardware configurations are also very different from the past. Commodity hardware can have 10+ of CPUs, 100+ GB RAM and 10-40 TB of hard disk with ~1 GB SSD. If your application needs to store lot of changing data in to the backend, in-place editing of the resource may be more scalable than append only serialization. In that case redo log or resource journaling will be very handy for you.

High level requirements

Redo log journaling not only should manage redo logs but also apply the changes to backend data stores. Applying changes to the backend should guarantee the order and be multi threaded.

Redo log management should include log file rotations and also cleaning up of old log files whose changes are already applied.

High Level design

It has following 4 apis.



It returns transaction id which will be further used during logging, commit or rollback apis.

log (transaction id, byte[] serializedObject)

It keeps serializedObject in local ordered list against the transaction id so that it can be serialized to redo log file in atomic fashion.

commit (transaction id)

It writes list of all serialized objects for given transaction in to the redo log file. After writing, if log file size goes beyond certain size, it creates new one. And if more one than log folders are provided, it creates new log file in other folder.

rollback (transaction id)

It removes list of all serialized objects from the list for given transaction and never writes them to redo logs.

Apply types

This product not only maintains redo log files but also applies to backend store. Following are various apply types.


It applies changes to backend for every transaction. This may not be most suitable from performance point of view in the running system since it writes every transaction not only to redo log file but also to backend store.

Only advantage of this type is, it doesn’t have to flush pending transactions to the backend store during shutdown. So shutdown time is shortened.

Every so often (configurable number)

It is similar to Every, except it syncs transactions to backend after set number of transactions. This is little better from performance point of view since it batches up the transactions to write to backend.

On Log switch

This is most preferred way. It syncs up transactions after log file switch. If it is configured with multiple log files, then it can provide the best performance and throughput since it could be writing redo logs in one disk while syncing them from another disk.


It doesn’t apply changes to datastore. Application generating transactions will take this responsibility.

In WonderDB we use this setting since WOnderDB database syncs up caches back to the data store in regular intervals.

Should support multiple log files in multiple folders

Todays commodity hardware have multiple hard drives (mostly for hadoop like workloads). For high performance, we need to assume logs will be stored in multiple folders (and not just one). So that look ahead logs can be written to one folder in some disk and at the same time, those logs are being read and applied to backend from other disk or folder. This will reduce the contention on disk heads (and disks) since reads and writes will happen on two separate folders possibly from separate disks.

For example, redo logs are configured to write in two folders A and B. Say folder A resides on disk1 and folder B resides on disk 2. Also say we have set max file size for redo logs is 100K. Meaning, after log file reaches 100K or above, new log file will be created and new logs will be written into the new log file on disk2. Now, we can start writing back logs to backend from disk1 or log file1 so that reads and writes will be happening on different disk.

How it works

btree_image.002Redo Transaction Manager as shown in diagram is the implementation of redo log journaling. Applications can use it just by adding its dependency in their maven project and start using it.

As described above it provids 4 apis.

Here is the flow application will take to integrate with the redo logger. First application should have some resource which it needs to serialize to disk or some other data store. It performs following 4 steps to use redo logger to make sure the resource is serialized in atomic/transactional way in to the data store.

  1. Application updates resource
  2. It calls redo transaction logger to log it in to the redo log.
  3. Redo transaction logger logs/serializes it into the redo log. It also manages life cycle of redo log files.
  4. Eventually in some other thread redo transaction applier, which is also part of the redo logger picks up this transaction from redo log and applies it to the actual data files.

For more information on its uses please refer to Getting Started page.

Hash Index

posted in: Projects | 0

btree_image.003Hash index is based on hash map data structure. It has lot of properties like, rehasing, load factor etc. You can read more about it in wikipedia.

We will more focus on usage and design of this data structure in the context of databases, where its size could be bigger than the physical memory and will need to be serialized to disk. Following are various points to consider when using it in databases.

Rehash – Increasing/decreasing size of buckets

Hash maps need to lock the whole data structure during the rehash. This may not be very expensive operation for in memory implementations of hash map. But may be almost impossible in the context of databases since rehash will have to lock the whole table.

Most databases suggest to rebuild the index when performance starts degrading over time due size of items in the index vs number of buckets. Or load factor going below certain threshold value.

Generic implementations of hashCode() and equals() methods

It may not be easy to provide hashCode() and equals() implementations specific to your index class. Usually databases will use their generic implementations which may not be very efficient to your index class. In wonderdb, we are working on a feature to register data type.

Load factor – Calculating buck size

By default Java hash map implementation sets load factor to 0.75. Allowing size of hash map to grow up to certain size before it is rehashed. For example say initial capacity of hash map is 100, then it will be rehashed  when 75th item is stored in to the hash map.

We need different load factor considerations for hash index. Lets take a example on how to calculate load factor for hash indexes.

As shown in the figure above index entries are stored in disk block. It is called leaf node in wonderdb. Say size of disk block is 2048 bytes. Now lets say you want to store long in to the index (8 bytes). Also assume index stores pointer to actual table record, say pointer size is 12 bytes, then it needs 20 bytes to store index item in the disk. Say one disk block (2048 bytes) stores 100 index items.

In this case, for optimized use of disk space it will be better to assume optimal settings will be 100 items per bucket, instead of 1 item per bucket (as in Java hash map implementation). So load factor settings for hash index will be .0075. Or in another way to look at it; to store 750 items, java hash map will need 1000 buckets to achieve 0.75 load factor, whereas hash index will need only 7.5 (or 8) buckets to store 1000 items since we need to assume 100 items per bucket in case of hash index.

BTree+ Vs Hash Index

Advantage of hash map data structure vs tree data structure is its access time is constant time (O1). Or it just takes 1 comparison (one instruction) to get to the item assuming it is properly organized. Where as for tree the access time is O(log n). Or it takes 10 compare instructions to get to an item in a tree containing 1024 items.

So for 1M items it takes 20 compares and for 1B it takes 30 compares for the tree. Where as for hash map it will take 1 compare instruction to get to the item. But problem is, we probably wont be able to store billions to items in hash map due to physical memory size constraints.

So for storing billions of items in hash index we need to also take load factor in to account. Say load factor is 0.0075 in case of storing longs, or in other words, if bucket should contain 100 items for optimal disk usage then already it needs to do 100 long 100 = 7 compares within the buckets to get to the item. So already access time is no more O1 but O7.

So to see optimal performance, we need number of items for Btree+ to do at lease 3x of 7 or 21 compares which is ~ 1M items.

So point here is, unless you are expecting millions of items in the index, dont even consider hash index.


We are able to see hash index performance can go up to 2x for a case where BTree+ needed 3 level deep structure.

We see performance of 60K queries per second in case of 32000 items with key size of 1500. We selected key size of 1500 just to make force 3 level deep BTree+. Where as for BTree+ we see close to 35-40K queries per second.

Here was the setup for the test.


It needed 32000 leaf nodes to store 32000 items since it can store only 1 item per node with 1500 size with 2048 block size.

It needed 32000/150 = 234 branch blocks. Branch blocks store pointers to next branch or leaf blocks and can store 150 items per block.

Next level of branch blocks needed 234/150 = 2 blocks.

And root node pointed to 2 next level blocks.

So our tree structure had 4 levels with

– 2 items in root node

– 2 branch blocks pointing to 234 items in next level

– 234 branch to point to 32000 leaf blocks.

– and 32000 leaf blocks.

It needed 1 compare (on root level) + 8 compares (on next level containing 234 pointers) + 15 compares (to get to leaf node) + 1 compare to find item in leaf node = 25 compares.

Hash Index

We had 32000 buckets to make hash index fully optimized to get O1 compare to get to an item.

With this setup we saw 60K queries per second for hash index and 40K queries per second for BTree+. over 50% improvement.



posted in: Projects | 0

btree_image.001BTree data structure is generalization of self balanced tree where in, node can contain more than one child node. This generalization is useful in storing it on to the disk files. In this case node can be mapped to disk block and can easily be read and written to the disk.

Btree+ is further generalization of Btree where in indexes and records are stored in leaf nodes. Branch nodes only contain pointers to other leaf and branch nodes. This optimization is useful in further performance improvements in increasing the throughput since branch nodes don’t need to be locked except during rebalancing. Data is usually inserted or deleted in leaf nodes hence only leaf nodes need to be locked during CRUD operations.

Further in BTree+ leaf nodes can contain pointers to previous and next leaf node for index range traversals.

Node Types

Every node contains ordered list of items in ascending or descending order depending on how it is created.


Branch nodes contain next level ordered list of pointers to other nodes. Order list is based on max key of every node. For default block size of 2048 bytes, it stores pointers to 150 next level branch or leaf nodes. It also contains copy of maximum key value, (last key of the ultimate leaf node it is pointing to). This key is used to tree traversal to get to proper leaf node during query processing.

For example, say branch node B contains pointers to 2 leaf nodes X and Y. Max key of X node is say 100 and max key of Y is say 200 then node B will store these 2 nodes in order X and Y. Also, in this case max value of node B will be 200.


Root node is special case of branch or leaf node. This is the entry point into the data structure. If all keys can be stored in one node, then it will be of leaf node. Else, usually root is a branch node.


Leaf nodes contain ordered list of index or record key data. It also contains pointer to next and previous leaf nodes. Current implementation needs pointers to previous nodes but we are working on enhancement to remove that dependence.

Based on the size of the key, it contains as many keys that can fit in leaf block size. Default block size is 2048 but can be changed during the construction of the index by providing STORAGE keyword.

Max key value of leaf is usually last key in the node.

For Example, say leaf node can contain keys 100, 200 and 300 in the order. Also, its max key value in this case will be 300.



During insertion it starts at the root node. And it traverses up to the leaf node (position in the ordered list of keys) where the item can be inserted.

Since every item in all nodes (branch and leaf) are ordered, it uses binary search to locate the item in the node where new item can be inserted. It follows this step recursively from root up to the leaf and inserts that item into the leaf.

Insertion balancing

If newly added item increases the size of leaf node more than the block size (default 2048 bytes), then it splits the node in to 2 leaf nodes and traverses upward to insert pointer of newly added leaf into the parent branch node. If after inserting this new pointer, if branch node needs to be split because of its, size then it recursively continues upward until root node to rebalance. Since rebalancing is changing whole structure of the BTree+, whole BTree+ needs to be locked during this operation.


Similar to insertion, deletion also starts at the root node and traverses up to the leaf where item to be deleted is located. And it removes that item from the leaf node.

Delete rebalancing

If leaf node becomes empty after the deletion of the item then again, it needs to rebalancing to remove its pointer from the parent node and recursively up to the root node if required. During this time, whole BTree+ needs to be locked.


Fortunately update is not required. Update can turn out to be delete and insert operation which will be two separate operations on the BTree+

Tree locking scenarios


Following steps are performed

  1. Read lock on the tree.
  2. Traverse up to the leaf node
  3. Read lock leaf node.
  4. Unlock read lock on tree started on #1.
  5. Return iterator so that range (or PK lookup) query can walk through the range. Iterator properly unlocks and locks leaf nodes while traversing from node to node.
  6. At the end it is query processor (callers) responsibility to unlock the read lock on the leaf node.

Following steps are performed

  1. Read lock on the tree.
  2. Traverse up to the leaf node.
  3. Write lock leaf node.
  4. Try inserting an item. If it needs to split because of of size increase, then
    1. Release write lock on the leaf. (Step #3)
    2. unlock read lock (Step #1).
    3. acquire write lock on the tree, since rebalancing operation might be performed.
    4. Again Traverse up to the leaf node.
    5. Write lock leaf node. In theory this is not required since there is a lock on the tree itself. There are no other threads in this tree at this time.
    6. If rebalance is required. Rebalance the tree
    7. Unlock write lock on the tree
  5. Insert the item.
  6. Unlock write lock on leaf
  7. Unlock read lock on the tree.
  1. Read lock on the tree.
  2. Traverse to the leaf node.
  3. Write lock leaf node.
  4. Remove the item. If it needs to reblance
    1. Release write lock on the item
    2. Release read lock on the tree.
    3. Acquire write lock on the tree
    4. Traverse to the leaf
    5. Remove the item.
    6. Rebalance of required.
    7. Unlock tree
  5. Unlock leaf.
  6. Unlock tree.

Node levels in Wonderdb

Branch nodes store disk pointer to next node (about 10 bytes). So for default 2048 bytes, it stores about 200 items in branch node.

Leaf node on the other hand stores actual key value and the pointer to the actual record contents. So for key size of 100 bytes, it needs 110 bytes (100 for the key and 10 for the pointer to the record). So it stores about 100 keys in the leaf block.

Based on above assumptions,

2 level tree will store 100*200 = 20000 items, 200+1 = 201 blocks = 201*2048 ~ 400KB disk space

3 level tree will store 100*200*200 = 4000000 = 4M items, size of tree will be (200*200)*2048 ~ 80MB

4 level tree will store 100*200*200*200 = 1600000000 = 1.6B items, disk space = 200*200*200*2048 = 1.6 GB

From above calculations you can easily see why whole btree can be present in physical memory if we assume we have 50+ GB physical memory. Machines with 50+GB is considered commodity hardware nowadays.

This calculation is very important in choosing BTree+ vs Hash index if range query is not required. If configured properly, hash index can perform 2-3 times faster than BTree+ which will be huge improvement.


High Level Architecture

posted in: Projects | 0

archAt the core, it provides persistent BTree+ and linked lists of variable size blocks and memory/buffer management. With this basic infrastructure, it can be customized to store any type in the database with indexing support for fast data retrieval. First use case we are implementing is cache.

Record level locking and variable buffer management are very useful for performance and throughput. With variable size blocks, tables with bigger record sizes can be stored in data files with bigger block sizes where as indexes or small tables could be stored in smaller block size data files. This way record/index can be fetched in to memory cache with one disk io.

We have seen it scale linearly with queries per second per node up to 33000 per second with Amazon EC2 xlarge node (4 CPU, 13 GB RAM and 2, 40GB SSDs with 3000 IPOs).

Buffer Cache Manager

There are 2 buffer caches in the system. One which sits in the JVM heap (By default it is allocated about 30% of heap size) and other uses java direct buffers outside of JVM.

Buffer cache is a simple fixed bucket hash map of objects or blocks. Buffer cache in JVM heap stores Record or index objects where as buffer cache in direct memory stores block of bytes,

On cache miss, JVM buffer cache gets data from direct memory buffer cache. Direct memory buffer cache on cache miss brings buffer from disk. Until buffer is brought to jvm buffer cache, requesting thread blocks. This thread is most probably the thread executing the query.

It modifies data in both the buffers for the queries which modify buffer contents (Insert, Update and Delete queries).

Buffers which are accessed during query processing (any CRUD query) are pinned to the memory by inserting their pointers (location in disk) in ConcurrentLinkedQueue so that eviction or writer threads don’t evict or sync back to disk these buffers (or objects) as these buffers/objects are most probably locked during query processing.

Cache Evictor

There are two evictor threads in the system.

Normal JVM heap evictor thread

This thread starts evicting JVM heap buffer cache when it reaches high watermark. This thread starts evicting least used buffers which are not pinned (being accessed at the time). Low watermark by default is set to 90% of cache size. And high watermark is by default set to 95%. It evicts objects until it reaches low watermark.

Normal Direct memory evictor thread

When cache reaches high watermark, eviction thread starts evicting least used buffers which are not pinned and dirty (dirty buffers are the buffers which have changed recently and not yet synced to disk). Low watermark by default is set to 90% of cache size.And high watermark is by default set to 95%. It evicts objects until it reaches low watermark.

Cache Writer

This can be set in to modes;

Aggressive write mode

As soon as buffer gets dirty (may be due to insert, update, delete query), it is written back to disk. This setting is useful when log writer is not enabled.

Timed write mode

In this mode, main writer thread wakes up every so often (3 seconds by default) and writes all dirty buffers which have changed before the writer thread was started.

During the scan, for every dirty buffer, it picks up a new thread from the pool for writing to disk. If thread is not available in thread pool, it waits until a thread is available. Thus writer is implemented as multi-threaded which is very useful when a machine has lot of spindles.

Data structures

Both BTree+ and Linked list support persistent store so that if any element/block is not available in the memory, it can go and get it from the disk.

How Queries are executed


  1. First it creates Unique index objects from insert parameters and puts if absent in global ConcurrentHashMap. This step is required to make sure no other query is inserting or updating same Unique indexes if it is, then UKViolationException is thrown.
  2. Then it checks every unique BTree+ index tree to make sure these indexes are not already present in the BTree. If it does’ it throws UKViolationException.
  3. It inserts this record in Collection/Table linked list.
  4. Inserts all indexes in to various BTree+ trees.
  5. Removes unique index objects from ConcurrentHashMap in step 1 and returns.


  1. Select query can select more than one record based on where filter.
  2. Based on the filter columns, it checks if it needs to take Index scan or full table scan.
  3. If its full table scan, it positions Full table scan iterator on head of collection/table linked list.
  4. If its indexed scan, it first positions Index scan iterator into proper LeafIndexBlock. From this is where it will start scanning the index list.
  5. For each record in the iterator, it applies the filter and generates the list of record pointers in the result set. For index scan if filter returns true, it selects that record, else it stops scanning. For Full table scan it scans whole link list.
  6. Once it has list of record pointers, it goes selects every record and returns based on select column list if that record still passes filter.

set (Update)

  1. Update query can update more than one record.
  2. So first it selects records which need to be updated executing steps 1-5 in select query.
  3. For each record in the select list, it updates the record if that record still passes filter.


  1. Similar to update query, delete query first selects records which need to be deleted.
  2. Then deleted every record if that record still passes filter.


Locking Tree

  1. Whole tree is locked for reads when tree is being searched. Once record/block is located, this lock is removed.
  2. Once leaf node/index entry is located, it puts read or write lock on index leaf block based on type of query. For reads it puts read locks. For writes, it puts exclusive write lock on index leaf block.
  3. If block needs to split, it sets the splitRequired = true for that block. Removes the write lock and then tries to acquire exclusive write lock on the tree and then goes through the BTree split motions. During this any number of blocks might need to split. But since Tree is locked exclusively, other threads wont get affected. Also, while during the time between lock on leaf block is removed and exclusive lock on tree is acquired, if any other thread tries to update that leaf block, it will get SplitRequiredException.
  4. Current implementation sets splitRequired=true flag for the index leaf block even if all index entries in that block are removed. And then gets exclusive lock on the tree and removes that block and may be many more blocks from the tree. Technically this can be done separately without blocking query thread. Something I need to change in the future.

Locking Link List

  1. Specific block in linked list is read or write locked for every CRUD operation on the record in that block.
  2. During table scans, when it needs to move to next block, it first locks next block and then removes lock on the current block. So for some time, 2 consecutive blocks in the list will be blocked. Due to this, list could be scanned in forward direction only.

Extending the Link List

  1. Extending link list tail is carried out by separate asynchronous thread. Every time it keeps extending the list by configurable number (5 blocks by default) of blocks. This way, whole list doesn’t get locked until schema metadata for that collection/table is updated.

This scheme of extending link list is required so that inserts wont become serialized to single threaded when tail needs to extend during inserts.