Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ jobs:
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
source dev_env.sh
export LC_ALL=C
python3 -m venv /usr/local/share/virtualenvs/tap-postgres
source /usr/local/share/virtualenvs/tap-postgres/bin/activate
pip install .
Expand Down
11 changes: 11 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Description of change
(write a short description here or paste a link to JIRA)

# QA steps
- [ ] automated tests passing
- [ ] manual QA steps passing (list below)

# Risks

# Rollback steps
- revert this branch
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog

## 0.0.65
* Add support for `int8[]` (`bigint[]`) array types to log-based replication [#69](https://github.com/singer-io/tap-postgres/pull/69)

## 0.0.64
* Pass string to `decimal.Decimal` when handling numeric data type [#67](https://github.com/singer-io/tap-postgres/pull/67)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-postgres',
version='0.0.59',
version='0.0.65',
description='Singer.io tap for extracting data from PostgreSQL',
author='Stitch',
url='https://singer.io',
Expand Down
8 changes: 4 additions & 4 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def produce_table_info(conn):
AND NOT a.attisdropped
AND pg_class.relkind IN ('r', 'v', 'm')
AND n.nspname NOT in ('pg_toast', 'pg_catalog', 'information_schema')
AND has_table_privilege(pg_class.oid, 'SELECT') = true """)
AND has_column_privilege(pg_class.oid, attname, 'SELECT') = true """)
for row in cur.fetchall():
row_count, is_view, schema_name, table_name, *col_info = row

Expand Down Expand Up @@ -493,7 +493,7 @@ def sync_method_for_streams(streams, state, default_replication_method):
state = clear_state_on_replication_change(state, stream['tap_stream_id'], replication_key, replication_method)

if replication_method not in set(['LOG_BASED', 'FULL_TABLE', 'INCREMENTAL']):
raise Exception("Unrecognized replication_method {}".format(replication_method))
raise Exception("Unrecognized replication_method {} for stream {}".format(replication_method, stream['tap_stream_id']))

md_map = metadata.to_map(stream['metadata'])
desired_columns = [c for c in stream['schema']['properties'].keys() if sync_common.should_sync_column(md_map, c)]
Expand Down Expand Up @@ -691,9 +691,9 @@ def main_impl():

if args.discover:
do_discovery(conn_config)
elif args.properties or args.catalog:
elif args.properties:
state = args.state
do_sync(conn_config, args.catalog.to_dict() if args.catalog else args.properties, args.config.get('default_replication_method'), state)
do_sync(conn_config, args.properties, args.config.get('default_replication_method'), state)
else:
LOGGER.info("No properties were selected")

Expand Down
16 changes: 11 additions & 5 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import tap_postgres.sync_strategies.common as sync_common
from dateutil.parser import parse
import psycopg2
from psycopg2 import sql
import copy
from select import select
from functools import reduce
Expand Down Expand Up @@ -68,11 +69,14 @@ def tuples_to_map(accum, t):
accum[t[0]] = t[1]
return accum

def create_hstore_elem_query(elem):
    """Build a parameter-safe query that expands an hstore literal to an array.

    Uses psycopg2's sql composition so any quotes inside *elem* are escaped
    by the driver rather than by string formatting.
    """
    quoted_literal = sql.Literal(elem)
    return sql.SQL("SELECT hstore_to_array({})").format(quoted_literal)

def create_hstore_elem(conn_info, elem):
    """Convert an hstore literal into a dict by letting Postgres expand it.

    hstore_to_array returns a flat list [k1, v1, k2, v2, ...]; fold the
    consecutive pairs into a plain dict.
    """
    with post_db.open_connection(conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(create_hstore_elem_query(elem))
            flat = cur.fetchone()[0]
            return {flat[idx]: flat[idx + 1] for idx in range(0, len(flat), 2)}
Expand Down Expand Up @@ -101,6 +105,8 @@ def create_array_elem(elem, sql_datatype, conn_info):
cast_datatype = 'text[]'
elif sql_datatype == 'integer[]':
cast_datatype = 'integer[]'
elif sql_datatype == 'bigint[]':
cast_datatype = 'bigint[]'
elif sql_datatype == 'inet[]':
cast_datatype = 'inet[]'
elif sql_datatype == 'json[]':
Expand Down Expand Up @@ -130,8 +136,8 @@ def create_array_elem(elem, sql_datatype, conn_info):
#custom datatypes like enums
cast_datatype = 'text[]'

sql = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype)
cur.execute(sql)
sql_stmt = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype)
cur.execute(sql_stmt)
res = cur.fetchone()[0]
return res

