From 347a194f75ce332cf93f6f736835f5e8f0444908 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 11:45:32 -0700
Subject: [PATCH 1/6] Fixing Dockerfile

---
 tools/dev/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/dev/Dockerfile b/tools/dev/Dockerfile
index 663a49bf0..f5e2d68bb 100644
--- a/tools/dev/Dockerfile
+++ b/tools/dev/Dockerfile
@@ -49,7 +49,7 @@
 RUN /bin/bash -c "source activate tfio-dev && python -m pip install \
     pyarrow==${ARROW_VERSION} \
     pandas \
     fastavro \
-    gast==0.2.2
+    gast==0.2.2 \
     ${PIP_ADD_PACKAGES} \
     "

From 3199cc666baa972e1f59a117503307005f232103 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 11:46:17 -0700
Subject: [PATCH 2/6] Returning dataset in a form of Dictionary from BigQuery connector

---
 .../bigquery/python/ops/bigquery_api.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/tensorflow_io/bigquery/python/ops/bigquery_api.py b/tensorflow_io/bigquery/python/ops/bigquery_api.py
index a6400373a..11ac8f017 100644
--- a/tensorflow_io/bigquery/python/ops/bigquery_api.py
+++ b/tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -27,6 +27,9 @@
 from __future__ import division
 from __future__ import print_function

+from collections import OrderedDict
+from operator import itemgetter
+
 from tensorflow.python.data.experimental.ops import interleave_ops
 from tensorflow.python.data.ops import dataset_ops
 from tensorflow.python.framework import dtypes
@@ -223,8 +226,19 @@ class _BigQueryDataset(dataset_ops.DatasetSource):

   def __init__(self, client_resource, selected_fields, output_types,
                avro_schema, stream):
-    self._element_spec = tuple(
-        tensor_spec.TensorSpec([], dtype) for dtype in output_types)
+
+    # selected_fields and corresponding output_types have to be sorted because
+    # of b/141251314
+    sorted_fields_with_types = sorted(
+        zip(selected_fields, output_types),
+        key=itemgetter(0))
+    selected_fields, output_types = list(zip(*sorted_fields_with_types))
+    selected_fields = list(selected_fields)
+    output_types = list(output_types)
+
+    self._element_spec = OrderedDict(zip(
+        selected_fields,
+        (tensor_spec.TensorSpec([], dtype) for dtype in output_types)))

     variant_tensor = _bigquery_so.big_query_dataset(
         client=client_resource,

From d0090d0c2e07d4ac92eb981e1d614f970511dea2 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 11:49:00 -0700
Subject: [PATCH 3/6] Adding NULL fields support to BigQuery connector

---
 tensorflow_io/bigquery/kernels/bigquery_lib.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tensorflow_io/bigquery/kernels/bigquery_lib.h b/tensorflow_io/bigquery/kernels/bigquery_lib.h
index 52589a432..a1d600ccb 100644
--- a/tensorflow_io/bigquery/kernels/bigquery_lib.h
+++ b/tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -184,6 +184,9 @@ class BigQueryReaderDatasetIterator : public DatasetIterator {
       case avro::AVRO_ENUM:
        dtype = DT_STRING;
        break;
+      case avro::AVRO_NULL:
+        dtype = output_types[i];
+        break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
                                        field.type());
@@ -250,6 +253,8 @@ class BigQueryReaderDatasetIterator : public DatasetIterator {
         ((*out_tensors)[i]).scalar()() =
             field.value().symbol();
         break;
+      case avro::AVRO_NULL:  // Fallthrough;
+        break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
                                        field.type());
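A quick illustration of patches 2 and 3: after patch 2 the BigQuery dataset no longer yields plain tuples; its element spec is an ordered dict keyed by the selected field names, with the fields (and their matching dtypes) sorted by name to work around b/141251314, and patch 3 lets NULL-typed Avro fields pass through using the caller-supplied output type. The standalone sketch below mirrors the element-spec construction from patch 2; the field names and dtypes are made-up examples, not values from the patches.

    # Mirrors what patch 2 does inside _BigQueryDataset.__init__:
    # sort fields and dtypes together by field name, then key the element
    # spec by field name instead of building a positional tuple.
    import collections
    from operator import itemgetter

    import tensorflow as tf

    selected_fields = ["state", "age", "name"]        # example field names
    output_types = [tf.string, tf.int64, tf.string]   # matching example dtypes

    pairs = sorted(zip(selected_fields, output_types), key=itemgetter(0))
    selected_fields, output_types = (list(x) for x in zip(*pairs))

    element_spec = collections.OrderedDict(
        (name, tf.TensorSpec([], dtype))
        for name, dtype in zip(selected_fields, output_types))

    print(element_spec)
    # OrderedDict([('age', TensorSpec(shape=(), dtype=tf.int64, name=None)),
    #              ('name', TensorSpec(shape=(), dtype=tf.string, name=None)),
    #              ('state', TensorSpec(shape=(), dtype=tf.string, name=None))])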
From 593f00f73d38a015c33d80c28cac0d128b3ca154 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 12:39:14 -0700
Subject: [PATCH 4/6] python style tweak

---
 tensorflow_io/bigquery/python/ops/bigquery_api.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tensorflow_io/bigquery/python/ops/bigquery_api.py b/tensorflow_io/bigquery/python/ops/bigquery_api.py
index 11ac8f017..918f70404 100644
--- a/tensorflow_io/bigquery/python/ops/bigquery_api.py
+++ b/tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -27,7 +27,7 @@
 from __future__ import division
 from __future__ import print_function

-from collections import OrderedDict
+import collections
 from operator import itemgetter

 from tensorflow.python.data.experimental.ops import interleave_ops
@@ -236,7 +236,7 @@ def __init__(self, client_resource, selected_fields, output_types,
     selected_fields = list(selected_fields)
     output_types = list(output_types)

-    self._element_spec = OrderedDict(zip(
+    self._element_spec = collections.OrderedDict(zip(
         selected_fields,
         (tensor_spec.TensorSpec([], dtype) for dtype in output_types)))

From 5d343ab4822c19ef7cdeb9ea7c2d3e980c8d86f6 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 13:45:15 -0700
Subject: [PATCH 5/6] more style tweaks

---
 tensorflow_io/bigquery/kernels/bigquery_lib.h     | 2 +-
 tensorflow_io/bigquery/python/ops/bigquery_api.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tensorflow_io/bigquery/kernels/bigquery_lib.h b/tensorflow_io/bigquery/kernels/bigquery_lib.h
index a1d600ccb..16d0ae4d1 100644
--- a/tensorflow_io/bigquery/kernels/bigquery_lib.h
+++ b/tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -253,7 +253,7 @@ class BigQueryReaderDatasetIterator : public DatasetIterator {
         ((*out_tensors)[i]).scalar()() =
             field.value().symbol();
         break;
-      case avro::AVRO_NULL:  // Fallthrough;
+      case avro::AVRO_NULL:
         break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
diff --git a/tensorflow_io/bigquery/python/ops/bigquery_api.py b/tensorflow_io/bigquery/python/ops/bigquery_api.py
index 918f70404..594f5e077 100644
--- a/tensorflow_io/bigquery/python/ops/bigquery_api.py
+++ b/tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -28,7 +28,7 @@
 from __future__ import print_function

 import collections
-from operator import itemgetter
+import operator

 from tensorflow.python.data.experimental.ops import interleave_ops
 from tensorflow.python.data.ops import dataset_ops
@@ -231,7 +231,7 @@ def __init__(self, client_resource, selected_fields, output_types,
     # of b/141251314
     sorted_fields_with_types = sorted(
         zip(selected_fields, output_types),
-        key=itemgetter(0))
+        key=operator.itemgetter(0))
     selected_fields, output_types = list(zip(*sorted_fields_with_types))
     selected_fields = list(selected_fields)
     output_types = list(output_types)
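Patches 4 and 5 are pure style changes: they switch the file from importing names (from collections import OrderedDict, from operator import itemgetter) to importing modules (import collections, import operator) and qualifying the names at the call sites, which matches the module-import rule in the Google Python style guide. Behavior is identical either way, as this minimal sketch (with made-up field/type pairs) shows:

    # Module-qualified spellings as used after patches 4 and 5; the sorted
    # ordered dict built here is the same object the from-imports would give.
    import collections
    import operator

    pairs = sorted([("name", str), ("age", int)], key=operator.itemgetter(0))
    spec = collections.OrderedDict(pairs)
    print(spec)  # OrderedDict([('age', <class 'int'>), ('name', <class 'str'>)])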
From 1870e5b0780f9c74e0a80e9ca79269b0e9b7cba9 Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 13:42:26 -0700
Subject: [PATCH 6/6] Style tweaks, coming from Google account

---
 tensorflow_io/bigquery/kernels/bigquery_lib.h     | 2 +-
 tensorflow_io/bigquery/python/ops/bigquery_api.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tensorflow_io/bigquery/kernels/bigquery_lib.h b/tensorflow_io/bigquery/kernels/bigquery_lib.h
index 16d0ae4d1..a1d600ccb 100644
--- a/tensorflow_io/bigquery/kernels/bigquery_lib.h
+++ b/tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -253,7 +253,7 @@ class BigQueryReaderDatasetIterator : public DatasetIterator {
         ((*out_tensors)[i]).scalar()() =
             field.value().symbol();
         break;
-      case avro::AVRO_NULL:
+      case avro::AVRO_NULL:  // Fallthrough;
         break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
diff --git a/tensorflow_io/bigquery/python/ops/bigquery_api.py b/tensorflow_io/bigquery/python/ops/bigquery_api.py
index 594f5e077..918f70404 100644
--- a/tensorflow_io/bigquery/python/ops/bigquery_api.py
+++ b/tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -28,7 +28,7 @@
 from __future__ import print_function

 import collections
-import operator
+from operator import itemgetter

 from tensorflow.python.data.experimental.ops import interleave_ops
 from tensorflow.python.data.ops import dataset_ops
@@ -231,7 +231,7 @@ def __init__(self, client_resource, selected_fields, output_types,
     # of b/141251314
     sorted_fields_with_types = sorted(
         zip(selected_fields, output_types),
-        key=operator.itemgetter(0))
+        key=itemgetter(0))
     selected_fields, output_types = list(zip(*sorted_fields_with_types))
     selected_fields = list(selected_fields)
     output_types = list(output_types)
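Note that patch 6 is effectively a revert of patch 5: the index lines run in reverse (16d0ae4d1..a1d600ccb and 594f5e077..918f70404), and both the from operator import itemgetter form and the Fallthrough comment come back, so the net effect of the series is patches 1 through 4. For context, here is a rough sketch of how the connector might be used once the series is applied. It is based on the BigQueryClient API defined in the patched bigquery_api.py as described in the tensorflow-io README; the read_session argument order is recalled from memory and may not be exact, and every project, dataset, table, and field name below is a placeholder.

    # Hypothetical usage sketch (placeholder names; treat the exact
    # read_session signature as an assumption, not a guarantee).
    import tensorflow as tf
    from tensorflow_io.bigquery import BigQueryClient

    client = BigQueryClient()
    session = client.read_session(
        "projects/my-project",            # parent resource (placeholder)
        "my-project",                     # project id (placeholder)
        "my_table", "my_dataset",         # table and dataset ids (placeholders)
        selected_fields=["name", "age"],
        output_types=[tf.string, tf.int64])

    dataset = session.parallel_read_rows()
    for row in dataset.take(2):
      # After patch 2 each row is an OrderedDict keyed by field name, with the
      # fields sorted alphabetically (b/141251314), rather than a tuple.
      tf.print(row["age"], row["name"])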