From 60936c5fef7615c1ebb81d4b7c8c879931f68dda Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 29 Mar 2018 15:49:33 +0300 Subject: [PATCH 1/5] Add set_synchronous_standbys() method --- docs/source/testgres.rst | 10 +++++++- testgres/node.py | 32 +++++++++++++++++++++++++- testgres/standby.py | 49 ++++++++++++++++++++++++++++++++++++++++ tests/test_simple.py | 33 +++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 testgres/standby.py diff --git a/docs/source/testgres.rst b/docs/source/testgres.rst index fd9c2d4d..e435d8c7 100644 --- a/docs/source/testgres.rst +++ b/docs/source/testgres.rst @@ -59,4 +59,12 @@ testgres.node .. automethod:: __init__ .. autoclass:: testgres.node.ProcessProxy - :members: \ No newline at end of file + :members: + +testgres.standby +---------------- + +.. automodule:: testgres.standby + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/testgres/node.py b/testgres/node.py index 1ca19fc5..2b4763bc 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -6,6 +6,7 @@ import subprocess import time +from collections import Iterable from shutil import rmtree from six import raise_from, iteritems from tempfile import mkstemp, mkdtemp @@ -58,6 +59,8 @@ from .logger import TestgresLogger +from .standby import First + from .utils import \ eprint, \ get_bin_path, \ @@ -650,7 +653,7 @@ def restart(self, params=[]): def reload(self, params=[]): """ - Reload config files using pg_ctl. + Asynchronously reload config files using pg_ctl. Args: params: additional arguments for pg_ctl. @@ -979,6 +982,33 @@ def replicate(self, name=None, slot=None, **kwargs): with clean_on_error(self.backup(**kwargs)) as backup: return backup.spawn_replica(name=name, destroy=True, slot=slot) + def set_synchronous_standbys(self, standbys): + """ + Set standby synchronization options. This corresponds to + `synchronous_standby_names `_ + option. Note that :meth:`~.PostgresNode.reload` or + :meth:`~.PostgresNode.restart` is needed for changes to take place. + + Args: + standbys: either :class:`.First` or :class:`.Any` object specifying + sychronization parameters. It is also possible to pass simply + a list of replicas which would be equivalent to passing + ``First(1, )`` + + Example:: + + master = get_new_node().init().start() + with master.replicate.start() as standby: + master.append_conf("synchronous_commit = remote_apply") + master.set_synchronous_standbys(First(1, [standby])) + master.restart() + + """ + if isinstance(standbys, Iterable): + standbys = First(1, standbys) + + self.append_conf("synchronous_standby_names = '{}'".format(standbys)) + def catchup(self, dbname=None, username=None): """ Wait until async replica catches up with its master. diff --git a/testgres/standby.py b/testgres/standby.py new file mode 100644 index 00000000..775ad55f --- /dev/null +++ b/testgres/standby.py @@ -0,0 +1,49 @@ +# coding: utf-8 + +import six + + +@six.python_2_unicode_compatible +class First: + """ + Specifies a priority-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to ``num_sync`` + synchronous standbys chosen based on their priorities. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"{} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) + + +@six.python_2_unicode_compatible +class Any: + """ + Specifies a quorum-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to at least ``num_sync`` + listed standbys. Only available for Postgres 10 and newer. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"ANY {} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) diff --git a/tests/test_simple.py b/tests/test_simple.py index f639e92a..32a37a24 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -42,6 +42,7 @@ from testgres import bound_ports from testgres.utils import pg_version_ge from testgres.enums import ProcessType +from testgres.standby import First, Any def util_exists(util): @@ -388,6 +389,38 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + def test_synchronous_replication(self): + with get_new_node() as master: + master.init(allow_streaming=True).start() + master.append_conf('synchronous_commit = remote_apply') + + # create standby + with master.replicate() as standby1, master.replicate() as standby2: + standby1.start() + standby2.start() + + # check formatting + self.assertEqual('1 ("{}", "{}")'.format( + standby1.name, standby2.name), + str(First(1, (standby1, standby2)))) + self.assertEqual('ANY 1 ("{}", "{}")'.format( + standby1.name, standby2.name), + str(Any(1, (standby1, standby2)))) + + # set synchronous_standby_names + master.set_synchronous_standbys([standby1, standby2]) + master.restart() + master.safe_psql('create table abc(a int)') + + # Create a large transaction that will take some time to apply + # on standby to check that it applies synchronously + # (If set synchronous_commit to 'on' or other lower level then + # standby most likely won't catchup so fast and test will fail) + master.safe_psql( + 'insert into abc select generate_series(1, 1000000)') + res = standby1.safe_psql('select count(*) from abc') + self.assertEqual(res, b'1000000\n') + def test_replication_slots(self): with get_new_node() as node: node.init(allow_streaming=True).start() From 60b2b1cd181feef9657ac58cd2e1cf61cd7a9f43 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 29 Mar 2018 15:49:33 +0300 Subject: [PATCH 2/5] Add set_synchronous_standbys() method --- docs/source/testgres.rst | 10 +++++++- testgres/node.py | 32 +++++++++++++++++++++++++- testgres/standby.py | 49 ++++++++++++++++++++++++++++++++++++++++ tests/test_simple.py | 33 +++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 testgres/standby.py diff --git a/docs/source/testgres.rst b/docs/source/testgres.rst index fd9c2d4d..e435d8c7 100644 --- a/docs/source/testgres.rst +++ b/docs/source/testgres.rst @@ -59,4 +59,12 @@ testgres.node .. automethod:: __init__ .. autoclass:: testgres.node.ProcessProxy - :members: \ No newline at end of file + :members: + +testgres.standby +---------------- + +.. automodule:: testgres.standby + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/testgres/node.py b/testgres/node.py index d8ce1f03..34480bb3 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -6,6 +6,7 @@ import subprocess import time +from collections import Iterable from shutil import rmtree from six import raise_from, iteritems from tempfile import mkstemp, mkdtemp @@ -62,6 +63,8 @@ from .logger import TestgresLogger +from .standby import First + from .utils import \ eprint, \ get_bin_path, \ @@ -656,7 +659,7 @@ def restart(self, params=[]): def reload(self, params=[]): """ - Reload config files using pg_ctl. + Asynchronously reload config files using pg_ctl. Args: params: additional arguments for pg_ctl. @@ -1022,6 +1025,33 @@ def replicate(self, name=None, slot=None, **kwargs): with clean_on_error(self.backup(**kwargs)) as backup: return backup.spawn_replica(name=name, destroy=True, slot=slot) + def set_synchronous_standbys(self, standbys): + """ + Set standby synchronization options. This corresponds to + `synchronous_standby_names `_ + option. Note that :meth:`~.PostgresNode.reload` or + :meth:`~.PostgresNode.restart` is needed for changes to take place. + + Args: + standbys: either :class:`.First` or :class:`.Any` object specifying + sychronization parameters. It is also possible to pass simply + a list of replicas which would be equivalent to passing + ``First(1, )`` + + Example:: + + master = get_new_node().init().start() + with master.replicate.start() as standby: + master.append_conf("synchronous_commit = remote_apply") + master.set_synchronous_standbys(First(1, [standby])) + master.restart() + + """ + if isinstance(standbys, Iterable): + standbys = First(1, standbys) + + self.append_conf("synchronous_standby_names = '{}'".format(standbys)) + def catchup(self, dbname=None, username=None): """ Wait until async replica catches up with its master. diff --git a/testgres/standby.py b/testgres/standby.py new file mode 100644 index 00000000..775ad55f --- /dev/null +++ b/testgres/standby.py @@ -0,0 +1,49 @@ +# coding: utf-8 + +import six + + +@six.python_2_unicode_compatible +class First: + """ + Specifies a priority-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to ``num_sync`` + synchronous standbys chosen based on their priorities. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"{} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) + + +@six.python_2_unicode_compatible +class Any: + """ + Specifies a quorum-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to at least ``num_sync`` + listed standbys. Only available for Postgres 10 and newer. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ + + def __init__(self, sync_num, standbys): + self.sync_num = sync_num + self.standbys = standbys + + def __str__(self): + return u"ANY {} ({})".format(self.sync_num, u", ".join( + u"\"{}\"".format(r.name) for r in self.standbys)) diff --git a/tests/test_simple.py b/tests/test_simple.py index 33defb12..f41567ed 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -42,6 +42,7 @@ from testgres import bound_ports from testgres.utils import pg_version_ge from testgres.enums import ProcessType +from testgres.standby import First, Any def util_exists(util): @@ -402,6 +403,38 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + def test_synchronous_replication(self): + with get_new_node() as master: + master.init(allow_streaming=True).start() + master.append_conf('synchronous_commit = remote_apply') + + # create standby + with master.replicate() as standby1, master.replicate() as standby2: + standby1.start() + standby2.start() + + # check formatting + self.assertEqual('1 ("{}", "{}")'.format( + standby1.name, standby2.name), + str(First(1, (standby1, standby2)))) + self.assertEqual('ANY 1 ("{}", "{}")'.format( + standby1.name, standby2.name), + str(Any(1, (standby1, standby2)))) + + # set synchronous_standby_names + master.set_synchronous_standbys([standby1, standby2]) + master.restart() + master.safe_psql('create table abc(a int)') + + # Create a large transaction that will take some time to apply + # on standby to check that it applies synchronously + # (If set synchronous_commit to 'on' or other lower level then + # standby most likely won't catchup so fast and test will fail) + master.safe_psql( + 'insert into abc select generate_series(1, 1000000)') + res = standby1.safe_psql('select count(*) from abc') + self.assertEqual(res, b'1000000\n') + def test_replication_slots(self): with get_new_node() as node: node.init(allow_streaming=True).start() From 45c837806531fc220755fbbab36830b79c94a1c2 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 31 May 2018 19:01:20 +0300 Subject: [PATCH 3/5] fix formatting, make standby.First and standby.Any classes available from top level module --- testgres/__init__.py | 4 ++++ testgres/node.py | 4 +++- testgres/standby.py | 40 ++++++++++++++++++++-------------------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/testgres/__init__.py b/testgres/__init__.py index c907b708..bf232e2e 100644 --- a/testgres/__init__.py +++ b/testgres/__init__.py @@ -24,3 +24,7 @@ get_bin_path, \ get_pg_config, \ get_pg_version + +from .standby import \ + First, \ + Any diff --git a/testgres/node.py b/testgres/node.py index 34480bb3..73e621a9 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -1040,8 +1040,10 @@ def set_synchronous_standbys(self, standbys): Example:: + from testgres import get_new_node, First + master = get_new_node().init().start() - with master.replicate.start() as standby: + with master.replicate().start() as standby: master.append_conf("synchronous_commit = remote_apply") master.set_synchronous_standbys(First(1, [standby])) master.restart() diff --git a/testgres/standby.py b/testgres/standby.py index 775ad55f..e7ce408d 100644 --- a/testgres/standby.py +++ b/testgres/standby.py @@ -6,16 +6,16 @@ @six.python_2_unicode_compatible class First: """ - Specifies a priority-based synchronous replication and makes transaction - commits wait until their WAL records are replicated to ``num_sync`` - synchronous standbys chosen based on their priorities. - - Args: - sync_num (int): the number of standbys that transaction need to wait - for replies from - standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby - nodes - """ + Specifies a priority-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to ``num_sync`` + synchronous standbys chosen based on their priorities. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ def __init__(self, sync_num, standbys): self.sync_num = sync_num @@ -29,16 +29,16 @@ def __str__(self): @six.python_2_unicode_compatible class Any: """ - Specifies a quorum-based synchronous replication and makes transaction - commits wait until their WAL records are replicated to at least ``num_sync`` - listed standbys. Only available for Postgres 10 and newer. - - Args: - sync_num (int): the number of standbys that transaction need to wait - for replies from - standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby - nodes - """ + Specifies a quorum-based synchronous replication and makes transaction + commits wait until their WAL records are replicated to at least ``num_sync`` + listed standbys. Only available for Postgres 10 and newer. + + Args: + sync_num (int): the number of standbys that transaction need to wait + for replies from + standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby + nodes + """ def __init__(self, sync_num, standbys): self.sync_num = sync_num From 0819053175179cf7f9981855ced49e37dc3b5dd5 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 31 May 2018 19:34:24 +0300 Subject: [PATCH 4/5] fix formatting --- tests/test_simple.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_simple.py b/tests/test_simple.py index f41567ed..72261873 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -414,12 +414,12 @@ def test_synchronous_replication(self): standby2.start() # check formatting - self.assertEqual('1 ("{}", "{}")'.format( - standby1.name, standby2.name), - str(First(1, (standby1, standby2)))) - self.assertEqual('ANY 1 ("{}", "{}")'.format( - standby1.name, standby2.name), - str(Any(1, (standby1, standby2)))) + self.assertEqual( + '1 ("{}", "{}")'.format(standby1.name, standby2.name), + str(First(1, (standby1, standby2)))) # yapf: disable + self.assertEqual( + 'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name), + str(Any(1, (standby1, standby2)))) # yapf: disable # set synchronous_standby_names master.set_synchronous_standbys([standby1, standby2]) From a1fcfaca2bca74c8c0bcb9fa90baf96f12e12647 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Fri, 1 Jun 2018 16:40:57 +0300 Subject: [PATCH 5/5] fix test_synchronous_replication so it could pass on all supported versions of Postgres --- testgres/node.py | 20 +++++++++++++++----- tests/test_simple.py | 30 +++++++++++++++++++----------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 73e621a9..dee5446b 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -1034,9 +1034,11 @@ def set_synchronous_standbys(self, standbys): Args: standbys: either :class:`.First` or :class:`.Any` object specifying - sychronization parameters. It is also possible to pass simply - a list of replicas which would be equivalent to passing - ``First(1, )`` + sychronization parameters or just a plain list of + :class:`.PostgresNode`s replicas which would be equivalent + to passing ``First(1, )``. For PostgreSQL 9.5 and below + it is only possible to specify a plain list of standbys as + `FIRST` and `ANY` keywords aren't supported. Example:: @@ -1049,8 +1051,16 @@ def set_synchronous_standbys(self, standbys): master.restart() """ - if isinstance(standbys, Iterable): - standbys = First(1, standbys) + if pg_version_ge('9.6'): + if isinstance(standbys, Iterable): + standbys = First(1, standbys) + else: + if isinstance(standbys, Iterable): + standbys = u", ".join( + u"\"{}\"".format(r.name) for r in standbys) + else: + raise TestgresException("Feature isn't supported in " + "Postgres 9.5 and below") self.append_conf("synchronous_standby_names = '{}'".format(standbys)) diff --git a/tests/test_simple.py b/tests/test_simple.py index 72261873..f0990430 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -405,8 +405,12 @@ def test_replicate(self): def test_synchronous_replication(self): with get_new_node() as master: + old_version = not pg_version_ge('9.6') + master.init(allow_streaming=True).start() - master.append_conf('synchronous_commit = remote_apply') + + if not old_version: + master.append_conf('synchronous_commit = remote_apply') # create standby with master.replicate() as standby1, master.replicate() as standby2: @@ -424,16 +428,20 @@ def test_synchronous_replication(self): # set synchronous_standby_names master.set_synchronous_standbys([standby1, standby2]) master.restart() - master.safe_psql('create table abc(a int)') - - # Create a large transaction that will take some time to apply - # on standby to check that it applies synchronously - # (If set synchronous_commit to 'on' or other lower level then - # standby most likely won't catchup so fast and test will fail) - master.safe_psql( - 'insert into abc select generate_series(1, 1000000)') - res = standby1.safe_psql('select count(*) from abc') - self.assertEqual(res, b'1000000\n') + + # the following part of the test is only applicable to newer + # versions of PostgresQL + if not old_version: + master.safe_psql('create table abc(a int)') + + # Create a large transaction that will take some time to apply + # on standby to check that it applies synchronously + # (If set synchronous_commit to 'on' or other lower level then + # standby most likely won't catchup so fast and test will fail) + master.safe_psql( + 'insert into abc select generate_series(1, 1000000)') + res = standby1.safe_psql('select count(*) from abc') + self.assertEqual(res, b'1000000\n') def test_replication_slots(self): with get_new_node() as node: