Part VIII · Data Systems Chapter 33

The Coordination Problem

Every time two parts of your system need to agree on something, you pay a tax. This chapter is about understanding that tax — and knowing when to pay it, when to avoid it, and how to design systems that minimize it without sacrificing correctness.

What's in this chapter

Key Learnings at a Glance

Read this if you're short on time. Come back to the full chapter for the why.

01

Distributed locks don't give you the safety guarantee you think. A process can acquire a lock, get paused (GC, OS scheduling), have the lock expire, and then resume — not knowing another process now also holds the lock. The fix is fencing tokens, not better lock implementations.

02

A fencing token is a monotonically increasing number you get when acquiring a lock. The resource you're protecting must reject writes with a lower token than it has already seen. This is what actually prevents two processes from corrupting each other.

03

The split-brain problem is what kills systems with leader election. Two nodes both believe they are the leader. Use epoch numbers — every leader gets an ever-increasing generation number, and followers reject messages from old leaders. ZooKeeper and etcd handle this for you if you let them.

04

Distributed rate limiting does not need to be exact. A user occasionally getting 10% over their API quota is not a crisis. Use a central Redis counter for simplicity, or per-server local counters with periodic sync for scale. Approximate is almost always good enough.

05

Distributed cron has one hard rule: make your jobs idempotent first, then worry about exactly-once scheduling. Most "distributed cron" complexity exists because jobs are not idempotent. Fix that, and at-least-once scheduling with deduplication becomes trivially safe.

06

Before adding a coordination point, ask: can I make this monotone? Operations that only add information (never retract) can run on every node with no coordination and still converge. CRDTs are the formalization of this idea. Shopping carts, counters, presence — these don't need consensus.

07

The most expensive coordination is the kind you don't know you have. Hidden coordination shows up as: a central database that every service reads before acting, a global sequence number generator, a "is this ID unique" check. Audit your critical path for these before you hit scale.

Distributed Locks

A lock sounds simple. You want one process to do something at a time. In a single machine, a mutex does this perfectly. In a distributed system, the problem gets surprisingly tricky.

The classic use case: you have multiple workers picking jobs off a queue. You want to make sure two workers don't pick the same job. Or you're sending an email notification — send it once, not twice. These seem like simple exclusion problems, and they are. The danger is thinking a distributed lock solves them the same way a mutex does.

The naive implementation and where it breaks

The most common distributed lock uses Redis. You do a SET key value NX EX 30 — set the key only if it doesn't exist, and expire it after 30 seconds. If the command succeeds, you have the lock. When you're done, you delete the key.

# Acquire lock (NX = only if not exists, EX = expire in 30s)
SET lock:payment_job_42 worker-A NX EX 30

# Do the work...

# Release lock (must only delete if YOU own it)
if GET lock:payment_job_42 == "worker-A":
    DEL lock:payment_job_42

This looks reasonable. The TTL protects against a worker crashing and holding the lock forever. The ownership check prevents a worker from accidentally deleting someone else's lock. So what's the problem?

The Process Pause Problem

Worker A acquires the lock. Then Worker A's JVM does a full garbage collection and pauses for 40 seconds. The lock's 30-second TTL expires. Worker B acquires the lock and starts working. Worker A's GC finishes — it still thinks it holds the lock. Now both A and B are doing "protected" work at the same time. The lock has failed silently.

This is not a hypothetical. JVM garbage collection pauses can last seconds. The OS can suspend a process at any time for scheduling. A VM can be live-migrated. A network call inside the lock can take longer than you expect. You cannot prevent a process from pausing, and you cannot prevent a lock from expiring while a process is paused.

Fencing Tokens: The Actual Fix

The insight that fixes this comes from thinking about what we really need. We don't just need mutual exclusion of lock acquisition. We need mutual exclusion of the effect — the actual write to storage. The lock service can't enforce that once it grants a lock. But the storage system can.

