
Trouble with distributed systems - Process pauses

This is a continuation of the write-up Trouble with Distributed Systems - Part 2, which focused on Issues with Time. The current write-up focuses primarily on process pauses.

Process Pauses

Let’s consider an example of dangerous clock use in a distributed system. Say we have a database with a single leader per partition, and only the leader accepts writes. How does a node know that it is still the leader (that it hasn’t been declared dead by the other nodes), and that it may safely accept writes? One option is for the leader to obtain a lease from the other nodes, which is similar to a lock with a timeout. Only one node can hold the lease at any point in time; thus, when a node obtains the lease, it knows that it is the leader for some amount of time, until the lease expires. In order to remain the leader, the node must periodically renew the lease before it expires. If the node fails, it stops renewing the lease, so another node can take over when it expires. The request-handling loop can be imagined to look something like this:

while (true) {
    request = getIncomingRequest();
    // Ensure that the lease always has at least 10 seconds remaining
    if (lease.expiryTimeMillis() - System.currentTimeMillis() < 10000) {
        lease = lease.renew();
    }
    if (lease.isValid()) {
        process(request);
    }
}

Well, there are multiple things wrong with this piece of code (thread safety, for instance), but the flaw we are most concerned about here is its reliance on synchronized clocks: the expiry time on the lease is set by a different machine (where the expiry might be calculated as the current time plus 30 seconds, for example), and it is compared against the local system clock via System.currentTimeMillis(). If the clocks are out of sync by more than a few seconds, the code will start doing strange things.

Secondly, even if we change the protocol to only use the local monotonic clock, there is another problem: the code assumes that very little time passes between the point where it checks the time and the point where the request is processed (process(request)). Normally this code runs very quickly, so the 10-second buffer is more than enough to ensure the lease doesn’t expire in the middle of processing a request.
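For concreteness, a monotonic-clock variant of the loop above might look something like the following sketch, in the same pseudocode style. It assumes the lock service hands out a lease duration (“valid for 30 seconds”) rather than an absolute expiry timestamp; the helper methods are the same hypothetical ones as in the snippet above.

// Same loop as above, but tracking lease expiry with the local monotonic
// clock (System.nanoTime()) instead of comparing a remotely generated
// wall-clock timestamp. leaseDurationNanos is assumed to come from the
// lock service as a duration rather than an absolute time.
long leaseDurationNanos = TimeUnit.SECONDS.toNanos(30);
long leaseExpiryNanos = System.nanoTime() + leaseDurationNanos;

while (true) {
    request = getIncomingRequest();
    // Renew when fewer than 10 seconds of the lease remain
    if (leaseExpiryNanos - System.nanoTime() < TimeUnit.SECONDS.toNanos(10)) {
        lease = lease.renew();
        leaseExpiryNanos = System.nanoTime() + leaseDurationNanos;
    }
    // Still vulnerable: a long pause between this check and process(request)
    // means we may act on an expired lease without noticing.
    if (leaseExpiryNanos - System.nanoTime() > 0) {
        process(request);
    }
}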

However, what if there is an unexpected pause in the execution of the program? For example, imagine the thread is stopped for 15 seconds around the line lease.isValid() before finally continuing. In that case, it’s very likely that the lease will have expired by the time the request is processed, and another node will have already taken over as leader. However, there is nothing to tell this thread that it was paused for so long, so the code won’t notice that the lease has expired until the next iteration of the loop - by which time it may already have done something unsafe by processing the request.

While the idea of a thread sleeping for 15 seconds may sound far-fetched, there are various reasons why this could happen:

  1. Programming language runtimes like the JVM have a garbage collector that occasionally needs to stop all running threads. These stop-the-world GC pauses can sometimes last for several minutes. You might be thinking of concurrent garbage collectors like the HotSpot JVM’s concurrent mark-sweep collector, but even these cannot fully run in parallel with the application code - they still need to stop the world from time to time. Although these pauses can be reduced by changing allocation patterns or tuning GC settings, we must always assume the worst if we want to offer robust guarantees.
  2. In virtualized environments, a virtual machine can be suspended(pausing the execution of all processes and saving the contents of memory to disk) and resumed(restoring the contents of memory and continuing execution). This pause can occur at any time in a process’s execution cycle and can last for an arbitrary length of time. This is especially applicable when the virtual machine gets migrated from one node to another.
  3. When the operating system context-switches to another thread, or when the hypervisor switches to a different virtual machine(when running in a virtual machine), the currently running thread can be paused at any arbitrary point in code.
  4. If the application performs synchronous disk access/network access, a thread may be paused waiting for the slow disk/ network I/O operation to complete.
  5. If the operating system is configured to allow swapping to disk(paging), a simple memory access may result in a page fault that requires a page from disk to be loaded into memory. The thread is paused while this slow I/O operation takes place. If the memory pressure is high, this may in turn require a different page to be swapped out to disk. In extreme circumstances, the operating system may spend most of its time swapping pages in and out of memory and getting little actual work done(this is known as thrashing). To avoid this problem, paging is often disabled on server machines.

All of the above operations can preempt the running thread at any point and resume it at some later time, without the thread even noticing. The problem is similar to making multi-threaded code on a single machine thread-safe: we cannot assume anything about timing, because context switches and parallelism may occur.
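One practical way to observe such pauses is a small watchdog thread that sleeps for a short, fixed interval and measures how long the sleep actually took; any large gap is time during which the process was not running, whether due to GC, the scheduler, the hypervisor, or swapping. Below is a minimal, self-contained sketch; the 1 ms interval and 100 ms reporting threshold are arbitrary choices.

// Minimal pause detector: sleep for a short interval and compare the
// observed elapsed time (monotonic clock) against the intended interval.
// Anything well above the interval is time during which this thread was
// not running - e.g. a GC pause, scheduling delay, or swapping.
public class PauseDetector {
    public static void main(String[] args) throws InterruptedException {
        final long intervalMillis = 1;           // how long we intend to sleep
        final long reportThresholdMillis = 100;  // only report pauses above this

        long before = System.nanoTime();
        while (true) {
            Thread.sleep(intervalMillis);
            long after = System.nanoTime();
            long elapsedMillis = (after - before) / 1_000_000;
            long pauseMillis = elapsedMillis - intervalMillis;
            if (pauseMillis > reportThresholdMillis) {
                System.out.println("Detected pause of ~" + pauseMillis + " ms");
            }
            before = after;
        }
    }
}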

A node in a distributed system must assume its execution can be paused for a significant amount of time at any point, even in the middle of a function. During the pause, the rest of the world keeps moving and may even declare the paused node dead because it’s not responding. Eventually, the paused node may continue running, without even noticing that it was asleep until it checks its clock sometime later.

A more realistic example

Frequently, systems require that there is only one of something. For example:

  1. Only one transaction or client is allowed to hold the lock for a particular resource or object, to prevent concurrently writing to it and corrupting it.
  2. Only one user is allowed to register a particular username, because a username must uniquely identify a user.

Implementing these things in a distributed system requires care: even if a node believes that it is the chosen one, that doesn’t necessarily mean a quorum of nodes (a majority among a group of nodes) agrees. A node may have formerly been the leader, but if the other nodes declared it dead in the meantime (e.g., due to a network interruption or a GC pause), it may have been demoted and another leader may already have been elected.

If a node continues acting as the chosen one, even though the majority of nodes have declared it dead, it could cause problems in a system that is not carefully designed. Such a node can send messages to other nodes in its self-appointed capacity, and if other nodes believe it, the system as a whole may do something incorrect.

The following diagram shows a data corruption bug due to an incorrect implementation of locking. Say we want to ensure that a file in a storage service can only be accessed by one client at a time, because if multiple clients tried to write to it, the file would become corrupted. We try to implement this by requiring a client to obtain a lease from a lock service before accessing the file.

[Diagram: file access by locking]
This problem is an example of what was described at the very beginning of this post (process pauses). If the client holding the lease is paused for too long, its lease expires. Another client can then obtain a lease for the same file and start writing to it. When the paused client comes back, it believes (incorrectly) that it still has a valid lease and proceeds to also write to the file. As a result, the clients’ writes clash and corrupt the file.
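In code, the unsafe pattern on the client side might look something like the sketch below; the lockService, storage, and prepareUpdate names are hypothetical stand-ins for whatever lock service and storage API are in use.

// Hypothetical client-side code showing the unsafe pattern: the write is
// protected only by the client's own belief that its lease is still valid.
Lease lease = lockService.acquireLease("file-123");
byte[] newContents = prepareUpdate();
if (lease.isValid()) {
    // <-- a long GC pause or VM suspension here lets the lease expire and
    //     another client take over, yet the write below still goes through
    storage.write("file-123", newContents);
}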

Solution - Fencing tokens

When using a lock or a lease to protect access to some resource, such as the file storage in the above diagram, we need to ensure that a node under a false belief of being the chosen one cannot disrupt the rest of the system. A technique called fencing is used to achieve this goal; it works as illustrated in the diagram below.

[Diagram: file access by locking and fencing]

Let’s assume that every time the lock server grants a lock or lease, it also returns a fencing token, which is a number that increases every time a lock is granted (e.g., incremented by the lock service, such as ZooKeeper). We can then require that every time a client sends a write request to the storage service, it must include its current fencing token.

In the above diagram, Client 1 acquires the lock with a token of 33, but then it goes into a long pause and the lease expires. Client 2 acquires the lease with a token of 34 (since the number always increases) and then sends its write request to the storage service, including the token of 34. Later, Client 1 comes back to life and sends its write to the storage service, including its token value 33. However, the storage server remembers that it has already processed a write with a higher token number (34), and so it rejects the request with token 33.
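A rough sketch of what the check on the storage service side might look like, assuming the service keeps track of the highest fencing token it has seen per file (all class, method, and field names here are hypothetical):

import java.util.HashMap;
import java.util.Map;

// Hypothetical storage-service side check: remember the highest fencing
// token seen for each file and reject any write that carries an older one.
public class FencedStorage {
    private final Map<String, Long> highestTokenSeen = new HashMap<>();

    public synchronized void write(String fileId, long fencingToken, byte[] contents) {
        long highest = highestTokenSeen.getOrDefault(fileId, -1L);
        if (fencingToken < highest) {
            // A newer lease holder has already written; this token belongs to a stale lease.
            throw new IllegalStateException("Stale fencing token " + fencingToken
                    + " (highest seen so far: " + highest + ")");
        }
        highestTokenSeen.put(fileId, fencingToken);
        doWrite(fileId, contents); // actual persistence, elided
    }

    private void doWrite(String fileId, byte[] contents) { /* ... */ }
}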

If ZooKeeper is used as the lock service, the transaction id zxid or the node version cversion can be used as the fencing token, since they are guaranteed to be monotonically increasing. Some systems, such as Redisson, argue that requiring the server to check the token is a downside, but processing tokens on the server side is in fact a good thing: it is unwise for a service to assume that its clients will always be well behaved, because the clients are often run by people whose priorities are very different from the priorities of the people running the service.
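As a rough illustration of the ZooKeeper case, one could create an ephemeral znode as the lock and use the znode’s creation transaction id (czxid) as the fencing token. This is only a sketch of the idea, not a complete lock recipe - there are no watches, retries, or error handling, and it assumes a local ZooKeeper at localhost:2181 with an existing /locks parent node.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch: acquire a "lock" by creating an ephemeral znode and use its
// creation transaction id (czxid) as the fencing token; zxids are assigned
// by the ZooKeeper leader and are monotonically increasing.
public class ZkFencingTokenExample {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10_000, event -> {});

        // Creation fails if another session already holds the lock node.
        // Assumes the /locks parent znode already exists.
        String path = zk.create("/locks/file-123", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        Stat stat = zk.exists(path, false);
        long fencingToken = stat.getCzxid(); // include this with every write

        System.out.println("Acquired lock, fencing token = " + fencingToken);
        zk.close();
    }
}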

Conclusion

While this post may seem to present runtimes with stop-the-world garbage collection as bad or ineffective, in practice that is not the case: most applications in production today run on such runtimes. Hence, it is very important to understand the limitations of the underlying runtime in order to prevent surprising downtime or production issues blowing up in front of us. There are also runtimes with a different garbage collection strategy, like BEAM, which uses per-process garbage collection and no shared memory. Even in these systems there will be pauses, but the impact is confined to a single process, so the system can keep functioning without the entire world coming to a pause.

Kumar D

Software Developer. Tech Enthusiast. Loves coding 💻 and music 🎼.
