Field not found in source schema #11843

Open
@rohitanil

Description

Query engine

Spark

Question

I am using a REST catalog, MinIO, and Spark with Iceberg on Docker to read from a Kafka topic, apply some transformations, and write the results out as Iceberg tables. I can print the transformed data to the console, but writing to Iceberg throws an error. I can see metadata in MinIO.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, min, max, avg, window
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Kafka-Iceberg-Stream-Processor").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType([
        StructField("BTC_EUR", DoubleType(), True),
        StructField("BTC_INR", DoubleType(), True),
        StructField("BTC_USD", DoubleType(), True),
        StructField("ETH_EUR", DoubleType(), True),
        StructField("ETH_INR", DoubleType(), True),
        StructField("ETH_USD", DoubleType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.1.16:9092") \
        .option("subscribe", "crypto") \
        .option("startingOffsets", "earliest") \
        .load()

    parsed_df = kafka_df.selectExpr(
        "CAST(value AS STRING) AS value",
        "timestamp"
    ).withColumn("parsed_json", from_json(col("value"), schema)) \
        .select(
        col("parsed_json.BTC_EUR").alias("BTC_EUR"),
        col("parsed_json.BTC_INR").alias("BTC_INR"),
        col("parsed_json.BTC_USD").alias("BTC_USD"),
        col("parsed_json.ETH_EUR").alias("ETH_EUR"),
        col("parsed_json.ETH_INR").alias("ETH_INR"),
        col("parsed_json.ETH_USD").alias("ETH_USD"),
        col("parsed_json.timestamp").alias("event_time"),
        col("timestamp").alias("processing_time")
    )

    windowed_df = parsed_df \
        .withWatermark("event_time", "1 minute") \
        .groupBy(window(col("event_time"), "5 minutes", "5 minutes")) \
        .agg(
        max("BTC_EUR").alias("max_BTC_EUR"),
        min("BTC_EUR").alias("min_BTC_EUR"),
        avg("BTC_EUR").alias("avg_BTC_EUR"),
        max("BTC_INR").alias("max_BTC_INR"),
        min("BTC_INR").alias("min_BTC_INR"),
        avg("BTC_INR").alias("avg_BTC_INR"),
        max("BTC_USD").alias("max_BTC_USD"),
        min("BTC_USD").alias("min_BTC_USD"),
        avg("BTC_USD").alias("avg_BTC_USD"),
        max("ETH_EUR").alias("max_ETH_EUR"),
        min("ETH_EUR").alias("min_ETH_EUR"),
        avg("ETH_EUR").alias("avg_ETH_EUR"),
        max("ETH_INR").alias("max_ETH_INR"),
        min("ETH_INR").alias("min_ETH_INR"),
        avg("ETH_INR").alias("avg_ETH_INR"),
        max("ETH_USD").alias("max_ETH_USD"),
        min("ETH_USD").alias("min_ETH_USD"),
        avg("ETH_USD").alias("avg_ETH_USD")
    ).withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window") \
        .select("window_start",
                "window_end",
                "max_BTC_EUR",
                "min_BTC_EUR",
                "avg_BTC_EUR",
                "max_BTC_INR",
                "min_BTC_INR",
                "avg_BTC_INR",
                "max_BTC_USD",
                "min_BTC_USD",
                "avg_BTC_USD",
                "max_ETH_EUR",
                "min_ETH_EUR",
                "avg_ETH_EUR",
                "max_ETH_INR",
                "min_ETH_INR",
                "avg_ETH_INR",
                "max_ETH_USD",
                "min_ETH_USD",
                "avg_ETH_USD")

    # Create the table if it doesn't exist
    spark.sql("""
        CREATE TABLE IF NOT EXISTS db.crypto_metrics4 (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            max_BTC_EUR DOUBLE,
            min_BTC_EUR DOUBLE,
            avg_BTC_EUR DOUBLE,
            max_BTC_INR DOUBLE,
            min_BTC_INR DOUBLE,
            avg_BTC_INR DOUBLE,
            max_BTC_USD DOUBLE,
            min_BTC_USD DOUBLE,
            avg_BTC_USD DOUBLE,
            max_ETH_EUR DOUBLE,
            min_ETH_EUR DOUBLE,
            avg_ETH_EUR DOUBLE,
            max_ETH_INR DOUBLE,
            min_ETH_INR DOUBLE,
            avg_ETH_INR DOUBLE,
            max_ETH_USD DOUBLE,
            min_ETH_USD DOUBLE,
            avg_ETH_USD DOUBLE
        ) USING iceberg
    """)

    print(spark.sql("SHOW TABLES IN rest.db").show())
    print(windowed_df.printSchema())
    # Write stream data to Iceberg
    query = windowed_df.writeStream \
        .outputMode("complete") \
        .format("iceberg") \
        .option("path", "s3://warehouse/crypto_metrics4") \
        .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics4") \
        .trigger(processingTime="1 minute") \
        .start()

    query.awaitTermination()
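
For reference, printing the same windowed_df to the console does work for me; roughly like this (a sketch, the variable name and option values here are illustrative, not exactly what I ran):

debug_query = windowed_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="1 minute") \
    .start()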

I am submitting the Spark job with the following configuration:

spark-submit \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.rest.type=rest \
    --conf spark.sql.catalog.rest.uri=http://rest:8181 \
    --conf spark.sql.catalog.rest.warehouse=s3://warehouse \
    --conf spark.sql.defaultCatalog=rest \
    test.py
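
For completeness, I am not passing any FileIO or S3 settings through spark-submit; everything S3-related is expected to come from the REST catalog itself. My understanding is that pointing Iceberg's S3FileIO at MinIO explicitly would look roughly like the extra flags below, added before test.py (the endpoint and credentials are placeholders, not my real values, and I believe org.apache.iceberg:iceberg-aws-bundle would also need to be on --packages):

    --conf spark.sql.catalog.rest.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.rest.s3.endpoint=http://minio:9000 \
    --conf spark.sql.catalog.rest.s3.path-style-access=true \
    --conf spark.sql.catalog.rest.s3.access-key-id=admin \
    --conf spark.sql.catalog.rest.s3.secret-access-key=password \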

Error

py4j.protocol.Py4JJavaError: An error occurred while calling o181.start.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://warehouse/crypto_metrics4/metadata/version-hint.text
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
        at org.apache.iceberg.hadoop.HadoopTableOperations.getFileSystem(HadoopTableOperations.java:412)
        at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:315)
        at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
        at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
        at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
        at org.apache.iceberg.spark.SparkCatalog.loadFromPathIdentifier(SparkCatalog.java:972)
        at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:839)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:170)
        at org.apache.iceberg.spark.source.IcebergSource.getTable(IcebergSource.java:107)
        at org.apache.iceberg.spark.source.IcebergSource.inferPartitioning(IcebergSource.java:90)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:82)
        at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:399)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
        ... 25 more
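
From the trace it looks like the writer resolves s3://warehouse/crypto_metrics4 through HadoopTables as a path-based table rather than going through the REST catalog's db.crypto_metrics4. A variant I have not tried yet would target the catalog table identifier instead of the "path" option; a sketch, assuming the table created above:

query = windowed_df.writeStream \
    .outputMode("complete") \
    .format("iceberg") \
    .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics4") \
    .trigger(processingTime="1 minute") \
    .toTable("rest.db.crypto_metrics4")  # toTable() starts the stream against the catalog table instead of a raw path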

What am I possibly doing wrong?

    Labels

    question, stale
