New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft pr for initial review of apicurio #24715
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly cosmetic tidy ups
Optional<String> schemaString = formatOptions.getOptional(SCHEMA); | ||
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); | ||
return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { | ||
// -------------------------------------------------------------------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments in this method don't add very much and could probably be removed
LogicalType convertedDataType = | ||
AvroSchemaConverter.convertToDataType(schemaString).getLogicalType(); | ||
|
||
if (convertedDataType.isNullable()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you make convertedDataType
non-nullable because rowType
is always non-nullable, and the .equals()
below won't work otherwise? A comment here could be helpful.
@Override | ||
public Schema readSchema(InputStream in) throws IOException { | ||
throw new IOException( | ||
"Incompatible Kafka connector. Use a Kafka connector that passes headers"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the connection between readSchema()
and 'a Kafka connector that passes headers' ?
public long bytesToLong(byte[] bytes) { | ||
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); | ||
buffer.put(bytes); | ||
buffer.flip(); // need flip |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// need flip
could be elaborated
long schemaId = getSchemaId(in, additionalParameters, useGlobalId); | ||
|
||
if (!useGlobalId) { | ||
// if we are using the content ID there is not get that will dereference. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo in this comment block, 'there is not get'
// defined via general property map | ||
properties.put("schema", SCHEMA_STRING); | ||
|
||
return getModifiedOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the formatting of this block be condensed?
SerializationSchema<RowData> actualSer = | ||
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType()); | ||
|
||
// assertThat(actualSer).isEqualTo(expectedSer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented out code
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType()); | ||
|
||
// assertThat(actualSer).isEqualTo(expectedSer); | ||
System.err.println("expectedSer:" + expectedSer + "\nactualSer:" + actualSer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System.err ?
return nestedSchema.serialize(record); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Failed to serialize row.", e); | ||
throw new RuntimeException("Failed to serialize row1. record=" + record, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
row1 ?
+ inputAdditionalProperties | ||
+ ",out" | ||
+ outputAdditionalProperties); | ||
// schemaCoder.writeSchema(getSchema(), outputStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented out code
@mnuttall thank you very much for the review I will have a look. |
Early code for review of Apicurio Avro format prototyping