|
19 | 19 |
|
20 | 20 | import sys |
21 | 21 | import collections |
| 22 | +import uuid |
22 | 23 |
|
23 | 24 | import tensorflow as tf |
24 | 25 | from tensorflow_io.core.python.ops import core_ops |
@@ -457,7 +458,9 @@ def __init__(self, |
457 | 458 | internal=False): |
458 | 459 | with tf.name_scope("AudioIOTensor") as scope: |
459 | 460 | resource, dtypes, shapes, rate = core_ops.wav_indexable_init( |
460 | | - filename, container=scope, shared_name=filename) |
| 461 | + filename, |
| 462 | + container=scope, |
| 463 | + shared_name="%s/%s" % (filename, uuid.uuid4().hex)) |
461 | 464 | shapes = [ |
462 | 465 | tf.TensorShape( |
463 | 466 | [None if dim < 0 else dim for dim in e.numpy() if dim != 0] |
@@ -493,27 +496,46 @@ class JSONIOTensor(IOTensor): |
493 | 496 | #============================================================================= |
494 | 497 | def __init__(self, |
495 | 498 | filename, |
| 499 | + columns=None, |
496 | 500 | internal=False): |
497 | 501 | with tf.name_scope("JSONIOTensor") as scope: |
| 502 | + metadata = [] |
| 503 | + if columns is not None: |
| 504 | + metadata.extend(["column: "+column for column in columns]) |
498 | 505 | resource, dtypes, shapes, columns = core_ops.json_indexable_init( |
499 | | - filename, container=scope, shared_name=filename) |
| 506 | + filename, metadata=metadata, |
| 507 | + container=scope, |
| 508 | + shared_name="%s/%s" % (filename, uuid.uuid4().hex)) |
500 | 509 | shapes = [ |
501 | 510 | tf.TensorShape( |
502 | 511 | [None if dim < 0 else dim for dim in e.numpy() if dim != 0] |
503 | 512 | ) for e in tf.unstack(shapes)] |
504 | 513 | dtypes = [tf.as_dtype(e.numpy()) for e in tf.unstack(dtypes)] |
505 | | - spec = [tf.TensorSpec(shape, dtype) for ( |
506 | | - shape, dtype) in zip(shapes, dtypes)] |
| 514 | + columns = [e.numpy() for e in tf.unstack(columns)] |
| 515 | + spec = [tf.TensorSpec(shape, dtype, column) for ( |
| 516 | + shape, dtype, column) in zip(shapes, dtypes, columns)] |
507 | 517 | if len(spec) == 1: |
508 | 518 | spec = spec[0] |
509 | 519 | else: |
510 | 520 | spec = tuple(spec) |
511 | | - columns = [e.numpy() for e in tf.unstack(columns)] |
| 521 | + self._filename = filename |
512 | 522 | super(JSONIOTensor, self).__init__( |
513 | 523 | spec, resource, core_ops.json_indexable_get_item, |
514 | 524 | None, |
515 | 525 | internal=internal) |
516 | 526 |
|
| 527 | + #============================================================================= |
| 528 | + # Accessors |
| 529 | + #============================================================================= |
| 530 | + |
| 531 | + def column(self, name): |
| 532 | + """The `TensorSpec` of column named `name`""" |
| 533 | + return next(e for e in tf.nest.flatten(self.spec) if e.name == name) |
| 534 | + |
| 535 | + def __call__(self, column): |
| 536 | + """Return a new JSONIOTensor with column named `column`""" |
| 537 | + return JSONIOTensor(self._filename, columns=[column], internal=True) |
| 538 | + |
517 | 539 | class KafkaIOTensor(IOIterableTensor): |
518 | 540 | """KafkaIOTensor""" |
519 | 541 |
|
@@ -541,7 +563,8 @@ def func_init(data): |
541 | 563 | """func_init""" |
542 | 564 | resource, _, _ = core_ops.kafka_iterable_init( |
543 | 565 | data["subscription"], metadata=data["metadata"], |
544 | | - container=scope, shared_name=subscription) |
| 566 | + container=scope, |
| 567 | + shared_name="%s/%s" % (subscription, uuid.uuid4().hex)) |
545 | 568 | return resource |
546 | 569 | func_next = core_ops.kafka_iterable_next |
547 | 570 |
|
|
0 commit comments