
Commit 38733f8 (parent 0d8f2c4)
Committed Aug 6, 2024

Flink: adjust code for the new 1.20 module.
Also fix a bug where the 1.19 module was missing from the JMH configuration (see jmh.gradle below).

11 files changed (+70 -66 lines)

.github/workflows/flink-ci.yml (+1 -8)

@@ -74,14 +74,7 @@ jobs:
     strategy:
       matrix:
         jvm: [11, 17, 21]
-        flink: ['1.17', '1.18', '1.19']
-        exclude:
-          # Flink 1.17 does not support Java 17.
-          - jvm: 17
-            flink: '1.17'
-          # Flink 1.17 does not support Java 21.
-          - jvm: 21
-            flink: '1.17'
+        flink: ['1.18', '1.19', '1.20']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
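With 1.17 gone from the matrix, the two exclude entries become unnecessary: every remaining Flink version (1.18 through 1.20) is exercised on all three JVMs (11, 17, 21).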

flink/build.gradle (+4 -5)

@@ -19,15 +19,14 @@
 
 def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")
 
-
-if (flinkVersions.contains("1.17")) {
-  apply from: file("$projectDir/v1.17/build.gradle")
-}
-
 if (flinkVersions.contains("1.18")) {
   apply from: file("$projectDir/v1.18/build.gradle")
 }
 
 if (flinkVersions.contains("1.19")) {
   apply from: file("$projectDir/v1.19/build.gradle")
 }
+
+if (flinkVersions.contains("1.20")) {
+  apply from: file("$projectDir/v1.20/build.gradle")
+}
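Which version modules get applied is driven by the flinkVersions system property read in the script above, with defaultFlinkVersions from gradle.properties as the fallback; a build can still target a subset, e.g. ./gradlew -DflinkVersions=1.19,1.20 build (an illustrative invocation, not one taken from this commit).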

flink/v1.19/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java (+8 -2)

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.shuffle;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -27,6 +28,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortKey;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
           Types.NestedField.required(9, "name9", Types.StringType.get()));
 
   private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+  private static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
+      SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
   private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
 
   private MapRangePartitioner partitioner;
@@ -82,10 +87,11 @@ public void setupBenchmark() {
           mapStatistics.put(sortKey, weight);
         });
 
-    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
+    MapAssignment mapAssignment =
+        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARTOR);
     this.partitioner =
         new MapRangePartitioner(
-            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
+            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);
 
     List<Integer> keys = Lists.newArrayList(weights.keySet().iterator());
     long[] weightsCDF = new long[keys.size()];
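The benchmark updates track an API change in MapRangePartitioner: it now takes a precomputed MapAssignment instead of raw MapDataStatistics plus a trailing parallelism argument. The parallelism (2) and a close-file cost weight (0.0) move into MapAssignment.fromKeyFrequency, which also needs a StructLike comparator for the sort order, hence the new SortOrderComparators import. Consolidated from the hunks above (a sketch; it uses the SORT_ORDER constant in place of the equivalent inline builder call):

    // Build the key-to-subtask assignment from key frequencies, then the partitioner.
    Comparator<StructLike> comparator = SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
    MapAssignment mapAssignment =
        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, comparator);
    this.partitioner = new MapRangePartitioner(SCHEMA, SORT_ORDER, mapAssignment);

The 1.19 benchmark had fallen behind this API, which likely went unnoticed because the 1.19 module was missing from jmh.gradle (fixed below), so the benchmark was never compiled by JMH runs.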

flink/v1.20/build.gradle (+18 -18)

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-String flinkMajorVersion = '1.19'
+String flinkMajorVersion = '1.20'
 String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
 
 project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
-    compileOnly libs.flink119.avro
+    compileOnly libs.flink120.avro
     // for dropwizard histogram metrics implementation
-    compileOnly libs.flink119.metrics.dropwizard
-    compileOnly libs.flink119.streaming.java
-    compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
-    compileOnly libs.flink119.table.api.java.bridge
-    compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
-    compileOnly libs.flink119.connector.base
-    compileOnly libs.flink119.connector.files
+    compileOnly libs.flink120.metrics.dropwizard
+    compileOnly libs.flink120.streaming.java
+    compileOnly "${libs.flink120.streaming.java.get().module}:${libs.flink120.streaming.java.get().getVersion()}:tests"
+    compileOnly libs.flink120.table.api.java.bridge
+    compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
+    compileOnly libs.flink120.connector.base
+    compileOnly libs.flink120.connector.files
 
     compileOnly libs.hadoop2.hdfs
     compileOnly libs.hadoop2.common
@@ -68,13 +68,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
     implementation libs.datasketches
 
-    testImplementation libs.flink119.connector.test.utils
-    testImplementation libs.flink119.core
-    testImplementation libs.flink119.runtime
-    testImplementation(libs.flink119.test.utilsjunit) {
+    testImplementation libs.flink120.connector.test.utils
+    testImplementation libs.flink120.core
+    testImplementation libs.flink120.runtime
+    testImplementation(libs.flink120.test.utilsjunit) {
       exclude group: 'junit'
     }
-    testImplementation(libs.flink119.test.utils) {
+    testImplementation(libs.flink120.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
@@ -168,7 +168,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
     }
 
     // for dropwizard histogram metrics implementation
-    implementation libs.flink119.metrics.dropwizard
+    implementation libs.flink120.metrics.dropwizard
 
     // for integration testing with the flink-runtime-jar
     // all of those dependencies are required because the integration test extends FlinkTestBase
@@ -178,13 +178,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
     integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
     integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-    integrationImplementation(libs.flink119.test.utils) {
+    integrationImplementation(libs.flink120.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
 
-    integrationImplementation libs.flink119.table.api.java.bridge
-    integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
+    integrationImplementation libs.flink120.table.api.java.bridge
+    integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
 
     integrationImplementation libs.hadoop2.common
     integrationImplementation libs.hadoop2.hdfs
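The new v1.20 build script mirrors the v1.19 one: flinkMajorVersion is bumped and every flink119 version-catalog alias is swapped for its flink120 counterpart (the aliases are defined in gradle/libs.versions.toml below); the dependency set is otherwise unchanged.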

flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java (+8 -2)

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.shuffle;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -27,6 +28,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortKey;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
           Types.NestedField.required(9, "name9", Types.StringType.get()));
 
   private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+  private static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
+      SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
   private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
 
   private MapRangePartitioner partitioner;
@@ -82,10 +87,11 @@ public void setupBenchmark() {
           mapStatistics.put(sortKey, weight);
         });
 
-    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
+    MapAssignment mapAssignment =
+        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARTOR);
     this.partitioner =
         new MapRangePartitioner(
-            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
+            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);
 
     List<Integer> keys = Lists.newArrayList(weights.keySet().iterator());
     long[] weightsCDF = new long[keys.size()];
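This is the same MapRangePartitionerBenchmark fix as in the v1.19 module above, applied to the new v1.20 copy of the file.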

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java (+1 -1)

@@ -165,7 +165,7 @@ private static Configuration configure() {
     Configuration configuration = new Configuration();
     configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
     configuration.set(RestOptions.BIND_PORT, "0");
-    configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+    configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofSeconds(5));
 
     // Use FLIP-27 source
     configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
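The raw long becomes a Duration, evidently because Flink 1.20 types JobManagerOptions.SLOT_REQUEST_TIMEOUT as a Duration-based option; Duration.ofSeconds(5) preserves the previous 5000 ms timeout (and assumes java.time.Duration is imported in this test).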

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java (+1 -1)

@@ -29,7 +29,7 @@ public class TestFlinkPackage {
   /** This unit test would need to be adjusted as new Flink version is supported. */
   @Test
   public void testVersion() {
-    assertThat(FlinkPackage.version()).isEqualTo("1.19.0");
+    assertThat(FlinkPackage.version()).isEqualTo("1.20.0");
   }
 
   @Test

gradle.properties (+2 -2)

@@ -16,8 +16,8 @@
 jmhOutputPath=build/reports/jmh/human-readable-output.txt
 jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
-systemProp.defaultFlinkVersions=1.19
-systemProp.knownFlinkVersions=1.17,1.18,1.19
+systemProp.defaultFlinkVersions=1.20
+systemProp.knownFlinkVersions=1.18,1.19,1.20
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
 systemProp.defaultSparkVersions=3.5
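Default builds now target Flink 1.20, and 1.17 drops out of the accepted values for the flinkVersions property.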

gradle/libs.versions.toml (+12 -12)

@@ -39,9 +39,9 @@ delta-spark = "3.2.0"
 esotericsoftware-kryo = "4.0.3"
 errorprone-annotations = "2.29.2"
 findbugs-jsr305 = "3.0.2"
-flink117 = { strictly = "1.17.2"}
 flink118 = { strictly = "1.18.1"}
 flink119 = { strictly = "1.19.0"}
+flink120 = { strictly = "1.20.0"}
 google-libraries-bom = "26.43.0"
 guava = "33.2.1-jre"
 hadoop2 = "2.7.3"
@@ -108,12 +108,6 @@ datasketches = { module = "org.apache.datasketches:datasketches-java", version.r
 delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" }
 errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" }
 findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" }
-flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink117" }
-flink117-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink117" }
-flink117-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink117" }
-flink117-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink117" }
-flink117-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink117" }
-flink117-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink117" }
 flink118-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink118" }
 flink118-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink118" }
 flink118-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink118" }
@@ -126,6 +120,12 @@ flink119-connector-files = { module = "org.apache.flink:flink-connector-files",
 flink119-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink119" }
 flink119-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink119" }
 flink119-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink119" }
+flink120-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink120" }
+flink120-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink120" }
+flink120-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink120" }
+flink120-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink120" }
+flink120-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink120" }
+flink120-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink120" }
 google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" }
 guava-guava = { module = "com.google.guava:guava", version.ref = "guava" }
 hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" }
@@ -180,11 +180,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor
 awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
 delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" }
 esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" }
-flink117-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink117" }
-flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink117" }
-flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" }
-flink117-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink117" }
-flink117-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink117" }
 flink118-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink118" }
 flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink118" }
 flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" }
@@ -195,6 +190,11 @@ flink119-core = { module = "org.apache.flink:flink-core", version.ref = "flink11
 flink119-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink119" }
 flink119-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink119" }
 flink119-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink119" }
+flink120-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink120" }
+flink120-core = { module = "org.apache.flink:flink-core", version.ref = "flink120" }
+flink120-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink120" }
+flink120-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink120" }
+flink120-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink120" }
 guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = "guava" }
 jakarta-el-api = { module = "jakarta.el:jakarta.el-api", version.ref = "jakarta-el-api" }
 jakarta-servlet = {module = "jakarta.servlet:jakarta.servlet-api", version.ref = "jakarta-servlet-api"}
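Gradle turns each dash-separated alias in this catalog into a dot-separated type-safe accessor, so flink120-metrics-dropwizard here is what the build scripts above reference as libs.flink120.metrics.dropwizard, and the bare version entry flink120 is reachable as libs.versions.flink120.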

jmh.gradle (+6 -6)

@@ -26,16 +26,16 @@ def sparkVersions = (System.getProperty("sparkVersions") != null ? System.getPro
 def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
 def jmhProjects = [project(":iceberg-core"), project(":iceberg-data")]
 
-if (flinkVersions.contains("1.16")) {
-  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.16"))
+if (flinkVersions.contains("1.18")) {
+  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18"))
 }
 
-if (flinkVersions.contains("1.17")) {
-  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.17"))
+if (flinkVersions.contains("1.19")) {
+  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.19"))
 }
 
-if (flinkVersions.contains("1.18")) {
-  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18"))
+if (flinkVersions.contains("1.20")) {
+  jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.20"))
 }
 
 if (sparkVersions.contains("3.3")) {
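This is the bug called out in the commit message: the checks had gone stale, still probing 1.16 and 1.17 but never 1.19, so the 1.19 module was never added to jmhProjects and its benchmarks could not run. The rewritten checks cover the supported 1.18, 1.19, and 1.20 modules.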

settings.gradle (+9 -9)

@@ -112,15 +112,6 @@ if (!flinkVersions.isEmpty()) {
   project(':flink').name = 'iceberg-flink'
 }
 
-if (flinkVersions.contains("1.17")) {
-  include ":iceberg-flink:flink-1.17"
-  include ":iceberg-flink:flink-runtime-1.17"
-  project(":iceberg-flink:flink-1.17").projectDir = file('flink/v1.17/flink')
-  project(":iceberg-flink:flink-1.17").name = "iceberg-flink-1.17"
-  project(":iceberg-flink:flink-runtime-1.17").projectDir = file('flink/v1.17/flink-runtime')
-  project(":iceberg-flink:flink-runtime-1.17").name = "iceberg-flink-runtime-1.17"
-}
-
 if (flinkVersions.contains("1.18")) {
   include ":iceberg-flink:flink-1.18"
   include ":iceberg-flink:flink-runtime-1.18"
@@ -139,6 +130,15 @@ if (flinkVersions.contains("1.19")) {
   project(":iceberg-flink:flink-runtime-1.19").name = "iceberg-flink-runtime-1.19"
 }
 
+if (flinkVersions.contains("1.20")) {
+  include ":iceberg-flink:flink-1.20"
+  include ":iceberg-flink:flink-runtime-1.20"
+  project(":iceberg-flink:flink-1.20").projectDir = file('flink/v1.20/flink')
+  project(":iceberg-flink:flink-1.20").name = "iceberg-flink-1.20"
+  project(":iceberg-flink:flink-runtime-1.20").projectDir = file('flink/v1.20/flink-runtime')
+  project(":iceberg-flink:flink-runtime-1.20").name = "iceberg-flink-runtime-1.20"
+}
+
 if (sparkVersions.contains("3.3")) {
   include ":iceberg-spark:spark-3.3_${scalaVersion}"
   include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"
