Skip to content

Commit 0c0904b

Browse files
atoomulasrinipunuru
authored andcommitted
SAMZA-1779: Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types
Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes apache#575 from atoomula/bytes1 and squashes the following commits: 855a03d7 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types df4886d8 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types 80268fc1 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types
1 parent 025f617 commit 0c0904b

File tree

6 files changed

+45
-7
lines changed

6 files changed

+45
-7
lines changed

samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.samza.sql.avro;
2121

22+
import java.nio.ByteBuffer;
2223
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.List;
@@ -29,6 +30,7 @@
2930
import org.apache.avro.generic.GenericData;
3031
import org.apache.avro.generic.GenericRecord;
3132
import org.apache.avro.generic.IndexedRecord;
33+
import org.apache.calcite.avatica.util.ByteString;
3234
import org.apache.samza.SamzaException;
3335
import org.apache.samza.config.Config;
3436
import org.apache.samza.operators.KV;
@@ -172,10 +174,12 @@ public Object convertToAvroObject(Object relObj, Schema schema) {
172174
getNonNullUnionSchema(schema).getValueType())));
173175
case UNION:
174176
return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
175-
case FIXED:
176-
return new GenericData.Fixed(schema, ((String) relObj).getBytes());
177177
case ENUM:
178178
return new GenericData.EnumSymbol(schema, (String) relObj);
179+
case FIXED:
180+
return new GenericData.Fixed(schema, ((ByteString) relObj).getBytes());
181+
case BYTES:
182+
return ByteBuffer.wrap(((ByteString) relObj).getBytes());
179183
default:
180184
return relObj;
181185
}
@@ -218,8 +222,12 @@ public Object convertToJavaObject(Object avroObj, Schema schema) {
218222
case UNION:
219223
return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
220224
case ENUM:
221-
case FIXED:
222225
return avroObj.toString();
226+
case FIXED:
227+
GenericData.Fixed fixed = (GenericData.Fixed) avroObj;
228+
return new ByteString(fixed.bytes());
229+
case BYTES:
230+
return new ByteString(((ByteBuffer) avroObj).array());
223231

224232
default:
225233
return avroObj;

samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.avro.io.EncoderFactory;
4444
import org.apache.avro.specific.SpecificDatumWriter;
4545
import org.apache.avro.util.Utf8;
46+
import org.apache.calcite.avatica.util.ByteString;
4647
import org.apache.calcite.rel.type.RelDataType;
4748
import org.apache.calcite.rel.type.RelRecordType;
4849
import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,6 +52,7 @@
5152
import org.apache.samza.sql.avro.schemas.AddressRecord;
5253
import org.apache.samza.sql.avro.schemas.ComplexRecord;
5354
import org.apache.samza.sql.avro.schemas.Kind;
55+
import org.apache.samza.sql.avro.schemas.MyFixed;
5456
import org.apache.samza.sql.avro.schemas.PhoneNumber;
5557
import org.apache.samza.sql.avro.schemas.Profile;
5658
import org.apache.samza.sql.avro.schemas.SimpleRecord;
@@ -66,6 +68,9 @@
6668
public class TestAvroRelConversion {
6769

6870
private static final Logger LOG = LoggerFactory.getLogger(TestAvroRelConversion.class);
71+
private static final byte[] DEFAULT_TRACKING_ID_BYTES =
72+
{76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1};
73+
6974
private final AvroRelConverter simpleRecordAvroRelConverter;
7075
private final AvroRelConverter complexRecordAvroRelConverter;
7176
private final AvroRelConverter nestedRecordAvroRelConverter;
@@ -79,6 +84,7 @@ public class TestAvroRelConversion {
7984
private float floatValue = 0.6f;
8085
private String testStrValue = "testString";
8186
private ByteBuffer testBytes = ByteBuffer.wrap("testBytes".getBytes());
87+
private MyFixed fixedBytes = new MyFixed();
8288
private long longValue = 200L;
8389

8490
private HashMap<String, String> mapValue = new HashMap<String, String>() {{
@@ -111,6 +117,8 @@ public TestAvroRelConversion() {
111117
complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig());
112118
simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig());
113119
nestedRecordAvroRelConverter = new AvroRelConverter(ss3, nestedRecordSchemaProvider, new MapConfig());
120+
121+
fixedBytes.bytes(DEFAULT_TRACKING_ID_BYTES);
114122
}
115123

116124
@Test
@@ -190,6 +198,7 @@ public void testComplexRecordConversion() throws IOException {
190198
record.put("float_value", floatValue);
191199
record.put("string_value", testStrValue);
192200
record.put("bytes_value", testBytes);
201+
record.put("fixed_value", fixedBytes);
193202
record.put("long_value", longValue);
194203
record.put("array_values", arrayValue);
195204
record.put("map_values", mapValue);
@@ -201,6 +210,7 @@ public void testComplexRecordConversion() throws IOException {
201210
complexRecord.float_value = floatValue;
202211
complexRecord.string_value = testStrValue;
203212
complexRecord.bytes_value = testBytes;
213+
complexRecord.fixed_value = fixedBytes;
204214
complexRecord.long_value = longValue;
205215
complexRecord.array_values = new ArrayList<>();
206216
complexRecord.array_values.addAll(arrayValue);
@@ -304,7 +314,12 @@ private void validateAvroSerializedData(byte[] serializedData) throws IOExceptio
304314
.collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue())))
305315
.equals(message.getSamzaSqlRelRecord().getField("map_values").get()));
306316

307-
Assert.assertTrue(message.getSamzaSqlRelRecord().getField("bytes_value").get().equals(testBytes));
317+
Assert.assertTrue(
318+
Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("bytes_value").get()).getBytes(),
319+
testBytes.array()));
320+
Assert.assertTrue(
321+
Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(),
322+
DEFAULT_TRACKING_ID_BYTES));
308323

