ETL Patterns with Postgres
@martin_loetzsch
Dr. Martin Loetzsch
Crunch 2017
SQL as data processing language
@martin_loetzsch
2
Target of computation


CREATE TABLE m_dim_next.region (
  region_id    SMALLINT PRIMARY KEY,
  region_name  TEXT NOT NULL UNIQUE,
  country_id   SMALLINT NOT NULL,
  country_name TEXT NOT NULL,
  _region_name TEXT NOT NULL
);



Do computation and store result in table


WITH raw_region AS (
  SELECT DISTINCT
    country,
    region
  FROM m_data.ga_session
  ORDER BY country, region)

INSERT INTO m_dim_next.region
SELECT
  row_number() OVER (ORDER BY country, region) AS region_id,
  CASE WHEN (SELECT count(DISTINCT country)
             FROM raw_region r2
             WHERE r2.region = r1.region) > 1
    THEN region || ' / ' || country
    ELSE region END AS region_name,
  dense_rank() OVER (ORDER BY country) AS country_id,
  country AS country_name,
  region AS _region_name
FROM raw_region r1;

INSERT INTO m_dim_next.region
VALUES (-1, 'Unknown', -1, 'Unknown', 'Unknown');

Speed up subsequent transformations


SELECT util.add_index(
    'm_dim_next', 'region',
    column_names := ARRAY ['_region_name', 'country_name', 'region_id']);

SELECT util.add_index(
    'm_dim_next', 'region',
    column_names := ARRAY ['country_id', 'region_id']);

ANALYZE m_dim_next.region;
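
util.add_index is a project-specific helper that is not shown in this deck; a minimal sketch of what it could look like, assuming only the schema / table / column-array signature used above:

CREATE FUNCTION util.add_index(schemaname TEXT, tablename TEXT,
                               column_names TEXT[])
RETURNS VOID AS $$
BEGIN
  -- let Postgres generate the index name; no duplicate / existence checks here
  EXECUTE 'CREATE INDEX ON ' || schemaname || '.' || tablename
          || ' (' || array_to_string(column_names, ', ') || ')';
END
$$ LANGUAGE plpgsql;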
3
Leave data in Postgres, run SQL queries
@martin_loetzsch
Tables as (intermediate) results of processing steps
At the beginning of a data integration pipeline


DROP SCHEMA IF EXISTS m_dim_next CASCADE;

CREATE SCHEMA m_dim_next;



DROP SCHEMA IF EXISTS m_tmp CASCADE;
CREATE SCHEMA m_tmp;





Do all transformations


CREATE TABLE m_tmp.foo

AS SELECT n
FROM generate_series(0, 10000) n;

CREATE TABLE m_dim_next.foo
AS SELECT
n,
n + 1
FROM m_tmp.foo;

-- ..

At the end of pipeline, after checks


DROP SCHEMA IF EXISTS m_dim CASCADE;
ALTER SCHEMA m_dim_next RENAME TO m_dim;











Build m_dim_next schema while users are seeing m_dim
- Data shown in frontend is always consistent
- When ETL breaks, data just gets old

Explicit operations determine structure and content of DWH
- Very developer / git friendly
- Requirement: Must be possible to rebuild DWH from sources
- Add incremental loading / processing when there is a pain
4
Drop & rebuild
@martin_loetzsch
Don’t bother with migrations
Renaming / dropping schemas requires exclusive locks


CREATE FUNCTION util.cancel_queries_on_schema(schema TEXT)
RETURNS BOOLEAN AS $$
SELECT pg_cancel_backend(pid)
FROM
(SELECT DISTINCT pid
FROM pg_locks
JOIN pg_database ON database = pg_database.oid
JOIN pg_class ON pg_class.oid = relation
JOIN pg_namespace ON relnamespace = pg_namespace.oid
WHERE datname = current_database() AND nspname = schema
AND pid != pg_backend_pid()) t;
$$ LANGUAGE SQL;





First rename, then drop


CREATE FUNCTION util.replace_schema(schemaname TEXT,
replace_with TEXT)
RETURNS VOID AS $$
BEGIN
PERFORM util.cancel_queries_on_schema(schemaname);
IF EXISTS(SELECT *
FROM information_schema.schemata s
WHERE s.schema_name = schemaname)
THEN
EXECUTE 'ALTER SCHEMA ' || schemaname
|| ' RENAME TO ' || schemaname || '_old;';
END IF;
EXECUTE 'ALTER SCHEMA ' || replace_with
|| ' RENAME TO ' || schemaname || ';';

-- again, for good measure
PERFORM util.cancel_queries_on_schema(schemaname);

EXECUTE 'DROP SCHEMA IF EXISTS ' || schemaname
|| '_old CASCADE;';
END;
$$ LANGUAGE 'plpgsql';









Nice one-liner


SELECT util.replace_schema('m_dim', 'm_dim_next');
5
Atomic & robust schema switches
@martin_loetzsch
Transactional DDL FTW
Each pipeline maintains its own schemas
- Never write to schemas of other pipelines
- Only read from “public” schemas of other pipelines



Project A schema name conventions
mp_dim: public tables of pipeline my-pipeline
mp_dim_next: schema for building the next version of mp_dim
mp_tmp: temporary tables that are not considered public
mp_data: tables that are not always dropped (for incremental loading)



Optional: copy “public” schema from ETL db to frontend db
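
A sketch of that copy step, assuming mp_dim is the public schema and frontend_db is an illustrative connection name (etl_db as in the psql examples later):

# replace the frontend copy of mp_dim with a fresh dump from the ETL db
echo 'DROP SCHEMA IF EXISTS mp_dim CASCADE;' \
  | psql --no-psqlrc --set ON_ERROR_STOP=on frontend_db

pg_dump --schema=mp_dim --no-owner etl_db \
  | psql --no-psqlrc --set ON_ERROR_STOP=on frontend_db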
6
Schemas & pipelines
@martin_loetzsch
Sometimes confusing for analysts
128 GB machine, highly overcommitted memory


max_connections = 200

temp_buffers = 2GB
work_mem = 2GB







Postgres gracefully restarts when it can’t allocate more memory


LOG: unexpected EOF on client connection with an open transaction
LOG: server process (PID 17697) was terminated by signal 9: Killed
DETAIL: Failed process was running: SELECT
m_tmp.index_tmp_touchpoint();
LOG: terminating any other active server processes
WARNING: terminating connection because of crash of another server
process
DETAIL: The postmaster has commanded this server process to roll
back the current transaction and exit, because another server
process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database
and repeat your command.
DROP SCHEMA x CASCADE;


max_locks_per_transaction = 4000











Speed up IO by crippling WAL


wal_level = minimal
fsync = off
synchronous_commit = off
full_page_writes = off
wal_buffers = -1
No protection against hardware failure!
7
Tuning Postgres for drop & rebuild
@martin_loetzsch
Not recommended for any other use case
Running queries


cat query.sql \
  | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
      --set ON_ERROR_STOP=on etl_db


echo "SELECT s.my_function()" \
  | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
      --set ON_ERROR_STOP=on etl_db









Loading files


unzip -p data.csv \
  | python mapper_script.py \
  | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
      --set ON_ERROR_STOP=on etl_db \
      --command="COPY s.target_table FROM STDIN"

Loading from other databases


cat source-query.sql \
  | mysql --skip-column-names source_db \
  | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
      --set ON_ERROR_STOP=on etl_db \
      --command="COPY s.target_table FROM STDIN WITH NULL AS 'NULL'"











Don’t use ORMs / client libraries

Use “/usr/bin/env bash -o pipefail” as shell
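
Why pipefail matters for the pipelines above (a minimal sketch, reusing the COPY command from this slide): without it, the exit status of a pipe is that of the last command only, so a failing mapper script would go unnoticed.

#!/usr/bin/env bash
set -o pipefail   # any failing stage fails the whole pipeline

unzip -p data.csv \
  | python mapper_script.py \
  | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc \
      --set ON_ERROR_STOP=on etl_db \
      --command="COPY s.target_table FROM STDIN" \
  || { echo "loading data.csv failed" >&2; exit 1; }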
8
Main interface to Postgres: psql
@martin_loetzsch
Hassle-free & fast
Consistency & correctness
@martin_loetzsch
9
It’s easy to make mistakes during ETL


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;



CREATE TABLE s.city (
city_id SMALLINT,
city_name TEXT,
country_name TEXT
);

INSERT INTO s.city VALUES
(1, 'Berlin', 'Germany'),
(2, 'Budapest', 'Hungary');



CREATE TABLE s.customer (
customer_id BIGINT,
city_fk SMALLINT
);

INSERT INTO s.customer VALUES
(1, 1),
(1, 2),
(2, 3);

Customers per country?


SELECT
country_name,
count(*) AS number_of_customers
FROM s.customer JOIN s.city 

ON customer.city_fk = s.city.city_id
GROUP BY country_name;



Back up all assumptions about data by constraints


ALTER TABLE s.city ADD PRIMARY KEY (city_id);
ALTER TABLE s.city ADD UNIQUE (city_name);
ALTER TABLE s.city ADD UNIQUE (city_name, country_name);


ALTER TABLE s.customer ADD PRIMARY KEY (customer_id);
[23505] ERROR: could not create unique index "customer_pkey"
Detail: Key (customer_id)=(1) is duplicated.

ALTER TABLE s.customer ADD FOREIGN KEY (city_fk)
REFERENCES s.city (city_id);
[23503] ERROR: insert or update on table "customer" violates
foreign key constraint "customer_city_fk_fkey"
Detail: Key (city_fk)=(3) is not present in table "city"
10
Referential consistency
@martin_loetzsch
Only very little overhead, will save your ass
[Diagram 2017-10-18-dwh-schema-pav.svg: DWH schema excerpt with customer (customer_id, first_order_fk, favourite_product_fk, lifetime_revenue), order (order_id, processed_order_id, customer_fk, product_fk, revenue), product (product_id, revenue_last_6_months)]
Never repeat “business logic”


SELECT sum(total_price) AS revenue
FROM os_data.order
WHERE status IN ('pending', 'accepted', 'completed',
                 'proposal_for_change');



SELECT CASE WHEN (status <> 'started'
                  AND payment_status = 'authorised'
                  AND order_type <> 'backend')
       THEN order_id END AS processed_order_fk
FROM os_data.order;



SELECT (last_status = 'pending') :: INTEGER AS is_unprocessed
FROM os_data.order;









Refactor pipeline
Create separate task that computes everything we know about an order
Usually difficult in real life











Load → preprocess → transform → flatten-fact
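
A sketch of such a consolidated preprocess step (os_tmp and the flag names are illustrative; the columns and conditions are taken from the queries and the schema diagram above):

CREATE TABLE os_tmp.preprocessed_order AS
SELECT
  order_id,
  customer_fk,
  product_fk,
  total_price,
  status IN ('pending', 'accepted', 'completed',
             'proposal_for_change')            AS is_revenue_relevant,
  (status <> 'started'
   AND payment_status = 'authorised'
   AND order_type <> 'backend')                AS is_processed,
  (last_status = 'pending') :: INTEGER         AS is_unprocessed
FROM os_data.order;

Downstream transformations then join against os_tmp.preprocessed_order instead of re-implementing these conditions.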
11
Computational consistency
@martin_loetzsch
Requires discipline
[Diagram: per-entity task chains load-product → preprocess-product → transform-product → flatten-product-fact, and the same for order and customer]
CREATE FUNCTION m_tmp.normalize_utm_source(TEXT)
RETURNS TEXT AS $$
SELECT
CASE
WHEN $1 LIKE '%.%' THEN lower($1)
WHEN $1 = '(direct)' THEN 'Direct'
WHEN $1 LIKE 'Untracked%' OR $1 LIKE '(%)'
THEN $1
ELSE initcap($1)
END;
$$ LANGUAGE SQL IMMUTABLE;

CREATE FUNCTION util.norm_phone_number(phone_number TEXT)
RETURNS TEXT AS $$
BEGIN
phone_number := TRIM(phone_number);
phone_number := regexp_replace(phone_number, '\(0\)', '');
phone_number
:= regexp_replace(phone_number, '[^[:digit:]]', '', 'g');
phone_number
:= regexp_replace(phone_number, '^(\+49|0049|49)', '0');
phone_number := regexp_replace(phone_number, '^(00)', '');
phone_number := COALESCE(phone_number, '');
RETURN phone_number;
END;
$$ LANGUAGE PLPGSQL IMMUTABLE;

CREATE FUNCTION m_tmp.compute_ad_id(id BIGINT, api m_tmp.API)
RETURNS BIGINT AS $$
-- creates a collision free ad id from an id in a source system
SELECT ((CASE api
WHEN 'adwords' THEN 1
WHEN 'bing' THEN 2
WHEN 'criteo' THEN 3
WHEN 'facebook' THEN 4
WHEN 'backend' THEN 5
END) * 10 ^ 18) :: BIGINT + id
$$ LANGUAGE SQL IMMUTABLE;





CREATE FUNCTION pv.date_to_supplier_period_start(INTEGER)
RETURNS INTEGER AS $$
-- this maps all dates to either an integer which is included
-- in lieferantenrabatt.period_start or
-- null (meaning we don't have a lieferantenrabatt for it)
SELECT
CASE
WHEN $1 >= 20170501 THEN 20170501
WHEN $1 >= 20151231 THEN 20151231
ELSE 20151231
END;
$$ LANGUAGE SQL IMMUTABLE;
12
When not possible: use functions
@martin_loetzsch
Almost no performance overhead
Check for “lost” rows


SELECT util.assert_equal(
'The order items fact table should contain all order items',
'SELECT count(*) FROM os_dim.order_item',
'SELECT count(*) FROM os_dim.order_items_fact');







Check consistency across cubes / domains


SELECT util.assert_almost_equal(
'The number of first orders should be the same in '

|| 'orders and marketing touchpoints cube',
'SELECT count(net_order_id)
FROM os_dim.order
WHERE _net_order_rank = 1;',

'SELECT (SELECT sum(number_of_first_net_orders)
FROM m_dim.acquisition_performance)
/ (SELECT count(*)
FROM m_dim.performance_attribution_model)',
1.0
);

Check completeness of source data


SELECT util.assert_not_found(
'Each adwords campaign must have the attribute "Channel"',
'SELECT DISTINCT campaign_name, account_name
FROM aw_tmp.ad
JOIN aw_dim.ad_performance ON ad_fk = ad_id
WHERE attributes->>''Channel'' IS NULL
AND impressions > 0
AND _date > now() - INTERVAL ''30 days''');



Check correctness of redistribution transformations


SELECT util.assert_almost_equal_relative(
'The cost of non-converting touchpoints must match the'
|| 'redistributed customer acquisition and reactivation cost',
'SELECT sum(cost)
FROM m_tmp.cost_of_non_converting_touchpoints;',
'SELECT
(SELECT sum(cost_per_touchpoint * number_of_touchpoints)
FROM m_tmp.redistributed_customer_acquisition_cost)
+ (SELECT sum(cost_per_touchpoint * number_of_touchpoints)
FROM m_tmp.redistributed_customer_reactivation_cost);',
0.00001);
13
Data consistency checks
@martin_loetzsch
Makes changing things easy
Execute queries and compare results

CREATE FUNCTION util.assert(description TEXT, query TEXT)
RETURNS BOOLEAN AS $$
DECLARE
succeeded BOOLEAN;
BEGIN
EXECUTE query INTO succeeded;
IF NOT succeeded THEN RAISE EXCEPTION 'assertion failed:
# % #
%', description, query;
END IF;
RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';







CREATE FUNCTION util.assert_almost_equal_relative(
description TEXT, query1 TEXT,
query2 TEXT, percentage DECIMAL)
RETURNS BOOLEAN AS $$
DECLARE
result1 NUMERIC;
result2 NUMERIC;
succeeded BOOLEAN;
BEGIN
EXECUTE query1 INTO result1;
EXECUTE query2 INTO result2;
EXECUTE 'SELECT abs(' || result2 || ' - ' || result1 || ') / '
|| result1 || ' < ' || percentage INTO succeeded;
IF NOT succeeded THEN RAISE WARNING '%
assertion failed: abs(% - %) / % < %
%: (%)
%: (%)', description, result2, result1, result1, percentage,
result1, query1, result2, query2;
END IF;
RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';
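
util.assert_equal (used in the checks above) is not part of the deck; a minimal sketch following the same pattern as util.assert:

CREATE FUNCTION util.assert_equal(description TEXT, query1 TEXT, query2 TEXT)
RETURNS BOOLEAN AS $$
DECLARE
  result1 TEXT;
  result2 TEXT;
BEGIN
  EXECUTE query1 INTO result1;
  EXECUTE query2 INTO result2;
  IF result1 IS DISTINCT FROM result2 THEN RAISE EXCEPTION 'assertion failed:
# % #
%: (%)
%: (%)', description, result1, query1, result2, query2;
  END IF;
  RETURN TRUE;
END
$$ LANGUAGE 'plpgsql';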
14
Consistency check functions
@martin_loetzsch
Also: assert_not_found, assert_equal_table, assert_smaller_than_or_equal
Yes, unit tests
SELECT util.assert_value_equal('test_german_number_with_country_prefix', util.norm_phone_number('00491234'), '01234');
SELECT util.assert_value_equal('test_german_number_not_to_be_confused_with_country_prefix', util.norm_phone_number('0491234'), '0491234');
SELECT util.assert_value_equal('test_non_german_number_with_plus', util.norm_phone_number('+44 1234'), '441234');
SELECT util.assert_value_equal('test_german_number_with_prefix_and_additional_zero', util.norm_phone_number('+49 (0)1234'), '01234');
SELECT util.assert_value_equal('test__trim', util.norm_phone_number(' 0491234 '), '0491234');
SELECT util.assert_value_equal('test_number_with_leading_wildcard_symbol', util.norm_phone_number('*+436504834933'), '436504834933');
SELECT util.assert_value_equal('test_NULL', util.norm_phone_number(NULL), '');
SELECT util.assert_value_equal('test_empty', util.norm_phone_number(''), '');
SELECT util.assert_value_equal('test_wildcard_only', util.norm_phone_number('*'), '');
SELECT util.assert_value_equal('test_foreign_number_with_two_leading_zeroes', util.norm_phone_number('*00436769553701'), '436769553701');
SELECT util.assert_value_equal('test_domestic_number_with_trailing_letters', util.norm_phone_number('017678402HORST'), '017678402');
SELECT util.assert_value_equal('test_domestic_number_with_leading_letters', util.norm_phone_number('HORST017678402'), '017678402');
SELECT util.assert_value_equal('test_domestic_number_with_letters_in_between', util.norm_phone_number('0H1O7R6S7T8402'), '017678402');
SELECT util.assert_value_equal('test_german_number_with_country_prefix_and_leading_letters',
util.norm_phone_number('HORST00491234'), '01234');
SELECT util.assert_value_equal('test_german_number_not_to_be_confused_with_country_prefix_and_leading_letters',
util.norm_phone_number('HORST0491234'), '0491234');
SELECT util.assert_value_equal('test_non_german_number_with_plus_and_leading_letters', util.norm_phone_number('HORST+44 1234'), '441234');
SELECT util.assert_value_equal('test_german_number_with_prefix_and_additional_zero_and_leading_letters',
util.norm_phone_number('HORST+49 (0)1234'), '01234');
15
Unit tests
@martin_loetzsch
People enter horrible telephone numbers into websites
Divide & conquer strategies
@martin_loetzsch
16
Create two tables, fill with 10M values


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;




CREATE TABLE s.table_1 (
user_id BIGINT,
some_number INTEGER);


INSERT INTO s.table_1
SELECT n, n FROM generate_series(1, 10000000) n;




CREATE TABLE s.table_2 AS SELECT * FROM s.table_1;

Join both tables


EXPLAIN ANALYZE
SELECT *
FROM s.table_1
JOIN s.table_2 USING (user_id);



Let’s assume 18 seconds is slow


Merge Join (rows=10000000)
  Merge Cond: (table_1.user_id = table_2.user_id)
  -> Sort (rows=10000000)
       Sort Key: table_1.user_id
       Sort Method: external merge Disk: 215032kB
       -> Seq Scan on table_1 (rows=10000000)
  -> Materialize (rows=10000000)
       -> Sort (rows=10000000)
            Sort Key: table_2.user_id
            Sort Method: external merge Disk: 254144kB
            -> Seq Scan on table_2 (rows=10000000)
Planning time: 0.700 ms
Execution time: 18278.520 ms
17
Joining large tables can be slow
@martin_loetzsch
Think of: join all user touchpoints with all user orders
Create table_1 with 5 partitions


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;


CREATE TABLE s.table_1 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
some_number INTEGER NOT NULL);




CREATE TABLE s.table_1_0 (CHECK (user_chunk = 0))
INHERITS (s.table_1);


CREATE TABLE s.table_1_1 (CHECK (user_chunk = 1))
INHERITS (s.table_1);


CREATE TABLE s.table_1_2 (CHECK (user_chunk = 2))
INHERITS (s.table_1);


CREATE TABLE s.table_1_3 (CHECK (user_chunk = 3))
INHERITS (s.table_1);


CREATE TABLE s.table_1_4 (CHECK (user_chunk = 4))
INHERITS (s.table_1);

Insert directly into partition (don’t use triggers etc.)


INSERT INTO s.table_1_0
SELECT n, n % 5, n from generate_series(0, 10000000, 5) n;


Automate insertion


CREATE FUNCTION s.insert_data(table_name TEXT, chunk SMALLINT)
RETURNS VOID AS $$
BEGIN
EXECUTE 'INSERT INTO ' || table_name || '_' || chunk
|| ' SELECT n, n % 5, n from generate_series('
|| chunk || ', 10000000, 5) n';
EXECUTE 'ANALYZE ' || table_name || '_' || chunk;
END;
$$ LANGUAGE plpgsql;

Run in parallel (depends on ETL framework)


SELECT s.insert_data('s.table_1', 1 :: SMALLINT);
SELECT s.insert_data('s.table_1', 2 :: SMALLINT);
SELECT s.insert_data('s.table_1', 3 :: SMALLINT);
SELECT s.insert_data('s.table_1', 4 :: SMALLINT);
18
Splitting data in chunks / partitions
@martin_loetzsch
user chunk = user_id % 5
Build DDL statement and execute it


CREATE FUNCTION s.create_table_partitions(schemaname TEXT,
tablename TEXT,
key_column TEXT,
keys TEXT[])
RETURNS VOID AS $$
DECLARE key TEXT;
BEGIN
FOREACH KEY IN ARRAY keys LOOP
IF
NOT EXISTS(SELECT 1
FROM information_schema.tables t
WHERE t.table_schema = schemaname
AND t.table_name = tablename || '_' || key)
THEN
EXECUTE 'CREATE TABLE ' || schemaname || '.' || tablename
|| '_' || key || ' ( CHECK (' || key_column
|| ' = ' || key || ') ) INHERITS (' || schemaname
|| '.' || tablename || ');';
END IF;
END LOOP;
END
$$ LANGUAGE plpgsql;

Create table_2


CREATE TABLE s.table_2 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
some_number INTEGER NOT NULL
);


SELECT s.create_table_partitions(
's', 'table_2', 'user_chunk',
(SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));









Run in parallel (depends on ETL framework):


SELECT s.insert_data('s.table_2', 0 :: SMALLINT);
SELECT s.insert_data('s.table_2', 1 :: SMALLINT);
SELECT s.insert_data('s.table_2', 2 :: SMALLINT);
SELECT s.insert_data('s.table_2', 3 :: SMALLINT);
SELECT s.insert_data('s.table_2', 4 :: SMALLINT);
19
Automating partition management
@martin_loetzsch
Also great: pg_partman, Postgres 10 declarative partitioning
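
For comparison, the same chunking with Postgres 10 declarative partitioning (a sketch; no CHECK constraints or create_table_partitions helper needed, and INSERTs into the parent are routed to the right partition automatically):

CREATE TABLE s.table_2 (
  user_id     BIGINT   NOT NULL,
  user_chunk  SMALLINT NOT NULL,
  some_number INTEGER  NOT NULL
) PARTITION BY LIST (user_chunk);

CREATE TABLE s.table_2_0 PARTITION OF s.table_2 FOR VALUES IN (0);
CREATE TABLE s.table_2_1 PARTITION OF s.table_2 FOR VALUES IN (1);
-- ... one partition per chunk
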
Join table_1 and table_2 for chunk 0


EXPLAIN ANALYZE
SELECT
user_id,
0 AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0 AND table_2.user_chunk = 0;

Similar to original join (18 seconds), but almost 5x faster


Merge Join
Merge Cond: (table_1.user_id = table_2.user_id)
-> Sort (rows=2000001)
Sort Key: table_1.user_id
Sort Method: external merge Disk: 43008kB
-> Append (rows=2000001)
-> Seq Scan on table_1 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_1_0 (rows=2000001)
Filter: (user_chunk = 0)
-> Sort (rows=2000001)
Sort Key: table_2.user_id
Sort Method: external sort Disk: 58664kB
-> Append (rows=2000001)
-> Seq Scan on table_2 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_2_0 (rows=2000001)
Filter: (user_chunk = 0)
Planning time: 5.171 ms
Execution time: 3901.282 ms
20
Querying chunked data
@martin_loetzsch
No chunk restriction on table_2


EXPLAIN ANALYZE
SELECT
user_id,
0 AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0;

Append operator quite costly


Merge Join (rows=2000001)
Merge Cond: (table_1.user_id = table_2.user_id)
-> Sort (rows=2000001)
Sort Key: table_1.user_id
Sort Method: external merge Disk: 43008kB
-> Append (rows=2000001)
-> Seq Scan on table_1 (rows=0)
Filter: (user_chunk = 0)
-> Seq Scan on table_1_0 (rows=2000001)
Filter: (user_chunk = 0)
-> Materialize (rows=10000001)
-> Sort (rows=10000001)
Sort Key: table_2.user_id
Sort Method: external merge Disk: 254144kB
-> Append (rows=10000001)
-> Seq Scan on table_2 (rows=0)
-> Seq Scan on table_2_0 (rows=2000001)
-> Seq Scan on table_2_1 (rows=2000000)
-> Seq Scan on table_2_2 (rows=2000000)
-> Seq Scan on table_2_3 (rows=2000000)
-> Seq Scan on table_2_4 (rows=2000000)
Planning time: 0.371 ms
Execution time: 11654.858 ms
21
Common mistake
@martin_loetzsch
Check for sequence scans on complete partitioned tables
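
The pruning of table_1 partitions in the plans above relies on constraint exclusion against the CHECK constraints; a quick thing to verify if a chunk restriction still produces scans over all partitions (a sketch, assuming the inheritance-based setup from the previous slides):

SHOW constraint_exclusion;
-- should be 'partition' (the default) or 'on'; the planner can only prune
-- partitions when the chunk restriction is a constant it can compare
-- against the CHECK constraints
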
Create partitioned table for target of computation


CREATE TABLE s.table_3 (
user_id BIGINT NOT NULL,
user_chunk SMALLINT NOT NULL,
sum INTEGER NOT NULL);

SELECT s.create_table_partitions(
's', 'table_3', 'user_chunk',
(SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));

A function for the computation


CREATE FUNCTION s.join_table_1_and_table_2(chunk SMALLINT)
RETURNS SETOF s.table_3 AS $$
BEGIN
RETURN QUERY
SELECT
user_id,
chunk AS user_chunk,
table_1.some_number + table_2.some_number AS sum
FROM s.table_1 JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = chunk AND table_2.user_chunk = chunk;
END
$$ LANGUAGE plpgsql;
Insert from a separate function


CREATE OR REPLACE FUNCTION s.insert_table_3(chunk INTEGER)
RETURNS VOID AS $$
BEGIN
EXECUTE 'INSERT INTO s.table_3_' || chunk
|| ' (SELECT * FROM s.join_table_1_and_table_2('
|| chunk || '::SMALLINT))';
END
$$ LANGUAGE plpgsql;











Run in parallel (depends on ETL framework)


SELECT s.insert_table_3(0);
SELECT s.insert_table_3(1);
SELECT s.insert_table_3(2);
SELECT s.insert_table_3(3);
SELECT s.insert_table_3(4);
22
Using functions for chunked processing
@martin_loetzsch
Our best practices, depends on ETL framework
[Diagram: marketing pipeline DAG with tasks read ga session, map ga visitor, preprocess adwords ad, preprocess criteo campaign, preprocess facebook ad set, transform device, transform host, transform landing page, transform region, update campaign tree, preprocess touchpoint, preprocess ad, preprocess campaign tree, map manual cost, read manual cost, transform ad attribute, transform ad, transform acquisition performance, transform reactivation performance, count converting touchpoints, transform marketing cost, compute first touchpoint, create performance attribution model, flatten touchpoints fact, flatten marketing cost fact, compute redistributed customer acquisition cost, compute redistributed customer reactivation cost, collect non converting cost, compute touchpoint cost, transform touchpoint]
Choose chunks depending on things that are processed together
Use secondary indexes on tables that are processed in different chunks (see the sketch below)
Re-chunk on chunk “borders”
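
A sketch of the secondary-index point above, reusing util.add_index and util.get_all_chunks from this deck and assuming the touchpoint partitions carry a day_chunk column (as in the diagram further down):

SELECT util.add_index('m_dim_next', 'touchpoint_' || visitor_chunk,
                      column_names := ARRAY ['day_chunk'])
FROM util.get_all_chunks() visitor_chunk;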



Real world example


CREATE FUNCTION m_tmp.insert_dim_touchpoint(visitor_chunk SMALLINT)
RETURNS VOID AS $$
DECLARE day_chunk SMALLINT;
BEGIN
FOR day_chunk IN (SELECT util.get_all_chunks())
LOOP
EXECUTE 'INSERT INTO m_dim_next.touchpoint_' || visitor_chunk
|| ' (SELECT * FROM m_tmp.transform_touchpoint('
|| visitor_chunk || '::SMALLINT, ' || day_chunk ||
'::SMALLINT))';
END LOOP;
END
$$ LANGUAGE plpgsql;
23
Chunking all the way down
@martin_loetzsch
Takes discipline
[Diagram: chunking dimensions day, day_chunk, visitor_chunk, attribution_model]
Make changing chunk size easy


CREATE FUNCTION s.get_number_of_chunks()
RETURNS SMALLINT AS $$
SELECT 5 :: SMALLINT;
$$ LANGUAGE SQL IMMUTABLE;


CREATE FUNCTION s.get_all_chunks()
RETURNS SETOF INTEGER AS $$
SELECT generate_series(0, s.get_number_of_chunks() - 1);
$$ LANGUAGE SQL IMMUTABLE;


CREATE FUNCTION s.compute_chunk(x BIGINT)
RETURNS SMALLINT AS $$
SELECT coalesce(abs(x) % s.get_number_of_chunks(), 0) :: SMALLINT;
$$ LANGUAGE SQL IMMUTABLE;
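
Usage sketch: the partition inserts from the earlier slides, with the chunk derived via s.compute_chunk instead of a hard-coded % 5, so that changing s.get_number_of_chunks() is enough to re-chunk.

INSERT INTO s.table_1_0
SELECT n, s.compute_chunk(n), n
FROM generate_series(0, 10000000) n
WHERE s.compute_chunk(n) = 0;
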
Inline chunk computation


DROP FUNCTION s.compute_chunk( BIGINT );


DO $$
BEGIN
EXECUTE '
CREATE FUNCTION s.compute_chunk(x BIGINT)
RETURNS SMALLINT AS $_$
SELECT coalesce(abs(x) % ' || s.get_number_of_chunks() || ',
0)::SMALLINT;
$_$ LANGUAGE SQL IMMUTABLE;';
END
$$;
24
Configurable chunking
@martin_loetzsch
Integrate with ETL framework
Becoming friends with the query planner
@martin_loetzsch
25
Join a table with 10M values with a table with 10K values


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;




CREATE TABLE s.table_1 AS
SELECT a
FROM generate_series(1, 10000) a;


CREATE TABLE s.table_2 AS
SELECT a
FROM generate_series(1, 10000000) a;


CREATE INDEX table_2__a ON s.table_2 (a);




EXPLAIN ANALYZE
SELECT *
FROM s.table_2
JOIN s.table_1 USING (a);

Index on table_2 is not used


Merge Join (rows=573750000) (actual rows=10000)
Merge Cond: (table_1.a = table_2.a)
-> Sort (rows=11475) (actual rows=10000)
Sort Key: table_1.a
Sort Method: quicksort Memory: 853kB
-> Seq Scan on table_1 (rows=11475) (actual rows=10000)
-> Materialize (rows=10000000) (actual rows=10001)
-> Sort (rows=10000000) (actual rows=10001)
Sort Key: table_2.a
Sort Method: external merge Disk: 136816kB
-> Seq Scan on table_2 (rows=10000000)
(actual rows=10000000)
Planning time: 2.173 ms
Execution time: 3071.127 ms



The query planner didn’t know that table_2 has many more
distinct values than table_1
Statistics about cardinality and distribution of data are collected by the
asynchronous autovacuum daemon
26
Nondeterministic behavior?
@martin_loetzsch
Queries sometimes don’t terminate when run in ETL, but look fine when run separately
Manually collect data statistics before using new tables


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;




CREATE TABLE s.table_1 AS
SELECT a
FROM generate_series(1, 10000) a;


CREATE TABLE s.table_2 AS
SELECT a
FROM generate_series(1, 10000000) a;


CREATE INDEX table_2__a ON s.table_2 (a);




ANALYZE s.table_1;
ANALYZE s.table_2;




EXPLAIN ANALYZE
SELECT *
FROM s.table_2
JOIN s.table_1 USING (a);

> 300 x faster


Merge Join (rows=10000) (actual rows=10000)
Merge Cond: (table_2.a = table_1.a)
-> Index Only Scan using table_2__a on table_2 (rows=9999985)
(actual rows=10001)
Heap Fetches: 10001
-> Sort (rows=10000) (actual rows=10000)
Sort Key: table_1.a
Sort Method: quicksort Memory: 853kB
-> Seq Scan on table_1 (rows=10000) (actual rows=10000)
Planning time: 1.759 ms
Execution time: 9.287 ms
27
Analyze all the things
@martin_loetzsch
Always analyze newly created tables
Add to postgresql.conf
session_preload_libraries = 'auto_explain'
auto_explain.log_min_duration = 10
auto_explain.log_nested_statements = true
auto_explain.log_verbose = true
auto_explain.log_analyze = true
debug_pretty_print = on
log_lock_waits = on
Carefully manage log file size
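
One way to keep the log size in check (a sketch using the built-in logging collector; all parameters are standard postgresql.conf settings):

logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%a.log'   # one file per weekday
log_truncate_on_rotation = on        # overwrite last week's file
log_rotation_age = 1d
log_rotation_size = 0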
28
Auto-explain everything
@martin_loetzsch
tail -f /path/to/postgres.log
Columnar storage in Postgres
@martin_loetzsch
29
30
Postgres extension cstore_fdw
@martin_loetzsch
Quite hassle-free since version 1.6
Initialize extension


CREATE EXTENSION IF NOT EXISTS cstore_fdw;


DO $$
BEGIN
IF NOT (SELECT exists(SELECT 1
FROM pg_foreign_server
WHERE srvname = 'cstore_server'))
THEN CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
END IF;
END
$$;



Create fact table


DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;


CREATE FOREIGN TABLE s.fact (
fact_id BIGSERIAL,
a TEXT,
b TEXT
)
SERVER cstore_server OPTIONS (compression 'pglz');
Insert 10M rows


INSERT INTO s.fact
SELECT
n,
('a ' || 1 + (4 * random()) :: INTEGER) AS a,
('b ' || 1 + (4 * random()) :: INTEGER) AS b
FROM generate_series(0, 10000000) n
ORDER BY a, b;







8% of original size


SELECT pg_size_pretty(pg_table_size('s.fact'));


37 MB
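
For comparison (a sketch; fact_heap is just an illustrative name), the same rows in a regular heap table, which is what the "8% of original size" above refers to:

CREATE TABLE s.fact_heap AS SELECT * FROM s.fact;

SELECT pg_size_pretty(pg_table_size('s.fact_heap'));
-- expect something in the ballpark of 450 MB, i.e. the cstore table is ~8% of it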
31
Save disk space with cstore tables
@martin_loetzsch
Optimized Row Columnar (ORC) format
Typical OLAP query


EXPLAIN ANALYZE
SELECT
a,
count(*) AS count
FROM s.fact
WHERE b = 'b 1'
GROUP BY a;



5x faster than without cstore


HashAggregate (rows=5)
Group Key: a
-> Foreign Scan on fact (rows=1248701)
Filter: (b = 'b 1'::text)
Rows Removed by Filter: 41299
CStore File: /postgresql/9.6/cstore_fdw/836136/849220
CStore File Size: 39166772
Planning time: 10.057 ms
Execution time: 257.107 ms

Highly selective query


EXPLAIN ANALYZE
SELECT
count(*) AS count
FROM s.fact
WHERE a = 'a 1'
AND b = 'b 1';





Really fast


Aggregate (rows=1)
-> Foreign Scan on fact (rows=156676)
Filter: ((a = 'a 1'::text) AND (b = 'b 1'::text))
Rows Removed by Filter: 13324
CStore File: /postgresql/9.6/cstore_fdw/836136/849220
CStore File Size: 39166772
Planning time: 5.492 ms
Execution time: 31.689 ms
32
Main operation: foreign scan
@martin_loetzsch
Aggregation pushdown coming soon
Conclusions
@martin_loetzsch
33
Consider Postgres for your next ETL project
@martin_loetzsch
34
Focus on the complexity of data
rather than the complexity of technology
@martin_loetzsch
35
36
We are open sourcing our BI infrastructure
@martin_loetzsch
ETL part released end of November 2017
37
Work with us
@martin_loetzsch
In Berlin, Stockholm, Amsterdam, Rotterdam, Paris, Los Angeles
Thank you
@martin_loetzsch
38

More Related Content

What's hot

The Importance of Master Data Management
The Importance of Master Data ManagementThe Importance of Master Data Management
The Importance of Master Data ManagementDATAVERSITY
 
Azure data platform overview
Azure data platform overviewAzure data platform overview
Azure data platform overviewJames Serra
 
Data Lake Overview
Data Lake OverviewData Lake Overview
Data Lake OverviewJames Serra
 
Introduction SQL Analytics on Lakehouse Architecture
Introduction SQL Analytics on Lakehouse ArchitectureIntroduction SQL Analytics on Lakehouse Architecture
Introduction SQL Analytics on Lakehouse ArchitectureDatabricks
 
SF Python Meetup - Introduction to NATS Messaging with Python3
SF Python Meetup - Introduction to NATS Messaging with Python3SF Python Meetup - Introduction to NATS Messaging with Python3
SF Python Meetup - Introduction to NATS Messaging with Python3wallyqs
 
Serverless Kafka and Spark in a Multi-Cloud Lakehouse Architecture
Serverless Kafka and Spark in a Multi-Cloud Lakehouse ArchitectureServerless Kafka and Spark in a Multi-Cloud Lakehouse Architecture
Serverless Kafka and Spark in a Multi-Cloud Lakehouse ArchitectureKai Wähner
 
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...Databricks
 
Introduction to MongoDB
Introduction to MongoDBIntroduction to MongoDB
Introduction to MongoDBMongoDB
 
Gartner: Master Data Management Functionality
Gartner: Master Data Management FunctionalityGartner: Master Data Management Functionality
Gartner: Master Data Management FunctionalityGartner
 
Getting Started with Delta Lake on Databricks
Getting Started with Delta Lake on DatabricksGetting Started with Delta Lake on Databricks
Getting Started with Delta Lake on DatabricksKnoldus Inc.
 
Snowflake Architecture.pptx
Snowflake Architecture.pptxSnowflake Architecture.pptx
Snowflake Architecture.pptxchennakesava44
 
Architecting a Data Warehouse: A Case Study
Architecting a Data Warehouse: A Case StudyArchitecting a Data Warehouse: A Case Study
Architecting a Data Warehouse: A Case StudyMark Ginnebaugh
 
Data Lakehouse, Data Mesh, and Data Fabric (r2)
Data Lakehouse, Data Mesh, and Data Fabric (r2)Data Lakehouse, Data Mesh, and Data Fabric (r2)
Data Lakehouse, Data Mesh, and Data Fabric (r2)James Serra
 
Challenges in Building a Data Pipeline
Challenges in Building a Data PipelineChallenges in Building a Data Pipeline
Challenges in Building a Data PipelineManish Kumar
 
Modernizing to a Cloud Data Architecture
Modernizing to a Cloud Data ArchitectureModernizing to a Cloud Data Architecture
Modernizing to a Cloud Data ArchitectureDatabricks
 
Percona Xtrabackup Best Practices
Percona Xtrabackup Best PracticesPercona Xtrabackup Best Practices
Percona Xtrabackup Best PracticesMarcelo Altmann
 
Building Robust Production Data Pipelines with Databricks Delta
Building Robust Production Data Pipelines with Databricks DeltaBuilding Robust Production Data Pipelines with Databricks Delta
Building Robust Production Data Pipelines with Databricks DeltaDatabricks
 
Introduction to Apache Spark
Introduction to Apache SparkIntroduction to Apache Spark
Introduction to Apache SparkRahul Jain
 

What's hot (20)

Mongo db intro.pptx
Mongo db intro.pptxMongo db intro.pptx
Mongo db intro.pptx
 
The Importance of Master Data Management
The Importance of Master Data ManagementThe Importance of Master Data Management
The Importance of Master Data Management
 
Azure data platform overview
Azure data platform overviewAzure data platform overview
Azure data platform overview
 
Presto
PrestoPresto
Presto
 
Data Lake Overview
Data Lake OverviewData Lake Overview
Data Lake Overview
 
Introduction SQL Analytics on Lakehouse Architecture
Introduction SQL Analytics on Lakehouse ArchitectureIntroduction SQL Analytics on Lakehouse Architecture
Introduction SQL Analytics on Lakehouse Architecture
 
SF Python Meetup - Introduction to NATS Messaging with Python3
SF Python Meetup - Introduction to NATS Messaging with Python3SF Python Meetup - Introduction to NATS Messaging with Python3
SF Python Meetup - Introduction to NATS Messaging with Python3
 
Serverless Kafka and Spark in a Multi-Cloud Lakehouse Architecture
Serverless Kafka and Spark in a Multi-Cloud Lakehouse ArchitectureServerless Kafka and Spark in a Multi-Cloud Lakehouse Architecture
Serverless Kafka and Spark in a Multi-Cloud Lakehouse Architecture
 
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...
The Future of Data Science and Machine Learning at Scale: A Look at MLflow, D...
 
Introduction to MongoDB
Introduction to MongoDBIntroduction to MongoDB
Introduction to MongoDB
 
Gartner: Master Data Management Functionality
Gartner: Master Data Management FunctionalityGartner: Master Data Management Functionality
Gartner: Master Data Management Functionality
 
Getting Started with Delta Lake on Databricks
Getting Started with Delta Lake on DatabricksGetting Started with Delta Lake on Databricks
Getting Started with Delta Lake on Databricks
 
Snowflake Architecture.pptx
Snowflake Architecture.pptxSnowflake Architecture.pptx
Snowflake Architecture.pptx
 
Architecting a Data Warehouse: A Case Study
Architecting a Data Warehouse: A Case StudyArchitecting a Data Warehouse: A Case Study
Architecting a Data Warehouse: A Case Study
 
Data Lakehouse, Data Mesh, and Data Fabric (r2)
Data Lakehouse, Data Mesh, and Data Fabric (r2)Data Lakehouse, Data Mesh, and Data Fabric (r2)
Data Lakehouse, Data Mesh, and Data Fabric (r2)
 
Challenges in Building a Data Pipeline
Challenges in Building a Data PipelineChallenges in Building a Data Pipeline
Challenges in Building a Data Pipeline
 
Modernizing to a Cloud Data Architecture
Modernizing to a Cloud Data ArchitectureModernizing to a Cloud Data Architecture
Modernizing to a Cloud Data Architecture
 
Percona Xtrabackup Best Practices
Percona Xtrabackup Best PracticesPercona Xtrabackup Best Practices
Percona Xtrabackup Best Practices
 
Building Robust Production Data Pipelines with Databricks Delta
Building Robust Production Data Pipelines with Databricks DeltaBuilding Robust Production Data Pipelines with Databricks Delta
Building Robust Production Data Pipelines with Databricks Delta
 
Introduction to Apache Spark
Introduction to Apache SparkIntroduction to Apache Spark
Introduction to Apache Spark
 

Similar to ETL Patterns with Postgres

PerlApp2Postgresql (2)
PerlApp2Postgresql (2)PerlApp2Postgresql (2)
PerlApp2Postgresql (2)Jerome Eteve
 
11 Things About 11gr2
11 Things About 11gr211 Things About 11gr2
11 Things About 11gr2afa reg
 
Sydney Oracle Meetup - execution plans
Sydney Oracle Meetup - execution plansSydney Oracle Meetup - execution plans
Sydney Oracle Meetup - execution planspaulguerin
 
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?Martin Loetzsch
 
Getting Started with PL/Proxy
Getting Started with PL/ProxyGetting Started with PL/Proxy
Getting Started with PL/ProxyPeter Eisentraut
 
perl usage at database applications
perl usage at database applicationsperl usage at database applications
perl usage at database applicationsJoe Jiang
 
The Story About The Migration
 The Story About The Migration The Story About The Migration
The Story About The MigrationEDB
 
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowOTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowAlex Zaballa
 
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowOTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowAlex Zaballa
 
Postgres performance for humans
Postgres performance for humansPostgres performance for humans
Postgres performance for humansCraig Kerstiens
 
Oracle PL/SQL - Creative Conditional Compilation
Oracle PL/SQL - Creative Conditional CompilationOracle PL/SQL - Creative Conditional Compilation
Oracle PL/SQL - Creative Conditional CompilationScott Wesley
 
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering College
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering CollegeDatabase Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering College
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering CollegeDhivyaa C.R
 
Calling r from sas (msug meeting, feb 17, 2018) revised
Calling r from sas (msug meeting, feb 17, 2018)   revisedCalling r from sas (msug meeting, feb 17, 2018)   revised
Calling r from sas (msug meeting, feb 17, 2018) revisedBarry DeCicco
 
Php Data Objects
Php Data ObjectsPhp Data Objects
Php Data Objectshiren.joshi
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowAlex Zaballa
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowAlex Zaballa
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowAlex Zaballa
 
Oracle data guard configuration in 12c
Oracle data guard configuration in 12cOracle data guard configuration in 12c
Oracle data guard configuration in 12cuzzal basak
 
Internationalizing CakePHP Applications
Internationalizing CakePHP ApplicationsInternationalizing CakePHP Applications
Internationalizing CakePHP ApplicationsPierre MARTIN
 

Similar to ETL Patterns with Postgres (20)

PerlApp2Postgresql (2)
PerlApp2Postgresql (2)PerlApp2Postgresql (2)
PerlApp2Postgresql (2)
 
11 Things About 11gr2
11 Things About 11gr211 Things About 11gr2
11 Things About 11gr2
 
Sydney Oracle Meetup - execution plans
Sydney Oracle Meetup - execution plansSydney Oracle Meetup - execution plans
Sydney Oracle Meetup - execution plans
 
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?
Project A Data Modelling Best Practices Part II: How to Build a Data Warehouse?
 
Getting Started with PL/Proxy
Getting Started with PL/ProxyGetting Started with PL/Proxy
Getting Started with PL/Proxy
 
perl usage at database applications
perl usage at database applicationsperl usage at database applications
perl usage at database applications
 
The Story About The Migration
 The Story About The Migration The Story About The Migration
The Story About The Migration
 
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowOTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
 
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should KnowOTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
OTN TOUR 2016 - DBA Commands and Concepts That Every Developer Should Know
 
Postgres performance for humans
Postgres performance for humansPostgres performance for humans
Postgres performance for humans
 
Oracle PL/SQL - Creative Conditional Compilation
Oracle PL/SQL - Creative Conditional CompilationOracle PL/SQL - Creative Conditional Compilation
Oracle PL/SQL - Creative Conditional Compilation
 
UNIT V (5).pptx
UNIT V (5).pptxUNIT V (5).pptx
UNIT V (5).pptx
 
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering College
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering CollegeDatabase Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering College
Database Connectivity MYSQL by Dr.C.R.Dhivyaa Kongu Engineering College
 
Calling r from sas (msug meeting, feb 17, 2018) revised
Calling r from sas (msug meeting, feb 17, 2018)   revisedCalling r from sas (msug meeting, feb 17, 2018)   revised
Calling r from sas (msug meeting, feb 17, 2018) revised
 
Php Data Objects
Php Data ObjectsPhp Data Objects
Php Data Objects
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
 
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should KnowDBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
DBA Brasil 1.0 - DBA Commands and Concepts That Every Developer Should Know
 
Oracle data guard configuration in 12c
Oracle data guard configuration in 12cOracle data guard configuration in 12c
Oracle data guard configuration in 12c
 
Internationalizing CakePHP Applications
Internationalizing CakePHP ApplicationsInternationalizing CakePHP Applications
Internationalizing CakePHP Applications
 

Recently uploaded

Identifying Appropriate Test Statistics Involving Population Mean
Identifying Appropriate Test Statistics Involving Population MeanIdentifying Appropriate Test Statistics Involving Population Mean
Identifying Appropriate Test Statistics Involving Population MeanMYRABACSAFRA2
 
Learn How Data Science Changes Our World
Learn How Data Science Changes Our WorldLearn How Data Science Changes Our World
Learn How Data Science Changes Our WorldEduminds Learning
 
Predictive Analysis for Loan Default Presentation : Data Analysis Project PPT
Predictive Analysis for Loan Default  Presentation : Data Analysis Project PPTPredictive Analysis for Loan Default  Presentation : Data Analysis Project PPT
Predictive Analysis for Loan Default Presentation : Data Analysis Project PPTBoston Institute of Analytics
 
Cyber awareness ppt on the recorded data
Cyber awareness ppt on the recorded dataCyber awareness ppt on the recorded data
Cyber awareness ppt on the recorded dataTecnoIncentive
 
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...Thomas Poetter
 
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...Amil Baba Dawood bangali
 
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdf
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdfEnglish-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdf
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdfblazblazml
 
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...Boston Institute of Analytics
 
Real-Time AI Streaming - AI Max Princeton
Real-Time AI  Streaming - AI Max PrincetonReal-Time AI  Streaming - AI Max Princeton
Real-Time AI Streaming - AI Max PrincetonTimothy Spann
 
Principles and Practices of Data Visualization
Principles and Practices of Data VisualizationPrinciples and Practices of Data Visualization
Principles and Practices of Data VisualizationKianJazayeri1
 
Semantic Shed - Squashing and Squeezing.pptx
Semantic Shed - Squashing and Squeezing.pptxSemantic Shed - Squashing and Squeezing.pptx
Semantic Shed - Squashing and Squeezing.pptxMike Bennett
 
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改yuu sss
 
Data Factory in Microsoft Fabric (MsBIP #82)
Data Factory in Microsoft Fabric (MsBIP #82)Data Factory in Microsoft Fabric (MsBIP #82)
Data Factory in Microsoft Fabric (MsBIP #82)Cathrine Wilhelmsen
 
Advanced Machine Learning for Business Professionals
Advanced Machine Learning for Business ProfessionalsAdvanced Machine Learning for Business Professionals
Advanced Machine Learning for Business ProfessionalsVICTOR MAESTRE RAMIREZ
 
Top 5 Best Data Analytics Courses In Queens
Top 5 Best Data Analytics Courses In QueensTop 5 Best Data Analytics Courses In Queens
Top 5 Best Data Analytics Courses In Queensdataanalyticsqueen03
 
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhh
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhhThiophen Mechanism khhjjjjjjjhhhhhhhhhhh
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhhYasamin16
 
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024Susanna-Assunta Sansone
 
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degreeyuu sss
 
Bank Loan Approval Analysis: A Comprehensive Data Analysis Project
Bank Loan Approval Analysis: A Comprehensive Data Analysis ProjectBank Loan Approval Analysis: A Comprehensive Data Analysis Project
Bank Loan Approval Analysis: A Comprehensive Data Analysis ProjectBoston Institute of Analytics
 

Recently uploaded (20)

Identifying Appropriate Test Statistics Involving Population Mean
Identifying Appropriate Test Statistics Involving Population MeanIdentifying Appropriate Test Statistics Involving Population Mean
Identifying Appropriate Test Statistics Involving Population Mean
 
Learn How Data Science Changes Our World
Learn How Data Science Changes Our WorldLearn How Data Science Changes Our World
Learn How Data Science Changes Our World
 
Predictive Analysis for Loan Default Presentation : Data Analysis Project PPT
Predictive Analysis for Loan Default  Presentation : Data Analysis Project PPTPredictive Analysis for Loan Default  Presentation : Data Analysis Project PPT
Predictive Analysis for Loan Default Presentation : Data Analysis Project PPT
 
Cyber awareness ppt on the recorded data
Cyber awareness ppt on the recorded dataCyber awareness ppt on the recorded data
Cyber awareness ppt on the recorded data
 
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...
Minimizing AI Hallucinations/Confabulations and the Path towards AGI with Exa...
 
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...
NO1 Certified Black Magic Specialist Expert Amil baba in Lahore Islamabad Raw...
 
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdf
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdfEnglish-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdf
English-8-Q4-W3-Synthesizing-Essential-Information-From-Various-Sources-1.pdf
 
Insurance Churn Prediction Data Analysis Project
Insurance Churn Prediction Data Analysis ProjectInsurance Churn Prediction Data Analysis Project
Insurance Churn Prediction Data Analysis Project
 
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...
Decoding the Heart: Student Presentation on Heart Attack Prediction with Data...
 
Real-Time AI Streaming - AI Max Princeton
Real-Time AI  Streaming - AI Max PrincetonReal-Time AI  Streaming - AI Max Princeton
Real-Time AI Streaming - AI Max Princeton
 
Principles and Practices of Data Visualization
Principles and Practices of Data VisualizationPrinciples and Practices of Data Visualization
Principles and Practices of Data Visualization
 
Semantic Shed - Squashing and Squeezing.pptx
Semantic Shed - Squashing and Squeezing.pptxSemantic Shed - Squashing and Squeezing.pptx
Semantic Shed - Squashing and Squeezing.pptx
 
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改
专业一比一美国俄亥俄大学毕业证成绩单pdf电子版制作修改
 
Data Factory in Microsoft Fabric (MsBIP #82)
Data Factory in Microsoft Fabric (MsBIP #82)Data Factory in Microsoft Fabric (MsBIP #82)
Data Factory in Microsoft Fabric (MsBIP #82)
 
Advanced Machine Learning for Business Professionals
Advanced Machine Learning for Business ProfessionalsAdvanced Machine Learning for Business Professionals
Advanced Machine Learning for Business Professionals
 
Top 5 Best Data Analytics Courses In Queens
Top 5 Best Data Analytics Courses In QueensTop 5 Best Data Analytics Courses In Queens
Top 5 Best Data Analytics Courses In Queens
 
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhh
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhhThiophen Mechanism khhjjjjjjjhhhhhhhhhhh
Thiophen Mechanism khhjjjjjjjhhhhhhhhhhh
 
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024
FAIR, FAIRsharing, FAIR Cookbook and ELIXIR - Sansone SA - Boston 2024
 
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree
办美国阿肯色大学小石城分校毕业证成绩单pdf电子版制作修改#真实留信入库#永久存档#真实可查#diploma#degree
 
Bank Loan Approval Analysis: A Comprehensive Data Analysis Project
Bank Loan Approval Analysis: A Comprehensive Data Analysis ProjectBank Loan Approval Analysis: A Comprehensive Data Analysis Project
Bank Loan Approval Analysis: A Comprehensive Data Analysis Project
 

ETL Patterns with Postgres

  • 1. ETL Patterns with Postgres @martin_loetzsch Dr. Martin Loetzsch Crunch 2017
  • 2. SQL as data processing language @martin_loetzsch 2
  • 3. Target of computation 
 CREATE TABLE m_dim_next.region (
 region_id SMALLINT PRIMARY KEY,
 region_name TEXT NOT NULL UNIQUE,
 country_id SMALLINT NOT NULL,
 country_name TEXT NOT NULL,
 _region_name TEXT NOT NULL
 );
 
 Do computation and store result in table 
 WITH raw_region AS (SELECT DISTINCT country,
 region
 FROM m_data.ga_session
 ORDER BY country, region)
 
 INSERT INTO m_dim_next.region SELECT row_number() OVER (ORDER BY country, region ) AS region_id,
 CASE WHEN (SELECT count(DISTINCT country) FROM raw_region r2 WHERE r2.region = r1.region) > 1 THEN region || ' / ' || country ELSE region END AS region_name, dense_rank() OVER (ORDER BY country) AS country_id, country AS country_name, region AS _region_name FROM raw_region r1;
 INSERT INTO m_dim_next.region VALUES (-1, 'Unknown', -1, 'Unknown', 'Unknown');
 Speedup subsequent transformations 
 SELECT util.add_index( 'm_dim_next', 'region', column_names := ARRAY ['_region_name', ‘country_name', 'region_id']);
 
 SELECT util.add_index( 'm_dim_next', 'region', column_names := ARRAY ['country_id', 'region_id']);
 
 ANALYZE m_dim_next.region; 3 Leave data in Postgres, run SQL queries @martin_loetzsch Tables as (intermediate) results of processing steps
  • 4. At the beginning of a data integration pipeline 
 DROP SCHEMA IF EXISTS m_dim_next CASCADE;
 CREATE SCHEMA m_dim_next;
 
 DROP SCHEMA IF EXISTS m_tmp CASCADE; CREATE SCHEMA m_tmp;
 
 
 Do all transformations 
 CREATE TABLE m_tmp.foo
 AS SELECT n FROM generate_series(0, 10000) n;
 CREATE TABLE m_dim_next.foo AS SELECT n, n + 1 FROM m_tmp.foo;
 -- ..
 At the end of pipeline, after checks 
 DROP SCHEMA IF EXISTS m_dim CASCADE; ALTER SCHEMA m_dim_next RENAME TO m_dim;
 
 
 
 
 
 Build m_dim_next schema while users are seeing m_dim - Data shown in frontend is always consistent - When ETL breaks, data just gets old
 Explicit operations determine structure and content of DWH - Very developer / git friendly - Requirement: Must be possible to rebuild DWH from sources - Add incremental loading / processing when there is a pain 4 Drop & rebuild @martin_loetzsch Don’t bother with migrations
  • 5. Renaming / dropping schemas requires exclusive locks 
 CREATE FUNCTION util.cancel_queries_on_schema(schema TEXT) RETURNS BOOLEAN AS $$ SELECT pg_cancel_backend(pid) FROM (SELECT DISTINCT pid FROM pg_locks JOIN pg_database ON database = pg_database.oid JOIN pg_class ON pg_class.oid = relation JOIN pg_namespace ON relnamespace = pg_namespace.oid WHERE datname = current_database() AND nspname = schema AND pid != pg_backend_pid()) t; $$ LANGUAGE SQL;
 
 
 First rename, then drop 
 CREATE FUNCTION util.replace_schema(schemaname TEXT, replace_with TEXT) RETURNS VOID AS $$ BEGIN PERFORM util.cancel_queries_on_schema(schemaname); IF EXISTS(SELECT * FROM information_schema.schemata s WHERE s.schema_name = schemaname) THEN EXECUTE 'ALTER SCHEMA ' || schemaname || ' RENAME TO ' || schemaname || '_old;'; END IF; EXECUTE 'ALTER SCHEMA ' || replace_with || ' RENAME TO ' || schemaname || ';';
 -- again, for good measure PERFORM util.cancel_queries_on_schema(schemaname);
 EXECUTE 'DROP SCHEMA IF EXISTS ' || schemaname || '_old CASCADE;'; END; $$ LANGUAGE 'plpgsql'
 
 
 
 
 Nice one-liner 
 SELECT util.replace_schema('m_dim', 'm_dim_next'); 5 Atomic & robust schema switches @martin_loetzsch Transactional DDL FTW
  • 6. Each pipeline maintains their own schemas - Never write to schemas of other pipelines - Only read from “public” schemas of other pipelines
 
 Project A schema name conventions mp_dim: public tables of pipeline my-pipeline mp_dim_next: schema for building the next version of mp_dim mp_tmp: temporary tables that are not considered public mp_data: tables that are not always dropped (for incremental loading)
 
 Optional: copy “public” schema from ETL db to frontend db 6 Schemas & pipelines @martin_loetzsch Sometimes confusing for analysts
  • 7. 128 GB machine, highly overcommitted memory 
 max_connections = 200
 temp_buffers = 2GB work_mem = 2GB
 
 
 
 Postgres gracefully restarts when it can’t allocate more memory 
LOG: unexpected EOF on client connection with an open transaction
LOG: server process (PID 17697) was terminated by signal 9: Killed
DETAIL: Failed process was running: SELECT m_tmp.index_tmp_touchpoint();
LOG: terminating any other active server processes
WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and repeat your command.

DROP SCHEMA x CASCADE;
 max_locks_per_transaction = 4000
 
 
 
 
 
 Speed up IO by crippling WAL 
wal_level = minimal
fsync = off
synchronous_commit = off
full_page_writes = off
wal_buffers = -1

No protection against hardware failure! 7 Tuning Postgres for drop & rebuild @martin_loetzsch Not recommended for any other use case
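These settings can also be applied with ALTER SYSTEM instead of editing postgresql.conf by hand (a sketch, assuming Postgres 9.4+ and superuser access; some parameters only take effect after a restart):

ALTER SYSTEM SET work_mem = '2GB';                  -- picked up on reload
ALTER SYSTEM SET temp_buffers = '2GB';              -- default for new sessions
ALTER SYSTEM SET max_locks_per_transaction = 4000;  -- requires restart
ALTER SYSTEM SET wal_level = 'minimal';             -- requires restart
ALTER SYSTEM SET fsync = off;                       -- reload is enough
SELECT pg_reload_conf();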
  • 8. Running queries 
 cat query.sql 
 | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc 
 --set ON_ERROR_STOP=on etl_db
 
 echo "SELECT s.my_function()" 
 | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc 
 --set ON_ERROR_STOP=on etl_db
 
 
 
 
 Loading files 
 unzip -p data.csv 
 | python mapper_script.py 
 | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc 
 --set ON_ERROR_STOP=on etl_db --command="COPY s.target_table FROM STDIN"
 Loading from other databases 
 cat source-query.sql 
 | mysql --skip-column-names source_db 
 | PGOPTIONS=--client-min-messages=warning psql --no-psqlrc 
 --set ON_ERROR_STOP=on etl_db --command="COPY s.target_table FROM STDIN WITH NULL AS 'NULL'"
 
 
 
 
 
Don’t use ORMs / client libraries
 Use “/usr/bin/env bash -o pipefail” as shell 8 Main interface to Postgres: psql @martin_loetzsch Hassle-free & fast
  • 10. It’s easy to make mistakes during ETL 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s;
 
 CREATE TABLE s.city ( city_id SMALLINT, city_name TEXT, country_name TEXT );
 INSERT INTO s.city VALUES (1, 'Berlin', 'Germany'), (2, 'Budapest', 'Hungary');
 
 CREATE TABLE s.customer ( customer_id BIGINT, city_fk SMALLINT );
 INSERT INTO s.customer VALUES (1, 1), (1, 2), (2, 3);
 Customers per country? 
 SELECT country_name, count(*) AS number_of_customers FROM s.customer JOIN s.city 
 ON customer.city_fk = s.city.city_id GROUP BY country_name;
 
 Back up all assumptions about data by constraints 
ALTER TABLE s.city ADD PRIMARY KEY (city_id);
ALTER TABLE s.city ADD UNIQUE (city_name);
ALTER TABLE s.city ADD UNIQUE (city_name, country_name);

ALTER TABLE s.customer ADD PRIMARY KEY (customer_id);
[23505] ERROR: could not create unique index "customer_pkey" Detail: Key (customer_id)=(1) is duplicated.

ALTER TABLE s.customer ADD FOREIGN KEY (city_fk) REFERENCES s.city (city_id);
[23503] ERROR: insert or update on table "customer" violates foreign key constraint "customer_city_fk_fkey" Detail: Key (city_fk)=(3) is not present in table "city"

10 Referential consistency @martin_loetzsch Only very little overhead, will save your ass
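When a constraint can’t be added, queries like these (a sketch on the tables above) show the offending rows first:

-- duplicate primary key candidates
SELECT customer_id, count(*) FROM s.customer GROUP BY customer_id HAVING count(*) > 1;

-- foreign key values without a matching city
SELECT customer_id, city_fk
FROM s.customer
WHERE NOT EXISTS (SELECT 1 FROM s.city WHERE city_id = customer.city_fk);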
 • 11. [Diagram: DWH schema: customer (customer_id, first_order_fk, favourite_product_fk, lifetime_revenue), product (product_id, revenue_last_6_months), order (order_id, processed_order_id, customer_fk, product_fk, revenue)]

Never repeat “business logic” 
 SELECT sum(total_price) AS revenue FROM os_data.order WHERE status IN ('pending', 'accepted', 'completed',
 'proposal_for_change'); 
 
SELECT CASE WHEN (status <> 'started' AND payment_status = 'authorised' AND order_type <> 'backend') THEN order_id END AS processed_order_fk FROM os_data.order;
 
 SELECT (last_status = 'pending') :: INTEGER AS is_unprocessed FROM os_data.order;
 
 
 
 
Refactor pipeline
Create separate task that computes everything we know about an order (see the sketch below)
Usually difficult in real life
 
 
 
 
 
Load → preprocess → transform → flatten-fact 11 Computational consistency @martin_loetzsch Requires discipline
[Diagram: parallel task chains load-{product,order,customer} → preprocess-… → transform-… → flatten-…-fact]
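A minimal sketch of such a preprocess-order task (table and column names are assumptions for illustration, not the real schema); downstream steps then read these flags instead of re-deriving them:

CREATE TABLE os_tmp.order_flags AS
SELECT
  order_id,
  status IN ('pending', 'accepted', 'completed', 'proposal_for_change') AS is_revenue_relevant,
  (status <> 'started' AND payment_status = 'authorised'
     AND order_type <> 'backend')                                       AS is_processed,
  (last_status = 'pending') :: INTEGER                                  AS is_unprocessed
FROM os_data."order";  -- "order" quoted here because it is a reserved word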
 • 12. CREATE FUNCTION m_tmp.normalize_utm_source(TEXT)
RETURNS TEXT AS $$
SELECT CASE
  WHEN $1 LIKE '%.%' THEN lower($1)
  WHEN $1 = '(direct)' THEN 'Direct'
  WHEN $1 LIKE 'Untracked%' OR $1 LIKE '(%)' THEN $1
  ELSE initcap($1)
END;
$$ LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION util.norm_phone_number(phone_number TEXT)
RETURNS TEXT AS $$
BEGIN
  phone_number := TRIM(phone_number);
  phone_number := regexp_replace(phone_number, '\(0\)', '');
  phone_number := regexp_replace(phone_number, '[^[:digit:]]', '', 'g');
  phone_number := regexp_replace(phone_number, '^(\+49|0049|49)', '0');
  phone_number := regexp_replace(phone_number, '^(00)', '');
  phone_number := COALESCE(phone_number, '');
  RETURN phone_number;
END;
$$ LANGUAGE PLPGSQL IMMUTABLE;
CREATE FUNCTION m_tmp.compute_ad_id(id BIGINT, api m_tmp.API)
RETURNS BIGINT AS $$
-- creates a collision free ad id from an id in a source system
SELECT ((CASE api
           WHEN 'adwords' THEN 1
           WHEN 'bing' THEN 2
           WHEN 'criteo' THEN 3
           WHEN 'facebook' THEN 4
           WHEN 'backend' THEN 5
         END) * 10 ^ 18) :: BIGINT + id
$$ LANGUAGE SQL IMMUTABLE;
 
 
CREATE FUNCTION pv.date_to_supplier_period_start(INTEGER)
RETURNS INTEGER AS $$
-- this maps all dates to either an integer which is included
-- in lieferantenrabatt.period_start or
-- null (meaning we don't have a lieferantenrabatt for it)
SELECT CASE
  WHEN $1 >= 20170501 THEN 20170501
  WHEN $1 >= 20151231 THEN 20151231
  ELSE 20151231
END;
$$ LANGUAGE SQL IMMUTABLE;

12 When not possible: use functions @martin_loetzsch Almost no performance overhead
  • 13. Check for “lost” rows 
 SELECT util.assert_equal( 'The order items fact table should contain all order items', 'SELECT count(*) FROM os_dim.order_item', 'SELECT count(*) FROM os_dim.order_items_fact');
 
 
 
 Check consistency across cubes / domains 
 SELECT util.assert_almost_equal( 'The number of first orders should be the same in '
 || 'orders and marketing touchpoints cube', 'SELECT count(net_order_id) FROM os_dim.order WHERE _net_order_rank = 1;',
 'SELECT (SELECT sum(number_of_first_net_orders) FROM m_dim.acquisition_performance) / (SELECT count(*) FROM m_dim.performance_attribution_model)', 1.0 );
 Check completeness of source data 
 SELECT util.assert_not_found( 'Each adwords campaign must have the attribute "Channel"', 'SELECT DISTINCT campaign_name, account_name FROM aw_tmp.ad JOIN aw_dim.ad_performance ON ad_fk = ad_id WHERE attributes->>''Channel'' IS NULL AND impressions > 0 AND _date > now() - INTERVAL ''30 days''');
 
 Check correctness of redistribution transformations 
SELECT util.assert_almost_equal_relative(
  'The cost of non-converting touchpoints must match the '
  || 'redistributed customer acquisition and reactivation cost',
  'SELECT sum(cost) FROM m_tmp.cost_of_non_converting_touchpoints;',
  'SELECT (SELECT sum(cost_per_touchpoint * number_of_touchpoints)
           FROM m_tmp.redistributed_customer_acquisition_cost)
        + (SELECT sum(cost_per_touchpoint * number_of_touchpoints)
           FROM m_tmp.redistributed_customer_reactivation_cost);',
  0.00001);

13 Data consistency checks @martin_loetzsch Makes changing things easy
  • 14. Execute queries and compare results
CREATE FUNCTION util.assert(description TEXT, query TEXT)
RETURNS BOOLEAN AS $$
DECLARE
  succeeded BOOLEAN;
BEGIN
  EXECUTE query INTO succeeded;
  IF NOT succeeded THEN
    RAISE EXCEPTION 'assertion failed: # % # %', description, query;
  END IF;
  RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';
 
 
 
CREATE FUNCTION util.assert_almost_equal_relative(
  description TEXT, query1 TEXT, query2 TEXT, percentage DECIMAL)
RETURNS BOOLEAN AS $$
DECLARE
  result1 NUMERIC;
  result2 NUMERIC;
  succeeded BOOLEAN;
BEGIN
  EXECUTE query1 INTO result1;
  EXECUTE query2 INTO result2;
  EXECUTE 'SELECT abs(' || result2 || ' - ' || result1 || ') / ' || result1
          || ' < ' || percentage INTO succeeded;
  IF NOT succeeded THEN
    RAISE WARNING '% assertion failed: abs(% - %) / % < % %: (%) %: (%)',
      description, result2, result1, result1, percentage,
      result1, query1, result2, query2;
  END IF;
  RETURN succeeded;
END
$$ LANGUAGE 'plpgsql';

14 Consistency check functions @martin_loetzsch Also: assert_not_found, assert_equal_table, assert_smaller_than_or_equal
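assert_not_found is only mentioned above; a possible implementation in the same style (an assumption, not the original code):

CREATE FUNCTION util.assert_not_found(description TEXT, query TEXT)
RETURNS BOOLEAN AS $$
DECLARE
  number_of_rows BIGINT;
BEGIN
  -- wrap the query so that any result shape can be counted
  EXECUTE 'SELECT count(*) FROM (' || query || ') t' INTO number_of_rows;
  IF number_of_rows <> 0 THEN
    RAISE EXCEPTION 'assertion failed: # % # % row(s) returned by %',
      description, number_of_rows, query;
  END IF;
  RETURN true;
END
$$ LANGUAGE 'plpgsql';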
 • 15. Yes, unit tests

SELECT util.assert_value_equal('test_german_number_with_country_prefix', util.norm_phone_number('00491234'), '01234');
SELECT util.assert_value_equal('test_german_number_not_to_be_confused_with_country_prefix', util.norm_phone_number('0491234'), '0491234');
SELECT util.assert_value_equal('test_non_german_number_with_plus', util.norm_phone_number('+44 1234'), '441234');
SELECT util.assert_value_equal('test_german_number_with_prefix_and_additional_zero', util.norm_phone_number('+49 (0)1234'), '01234');
SELECT util.assert_value_equal('test__trim', util.norm_phone_number(' 0491234 '), '0491234');
SELECT util.assert_value_equal('test_number_with_leading_wildcard_symbol', util.norm_phone_number('*+436504834933'), '436504834933');
SELECT util.assert_value_equal('test_NULL', util.norm_phone_number(NULL), '');
SELECT util.assert_value_equal('test_empty', util.norm_phone_number(''), '');
SELECT util.assert_value_equal('test_wildcard_only', util.norm_phone_number('*'), '');
SELECT util.assert_value_equal('test_foreign_number_with_two_leading_zeroes', util.norm_phone_number('*00436769553701'), '436769553701');
SELECT util.assert_value_equal('test_domestic_number_with_trailing_letters', util.norm_phone_number('017678402HORST'), '017678402');
SELECT util.assert_value_equal('test_domestic_number_with_leading_letters', util.norm_phone_number('HORST017678402'), '017678402');
SELECT util.assert_value_equal('test_domestic_number_with_letters_in_between', util.norm_phone_number('0H1O7R6S7T8402'), '017678402');
SELECT util.assert_value_equal('test_german_number_with_country_prefix_and_leading_letters', util.norm_phone_number('HORST00491234'), '01234');
SELECT util.assert_value_equal('test_german_number_not_to_be_confused_with_country_prefix_and_leading_letters', util.norm_phone_number('HORST0491234'), '0491234');
SELECT util.assert_value_equal('test_non_german_number_with_plus_and_leading_letters', util.norm_phone_number('HORST+44 1234'), '441234');
SELECT util.assert_value_equal('test_german_number_with_prefix_and_additional_zero_and_leading_letters', util.norm_phone_number('HORST+49 (0)1234'), '01234');

15 Unit tests @martin_loetzsch People enter horrible telephone numbers into websites
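util.assert_value_equal itself is not shown in the deck; one plausible shape for it (an assumption):

CREATE FUNCTION util.assert_value_equal(description TEXT, actual TEXT, expected TEXT)
RETURNS BOOLEAN AS $$
BEGIN
  IF actual IS DISTINCT FROM expected THEN
    RAISE EXCEPTION 'assertion failed: # % # got "%", expected "%"',
      description, actual, expected;
  END IF;
  RETURN true;
END
$$ LANGUAGE 'plpgsql';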
  • 16. Divide & conquer strategies @martin_loetzsch 16
  • 17. Create two tables, fill with 10M values 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s; 
 
 CREATE TABLE s.table_1 ( user_id BIGINT, some_number INTEGER); 
 INSERT INTO s.table_1 SELECT n, n FROM generate_series(1, 10000000) n; 
 
 CREATE TABLE s.table_2 AS SELECT * FROM s.table_1;
 Join both tables 
 EXPLAIN ANALYZE SELECT * FROM s.table_1 JOIN s.table_2 USING (user_id);
 
 Let’s assume 18 seconds is slow 
 Merge Join (rows=10000000)
 Merge Cond: (table_1.user_id = table_2.user_id)
 -> Sort (rows=10000000)
 Sort Key: table_1.user_id
 Sort Method: external merge Disk: 215032kB
 -> Seq Scan on table_1 (rows=10000000)
 -> Materialize (rows=10000000)
 -> Sort (rows=10000000)
 Sort Key: table_2.user_id
 Sort Method: external merge Disk: 254144kB
 -> Seq Scan on table_2 (rows=10000000)
 Planning time: 0.700 ms
Execution time: 18278.520 ms

17 Joining large tables can be slow @martin_loetzsch Think of: join all user touchpoints with all user orders
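Both sorts spill to disk (“external merge”); before reaching for chunking, a session-level experiment (a sketch, not from the deck) is to give the sorts more memory and re-check the plan:

SET work_mem = '2GB';  -- applies per sort/hash node, in this session only
EXPLAIN ANALYZE SELECT * FROM s.table_1 JOIN s.table_2 USING (user_id);
RESET work_mem;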
  • 18. Create table_1 with 5 partitions 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s; 
 CREATE TABLE s.table_1 ( user_id BIGINT NOT NULL, user_chunk SMALLINT NOT NULL, some_number INTEGER NOT NULL); 
 
 CREATE TABLE s.table_1_0 (CHECK (user_chunk = 0)) INHERITS (s.table_1); 
 CREATE TABLE s.table_1_1 (CHECK (user_chunk = 1)) INHERITS (s.table_1); 
 CREATE TABLE s.table_1_2 (CHECK (user_chunk = 2)) INHERITS (s.table_1); 
 CREATE TABLE s.table_1_3 (CHECK (user_chunk = 3)) INHERITS (s.table_1); 
 CREATE TABLE s.table_1_4 (CHECK (user_chunk = 4)) INHERITS (s.table_1);
 Insert directly into partition (don’t use triggers etc.) 
 INSERT INTO s.table_1_0 SELECT n, n % 5, n from generate_series(0, 10000000, 5) n; 
 Automate insertion 
CREATE FUNCTION s.insert_data(table_name TEXT, chunk SMALLINT)
RETURNS VOID AS $$
BEGIN
  EXECUTE 'INSERT INTO ' || table_name || '_' || chunk
          || ' SELECT n, n % 5, n FROM generate_series('
          || chunk || ', 10000000, 5) n';
  EXECUTE 'ANALYZE ' || table_name || '_' || chunk;
END;
$$ LANGUAGE plpgsql;
 Run in parallel (depends on ETL framework) 
SELECT s.insert_data('s.table_1', 1 :: SMALLINT);
SELECT s.insert_data('s.table_1', 2 :: SMALLINT);
SELECT s.insert_data('s.table_1', 3 :: SMALLINT);
SELECT s.insert_data('s.table_1', 4 :: SMALLINT);

18 Splitting data in chunks / partitions @martin_loetzsch user chunk = user_id % 5
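A quick sanity check (a sketch) that all chunks were filled evenly:

SELECT user_chunk, count(*) AS rows_per_chunk
FROM s.table_1
GROUP BY user_chunk
ORDER BY user_chunk;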
  • 19. Build DDL statement and execute it 
CREATE FUNCTION s.create_table_partitions(schemaname TEXT, tablename TEXT,
                                          key_column TEXT, keys TEXT[])
RETURNS VOID AS $$
DECLARE
  key TEXT;
BEGIN
  FOREACH key IN ARRAY keys LOOP
    IF NOT EXISTS(SELECT 1
                  FROM information_schema.tables t
                  WHERE t.table_schema = schemaname
                    AND t.table_name = tablename || '_' || key)
    THEN
      EXECUTE 'CREATE TABLE ' || schemaname || '.' || tablename || '_' || key
              || ' ( CHECK (' || key_column || ' = ' || key || ') )'
              || ' INHERITS (' || schemaname || '.' || tablename || ');';
    END IF;
  END LOOP;
END
$$ LANGUAGE plpgsql;
 Create table_2 
 CREATE TABLE s.table_2 ( user_id BIGINT NOT NULL, user_chunk SMALLINT NOT NULL, some_number INTEGER NOT NULL ); 
 SELECT s.create_table_partitions( 's', 'table_2', 'user_chunk', (SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));
 
 
 
 
 Run in parallel (depends on ETL framework): 
SELECT s.insert_data('s.table_2', 0 :: SMALLINT);
SELECT s.insert_data('s.table_2', 1 :: SMALLINT);
SELECT s.insert_data('s.table_2', 2 :: SMALLINT);
SELECT s.insert_data('s.table_2', 3 :: SMALLINT);
SELECT s.insert_data('s.table_2', 4 :: SMALLINT);

19 Automating partition management @martin_loetzsch Also great: pg_partman, Postgres 10 declarative partitioning
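To verify what the helper created, list the inheritance children from the catalog (a sketch):

SELECT child.relname AS partition_name
FROM pg_inherits
JOIN pg_class child  ON pg_inherits.inhrelid = child.oid
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
WHERE parent.relname = 'table_2';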
  • 20. Join table_1 and table_2 for chunk 0 
EXPLAIN ANALYZE
SELECT user_id,
       0 AS user_chunk,
       table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0
  AND table_2.user_chunk = 0;
 Similar to original join (18 seconds), but almost 5x faster 
Merge Join
  Merge Cond: (table_1.user_id = table_2.user_id)
  -> Sort (rows=2000001)
       Sort Key: table_1.user_id
       Sort Method: external merge Disk: 43008kB
       -> Append (rows=2000001)
            -> Seq Scan on table_1 (rows=0)
                 Filter: (user_chunk = 0)
            -> Seq Scan on table_1_0 (rows=2000001)
                 Filter: (user_chunk = 0)
  -> Sort (rows=2000001)
       Sort Key: table_2.user_id
       Sort Method: external sort Disk: 58664kB
       -> Append (rows=2000001)
            -> Seq Scan on table_2 (rows=0)
                 Filter: (user_chunk = 0)
            -> Seq Scan on table_2_0 (rows=2000001)
                 Filter: (user_chunk = 0)
Planning time: 5.171 ms
Execution time: 3901.282 ms

20 Querying chunked data @martin_loetzsch
  • 21. No chunk restriction on table_2 
EXPLAIN ANALYZE
SELECT user_id,
       0 AS user_chunk,
       table_1.some_number + table_2.some_number AS sum
FROM s.table_1
JOIN s.table_2 USING (user_id)
WHERE table_1.user_chunk = 0;
 Append operator quite costly 
Merge Join (rows=2000001)
  Merge Cond: (table_1.user_id = table_2.user_id)
  -> Sort (rows=2000001)
       Sort Key: table_1.user_id
       Sort Method: external merge Disk: 43008kB
       -> Append (rows=2000001)
            -> Seq Scan on table_1 (rows=0)
                 Filter: (user_chunk = 0)
            -> Seq Scan on table_1_0 (rows=2000001)
                 Filter: (user_chunk = 0)
  -> Materialize (rows=10000001)
       -> Sort (rows=10000001)
            Sort Key: table_2.user_id
            Sort Method: external merge Disk: 254144kB
            -> Append (rows=10000001)
                 -> Seq Scan on table_2 (rows=0)
                 -> Seq Scan on table_2_0 (rows=2000001)
                 -> Seq Scan on table_2_1 (rows=2000000)
                 -> Seq Scan on table_2_2 (rows=2000000)
                 -> Seq Scan on table_2_3 (rows=2000000)
                 -> Seq Scan on table_2_4 (rows=2000000)
Planning time: 0.371 ms
Execution time: 11654.858 ms

21 Common mistake @martin_loetzsch Check for sequence scans on complete partitioned tables
  • 22. Create partitioned table for target of computation 
 CREATE TABLE s.table_3 ( user_id BIGINT NOT NULL, user_chunk SMALLINT NOT NULL, sum INTEGER NOT NULL);
 SELECT s.create_table_partitions( 's', 'table_3', 'user_chunk', (SELECT array(SELECT n :: TEXT FROM generate_series(0, 4) n)));
 A function for the computation 
CREATE FUNCTION s.join_table_1_and_table_2(chunk SMALLINT)
RETURNS SETOF s.table_3 AS $$
BEGIN
  RETURN QUERY
  SELECT user_id,
         chunk,
         table_1.some_number + table_2.some_number AS sum
  FROM s.table_1
  JOIN s.table_2 USING (user_id)
  WHERE table_1.user_chunk = chunk
    AND table_2.user_chunk = chunk;
END
$$ LANGUAGE plpgsql;

Insert from a separate function 
CREATE OR REPLACE FUNCTION s.insert_table_3(chunk INTEGER)
RETURNS VOID AS $$
BEGIN
  EXECUTE 'INSERT INTO s.table_3_' || chunk
          || ' (SELECT * FROM s.join_table_1_and_table_2(' || chunk || '::SMALLINT))';
END
$$ LANGUAGE plpgsql;
 
 
 
 
 
 Run in parallel (depends on ETL framework) 
SELECT s.insert_table_3(0);
SELECT s.insert_table_3(1);
SELECT s.insert_table_3(2);
SELECT s.insert_table_3(3);
SELECT s.insert_table_3(4);

22 Using functions for chunked processing @martin_loetzsch Our best practices, depends on ETL framework
 • 23. [Diagram: marketing ETL DAG from read / map / preprocess tasks (GA sessions, adwords, criteo, facebook, manual cost) through transform steps to flattened touchpoint and marketing cost fact tables]

Choose chunks depending on things that are processed together
Use secondary indexes on tables that are processed in different chunks
Re-chunk on chunk “borders”
 
 Real world example 
CREATE FUNCTION m_tmp.insert_dim_touchpoint(visitor_chunk SMALLINT)
RETURNS VOID AS $$
DECLARE
  day_chunk SMALLINT;
BEGIN
  FOR day_chunk IN (SELECT util.get_all_chunks()) LOOP
    EXECUTE 'INSERT INTO m_dim_next.touchpoint_' || visitor_chunk
            || ' (SELECT * FROM m_tmp.transform_touchpoint('
            || visitor_chunk || '::SMALLINT, ' || day_chunk || '::SMALLINT))';
  END LOOP;
END
$$ LANGUAGE plpgsql;

23 Chunking all the way down @martin_loetzsch Takes discipline
[Diagram: chunk dimensions: day / day_chunk, visitor_chunk, attribution_model]
  • 24. Make changing chunk size easy 
CREATE FUNCTION s.get_number_of_chunks()
RETURNS SMALLINT AS $$
SELECT 5 :: SMALLINT;
$$ LANGUAGE SQL IMMUTABLE;

CREATE FUNCTION s.get_all_chunks()
RETURNS SETOF INTEGER AS $$
SELECT generate_series(0, s.get_number_of_chunks() - 1);
$$ LANGUAGE SQL IMMUTABLE;

CREATE FUNCTION s.compute_chunk(x BIGINT)
RETURNS SMALLINT AS $$
SELECT coalesce(abs(x) % s.get_number_of_chunks(), 0) :: SMALLINT;
$$ LANGUAGE SQL IMMUTABLE;

Inline chunk computation 
 DROP FUNCTION s.compute_chunk( BIGINT ); 
DO $$
BEGIN
  EXECUTE '
CREATE FUNCTION s.compute_chunk(x BIGINT)
RETURNS SMALLINT AS $_$
SELECT coalesce(abs(x) % ' || s.get_number_of_chunks() || ', 0)::SMALLINT;
$_$ LANGUAGE SQL IMMUTABLE;';
END $$;

24 Configurable chunking @martin_loetzsch Integrate with ETL framework
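Usage sketch: derive the chunk for a set of ids (the result depends on the configured number of chunks):

SELECT n AS user_id, s.compute_chunk(n :: BIGINT) AS user_chunk
FROM generate_series(1, 10) n;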
  • 25. Becoming friends with the query planner @martin_loetzsch 25
  • 26. Join a table with 10M values with a table with 10K values 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s; 
 
 CREATE TABLE s.table_1 AS SELECT a FROM generate_series(1, 10000) a; 
 CREATE TABLE s.table_2 AS SELECT a FROM generate_series(1, 10000000) a; 
 CREATE INDEX table_2__a ON s.table_2 (a); 
 
 EXPLAIN ANALYZE SELECT * FROM s.table_2 JOIN s.table_1 USING (a);
 Index on table_2 is not used 
Merge Join (rows=573750000) (actual rows=10000)
  Merge Cond: (table_1.a = table_2.a)
  -> Sort (rows=11475) (actual rows=10000)
       Sort Key: table_1.a
       Sort Method: quicksort Memory: 853kB
       -> Seq Scan on table_1 (rows=11475) (actual rows=10000)
  -> Materialize (rows=10000000) (actual rows=10001)
       -> Sort (rows=10000000) (actual rows=10001)
            Sort Key: table_2.a
            Sort Method: external merge Disk: 136816kB
            -> Seq Scan on table_2 (rows=10000000) (actual rows=10000000)
Planning time: 2.173 ms
Execution time: 3071.127 ms
 
The query planner didn’t know that table_2 has many more distinct values than table_1
Statistics about cardinality and distribution of data are collected by the asynchronous autovacuum daemon

26 Nondeterministic behavior? @martin_loetzsch Queries sometimes don’t terminate when run in ETL, but look fine when run separately
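Whether statistics exist yet for a table can be checked in the statistics views (a sketch):

SELECT relname, last_analyze, last_autoanalyze
FROM pg_stat_user_tables
WHERE schemaname = 's';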
  • 27. Manually collect data statistics before using new tables 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s; 
 
 CREATE TABLE s.table_1 AS SELECT a FROM generate_series(1, 10000) a; 
 CREATE TABLE s.table_2 AS SELECT a FROM generate_series(1, 10000000) a; 
 CREATE INDEX table_2__a ON s.table_2 (a); 
 
 ANALYZE s.table_1; ANALYZE s.table_2; 
 
 EXPLAIN ANALYZE SELECT * FROM s.table_2 JOIN s.table_1 USING (a);
 > 300 x faster 
Merge Join (rows=10000) (actual rows=10000)
  Merge Cond: (table_2.a = table_1.a)
  -> Index Only Scan using table_2__a on table_2 (rows=9999985) (actual rows=10001)
       Heap Fetches: 10001
  -> Sort (rows=10000) (actual rows=10000)
       Sort Key: table_1.a
       Sort Method: quicksort Memory: 853kB
       -> Seq Scan on table_1 (rows=10000) (actual rows=10000)
Planning time: 1.759 ms
Execution time: 9.287 ms

27 Analyze all the things @martin_loetzsch Always analyze newly created tables
 • 28. Add to postgresql.conf

session_preload_libraries = 'auto_explain'
auto_explain.log_min_duration = 10
auto_explain.log_nested_statements = true
auto_explain.log_verbose = true
auto_explain.log_analyze = true
debug_pretty_print = on
log_lock_waits = on

Carefully manage log file size

28 Auto-explain everything @martin_loetzsch tail -f /path/to/postgres.log
  • 29. Columnar storage in Postgres @martin_loetzsch 29
  • 31. Initialize extension 
 CREATE EXTENSION IF NOT EXISTS cstore_fdw; 
DO $$
BEGIN
  IF NOT (SELECT exists(SELECT 1
                        FROM pg_foreign_server
                        WHERE srvname = 'cstore_server'))
  THEN
    CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
  END IF;
END $$;
 
 Create fact table 
 DROP SCHEMA IF EXISTS s CASCADE; CREATE SCHEMA s; 
CREATE FOREIGN TABLE s.fact (
  fact_id BIGSERIAL,
  a TEXT,
  b TEXT
) SERVER cstore_server OPTIONS (compression 'pglz');

Insert 10M rows 
INSERT INTO s.fact
SELECT n,
       ('a ' || 1 + (4 * random()) :: INTEGER) AS a,
       ('b ' || 1 + (4 * random()) :: INTEGER) AS b
FROM generate_series(0, 10000000) n
ORDER BY a, b;
 
 
 
 8% of original size 
SELECT pg_size_pretty(pg_table_size('s.fact')); 
 37 MB 31 Save disk space with cstore tables @martin_loetzsch Optimized Row Columnar (ORC) format
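For comparison, a sketch (not from the deck) that materializes the same rows as a regular heap table; the exact size factor depends on the data:

CREATE TABLE s.fact_heap AS SELECT * FROM s.fact;
SELECT pg_size_pretty(pg_table_size('s.fact_heap'));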
  • 32. Typical OLAP query 
 EXPLAIN ANALYZE SELECT a, count(*) AS count FROM s.fact WHERE b = 'b 1' GROUP BY a;
 
5 x faster than without cstore 
HashAggregate (rows=5)
  Group Key: a
  -> Foreign Scan on fact (rows=1248701)
       Filter: (b = 'b 1'::text)
       Rows Removed by Filter: 41299
       CStore File: /postgresql/9.6/cstore_fdw/836136/849220
       CStore File Size: 39166772
Planning time: 10.057 ms
Execution time: 257.107 ms
 Highly selective query 
EXPLAIN ANALYZE SELECT count(*) AS count FROM s.fact WHERE a = 'a 1' AND b = 'b 1';
 
 
 Really fast 
Aggregate (rows=1)
  -> Foreign Scan on fact (rows=156676)
       Filter: ((a = 'a 1'::text) AND (b = 'b 1'::text))
       Rows Removed by Filter: 13324
       CStore File: /postgresql/9.6/cstore_fdw/836136/849220
       CStore File Size: 39166772
Planning time: 5.492 ms
Execution time: 31.689 ms

32 Main operation: foreign scan @martin_loetzsch Aggregation pushdown coming soon
  • 34. Consider Postgres for your next ETL project @martin_loetzsch 34
  • 35. Focus on the complexity of data rather than the complexity of technology @martin_loetzsch 35
  • 36. 36 We are open sourcing our BI infrastructure @martin_loetzsch ETL part released end of November 2017
  • 37. 37 Work with us @martin_loetzsch In Berlin, Stockholm, Amsterdam, Rotterdam, Paris, Los Angeles