Here's how fencing tokens work. Every time a client acquires the lock, the lock service returns a fencing token — a number that increases with every lock grant. The client sends this token with every write to the storage service. The storage service tracks the highest token it has seen, and it rejects any write with a lower token.

Timeline with fencing tokens: Time 0: Worker A acquires lock → receives token 33 Time 1: Worker A starts writing (token 33 is accepted ✓) Time 15: Worker A pauses (GC pause) Time 30: Lock expires Time 31: Worker B acquires lock → receives token 34 Time 32: Worker B starts writing (token 34 is accepted ✓) Time 55: Worker A resumes, tries to write (token 33 < 34 → REJECTED ✗) Storage system saw token 34. It will never accept 33 again. Worker A's late write is safely blocked.

The beautiful part about this design: the lock service just needs to hand out monotonically increasing numbers. Zookeeper's transaction IDs (zxid) work perfectly for this. etcd's revision numbers work too. The real enforcement is done by the storage system, which is where the data lives anyway. This is the right separation of responsibility.

💡

On RedLock

Redis's creator proposed an algorithm called RedLock that uses multiple Redis instances to make locking safer. Martin Kleppmann published a detailed critique arguing that RedLock is still unsafe precisely because of process pauses and clock skew — the problems fencing tokens solve. If your storage system supports fencing tokens, you don't need RedLock. If it doesn't, RedLock doesn't save you either. Use a consensus system like etcd and fencing tokens instead.

When to Avoid Distributed Locks Entirely

Here's the best advice about distributed locks: use them as little as possible. Many places where engineers reach for a lock have a better alternative.

Use idempotency instead. If you're locking to prevent a payment from being processed twice, make the payment operation idempotent with an idempotency key. The database enforces uniqueness on the key. No lock needed.

Use optimistic concurrency instead. Instead of locking before reading, read a version number along with the data. When you write, assert that the version number hasn't changed. If it has (someone else updated it), retry. This is often called compare-and-swap (CAS). Databases support this natively. It works well when conflicts are rare. It performs much better than a lock because no process is blocked waiting — they just fail fast and retry.

Locks make sense when: you need to prevent two workers from doing expensive work in parallel for the same entity (even if the result is idempotent), or when the critical section involves multiple operations that must be treated as a unit and cannot be expressed as a single CAS.

Leader Election

Some problems genuinely need a single node to be in charge. A scheduler that assigns tasks to workers. A master node that handles all writes. A service that sends daily digest emails — you don't want five instances all sending the same email. For these, you need leader election: a way for a group of nodes to agree on which one is currently the leader.

The Split-Brain Problem

The nightmare scenario in leader election is split-brain: two nodes both believe they are the current leader. This is almost always worse than having no leader at all. With two "leaders" writing to the same data store, you get conflicting writes, data corruption, and duplicate actions (like sending the same email twice to every user).

How does split-brain happen? The most common cause is a network partition. Node A is the leader. The network partitions: half the cluster can reach A, the other half cannot. The other half elects B as the new leader. Now A and B are both leaders, serving different halves of the cluster.

The "I'm still the leader" Trap

A leader that becomes isolated in a network partition will often continue to believe it is the leader and keep serving requests. This is dangerous. A well-designed leader must step down if it cannot confirm it still has quorum. "Being unsure you're the leader" is a safe state. "Being wrong that you're the leader" is not.

Epoch Numbers: The Same Solution as Fencing Tokens

The fix mirrors the fencing token solution for locks. Each leader gets an ever-increasing epoch number (also called a generation number or term, depending on the system). When a new leader is elected, it gets a higher epoch than the previous leader.

Every message from a leader includes its epoch number. Followers reject any message from a leader with an epoch number lower than the highest they've seen. So if old-leader (epoch 4) and new-leader (epoch 5) both try to commit the same log entry, followers will accept epoch 5 and reject epoch 4. The old leader's writes are safely blocked.

This is exactly how Raft works. Leaders have terms. Old terms are powerless against nodes that have seen a newer term. The term number is the epoch number.

Implementing Leader Election in Practice

