
Load-CDK/Destination-S3DataLake: DirectLoader (no spill2disk, partitioning) #53241

Open · wants to merge 1 commit into base: master

Conversation

johnny-schmidt
Contributor

@johnny-schmidt johnny-schmidt commented Feb 8, 2025

Theory

  • The dev interface is actually a lot of different interfaces, each tailored to a use case (DirectLoad, BulkLoad, BulkInsert, etc.) and configurable via Micronaut; the idea is to hide all the internals (like batch state) from the dev completely, while maintaining strict inversion of control
  • These heterogeneous interfaces are mapped to a common generic pipeline under the hood; the idea is to make this mapping as cheap to implement as possible, so the cost of adding new interfaces for new use cases stays low

What's here

  • the first strategy: DirectLoader; its usage is described in detail in the comments (in the write module)
  • the internals required to implement it (in the pipeline module)
  • an S3DataLake implementation, enabled for testing (tests all pass without parallelism enabled)
  • some extra stuff to shim it into the current workflow
    • a dev-facing "StreamStateStore" so the dev can pass the initialized StreamLoader to the loaders (awkward!)
    • a new record queue (all streams in one queue)
    • a new path for doing batch updates
  • a little bit more than necessary for the direct loader (basically, the skeleton of the multi-stage implementations is already here)
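As a rough illustration of the shape of the strategy, a DirectLoader-style contract might look like the following. This is a hypothetical sketch; the names, signatures, and record type are stand-ins, and the real contract is documented in the write module's comments.

```kotlin
// Hypothetical sketch of a dev-facing direct-load contract; names and
// signatures are illustrative, not the actual CDK API.
sealed interface DirectLoadResult
data object Incomplete : DirectLoadResult
data object Complete : DirectLoadResult

interface DirectLoader : AutoCloseable {
    // Called once per record; returning Complete ends this loader's batch.
    fun accept(record: String): DirectLoadResult

    // Called if the pipeline forces the batch to end (e.g. end of stream).
    fun finish()
}
```

The dev only implements per-record and end-of-batch behavior; batch state, queuing, and partitioning stay inside the pipeline.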

What's not here

  • S3DataLake tests don't pass with input-parts > 2 (syncs succeed but the assertions fail); there's clearly more going on here w/r/t running the writer in parallel
  • Some subtleties around partitioning (partition-v-key)
  • S3DataLake is crammed in with the least effort. This can definitely be made more elegant
  • Cleaning up the workflow outside the new pipeline (especially start/stop). I did the least amount to shim things in.

Frequently questioned answers

Some stuff that might be controversial, as it required picking between contradictory asks / feedback:

The factory interface for the loader

The obvious choice is between a stateless model with a monadic interface and a stateful model with a factory interface. I tried both in parallel, but the stateless one's explanatory comments were about 50% longer, due to the ambiguities that arise over when the state is passed to what. Additionally:

  • the stateful model is more accessible (it's an older pattern, and a lot of people missed the fp craze)
  • it's more natural to modify state in most cases than to do copy-construction, and sometimes more efficient (also, we don't have to reason about whether we're defeating JVM escape analysis)

If anyone misses the fp style, feel free to think of the factory as a curried constructor.
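To make the two options concrete, here is an illustrative side-by-side. Neither of these is the actual CDK interface; the names and the String record type are placeholders.

```kotlin
// Stateless / "monadic": state is threaded explicitly through every call,
// so the docs must spell out exactly when state flows where.
interface StatelessLoader<S> {
    fun start(): S
    fun accept(record: String, state: S): S // returns the next state
    fun finish(state: S)
}

// Stateful / factory: the factory acts as a curried constructor, and the
// loader mutates its own state internally.
interface LoaderFactory<L> {
    fun create(stream: String): L
}

interface StatefulLoader {
    fun accept(record: String)
    fun finish()
}
```

In the stateful version, everything between `create` and `finish` belongs to one loader instance, which is what makes the scope easy to explain.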

Using Micronaut to wire up the parts.

I'm still not 100% sure on this. The decision was between nested factories (DestinationWriter -> StreamLoader -> DirectLoader, etc) and a micronaut contract. Basically the tradeoff was between the unambiguous scope / sequencing implied by the nested factories and the difficulty of wiring everything together under the hood when everything was created lazily.

Especially the forced introduction of the state store feels like a hack. (Can we design this away by rethinking DestinationWriter/StreamLoader?)


@edgao edgao left a comment (Contributor)

initial skim, had a few very surface-level comments

import kotlinx.coroutines.flow.consumeAsFlow

class PartitionedQueue<T>(val partitions: Int, capacity: Int = 1) : AutoCloseable {
private val channel: ConcurrentLinkedQueue<Channel<T>> =
Contributor

nit: can this just be plain List<Channel<T>>? (since it doesn't look like you're ever adding/removing items)
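That suggestion might look like the following sketch (illustrative, not the real class), since the set of partitions is fixed at construction time:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow

// Sketch: with no concurrent add/remove of channels, a plain List is
// simpler and cheaper than ConcurrentLinkedQueue, and gives indexed access.
class PartitionedQueue<T>(val partitions: Int, capacity: Int = 1) : AutoCloseable {
    private val channels: List<Channel<T>> =
        (0 until partitions).map { Channel(capacity) }

    fun consume(partition: Int): Flow<T> = channels[partition].consumeAsFlow()

    suspend fun publish(value: T, partition: Int) = channels[partition].send(value)

    override fun close() {
        channels.forEach { it.close() }
    }
}
```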

Contributor Author

Yeah

* for making state generated during initialization generally available to the Loaders.
*/
@Singleton
class StreamStateStore<S> {
Contributor

nit: rename file to StreamStateStore.kt, or rename this class to StreamLoaderStore

Contributor Author

I think I'll factor it out completely

Contributor Author

Actually, after chatting with @tryangul, I think we should promote this to a first-class citizen for providing state shared between different objects.

* [DirectLoader] is for the use case where records are loaded directly into the destination or
* added incrementally via a 3rd party library (eg, Iceberg)
*
* One direct loader will be created per batch of records per stream (optionally: and per part).
Contributor

nit: it's not immediately obvious what this means (i.e. that a DirectLoader lives until it returns Complete, which is either of its own volition in accept, or forcibly in finish) - I was initially confused that maybe a loader would be reused after a finish call

Contributor Author

I'll clarify
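The lifecycle the doc comment should spell out is roughly the following (minimal stand-in types, not the actual CDK code): a loader handles exactly one batch, ends either of its own volition in `accept` or forcibly via `finish`, and is never reused after that.

```kotlin
sealed interface LoadResult
data object Incomplete : LoadResult
data object Complete : LoadResult

interface Loader : AutoCloseable {
    fun accept(record: String): LoadResult
    fun finish()
}

// One loader per batch: it lives until it returns Complete from accept,
// or until the pipeline forces completion by calling finish at end of
// input. Either way it is closed and never reused.
fun runBatch(factory: () -> Loader, records: Iterator<String>) {
    factory().use { loader ->
        for (record in records) {
            if (loader.accept(record) == Complete) return // ended early
        }
        loader.finish() // forced completion
    }
}
```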

core:
load-pipeline:
strategy: direct
input-parts: 1
Contributor

nit: these two properties should live in src/main/resources/application-override.yaml, since they also apply in production

(the record-batch-size-override shouldn't even be needed :/ that should already be set everywhere by this thing https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt#L306)
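Concretely, that would move the snippet above into src/main/resources/application-override.yaml, fully qualified (a sketch, using the property paths quoted elsewhere in this PR):

```yaml
airbyte:
  destination:
    core:
      load-pipeline:
        strategy: direct
        input-parts: 1
```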

* received. Its end is user-defined, or forced at the end of the stream.
*
* To enable, set [airbyte.destination.core.load-pipeline.strategy=direct] in the connector's
* application.yaml.
Contributor

s/application.yaml/application-override.yaml/g

val inputQueue: PartitionedQueue<PipelineEvent<K, DestinationRecordAirbyteValue>>,
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
@Value("\${airbyte.destination.core.load-pipeline.input-parts}")
private val numWorkers: Int? = null,
Contributor

noncritical wishlist: setting worker count per stream, based on properties of that stream

specifically, iceberg needs to guarantee record ordering per PK in dedup mode (possibly only in CDC syncs), so it would be cool to be able to do

if (stream.syncMode is Dedupe && stream.columns.containsKey("_ab_cdc_deleted_at")) {
  numWorkers = 1
} else {
  numWorkers = some bigger number
}

Contributor Author

So this would be WorkerConfiguration you could inject that would override the values?

Contributor

@edgao edgao Feb 12, 2025

maybe - I think in that world we'd need to define a worker config per stream


or maybe the other thing we could do, is something like:

  • instead of passing the numPartitions value up to the CDK and then back down to the partitioner
  • have that value live completely within connector code
  • and then the partitioner could look at the stream, in addition to the record - e.g.
fun getPart(record: DestinationRecord, stream: DestinationStream) {
  if (stream.dedup && cdc && whatever) {
    // one partition for the whole stream
    return hash(stream)
  } else {
    // 5 partitions for the stream, partitioned by PK
    return hash(stream, record.pk) % 5
  }
}

(and if the CDK wants to limit the global worker count, it would then do getPart() % workerCount to get the actual partition ID)

val inputQueue: PartitionedQueue<PipelineEvent<K, DestinationRecordAirbyteValue>>,
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
@Value("\${airbyte.destination.core.load-pipeline.input-parts}")
private val numWorkers: Int? = null,
Contributor

@edgao edgao Feb 11, 2025

also, checking my understanding - iceberg's (and presumably mssql's) requirement is to set 1 thread per partition - is this doing that, or is this setting the number of workers total?

edit: or maybe numWorkers == num partitions, and partitions are always processed in-order singlethreaded?

Contributor Author

workers == partitions rn. But yes, they are different concepts, and as long as we're only exposing partitions rn, it should be called partitions, with the proviso that 1 thread (coroutine) will be assigned to each partition.

Contributor

cool, that makes sense. and honestly probably simplifies the interface to always have one worker per partition?

b/c if someone wants multiple workers per partition, they could always do return hash(random() % workersPerPartition, actualPartitionHashmod) or something

Contributor Author

What's really fun is when we start needing to distinguish between the partition key and the key on which we keep loader state..... fortunately we don't have to think about that here

mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName,
)

streamStateStore.put(stream.descriptor, loader)
Contributor

@frifriSF59 frifriSF59 Feb 11, 2025

This is confusing to me.
Is there any way this could be added to the loader instead?
I'm not sure I understand why we need this third-party StateStore object.

Contributor Author

See earlier comment.

Some more detail:

There are a few options to handle scope here

  • put a method into the streamloader that provides the Loader object the stream is using (and thread dependencies through)
  • have an explicit contract that the factory sees the loader on create
  • have an explicit contract that start returns a user-defined state object that is passed to create
  • implement a statestore and leave the user free to pass thread-safe state between objects

The last option is the most flexible and maintainable, as it leaves us free to change the type or signatures of either object or method (StreamLoader, start, DirectLoaderFactory, create) w/o breaking any implemented behavior.
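A minimal sketch of that last option (the actual StreamStateStore may differ; the String key stands in for the real stream descriptor type):

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Generic, thread-safe store keyed by stream descriptor. Because it is
// generic in S, the signatures of StreamLoader.start and
// DirectLoaderFactory.create can evolve without breaking this contract.
class StreamStateStore<S> {
    private val store = ConcurrentHashMap<String, S>()

    fun put(streamDescriptor: String, state: S) {
        store[streamDescriptor] = state
    }

    fun get(streamDescriptor: String): S? = store[streamDescriptor]
}
```

The writer's start path puts the initialized state in; the loader factory later looks it up by descriptor, with neither side depending on the other's types.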


override fun getPart(outputKey: K2, numParts: Int): Int {
throw NotImplementedError("This partitioner is a no-op and should not be used.")
}
Contributor

@tryangul tryangul Feb 12, 2025

What's this Noop class used for? Also, Nit: Nop -> Noop

Contributor Author

It's because the last task in the pipeline has no output, but I can't make the partitioner nullable w/o causing a compilation error because K2 won't resolve. I can probably factor that out though.

Also (screenshot omitted) :)
I agree Noop is better tho.

@@ -197,7 +197,7 @@ class DefaultStreamManager(
     }
     val stateRangesJoined =
         stateRangesToAdd.joinToString(",") { "${it.first}->${it.second}" }
-    val readRange = TreeRangeSet.create(listOf(Range.closed(0, recordCount.get())))
+    val readRange = TreeRangeSet.create(listOf(Range.closed(0, recordCount.get() - 1)))
Contributor

Why a minus 1 here?

Contributor Author

It's a debug log for the old range-based accounting.

I added the -1 when I was debugging the S3DataLake tests, because recordCount.get() returns the count, not the last index, and it was annoying me. I'll clean it up.
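The count-vs-index distinction is easy to demonstrate with the same Guava types the code uses: for recordCount records, the valid indices are 0 through recordCount - 1, so a closed range ending at the raw count would claim one index that was never read.

```kotlin
import com.google.common.collect.Range
import com.google.common.collect.TreeRangeSet

// With 3 records read, the indices actually seen are 0, 1, 2. Using the
// raw count as the closed upper bound would also include index 3.
fun readIndexRange(recordCount: Long): TreeRangeSet<Long> =
    TreeRangeSet.create(listOf(Range.closed(0L, recordCount - 1)))
```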

ConcurrentLinkedQueue((0 until partitions).map { Channel(capacity) })

suspend fun consume(partition: Int): Flow<T> = channel.elementAt(partition).consumeAsFlow()
suspend fun publish(value: T, partition: Int) =
Contributor

Why pass the partition key in instead of just working with the partitions? i.e. getPartition(key) -> Partition

Contributor Author

I didn't want to couple the queue with the key type / key -> partition logic.

I think I'd rather have this just be a queue decorator, totally agnostic of the underlying queue type.
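That decorator idea might look like this sketch (names hypothetical, assuming some minimal queue abstraction): the decorator only routes by partition index and never sees the key type or the key-to-partition logic.

```kotlin
// Hypothetical: partitioning as a decorator over any underlying queue type.
interface SimpleQueue<T> {
    suspend fun publish(value: T)
}

class PartitionedQueueDecorator<T>(
    private val underlying: List<SimpleQueue<T>>,
) {
    val partitions: Int get() = underlying.size

    // The caller (e.g. the partitioner) decides the partition; the
    // decorator just routes to the matching underlying queue.
    suspend fun publish(value: T, partition: Int) =
        underlying[partition].publish(value)
}
```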

LoadPipelineStep(
numWorkers = numWorkers ?: 1,
taskForPartition = { part ->
return@LoadPipelineStep LoadPipelineStepTask(
Contributor

I'd suggest making beans for the tasks and remove this factory layer.

* partitioned by a hash of the stream name and namespace.
*/
interface InputPartitioner {
fun getPart(record: DestinationRecordAirbyteValue, numParts: Int): Int
Contributor

@tryangul tryangul Feb 12, 2025

Nit: getPart is misleading, given that Part is an object itself. Suggestion: the method could be named something generic like apply (the partitioning bit is implied in the class name), or just getPartition, etc.


data class LoadPipelineStep(
val numWorkers: Int,
val taskForPartition: suspend (Int) -> LoadPipelineStepTask<*, *, *, *, *>
Contributor

Suggestion: collapse task and pipeline step.

@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch 2 times, most recently from 80b7ce5 to 409422f Compare February 14, 2025 00:41
@edgao
Contributor

edgao commented Feb 14, 2025

fyi: I'm moving a ton of files around for #53700 (with the goal of enabling a destination-gcs-data-lake). Which prooooooobably conflicts with some of the changes in this PR :(

@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from 409422f to 79f9b87 Compare February 18, 2025 17:52
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from 58b8266 to 9e6bb11 Compare February 18, 2025 22:04
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch 2 times, most recently from 7015ba6 to 9f4b9e5 Compare February 18, 2025 23:28
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from 9f4b9e5 to 8783a37 Compare February 19, 2025 17:37
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from 8783a37 to ea5dd60 Compare February 19, 2025 21:02
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch 2 times, most recently from 8811fc7 to acf02d1 Compare February 19, 2025 21:47
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from acf02d1 to 2455a13 Compare February 19, 2025 23:42
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch 3 times, most recently from 5df096e to d4fd5fb Compare February 20, 2025 00:08
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from d4fd5fb to b538c10 Compare February 20, 2025 17:17
@johnny-schmidt johnny-schmidt changed the title WIP: Load CDK: New Interface proposal Load-CDK/Destination-S3DataLake: DirectLoader (no spill2disk, partitioning) Feb 20, 2025
@johnny-schmidt johnny-schmidt marked this pull request as ready for review February 20, 2025 18:16
@johnny-schmidt johnny-schmidt requested a review from a team as a code owner February 20, 2025 18:16
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/load-cdk/interface-two-oh branch from b538c10 to d9992d2 Compare February 20, 2025 20:57