Replication in databases - Part 1
This write up is Part-1 of series of posts about Replication in databases.
Replication is a process of maintaining a copy of the same data on multiple machines that are connected via a network. Following are the common use cases for replicating data:
- To keep data geographically close to users of the system(ultimately reducing latency)
- To allow the system to continue working even if some of its parts have failed(ultimately increasing availability)
- To scale out the number of machines that can serve read queries(ultimately increasing read throughput)
Replication becomes fairly easy and straightforward if the data which the system has is very static and doesn’t change over time at all. Just copying the data to every node will do the task. But the difficulty lies in replicating ever changing dynamic data.
To understand how dynamic data is replicated in databases, we’ll need to understand the concept of a leader, followers. Every node which stores a copy of the data is called a replica. With multiple replicas how do we ensure that the data ends up on all the replicas? Every write to the database needs to be processed by every replica, otherwise the replicas would no longer contain the same data. The most common solution for this is leader-based replication(AKA active/passive master-slave replication). It works as follows:
- One of the replicas is designated as the leader(AKA master/primary). When the clients(Applications) want to write to the database, they will send the requests to the leader, which first writes the data to its local storage.
- The other replicas (AKA followers/read replicas/slaves/secondaries/hot standbys) receive the writes( the data which the leader processed as part of the replication log/change stream). Each follower takes the log from the leader and updates its local copy of the database accordingly, by apply writes in the same order as they were processed on the leader.
- When a client wants to read from the database, it can query either the leader or any one of the followers. However writes are only accepted on the leader(followers are read only according to the client)
This mode of replication is the built-in feature in many relational databases like PostgreSQL, MySQL. Many non-relational databases like MongoDB, RethinkDB also use the following replication process.
Synchronous Versus Asynchronous Replication
Now that we know how replication is achieved, an important detail to be considered is whether replication happens synchronously or asynchronously(configurable in most databases). Let’s imagine a scenario where client2 in the above image updates the username attribute by sending the update request to the leader, shortly afterword the same request is received by the followers stating that the data has changed, eventually the leader notifies the client that the update was successful.
In the above scenario, the leader waits until replica1 confirms that it has received the write and it has executed the operation locally and successfully before reporting success to client2. Hence the replication to replica1 is synchronous. On the other hand the leader doesn’t wait for acknowledgement from replica2, thus replication in case of replica2 is asynchronous. As you can see there can be a substantial amount of delay before any of the asynchronously replicated replicas apply the changes locally. Usually replication is very fast, most databases apply changes to followers in less than 1 second, but this again is dependent on the configuration(number of sync replicas, location etc). Thus there is no guarantee on how much time the entire replication process might take.
I’ve seen circumstances where followers fall behind master by several minutes or more. These might occur due to
- System recovering from a fault
- System operating under maximum capacity
- Network partition among members of the cluster
An advantage of synchronous replication is that there will always be one up to date(consistent) copy of the leader at any point in time. If the leader fails due to whatever reasons, we can be sure that the data is still available on the follower. This again is a double edged sword, if the synchronous follower doesn’t respond(system fault, max capacity, network partition etc), writes will not be processed, until all of the followers are available to apply the operations. The leader will block all the operations until then.
It’s for this reason, configuring all of the replicas to be synchronous is not a very good practice. Any one among these replicas going down would result in the system would grind to a halt. Usually one of the replicas would be synchronous, rest of them would be operating in asynchronous mode. If the synchronous follower becomes unavailable or slow, one of the asynchronous follower(who has the most updated data snapshot), will be made synchronous. This configuration is called semi-synchronous.
If the goal of the system is to support high throughput(writes), leader-based replication is always configured to be completely asynchronous, so that the leader need not be worried about the replication status in the replicas, even if they fall behind and can continue processing requests. But in case if the leader fails and if any writes haven’t been replicated to the asynchronous follower that is going to be promoted, these un replicated writes will be lost completely (not suitable for data critical) systems.
Setting up new followers
There will be times where we’ll have to setup new followers(expanding to a new geo-location, Disaster recovery etc), in this case simply copying the data from the master over to the new replicas won’t suffice, because the data is always in flux, so a standard file copy would see different parts of the database at different points in time, this might not make any sense as a whole. Another option would be to make the files on disk consistent by locking the database( making it unavailable for writes), but that’s against the goal of high availability. However the widely used approach is something similar to this:
- Take a consistent snapshot of the leader’s database at some point in time - if possible without locking the entire DB
- Copy the snapshot to the new replica node
- Replica connects to the leader and requests for all the operations that have happened since the snapshot was taken, this requires that the snapshot is associated with an exact position in the leader’s replication log(MySQL binlog for ex).
- Once the follower has processed the backlog of data changes since the snapshot, it is marked as caught up. It can now continue to process data changes from leader as they happen
One thing to keep in mind here is, since the new follower is already behind when it starts reading backlog, if both this node and the leader are processing operations at the same rate and the leader continues to get requests, the follower will never catch up, hence its advisable to have either
- The replica operate at a higher throughput
- Setup the replica when there’s minimal requests hitting the master
Node Outages
The hard truth about computers is that they can go down at any point in time(unexpectedly due to a fault, planned maintenance, installing a security patch etc). To be highly available, we should be able to reboot nodes(leader, replicas) without any downtime. Let’s see the details behind handling both the leader failure, follower failure.
- Follower failure: On its local disk, each replica keeps a log of the data changes it has received from the leader. If a follower crashes or is restarted or if the network between the leader and follower is temporarily interrupted, the follower can require quite easily by getting the last transaction that was processed before the fault occurred, the follower connects to the leader and request all the data that has changed after that transaction. When it has applied these changes, it has caught up to the leader and can continue receiving stream of data changes as before.
-
Leader failure: This failure is trickier because one of the leaders has to be promoted as the new leader, clients need to the reconfigured to send the writes to the new leader rather than the old one, other followers need to start consuming data changes from the new leader. This is called failover. Failover can happen manually(administrator taking necessary steps to promote the new leader) or automatically. Automatic failover process usually consists of the following steps:
- Determining that the leader has failed: there are many things that could potentially go wrong: crashes, power outages, network partitions and more. There is no single foolproof way of detecting what has gone wrong, so most systems simply use a timeout. Clients/Nodes frequently bounce messages back and forth between each other(heart beats) and if a node doesn’t respond to the heart beat for some period of time say(30s), it is assumed to be dead (talk about the trouble with distributed systems).
- Choosing a new leader: This is usually done through an election process(where a leader is chosen by a voting process where the remaining replicas participate in an election using any consensus algorithm(Paxos/Raft/Zab)). The best candidate is usually the replica with the most up to date snapshot from the old leader.
- Reconfiguring the system to use the new leader: Clients will now send the requests(writes) to the new leader. catch here is that the old leader might come back, thinking that its still the leader, the system needs to ensure that the old leader becomes a follower and recognizes the new leader rather than waging a war against it.
If asynchronous replication is used, the new leader might not have received all the writes from the old leader before it failed. If the former leader rejoins the cluster after the new leader was elected, the new leader might have received conflicting writes in the meantime. For example let’s say that there was a row insert operation which was granted a sequential primary key X, and this operation wasn’t replicated to the followers. After this operation let’s assume that the leader went down and one of the followers was made a leader. Sequential primary keys have the invariant that if a row X was inserted at point T, then any of the rows which were inserted before X would have the primary key less than X. Hence, if the new leader starts receiving writes, it might grant X to some other row. Subsequently if the old leader comes back to life, row with primary key X have conflicting data. The most common solution for this problem is to discard old leader’s un-replicated writes, this might violate durability expectations.
Also in some cases it could happen that both the old leader and the new leader believe that they are leaders. This situation is called a split brain and is especially dangerous if both the leaders accept writes and there is no process for resolving conflicts. Some systems have a mechanism to shut down one of the nodes if two leaders are detected, but this solution is to be taken with a pinch of salt, i.e if its not correctly designed both the nodes might be shut down.
Another interesting problem here is what is the right timeout for the node to be declared dead? A longer timeout means a longer time to recover in case the leader fails. If the timeout is too short then there can be unnecessary failures. Example: A temporary spike in load could cause the node’s response time to increase above the timeout or a network glitch could cause delayed packets. If the system is already struggling with load/network problems, an unnecessary failover is likely to make the situation worse, not better.