Skip to content

Commit 7a4d71e

Browse files
luoyuxia authored and wuchong committed
[lake] Use DataLake's key encoding and bucket assigner for tables created in DataLake configured cluster (#362)
1 parent 14ae7d4 commit 7a4d71e

File tree

64 files changed

+1362
-829
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1362
-829
lines changed

fluss-client/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
</dependency>
4949

5050
<!-- paimon bundle -->
51+
<!-- remove paimon bundle in #408 -->
5152
<dependency>
5253
<groupId>org.apache.paimon</groupId>
5354
<artifactId>paimon-bundle</artifactId>

fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/LakeTableBucketAssigner.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/FlussDataTypeToPaimonDataType.java

Lines changed: 0 additions & 174 deletions
This file was deleted.

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616

1717
package com.alibaba.fluss.client.lookup;
1818

19-
import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner;
2019
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2120
import com.alibaba.fluss.client.table.getter.PartitionGetter;
21+
import com.alibaba.fluss.lakehouse.DataLakeFormat;
22+
import com.alibaba.fluss.lakehouse.LakeBucketAssigner;
23+
import com.alibaba.fluss.lakehouse.LakeBucketAssignerFactory;
24+
import com.alibaba.fluss.lakehouse.LakeKeyEncoderFactory;
2225
import com.alibaba.fluss.metadata.TableBucket;
2326
import com.alibaba.fluss.metadata.TableInfo;
2427
import com.alibaba.fluss.row.InternalRow;
2528
import com.alibaba.fluss.row.decode.RowDecoder;
29+
import com.alibaba.fluss.row.encode.CompactedKeyEncoder;
2630
import com.alibaba.fluss.row.encode.KeyEncoder;
2731
import com.alibaba.fluss.row.encode.ValueDecoder;
2832
import com.alibaba.fluss.types.DataType;
@@ -33,6 +37,7 @@
3337
import java.util.ArrayList;
3438
import java.util.HashSet;
3539
import java.util.List;
40+
import java.util.Optional;
3641
import java.util.Set;
3742
import java.util.concurrent.CompletableFuture;
3843

@@ -56,7 +61,8 @@ class PrefixKeyLookuper implements Lookuper {
5661

5762
private final int numBuckets;
5863

59-
private final LakeTableBucketAssigner lakeTableBucketAssigner;
64+
// won't be null if the datalake format is set in the table
65+
private @Nullable final LakeBucketAssigner lakeBucketAssigner;
6066

6167
/**
6268
* a getter to extract the partition from the prefix lookup key row, null when the table is not partitioned.
@@ -80,15 +86,19 @@ public PrefixKeyLookuper(
8086
this.lookupClient = lookupClient;
8187
// the row type of the input lookup row
8288
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
83-
this.bucketKeyEncoder =
84-
KeyEncoder.createKeyEncoder(lookupRowType, tableInfo.getBucketKeys());
85-
86-
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
87-
this.lakeTableBucketAssigner =
88-
new LakeTableBucketAssigner(
89-
lookupRowType, tableInfo.getBucketKeys(), numBuckets);
89+
Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat();
90+
91+
if (optDataLakeFormat.isPresent()) {
92+
DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
93+
this.bucketKeyEncoder =
94+
LakeKeyEncoderFactory.createKeyEncoder(
95+
dataLakeFormat, lookupRowType, tableInfo.getBucketKeys());
96+
this.lakeBucketAssigner =
97+
LakeBucketAssignerFactory.createLakeBucketAssigner(dataLakeFormat, numBuckets);
9098
} else {
91-
this.lakeTableBucketAssigner = null;
99+
this.bucketKeyEncoder =
100+
CompactedKeyEncoder.createKeyEncoder(lookupRowType, tableInfo.getBucketKeys());
101+
this.lakeBucketAssigner = null;
92102
}
93103

94104
this.partitionGetter =
@@ -152,14 +162,8 @@ private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumn
152162

153163
@Override
154164
public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
155-
byte[] bucketKeyBytes = bucketKeyEncoder.encode(prefixKey);
156-
int bucketId =
157-
getBucketId(
158-
bucketKeyBytes,
159-
prefixKey,
160-
lakeTableBucketAssigner,
161-
numBuckets,
162-
metadataUpdater);
165+
byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey);
166+
int bucketId = getBucketId(bucketKeyBytes, lakeBucketAssigner, numBuckets);
163167

164168
Long partitionId = null;
165169
if (partitionGetter != null) {

0 commit comments

Comments
 (0)