Some companies have to process data volumes that far exceed the capacity of “small” database clusters, and they have a valid use case for one of the modern parallelizing / streaming / big data processing technologies. For all others, expressing transformations in plain SQL is just fine, and PostgreSQL is the perfect workhorse for that purpose.
In this talk, I will go through some of our best practices for building fast, robust, and tested data integration pipelines inside PostgreSQL. I will explain many of our technical patterns, for example for schema management or for splitting large computations by chunking and table partitioning. And I will show how to apply standard software engineering techniques to maintain agility, consistency, and correctness.
Video: http://www.ustream.tv/recorded/109227465
ETL Patterns with Postgres
1. ETL Patterns with Postgres
@martin_loetzsch
Dr. Martin Loetzsch
Crunch 2017
2. SQL as data processing language
@martin_loetzsch
2
3. Target of computation
CREATE TABLE m_dim_next.region (
region_id SMALLINT PRIMARY KEY,
region_name TEXT NOT NULL UNIQUE,
country_id SMALLINT NOT NULL,
country_name TEXT NOT NULL,
_region_name TEXT NOT NULL
);
Do computation and store result in table
WITH raw_region
AS (SELECT DISTINCT
country,
region
FROM m_data.ga_session
ORDER BY country, region)
INSERT INTO m_dim_next.region
SELECT
row_number()
OVER (ORDER BY country, region ) AS region_id,
CASE WHEN (SELECT count(DISTINCT country)
FROM raw_region r2
WHERE r2.region = r1.region) > 1
THEN region || ' / ' || country
ELSE region END AS region_name,
dense_rank() OVER (ORDER BY country) AS country_id,
country AS country_name,
region AS _region_name
FROM raw_region r1;
INSERT INTO m_dim_next.region
VALUES (-1, 'Unknown', -1, 'Unknown', 'Unknown');
Speedup subsequent transformations
SELECT util.add_index(
'm_dim_next', 'region',
column_names := ARRAY ['_region_name', 'country_name',
'region_id']);
SELECT util.add_index(
'm_dim_next', 'region',
column_names := ARRAY ['country_id', 'region_id']);
ANALYZE m_dim_next.region;
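util.add_index is a project-specific helper that the talk does not show; a minimal sketch of how such a helper could look (the signature is only inferred from the calls above):
CREATE FUNCTION util.add_index(schemaname TEXT, tablename TEXT,
column_names TEXT[])
RETURNS VOID AS $$
BEGIN
-- build and run a CREATE INDEX statement for the given column list
EXECUTE 'CREATE INDEX ON ' || schemaname || '.' || tablename
|| ' (' || array_to_string(column_names, ', ') || ')';
END;
$$ LANGUAGE plpgsql;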
3
Leave data in Postgres, run SQL queries
@martin_loetzsch
Tables as (intermediate) results of processing steps
4. At the beginning of a data integration pipeline
DROP SCHEMA IF EXISTS m_dim_next CASCADE;
CREATE SCHEMA m_dim_next;
DROP SCHEMA IF EXISTS m_tmp CASCADE;
CREATE SCHEMA m_tmp;
Do all transformations
CREATE TABLE m_tmp.foo
AS SELECT n
FROM generate_series(0, 10000) n;
CREATE TABLE m_dim_next.foo
AS SELECT
n,
n + 1
FROM m_tmp.foo;
-- ..
At the end of pipeline, after checks
DROP SCHEMA IF EXISTS m_dim CASCADE;
ALTER SCHEMA m_dim_next RENAME TO m_dim;
Build m_dim_next schema while users are seeing m_dim
- Data shown in frontend is always consistent
- When ETL breaks, data just gets old
Explicit operations determine structure and content of DWH
- Very developer / git friendly
- Requirement: Must be possible to rebuild DWH from sources
- Add incremental loading / processing when there is a pain
4
Drop & rebuild
@martin_loetzsch
Don’t bother with migrations
5. Renaming / dropping schemas requires exclusive locks
CREATE FUNCTION util.cancel_queries_on_schema(schema TEXT)
RETURNS BOOLEAN AS $$
SELECT pg_cancel_backend(pid)
FROM
(SELECT DISTINCT pid
FROM pg_locks
JOIN pg_database ON database = pg_database.oid
JOIN pg_class ON pg_class.oid = relation
JOIN pg_namespace ON relnamespace = pg_namespace.oid
WHERE datname = current_database() AND nspname = schema
AND pid != pg_backend_pid()) t;
$$ LANGUAGE SQL;
First rename, then drop
CREATE FUNCTION util.replace_schema(schemaname TEXT,
replace_with TEXT)
RETURNS VOID AS $$
BEGIN
PERFORM util.cancel_queries_on_schema(schemaname);
IF EXISTS(SELECT *
FROM information_schema.schemata s
WHERE s.schema_name = schemaname)
THEN
EXECUTE 'ALTER SCHEMA ' || schemaname
|| ' RENAME TO ' || schemaname || '_old;';
END IF;
EXECUTE 'ALTER SCHEMA ' || replace_with
|| ' RENAME TO ' || schemaname || ';';
-- again, for good measure
PERFORM util.cancel_queries_on_schema(schemaname);
EXECUTE 'DROP SCHEMA IF EXISTS ' || schemaname
|| '_old CASCADE;';
END;
$$ LANGUAGE 'plpgsql';
Nice one-liner
SELECT util.replace_schema('m_dim', 'm_dim_next');
5
Atomic & robust schema switches
@martin_loetzsch
Transactional DDL FTW
6. Each pipeline maintains their own schemas
- Never write to schemas of other pipelines
- Only read from “public” schemas of other pipelines
Project A schema name conventions
mp_dim: public tables of pipeline my-pipeline
mp_dim_next: schema for building the next version of mp_dim
mp_tmp: temporary tables that are not considered public
mp_data: tables that are not always dropped (for incremental loading)
Optional: copy “public” schema from ETL db to frontend db
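A sketch of how that copy could be done (exact commands depend on the setup):
echo "DROP SCHEMA IF EXISTS mp_dim CASCADE" | psql frontend_db
pg_dump --schema=mp_dim --no-owner etl_db | psql --set ON_ERROR_STOP=on frontend_db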
6
Schemas & pipelines
@martin_loetzsch
Sometimes confusing for analysts
7. 128 GB machine, highly overcommitted memory
max_connections = 200
temp_buffers = 2GB
work_mem = 2GB
Postgres gracefully restarts when it can’t allocate more memory
LOG: unexpected EOF on client connection with an open transaction
LOG: server process (PID 17697) was terminated by signal 9: Killed
DETAIL: Failed process was running: SELECT
m_tmp.index_tmp_touchpoint();
LOG: terminating any other active server processes
WARNING: terminating connection because of crash of another server
process
DETAIL: The postmaster has commanded this server process to roll
back the current transaction and exit, because another server
process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database
and repeat your command.
DROP SCHEMA x CASCADE; needs one lock per dropped object, so raise
max_locks_per_transaction = 4000
Speed up IO by crippling WAL
wal_level = minimal
fsync = off
synchronous_commit = off
full_page_writes = off
wal_buffers = -1
No protection against hardware failure!
7
Tuning Postgres for drop & rebuild
@martin_loetzsch
Not recommended for any other use case
8. Running queries
cat query.sql
| PGOPTIONS=--client-min-messages=warning psql --no-psqlrc
--set ON_ERROR_STOP=on etl_db
echo "SELECT s.my_function()"
| PGOPTIONS=--client-min-messages=warning psql --no-psqlrc
--set ON_ERROR_STOP=on etl_db
Loading files
unzip -p data.csv
| python mapper_script.py
| PGOPTIONS=--client-min-messages=warning psql --no-psqlrc
--set ON_ERROR_STOP=on etl_db
--command="COPY s.target_table FROM STDIN"
Loading from other databases
cat source-query.sql
| mysql --skip-column-names source_db
| PGOPTIONS=--client-min-messages=warning psql --no-psqlrc
--set ON_ERROR_STOP=on etl_db
--command="COPY s.target_table FROM STDIN
WITH NULL AS 'NULL'"
Don’t use ORMs / client libraries
Use “/usr/bin/env bash -o pipefail” as shell
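Why pipefail matters: without it, a pipeline’s exit status is that of its last command (psql), so a failing mapper script or mysql export would go unnoticed. In a standalone script the same effect can be achieved with set -o pipefail (a sketch, not from the talk):
#!/usr/bin/env bash
set -o pipefail # fail the whole pipeline if any stage fails
unzip -p data.csv \
| python mapper_script.py \
| PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
--set ON_ERROR_STOP=on etl_db \
--command="COPY s.target_table FROM STDIN"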
8
Main interface to Postgres: psql
@martin_loetzsch
Hassle-free & fast
10. It’s easy to make mistakes during ETL
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE TABLE s.city (
city_id SMALLINT,
city_name TEXT,
country_name TEXT
);
INSERT INTO s.city VALUES
(1, 'Berlin', 'Germany'),
(2, 'Budapest', 'Hungary');
CREATE TABLE s.customer (
customer_id BIGINT,
city_fk SMALLINT
);
INSERT INTO s.customer VALUES
(1, 1),
(1, 2),
(2, 3);
Customers per country?
SELECT
country_name,
count(*) AS number_of_customers
FROM s.customer JOIN s.city
ON customer.city_fk = s.city.city_id
GROUP BY country_name;
Back up all assumptions about data by constraints
ALTER TABLE s.city ADD PRIMARY KEY (city_id);
ALTER TABLE s.city ADD UNIQUE (city_name);
ALTER TABLE s.city ADD UNIQUE (city_name, country_name);
ALTER TABLE s.customer ADD PRIMARY KEY (customer_id);
[23505] ERROR: could not create unique index "customer_pkey"
Detail: Key (customer_id)=(1) is duplicated.
ALTER TABLE s.customer ADD FOREIGN KEY (city_fk)
REFERENCES s.city (city_id);
[23503] ERROR: insert or update on table "customer" violates
foreign key constraint "customer_city_fk_fkey"
Detail: Key (city_fk)=(3) is not present in table "city"
10
Referential consistency
@martin_loetzsch
Only very little overhead, will save your ass
11. [DWH schema diagram (2017-10-18-dwh-schema-pav.svg): customer (customer_id, first_order_fk, favourite_product_fk, lifetime_revenue), product (product_id, revenue_last_6_months), order (order_id, processed_order_id, customer_fk, product_fk, revenue)]
Never repeat “business logic”
SELECT sum(total_price) AS revenue
FROM os_data.order
WHERE status IN ('pending', 'accepted', 'completed',
'proposal_for_change');
SELECT CASE WHEN (status <> 'started'
AND payment_status = 'authorised'
AND order_type <> 'backend')
THEN order_id END AS processed_order_fk
FROM os_data.order;
SELECT (last_status = 'pending') :: INTEGER AS is_unprocessed
FROM os_data.order;
Refactor pipeline
Create separate task that computes everything we know about an order
Usually difficult in real life
Load → preprocess → transform → flatten-fact
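A sketch of what such a preprocess-order task could produce (column names are assumptions, merely combining the snippets above in one place):
CREATE TABLE os_tmp.preprocessed_order AS
SELECT
order_id,
-- single place that decides whether an order counts as processed
CASE WHEN status <> 'started'
AND payment_status = 'authorised'
AND order_type <> 'backend'
THEN order_id END AS processed_order_id,
-- single place that decides which orders are revenue-relevant
CASE WHEN status IN ('pending', 'accepted', 'completed',
'proposal_for_change')
THEN total_price ELSE 0 END AS revenue,
(last_status = 'pending') :: INTEGER AS is_unprocessed
FROM os_data.order;
Downstream tasks then read these columns instead of re-implementing the rules.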
11
Computational consistency
@martin_loetzsch
Requires discipline
load-product load-order load-customer
preprocess-product preprocess-order preprocess-customer
transform-product transform-order transform-customer
flatten-product-fact flatten-order-fact flatten-customer-fact
12. CREATE FUNCTION m_tmp.normalize_utm_source(TEXT)
RETURNS TEXT AS $$
SELECT
CASE
WHEN $1 LIKE '%.%' THEN lower($1)
WHEN $1 = '(direct)' THEN 'Direct'
WHEN $1 LIKE 'Untracked%' OR $1 LIKE '(%)'
THEN $1
ELSE initcap($1)
END;
$$ LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION util.norm_phone_number(phone_number TEXT)
RETURNS TEXT AS $$
BEGIN
phone_number := TRIM(phone_number);
phone_number := regexp_replace(phone_number, '\(0\)', ''); -- strip the optional (0)
phone_number
:= regexp_replace(phone_number, '[^[:digit:]]', '', 'g');
phone_number
:= regexp_replace(phone_number, '^(\+49|0049|49)', '0');
phone_number := regexp_replace(phone_number, '^(00)', '');
phone_number := COALESCE(phone_number, '');
RETURN phone_number;
END;
$$ LANGUAGE PLPGSQL IMMUTABLE;
CREATE FUNCTION m_tmp.compute_ad_id(id BIGINT, api m_tmp.API)
RETURNS BIGINT AS $$
-- creates a collision free ad id from an id in a source system
SELECT ((CASE api
WHEN 'adwords' THEN 1
WHEN 'bing' THEN 2
WHEN 'criteo' THEN 3
WHEN 'facebook' THEN 4
WHEN 'backend' THEN 5
END) * 10 ^ 18) :: BIGINT + id
$$ LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION pv.date_to_supplier_period_start(INTEGER)
RETURNS INTEGER AS $$
-- this maps all dates to either an integer which is included
-- in lieferantenrabatt.period_start or
-- null (meaning we don't have a supplier discount (Lieferantenrabatt) for it)
SELECT
CASE
WHEN $1 >= 20170501 THEN 20170501
WHEN $1 >= 20151231 THEN 20151231
ELSE 20151231
END;
$$ LANGUAGE SQL IMMUTABLE;
12
When not possible: use functions
@martin_loetzsch
Almost no performance overhead
13. Check for “lost” rows
SELECT util.assert_equal(
'The order items fact table should contain all order items',
'SELECT count(*) FROM os_dim.order_item',
'SELECT count(*) FROM os_dim.order_items_fact');
Check consistency across cubes / domains
SELECT util.assert_almost_equal(
'The number of first orders should be the same in '
|| 'orders and marketing touchpoints cube',
'SELECT count(net_order_id)
FROM os_dim.order
WHERE _net_order_rank = 1;',
'SELECT (SELECT sum(number_of_first_net_orders)
FROM m_dim.acquisition_performance)
/ (SELECT count(*)
FROM m_dim.performance_attribution_model)',
1.0
);
Check completeness of source data
SELECT util.assert_not_found(
'Each adwords campaign must have the attribute "Channel"',
'SELECT DISTINCT campaign_name, account_name
FROM aw_tmp.ad
JOIN aw_dim.ad_performance ON ad_fk = ad_id
WHERE attributes->>''Channel'' IS NULL
AND impressions > 0
AND _date > now() - INTERVAL ''30 days''');
Check correctness of redistribution transformations
SELECT util.assert_almost_equal_relative(
'The cost of non-converting touchpoints must match the'
|| 'redistributed customer acquisition and reactivation cost',
'SELECT sum(cost)
FROM m_tmp.cost_of_non_converting_touchpoints;',
'SELECT
(SELECT sum(cost_per_touchpoint * number_of_touchpoints)
FROM m_tmp.redistributed_customer_acquisition_cost)
+ (SELECT sum(cost_per_touchpoint * number_of_touchpoints)
FROM m_tmp.redistributed_customer_reactivation_cost);',
0.00001);
13
Data consistency checks
@martin_loetzsch
Makes changing things easy
14. Execute queries and compare results
CREATE FUNCTION util.assert(description TEXT, query TEXT)
RETURNS BOOLEAN AS $$
DECLARE
succeeded BOOLEAN;
BEGIN
EXECUTE query INTO succeeded;
IF NOT succeeded THEN RAISE EXCEPTION 'assertion failed:
# % #
%', description, query;
END IF;
RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';
CREATE FUNCTION util.assert_almost_equal_relative(
description TEXT, query1 TEXT,
query2 TEXT, percentage DECIMAL)
RETURNS BOOLEAN AS $$
DECLARE
result1 NUMERIC;
result2 NUMERIC;
succeeded BOOLEAN;
BEGIN
EXECUTE query1 INTO result1;
EXECUTE query2 INTO result2;
EXECUTE 'SELECT abs(' || result2 || ' - ' || result1 || ') / '
|| result1 || ' < ' || percentage INTO succeeded;
IF NOT succeeded THEN RAISE WARNING '%
assertion failed: abs(% - %) / % < %
%: (%)
%: (%)', description, result2, result1, result1, percentage,
result1, query1, result2, query2;
END IF;
RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';
14
Consistency check functions
@martin_loetzsch
Also: assert_not_found, assert_equal_table, assert_smaller_than_or_equal
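For illustration, a minimal sketch of how assert_not_found could be implemented (an assumption, not taken from the talk):
CREATE FUNCTION util.assert_not_found(description TEXT, query TEXT)
RETURNS BOOLEAN AS $$
DECLARE
number_of_rows BIGINT;
BEGIN
-- fail when the query returns any rows at all
EXECUTE 'SELECT count(*) FROM (' || query || ') t' INTO number_of_rows;
IF number_of_rows > 0 THEN RAISE EXCEPTION 'assertion failed:
# % #
%', description, query;
END IF;
RETURN TRUE;
END
$$ LANGUAGE plpgsql;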
17. Create two tables, fill with 10M values
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE TABLE s.table_1 (
user_id BIGINT,
some_number INTEGER);
INSERT INTO s.table_1
SELECT n, n FROM generate_series(1, 10000000) n;
CREATE TABLE s.table_2 AS SELECT * FROM s.table_1;
Join both tables
EXPLAIN ANALYZE
SELECT *
FROM s.table_1
JOIN s.table_2 USING (user_id);
Let’s assume 18 seconds is slow
Merge Join (rows=10000000)
Merge Cond: (table_1.user_id = table_2.user_id)
-> Sort (rows=10000000)
Sort Key: table_1.user_id
Sort Method: external merge Disk: 215032kB
-> Seq Scan on table_1 (rows=10000000)
-> Materialize (rows=10000000)
-> Sort (rows=10000000)
Sort Key: table_2.user_id
Sort Method: external merge Disk: 254144kB
-> Seq Scan on table_2 (rows=10000000)
Planning time: 0.700 ms
Execution time: 18278.520 ms
17
Joining large tables can be slow
@martin_loetzsch
Think of: join all user touchpoints with all user orders
18. Create table_1 with 5 partitions
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE TABLE s.table_1 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
some_number INTEGER NOT NULL);
CREATE TABLE s.table_1_0 (CHECK (user_chunk = 0))
INHERITS (s.table_1);
CREATE TABLE s.table_1_1 (CHECK (user_chunk = 1))
INHERITS (s.table_1);
CREATE TABLE s.table_1_2 (CHECK (user_chunk = 2))
INHERITS (s.table_1);
CREATE TABLE s.table_1_3 (CHECK (user_chunk = 3))
INHERITS (s.table_1);
CREATE TABLE s.table_1_4 (CHECK (user_chunk = 4))
INHERITS (s.table_1);
Insert directly into partition (don’t use triggers etc.)
INSERT INTO s.table_1_0
SELECT n, n % 5, n from generate_series(0, 10000000, 5) n;
Automate insertion
CREATE FUNCTION s.insert_data(table_name TEXT, chunk SMALLINT)
RETURNS VOID AS $$
BEGIN
EXECUTE 'INSERT INTO ' || table_name || '_' || chunk
|| ' SELECT n, n % 5, n from generate_series('
|| chunk || ', 10000000, 5) n';
EXECUTE 'ANALYZE ' || table_name || '_' || chunk;
END;
$$ LANGUAGE plpgsql;
Run in parallel (depends on ETL framework)
SELECT s.insert_data('s.table_1', 1 :: SMALLINT);
SELECT s.insert_data('s.table_1', 2 :: SMALLINT);
SELECT s.insert_data('s.table_1', 3 :: SMALLINT);
SELECT s.insert_data('s.table_1', 4 :: SMALLINT);
18
Splitting data in chunks / partitions
@martin_loetzsch
user chunk = user_id % 5
19. Build DDL statement and execute it
CREATE FUNCTION s.create_table_partitions(schemaname TEXT,
tablename TEXT,
key_column TEXT,
keys TEXT[])
RETURNS VOID AS $$
DECLARE key TEXT;
BEGIN
FOREACH KEY IN ARRAY keys LOOP
IF
NOT EXISTS(SELECT 1
FROM information_schema.tables t
WHERE t.table_schema = schemaname
AND t.table_name = tablename || '_' || key)
THEN
EXECUTE 'CREATE TABLE ' || schemaname || '.' || tablename
|| '_' || key || ' ( CHECK (' || key_column
|| ' = ' || key || ') ) INHERITS (' || schemaname
|| '.' || tablename || ');';
END IF;
END LOOP;
END
$$ LANGUAGE plpgsql;
Create table_2
CREATE TABLE s.table_2 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
some_number INTEGER NOT NULL
);
SELECT s.create_table_partitions(
's', 'table_2', 'user_chunk',
(SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));
Run in parallel (depends on ETL framework):
SELECT s.insert_data('s.table_2', 0 :: SMALLINT);
SELECT s.insert_data('s.table_2', 1 :: SMALLINT);
SELECT s.insert_data('s.table_2', 2 :: SMALLINT);
SELECT s.insert_data('s.table_2', 3 :: SMALLINT);
SELECT s.insert_data('s.table_2', 4 :: SMALLINT);
19
Automating partition management
@martin_loetzsch
Also great: pg_partman, Postgres 10 declarative partitioning
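For comparison, the same layout with Postgres 10 declarative partitioning might look like this (a sketch, not part of the setup shown here):
CREATE TABLE s.table_2 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
some_number INTEGER NOT NULL
) PARTITION BY LIST (user_chunk);
CREATE TABLE s.table_2_0 PARTITION OF s.table_2 FOR VALUES IN (0);
CREATE TABLE s.table_2_1 PARTITION OF s.table_2 FOR VALUES IN (1);
-- ... and so on for chunks 2 to 4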
20. Join table_1 and table_2 for chunk 0
EXPLAIN ANALYZE
SELECT
user_id,
0 AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0 AND table_2.user_chunk = 0;
Similar to original join (18 seconds), but almost 5x faster
Merge Join
Merge Cond: (table_1.user_id = table_2.user_id)
-> Sort (rows=2000001)
Sort Key: table_1.user_id
Sort Method: external merge Disk: 43008kB
-> Append (rows=2000001)
-> Seq Scan on table_1 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_1_0 (rows=2000001)
Filter: (user_chunk = 0)
-> Sort (rows=2000001)
Sort Key: table_2.user_id
Sort Method: external sort Disk: 58664kB
-> Append (rows=2000001)
-> Seq Scan on table_2 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_2_0 (rows=2000001)
Filter: (user_chunk = 0)
Planning time: 5.171 ms
Execution time: 3901.282 ms
20
Querying chunked data
@martin_loetzsch
21. No chunk restriction on table_2
EXPLAIN ANALYZE
SELECT
user_id,
0 AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0;
Append operator quite costly
Merge Join (rows=2000001)
Merge Cond: (table_1.user_id = table_2.user_id)
-> Sort (rows=2000001)
Sort Key: table_1.user_id
Sort Method: external merge Disk: 43008kB
-> Append (rows=2000001)
-> Seq Scan on table_1 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_1_0 (rows=2000001)
Filter: (user_chunk = 0)
-> Materialize (rows=10000001)
-> Sort (rows=10000001)
Sort Key: table_2.user_id
Sort Method: external merge Disk: 254144kB
-> Append (rows=10000001)
-> Seq Scan on table_2 (rows=0)
-> Seq Scan on table_2_0 (rows=2000001)
-> Seq Scan on table_2_1 (rows=2000000)
-> Seq Scan on table_2_2 (rows=2000000)
-> Seq Scan on table_2_3 (rows=2000000)
-> Seq Scan on table_2_4 (rows=2000000)
Planning time: 0.371 ms
Execution time: 11654.858 ms
21
Common mistake
@martin_loetzsch
Check for sequence scans on complete partitioned tables
22. Create partitioned table for target of computation
CREATE TABLE s.table_3 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
sum INTEGER NOT NULL);
SELECT s.create_table_partitions(
's', 'table_3', 'user_chunk',
(SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));
A function for the computation
CREATE FUNCTION s.join_table_1_and_table_2(chunk SMALLINT)
RETURNS SETOF s.table_3 AS $$
BEGIN
RETURN QUERY
SELECT
user_id,
chunk AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1 JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = chunk AND table_2.user_chunk = chunk;
END
$$ LANGUAGE plpgsql;
Insert from a separate function
CREATE OR REPLACE FUNCTION s.insert_table_3(chunk INTEGER)
RETURNS VOID AS $$
BEGIN
EXECUTE 'INSERT INTO s.table_3_' || chunk
|| ' (SELECT * FROM s.join_table_1_and_table_2('
|| chunk || '::SMALLINT))';
END
$$ LANGUAGE plpgsql;
Run in parallel (depends on ETL framework)
SELECT s.insert_table_3(0);
SELECT s.insert_table_3(1);
SELECT s.insert_table_3(2);
SELECT s.insert_table_3(3);
SELECT s.insert_table_3(4);
22
Using functions for chunked processing
@martin_loetzsch
Our best practices, depends on ETL framework
26. Join a table with 10M values with a table with 10K values
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE TABLE s.table_1 AS
SELECT a
FROM generate_series(1, 10000) a;
CREATE TABLE s.table_2 AS
SELECT a
FROM generate_series(1, 10000000) a;
CREATE INDEX table_2__a ON s.table_2 (a);
EXPLAIN ANALYZE
SELECT *
FROM s.table_2
JOIN s.table_1 USING (a);
Index on table_2 is not used
Merge Join (rows=573750000) (actual rows=10000)
Merge Cond: (table_1.a = table_2.a)
-> Sort (rows=11475) (actual rows=10000)
Sort Key: table_1.a
Sort Method: quicksort Memory: 853kB
-> Seq Scan on table_1 (rows=11475) (actual rows=10000)
-> Materialize (rows=10000000) (actual rows=10001)
-> Sort (rows=10000000) (actual rows=10001)
Sort Key: table_2.a
Sort Method: external merge Disk: 136816kB
-> Seq Scan on table_2 (rows=10000000)
(actual rows=10000000)
Planning time: 2.173 ms
Execution time: 3071.127 ms
The query planner didn’t know that table_2 has many more
distinct values than table_1
Statistics about cardinality and distribution of data are collected by
the asynchronous autovacuum daemon
26
Nondeterministic behavior?
@martin_loetzsch
Queries sometimes don’t terminate when run in ETL, but look fine when run separately
27. Manually collect data statistics before using new tables
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE TABLE s.table_1 AS
SELECT a
FROM generate_series(1, 10000) a;
CREATE TABLE s.table_2 AS
SELECT a
FROM generate_series(1, 10000000) a;
CREATE INDEX table_2__a ON s.table_2 (a);
ANALYZE s.table_1;
ANALYZE s.table_2;
EXPLAIN ANALYZE
SELECT *
FROM s.table_2
JOIN s.table_1 USING (a);
> 300 x faster
Merge Join (rows=10000) (actual rows=10000)
Merge Cond: (table_2.a = table_1.a)
-> Index Only Scan using table_2__a on table_2 (rows=9999985)
(actual rows=10001)
Heap Fetches: 10001
-> Sort (rows=10000) (actual rows=10000)
Sort Key: table_1.a
Sort Method: quicksort Memory: 853kB
-> Seq Scan on table_1 (rows=10000) (actual rows=10000)
Planning time: 1.759 ms
Execution time: 9.287 ms
27
Analyze all the things
@martin_loetzsch
Always analyze newly created tables
31. Initialize extension
CREATE EXTENSION IF NOT EXISTS cstore_fdw;
DO $$
BEGIN
IF NOT (SELECT exists(SELECT 1
FROM pg_foreign_server
WHERE srvname = 'cstore_server'))
THEN CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
END IF;
END
$$;
Create fact table
DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
CREATE FOREIGN TABLE s.fact (
fact_id BIGSERIAL,
a TEXT,
b TEXT
)
SERVER cstore_server OPTIONS (compression 'pglz');
Insert 10M rows
INSERT INTO s.fact
SELECT
n,
('a ' || 1 + (4 * random()) :: INTEGER) AS a,
('b ' || 1 + (4 * random()) :: INTEGER) AS b
FROM generate_series(0, 10000000) n
ORDER BY a, b;
8% of original size
SELECT pg_size_pretty(pg_table_size('s.fact'));
37 MB
31
Save disk space with cstore tables
@martin_loetzsch
Optimized Row Columnar (ORC) format
32. Typical OLAP query
EXPLAIN ANALYZE
SELECT
a,
count(*) AS count
FROM s.fact
WHERE b = 'b 1'
GROUP BY a;
5x faster than without cstore
HashAggregate (rows=5)
Group Key: a
-> Foreign Scan on fact (rows=1248701)
Filter: (b = 'b 1'::text)
Rows Removed by Filter: 41299
CStore File: /postgresql/9.6/cstore_fdw/836136/849220
CStore File Size: 39166772
Planning time: 10.057 ms
Execution time: 257.107 ms
Highly selective query
EXPLAIN ANALYZE
SELECT
count(*) AS count
FROM s.fact
WHERE a = 'a 1'
AND b = 'b 1';
Really fast
Aggregate (rows=1)
-> Foreign Scan on fact (rows=156676)
Filter: ((a = 'a 1'::text) AND (b = 'b 1'::text))
Rows Removed by Filter: 13324
CStore File: /postgresql/9.6/cstore_fdw/836136/849220
CStore File Size: 39166772
Planning time: 5.492 ms
Execution time: 31.689 ms
32
Main operation: foreign scan
@martin_loetzsch
Aggregation pushdown coming soon