Scalable incremental data aggregation on Postgres and Citus

Written by Marco Slot
June 14, 2018

Many companies generate large volumes of time series data from events happening in their application. It’s often useful to have a real-time analytics dashboard to spot trends and changes as they happen. You can build a real-time analytics dashboard on Postgres by constructing a simple pipeline:

  1. Load events into a raw data table in batches
  2. Periodically aggregate new events into a rollup table
  3. Select from the rollup table in the dashboard

For large data streams, Citus (an open source extension to Postgres that scales out Postgres horizontally) can scale out each of these steps across all the cores in a cluster of Postgres nodes.

One of the challenges of maintaining a rollup table is tracking which events have already been aggregated, so that you can make sure each event is aggregated exactly once. A common technique to ensure exactly-once aggregation is to aggregate a particular time period only after that period is over. We often recommend this approach for its simplicity, but it has two drawbacks: you cannot provide any results before the time period is over, and backfilling is complicated.
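
For contrast, here is a minimal sketch of that end-of-period approach on plain Postgres. The demo_events and demo_events_1min tables are hypothetical stand-ins for an events table and a rollup table, and the fixed timestamps stand in for "the minute that just ended".

```sql
-- Hypothetical tables for illustration only
DROP TABLE IF EXISTS demo_events, demo_events_1min;

CREATE TABLE demo_events (
    site_id int,
    event_time timestamptz
);

CREATE TABLE demo_events_1min (
    site_id int,
    period_start timestamptz,
    event_count bigint,
    PRIMARY KEY (site_id, period_start)
);

-- Three events in the 10:00 minute, one in the 10:01 minute
INSERT INTO demo_events VALUES
    (1, '2018-06-07 10:00:05+00'),
    (1, '2018-06-07 10:00:40+00'),
    (2, '2018-06-07 10:00:59+00'),
    (1, '2018-06-07 10:01:10+00');

-- Runs once, after 10:01:00 has passed: aggregate exactly [10:00, 10:01)
INSERT INTO demo_events_1min (site_id, period_start, event_count)
SELECT site_id, date_trunc('minute', event_time), count(*)
FROM demo_events
WHERE event_time >= '2018-06-07 10:00:00+00'
  AND event_time <  '2018-06-07 10:01:00+00'
GROUP BY site_id, date_trunc('minute', event_time);
```

The 10:01 event does not show up in the rollup until that minute is over and aggregated in turn, and an event for 10:00 that arrives after this statement ran would be missed unless the whole minute is recomputed.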

Building rollup tables in a new and different way

In this blog post, we’ll introduce a new approach to building rollup tables that addresses the limitations of using time windows. With the new approach, you can easily backfill (add older events and see them reflected in the rollup table) and start aggregating data for the current time period before the time period is over, giving a more real-time view of the data.

We assume all events have an identifier which is drawn from a sequence and provide a simple SQL function that enables you to incrementally aggregate ranges of sequence values in a safe, transactional manner.

We tested this approach for a CDN use case and found that a 4-node Citus database cluster can simultaneously:

  • Ingest and aggregate over a million rows per second
  • Keep the rollup table up-to-date within ~10s
  • Answer analytical queries in under 10ms

SQL infrastructure for incremental aggregation

To do incremental aggregation, you need a way to distinguish which events have already been aggregated. We assume each event has an identifier drawn from a Postgres sequence, so that aggregation progress can be described by a single number: all events up to a certain sequence value have been aggregated. To track this, you can create a rollups table that stores, for each rollup, the last aggregated ID along with the name of the events table and the name of its sequence.

CREATE TABLE rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    last_aggregated_id bigint default 0
);
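
As a sketch of how the rollups table is used, the snippet below registers a rollup for a hypothetical demo_clicks table (using demo_rollups, a copy of the table above, so that it runs on its own) and computes how many sequence values have been issued but not yet aggregated. That lag query is our own illustration rather than part of the approach, and it relies on pg_sequence_last_value, which is available from Postgres 10.

```sql
-- Hypothetical tables for illustration only
DROP TABLE IF EXISTS demo_rollups, demo_clicks;

-- Events table whose IDs come from the sequence demo_clicks_click_id_seq
CREATE TABLE demo_clicks (
    click_id bigserial,
    click_time timestamptz DEFAULT now()
);

-- Same shape as the rollups table above
CREATE TABLE demo_rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    last_aggregated_id bigint default 0
);

-- Register the rollup; last_aggregated_id starts at 0 (nothing aggregated yet)
INSERT INTO demo_rollups (name, event_table_name, event_id_sequence_name)
VALUES ('demo_clicks_rollup', 'demo_clicks', 'demo_clicks_click_id_seq');

INSERT INTO demo_clicks (click_time) SELECT now() FROM generate_series(1, 5);

-- How far behind is each rollup? (NULL last value = sequence never used)
SELECT name,
       last_aggregated_id,
       coalesce(pg_sequence_last_value(event_id_sequence_name::regclass), 0)
         - last_aggregated_id AS ids_pending
FROM demo_rollups;
```

The result counts sequence values rather than rows: IDs consumed by rolled-back inserts leave gaps, so the number of pending rows can be lower.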

We wrote a PL/pgSQL function that uses the rollups table to track which events have been aggregated and returns the range of sequence numbers that are ready to be aggregated next. The function is meant to be called in the same transaction as an INSERT..SELECT, where the SELECT part filters out sequence numbers that fall outside the returned range.

Since Postgres 10, you can use the pg_sequence_last_value function to check the most recently issued sequence number. However, it would not be safe to simply aggregate all events up to the most recent sequence value. There might still be in-progress writes to the events table that were assigned lower sequence values but are not yet visible when the aggregation runs. To wait for in-progress writes to finish, we use an explicit table lock, as discussed in our recent Postgres locking tips blog post. From the moment the LOCK command is executed, new writes briefly block. Once the existing writes have finished, we obtain the lock and immediately release it so that new writes can continue. This is safe because all new writes will have higher sequence numbers, and we can let them proceed as long as we don’t include them in the current aggregation. As a result, we obtain a range of new events that are ready to be aggregated, with minimal interruption on the write side.
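
A single session cannot show a concurrent in-flight write, but it can show the property behind the problem: sequence values are handed out outside of transactional visibility. In this sketch (demo_seq_events is a hypothetical table), an insert is rolled back inside a subtransaction, yet pg_sequence_last_value still reports the ID it drew. An uncommitted insert in another session is in the same position: its ID is already reflected in pg_sequence_last_value while its row is not yet visible.

```sql
-- Hypothetical table for illustration only
DROP TABLE IF EXISTS demo_seq_events;
CREATE TABLE demo_seq_events (event_id bigserial, payload text);

-- A sequence that was never used has no last value yet: returns NULL
SELECT pg_sequence_last_value('demo_seq_events_event_id_seq');

-- Insert a row in a subtransaction, then roll the subtransaction back
DO $$
BEGIN
    BEGIN
        INSERT INTO demo_seq_events (payload) VALUES ('rolled back');
        RAISE 'undo the insert';
    EXCEPTION WHEN OTHERS THEN
        NULL;  -- error swallowed; the INSERT is undone, the nextval() is not
    END;
END
$$;

-- The row is gone, but the sequence still reports the ID it handed out
SELECT pg_sequence_last_value('demo_seq_events_event_id_seq') AS last_value,  -- 1
       (SELECT count(*) FROM demo_seq_events) AS visible_rows;              -- 0
```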

CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    table_to_lock regclass;
BEGIN
    /*
     * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
     * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
     * aggregations from running concurrently.
     */
    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower than or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
    END;

    /*
     * Remember the end of the window to continue from there next time.
     */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;
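
The lock-and-release trick in incremental_rollup_window relies on the fact that locks acquired inside a PL/pgSQL block with an EXCEPTION clause are released when that block's subtransaction is rolled back. The hypothetical demo_lock_and_release function below checks this in isolation by looking at its own backend's entries in pg_locks before and after the error is caught.

```sql
-- Hypothetical table and function for illustration only
DROP TABLE IF EXISTS demo_locked;
CREATE TABLE demo_locked (id bigint);

CREATE OR REPLACE FUNCTION demo_lock_and_release(OUT held_inside bigint, OUT held_after bigint)
LANGUAGE plpgsql
AS $function$
BEGIN
    BEGIN
        -- Waits for concurrent writers on demo_locked, then blocks new ones
        LOCK demo_locked IN EXCLUSIVE MODE;

        SELECT count(*) INTO held_inside
        FROM pg_locks
        WHERE relation = 'demo_locked'::regclass
          AND pid = pg_backend_pid()
          AND mode = 'ExclusiveLock';

        -- Abort the subtransaction, which releases the lock taken inside it
        RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
        -- Local variables keep their values; database changes and locks are rolled back
        NULL;
    END;

    SELECT count(*) INTO held_after
    FROM pg_locks
    WHERE relation = 'demo_locked'::regclass
      AND pid = pg_backend_pid()
      AND mode = 'ExclusiveLock';
END;
$function$;

SELECT * FROM demo_lock_and_release();  -- held_inside = 1, held_after = 0
```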

Now let’s look at an example of using this function for a typical rollup use case.

Incremental aggregation of page view data

A simple example of a real-time analytics dashboard that I like to use is one for monitoring page views on a website. In a past life I worked for a Content Delivery Network (CDN), where such a dashboard is essential to both operators and customers.

Back when I worked for the CDN, it would have been almost unthinkable to store full page view logs in a SQL database like Postgres. The logs would go into a big distributed storage system and were difficult to process. But with the Citus extension to Postgres, you can now scale Postgres as well as any distributed storage system, while supporting distributed queries, indexes, and rollups. Storing raw events in the database suddenly makes a lot of sense.

The table definition for my page views table is given below. To deal with large write volumes, it is still helpful to minimise index maintenance overhead. We recommend using a BRIN index for looking up ranges of sequence IDs during aggregation. A BRIN index takes very little storage space and is cheap to maintain in this case, because view_id values roughly increase in insertion order, so each range of table blocks covers a narrow range of IDs.

CREATE TABLE page_views (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz default now(),
    view_id bigserial
);

-- Allow fast lookups of ranges of sequence IDs
CREATE INDEX view_id_idx ON page_views USING BRIN (view_id);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views', 'site_id');
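
To see why a BRIN index suits this access pattern, the sketch below (plain Postgres, with a hypothetical demo_brin_views table) builds a BRIN index and a B-tree index on the same ascending bigserial column and compares their on-disk sizes. The exact sizes depend on the Postgres version and row width, so treat the comparison as indicative only.

```sql
-- Hypothetical table for illustration only (plain Postgres, no Citus)
DROP TABLE IF EXISTS demo_brin_views;
CREATE TABLE demo_brin_views (view_id bigserial, path text);

-- IDs are appended in increasing order, so each block range of the heap
-- covers a narrow, mostly non-overlapping range of view_id values
INSERT INTO demo_brin_views (path)
SELECT '/page/' || (i % 1000) FROM generate_series(1, 200000) AS i;

CREATE INDEX demo_brin_idx  ON demo_brin_views USING brin (view_id);
CREATE INDEX demo_btree_idx ON demo_brin_views USING btree (view_id);

SELECT pg_size_pretty(pg_relation_size('demo_brin_idx'))  AS brin_size,
       pg_size_pretty(pg_relation_size('demo_btree_idx')) AS btree_size;
```

A BRIN index stores only a summary (here, the minimum and maximum view_id) per range of table blocks, by default one summary per 128 blocks, while a B-tree stores an entry per row. That is why it stays small and cheap to maintain, as long as IDs correlate with the physical order of the rows.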

I also created a rollup table to keep track of the number of views per page for each minute. Once populated, I can run SQL queries on the table to aggregate further.

CREATE TABLE page_views_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views_1min', 'site_id');

-- Add the 1-minute rollup to the rollups table
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');
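
Aggregating further can be as simple as summing the 1-minute buckets into coarser ones. This sketch uses a hypothetical demo_views_1min table with the same schema as page_views_1min, filled with made-up counts, and defines an hourly view on top of it; the view name demo_views_1hour is also our own.

```sql
-- Hypothetical copy of the 1-minute rollup table, with made-up counts
DROP TABLE IF EXISTS demo_views_1min CASCADE;  -- also drops an existing demo_views_1hour view

CREATE TABLE demo_views_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
);

INSERT INTO demo_views_1min VALUES
    (1, '/',      '2018-06-07 08:59:00+00', 10),
    (1, '/',      '2018-06-07 09:00:00+00', 20),
    (1, '/about', '2018-06-07 09:30:00+00', 5),
    (2, '/',      '2018-06-07 09:15:00+00', 7);

-- Hourly totals per site, computed from the 1-minute buckets
CREATE VIEW demo_views_1hour AS
SELECT site_id,
       date_trunc('hour', period_start) AS period_start,
       sum(view_count) AS view_count
FROM demo_views_1min
GROUP BY site_id, date_trunc('hour', period_start);

SELECT * FROM demo_views_1hour ORDER BY site_id, period_start;
```

Note that date_trunc on a timestamptz truncates in the session's TimeZone setting, so set it explicitly if the hour boundaries matter for your dashboard.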

The final step is to define a function for incrementally aggregating the page views using INSERT..SELECT..ON CONFLICT.., which calls incremental_rollup_window to select a batch of new, unaggregated events:

CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('page_views_1min_rollup');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* aggregate the page views, merge results if the entry already exists */
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
      SELECT site_id, path, date_trunc('minute', view_time), count(*) AS view_count
      FROM page_views
      WHERE view_id BETWEEN start_id AND end_id
      GROUP BY site_id, path, date_trunc('minute', view_time)
      ON CONFLICT (site_id, path, period_start) DO UPDATE
      SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
END;
$function$;
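
The ON CONFLICT .. DO UPDATE clause is what makes repeated runs add up: each run contributes counts for its own ID range, and counts for an existing (site_id, path, period_start) key are merged by addition. The sketch below checks this on plain Postgres with hypothetical demo_pv and demo_pv_1min tables and a hypothetical demo_aggregate function, which is the INSERT..SELECT from do_page_view_aggregation with the ID window passed in explicitly instead of coming from incremental_rollup_window.

```sql
-- Hypothetical small versions of page_views and page_views_1min
DROP TABLE IF EXISTS demo_pv, demo_pv_1min;
CREATE TABLE demo_pv (
    site_id int,
    path text,
    view_time timestamptz,
    view_id bigserial
);
CREATE TABLE demo_pv_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
);

-- Same merge logic as do_page_view_aggregation, for an explicit ID range
CREATE OR REPLACE FUNCTION demo_aggregate(start_id bigint, end_id bigint)
RETURNS void LANGUAGE sql AS $$
    INSERT INTO demo_pv_1min (site_id, path, period_start, view_count)
    SELECT site_id, path, date_trunc('minute', view_time), count(*)
    FROM demo_pv
    WHERE view_id BETWEEN start_id AND end_id
    GROUP BY site_id, path, date_trunc('minute', view_time)
    ON CONFLICT (site_id, path, period_start) DO UPDATE
    SET view_count = demo_pv_1min.view_count + EXCLUDED.view_count;
$$;

-- 1,000 views (IDs 1..1000) spread over 3 sites, 5 paths and 10 minutes
INSERT INTO demo_pv (site_id, path, view_time)
SELECT i % 3, '/p' || (i % 5),
       '2018-06-07 09:00:00+00'::timestamptz + (i % 10) * interval '1 minute'
FROM generate_series(1, 1000) AS i;

-- Two batches whose rows fall into the same keys
SELECT demo_aggregate(1, 400);
SELECT demo_aggregate(401, 1000);

-- Compare with a one-shot aggregation of the raw table: expect 0 mismatches
SELECT count(*) AS mismatches
FROM demo_pv_1min r
FULL JOIN (
    SELECT site_id, path, date_trunc('minute', view_time) AS period_start, count(*) AS view_count
    FROM demo_pv
    GROUP BY 1, 2, 3
) d USING (site_id, path, period_start)
WHERE r.view_count IS DISTINCT FROM d.view_count;
```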

After inserting into the page_views table, the aggregation can be updated by periodically running:

SELECT * FROM do_page_view_aggregation();

By running the do_page_view_aggregation function frequently, you can keep the rollup table up-to-date within seconds of the raw events table. You can also safely load older page view data with a view_time in the past, because these records still receive new, higher sequence numbers and are therefore included in the next aggregation run.
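
A sketch of the backfill case, using hypothetical demo_bf_views and demo_bf_1min tables and a hypothetical demo_bf_aggregate function that merges an explicit ID range the same way do_page_view_aggregation does. The ID ranges are hard-coded here for brevity; in the pipeline above, incremental_rollup_window supplies them.

```sql
-- Hypothetical tables and function for illustration only
DROP TABLE IF EXISTS demo_bf_views, demo_bf_1min;
CREATE TABLE demo_bf_views (site_id int, view_time timestamptz, view_id bigserial);
CREATE TABLE demo_bf_1min (
    site_id int,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, period_start)
);

CREATE OR REPLACE FUNCTION demo_bf_aggregate(start_id bigint, end_id bigint)
RETURNS void LANGUAGE sql AS $$
    INSERT INTO demo_bf_1min (site_id, period_start, view_count)
    SELECT site_id, date_trunc('minute', view_time), count(*)
    FROM demo_bf_views
    WHERE view_id BETWEEN start_id AND end_id
    GROUP BY site_id, date_trunc('minute', view_time)
    ON CONFLICT (site_id, period_start) DO UPDATE
    SET view_count = demo_bf_1min.view_count + EXCLUDED.view_count;
$$;

-- Live traffic for 09:00 gets IDs 1..3 and is aggregated
INSERT INTO demo_bf_views (site_id, view_time) VALUES
    (1, '2018-06-07 09:00:10+00'),
    (1, '2018-06-07 09:00:20+00'),
    (1, '2018-06-07 09:00:30+00');
SELECT demo_bf_aggregate(1, 3);

-- Backfill: an old event for 08:00 is loaded later and draws the next ID (4)
INSERT INTO demo_bf_views (site_id, view_time) VALUES (1, '2018-06-07 08:00:05+00');
SELECT demo_bf_aggregate(4, 4);

SELECT period_start, view_count FROM demo_bf_1min ORDER BY period_start;
```

The late event lands in its own, older bucket even though the 09:00 bucket was aggregated first, because batches are selected by ID rather than by view_time.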

If you want to keep track of more advanced statistics in your rollup table, we recommend you check out how to incrementally update count distinct using HLL and heavy hitters using TopN.

Querying the rollup table from the dashboard

If you’re building a dashboard that provides real-time insights into page views, you could actually run queries directly on the raw event data. Citus will parallelise the query at different levels to achieve high performance.

The following query gets the number of page views for an entire site per minute for the last 30 minutes, which takes 800-900ms on Citus with 1 billion rows loaded.

citus=# SELECT date_trunc('minute', view_time) period_start, count(*) FROM page_views WHERE site_id = 2 AND view_time >= '2018-06-07 08:54:00' GROUP BY period_start ORDER BY period_start;
      period_start      | count 
------------------------+-------
 2018-06-07 08:54:00+00 | 36482
 2018-06-07 08:55:00+00 | 51272
 2018-06-07 08:56:00+00 | 55216
 2018-06-07 08:57:00+00 | 74936
 2018-06-07 08:58:00+00 | 15776
...
(30 rows)

Time: 869.478 ms

This may actually be fast enough to power a single-user dashboard, but if there are multiple users, the query uses far too much raw CPU time. Fortunately, the equivalent query on the rollup table is more than 100x faster because the table is smaller and indexed:

citus=# SELECT period_start, sum(view_count) FROM page_views_1min WHERE site_id = 2 AND period_start >= '2018-06-07 08:54:00' GROUP BY period_start ORDER BY period_start;
      period_start      |  sum  
------------------------+-------
 2018-06-07 08:54:00+00 | 36482
 2018-06-07 08:55:00+00 | 51272
 2018-06-07 08:56:00+00 | 55216
 2018-06-07 08:57:00+00 | 74936
 2018-06-07 08:58:00+00 | 15776
...
(30 rows)

Time: 5.473 ms

It’s clear that when we want to support a larger number of users, rollup tables are the way to go.

Aggregation pipeline performance in Citus and Postgres

To test the performance of the data pipeline, we randomly generated page view data for 1,000 sites and 100,000 pages. We compared the performance of a distributed Citus Cloud formation (4 r4.4xlarge nodes) against a single Postgres node with equivalent hardware (r4.16xlarge RDS).
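
The post does not show how the test data was generated, so the following is only a sketch of one way to produce similar random page views with generate_series. The demo_gen_views table mirrors page_views, and the distributions (uniform over 1,000 site IDs and a pool of 100,000 paths) are our assumptions. The benchmark itself loaded data with COPY from clients, not with a server-side INSERT like this one.

```sql
-- Hypothetical table mirroring page_views (plain Postgres, no Citus)
DROP TABLE IF EXISTS demo_gen_views;
CREATE TABLE demo_gen_views (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz default now(),
    view_id bigserial
);

-- 100,000 random page views; random() is evaluated once per generated row
INSERT INTO demo_gen_views (site_id, path, client_ip)
SELECT 1 + floor(random() * 1000)::int,                      -- site IDs 1..1000
       '/page/' || (1 + floor(random() * 100000)::int),      -- paths /page/1../page/100000
       '10.0.0.0'::inet + floor(random() * 65536)::bigint    -- random address in 10.0.0.0/16
FROM generate_series(1, 100000);
```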

We also tried using Aurora, but because it ran an older version of Postgres, the pg_sequence_last_value function was unavailable.

We loaded 1 billion rows into the page_views table using the COPY command over 4 connections in batches of 1 million rows. Below are the average data loading speeds.

To actually process 1 million rows per second, the aggregation process must keep up with the stream of data while it is being loaded.

During the data load, we ran SELECT * FROM do_page_view_aggregation() in a loop to update the page_views_1min table. Citus parallelises the aggregation across all the cores in the cluster and is easily able to keep up with the COPY stream. In contrast, single-node Postgres does not parallelise INSERT...SELECT commands and could not keep up with its own ingest speed.

On Citus, every individual run took around 10 seconds, which means that the page_views_1min table was never more than 10 seconds behind the page_views table. With a single Postgres node, by contrast, the aggregation could not keep up, so individual runs started taking arbitrarily long (>10 minutes).

A database for real-time analytics at scale

Being able to express your aggregations in SQL, and to use indexes that keep those aggregations performant, is invaluable for real-time analytics on large data streams. While a single Postgres server cannot always keep up with large data streams, Citus transforms Postgres into a distributed database that enables you to scale out across the cores of multiple machines and process over a million rows per second.

Marco Slot

Former lead engineer for the Citus database engine at Microsoft. Speaker at Postgres Conf EU, PostgresOpen, pgDay Paris, Hello World, SIGMOD, & lots of meetups. Talk selection team member for Citus Con: An Event for Postgres. PhD in distributed systems. Loves mountain hiking.
