FeatHub provides `HiveSource` to read data from a Hive table and `HiveSink` to materialize a feature view to a Hive table.
Supported processors and modes:

- Flink: Streaming Scan, Streaming Append (only supports Hadoop 3.1.x)

Supported data formats:

- Flink: CSV, Parquet
Before using the Hive connector with the Flink processor, make sure you have configured the Hadoop and Hive environments on both your Flink cluster and your PyFlink client. Please refer to Flink's Hive connector documentation for configuration guidelines.
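For example, Flink's documentation recommends exporting the Hadoop classpath before launching any Flink or PyFlink process. Below is a minimal, illustrative sketch of doing the equivalent from the PyFlink client side; it assumes the `hadoop` CLI is on your PATH and is not part of FeatHub's API:

```python
import os
import subprocess

# Mirror the usual "export HADOOP_CLASSPATH=$(hadoop classpath)" shell setup
# recommended by Flink's Hadoop integration docs.
hadoop_classpath = subprocess.run(
    ["hadoop", "classpath"], capture_output=True, text=True, check=True
).stdout.strip()
os.environ.setdefault("HADOOP_CLASSPATH", hadoop_classpath)
```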
Here are examples of using `HiveSource` and `HiveSink`:
```python
# Import paths below follow FeatHub's usual module layout; verify them
# against the FeatHub version you are using.
from feathub.feature_tables.sinks.hive_sink import HiveSink
from feathub.feature_views.derived_feature_view import DerivedFeatureView

feature_view = DerivedFeatureView(...)

hive_sink = HiveSink(
    database="default",
    table="table",
    hive_catalog_conf_dir=".",
    data_format="parquet",
    processor_specific_props={
        "sink.partition-commit.policy.kind": "metastore,success-file",
    },
)

# Materialize the feature view into the Hive table and wait up to
# 30000 ms for the job to finish.
feathub_client.materialize_features(
    feature_descriptor=feature_view,
    sink=hive_sink,
    allow_overwrite=True,
).wait(30000)
```
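Both examples assume that a `feathub_client` has already been created. As a hedged sketch, a client with the Flink processor can be built along these lines, following the props structure from FeatHub's quickstart (the values here are illustrative):

```python
from feathub.feathub_client import FeathubClient

# Illustrative configuration: a Flink processor in CLI deployment mode with
# a local registry and feature service; adapt to your deployment.
feathub_client = FeathubClient(
    props={
        "processor": {
            "type": "flink",
            "flink": {"deployment_mode": "cli"},
        },
        "registry": {
            "type": "local",
            "local": {"namespace": "default"},
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
    }
)
```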
```python
# Import paths below follow FeatHub's usual module layout; verify them
# against the FeatHub version you are using.
from feathub.feature_tables.sources.hive_source import HiveSource
from feathub.table.schema import Schema

# Declare the columns of the Hive table here.
schema = (
    Schema.new_builder()
    ...
    .build()
)

source = HiveSource(
    name="hive_source",
    database="default",
    table="table",
    schema=schema,
    data_format="parquet",
    keys=["key"],
    hive_catalog_conf_dir=".",
    timestamp_field="timestamp",
    timestamp_format="%Y-%m-%d %H:%M:%S %z",
)
```
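The `timestamp_format` above uses Python's strptime-style directives, so each row's timestamp field is expected to look like `2023-01-01 12:00:00 +0000`. A quick standalone check of the format (standard library only; the sample value is made up):

```python
from datetime import datetime

# Parse a sample value with the same format string used by the source.
parsed = datetime.strptime("2023-01-01 12:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
print(parsed.isoformat())  # 2023-01-01T12:00:00+00:00
```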
```python
feature_view = DerivedFeatureView(
    name="feature_view",
    source=source,
    features=[
        ...
    ],
)
```
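To sanity-check the pipeline end to end, the computed features can be fetched into pandas; the method names below follow FeatHub's quickstart (`get_features`, `to_pandas`), so confirm they match your version:

```python
# Compute the features defined on the Hive-backed source and collect them
# into a pandas DataFrame for inspection.
df = feathub_client.get_features(feature_view).to_pandas()
print(df.head())
```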