Skip to content

Commit 520da15

Browse files
weisong44xinyuiscool
authored andcommitted
SAMZA-1964: Make getTableSpec() in RemoteTableDescriptor reentrant
As per subject Author: Wei Song <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes apache#747 from weisong44/SAMZA-1964
1 parent 8242fab commit 520da15

File tree

4 files changed

+18
-11
lines changed

4 files changed

+18
-11
lines changed

samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
6565
// Output support for a specific remote store (optional)
6666
private TableWriteFunction<K, V> writeFn;
6767

68-
// Rate limiter for client-side throttling;
69-
// can either be constructed indirectly from rates or overridden by withRateLimiter()
68+
// Rate limiter for client-side throttling; it is set by withRateLimiter()
7069
private RateLimiter rateLimiter;
7170

7271
// Rates for constructing the default rate limiter when they are non-zero
@@ -113,21 +112,19 @@ public TableSpec getTableSpec() {
113112
tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
114113
}
115114

116-
// Serialize the rate limiter if specified
117115
if (!tagCreditsMap.isEmpty()) {
118-
rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
119-
}
120-
121-
if (rateLimiter != null) {
116+
tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter",
117+
new EmbeddedTaggedRateLimiter(tagCreditsMap)));
118+
} else if (rateLimiter != null) {
122119
tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
123120
}
124121

125-
// Serialize the readCredit and writeCredit functions
122+
// Serialize the readCredit functions
126123
if (readCreditFn != null) {
127124
tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
128125
"read credit function", readCreditFn));
129126
}
130-
127+
// Serialize the writeCredit functions
131128
if (writeCreditFn != null) {
132129
tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
133130
"write credit function", writeCreditFn));

samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r
197197
}
198198

199199
TableSpec spec = desc.getTableSpec();
200+
spec = desc.getTableSpec();
200201
RemoteTableProvider provider = new RemoteTableProvider(spec);
201202
provider.init(createMockContext());
202203
Table table = provider.getTable();

samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@
2828

2929

3030
public class TestInMemoryTableDescriptor {
31+
@Test
32+
public void testMinimal() {
33+
InMemoryTableDescriptor tableDescriptor = new InMemoryTableDescriptor("1");
34+
tableDescriptor.getTableSpec();
35+
tableDescriptor.getTableSpec();
36+
}
37+
3138
@Test
3239
public void testTableSpec() {
3340

samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ public class TestRocksDbTableDescriptor {
3030

3131
@Test
3232
public void testMinimal() {
33-
new RocksDbTableDescriptor<Integer, String>("1")
34-
.validate();
33+
RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("1");
34+
tableDescriptor.validate();
35+
tableDescriptor.getTableSpec();
36+
tableDescriptor.getTableSpec();
3537
}
3638

3739
@Test

0 commit comments

Comments
 (0)