
Commit f3ace5e

🔥 Realtime Data Enrichment - add get_enrichment_table_record fn to VRL log transform pipeline

1 parent 8ad22d2 commit f3ace5e
File tree

9 files changed: +252 additions, -28 deletions
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+{"user_id":"john","name":"John Doe"}
Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+name: "user_info"
+
+enrichment_type: "static"
+
+# Iceberg schema (same as log sources)
+schema:
+  fields:
+    - name: name
+      type: string
+    - name: user_id
+      type: string
+lookup_keys:
+  - user_id
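
For context on how this config is consumed: the enrichment.rs diff below reads the enrichment config into a HashMap<String, serde_yaml::Value> rather than typed structs. The following is a minimal typed sketch of parsing the same shape; the struct names and the user_info.yml path are illustrative, not taken from the repo.

use serde::Deserialize;

// Illustrative types only; the repo itself deserializes into
// HashMap<String, serde_yaml::Value> (see enrichment.rs below).
#[derive(Debug, Deserialize)]
struct EnrichmentTableConfig {
    name: String,
    enrichment_type: String,
    schema: TableSchema,
    lookup_keys: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct TableSchema {
    fields: Vec<Field>,
}

#[derive(Debug, Deserialize)]
struct Field {
    name: String,
    #[serde(rename = "type")]
    field_type: String,
}

fn main() -> anyhow::Result<()> {
    let yaml = std::fs::read_to_string("user_info.yml")?; // hypothetical path
    let cfg: EnrichmentTableConfig = serde_yaml::from_str(&yaml)?;
    assert_eq!(cfg.lookup_keys, vec!["user_id"]);
    Ok(())
}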

infra/src/DPMainStack.ts

Lines changed: 8 additions & 0 deletions

@@ -199,6 +199,14 @@ export class DPMainStack extends MatanoStack {
     });
     transformer.node.addDependency(sqsSources);

+    if (enrichment != null) {
+      transformer.transformerLambda.addEnvironment(
+        "ENRICHMENT_TABLES_BUCKET",
+        enrichment.enrichmentTablesBucket.bucketName
+      );
+      enrichment.enrichmentTablesBucket.grantRead(transformer.transformerLambda);
+    }
+
     const rawDataBatcher = new DataBatcher(this, "DataBatcher", {
       transformerFunction: transformer.transformerLambda,
       s3Bucket: props.matanoSourcesBucket,
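
At runtime the transformer Lambda picks this variable up and fetches objects under the tables/ prefix; both the env var name and the "tables/{name}.avro" key layout appear in the enrichment.rs diff below. A minimal sketch of that handshake, where the helper function itself is hypothetical:

// Resolves the enrichment bucket (set by DPMainStack above) and the object
// key for one table. The env var and key format come from enrichment.rs;
// this wrapper is illustrative only.
fn enrichment_object(table_name: &str) -> anyhow::Result<(String, String)> {
    let bucket = std::env::var("ENRICHMENT_TABLES_BUCKET")?;
    let key = format!("tables/{}.avro", table_name);
    Ok((bucket, key))
}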

lib/java/matano/iceberg_main/src/main/kotlin/com/matano/iceberg/AthenaUtil.kt

Lines changed: 7 additions & 2 deletions

@@ -1,12 +1,14 @@
 package com.matano.iceberg

+import org.slf4j.LoggerFactory
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
 import software.amazon.awssdk.services.athena.AthenaAsyncClient
 import software.amazon.awssdk.services.athena.model.*
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.TimeUnit

 class AthenaQueryRunner(val workGroup: String = "matano_system") {
+    private val logger = LoggerFactory.getLogger(this::class.java)
     val SLEEP_AMOUNT_MS = 500L
     val delayedExecutor = CompletableFuture.delayedExecutor(SLEEP_AMOUNT_MS, TimeUnit.MILLISECONDS)

@@ -38,9 +40,12 @@ class AthenaQueryRunner(val workGroup: String = "matano_system") {
                 )
             }
             QueryExecutionState.CANCELLED -> throw RuntimeException("The Amazon Athena query was cancelled.")
-            QueryExecutionState.SUCCEEDED -> CompletableFuture.completedFuture(Unit)
+            QueryExecutionState.SUCCEEDED -> {
+                logger.info("Successfully completed Athena query.")
+                CompletableFuture.completedFuture(Unit)
+            }
             else -> {
-                CompletableFuture.supplyAsync({ waitForQueryToComplete(queryExecutionId) }, delayedExecutor)
+                CompletableFuture.supplyAsync({}, delayedExecutor).thenComposeAsync { waitForQueryToComplete(queryExecutionId) }
             }
         }
     }.thenApplyAsync {}
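
The else-branch change fixes a subtle bug: supplyAsync({ waitForQueryToComplete(id) }, delayedExecutor) produced a CompletableFuture<CompletableFuture<Unit>> whose outer future completed as soon as the inner one was created, so callers could observe "completion" while the query was still running. Chaining supplyAsync({}, ...).thenComposeAsync { ... } flattens the nesting so the returned future tracks the full recursive poll. A self-contained sketch of the corrected polling shape, written in Rust with tokio purely for illustration (query states are mocked):

use std::time::Duration;

#[derive(PartialEq)]
enum QueryState {
    Running,
    Succeeded,
}

// Stand-in for the Athena GetQueryExecution status check; illustrative only.
async fn fetch_state(polls_left: &mut u32) -> QueryState {
    if *polls_left == 0 {
        QueryState::Succeeded
    } else {
        *polls_left -= 1;
        QueryState::Running
    }
}

// Like the Kotlin fix, the returned future resolves only once the query
// reports SUCCEEDED: each poll is awaited before deciding to sleep and retry.
async fn wait_for_query(mut polls_left: u32) {
    loop {
        if fetch_state(&mut polls_left).await == QueryState::Succeeded {
            return;
        }
        tokio::time::sleep(Duration::from_millis(500)).await; // SLEEP_AMOUNT_MS analogue
    }
}

#[tokio::main]
async fn main() {
    wait_for_query(3).await;
    println!("query finished; results are ready");
}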

lib/java/matano/iceberg_main/src/main/kotlin/com/matano/iceberg/Enrichment.kt

Lines changed: 5 additions & 3 deletions

@@ -157,7 +157,7 @@ class EnrichmentIcebergSyncer {
             TO 's3://$tempSyncBucket/$keyPrefix'
             WITH (format = 'AVRO')
         """.trimIndent()
-        athenaQueryRunner.runAthenaQuery(qs)
+        athenaQueryRunner.runAthenaQuery(qs).await()

         // Make downloads concurrent
         val futs = s3AsyncClient.listObjectsV2 { r -> r.bucket(tempSyncBucket).prefix(keyPrefix) }.await().contents().map { res ->

@@ -229,16 +229,18 @@ class EnrichmentIcebergSyncer {
             TO 's3://$tempSyncBucket/$keyPrefix'
             WITH (format = 'PARQUET', compression='snappy')
         """.trimIndent()
-        athenaQueryRunner.runAthenaQuery(qs)
+        athenaQueryRunner.runAthenaQuery(qs).await()

-        val futs = s3AsyncClient.listObjectsV2 { r -> r.bucket(tempSyncBucket).prefix(keyPrefix) }.await().contents().map { res ->
+        val ll = s3AsyncClient.listObjectsV2 { r -> r.bucket(tempSyncBucket).prefix(keyPrefix) }.await().contents().toList()
+        val futs = ll.map { res ->
             s3AsyncClient
                 .getObject({ it.bucket(tempSyncBucket).key(res.key()) }, AsyncResponseTransformer.toBytes())
                 .thenApply { InMemoryInputFile(it.asByteArray()) }
         }
         CompletableFuture.allOf(*futs.toTypedArray()).await()
         val inputFiles = futs.map { it.get() }
         if (inputFiles.isEmpty()) {
+            logger.info("Input files are empty.")
             return null
         }
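
The .await() additions are the caller-side half of the same fix: runAthenaQuery returns a CompletableFuture, and before this change the code went on to list the S3 prefix while the UNLOAD could still be writing it. A loose tokio analogue for illustration (spawned tasks are eager, like CompletableFuture, so skipping the final await would recreate the race):

use tokio::time::{sleep, Duration};

// Without the final await, we could list the prefix mid-UNLOAD.
#[tokio::main]
async fn main() {
    let unload = tokio::spawn(async {
        sleep(Duration::from_millis(100)).await; // stand-in for the UNLOAD query
    });
    unload.await.unwrap(); // the `.await()` this commit adds, in miniature
    println!("safe to list the s3 prefix for results");
}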

lib/rust/shared/src/avro_index.rs

Lines changed: 25 additions & 17 deletions

@@ -1,26 +1,27 @@
+use crate::avro::AvroValueExt;
 use apache_avro::from_avro_datum;
+use log::info;
 use memmap2::{Mmap, MmapOptions};
 use once_cell::sync::OnceCell;
-use crate::avro::AvroValueExt;
 use std::io::{BufRead, Read};
 use std::io::{BufReader, Seek};
 use std::vec;
 use std::{
     collections::HashMap,
     io::Cursor,
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex, Weak},
 };

 use anyhow::{anyhow, Context, Result};

 /// We just need this because the Avro reader takes ownership but we want to be able to use the underlying reader to seek.
 struct ReaderHolder {
-    reader: Arc<Mutex<Cursor<Mmap>>>,
+    reader: Weak<Mutex<Cursor<Mmap>>>,
 }

 impl std::io::Read for ReaderHolder {
     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        self.reader.lock().unwrap().read(buf)
+        self.reader.upgrade().unwrap().lock().unwrap().read(buf)
     }
 }

@@ -41,12 +42,12 @@ type RecordPosMap = HashMap<String, [i64; 2]>; // [start_pos, block_length]
 pub struct AvroIndex {
     pub schema: apache_avro::Schema,
     indices: HashMap<String, (String, OnceCell<RecordPosMap>)>,
-    reader: Arc<Mutex<Cursor<Mmap>>>,
+    reader: Arc<Mmap>,
 }

 impl AvroIndex {
-    /// There's an offset of 3 bytes between the start positions written in Index and where raw compressed block starts.
-    const OFFSET: u64 = 3;
+    /// There's an offset of 2 bytes between the start positions written in Index and where raw compressed block starts.
+    const OFFSET: u64 = 2;
     /// The length of the sync marker at the end of each block. Remove to get actual data.
     const SYNC_LENGTH: usize = 16;

@@ -63,15 +64,24 @@ impl AvroIndex {
         let reader = Arc::new(Mutex::new(Cursor::new(mmap)));

         let reader_holder = ReaderHolder {
-            reader: reader.clone(),
+            reader: Arc::downgrade(&reader.clone()),
         };

         let avro_reader = apache_avro::Reader::new(reader_holder)?;
+        let schema = avro_reader.writer_schema().clone();
+
+        let mem_cursor = Arc::try_unwrap(reader)
+            .map_err(|e| anyhow!("Could not unwrap Arc: {:?}", e))?
+            .into_inner()?;
+
+        let mem = mem_cursor.into_inner();
+
+        let reader = Arc::new(mem);

         Ok(Self {
             indices,
             reader,
-            schema: avro_reader.writer_schema().clone(),
+            schema,
         })
     }

@@ -120,19 +130,17 @@ impl AvroIndex {
         let block_len = *block_len as usize - (Self::OFFSET as usize + Self::SYNC_LENGTH);

         // Read the whole block since we know the length and avoid multiple reads.
-        // Need scope to ensure mutex is dropped and released before avro reads.
         let raw_block: Vec<u8> = {
-            let mut reader = self.reader.lock().unwrap();
-            reader.seek(std::io::SeekFrom::Start(start_pos))?;
-
-            let mut buf = vec![0; block_len as usize];
-            reader.read_exact(buf.as_mut())?;
-
+            let reader = self.reader.clone();
+            let start_pos = start_pos as usize;
+            let buf = reader[start_pos..start_pos + block_len].to_vec();
             buf
         };

         let mut block = vec![];
-        zstd::Decoder::new(raw_block.as_slice())?.read_to_end(block.as_mut())?;
+        zstd::Decoder::new(raw_block.as_slice())?
+            .read_to_end(block.as_mut())
+            .map_err(|e| anyhow!("Failed to decompress avro idx block").context(e))?;

         let mut block_reader = block.as_slice();
         while !block_reader.is_empty() {
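
With the reader now an Arc<Mmap> instead of Arc<Mutex<Cursor<Mmap>>>, a block read becomes a bounds-checked slice copy followed by zstd decompression, with no seek and no lock (the mapping is immutable, so shared reads are safe). A condensed sketch of that path; the function name here is hypothetical:

use std::io::Read;

// Condensed version of the block-read path after this diff. The index
// supplies [start_pos, block_length]; the caller has already subtracted
// OFFSET (2) and the 16-byte sync marker, as in get-record code above.
fn read_block(mmap: &memmap2::Mmap, start_pos: usize, block_len: usize) -> anyhow::Result<Vec<u8>> {
    let raw_block = &mmap[start_pos..start_pos + block_len];
    let mut block = Vec::new();
    zstd::Decoder::new(raw_block)?.read_to_end(&mut block)?;
    Ok(block)
}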

lib/rust/shared/src/enrichment.rs

Lines changed: 11 additions & 4 deletions

@@ -15,6 +15,7 @@ use async_compression::tokio::bufread::ZstdDecoder;
 use pyo3::prelude::*;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::runtime::Runtime;
+use tracing::log::info;

 use crate::avro_index::AvroIndex;
 use crate::utils::load_enrichment_config;

@@ -88,6 +89,12 @@ pub fn create_enrichment_tables(table_names: Vec<String>) -> PyResult<Vec<Enrich
     Ok(tables)
 }

+pub fn create_enrichment_table(table_name: &str) -> Result<EnrichmentTable> {
+    let lookup_keys = get_enrichment_table_lookup_keys(table_name)
+        .ok_or_else(|| anyhow!("No lookup key(s) for table {}", table_name))?;
+    Ok(EnrichmentTable::new(table_name.to_string(), lookup_keys))
+}
+
 fn get_enrichment_table_lookup_keys(table_name: &str) -> Option<Vec<String>> {
     lazy_static! {
         static ref ENRICHMENT_CONFIG: HashMap<String, serde_yaml::Value> =

@@ -109,6 +116,7 @@ async fn load_avro_index(table_name: &str, lookup_keys: &Vec<String>) -> Result<
     let bucket = std::env::var("ENRICHMENT_TABLES_BUCKET")?;
     let avro_key = format!("tables/{}.avro", table_name);
     let local_dir = std::env::temp_dir().join("tables");
+    tokio::fs::create_dir_all(&local_dir).await?;
     let local_avro_path = local_dir.join(format!("{}.zstd.avro", table_name));

     let load_avro = || async {

@@ -122,12 +130,11 @@ async fn load_avro_index(table_name: &str, lookup_keys: &Vec<String>) -> Result<
             .into_async_read();
         let mut avro_reader = tokio::io::BufReader::new(avro_reader);

-        tokio::fs::create_dir_all(&local_dir).await?;
-
         let file = tokio::fs::File::create(&local_avro_path).await?;
         let mut writer = tokio::io::BufWriter::new(file);

         tokio::io::copy(&mut avro_reader, &mut writer).await?;
+
         anyhow::Ok(HashMap::<String, String>::with_capacity(0)) // just to align types
     };

@@ -153,10 +160,10 @@ async fn load_avro_index(table_name: &str, lookup_keys: &Vec<String>) -> Result<
                 let mut index_reader =
                     ZstdDecoder::new(tokio::io::BufReader::new(index_reader));

-                let file = tokio::fs::File::create(&local_index_path).await?;
+                let file = tokio::fs::File::create(&local_index_path).await.map_err(|e| anyhow!("failed to create file {}", local_index_path).context(e))?;
                 let mut writer = tokio::io::BufWriter::new(file);

-                tokio::io::copy(&mut index_reader, &mut writer).await?;
+                tokio::io::copy(&mut index_reader, &mut writer).await.map_err(|e| anyhow!("failed to copy index").context(e))?;
                 anyhow::Ok((lookup_key.to_string(), local_index_path))
             }
         })
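
A hypothetical caller of the new create_enrichment_table constructor, assuming it runs where the function is in scope inside the shared crate; "user_info" is the table defined by the YAML config earlier in this commit:

// Resolve one table by name; per the diff above, this fails with
// "No lookup key(s) for table ..." when the config declares none.
fn main() -> anyhow::Result<()> {
    let _table = create_enrichment_table("user_info")?;
    Ok(())
}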