You almost never need to implement leader election from scratch. The right move is to use a system that already does consensus correctly — etcd, ZooKeeper, or Consul — and use its primitives.

With etcd, the pattern looks like this:

# Each node tries to create a lease-backed key
# Only one node will succeed

lease = etcd.grant(ttl=10)                # 10-second TTL
result = etcd.put_if_absent(
    key="/service/leader",
    value=my_node_id,
    lease=lease
)

if result.succeeded:
    # I am the leader. Start keepalive to refresh the lease.
    etcd.keepalive(lease)
    do_leader_work()
else:
    # Someone else is leader. Watch for changes.
    etcd.watch("/service/leader", on_change=try_acquire_leadership)

The lease is the key mechanism. It has a TTL. The leader must send a keepalive message before the TTL expires. If the leader dies, it stops sending keepalives, the lease expires, and another node can acquire the key. This gives you automatic failover.

The TTL is a trade-off. A short TTL (5 seconds) means fast failover but also means a momentary network hiccup could cause a leader to lose its lease and trigger an unnecessary election. A long TTL (30 seconds) means a dead leader takes longer to be replaced. Pick a value that matches your availability requirements.

The Thundering Herd on Leader Failure

When the leader fails, the lease expires, and the key becomes available. If you have 100 follower nodes, all 100 will try to acquire the key at the same instant. This is a thundering herd and it hammers your consensus system.

The fix is simple: each follower waits a random amount of time before trying to acquire the key. Most of them will see the key is already taken after the first node wins and can go back to watching. Raft uses the same trick for election timeouts.

Distributed Rate Limiting

Rate limiting protects your services from overload and enforces fairness. On a single server, this is easy — keep a counter in memory. On multiple servers, the problem is harder: a user's requests are spread across all your servers, and each server only sees part of the traffic.

A user with a limit of 100 requests per minute might send 60 to server A and 60 to server B. Each server individually thinks the user is under their limit. But together, the user has sent 120 requests — 20% over their limit. This is the core problem of distributed rate limiting.

The Algorithms

Before talking about distribution, understand the two most useful single-node algorithms.

Token Bucket

Imagine a bucket that holds up to N tokens. Tokens are added at a constant rate (say, 10 per second). Each request consumes one token. If the bucket is empty, the request is rejected.

The key property of token bucket is that it allows bursting. If a user hasn't made any requests for 10 seconds, their bucket fills up to capacity. They can then burst 100 requests all at once. This is usually the right behaviour — a user who has been quiet should be able to make a few requests quickly. Token bucket is forgiving of bursty but infrequent traffic.

Sliding Window Counter

Track a count of requests in the last N seconds. Every request increments the counter. When the counter exceeds the limit, reject. Use a sorted set in Redis with timestamps as scores to efficiently count events in a sliding window.

Sliding window is more precise than token bucket for enforcing "at most X requests per minute" semantics. It doesn't allow the same burst that token bucket does, but for many APIs that's exactly what you want.

Approaches to Distributing Rate Limiting

Approach Accuracy Performance Complexity
Centralized Redis counter High Medium — Redis RTT on every request Low
Local counter + periodic sync Approximate (~10% error) High — no network call per request Medium
Sticky routing High High Medium — requires consistent hashing at load balancer
Gossip-based propagation Approximate High High

Centralized Redis Counter (the right default)

All your servers talk to a shared Redis instance. When a request comes in, the server calls Redis to increment a counter and check if it's over the limit. This is accurate and simple to understand.

# Using a Lua script to make this atomic
local key = "ratelimit:" .. user_id .. ":" .. current_minute
local count = redis.call('INCR', key)
if count == 1 then
    redis.call('EXPIRE', key, 60)
end
return count

The main concern is Redis becoming a bottleneck. In practice, Redis handles hundreds of thousands of operations per second on modest hardware. For most systems, Redis rate limiting is fast enough. Make Redis highly available (Redis Cluster or a replica), and decide in advance what to do when Redis is down — fail open (allow the request) or fail closed (reject it). Failing open is usually the right default unless you have strict security requirements.

