diff --git a/.circleci/config.yml b/.circleci/config.yml index a0580f0..26c76f1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -10,6 +10,7 @@ jobs: command: | aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh source dev_env.sh + export LC_ALL=C python3 -m venv /usr/local/share/virtualenvs/tap-postgres source /usr/local/share/virtualenvs/tap-postgres/bin/activate pip install . diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..c71d3b3 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,11 @@ +# Description of change +(write a short description here or paste a link to JIRA) + +# QA steps + - [ ] automated tests passing + - [ ] manual qa steps passing (list below) + +# Risks + +# Rollback steps + - revert this branch diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f5ca3f4 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## 0.0.65 + * Add support for `int8[]` (`bigint[]`) array types to log-based replication [#69](https://github.com/singer-io/tap-postgres/pull/69) + +## 0.0.64 + * Pass string to `decimal.Decimal` when handling numeric data type [#67](https://github.com/singer-io/tap-postgres/pull/67) diff --git a/setup.py b/setup.py index 8b3a890..8a3e1db 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-postgres', - version='0.0.59', + version='0.0.65', description='Singer.io tap for extracting data from PostgreSQL', author='Stitch', url='https://singer.io', diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 9f08834..0cc9b28 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -273,7 +273,7 @@ def produce_table_info(conn): AND NOT a.attisdropped AND pg_class.relkind IN ('r', 'v', 'm') AND n.nspname NOT in ('pg_toast', 'pg_catalog', 'information_schema') -AND has_table_privilege(pg_class.oid, 'SELECT') = true """) +AND has_column_privilege(pg_class.oid, attname, 'SELECT') = true """) for row in cur.fetchall(): row_count, is_view, schema_name, table_name, *col_info = row @@ -493,7 +493,7 @@ def sync_method_for_streams(streams, state, default_replication_method): state = clear_state_on_replication_change(state, stream['tap_stream_id'], replication_key, replication_method) if replication_method not in set(['LOG_BASED', 'FULL_TABLE', 'INCREMENTAL']): - raise Exception("Unrecognized replication_method {}".format(replication_method)) + raise Exception("Unrecognized replication_method {} for stream {}".format(replication_method, stream['tap_stream_id'])) md_map = metadata.to_map(stream['metadata']) desired_columns = [c for c in stream['schema']['properties'].keys() if sync_common.should_sync_column(md_map, c)] @@ -691,9 +691,9 @@ def main_impl(): if args.discover: do_discovery(conn_config) - elif args.properties or args.catalog: + elif args.properties: state = args.state - do_sync(conn_config, args.catalog.to_dict() if args.catalog else args.properties, args.config.get('default_replication_method'), state) + do_sync(conn_config, args.properties, args.config.get('default_replication_method'), state) else: LOGGER.info("No properties were selected") diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 6f930bf..3f2b82c 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -10,6 +10,7 @@ import tap_postgres.sync_strategies.common as sync_common from dateutil.parser import parse import psycopg2 +from psycopg2 import sql import copy from select import select from functools import reduce @@ -68,11 +69,14 @@ def tuples_to_map(accum, t): accum[t[0]] = t[1] return accum +def create_hstore_elem_query(elem): + return sql.SQL("SELECT hstore_to_array({})").format(sql.Literal(elem)) + def create_hstore_elem(conn_info, elem): with post_db.open_connection(conn_info) as conn: with conn.cursor() as cur: - sql = """SELECT hstore_to_array('{}')""".format(elem) - cur.execute(sql) + query = create_hstore_elem_query(elem) + cur.execute(query) res = cur.fetchone()[0] hstore_elem = reduce(tuples_to_map, [res[i:i + 2] for i in range(0, len(res), 2)], {}) return hstore_elem @@ -101,6 +105,8 @@ def create_array_elem(elem, sql_datatype, conn_info): cast_datatype = 'text[]' elif sql_datatype == 'integer[]': cast_datatype = 'integer[]' + elif sql_datatype == 'bigint[]': + cast_datatype = 'bigint[]' elif sql_datatype == 'inet[]': cast_datatype = 'inet[]' elif sql_datatype == 'json[]': @@ -130,8 +136,8 @@ def create_array_elem(elem, sql_datatype, conn_info): #custom datatypes like enums cast_datatype = 'text[]' - sql = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype) - cur.execute(sql) + sql_stmt = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype) + cur.execute(sql_stmt) res = cur.fetchone()[0] return res @@ -164,7 +170,7 @@ def selected_value_to_singer_value_impl(elem, og_sql_datatype, conn_info): if sql_datatype == 'hstore': return create_hstore_elem(conn_info, elem) if 'numeric' in sql_datatype: - return decimal.Decimal(elem) + return decimal.Decimal(str(elem)) if isinstance(elem, int): return elem if isinstance(elem, float): diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 96d2bf8..7984318 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -365,6 +365,16 @@ def test_catalog(self): 'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS}, stream_dict.get('schema')) + def test_escaping_values(self): + key = 'nickname' + value = "Dave's Courtyard" + elem = '"{}"=>"{}"'.format(key, value) + + with get_test_connection() as conn: + with conn.cursor() as cur: + query = tap_postgres.sync_strategies.logical_replication.create_hstore_elem_query(elem) + self.assertEqual(query.as_string(cur), "SELECT hstore_to_array('\"nickname\"=>\"Dave''s Courtyard\"')") + class TestEnumTable(unittest.TestCase): maxDiff = None @@ -566,6 +576,63 @@ def test_catalog(self): 'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS}, stream_dict.get('schema')) +class TestColumnGrants(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + user = 'tmp_user_for_grant_tests' + password = 'password' + + def setUp(self): + table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, + {"name" : 'size integer', "type" : "integer", "quoted" : True}, + {"name" : 'size smallint', "type" : "smallint", "quoted" : True}, + {"name" : 'size bigint', "type" : "bigint", "quoted" : True}], + "name" : TestColumnGrants.table_name} + ensure_test_table(table_spec) + + with get_test_connection() as conn: + cur = conn.cursor() + + sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password) + LOGGER.info(sql) + cur.execute(sql) + + sql = """ CREATE USER {} WITH PASSWORD '{}' """.format(self.user, self.password) + LOGGER.info(sql) + cur.execute(sql) + + sql = """ GRANT SELECT ("id") ON "{}" TO {}""".format(TestColumnGrants.table_name, self.user) + LOGGER.info("running sql: {}".format(sql)) + cur.execute(sql) + + + + + def test_catalog(self): + conn_config = get_test_connection_config() + conn_config['user'] = self.user + conn_config['password'] = self.password + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'postgres-public-CHICKEN TIMES'] + + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('table_name')) + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('stream')) + + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'id') : {'inclusion': 'available', 'sql-datatype' : 'integer', 'selected-by-default' : True}}) + + self.assertEqual({'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS, + 'type': 'object', + 'properties': {'id': {'type': ['null', 'integer'], 'minimum': -2147483648, 'maximum': 2147483647}}}, + stream_dict.get('schema')) + if __name__== "__main__": test1 = TestArraysTable()