You’ve probably implemented counters dozens of times. A simple count++
here, a database increment there. But what happens when your humble counter needs to handle millions of increments per second across multiple data centers? Welcome to the fascinating world of distributed counters.
Whether you’re building the next social media platform, tracking inventory for an e-commerce giant, or implementing rate limiting for your API, understanding distributed counters is essential. Let’s dive deep into the patterns, pitfalls, and practical solutions.
The Deceptively Simple Problem
At first glance, counting seems trivial:
```java
// Easy, right?
int counter = 0;
counter++;
```
But in a distributed system, this becomes a nightmare. Imagine you have 100 servers, each handling user likes on a viral video. How do you keep an accurate count without:
- Creating a bottleneck
- Losing increments during network failures
- Double-counting during retries
- Blocking other operations
This is where things get interesting.
Real-World Battle Stories
Before we dive into solutions, let’s look at some real scenarios where distributed counters make or break systems:
X/Twitter’s Like Counter
When a tweet goes viral, thousands of users might like it simultaneously. Twitter can’t afford to:
- Block users while updating a single counter
- Show inconsistent like counts across the globe
- Lose likes during server failures
Reddit’s Upvote System
Reddit’s voting system needs to:
- Handle massive traffic spikes during breaking news
- Prevent vote manipulation
- Show consistent scores across regions
- Remain responsive under load
The Fundamental Challenges
1. The Hot Spot Problem
When all servers try to update the same database row, you get contention. Think of it like 100 people trying to write on the same piece of paper simultaneously.
```sql
-- This becomes a bottleneck quickly
UPDATE counters SET value = value + 1 WHERE id = 'viral_video_likes';
```
2. The Split-Brain Scenario
What happens when your servers can’t communicate with each other? You might end up with:
- Server A thinks the count is 1000
- Server B thinks it’s 1500
- The truth? Nobody knows.
3. The Retry Dilemma
Networks fail. When they do, should you retry the increment? But what if the first attempt actually succeeded? Now you’ve double-counted.
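A common mitigation is to make the increment idempotent: the client attaches a unique request ID, and the server applies each ID at most once. Here's a minimal sketch using Redis; the key layout and one-day TTL are illustrative, `IdempotentCounter` is a hypothetical helper, and strictly speaking the SETNX and INCRBY should be wrapped in a Lua script or transaction to be atomic:

```java
import redis.clients.jedis.Jedis;

public class IdempotentCounter {
    private final Jedis jedis;

    public IdempotentCounter(Jedis jedis) {
        this.jedis = jedis;
    }

    // Safe to retry: the same requestId is only ever counted once
    public boolean increment(String counterKey, String requestId, long amount) {
        String dedupeKey = counterKey + ":applied:" + requestId;
        // SETNX succeeds only for the first attempt with this requestId
        if (jedis.setnx(dedupeKey, "1") == 1) {
            jedis.expire(dedupeKey, 86400); // remember applied IDs for a day
            jedis.incrBy(counterKey, amount);
            return true;
        }
        return false; // duplicate delivery or retry: already counted
    }
}
```

Now a timed-out client can simply resend the same request ID without risking a double count.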
Architecture Pattern #1: Sharded Counters
The most popular solution is to split your counter into multiple shards.
(Diagram: one logical counter split into four shards holding 245, 189, 203, and 167; the true total is their sum, 804.)
How It Works
- Split the counter into N independent shards (usually 10-100)
- Randomly distribute increments across shards
- Sum all shards when you need the total count
```java
import java.util.Random;
import redis.clients.jedis.Jedis;

public class ShardedCounter {
    private final String name;
    private final int numShards;
    private final Jedis jedis;
    private final Random random;

    public ShardedCounter(String name, int numShards) {
        this.name = name;
        this.numShards = numShards;
        // Note: a single Jedis connection is not thread-safe;
        // production code would borrow connections from a JedisPool
        this.jedis = new Jedis("localhost");
        this.random = new Random();
    }

    public long increment(int amount) {
        // Pick a random shard to avoid hot spots
        int shardId = random.nextInt(numShards);
        String shardKey = String.format("%s:shard:%d", name, shardId);
        // Returns the new value of this shard, not the grand total
        return jedis.incrBy(shardKey, amount);
    }

    public long increment() {
        return increment(1);
    }

    public long getCount() {
        // Sum all shards
        long total = 0;
        for (int i = 0; i < numShards; i++) {
            String shardKey = String.format("%s:shard:%d", name, i);
            String count = jedis.get(shardKey);
            total += count != null ? Long.parseLong(count) : 0;
        }
        return total;
    }
}

// Usage
ShardedCounter likesCounter = new ShardedCounter("video_123_likes", 20);
likesCounter.increment();                  // fast write
long totalLikes = likesCounter.getCount(); // slightly slower read
```
Pros and Cons
Pros:
- Scales writes horizontally
- No single point of contention
- Works with existing databases
Cons:
- Reads are more expensive (must sum all shards)
- Eventually consistent (temporary inconsistencies)
- More complex than a single counter
When to Use Sharded Counters
Perfect for:
- Social media likes/views
- Website analytics
- Non-critical metrics
Architecture Pattern #2: Local Aggregation with Batch Updates
Instead of immediately updating a central counter, each server keeps local counts and periodically syncs.
(Diagram: each app server accumulates counts locally and batch-syncs them to the central store.)
```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LocalAggregatedCounter {
    private final ConcurrentHashMap<String, AtomicLong> localCounters;
    private final int syncIntervalSeconds;
    private final ScheduledExecutorService scheduler;

    public LocalAggregatedCounter(int syncIntervalSeconds) {
        this.localCounters = new ConcurrentHashMap<>();
        this.syncIntervalSeconds = syncIntervalSeconds;
        this.scheduler = Executors.newScheduledThreadPool(1);
        // Start background sync
        scheduler.scheduleAtFixedRate(
            this::syncToCentralStore,
            syncIntervalSeconds,
            syncIntervalSeconds,
            TimeUnit.SECONDS
        );
    }

    public void increment(String counterName, long amount) {
        // Lightning fast - just update local memory
        localCounters.computeIfAbsent(counterName, k -> new AtomicLong(0))
                     .addAndGet(amount);
    }

    public void increment(String counterName) {
        increment(counterName, 1);
    }

    private void syncToCentralStore() {
        localCounters.forEach((name, counter) -> {
            // getAndSet is atomic, so no increments are lost between read and
            // reset. Counts accumulated since the last sync are still lost if
            // the process crashes - the write-ahead log below addresses that.
            long count = counter.getAndSet(0);
            if (count > 0) {
                updateCentralCounter(name, count);
            }
        });
    }

    private void updateCentralCounter(String name, long amount) {
        // This could be Redis, MongoDB, PostgreSQL, etc.
        System.out.printf("Syncing %s: +%d%n", name, amount);
        // jedis.incrBy(name, amount);
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```
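A quick usage sketch; the counter name and sync interval are illustrative:

```java
LocalAggregatedCounter counter = new LocalAggregatedCounter(5); // flush every 5 seconds

// Hot path: no network I/O, just an in-memory atomic add
counter.increment("video_123_likes");
counter.increment("video_123_likes", 10);

// ~5 seconds later the background task pushes a single "+11" to the central store

// Note: shutdown() as written stops the scheduler without a final flush,
// so a graceful shutdown should sync once more before exiting
counter.shutdown();
```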
The Write-Ahead Log Pattern
For critical counters, you can add durability:
```java
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;

public class DurableLocalCounter extends LocalAggregatedCounter {
    private final String walFile;

    public DurableLocalCounter(String walFile, int syncIntervalSeconds) {
        super(syncIntervalSeconds);
        this.walFile = walFile;
    }

    @Override
    public void increment(String counterName, long amount) {
        // Write to the WAL first, then apply in memory
        writeToWAL(counterName, amount);
        super.increment(counterName, amount);
    }

    private void writeToWAL(String name, long amount) {
        // Simple write-ahead log: epochSeconds,name,amount per line.
        // Opening the file per write keeps the example short; a long-lived
        // buffered writer would be used in practice.
        try (FileWriter writer = new FileWriter(walFile, true)) {
            writer.write(String.format("%d,%s,%d%n",
                Instant.now().getEpochSecond(), name, amount));
        } catch (IOException e) {
            System.err.println("Failed to write to WAL: " + e.getMessage());
        }
    }
}
```
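Appending to the log is only half the story: on restart you have to replay unsynced entries, and after each successful central sync you'd truncate the log so entries aren't applied twice (which the class above doesn't do yet). A minimal replay sketch, assuming the CSV format above and sync-time truncation; `WalRecovery` is a hypothetical helper:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class WalRecovery {
    // Re-applies increments that were logged but possibly never synced.
    // Reads the old log, deletes it, then replays through the durable
    // counter so every entry is re-logged into a fresh WAL file.
    public static void replay(String walFile, DurableLocalCounter counter) throws IOException {
        Path path = Path.of(walFile);
        if (!Files.exists(path)) {
            return; // clean start, nothing to recover
        }
        List<String> entries = Files.readAllLines(path);
        Files.delete(path); // a crash during the replay below can still lose entries
        for (String entry : entries) {
            String[] fields = entry.split(","); // format: epochSeconds,name,amount
            if (fields.length == 3) {
                counter.increment(fields[1], Long.parseLong(fields[2]));
            }
        }
    }
}
```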
Architecture Pattern #3: CRDTs (Conflict-Free Replicated Data Types)
CRDTs are replicated data structures whose merge operation is commutative, associative, and idempotent, so replicas always converge to the same value without coordination. That makes them a natural fit for distributed counters that must keep working through network partitions.
G-Counter (Grow-Only Counter)
Each node maintains a vector of counts for all nodes:
```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class GCounter {
    private final String nodeId;
    private final Set<String> nodes;
    private final ConcurrentHashMap<String, Long> counts;

    public GCounter(String nodeId, Set<String> nodes) {
        this.nodeId = nodeId;
        this.nodes = new HashSet<>(nodes);
        this.counts = new ConcurrentHashMap<>();
        // Initialize counts for all nodes
        for (String node : nodes) {
            counts.put(node, 0L);
        }
    }

    public void increment(long amount) {
        // Only increment your own counter
        counts.compute(nodeId, (key, val) -> (val == null ? 0 : val) + amount);
    }

    public void increment() {
        increment(1);
    }

    public long value() {
        // Sum all node counts
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public void merge(GCounter other) {
        // Take the maximum for each node
        for (String node : nodes) {
            long otherCount = other.counts.getOrDefault(node, 0L);
            counts.compute(node, (key, val) ->
                Math.max(val == null ? 0 : val, otherCount));
        }
    }

    public boolean compare(GCounter other) {
        // Vector clock comparison
        return nodes.stream().allMatch(node ->
            counts.getOrDefault(node, 0L) >= other.counts.getOrDefault(node, 0L));
    }

    public Map<String, Long> getCounts() {
        return new HashMap<>(counts);
    }
}

// Usage across multiple nodes
Set<String> nodes = Set.of("server1", "server2", "server3");

// On server1
GCounter counter1 = new GCounter("server1", nodes);
counter1.increment(5);

// On server2
GCounter counter2 = new GCounter("server2", nodes);
counter2.increment(3);

// Merge when nodes communicate
counter1.merge(counter2);
System.out.println(counter1.value()); // 8
```
PN-Counter (Positive-Negative Counter)
For counters that can decrement:
```java
import java.util.Set;

public class PNCounter {
    // Two grow-only counters: one tracks increments, the other decrements
    private final GCounter positive;
    private final GCounter negative;

    public PNCounter(String nodeId, Set<String> nodes) {
        this.positive = new GCounter(nodeId, nodes);
        this.negative = new GCounter(nodeId, nodes);
    }

    public void increment(long amount) {
        if (amount >= 0) {
            positive.increment(amount);
        } else {
            negative.increment(-amount);
        }
    }

    public void increment() {
        increment(1);
    }

    public void decrement(long amount) {
        increment(-amount);
    }

    public void decrement() {
        decrement(1);
    }

    public long value() {
        // Current count = everything added minus everything removed
        return positive.value() - negative.value();
    }

    public void merge(PNCounter other) {
        positive.merge(other.positive);
        negative.merge(other.negative);
    }
}
```
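Usage mirrors the G-Counter; decrements survive merges because they're just increments to the second grow-only counter:

```java
Set<String> nodes = Set.of("server1", "server2");

// On server1: ten upvotes arrive
PNCounter score1 = new PNCounter("server1", nodes);
score1.increment(10);

// On server2: three downvotes arrive during a partition
PNCounter score2 = new PNCounter("server2", nodes);
score2.decrement(3);

// When the partition heals, merging converges both replicas
score1.merge(score2);
System.out.println(score1.value()); // 7
```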
Performance Comparison
Let’s look at how these patterns perform under different loads:
(Chart: relative throughput under increasing load for the Centralized, Sharded, Local Aggregation, and CRDT approaches.)
Qualitatively, the trade-offs covered so far shake out like this:
- Centralized: simplest and strongly consistent, but a single hot row caps write throughput
- Sharded: writes scale horizontally across shards; reads pay for summing them; eventually consistent
- Local aggregation: the fastest writes (pure in-memory), with the central count lagging by up to one sync interval
- CRDT: writes stay available even during partitions, and replicas converge deterministically on merge
Handling Edge Cases
The Thundering Herd Problem
When a counter becomes extremely popular, you might get a thundering herd:
```java
import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class ThunderingHerdProtection {
    private final Counter baseCounter;
    private final int jitterMs;
    private final Random random;

    public ThunderingHerdProtection(Counter baseCounter, int jitterMs) {
        this.baseCounter = baseCounter;
        this.jitterMs = jitterMs;
        this.random = new Random();
    }

    public CompletableFuture<Long> incrementWithJitter(long amount) {
        return CompletableFuture.supplyAsync(() -> {
            // Add a random delay to spread out requests
            int jitter = random.nextInt(jitterMs);
            try {
                // Note: sleeping blocks a common-pool thread; a production
                // version would use a scheduled executor instead
                Thread.sleep(jitter);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return baseCounter.increment(amount);
        });
    }
}

interface Counter {
    long increment(long amount);
}
```
Circuit Breaker for Counter Operations
```java
import java.time.Instant;

enum CircuitState {
    CLOSED, OPEN, HALF_OPEN
}

public class CounterCircuitBreaker {
    private final int failureThreshold;
    private final long timeoutSeconds;
    private int failureCount = 0;
    private long lastFailureTime = 0;
    private CircuitState state = CircuitState.CLOSED;

    public CounterCircuitBreaker(int failureThreshold, long timeoutSeconds) {
        this.failureThreshold = failureThreshold;
        this.timeoutSeconds = timeoutSeconds;
    }

    // synchronized: failureCount and state are shared mutable state,
    // so transitions must be atomic across threads
    public synchronized long increment(Counter counter, long amount) throws Exception {
        if (state == CircuitState.OPEN) {
            if (Instant.now().getEpochSecond() - lastFailureTime < timeoutSeconds) {
                // Fail fast while the breaker is open
                throw new RuntimeException("Counter circuit breaker is OPEN");
            } else {
                // Timeout elapsed: let one trial request through
                state = CircuitState.HALF_OPEN;
            }
        }
        try {
            long result = counter.increment(amount);
            if (state == CircuitState.HALF_OPEN) {
                // Trial request succeeded: close the circuit again
                state = CircuitState.CLOSED;
                failureCount = 0;
            }
            return result;
        } catch (Exception e) {
            failureCount++;
            lastFailureTime = Instant.now().getEpochSecond();
            if (failureCount >= failureThreshold) {
                state = CircuitState.OPEN;
            }
            throw e;
        }
    }
}
```
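A quick wiring sketch; the Redis-backed `Counter` lambda is illustrative and assumes a `jedis` connection as in the earlier examples:

```java
// Open after 5 consecutive failures; probe again after 30 seconds
CounterCircuitBreaker breaker = new CounterCircuitBreaker(5, 30);
Counter redisCounter = amount -> jedis.incrBy("page_views", amount);

try {
    breaker.increment(redisCounter, 1);
} catch (Exception e) {
    // Store unreachable or breaker open: degrade gracefully,
    // e.g. buffer the increment locally instead of failing the request
}
```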
Conclusion
Distributed counters are a perfect example of how “simple” problems become fascinatingly complex at scale. The key is understanding your requirements:
- Need strong consistency? Accept lower throughput
- Need high throughput? Accept eventual consistency
- Need partition tolerance? Consider CRDTs
- Need blazing speed? Use local aggregation
Further Reading
- CAP Theorem and Counters
- Redis Patterns: Distributed Locks
- CRDTs Paper by Marc Shapiro
- Google’s Spanner and TrueTime
Have you implemented distributed counters in production? What patterns worked best for your use case? Share your war stories in the comments!