I spent the last few months working through Designing Data-Intensive Applications. Not gonna lie, without an engineering background this was tough! But I did learn a lot, and I think it will be particularly useful for me when trying to understand why things went wrong (even if I won’t be able to proactively apply a lot of it without more practice).
This blog is a summary of my notes from the book.
The author sets the stage for the book by pointing out that most modern applications are data-intensive rather than compute-intensive, meaning that CPU power is rarely a limiting factor for these applications—bigger problems are usually the amount of data, the complexity of data, and the speed at which it is changing.
Increasingly, many applications now have requirements that are too demanding or wide-ranging for a single tool to meet all of their data processing and storage needs. Instead, tools focus on individual tasks, and many tools are used together in one system.
This can lead to complexity. For example, how do you ensure that the data remains correct and complete, even when things go wrong internally? How do you provide consistently good performance to clients, even when parts of your system are degraded? How do you scale to handle an increase in load? Or, what does a good API for the service look like?
To address these questions, the author highlights three important considerations: reliability, scalability, and maintainability:
Taking a closer look at reliability, this essentially means that the application performs the function the user expects, it can tolerate the user making mistakes or using the software in unexpected ways, the performance is good under the expected load and data volume, and the system prevents any unauthorized access or abuse.
The things that can go wrong are called faults, and systems that anticipate faults and can cope with them are called fault-tolerant or resilient. Note that a fault is different from a failure: a fault is usually defined as one component of the system deviating from its spec, whereas a failure is when the system as a whole stops providing the required service to the user. The goal is to be able to tolerate faults without causing failures.
Unfortunately, we can’t be resilient to all faults (e.g., a black hole swallowing the earth). So we focus on certain types of faults.
Hardware faults are things like faulty RAM, a power grid blackout, or a hard disk crash. A first response to these kinds of faults is typically hardware redundancy, so that we have a backup if something goes wrong, which has traditionally worked well. However, as data volumes and applications’ computing demands have increased, more applications have begun using larger numbers of machines, which proportionally increases the rate of hardware faults. Moreover, in some cloud platforms such as Amazon Web Services (AWS) it is fairly common for virtual machine instances to become unavailable without warning, as the platforms are designed to prioritize flexibility and elasticity over single-machine reliability.
So, there is a move toward systems that can tolerate the loss of entire machines, by using software fault-tolerance techniques instead of or in addition to hardware redundancy.
While hardware faults tend to be random and independent of each other, software faults are systematic errors within the system. Such faults are harder to anticipate, and because they are correlated across nodes, they tend to cause many more system failures than uncorrelated hardware faults. Some examples of software faults include a bug that causes a server to crash when given a particular bad input, a runaway process that uses up some shared resource (like memory or disk space), or cascading failures, where a small fault in one component triggers a fault in another component, which in turn triggers further faults.
There is no quick solution to the problem of systematic faults in software, but lots of small things can help, like thorough testing, monitoring, and analyzing system behavior in production.
Finally, human error is actually responsible for the majority of outages. Some preventive measures include:
Next, turning to scalability, we first need to be able to describe load: Load can be described with a few numbers called load parameters, which differ depending on the architecture of your system. Some examples of load parameters include requests per second to a web server, the ratio of reads to writes in a database, the number of simultaneously active users in a chat room, the hit rate on a cache, etc. Load can either be described by the average case, or extreme cases.
Once we have a way to describe load, we then need clarity on how to describe performance. In a batch processing system such as Hadoop, we usually care about throughput—so, the number of records we can process per second, or the total time it takes to run a job on a dataset of a certain size. In online systems, what’s usually more important is the service’s response time (the time between a client sending a request and receiving a response). Note that response time will always vary slightly with each request, so it’s better to think of it as a distribution of values. Also, average response time is often reported, but this misses outliers (users who experienced a delay), so it’s typically better to use percentiles.
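As a toy illustration (mine, not from the book), here’s how you might compute percentiles from a list of measured response times; the nearest-rank method and variable names are just my choices:

```python
import math

def percentile(sorted_times, p):
    """Nearest-rank percentile of a pre-sorted list (p between 0 and 100)."""
    rank = max(1, math.ceil(p / 100 * len(sorted_times)))
    return sorted_times[rank - 1]

# A couple of slow outliers barely move the median but dominate the tail.
response_times_ms = sorted([12, 15, 11, 340, 13, 18, 14, 2500, 16, 12])
for p in (50, 95, 99):
    print(f"p{p}: {percentile(response_times_ms, p)} ms")
```

The average here would look mediocre, but the percentiles make it obvious that a small fraction of users are having a terrible time.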
Scaling to accommodate increased load can be accomplished in multiple ways:
The author notes that the majority of the cost of software is not in its initial development, but in its ongoing maintenance—fixing bugs, keeping its systems operational, investigating failures, adapting it to new platforms, modifying it for new use cases, repaying technical debt, and adding new features.
There are a few design principles to minimize pain during maintenance:
First, operability. Operability aims to make life easy for the team responsible for things like monitoring the health of the system, keeping systems up to date, coordinating systems, anticipating future problems, and maintenance tasks. This can be accomplished by:
Next, we should (of course) avoid complex code (where complex code refers to things like a tight coupling of modules, tangled dependencies, inconsistent naming and terminology, and hacks aimed at solving performance problems). Complexity makes it easier to introduce bugs and harder for engineers to learn/maintain code, so simplicity is key to maintainability.
One of the best tools we have for removing accidental complexity is abstraction. A good abstraction can hide a great deal of implementation detail behind a clean, simple-to-understand facade.
Finally, evolvability refers to the ease with which you can modify a data system and adapt it to changing requirements. From what I understand this is basically the same idea as agility, but for data systems.
“A data model is an abstract model that organizes elements of data and standardizes how they relate to one another and to the properties of real-world entities.” In other words, it is how we choose to represent data, often in layers.
Adding to this, application code is often object-oriented but data is often stored in relational databases, leading to the need for object-relational mapping tools or complex translation layers.
Historically, data started out being represented as one big tree (the hierarchical model), but that wasn’t good for representing many-to-many relationships, so the relational model was invented to solve that problem.
The best-known example of the relational model is SQL and the relational databases built on it.
More recently, developers found that some applications don’t fit well in the relational model either:
So, NoSQL came into being. These datastores have diverged in two main directions:
Document models target the use cases where data comes in self-contained documents, and relationships between one document and another are rare.
For example, if we think of a résumé or LinkedIn profile:
The main arguments in favor of the document data model are schema flexibility, better performance due to locality, and that for some applications it is closer to the data structures used by the application. If the data in your application has a document-like structure (i.e., a tree of one-to-many relationships, where typically the entire tree is loaded at once), then it’s probably a good idea to use a document model. The relational technique of shredding—splitting a document-like structure into multiple tables (like positions, education, and contact_info)—can lead to cumbersome schemas and unnecessarily complicated application code.
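To make that concrete, here’s a hypothetical résumé-style document as a Python dict; all the field names are made up for illustration:

```python
# A self-contained "document": the one-to-many relationships (positions,
# education) are nested inside the profile rather than shredded into
# separate relational tables.
profile = {
    "user_id": 251,
    "name": "Ada",
    "region": "Greater London",
    "positions": [
        {"title": "Engineer", "org": "Analytical Engines Ltd"},
        {"title": "Researcher", "org": "Babbage & Co"},
    ],
    "education": [
        {"school": "Somewhere University", "years": "1832-1835"},
    ],
    "contact_info": {"site": "https://example.com/ada"},
}

# The whole tree is typically loaded at once, which is where the
# locality advantage of the document model comes from.
print([p["title"] for p in profile["positions"]])
```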
Graph-like data models go in the opposite direction, targeting use cases where anything is potentially related to everything. For example, social connections (vertices are people and edges represent connections between people) or a web graph (web pages are vertices and edges are hyperlinks connecting them).
If your data has lots of many-to-many relationships, graph models are the best bet. Graphs are good for evolvability: as you add features to your application, a graph can easily be extended to accommodate changes in your application’s data structures.
One thing that document and graph databases have in common is that they typically don’t enforce a schema for the data they store, which can make it easier to adapt applications to changing requirements. However, your application most likely still assumes that data has a certain structure; it’s just a question of whether the schema is explicit (enforced on write) or implicit (handled on read).
Also, in general the different data models are starting to converge, since many relational databases allow JSON, and some NoSQL allow joins, etc.
Like SQL, many query languages are declarative rather than imperative.
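For example (my own toy comparison), an imperative query spells out how to loop over the data, while the declarative SQL equivalent only states what you want and lets the database decide how to get it:

```python
animals = [
    {"name": "Great white", "family": "Sharks"},
    {"name": "Lion", "family": "Felidae"},
]

# Imperative: we dictate the loop, the order, and the intermediate state.
def sharks_imperative(rows):
    result = []
    for row in rows:
        if row["family"] == "Sharks":
            result.append(row)
    return result

# Declarative (SQL): SELECT * FROM animals WHERE family = 'Sharks';
# The database is free to pick indexes, ordering, and parallelism.
print(sharks_imperative(animals))
```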
We also have MapReduce, which processes large volumes of data in bulk and is neither declarative nor fully imperative, but somewhere in between: the logic is expressed with snippets of code that the framework calls repeatedly. It is a fairly low-level programming model compared to other query languages.
Databases need to do two things: store data, and then retrieve it when requested.
One of the key factors in designing data systems is understanding your use case and which kinds of queries, reads, and writes are going to need to be supported:
An index is an additional structure that is derived from the primary data. It can help to improve the performance of queries, but this will incur overhead, especially on writes. For this reason databases don’t index everything by default, but require the application developer to choose them manually.
There are two families of storage engines that these databases use: log-structured storage engines (such as LSM-trees with SSTables) and page-oriented storage engines (such as B-trees). Both have to contend with the constraints of disks (limited space and the cost of random access), and in practice most storage engines use some combination of the two approaches.
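To get a feel for the log-structured family, here’s a minimal sketch in the spirit of the book’s simplest example: an append-only log file plus an in-memory hash index that maps each key to the byte offset of its latest value. The class name and file format are my own invention, and it ignores compaction, crashes, and concurrency:

```python
import os

class TinyLogDB:
    """Toy log-structured store: append-only data file + in-memory hash index."""

    def __init__(self, path="tiny.log"):
        self.path = path
        self.index = {}   # key -> byte offset of the most recent record
        open(self.path, "ab").close()

    def set(self, key, value):
        record = f"{key},{value}\n".encode("utf-8")
        offset = os.path.getsize(self.path)   # new record starts at end of file
        with open(self.path, "ab") as f:      # writes are cheap: just append
            f.write(record)
        self.index[key] = offset              # index points at the latest version

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)
            line = f.readline().decode("utf-8").rstrip("\n")
            _, value = line.split(",", 1)
            return value

db = TinyLogDB()
db.set("user:42", "alice")
db.set("user:42", "alice_v2")   # the old record stays in the log until compaction
print(db.get("user:42"))        # -> alice_v2
```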
There are way more details about all of the above in the book. Here are several other resources with good notes 1, 2, and 3.
In chapter 1, the author introduced the concept of evolvability. Since applications change over time, we should aim to build systems that make it easy to adapt to that change.
In most cases, changing an application’s features also requires changing the data that it stores or uses. Maybe we need to add a new field, or change the way existing data is presented, etc.
Different data models have different approaches to dealing with this change. For example, relational databases generally assume that all data in the database conforms to one schema. Although that schema can be changed (through schema migrations, i.e., ALTER statements), there is exactly one schema in force at any one point in time. By contrast, schema-on-read (“schemaless”) databases don’t enforce a schema, so the database can contain a mixture of older and newer data formats written at different times.
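As a small illustration of a schema migration (my own example, using SQLite), adding a column with a default means rows written before the migration still conform to the new, single schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (id, name) VALUES (1, 'Ada')")

# A schema migration: after this, exactly one (new) schema is in force,
# and rows written before the migration get the default value.
conn.execute("ALTER TABLE users ADD COLUMN plan TEXT DEFAULT 'free'")
conn.execute("INSERT INTO users (id, name, plan) VALUES (2, 'Grace', 'pro')")

print(conn.execute("SELECT id, name, plan FROM users").fetchall())
# [(1, 'Ada', 'free'), (2, 'Grace', 'pro')]
```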
When we have data or schema changes, we also often need to make changes to application code. However, this usually can’t happen all at once. For example, we may want to do a rolling upgrade to make sure everything is working properly as we go, and with client-side applications we have to wait for them to upgrade on their own. This means we need to be able to support both old and new versions of the code at the same time.
To facilitate this, we need forward and backward compatibility.
Forward compatibility means that older code can read data that was written by newer code, while backward compatibility means that newer code can read data that was written by older code.
Backward compatibility is normally not hard to achieve: as author of the newer code, you know the format of data written by older code, and so you can explicitly handle it (if necessary by simply keeping the old code to read the old data). Forward compatibility can be trickier, because it requires older code to ignore additions made by a newer version of the code.
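Here’s a tiny sketch of what that looks like at the application level, using JSON and made-up field names: the old reader ignores fields it doesn’t know about (forward compatibility), and the new reader falls back to a default when a field is missing (backward compatibility):

```python
import json

old_record = json.dumps({"user_id": 1, "name": "Ada"})                    # written by old code
new_record = json.dumps({"user_id": 2, "name": "Grace", "plan": "pro"})   # written by new code

def old_reader(raw):
    # Forward compatibility: old code only looks at the fields it knows about
    # and must tolerate (ignore) fields added by newer code.
    data = json.loads(raw)
    return {"user_id": data["user_id"], "name": data["name"]}

def new_reader(raw):
    # Backward compatibility: new code supplies a default for fields
    # that old data doesn't have yet.
    data = json.loads(raw)
    return {"user_id": data["user_id"], "name": data["name"],
            "plan": data.get("plan", "free")}

print(old_reader(new_record))   # extra "plan" field is ignored
print(new_reader(old_record))   # missing "plan" falls back to the default
```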
More generally, we can think of compatibility as a relationship between one process that encodes the data, and another process that decodes it.
Programs usually work with data in (at least) two different representations: in memory, as objects, structs, lists, hash maps, trees, and so on (optimized for efficient access by the CPU); and as a self-contained sequence of bytes (for example, a JSON document) when you want to write data to a file or send it over the network.
So, we need some way to translate between these two representations. The translation from the in-memory representation to a byte sequence is called encoding (also known as serialization or marshalling), and the reverse is called decoding (or parsing, deserialization, and unmarshalling).
The first way to encode data is to use language-specific formats, such as “pickling” in Python.
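For example, with Python’s built-in pickle module:

```python
import pickle

class Session:
    def __init__(self, user_id, roles):
        self.user_id = user_id
        self.roles = roles

encoded = pickle.dumps(Session(42, ["admin"]))   # in-memory object -> byte sequence
restored = pickle.loads(encoded)                 # byte sequence -> object again
print(restored.user_id, restored.roles)
```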
This is easy to do and requires minimal extra code, but it has some disadvantages:
Next are textual formats such as JSON, XML, and CSV. These have the advantage of being fairly human-readable, but also have some disadvantages:
A slightly more space-efficient option is binary encoding formats for JSON and XML, though this results in a loss of human-readability. Apache Thrift and Protocol Buffers (protobuf) are binary encoding libraries that are based on the same principle, though with some key differences:
First, they are more space efficient. Critically, they require a schema:
Apache Avro is another binary encoding format, but it is a bit different from Thrift and protobuf. Specifically, while Avro also uses a schema to specify the structure of the data being encoded, it has two schema languages: one (Avro IDL) intended for human editing, and one (based on JSON) that is more easily machine-readable. It does not use tag numbers, and there is nothing in the byte sequence to identify fields or their datatypes. The encoding simply consists of values concatenated together.
To parse the binary data, you go through the fields in the order that they appear in the schema and use the schema to tell you the datatype of each field. This means that the binary data can only be decoded correctly if the code reading the data is using the exact same schema as the code that wrote the data. Any mismatch in the schema between the reader and the writer would mean incorrectly decoded data.
So, how does Avro support schema evolution?
With Avro, forward compatibility means that you can have a new version of the schema as writer and an old version of the schema as reader. Conversely, backward compatibility means that you can have a new version of the schema as reader and an old version as writer.
This approach without tag numbers has the advantage of being able to have dynamically-generated schemas. Now, if the database schema changes (for example, a table has one column added and one column removed), you can just generate a new Avro schema from the updated database schema and export data in the new Avro schema. By contrast, if you were using Thrift or Protocol Buffers for this purpose, the field tags would likely have to be assigned by hand: every time the database schema changes, an administrator would have to manually update the mapping from database column names to field tags.
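To make the Avro idea concrete, here’s a toy sketch (not Avro’s real binary format): the encoded data is just the values in the writer schema’s field order, and the reader resolves it against its own schema by dropping unknown fields and filling in defaults. The schemas and helper functions are my own simplification:

```python
# Toy illustration of Avro-style encoding/decoding (not the real binary format):
# the encoded data contains only values, in the writer schema's field order.
writer_schema = {"fields": [{"name": "user_id"}, {"name": "name"}]}
reader_schema = {"fields": [{"name": "user_id"},
                            {"name": "name"},
                            {"name": "plan", "default": "free"}]}

def encode(record, schema):
    # No field names or tag numbers in the output, just concatenated values.
    return [record[f["name"]] for f in schema["fields"]]

def decode(values, writer_schema, reader_schema):
    # Walk the writer's schema to know what each value means...
    written = {f["name"]: v for f, v in zip(writer_schema["fields"], values)}
    # ...then resolve against the reader's schema: unknown fields are dropped,
    # missing fields are filled from the reader schema's defaults.
    return {f["name"]: written.get(f["name"], f.get("default"))
            for f in reader_schema["fields"]}

encoded = encode({"user_id": 1, "name": "Ada"}, writer_schema)
print(decode(encoded, writer_schema, reader_schema))
# {'user_id': 1, 'name': 'Ada', 'plan': 'free'}
```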
Now that we have an understanding of ways data can be encoded and decoded, we turn to how data can flow from one process to another (in other words, who is doing the encoding and decoding).
When you have processes that need to communicate over a network, there are a few different ways of arranging that communication. The most common arrangement is to have two roles: clients and servers. The servers expose an API over the network, and the clients can connect to the servers to make requests to that API. The API exposed by the server is known as a service.
For example, this is how the web works: clients (web browsers) make requests to web servers, making GET requests to download HTML, CSS, JavaScript, images, etc., and making POST requests to submit data to the server.
As another example, a native app running on a mobile device or a desktop computer can also make network requests to a server.
Or, a server can itself be a client to another service (for example, a typical web app server acts as a client to a database). This approach is often used to decompose a large application into smaller services by area of functionality, such that one service makes a request to another when it requires some functionality or data from that other service. This way of building applications has traditionally been called service-oriented architecture (SOA), more recently refined and rebranded as microservices architecture.
When HTTP is used as the underlying protocol for talking to the service, it is called a web service. REST APIs are currently the most widely used, but the book does go into detail about others (specifically SOAP and remote procedure calls/RPC). The author mostly talks about why he doesn’t like these, and I think we’re unlikely to come across them too much in our work, so I’ll leave out the details here.
Since APIs are often used by many people across organizations, the provider of a service often has no control over its clients and cannot force them to upgrade. Thus, compatibility needs to be maintained for a long time, perhaps indefinitely. If a compatibility-breaking change is required, the service provider often ends up maintaining multiple versions of the service API side by side. There is no agreement on how API versioning should work (i.e., how a client can indicate which version of the API it wants to use). For RESTful APIs, common approaches are to use a version number in the URL or in the HTTP Accept header.
Asynchronous message-passing systems are somewhere between RPC and databases. They are similar to RPC in that a client’s request (usually called a message) is delivered to another process with low latency. They are similar to databases in that the message is not sent via a direct network connection, but goes via an intermediary called a message broker (also called a message queue or message-oriented middleware), which stores the message temporarily.
Using a message broker has several advantages compared to direct RPC:
However, a difference compared to RPC is that message-passing communication is usually one-way: a sender normally doesn’t expect to receive a reply to its messages. It is possible for a process to send a response, but this would usually be done on a separate channel. This communication pattern is asynchronous: the sender doesn’t wait for the message to be delivered, but simply sends it and then forgets about it.
While different message brokers operate differently, in general one process sends a message to a named queue or topic, and the broker ensures that the message is delivered to one or more consumers of or subscribers to that queue or topic. There can be many producers and many consumers on the same topic.
Message brokers typically don’t enforce any particular data model—a message is just a sequence of bytes with some metadata, so you can use any encoding format. If the encoding is backward and forward compatible, you have the greatest flexibility to change publishers and consumers independently and deploy them in any order.
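A real broker (Kafka, RabbitMQ, etc.) is far more involved, but here’s a toy stand-in using an in-process queue, just to show the producer/consumer shape; the sentinel-based shutdown is my own shortcut:

```python
import queue
import threading

topic = queue.Queue()   # stand-in for a named topic on a broker

def producer():
    for i in range(3):
        # The message is just bytes plus a little metadata; the encoding is up to us.
        topic.put({"key": f"event-{i}", "payload": b"..."})
    topic.put(None)      # sentinel: no more messages

def consumer():
    while True:
        msg = topic.get()
        if msg is None:
            break
        print("consumed", msg["key"])

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
```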
Replication is keeping multiple copies of data in different locations. You might want to do this to improve availability (in case parts of the system fail), latency (by bringing the data closer to users), and scalability (there are only so many reads and writes that a single copy of the data can handle).
One of the big trade-offs when implementing replication is between consistency of the data and speed of information retrieval. If consistency is the most important, write operations are costly, since every write needs to be replicated to all or many of the copies. If writing needs to be frequent and cheap, there are more opportunities for data to be improperly replicated.
Replication can be done a few ways:
There are three main data paradigms described in the chapter:
Each of these options has their own set of challenges:
A big problem that can occur with replication is replication lag. This shows up in a few ways:
Handling write conflicts:
Reaching quorum: A general rule is that the number of nodes you read from (r) plus the number of nodes you write to (w) must exceed the total number of replicas (n), i.e., w + r > n. This guarantees that the set of nodes you read from overlaps with the set of nodes written to, so at least one of the nodes you read from has the latest write.
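Here’s a small sketch of that rule, plus how a reader might pick the newest value among the r replies, assuming every write carries a version number (the version-number scheme is my assumption for illustration):

```python
def is_quorum(n, w, r):
    """Quorum condition: every read set must overlap every write set."""
    return w + r > n

# With n = 5 replicas, writing to w = 3 and reading from r = 3 guarantees overlap.
print(is_quorum(5, 3, 3))   # True
print(is_quorum(5, 2, 2))   # False: a read might miss the latest write

# Among the r replies, the reader takes the value with the highest version number,
# since at least one of the replicas it read from has the newest write.
replies = [{"version": 6, "value": "new"}, {"version": 5, "value": "old"},
           {"version": 6, "value": "new"}]
print(max(replies, key=lambda reply: reply["version"])["value"])   # "new"
```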
For very large datasets, or very high query throughput, replication is not sufficient: we need to break the data up into partitions, also known as sharding. Partitioning is necessary when you have so much data that storing and processing it on a single machine is no longer feasible.
Normally, partitions are defined in such a way that each piece of data (each record, row, or document) belongs to exactly one partition. In effect, each partition is a small database of its own, although the database may support operations that touch multiple partitions at the same time. Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes. This means that even though each record belongs to exactly one partition, it may still be stored on several different nodes for fault tolerance.
The main reason for wanting to partition data is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster. Thus, a large dataset can be distributed across many disks, and the query load can be distributed across many processors. The goal of partitioning is to spread the data and query load evenly across multiple machines, avoiding hot spots (nodes with disproportionately high load).
For queries that operate on a single partition, each node can independently execute the queries for its own partition, so query throughput can be scaled by adding more nodes. Large, complex queries can potentially be parallelized across many nodes, although this gets significantly harder.
Our goal with partitioning is to spread the data and the query load evenly across nodes. If every node takes a fair share, then—in theory—10 nodes should be able to handle 10 times as much data and 10 times the read and write throughput of a single node (ignoring replication for now).
If the partitioning is unfair, so that some partitions have more data or queries than others, we call it skewed. The presence of skew makes partitioning much less effective. In an extreme case, all the load could end up on one partition, so 9 out of 10 nodes are idle and your bottleneck is the single busy node. A partition with disproportionately high load is called a hot spot.
The simplest approach for avoiding hot spots would be to assign records to nodes randomly. That would distribute the data quite evenly across the nodes, but it has a big disadvantage: when you’re trying to read a particular item, you have no way of knowing which node it is on, so you have to query all nodes in parallel.
If we assume we have a key-value data model where we always access records by primary key, we have a few ways to partition the data:
Key-range partitioning: Assign a continuous range of keys to each partition.
Hash partitioning: Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key. A good hash function takes skewed data and makes it uniformly distributed.
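A minimal sketch of hash partitioning (my own, with made-up keys): use a stable hash and split the hash space into contiguous ranges, one per partition. In practice you’d create many more partitions than nodes, so that whole partitions can be moved around during rebalancing instead of reshuffling every key:

```python
import hashlib

NUM_PARTITIONS = 8
HASH_SPACE = 2 ** 128          # md5 produces a 128-bit value

def partition_for(key: str) -> int:
    # Use a stable hash (not Python's builtin hash(), which varies between runs)
    # and give each partition an equal, contiguous range of the hash space.
    h = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")
    return h * NUM_PARTITIONS // HASH_SPACE

for key in ("user:1", "user:2", "2024-06-01T12:00:00"):
    print(key, "-> partition", partition_for(key))
```

Note that even similar keys (like consecutive timestamps) end up spread across partitions, which is exactly the point: the hash destroys the ordering that caused hot spots in key-range partitioning.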
All of the above assumes there is only one primary key index. It gets more complicated if there are secondary indices. There are two main approaches to partitioning a database with secondary indexes:
Document-based partitioning: Each partition is completely different and maintains its own secondary index.
Term-based partitioning: Rather than each partition having its own secondary index (a local index), we can construct a global index that covers data in all partitions. However, we can’t just store that index on one node, since it would likely become a bottleneck and defeat the purpose of partitioning. A global index must also be partitioned, but it can be partitioned differently from the primary key index.
Any changes that require moving load from one node in the cluster to another are called rebalancing. For example, maybe we add a new node, or a node fails. The existing partitions need to be rebalanced to account for this.
There are three strategies for rebalancing:
No matter which partitioning scheme is used, rebalancing is usually expected to meet some minimum requirements:
Rebalancing can be done automatically or manually. A manual approach is recommended in the book.
Many things can go wrong in data systems. For example, the system may fail mid-operation, or multiple operations may happen at the same time, causing unexpected bugs.
Transactions have been the mechanism of choice for simplifying the issue of fault-tolerance.
A transaction is a way for an application to group several reads and writes together into a logical unit. All the reads and writes in a transaction are executed as one operation: either the entire transaction succeeds or it fails and is aborted or rolled back. If it fails, the application can safely retry.
Transactions can be for a single object (for example, updating a row in one table), or multiple objects (like updating a row in one table that has a foreign key reference to a row in another table).
With transactions, error handling becomes much simpler for an application, because it doesn’t need to worry about partial failure—i.e., the case where some operations succeed and some fail (for whatever reason).
Not every application needs transactions, though. For example, an application with very simple access patterns, such as reading and writing only a single record, can probably manage without transactions. However, for more complex access patterns, transactions can hugely reduce the number of potential error cases you need to think about.
The question is, how do you figure out whether you need transactions? In order to answer that question, we first need to understand exactly what safety guarantees transactions can provide, and what costs are associated with them.
The safety guarantees provided by transactions are often described by the well-known acronym ACID, which stands for Atomicity, Consistency, Isolation, and Durability. There is some ambiguity about the definitions of each of these terms, so one system that is ACID-compliant may be very different from another that claims the same thing. Generally, though:
Concurrency is tricky, so databases have long tried to hide concurrency issues from application developers by providing “transaction isolation.” In theory, isolation should make your life easier by letting you pretend that no concurrency is happening: serializable isolation means that the database guarantees that transactions have the same effect as if they ran serially.
However, serializable isolation has a performance cost, and many databases don’t want to pay that price. It’s therefore common for systems to use weaker levels of isolation, which protect against some concurrency issues, but not all. Even many popular relational database systems (which are usually considered “ACID”) use weak isolation.
Different isolation levels protect against different kinds of concurrency bugs:
Read Committed
The most basic level of transaction isolation is read committed. It makes two guarantees: When reading from the database, you will only see data that has been committed (no dirty reads), and when writing to the database, you will only overwrite data that has been committed (no dirty writes).
Dirty reads are when a transaction can see data that is being written by another transaction, but has not yet committed or aborted (basically, it isn’t final). Dirty writes are similar: One client overwrites data that another client has written, but not yet committed.
These are two good protections, but the read committed level of isolation does allow for read skew, also known as non-repeatable reads. This is when a client sees different parts of the database at different points in time. For example, taking a backup requires making a copy of the entire database, which may take hours on a large database. During the time that the backup process is running, writes will continue to be made to the database. Thus, you could end up with some parts of the backup containing an older version of the data, and other parts containing a newer version. If you need to restore from such a backup, the inconsistencies become permanent.
Snapshot Isolation
Read skew is most commonly prevented with snapshot isolation, which allows a transaction to only read from a consistent snapshot at one point in time.
In order to implement this, the database must potentially keep several different committed versions of an object, because various in-progress transactions may need to see the state of the database at different points in time. Because it maintains several versions of an object side by side, this technique is known as multi-version concurrency control (MVCC).
So, snapshot isolation protects against dirty reads, dirty writes, and read skew. But there are other kinds of conflicts that can occur with concurrent writes. For example, the lost update problem can occur if an application reads some value from the database, modifies it, and writes back the modified value (a read-modify-write cycle). If two transactions do this concurrently, one of the modifications can be lost, because the second write does not include the first modification. This can be addressed through atomic write operations, explicit locking of the objects being updated, automatically detecting lost updates, or atomic compare-and-set operations (sketched below).
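Here’s a rough sketch of the compare-and-set approach using SQLite; the table and column names are made up, and a real application would loop and retry when the update reports zero affected rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wiki_pages (id INTEGER PRIMARY KEY, content TEXT)")
conn.execute("INSERT INTO wiki_pages VALUES (1, 'original text')")

def update_page(conn, page_id, old_content, new_content):
    # Compare-and-set: the write only happens if nobody changed the row
    # since we read it; otherwise 0 rows are affected and we must retry.
    cur = conn.execute(
        "UPDATE wiki_pages SET content = ? WHERE id = ? AND content = ?",
        (new_content, page_id, old_content),
    )
    conn.commit()
    return cur.rowcount == 1

print(update_page(conn, 1, "original text", "edited by A"))   # True
print(update_page(conn, 1, "original text", "edited by B"))   # False: A's edit is not lost
```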
Phantom reads are another potential issue. Phantom reads are when a transaction reads objects that match some search condition, and another client makes a write that affects the results of that search.
Snapshot isolation prevents straightforward phantom reads, but phantoms in the context of write skew require special treatment, such as index-range locks. Write skew is when you have some invariant spanning multiple objects (such as: there must be at least one doctor on call) and concurrent writes to different objects end up violating it (both on-call doctors request to leave at the same time, both requests are approved, and the invariant is broken). The anomalous behavior was only possible because the transactions ran concurrently, and automatically preventing it requires serializable isolation.
Actual Serial Execution
Serializable isolation is usually regarded as the strongest isolation level. It guarantees that even though transactions may execute in parallel, the end result is the same as if they had executed one at a time, serially, without any concurrency. There are a few approaches. The most straightforward is to literally execute transactions in a serial order. In other words, we execute only one transaction at a time, in serial order, on a single thread.
In order for this to work, a few conditions must be met:
Generally, actual serial execution does not scale well.
Two-Phase Locking (2PL)
Another approach is two-phase locking, which was the standard for decades.
Several transactions are allowed to concurrently read the same object as long as nobody is writing to it. But as soon as anyone wants to write (modify or delete) an object, exclusive access is required. So A can’t read an object B is writing to until it is done, and A can’t write to an object while B is reading.
In 2PL, writers don’t just block other writers; they also block readers and vice versa.
The big downside of two-phase locking, and the reason why it hasn’t been used by everybody since the 1970s, is performance: transaction throughput and response times of queries are significantly worse under two-phase locking than under weak isolation.
Serializable Snapshot Isolation
Two-phase locking is a so-called pessimistic concurrency control mechanism: it is based on the principle that if anything might possibly go wrong (as indicated by a lock held by another transaction), it’s better to wait until the situation is safe again before doing anything.
Serial execution is even more pessimistic: it is essentially equivalent to each transaction having an exclusive lock on the entire database (or one partition of the database) for the duration of the transaction.
By contrast, serializable snapshot isolation is an optimistic concurrency control technique. Optimistic in this context means that instead of blocking if something potentially dangerous happens, transactions continue anyway, in the hope that everything will turn out all right. When a transaction wants to commit, the database checks whether anything bad happened (i.e., whether isolation was violated); if so, the transaction is aborted and has to be retried.
Only transactions that executed serializably are allowed to commit. Others are aborted.
It performs badly if there is high contention (many transactions trying to access the same objects), as this leads to a high proportion of transactions needing to abort. If the system is already close to its maximum throughput, the additional transaction load from retried transactions can make performance worse. However, if there is enough spare capacity, and if contention between transactions is not too high, optimistic concurrency control techniques tend to perform better than pessimistic ones.
When working with a single computer, we typically expect deterministic behavior: Either things work, or they don’t, and it’s typically pretty consistent. This is no longer the case with distributed systems, where many things can go wrong, either entirely or partially.
The fact that partial failures can occur is the defining characteristic of distributed systems. The difficulty is that partial failures are nondeterministic: it may sometimes work and sometimes unpredictably fail. The process could also randomly go slow, or not respond at all (and eventually time out).
Three common families of issues with distributed systems are problems with networks, clocks, and process pauses.
The internet and most internal networks in data centers are “asynchronous packet networks.” In this kind of network, one node can send a message (called a packet) to another node, but the network gives no guarantees as to when it will arrive, or whether it will arrive at all. If you send a request and expect a response, many things could go wrong: your request could get lost or stuck in a queue, the remote node may have failed or temporarily stopped responding, or the response may have been lost or delayed.
If you send a request and don’t get a response, it’s not possible to distinguish exactly what happened out of any of the previous options.
The usual way of handling this issue is a timeout: after some time you give up waiting and assume that the response is not going to arrive. However, when a timeout occurs, you still don’t know whether the remote node got your request or not (and if the request is still queued somewhere, it may still be delivered to the recipient, even if the sender has given up on it).
In a distributed system, time is a tricky business, because communication is not instantaneous: it takes time for a message to travel across the network from one machine to another. The time when a message is received is always later than the time when it is sent, but due to variable delays in the network, we don’t know how much later. This fact sometimes makes it difficult to determine the order in which things happened when multiple machines are involved.
Moreover, each machine on the network has its own clock, which is an actual hardware device: usually a quartz crystal oscillator. These devices are not perfectly accurate, so each machine has its own notion of time, which may be slightly faster or slower than on other machines. They can also drift depending on things like temperature.
Modern computers have at least two different kinds of clocks: a time-of-day clock and a monotonic clock.
A time-of-day clock does what you intuitively expect of a clock: it returns the current date and time according to some calendar. Since each computer has its own, these rarely line up. It is possible to synchronize time-of-day clocks to some degree: the most commonly used mechanism is the Network Time Protocol (NTP), which allows the computer clock to be adjusted according to the time reported by a group of servers. The servers in turn get their time from a more accurate time source, such as a GPS receiver.
A monotonic clock is suitable for measuring a duration (like a time interval), such as a timeout or a service’s response time. You can check the value of the monotonic clock at one point in time, do something, and then check the clock again at a later time. The difference between the two values tells you how much time elapsed between the two checks. However, the absolute value of the clock is meaningless: it might be the number of nanoseconds since the computer was started, or something similarly arbitrary. In particular, it makes no sense to compare monotonic clock values from two different computers, because they don’t mean the same thing.
In a distributed system, using a monotonic clock for measuring elapsed time (e.g., timeouts) is usually fine, because it doesn’t assume any synchronization between different nodes’ clocks and is not sensitive to slight inaccuracies of measurement.
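In Python, for example, that’s the difference between time.monotonic() and time.time():

```python
import time

start = time.monotonic()        # monotonic clock: only the difference is meaningful
_ = sum(i * i for i in range(1_000_000))   # some work to time
elapsed = time.monotonic() - start
print(f"took {elapsed:.3f}s")

# time.time() is the time-of-day clock: fine for timestamps, but it can jump
# backward or forward if NTP adjusts it, so it's a poor choice for durations.
print(time.time())
```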
The problem with clocks is that while they seem simple and easy to use, they have a surprising number of pitfalls: a day may not have exactly 86,400 seconds, time-of-day clocks may move backward in time (if they are re-synced using NTP), and the time on one node may be quite different from the time on another node.
This can be important for applications that rely on clocks for things like ordering events across multiple nodes, which is needed for deciding which of two writes happened most recently. In systems that use LWW (last write wins), we could have a major issue if the clocks aren’t synced properly. We could also lose data without knowing it if a node with a fast clock overwrites data from a node with a slower clock.
Generally, it doesn’t make sense to think of a clock reading as a point in time—it is more like a range of times, within a confidence interval: for example, a system may be 95% confident that the time now is between 10.3 and 10.5 seconds past the minute.
Another time-related family of issues is process pauses. In this case, consider a node whose execution is paused, even in the middle of some function. During the pause, the rest of the nodes keep up their work, and may even declare the paused node dead since it isn’t responding. Eventually, the paused node may continue running, without even noticing that it was asleep until it checks its clock sometime later. It might overwrite work or duplicate some process when it wakes up.
In some environments, a situation like this (or any failure to respond within a specified time) can cause serious damage: computers that control aircraft, rockets, robots, cars, and other physical objects must respond quickly and predictably to their sensor inputs. In these systems, there is a specified deadline by which the software must respond; if it doesn’t meet the deadline, that may cause a failure of the entire system. These are so-called hard real-time systems. But implementing this is expensive, so rarely done unless totally necessary.
To tolerate faults, the first step is to detect them, but even that is hard. Most systems don’t have an accurate mechanism of detecting whether a node has failed, so most distributed algorithms rely on timeouts to determine whether a remote node is still available.
If you’re going to use timeouts, the next question is how long the timeout should be. There’s a trade-off here because a long timeout means a long wait until a node is declared dead (and during this time, users may have to wait or see error messages). A short timeout detects faults faster, but carries a higher risk of incorrectly declaring a node dead when in fact it has only suffered a temporary slowdown.
Prematurely declaring a node dead is problematic, because if the node is actually alive and in the middle of performing some action (for example, sending an email), and another node takes over, the action may end up being performed twice (like we talked about with process pauses).
Also, when a node is declared dead, its responsibilities need to be transferred to other nodes, which places additional load on other nodes and the network. If the system is already struggling with high load, declaring nodes dead prematurely can make the problem worse. In particular, it could happen that the node actually wasn’t dead but only slow to respond due to overload; transferring its load to other nodes can cause a cascading failure (in the extreme case, all nodes declare each other dead, and everything stops working).
It’s hard to pin down the right length of time to wait, because there is often a lot of variability in packet delays. This is often caused by queuing (basically, too much traffic).
In such environments, you can only choose timeouts experimentally: measure the distribution of network round-trip times over an extended period, and over many machines, to determine the expected variability of delays. Then, taking into account your application’s characteristics, you can determine an appropriate trade-off between failure detection delay and risk of premature timeouts.
Even better, rather than using configured constant timeouts, systems can continually measure response times and their variability (jitter), and automatically adjust timeouts according to the observed response time distribution.
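One common heuristic (similar in spirit to TCP’s retransmission timeout, not something prescribed by the book) keeps an exponentially weighted moving average of response times and their deviation, and sets the timeout to the mean plus a few deviations. The class, constants, and sample values here are my own:

```python
class AdaptiveTimeout:
    """Timeout = smoothed RTT + k * smoothed deviation (EWMA), in seconds."""

    def __init__(self, initial=0.1, alpha=0.125, beta=0.25, k=4.0):
        self.srtt = initial          # smoothed round-trip time
        self.rttvar = initial / 2    # smoothed deviation (jitter)
        self.alpha, self.beta, self.k = alpha, beta, k

    def observe(self, rtt):
        # Update the deviation first (against the old mean), then the mean.
        self.rttvar = (1 - self.beta) * self.rttvar + self.beta * abs(rtt - self.srtt)
        self.srtt = (1 - self.alpha) * self.srtt + self.alpha * rtt
        return self.timeout()

    def timeout(self):
        return self.srtt + self.k * self.rttvar

t = AdaptiveTimeout()
for rtt in (0.05, 0.06, 0.40, 0.05, 0.07):   # a spike widens the timeout for a while
    print(round(t.observe(rtt), 3))
```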
Another issue entirely is that sometimes a node can be in a degraded state: for example, a Gigabit network interface could suddenly drop to 1 Kb/s throughput due to a driver bug. Such a node that is “limping” but not dead can be even more difficult to deal with than a cleanly failed node.
Once a fault is detected, making a system tolerate it is not easy either: there is no global variable, no shared memory, no common knowledge or any other kind of shared state between the machines. In order to make decisions, we need a quorum among the nodes:
Distributed systems problems become much harder if there is a risk that nodes may “lie” (send arbitrary faulty or corrupted responses)—for example, if a node may claim to have received a particular message when in fact it didn’t. Such behavior is known as a Byzantine fault. However, the author says that in the kinds of systems we discuss in this book, we can usually safely assume that there are no Byzantine faults.
There are various system models that outline what kinds of faults are expected in a system.
The following models consider timing delays:
Besides timing issues, we have to consider node failures. The three most common system models for nodes are:
For modeling real systems, the partially synchronous model with crash-recovery faults is generally the most useful model.
Now that we’ve covered potential faults in distributed systems in the last chapter, this chapter is about tolerating those faults.
If you look at two database nodes at the same moment in time, you’re likely to see different data on the two nodes, because write requests arrive at different times. These inconsistencies occur no matter what replication method the database uses (single-leader, multi-leader, or leaderless replication). Most replicated databases provide at least eventual consistency, which means that if you stop writing to the database and wait for some unspecified length of time, then eventually all read requests will return the same value. In other words, the inconsistency is temporary, and it eventually resolves itself. But it doesn’t say anything about when this will happen.
When working with a database that provides only weak guarantees, you need to be constantly aware of its limitations and not accidentally assume too much. Bugs are often subtle and hard to find by testing, because the application may work well most of the time. The edge cases of eventual consistency only become apparent when there is a fault in the system (e.g., a network interruption) or at high concurrency.
However, stronger guarantees come at the cost of performance and fault tolerance.
Linearizability (also known as atomic consistency, strong consistency, immediate consistency, or external consistency) is a popular consistency model: its goal is to make replicated data appear as though there were only a single copy, and to make all operations act on it atomically.
In a linearizable system, as soon as one client successfully completes a write, all clients reading from the database must be able to see the value just written. Maintaining the illusion of a single copy of the data means guaranteeing that the value read is the most recent, up-to-date value, and doesn’t come from a stale cache or replica. In other words, linearizability is a recency guarantee.
Linearizability is not the same as serializability. Serializability is an isolation property of transactions, where every transaction may read and write multiple objects (rows, documents, records). It guarantees that transactions behave the same as if they had executed in some serial order (each transaction running to completion before the next transaction starts). It is okay for that serial order to be different from the order in which transactions were actually run.
Linearizability is a recency guarantee on reads and writes of a register (an individual object). It doesn’t group operations together into transactions, so it does not prevent problems such as write skew, unless you take additional measures such as materializing conflicts.
A database may provide both serializability and linearizability, and this combination is known as strict serializability or strong one-copy serializability.
Although linearizability is appealing because it is easy to understand—it makes a database behave like a variable in a single-threaded program—it has the downside of being slow, especially in environments with large network delays. It is also less tolerant to network problems.
The Unhelpful CAP Theorem
CAP is sometimes presented as Consistency, Availability, Partition tolerance: pick two out of three. Unfortunately, putting it this way is misleading because network partitions are a kind of fault, so they aren’t something about which you have a choice: they will happen whether you like it or not.
At times when the network is working correctly, a system can provide both consistency (linearizability) and total availability. When a network fault occurs, you have to choose between either linearizability or total availability. Thus, a better way of phrasing CAP would be either Consistent or Available when Partitioned. A more reliable network needs to make this choice less often, but at some point the choice is inevitable.
In discussions of CAP there are several contradictory definitions of the term availability, and the formalization as a theorem does not match its usual meaning. Many so-called “highly available” (fault-tolerant) systems actually do not meet CAP’s idiosyncratic definition of availability.
The CAP theorem as formally defined is also of very narrow scope: it only considers one consistency model (namely linearizability) and one kind of fault (network partitions, or nodes that are alive but disconnected from each other). It doesn’t say anything about network delays, dead nodes, or other trade-offs. Thus, although CAP has been historically influential, there is a lot of misunderstanding and confusion around CAP, and it does not help us understand systems better, so it is best avoided.
Causality is another consistency model which imposes an ordering on events in a system (what happened before what, based on cause and effect). Unlike linearizability, which puts all operations in a single, totally ordered timeline, causality provides us with a weaker consistency model: some things can be concurrent, so the version history is like a timeline with branching and merging. Causal consistency does not have the coordination overhead of linearizability and is much less sensitive to network problems.
However, even if we capture the causal ordering, some things cannot be implemented this way: For example, ensuring that a username is unique and rejecting concurrent registrations for the same username. If one node is going to accept a registration, it needs to somehow know that another node isn’t concurrently in the process of registering the same name. This problem is why we often need consensus.
Achieving consensus means deciding something in such a way that all nodes agree on what was decided, and such that the decision is irrevocable. For example, a database must decide whether to commit or abort a distributed transaction, or a messaging system must decide on the order in which to deliver messages.
These issues are straightforward if you only have a single node, or if you are willing to assign the decision-making capability to a single node. This is what happens in a single-leader database: all the power to make decisions is vested in the leader, which is why such databases are able to provide linearizable operations, uniqueness constraints, a totally ordered replication log, and more.
However, if that single leader fails, or if a network interruption makes the leader unreachable, such a system becomes unable to make any progress. There are three ways of handling that situation:
1. Wait for the leader to recover, and accept that the system will be blocked in the meantime. This approach does not fully solve consensus because it does not satisfy the termination property: if the leader does not recover, the system can be blocked forever.
2. Manually fail over by getting humans to choose a new leader node and reconfigure the system to use it. Many relational databases take this approach. The speed of failover is limited by the speed at which humans can act, which is generally slower than computers.
3. Use an algorithm to automatically choose a new leader. This approach requires a consensus algorithm, and it is advisable to use a proven algorithm that correctly handles adverse network conditions.
Although a single-leader database can provide linearizability without executing a consensus algorithm on every write, it still requires consensus to maintain its leadership and for leadership changes. Thus, in some sense, having a leader only “kicks the can down the road”: consensus is still required, only in a different place, and less frequently. The good news is that fault-tolerant algorithms and systems for consensus exist.
Tools like ZooKeeper play an important role in providing an “outsourced” consensus, failure detection, and membership service that applications can use.
Note that not every system necessarily requires consensus: for example, leaderless and multi-leader replication systems typically do not use global consensus. The conflicts that occur in these systems are a consequence of not having consensus across different leaders, but maybe that’s okay: maybe we simply need to cope without linearizability and learn to work better with data that has branching and merging version histories.
Batch processing is where you have some job scheduled to run periodically, with the goal of optimizing for throughput rather than latency. For example, building a pre-computed search index.
The chapter motivates batch processing with unix principles:
These principles are carried over into MapReduce, where instead of the immutable input being a text file as in the Unix case, it is a set of files of records on a distributed filesystem.
MapReduce works as follows: a mapper function is called once for every input record and extracts one or more key-value pairs from it; the framework then sorts the pairs by key and brings together all values belonging to the same key; finally, a reducer function iterates over each key and its collection of values to produce the output.
For example, suppose you wanted to compute the average age of users by country. The map function would emit a key-value pair for each record, where the key is the user’s country code and the value is the user’s age. These pairs are then sorted by key, and all the users with the same country code end up in the same partition. Finally, the reducer function computes the average age for each country.
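Here’s a toy version of that job in plain Python, just to show the map, shuffle, and reduce steps; a real MapReduce framework would run the mappers and reducers across many machines and spill intermediate state to disk:

```python
from collections import defaultdict

users = [
    {"country": "US", "age": 34},
    {"country": "DE", "age": 28},
    {"country": "US", "age": 40},
]

def mapper(record):
    # Emit (key, value) pairs: country code -> age.
    yield record["country"], record["age"]

def reducer(country, ages):
    return country, sum(ages) / len(ages)

# "Shuffle": group all values by key, as the framework would between map and reduce.
groups = defaultdict(list)
for record in users:
    for key, value in mapper(record):
        groups[key].append(value)

print(dict(reducer(country, ages) for country, ages in sorted(groups.items())))
# {'DE': 28.0, 'US': 37.0}
```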
This is a very flexible paradigm that can be used to solve lots of different problems. It allows for computation across multiple nodes, and better fault tolerance, since each step is written to disk and jobs can be re-started.
Joins can also be done:
Limitations of MapReduce:
In reality, a lot of data is unbounded because it arrives gradually over time: your users produced data yesterday and today, and they will continue to produce more data tomorrow. Unless you go out of business, this process never ends, and so the dataset is never “complete” in any meaningful way. Thus, batch processors must artificially divide the data into chunks of fixed duration: for example, processing a day’s worth of data at the end of every day, or processing an hour’s worth of data at the end of every hour.
The problem with daily batch processes is that changes in the input are only reflected in the output a day later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently—say, processing a second’s worth of data at the end of every second—or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing.
In the batch processing world, the inputs and outputs of a job are files (perhaps on a distributed filesystem). For stream processing, if the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock.
In a streaming system, related events are usually grouped together into a topic or stream.
A common approach for notifying consumers of topics about new events is to use a messaging system: a producer sends a message containing the event, which is then pushed to consumers.
A widely used approach is to send messages via a message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams. It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
The chapter then goes into a bunch of detail on the different types of message brokers, fault tolerance in stream processing frameworks, the difficulties of reasoning about time in a stream processor, and joins with streams. I’m skipping all that here because I am lazy, but there are lots of notes here, here, and here.
Once you have a stream, you can process it. There are three main ways to do this: