You’ve probably implemented counters dozens of times. A simple count++ here, a database increment there. But what happens when your humble counter needs to handle millions of increments per second across multiple data centers? Welcome to the fascinating world of distributed counters.

Whether you’re building the next social media platform, tracking inventory for an e-commerce giant, or implementing rate limiting for your API, understanding distributed counters is essential. Let’s dive deep into the patterns, pitfalls, and practical solutions.

The Deceptively Simple Problem

At first glance, counting seems trivial:

// Easy, right?
int counter = 0;
counter++;

But in a distributed system, this becomes a nightmare. Imagine you have 100 servers, each handling user likes on a viral video. How do you keep an accurate count without:

  • Creating a bottleneck
  • Losing increments during network failures
  • Double-counting during retries
  • Blocking other operations

This is where things get interesting.
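
Even before distribution enters the picture, the naive version drops updates under plain thread concurrency, because counter++ is a three-step read-modify-write. A quick demonstration (class and method names are illustrative):

```java
public class LostUpdates {
    static int counter = 0;

    // Run several threads that each bump the shared counter; returns the final value
    static int run(int threads, int perThread) throws InterruptedException {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter++;  // read, add, write back: not atomic
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        // With 8 threads x 100,000 increments each, the result is
        // almost never the expected 800,000: increments get lost
        System.out.println(LostUpdates.run(8, 100_000));
    }
}
```

Distributed systems have the same race, except the "threads" are servers and the "shared variable" is a database row.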

Real-World Battle Stories

Before we dive into solutions, let’s look at some real scenarios where distributed counters make or break systems:

X/Twitter’s Like Counter

When a tweet goes viral, thousands of users might like it simultaneously. Twitter can’t afford to:

  • Block users while updating a single counter
  • Show inconsistent like counts across the globe
  • Lose likes during server failures

Reddit’s Upvote System

Reddit’s voting system needs to:

  • Handle massive traffic spikes during breaking news
  • Prevent vote manipulation
  • Show consistent scores across regions
  • Remain responsive under load

The Fundamental Challenges

1. The Hot Spot Problem

When all servers try to update the same database row, you get contention. Think of it like 100 people trying to write on the same piece of paper simultaneously.

-- This becomes a bottleneck quickly
UPDATE counters SET value = value + 1 WHERE id = 'viral_video_likes';

2. The Split-Brain Scenario

What happens when your servers can’t communicate with each other? You might end up with:

  • Server A thinks the count is 1000
  • Server B thinks it’s 1500
  • The truth? Nobody knows.

3. The Retry Dilemma

Networks fail. When they do, should you retry the increment? But what if the first attempt actually succeeded? Now you’ve double-counted.
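
The standard mitigation is to make increments idempotent: attach a unique operation ID to each increment and ignore duplicates. Here is a minimal in-memory sketch (the class name and API are illustrative, not from any library; in a real system the seen-set itself must be persisted or expired):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class IdempotentCounter {
    private final AtomicLong count = new AtomicLong();
    // Remember which operation IDs we have already applied
    private final Set<String> appliedOps = ConcurrentHashMap.newKeySet();

    public long increment(String operationId, long amount) {
        // Apply the increment only the first time we see this ID;
        // a retry with the same ID becomes a harmless no-op
        if (appliedOps.add(operationId)) {
            return count.addAndGet(amount);
        }
        return count.get();
    }

    public long value() {
        return count.get();
    }
}
```

Now a client can retry freely with the same operation ID: at-least-once delivery plus idempotent application gives you effectively-once counting.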

Architecture Pattern #1: Sharded Counters

The most popular solution is to split your counter into multiple shards.

Client Requests → Load Balancer
    ├── Shard 0 (count = 245)
    ├── Shard 1 (count = 189)
    ├── Shard 2 (count = 203)
    └── Shard 3 (count = 167)
Aggregator → Total: 245 + 189 + 203 + 167 = 804

How It Works

  1. Split the counter into N independent shards (usually 10-100)
  2. Randomly distribute increments across shards
  3. Sum all shards when you need the total count
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import redis.clients.jedis.Jedis;

public class ShardedCounter {
    private final String name;
    private final int numShards;
    private final Jedis jedis;
    private final Random random;
    
    public ShardedCounter(String name, int numShards) {
        this.name = name;
        this.numShards = numShards;
        // Note: a single Jedis connection is not thread-safe;
        // use a JedisPool in production
        this.jedis = new Jedis("localhost");
        this.random = new Random();
    }
    
    public long increment(int amount) {
        // Pick a random shard to avoid hot spots
        int shardId = random.nextInt(numShards);
        String shardKey = String.format("%s:shard:%d", name, shardId);
        
        // Returns the new value of the chosen shard, not the overall
        // total; call getCount() for the aggregate
        return jedis.incrBy(shardKey, amount);
    }
    
    public long increment() {
        return increment(1);
    }
    
    public long getCount() {
        // Sum all shards
        long total = 0;
        for (int i = 0; i < numShards; i++) {
            String shardKey = String.format("%s:shard:%d", name, i);
            String count = jedis.get(shardKey);
            total += count != null ? Long.parseLong(count) : 0;
        }
        return total;
    }
}

// Usage
ShardedCounter likesCounter = new ShardedCounter("video_123_likes", 20);
likesCounter.increment();  // Fast write
long totalLikes = likesCounter.getCount();  // Slightly slower read

Pros and Cons

Pros:

  • Scales writes horizontally
  • No single point of contention
  • Works with existing databases

Cons:

  • Reads are more expensive (must sum all shards)
  • Eventually consistent (temporary inconsistencies)
  • More complex than single counter
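
The expensive-read con is usually softened by caching the aggregated total for a short TTL, since likes and view counts tolerate slightly stale reads. A sketch under that assumption (class name is illustrative):

```java
import java.util.function.LongSupplier;

public class CachedCountReader {
    private final LongSupplier expensiveSum;  // e.g. sums all shards
    private final long ttlMillis;
    private volatile long cachedValue;
    private volatile long cachedAt = -1;  // -1 = never computed

    public CachedCountReader(LongSupplier expensiveSum, long ttlMillis) {
        this.expensiveSum = expensiveSum;
        this.ttlMillis = ttlMillis;
    }

    public long getCount() {
        long now = System.currentTimeMillis();
        // Re-aggregate only when the cached total has expired
        if (cachedAt < 0 || now - cachedAt > ttlMillis) {
            cachedValue = expensiveSum.getAsLong();
            cachedAt = now;
        }
        return cachedValue;
    }
}
```

With a 1-5 second TTL, even a shard-sum that touches 100 keys runs at most once per interval, no matter how many readers there are.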

When to Use Sharded Counters

Perfect for:

  • Social media likes/views
  • Website analytics
  • Non-critical metrics

Architecture Pattern #2: Local Aggregation with Batch Updates

Instead of immediately updating a central counter, each server keeps local counts and periodically syncs.

Server 1 (local: 45) ──┐
Server 2 (local: 67) ──┼── every 30s, batch sync ──→ Central Store
Server 3 (local: 44) ──┘                              Total: 156 (durable & consistent)
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LocalAggregatedCounter {
    private final ConcurrentHashMap<String, AtomicLong> localCounters;
    private final int syncIntervalSeconds;
    private final ScheduledExecutorService scheduler;
    
    public LocalAggregatedCounter(int syncIntervalSeconds) {
        this.localCounters = new ConcurrentHashMap<>();
        this.syncIntervalSeconds = syncIntervalSeconds;
        this.scheduler = Executors.newScheduledThreadPool(1);
        
        // Start background sync
        scheduler.scheduleAtFixedRate(
            this::syncToCentralStore, 
            syncIntervalSeconds, 
            syncIntervalSeconds, 
            TimeUnit.SECONDS
        );
    }
    
    public void increment(String counterName, long amount) {
        // Lightning fast - just update local memory
        localCounters.computeIfAbsent(counterName, k -> new AtomicLong(0))
                    .addAndGet(amount);
    }
    
    public void increment(String counterName) {
        increment(counterName, 1);
    }
    
    private void syncToCentralStore() {
        localCounters.forEach((name, counter) -> {
            // Drain the local count atomically; note that if the central
            // update below throws, the drained increments are lost
            long count = counter.getAndSet(0);
            if (count > 0) {
                updateCentralCounter(name, count);
            }
        });
    }
    
    private void updateCentralCounter(String name, long amount) {
        // This could be Redis, MongoDB, PostgreSQL, etc.
        System.out.printf("Syncing %s: +%d%n", name, amount);
        // jedis.incrBy(name, amount);
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}
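
There is a subtle hazard in syncToCentralStore: getAndSet(0) drains the local counter before the central update, so a failed update silently loses those increments. A defensive variant restores the drained amount on failure so it is retried on the next cycle. SafeSyncer below is an illustrative, self-contained sketch of that idea, not a drop-in replacement:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class SafeSyncer {
    private final ConcurrentHashMap<String, AtomicLong> local = new ConcurrentHashMap<>();

    public void add(String name, long amount) {
        local.computeIfAbsent(name, k -> new AtomicLong()).addAndGet(amount);
    }

    // Drain each local counter; on failure, put the drained amount back
    // so no increments are lost between sync cycles
    public void sync(BiConsumer<String, Long> centralUpdate) {
        local.forEach((name, counter) -> {
            long drained = counter.getAndSet(0);
            if (drained > 0) {
                try {
                    centralUpdate.accept(name, drained);
                } catch (RuntimeException e) {
                    counter.addAndGet(drained);  // retried next cycle
                }
            }
        });
    }

    public long pending(String name) {
        AtomicLong c = local.get(name);
        return c == null ? 0 : c.get();
    }
}
```

Note this trades loss for possible duplication: if the central update succeeds but the acknowledgment is lost, a retry double-counts, which is where the idempotency ideas from earlier come back in.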

The Write-Ahead Log Pattern

For critical counters, you can add durability:

import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;

public class DurableLocalCounter extends LocalAggregatedCounter {
    private final String walFile;
    
    public DurableLocalCounter(String walFile, int syncIntervalSeconds) {
        super(syncIntervalSeconds);
        this.walFile = walFile;
    }
    
    @Override
    public void increment(String counterName, long amount) {
        // Write to WAL first
        writeToWAL(counterName, amount);
        super.increment(counterName, amount);
    }
    
    private void writeToWAL(String name, long amount) {
        // Simple append-only log; a production WAL would keep the file
        // open, batch writes, and fsync before acknowledging
        try (FileWriter writer = new FileWriter(walFile, true)) {
            writer.write(String.format("%d,%s,%d%n", 
                Instant.now().getEpochSecond(), name, amount));
        } catch (IOException e) {
            System.err.println("Failed to write to WAL: " + e.getMessage());
        }
    }
}
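
The WAL only buys durability if you replay it after a crash. A minimal replayer for the epochSeconds,name,amount format above (a hypothetical helper; real recovery would also checkpoint and truncate the log so entries are not re-applied twice):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class WalReplayer {
    // Rebuild per-counter totals from "epochSeconds,name,amount" WAL lines
    public static Map<String, Long> replay(Path walFile) throws IOException {
        Map<String, Long> totals = new HashMap<>();
        for (String line : Files.readAllLines(walFile)) {
            if (line.isBlank()) continue;
            String[] parts = line.split(",", 3);
            totals.merge(parts[1], Long.parseLong(parts[2]), Long::sum);
        }
        return totals;
    }
}
```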

Architecture Pattern #3: CRDTs (Conflict-Free Replicated Data Types)

CRDTs are mathematical structures that automatically resolve conflicts. They’re perfect for distributed counters that need to work during network partitions.

Node A: {A:5, B:3, C:2}   Node B: {A:4, B:4, C:2}   Node C: {A:4, B:3, C:3}
                  ↔ continuous replication ↔
Merge rule: take the max of each entry → {A:5, B:4, C:3}
Converged result on every node: 5 + 4 + 3 = 12 (eventually consistent)

G-Counter (Grow-Only Counter)

Each node maintains a vector of counts for all nodes:

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class GCounter {
    private final String nodeId;
    private final Set<String> nodes;
    private final ConcurrentHashMap<String, Long> counts;
    
    public GCounter(String nodeId, Set<String> nodes) {
        this.nodeId = nodeId;
        this.nodes = new HashSet<>(nodes);
        this.counts = new ConcurrentHashMap<>();
        
        // Initialize counts for all nodes
        for (String node : nodes) {
            counts.put(node, 0L);
        }
    }
    
    public void increment(long amount) {
        // Only increment your own counter
        counts.compute(nodeId, (key, val) -> (val == null ? 0 : val) + amount);
    }
    
    public void increment() {
        increment(1);
    }
    
    public long value() {
        // Sum all node counts
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
    
    public void merge(GCounter other) {
        // Take the maximum for each node
        for (String node : nodes) {
            long otherCount = other.counts.getOrDefault(node, 0L);
            counts.compute(node, (key, val) -> 
                Math.max(val == null ? 0 : val, otherCount));
        }
    }
    
    public boolean compare(GCounter other) {
        // Vector clock comparison
        return nodes.stream().allMatch(node -> 
            counts.getOrDefault(node, 0L) >= other.counts.getOrDefault(node, 0L));
    }
    
    public Map<String, Long> getCounts() {
        return new HashMap<>(counts);
    }
}

// Usage across multiple nodes
Set<String> nodes = Set.of("server1", "server2", "server3");

// On server1
GCounter counter1 = new GCounter("server1", nodes);
counter1.increment(5);

// On server2 
GCounter counter2 = new GCounter("server2", nodes);
counter2.increment(3);

// Merge when nodes communicate
counter1.merge(counter2);
System.out.println(counter1.value());  // 8

PN-Counter (Positive-Negative Counter)

For counters that can decrement:

import java.util.Set;

public class PNCounter {
    private final GCounter positive;
    private final GCounter negative;
    
    public PNCounter(String nodeId, Set<String> nodes) {
        this.positive = new GCounter(nodeId, nodes);
        this.negative = new GCounter(nodeId, nodes);
    }
    
    public void increment(long amount) {
        if (amount >= 0) {
            positive.increment(amount);
        } else {
            negative.increment(-amount);
        }
    }
    
    public void increment() {
        increment(1);
    }
    
    public void decrement(long amount) {
        increment(-amount);
    }
    
    public void decrement() {
        decrement(1);
    }
    
    public long value() {
        return positive.value() - negative.value();
    }
    
    public void merge(PNCounter other) {
        positive.merge(other.positive);
        negative.merge(other.negative);
    }
}

Performance Comparison

Let’s look at how these patterns perform under different loads:

Pattern            | 1K ops/sec  | 10K ops/sec | 100K ops/sec | 1M ops/sec
-------------------+-------------+-------------+--------------+------------
Centralized        | ✓ Excellent | ⚠ Caution   | ✗ Poor       | ✗ Poor
Sharded            | ✓ Excellent | ✓ Excellent | ✓ Excellent  | ⚠ Caution
Local Aggregation  | ✓ Excellent | ✓ Excellent | ✓ Excellent  | ✓ Excellent
CRDT               | ✓ Excellent | ✓ Excellent | ⚠ Caution    | ✗ Poor

✓ Excellent: handles the load effortlessly
⚠ Caution: works with tuning
✗ Poor: not recommended

Handling Edge Cases

The Thundering Herd Problem

When a counter becomes extremely popular, you might get a thundering herd:

import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class ThunderingHerdProtection {
    private final Counter baseCounter;
    private final int jitterMs;
    private final Random random;
    
    public ThunderingHerdProtection(Counter baseCounter, int jitterMs) {
        this.baseCounter = baseCounter;
        this.jitterMs = jitterMs;
        this.random = new Random();
    }
    
    public CompletableFuture<Long> incrementWithJitter(long amount) {
        return CompletableFuture.supplyAsync(() -> {
            // Add random delay to spread out requests
            // (note: sleeping blocks a common-pool thread; at high volume,
            // a ScheduledExecutorService is a lighter way to apply jitter)
            int jitter = random.nextInt(jitterMs);
            try {
                Thread.sleep(jitter);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            
            return baseCounter.increment(amount);
        });
    }
            
            return baseCounter.increment(amount);
        });
    }
}

interface Counter {
    long increment(long amount);
}

Circuit Breaker for Counter Operations

import java.time.Instant;

// Package-private so it can share a file with CounterCircuitBreaker
// (Java allows only one public top-level type per file)
enum CircuitState {
    CLOSED, OPEN, HALF_OPEN
}

public class CounterCircuitBreaker {
    private final int failureThreshold;
    private final long timeoutSeconds;
    // Note: this state is unsynchronized; guard increment() with a lock
    // (or use atomics) if multiple threads share one breaker instance
    private int failureCount = 0;
    private long lastFailureTime = 0;
    private CircuitState state = CircuitState.CLOSED;
    
    public CounterCircuitBreaker(int failureThreshold, long timeoutSeconds) {
        this.failureThreshold = failureThreshold;
        this.timeoutSeconds = timeoutSeconds;
    }
    
    public long increment(Counter counter, long amount) throws Exception {
        if (state == CircuitState.OPEN) {
            if (Instant.now().getEpochSecond() - lastFailureTime < timeoutSeconds) {
                // Fail fast
                throw new RuntimeException("Counter circuit breaker is OPEN");
            } else {
                state = CircuitState.HALF_OPEN;
            }
        }
        
        try {
            long result = counter.increment(amount);
            if (state == CircuitState.HALF_OPEN) {
                state = CircuitState.CLOSED;
                failureCount = 0;
            }
            return result;
        } catch (Exception e) {
            failureCount++;
            lastFailureTime = Instant.now().getEpochSecond();
            
            if (failureCount >= failureThreshold) {
                state = CircuitState.OPEN;
            }
            
            throw e;
        }
    }
}

Conclusion

Distributed counters are a perfect example of how “simple” problems become fascinatingly complex at scale. The key is understanding your requirements:

  • Need strong consistency? Accept lower throughput
  • Need high throughput? Accept eventual consistency
  • Need partition tolerance? Consider CRDTs
  • Need blazing speed? Use local aggregation

Have you implemented distributed counters in production? What patterns worked best for your use case? Share your war stories in the comments!