Skip to content

Commit b3ebcf1

Browse files
rodmenesespvary
authored and committed
Flink: Refactoring code and properties to make Flink 1.19 to work
1 parent f761d98 commit b3ebcf1

25 files changed

+110
-79
lines changed

.github/workflows/flink-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ jobs:
7272
strategy:
7373
matrix:
7474
jvm: [8, 11]
75-
flink: ['1.16', '1.17', '1.18']
75+
flink: ['1.17', '1.18', '1.19']
7676
env:
7777
SPARK_LOCAL_IP: localhost
7878
steps:

dev/stage-binaries.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#
2020

2121
SCALA_VERSION=2.12
22-
FLINK_VERSIONS=1.16,1.17,1.18
22+
FLINK_VERSIONS=1.17,1.18,1.19
2323
SPARK_VERSIONS=3.3,3.4,3.5
2424
HIVE_VERSIONS=2,3
2525

flink/build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919

2020
def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")
2121

22-
if (flinkVersions.contains("1.16")) {
23-
apply from: file("$projectDir/v1.16/build.gradle")
24-
}
2522

2623
if (flinkVersions.contains("1.17")) {
2724
apply from: file("$projectDir/v1.17/build.gradle")
@@ -30,3 +27,7 @@ if (flinkVersions.contains("1.17")) {
3027
if (flinkVersions.contains("1.18")) {
3128
apply from: file("$projectDir/v1.18/build.gradle")
3229
}
30+
31+
if (flinkVersions.contains("1.19")) {
32+
apply from: file("$projectDir/v1.19/build.gradle")
33+
}

flink/v1.19/build.gradle

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
String flinkMajorVersion = '1.18'
20+
String flinkMajorVersion = '1.19'
2121
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
2222

2323
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
3232
implementation project(':iceberg-parquet')
3333
implementation project(':iceberg-hive-metastore')
3434

35-
compileOnly libs.flink118.avro
35+
compileOnly libs.flink119.avro
3636
// for dropwizard histogram metrics implementation
37-
compileOnly libs.flink118.metrics.dropwizard
38-
compileOnly libs.flink118.streaming.java
39-
compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests"
40-
compileOnly libs.flink118.table.api.java.bridge
41-
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
42-
compileOnly libs.flink118.connector.base
43-
compileOnly libs.flink118.connector.files
37+
compileOnly libs.flink119.metrics.dropwizard
38+
compileOnly libs.flink119.streaming.java
39+
compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
40+
compileOnly libs.flink119.table.api.java.bridge
41+
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
42+
compileOnly libs.flink119.connector.base
43+
compileOnly libs.flink119.connector.files
4444

4545
compileOnly libs.hadoop2.hdfs
4646
compileOnly libs.hadoop2.common
@@ -66,13 +66,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
6666
exclude group: 'org.slf4j'
6767
}
6868

69-
testImplementation libs.flink118.connector.test.utils
70-
testImplementation libs.flink118.core
71-
testImplementation libs.flink118.runtime
72-
testImplementation(libs.flink118.test.utilsjunit) {
69+
testImplementation libs.flink119.connector.test.utils
70+
testImplementation libs.flink119.core
71+
testImplementation libs.flink119.runtime
72+
testImplementation(libs.flink119.test.utilsjunit) {
7373
exclude group: 'junit'
7474
}
75-
testImplementation(libs.flink118.test.utils) {
75+
testImplementation(libs.flink119.test.utils) {
7676
exclude group: "org.apache.curator", module: 'curator-test'
7777
exclude group: 'junit'
7878
}
@@ -166,7 +166,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
166166
}
167167

168168
// for dropwizard histogram metrics implementation
169-
implementation libs.flink118.metrics.dropwizard
169+
implementation libs.flink119.metrics.dropwizard
170170

171171
// for integration testing with the flink-runtime-jar
172172
// all of those dependencies are required because the integration test extends FlinkTestBase
@@ -176,13 +176,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
176176
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
177177
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
178178
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
179-
integrationImplementation(libs.flink118.test.utils) {
179+
integrationImplementation(libs.flink119.test.utils) {
180180
exclude group: "org.apache.curator", module: 'curator-test'
181181
exclude group: 'junit'
182182
}
183183

184-
integrationImplementation libs.flink118.table.api.java.bridge
185-
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
184+
integrationImplementation libs.flink119.table.api.java.bridge
185+
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
186186

187187
integrationImplementation libs.hadoop2.common
188188
integrationImplementation libs.hadoop2.hdfs

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
7070
public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
7171
public static final String DEFAULT_DATABASE = "default-database";
7272
public static final String DEFAULT_DATABASE_NAME = "default";
73+
public static final String DEFAULT_CATALOG_NAME = "default_catalog";
7374
public static final String BASE_NAMESPACE = "base-namespace";
74-
7575
public static final String TYPE = "type";
7676
public static final String PROPERTY_VERSION = "property-version";
7777

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg.flink;
2020

21+
import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
22+
2123
import java.util.List;
2224
import org.apache.flink.table.api.EnvironmentSettings;
2325
import org.apache.flink.table.api.TableEnvironment;
@@ -126,4 +128,20 @@ protected void dropCatalog(String catalogName, boolean ifExists) {
126128
sql("USE CATALOG default_catalog");
127129
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
128130
}
131+
132+
/**
133+
* We can not drop currently used database after FLINK-33226, so we have make sure that we do not
134+
* use the current database before dropping it. This method switches to the default database in
135+
* the default catalog, and then it and drops the one requested.
136+
*
137+
* @param database The database to drop
138+
* @param ifExists If we should use the 'IF EXISTS' when dropping the database
139+
*/
140+
protected void dropDatabase(String database, boolean ifExists) {
141+
String currentCatalog = getTableEnv().getCurrentCatalog();
142+
sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
143+
sql("USE %s", getTableEnv().listDatabases()[0]);
144+
sql("USE CATALOG %s", currentCatalog);
145+
sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
146+
}
129147
}

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg.flink;
2020

21+
import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
22+
2123
import java.nio.file.Path;
2224
import java.util.List;
2325
import org.apache.flink.table.api.EnvironmentSettings;
@@ -124,7 +126,23 @@ protected void assertSameElements(String message, Iterable<Row> expected, Iterab
124126
* @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
125127
*/
126128
protected void dropCatalog(String catalogName, boolean ifExists) {
127-
sql("USE CATALOG default_catalog");
129+
sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
128130
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
129131
}
132+
133+
/**
134+
* We cannot drop the currently used database after FLINK-33226, so we have to make sure that we do not
135+
* use the current database before dropping it. This method switches to the default database in
136+
* the default catalog, and then it drops the one requested.
137+
*
138+
* @param database The database to drop
139+
* @param ifExists If we should use the 'IF EXISTS' when dropping the database
140+
*/
141+
protected void dropDatabase(String database, boolean ifExists) {
142+
String currentCatalog = getTableEnv().getCurrentCatalog();
143+
sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
144+
sql("USE %s", getTableEnv().listDatabases()[0]);
145+
sql("USE CATALOG %s", currentCatalog);
146+
sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
147+
}
130148
}

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void before() {
9898
@Override
9999
public void clean() {
100100
sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
101-
sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
101+
dropDatabase(DATABASE_NAME, true);
102102
dropCatalog(CATALOG_NAME, true);
103103
BoundedTableFactory.clearDataSets();
104104
}

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase {
4141
@Override
4242
public void clean() {
4343
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
44-
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
44+
dropDatabase(flinkDatabase, true);
4545
super.clean();
4646
}
4747

@@ -61,7 +61,7 @@ public void testCreateNamespace() {
6161
.as("Database should still exist")
6262
.isTrue();
6363

64-
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
64+
dropDatabase(flinkDatabase, true);
6565
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
6666
.as("Database should be dropped")
6767
.isFalse();
@@ -81,7 +81,7 @@ public void testDropEmptyDatabase() {
8181
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
8282
.as("Namespace should exist")
8383
.isTrue();
84-
sql("DROP DATABASE %s", flinkDatabase);
84+
dropDatabase(flinkDatabase, true);
8585
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
8686
.as("Namespace should have been dropped")
8787
.isFalse();
@@ -105,7 +105,7 @@ public void testDropNonEmptyNamespace() {
105105
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")))
106106
.as("Table should exist")
107107
.isTrue();
108-
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
108+
Assertions.assertThatThrownBy(() -> dropDatabase(flinkDatabase, true))
109109
.cause()
110110
.isInstanceOf(DatabaseNotEmptyException.class)
111111
.hasMessage(

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void before() {
7474
public void cleanNamespaces() {
7575
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
7676
sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
77-
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
77+
dropDatabase(flinkDatabase, true);
7878
super.clean();
7979
}
8080

0 commit comments

Comments
 (0)