Local Counters with Periodic Sync (when Redis is too slow)

Each server keeps a local in-memory counter. Every few seconds, it pushes its local count to Redis and pulls the global total. Rate limiting decisions are made against the global total, updated periodically.

This means a user could temporarily exceed their limit by up to (sync interval × rate). If you sync every 5 seconds and the limit is 100 req/min, a user could burst to about 108 requests before being stopped. For most use cases, that's fine. For security-critical rate limiting (like OTP attempts or password resets), use the centralized approach.

"Rate limiting doesn't need to be a law. It needs to be a guardrail. A user who gets 5% more than their limit, briefly, is not a catastrophe. A rate limiter that adds 20ms to every request latency is."

Rate Limiting at Multiple Levels

In practice, you usually need several rate limits at once:

Model these as separate counters in Redis with separate keys. A request is allowed only if all applicable limits pass. Return a 429 Too Many Requests with a Retry-After header so clients know when they can try again.

Distributed Cron

Cron on a single machine is simple. A process wakes up at the scheduled time, runs the job, and goes back to sleep. Three problems make this insufficient in a distributed system.

First, the machine running cron can die. Your job won't run, and you might not notice for hours. Second, if you solve the first problem by running cron on multiple machines, every machine will run every job — duplicates everywhere. Third, if a job takes longer than its interval (a nightly report that takes 25 hours to finish), the next run starts before the last one finishes.

The Execution Guarantee Problem

Before you design your scheduling system, decide which guarantee you need:

📋

The Three Execution Guarantees

At-most-once: The job runs zero or one times. It might not run at all if the scheduler is down during the scheduled window. Safe for jobs where skipping is better than duplicating (e.g., sending a push notification).

At-least-once: The job runs at least once, possibly more. Safe only if the job is idempotent. Easy to implement.

Exactly-once: The job runs exactly once. Hardest to guarantee. Requires consensus-based deduplication. Usually not worth the complexity — instead, aim for at-least-once with idempotent jobs.

The Leader-Based Approach

The most common and practical approach: use the leader election mechanism from the previous section. Only the current leader schedules and triggers jobs. If the leader dies, a new leader takes over and handles missed jobs.

Leader-based distributed cron: ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Scheduler │ │ Scheduler │ │ Scheduler │ │ Node A │ │ Node B │ │ Node C │ │ [LEADER] │ │ [FOLLOWER] │ │ [FOLLOWER] │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ Watch for leader key in etcd │ │◄─────────────────┼──────────────────┤ │ │ │ Only A schedules jobs. │ │ A dies → lease expires → B or C wins election. New leader checks last-run timestamps and fires missed jobs.

The new leader needs to handle the question: "was there a job that should have run while I was a follower and the old leader was dead?" Store the last successful run time for each job in your persistent store. When a new leader starts, compare last-run to the current time and schedule any jobs that were missed.

How many missed runs to re-execute is a business decision. For a job that runs every minute and the scheduler was down for 2 hours, do you run it 120 times? Probably not. Run it once (the most recent) and move on. Document this decision explicitly.

Make Jobs Idempotent — This is the Most Important Thing

Almost all complexity in distributed scheduling exists because jobs are not idempotent. A job that sends a weekly report email is dangerous to run twice. A job that re-calculates derived analytics from raw events is completely safe to run twice — you get the same result.

Design your jobs around idempotency:

Overlapping Runs

What happens if a job takes longer than its scheduled interval? Your 5-minute job runs for 12 minutes. Should a second instance start at the 5-minute mark?

For most jobs, the answer is no. Use a distributed lock with a TTL longer than the expected maximum job duration. When the scheduler tries to start the job, it first acquires the lock. If the lock is already held (previous run is still going), it skips this run. The lock releases automatically when the job finishes or when the TTL expires.

There is no universally right answer here — it depends on whether missing a run or running in parallel is worse for your use case. The important thing is making this decision explicitly, not accidentally.

Tools Worth Knowing

