Load-CDK/Destination-S3DataLake: DirectLoader (no spill2disk, partitioning) #53241
Conversation
initial skim, had a few very surface-level comments
```kotlin
import kotlinx.coroutines.flow.consumeAsFlow

class PartitionedQueue<T>(val partitions: Int, capacity: Int = 1) : AutoCloseable {
    private val channel: ConcurrentLinkedQueue<Channel<T>> =
```
nit: can this just be a plain `List<Channel<T>>`? (since it doesn't look like you're ever adding/removing items)
Yeah
```kotlin
 * for making state generated during initialization generally available to the Loaders.
 */
@Singleton
class StreamStateStore<S> {
```
nit: rename file to `StreamStateStore.kt`, or rename this class to `StreamLoaderStore`
I think I'll factor it out completely
Actually, after chatting with @tryangul, I think we should promote this to a first-class citizen for providing state shared between different objects.
```kotlin
 * [DirectLoader] is for the use case where records are loaded directly into the destination or
 * added incrementally via a 3rd party library (eg, Iceberg)
 *
 * One direct loader will be created per batch of records per stream (optionally: and per part).
```
nit: it's not immediately obvious what this means (i.e. that a DirectLoader lives until it returns Complete, which is either of its own volition in `accept`, or forcibly in `finish`) - I was initially confused that maybe a loader would be reused after a `finish` call
I'll clarify
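To make the intended lifetime concrete, here's a hedged sketch of the driver loop implied above (illustrative names and signatures, not the CDK's actual task code):

```kotlin
// Minimal stand-ins for the real CDK types, for illustration only.
enum class LoadResult { Incomplete, Complete }
data class Record(val data: String)

interface DirectLoader {
    fun accept(record: Record): LoadResult // may return Complete of its own volition
    fun finish()                           // forcibly completes at end of batch
}

// One loader per batch: it lives until accept() returns Complete or finish()
// is called, and is never reused afterwards.
fun driveBatch(loader: DirectLoader, records: Iterator<Record>) {
    var complete = false
    while (!complete && records.hasNext()) {
        complete = loader.accept(records.next()) == LoadResult.Complete
    }
    if (!complete) loader.finish()
    // the next batch gets a fresh loader from the factory
}
```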
```yaml
core:
  load-pipeline:
    strategy: direct
    input-parts: 1
```
nit: these two properties should live in `src/main/resources/application-override.yaml`, since they also apply in production (the record-batch-size-override shouldn't even be needed :/ that should already be set everywhere by this thing https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt#L306)
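Concretely, the suggested placement would look something like this (a sketch assuming the full property prefix quoted elsewhere in this PR):

```yaml
# src/main/resources/application-override.yaml
airbyte:
  destination:
    core:
      load-pipeline:
        strategy: direct
        input-parts: 1
```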
```kotlin
 * received. Its end is user-defined, or forced at the end of the stream.
 *
 * To enable, set [airbyte.destination.core.load-pipeline.strategy=direct] in the connector's
 * application.yaml.
```
s/application.yaml/application-override.yaml/g
```kotlin
val inputQueue: PartitionedQueue<PipelineEvent<K, DestinationRecordAirbyteValue>>,
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
@Value("\${airbyte.destination.core.load-pipeline.input-parts}")
private val numWorkers: Int? = null,
```
noncritical wishlist: setting worker count per stream, based on properties of that stream

specifically, iceberg needs to guarantee record ordering per PK in dedup mode (possibly only in CDC syncs), so it would be cool to be able to do

```kotlin
if (stream.syncMode is Dedupe && stream.columns.containsKey("_ab_cdc_deleted_at")) {
    numWorkers = 1
} else {
    numWorkers = someBiggerNumber // i.e. whatever the default would be
}
```
So this would be a `WorkerConfiguration` you could inject that would override the values?
maybe - I think in that world we'd need to define a worker config per stream

or maybe the other thing we could do, is something like:
- instead of passing the `numPartitions` value up to the CDK and then back down to the partitioner - have that value live completely within connector code
- and then the partitioner could look at the stream, in addition to the record - e.g.

```kotlin
fun getPart(record: DestinationRecord, stream: DestinationStream): Int {
    if (stream.dedup && cdc && whatever) {
        // one partition for the whole stream
        return hash(stream)
    } else {
        // 5 partitions for the stream, partitioned by PK
        return hash(stream, record.pk) % 5
    }
}
```

(and if the CDK wants to limit the global worker count, it would then do `getPart() % workerCount` to get the actual partition ID)
```kotlin
val inputQueue: PartitionedQueue<PipelineEvent<K, DestinationRecordAirbyteValue>>,
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
@Value("\${airbyte.destination.core.load-pipeline.input-parts}")
private val numWorkers: Int? = null,
```
also, checking my understanding - iceberg's (and presumably mssql's) requirement is to set 1 thread per partition - is this doing that, or is this setting the number of workers total?

edit: or maybe numWorkers == num partitions, and partitions are always processed in-order, single-threaded?
workers == partitions rn. but yes, they are different concepts, and as long as we're only exposing partitions rn, it should be called partitions w/ the proviso that 1 thread (coroutine) will be assigned to each partition.
cool, that makes sense. and honestly probably simplifies the interface to always have one worker per partition? b/c if someone wants multiple workers per partition, they could always do `return hash(random() % workersPerPartition, actualPartitionHashmod)` or something
What's really fun is when we start needing to distinguish between the partition key and the key on which we keep loader state..... fortunately we don't have to think about that here
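For intuition, a minimal sketch of the "one coroutine per partition" model described in this thread (names are illustrative; the real pipeline task code differs):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// One channel per partition, each with exactly one consumer coroutine, so
// records within a partition are always processed in order.
suspend fun <T> runPartitionedWorkers(
    channels: List<Channel<T>>,
    handle: suspend (partition: Int, item: T) -> Unit,
) = coroutineScope {
    channels.forEachIndexed { partition, channel ->
        launch { for (item in channel) handle(partition, item) }
    }
}
```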
```kotlin
    mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName,
)

streamStateStore.put(stream.descriptor, loader)
```
This is confusing to me. Could this be added to the loader instead? I'm not sure I understand why we need this third-party StateStore object.
See earlier comment. Some more detail:

There are a few options to handle scope here:
- put a method on the StreamLoader that provides the Loader object the stream is using (and thread dependencies through)
- have an explicit contract that the factory sees the loader on create
- have an explicit contract that `start` returns a user-defined state object that is passed to `create`
- implement a state store and leave the user free to pass thread-safe state between objects

The last option is the most flexible and maintainable, as it leaves us free to change the type or signatures of either object or method (`StreamLoader`, `start`, `DirectLoaderFactory`, `create`) w/o breaking any implemented behavior.
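A minimal sketch of that last option (the key type here is a hypothetical stand-in for the CDK's actual stream descriptor):

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the CDK's stream descriptor.
data class StreamDescriptor(val namespace: String?, val name: String)

// Thread-safe, stream-keyed store: StreamLoader.start() puts shared state in,
// and DirectLoaderFactory.create() reads it back out later.
class StreamStateStore<S> {
    private val store = ConcurrentHashMap<StreamDescriptor, S>()

    fun put(key: StreamDescriptor, state: S) {
        store[key] = state
    }

    fun get(key: StreamDescriptor): S? = store[key]
}
```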
```kotlin
override fun getPart(outputKey: K2, numParts: Int): Int {
    throw NotImplementedError("This partitioner is a no-op and should not be used.")
}
```
What's this Noop class used for? Also, Nit: Nop -> Noop
```diff
@@ -197,7 +197,7 @@ class DefaultStreamManager(
     }
     val stateRangesJoined =
         stateRangesToAdd.joinToString(",") { "${it.first}->${it.second}" }
-    val readRange = TreeRangeSet.create(listOf(Range.closed(0, recordCount.get())))
+    val readRange = TreeRangeSet.create(listOf(Range.closed(0, recordCount.get() - 1)))
```
Why a minus 1 here?
It's a debug log for the old range-based accounting. I added the -1 when I was debugging the S3DataLake tests, because `recordCount.get()` returns the count, not the last index, and it was annoying me. I'll clean it up.
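To make the off-by-one concrete (plain Kotlin, independent of the Guava types in the diff):

```kotlin
// If 10 records were read, the indices seen were 0..9. Building the range
// from the raw count would claim an 11th record (index 10), hence the -1.
val count = 10L
val wrong = 0L..count        // 0..10: eleven indices
val right = 0L..(count - 1)  // 0..9: matches Range.closed(0, count - 1)
```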
```kotlin
        ConcurrentLinkedQueue((0 until partitions).map { Channel(capacity) })

    suspend fun consume(partition: Int): Flow<T> = channel.elementAt(partition).consumeAsFlow()
    suspend fun publish(value: T, partition: Int) =
```
Why pass the partition key in instead of just working with the partitions? i.e. `getPartition(key) -> Partition`
I didn't want to couple the queue with the key type / key -> partition logic.
I think I'd rather have this just be a queue decorator, totally agnostic of the underlying queue type.
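Stitching the quoted fragments together, the whole class is roughly this (a sketch: the `close()` body is an assumption from the `AutoCloseable` supertype, and the backing collection uses the plain `List` suggested earlier):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow

class PartitionedQueue<T>(val partitions: Int, capacity: Int = 1) : AutoCloseable {
    private val channels: List<Channel<T>> =
        (0 until partitions).map { Channel(capacity) }

    // Each partition is backed by its own channel; consuming it as a Flow
    // gives that partition's single worker an ordered stream of items.
    fun consume(partition: Int): Flow<T> = channels[partition].consumeAsFlow()

    suspend fun publish(value: T, partition: Int) = channels[partition].send(value)

    override fun close() {
        channels.forEach { it.close() }
    }
}
```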
```kotlin
LoadPipelineStep(
    numWorkers = numWorkers ?: 1,
    taskForPartition = { part ->
        return@LoadPipelineStep LoadPipelineStepTask(
```
I'd suggest making beans for the tasks and removing this factory layer.
```kotlin
 * partitioned by a hash of the stream name and namespace.
 */
interface InputPartitioner {
    fun getPart(record: DestinationRecordAirbyteValue, numParts: Int): Int
```
Nit: `getPart` is misleading given `Part` is an object itself. Suggestion: the method could be named something generic like `apply`, since the partitioning bit is implied in the class name - or just `getPartition`, etc.
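For reference, a hedged sketch of the default behavior the quoted doc comment describes (hashing the stream name and namespace); the record type here is a stand-in, not the CDK's actual `DestinationRecordAirbyteValue`:

```kotlin
// Illustrative record stand-in carrying only the fields the hash needs.
data class Record(val streamNamespace: String?, val streamName: String)

class ByStreamPartitioner {
    fun getPart(record: Record, numParts: Int): Int {
        val hash = (record.streamNamespace to record.streamName).hashCode()
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numParts)
    }
}
```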
```kotlin
data class LoadPipelineStep(
    val numWorkers: Int,
    val taskForPartition: suspend (Int) -> LoadPipelineStepTask<*, *, *, *, *>
```
Suggestion: collapse task and pipeline step.
fyi: I'm moving a ton of files around for #53700 (with the goal of enabling a destination-gcs-data-lake). Which prooooooobably conflicts with some of the changes in this PR :(
Theory
What's here
- `DirectLoader`; its usage is described in detail in the comments (in the `write` module)
- … (in the `pipeline` module)

What's not here
Frequently questioned answers
Some stuff that might be controversial, as it required picking between contradictory asks / feedback:
The factory interface for the loader
The obvious choice is between a stateless model with a monadic interface and a stateful model with a factory interface. I tried both in parallel, but the stateless one's explanatory comments were about 50% longer, due to the ambiguities that arise over when the state is passed to what. Additionally…
If anyone misses the FP style, feel free to think of the factory as a curried constructor.
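To illustrate the two shapes being compared (purely illustrative signatures, not the CDK's actual interfaces):

```kotlin
data class Record(val data: String)

// Stateful model + factory interface: the loader carries its own state
// between accept() calls; one instance is created per batch.
interface DirectLoader {
    fun accept(record: Record): Boolean // true once the batch is complete
    fun finish()
}
interface DirectLoaderFactory {
    fun create(streamName: String, part: Int): DirectLoader
}

// Stateless "monadic" alternative: state is threaded explicitly through
// every call, which is where the when-is-state-passed-to-what ambiguity
// mentioned above comes from.
interface StatelessLoader<S> {
    fun start(streamName: String, part: Int): S
    fun accept(state: S, record: Record): Pair<S, Boolean>
    fun finish(state: S)
}
```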
Using Micronaut to wire up the parts
I'm still not 100% sure on this. The decision was between nested factories (DestinationWriter -> StreamLoader -> DirectLoader, etc.) and a Micronaut contract. Basically, the tradeoff was between the unambiguous scope/sequencing implied by the nested factories and the difficulty of wiring everything together under the hood when everything was created lazily.
Especially the forced introduction of the state store feels like a hack. (Can we design this away by rethinking `DestinationWriter`/`StreamLoader`?)