HTAP databases - lets get distributed - Part 2 of 4

Introduction


This is the second in a 4-part series on HTAP (Hybrid Transaction and Analytical Processing) databases. In the previous blog post (link opens in separate window) in this series I described the architecture known as "Primary-row store with an in-memory column store" and discussed a simple design of such a system and the concurrency that would be required when designing it. I will now describe a separate system architecture for HTAP known as "Distributed row store with a replicated column store", see the image below for the architecture. This blog will cover weaknesses and strengths in this architecture, some notable implemementations in production systems and some pseudocode model in fake Pluscal to describe what I think it should look like. Again this is my mental model, you are encouraged to make your own and mine is purposely high level. As an aside, Pluscal is a math based language for describing concurrent systems. I use it as its more general than a programming language and more powerful to express the ideas I wish to describe. Please check out Pluscal to learn more. Anyway since a picture is worth a thousand words, lets start at the below schematic of the architecture.



Please note this diagram is adapted from the paper "HTAP databases: A Survey", by Zhang et al. 2024. The contents of this blog are inspired by insights from this brilliant paper as well. However I will not go into the depth that the paper provides. The reader is encouraged to read the paper.



How does "Distributed row store with a replicated column store handle data freshness and scalablity" ? Strengths and Weaknesses


In the previous blog we discussed our requirements for high freshness in our data and performance with scalability. Each database approaches these concerns differently. You are looking at a database that has distributed its transactions across several nodes (for the sake of simplicity we use nodes 1 and 2 for transtional queries, node 4 for analytic queries and a master). To start, a transactional query comes in of type write/read and is handled solely by the Master. A designated set of nodes will be using a column store exclusively (these nodes dont partake in leader election and consensus) and these will take responsibility for complex queries that are of type read only. Still other nodes (1 and 2 in this schematic) will be designated as replicas of the Master and its transactions (these nodes partake in leader election and consensus). Nodes 1 and 2 are read only stores. The Master will replicate its log to these nodes (1, 2 and 3). What you will notice is that now the transactions live in separate address spaces, possibly in different geographical locations. This means time spent doing a round trip when communcating to replicate the state (log) and co-ordinate. Here are some fun numbers: if the Master is in Amsterdam and Node 1 is in California we can expect a round trip to take around 200Mns (million nanoseconds). If both Nodes are in a single data center, we still take about 500kns (thousand nanoseconds). Compare this to an in-memory database in L1 cache where reads and write are about 0.7ns and you see the problem. Data will not be fresh here, but it will be distributed and this solution will offer scalability on cloud resources. Also we leverage horizontal scalability to get higher isolation performance. In other words you have to make a trade-off based on your case for freshness vs scalability.


Now for free-hand fake pluscal


In the previous blog post, I used a pseudocode to make the ideas widely accessible. With fair warning, this will not be pretty but you will get the gist even though this is free-hand without an IDE or git. I will not provide a code source but I encourage you to learn Pluscal as its the only way I know how to express this level of concurrency in distributed systems. Also this code will not be translatable to TLA+ as its fake and purposed with giving you a very high level view of the system. You can take this and do an implementation in whatever language you like.



// ASSUME: the Master has been elected via Leader Election, and has not failed
// at this time. We will not simulate Leader Election here. Also ignore
// that query processing is done by co-ordinating on different nodes.
// We do this to focus our discussion

// start of algorithm

// global data transfer 
// where txns are of type Analytical and Transactional
channel chan == FIFO queue of txns 

process client 1 to N where N is a Natural Number {
	local var trxn t sent to Master Node over channel chan
    (tcp perhaps) to write/read
}


process Master {
	either {
    HANDLE_QUERIES:
      while(true){
          if(channel chan has a txn t){
              write txn into the WAL for Recovery/Failure
              if(txn is a write){
                t1 is parsed through Query engine and written 
                to the root node of some B+tree or node in SkipList
              }

              if(txn is a read){
                t1 parsed through Query planner/optimizer 
                then run on tables to search for row data
                  return row data 
              }

          }
      }
    } or {
    REPLICATE LOGS:
      while(true){
      	if(logs exist on Master thats not replicated yet){
        	send log data from Master to Nodes to replicate
            the state on the Nodes
        }
      }
    } // end either-Or
}

process n in Set of {Nodes} such that Nodes >= (2*(Number of Failed Nodes) + 1) 
	AND n is NOT the Master Node {
    
	while(true){
    	if(channel chan has a txn t of type read){
        	t2 parsed through Query planner then run 
            on tables to search for column data if this node
            is Node 4 or get data from row data if this node
            is either Node 1 or 2 (see the schematic for the nodes)
            
            if(column data not in data structures){
        		pull in data from Master via Transform process
        	}
        }
    }
}

process Transform process {
	while(true){
    	if(event triggered){
        	pull in data from the Master store into the 
            Node that needs to service the query
        }
    }
}


Please note that distrbuted query planning is a topic I will cover later in detail but is too big to do with this post and will just draw focus from what I want to communicate. The point is that data is moving through the system from client to Master to Nodes and then other clients are querying the Nodes as well, but only for reads. Here the concurrency is [1 Transform process + 1 Master process + N number of Node processes] to ensure leader election and recovery. Here concurrent execution can easily see 100's of processes due to many Nodes in the system. This is a pretty hard architecture to test and build as there are many ways it can fail. However as always, I have to add, this is a beautiful system, complex but fascinating and since I assume you all love concurrency ... what a wonderful piece of engineering.


Analysis and Final thoughts

So where can you use such a system? Its ideal for a case where freshness is not paramount, and where scalabilty is the main driver. Systems such as recommendations for E-commerce and customer purchase preferences and analytics could find this database useful. I reiterate there is no single database to rule them all. However this database can provide a cost saving since the need for two separate databases (transactional + analytical + ETL) can be replaced with a single database that can do both transactional and analytical workloads at near-real-time and at high scalability.

Now for some shameless self promotion: if you are interested in these types of databases, I am building a HTAP database called SparrowDB. Please check it out! It's written in mostly Rust. If you want to learn about databases in general, follow me on Linkedin and checkout out my blogs. This blog space is my contribution to my favourite topic - Concurrency!

In closing, I am deliberately being abstract here and creating what I think should be done in this algorithm. This does not mean its correct! If you are interested in looking into actual systems that use this architecture, please see TiDB from PingCap and F1 Lightning (built on Google Spanner) which uses this architecture. Have a great week and come by later to see the rest.

Comments

Popular posts from this blog

HTAP Databases and the processes that keep them going - Part 1 of 4

Cache and Buffer Pool Manager