You don't need to build all of this from scratch. Several battle-tested tools handle the scheduling hard parts for you. Kubernetes CronJobs work well for containerized workloads (at-least-once semantics). Temporal is excellent for long-running, stateful workflows with complex retry and error-handling requirements. For simpler cases, a database table with a "next_run_at" column and a polling worker is often enough and much easier to operate.

The Coordination Tax

Every coordination point in your system is a tax. You pay it in three currencies: latency (waiting for the coordinator), availability (you can't proceed if the coordinator is down), and complexity (coordination code is hard to write correctly and hard to debug).

The most dangerous kind of coordination is the kind you don't see. It shows up as: a global sequence number generator that every service increments, a "check if unique" query against a central database before every write, a configuration service that must be available before any node can start. These seem innocuous. At scale, each one becomes a chokepoint.

The CALM Theorem: When Can You Avoid Coordination?

There's a theoretical result called CALM (Consistency As Logical Monotonicity) that gives us a precise answer to this question. A computation can run without coordination if and only if it is monotone — meaning it only ever adds information, never retracts it.

This sounds abstract but is very practical. Here's what it means:

Operation Monotone? Needs Coordination?
Incrementing a counter Yes No
Adding an item to a set Yes No
Recording "event X happened" Yes No
Removing an item from a set No Yes
"Is this username available?" check No Yes
Enforcing a maximum budget No Yes
Maintaining a sorted ranking No Yes

Before you design a coordination mechanism, ask: is this operation actually non-monotone? Sometimes a small redesign makes a non-monotone problem monotone. For example, instead of checking "is this username available?" (non-monotone — requires global uniqueness check), you can generate unique usernames by appending a UUID (monotone — just add, never check). Not always possible, but often overlooked.

CRDTs: Data Structures That Don't Need Coordination

CRDTs — Conflict-free Replicated Data Types — are data structures designed so that any replica can accept writes and merge with any other replica, without coordination, and the result is always correct. The merge function must satisfy three properties: commutative (A merge B = B merge A), associative ((A merge B) merge C = A merge (B merge C)), and idempotent (A merge A = A).

If these properties hold, you can replicate to any number of nodes, let each accept writes independently, and eventually propagate updates to all nodes. They will converge to the same state.

Practical CRDTs You Should Know

G-Counter (Grow-Only Counter): Each node has its own counter slot. To increment, increment your own slot. To read the total, sum all slots. Merging two replicas takes the max of each slot. Works perfectly for things like view counts, likes, and download counts.

G-Counter with 3 nodes: Node A's view: [A:12, B:8, C:5] → total: 25 Node B's view: [A:10, B:9, C:6] → total: 25 After merge: [A:12, B:9, C:6] → total: 27 (Take the max of each slot — no coordination needed)

OR-Set (Observed-Remove Set): Add or remove items from a set across replicas without conflicts. Each add gets a unique tag. A remove only removes the specific tagged entry it observed. Concurrent add and remove results in the item staying (add wins). Used in Amazon's shopping cart design.

LWW-Register (Last-Write-Wins Register): A single value where the most recent write wins. Each write gets a timestamp. Merge takes the value with the highest timestamp. Simple, widely used. The risk: concurrent writes can silently overwrite each other. Use only when losing a write is acceptable.

Where CRDTs Are a Natural Fit

Collaborative editing (text, whiteboards), real-time presence (who's online, who's typing), shopping carts, feature flag vote counts, distributed caches, multiplayer game state sync. All of these involve multiple writers and need eventual consistency without coordination. CRDTs are the right model — not just an optimization, but a correctness tool.

A Checklist Before Adding a Coordination Point

Use this before you design any mechanism that requires nodes to agree before proceeding:

Coordination is not inherently bad. Strict consistency genuinely requires it. The mistake is not questioning whether you need it. Most systems coordinate more than they need to, often because "this is how we've always done it." The systems that scale most gracefully are the ones where engineers asked "can we avoid this coordination?" for every single point of agreement — and only paid the tax where they had to.

All Chapters Table of Contents