I read this article today – it’s about a year old but still really awesome. I’d previously read some publications about LinkedIn’s infrastructure, and a few friends work(ed) on related things there, but it was nice to see how the actual OSS pieces (Kafka & Samza) fit into how they run their infrastructure. It got me thinking about what modern open-source web infrastructure has to offer and how people are thinking about data infrastructure these days.

At the end I’ll go a bit into how it matches up with my own experience. For the record, it’s consistent with things I’ve seen, and it helped me think more deeply about why modern architectures are designed the way they are.

Outline

The post walks through four examples of “derived data” in a classic web application’s data infrastructure (SQL database, stateless application servers, stateless frontend).

Derived data

The following are examples of derived data from your one true database:

  • Replication
  • Secondary Indexing
  • Caching
  • Materialized Views

…and in SQL world, writes look like this:

Writes --> [Database Leader] --> Replication Event Stream --> Follower(s)

More detailed notes on each section follow.

Replication

Good databases keep replicas of your data in case something fails, but you need to keep those replicas in sync with the leader.

In practice, after a successful write happens on the leader, the change is published to the followers via some replication event stream. The followers might hold an exact copy of the leader’s database and apply the changes incrementally, or they might take periodic snapshots…the details aren’t important. The important thing is that databases have the concept of a replication stream.
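
Just to make that concrete, here’s a toy sketch in Python (made-up names, not any real database’s replication protocol):

# Toy model of a replication stream: the leader records each committed
# change as an event, and followers apply the events in order to catch up.
replication_stream = []   # the leader's stream of change events

def leader_write(leader_db, key, value):
    leader_db[key] = value
    replication_stream.append({"key": key, "value": value})

def follower_apply(follower_db, stream):
    for event in stream:
        follower_db[event["key"]] = event["value"]

leader, follower = {}, {}
leader_write(leader, "row1", {"col2": 123, "col3": "abc"})
follower_apply(follower, replication_stream)
assert follower == leader   # the follower has caught up with the leader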

Secondary Indexes

Secondary indexes allow you to efficiently find rows that match a particular query. An index is often something like a B-Tree, which lets you find the rows relevant to your query without too many disk seeks. In an RDBMS, the index generally maps the field you index to the primary key of the matching rows.

Adding a secondary index to your database has a cost: the index must be maintained, so whenever a row is modified, the index needs to be modified too.

# Example database where the primary key is col1.
col1 | col2 | col3
------------------
row1 | 123  | "abc"
row2 | 234  | "def"
row3 | 345  | "ghi"

# Adding an index for col2 creates an index that maps the column values to row
# pointers (usually using the primary key).
123 -> [row1]
234 -> [row2]
345 -> [row3]

Now you can efficiently look up rows by their col2 values instead of only by the primary key.
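
A toy version of that bookkeeping, with dicts standing in for on-disk B-Trees (illustration only):

# The table maps primary key -> row; the secondary index maps col2 value ->
# list of primary keys.
table = {
    "row1": {"col2": 123, "col3": "abc"},
    "row2": {"col2": 234, "col3": "def"},
    "row3": {"col2": 345, "col3": "ghi"},
}

index_col2 = {}
for pk, row in table.items():
    index_col2.setdefault(row["col2"], []).append(pk)

# Look up rows by col2 without scanning the whole table.
matches = [table[pk] for pk in index_col2.get(234, [])]
print(matches)   # -> [{'col2': 234, 'col3': 'def'}]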

Transactional Index Updates

If you don’t want your database to make bad mistakes, index updates had better be part of the write transaction: when you update a row, the database updates every affected index and then commits, or the whole transaction fails.

The transformation function that derives the index from the original table is not applied just once when you create the index (over the whole table); it is applied continuously to every row that changes afterwards.
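
Here’s a rough sketch of that all-or-nothing behavior, using the same toy table/index shapes (a pretend transaction, not a real database engine): either the row and the index change together, or neither does.

import copy

# Pseudo-transaction: apply the row change and the index change to copies,
# then swap both in at once. If anything raises, neither structure changes.
def update_row(db, pk, new_row):
    table = copy.deepcopy(db["table"])
    index = copy.deepcopy(db["index_col2"])
    old_row = table[pk]
    index[old_row["col2"]].remove(pk)                 # maintain the index...
    index.setdefault(new_row["col2"], []).append(pk)
    table[pk] = new_row                               # ...and the row
    db["table"], db["index_col2"] = table, index      # "commit" both together

db = {
    "table": {"row1": {"col2": 123, "col3": "abc"}},
    "index_col2": {123: ["row1"]},
}
update_row(db, "row1", {"col2": 999, "col3": "abc"})
print(db["index_col2"])   # -> {123: [], 999: ['row1']}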

Concurrent Index Building

Because creating an index can take a long time on big tables, some databases let you build the index concurrently. In PostgreSQL, for example, you can use CREATE INDEX CONCURRENTLY, which starts a background build while your application keeps reading from and writing to the database as usual.

To do this (sketched below):

  1. The database builds the index from a snapshot of the table at one point in time.
  2. It keeps track of all changes that occurred since that snapshot while the index build was in progress, and applies them to the index once the initial build finishes.
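
Here’s the rough shape of that two-phase build, with the real concurrency control and locking hand-waved away:

# Phase 1: build the index from a snapshot of the table.
# Phase 2: apply the changes that happened while phase 1 was running.
# Real databases do this with MVCC snapshots and careful locking; this is
# just the shape of the idea.
def build_index_concurrently(snapshot, changes_since_snapshot):
    index = {}
    for pk, row in snapshot.items():                   # phase 1
        index.setdefault(row["col2"], []).append(pk)
    for pk, row in changes_since_snapshot:             # phase 2: catch up
        for keys in index.values():
            if pk in keys:
                keys.remove(pk)
        index.setdefault(row["col2"], []).append(pk)
    return index

snapshot = {"row1": {"col2": 123}, "row2": {"col2": 234}}
changes = [("row1", {"col2": 777})]   # row1 changed during the build
print(build_index_concurrently(snapshot, changes))
# -> {123: [], 234: ['row2'], 777: ['row1']}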

Caching

Here’s a common pattern for application caches (sketched in code below).

  1. Fetch the key from the cache.
  2. If it doesn’t exist in the cache, fetch it from the database.
  3. Write the result back into the cache.
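
In code, that read-through (cache-aside) pattern looks roughly like this; the dict stands in for memcached/Redis and fetch_row_from_db is a made-up database call:

# Cache-aside reads: check the cache first, fall back to the database, and
# write the result back into the cache.
cache = {}

def fetch_row_from_db(key):
    return {"key": key, "value": "expensive result"}   # stand-in for a query

def get(key):
    if key in cache:                     # 1. fetch from cache
        return cache[key]
    row = fetch_row_from_db(key)         # 2. miss: fetch from database
    cache[key] = row                     # 3. write it back into the cache
    return row

get("row1")   # miss: hits the database and populates the cache
get("row1")   # hit: served from the cache

All of the problems below live outside this happy path, which is exactly why application caching gets messy.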

Problems:

  1. When to remove items from the cache? (Invalidation)
  2. Two concurrent updates hit the database, which chooses how to serialize them, but the corresponding cache updates may not be applied in the same order… (Race conditions, consistency issues)
  3. Caches take time to “warm up.” In production systems you can’t just cold-start everything – that can cause an unexpected surge of requests to the database layer. Instead, you may need a bootstrapping strategy: prefetch things into the cache so it doesn’t start cold. (Cold start)

Application caching is annoying and sucks but we do it because performance matters :(

Caches ~= Secondary Indexes!

  • A secondary index is a redundant data structure on the side that structures the same data but in a different way, for performance.

  • A cache is a redundant data structure on the side that structures data for performance.

  • Cache contents are derived from database contents. If you lose your cache, you can rebuild it from the underlying database.

Materialized Views

Let’s compare and contrast ordinary views with materialized views:

Ordinary SQL Views

# Create a SQL view.
CREATE VIEW example_view(foo) AS (
  SELECT foo
  FROM bar
  WHERE ...
);

# Write a query against the view.
SELECT foo FROM example_view;

# The database will rewrite your query as:
# SELECT foo FROM bar WHERE ...

Each query against the view is just rewritten on the fly; an ordinary SQL view says “transform my query for my convenience.”

Materialized Views in RDBMS

The database runs the view’s underlying query over the input tables and copies the result into a table that is written to disk. That’s great if the query is expensive and you want to avoid recomputing it on every read.

“But this is just another cache!” you say. Yes. The big difference between the materialized view and application-managed caches is the responsibility for keeping it up to date.

The database takes care of maintaining the materialized view and keeping it up to date: some databases do this on an ongoing basis, while others require you to periodically refresh the view for changes to take effect (e.g. REFRESH MATERIALIZED VIEW in PostgreSQL).
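
As a mental model for the refresh-based flavor (not how any particular database implements it), think of a stored query result that reads never recompute, plus an explicit refresh step:

# Toy model of a refresh-style materialized view: the result of an
# expensive query is stored and served as-is until someone refreshes it.
class MaterializedView:
    def __init__(self, query):
        self.query = query          # a function that computes the view
        self.rows = query()         # computed once up front and stored

    def read(self):
        return self.rows            # cheap: no recomputation

    def refresh(self):
        self.rows = self.query()    # recompute so recent writes show up

base_table = [1, 2, 3]
view = MaterializedView(lambda: [x * 10 for x in base_table])
base_table.append(4)
print(view.read())     # -> [10, 20, 30]  (stale until refreshed)
view.refresh()
print(view.read())     # -> [10, 20, 30, 40]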

Stored Procedures

Pretty irrelevant to the point of the article, but I liked his tangential points here: you can implement arbitrary business logic running as a materialized view inside your database! But it’s hard to reason about things if you do:

  • Monitoring: did your business logic stop working?
  • Versioning: did your business logic change or regress or not get pushed?
  • Deployments
  • Performance Impact
  • Multi-tenant resource isolation
  • Etc

TL;DR…stay away from using stored procedures.

Materialized Views: Room for Improvement

Are materialized views good? The way they’re currently implemented, they put additional load on the database, but the whole idea of a cache is to reduce load on the database, so it’s kinda crappy.

But it’s kind of a nice idea: materialized views are sort of caches that keep themselves up to date magically, putting the complexity of cache invalidation in the data infrastructure INSTEAD of the application.

Idea: Can we reinvent materialized views?

  • Modern / Scalable implementation?
  • General mechanism for cache maintenance?

A Log-centered Streaming Database Architecture

This describes how LinkedIn structures their infrastructure using Apache Samza. Instead of a database interface like SQL, the read/write event stream becomes the primary database interface.

[Write Event Stream / Transaction Log] --> [Materialized Views of the Log]

Writes

“Writes” become immutable events that are appended to the transaction log (an append-only data structure):

{
    "user": "john",
    "action": "updated",
    "view_count_old": 1,
    "view_count_new": 2,
    "timestamp": 12345678
}

Instead of interfacing with the system via SQL or some database API, you interface with it by sending events to the log.

One thing you can use for the transaction log is Apache Kafka, which can handle millions of writes per second on modest hardware. You only need a designated leader if you have to validate constraints before a write; otherwise, you can actually just append to the log from anywhere.
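
As a sketch of what a “write” looks like in this world, here’s the event from above being appended to a hypothetical Kafka topic using the kafka-python client (topic name and event fields are made up):

import json
from kafka import KafkaProducer

# Appending an immutable event to the log; nothing is updated in place.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda event: json.dumps(event).encode("utf-8"),
)

event = {
    "user": "john",
    "action": "updated",
    "view_count_old": 1,
    "view_count_new": 2,
    "timestamp": 12345678,
}

producer.send("page_view_events", value=event)
producer.flush()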

Reads

Reading directly from the log is bad because answering a query would require scanning it.

Solution: build materialized views from the writes. They’re like secondary indexes – data structures derived from the data in the log, optimized for fast reads. They can be rebuilt from the log at any time.

There can be many different materialized views on the same data (key-value store index, full-text search index, graph index, analytics system, etc).
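
A minimal sketch of the idea, with an in-memory dict standing in for a real view (which would live in a key-value store, search index, etc.) and the made-up event shape from above:

# Fold the event stream into a read-optimized structure: here, the latest
# view count per user. A real view would be maintained by a stream
# processor like Samza and backed by a durable store.
view = {}   # user -> latest view_count

def apply_event(event):
    view[event["user"]] = event["view_count_new"]

log = [
    {"user": "john", "view_count_new": 2},
    {"user": "jane", "view_count_new": 7},
    {"user": "john", "view_count_new": 3},
]

for event in log:
    apply_event(event)

print(view["john"])   # -> 3, answered without scanning the log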

Samza

Apache Samza can process Kafka streams and build the aforementioned views. It consumes and joins the streams. The post is pretty thin on details of how it works, how a user interfaces with the views, etc, but that’s OK.

Log compaction is also handled for you (vital: otherwise, to compute a view you’d have to replay every event that ever happened in your system…wasteful). That’s nice to know.
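
To make the compaction idea concrete, here’s a tiny sketch: keep only the most recent event per key so that rebuilding a view doesn’t require replaying everything ever written (compacting per user is an assumption for illustration).

# Keep only the latest event for each key, preserving the order of the
# surviving events.
def compact(log):
    latest = {}
    for offset, event in enumerate(log):
        latest[event["user"]] = (offset, event)   # later events win
    return [event for _, event in sorted(latest.values())]

log = [
    {"user": "john", "view_count_new": 1},
    {"user": "jane", "view_count_new": 5},
    {"user": "john", "view_count_new": 2},
]

print(compact(log))
# -> [{'user': 'jane', 'view_count_new': 5}, {'user': 'john', 'view_count_new': 2}]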

Why Log-centered

  1. Better data: write once, read from many different views. Separation of concerns between writing and reading. More data is captured: you can track what actions a user may have taken and that’s good for analytics.

  2. Fully precomputed caches: no cold cache / warming, no hit/miss rates, no race conditions, no complex invalidation logic, better isolation/robustness.

  3. Reactive programming (the post says “streams everywhere”). In other words, UI frameworks like React are great at being driven by a stream of data. Samza views could be subscribed to via WebSockets or SSE to push updated data to clients! Fewer terrible n-layer-deep fetches.

A relevant quote on Martin Kleppmann’s long-term philosophy:

I’m saying that in the future, REST is not going to cut the mustard, because it’s based on a request-response model.

…traditional databases and caches are like global variables, a kind of shared mutable state that becomes messy at scale

Some Thoughts circa 2016

The article was written a little over a year ago now, and most of the lessons are things I’ve also seen deployed at Google in various capacities.

I work on abuse systems, and we shuttle data around in streams, transforming or augmenting it with “transceivers” (jobs that read from and write to streams) and outputting both columnar dumps for analysis (continuously updated, so effectively always-valid views) and writes to data stores that can be read from directly (e.g. Spanner).

User interactions with our systems result in mutations that funnel in through streams rather than direct interactions with some database. Since we capture a log of this information, it turns out to be helpful for auditing as well, which, as you might imagine, is important for abuse systems.

The main difference I see between the open-source stack and Google’s options for stream processing is Streaming Flume, the continuous computation engine that Google uses. It’s built on an older system called MillWheel; the original MillWheel paper is here. Externally, the popular solution for real-time processing has been the Lambda Architecture for a number of years now.

  1. Google Pubsub –> Kafka
  2. Google’s FlumeJava –> Hadoop, Storm
  3. Bigtable, Colossus, Placer –> Cassandra/HBase, HDFS, ??
  4. Google Dataflow (Streaming Flume & MillWheel) –> Lambda Architecture

I haven’t yet seen a good open-source solution that provides the kinds of guarantees MillWheel gives Google (exactly-once processing, near-real-time latency, continuous computation). The only thing I’ve seen so far in the wild is an idea in this article by Jay Kreps, an awesome (now former) LinkedIn infrastructure engineer. His general concept is right on and a good start (you need a scratchpad to engineer such systems). Excerpt:

…The Lambda Architecture requires running both reprocessing and live processing all the time, whereas what I have proposed only requires running the second copy of the job when you need reprocessing. However, my proposal requires temporarily having 2x the storage space in the output database and requires a database that supports high-volume writes for the re-load. In both cases, the extra load of the reprocessing would likely average out.

On the right track with extra “scratchpad” space!

It was a little surprising at first to realize that there wasn’t an OSS alternative yet, but it makes sense because reasoning about time is really damn hard. I think it’s going to take a little more time before the open-source world has a real-time stream processing system with stronger guarantees.

Interesting Topics

Thinking about this made me wonder about some things:

  1. Log compaction: it takes time and resources, and you’d better do it correctly so you don’t lose data. How much of an effect does it have on resources? What strategies are employed in practice?

  2. Performance: your streaming system had better be fast. Even Google’s Pub/Sub system suffers from some publishing latency that might not be tolerable for certain use cases. I’d be curious to see metrics on read-update-write cycles.

  3. Interfaces: you still have to provide abstractions to everything. I wonder what those look like.