HTAP database - Distributed column storage - Part 3 of 4

Introduction


This is the third in a four-part series on HTAP (Hybrid Transactional and Analytical Processing) databases. In the previous blog post in this series I described the architecture known as "Distributed row store with a replicated column store", discussed a simple design of such a system, and covered the concurrency it requires. If you like distributed systems, go check that one out as well. I will now describe a separate HTAP architecture known as "Distributed column storage with Row store on a disk"; see the image below for the architecture. This blog covers the weaknesses and strengths of this architecture, some notable implementations in production systems, and a pseudocode model in fake PlusCal that describes what I think it should look like. Again, this is my mental model; you are encouraged to make your own, and mine is purposely high level.

As an aside, PlusCal is a math-based language for describing concurrent systems. I use it because it's more general than a programming language and more powerful for expressing the ideas I wish to describe. Please check out PlusCal to learn more. Personally, I understand systems best in graphical format, so I always start with a schematic of what we are looking at.



Please note that this diagram is adapted from the paper "HTAP databases: A Survey" by Zhang et al., 2024. The contents of this blog are also inspired by insights from this brilliant paper. However, I will not go into the depth that the paper provides, and the reader is encouraged to read it.

How does this system handle data freshness and scalability? Comparing weaknesses and strengths


The database here has distributed column storage: the schematic shows a Master node and 3 other nodes in a cluster called the Distributed column storage (on the right). In other words, we have to configure a cluster for this system, and we can envision a boundary around the cluster as a logical isolation that keeps it separate and protected. The left module in the graphic shows a disk, a row storage disk to be exact, which stores data in row format. Why row format? It's best for transactional queries, while columns are of course best for analytic queries. More detailed discussions can be found in the previous blogs, which you are encouraged to read.

Another architectural piece is what is called "Transform", which sits logically between the cluster and the row store. The job of this component is to parse row storage data from the row storage disk, converting it into the column format held by the column storage nodes of the cluster. The mechanism and schedule for this parsing can be designed in several ways, but briefly you can do it at some time interval you select, when the buffer size of the data frame is above some size limit, or on demand when a query serviced by the Cluster requires data that has not been brought into the cluster yet. You can imagine this as some process named Transform.

So now let's discuss the freshness of the data. We know from previous discussions that once analytical queries no longer happen in memory next to your Row storage Buffer Pool Manager, you can expect some delays, since you no longer have access to the data in DRAM or in cache. And as we see there's a Cluster, which we can assume is in a single server farm, so let's say a messaging round trip takes about 500K ns (500,000 nanoseconds, or 0.5 ms). But the time penalty we pay to get our data for analytics is at least traded for much better isolation between the transactional and analytical workloads, which is great for analytic applications where performance requirements are paramount.

That being said, we lose a little power in scalability, since we only have a single row data store and we don't have the ability to easily scale it horizontally. So again we come back to the point: based on your application, you need to make a tradeoff and decide what you can live with in terms of scalability, performance and freshness of data. As an aside, I am building a database called KestrelDB (formerly SparrowDB), where I am trying to build an architecture that delivers the benefits of scalability, freshness, and performance together. If you like databases and seeing systems being built, do take a look!
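
To make the three Transform triggers mentioned above (a time interval, a buffer size limit, and on-demand) a bit more concrete, here is a minimal Python sketch. Everything in it is hypothetical and mine: the `TransformPolicy` class, the `rows_to_columns` helper, and the default thresholds are only there to illustrate the idea, and it assumes every row has the same fields. It is not how any production system does it.

```python
from dataclasses import dataclass, field
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Pivot row-format records into one list per column.

    Assumes every row has the same fields (hypothetical simplification).
    """
    columns: dict[str, list[Any]] = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


@dataclass
class TransformPolicy:
    """Decides when buffered rows should be moved to the column store."""

    interval_s: float = 5.0       # assumed time interval between scheduled runs
    max_buffered_rows: int = 3    # assumed buffer size limit
    last_run_s: float = 0.0
    buffer: list[dict[str, Any]] = field(default_factory=list)

    def should_run(self, now_s: float, on_demand: bool = False) -> bool:
        if on_demand:                                    # a query needs data the cluster lacks
            return True
        if len(self.buffer) >= self.max_buffered_rows:   # buffer size limit reached
            return True
        return now_s - self.last_run_s >= self.interval_s  # time interval elapsed

    def run(self, now_s: float) -> dict[str, list[Any]]:
        """Convert the buffered rows to columns and reset the buffer and timer."""
        columns = rows_to_columns(self.buffer)
        self.buffer = []
        self.last_run_s = now_s
        return columns
```

The point of the sketch is that the three triggers are just three conditions checked by one process. The order in which they are checked, and what the thresholds should be, is a design choice that depends on how fresh your analytics need to be.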


Now for free-hand fake PlusCal, as is the custom for my blog space


In the previous blog posts I used pseudocode to make the ideas widely accessible to people who don't know PlusCal or a particular programming language, so we just use English with some PlusCal syntax. Fair warning: this will not be pretty, but you will get the gist even though it is free-hand, written without an IDE or git. I will not provide a code source, but I encourage you to learn PlusCal, as it's great for expressing this level of concurrency in distributed systems. Also, this code will not be translatable to TLA+, as it's fake and meant only to give you a very high-level view of the system. You can take this pseudocode and do an implementation in whatever language you like.



// ASSUME: the Master has been elected via Leader Election inside the cluster, 
// and has not failed at this time. We will not simulate Leader Election here. Also ignore
// that query processing is done by co-ordinating on different nodes in the Cluster.
// We do this to focus our discussion, I will address query processing separately.

// start of algorithm

// global data transfer 
// where txns are of type Analytical and Transactional
channel chan_OLTP == FIFO queue of txns for OLTP
channel chan_OLAP == FIFO queue of txns for OLAP

process OLTP client 1 to N where N is a Natural Number {
    local var txn t sent to Row Storage over channel chan_OLTP
    (tcp perhaps) to write/read
}

process OLAP client 1 to N where N is a Natural Number {
    local var txn t sent to Cluster Master over channel chan_OLAP
    (tcp perhaps) to read
}


process Master in Cluster {
    either {
    HANDLE_QUERIES:
        while(true){
            if(channel chan_OLAP has a txn t){
                write t into the WAL for Recovery/Failure
                if(t is a read){
                    t parsed through Query planner/optimizer
                    then run on tables to search for column data
                    then return column data
                }
            }
        }
    } or {
    REPLICATE_LOGS:
        while(true){
            if(logs exist on Master that are not replicated yet){
                send log data from Master to Nodes to replicate
                the state on the Nodes
            }
        }
    } // end either-or
}

process n in Set of {Nodes in Cluster} such that (Number of Nodes) >= (2*(Number of Failed Nodes) + 1)
    AND n is NOT the Master Node {

    while(true){
        if(channel chan_OLAP has a txn t of type read){
            t parsed through Query planner then run
            on tables to search for column data if this node
            is in set of Nodes {1,2,3}
            (see the schematic for the nodes 1,2,3 in Cluster)

            if(column data not in data structures){
                trigger an event to the Transform process
                to get the data from the Row storage disk
                into the Cluster
            }
        }
    }
}

process Transform {
    while(true){
        if(event triggered){
            pull in data from the Row Storage disk into the
            Node that needs to service the query
        } else if (timing interval is used AND time left in interval == 0){
            parse the data from the Row Storage to the Cluster
        } else {
            some other mechanism can also trigger the parsing
            of data to the cluster
        }
    }
}
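
The node-count condition on the Nodes process (at least 2F + 1 nodes to tolerate F failed nodes) and the log replication branch of the Master can be sketched in a few lines of Python. This is only my illustration: the `MasterLog` class and its methods are hypothetical names, and the "committed" rule (an entry counts once a majority of the cluster holds it, Master included) is an assumption borrowed from majority-quorum protocols, not something the pseudocode above specifies.

```python
def min_cluster_size(failures_to_tolerate: int) -> int:
    """Smallest cluster that still has a majority after F crash failures: 2F + 1."""
    return 2 * failures_to_tolerate + 1


def majority(cluster_size: int) -> int:
    """Number of nodes that form a majority quorum."""
    return cluster_size // 2 + 1


class MasterLog:
    """Toy Master-side log that tracks how far each node has replicated."""

    def __init__(self, node_ids: list[str]) -> None:
        self.entries: list[str] = []
        # Number of log entries each non-Master node has acknowledged so far.
        self.acked: dict[str, int] = {node_id: 0 for node_id in node_ids}

    def append(self, entry: str) -> None:
        self.entries.append(entry)

    def pending_for(self, node_id: str) -> list[str]:
        """Entries that exist on the Master but are not yet replicated to node_id."""
        return self.entries[self.acked[node_id]:]

    def ack(self, node_id: str, count: int) -> None:
        """Record that node_id has applied `count` more entries (capped at log length)."""
        self.acked[node_id] = min(len(self.entries), self.acked[node_id] + count)

    def committed(self) -> int:
        """Entries held by a majority of the cluster, counting the Master itself."""
        cluster_size = len(self.acked) + 1
        needed_nodes = majority(cluster_size) - 1   # the Master always has its own log
        if needed_nodes == 0:
            return len(self.entries)
        ranked = sorted(self.acked.values(), reverse=True)
        return ranked[needed_nodes - 1]
```

With the three nodes from the schematic plus the Master, `majority(4)` is 3, so an entry is counted by `committed()` once two of the three nodes have acknowledged it.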


Please note that distributed query planning is a topic I will cover later in detail, but it is too big for this post. The point here is that data moves through the system from clients doing transactions or analytics to the Row store or to the Cluster Master and its Nodes, respectively. The Row storage handles the transactional queries, and analytics is handled by the Cluster. The Transform process does the job of parsing data from Row Storage to Column storage, something that requires some careful thought, and which I am incidentally doing right now for my project called KestrelDB. Here the concurrency is [1 Transform process + 1 Row Storage process (with its own sub-processes) + N processes in the Cluster]. Concurrent execution can easily reach 1000s of processes due to the many Nodes in the system. This is a difficult architecture to code, and I hope that the pseudocode above will at least give you an idea of how to do it yourself. I recommend you start with threads and see if you can create such a system in a simple form, then expand on it. I also recommend TDD (test-driven development), as concurrency is hard, and I use TDD very often when I am working on the KestrelDB code base in Rust.
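
If you want to try the "start with threads" advice, here is a minimal Python sketch of a toy, single-machine version of the data flow. It is an assumption-heavy model of my own: `ToyHtap`, `run_demo`, and the `STOP` sentinel are hypothetical names, the "cluster" is collapsed into one in-memory column store, and the Transform is called by hand instead of running as its own process.

```python
import queue
import threading

STOP = object()  # sentinel telling the row storage thread to shut down


class ToyHtap:
    """Toy model: one row store, one column store, one Transform step."""

    def __init__(self) -> None:
        self.chan_oltp: queue.Queue = queue.Queue()  # FIFO channel for OLTP writes
        self.row_store: list[dict] = []
        self.column_store: dict[str, list] = {}
        self.transformed = 0                          # rows already moved to columns
        self.lock = threading.Lock()

    def row_storage_process(self) -> None:
        """Apply OLTP writes from the channel to the row store until STOP arrives."""
        while True:
            txn = self.chan_oltp.get()
            if txn is STOP:
                return
            with self.lock:
                self.row_store.append(txn)

    def transform(self) -> int:
        """Move rows not yet transformed into the column store; return how many."""
        with self.lock:
            new_rows = self.row_store[self.transformed:]
            for row in new_rows:
                for name, value in row.items():
                    self.column_store.setdefault(name, []).append(value)
            self.transformed += len(new_rows)
            return len(new_rows)

    def olap_sum(self, column: str) -> float:
        """Analytical read: only sees data the Transform has already moved."""
        with self.lock:
            return sum(self.column_store.get(column, []))


def run_demo() -> tuple[float, float]:
    db = ToyHtap()
    storage = threading.Thread(target=db.row_storage_process)
    storage.start()
    # Three OLTP clients, each sending one write over the channel.
    clients = [
        threading.Thread(target=db.chan_oltp.put, args=({"id": i, "amount": 10.0 * i},))
        for i in (1, 2, 3)
    ]
    for client in clients:
        client.start()
    for client in clients:
        client.join()
    db.chan_oltp.put(STOP)          # queued after all three writes
    storage.join()
    stale = db.olap_sum("amount")   # Transform has not run yet
    db.transform()
    fresh = db.olap_sum("amount")
    return stale, fresh
```

Calling `run_demo()` returns the analytical sum before and after the Transform runs. The first value is 0 because the column store is still empty at that point, which is the data freshness gap of this architecture in miniature.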


Analysis and Final thoughts

So where can you use such a system? To recap, the limiting factors are lower data freshness and lower scalability due to the single Row Storage on disk, while the benefit is higher performance. I postulate that IoT applications that can tolerate lower data freshness are a good match, provided that the service does not do updates very often. This architecture will provide great performance for such systems.

Now for some shameless self-promotion: if you are interested in these types of databases, I am building an HTAP database called KestrelDB. Please check it out! It's written mostly in Rust. If you want to learn about databases in general, follow me on LinkedIn and check out my blogs. This blog space is my contribution to my favourite topic: Concurrency!

In closing, I am deliberately being abstract here and sketching what I think this algorithm should do. This does not mean it's correct! If you are interested in looking into actual systems that use this architecture, please see MySQL HeatWave. Have a great week and come by later to see the rest. Thanks for reading to the end.
