diff --git a/python/pgq/__init__.py b/python/pgq/__init__.py index d8473da5..9716995a 100644 --- a/python/pgq/__init__.py +++ b/python/pgq/__init__.py @@ -6,6 +6,7 @@ import pgq.consumer import pgq.remoteconsumer import pgq.producer +import pgq.slaveconsumer import pgq.status @@ -21,6 +22,7 @@ from pgq.remoteconsumer import * from pgq.localconsumer import * from pgq.producer import * +from pgq.slaveconsumer import * from pgq.status import * @@ -35,6 +37,7 @@ pgq.coopconsumer.__all__ + pgq.remoteconsumer.__all__ + pgq.localconsumer.__all__ + + pgq.slaveconsumer.__all__ + pgq.cascade.nodeinfo.__all__ + pgq.cascade.admin.__all__ + pgq.cascade.consumer.__all__ + diff --git a/python/pgq/slaveconsumer.py b/python/pgq/slaveconsumer.py new file mode 100644 index 00000000..217c1d9c --- /dev/null +++ b/python/pgq/slaveconsumer.py @@ -0,0 +1,132 @@ + +"""Slave Consumer that runs in a Postgres readslave. + It allocates a new batch from master and reads events + locally by taking advantage of Postgres streaming replication. + It also sends back a batch of events for retry (instead of sending + back events one at a time). + +""" + +from pgq.consumer import Consumer + +__all__ = ['SlaveConsumer'] + + +# Event status codes +EV_UNTAGGED = -1 +EV_RETRY = 0 +EV_DONE = 1 + + +class SlaveConsumer(Consumer): + """Reads locally and writes to remote master. + Retries events in batch. + """ + + def __init__(self, service_name, db_name, slave_db, args): + """db_name is the remote master, slave_db is the local readslave + """ + Consumer.__init__(self, service_name, db_name, args) + self.slave_db = slave_db + + def work(self): + """Do the work loop, once (internal). + Returns: true if wants to be called again, + false if script can sleep. + + This class allocates a new batch in master, + then it checks locally if streaming replication + has already caught up with regard to the batch_id + tick_id. + It yes, then proceed to read locally all the events and process them. + Otherwise return 0 and will retry the same batch later. + """ + self.pgq_lazy_fetch = 0 + + db = self.get_database(self.db_name) + master_curs = db.cursor() + + self.stat_start() + + # acquire batch + batch_id = self._load_next_batch(master_curs) + db.commit() + if batch_id == None: + return 0 + slave_curs = self.get_database(self.slave_db).cursor() + + slave_curs.execute("""select tick_id from pgq.tick where tick_id = %s""", + [self.batch_info['tick_id']]) + if slave_curs.rowcount <= 0: + self.get_database(self.slave_db).commit() + return 0 + + # load events locally + ev_list = self._load_batch_events(slave_curs, batch_id) + self.get_database(self.slave_db).commit() + + # process events + self._launch_process_batch(db, batch_id, ev_list) + + # done, ack to master + self._finish_batch(master_curs, batch_id, ev_list) + db.commit() + self.stat_end(len(ev_list)) + + return 1 + + def _flush_retry(self, master_curs, batch_id, list): + """Tag retry events.""" + + retry = 0 + retry_list = [] + retry_time = None + if self.pgq_lazy_fetch: + for ev_id, stat in list.iter_status(): + if stat[0] == EV_RETRY: + retry_list.append(ev_id) + if not retry_time or stat[1] > retry_time: + retry_time = stat[1] + retry += 1 + elif stat[0] != EV_DONE: + raise Exception("Untagged event: id=%d" % ev_id) + else: + for ev in list: + if ev._status == EV_RETRY: + retry_list.append(ev.id) + if not retry_time or ev.retry_time > retry_time: + retry_time = ev.retry_time + retry += 1 + elif ev._status != EV_DONE: + raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % ( + ev.id, ev.type, ev.data, ev.extra1)) + + # report weird events + if retry: + self._batch_retry(master_curs, batch_id, retry_list, retry_time) + self.stat_increase('retry-events', retry) + + def _batch_retry(self, cx, batch_id, retry_list, retry_time): + """send a batch of event ids for retry""" + cx.execute("select pgq.batch_slave_event_retry(%s, %s, %s)", + [batch_id, '{' + ",".join(str(x) for x in retry_list) + '}', retry_time]) + + def _load_batch_events(self, curs, batch_id): + """Fetch all events for this batch.""" + + # load events + sql = "select * from pgq.get_batch_slave_events(%d, '%s', '%s', %d, %d)" % ( + batch_id, self.queue_name, self.consumer_name, + self.batch_info['cur_tick_id'], + self.batch_info['prev_tick_id']) + if self.consumer_filter is not None: + sql += " where %s" % self.consumer_filter + curs.execute(sql) + rows = curs.dictfetchall() + + # map them to python objects + ev_list = [] + for r in rows: + ev = self._make_event(self.queue_name, r) + ev_list.append(ev) + + return ev_list diff --git a/skytools_3.2.spec b/skytools_3.2.spec new file mode 100644 index 00000000..864b7c34 --- /dev/null +++ b/skytools_3.2.spec @@ -0,0 +1,185 @@ +%global pginstdir /usr/pgsql-9.2 +%global pgmajorversion 92 +%global sname skytools +%global _libdir /usr/lib + +# Python major version. +%{expand: %%define pyver %(python -c 'import sys;print(sys.version[0:3])')} +%{!?python_sitearch: %define python_sitearch %(python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(1)")} + +Summary: PostgreSQL database management tools from Skype +Name: %{sname}-%{pgmajorversion} +Version: 3.2 +Release: insta3%{?dist} +License: BSD +Group: Applications/Databases +Source0: %{sname}-%{version}.tar.gz +URL: http://pgfoundry.org/projects/skytools +BuildRequires: postgresql%{pgmajorversion}-devel, python-devel +Requires: python-psycopg2, postgresql%{pgmajorversion} +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) +Autoreq: 0 + +%description +Database management tools from Skype:WAL shipping, queueing, replication. +The tools are named walmgr, PgQ and Londiste, respectively. + +%package modules +Summary: PostgreSQL modules of Skytools +Group: Applications/Databases +Requires: %{sname}-%{pgmajorversion} = %{version}-%{release} + +%description modules +This package has PostgreSQL modules of skytools. + +%prep +%setup -q -n %{sname}-%{version} + +%build +%configure --with-pgconfig=%{pginstdir}/bin/pg_config --libdir=$RPM_BUILD_ROOT%{_libdir} --with-xmlto --with-asciidoc + +make %{?_smp_mflags} + +%install +rm -rf %{buildroot} + +make %{?_smp_mflags} DESTDIR=%{buildroot} python-install modules-install + +%clean +rm -rf %{buildroot} + +%post -p /sbin/ldconfig +%postun -p /sbin/ldconfig + +%files +%defattr(644,root,root,755) +%attr(755,root,root) %{_bindir}/data_maintainer3 +%attr(755,root,root) %{_bindir}/londiste3 +%attr(755,root,root) %{_bindir}/pgqd +%attr(755,root,root) %{_bindir}/qadmin +%attr(755,root,root) %{_bindir}/queue_mover3 +%attr(755,root,root) %{_bindir}/queue_splitter3 +%attr(755,root,root) %{_bindir}/scriptmgr3 +%attr(755,root,root) %{_bindir}/simple_consumer3 +%attr(755,root,root) %{_bindir}/simple_local_consumer3 +%attr(755,root,root) %{_bindir}/skytools_upgrade3 +%attr(755,root,root) %{_bindir}/walmgr3 +/usr/lib/python%{pyver}/site-packages/pkgloader-1.0-py%{pyver}.egg-info +/usr/lib/python%{pyver}/site-packages/pkgloader.py* +%dir %{_libdir}/python%{pyver}/site-packages/londiste +%{_libdir}/python%{pyver}/site-packages/londiste/*.py* +%{_libdir}/python%{pyver}/site-packages/londiste/handlers/*.py* +%{_libdir}/python%{pyver}/site-packages/pgq/*.py* +%{_libdir}/python%{pyver}/site-packages/skytools/*.py* +%{_libdir}/python%{pyver}/site-packages/skytools/*.so +%{_libdir}/python%{pyver}/site-packages/pgq/cascade/*.py* +%{_libdir}/python%{pyver}/site-packages/%{sname}-%{version}-py%{pyver}.egg-info +%{pginstdir}/share/contrib/londiste.sql +%{pginstdir}/share/contrib/londiste.upgrade.sql +%{pginstdir}/share/contrib/newgrants_londiste.sql +%{pginstdir}/share/contrib/newgrants_pgq.sql +%{pginstdir}/share/contrib/newgrants_pgq_coop.sql +%{pginstdir}/share/contrib/newgrants_pgq_ext.sql +%{pginstdir}/share/contrib/newgrants_pgq_node.sql +%{pginstdir}/share/contrib/oldgrants_londiste.sql +%{pginstdir}/share/contrib/oldgrants_pgq.sql +%{pginstdir}/share/contrib/oldgrants_pgq_coop.sql +%{pginstdir}/share/contrib/oldgrants_pgq_ext.sql +%{pginstdir}/share/contrib/oldgrants_pgq_node.sql +%{pginstdir}/share/contrib/pgq.sql +%{pginstdir}/share/contrib/pgq.upgrade.sql +%{pginstdir}/share/contrib/pgq_coop.sql +%{pginstdir}/share/contrib/pgq_coop.upgrade.sql +%{pginstdir}/share/contrib/pgq_ext.sql +%{pginstdir}/share/contrib/pgq_ext.upgrade.sql +%{pginstdir}/share/contrib/pgq_node.sql +%{pginstdir}/share/contrib/pgq_node.upgrade.sql +%{pginstdir}/share/contrib/txid.sql +%{pginstdir}/share/contrib/uninstall_pgq.sql +%{pginstdir}/share/extension/londiste.control +%{pginstdir}/share/extension/londiste--3.1--3.2.sql +%{pginstdir}/share/extension/londiste--3.1.1--3.2.sql +%{pginstdir}/share/extension/londiste--3.1.3--3.2.sql +%{pginstdir}/share/extension/londiste--3.1.4--3.2.sql +%{pginstdir}/share/extension/londiste--3.1.6--3.2.sql +%{pginstdir}/share/extension/londiste--3.2.sql +%{pginstdir}/share/extension/londiste--unpackaged--3.2.sql +%{pginstdir}/share/extension/pgq--3.1--3.2.sql +%{pginstdir}/share/extension/pgq--3.1.1--3.2.sql +%{pginstdir}/share/extension/pgq--3.1.2--3.2.sql +%{pginstdir}/share/extension/pgq--3.1.3--3.2.sql +%{pginstdir}/share/extension/pgq--3.1.6--3.2.sql +%{pginstdir}/share/extension/pgq--3.2.sql +%{pginstdir}/share/extension/pgq--unpackaged--3.2.sql +%{pginstdir}/share/extension/pgq.control +%{pginstdir}/share/extension/pgq_coop--3.1--3.1.1.sql +%{pginstdir}/share/extension/pgq_coop--3.1.1.sql +%{pginstdir}/share/extension/pgq_coop--unpackaged--3.1.1.sql +%{pginstdir}/share/extension/pgq_coop.control +%{pginstdir}/share/extension/pgq_node--3.1--3.2.sql +%{pginstdir}/share/extension/pgq_node--3.1.3--3.2.sql +%{pginstdir}/share/extension/pgq_node--3.1.6--3.2.sql +%{pginstdir}/share/extension/pgq_node--3.2.sql +%{pginstdir}/share/extension/pgq_node--unpackaged--3.2.sql +%{pginstdir}/share/extension/pgq_ext--3.1.sql +%{pginstdir}/share/extension/pgq_ext--unpackaged--3.1.sql +%{pginstdir}/share/extension/pgq_ext.control +%{pginstdir}/share/extension/pgq_node.control +%{_docdir}/skytools3/conf/pgqd.ini.templ +%{_docdir}/skytools3/conf/wal-master.ini +%{_docdir}/skytools3/conf/wal-slave.ini +%{_docdir}/pgsql/extension/README.pgq +%{_docdir}/pgsql/extension/README.pgq_ext +%{_mandir}/man1/londiste3.1.gz +%{_mandir}/man1/pgqd.1.gz +%{_mandir}/man1/qadmin.1.gz +%{_mandir}/man1/queue_mover3.1.gz +%{_mandir}/man1/queue_splitter3.1.gz +%{_mandir}/man1/scriptmgr3.1.gz +%{_mandir}/man1/simple_consumer3.1.gz +%{_mandir}/man1/simple_local_consumer3.1.gz +%{_mandir}/man1/skytools_upgrade3.1.gz +%{_mandir}/man1/walmgr3.1.gz +%{_datadir}/skytools3/londiste.upgrade_2.1_to_3.1.sql +%{_datadir}/skytools3/pgq.upgrade_2.1_to_3.0.sql + +%{_datadir}/skytools3/londiste.sql +%{_datadir}/skytools3/londiste.upgrade.sql +%{_datadir}/skytools3/pgq.sql +%{_datadir}/skytools3/pgq.upgrade.sql +%{_datadir}/skytools3/pgq_coop.sql +%{_datadir}/skytools3/pgq_coop.upgrade.sql +%{_datadir}/skytools3/pgq_ext.sql +%{_datadir}/skytools3/pgq_ext.upgrade.sql +%{_datadir}/skytools3/pgq_node.sql +%{_datadir}/skytools3/pgq_node.upgrade.sql + +%files modules +%{pginstdir}/lib/pgq_lowlevel.so +%{pginstdir}/share/contrib/pgq_lowlevel.sql +%{pginstdir}/lib/pgq_triggers.so +%{pginstdir}/share/contrib/pgq_triggers.sql + +%changelog +* Tue Aug 20 2013 Devrim GÜNDÜZ - 3.1.5-1 +- Update to 3.1.5, per changes described at + http://pgfoundry.org/frs/shownotes.php?release_id=2045 + +* Tue Jan 15 2013 Devrim GÜNDÜZ - 3.1.3-1 +- Update to 3.1.3 + +* Fri Jul 27 2012 - Devrim Gunduz - 3.1-1 +- Update to 3.1 +- Re-add mistakenly removed modules subpackage + +* Fri Jun 8 2012 - Devrim Gunduz - 3.0.3-1 +- Update to 3.0.3 + +* Tue Mar 8 2011 Devrim GUNDUZ - 2.1.12-1 +- Update to 2.1.12 + +* Thu Mar 11 2010 Devrim GUNDUZ - 2.1.11-1 +- Update to 2.1.11 +- Apply fixes for multiple PostgreSQL installation. +- Trim changelog + diff --git a/sql/pgq/functions/pgq.batch_slave_event_retry.sql b/sql/pgq/functions/pgq.batch_slave_event_retry.sql new file mode 100644 index 00000000..3e0d57c4 --- /dev/null +++ b/sql/pgq/functions/pgq.batch_slave_event_retry.sql @@ -0,0 +1,78 @@ +create or replace function pgq.batch_slave_event_retry( + x_batch_id bigint, + x_event_ids bigint[], + x_retry_time timestamptz) +returns integer as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.batch_slave_event_retry(3a) +-- +-- Put the events into retry queue, to be processed again later. +-- +-- Parameters: +-- x_batch_id - ID of active batch. +-- x_event_ids - event ids in an array +-- x_retry_time - Time when the event should be put back into queue +-- +-- Returns: +-- 1 - success +-- 0 - event already in retry queue +-- Calls: +-- None +-- Tables directly manipulated: +-- insert - pgq.retry_queue +-- ---------------------------------------------------------------------- +begin + insert into pgq.retry_queue (ev_retry_after, ev_queue, + ev_id, ev_time, ev_txid, ev_owner, ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4) + select x_retry_time, sub_queue, + ev_id, ev_time, NULL, sub_id, coalesce(ev_retry, 0) + 1, + ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4 + from pgq.get_batch_events(x_batch_id), + pgq.subscription + where sub_batch = x_batch_id + and ev_id = ANY(x_event_ids); + if not found then + raise exception 'event not found'; + end if; + return 1; + +-- dont worry if the event is already in queue +exception + when unique_violation then + return 0; +end; +$$ language plpgsql security definer; + + +create or replace function pgq.batch_slave_event_retry( + x_batch_id bigint, + x_event_ids bigint[], + x_retry_seconds integer) +returns integer as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.batch_slave_event_retry(3b) +-- +-- Put the event ids into retry queue, to be processed later again. +-- +-- Parameters: +-- x_batch_id - ID of active batch. +-- x_event_ids - event ids in an array +-- x_retry_seconds - Time when the event should be put back into queue +-- +-- Returns: +-- 1 - success +-- 0 - event already in retry queue +-- Calls: +-- pgq.event_retry(3a) +-- Tables directly manipulated: +-- None +-- ---------------------------------------------------------------------- +declare + new_retry timestamptz; +begin + new_retry := current_timestamp + ((x_retry_seconds::text || ' seconds')::interval); + return pgq.batch_slave_event_retry(x_batch_id, x_event_ids, new_retry); +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq/functions/pgq.batch_slave_events.sql b/sql/pgq/functions/pgq.batch_slave_events.sql new file mode 100644 index 00000000..2ed7919b --- /dev/null +++ b/sql/pgq/functions/pgq.batch_slave_events.sql @@ -0,0 +1,290 @@ + +-- Group: Low-level event handling + +create or replace function pgq.get_batch_slave_events( + in x_batch_id bigint, + in x_queue text, + in x_consumer text, + in cur_tick_id bigint, + in prev_tick_id bigint, + out ev_id bigint, + out ev_time timestamptz, + out ev_txid bigint, + out ev_retry int4, + out ev_type text, + out ev_data text, + out ev_extra1 text, + out ev_extra2 text, + out ev_extra3 text, + out ev_extra4 text) +returns setof record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.get_batch_slave_events(1) +-- +-- Get all events in batch from local read slave. +-- Because we are reading from local read slave, we +-- need more information instead a single batch_id. +-- +-- Parameters: +-- x_batch_id - ID of active batch. +-- x_queue - name of the event queue +-- x_consumer - name of the consumer +-- cur_tick_id - current tick id +-- prev_tick_id - last tick id +-- +-- Returns: +-- List of events. +-- ---------------------------------------------------------------------- +declare + sql text; + queue_id integer; + sub_id integer; + cons_id integer; + errmsg text; +begin + select s.sub_queue, s.sub_consumer, s.sub_id + into queue_id, cons_id, sub_id + from pgq.consumer c, + pgq.queue q, + pgq.subscription s + where q.queue_name = x_queue + and c.co_name = x_consumer + and s.sub_queue = q.queue_id + and s.sub_consumer = c.co_id; + if not found then + errmsg := 'Not subscriber to queue: ' + || coalesce(x_queue, 'NULL') + || '/' + || coalesce(x_consumer, 'NULL'); + raise exception '%', errmsg; + end if; + + sql := pgq.batch_slave_event_sql(x_batch_id, sub_id, cur_tick_id, prev_tick_id); + for ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4 + in execute sql + loop + return next; + end loop; + return; +end; +$$ language plpgsql; -- no perms needed + + +create or replace function pgq.batch_slave_event_sql( + x_batch_id bigint, + x_sub_id bigint, + cur_tick_id bigint, + prev_tick_id bigint) +returns text as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.batch_slave_event_sql(1) +-- Creates SELECT statement that fetches events for this batch from local readslave. +-- +-- Parameters: +-- x_batch_id - ID of a active batch. +-- x_sub_id - ID of the subscription of this batch +-- cur_tick_id - current tick id +-- prev_tick_id - last tick id +-- +-- Returns: +-- SQL statement. +-- ---------------------------------------------------------------------- + +-- ---------------------------------------------------------------------- +-- Algorithm description: +-- Given 2 snapshots, sn1 and sn2 with sn1 having xmin1, xmax1 +-- and sn2 having xmin2, xmax2 create expression that filters +-- right txid's from event table. +-- +-- Simplest solution would be +-- > WHERE ev_txid >= xmin1 AND ev_txid <= xmax2 +-- > AND NOT txid_visible_in_snapshot(ev_txid, sn1) +-- > AND txid_visible_in_snapshot(ev_txid, sn2) +-- +-- The simple solution has a problem with long transactions (xmin1 very low). +-- All the batches that happen when the long tx is active will need +-- to scan all events in that range. Here is 2 optimizations used: +-- +-- 1) Use [xmax1..xmax2] for range scan. That limits the range to +-- txids that actually happened between two snapshots. For txids +-- in the range [xmin1..xmax1] look which ones were actually +-- committed between snapshots and search for them using exact +-- values using IN (..) list. +-- +-- 2) As most TX are short, there could be lot of them that were +-- just below xmax1, but were committed before xmax2. So look +-- if there are ID's near xmax1 and lower the range to include +-- them, thus decresing size of IN (..) list. +-- ---------------------------------------------------------------------- +declare + rec record; + sql text; + tbl text; + arr text; + part text; + select_fields text; + retry_expr text; + batch record; +begin + select s.sub_last_tick, s.sub_next_tick, s.sub_id, s.sub_queue, + txid_snapshot_xmax(last.tick_snapshot) as tx_start, + txid_snapshot_xmax(cur.tick_snapshot) as tx_end, + last.tick_snapshot as last_snapshot, + cur.tick_snapshot as cur_snapshot + into batch + from pgq.subscription s, pgq.tick last, pgq.tick cur + where s.sub_id = x_sub_id + and last.tick_queue = s.sub_queue + and last.tick_id = prev_tick_id + and cur.tick_queue = s.sub_queue + and cur.tick_id = cur_tick_id; + if not found then + raise exception 'batch not found'; + end if; + + batch.sub_last_tick := prev_tick_id; + batch.sub_next_tick := cur_tick_id; + + -- load older transactions + arr := ''; + for rec in + -- active tx-es in prev_snapshot that were committed in cur_snapshot + select id1 from + txid_snapshot_xip(batch.last_snapshot) id1 left join + txid_snapshot_xip(batch.cur_snapshot) id2 on (id1 = id2) + where id2 is null + order by 1 desc + loop + -- try to avoid big IN expression, so try to include nearby + -- tx'es into range + if batch.tx_start - 100 <= rec.id1 then + batch.tx_start := rec.id1; + else + if arr = '' then + arr := rec.id1::text; + else + arr := arr || ',' || rec.id1::text; + end if; + end if; + end loop; + + -- must match pgq.event_template + select_fields := 'select ev_id, ev_time, ev_txid, ev_retry, ev_type,' + || ' ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4'; + retry_expr := ' and (ev_owner is null or ev_owner = ' + || batch.sub_id::text || ')'; + + -- now generate query that goes over all potential tables + sql := ''; + for rec in + select xtbl from pgq.batch_slave_event_tables(x_batch_id, x_sub_id, cur_tick_id, prev_tick_id) xtbl + loop + tbl := pgq.quote_fqname(rec.xtbl); + -- this gets newer queries that definitely are not in prev_snapshot + part := select_fields + || ' from pgq.tick cur, pgq.tick last, ' || tbl || ' ev ' + || ' where cur.tick_id = ' || batch.sub_next_tick::text + || ' and cur.tick_queue = ' || batch.sub_queue::text + || ' and last.tick_id = ' || batch.sub_last_tick::text + || ' and last.tick_queue = ' || batch.sub_queue::text + || ' and ev.ev_txid >= ' || batch.tx_start::text + || ' and ev.ev_txid <= ' || batch.tx_end::text + || ' and txid_visible_in_snapshot(ev.ev_txid, cur.tick_snapshot)' + || ' and not txid_visible_in_snapshot(ev.ev_txid, last.tick_snapshot)' + || retry_expr; + -- now include older tx-es, that were ongoing + -- at the time of prev_snapshot + if arr <> '' then + part := part || ' union all ' + || select_fields || ' from ' || tbl || ' ev ' + || ' where ev.ev_txid in (' || arr || ')' + || retry_expr; + end if; + if sql = '' then + sql := part; + else + sql := sql || ' union all ' || part; + end if; + end loop; + if sql = '' then + raise exception 'could not construct sql for batch %', x_batch_id; + end if; + return sql || ' order by 1'; +end; +$$ language plpgsql; -- no perms needed + + + +create or replace function pgq.batch_slave_event_tables( + x_batch_id bigint, + x_sub_id bigint, + cur_tick_id bigint, + prev_tick_id bigint) +returns setof text as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.batch_slave_event_tables(1) +-- +-- Returns set of table names where this batch events may reside from local readslave. +-- +-- Parameters: +-- x_batch_id - ID of a active batch. +-- x_sub_id - ID of the subscription of this batch +-- cur_tick_id - current tick id +-- prev_tick_id - last tick id +-- ---------------------------------------------------------------------- +declare + nr integer; + tbl text; + use_prev integer; + use_next integer; + batch record; +begin + select + txid_snapshot_xmin(last.tick_snapshot) as tx_min, -- absolute minimum + txid_snapshot_xmax(cur.tick_snapshot) as tx_max, -- absolute maximum + q.queue_data_pfx, q.queue_ntables, + q.queue_cur_table, q.queue_switch_step1, q.queue_switch_step2 + into batch + from pgq.tick last, pgq.tick cur, pgq.subscription s, pgq.queue q + where cur.tick_id = cur_tick_id + and cur.tick_queue = s.sub_queue + and last.tick_id = prev_tick_id + and last.tick_queue = s.sub_queue + and s.sub_id = x_sub_id + and q.queue_id = s.sub_queue; + if not found then + raise exception 'Cannot find data for batch %', x_batch_id; + end if; + + -- if its definitely not in one or other, look into both + if batch.tx_max < batch.queue_switch_step1 then + use_prev := 1; + use_next := 0; + elsif batch.queue_switch_step2 is not null + and (batch.tx_min > batch.queue_switch_step2) + then + use_prev := 0; + use_next := 1; + else + use_prev := 1; + use_next := 1; + end if; + + if use_prev then + nr := batch.queue_cur_table - 1; + if nr < 0 then + nr := batch.queue_ntables - 1; + end if; + tbl := batch.queue_data_pfx || '_' || nr::text; + return next tbl; + end if; + + if use_next then + tbl := batch.queue_data_pfx || '_' || batch.queue_cur_table::text; + return next tbl; + end if; + + return; +end; +$$ language plpgsql; -- no perms needed diff --git a/sql/pgq/structure/func_public.sql b/sql/pgq/structure/func_public.sql index d069094e..6eb920b9 100644 --- a/sql/pgq/structure/func_public.sql +++ b/sql/pgq/structure/func_public.sql @@ -68,3 +68,7 @@ \i functions/pgq.version.sql \i functions/pgq.get_batch_info.sql +-- Group: Readslave related functions + +\i functions/pgq.batch_slave_events.sql +\i functions/pgq.batch_slave_event_retry.sql diff --git a/sql/pgq/structure/grants.ini b/sql/pgq/structure/grants.ini index 96b59c3f..f168d1b9 100644 --- a/sql/pgq/structure/grants.ini +++ b/sql/pgq/structure/grants.ini @@ -59,7 +59,9 @@ pgq_generic_fns = pgq_read_fns = pgq.batch_event_sql(bigint), + pgq.batch_slave_event_sql(bigint, bigint, bigint, bigint), pgq.batch_event_tables(bigint), + pgq.batch_slave_event_tables(bigint, bigint, bigint, bigint), pgq.find_tick_helper(int4, int8, timestamptz, int8, int8, interval), pgq.register_consumer(text, text), pgq.register_consumer_at(text, text, bigint), @@ -68,9 +70,12 @@ pgq_read_fns = pgq.next_batch(text, text), pgq.next_batch_custom(text, text, interval, int4, interval), pgq.get_batch_events(bigint), + pgq.get_batch_slave_events(bigint, text, text, bigint, bigint), pgq.get_batch_info(bigint), pgq.get_batch_cursor(bigint, text, int4, text), pgq.get_batch_cursor(bigint, text, int4), + pgq.batch_slave_event_retry(bigint, bigint[], timestamptz), + pgq.batch_slave_event_retry(bigint, bigint[], integer), pgq.event_retry(bigint, bigint, timestamptz), pgq.event_retry(bigint, bigint, integer), pgq.batch_retry(bigint, integer),