datafusion-java

A Java binding to Apache Arrow DataFusion

Status

This project is still a work in progress. It currently works with Arrow 14.0 and DataFusion 25.0, and it is built and verified in CI against Java 11 and 21. For interactive use, see the Docker instructions below, which run jshell on Java 21.

How to use in your code

The artifacts are published to Maven Central, so you can use datafusion-java like any other Java library:

dependencies {
    implementation(
        group = "io.github.datafusion-contrib",
        name = "datafusion-java",
        version = "0.16.0" // or the latest version; check https://github.com/datafusion-contrib/datafusion-java/releases
    )
}

To test it out, you can use this piece of demo code:

DataFusionDemo.java
package com.me;

import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;

public class DataFusionDemo {

    public static void main(String[] args) throws Exception {
        try (SessionContext sessionContext = SessionContexts.create()) {
            sessionContext.sql("select sqrt(65536)").thenCompose(DataFrame::show).join();
        }
    }
}
build.gradle.kts
plugins {
  java
  application
}

repositories {
  mavenCentral()
  google()
}

tasks {
  application {
    mainClass.set("com.me.DataFusionDemo")
  }
}

dependencies {
  implementation(
    group = "io.github.datafusion-contrib",
    name = "datafusion-java",
    version = "0.16.0"
  )
}
Run result
$ ./gradlew run
...
> Task :compileKotlin UP-TO-DATE
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE

> Task :run
successfully created tokio runtime
+--------------------+
| sqrt(Int64(65536)) |
+--------------------+
| 256                |
+--------------------+
successfully shutdown tokio runtime

BUILD SUCCESSFUL in 2s
3 actionable tasks: 1 executed, 2 up-to-date
16:43:34: Execution finished 'run'.
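
The demo chains two asynchronous steps with thenCompose and then blocks once with join. The sketch below illustrates that pattern using only the JDK, assuming sql and show return standard java.util.concurrent.CompletableFuture instances (which the calls in the demo suggest). The sql and show methods here are hypothetical stand-ins that return strings; they are not the datafusion-java API.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncChainSketch {

    // Hypothetical stand-in for SessionContext.sql: an asynchronous first step.
    static CompletableFuture<String> sql(String query) {
        return CompletableFuture.supplyAsync(() -> "plan(" + query + ")");
    }

    // Hypothetical stand-in for DataFrame.show: an asynchronous second step.
    static CompletableFuture<String> show(String plan) {
        return CompletableFuture.supplyAsync(() -> "shown " + plan);
    }

    // thenCompose flattens the nested future, so a single join() waits for both steps.
    static String run(String query) {
        return sql(query).thenCompose(AsyncChainSketch::show).join();
    }

    public static void main(String[] args) {
        // prints: shown plan(select sqrt(65536))
        System.out.println(run("select sqrt(65536)"));
    }
}
```

Using thenApply instead of thenCompose here would produce a future of a future, and join would return before the second step had finished.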

How to run the interactive demo

1. Run using Docker (with jshell)

First, build the Docker image:

docker build -t datafusion-example .

Then you can run the example program using Docker:

docker run --rm -it datafusion-example

Or start an interactive jshell session:

docker run --rm -it datafusion-example jshell
Example jshell session
Jan 11, 2024 1:49:28 AM java.util.prefs.FileSystemPreferences$1 run
INFO: Created user preferences directory.
|  Welcome to JShell -- Version 21
|  For an introduction type: /help intro

jshell> import org.apache.arrow.datafusion.*

jshell> var context = SessionContexts.create()
01:41:05.586 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- successfully loaded datafusion_jni from library path
01:41:05.589 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- datafusion_jni already loaded, returning
01:41:05.590 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultSessionContext@7f58383b8db0
01:41:05.591 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining TokioRuntime@7f58383ce110
context ==> org.apache.arrow.datafusion.DefaultSessionContext@2d209079

jshell> var df = context.sql("select 1.1 + cos(2.0)").join()
01:41:10.961 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultDataFrame@7f5838209100
df ==> org.apache.arrow.datafusion.DefaultDataFrame@34ce8af7

jshell> import org.apache.arrow.memory.*

jshell> var allocator = new RootAllocator()
01:41:22.521 [main] INFO org.apache.arrow.memory.BaseAllocator -- Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
01:41:22.525 [main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption -- allocation manager type not specified, using netty as the default type
01:41:22.525 [main] INFO org.apache.arrow.memory.CheckAllocator -- Using DefaultAllocationManager at memory-unsafe-14.0.2.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
01:41:22.531 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- Constructor for direct buffer found and made accessible
01:41:22.536 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- direct buffer constructor: available
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.pageSize: 8192
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.maxOrder: 11
allocator ==> Allocator(ROOT) 0/0/0/9223372036854775807 (res/actual/peak/limit)


jshell> var r = df.collect(allocator).join()
01:41:29.635 [main] INFO org.apache.arrow.datafusion.DefaultDataFrame -- successfully completed with arr length=610
r ==> org.apache.arrow.vector.ipc.ArrowFileReader@7ac7a4e4

jshell> var root = r.getVectorSchemaRoot()
01:41:34.658 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 10
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- Footer starts at 416, length: 184
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 184
root ==> org.apache.arrow.vector.VectorSchemaRoot@6cd28fa7

jshell> r.loadNextBatch()
01:41:39.421 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- RecordBatch at 200, metadata: 192, body: 16
01:41:39.423 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 208
01:41:39.424 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 0, length: 1
01:41:39.425 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 8, length: 8
$8 ==> true

jshell> var v = root.getVector(0)
v ==> [0.6838531634528577]
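
The session above waits on each future with join(), while other examples in this README use get(). Assuming these are standard java.util.concurrent.CompletableFuture instances, the two differ mainly in how a failure surfaces. The sketch below shows this using only the JDK; failingQuery is a hypothetical stand-in for a query that fails, not a datafusion-java call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class FutureFailureSketch {

    // Hypothetical stand-in for a query whose future completes exceptionally.
    static CompletableFuture<String> failingQuery() {
        return CompletableFuture.failedFuture(new IllegalStateException("bad query"));
    }

    // join() throws the unchecked CompletionException, which is convenient in jshell.
    static String viaJoin() {
        try {
            return failingQuery().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get() throws the checked ExecutionException and can also be interrupted.
    static String viaGet() {
        try {
            return failingQuery().get();
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // prints: join: bad query
        System.out.println(viaGet());  // prints: get: bad query
    }
}
```

In both cases the original exception is available through getCause().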

2. Build from source

Note: you must have local Rust and Java environments set up.

Run the example in one line:

./gradlew run

Or roll your own test example:

import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ExampleMain {

    private static final Logger logger = LoggerFactory.getLogger(ExampleMain.class);

    public static void main(String[] args) throws Exception {
        try (SessionContext sessionContext = SessionContexts.create(); BufferAllocator allocator = new RootAllocator()) {
            DataFrame dataFrame = sessionContext.sql("select 1.5 + sqrt(2.0)").get();
            dataFrame.collect(allocator).thenAccept(ExampleMain::onReaderResult).get();
        }
    }

    private static void onReaderResult(ArrowReader reader) {
        // try-with-resources closes the reader and releases its buffers, even if reading fails
        try (reader) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            while (reader.loadNextBatch()) {
                Float8Vector vector = (Float8Vector) root.getVector(0);
                for (int i = 0; i < root.getRowCount(); i += 1) {
                    logger.info("value {}={}", i, vector.getValueAsDouble(i));
                }
            }
        } catch (IOException e) {
            logger.warn("got IO Exception", e);
        }
    }
}
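
The example above declares the session context and the allocator in one try-with-resources statement and closes the reader inside the callback. Java closes try-with-resources resources in the reverse order of declaration, so the reader is released first, then the allocator, then the context. The sketch below demonstrates that ordering with a hypothetical Resource class that only records when it is closed; it does not use the datafusion-java or Arrow classes.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    // Hypothetical stand-in for any AutoCloseable; records its name when closed.
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        try (Resource context = new Resource("context", log);
             Resource allocator = new Resource("allocator", log)) {
            // The inner resource is closed before either outer resource.
            try (Resource reader = new Resource("reader", log)) {
                log.add("read batches");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // prints: [read batches, reader, allocator, context]
        System.out.println(closeOrder());
    }
}
```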

To build the library:

./gradlew build