diff --git a/docs/source/testgres.rst b/docs/source/testgres.rst index 80c86e84..c3c43f2a 100644 --- a/docs/source/testgres.rst +++ b/docs/source/testgres.rst @@ -61,6 +61,14 @@ testgres.node .. autoclass:: testgres.node.ProcessProxy :members: +testgres.standby +---------------- + +.. automodule:: testgres.standby + :members: + :undoc-members: + :show-inheritance: + testgres.pubsub --------------- diff --git a/testgres/__init__.py b/testgres/__init__.py index c907b708..bf232e2e 100644 --- a/testgres/__init__.py +++ b/testgres/__init__.py @@ -24,3 +24,7 @@ get_bin_path, \ get_pg_config, \ get_pg_version + +from .standby import \ + First, \ + Any diff --git a/testgres/node.py b/testgres/node.py index 9978dcf6..d4b0147c 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -6,6 +6,7 @@ import subprocess import time +from collections import Iterable from shutil import rmtree from six import raise_from, iteritems from tempfile import mkstemp, mkdtemp @@ -65,6 +66,8 @@ from .pubsub import Publication, Subscription +from .standby import First + from .utils import \ PgVer, \ eprint, \ @@ -671,7 +674,7 @@ def restart(self, params=[]): def reload(self, params=[]): """ - Reload config files using pg_ctl. + Asynchronously reload config files using pg_ctl. Args: params: additional arguments for pg_ctl. @@ -1036,6 +1039,45 @@ def replicate(self, name=None, slot=None, **kwargs): with clean_on_error(self.backup(**kwargs)) as backup: return backup.spawn_replica(name=name, destroy=True, slot=slot) + def set_synchronous_standbys(self, standbys): + """ + Set standby synchronization options. This corresponds to + `synchronous_standby_names `_ + option. Note that :meth:`~.PostgresNode.reload` or + :meth:`~.PostgresNode.restart` is needed for changes to take place. + + Args: + standbys: either :class:`.First` or :class:`.Any` object specifying + sychronization parameters or just a plain list of + :class:`.PostgresNode`s replicas which would be equivalent + to passing ``First(1, )``. For PostgreSQL 9.5 and below + it is only possible to specify a plain list of standbys as + `FIRST` and `ANY` keywords aren't supported. + + Example:: + + from testgres import get_new_node, First + + master = get_new_node().init().start() + with master.replicate().start() as standby: + master.append_conf("synchronous_commit = remote_apply") + master.set_synchronous_standbys(First(1, [standby])) + master.restart() + + """ + if self._pg_version >= '9.6': + if isinstance(standbys, Iterable): + standbys = First(1, standbys) + else: + if isinstance(standbys, Iterable): + standbys = u", ".join( + u"\"{}\"".format(r.name) for r in standbys) + else: + raise TestgresException("Feature isn't supported in " + "Postgres 9.5 and below") + + self.append_conf("synchronous_standby_names = '{}'".format(standbys)) + def catchup(self, dbname=None, username=None): """ Wait until async replica catches up with its master. diff --git a/testgres/standby.py b/testgres/standby.py new file mode 100644 index 00000000..e7ce408d --- /dev/null +++ b/testgres/standby.py @@ -0,0 +1,49 @@ +# coding: utf-8 + +import six + + +@six.python_2_unicode_compatible +class First: + """ + Specifies a priority-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to ``num_sync`` + synchronous standbys chosen based on their priorities. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"{} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) + + +@six.python_2_unicode_compatible +class Any: + """ + Specifies a quorum-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to at least ``num_sync`` + listed standbys. Only available for Postgres 10 and newer. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"ANY {} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) diff --git a/tests/test_simple.py b/tests/test_simple.py index 230cff47..0c61504f 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -40,6 +40,10 @@ get_pg_config, \ get_pg_version +from testgres import \ + First, \ + Any + # NOTE: those are ugly imports from testgres import bound_ports from testgres.utils import PgVer @@ -409,6 +413,46 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + def test_synchronous_replication(self): + with get_new_node() as master: + old_version = not pg_version_ge('9.6') + + master.init(allow_streaming=True).start() + + if not old_version: + master.append_conf('synchronous_commit = remote_apply') + + # create standby + with master.replicate() as standby1, master.replicate() as standby2: + standby1.start() + standby2.start() + + # check formatting + self.assertEqual( + '1 ("{}", "{}")'.format(standby1.name, standby2.name), + str(First(1, (standby1, standby2)))) # yapf: disable + self.assertEqual( + 'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name), + str(Any(1, (standby1, standby2)))) # yapf: disable + + # set synchronous_standby_names + master.set_synchronous_standbys([standby1, standby2]) + master.restart() + + # the following part of the test is only applicable to newer + # versions of PostgresQL + if not old_version: + master.safe_psql('create table abc(a int)') + + # Create a large transaction that will take some time to apply + # on standby to check that it applies synchronously + # (If set synchronous_commit to 'on' or other lower level then + # standby most likely won't catchup so fast and test will fail) + master.safe_psql( + 'insert into abc select generate_series(1, 1000000)') + res = standby1.safe_psql('select count(*) from abc') + self.assertEqual(res, b'1000000\n') + @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') def test_logical_replication(self): with get_new_node() as node1, get_new_node() as node2: