Commit 1586a55

[FLINK-37862][table] Support inline structured types in SQL
This closes #26638.
1 parent 8d513fc commit 1586a55

30 files changed, +1264 -359 lines changed

docs/content.zh/docs/dev/table/types.md

Lines changed: 87 additions & 44 deletions
@@ -1232,34 +1232,46 @@ equivalent to `ROW<myField INT, myOtherField BOOLEAN>`.
12321232

12331233
### User-Defined Data Types
12341234

1235-
{{< tabs "udf" >}}
1236-
{{< tab "Java/Scala" >}}
1237-
<span class="label label-danger">Attention</span> User-defined data types are not fully supported yet. They are
1238-
currently (as of Flink 1.11) only exposed as unregistered structured types in parameters and return types of functions.
1235+
#### `STRUCTURED`
12391236

1240-
A structured type is similar to an object in an object-oriented programming language. It contains
1241-
zero, one or more attributes. Each attribute consists of a name and a type.
1237+
Data type for a user-defined object.
12421238

1243-
There are two kinds of structured types:
1239+
Compared to `ROW`, which may also be considered a "struct-like" type, structured types are distinguishable even if they
1240+
contain the same set of fields. For example, `Visit(amount DOUBLE)` is distinct from `Interaction(amount DOUBLE)` due
1241+
to its identifier.
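The distinction drawn above can be illustrated with a minimal sketch in plain Java. It does not use Flink's logical type classes; `RowLike` and `StructuredLike` are hypothetical stand-ins that only model the idea that a structured type's class name takes part in type equality, while a row-like type is identified by its fields alone.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical model for illustration only; these are not Flink classes. */
public class StructuredIdentitySketch {

    /** A ROW-like type: equality depends on the field list alone. */
    static final class RowLike {
        final List<String> fields;

        RowLike(List<String> fields) {
            this.fields = List.copyOf(fields);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof RowLike && ((RowLike) o).fields.equals(fields);
        }

        @Override
        public int hashCode() {
            return fields.hashCode();
        }
    }

    /** A STRUCTURED-like type: the class name is part of the type's identity. */
    static final class StructuredLike {
        final String className;
        final List<String> fields;

        StructuredLike(String className, List<String> fields) {
            this.className = className;
            this.fields = List.copyOf(fields);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StructuredLike)) {
                return false;
            }
            StructuredLike other = (StructuredLike) o;
            return className.equals(other.className) && fields.equals(other.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(className, fields);
        }
    }

    public static void main(String[] args) {
        List<String> fields = List.of("amount DOUBLE");
        // Same fields, no identifier: the two row-like types are equal.
        System.out.println(new RowLike(fields).equals(new RowLike(fields)));
        // Same fields, different class names: the structured-like types differ.
        System.out.println(
                new StructuredLike("Visit", fields)
                        .equals(new StructuredLike("Interaction", fields)));
    }
}
```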
12441242

1245-
- Types that are stored in a catalog and are identified by a _catalog identifier_ (like `cat.db.MyType`). Those
1246-
are equal to the SQL standard definition of structured types.
1243+
Similar to classes in object-oriented programming languages, structured types are identified by a class name and contain
1244+
zero, one or more attributes. Each attribute has a name, a type, and an optional description. A type cannot be defined
1245+
in such a way that one of its attribute types (transitively) refers to itself.
12471246

1248-
- Anonymously defined, unregistered types (usually reflectively extracted) that are identified by
1249-
an _implementation class_ (like `com.myorg.model.MyType`). Those are useful when programmatically
1250-
defining a table program. They enable reusing existing JVM classes without manually defining the
1251-
schema of a data type again.
1247+
Structured types are internally converted by the system into suitable data structures. Serialization and equality checks
1248+
are managed by the system based on the logical type.
12521249

1253-
#### Registered Structured Types
1250+
{{< tabs "udt" >}}
1251+
{{< tab "SQL" >}}
1252+
```sql
1253+
STRUCTURED<'c', n0 t0, n1 t1, ...>
1254+
STRUCTURED<'c', n0 t0, n1 t1 'd1', ...>
1255+
```
1256+
The type can be declared using `STRUCTURED<'c', n0 t0 'd0', n1 t1 'd1', ...>` where `c` is the class name, `n` is the
1257+
unique name of a field, `t` is the logical type of a field, `d` is the optional description of a field.
1258+
{{< /tab >}}
12541259

1255-
Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog
1256-
or referenced in a `CREATE TABLE` DDL.
1260+
{{< tab "Java/Scala" >}}
1261+
Usually, structured types are defined **inline** and can be reflectively extracted from a corresponding implementation class,
1262+
for example, from the signature of an `eval()` method of a function. This is useful when programmatically defining a table
1263+
program because it enables reusing existing JVM classes without manually defining the schema of a data type again.
12571264

1258-
#### Unregistered Structured Types
1265+
If the class name matches a class in the classpath, the system will convert a structured object to a JVM object at the edges
1266+
of the table ecosystem (e.g. when bridging to a function or connector). The implementation class must provide either a
1267+
zero-argument constructor or a full constructor that assigns all attributes.
12591268

1260-
Unregistered structured types can be created from regular POJOs (Plain Old Java Objects) using automatic reflective extraction.
1269+
However, the class name does not need to be resolvable in the classpath; it may be used solely to distinguish between objects with
1270+
identical attribute sets. In Table API and UDF calls, the system will nevertheless attempt to resolve the class name to an
1271+
actual implementation class. If resolution fails, `Row` is used as a fallback.
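The resolve-or-fall-back behavior described above can be sketched in plain Java. This is an illustration under assumptions, not Flink's implementation: `tryResolve` and `conversionTarget` are hypothetical helpers, and the string `"Row (fallback)"` merely stands in for the generic row representation.

```java
import java.util.Optional;

/** Hypothetical sketch of "resolve the class name, else fall back"; not Flink code. */
public class ClassResolutionSketch {

    /** Returns the class if the name is resolvable in the classpath, else empty. */
    static Optional<Class<?>> tryResolve(String className) {
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    /** Names the representation a value would be converted to at the API edge. */
    static String conversionTarget(String className) {
        return tryResolve(className).map(Class::getName).orElse("Row (fallback)");
    }

    public static void main(String[] args) {
        // A resolvable name yields the class itself.
        System.out.println(conversionTarget("java.lang.String"));
        // An unresolvable name still identifies the type but uses the fallback.
        System.out.println(conversionTarget("com.myorg.DoesNotExist"));
    }
}
```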
12611272

1262-
The implementation class of a structured type must meet the following requirements:
1273+
Inline structured types can be created from regular POJOs (Plain Old Java Objects) if the implementation class meets the
1274+
following requirements:
12631275
- The class must be globally accessible which means it must be declared `public`, `static`, and not `abstract`.
12641276
- The class must offer a default constructor with zero arguments or a full constructor that assigns all
12651277
fields.
@@ -1281,15 +1293,51 @@ For some classes an annotation is required in order to map the class to a data t
12811293
to assign a fixed precision and scale for `java.math.BigDecimal`).
12821294
{{< /tab >}}
12831295
{{< tab "Python" >}}
1296+
```python
1297+
Not supported.
1298+
```
12841299
{{< /tab >}}
12851300
{{< /tabs >}}
12861301

12871302
**Declaration**
12881303

12891304
{{< tabs "c5e5527b-b09d-4dc5-9549-8fd2bfc7cc2a" >}}
1290-
{{< tab "Java" >}}
1305+
{{< tab "Java/Scala" >}}
1306+
Structured types are usually declared via their implementation classes:
1307+
12911308
```java
1292-
class User {
1309+
// A simple POJO that qualifies as a structured type.
1310+
// Note: Without a fully assigning constructor, the order of fields will be alphabetical.
1311+
// The final data type will be:
1312+
// STRUCTURED<'com.myorg.Customer', active BOOLEAN, id INT NOT NULL, name STRING, properties MAP<STRING, STRING>>
1313+
class Customer {
1314+
public int id;
1315+
public String name;
1316+
public Map<String, String> properties;
1317+
public boolean active;
1318+
}
1319+
1320+
// A POJO with a fully assigning constructor defining the field order.
1321+
// The final data type will be:
1322+
// STRUCTURED<'com.myorg.Customer', id INT NOT NULL, name STRING, properties MAP<STRING, STRING>, active BOOLEAN>
1323+
class Customer {
1324+
public int id;
1325+
public String name;
1326+
public Map<String, String> properties;
1327+
public boolean active;
1328+
1329+
public Customer(int id, String name, Map<String, String> properties, boolean active) {
1330+
this.id = id;
1331+
this.name = name;
1332+
this.properties = properties;
1333+
this.active = active;
1334+
}
1335+
}
1336+
1337+
// A POJO that uses the @DataTypeHint annotations for supporting the reflective extraction.
1338+
// The final data type will be:
1339+
// STRUCTURED<'com.myorg.Customer', age INT NOT NULL, modelClass RAW(...), name STRING, totalBalance DECIMAL(10, 2)>
1340+
class Customer {
12931341

12941342
// extract fields automatically
12951343
public int age;
@@ -1301,35 +1349,30 @@ class User {
13011349
// enrich the extraction with forcing using RAW types
13021350
public @DataTypeHint("RAW") Class<?> modelClass;
13031351
}
1304-
1305-
DataTypes.of(User.class);
13061352
```
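The note in the first example, that fields are ordered alphabetically when no fully assigning constructor exists, can be reproduced with a small reflection sketch. This is an assumption-laden illustration and not Flink's extraction logic: `alphabeticalFieldNames` is a hypothetical helper that only sorts the public instance fields by name.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of alphabetical field ordering; not Flink's extractor. */
public class FieldOrderSketch {

    /** Same shape as the first Customer example: no fully assigning constructor. */
    public static class Customer {
        public int id;
        public String name;
        public Map<String, String> properties;
        public boolean active;
    }

    /** Collects the public, non-static field names of a class in alphabetical order. */
    static List<String> alphabeticalFieldNames(Class<?> clazz) {
        return Arrays.stream(clazz.getFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()))
                .map(Field::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [active, id, name, properties]
        System.out.println(alphabeticalFieldNames(Customer.class));
    }
}
```

The resulting order matches the attribute order shown in the `STRUCTURED<'com.myorg.Customer', active ..., id ..., name ..., properties ...>` comment of the first example.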
13071353

1308-
**Bridging to JVM Types**
1309-
1310-
| Java Type | Input | Output | Remarks |
1311-
|:-------------------------------------|:-----:|:------:|:----------------------------------------|
1312-
|*class* | X | X | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
1313-
|`org.apache.flink.types.Row` | X | X | Represent the structured type as a row. |
1314-
|`org.apache.flink.table.data.RowData` | X | X | Internal data structure. |
1315-
1316-
{{< /tab >}}
1317-
{{< tab "Scala" >}}
1318-
```scala
1319-
case class User(
1354+
Or via explicit declaration:
1355+
```java
1356+
// Provide an implementation class
1357+
DataTypes.STRUCTURED(MyPojo.class, DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...);
13201358

1321-
// extract fields automatically
1322-
age: Int,
1323-
name: String,
1359+
// Provide a class name only, the class is resolved only if available in the classpath
1360+
DataTypes.STRUCTURED("com.myorg.MyPojo", DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...);
13241361

1325-
// enrich the extraction with precision information
1326-
@DataTypeHint("DECIMAL(10, 2)") totalBalance: java.math.BigDecimal,
1362+
// Full example
1363+
DataTypes.STRUCTURED(
1364+
Customer.class,
1365+
DataTypes.FIELD("age", DataTypes.INT().notNull()),
1366+
DataTypes.FIELD("name", DataTypes.STRING())
1367+
);
1368+
```
13271369

1328-
// enrich the extraction with forcing using a RAW type
1329-
@DataTypeHint("RAW") modelClass: Class[_]
1330-
)
1370+
Or via explicit extraction:
1371+
```java
1372+
DataTypes.of(Class);
13311373

1332-
DataTypes.of(classOf[User])
1374+
// For example:
1375+
DataTypes.of(Customer.class);
13331376
```
13341377

13351378
**Bridging to JVM Types**

docs/content/docs/dev/table/types.md

Lines changed: 87 additions & 44 deletions
@@ -1241,34 +1241,46 @@ equivalent to `ROW<myField INT, myOtherField BOOLEAN>`.
12411241

12421242
### User-Defined Data Types
12431243

1244-
{{< tabs "udf" >}}
1245-
{{< tab "Java/Scala" >}}
1246-
<span class="label label-danger">Attention</span> User-defined data types are not fully supported yet. They are
1247-
currently (as of Flink 1.11) only exposed as unregistered structured types in parameters and return types of functions.
1244+
#### `STRUCTURED`
12481245

1249-
A structured type is similar to an object in an object-oriented programming language. It contains
1250-
zero, one or more attributes. Each attribute consists of a name and a type.
1246+
Data type for a user-defined object.
12511247

1252-
There are two kinds of structured types:
1248+
Compared to `ROW`, which may also be considered a "struct-like" type, structured types are distinguishable even if they
1249+
contain the same set of fields. For example, `Visit(amount DOUBLE)` is distinct from `Interaction(amount DOUBLE)` due
1250+
to its identifier.
12531251

1254-
- Types that are stored in a catalog and are identified by a _catalog identifier_ (like `cat.db.MyType`). Those
1255-
are equal to the SQL standard definition of structured types.
1252+
Similar to classes in object-oriented programming languages, structured types are identified by a class name and contain
1253+
zero, one or more attributes. Each attribute has a name, a type, and an optional description. A type cannot be defined
1254+
in such a way that one of its attribute types (transitively) refers to itself.
12561255

1257-
- Anonymously defined, unregistered types (usually reflectively extracted) that are identified by
1258-
an _implementation class_ (like `com.myorg.model.MyType`). Those are useful when programmatically
1259-
defining a table program. They enable reusing existing JVM classes without manually defining the
1260-
schema of a data type again.
1256+
Structured types are internally converted by the system into suitable data structures. Serialization and equality checks
1257+
are managed by the system based on the logical type.
12611258

1262-
#### Registered Structured Types
1259+
{{< tabs "udt" >}}
1260+
{{< tab "SQL" >}}
1261+
```sql
1262+
STRUCTURED<'c', n0 t0, n1 t1, ...>
1263+
STRUCTURED<'c', n0 t0, n1 t1 'd1', ...>
1264+
```
1265+
The type can be declared using `STRUCTURED<'c', n0 t0 'd0', n1 t1 'd1', ...>` where `c` is the class name, `n` is the
1266+
unique name of a field, `t` is the logical type of a field, `d` is the optional description of a field.
1267+
{{< /tab >}}
12631268

1264-
Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog
1265-
or referenced in a `CREATE TABLE` DDL.
1269+
{{< tab "Java/Scala" >}}
1270+
Usually, structured types are defined **inline** and can be reflectively extracted from a corresponding implementation class,
1271+
for example, from the signature of an `eval()` method of a function. This is useful when programmatically defining a table
1272+
program because it enables reusing existing JVM classes without manually defining the schema of a data type again.
12661273

1267-
#### Unregistered Structured Types
1274+
If the class name matches a class in the classpath, the system will convert a structured object to a JVM object at the edges
1275+
of the table ecosystem (e.g. when bridging to a function or connector). The implementation class must provide either a
1276+
zero-argument constructor or a full constructor that assigns all attributes.
12681277

1269-
Unregistered structured types can be created from regular POJOs (Plain Old Java Objects) using automatic reflective extraction.
1278+
However, the class name does not need to be resolvable in the classpath; it may be used solely to distinguish between objects with
1279+
identical attribute sets. In Table API and UDF calls, the system will nevertheless attempt to resolve the class name to an
1280+
actual implementation class. If resolution fails, `Row` is used as a fallback.
12701281

1271-
The implementation class of a structured type must meet the following requirements:
1282+
Inline structured types can be created from regular POJOs (Plain Old Java Objects) if the implementation class meets the
1283+
following requirements:
12721284
- The class must be globally accessible which means it must be declared `public`, `static`, and not `abstract`.
12731285
- The class must offer a default constructor with zero arguments or a full constructor that assigns all
12741286
fields.
@@ -1290,15 +1302,51 @@ For some classes an annotation is required in order to map the class to a data t
12901302
to assign a fixed precision and scale for `java.math.BigDecimal`).
12911303
{{< /tab >}}
12921304
{{< tab "Python" >}}
1305+
```python
1306+
Not supported.
1307+
```
12931308
{{< /tab >}}
12941309
{{< /tabs >}}
12951310

12961311
**Declaration**
12971312

12981313
{{< tabs "c5e5527b-b09d-4dc5-9549-8fd2bfc7cc2a" >}}
1299-
{{< tab "Java" >}}
1314+
{{< tab "Java/Scala" >}}
1315+
Structured types are usually declared via their implementation classes:
1316+
13001317
```java
1301-
class User {
1318+
// A simple POJO that qualifies as a structured type.
1319+
// Note: Without a fully assigning constructor, the order of fields will be alphabetical.
1320+
// The final data type will be:
1321+
// STRUCTURED<'com.myorg.Customer', active BOOLEAN, id INT NOT NULL, name STRING, properties MAP<STRING, STRING>>
1322+
class Customer {
1323+
public int id;
1324+
public String name;
1325+
public Map<String, String> properties;
1326+
public boolean active;
1327+
}
1328+
1329+
// A POJO with a fully assigning constructor defining the field order.
1330+
// The final data type will be:
1331+
// STRUCTURED<'com.myorg.Customer', id INT NOT NULL, name STRING, properties MAP<STRING, STRING>, active BOOLEAN>
1332+
class Customer {
1333+
public int id;
1334+
public String name;
1335+
public Map<String, String> properties;
1336+
public boolean active;
1337+
1338+
public Customer(int id, String name, Map<String, String> properties, boolean active) {
1339+
this.id = id;
1340+
this.name = name;
1341+
this.properties = properties;
1342+
this.active = active;
1343+
}
1344+
}
1345+
1346+
// A POJO that uses the @DataTypeHint annotations for supporting the reflective extraction.
1347+
// The final data type will be:
1348+
// STRUCTURED<'com.myorg.Customer', age INT NOT NULL, modelClass RAW(...), name STRING, totalBalance DECIMAL(10, 2)>
1349+
class Customer {
13021350

13031351
// extract fields automatically
13041352
public int age;
@@ -1310,35 +1358,30 @@ class User {
13101358
// enrich the extraction with forcing using RAW types
13111359
public @DataTypeHint("RAW") Class<?> modelClass;
13121360
}
1313-
1314-
DataTypes.of(User.class);
13151361
```
13161362

1317-
**Bridging to JVM Types**
1318-
1319-
| Java Type | Input | Output | Remarks |
1320-
|:-------------------------------------|:-----:|:------:|:----------------------------------------|
1321-
|*class* | X | X | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
1322-
|`org.apache.flink.types.Row` | X | X | Represent the structured type as a row. |
1323-
|`org.apache.flink.table.data.RowData` | X | X | Internal data structure. |
1324-
1325-
{{< /tab >}}
1326-
{{< tab "Scala" >}}
1327-
```scala
1328-
case class User(
1363+
Or via explicit declaration:
1364+
```java
1365+
// Provide an implementation class
1366+
DataTypes.STRUCTURED(MyPojo.class, DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...);
13291367

1330-
// extract fields automatically
1331-
age: Int,
1332-
name: String,
1368+
// Provide a class name only, the class is resolved only if available in the classpath
1369+
DataTypes.STRUCTURED("com.myorg.MyPojo", DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...);
13331370

1334-
// enrich the extraction with precision information
1335-
@DataTypeHint("DECIMAL(10, 2)") totalBalance: java.math.BigDecimal,
1371+
// Full example
1372+
DataTypes.STRUCTURED(
1373+
Customer.class,
1374+
DataTypes.FIELD("age", DataTypes.INT().notNull()),
1375+
DataTypes.FIELD("name", DataTypes.STRING())
1376+
);
1377+
```
13361378

1337-
// enrich the extraction with forcing using a RAW type
1338-
@DataTypeHint("RAW") modelClass: Class[_]
1339-
)
1379+
Or via explicit extraction:
1380+
```java
1381+
DataTypes.of(Class);
13401382

1341-
DataTypes.of(classOf[User])
1383+
// For example:
1384+
DataTypes.of(Customer.class);
13421385
```
13431386

13441387
**Bridging to JVM Types**

flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 3 additions & 0 deletions
@@ -150,6 +150,7 @@
150150
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
151151
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
152152
"org.apache.flink.sql.parser.type.SqlRawTypeNameSpec"
153+
"org.apache.flink.sql.parser.type.SqlStructuredTypeNameSpec"
153154
"org.apache.flink.sql.parser.type.SqlTimestampLtzTypeNameSpec"
154155
"org.apache.flink.sql.parser.utils.ParserResource"
155156
"org.apache.flink.sql.parser.validate.FlinkSqlConformance"
@@ -219,6 +220,7 @@
219220
"STATISTICS"
220221
"STOP"
221222
"STRING"
223+
"STRUCTURED"
222224
"SUSPEND"
223225
"REFRESH"
224226
"RESUME"
@@ -661,6 +663,7 @@
661663
"SqlMapTypeName()"
662664
"SqlRawTypeName()"
663665
"ExtendedSqlRowTypeName()"
666+
"SqlStructuredTypeName()"
664667
]
665668

666669
# List of methods for parsing builtin function calls.
