FeatHub provides HiveSource to read data from a Hive table and HiveSink to
materialize feature view to a Hive table.
Supported processors and modes:

- Flink: Streaming Scan, Streaming Append
  - Only supports Hadoop 3.1.x

Supported data formats:

- Flink: CSV, Parquet
Before using the Hive connector with the Flink processor, make sure you have configured the Hadoop and Hive environments on both your Flink cluster and your PyFlink client. Please refer to Flink's Hive documentation for configuration guidelines.
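As a rough sketch of that setup (the paths below are placeholders, and this assumes a standard Hadoop installation as recommended by Flink's Hive documentation), the environment can be prepared along these lines:

```shell
# Make Hadoop classes visible to Flink by exporting the Hadoop classpath.
export HADOOP_CLASSPATH=$(hadoop classpath)

# Point at the directory containing hive-site.xml; the same directory is
# typically what you pass as hive_catalog_conf_dir in the examples below.
export HIVE_CONF_DIR=/path/to/hive/conf
```

The exact jars and versions required depend on your Flink and Hive releases; consult the compatibility matrix in Flink's documentation.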
Here are examples of using HiveSource and HiveSink:
```python
feature_view = DerivedFeatureView(...)

hive_sink = HiveSink(
    database="default",
    table="table",
    hive_catalog_conf_dir=".",
    data_format="parquet",
    processor_specific_props={
        "sink.partition-commit.policy.kind": "metastore,success-file",
    },
)

feathub_client.materialize_features(
    feature_descriptor=feature_view,
    sink=hive_sink,
    allow_overwrite=True,
).wait(30000)
```

```python
schema = (
    Schema.new_builder()
    ...
    .build()
)

source = HiveSource(
    name="hive_source",
    database="default",
    table="table",
    schema=schema,
    data_format="parquet",
    keys=["key"],
    hive_catalog_conf_dir=".",
    timestamp_field="timestamp",
    timestamp_format="%Y-%m-%d %H:%M:%S %z",
)

feature_view = DerivedFeatureView(
    name="feature_view",
    source=source,
    features=[
        ...
    ],
)
```