Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft pr for initial review of apicurio #24715

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

davidradl
Copy link

@davidradl davidradl commented Apr 24, 2024

Early code for review of Apicurio Avro format prototyping

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 24, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Signed-off-by: David Radley <[email protected]>
Copy link

@mnuttall mnuttall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly cosmetic tidy ups

Optional<String> schemaString = formatOptions.getOptional(SCHEMA);
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
// --------------------------------------------------------------------------------------------

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments in this method don't add very much and could probably be removed

LogicalType convertedDataType =
AvroSchemaConverter.convertToDataType(schemaString).getLogicalType();

if (convertedDataType.isNullable()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you make convertedDataType non-nullable because rowType is always non-nullable, and the .equals() below won't work otherwise? A comment here could be helpful.

@Override
public Schema readSchema(InputStream in) throws IOException {
throw new IOException(
"Incompatible Kafka connector. Use a Kafka connector that passes headers");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the connection between readSchema() and 'a Kafka connector that passes headers' ?

public long bytesToLong(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.put(bytes);
buffer.flip(); // need flip

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// need flip could be elaborated

long schemaId = getSchemaId(in, additionalParameters, useGlobalId);

if (!useGlobalId) {
// if we are using the content ID there is not get that will dereference.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in this comment block, 'there is not get'

// defined via general property map
properties.put("schema", SCHEMA_STRING);

return getModifiedOptions(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the formatting of this block be condensed?

SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());

// assertThat(actualSer).isEqualTo(expectedSer);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented out code

sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());

// assertThat(actualSer).isEqualTo(expectedSer);
System.err.println("expectedSer:" + expectedSer + "\nactualSer:" + actualSer);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.err ?

return nestedSchema.serialize(record);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize row.", e);
throw new RuntimeException("Failed to serialize row1. record=" + record, e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row1 ?

+ inputAdditionalProperties
+ ",out"
+ outputAdditionalProperties);
// schemaCoder.writeSchema(getSchema(), outputStream);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented out code

@davidradl
Copy link
Author

@mnuttall thank you very much for the review I will have a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants