Skytools: PgQ
  Queues and applications
Agenda
 PgQ basics
    Queues, producers and consumers
    New features in 3.0
 Skytools – toolset and scripting framework
    Custom consumers
    Replication toolset
 Managing the Skytools environment
    Installing
    Migrations and upgrades
    Monitoring
What is PgQ?
 A queue implementation on PostgreSQL with a stored procedure
  interface.
 Events are queued by producers to be subsequently processed by
  consumers.




    producer --event--> [queue] --events--> consumer
             (the queue resides in the database)
PgQ: what is it good for?
   Asynchronous messaging
   Batch processing
   Replication
   Distributed transactions
PgQ: Features
   Transactional. Events are created transactionally, can be coupled
    with surrounding business logic.
   Efficient. Events are processed in batches which gives low per event
    overhead.
   Flexible. No limits on the number of producers or consumers. Custom
    event formats.
   Reliable. Events are stored in a PostgreSQL database – this adds the
    benefit of write-ahead logging and crash recovery.
   Easy to use. Simple SQL interface, APIs for several languages.
   Open Source. No licensing fees, but occasionally you'll have to get
    your hands dirty.
PgQ: example
   Database for registering user accounts.
   Events are generated by a pl/pgsql stored procedure.
   Consumers also talk to the queue through stored procedure interface.


    web client application (producer)
        --> UserDB function create_user(), which enqueues events into:
              queue notifications --> java consumer: welcome email
                                  --> C++ consumer: user counter
              queue user_events   --> python consumer: londiste replica
PgQ: at Skype
   Used everywhere where asynchronous data processing is needed.
   Hundreds of queues and consumers.
   Centrally monitored.
PgQ: brief history
   Started in 2006
   Inspired by ideas from Slony
   First application was Londiste replication
   Open source since 2007 as part of Skytools framework
   Version 3.0 in progress, alpha version out.
PgQ: glossary
   Event - an atomic piece of data created by producers. In PgQ an event is one
    record in one of the tables that serve the queue. PgQ guarantees that each event
    is seen at least once, but it is up to the consumer to make sure that an event is
    processed no more than once, if that is needed.
   Batch - PgQ is designed for efficiency and high throughput, so events are grouped
    into batches for bulk processing.
   Queue - events are stored in queue tables, i.e. queues. Several producers can write
    into the same queue and several consumers can read from it. Events are kept
    in the queue until all the consumers have seen them.
   Producer - an application that pushes events into a queue. Producers can be written
    in any language that is able to run stored procedures in PostgreSQL.
   Consumer - an application that reads events from a queue. Consumers can be written
    in any language that can interact with PostgreSQL.
PgQ: Queue
   Essentially a set of tables in a PostgreSQL database.
   Default is to have 3 tables per queue; these are rotated to efficiently
    purge discarded events.
   An event is discarded when all the consumers have processed it.
   Queues are accessible through a stored procedure API. Tools are also
    available.
   There can be multiple queues in one database.
   Any number of producers and consumers to the queue.

userdb=# select * from pgq.queue;
-[ RECORD 1 ]------------+------------
queue_id                 | 1
queue_name               | q1
queue_ntables            | 3
queue_cur_table          | 2
queue_data_pfx           | pgq.event_1
...

userdb=# \dt pgq.event_1*
         List of relations
 Schema |   Name    | Type  |  Owner
--------+-----------+-------+---------
 pgq    | event_1   | table | martinp
 pgq    | event_1_0 | table | martinp
 pgq    | event_1_1 | table | martinp
 pgq    | event_1_2 | table | martinp
(4 rows)
PgQ: Queue API
   Creating and dropping queues
      pgq.create_queue(qname)
      pgq.drop_queue(qname)

   Queue information functions
      pgq.get_queue_info(qname)
      pgq.get_consumer_info(qname)
      pgq.current_event_table(qname)

   Managing consumers
      pgq.register_consumer(qname, cname)
      pgq.unregister_consumer(qname, cname)
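
   These functions can be called from any client able to execute stored
    procedures. Below is a minimal sketch (not from the original deck) using
    psycopg2; the database, queue and consumer names are illustrative.

import psycopg2

conn = psycopg2.connect("dbname=userdb")
cur = conn.cursor()

# create_queue() returns 1 if the queue was created, 0 if it already existed
cur.execute("select pgq.create_queue(%s)", ['notifications'])

# subscribe a consumer; it will only see events produced after this point
cur.execute("select pgq.register_consumer(%s, %s)",
            ['notifications', 'welcome_consumer'])

# inspect consumer state
cur.execute("select * from pgq.get_consumer_info(%s)", ['notifications'])
for row in cur.fetchall():
    print(row)

conn.commit()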
PgQ: Event
 A record in the queue table.
 Internal fields used for event processing.
 Payload data, with user defined content.
 The content format is agreed between producer and consumer.
 Field names hint at their intended usage.

userdb=# \d pgq.event_1
  Column   |           Type
-----------+--------------------------
 ev_id     | bigint
 ev_time   | timestamp with time zone
 ev_txid   | bigint
 ev_owner  | integer
 ev_retry  | integer
 ev_type   | text
 ev_data   | text
 ev_extra1 | text
 ev_extra2 | text
 ev_extra3 | text
 ev_extra4 | text
Inherits: pgq.event_template
PgQ: Batch
 Events are grouped into batches for efficient processing.
 Consumers obtain events in batches.
 Batch size can be tuned to suit the application or network topology. For
  example, we might want to use a larger batch size for processing over
  wide area networks.
 Small batches incur higher per-event overhead; overly large batches have
  their own disadvantages.
 Batches are prepared by separate process called the ticker.
PgQ: Ticker
   Is a daemon that periodically creates ticks on the queues. The tick is
    essentially a position in the event stream.
   A batch is formed of events that are enqueued between two ticks.
   Without ticker there are no batches, without batches events cannot be
    processed.
   Pausing the ticker for an extended period will produce a huge batch that
    consumers might not be able to cope with.
   The ticker is also involved in miscellaneous housekeeping tasks, such as
    vacuuming pgq tables, scheduling retry events and rotating queue tables.
   Keep the ticker running!
PgQ: Consumer
 Subscribes to queue.
 Obtains events from queue by asking for a batch.
 Sees only events that have been produced after the subscription.
 Events are seen at least once – events don't get lost.
 Must have some sort of event tracking to process events only once –
  Skytools has several implementations.
 If an event cannot be immediately processed it can be postponed for
  retry processing (e.g. when some resource is temporarily unavailable).
PgQ: Event processing
   Ask for the next batch id:
     pgq.next_batch(queue, consumer)
   Nothing to do if NULL is returned – sleep and try again.
   Ask for the set of events to be returned:
     pgq.get_batch_events(batch_id)
   Process the events. Note that the batch can be empty if there were no
    events for the period.
   Schedule an event for retry processing if necessary:
     pgq.event_retry(batch_id, ev_id, sec)
   Finalize the batch:
     pgq.finish_batch(batch_id)
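
   A hedged sketch of this loop, using the raw SQL API from Python with
    psycopg2 (queue and consumer names are illustrative):

import time
import psycopg2

conn = psycopg2.connect("dbname=userdb")
cur = conn.cursor()

while True:
    cur.execute("select pgq.next_batch(%s, %s)",
                ['notifications', 'welcome_consumer'])
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        conn.commit()
        time.sleep(1)           # no batch prepared yet - sleep and retry
        continue

    cur.execute("select * from pgq.get_batch_events(%s)", [batch_id])
    for ev in cur.fetchall():   # the batch may be empty
        pass                    # ... process the event here ...

    cur.execute("select pgq.finish_batch(%s)", [batch_id])
    conn.commit()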
PgQ: Consumer status
 Can be obtained by calling pgq.get_consumer_info()
 Reports queue, consumer name, lag and last seen for all of the
  consumers. Parameterized versions also available.
 lag is the age of the last finished batch.
 last seen is the elapsed time since the consumer last processed a batch.


    userdb=# select * from pgq.get_consumer_info();
    -[ RECORD 1 ]-+----------------
    queue_name    | notifications
    consumer_name | welcome_emailer
    lag           | 02:32:34.440654
    last_seen     | 00:00:00.019398
    last_tick     | 4785
    current_batch | 4754
    next_tick     | 4786
PgQ: Event tracking
   PgQ guarantees that the consumer sees the event at least once – which
    also means the consumer may see the event more than once.
   This happens if the consumer crashes in the middle of processing a
    batch.
   Not a problem if the events are processed in the same database that
    they are produced – just keep the processing in the same transaction.
   We need to address the case where events are processed outside a
    database or in a remote database.
   Consumer needs to be able to keep track of which events are already
    processed.
PgQ: Event tracking
   Event and batch tracking support is included in the pgq_ext schema – part
    of Skytools, but it must be installed separately into the target database.

   Use batch tracking when the whole batch is processed transactionally
    pgq_ext.is_batch_done(consumer, batch_id)
    pgq_ext.set_batch_done(consumer, batch_id)

   Per-event tracking is used to keep track of single events:
    pgq_ext.is_event_done(consumer, batch_id, ev_id)
    pgq_ext.set_event_done(consumer, batch_id, ev_id)

   Event tracking can be used to implement distributed transactions.
PgQ: distributed transactions
   Event tracking can be used to implement asynchronous distributed
    transactions.
   We'll use batch tracking as an example.
   Skip the batch if it is already processed on target.
   Otherwise process the batch and mark as processed.
   Commit on target.
   Finish batch and commit on source.
   Skytools framework handles this automatically!
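
   A sketch of the above flow with psycopg2, assuming pgq_ext is installed
    on the target; the consumer and database names are illustrative:

import psycopg2

src = psycopg2.connect("dbname=userdb")     # queue lives here
dst = psycopg2.connect("dbname=archivedb")  # pgq_ext lives here
scur, dcur = src.cursor(), dst.cursor()

scur.execute("select pgq.next_batch(%s, %s)", ['notifications', 'archiver'])
batch_id = scur.fetchone()[0]
if batch_id is not None:
    dcur.execute("select pgq_ext.is_batch_done(%s, %s)",
                 ['archiver', batch_id])
    if not dcur.fetchone()[0]:              # skip if already processed
        scur.execute("select * from pgq.get_batch_events(%s)", [batch_id])
        for ev in scur.fetchall():
            pass                            # ... apply the event on target ...
        dcur.execute("select pgq_ext.set_batch_done(%s, %s)",
                     ['archiver', batch_id])
    dst.commit()                            # commit on target first
    scur.execute("select pgq.finish_batch(%s)", [batch_id])
    src.commit()                            # then finish batch on source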
PgQ: Producer
   Anything that places events into queues.
   Event payload format agreed between producer and consumer.
   Basic usage via SQL API.
      pgq.insert_event(queue, ev_type, ev_data)
   Replication uses triggers for providing events.
      pgq.sqltriga(queue, options)
      pgq.logutriga(queue, options)
   Bulk load is also possible.
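
   For illustration, a hedged sketch of transactional enqueueing from
    application code – the event becomes visible only if the business
    transaction commits (table and values follow the earlier example):

import psycopg2

conn = psycopg2.connect("dbname=userdb")
cur = conn.cursor()
cur.execute("insert into users (username, password, email)"
            " values (%s, %s, %s)", ['bob', 'secret', 'bob@foo.bar'])
cur.execute("select pgq.insert_event(%s, %s, %s)",
            ['notifications', 'welcome', 'bob'])
conn.commit()   # row and event are committed atomically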
PgQ: sqltriga and logutriga
 Log triggers are used for enqueuing table change log entries. Typically
  used for replication, but have other uses as well.
 Table structure is detected automatically - no messing around with
  column definition lists.
 Arguments:
    SKIP – enqueue only, skip the actual DML operation. Used in
      before-triggers.
    ignore=cols - The listed columns will be omitted from payload.
    pkey=cols - Defines a primary key for the table.
    backup – Add a copy of the original row to payload.
 Event format:
    pgq.sqltriga – partial SQL format used by Londiste replication.
    pgq.logutriga – URL encoded format.
PgQ: logutriga payload
   logutriga uses database specific URL encoding for payload data:
      column1=value1&column2=value2&column3&...
    column names and data values are URL encoded.
    NULL values are specified by omitting value and equal sign.
 Payload:
    ev_type – operation type: I/U/D plus primary key columns.
    ev_data – URL encoded row data
    ev_extra1 – Table name
    ev_extra2 – URL encoded row backup
PgQ: logutriga example
 We'll add a trigger to the users table that enqueues notification events
  for new users.
 Add an after-insert trigger that executes logutriga with some columns
  ignored.
 We could use before-insert triggers with the SKIP option to implement
  queue-only tables.

create table users (
    user_id        serial primary key,
    username       text     unique,
    password       text     not null,
    email          text,
    date_created   timestamp default now()
);

create trigger welcome_user_trg
    after insert on users
    for each row execute procedure
    pgq.logutriga(
        'notifications',
        'ignore=password');
PgQ: logutriga in action

Insert statement:

insert into users (
    username,
    password,
    email
) values (
    'bob',
    'secret',
    'bob@foo.bar'
);

Event data:

ev_id     | 26
ev_time   | 2009-05-14 11:07:54.231954+02
ev_txid   | 263834
ev_owner  |
ev_retry  |
ev_type   | I:user_id
ev_data   | user_id=1&username=bob&email=bob%40foo.bar
ev_extra1 | public.users
ev_extra2 |
ev_extra3 |
ev_extra4 |
Skytools 3: new features
 Cooperative consumer – distributing the load of a single consumer
  between cooperating sub consumers.
 Cascading support – identical queue is maintained across nodes,
  consumers can easily move between nodes.
 Per-database tickers replaced with a single pgqd daemon.
 qadmin utility for managing queues and consumers.
PgQ: Cooperative consumer
   Sometimes single consumer cannot keep up with the volume of
    incoming events.
   Parallel consumers would help, but we need to somehow divide the
    workload – avoid processing the same event twice.
   We need the consumers to work in cooperation!
   Skytools 3 introduces sub consumers for the purpose. These share the
    workload of the master consumer in a cooperative manner.
   There are some differences in registration and batch handling, but they
    look a lot like regular consumers:

    pgq_coop.register_subconsumer(qname,cname,scname)
    pgq_coop.unregister_subconsumer(qname,cname,scname,mode)
    pgq_coop.next_batch(qname,cname,scname)
    pgq_coop.finish_batch(batch_id)
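
   A hedged sketch of a sub consumer loop built on these functions; fetching
    batch contents with the regular pgq.get_batch_events() is an assumption
    here, as are the queue and consumer names:

import time
import psycopg2

conn = psycopg2.connect("dbname=userdb")
cur = conn.cursor()

cur.execute("select pgq_coop.register_subconsumer(%s, %s, %s)",
            ['notifications', 'mailer', 'mailer_1'])
conn.commit()

while True:
    cur.execute("select pgq_coop.next_batch(%s, %s, %s)",
                ['notifications', 'mailer', 'mailer_1'])
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        conn.commit()
        time.sleep(1)
        continue
    cur.execute("select * from pgq.get_batch_events(%s)", [batch_id])
    for ev in cur.fetchall():
        pass    # ... process events assigned to this sub consumer ...
    cur.execute("select pgq_coop.finish_batch(%s)", [batch_id])
    conn.commit()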
PgQ: Cascading
 The cascade is a set of database nodes and a queue that is distributed
  between the nodes. Event and batch numbers are kept identical!
 The cascade can be depicted as a tree, where events created in the root
  are propagated down the cascade to other nodes.
 There can be only one root node, but any number of branches or leaves.
 Leaf nodes are specific to replication. They don't have a copy of the
  queue and don't participate in event propagation.

    root (q1) --> branch 1 (q1)
    root (q1) --> branch 2 (q1) --> leaf
PgQ: Cascading
 A typical replication cascade consists of a primary database – the root –
  replicated to a standby database – a branch.
 We can easily switch the replication roles of root and branch nodes.
  Consumers will continue as if nothing happened.
 On branch node failure we move its consumers to some surviving
  node. Business as usual.
 On root node failure we promote some surviving branch to root and
  reattach the consumers. In this scenario we have to deal with data
  loss.
PgQ wrapup
   Producers produce events into queues.
   Ticker groups events into batches.
   Batches are served to consumers in FIFO order.
   Consumers can track processed events with pgq_ext.

(Diagram: inside the database, the producer inserts events into the queues
and the ticker inserts ticks that delimit batches; consumers receive
batches of events in FIFO order and record progress in pgq_ext event
tracking.)
Skytools
Toolset and scripting framework
Skytools: introduction
   Set of applications for replication, batch processing and queue
    management.
   Includes a Python scripting framework that greatly simplifies the
    implementation of custom consumers.
   Written mostly in Python with some bits in C.
   PgQ is distributed as part of Skytools.
   Get it from http://pgfoundry.org/projects/skytools/
   We'll start by configuring the ticker.
The Ticker
 The ticker prepares batches for the consumers.
 Also performs queue maintenance: table rotation, vacuuming, re-queuing
  retry events.
 We'll need to configure it for each database that has queues.
 Skytools 3 includes a super-ticker that handles all the databases in a
  PostgreSQL cluster.
pgqadm.py: configuration file
 db – the name of the database where the queues are located.
 maint_delay_min – interval at which maintenance commands are run.
 loop_delay – interval at which the ticker wakes up to check for work.

[pgqadm]
job_name = pgqadm_userdb
db = dbname=userdb

# how often to run maintenance [minutes]
maint_delay_min = 5

# how often to check for activity [secs]
loop_delay = 0.5

logfile = log/%(job_name)s.log
pidfile = pid/%(job_name)s.pid
pgqadm.py: installing and starting
   Install pgq schema
   Start the ticker in background
   Repeat the process for all databases with queues.
$ pgqadm.py pgqadm_userdb.ini   install
2009-05-13 12:37:17,913 19376   INFO plpgsql is installed
2009-05-13 12:37:17,936 19376   INFO txid_current_snapshot is installed
2009-05-13 12:37:17,936 19376   INFO Installing pgq
2009-05-13 12:37:17,984 19376   INFO   Reading from /usr/local/share/skytools/pgq.sql

$ pgqadm.py pgqadm_userdb.ini   ticker -d
...
$ tail log/pgqadm_userdb.log
2009-05-13 12:45:42,572 17184   INFO   {ticks: 1}
2009-05-13 12:45:42,586 17184   INFO   {maint_duration: 0.0229749679565}
2009-05-13 12:50:42,639 17184   INFO   {maint_duration: 0.0530850887299}
2009-05-13 12:50:42,719 17184   INFO   {ticks: 9}
pgqadm.py: command line
 pgqadm.py provides additional functionality besides the ticker:
    creating and configuring queues
    managing consumers
    querying status
 Convenient front end for the PgQ SQL API.

Usage: pgqadm.py [options] INI CMD [subcmd args]

commands:
  ticker                   start ticking & maintenance process
  status                   show overview of queue health
  install                  install code into db
  create QNAME             create queue
  drop QNAME               drop queue
  register QNAME CONS      register consumer on queue
  unregister QNAME CONS    unregister consumer from queue
  config QNAME [VAR=VAL]   show or change queue config
pgqadm.py: creating queues and consumers
   We'll create a queue called notifications and subscribe a consumer
    welcome_consumer to it.
$ pgqadm.py pgqadm_userdb.ini create notifications
2009-05-12 17:53:34,726 12610 INFO Creating queue: notifications

$ pgqadm.py pgqadm_userdb.ini register notifications welcome_consumer
2009-05-12 17:53:47,808 12632 INFO Registering consumer welcome_consumer on queue
notifications

$ pgqadm.py pgqadm_userdb.ini status
Postgres version: 8.3.7   PgQ version: 2.1.8

Event queue                                    Rotation        Ticker   TLag
------------------------------------------------------------------------------
notifications                                   3/7200s    500/3s/60s     1s
------------------------------------------------------------------------------
Consumer                                                       Lag LastSeen
------------------------------------------------------------------------------
notifications:
  welcome_consumer                                            116s      103s
------------------------------------------------------------------------------
pgqadm.py: setting queue parameters
   Per-queue tuning options:
      ticker_max_lag – Max time between ticks.
      ticker_idle_period – Tick at interval, if no events.
      ticker_max_count – Tick, if number of events exceeds this. Can be used to
       tune batch sizes.
      rotation_period – How often to rotate queue tables, balance between disk
       space and event history.

$ pgqadm.py pgqadm_userdb.ini config notifications
notifications
    ticker_max_lag      =      3
    ticker_idle_period =      60
    rotation_period     =   7200
    ticker_max_count    =    500

$ pgqadm.py pgqadm_userdb.ini config notifications ticker_max_count=1000
Change queue notifications config to: queue_ticker_max_count='1000'
pgqadm.py: summary
   Runs the ticker.
   Performs queue maintenance.
   Provides command line interface for managing the queues and
    consumers.
Python framework
 The framework handles database connection management, logging,
  statistics, quoting, daemonization etc.
 Most of the Skytools applications are implemented by extending
  DBScript - a Python class providing the infrastructure needed for
  typical database batch job.
 Several base classes available for implementing custom consumer
  applications.
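
   A minimal DBScript-based job might look like the sketch below; the
    work() method and get_database() helper follow Skytools conventions,
    but treat the exact details as assumptions rather than reference code:

import sys
import skytools

class CleanupJob(skytools.DBScript):
    def __init__(self, args):
        skytools.DBScript.__init__(self, 'cleanup_job', args)

    def work(self):
        # called on every iteration of the work loop; connections,
        # configuration and logging come from the framework
        db = self.get_database('src_db')
        cur = db.cursor()
        cur.execute("delete from sessions where expires < now()")
        self.log.info('removed %d expired sessions' % cur.rowcount)
        db.commit()

if __name__ == '__main__':
    CleanupJob(sys.argv[1:]).start()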
DBScript: configuration
 Most DBScripts have a configuration file that defines the parameters
  for the script – source database, queue name, log file location etc.
 Python ConfigParser format.
 Common options:
    job_name – Identifies the current script
    loop_delay – how often to check for work, seconds.
    pidfile – pid file for daemons
    logfile – log file name
    log_size – size of individual log files.
    log_count – number of log files kept.
    use_skylog – override logging configuration by skylog.ini
DBScript: command line
   Standard invocation is:
    script.py configuration.ini [options]

   Common options:
      -h, --help – Show usage for the particular script.
      -v, --verbose – Make the script more verbose.
      -q, --quiet – Log only errors and warnings.
      -d, --daemon – Run the script in background.
      -r, --reload – Reload a running application.
      -s, --stop – Wait for work loop to finish, then stop.
      -k, --kill – Terminate the application immediately.
Custom consumer
 We'll write a small queue application called welcome_consumer.
 The application reads events from a queue and prints out event
  payload if type matches “welcome”.
 The application extends the Consumer base class, which in turn extends
  DBScript.
 Consumer implements the pgq consumer event loop. We only need to
  add the bits that do the event handling.
 All of the regular DBScript configuration and command options apply.
Custom consumer: configuration
 pgq_queue_name – name of the queue the consumer is subscribing to.
 pgq_consumer_id – name of the consumer, defaults to job_name if not
  present.

[welcome_app]
job_name        = welcome_consumer

src_db          = dbname=userdb

pgq_queue_name  = notifications
pgq_consumer_id = %(job_name)s

logfile         = log/%(job_name)s.log
pidfile         = pid/%(job_name)s.pid
Custom consumer: Python code
import sys, pgq, skytools

class WelcomeConsumer(pgq.Consumer):
    def __init__(self, args):
        pgq.Consumer.__init__(self,
            "welcome_app", "src_db", args)

    def process_event(self, src_db, ev):
        if ev.ev_type == 'welcome':
            self.log.info('Welcome %s!' % ev.ev_data)
        ev.tag_done()

if __name__ == '__main__':
    script = WelcomeConsumer(sys.argv[1:])
    script.start()
Custom consumer: event processing
 The process_event() function is called for each event in a batch.
  If there are no batches the script sleeps loop_delay seconds and
  retries.
 A DB API connection to the queue database is passed in src_db,
  the transaction will be committed after successfully returning from
  process_event().
 On failure, the transaction will be rolled back and the active batch will
  be reprocessed on the next iteration.
 We need to call tag_done() for the processed events – otherwise
  they'll be scheduled for retry processing.
    ...
    def process_event(self, src_db, ev):
        if ev.ev_type == 'welcome':
            self.log.info('Welcome %s!' % ev.ev_data)
        ev.tag_done()
    ...
Custom consumer: running it
 The consumer must be subscribed to the queue before events can be
  processed. In Skytools 2 this happens automatically.
 Skytools 3 requires explicit subscription and provides a --register switch
  for the purpose.
 For each processed batch the script logs the number of events and
  processing duration.

    $ python welcome_consumer.py   welcome_consumer.ini
    2009-05-13 11:50:27,700 4318   INFO {count: 0, duration: 0.0322341918945}
    2009-05-13 11:50:27,705 4318   INFO {count: 0, duration: 0.00421690940857}
    2009-05-13 11:51:13,720 4318   INFO {count: 0, duration: 0.0121331214905}
Custom consumer: event processing
 So far our consumer hasn't seen any events.
 We'll use pgq.insert_event() stored procedure to feed some test
  events into the queue.
 In its simplest form it takes the queue name, event type and payload as
  arguments.

userdb# select pgq.insert_event('notifications', 'welcome', 'Custom Consumer');
userdb# select pgq.insert_event('notifications', 'irrelevant', 'Another Event');
...
2009-05-13 12:19:11,563 6884 INFO {count: 0, duration: 0.00770711898804}
2009-05-13 12:19:14,583 6884 INFO {count: 0, duration: 0.0210809707642}
2009-05-13 12:19:25,591 6884 INFO Welcome Custom Consumer!
2009-05-13 12:19:25,595 6884 INFO {count: 2, duration: 0.012876033783}
2009-05-13 12:19:28,608 6884 INFO {count: 0, duration: 0.0131230354309}
Custom consumer: event tracking
   Extend RemoteConsumer to add batch tracking support.
   pgq_ext must be installed on the target database.
   We'll use a simple counter application as an example.
   This actually implements distributed transactions.


class UserCounter(pgq.RemoteConsumer):
    def __init__(self, args):
        pgq.RemoteConsumer.__init__(self, "user_counter", "src_db", "dst_db", args)

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        for ev in event_list:
            ev.tag_done()
        cur = dst_db.cursor()
        cur.execute("update user_count set n = n + %s", [len(event_list)])
Custom consumer: wrapup
 We have just implemented some simple PgQ consumers.
 Extend Consumer class for simple consumers. Advanced consumer
  base classes also available.
 RemoteConsumer and SerialConsumer – provide batch tracking,
  these are used for processing events in remote databases.
 CascadedConsumer adds cascading support (Skytools 3).


(Diagram: in userdb, pgq.insert_event() places events into the
notifications queue, the ticker adds ticks, and the welcome consumer
receives batches of events.)
Replication toolset
 Replication tools built on top of PgQ:
    londiste – replication
    table dispatcher – archiving and partitioning
    queue mover – copy events from one queue to another
    queue splitter – split one queue into several queues
 Changelog triggers are used for capturing table data changes.
 Replication process is just another PgQ consumer.
Londiste
     Master/slave replication system implemented on top of PgQ.
     Uses sqltriga/logutriga to capture table changes on the master.
     A PgQ consumer replays the captured events on the slave.
     One master can feed several slaves.
     Slaves can be masters to other slaves.

(Diagram: on the master, sqltriga writes changelog events into the queue
and the ticker adds ticks; the londiste worker fetches batches of events,
applies the DML to the target table on the slave, and records its progress
in the londiste completed/event tracking tables.)
Londiste: setting up
   Prepare the configuration file – source and target databases, queue name.
   Run londiste install commands for provider and subscriber.
   Start the replication process – consume from master and replay on slave.
   The replay process can run anywhere, as long as it can connect to both
    databases.
   Add tables to replication.
   Initial copy is started; tables are usable on the slave after it finishes.

[londiste]
job_name = l_u_to_f

provider_db = dbname=userdb
subscriber_db = dbname=foodb

pgq_queue_name = user_events

logfile = log/%(job_name)s.log
pidfile = pid/%(job_name)s.pid
Londiste: demonstration
$ londiste.py londiste_userdb_to_foodb.ini provider install
2009-05-14 15:12:42,714 27716 INFO plpgsql is installed
2009-05-14 15:12:42,716 27716 INFO txid_current_snapshot is installed
2009-05-14 15:12:42,716 27716 INFO pgq is installed
2009-05-14 15:12:42,717 27716 INFO Installing londiste
2009-05-14 15:12:42,717 27716 INFO   Reading from /usr/local/share/skytools/londiste.sql
$ londiste.py londiste_userdb_to_foodb.ini subscriber install
2009-05-14 15:12:48,887 27728 INFO plpgsql is installed
2009-05-14 15:12:48,889 27728 INFO Installing londiste
2009-05-14 15:12:48,889 27728 INFO   Reading from /usr/local/share/skytools/londiste.sql
$ londiste.py londiste_userdb_to_foodb.ini replay -d
$ pg_dump -t users -s userdb | psql foodb
$ londiste.py londiste_userdb_to_foodb.ini provider add users
2009-05-14 15:15:19,730 27959 INFO Adding public.users
$ londiste.py londiste_userdb_to_foodb.ini subscriber add users
2009-05-14 15:16:29,845 28082 INFO Checking public.users
2009-05-14 15:16:29,888 28082 INFO Adding public.users
$ tail log/londiste_userdb_to_foodb.log
2009-05-14 15:44:47,293 28122 INFO {count: 0, ignored: 0, duration: 0.0210900306702}
2009-05-14 15:45:47,309 28122 INFO {count: 0, ignored: 0, duration: 0.0170979499817}
Table dispatcher
    Archiving and partitioning tool.
    Customizable table structure.
    Automatically creates partitions based on user specified conditions.
    Does not handle updates.



(Diagram: on the master, logutriga converts inserts into changelog events
in the queue and the ticker adds ticks; the table dispatcher consumes
batches of events, applies the DML to the target table partitions on the
slave, and tracks completed ticks in pgq_ext.)
Table dispatcher: setting up
 Add logutriga to the source table, or reuse an existing trigger.
 Create the base table structure on the target. Individual table
  partitions will be inherited from it.
 Prepare the configuration file, which specifies the source queue,
  target table and partitioning options.

userdb# \d users
...
Triggers:
    welcome_user_trg AFTER INSERT ON users
    FOR EACH ROW EXECUTE PROCEDURE
      pgq.logutriga('notifications',
                    'ignore=password')

archivedb# \d user_history
    Column    |   Type
--------------+-----------
 username     | text
 date_created | timestamp
Table dispatcher: configuration
 dest_table – base table for partitions.
 fields – select the columns to include, or use * for all.
 part_field – the column used for partitioning.
 part_method – either daily or monthly.
 part_template – SQL template for creating the partitions.

[table_dispatcher]
job_name         = user_archiver

src_db           = dbname=userdb
dst_db           = dbname=archivedb

pgq_queue_name   = notifications

logfile          = log/%(job_name)s.log
pidfile          = pid/%(job_name)s.pid

dest_table = user_history
fields = username, date_created
part_field = date_created
part_method = daily
part_template =
    create table _DEST_TABLE ()
      inherits (user_history);
    grant select on _DEST_TABLE
      to reporting;
Table dispatcher: demonstration
$ table_dispatcher.py td_userdb_to_archivedb.ini
2009-05-18 11:05:14,370 10625 INFO {count: 0, duration: 0.0341429710388}
2009-05-18 11:05:14,379 10625 INFO {count: 1, duration: 0.00861620903015}
2009-05-18 11:05:15,394 10625 INFO {count: 0, duration: 0.0151319503784}
...

$ psql archivedb
archivedb# \dt user_history*
                List of relations
 Schema |          Name           | Type  |  Owner
--------+-------------------------+-------+---------
 public | user_history            | table | martinp
 public | user_history_2009_05_17 | table | martinp
 public | user_history_2009_05_18 | table | martinp
(3 rows)
Queue mover
   Transports events from one queue to another.
   Useful for performing queue migrations.
   Consolidating queues from partitioned databases.




(Diagram: the queue mover reads batches of events from the source queue
on the master and inserts them into the target queue on the slave, using
pgq_ext tick tracking to avoid processing a batch twice.)
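
   A queue mover configuration follows the same pattern as the other
    consumers; a sketch (option names follow Skytools 2 conventions and
    should be treated as an assumption):

[queue_mover]
job_name       = qm_userdb_to_hubdb

src_db         = dbname=userdb
dst_db         = dbname=hubdb

pgq_queue_name = notifications
dst_queue_name = notifications

logfile = log/%(job_name)s.log
pidfile = pid/%(job_name)s.pid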
Queue splitter
 Transports events from one queue to several target queues.
 ev_extra1 field is used to determine the target queue.
  logutriga automatically puts table name there.
 Useful for transporting events for batch processing.

(Diagram: the queue splitter reads batches of events from the source
queue and distributes them into several target queues, routing each event
by its ev_extra1 value; pgq_ext tick tracking prevents duplicate
processing.)
Replication tools: wrapup
 Replication tools are ordinary PgQ consumers implemented with
  Skytools framework.
 On the master database, changelog events are enqueued through
  sqltriga/logutriga.
 On the slave, the DML statements are reconstructed and replayed.
 Event tracking is used to ensure that duplicate batches are not
  processed.
Skytools
Managing the Skytools environment
Skytools: getting and installing
 Prerequisites:
     Python
     psycopg2
 If you are lucky:
     apt-get install skytools
     http://yum.pgsqlrpms.org
 Building from source
     Get it from http://pgfoundry.org/projects/skytools/
     Needs PostgreSQL development headers and libraries
     untar, configure, make install
 For the adventurous, Skytools3:
     http://github.com/markokr/skytools-dev
Skytools2: installing from tarball
 Get the latest tarball from pgfoundry.
 Dependencies:
    C compiler and make
    PostgreSQL development headers and libraries
    Python development package
 Makefile can also generate Debian packages.

$ tar zxf skytools-2.1.9.tar.gz
$ cd skytools-2.1.9
$ ./configure --prefix=/usr/local
$ make
$ sudo make install
... or ...
$ make deb83
Skytools3: building from Git
    Main repository is on github, clone from there or create your own fork.
    Adds additional dependencies:
       asciidoc
       xmlto
       autoconf

$   git clone git://github.com/markokr/skytools-dev.git
$   cd skytools-dev
$   git submodule init
$   git submodule update
$   make boot
$   ./configure --prefix=/usr/local --with-asciidoc
$   make
Skytools: migrations and upgrades
 Upgrading a database with PgQ – pretty much straightforward, but has
  some additional steps.
 Migrating consumers from one database to another – to take some
  load off the primary server or to prepare for database migrations.
 Migrating whole databases.
Upgrading a PgQ database
1. pg_dump the database, shut down the database, stop tickers and
   consumers.
2. Run pg_resetxlog -n to determine the current epoch (extract from
   Latest checkpoint's NextXID).
3. Upgrade PostgreSQL binaries AND skytools modules.
4. Run pg_resetxlog -e to increase the epoch value. This is needed to
   enable pgq to correctly interpret stored txid values.

   Alternatively, if you are using the schema based txid (prior to 8.3), start
   the cluster and update the epoch in txid schema:

   UPDATE txid.epoch SET epoch = epoch + 1,
   last_value = (get_current_txid() & 4294967295);
5. Start the database, import dump file.
6. Start the ticker and consumers.
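
   For illustration, steps 2 and 4 might look like this (paths and values
    are examples):

$ pg_resetxlog -n $PGDATA | grep "NextXID"
Latest checkpoint's NextXID:          0/3746234
$ pg_resetxlog -e 1 $PGDATA    # bump the epoch from 0 to 1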
Skytools2: migrating consumers
1. Set up a queue mover to replicate the queue to new database, we will
   move the consumer subscriptions to the queue replica.
2. Stop the ticker on primary database - no more new batches will be
   prepared. After processing the pending batches, the consumers will
   stop at identical positions on the queue.
3. We can now subscribe the consumers to the replicated queue. Note
   that we need to reset the event tracking for the migrated consumers.
   Replication tools have --reset option for the purpose.
4. Start the ticker. Queue mover will continue queue replication,
   consumers on the new queue will continue where they left off.
5. If all is good, unregister the consumers from the old master.
Skytools3: migrating consumers
 Cascaded queues are kept identical across nodes - no need to set up
  explicit queue movers.
 Consumers that extend CascadedConsumer can be switched by
  simply running change-provider command of the set admin tool.
 No need to mess around with tickers and configuration files.
 The core features are complete, but some development is still needed.
Skytools2: migrating databases
1. Create the database structure on the new host. Omit pgq, pgq_ext and
   londiste schemas – better to reinstall those later.
2. Replicate the database to the new host using londiste.
3. Create the queues and changelog triggers on the new database.
4. Pay special attention to applications that use stored procedures to
   enqueue events - maybe a queue mover is needed?
5. Migrate the consumers to the new database.
6. Make the primary database read-only and wait for londiste replication
   to catch up.
7. Redirect the applications to the new database.
Skytools3: migrating databases
 Cascading can be used to greatly simplify the migration process.
 The target database should be a branch node of the cascade.
 Migration is then performed by stopping the applications, running a
  londiste switchover command and redirecting the applications to the
  new database.
 Switchover will switch the roles of the root and branch, consumers
  needn't be aware that something changed.
Skytools: monitoring consumers
 We need to ensure that all our consumers are running happily.
 The best indicator for this is the consumer lag – if a consumer is
  lagging, it is not processing events adequately.
 pgqadm.py status command or pgq.get_consumer_info() SQL
  function can be used to determine the lag.
 In the example welcome_consumer hasn't processed anything in 6
  days – probably not running at all.

    select queue_name, consumer_name, lag, last_seen from pgq.get_consumer_info();

  queue_name   |  consumer_name   |   lag    | last_seen
---------------+------------------+----------+-----------
 notifications | user_counter     | 00:00:43 | 00:00:00
 notifications | welcome_consumer | 6 days   | 6 days
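
   A hedged sketch of a lag check suitable for cron or Nagios; the threshold
    and connection string are illustrative:

import sys
import psycopg2

MAX_LAG_SECONDS = 300

conn = psycopg2.connect("dbname=userdb")
cur = conn.cursor()
cur.execute("""
    select queue_name, consumer_name,
           extract(epoch from lag) as lag_sec
      from pgq.get_consumer_info()
""")
laggards = [r for r in cur.fetchall() if r[2] > MAX_LAG_SECONDS]
for queue, consumer, lag_sec in laggards:
    print('%s on %s is lagging %d seconds' % (consumer, queue, lag_sec))
sys.exit(2 if laggards else 0)   # exit code 2 = CRITICAL for Nagios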
Skytools: logging
 Skytools applications use Python logging module which can be used to
  forward the log and statistics messages to a central location.
 Just set use_skylog = 1 in the configuration and configure the log
  handlers in skylog.ini
 Use syslog or write your own log handler. Examples are provided for
  sending the log over UDP or to a PostgreSQL database via stored
  procedure calls (see skylog.py).
 At Skype, we use the logging facilities to populate a configuration
  management database and feed error messages to Nagios.
Skytools: links
   PgFoundry project page
    http://pgfoundry.org/projects/skytools
   PgQ tutorial
    http://wiki.postgresql.org/wiki/PGQ_Tutorial
   Tool documentation
    http://skytools.projects.postgresql.org/doc/
   PHP consumer
    http://pgsql.tapoueh.org/pgq/pgq-php/
   Github repository for Skytools3
    http://github.com/markokr/skytools-dev/tree/master
Questions?

 
Story of migrating event pipeline from batch to streaming
Story of migrating event pipeline from batch to streamingStory of migrating event pipeline from batch to streaming
Story of migrating event pipeline from batch to streaming
 
PCP
PCPPCP
PCP
 
Unified Stream Processing at Scale with Apache Samza by Jake Maes at Big Data...
Unified Stream Processing at Scale with Apache Samza by Jake Maes at Big Data...Unified Stream Processing at Scale with Apache Samza by Jake Maes at Big Data...
Unified Stream Processing at Scale with Apache Samza by Jake Maes at Big Data...
 
GCF
GCFGCF
GCF
 
Architectual Comparison of Apache Apex and Spark Streaming
Architectual Comparison of Apache Apex and Spark StreamingArchitectual Comparison of Apache Apex and Spark Streaming
Architectual Comparison of Apache Apex and Spark Streaming
 
Kappa Architecture on Apache Kafka and Querona: datamass.io
Kappa Architecture on Apache Kafka and Querona: datamass.ioKappa Architecture on Apache Kafka and Querona: datamass.io
Kappa Architecture on Apache Kafka and Querona: datamass.io
 
Fabric - Realtime stream processing framework
Fabric - Realtime stream processing frameworkFabric - Realtime stream processing framework
Fabric - Realtime stream processing framework
 
Netflix Data Pipeline With Kafka
Netflix Data Pipeline With KafkaNetflix Data Pipeline With Kafka
Netflix Data Pipeline With Kafka
 
Netflix Data Pipeline With Kafka
Netflix Data Pipeline With KafkaNetflix Data Pipeline With Kafka
Netflix Data Pipeline With Kafka
 
Introduction to kubernetes
Introduction to kubernetesIntroduction to kubernetes
Introduction to kubernetes
 
Intro to Apache Apex - Next Gen Platform for Ingest and Transform
Intro to Apache Apex - Next Gen Platform for Ingest and TransformIntro to Apache Apex - Next Gen Platform for Ingest and Transform
Intro to Apache Apex - Next Gen Platform for Ingest and Transform
 
Scaling Django with gevent
Scaling Django with geventScaling Django with gevent
Scaling Django with gevent
 
Scaling GraphQL Subscriptions
Scaling GraphQL SubscriptionsScaling GraphQL Subscriptions
Scaling GraphQL Subscriptions
 
OSDC 2018 | From Monolith to Microservices by Paul Puschmann_
OSDC 2018 | From Monolith to Microservices by Paul Puschmann_OSDC 2018 | From Monolith to Microservices by Paul Puschmann_
OSDC 2018 | From Monolith to Microservices by Paul Puschmann_
 

More from elliando dias

Clojurescript slides
Clojurescript slidesClojurescript slides
Clojurescript slideselliando dias
 
Why you should be excited about ClojureScript
Why you should be excited about ClojureScriptWhy you should be excited about ClojureScript
Why you should be excited about ClojureScriptelliando dias
 
Functional Programming with Immutable Data Structures
Functional Programming with Immutable Data StructuresFunctional Programming with Immutable Data Structures
Functional Programming with Immutable Data Structureselliando dias
 
Nomenclatura e peças de container
Nomenclatura  e peças de containerNomenclatura  e peças de container
Nomenclatura e peças de containerelliando dias
 
Polyglot and Poly-paradigm Programming for Better Agility
Polyglot and Poly-paradigm Programming for Better AgilityPolyglot and Poly-paradigm Programming for Better Agility
Polyglot and Poly-paradigm Programming for Better Agilityelliando dias
 
Javascript Libraries
Javascript LibrariesJavascript Libraries
Javascript Librarieselliando dias
 
How to Make an Eight Bit Computer and Save the World!
How to Make an Eight Bit Computer and Save the World!How to Make an Eight Bit Computer and Save the World!
How to Make an Eight Bit Computer and Save the World!elliando dias
 
A Practical Guide to Connecting Hardware to the Web
A Practical Guide to Connecting Hardware to the WebA Practical Guide to Connecting Hardware to the Web
A Practical Guide to Connecting Hardware to the Webelliando dias
 
Introdução ao Arduino
Introdução ao ArduinoIntrodução ao Arduino
Introdução ao Arduinoelliando dias
 
Incanter Data Sorcery
Incanter Data SorceryIncanter Data Sorcery
Incanter Data Sorceryelliando dias
 
Fab.in.a.box - Fab Academy: Machine Design
Fab.in.a.box - Fab Academy: Machine DesignFab.in.a.box - Fab Academy: Machine Design
Fab.in.a.box - Fab Academy: Machine Designelliando dias
 
The Digital Revolution: Machines that makes
The Digital Revolution: Machines that makesThe Digital Revolution: Machines that makes
The Digital Revolution: Machines that makeselliando dias
 
Hadoop - Simple. Scalable.
Hadoop - Simple. Scalable.Hadoop - Simple. Scalable.
Hadoop - Simple. Scalable.elliando dias
 
Hadoop and Hive Development at Facebook
Hadoop and Hive Development at FacebookHadoop and Hive Development at Facebook
Hadoop and Hive Development at Facebookelliando dias
 
Multi-core Parallelization in Clojure - a Case Study
Multi-core Parallelization in Clojure - a Case StudyMulti-core Parallelization in Clojure - a Case Study
Multi-core Parallelization in Clojure - a Case Studyelliando dias
 

More from elliando dias (20)

Clojurescript slides
Clojurescript slidesClojurescript slides
Clojurescript slides
 
Why you should be excited about ClojureScript
Why you should be excited about ClojureScriptWhy you should be excited about ClojureScript
Why you should be excited about ClojureScript
 
Functional Programming with Immutable Data Structures
Functional Programming with Immutable Data StructuresFunctional Programming with Immutable Data Structures
Functional Programming with Immutable Data Structures
 
Nomenclatura e peças de container
Nomenclatura  e peças de containerNomenclatura  e peças de container
Nomenclatura e peças de container
 
Geometria Projetiva
Geometria ProjetivaGeometria Projetiva
Geometria Projetiva
 
Polyglot and Poly-paradigm Programming for Better Agility
Polyglot and Poly-paradigm Programming for Better AgilityPolyglot and Poly-paradigm Programming for Better Agility
Polyglot and Poly-paradigm Programming for Better Agility
 
Javascript Libraries
Javascript LibrariesJavascript Libraries
Javascript Libraries
 
How to Make an Eight Bit Computer and Save the World!
How to Make an Eight Bit Computer and Save the World!How to Make an Eight Bit Computer and Save the World!
How to Make an Eight Bit Computer and Save the World!
 
Ragel talk
Ragel talkRagel talk
Ragel talk
 
A Practical Guide to Connecting Hardware to the Web
A Practical Guide to Connecting Hardware to the WebA Practical Guide to Connecting Hardware to the Web
A Practical Guide to Connecting Hardware to the Web
 
Introdução ao Arduino
Introdução ao ArduinoIntrodução ao Arduino
Introdução ao Arduino
 
Minicurso arduino
Minicurso arduinoMinicurso arduino
Minicurso arduino
 
Incanter Data Sorcery
Incanter Data SorceryIncanter Data Sorcery
Incanter Data Sorcery
 
Rango
RangoRango
Rango
 
Fab.in.a.box - Fab Academy: Machine Design
Fab.in.a.box - Fab Academy: Machine DesignFab.in.a.box - Fab Academy: Machine Design
Fab.in.a.box - Fab Academy: Machine Design
 
The Digital Revolution: Machines that makes
The Digital Revolution: Machines that makesThe Digital Revolution: Machines that makes
The Digital Revolution: Machines that makes
 
Hadoop + Clojure
Hadoop + ClojureHadoop + Clojure
Hadoop + Clojure
 
Hadoop - Simple. Scalable.
Hadoop - Simple. Scalable.Hadoop - Simple. Scalable.
Hadoop - Simple. Scalable.
 
Hadoop and Hive Development at Facebook
Hadoop and Hive Development at FacebookHadoop and Hive Development at Facebook
Hadoop and Hive Development at Facebook
 
Multi-core Parallelization in Clojure - a Case Study
Multi-core Parallelization in Clojure - a Case StudyMulti-core Parallelization in Clojure - a Case Study
Multi-core Parallelization in Clojure - a Case Study
 

Recently uploaded

Abdul Kader Baba- Managing Cybersecurity Risks and Compliance Requirements i...
Abdul Kader Baba- Managing Cybersecurity Risks  and Compliance Requirements i...Abdul Kader Baba- Managing Cybersecurity Risks  and Compliance Requirements i...
Abdul Kader Baba- Managing Cybersecurity Risks and Compliance Requirements i...itnewsafrica
 
Emixa Mendix Meetup 11 April 2024 about Mendix Native development
Emixa Mendix Meetup 11 April 2024 about Mendix Native developmentEmixa Mendix Meetup 11 April 2024 about Mendix Native development
Emixa Mendix Meetup 11 April 2024 about Mendix Native developmentPim van der Noll
 
Generative Artificial Intelligence: How generative AI works.pdf
Generative Artificial Intelligence: How generative AI works.pdfGenerative Artificial Intelligence: How generative AI works.pdf
Generative Artificial Intelligence: How generative AI works.pdfIngrid Airi González
 
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sectoritnewsafrica
 
Accelerating Enterprise Software Engineering with Platformless
Accelerating Enterprise Software Engineering with PlatformlessAccelerating Enterprise Software Engineering with Platformless
Accelerating Enterprise Software Engineering with PlatformlessWSO2
 
[Webinar] SpiraTest - Setting New Standards in Quality Assurance
[Webinar] SpiraTest - Setting New Standards in Quality Assurance[Webinar] SpiraTest - Setting New Standards in Quality Assurance
[Webinar] SpiraTest - Setting New Standards in Quality AssuranceInflectra
 
Testing tools and AI - ideas what to try with some tool examples
Testing tools and AI - ideas what to try with some tool examplesTesting tools and AI - ideas what to try with some tool examples
Testing tools and AI - ideas what to try with some tool examplesKari Kakkonen
 
QCon London: Mastering long-running processes in modern architectures
QCon London: Mastering long-running processes in modern architecturesQCon London: Mastering long-running processes in modern architectures
QCon London: Mastering long-running processes in modern architecturesBernd Ruecker
 
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...panagenda
 
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...Alkin Tezuysal
 
Genislab builds better products and faster go-to-market with Lean project man...
Genislab builds better products and faster go-to-market with Lean project man...Genislab builds better products and faster go-to-market with Lean project man...
Genislab builds better products and faster go-to-market with Lean project man...Farhan Tariq
 
Connecting the Dots for Information Discovery.pdf
Connecting the Dots for Information Discovery.pdfConnecting the Dots for Information Discovery.pdf
Connecting the Dots for Information Discovery.pdfNeo4j
 
Infrared simulation and processing on Nvidia platforms
Infrared simulation and processing on Nvidia platformsInfrared simulation and processing on Nvidia platforms
Infrared simulation and processing on Nvidia platformsYoss Cohen
 
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...BookNet Canada
 
Long journey of Ruby standard library at RubyConf AU 2024
Long journey of Ruby standard library at RubyConf AU 2024Long journey of Ruby standard library at RubyConf AU 2024
Long journey of Ruby standard library at RubyConf AU 2024Hiroshi SHIBATA
 
Zeshan Sattar- Assessing the skill requirements and industry expectations for...
Zeshan Sattar- Assessing the skill requirements and industry expectations for...Zeshan Sattar- Assessing the skill requirements and industry expectations for...
Zeshan Sattar- Assessing the skill requirements and industry expectations for...itnewsafrica
 
Design pattern talk by Kaya Weers - 2024 (v2)
Design pattern talk by Kaya Weers - 2024 (v2)Design pattern talk by Kaya Weers - 2024 (v2)
Design pattern talk by Kaya Weers - 2024 (v2)Kaya Weers
 
Microservices, Docker deploy and Microservices source code in C#
Microservices, Docker deploy and Microservices source code in C#Microservices, Docker deploy and Microservices source code in C#
Microservices, Docker deploy and Microservices source code in C#Karmanjay Verma
 
A Glance At The Java Performance Toolbox
A Glance At The Java Performance ToolboxA Glance At The Java Performance Toolbox
A Glance At The Java Performance ToolboxAna-Maria Mihalceanu
 
Modern Roaming for Notes and Nomad – Cheaper Faster Better Stronger
Modern Roaming for Notes and Nomad – Cheaper Faster Better StrongerModern Roaming for Notes and Nomad – Cheaper Faster Better Stronger
Modern Roaming for Notes and Nomad – Cheaper Faster Better Strongerpanagenda
 

Recently uploaded (20)

Abdul Kader Baba- Managing Cybersecurity Risks and Compliance Requirements i...
Abdul Kader Baba- Managing Cybersecurity Risks  and Compliance Requirements i...Abdul Kader Baba- Managing Cybersecurity Risks  and Compliance Requirements i...
Abdul Kader Baba- Managing Cybersecurity Risks and Compliance Requirements i...
 
Emixa Mendix Meetup 11 April 2024 about Mendix Native development
Emixa Mendix Meetup 11 April 2024 about Mendix Native developmentEmixa Mendix Meetup 11 April 2024 about Mendix Native development
Emixa Mendix Meetup 11 April 2024 about Mendix Native development
 
Generative Artificial Intelligence: How generative AI works.pdf
Generative Artificial Intelligence: How generative AI works.pdfGenerative Artificial Intelligence: How generative AI works.pdf
Generative Artificial Intelligence: How generative AI works.pdf
 
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector
4. Cobus Valentine- Cybersecurity Threats and Solutions for the Public Sector
 
Accelerating Enterprise Software Engineering with Platformless
Accelerating Enterprise Software Engineering with PlatformlessAccelerating Enterprise Software Engineering with Platformless
Accelerating Enterprise Software Engineering with Platformless
 
[Webinar] SpiraTest - Setting New Standards in Quality Assurance
[Webinar] SpiraTest - Setting New Standards in Quality Assurance[Webinar] SpiraTest - Setting New Standards in Quality Assurance
[Webinar] SpiraTest - Setting New Standards in Quality Assurance
 
Testing tools and AI - ideas what to try with some tool examples
Testing tools and AI - ideas what to try with some tool examplesTesting tools and AI - ideas what to try with some tool examples
Testing tools and AI - ideas what to try with some tool examples
 
QCon London: Mastering long-running processes in modern architectures
QCon London: Mastering long-running processes in modern architecturesQCon London: Mastering long-running processes in modern architectures
QCon London: Mastering long-running processes in modern architectures
 
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...
Why device, WIFI, and ISP insights are crucial to supporting remote Microsoft...
 
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...
Unleashing Real-time Insights with ClickHouse_ Navigating the Landscape in 20...
 
Genislab builds better products and faster go-to-market with Lean project man...
Genislab builds better products and faster go-to-market with Lean project man...Genislab builds better products and faster go-to-market with Lean project man...
Genislab builds better products and faster go-to-market with Lean project man...
 
Connecting the Dots for Information Discovery.pdf
Connecting the Dots for Information Discovery.pdfConnecting the Dots for Information Discovery.pdf
Connecting the Dots for Information Discovery.pdf
 
Infrared simulation and processing on Nvidia platforms
Infrared simulation and processing on Nvidia platformsInfrared simulation and processing on Nvidia platforms
Infrared simulation and processing on Nvidia platforms
 
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...
Transcript: New from BookNet Canada for 2024: BNC SalesData and LibraryData -...
 
Long journey of Ruby standard library at RubyConf AU 2024
Long journey of Ruby standard library at RubyConf AU 2024Long journey of Ruby standard library at RubyConf AU 2024
Long journey of Ruby standard library at RubyConf AU 2024
 
Zeshan Sattar- Assessing the skill requirements and industry expectations for...
Zeshan Sattar- Assessing the skill requirements and industry expectations for...Zeshan Sattar- Assessing the skill requirements and industry expectations for...
Zeshan Sattar- Assessing the skill requirements and industry expectations for...
 
Design pattern talk by Kaya Weers - 2024 (v2)
Design pattern talk by Kaya Weers - 2024 (v2)Design pattern talk by Kaya Weers - 2024 (v2)
Design pattern talk by Kaya Weers - 2024 (v2)
 
Microservices, Docker deploy and Microservices source code in C#
Microservices, Docker deploy and Microservices source code in C#Microservices, Docker deploy and Microservices source code in C#
Microservices, Docker deploy and Microservices source code in C#
 
A Glance At The Java Performance Toolbox
A Glance At The Java Performance ToolboxA Glance At The Java Performance Toolbox
A Glance At The Java Performance Toolbox
 
Modern Roaming for Notes and Nomad – Cheaper Faster Better Stronger
Modern Roaming for Notes and Nomad – Cheaper Faster Better StrongerModern Roaming for Notes and Nomad – Cheaper Faster Better Stronger
Modern Roaming for Notes and Nomad – Cheaper Faster Better Stronger
 

Skytools: PgQ Queues and applications

  • 10. PgQ: Queue
    - Essentially a set of tables in a PostgreSQL database.
    - Default is to have 3 tables per queue; these are rotated to efficiently purge discarded events.
    - An event is discarded when all the consumers have processed it.
    - Queues are accessible through a stored procedure API. Tools are also available.
    - There can be multiple queues in one database.
    - Any number of producers and consumers on a queue.

      userdb=# select * from pgq.queue;
      -[ RECORD 1 ]---+------------
      queue_id        | 1
      queue_name      | q1
      queue_ntables   | 3
      queue_cur_table | 2
      queue_data_pfx  | pgq.event_1
      ...
      userdb=# \dt pgq.event_1*
               List of relations
       Schema |   Name    | Type  |  Owner
      --------+-----------+-------+---------
       pgq    | event_1   | table | martinp
       pgq    | event_1_0 | table | martinp
       pgq    | event_1_1 | table | martinp
       pgq    | event_1_2 | table | martinp
      (4 rows)
  • 11. PgQ: Queue API (a short sketch of driving it from Python follows)
    - Creating and dropping queues:
        pgq.create_queue(qname)
        pgq.drop_queue(qname)
    - Queue information functions:
        pgq.get_queue_info(qname)
        pgq.get_consumer_info(qname)
        pgq.current_event_table(qname)
    - Managing consumers:
        pgq.register_consumer(qname, cname)
        pgq.unregister_consumer(qname, cname)
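    The queue API is plain SQL, so it can be driven from any language with a PostgreSQL driver. A minimal psycopg2 sketch; the database, queue and consumer names are placeholder examples:

      import psycopg2

      # Connect to a database that has the pgq schema installed.
      conn = psycopg2.connect("dbname=userdb")
      cur = conn.cursor()

      # Create a queue and subscribe a consumer to it.
      cur.execute("select pgq.create_queue(%s)", ('notifications',))
      cur.execute("select pgq.register_consumer(%s, %s)",
                  ('notifications', 'welcome_consumer'))

      # Inspect the consumers of the queue.
      cur.execute("select * from pgq.get_consumer_info(%s)", ('notifications',))
      for row in cur.fetchall():
          print(row)

      conn.commit()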
  • 12. PgQ: Event
    - A record in the queue table.
    - Internal fields used for event processing.
    - Payload data, with user defined content.
    - The content format is agreed between producer and consumer.
    - Field names hint at their intended usage.

      userdb=# \d pgq.event_1
        Column   |           Type
      -----------+--------------------------
       ev_id     | bigint
       ev_time   | timestamp with time zone
       ev_txid   | bigint
       ev_owner  | integer
       ev_retry  | integer
       ev_type   | text
       ev_data   | text
       ev_extra1 | text
       ev_extra2 | text
       ev_extra3 | text
       ev_extra4 | text
      Inherits: pgq.event_template
  • 13. PgQ: Batch
    - Events are grouped into batches for efficient processing.
    - Consumers obtain events in batches.
    - Batch size can be tuned to suit the application or network topology. For example, a larger batch size may pay off when processing over wide area networks.
    - Small batches have higher per-event processing overhead, while overly large batches have their own disadvantages.
    - Batches are prepared by a separate process called the ticker.
  • 14. PgQ: Ticker
    - A daemon that periodically creates ticks on the queues. A tick is essentially a position in the event stream.
    - A batch is formed of the events enqueued between two ticks.
    - Without the ticker there are no batches, and without batches events cannot be processed.
    - Pausing the ticker for an extended period will produce a huge batch that consumers might not be able to cope with.
    - The ticker also performs miscellaneous housekeeping tasks, such as vacuuming pgq tables, scheduling retry events and rotating queue tables.
    - Keep the ticker running!
  • 15. PgQ: Consumer
    - Subscribes to a queue.
    - Obtains events from the queue by asking for a batch.
    - Sees only events that have been produced after the subscription.
    - Events are seen at least once – events don't get lost.
    - Must have some sort of event tracking to process events only once; Skytools has several implementations.
    - If an event cannot be processed immediately, it can be postponed for retry processing (e.g. when some resource is temporarily unavailable).
  • 16. PgQ: Event processing (the full loop is sketched in code below)
    - Ask for the next batch id: pgq.next_batch(queue, consumer)
    - Nothing to do if NULL is returned – sleep and try again.
    - Ask for the set of events to be returned: pgq.get_batch_events(batch_id)
    - Process the events. Note that the batch can be empty if there were no events for the period.
    - Schedule an event for retry processing if necessary: pgq.event_retry(batch_id, ev_id, sec)
    - Finalize the batch: pgq.finish_batch(batch_id)

      Event structure:
        Column   | Type
      -----------+-------------
       ev_id     | bigint
       ev_time   | timestamptz
       ev_txid   | bigint
       ev_retry  | integer
       ev_type   | text
       ev_data   | text
       ev_extra1 | text
       ev_extra2 | text
       ev_extra3 | text
       ev_extra4 | text
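    The same batch loop written out against the SQL functions above – a minimal psycopg2 sketch; queue and consumer names are placeholders:

      import time
      import psycopg2

      conn = psycopg2.connect("dbname=userdb")

      while True:
          cur = conn.cursor()
          # Ask for the next batch; NULL means nothing to do yet.
          cur.execute("select pgq.next_batch(%s, %s)",
                      ('notifications', 'welcome_consumer'))
          batch_id = cur.fetchone()[0]
          if batch_id is None:
              conn.commit()
              time.sleep(1)
              continue
          # Fetch and process the events of this batch.
          cur.execute("select * from pgq.get_batch_events(%s)", (batch_id,))
          for ev in cur.fetchall():
              pass          # event handling goes here
          # Finalize the batch and commit the consumer position.
          cur.execute("select pgq.finish_batch(%s)", (batch_id,))
          conn.commit()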
  • 17. PgQ: Consumer status
    - Can be obtained by calling pgq.get_consumer_info().
    - Reports queue, consumer name, lag and last seen for all of the consumers. Parameterized versions are also available.
    - lag is the age of the last finished batch.
    - last seen is the elapsed time since the consumer last processed a batch.

      userdb=# select * from pgq.get_consumer_info();
      -[ RECORD 1 ]-+----------------
      queue_name    | notifications
      consumer_name | welcome_emailer
      lag           | 02:32:34.440654
      last_seen     | 00:00:00.019398
      last_tick     | 4785
      current_batch | 4754
      next_tick     | 4786
  • 18. PgQ: Event tracking
    - PgQ guarantees that the consumer sees each event at least once. But this means the consumer could see an event more than once.
    - This happens if the consumer crashes in the middle of processing a batch.
    - It is not a problem if events are processed in the same database where they are produced – just keep the processing in the same transaction.
    - We need to address the case where events are processed outside a database or in a remote database.
    - The consumer needs to be able to keep track of which events are already processed.
  • 19. PgQ: Event tracking
    - Event and batch tracking support is included in the pgq_ext schema – part of Skytools, but it must be installed separately into the target database.
    - Use batch tracking when the whole batch is processed transactionally:
        pgq_ext.is_batch_done(consumer, batch_id)
        pgq_ext.set_batch_done(consumer, batch_id)
    - Per-event tracking is used to keep track of single events:
        pgq_ext.is_event_done(consumer, batch_id, ev_id)
        pgq_ext.set_event_done(consumer, batch_id, ev_id)
    - Event tracking can be used to implement distributed transactions.
  • 20. PgQ: distributed transactions
    - Event tracking can be used to implement asynchronous distributed transactions.
    - We'll use batch tracking as an example (a sketch follows this list):
      1. Skip the batch if it is already processed on the target.
      2. Otherwise process the batch and mark it as processed.
      3. Commit on the target.
      4. Finish the batch and commit on the source.
    - The Skytools framework handles this automatically!
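    A sketch of this batch-tracking pattern with two connections; it assumes pgq is installed on the source and pgq_ext on the target, and all names are examples:

      import psycopg2

      src = psycopg2.connect("dbname=userdb")
      dst = psycopg2.connect("dbname=archivedb")
      scur, dcur = src.cursor(), dst.cursor()

      scur.execute("select pgq.next_batch(%s, %s)", ('notifications', 'archiver'))
      batch_id = scur.fetchone()[0]
      if batch_id is not None:
          # Skip the batch if the target has already seen it.
          dcur.execute("select pgq_ext.is_batch_done(%s, %s)",
                       ('archiver', batch_id))
          if not dcur.fetchone()[0]:
              scur.execute("select * from pgq.get_batch_events(%s)", (batch_id,))
              for ev in scur.fetchall():
                  pass      # apply the event on the target here
              dcur.execute("select pgq_ext.set_batch_done(%s, %s)",
                           ('archiver', batch_id))
          dst.commit()      # commit on target first ...
          scur.execute("select pgq.finish_batch(%s)", (batch_id,))
          src.commit()      # ... then finish the batch and commit on source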
  • 21. PgQ: Producer
    - Anything that places events into queues.
    - Event payload format is agreed between producer and consumer.
    - Basic usage via the SQL API (see the sketch below):
        pgq.insert_event(queue, ev_type, ev_data)
    - Replication uses triggers for producing events:
        pgq.sqltriga(queue, options)
        pgq.logutriga(queue, options)
    - Bulk load is also possible.
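    Producing an event from application code is a single function call; a minimal sketch, with queue name and payload as examples:

      import psycopg2

      conn = psycopg2.connect("dbname=userdb")
      cur = conn.cursor()
      cur.execute("select pgq.insert_event(%s, %s, %s)",
                  ('notifications', 'welcome', 'bob@foo.bar'))
      # The event becomes visible to consumers together with the
      # surrounding transaction - this is what makes PgQ transactional.
      conn.commit()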
  • 22. PgQ: sqltriga and logutriga
    - Log triggers are used for enqueuing table change log entries. Typically used for replication, but they have other uses as well.
    - Table structure is detected automatically – no messing around with column definition lists.
    - Arguments:
      - SKIP – enqueue only, skip the actual DML operation. Used in before-triggers.
      - ignore=cols – the listed columns are omitted from the payload.
      - pkey=cols – defines a primary key for the table.
      - backup – adds a copy of the original row to the payload.
    - Event format:
      - pgq.sqltriga – partial SQL format used by Londiste replication.
      - pgq.logutriga – URL encoded format.
  • 23. PgQ: logutriga payload (a decoding sketch follows)
    - logutriga uses database specific URL encoding for payload data:
        column1=value1&column2=value2&column3&...
    - Column names and data values are URL encoded.
    - NULL values are specified by omitting the value and equal sign.
    - Payload fields:
      - ev_type – operation type: I/U/D plus primary key columns.
      - ev_data – URL encoded row data.
      - ev_extra1 – table name.
      - ev_extra2 – URL encoded row backup.
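    On the consumer side the payload can be unpacked with standard URL decoding. A sketch using the standard library; the encoding is database specific, so treat this as an approximation:

      from urllib.parse import unquote

      def decode_logutriga(ev_data):
          """Turn 'col1=val1&col2&...' into a dict; bare names are NULLs."""
          row = {}
          for field in ev_data.split('&'):
              if '=' in field:
                  name, value = field.split('=', 1)
                  row[unquote(name)] = unquote(value)
              else:
                  row[unquote(field)] = None   # NULL column
          return row

      print(decode_logutriga('user_id=1&username=bob&email=bob%40foo.bar'))
      # {'user_id': '1', 'username': 'bob', 'email': 'bob@foo.bar'}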
  • 24. PgQ: logutriga example
    - We'll add a trigger to the users table that enqueues notification events for new users.
    - Add an after-insert trigger that executes logutriga with some columns ignored.
    - We could use before-insert triggers with the SKIP option to implement queue-only tables.

      create table users (
          user_id serial primary key,
          username text unique,
          password text not null,
          email text,
          date_created timestamp default now()
      );

      create trigger welcome_user_trg
          after insert on users
          for each row execute procedure pgq.logutriga(
              'notifications', 'ignore=password');
  • 25. PgQ: logutriga in action

      Insert statement:
        insert into users (
            username,
            password,
            email
        ) values (
            'bob',
            'secret',
            'bob@foo.bar'
        );

      Event data:
        ev_id     | 26
        ev_time   | 2009-05-14 11:07:54.231954+02
        ev_txid   | 263834
        ev_owner  |
        ev_retry  |
        ev_type   | I:user_id
        ev_data   | user_id=1&username=bob&email=bob%40foo.bar
        ev_extra1 | public.users
        ev_extra2 |
        ev_extra3 |
        ev_extra4 |
  • 26. Skytools 3: new features
    - Cooperative consumer – distributes the load of a single consumer between cooperating sub-consumers.
    - Cascading support – an identical queue is maintained across nodes, so consumers can easily move between nodes.
    - Per-database tickers are replaced with a single pgqd daemon.
    - qadmin utility for managing queues and consumers.
  • 27. PgQ: Cooperative consumer
    - Sometimes a single consumer cannot keep up with the volume of incoming events.
    - Parallel consumers would help, but the workload must somehow be divided – we have to avoid processing the same event twice.
    - We need the consumers to work in cooperation!
    - Skytools 3 introduces sub-consumers for this purpose. They share the workload of the master consumer in a cooperative manner.
    - There are some differences in registration and batch handling, but otherwise they look a lot like regular consumers (see the sketch below):
        pgq_coop.register_subconsumer(qname, cname, scname)
        pgq_coop.unregister_subconsumer(qname, cname, scname, mode)
        pgq_coop.next_batch(qname, cname, scname)
        pgq_coop.finish_batch(batch_id)
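    A sketch of a sub-consumer work loop built on these functions; it assumes batch events are still fetched with the regular pgq.get_batch_events(), and all names are placeholders:

      import time
      import psycopg2

      conn = psycopg2.connect("dbname=userdb")
      cur = conn.cursor()
      # Register one sub-consumer of the "mailer" consumer; additional
      # processes would register as mailer_2, mailer_3 and so on.
      cur.execute("select pgq_coop.register_subconsumer(%s, %s, %s)",
                  ('notifications', 'mailer', 'mailer_1'))
      conn.commit()

      while True:
          cur.execute("select pgq_coop.next_batch(%s, %s, %s)",
                      ('notifications', 'mailer', 'mailer_1'))
          batch_id = cur.fetchone()[0]
          if batch_id is None:
              conn.commit()
              time.sleep(1)
              continue
          cur.execute("select * from pgq.get_batch_events(%s)", (batch_id,))
          for ev in cur.fetchall():
              pass          # this sub-consumer's share of the work
          cur.execute("select pgq_coop.finish_batch(%s)", (batch_id,))
          conn.commit()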
  • 28. PgQ: Cascading
    - The cascade is a set of database nodes and a queue that is distributed between the nodes. Event and batch numbers are kept identical!
    - The cascade can be depicted as a tree, where events created in the root are propagated down the cascade to the other nodes.
    - There can be only one root node, but any number of branches or leaves.
    - Leaf nodes are specific to replication. They don't have a copy of the queue and don't participate in event propagation.

      [diagram: a cascade tree – the root and both branches each hold an identical copy of queue q1; a leaf hangs off a branch without its own copy]
  • 29. PgQ: Cascading
    - A typical replication cascade consists of a primary database – the root – replicated to a standby database – a branch.
    - We can easily switch the replication roles of the root and branch nodes. Consumers will continue as if nothing happened.
    - On branch node failure we move its consumers to some surviving node. Business as usual.
    - On root node failure we promote some surviving branch to root and reattach the consumers. In this scenario we have to deal with data loss.
  • 30. PgQ wrapup
    - Producers produce events into queues.
    - The ticker groups events into batches.
    - Batches are served to consumers in FIFO order.
    - Consumers can track processed events with pgq_ext.

      [diagram: a producer feeds events into queues; the ticker creates ticks; batches of events flow to the consumer, which records progress in pgq_ext]
  • 32. Skytools: introduction
    - A set of applications for replication, batch processing and queue management.
    - Includes a Python scripting framework that greatly simplifies the implementation of custom consumers.
    - Written mostly in Python, with some bits in C.
    - PgQ is distributed as part of Skytools.
    - Get it from http://pgfoundry.org/projects/skytools/
    - We'll start by configuring the ticker.
  • 33. The Ticker
    - The ticker prepares batches for the consumers.
    - Also performs queue maintenance: table rotation, vacuuming, re-queuing retry events.
    - It must be configured for each database that has queues.
    - Skytools 3 includes a super-ticker that handles all the databases in a PostgreSQL cluster.
  • 34. pgqadm.py: configuration file
    - db – the name of the database where the queues are located.
    - maint_delay_min – interval, in minutes, at which maintenance commands are run.
    - loop_delay – interval, in seconds, at which the ticker wakes up to check for work.

      [pgqadm]
      job_name = pgqadm_userdb
      db = dbname=userdb
      # how often to run maintenance [minutes]
      maint_delay_min = 5
      # how often to check for activity [secs]
      loop_delay = 0.5
      logfile = log/%(job_name)s.log
      pidfile = pid/%(job_name)s.pid
  • 35. pgqadm.py: installing and starting
    - Install the pgq schema.
    - Start the ticker in the background.
    - Repeat the process for all databases with queues.

      $ pgqadm.py pgqadm_userdb.ini install
      2009-05-13 12:37:17,913 19376 INFO plpgsql is installed
      2009-05-13 12:37:17,936 19376 INFO txid_current_snapshot is installed
      2009-05-13 12:37:17,936 19376 INFO Installing pgq
      2009-05-13 12:37:17,984 19376 INFO Reading from /usr/local/share/skytools/pgq.sql
      $ pgqadm.py pgqadm_userdb.ini ticker -d
      ...
      $ tail log/pgqadm_userdb.log
      2009-05-13 12:45:42,572 17184 INFO {ticks: 1}
      2009-05-13 12:45:42,586 17184 INFO {maint_duration: 0.0229749679565}
      2009-05-13 12:50:42,639 17184 INFO {maint_duration: 0.0530850887299}
      2009-05-13 12:50:42,719 17184 INFO {ticks: 9}
  • 36. pgqadm.py: command line
    - pgqadm.py provides additional functionality besides the ticker:
      - creating and configuring queues
      - managing consumers
      - querying status
    - A convenient front end for the PgQ SQL API.

      Usage: pgqadm.py [options] INI CMD [subcmd args]
      commands:
        ticker                  start ticking & maintenance process
        status                  show overview of queue health
        install                 install code into db
        create QNAME            create queue
        drop QNAME              drop queue
        register QNAME CONS     register consumer on queue
        unregister QNAME CONS   unregister consumer from queue
        config QNAME [VAR=VAL]  show or change queue config
  • 37. pgqadm.py: creating queues and consumers
    - We'll create a queue called notifications and subscribe a consumer welcome_consumer to it.

      $ pgqadm.py pgqadm_userdb.ini create notifications
      2009-05-12 17:53:34,726 12610 INFO Creating queue: notifications
      $ pgqadm.py pgqadm_userdb.ini register notifications welcome_consumer
      2009-05-12 17:53:47,808 12632 INFO Registering consumer welcome_consumer on queue notifications
      $ pgqadm.py pgqadm_userdb.ini status
      Postgres version: 8.3.7   PgQ version: 2.1.8
      Event queue                       Rotation   Ticker       TLag
      ------------------------------------------------------------------------------
      notifications                     3/7200s    500/3s/60s   1s
      ------------------------------------------------------------------------------
      Consumer                                     Lag          LastSeen
      ------------------------------------------------------------------------------
      notifications:
        welcome_consumer                           116s         103s
      ------------------------------------------------------------------------------
  • 38. pgqadm.py: setting queue parameters
    - Per-queue tuning options:
      - ticker_max_lag – max time between ticks.
      - ticker_idle_period – tick at this interval if there are no events.
      - ticker_max_count – tick if the number of events exceeds this. Can be used to tune batch sizes.
      - rotation_period – how often to rotate queue tables; a balance between disk space and event history.

      $ pgqadm.py pgqadm_userdb.ini config notifications
      notifications
          ticker_max_lag = 3
          ticker_idle_period = 60
          rotation_period = 7200
          ticker_max_count = 500
      $ pgqadm.py pgqadm_userdb.ini config notifications ticker_max_count=1000
      Change queue notifications config to: queue_ticker_max_count='1000'
  • 39. pgqadm.py: summary
    - Runs the ticker.
    - Performs queue maintenance.
    - Provides a command line interface for managing the queues and consumers.
  • 40. Python framework
    - The framework handles database connection management, logging, statistics, quoting, daemonization etc.
    - Most Skytools applications are implemented by extending DBScript – a Python class providing the infrastructure needed for a typical database batch job (a small sketch follows).
    - Several base classes are available for implementing custom consumer applications.
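    A minimal sketch of a DBScript-based batch job. The work() override and the get_database() lookup follow the conventional DBScript pattern; check the Skytools documentation for the exact signatures:

      import sys
      import skytools

      class QueueStats(skytools.DBScript):
          """Periodically logs the queues found in the source database.
          Assumes a config file with a src_db connect string."""
          def __init__(self, args):
              skytools.DBScript.__init__(self, 'queue_stats', args)

          def work(self):
              # Connections are fetched by config option name and are
              # managed (reconnects etc.) by the framework.
              db = self.get_database('src_db')
              cur = db.cursor()
              cur.execute("select queue_name from pgq.get_queue_info()")
              for row in cur.fetchall():
                  self.log.info('queue: %s' % row[0])
              db.commit()

      if __name__ == '__main__':
          script = QueueStats(sys.argv[1:])
          script.start()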
  • 41. DBScript: configuration
    - Most DBScripts have a configuration file that defines the parameters for the script – source database, queue name, log file location etc.
    - Python ConfigParser format.
    - Common options:
      - job_name – identifies the current script.
      - loop_delay – how often to check for work, in seconds.
      - pidfile – pid file for daemons.
      - logfile – log file name.
      - log_size – size of individual log files.
      - log_count – number of log files kept.
      - use_skylog – override logging configuration with skylog.ini.
  • 42. DBScript: command line
    - Standard invocation is: script.py configuration.ini [options]
    - Common options:
      - -h, --help – show usage for the particular script.
      - -v, --verbose – make the script more verbose.
      - -q, --quiet – log only errors and warnings.
      - -d, --daemon – run the script in the background.
      - -r, --reload – reload a running application.
      - -s, --stop – wait for the work loop to finish, then stop.
      - -k, --kill – terminate the application immediately.
  • 43. Custom consumer
    - We'll write a small queue application called welcome_consumer.
    - The application reads events from a queue and prints out the event payload if the type matches "welcome".
    - The application class extends Consumer, which in turn extends DBScript.
    - Consumer implements the PgQ consumer event loop; we only need to add the bits that do the event handling.
    - All of the regular DBScript configuration and command line options apply.
  • 44. Custom consumer: configuration
    - pgq_queue_name – name of the queue the consumer subscribes to.
    - pgq_consumer_id – name of the consumer; defaults to job_name if not present.

      [welcome_app]
      job_name = welcome_consumer
      src_db = dbname=userdb
      pgq_queue_name = notifications
      pgq_consumer_id = %(job_name)s
      logfile = log/%(job_name)s.log
      pidfile = pid/%(job_name)s.pid
  • 45. Custom consumer: Python code

      import sys, pgq, skytools

      class WelcomeConsumer(pgq.Consumer):
          def __init__(self, args):
              pgq.Consumer.__init__(self, "welcome_app", "src_db", args)

          def process_event(self, src_db, ev):
              if ev.ev_type == 'welcome':
                  self.log.info('Welcome %s!' % ev.ev_data)
              ev.tag_done()

      if __name__ == '__main__':
          script = WelcomeConsumer(sys.argv[1:])
          script.start()
  • 46. Custom consumer: event processing
    - The process_event() function is called for each event in a batch. If there are no batches the script sleeps loop_delay seconds and retries.
    - A DB API connection to the queue database is passed in src_db; the transaction is committed after process_event() returns successfully.
    - On failure, the transaction is rolled back and the active batch is reprocessed on the next iteration.
    - We need to call tag_done() for the processed events – otherwise they'll be scheduled for retry processing (a retry sketch follows).

      def process_event(self, src_db, ev):
          if ev.ev_type == 'welcome':
              self.log.info('Welcome %s!' % ev.ev_data)
          ev.tag_done()
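    If an event cannot be handled right now it can be postponed instead of tagged done. A sketch; tag_retry() and the mail helper are assumptions here – at the SQL level the equivalent is pgq.event_retry(batch_id, ev_id, sec) from the batch API:

      def process_event(self, src_db, ev):
          if ev.ev_type != 'welcome':
              ev.tag_done()              # not ours, finished with it
              return
          try:
              self.send_welcome(ev.ev_data)
              ev.tag_done()
          except Exception:
              # tag_retry() is assumed; it reschedules the event the way
              # pgq.event_retry(batch_id, ev_id, sec) does in SQL.
              ev.tag_retry(60)           # try again in 60 seconds

      def send_welcome(self, addr):
          # Hypothetical helper - a stand-in for real mail delivery.
          self.log.info('Welcome %s!' % addr)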
  • 47. Custom consumer: running it
    - The consumer must be subscribed to the queue before events can be processed. In Skytools 2 this happens automatically.
    - Skytools 3 requires explicit subscription and provides a --register switch for the purpose.
    - For each processed batch the script logs the number of events and the processing duration.

      $ python welcome_consumer.py welcome_consumer.ini
      2009-05-13 11:50:27,700 4318 INFO {count: 0, duration: 0.0322341918945}
      2009-05-13 11:50:27,705 4318 INFO {count: 0, duration: 0.00421690940857}
      2009-05-13 11:51:13,720 4318 INFO {count: 0, duration: 0.0121331214905}
  • 48. Custom consumer: event processing
    - So far our consumer hasn't seen any events.
    - We'll use the pgq.insert_event() stored procedure to feed some test events into the queue.
    - In its simplest form it takes the queue name, event type and payload as arguments.

      userdb# select pgq.insert_event('notifications', 'welcome', 'Custom Consumer');
      userdb# select pgq.insert_event('notifications', 'irrelevant', 'Another Event');
      ...
      2009-05-13 12:19:11,563 6884 INFO {count: 0, duration: 0.00770711898804}
      2009-05-13 12:19:14,583 6884 INFO {count: 0, duration: 0.0210809707642}
      2009-05-13 12:19:25,591 6884 INFO Welcome Custom Consumer!
      2009-05-13 12:19:25,595 6884 INFO {count: 2, duration: 0.012876033783}
      2009-05-13 12:19:28,608 6884 INFO {count: 0, duration: 0.0131230354309}
  • 49. Custom consumer: event tracking
    - Extend RemoteConsumer to add batch tracking support.
    - pgq_ext must be installed on the target database.
    - We'll use a simple counter application as an example.
    - This actually implements distributed transactions.

      class UserCounter(pgq.RemoteConsumer):
          def __init__(self, args):
              pgq.RemoteConsumer.__init__(self, "user_counter",
                                          "src_db", "dst_db", args)

          def process_remote_batch(self, db, batch_id, event_list, dst_db):
              for ev in event_list:
                  ev.tag_done()
              cur = dst_db.cursor()
              cur.execute("update user_count set n = n + %s",
                          (len(event_list),))
  • 50. Custom consumer: wrapup
    - We have just implemented some simple PgQ consumers.
    - Extend the Consumer class for simple consumers. Advanced consumer base classes are also available.
    - RemoteConsumer and SerialConsumer provide batch tracking; these are used for processing events in remote databases.
    - CascadedConsumer adds cascading support (Skytools 3).

      [diagram: a producer calls pgq.insert_event on userdb; the ticker creates ticks; batches of events from the notifications queue flow to the welcome consumer]
  • 51. Replication toolset
    - Replication tools built on top of PgQ:
      - londiste – replication
      - table dispatcher – archiving and partitioning
      - queue mover – copy events from one queue to another
      - queue splitter – split one queue into several
    - Changelog triggers are used for capturing table data changes.
    - The replication process is just another PgQ consumer.
  • 52. Londiste
    - Master/slave replication system implemented on top of PgQ.
    - Uses sqltriga/logtriga to capture table changes on the master.
    - A PgQ consumer replays the captured events on the slave.
    - One master can feed several slaves.
    - Slaves can be masters to other slaves.

      [diagram: on the master, sqltriga writes changelog events from the source table into the queue, with the ticker creating ticks; on the slave, the londiste worker tracks completed batches and applies the DML to the target table]
  • 53. Londiste: setting up
    - Prepare the configuration file – source and target databases, queue name.
    - Run the londiste install commands for provider and subscriber.
    - Start the replication process – consume from the master and replay on the slave.
    - The replay process can run anywhere, as long as it can connect to both databases.
    - Add tables to replication.
    - An initial copy is started; tables are usable on the slave after it finishes.

      [londiste]
      job_name = l_u_to_f
      provider_db = dbname=userdb
      subscriber_db = dbname=foodb
      pgq_queue_name = user_events
      logfile = log/%(job_name)s.log
      pidfile = pid/%(job_name)s.pid
  • 54. Londiste: demonstration

      $ londiste.py londiste_userdb_to_foodb.ini provider install
      2009-05-14 15:12:42,714 27716 INFO plpgsql is installed
      2009-05-14 15:12:42,716 27716 INFO txid_current_snapshot is installed
      2009-05-14 15:12:42,716 27716 INFO pgq is installed
      2009-05-14 15:12:42,717 27716 INFO Installing londiste
      2009-05-14 15:12:42,717 27716 INFO Reading from /usr/local/share/skytools/londiste.sql
      $ londiste.py londiste_userdb_to_foodb.ini subscriber install
      2009-05-14 15:12:48,887 27728 INFO plpgsql is installed
      2009-05-14 15:12:48,889 27728 INFO Installing londiste
      2009-05-14 15:12:48,889 27728 INFO Reading from /usr/local/share/skytools/londiste.sql
      $ londiste.py londiste_userdb_to_foodb.ini replay -d
      $ pg_dump -t users -s userdb | psql foodb
      $ londiste.py londiste_userdb_to_foodb.ini provider add users
      2009-05-14 15:15:19,730 27959 INFO Adding public.users
      $ londiste.py londiste_userdb_to_foodb.ini subscriber add users
      2009-05-14 15:16:29,845 28082 INFO Checking public.users
      2009-05-14 15:16:29,888 28082 INFO Adding public.users
      $ tail log/londiste_userdb_to_foodb.log
      2009-05-14 15:44:47,293 28122 INFO {count: 0, ignored: 0, duration: 0.0210900306702}
      2009-05-14 15:45:47,309 28122 INFO {count: 0, ignored: 0, duration: 0.0170979499817}
  • 55. Table dispatcher
    - Archiving and partitioning tool.
    - Customizable table structure.
    - Automatically creates partitions based on user specified conditions.
    - Does not handle updates.

      [diagram: on the master, logutriga enqueues changelog events from the source table; on the slave, the table dispatcher tracks completed ticks with pgq_ext and inserts the rows into target table partitions]
  • 56. Table dispatcher: setting up
    - Add logutriga to the source table, or reuse an existing trigger.
    - Create the base table structure on the target. Individual table partitions will be inherited from it.
    - Prepare the configuration file, which specifies the source queue, target table and partitioning options.

      userdb# \d users
      ...
      Triggers:
          welcome_user_trg AFTER INSERT ON users FOR EACH ROW EXECUTE PROCEDURE
              pgq.logutriga('notifications', 'ignore=password')

      archivedb# \d user_history
          Column    |   Type
      --------------+-----------
       username     | text
       date_created | timestamp
  • 57. Table dispatcher: configuration
    - dest_table – base table for partitions.
    - fields – select the columns to include, or use * for all.
    - part_field – the column used for partitioning.
    - part_method – either daily or monthly.
    - part_template – SQL template for creating the partitions.

      [table_dispatcher]
      job_name = user_archiver
      src_db = dbname=userdb
      dst_db = dbname=archivedb
      pgq_queue_name = notifications
      logfile = log/%(job_name)s.log
      pidfile = pid/%(job_name)s.pid
      dest_table = user_history
      fields = username, date_created
      part_field = date_created
      part_method = daily
      part_template =
          create table _DEST_TABLE () inherits (user_history);
          grant select on _DEST_TABLE to reporting;
  • 58. Table dispatcher: demonstration

      $ table_dispatcher.py td_userdb_to_archivedb.ini
      2009-05-18 11:05:14,370 10625 INFO {count: 0, duration: 0.0341429710388}
      2009-05-18 11:05:14,379 10625 INFO {count: 1, duration: 0.00861620903015}
      2009-05-18 11:05:15,394 10625 INFO {count: 0, duration: 0.0151319503784}
      ...
      $ psql archivedb
      archivedb# \dt user_history*
                    List of relations
       Schema |          Name           | Type  |  Owner
      --------+-------------------------+-------+---------
       public | user_history            | table | martinp
       public | user_history_2009_05_17 | table | martinp
       public | user_history_2009_05_18 | table | martinp
      (3 rows)
  • 59. Queue mover
    - Transports events from one queue to another.
    - Useful for performing queue migrations.
    - Also for consolidating queues from partitioned databases.

      [diagram: the queue mover reads batches of events from the source queue and writes them to the target queue, tracking completed ticks with pgq_ext]
  • 60. Queue splitter
    - Transports events from one queue to several target queues.
    - The ev_extra1 field is used to determine the target queue. logutriga automatically puts the table name there.
    - Useful for transporting events for batch processing.

      [diagram: the queue splitter reads batches from the source queue and distributes the events to several target queues, tracking completed ticks with pgq_ext]
  • 61. Replication tools: wrapup
    - Replication tools are ordinary PgQ consumers implemented with the Skytools framework.
    - On the master database, changelog events are enqueued through sqltriga/logutriga.
    - On the slave, the DML statements are reconstructed and replayed.
    - Event tracking is used to ensure that duplicate batches are not processed.
  • 63. Skytools: getting and installing
    - Prerequisites:
      - Python
      - psycopg2
    - If you are lucky:
      - apt-get install skytools
      - http://yum.pgsqlrpms.org
    - Building from source:
      - Get it from http://pgfoundry.org/projects/skytools/
      - Needs PostgreSQL development headers and libraries.
      - untar, configure, make install
    - For the adventurous, Skytools 3:
      - http://github.com/markokr/skytools-dev
  • 64. Skytools2: installing from tarball
    - Get the latest tarball from pgfoundry.
    - Dependencies:
      - C compiler and make
      - PostgreSQL development headers and libraries
      - Python development package
    - The Makefile can also generate Debian packages.

      $ tar zxf skytools-2.1.9.tar.gz
      $ cd skytools-2.1.9
      $ ./configure --prefix=/usr/local
      $ make
      $ sudo make install
      ... or ...
      $ make deb83
  • 65. Skytools3: building from Git
    - The main repository is on github; clone from there or create your own fork.
    - Additional dependencies:
      - asciidoc
      - xmlto
      - autoconf

      $ git clone git://github.com/markokr/skytools-dev.git
      $ cd skytools-dev
      $ git submodule init
      $ git submodule update
      $ make boot
      $ ./configure --prefix=/usr/local --with-asciidoc
      $ make
  • 66. Skytools: migrations and upgrades
    - Upgrading a database with PgQ – pretty much straightforward, but has some additional steps.
    - Migrating consumers from one database to another – to take some load off the primary server or to prepare for database migrations.
    - Migrating whole databases.
  • 67. Upgrading a PgQ database
    1. pg_dump the database; shut down the database, stop tickers and consumers.
    2. Run pg_resetxlog -n to determine the current epoch (extract it from "Latest checkpoint's NextXID").
    3. Upgrade the PostgreSQL binaries AND the Skytools modules.
    4. Run pg_resetxlog -e to increase the epoch value. This is needed so that pgq can correctly interpret stored txid values. Alternatively, if you are using the schema based txid (prior to 8.3), start the cluster and update the epoch in the txid schema:
         UPDATE txid.epoch
            SET epoch = epoch + 1,
                last_value = (get_current_txid() & 4294967295);
    5. Start the database, import the dump file.
    6. Start the ticker and consumers.
  • 68. Skytools2: migrating consumers
    1. Set up a queue mover to replicate the queue to the new database; we will move the consumer subscriptions to the queue replica.
    2. Stop the ticker on the primary database – no more new batches will be prepared. After processing the pending batches, the consumers will stop at identical positions on the queue.
    3. We can now subscribe the consumers to the replicated queue. Note that we need to reset the event tracking for the migrated consumers; the replication tools have a --reset option for the purpose.
    4. Start the ticker. The queue mover will continue queue replication, and consumers on the new queue will continue where they left off.
    5. If all is good, unregister the consumers from the old master.
  • 69. Skytools3: migrating consumers
    - Cascaded queues are kept identical across nodes – no need to set up explicit queue movers.
    - Consumers that extend CascadedConsumer can be switched by simply running the change-provider command of the set admin tool.
    - No need to mess around with tickers and configuration files.
    - The core features are complete; some development is still needed.
  • 70. Skytools2: migrating databases
    1. Create the database structure on the new host. Omit the pgq, pgq_ext and londiste schemas – better to reinstall those later.
    2. Replicate the database to the new host using londiste.
    3. Create the queues and changelog triggers on the new database.
    4. Pay special attention to applications that use stored procedures to enqueue events – maybe a queue mover is needed?
    5. Migrate the consumers to the new database.
    6. Make the primary database read-only and wait for londiste replication to catch up.
    7. Redirect the applications to the new database.
  • 71. Skytools3: migrating databases
    - Cascading can be used to greatly simplify the migration process.
    - The target database should be a branch node of the cascade.
    - Migration is then performed by stopping the applications, running the londiste switchover command and redirecting the applications to the new database.
    - Switchover switches the roles of the root and branch; consumers needn't be aware that anything changed.
  • 72. Skytools: monitoring consumers
    - We need to ensure that all our consumers are running happily.
    - The best indicator for this is consumer lag – if a consumer is lagging, it is not processing events adequately.
    - The pgqadm.py status command or the pgq.get_consumer_info() SQL function can be used to determine the lag (a scripted check is sketched below).
    - In the example, welcome_consumer hasn't processed anything in 6 days – it is probably not running at all.

      select queue_name, consumer_name, lag, last_seen
        from pgq.get_consumer_info();

        queue_name   |  consumer_name   |   lag    | last_seen
      ---------------+------------------+----------+-----------
       notifications | user_counter     | 00:00:43 | 00:00:00
       notifications | welcome_consumer | 6 days   | 6 days
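    Such a check is easy to script. A sketch built on pgq.get_consumer_info(); the one-hour threshold and the database name are arbitrary examples:

      import psycopg2

      MAX_LAG_SECONDS = 3600    # alert threshold, tune per consumer

      conn = psycopg2.connect("dbname=userdb")
      cur = conn.cursor()
      cur.execute("""
          select queue_name, consumer_name,
                 extract(epoch from lag) as lag_seconds
            from pgq.get_consumer_info()
      """)
      for queue, consumer, lag in cur.fetchall():
          if lag > MAX_LAG_SECONDS:
              print('WARNING: %s on %s is lagging %d seconds'
                    % (consumer, queue, lag))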
  • 73. Skytools: logging
    - Skytools applications use the Python logging module, which can forward log and statistics messages to a central location.
    - Just set use_skylog = 1 in the configuration and configure the log handlers in skylog.ini (a handler sketch follows).
    - Use syslog or write your own log handler. Examples are provided for sending the log over UDP or to a PostgreSQL database via stored procedure calls (see skylog.py).
    - At Skype, we use the logging facilities to populate a configuration management database and feed error messages to Nagios.
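    For example, a handler that ships log records over UDP can be built entirely from the standard library; the host and port below are placeholders:

      import logging
      import logging.handlers

      # DatagramHandler pickles each LogRecord and sends it over UDP;
      # a central collector can unpickle and aggregate the records.
      log = logging.getLogger('welcome_consumer')
      log.setLevel(logging.INFO)
      log.addHandler(
          logging.handlers.DatagramHandler('loghost.example.com', 9021))
      log.info('consumer started')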
  • 74. Skytools: links
    - PgFoundry project page: http://pgfoundry.org/projects/skytools
    - PgQ tutorial: http://wiki.postgresql.org/wiki/PGQ_Tutorial
    - Tool documentation: http://skytools.projects.postgresql.org/doc/
    - PHP consumer: http://pgsql.tapoueh.org/pgq/pgq-php/
    - Github repository for Skytools3: http://github.com/markokr/skytools-dev/tree/master