NoSQL Database Cluster Configuration for High Performance

posted in: Projects | 0

This post looks at current commodity hardware: its optimal settings for various types of use cases, and how your choice of NoSQL database can be configured to use that hardware for high performance.

Hardware Performance

Disk Drive Performance

For disk drives, the most time-consuming operation is the seek. According to Wikipedia, the seek time of a 7200 RPM drive is about 4 ms, so in the worst case a drive can read about 250 blocks per second (1000 ms / 4 ms). A primary key B-tree index lookup needs at least 4-5 disk seeks (a database table with over 100K records needs at least 4-5 B-tree levels). So realistically, performance is about 50 records per second with 5 seeks.

For sequential reads/writes (append-only databases or Kafka-like use cases), a 7200 RPM disk drive can do about 100 MB per second. For a 1 KB record size, that is about 100,000 records read or written per second.
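The disk arithmetic can be checked with a short sketch that works directly from the 4 ms seek time and the 100 MB per second sequential rate. The function names and default values are illustrative assumptions based on the figures quoted in this section, not measurements.

```python
def random_lookups_per_sec(seek_ms=4.0, seeks_per_lookup=5):
    """Index lookups per second when every B-tree level costs one disk seek."""
    seeks_per_sec = 1000.0 / seek_ms
    return seeks_per_sec / seeks_per_lookup


def sequential_records_per_sec(mb_per_sec=100.0, record_kb=1.0):
    """Records per second for append-only access (1 MB treated as 1000 KB)."""
    return mb_per_sec * 1000.0 / record_kb


print(random_lookups_per_sec())      # random primary key lookups per drive
print(sequential_records_per_sec())  # sequential 1 KB records per drive
```

The gap between the two results is the reason append-only workloads are comfortable on hard drives while random lookups are not.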

SSD Performance

High-performance SSDs today can do about 100,000 IOPS at a block size of about 4 KB. In other words, random access on an SSD performs about as well as sequential access on a disk drive.

It may be a good idea to use disk drives for append-only files like redo logs, or for append-type workloads like Kafka.

Object Serialization/Deserialization Performance

Based on the performance analysis done on this blog, ProtoBufs manages about 250,000 serialization/deserialization operations on a 1,000-byte payload on an Intel Core i3-3240 (Ivy Bridge) 3.40GHz processor. It should scale linearly with more CPUs and cores, as this operation is mostly CPU bound.

Typical hardware configuration

For cost and performance, we will use the typical Hadoop-recommended configuration as our baseline.

#CPUs/Cores

Based on the price, power consumption, use case and rack size, try to get as many CPUs as possible per machine.

It’s always better to scale up to a certain limit before thinking about scaling out. Hardware with 8, 16 or even 32 CPUs is possible in today’s commodity configurations.

To put this in perspective, typical in-memory hash map implementations like Redis can do ~100,000 get/set operations per second per CPU. So most NoSQL workloads are going to be memory and disk bound (and not CPU bound), as the throughput of disk-based NoSQL databases is never going to be 100,000 queries per second per CPU.

Physical Memory

Try to get as much physical memory as possible within the constraints of price, size and power consumption. Memory sizes of 72 GB, 128 GB, 256 GB and 512 GB are possible. The more the better.

Disk Drives and SSDs

SSDs beat disk drives in all respects except size. Commodity hardware can have 14 disk drives of 2-4 TB each, which amounts to about 28 TB to 56 TB of disk storage.

Cost and physical slot requirements keep SSDs from reaching that kind of storage per machine. Typical SSD storage ranges from 1 TB up to 10 TB.

Choosing hard drives or SSDs

From a pure size perspective, hard drives make sense: you will need to fill up around 50 TB before considering sharding. But performance is a big bottleneck with hard drives. At about 50 record lookups per second per drive (250 seeks per second at 4 ms each, 5 seeks per lookup), 14 drives give you 14*50 = 700 record QPS.

SSDs can do about 100,000 IOPS, or 20,000 record QPS (1/5th, due to the B-tree lookup). With 2-10 SSDs per machine, one can expect up to about 20,000*10 = 200,000 record QPS.
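The per-node estimate is a single multiplication and division, sketched below. The function name `node_record_qps` is hypothetical, and the model assumes lookups are spread evenly over all devices and that each lookup costs 5 I/Os.

```python
def node_record_qps(devices, iops_per_device, ios_per_lookup=5):
    """Record lookups per second for one node, assuming evenly spread load."""
    return devices * iops_per_device // ios_per_lookup


for ssd_count in (2, 10):
    print(ssd_count, "SSDs:", node_record_qps(ssd_count, 100_000), "record QPS")
```

The same function applies to hard drives once you plug in their random I/O rate per drive instead of the SSD figure.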

You can also choose a hybrid approach: based on the use case, some collections could be on SSDs and some on hard drives. For example, on an e-commerce site, typically 80-90% of queries use hot records from memory. There will be a lot of queries on the iPhone or other standard items, or queries to get user preferences, all of which could be in memory. In this case, it may be OK to use hard drives. For append-only use cases like redo logs or Kafka, use disk drives.

RAID

For random accesses, use RAID 0 for performance. Most NoSQL databases provide redundancy, so redundant RAID configurations may not be required. But if you are using disk drives and the data size is going to be less than the available capacity, you may consider a redundant RAID configuration. That way, at least single disk failures will not trigger node rebalancing.

For append-only use cases (redo logs, Kafka, etc.), don’t use RAID, as these use cases rely heavily on the sequential access speed of the disk.

Payloads

OLTP

OLTP use cases mostly tend to operate on a single record. From a scalability, extensibility and maintenance point of view, it is easier to have schemas properly normalized. De-normalized schemas create maintenance issues, as some future use cases may not be easy to implement if the schema is not properly normalized. The classic example is a teacher changing her class timings. With a normalized schema, just one record in one table needs to change. With a de-normalized schema, the records of all students taking that class might have to be updated.

For OLTP workloads it is better to choose a NoSQL database which supports relational-type schemas and joins.

A new wave of NewSQL databases is picking up, focusing on the mantra of “functionality of SQL/relational databases with the scale of NoSQL”.

Data warehousing / Analytics

Use cases here are mostly about inserting records into fact tables and being able to join them with dimension tables. The key point is that dimension tables change very rarely; such tables are sometimes called slowly changing dimensions. For example, Walmart adding support for a new product or adding a new store.

Due to their insert-only nature, fact tables can grow without bound, so proper sharding of data is an absolute must. Also, due to their append/insert-only nature, it is OK to de-normalize the schema to a certain extent. Schema de-normalization doesn’t mean joins are not required: in these payloads you will still need join support for joining with dimension tables. For example, to count the number of products sold in Walmart store A from the cereal section on the bottom rack, 3 dimensions are used: the Store dimension, the Section dimension and the Rack dimension. The Rack and Section dimensions may be hierarchical and may not be easily mapped to some metadata to avoid joins. In this case it may be best to use joins (and we will be better off using a database supporting joins) rather than name-value or document databases which don’t support joins.

The bottom line is that data warehousing payloads need a de-normalized data schema on shardable, horizontally scalable databases, but it may not be a bad idea to select one of the latest NewSQL databases, which are highly scalable and support joins and other relational database constructs. Google released a datastore which is highly scalable and also supports some basic SQL-like queries and joins.

Real time analytics

In this case, data from the data warehouse is fed back to OLTP databases (or other databases which can be queried in real time) so that they can use this data to serve users better. For example, sites providing recommendations based on the user’s current activity.

This could be a huge amount of mostly read-only data (it might change once a day, and the change could be an insert, update or delete) fed from the data warehouse and queried by the real-time use case. For example, say a user is watching a movie and is provided with recommendations based on it. The recommendation engine in this case uses both real-time OLTP data (to get current user activity) and data warehouse data in its recommendation algorithms to come up with the best possible recommendations for that user.

NoSQL Databases

Due to the scale and size of the data, any selected NoSQL database should support sharding and replication.

Most of the current wave of top NoSQL databases are document or name/value stores and are not designed to take full advantage of current commodity hardware specs.

MongoDB

Use RAID 0 for collection and index files.

MongoDB’s architecture almost forces the use of SSDs, which limits the maximum storage capacity per node. This in turn forces sharding while other resources like CPU or memory haven’t reached their peak capacity.

Due to its memory-mapped files and table locking for writes, it cannot use the commodity hardware stack to its full capacity. Sharding further sends hardware, operational and maintenance costs skyrocketing and makes the cluster structure fragile, since even sharding cannot completely solve the underlying architectural issues.

On top of that, its loose (document) schema model gets in the way of future enhancements: the need for atomicity forces schema de-normalization, which in turn increases document size, and eventually performance suffers and other issues crop up due to the de-normalized schema.

Bad queries can further degrade cluster performance, since infrequently used records can clog up physical memory due to memory-mapped files.

Cassandra

Cassandra reads and writes could be sequential (their architecture document is not clear on whether reads are sequential). SSTables are written to disk sequentially. They are periodically read sequentially to be merged into one SSTable.

Even though reads and writes are sequential, over time disk fragmentation can occur due to the increasing file count. They then lose the speed of sequential access, which will show up in performance and throughput.

Based on this information, we can assume that a Cassandra node may not use RAID and should not need to use SSDs.

Couchbase

Couchbase is a document database like MongoDB, so the de-normalization issues discussed for MongoDB also apply to Couchbase.

Couchbase is a classic example of an append-only database, where every insert/update/delete is appended to the data file. Compaction keeps compacting the data files for every bucket.

If the data size is bigger than physical memory, random-access reads may be required to fetch a record.

Since the performance of disk drives and SSDs is about the same for sequential access, writes can use either. For reads, SSDs will outperform hard drives because reads are random access.

So based on the use case and data size, you may have to choose between SSDs and disk drives.

  • If physical memory is about the same as the data size per node, use disk drives. I would say this holds for data sizes up to 50% bigger than memory.
  • If your use case is such that 80-90% of queries use hot records from memory then use disk drive.
  • Any other case, use SSD, but note that you might have to start sharding earlier than you wanted.

Kafka

Even though Kafka is not a NoSQL database, it shows up in most NoSQL database clusters.

Its workload is mostly append-only and suited to disk drives, with modest CPU needs. It usually needs as many disk drives as possible, with no need for RAID.

BTree+

The BTree data structure is a generalization of a self-balancing tree in which a node can contain more than two child nodes. This generalization is useful for storing the tree in disk files: a node can be mapped to a disk block and can easily be read from and written to disk.

BTree+ is a further generalization of BTree in which indexes and records are stored in leaf nodes. Branch nodes only contain pointers to other leaf and branch nodes. This optimization further improves throughput, since branch nodes don’t need to be locked except during rebalancing. Data is usually inserted or deleted in leaf nodes, hence only leaf nodes need to be locked during CRUD operations.

Further, in BTree+ leaf nodes can contain pointers to the previous and next leaf nodes for index range traversals.

Node Types

Every node contains an ordered list of items, in ascending or descending order depending on how it is created.

Branch

Branch nodes contain an ordered list of pointers to next-level nodes. The list is ordered by the max key of every node. For the default block size of 2048 bytes, a branch node stores pointers to 150 next-level branch or leaf nodes. It also contains a copy of the maximum key value (the last key of the ultimate leaf node it points to). This key is used during tree traversal to get to the proper leaf node during query processing.

For example, say branch node B contains pointers to 2 leaf nodes, X and Y. If the max key of X is 100 and the max key of Y is 200, then node B will store these 2 nodes in the order X, Y. In this case the max value of node B will be 200.

Root

The root node is a special case of a branch or leaf node. It is the entry point into the data structure. If all keys can be stored in one node, the root is a leaf node. Otherwise, the root is usually a branch node.

Leaf

Leaf nodes contain an ordered list of index or record key data. They also contain pointers to the next and previous leaf nodes. The current implementation needs pointers to previous nodes, but we are working on an enhancement to remove that dependence.

A leaf holds as many keys as fit in the leaf block size, which depends on the size of the key. The default block size is 2048 bytes, but it can be changed during construction of the index by providing the STORAGE keyword.

The max key value of a leaf is usually the last key in the node.

For example, say a leaf node contains the keys 100, 200 and 300, in that order. Its max key value in this case will be 300.

Operations

Insertion

Insertion starts at the root node and traverses down to the leaf node (and the position in its ordered list of keys) where the item can be inserted.

Since the items in every node (branch and leaf) are ordered, it uses binary search within the node to locate where the new item belongs. It follows this step recursively from the root down to the leaf and inserts the item into the leaf.
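The traversal can be sketched with Python's `bisect` module. This is a minimal in-memory illustration, assuming ascending key order and integer keys; the `Leaf`, `Branch`, `find_leaf` and `insert` names are hypothetical and are not taken from the Wonderdb code base.

```python
from bisect import bisect_left, insort


class Leaf:
    def __init__(self, keys):
        self.keys = sorted(keys)

    def max_key(self):
        return self.keys[-1]


class Branch:
    def __init__(self, children):
        self.children = children  # ordered by each child's max key

    def max_key(self):
        return self.children[-1].max_key()


def find_leaf(node, key):
    """Walk from the root down to the leaf that should hold `key`."""
    while isinstance(node, Branch):
        max_keys = [child.max_key() for child in node.children]
        # First child whose max key is >= key; larger keys go to the last child.
        index = min(bisect_left(max_keys, key), len(node.children) - 1)
        node = node.children[index]
    return node


def insert(root, key):
    """Insert `key` at its sorted position in the right leaf (no splitting here)."""
    leaf = find_leaf(root, key)
    insort(leaf.keys, key)
    return leaf


x, y = Leaf([50, 100]), Leaf([150, 200])
b = Branch([x, y])
insert(b, 120)
print(y.keys, b.max_key())
```

A real node would keep its max keys stored in the block rather than recomputing them on every step; the list comprehension here only keeps the sketch short.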

Insertion balancing

If the newly added item grows the leaf node beyond the block size (default 2048 bytes), the node is split into 2 leaf nodes, and the operation traverses upward to insert a pointer to the newly added leaf into the parent branch node. If inserting this new pointer means the branch node itself needs to split because of its size, the rebalancing continues recursively upward until it reaches the root node. Since rebalancing changes the whole structure of the BTree+, the whole BTree+ needs to be locked during this operation.
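The overflow check and split can be sketched as follows. The sketch assumes fixed-size entries and a split at the midpoint; the post does not say where the real implementation splits, so treat both as assumptions, along with the hypothetical name `split_if_overflow`.

```python
def split_if_overflow(keys, entry_size, block_size=2048):
    """Return (left, right) if the keys no longer fit in one block, else (keys, None)."""
    if len(keys) * entry_size <= block_size:
        return keys, None
    mid = len(keys) // 2
    return keys[:mid], keys[mid:]


left, right = split_if_overflow(list(range(103)), entry_size=20)
# The parent branch node now needs two pointers, ordered by these max keys.
print(left[-1], right[-1])
```

If adding the second pointer overflows the parent, the same check is applied one level up, which is the recursive part of the rebalancing.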

Deletion

Similar to insertion, deletion starts at the root node and traverses down to the leaf where the item to be deleted is located. It then removes that item from the leaf node.

Delete rebalancing

If the leaf node becomes empty after the deletion, the tree again needs rebalancing to remove the leaf’s pointer from the parent node, and recursively up to the root node if required. During this time, the whole BTree+ needs to be locked.

Update

Fortunately, a separate update operation is not required. An update can be performed as a delete followed by an insert, which are two separate operations on the BTree+.

Tree locking scenarios

Read

Following steps are performed

  1. Read lock on the tree.
  2. Traverse down to the leaf node.
  3. Read lock leaf node.
  4. Release the read lock on the tree acquired in step 1.
  5. Return an iterator so that the range (or PK lookup) query can walk through the range. The iterator properly unlocks and locks leaf nodes while traversing from node to node.
  6. At the end, it is the query processor’s (caller’s) responsibility to release the read lock on the leaf node.

Insert

Following steps are performed

  1. Read lock on the tree.
  2. Traverse down to the leaf node.
  3. Write lock leaf node.
  4. Try inserting an item. If it needs to split because of the size increase, then
    1. Release write lock on the leaf (step 3).
    2. Release read lock on the tree (step 1).
    3. Acquire write lock on the tree, since a rebalancing operation might be performed.
    4. Traverse down to the leaf node again.
    5. Write lock leaf node. In theory this is not required, since there is a lock on the tree itself and no other threads are in this tree at this time.
    6. If rebalancing is required, rebalance the tree.
    7. Release write lock on the tree.
  5. Insert the item.
  6. Release write lock on the leaf.
  7. Release read lock on the tree.

Delete

Following steps are performed

  1. Read lock on the tree.
  2. Traverse down to the leaf node.
  3. Write lock leaf node.
  4. Remove the item. If it needs to rebalance
    1. Release write lock on the leaf.
    2. Release read lock on the tree.
    3. Acquire write lock on the tree.
    4. Traverse down to the leaf.
    5. Remove the item.
    6. Rebalance if required.
    7. Unlock tree.
  5. Unlock leaf.
  6. Unlock tree.
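The insert scenario can be sketched in Python as below. Python has no built-in readers-writer lock, so the small `RWLock` class is an assumption standing in for whatever lock the real implementation uses. `ToyTree` is a hypothetical stand-in that keeps only an ordered list of leaves with no branch nodes, so that the locking order stays visible. The step numbers in the comments refer to the Insert list above.

```python
import threading
from bisect import insort


class RWLock:
    """Minimal readers-writer lock: many readers or one writer (not reentrant)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False

    def acquire_read(self):
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers:
                self._cond.wait()
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()


class Leaf:
    def __init__(self, keys=None):
        self.keys = keys if keys is not None else []
        self.lock = RWLock()


class ToyTree:
    """Hypothetical stand-in for a BTree+: ordered leaves, no branch nodes."""

    def __init__(self, leaf_capacity=4):
        self.lock = RWLock()
        self.leaf_capacity = leaf_capacity
        self.leaves = [Leaf()]

    def _find_leaf(self, key):
        for leaf in self.leaves:
            if leaf.keys and key <= leaf.keys[-1]:
                return leaf
        return self.leaves[-1]

    def insert(self, key):
        self.lock.acquire_read()                     # step 1
        leaf = self._find_leaf(key)                  # step 2
        leaf.lock.acquire_write()                    # step 3
        if len(leaf.keys) < self.leaf_capacity:      # step 4: fits, no split
            insort(leaf.keys, key)                   # step 5
            leaf.lock.release_write()                # step 6
            self.lock.release_read()                 # step 7
            return
        leaf.lock.release_write()                    # step 4.1
        self.lock.release_read()                     # step 4.2
        self.lock.acquire_write()                    # step 4.3
        try:
            leaf = self._find_leaf(key)              # step 4.4: tree may have changed
            # Step 4.5 (leaf write lock) is skipped: the tree is held exclusively.
            insort(leaf.keys, key)
            if len(leaf.keys) > self.leaf_capacity:  # step 4.6: split the leaf
                mid = len(leaf.keys) // 2
                right = Leaf(leaf.keys[mid:])
                leaf.keys = leaf.keys[:mid]
                self.leaves.insert(self.leaves.index(leaf) + 1, right)
        finally:
            self.lock.release_write()                # step 4.7

    def all_keys(self):
        return [key for leaf in self.leaves for key in leaf.keys]
```

The design point the sketch tries to show is that the tree read lock is dropped before the tree write lock is requested, so no thread ever waits for the exclusive lock while holding a shared one. That is also why the leaf has to be located again in step 4.4.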

Node levels in Wonderdb

Branch nodes store a disk pointer to the next node (about 10 bytes). So for the default block size of 2048 bytes, a branch node stores about 200 items.

A leaf node, on the other hand, stores the actual key value and the pointer to the actual record contents. So for a key size of 10 bytes, each entry needs 20 bytes (10 for the key and 10 for the pointer to the record), and a leaf block stores about 100 keys.

Based on the above assumptions:

A 2-level tree will store 100*200 = 20,000 items in 200+1 = 201 blocks = 201*2048 ~ 400 KB of disk space.

A 3-level tree will store 100*200*200 = 4,000,000 = 4M items; the size of the tree will be about (200*200)*2048 ~ 80 MB.

A 4-level tree will store 100*200*200*200 = 800,000,000 = 800M items; disk space = 200*200*200*2048 ~ 16 GB.

From the above calculations you can easily see why the whole B-tree can be present in physical memory if we assume 50+ GB of physical memory. Machines with 50+ GB are considered commodity hardware nowadays.
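The level-by-level figures can be reproduced with a few lines of Python. The function name `btree_capacity` is hypothetical, and the defaults (about 100 keys per leaf block, about 200 pointers per branch block, 2048-byte blocks) are the assumptions stated above. Unlike the rounded figures in the text, the sketch also counts the branch blocks.

```python
def btree_capacity(levels, leaf_keys=100, fanout=200, block_size=2048):
    """Return (items, blocks, bytes) for a full tree with `levels` levels (>= 2)."""
    leaf_blocks = fanout ** (levels - 1)
    branch_blocks = sum(fanout ** i for i in range(levels - 1))
    blocks = leaf_blocks + branch_blocks
    return leaf_keys * leaf_blocks, blocks, blocks * block_size


for levels in (2, 3, 4):
    items, blocks, size = btree_capacity(levels)
    print(f"{levels} levels: {items:,} items, {blocks:,} blocks, {size / 1e6:,.1f} MB")
```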

This calculation is very important when choosing between BTree+ and a hash index if range queries are not required. If configured properly, a hash index can perform 2-3 times faster than BTree+, which is a huge improvement.

High Level Architecture

At the core, it provides persistent BTree+ and linked lists of variable-size blocks, plus memory/buffer management. With this basic infrastructure, it can be customized to store any type in the database, with indexing support for fast data retrieval. The first use case we are implementing is a cache.

Record-level locking and variable buffer management are very useful for performance and throughput. With variable-size blocks, tables with bigger record sizes can be stored in data files with bigger block sizes, whereas indexes or small tables can be stored in data files with smaller block sizes. This way a record/index can be fetched into the memory cache with one disk I/O.

We have seen it scale linearly up to 33,000 queries per second per node on an Amazon EC2 xlarge node (4 CPUs, 13 GB RAM and two 40 GB SSDs with 3000 IOPS).

Buffer Cache Manager

There are 2 buffer caches in the system. One sits in the JVM heap (by default it is allocated about 30% of the heap size) and the other uses Java direct buffers outside the JVM heap.

A buffer cache is a simple fixed-bucket hash map of objects or blocks. The buffer cache in the JVM heap stores record or index objects, whereas the buffer cache in direct memory stores blocks of bytes.

On a cache miss, the JVM buffer cache gets data from the direct memory buffer cache. The direct memory buffer cache, on a cache miss, brings the buffer in from disk. Until the buffer is brought into the JVM buffer cache, the requesting thread blocks. This is most probably the thread executing the query.
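The read path through the two caches can be sketched as below. This is a single-threaded Python illustration with plain dictionaries standing in for the heap cache, the direct-memory cache and the data files. The class and attribute names are hypothetical, and decoding bytes to a string stands in for building a record object.

```python
class TwoTierCache:
    def __init__(self, disk):
        self.disk = disk      # block_id -> bytes, stands in for the data files
        self.direct = {}      # block_id -> bytes, stands in for direct memory
        self.heap = {}        # block_id -> object, stands in for the JVM heap cache
        self.disk_reads = 0

    def _read_block(self, block_id):
        """Direct-memory tier: on a miss, bring the block in from disk."""
        if block_id not in self.direct:
            self.disk_reads += 1
            self.direct[block_id] = self.disk[block_id]
        return self.direct[block_id]

    def get(self, block_id):
        """Heap tier: on a miss, build the object from the direct-memory block."""
        if block_id not in self.heap:
            raw = self._read_block(block_id)
            self.heap[block_id] = raw.decode("utf-8")
        return self.heap[block_id]


cache = TwoTierCache({1: b"record-1"})
cache.get(1)      # misses both tiers, reads from disk
cache.get(1)      # served from the heap tier
print(cache.disk_reads)
```

Evicting an object from the heap tier does not force another disk read as long as the block is still in the direct-memory tier.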

For queries which modify buffer contents (insert, update and delete queries), it modifies the data in both buffers.

Buffers which are accessed during query processing (any CRUD query) are pinned in memory by inserting their pointers (location on disk) into a ConcurrentLinkedQueue, so that the evictor or writer threads don’t evict these buffers (or objects) or sync them back to disk, as they are most probably locked during query processing.

Cache Evictor

There are two evictor threads in the system.

Normal JVM heap evictor thread

This thread starts evicting from the JVM heap buffer cache when the cache reaches its high watermark. It evicts the least used buffers which are not pinned (being accessed at the time). The low watermark is set to 90% of cache size by default, and the high watermark to 95%. It evicts objects until the cache reaches the low watermark.

Normal Direct memory evictor thread

When the cache reaches its high watermark, the eviction thread starts evicting the least used buffers which are neither pinned nor dirty (dirty buffers are buffers which have changed recently and are not yet synced to disk). The low watermark is set to 90% of cache size by default, and the high watermark to 95%. It evicts objects until the cache reaches the low watermark.
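A watermark-driven eviction pass can be sketched as follows. The sketch approximates "least used" with least recently used order via an `OrderedDict`, which is an assumption, as are the class and method names. Integer percentages are used for the watermarks to avoid floating point rounding at the thresholds.

```python
from collections import OrderedDict


class WatermarkCache:
    def __init__(self, capacity, low_pct=90, high_pct=95):
        self.low = capacity * low_pct // 100
        self.high = capacity * high_pct // 100
        self.entries = OrderedDict()   # least recently used entries come first
        self.pinned = set()            # buffers in use by a running query
        self.dirty = set()             # buffers changed but not yet synced to disk

    def touch(self, key, value):
        """Insert or access an entry and mark it as most recently used."""
        self.entries[key] = value
        self.entries.move_to_end(key)

    def evict_if_needed(self):
        """Once at the high watermark, evict clean unpinned entries down to the low one."""
        if len(self.entries) < self.high:
            return []
        evicted = []
        for key in list(self.entries):
            if len(self.entries) <= self.low:
                break
            if key in self.pinned or key in self.dirty:
                continue
            del self.entries[key]
            evicted.append(key)
        return evicted


cache = WatermarkCache(capacity=100)
for block_id in range(95):
    cache.touch(block_id, b"")
cache.pinned.add(0)
cache.dirty.add(1)
print(cache.evict_if_needed())
```

For the JVM heap evictor described above, the `dirty` set would simply stay empty, since that thread only skips pinned buffers.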

Cache Writer

This can be set to one of two modes:

Aggressive write mode

As soon as a buffer gets dirty (for example due to an insert, update or delete query), it is written back to disk. This setting is useful when the log writer is not enabled.

Timed write mode

In this mode, the main writer thread wakes up every so often (every 3 seconds by default) and writes all dirty buffers which changed before the writer thread started its scan.

During the scan, for every dirty buffer, it picks up a new thread from the pool to write to disk. If no thread is available in the pool, it waits until one is. The writer is thus multi-threaded, which is very useful when a machine has a lot of spindles.

Data structures

Both the BTree+ and the linked list are backed by a persistent store, so that if an element/block is not available in memory, it can be fetched from disk.

How Queries are executed

set

  1. First it creates unique index objects from the insert parameters and puts them, if absent, in a global ConcurrentHashMap. This step makes sure no other query is inserting or updating the same unique indexes; if one is, UKViolationException is thrown.
  2. Then it checks every unique BTree+ index tree to make sure these index entries are not already present in the BTree+. If they are, it throws UKViolationException.
  3. It inserts the record into the collection/table linked list.
  4. It inserts all index entries into the various BTree+ trees.
  5. It removes the unique index objects added to the ConcurrentHashMap in step 1 and returns.
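The five steps can be sketched in Python as below. A dictionary guarded by a lock stands in for the global ConcurrentHashMap and its put-if-absent operation, a set stands in for a unique BTree+ index, and a list stands in for the table linked list. All of these stand-ins, and the `Table` class itself, are assumptions made for illustration.

```python
import threading


class UKViolationException(Exception):
    """Raised when a unique key is already stored or is being inserted elsewhere."""


class Table:
    def __init__(self):
        self._in_flight = {}                  # stand-in for the ConcurrentHashMap
        self._in_flight_lock = threading.Lock()
        self.unique_index = set()             # stand-in for a unique BTree+ index
        self.records = []                     # stand-in for the table linked list

    def set(self, unique_key, record):
        # Step 1: put-if-absent, so only one query at a time works on this key.
        with self._in_flight_lock:
            if unique_key in self._in_flight:
                raise UKViolationException(f"{unique_key!r} is being inserted")
            self._in_flight[unique_key] = True
        try:
            # Step 2: the key must not already be present in the unique index.
            if unique_key in self.unique_index:
                raise UKViolationException(f"{unique_key!r} already exists")
            self.records.append(record)        # step 3
            self.unique_index.add(unique_key)  # step 4
        finally:
            # Step 5: release the reservation made in step 1.
            with self._in_flight_lock:
                del self._in_flight[unique_key]


table = Table()
table.set("user-1", {"name": "a"})
try:
    table.set("user-1", {"name": "b"})
except UKViolationException as error:
    print("rejected:", error)
```

The reservation in step 1 exists because the check in step 2 and the index insert in step 4 are not atomic on their own: without it, two concurrent inserts of the same key could both pass step 2.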

get

  1. A select query can select more than one record, based on the where filter.
  2. Based on the filter columns, it checks whether it needs an index scan or a full table scan.
  3. For a full table scan, it positions the full table scan iterator on the head of the collection/table linked list.
  4. For an index scan, it first positions the index scan iterator in the proper LeafIndexBlock. This is where it will start scanning the index list.
  5. For each record in the iterator, it applies the filter and generates the list of record pointers for the result set. For an index scan, if the filter returns true it selects that record, else it stops scanning. For a full table scan it scans the whole linked list.
  6. Once it has the list of record pointers, it fetches every record and, if that record still passes the filter, returns it based on the select column list.

set (Update)

  1. Update query can update more than one record.
  2. So first it selects the records which need to be updated by executing steps 1-5 of the select query.
  3. For each record in the select list, it updates the record if that record still passes filter.

remove

  1. Similar to an update query, a delete query first selects the records which need to be deleted.
  2. Then it deletes every record which still passes the filter.

Locking

Locking Tree

  1. The whole tree is locked for reads while it is being searched. Once the record/block is located, this lock is removed.
  2. Once the leaf node/index entry is located, it puts a read or write lock on the index leaf block based on the type of query. For reads it puts a read lock. For writes, it puts an exclusive write lock on the index leaf block.
  3. If a block needs to split, it sets splitRequired = true for that block, removes the write lock, then tries to acquire an exclusive write lock on the tree and goes through the BTree split motions. During this, any number of blocks might need to split, but since the tree is locked exclusively, other threads won’t be affected. Also, if any other thread tries to update that leaf block between the time the lock on the leaf block is removed and the exclusive lock on the tree is acquired, it will get SplitRequiredException.
  4. The current implementation also sets the splitRequired = true flag on an index leaf block when all index entries in that block have been removed. It then gets an exclusive lock on the tree and removes that block, and maybe many more blocks, from the tree. Technically this can be done separately without blocking the query thread. This is something I need to change in the future.

Locking Link List

  1. The specific block in the linked list is read or write locked for every CRUD operation on a record in that block.
  2. During table scans, when it needs to move to the next block, it first locks the next block and then removes the lock on the current block. So for some time, 2 consecutive blocks in the list will be locked. Due to this, the list can be scanned in the forward direction only.
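This lock-the-next-block-before-releasing-the-current-one pattern is often called hand-over-hand locking. A minimal Python sketch follows; it uses a plain mutex per block instead of read/write locks, which is a simplification, and the `Block` and `scan` names are hypothetical.

```python
import threading


class Block:
    def __init__(self, records):
        self.records = records
        self.lock = threading.Lock()
        self.next = None


def scan(head):
    """Forward scan that always holds a lock on the block it is reading."""
    results = []
    current = head
    if current is None:
        return results
    current.lock.acquire()
    while current is not None:
        results.extend(current.records)
        following = current.next
        if following is not None:
            following.lock.acquire()   # lock the next block first
        current.lock.release()         # only then release the current block
        current = following
    return results


first, second, third = Block([1, 2]), Block([3]), Block([4, 5])
first.next, second.next = second, third
print(scan(first))
```

Because every scanner acquires locks in list order, two scans can never wait on each other in a cycle. A scan in the opposite direction would acquire the same locks in reverse order, which is why the list is only scanned forward.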

Extending the Link List

  1. Extending the linked list tail is carried out by a separate asynchronous thread. Each time, it extends the list by a configurable number of blocks (5 by default). This way, the whole list doesn’t stay locked until the schema metadata for that collection/table is updated.

This scheme of extending the linked list is required so that inserts won’t become serialized onto a single thread when the tail needs to be extended during inserts.