Skip to content

Commit a6c94ad

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 41299b5 + d893912 commit a6c94ad

File tree

14 files changed

+85
-61
lines changed

14 files changed

+85
-61
lines changed

samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.samza.operators;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22-
import org.apache.samza.serializers.KVSerde;
2322

2423
/**
2524
* User facing class to collect metadata that fully describes a
@@ -30,8 +29,8 @@
3029
*
3130
* <pre>
3231
* {@code
33-
* TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl")
34-
* .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
32+
* TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
33+
* KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
3534
* .withBlockSize(1024)
3635
* .withConfig("some-key", "some-value");
3736
* }
@@ -54,14 +53,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
5453
*/
5554
String getTableId();
5655

57-
/**
58-
* Set the Serde for this table
59-
* @param serde the serde
60-
* @return this table descriptor instance
61-
* @throws IllegalArgumentException if null is provided
62-
*/
63-
D withSerde(KVSerde<K, V> serde);
64-
6556
/**
6657
* Add a configuration entry for the table
6758
* @param key the key

samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@
4444
* public List<TableDescriptor> getTableDescriptors() {
4545
* List<TableDescriptor> tableDescriptors = new ArrayList<>();
4646
* final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
47-
* tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
48-
* .withReadFunction(readRemoteTableFn)
49-
* .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
47+
* tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
48+
* .withReadFunction(readRemoteTableFn);
5049
*
51-
* tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
50+
* tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>()))
5251
* .withBlockSize(4096)
53-
* .withSerde(KVSerde.of(new LongSerde(), new StringSerde<>())));
5452
* .withConfig("some-key", "some-value");
5553
* return tableDescriptors;
5654
* }

samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,19 @@ protected BaseTableDescriptor(String tableId) {
5151
this.tableId = tableId;
5252
}
5353

54-
@Override
55-
public D withConfig(String key, String value) {
56-
config.put(key, value);
57-
return (D) this;
54+
/**
55+
* Constructs a table descriptor instance
56+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
57+
* @param serde the serde for key and value
58+
*/
59+
protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
60+
this.tableId = tableId;
61+
this.serde = serde;
5862
}
5963

6064
@Override
61-
public D withSerde(KVSerde<K, V> serde) {
62-
if (serde == null) {
63-
throw new IllegalArgumentException("Serde cannot be null");
64-
}
65-
this.serde = serde;
65+
public D withConfig(String key, String value) {
66+
config.put(key, value);
6667
return (D) this;
6768
}
6869

samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import org.apache.samza.operators.BaseTableDescriptor;
26+
import org.apache.samza.serializers.KVSerde;
2627
import org.apache.samza.table.TableSpec;
2728
import org.apache.samza.table.retry.TableRetryPolicy;
2829
import org.apache.samza.table.utils.SerdeUtils;
@@ -79,12 +80,22 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
7980
private int asyncCallbackPoolSize = -1;
8081

8182
/**
82-
* {@inheritDoc}
83+
* Constructs a table descriptor instance
84+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
8385
*/
8486
public RemoteTableDescriptor(String tableId) {
8587
super(tableId);
8688
}
8789

90+
/**
91+
* Constructs a table descriptor instance
92+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
93+
* @param serde the serde for key and value
94+
*/
95+
public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
96+
super(tableId, serde);
97+
}
98+
8899
@Override
89100
public TableSpec getTableSpec() {
90101
validate();

samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24+
import org.apache.samza.serializers.KVSerde;
2425
import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
2526
import org.apache.samza.table.TableSpec;
2627

@@ -34,12 +35,22 @@
3435
public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
3536

3637
/**
37-
* {@inheritDoc}
38+
* Constructs a table descriptor instance
39+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
3840
*/
3941
public InMemoryTableDescriptor(String tableId) {
4042
super(tableId);
4143
}
4244

45+
/**
46+
* Constructs a table descriptor instance
47+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
48+
* @param serde the serde for key and value
49+
*/
50+
public InMemoryTableDescriptor(String tableId, KVSerde<K, V> serde) {
51+
super(tableId, serde);
52+
}
53+
4354
@Override
4455
protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
4556
super.generateTableSpecConfig(tableSpecConfig);

samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class TestInMemoryTableDescriptor {
3131
@Test
3232
public void testTableSpec() {
3333

34-
TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1")
35-
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
34+
TableSpec tableSpec = new InMemoryTableDescriptor("1",
35+
KVSerde.of(new IntegerSerde(), new StringSerde()))
3636
.withConfig("inmemory.abc", "xyz")
3737
.getTableSpec();
3838

samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24+
import org.apache.samza.serializers.KVSerde;
2425
import org.apache.samza.table.TableSpec;
2526

2627

@@ -57,12 +58,22 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr
5758
private String compactionStyle;
5859

5960
/**
60-
* {@inheritDoc}
61+
* Constructs a table descriptor instance
62+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
6163
*/
6264
public RocksDbTableDescriptor(String tableId) {
6365
super(tableId);
6466
}
6567

68+
/**
69+
* Constructs a table descriptor instance
70+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
71+
* @param serde the serde for key and value
72+
*/
73+
public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
74+
super(tableId, serde);
75+
}
76+
6677
/**
6778
* Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
6879
* @param writeBatchSize write batch size

samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public void testMinimal() {
3737

3838
@Test
3939
public void testSerde() {
40-
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
41-
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
40+
TableSpec tableSpec = new RocksDbTableDescriptor("1",
41+
KVSerde.of(new IntegerSerde(), new StringSerde()))
4242
.getTableSpec();
4343
Assert.assertNotNull(tableSpec.getSerde());
4444
Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
@@ -48,8 +48,8 @@ public void testSerde() {
4848
@Test
4949
public void testTableSpec() {
5050

51-
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
52-
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
51+
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
52+
KVSerde.of(new IntegerSerde(), new StringSerde()))
5353
.withBlockSize(1)
5454
.withCacheSize(2)
5555
.withCompactionStyle("fifo")
@@ -85,8 +85,7 @@ public void testTableSpec() {
8585
@Test
8686
public void testTableSpecWithChangelogEnabled() {
8787

88-
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
89-
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
88+
TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
9089
.withChangelogStream("changelog-$tream")
9190
.withChangelogReplicationFactor(10)
9291
.getTableSpec();

samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525

2626
import org.apache.samza.operators.BaseTableDescriptor;
27+
import org.apache.samza.serializers.KVSerde;
2728
import org.apache.samza.storage.SideInputsProcessor;
2829

2930

@@ -49,13 +50,21 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
4950

5051
/**
5152
* Constructs a table descriptor instance
52-
*
53-
* @param tableId Id of the table
53+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
5454
*/
5555
public BaseLocalStoreBackedTableDescriptor(String tableId) {
5656
super(tableId);
5757
}
5858

59+
/**
60+
* Constructs a table descriptor instance
61+
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
62+
* @param serde the serde for key and value
63+
*/
64+
public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
65+
super(tableId, serde);
66+
}
67+
5968
public D withSideInputs(List<String> sideInputs) {
6069
this.sideInputs = sideInputs;
6170
// Disable changelog

samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,9 @@ private SqlIOConfig fetchSystemInfo(String name) {
100100

101101
TableDescriptor tableDescriptor = null;
102102
if (isTable) {
103-
tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
104-
.withSerde(KVSerde.of(
105-
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
106-
new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
103+
tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of(
104+
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
105+
new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
107106
}
108107

109108
return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);

0 commit comments

Comments
 (0)