PN Tech

A blog about Technology at Precision Nutrition

15 May 2020

Fun with MVCC II

Hello again and welcome back! When we last left off, we had just given up on trying to find a “simple” solution to the following problem:

The problem, restated

How can we read events from a table as they are being written without skipping records that haven’t committed at read time?

To provide a concrete example, we might run the following query:

SELECT id FROM events WHERE id > 184 ORDER BY id

and get back 185, 186, 189 even though events 187 and 188 would show up in the results if we were to run this query sometime later. Why? Because it’s possible that at the time of our first read, the transaction that inserts 187 and 188 has not yet committed.

We can’t avoid this issue by looking at each id and waiting for id + 1 to show up before proceeding because the ids are generated by a Postgres sequence, and it is possible that there are gaps that will never be filled.
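A quick way to see why such gaps are permanent is that nextval is not undone by a rollback. The sketch below uses a throwaway sequence called demo_seq, a name made up purely for this illustration:

```sql
-- demo_seq is a hypothetical sequence used only for this illustration.
CREATE SEQUENCE demo_seq;

BEGIN;
SELECT nextval('demo_seq');  -- returns 1
ROLLBACK;                    -- the transaction is undone, but the sequence is not

SELECT nextval('demo_seq');  -- returns 2; the value 1 is never handed out again
```

Any event insert that rolls back after taking its id leaves exactly this kind of hole, so a reader waiting for the missing id would wait forever.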

If this problem still seems unclear, please go back and read the first post.

What we want

Ideally, we would have a cleaner sequence of event ids. In particular, we want each id to be 1 greater than the previous id, with no gaps. This would permit us to read sequentially without skipping anything. As discussed in the previous post, the table we happen to be reading from is managed by a third party library, and we can’t dramatically modify how records are written to it.

But what if we could generate a table of surrogate event ids, arranged in the order that the event records were actually committed? And what if we could write these surrogates in such a way that we left no gaps in their id sequence and didn’t cause egregious performance problems? If this were possible, we could reliably process events by looping over the surrogate table to determine which events we still needed to process. This would permit us to process the events roughly in order, and ensure that none are skipped.

An events stream

Before I go into this solution, I want to credit its source. Commanded is an awesome CQRS/ES framework written in Elixir. It includes an event store implementation which is a veritable treasure trove of Postgres tricks. The following is adapted from how that event store generates stream records [1].

We will create a table called events_stream. This table will consist of two columns, id and event_id. The id column will be an integer representing event insert/commit order, and the event_id column will be a foreign key to the corresponding event record. While the id column will be a primary key, it will not be associated with a Postgres sequence because, as discussed in the previous post, sequences can have gaps. Instead, we will generate them using a different mechanism.

We will create a second table stream_ids containing a single column, id, which is again an integer primary key not associated with a Postgres sequence. We will only write a single row to this table which will act as the id counter for the events_stream table. Each time we want to insert records into events_stream, we will increment the id of the single record in the stream_ids table by the number of records we are inserting into events_stream. We will do this within the same transaction that inserts the events_stream records.
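As a rough sketch, the two tables might be declared like this. The column types, the assumption that events has an integer primary key named id, and the starting counter value of 0 are my choices for the example rather than anything the approach requires:

```sql
-- Assumes an existing events table with an integer primary key named id.
CREATE TABLE events_stream (
  id       bigint PRIMARY KEY,                     -- commit-order position, no sequence attached
  event_id bigint NOT NULL REFERENCES events (id)  -- the event this position points at
);

-- Single-row counter that hands out events_stream ids.
CREATE TABLE stream_ids (
  id bigint PRIMARY KEY
);

-- Seed the counter once; 0 here means "nothing streamed yet".
INSERT INTO stream_ids (id) VALUES (0);
```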

To give a concrete example, let’s say events 185 and 186 have just been inserted. We would run the following query to record them in the events_stream table:

WITH stream AS (
  UPDATE stream_ids SET id = id + 2
  RETURNING id - 2 AS original_id
),
events (event_id, index) AS (
  VALUES (185, 1), (186, 2)
)
INSERT INTO events_stream (id, event_id)
  SELECT
    stream.original_id + events.index,
    events.event_id
  FROM events, stream;

The stream common table expression in the above query contains an UPDATE which increments the id of our single stream_ids record by the number of events_stream records we are inserting, and returns the original value of the id before it was incremented. The ids of the events_stream records we are inserting are generated relative to this original value. So the first events_stream record is inserted with original id + 1, the second with original id + 2, and so on.

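The beauty of this query is the relative id incrementing. Even if we are inserting many events_stream records concurrently, we will never get a duplicate or a gap. The UPDATE locks the single stream_ids row until its transaction finishes, so concurrent writers take their ids one after another, and a transaction that rolls back also rolls back its increment of the counter. It forces the sequence of events_stream ids to have just the properties we want.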
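To make the arithmetic concrete, here is a hedged continuation of the example. It assumes the counter in stream_ids started at 0, so after the query above it holds 2, and that a second transaction then commits events 189, 187 and 188 in that order:

```sql
-- Assumes stream_ids held 0 before the first query ran, so it now holds 2
-- and events_stream contains (1, 185) and (2, 186).
WITH stream AS (
  UPDATE stream_ids SET id = id + 3
  RETURNING id - 3 AS original_id   -- new value 5, minus 3, gives 2
),
events (event_id, index) AS (
  VALUES (189, 1), (187, 2), (188, 3)
)
INSERT INTO events_stream (id, event_id)
  SELECT
    stream.original_id + events.index,
    events.event_id
  FROM events, stream;

SELECT id, event_id FROM events_stream ORDER BY id;
--  id | event_id
-- ----+----------
--   1 |      185
--   2 |      186
--   3 |      189
--   4 |      187
--   5 |      188
```

The event ids are out of order, but the stream ids are contiguous, which is all the reader needs.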

Some details I’ve glossed over

Okay, this is great, but when does the query to stream the events run? In Commanded, it’s run within the transaction that writes the events in the first place. There, the context is a little bit different because the streams are a first-class resource rather than something which was tacked on later.

Another good place to run it would be inside of a trigger that fires on inserts to the events table. This works well provided that your event inserts are fairly fast, because each insert now holds a lock on the stream_ids row until its transaction finishes, which forces concurrent transactions writing events in other processes to wait until it completes [2].
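A minimal sketch of the trigger variant might look like the following. The function and trigger names are invented for the example, it assumes events has an id column, and it streams one row at a time rather than in batches. On Postgres versions before 11, EXECUTE FUNCTION is spelled EXECUTE PROCEDURE.

```sql
-- Hypothetical trigger function; the name stream_event is an assumption.
CREATE FUNCTION stream_event() RETURNS trigger AS $$
DECLARE
  next_id bigint;
BEGIN
  -- Locks the single counter row; concurrent writers wait here
  -- until this transaction commits or rolls back.
  UPDATE stream_ids SET id = id + 1 RETURNING id INTO next_id;

  INSERT INTO events_stream (id, event_id) VALUES (next_id, NEW.id);

  RETURN NULL;  -- the return value is ignored for AFTER ROW triggers
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_stream_on_insert
  AFTER INSERT ON events
  FOR EACH ROW
  EXECUTE FUNCTION stream_event();
```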

If you have some event writes that take a long time and you don’t want these to interfere with the processing of other events, a third possibility is to use a queue. Here, you use a trigger on event insert to add the newly inserted event id to a queue table. You then have a separate process that dequeues and streams items from this table in a single transaction.
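Here is one way the queue variant could be sketched, as an alternative to streaming directly from the trigger. The events_queue table, the function and trigger names, and the choice to order each batch by event id are all assumptions for illustration, and the drain statement is meant to be run by a single worker:

```sql
-- Hypothetical queue table; the name events_queue is an assumption.
CREATE TABLE events_queue (
  event_id bigint PRIMARY KEY
);

-- Enqueue only, so transactions writing events never touch stream_ids.
CREATE FUNCTION enqueue_event() RETURNS trigger AS $$
BEGIN
  INSERT INTO events_queue (event_id) VALUES (NEW.id);
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_enqueue_on_insert
  AFTER INSERT ON events
  FOR EACH ROW
  EXECUTE FUNCTION enqueue_event();

-- Run periodically by one worker. A single statement is a single
-- transaction, so dequeueing and streaming succeed or fail together.
WITH dequeued AS (
  DELETE FROM events_queue
  RETURNING event_id
),
numbered AS (
  SELECT event_id, row_number() OVER (ORDER BY event_id) AS index
  FROM dequeued
),
stream AS (
  UPDATE stream_ids
  SET id = id + (SELECT count(*) FROM numbered)
  RETURNING id AS new_id
)
INSERT INTO events_stream (id, event_id)
  SELECT
    stream.new_id - (SELECT count(*) FROM numbered) + numbered.index,
    numbered.event_id
  FROM numbered, stream;
```

Because the DELETE only sees queue rows whose inserting transaction has committed, a slow event write simply shows up in a later batch instead of holding up everyone else.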

Back to the original problem

Now, we can finally fix the code that caused all of this trouble in the first place. We update the original code to loop over the events_stream records to determine which events to process:

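# Read the new stream positions in commit order.
sql = "SELECT id, event_id FROM events_stream WHERE id > $1 ORDER BY id"
streamed_events = fetch_records(sql, last_seen)

for streamed_event in streamed_events
  # Look up the event that this stream position points at.
  sql = "SELECT * FROM events WHERE id = $1"
  event = fetch_record(sql, streamed_event.event_id)
  process(event)
  # Advance the cursor only after the event has been processed.
  last_seen = streamed_event.id
end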
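If a round trip per event becomes a concern, the two queries in the loop could be collapsed into a single join. This is an optional variation, not something the approach depends on, and the prepared statement name next_events is made up for the example:

```sql
-- next_events is a hypothetical prepared statement name.
PREPARE next_events (bigint) AS
  SELECT s.id AS stream_id, e.*
  FROM events_stream s
  JOIN events e ON e.id = s.event_id
  WHERE s.id > $1
  ORDER BY s.id;

-- Fetch everything after the last stream id we processed (0 on first run).
EXECUTE next_events(0);
```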

Conclusion

This approach ended up being somewhat more involved than we were expecting when we ran into the issue in the first place. On the bright side, we were able to implement it without having to make any changes to the third-party library that manages the events table, and it has been quite stable over the last few months in production.


  1. And if you’re curious, you can read the implementation right here 

  2. The reason for this is that when using the default READ COMMITTED transaction isolation level, an UPDATE will wait for any concurrent transaction that has modified its target row to commit or roll back before it proceeds. See here for more details. Hat tip again to @drteeth for digging deep enough to understand what was going on with concurrent queries of this type.
