refactor(cdc): refactor parsing of non-builtin Postgres data types (#16589)

Co-authored-by: Xiangjin <[email protected]>
StrikeW and xiangjinwu committed May 10, 2024
1 parent 2dd6b73 commit f343eba
Showing 4 changed files with 270 additions and 187 deletions.
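
In short, the refactor replaces the per-type helpers that postgres.rs used for non-builtin Postgres types (PgNumeric for NUMERIC, EnumString for enums, uuid::Uuid for UUID) with a single ScalarAdapter that is decoded with try_get and then converted via into_scalar(DataType). The adapter itself lives in the new src/connector/src/parser/scalar_adapter.rs, which is not expanded in this view; the sketch below only approximates its shape as inferred from the call sites in the diff, with hypothetical variant names and a simplified from_sql/into_scalar.

// Sketch only, not the real scalar_adapter.rs: the variant names and bodies below
// are assumptions inferred from the call sites shown in the diff.
use std::str::FromStr;

use pg_bigdecimal::PgNumeric;
use risingwave_common::types::{DataType, Int256, ScalarImpl};
use tokio_postgres::types::{FromSql, Kind, Type};

/// Wrapper for Postgres enum labels (the real EnumString moved into this module).
#[derive(Clone, Debug)]
pub struct EnumString(pub String);

/// Wraps one value of a Postgres type that has no native RisingWave mapping.
pub enum ScalarAdapter<'a> {
    Uuid(uuid::Uuid),   // UUID columns, surfaced as VARCHAR
    Numeric(PgNumeric), // NUMERIC values beyond rust_decimal's range
    Enum(EnumString),   // user-defined enum labels
    Raw(&'a [u8]),      // hypothetical catch-all; keeps the borrowed lifetime
}

impl<'a> FromSql<'a> for ScalarAdapter<'a> {
    fn from_sql(
        ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(if *ty == Type::UUID {
            ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)
        } else if *ty == Type::NUMERIC {
            ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)
        } else if matches!(ty.kind(), Kind::Enum(_)) {
            ScalarAdapter::Enum(EnumString(String::from_utf8_lossy(raw).into_owned()))
        } else {
            ScalarAdapter::Raw(raw)
        })
    }

    fn accepts(_ty: &Type) -> bool {
        // Simplified: accept everything and branch per value in from_sql.
        true
    }
}

impl ScalarAdapter<'_> {
    /// Convert into the scalar expected by the destination RisingWave column.
    pub fn into_scalar(self, ty: DataType) -> Option<ScalarImpl> {
        match (self, ty) {
            (ScalarAdapter::Uuid(u), DataType::Varchar) => Some(ScalarImpl::from(u.to_string())),
            (ScalarAdapter::Enum(EnumString(s)), DataType::Varchar) => Some(ScalarImpl::from(s)),
            (ScalarAdapter::Numeric(n), DataType::Varchar) => {
                pg_numeric_to_string(&n).map(ScalarImpl::from)
            }
            (ScalarAdapter::Numeric(n), DataType::Int256) => pg_numeric_to_string(&n)
                .and_then(|s| Int256::from_str(&s).ok())
                .map(ScalarImpl::from),
            _ => None,
        }
    }
}

// Same string mapping as the removed pg_numeric_to_string helper further down.
fn pg_numeric_to_string(n: &PgNumeric) -> Option<String> {
    match n {
        PgNumeric::Normalized(d) => Some(d.to_string()),
        PgNumeric::NegativeInf => Some("NEGATIVE_INFINITY".into()),
        PgNumeric::PositiveInf => Some("POSITIVE_INFINITY".into()),
        PgNumeric::NaN => Some("NAN".into()),
    }
}
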
4 changes: 3 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -43,7 +43,7 @@ use self::avro::AvroAccessBuilder;
use self::bytes_parser::BytesAccessBuilder;
pub use self::mysql::mysql_row_to_owned_row;
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_row_to_owned_row, EnumString};
pub use self::postgres::postgres_row_to_owned_row;
use self::simd_json_parser::DebeziumJsonAccessBuilder;
pub use self::unified::json::TimestamptzHandling;
use self::unified::AccessImpl;
@@ -75,7 +75,9 @@ mod maxwell;
mod mysql;
pub mod plain_parser;
mod postgres;

mod protobuf;
pub mod scalar_adapter;
mod unified;
mod upsert_parser;
mod util;
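
Because the parser module no longer re-exports EnumString, downstream code pulls it from the new module instead, which the test change at the bottom of postgres.rs below reflects as well. Roughly:

// before this commit (defined in parser/postgres.rs and re-exported via parser::mod)
// use crate::parser::postgres::EnumString;
// after this commit (defined in the new scalar_adapter module)
use crate::parser::scalar_adapter::EnumString;
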
163 changes: 46 additions & 117 deletions src/connector/src/parser/postgres.rs
@@ -12,23 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use bytes::BytesMut;
use chrono::{NaiveDate, Utc};
use pg_bigdecimal::PgNumeric;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
DataType, Date, Decimal, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};
use tokio_postgres::types::{Kind, Type};

use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::util::log_error;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
@@ -121,9 +119,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// we use the PgNumeric type to convert the decimal to a string.
// Then we convert the string to Int256.
// Note: It's only used to map the numeric type in upstream Postgres to RisingWave's rw_int256.
let res = row.try_get::<_, Option<PgNumeric>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => pg_numeric_to_rw_int256(val, name),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Int256)),
Err(err) => {
log_error!(name, err, "parse numeric column as pg_numeric failed");
None
@@ -133,9 +131,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
DataType::Varchar => {
if let Kind::Enum(_) = row.columns()[i].type_().kind() {
// enum type needs to be handled separately
let res = row.try_get::<_, Option<EnumString>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.0)),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(name, err, "parse enum column failed");
None
@@ -145,9 +143,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(name, err, "parse uuid column failed");
None
@@ -159,9 +157,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// Currently in order to handle the decimal beyond RustDecimal,
// we use the PgNumeric type to convert the decimal to a string.
// Note: It's only used to map the numeric type in upstream Postgres to RisingWave's varchar.
let res = row.try_get::<_, Option<PgNumeric>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => pg_numeric_to_varchar(val),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(
name,
@@ -216,13 +214,13 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// As `NULL` in an enum list is not supported in Debezium, we use `EnumString`
// instead of `Option<EnumString>` to handle enums, to keep the behaviors aligned.
// An enum list that contains `NULL` will be converted to `NULL`.
let res = row.try_get::<_, Option<Vec<EnumString>>>(i);
let res = row.try_get::<_, Option<Vec<ScalarAdapter<'_>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(Some(ScalarImpl::from(val.0)));
});
if let Some(vec) = val {
for val in vec {
builder.append(val.into_scalar(DataType::Varchar))
}
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
@@ -261,16 +259,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
Type::UUID_ARRAY => {
let res =
row.try_get::<_, Option<Vec<Option<uuid::Uuid>>>>(i);
let res = row
.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(
i,
);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(val.map(|v| {
ScalarImpl::from(v.to_string())
if let Some(vec) = val {
for val in vec {
builder.append(val.and_then(|v| {
v.into_scalar(DataType::Varchar)
}))
});
}
}
}
Err(err) => {
@@ -279,14 +279,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
};
}
Type::NUMERIC_ARRAY => {
let res =
row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
let res = row
.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(
i,
);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_varchar(val))
});
if let Some(vec) = val {
for val in vec {
builder.append(val.and_then(|v| {
v.into_scalar(DataType::Varchar)
}))
}
}
}
Err(err) => {
@@ -364,13 +368,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
DataType::Int256 => {
let res = row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
let res =
row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_rw_int256(val, name))
});
if let Some(vec) = val {
for val in vec {
builder.append(
val.and_then(|v| {
v.into_scalar(DataType::Int256)
}),
)
}
}
}
Err(err) => {
@@ -404,91 +413,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
OwnedRow::new(datums)
}

fn pg_numeric_to_rw_int256(val: Option<PgNumeric>, name: &str) -> Option<ScalarImpl> {
let string = pg_numeric_to_string(val)?;
match Int256::from_str(string.as_str()) {
Ok(num) => Some(ScalarImpl::from(num)),
Err(err) => {
log_error!(name, err, "parse numeric string as rw_int256 failed");
None
}
}
}

fn pg_numeric_to_varchar(val: Option<PgNumeric>) -> Option<ScalarImpl> {
pg_numeric_to_string(val).map(ScalarImpl::from)
}

fn pg_numeric_to_string(val: Option<PgNumeric>) -> Option<String> {
if let Some(pg_numeric) = val {
// TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
// The current implementation is to ensure consistency with the behavior of the cdc event parser.
match pg_numeric {
PgNumeric::NegativeInf => Some(String::from("NEGATIVE_INFINITY")),
PgNumeric::Normalized(big_decimal) => Some(big_decimal.to_string()),
PgNumeric::PositiveInf => Some(String::from("POSITIVE_INFINITY")),
PgNumeric::NaN => Some(String::from("NAN")),
}
} else {
// NULL
None
}
}

#[derive(Clone, Debug)]
pub struct EnumString(pub String);

impl<'a> FromSql<'a> for EnumString {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
}

fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}
}

impl ToSql for EnumString {
to_sql_checked!();

fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}

fn to_sql(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
match ty.kind() {
Kind::Enum(e) => {
if e.contains(&self.0) {
out.extend_from_slice(self.0.as_bytes());
Ok(IsNull::No)
} else {
Err(format!(
"EnumString value {} is not in the enum type {:?}",
self.0, e
)
.into())
}
}
_ => Err("EnumString can only be used with ENUM types".into()),
}
}
}

#[cfg(test)]
mod tests {
use tokio_postgres::NoTls;

use crate::parser::postgres::EnumString;
use crate::parser::scalar_adapter::EnumString;
const DB: &str = "postgres";
const USER: &str = "kexiang";

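
After this refactor, every scalar non-builtin call site in postgres_row_to_owned_row follows the same two-step pattern: decode the column as Option<ScalarAdapter<'_>>, then convert it with into_scalar against the column's RisingWave type (the array branches do the same per element over Vec<Option<ScalarAdapter<'_>>>). A condensed restatement is below; the helper name read_non_builtin_column is hypothetical, and row, i, name, and log_error! are the names used in the function above.

use risingwave_common::types::{DataType, ScalarImpl};

use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::util::log_error;

// Hypothetical helper: the shared shape of the ScalarAdapter call sites above.
fn read_non_builtin_column(
    row: &tokio_postgres::Row,
    i: usize,
    name: &str,
    target: DataType,
) -> Option<ScalarImpl> {
    match row.try_get::<_, Option<ScalarAdapter<'_>>>(i) {
        Ok(val) => val.and_then(|v| v.into_scalar(target)),
        Err(err) => {
            log_error!(name, err, "parse non-builtin column failed");
            None
        }
    }
}
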
