Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/pgq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pgq.consumer
import pgq.remoteconsumer
import pgq.producer
import pgq.slaveconsumer

import pgq.status

Expand All @@ -21,6 +22,7 @@
from pgq.remoteconsumer import *
from pgq.localconsumer import *
from pgq.producer import *
from pgq.slaveconsumer import *

from pgq.status import *

Expand All @@ -35,6 +37,7 @@
pgq.coopconsumer.__all__ +
pgq.remoteconsumer.__all__ +
pgq.localconsumer.__all__ +
pgq.slaveconsumer.__all__ +
pgq.cascade.nodeinfo.__all__ +
pgq.cascade.admin.__all__ +
pgq.cascade.consumer.__all__ +
Expand Down
132 changes: 132 additions & 0 deletions python/pgq/slaveconsumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@

"""Slave Consumer that runs in a Postgres readslave.
It allocates a new batch from master and reads events
locally by taking advantage of Postgres streaming replication.
It also sends back a batch of events for retry (instead of sending
back events one at a time).

"""

from pgq.consumer import Consumer

__all__ = ['SlaveConsumer']


# Event status codes
EV_UNTAGGED = -1
EV_RETRY = 0
EV_DONE = 1


class SlaveConsumer(Consumer):
"""Reads locally and writes to remote master.
Retries events in batch.
"""

def __init__(self, service_name, db_name, slave_db, args):
"""db_name is the remote master, slave_db is the local readslave
"""
Consumer.__init__(self, service_name, db_name, args)
self.slave_db = slave_db

def work(self):
"""Do the work loop, once (internal).
Returns: true if wants to be called again,
false if script can sleep.

This class allocates a new batch in master,
then it checks locally if streaming replication
has already caught up with regard to the batch_id + tick_id.
It yes, then proceed to read locally all the events and process them.
Otherwise return 0 and will retry the same batch later.
"""
self.pgq_lazy_fetch = 0

db = self.get_database(self.db_name)
master_curs = db.cursor()

self.stat_start()

# acquire batch
batch_id = self._load_next_batch(master_curs)
db.commit()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to commit here? anyway to save this?

if batch_id == None:
return 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the doc says return values are true/false, but you are actually returning 0/1

slave_curs = self.get_database(self.slave_db).cursor()

slave_curs.execute("""select tick_id from pgq.tick where tick_id = %s""",
[self.batch_info['tick_id']])
if slave_curs.rowcount <= 0:
self.get_database(self.slave_db).commit()
return 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we have different sleeps for "no event from master" vs. "hasn't replicated to this slave yet"?. and stats?


# load events locally
ev_list = self._load_batch_events(slave_curs, batch_id)
self.get_database(self.slave_db).commit()

# process events
self._launch_process_batch(db, batch_id, ev_list)

# done, ack to master
self._finish_batch(master_curs, batch_id, ev_list)
db.commit()
self.stat_end(len(ev_list))

return 1

def _flush_retry(self, master_curs, batch_id, list):
"""Tag retry events."""

retry = 0
retry_list = []
retry_time = None
if self.pgq_lazy_fetch:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats lazy_fetch?

for ev_id, stat in list.iter_status():
if stat[0] == EV_RETRY:
retry_list.append(ev_id)
if not retry_time or stat[1] > retry_time:
retry_time = stat[1]
retry += 1
elif stat[0] != EV_DONE:
raise Exception("Untagged event: id=%d" % ev_id)
else:
for ev in list:
if ev._status == EV_RETRY:
retry_list.append(ev.id)
if not retry_time or ev.retry_time > retry_time:
retry_time = ev.retry_time
retry += 1
elif ev._status != EV_DONE:
raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % (
ev.id, ev.type, ev.data, ev.extra1))

# report weird events
if retry:
self._batch_retry(master_curs, batch_id, retry_list, retry_time)
self.stat_increase('retry-events', retry)

def _batch_retry(self, cx, batch_id, retry_list, retry_time):
"""send a batch of event ids for retry"""
cx.execute("select pgq.batch_slave_event_retry(%s, %s, %s)",
[batch_id, '{' + ",".join(str(x) for x in retry_list) + '}', retry_time])

def _load_batch_events(self, curs, batch_id):
"""Fetch all events for this batch."""

# load events
sql = "select * from pgq.get_batch_slave_events(%d, '%s', '%s', %d, %d)" % (
batch_id, self.queue_name, self.consumer_name,
self.batch_info['cur_tick_id'],
self.batch_info['prev_tick_id'])
if self.consumer_filter is not None:
sql += " where %s" % self.consumer_filter
curs.execute(sql)
rows = curs.dictfetchall()

# map them to python objects
ev_list = []
for r in rows:
ev = self._make_event(self.queue_name, r)
ev_list.append(ev)

return ev_list
185 changes: 185 additions & 0 deletions skytools_3.2.spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
%global pginstdir /usr/pgsql-9.2
%global pgmajorversion 92
%global sname skytools
%global _libdir /usr/lib