309324
LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues()));
310325
LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));

samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
@SuppressWarnings("all")
2828
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
29-
public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
29+
public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
3030
/** Record id. */
3131
public java.lang.Integer id;
3232
/** Boolean Value. */

samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@
2626

2727
@SuppressWarnings("all")
2828
@org.apache.avro.specific.FixedSize(16)
29-
public class MyFixed extends org.apache.avro.specific.SpecificFixed {}
29+
public class MyFixed extends org.apache.avro.specific.SpecificFixed {
30+
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"fixed\",\"name\":\"MyFixed\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"size\":16}");
31+
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
32+
}

samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.samza.sql.system;
2121

22+
import java.nio.ByteBuffer;
2223
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
@@ -30,12 +31,14 @@
3031

3132
import org.apache.avro.generic.GenericData;
3233
import org.apache.avro.generic.GenericRecord;
34+
import org.apache.calcite.avatica.util.ByteString;
3335
import org.apache.samza.config.Config;
3436
import org.apache.samza.metrics.MetricsRegistry;
3537
import org.apache.samza.sql.avro.schemas.AddressRecord;
3638
import org.apache.samza.sql.avro.schemas.Company;
3739
import org.apache.samza.sql.avro.schemas.ComplexRecord;
3840
import org.apache.samza.sql.avro.schemas.Kind;
41+
import org.apache.samza.sql.avro.schemas.MyFixed;
3942
import org.apache.samza.sql.avro.schemas.PageView;
4043
import org.apache.samza.sql.avro.schemas.PhoneNumber;
4144
import org.apache.samza.sql.avro.schemas.Profile;
@@ -67,6 +70,8 @@ public class TestAvroSystemFactory implements SystemFactory {
6770
"444-444-4444", "555-555-5555"};
6871
public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
6972
public static final String[] pageKeys = {"inbox", "home", "search", "pymk", "group", "job"};
73+
public static final byte[] DEFAULT_TRACKING_ID_BYTES =
74+
{76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1};
7075

7176
public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
7277

@@ -291,6 +296,10 @@ private Object createComplexRecord(int index) {
291296
GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
292297
record.put("id", index);
293298
record.put("string_value", "Name" + index);
299+
record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes()));
300+
MyFixed myFixedVar = new MyFixed();
301+
myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES);
302+
record.put("fixed_value", myFixedVar);
294303
GenericData.Array<String> arrayValues =
295304
new GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
296305
arrayValues.addAll(IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList()));

samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ public void testEndToEndFlatten() throws Exception {
9494
int numMessages = 20;
9595
TestAvroSystemFactory.messages.clear();
9696
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
97+
9798
LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
9899
String sql1 =
99-
"Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1";
100+
"Insert into testavro.outputTopic "
101+
+ "select Flatten(array_values) as string_value, id, bytes_value, fixed_value "
102+
+ "from testavro.COMPLEX1";
100103
List<String> sqlStmts = Collections.singletonList(sql1);
101104
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
102105
SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));

0 commit comments

Comments
 (0)