Destination S3 Data Lake: Extract generic iceberg stuff to toolkit #53700

Merged: 8 commits, Feb 14, 2025
@@ -9,6 +9,7 @@ dependencies {
     implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
     api "org.apache.iceberg:iceberg-core:${project.ext.apacheIcebergVersion}"
     api "org.apache.iceberg:iceberg-api:${project.ext.apacheIcebergVersion}"
+    api("org.apache.iceberg:iceberg-data:${project.ext.apacheIcebergVersion}")
     api "org.apache.iceberg:iceberg-parquet:${project.ext.apacheIcebergVersion}"
     api "org.apache.iceberg:iceberg-nessie:${project.ext.apacheIcebergVersion}"
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io;
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io;
 
 import io.airbyte.cdk.ConfigErrorException;
 import java.io.IOException;
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import io.airbyte.cdk.ConfigErrorException
 import jakarta.inject.Singleton
@@ -19,22 +19,22 @@ import org.apache.iceberg.types.Types.*
  * The "supertype" is a type to which both input types can safely be promoted without data loss. For
  * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc.
  *
- * @property S3DataLakeTypesComparator comparator used to verify deep type equality.
+ * @property IcebergTypesComparator comparator used to verify deep type equality.
  */
 @Singleton
-class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) {
+class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {
     private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)
 
     /**
      * Returns a supertype for [existingType] and [incomingType] if one exists.
-     * - If they are deeply equal (according to [S3DataLakeTypesComparator.typesAreEqual]), returns
-     *   the [existingType] as-is.
+     * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the
+     *   [existingType] as-is.
     * - Otherwise, attempts to combine them into a valid supertype.
     * - Throws [ConfigErrorException] if no valid supertype can be found.
     */
     fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type {
         // If the two types are already deeply equal, return one of them (arbitrary).
-        if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) {
+        if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) {
             return existingType
         }
         // Otherwise, attempt to combine them into a valid supertype.
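For context on the supertype rules described above, here is a minimal usage sketch of the renamed IcebergSuperTypeFinder (not part of this diff); it relies only on the constructor and findSuperType signature shown in the hunk:

import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types

fun main() {
    val finder = IcebergSuperTypeFinder(IcebergTypesComparator())
    // INT widens to LONG without data loss, so LONG is the expected supertype.
    val superType: Type =
        finder.findSuperType(
            existingType = Types.IntegerType.get(),
            incomingType = Types.LongType.get(),
            columnName = "user_id",
        )
    println(superType) // long
}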
@@ -2,19 +2,19 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import io.airbyte.cdk.ConfigErrorException
-import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.PARENT_CHILD_SEPARATOR
-import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
 import jakarta.inject.Singleton
 import org.apache.iceberg.Schema
 import org.apache.iceberg.Table
 import org.apache.iceberg.UpdateSchema
 import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Type.PrimitiveType
 
-/** Describes how the [S3DataLakeTableSynchronizer] handles column type changes. */
+/** Describes how the [IcebergTableSynchronizer] handles column type changes. */
 enum class ColumnTypeChangeBehavior {
     /**
      * Find the supertype between the old and new types, throwing an error if Iceberg does not
@@ -30,7 +30,7 @@ enum class ColumnTypeChangeBehavior {
     };
 
     /**
-     * If true, [S3DataLakeTableSynchronizer.maybeApplySchemaChanges] will commit the schema update
+     * If true, [IcebergTableSynchronizer.maybeApplySchemaChanges] will commit the schema update
      * itself. If false, the caller is responsible for calling
      * `schemaUpdateResult.pendingUpdate?.commit()`.
      */
@@ -50,9 +50,9 @@ enum class ColumnTypeChangeBehavior {
  * @property superTypeFinder Used to find a common supertype when data types differ.
  */
 @Singleton
-class S3DataLakeTableSynchronizer(
-    private val comparator: S3DataLakeTypesComparator,
-    private val superTypeFinder: S3DataLakeSuperTypeFinder,
+class IcebergTableSynchronizer(
+    private val comparator: IcebergTypesComparator,
+    private val superTypeFinder: IcebergSuperTypeFinder,
 ) {
     /**
      * Compare [table]'s current schema with [incomingSchema] and apply changes as needed:
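The commit contract documented above can be summarized with a caller-side sketch. This is illustrative only: the exact parameter list of maybeApplySchemaChanges and the shape of its result are assumptions, while pendingUpdate?.commit() comes from the KDoc:

import org.apache.iceberg.Schema
import org.apache.iceberg.Table

fun syncSchema(synchronizer: IcebergTableSynchronizer, table: Table, incomingSchema: Schema) {
    // Assumed signature: compare the live schema with the incoming one and stage any changes.
    val result = synchronizer.maybeApplySchemaChanges(table, incomingSchema)
    // If the chosen ColumnTypeChangeBehavior did not commit on our behalf, the caller must.
    result.pendingUpdate?.commit()
}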
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import jakarta.inject.Singleton
 import org.apache.iceberg.Schema
@@ -17,7 +17,7 @@ import org.apache.iceberg.types.Types
  * - Columns that changed from required to optional.
  */
 @Singleton
-class S3DataLakeTypesComparator {
+class IcebergTypesComparator {
 
     companion object {
         /** Separator used to represent nested field paths: parent~child. */
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.toolkits.iceberg.parquet

import io.airbyte.cdk.load.command.DestinationStream
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier

/**
* Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should
* handle catalog-specific naming restrictions.
*/
// TODO accept default namespace in config as a val here
interface TableIdGenerator {
fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
}

class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
val namespace = stream.namespace ?: configNamespace
return tableIdOf(namespace!!, stream.name)
}
}

// iceberg namespace+name must both be nonnull.
fun tableIdOf(namespace: String, name: String): TableIdentifier =
TableIdentifier.of(Namespace.of(namespace), name)
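A short usage sketch for the new SimpleTableIdGenerator (not part of this diff). The Descriptor construction below is an assumption about the CDK stream descriptor; the fallback-to-config-namespace behavior is what the class above implements:

import io.airbyte.cdk.load.command.DestinationStream

fun main() {
    // Streams without a namespace fall back to the configured namespace.
    val generator = SimpleTableIdGenerator(configNamespace = "default_db")
    val tableId =
        generator.toTableIdentifier(
            DestinationStream.Descriptor(namespace = null, name = "users")
        )
    println(tableId) // default_db.users
}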
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
 
 import jakarta.inject.Singleton
 import org.apache.iceberg.Table
@@ -16,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations
  * catalog implementations do not clear the underlying files written to table storage.
  */
 @Singleton
-class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) {
+class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {
 
     /**
      * Clears the table identified by the provided [TableIdentifier]. This removes all data and
@@ -49,7 +49,7 @@ class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) {
         val genIdsToDelete =
             generationIdSuffix
                 .filter {
-                    s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(it)
+                    icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it)
                     true
                 }
                 .toSet()
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
 
 import io.airbyte.cdk.load.command.Append
 import io.airbyte.cdk.load.command.Dedupe
@@ -30,7 +30,7 @@ import org.apache.iceberg.util.PropertyUtil
  * and whether primary keys are configured on the destination table's schema.
  */
 @Singleton
-class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) {
+class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
     /**
      * Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
     *
@@ -45,7 +45,7 @@ class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) {
         importType: ImportType,
         schema: Schema
     ): BaseTaskWriter<Record> {
-        s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
+        icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
         val format =
             FileFormat.valueOf(
                 table
@@ -0,0 +1,190 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.toolkits.iceberg.parquet.io

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.ImportType
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
import javax.inject.Singleton
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogUtil
import org.apache.iceberg.FileFormat
import org.apache.iceberg.Schema
import org.apache.iceberg.SortOrder
import org.apache.iceberg.Table
import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.SupportsNamespaces
import org.apache.iceberg.data.Record
import org.apache.iceberg.exceptions.AlreadyExistsException

private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

@Singleton
class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
class InvalidFormatException(message: String) : Exception(message)

private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")

fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) {
if (!generationIdRegex.matches(generationId)) {
throw InvalidFormatException(
"Invalid format: $generationId. Expected format is 'ab-generation-id-<number>-e'",
)
}
}

fun constructGenerationIdSuffix(stream: DestinationStream): String {
return constructGenerationIdSuffix(stream.generationId)
}

fun constructGenerationIdSuffix(generationId: Long): String {
if (generationId < 0) {
throw IllegalArgumentException(
"GenerationId must be non-negative. Provided: $generationId",
)
}
return "ab-generation-id-${generationId}-e"
}
/**
* Builds an Iceberg [Catalog].
*
* @param catalogName The name of the catalog.
* @param properties The map of catalog configuration properties.
* @return The configured Iceberg [Catalog].
*/
fun createCatalog(catalogName: String, properties: Map<String, String>): Catalog {
return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration())
}

/** Create the namespace if it doesn't already exist. */
fun createNamespace(streamDescriptor: DestinationStream.Descriptor, catalog: Catalog) {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
synchronized(tableIdentifier.namespace()) {
if (
catalog is SupportsNamespaces &&
!catalog.namespaceExists(tableIdentifier.namespace())
) {
try {
catalog.createNamespace(tableIdentifier.namespace())
logger.info { "Created namespace '${tableIdentifier.namespace()}'." }
} catch (e: AlreadyExistsException) {
// This exception occurs when multiple threads attempt to write to the same
// namespace in parallel.
// One thread may create the namespace successfully, causing the other threads
// to encounter this exception
// when they also try to create the namespace.
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
}
}
}
}

/**
* Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it
* does not already exist. If the [Table] already exists, it is loaded from the [Catalog].
*
* @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
* namespace and name.
* @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
* created.
* @param schema The Iceberg [Schema] associated with the [Table].
* @param properties The [Table] configuration properties derived from the [Catalog].
* @return The Iceberg [Table], created if it does not yet exist.
*/
fun createTable(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog,
schema: Schema,
properties: Map<String, String>
): Table {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
return if (!catalog.tableExists(tableIdentifier)) {
logger.info { "Creating Iceberg table '$tableIdentifier'...." }
catalog
.buildTable(tableIdentifier, schema)
.withProperties(properties)
.withProperty(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name.lowercase())
.withSortOrder(getSortOrder(schema = schema))
.create()
} else {
logger.info { "Loading Iceberg table $tableIdentifier ..." }
catalog.loadTable(tableIdentifier)
}
}

/**
* Converts an Airbyte [DestinationRecordAirbyteValue] into an Iceberg [Record]. The converted
* record will be wrapped to include [Operation] information, which is used by the writer to
* determine how to write the data to the underlying Iceberg files.
*
* @param record The Airbyte [DestinationRecordAirbyteValue] record to be converted for writing
* by Iceberg.
* @param stream The Airbyte [DestinationStream] that contains information about the stream.
* @param tableSchema The Iceberg [Table] [Schema].
* @param pipeline The [MapperPipeline] used to convert the Airbyte record to an Iceberg record.
* @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue].
*/
fun toRecord(
record: DestinationRecordAirbyteValue,
stream: DestinationStream,
tableSchema: Schema,
pipeline: MapperPipeline
): Record {
val dataMapped =
pipeline
.map(record.data, record.meta?.changes)
.withAirbyteMeta(stream, record.emittedAtMs, true)
// TODO figure out how to detect the actual operation value
return RecordWrapper(
delegate = dataMapped.toIcebergRecord(tableSchema),
operation = getOperation(record = record, importType = stream.importType)
)
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
val primaryKeys =
when (stream.importType) {
is Dedupe -> (stream.importType as Dedupe).primaryKey
else -> emptyList()
}
return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
}

private fun getSortOrder(schema: Schema): SortOrder {
val builder = SortOrder.builderFor(schema)
schema.identifierFieldNames().forEach { builder.asc(it) }
return builder.build()
}

private fun getOperation(
record: DestinationRecordAirbyteValue,
importType: ImportType,
): Operation =
if (
record.data is ObjectValue &&
(record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null &&
(record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] !is NullValue
) {
Operation.DELETE
} else if (importType is Dedupe) {
Operation.UPDATE
} else {
Operation.INSERT
}
}
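To illustrate the generation-id helpers in IcebergUtil, a minimal sketch (not part of this diff) that uses only the methods defined above:

fun main() {
    val icebergUtil = IcebergUtil(SimpleTableIdGenerator("default_db"))
    // Build the suffix for generation 42, then validate it against the expected pattern.
    val suffix = icebergUtil.constructGenerationIdSuffix(42L)
    println(suffix) // ab-generation-id-42-e
    icebergUtil.assertGenerationIdSuffixIsOfValidFormat(suffix) // passes

    // Any other shape throws IcebergUtil.InvalidFormatException.
    runCatching { icebergUtil.assertGenerationIdSuffixIsOfValidFormat("generation-42") }
        .onFailure { println(it.message) }
}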