Revert "Skip field-id re-assignment during table creation (#247)" (#272)
This reverts commit 6d69b6c.

## Summary


[Issue](https://github.com/linkedin/openhouse/issues/#nnn) Briefly summarize the changes made in this pull request in 2-3 lines.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [X] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done

- [ ] Manually tested on local docker setup. Please include commands run, and their output.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [X] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
HotSushi authored Dec 17, 2024
1 parent eaf36db commit 0ad93ba
Showing 4 changed files with 0 additions and 386 deletions.
@@ -17,8 +17,6 @@ public final class CatalogConstants {

static final String FEATURE_TOGGLE_STOP_CREATE = "stop_create";

static final String CLIENT_TABLE_SCHEMA = "client.table.schema";

private CatalogConstants() {
// Noop
}
@@ -30,25 +30,16 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.springframework.data.util.Pair;
@@ -177,26 +168,6 @@ public void commit(TableMetadata base, TableMetadata metadata) {
@SuppressWarnings("checkstyle:MissingSwitchDefault")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {

/**
* During table creation, the table metadata object that arrives here has its field-ids
* reassigned from the client-supplied schema. This code block creates a new table metadata
* object from the client-supplied schema, preserving its field-ids.
*/
if (base == null && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null) {
Schema clientSchema =
SchemaParser.fromJson(metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA));
metadata =
TableMetadata.buildFromEmpty()
.setLocation(metadata.location())
.setCurrentSchema(clientSchema, metadata.lastColumnId())
.addPartitionSpec(
rebuildPartitionSpec(metadata.spec(), metadata.schema(), clientSchema))
.addSortOrder(rebuildSortOrder(metadata.sortOrder(), clientSchema))
.setProperties(metadata.properties())
.build();
}

int version = currentVersion() + 1;
CommitStatus commitStatus = CommitStatus.FAILURE;

@@ -311,103 +282,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
}
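For context on why the reverted block existed: a minimal sketch (class name and values hypothetical, assuming Iceberg's public `TableMetadata` API) of the field-id reassignment that happens when metadata is created directly from a client-supplied schema:

```java
import java.util.Collections;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.types.Types;

public class FieldIdReassignmentDemo {
  public static void main(String[] args) {
    // Client-supplied schema with non-contiguous field-ids 1 and 5.
    Schema clientSchema =
        new Schema(
            Types.NestedField.optional(1, "id", Types.LongType.get()),
            Types.NestedField.optional(5, "ts", Types.TimestampType.withZone()));

    // newTableMetadata assigns fresh, contiguous field-ids (1 and 2 here) and
    // discards the client's ids; the reverted block avoided this by rebuilding
    // the metadata from the client schema stored in CLIENT_TABLE_SCHEMA.
    TableMetadata created =
        TableMetadata.newTableMetadata(
            clientSchema,
            PartitionSpec.unpartitioned(),
            SortOrder.unsorted(),
            "file:/tmp/demo_table",
            Collections.emptyMap());

    System.out.println(created.schema()); // prints fields with reassigned ids 1 and 2
  }
}
```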

/**
* Build a new partition spec for the new schema from the original pspec. The new pspec has the
* same partition fields as the original pspec, with source ids resolved from the new schema.
*
* @param originalPspec the partition spec to rebuild
* @param originalSchema the schema the original pspec was built against
* @param newSchema the schema whose field-ids the rebuilt pspec should reference
* @return new partition spec
*/
static PartitionSpec rebuildPartitionSpec(
PartitionSpec originalPspec, Schema originalSchema, Schema newSchema) {
PartitionSpec.Builder builder = PartitionSpec.builderFor(newSchema);

for (PartitionField field : originalPspec.fields()) {
// get field name from original schema using source id of partition field
// because Pspec appends _bucket and _trunc to field name for bucket and truncate fields
String fieldName = originalSchema.findField(field.sourceId()).name();
// Check if the partition field is present in new schema
if (newSchema.findField(fieldName) == null) {
throw new IllegalArgumentException(
"Field " + fieldName + " does not exist in the new schema");
}
// build the pspec from transform string representation
buildPspecFromTransform(builder, field, fieldName);
}

return builder.build();
}
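A short usage sketch of the helper above (schemas hypothetical, mirroring the tests further down): the same column name carries a different field-id in the new schema, and the rebuilt spec picks up the new source id by name:

```java
// "user" is field-id 2 in the original schema but field-id 7 in the new one.
Schema originalSchema =
    new Schema(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "user", Types.StringType.get()));
Schema newSchema =
    new Schema(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(7, "user", Types.StringType.get()));

PartitionSpec originalSpec =
    PartitionSpec.builderFor(originalSchema).bucket("user", 16).build();
PartitionSpec rebuilt =
    OpenHouseInternalTableOperations.rebuildPartitionSpec(originalSpec, originalSchema, newSchema);
// The rebuilt field keeps the bucket[16] transform but now reads source id 7 from newSchema.
```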

static void buildPspecFromTransform(
PartitionSpec.Builder builder, PartitionField field, String fieldName) {
// Recreate the transform using the string representation
String transformString = field.transform().toString();

// Add the field to the new PartitionSpec based on the transform type
if ("identity".equalsIgnoreCase(transformString)) {
builder.identity(fieldName);
} else if (transformString.startsWith("bucket[")) {
// Extract bucket number from the string (e.g., bucket[16])
int numBuckets =
Integer.parseInt(
transformString.substring(
transformString.indexOf('[') + 1, transformString.indexOf(']')));
builder.bucket(fieldName, numBuckets);
} else if (transformString.startsWith("truncate[")) {
// Extract width from the string (e.g., truncate[10])
int width =
Integer.parseInt(
transformString.substring(
transformString.indexOf('[') + 1, transformString.indexOf(']')));
builder.truncate(fieldName, width);
} else if ("year".equalsIgnoreCase(transformString)) {
builder.year(fieldName);
} else if ("month".equalsIgnoreCase(transformString)) {
builder.month(fieldName);
} else if ("day".equalsIgnoreCase(transformString)) {
builder.day(fieldName);
} else if ("hour".equalsIgnoreCase(transformString)) {
builder.hour(fieldName);
} else {
throw new UnsupportedOperationException("Unsupported transform: " + transformString);
}
}
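The string forms matched above come from Iceberg's `Transform#toString`; a small sketch (values hypothetical) of what those representations look like:

```java
Schema schema =
    new Schema(
        Types.NestedField.optional(1, "user", Types.StringType.get()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withZone()));
PartitionSpec spec =
    PartitionSpec.builderFor(schema).bucket("user", 16).day("ts").build();
for (PartitionField field : spec.fields()) {
  // Prints "bucket[16]" then "day", the representations the parser above handles.
  System.out.println(field.transform().toString());
}
```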

/**
* Build a new sort order for the new schema from the original sort order. The new sort order has
* the same fields as the original sort order, with source ids resolved from the new schema.
*
* @param originalSortOrder the sort order to rebuild
* @param newSchema the schema whose field-ids the rebuilt sort order should reference
* @return new SortOrder
*/
static SortOrder rebuildSortOrder(SortOrder originalSortOrder, Schema newSchema) {
SortOrder.Builder builder = SortOrder.builderFor(newSchema);

for (SortField field : originalSortOrder.fields()) {
// Find the field name in the original schema based on the sourceId
String fieldName = originalSortOrder.schema().findField(field.sourceId()).name();
// Check if the sort-order field is present in the new schema
if (newSchema.findField(fieldName) == null) {
throw new IllegalArgumentException(
"Field " + fieldName + " does not exist in the new schema");
}
// Create a new SortField with the updated sourceId, preserving the original direction and null order
Term term = Expressions.ref(fieldName);

// Apply sort direction and null ordering with the updated sourceId
if (field.direction() == SortDirection.ASC) {
builder.asc(term, field.nullOrder());
} else {
builder.desc(term, field.nullOrder());
}
}

return builder.build();
}
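And a matching usage sketch for the sort-order helper (schemas hypothetical): direction and null ordering survive the rebuild while the source id is re-resolved by field name:

```java
Schema originalSchema =
    new Schema(Types.NestedField.optional(1, "user", Types.StringType.get()));
Schema newSchema =
    new Schema(Types.NestedField.optional(4, "user", Types.StringType.get()));

SortOrder originalOrder =
    SortOrder.builderFor(originalSchema).desc("user", NullOrder.NULLS_LAST).build();
SortOrder rebuilt =
    OpenHouseInternalTableOperations.rebuildSortOrder(originalOrder, newSchema);
// DESC and NULLS_LAST are preserved; the field's source id is now 4 (from newSchema).
```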

/**
* If this commit comes from Iceberg built-in retry in
* org.apache.iceberg.PropertiesUpdate#commit() Then throw fatal {@link CommitFailedException} to
@@ -31,8 +31,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -634,191 +632,4 @@ void testDoCommitDeleteLastStagedSnapshotWhenNoRefs() throws IOException {
Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable));
}
}

@Test
void testRebuildPartitionSpecUnpartitioned() {
Schema originalSchema =
new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get()));

PartitionSpec originalSpec = PartitionSpec.unpartitioned();
PartitionSpec rebuiltSpec =
OpenHouseInternalTableOperations.rebuildPartitionSpec(
originalSpec, originalSchema, originalSchema);

Assertions.assertNotNull(rebuiltSpec);
Assertions.assertTrue(rebuiltSpec.isUnpartitioned());
}

@Test
void testRebuildPartitionSpec_NewSchemaSameFieldIds() {
Schema originalSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(2, "field2", Types.IntegerType.get()),
Types.NestedField.optional(3, "field3", Types.LongType.get()),
Types.NestedField.optional(4, "field4", Types.LongType.get()));

PartitionSpec originalSpec =
PartitionSpec.builderFor(originalSchema)
.identity("field1")
.bucket("field2", 10)
.truncate("field3", 20)
.build();

PartitionSpec rebuiltSpec =
OpenHouseInternalTableOperations.rebuildPartitionSpec(
originalSpec, originalSchema, originalSchema);

Assertions.assertNotNull(rebuiltSpec);
Assertions.assertEquals(0, rebuiltSpec.specId());
Assertions.assertEquals(3, rebuiltSpec.fields().size());
Assertions.assertEquals("field1", rebuiltSpec.fields().get(0).name());
Assertions.assertEquals("identity", rebuiltSpec.fields().get(0).transform().toString());
// field-id in table schema should match sourceId in partition spec
Assertions.assertEquals(1, rebuiltSpec.fields().get(0).sourceId());
// Iceberg internally appends _bucket to partition field name
Assertions.assertEquals("field2_bucket", rebuiltSpec.fields().get(1).name());
Assertions.assertEquals("bucket[10]", rebuiltSpec.fields().get(1).transform().toString());
Assertions.assertEquals(2, rebuiltSpec.fields().get(1).sourceId());
// Iceberg internally appends _trunc to partition field name
Assertions.assertEquals("field3_trunc", rebuiltSpec.fields().get(2).name());
Assertions.assertEquals("truncate[20]", rebuiltSpec.fields().get(2).transform().toString());
Assertions.assertEquals(3, rebuiltSpec.fields().get(2).sourceId());
}

@Test
void testRebuildPartitionSpec_NewSchemaDifferentFieldIds() {
Schema originalSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(2, "field2", Types.IntegerType.get()),
Types.NestedField.optional(3, "field3", Types.LongType.get()),
Types.NestedField.optional(4, "field4", Types.LongType.get()));

PartitionSpec originalSpec =
PartitionSpec.builderFor(originalSchema)
.identity("field1")
.bucket("field2", 10)
.truncate("field3", 20)
.build();

// field2 and field3 have different field-ids compared to the original schema
Schema newSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(3, "field2", Types.IntegerType.get()),
Types.NestedField.optional(2, "field3", Types.LongType.get()),
Types.NestedField.optional(4, "field4", Types.LongType.get()));

PartitionSpec rebuiltSpec =
OpenHouseInternalTableOperations.rebuildPartitionSpec(
originalSpec, originalSchema, newSchema);

Assertions.assertNotNull(rebuiltSpec);
Assertions.assertEquals(0, rebuiltSpec.specId());
Assertions.assertEquals(3, rebuiltSpec.fields().size());
Assertions.assertEquals("field1", rebuiltSpec.fields().get(0).name());
Assertions.assertEquals("identity", rebuiltSpec.fields().get(0).transform().toString());
// field-id in table schema should match sourceId in partition spec
Assertions.assertEquals(1, rebuiltSpec.fields().get(0).sourceId());
// Iceberg internally appends _bucket to partition field name
Assertions.assertEquals("field2_bucket", rebuiltSpec.fields().get(1).name());
Assertions.assertEquals("bucket[10]", rebuiltSpec.fields().get(1).transform().toString());
Assertions.assertEquals(3, rebuiltSpec.fields().get(1).sourceId());
// Iceberg internally appends _trunc to partition field name
Assertions.assertEquals("field3_trunc", rebuiltSpec.fields().get(2).name());
Assertions.assertEquals("truncate[20]", rebuiltSpec.fields().get(2).transform().toString());
Assertions.assertEquals(2, rebuiltSpec.fields().get(2).sourceId());
}

@Test
void testRebuildPartitionSpec_fieldMissingInNewSchema() {
Schema originalSchema =
new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get()));

PartitionSpec originalSpec =
PartitionSpec.builderFor(originalSchema).identity("field1").build();

Schema newSchema = new Schema(Types.NestedField.optional(2, "field2", Types.IntegerType.get()));

IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
OpenHouseInternalTableOperations.rebuildPartitionSpec(
originalSpec, originalSchema, newSchema));

Assertions.assertEquals(
"Field field1 does not exist in the new schema", exception.getMessage());
}

@Test
void testRebuildSortOrder_NewSchemaSameFieldIds() {
Schema originalSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(2, "field2", Types.IntegerType.get()));

SortOrder originalSortOrder =
SortOrder.builderFor(originalSchema).asc("field1").desc("field2").build();

Schema newSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(2, "field2", Types.IntegerType.get()));

SortOrder rebuiltSortOrder =
OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema);

Assertions.assertNotNull(rebuiltSortOrder);
Assertions.assertEquals(2, rebuiltSortOrder.fields().size());
Assertions.assertEquals(SortDirection.ASC, rebuiltSortOrder.fields().get(0).direction());
Assertions.assertEquals(1, rebuiltSortOrder.fields().get(0).sourceId());
Assertions.assertEquals(SortDirection.DESC, rebuiltSortOrder.fields().get(1).direction());
Assertions.assertEquals(2, rebuiltSortOrder.fields().get(1).sourceId());
}

@Test
void testRebuildSortOrder_NewSchemaDifferentFieldIds() {
Schema originalSchema =
new Schema(
Types.NestedField.optional(1, "field1", Types.StringType.get()),
Types.NestedField.optional(2, "field2", Types.IntegerType.get()));

SortOrder originalSortOrder =
SortOrder.builderFor(originalSchema).asc("field1").desc("field2").build();

Schema newSchema =
new Schema(
Types.NestedField.optional(2, "field1", Types.StringType.get()),
Types.NestedField.optional(1, "field2", Types.IntegerType.get()));

SortOrder rebuiltSortOrder =
OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema);

Assertions.assertNotNull(rebuiltSortOrder);
Assertions.assertEquals(2, rebuiltSortOrder.fields().size());
Assertions.assertEquals(SortDirection.ASC, rebuiltSortOrder.fields().get(0).direction());
Assertions.assertEquals(2, rebuiltSortOrder.fields().get(0).sourceId());
Assertions.assertEquals(SortDirection.DESC, rebuiltSortOrder.fields().get(1).direction());
Assertions.assertEquals(1, rebuiltSortOrder.fields().get(1).sourceId());
}

@Test
void testRebuildSortOrder_fieldMissingInNewSchema() {
Schema originalSchema =
new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get()));

SortOrder originalSortOrder = SortOrder.builderFor(originalSchema).asc("field1").build();

Schema newSchema = new Schema(Types.NestedField.optional(2, "field2", Types.IntegerType.get()));

IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema));

Assertions.assertEquals(
"Field field1 does not exist in the new schema", exception.getMessage());
}
}