Query engine
Spark
Question
I am using a REST catalog, MinIO, and Spark with Iceberg on Docker to read from a Kafka topic, apply some transformations, and write the result out as Iceberg tables. I can print the transformed data to the console (the console sink I use is sketched after the script below), but writing to Iceberg throws an error. I can see metadata in MinIO.
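For context, each message on the crypto topic is a single JSON object matching the schema in the script below; an illustrative payload (values made up) looks like this:
{"BTC_EUR": 91234.5, "BTC_INR": 8456789.0, "BTC_USD": 97321.4, "ETH_EUR": 3123.7, "ETH_INR": 289456.0, "ETH_USD": 3345.2, "timestamp": "2025-01-15T10:30:00"}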
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, min, max, avg, window
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Kafka-Iceberg-Stream-Processor").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType([
        StructField("BTC_EUR", DoubleType(), True),
        StructField("BTC_INR", DoubleType(), True),
        StructField("BTC_USD", DoubleType(), True),
        StructField("ETH_EUR", DoubleType(), True),
        StructField("ETH_INR", DoubleType(), True),
        StructField("ETH_USD", DoubleType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.1.16:9092") \
        .option("subscribe", "crypto") \
        .option("startingOffsets", "earliest") \
        .load()

    parsed_df = kafka_df.selectExpr(
        "CAST(value AS STRING) AS value",
        "timestamp"
    ).withColumn("parsed_json", from_json(col("value"), schema)) \
        .select(
            col("parsed_json.BTC_EUR").alias("BTC_EUR"),
            col("parsed_json.BTC_INR").alias("BTC_INR"),
            col("parsed_json.BTC_USD").alias("BTC_USD"),
            col("parsed_json.ETH_EUR").alias("ETH_EUR"),
            col("parsed_json.ETH_INR").alias("ETH_INR"),
            col("parsed_json.ETH_USD").alias("ETH_USD"),
            col("parsed_json.timestamp").alias("event_time"),
            col("timestamp").alias("processing_time")
        )

    windowed_df = parsed_df \
        .withWatermark("event_time", "1 minute") \
        .groupBy(window(col("event_time"), "5 minutes", "5 minutes")) \
        .agg(
            max("BTC_EUR").alias("max_BTC_EUR"),
            min("BTC_EUR").alias("min_BTC_EUR"),
            avg("BTC_EUR").alias("avg_BTC_EUR"),
            max("BTC_INR").alias("max_BTC_INR"),
            min("BTC_INR").alias("min_BTC_INR"),
            avg("BTC_INR").alias("avg_BTC_INR"),
            max("BTC_USD").alias("max_BTC_USD"),
            min("BTC_USD").alias("min_BTC_USD"),
            avg("BTC_USD").alias("avg_BTC_USD"),
            max("ETH_EUR").alias("max_ETH_EUR"),
            min("ETH_EUR").alias("min_ETH_EUR"),
            avg("ETH_EUR").alias("avg_ETH_EUR"),
            max("ETH_INR").alias("max_ETH_INR"),
            min("ETH_INR").alias("min_ETH_INR"),
            avg("ETH_INR").alias("avg_ETH_INR"),
            max("ETH_USD").alias("max_ETH_USD"),
            min("ETH_USD").alias("min_ETH_USD"),
            avg("ETH_USD").alias("avg_ETH_USD")
        ).withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window") \
        .select("window_start",
                "window_end",
                "max_BTC_EUR",
                "min_BTC_EUR",
                "avg_BTC_EUR",
                "max_BTC_INR",
                "min_BTC_INR",
                "avg_BTC_INR",
                "max_BTC_USD",
                "min_BTC_USD",
                "avg_BTC_USD",
                "max_ETH_EUR",
                "min_ETH_EUR",
                "avg_ETH_EUR",
                "max_ETH_INR",
                "min_ETH_INR",
                "avg_ETH_INR",
                "max_ETH_USD",
                "min_ETH_USD",
                "avg_ETH_USD")

    # Create the table if it doesn't exist
    spark.sql("""
        CREATE TABLE IF NOT EXISTS db.crypto_metrics4 (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            max_BTC_EUR DOUBLE,
            min_BTC_EUR DOUBLE,
            avg_BTC_EUR DOUBLE,
            max_BTC_INR DOUBLE,
            min_BTC_INR DOUBLE,
            avg_BTC_INR DOUBLE,
            max_BTC_USD DOUBLE,
            min_BTC_USD DOUBLE,
            avg_BTC_USD DOUBLE,
            max_ETH_EUR DOUBLE,
            min_ETH_EUR DOUBLE,
            avg_ETH_EUR DOUBLE,
            max_ETH_INR DOUBLE,
            min_ETH_INR DOUBLE,
            avg_ETH_INR DOUBLE,
            max_ETH_USD DOUBLE,
            min_ETH_USD DOUBLE,
            avg_ETH_USD DOUBLE
        ) USING iceberg
    """)
    # show() and printSchema() print to stdout themselves, so there is no need to wrap them in print()
    spark.sql("SHOW TABLES IN rest.db").show()
    windowed_df.printSchema()
    # Write stream data to Iceberg
    query = windowed_df.writeStream \
        .outputMode("complete") \
        .format("iceberg") \
        .option("path", "s3://warehouse/crypto_metrics4") \
        .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics4") \
        .trigger(processingTime="1 minute") \
        .start()

    query.awaitTermination()
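For reference, the console sink that does work looks roughly like this (a sketch of my debugging code; the exact trigger and options may differ slightly):

    # Console sink used for debugging; this prints the windowed aggregates without error
    debug_query = windowed_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="1 minute") \
        .start()

    debug_query.awaitTermination()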
I am submitting the Spark job with the following configuration:
spark-submit \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.rest.type=rest \
--conf spark.sql.catalog.rest.uri=http://rest:8181 \
--conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.rest.warehouse=s3://warehouse \
--conf spark.sql.defaultCatalog=rest \
test.py
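As far as I understand, the same catalog settings could equivalently be set on the SparkSession builder inside the script; this is just the --conf flags above restated (the --packages jars would still have to come from spark-submit or spark.jars.packages):

spark = (
    SparkSession.builder
    .appName("Kafka-Iceberg-Stream-Processor")
    # Same settings as the --conf flags passed to spark-submit above
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.rest.type", "rest")
    .config("spark.sql.catalog.rest.uri", "http://rest:8181")
    .config("spark.sql.catalog.rest.warehouse", "s3://warehouse")
    .config("spark.sql.defaultCatalog", "rest")
    .getOrCreate()
)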
Error
py4j.protocol.Py4JJavaError: An error occurred while calling o181.start.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://warehouse/crypto_metrics4/metadata/version-hint.text
at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
at org.apache.iceberg.hadoop.HadoopTableOperations.getFileSystem(HadoopTableOperations.java:412)
at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:315)
at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
at org.apache.iceberg.spark.SparkCatalog.loadFromPathIdentifier(SparkCatalog.java:972)
at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:839)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:170)
at org.apache.iceberg.spark.source.IcebergSource.getTable(IcebergSource.java:107)
at org.apache.iceberg.spark.source.IcebergSource.inferPartitioning(IcebergSource.java:90)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:82)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:399)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
... 25 more
What am I possibly doing wrong?