
# Hive

FeatHub provides `HiveSource` to read data from a Hive table and `HiveSink` to materialize a feature view to a Hive table.

## Supported Processors and Modes

- Flink[^1]: Streaming Scan, Streaming Append

[^1]: Only supports Hadoop 3.1.x.

## Supported Formats

- Flink: CSV, Parquet

## Configuring the Hive Connector for FlinkProcessor

Before using the Hive connector with the FlinkProcessor, make sure you have configured the Hadoop and Hive environments on both your Flink cluster and your PyFlink client. Please refer to Flink's Hive connector documentation for configuration guidelines.
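As a minimal sketch of what that setup can look like on the PyFlink client side (the `hadoop` CLI and the `/etc/hive/conf` path are illustrative assumptions, not FeatHub requirements):

```python
import os
import subprocess

# Expose Hadoop classes to Flink before the PyFlink JVM starts; this is
# the Python equivalent of `export HADOOP_CLASSPATH=$(hadoop classpath)`.
os.environ["HADOOP_CLASSPATH"] = subprocess.check_output(
    ["hadoop", "classpath"], text=True
).strip()

# The Hive connector reads catalog settings from hive-site.xml, so the
# hive_catalog_conf_dir argument used in the examples below should point
# at the directory containing that file (illustrative path).
hive_conf_dir = "/etc/hive/conf"
```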

## Examples

Here are examples of using `HiveSource` and `HiveSink`:

### Use as Streaming Append Sink

```python
feature_view = DerivedFeatureView(...)

# Materialize the feature view into the Hive table "default.table",
# written in Parquet format.
hive_sink = HiveSink(
    database="default",
    table="table",
    hive_catalog_conf_dir=".",
    data_format="parquet",
    processor_specific_props={
        # Commit a finished partition by registering it in the Hive
        # metastore and writing a _SUCCESS file.
        "sink.partition-commit.policy.kind": "metastore,success-file",
    },
)

feathub_client.materialize_features(
    feature_descriptor=feature_view,
    sink=hive_sink,
    allow_overwrite=True,
).wait(30000)
```
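The entries in `processor_specific_props` are forwarded to the underlying Flink Hive connector, so Flink's `sink.partition-commit.*` options can be tuned there without any FeatHub-level changes.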

### Use as Streaming Scan Source

```python
schema = (
    Schema.new_builder()
    ...
    .build()
)

# Scan the Hive table "default.table" as a streaming source. The
# timestamp_field/timestamp_format pair tells FeatHub how to parse each
# row's event time.
source = HiveSource(
    name="hive_source",
    database="default",
    table="table",
    schema=schema,
    data_format="parquet",
    keys=["key"],
    hive_catalog_conf_dir=".",
    timestamp_field="timestamp",
    timestamp_format="%Y-%m-%d %H:%M:%S %z",
)

feature_view = DerivedFeatureView(
    name="feature_view",
    source=source,
    features=[
        ...
    ],
)
```
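For illustration, a completed schema for this source might look like the sketch below. The column names and types are hypothetical, and the `feathub.common.types` import path is assumed from FeatHub's quickstart examples:

```python
from feathub.common import types
from feathub.table.schema import Schema

# Hypothetical columns for "default.table"; replace with your own.
schema = (
    Schema.new_builder()
    .column("key", types.String)        # referenced by keys=["key"] above
    .column("val", types.Float64)
    .column("timestamp", types.String)  # parsed using timestamp_format
    .build()
)
```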