Transactionally Staged Job Drains in Postgres

Sep 20, 2017

Background jobs are one of the most common patterns in web programming, and for good reason. Slow API calls and other heavy lifting are deferred to out-of-band workers so that a user's request is executed as quickly as possible. In web services, fast is a feature.

But when it comes to working with background jobs in conjunction with ACID transactions like those you'd find in Postgres, MySQL, or SQL Server, there are a few sharp edges that aren't immediately obvious. To demonstrate, let's take a simple workflow that starts a transaction, executes a few DB operations, and queues a job somewhere in the middle:

DB.transaction do |t|
  db_op1(t)
  queue_job()
  db_op2(t)
end

It’s not easy to spot, but if your queue is fast, the job enqueued by queue_job() is likely to fail. A worker starts running it before its enclosing transaction is committed, and it fails to access data that it expected to be available.

As an easy example, imagine that db_op1() inserts a user record and queue_job() puts a job in the queue to retrieve that record and add the user's email address (along with a unique internal ID) to an email whitelist managed by another service, as sketched below. A background worker dequeues the job, but finds that the user record it's looking for is nowhere to be found in the database.
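Here's a minimal sketch of that race in Ruby (Sidekiq plus a Sequel-style User model; AddToWhitelistWorker and WhitelistService are hypothetical stand-ins):

# Hypothetical worker: looks up the user and sends their email address
# to an external whitelist service.
class AddToWhitelistWorker
  include Sidekiq::Worker

  def perform(user_id)
    user = User[user_id]
    # The enclosing transaction may not have committed yet, so this
    # lookup can come back empty.
    raise "couldn't find user #{user_id}" if user.nil?

    WhitelistService.add(email: user.email, internal_id: user.internal_id)
  end
end

DB.transaction do
  user = User.create(email: "jane@example.com") # db_op1: row not yet visible
  AddToWhitelistWorker.perform_async(user.id)   # queue_job: races the COMMIT
  # db_op2: more work that delays the COMMIT ...
end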

A job failing because the data it relies on is not yet committed.

A related problem is transaction rollbacks. In these cases, data is discarded completely, and jobs inserted into the queue will never succeed no matter how many times they're retried.

Sidekiq has a FAQ on this exact subject:

Why am I seeing a lot of “Can’t find ModelName with ID=12345” errors with Sidekiq?

Your client is creating the Model instance within a transaction and pushing a job to Sidekiq. Sidekiq is trying to execute your job before the transaction has actually committed. Use Rails’s after_commit :on => :create hook or move the job creation outside of the transaction block.

Not to pick on Sidekiq in particular (you can find similar answers and implementations all over the web), but this solution solves one problem only to introduce another.

If you queue a job after a transaction is committed, you run the risk of your program crashing after the commit but before the job makes it to the queue. Data is persisted, but the background work doesn't get done. It's a problem that's less common than the one Sidekiq is addressing, but one that's far more insidious: you almost certainly won't notice when it happens.
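For illustration, the queue-after-commit approach looks something like this (reusing the hypothetical worker from above):

user = nil
DB.transaction do
  user = User.create(email: "jane@example.com")
  # ... other DB operations ...
end

# If the process crashes right here -- a deploy restart, an OOM kill --
# the user row is committed but the job is silently lost.
AddToWhitelistWorker.perform_async(user.id)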

Other common solutions are equally bad. For example, another well-worn pattern is to let the job's first few tries fail and rely on the queue's retry scheme to eventually push the work through at some point after the transaction has committed. The downsides of this approach are that it thrashes needlessly (lots of wasted work is done) and throws a lot of unnecessary errors.

We can avoid both problems by using a transactionally-staged job drain.

With this pattern, jobs aren’t immediately sent to the job queue. Instead, they’re staged in a table within the relational database itself, and the ACID properties of the running transaction keep them invisible until they’re ready to be worked. A secondary enqueuer process reads the table and sends any jobs it finds to the job queue before removing their rows.

Here’s some sample DDL for what a staged_jobs table might look like:

CREATE TABLE staged_jobs (
    id       BIGSERIAL PRIMARY KEY,
    job_name TEXT      NOT NULL,
    job_args JSONB     NOT NULL
);
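Staging a job then happens inside the same transaction as the rest of the operation. A sketch, assuming a Sequel model over the table above and Sequel's pg_json extension (so job_args round-trips as a plain array):

DB.transaction do
  user = User.create(email: "jane@example.com")

  # Staged in the same transaction as the user INSERT: the job commits
  # with it, and rolls back with it.
  StagedJob.create(
    job_name: "AddToWhitelistWorker",
    job_args: Sequel.pg_jsonb([user.id])
  )

  # ... more DB operations ...
end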

And here’s what a simple enqueuer implementation that sends jobs through to Sidekiq:

# Only one enqueuer should be running at any given time.
acquire_lock(:enqueuer) do

  loop do
    # Need at least repeatable read isolation level so that our DELETE after
    # enqueueing will see the same jobs as the original SELECT.
    DB.transaction(isolation_level: :repeatable_read) do
      # Materialize the batch into an array so we only query once.
      jobs = StagedJob.order(:id).limit(BATCH_SIZE).all

      unless jobs.empty?
        jobs.each do |job|
          Sidekiq::Client.push("class" => job.job_name, "args" => job.job_args)
        end

        StagedJob.where(Sequel.lit("id <= ?", jobs.last.id)).delete
      end
    end

    # Sleep for some time (backing off if `staged_jobs` stays empty) so
    # that we're not continuously hammering the database with no-ops.
    sleep_with_exponential_backoff
  end

end

Transactional isolation means that the enqueuer is unable to see jobs that aren't yet committed (even if they've been inserted into staged_jobs by an uncommitted transaction), so jobs are never worked too early.

Jobs are invisible to the enqueuer until their transaction is committed.

It’s similarly protected against rollbacks. If a job is inserted within a transaction that’s subsequently discarded, the job is discarded with it.

The enqueuer is also totally resistant to job loss. Jobs are only removed after they're successfully transmitted to the queue, so even if the enqueuer dies partway through, it will pick back up again and send along any jobs that it missed. At-least-once delivery semantics are guaranteed.
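One corollary worth spelling out: if the enqueuer dies after pushing a batch to Sidekiq but before deleting it, those jobs will be transmitted again, so workers should be idempotent. A sketch of what that might look like for the whitelist example (the whitelisted flag is a hypothetical column):

class AddToWhitelistWorker
  include Sidekiq::Worker

  def perform(user_id)
    user = User[user_id]

    # The job may be delivered more than once; skip the external call
    # if a previous delivery already did the work.
    return if user.whitelisted

    WhitelistService.add(email: user.email, internal_id: user.internal_id)
    user.update(whitelisted: true)
  end
end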

Jobs being sequestered in a staging table and enqueued when they're ready to be worked.

delayed_job, Que, and queue_classic use a similar transactional mechanic to keep jobs hidden, and take it a step further by having workers dequeue jobs directly from within the database.

This is workable at modest to medium scale, but the frantic pace at which workers try to lock jobs doesn’t scale very well for a database that’s experiencing considerable load. For Postgres in particular, long-running transactions greatly increase the amount of time it takes for workers to find a job that they can lock, and this can lead to the job queue spiraling out of control.

The transactionally-staged job drain avoids this problem by selecting primed jobs in bulk and feeding them into another store like Redis that’s better-suited for distributing jobs to competing workers.

This article’s picked up some traction since I originally wrote it, and some enterprising people have taken it upon themselves to produce transactionally-staged queues for various platforms. Here’s a list of some that I’ve been told about [1]:

[1] If you have a project that’s widely applicable and you want it added to this list, let me know.

Did I make a mistake? Please consider sending a pull request.