Expand Down Expand Up @@ -164,7 +170,7 @@ def selected_value_to_singer_value_impl(elem, og_sql_datatype, conn_info):
if sql_datatype == 'hstore':
return create_hstore_elem(conn_info, elem)
if 'numeric' in sql_datatype:
return decimal.Decimal(elem)
return decimal.Decimal(str(elem))
if isinstance(elem, int):
return elem
if isinstance(elem, float):
Expand Down
67 changes: 67 additions & 0 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,16 @@ def test_catalog(self):
'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS},
stream_dict.get('schema'))

def test_escaping_values(self):
    """The generated hstore query must escape embedded single quotes safely."""
    elem = '"{}"=>"{}"'.format('nickname', "Dave's Courtyard")

    with get_test_connection() as conn:
        with conn.cursor() as cur:
            query = tap_postgres.sync_strategies.logical_replication.create_hstore_elem_query(elem)
            rendered = query.as_string(cur)
            # The apostrophe in "Dave's" must come out doubled, per SQL quoting.
            self.assertEqual(rendered, "SELECT hstore_to_array('\"nickname\"=>\"Dave''s Courtyard\"')")


class TestEnumTable(unittest.TestCase):
maxDiff = None
Expand Down Expand Up @@ -566,6 +576,63 @@ def test_catalog(self):
'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS},
stream_dict.get('schema'))

class TestColumnGrants(unittest.TestCase):
    """Verify discovery reports only the columns the connecting user may SELECT.

    A throwaway user is granted SELECT on a single column ("id") of a
    four-column table; discovery run as that user should surface only
    the granted column in both metadata and schema.
    """
    maxDiff = None
    table_name = 'CHICKEN TIMES'
    user = 'tmp_user_for_grant_tests'
    password = 'password'

    def setUp(self):
        """Create the test table and a restricted user with a single column grant."""
        table_spec = {"columns": [{"name": "id", "type": "integer", "serial": True},
                                  {"name": 'size integer', "type": "integer", "quoted": True},
                                  {"name": 'size smallint', "type": "smallint", "quoted": True},
                                  {"name": 'size bigint', "type": "bigint", "quoted": True}],
                      "name": TestColumnGrants.table_name}
        ensure_test_table(table_spec)

        with get_test_connection() as conn:
            cur = conn.cursor()

            # Drop any leftover user from a prior run so the test is repeatable.
            # (Fixed: the format string takes one argument; the stray
            # self.password argument was removed.)
            sql = """ DROP USER IF EXISTS {} """.format(self.user)
            LOGGER.info(sql)
            cur.execute(sql)

            sql = """ CREATE USER {} WITH PASSWORD '{}' """.format(self.user, self.password)
            LOGGER.info(sql)
            cur.execute(sql)

            # Grant SELECT on only the "id" column; the remaining columns
            # must therefore be invisible to discovery for this user.
            sql = """ GRANT SELECT ("id") ON "{}" TO {}""".format(TestColumnGrants.table_name, self.user)
            LOGGER.info("running sql: {}".format(sql))
            cur.execute(sql)

    def test_catalog(self):
        """Discovery as the restricted user must expose only the granted column."""
        conn_config = get_test_connection_config()
        conn_config['user'] = self.user
        conn_config['password'] = self.password
        streams = tap_postgres.do_discovery(conn_config)
        chicken_streams = [s for s in streams if s['tap_stream_id'] == 'postgres-public-CHICKEN TIMES']

        self.assertEqual(len(chicken_streams), 1)
        stream_dict = chicken_streams[0]

        # Fixed: compare against this class's table name, not the unrelated
        # TestStringTableWithPK class (copy-paste error).
        self.assertEqual(TestColumnGrants.table_name, stream_dict.get('table_name'))
        self.assertEqual(TestColumnGrants.table_name, stream_dict.get('stream'))

        stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb'])

        # Only the granted "id" column should appear in the metadata map.
        self.assertEqual(metadata.to_map(stream_dict.get('metadata')),
                         {(): {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0},
                          ('properties', 'id'): {'inclusion': 'available', 'sql-datatype': 'integer', 'selected-by-default': True}})

        # Likewise the generated JSON schema should contain only "id".
        self.assertEqual({'definitions': tap_postgres.BASE_RECURSIVE_SCHEMAS,
                          'type': 'object',
                          'properties': {'id': {'type': ['null', 'integer'], 'minimum': -2147483648, 'maximum': 2147483647}}},
                         stream_dict.get('schema'))


if __name__== "__main__":
test1 = TestArraysTable()
Expand Down