# Python major version.
%{expand: %%define pyver %(python -c 'import sys;print(sys.version[0:3])')}
%{!?python_sitearch: %define python_sitearch %(python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(1)")}

Summary: PostgreSQL database management tools from Skype
Name: %{sname}-%{pgmajorversion}
Version: 3.2
Release: insta3%{?dist}
License: BSD
Group: Applications/Databases
Source0: %{sname}-%{version}.tar.gz
URL: http://pgfoundry.org/projects/skytools
BuildRequires: postgresql%{pgmajorversion}-devel, python-devel
Requires: python-psycopg2, postgresql%{pgmajorversion}
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
Autoreq: 0

%description
Database management tools from Skype:WAL shipping, queueing, replication.
The tools are named walmgr, PgQ and Londiste, respectively.

%package modules
Summary: PostgreSQL modules of Skytools
Group: Applications/Databases
Requires: %{sname}-%{pgmajorversion} = %{version}-%{release}

%description modules
This package has PostgreSQL modules of skytools.

%prep
%setup -q -n %{sname}-%{version}

%build
%configure --with-pgconfig=%{pginstdir}/bin/pg_config --libdir=$RPM_BUILD_ROOT%{_libdir} --with-xmlto --with-asciidoc

make %{?_smp_mflags}

%install
rm -rf %{buildroot}

make %{?_smp_mflags} DESTDIR=%{buildroot} python-install modules-install

%clean
rm -rf %{buildroot}

%post -p /sbin/ldconfig
%postun -p /sbin/ldconfig

%files
%defattr(644,root,root,755)
%attr(755,root,root) %{_bindir}/data_maintainer3
%attr(755,root,root) %{_bindir}/londiste3
%attr(755,root,root) %{_bindir}/pgqd
%attr(755,root,root) %{_bindir}/qadmin
%attr(755,root,root) %{_bindir}/queue_mover3
%attr(755,root,root) %{_bindir}/queue_splitter3
%attr(755,root,root) %{_bindir}/scriptmgr3
%attr(755,root,root) %{_bindir}/simple_consumer3
%attr(755,root,root) %{_bindir}/simple_local_consumer3
%attr(755,root,root) %{_bindir}/skytools_upgrade3
%attr(755,root,root) %{_bindir}/walmgr3
/usr/lib/python%{pyver}/site-packages/pkgloader-1.0-py%{pyver}.egg-info
/usr/lib/python%{pyver}/site-packages/pkgloader.py*
%dir %{_libdir}/python%{pyver}/site-packages/londiste
%{_libdir}/python%{pyver}/site-packages/londiste/*.py*
%{_libdir}/python%{pyver}/site-packages/londiste/handlers/*.py*
%{_libdir}/python%{pyver}/site-packages/pgq/*.py*
%{_libdir}/python%{pyver}/site-packages/skytools/*.py*
%{_libdir}/python%{pyver}/site-packages/skytools/*.so
%{_libdir}/python%{pyver}/site-packages/pgq/cascade/*.py*
%{_libdir}/python%{pyver}/site-packages/%{sname}-%{version}-py%{pyver}.egg-info
%{pginstdir}/share/contrib/londiste.sql
%{pginstdir}/share/contrib/londiste.upgrade.sql
%{pginstdir}/share/contrib/newgrants_londiste.sql
%{pginstdir}/share/contrib/newgrants_pgq.sql
%{pginstdir}/share/contrib/newgrants_pgq_coop.sql
%{pginstdir}/share/contrib/newgrants_pgq_ext.sql
%{pginstdir}/share/contrib/newgrants_pgq_node.sql
%{pginstdir}/share/contrib/oldgrants_londiste.sql
%{pginstdir}/share/contrib/oldgrants_pgq.sql
%{pginstdir}/share/contrib/oldgrants_pgq_coop.sql
%{pginstdir}/share/contrib/oldgrants_pgq_ext.sql
%{pginstdir}/share/contrib/oldgrants_pgq_node.sql
%{pginstdir}/share/contrib/pgq.sql
%{pginstdir}/share/contrib/pgq.upgrade.sql
%{pginstdir}/share/contrib/pgq_coop.sql
%{pginstdir}/share/contrib/pgq_coop.upgrade.sql
%{pginstdir}/share/contrib/pgq_ext.sql
%{pginstdir}/share/contrib/pgq_ext.upgrade.sql
%{pginstdir}/share/contrib/pgq_node.sql
%{pginstdir}/share/contrib/pgq_node.upgrade.sql
%{pginstdir}/share/contrib/txid.sql
%{pginstdir}/share/contrib/uninstall_pgq.sql
%{pginstdir}/share/extension/londiste.control
%{pginstdir}/share/extension/londiste--3.1--3.2.sql
%{pginstdir}/share/extension/londiste--3.1.1--3.2.sql
%{pginstdir}/share/extension/londiste--3.1.3--3.2.sql
%{pginstdir}/share/extension/londiste--3.1.4--3.2.sql
%{pginstdir}/share/extension/londiste--3.1.6--3.2.sql
%{pginstdir}/share/extension/londiste--3.2.sql
%{pginstdir}/share/extension/londiste--unpackaged--3.2.sql
%{pginstdir}/share/extension/pgq--3.1--3.2.sql
%{pginstdir}/share/extension/pgq--3.1.1--3.2.sql
%{pginstdir}/share/extension/pgq--3.1.2--3.2.sql
%{pginstdir}/share/extension/pgq--3.1.3--3.2.sql
%{pginstdir}/share/extension/pgq--3.1.6--3.2.sql
%{pginstdir}/share/extension/pgq--3.2.sql
%{pginstdir}/share/extension/pgq--unpackaged--3.2.sql
%{pginstdir}/share/extension/pgq.control
%{pginstdir}/share/extension/pgq_coop--3.1--3.1.1.sql
%{pginstdir}/share/extension/pgq_coop--3.1.1.sql
%{pginstdir}/share/extension/pgq_coop--unpackaged--3.1.1.sql
%{pginstdir}/share/extension/pgq_coop.control
%{pginstdir}/share/extension/pgq_node--3.1--3.2.sql
%{pginstdir}/share/extension/pgq_node--3.1.3--3.2.sql
%{pginstdir}/share/extension/pgq_node--3.1.6--3.2.sql
%{pginstdir}/share/extension/pgq_node--3.2.sql
%{pginstdir}/share/extension/pgq_node--unpackaged--3.2.sql
%{pginstdir}/share/extension/pgq_ext--3.1.sql
%{pginstdir}/share/extension/pgq_ext--unpackaged--3.1.sql
%{pginstdir}/share/extension/pgq_ext.control
%{pginstdir}/share/extension/pgq_node.control
%{_docdir}/skytools3/conf/pgqd.ini.templ
%{_docdir}/skytools3/conf/wal-master.ini
%{_docdir}/skytools3/conf/wal-slave.ini
%{_docdir}/pgsql/extension/README.pgq
%{_docdir}/pgsql/extension/README.pgq_ext
%{_mandir}/man1/londiste3.1.gz
%{_mandir}/man1/pgqd.1.gz
%{_mandir}/man1/qadmin.1.gz
%{_mandir}/man1/queue_mover3.1.gz
%{_mandir}/man1/queue_splitter3.1.gz
%{_mandir}/man1/scriptmgr3.1.gz
%{_mandir}/man1/simple_consumer3.1.gz
%{_mandir}/man1/simple_local_consumer3.1.gz
%{_mandir}/man1/skytools_upgrade3.1.gz
%{_mandir}/man1/walmgr3.1.gz
%{_datadir}/skytools3/londiste.upgrade_2.1_to_3.1.sql
%{_datadir}/skytools3/pgq.upgrade_2.1_to_3.0.sql

%{_datadir}/skytools3/londiste.sql
%{_datadir}/skytools3/londiste.upgrade.sql
%{_datadir}/skytools3/pgq.sql
%{_datadir}/skytools3/pgq.upgrade.sql
%{_datadir}/skytools3/pgq_coop.sql
%{_datadir}/skytools3/pgq_coop.upgrade.sql
%{_datadir}/skytools3/pgq_ext.sql
%{_datadir}/skytools3/pgq_ext.upgrade.sql
%{_datadir}/skytools3/pgq_node.sql
%{_datadir}/skytools3/pgq_node.upgrade.sql

%files modules
%{pginstdir}/lib/pgq_lowlevel.so
%{pginstdir}/share/contrib/pgq_lowlevel.sql
%{pginstdir}/lib/pgq_triggers.so
%{pginstdir}/share/contrib/pgq_triggers.sql

%changelog
* Tue Aug 20 2013 Devrim GÜNDÜZ <devrim@gunduz.org> - 3.1.5-1
- Update to 3.1.5, per changes described at
http://pgfoundry.org/frs/shownotes.php?release_id=2045

* Tue Jan 15 2013 Devrim GÜNDÜZ <devrim@gunduz.org> - 3.1.3-1
- Update to 3.1.3

* Fri Jul 27 2012 - Devrim Gunduz <devrim@gunduz.org> - 3.1-1
- Update to 3.1
- Re-add mistakenly removed modules subpackage

* Fri Jun 8 2012 - Devrim Gunduz <devrim@gunduz.org> - 3.0.3-1
- Update to 3.0.3

* Tue Mar 8 2011 Devrim GUNDUZ <devrim@gunduz.org> - 2.1.12-1
- Update to 2.1.12

* Thu Mar 11 2010 Devrim GUNDUZ <devrim@gunduz.org> - 2.1.11-1
- Update to 2.1.11
- Apply fixes for multiple PostgreSQL installation.
- Trim changelog

Loading