Skip to content

Commit 3bf243d

Browse files
committed
[common] Use a unified way to encode keys and assign buckets for datalake enabled or not
1 parent 7a4d71e commit 3bf243d

File tree

43 files changed

+399
-572
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+399
-572
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,14 @@
1616

1717
package com.alibaba.fluss.client.lookup;
1818

19+
import com.alibaba.fluss.bucketing.BucketingFunction;
1920
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2021
import com.alibaba.fluss.client.table.getter.PartitionGetter;
21-
import com.alibaba.fluss.lakehouse.DataLakeFormat;
22-
import com.alibaba.fluss.lakehouse.LakeBucketAssigner;
23-
import com.alibaba.fluss.lakehouse.LakeBucketAssignerFactory;
24-
import com.alibaba.fluss.lakehouse.LakeKeyEncoderFactory;
22+
import com.alibaba.fluss.metadata.DataLakeFormat;
2523
import com.alibaba.fluss.metadata.TableBucket;
2624
import com.alibaba.fluss.metadata.TableInfo;
2725
import com.alibaba.fluss.row.InternalRow;
2826
import com.alibaba.fluss.row.decode.RowDecoder;
29-
import com.alibaba.fluss.row.encode.CompactedKeyEncoder;
3027
import com.alibaba.fluss.row.encode.KeyEncoder;
3128
import com.alibaba.fluss.row.encode.ValueDecoder;
3229
import com.alibaba.fluss.types.DataType;
@@ -37,11 +34,9 @@
3734
import java.util.ArrayList;
3835
import java.util.HashSet;
3936
import java.util.List;
40-
import java.util.Optional;
4137
import java.util.Set;
4238
import java.util.concurrent.CompletableFuture;
4339

44-
import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId;
4540
import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId;
4641

4742
/**
@@ -59,11 +54,9 @@ class PrefixKeyLookuper implements Lookuper {
5954
/** Extract bucket key from prefix lookup key row. */
6055
private final KeyEncoder bucketKeyEncoder;
6156

57+
private final BucketingFunction bucketingFunction;
6258
private final int numBuckets;
6359

64-
// won't be null if the datalake format is set in the table
65-
private @Nullable final LakeBucketAssigner lakeBucketAssigner;
66-
6760
/**
6861
* a getter to extract partition from prefix lookup key row, null when it's not partitioned.
6962
*/
@@ -86,21 +79,10 @@ public PrefixKeyLookuper(
8679
this.lookupClient = lookupClient;
8780
// the row type of the input lookup row
8881
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
89-
Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat();
90-
91-
if (optDataLakeFormat.isPresent()) {
92-
DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
93-
this.bucketKeyEncoder =
94-
LakeKeyEncoderFactory.createKeyEncoder(
95-
dataLakeFormat, lookupRowType, tableInfo.getBucketKeys());
96-
this.lakeBucketAssigner =
97-
LakeBucketAssignerFactory.createLakeBucketAssigner(dataLakeFormat, numBuckets);
98-
} else {
99-
this.bucketKeyEncoder =
100-
CompactedKeyEncoder.createKeyEncoder(lookupRowType, tableInfo.getBucketKeys());
101-
this.lakeBucketAssigner = null;
102-
}
82+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
10383

84+
this.bucketKeyEncoder = KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
85+
this.bucketingFunction = BucketingFunction.of(lakeFormat);
10486
this.partitionGetter =
10587
tableInfo.isPartitioned()
10688
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
@@ -163,7 +145,7 @@ private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumn
163145
@Override
164146
public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
165147
byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey);
166-
int bucketId = getBucketId(bucketKeyBytes, lakeBucketAssigner, numBuckets);
148+
int bucketId = bucketingFunction.bucketing(bucketKeyBytes, numBuckets);
167149

168150
Long partitionId = null;
169151
if (partitionGetter != null) {

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,23 @@
1616

1717
package com.alibaba.fluss.client.lookup;
1818

19+
import com.alibaba.fluss.bucketing.BucketingFunction;
1920
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2021
import com.alibaba.fluss.client.table.getter.PartitionGetter;
21-
import com.alibaba.fluss.lakehouse.DataLakeFormat;
22-
import com.alibaba.fluss.lakehouse.LakeBucketAssigner;
23-
import com.alibaba.fluss.lakehouse.LakeBucketAssignerFactory;
24-
import com.alibaba.fluss.lakehouse.LakeKeyEncoderFactory;
22+
import com.alibaba.fluss.metadata.DataLakeFormat;
2523
import com.alibaba.fluss.metadata.TableBucket;
2624
import com.alibaba.fluss.metadata.TableInfo;
2725
import com.alibaba.fluss.row.InternalRow;
2826
import com.alibaba.fluss.row.decode.RowDecoder;
29-
import com.alibaba.fluss.row.encode.CompactedKeyEncoder;
3027
import com.alibaba.fluss.row.encode.KeyEncoder;
3128
import com.alibaba.fluss.row.encode.ValueDecoder;
3229
import com.alibaba.fluss.types.DataType;
3330
import com.alibaba.fluss.types.RowType;
3431

3532
import javax.annotation.Nullable;
3633

37-
import java.util.Optional;
3834
import java.util.concurrent.CompletableFuture;
3935

40-
import static com.alibaba.fluss.client.utils.ClientUtils.getBucketId;
4136
import static com.alibaba.fluss.client.utils.ClientUtils.getPartitionId;
4237
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
4338

@@ -58,9 +53,7 @@ class PrimaryKeyLookuper implements Lookuper {
5853
*/
5954
private final KeyEncoder bucketKeyEncoder;
6055

61-
// won't be null if the datalake type is set in the table
62-
private @Nullable final LakeBucketAssigner lakeBucketAssigner;
63-
56+
private final BucketingFunction bucketingFunction;
6457
private final int numBuckets;
6558

6659
/** a getter to extract partition from lookup key row, null when it's not partitioned. */
@@ -82,33 +75,16 @@ public PrimaryKeyLookuper(
8275

8376
// the row type of the input lookup row
8477
RowType lookupRowType = tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
85-
Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat();
86-
87-
if (optDataLakeFormat.isPresent()) {
88-
DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
89-
// the encoded primary key is the physical primary key
90-
this.primaryKeyEncoder =
91-
LakeKeyEncoderFactory.createKeyEncoder(
92-
dataLakeFormat, lookupRowType, tableInfo.getPhysicalPrimaryKeys());
93-
this.bucketKeyEncoder =
94-
tableInfo.isDefaultBucketKey()
95-
? primaryKeyEncoder
96-
: LakeKeyEncoderFactory.createKeyEncoder(
97-
dataLakeFormat, lookupRowType, tableInfo.getBucketKeys());
98-
this.lakeBucketAssigner =
99-
LakeBucketAssignerFactory.createLakeBucketAssigner(dataLakeFormat, numBuckets);
100-
} else {
101-
// the encoded primary key is the physical primary key
102-
this.primaryKeyEncoder =
103-
CompactedKeyEncoder.createKeyEncoder(
104-
lookupRowType, tableInfo.getPhysicalPrimaryKeys());
105-
this.bucketKeyEncoder =
106-
tableInfo.isDefaultBucketKey()
107-
? primaryKeyEncoder
108-
: CompactedKeyEncoder.createKeyEncoder(
109-
lookupRowType, tableInfo.getBucketKeys());
110-
this.lakeBucketAssigner = null;
111-
}
78+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
79+
80+
// the encoded primary key is the physical primary key
81+
this.primaryKeyEncoder =
82+
KeyEncoder.of(lookupRowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
83+
this.bucketKeyEncoder =
84+
tableInfo.isDefaultBucketKey()
85+
? primaryKeyEncoder
86+
: KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
87+
this.bucketingFunction = BucketingFunction.of(lakeFormat);
11288

11389
this.partitionGetter =
11490
tableInfo.isPartitioned()
@@ -138,7 +114,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
138114
partitionGetter,
139115
tableInfo.getTablePath(),
140116
metadataUpdater);
141-
int bucketId = getBucketId(bkBytes, lakeBucketAssigner, numBuckets);
117+
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
142118
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
143119
return lookupClient
144120
.lookup(tableBucket, pkBytes)

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2020
import com.alibaba.fluss.client.write.WriteRecord;
2121
import com.alibaba.fluss.client.write.WriterClient;
22-
import com.alibaba.fluss.lakehouse.DataLakeFormat;
23-
import com.alibaba.fluss.lakehouse.LakeKeyEncoderFactory;
22+
import com.alibaba.fluss.metadata.DataLakeFormat;
2423
import com.alibaba.fluss.metadata.LogFormat;
2524
import com.alibaba.fluss.metadata.PhysicalTablePath;
2625
import com.alibaba.fluss.metadata.TableInfo;
2726
import com.alibaba.fluss.metadata.TablePath;
2827
import com.alibaba.fluss.row.InternalRow;
2928
import com.alibaba.fluss.row.InternalRow.FieldGetter;
30-
import com.alibaba.fluss.row.encode.CompactedKeyEncoder;
3129
import com.alibaba.fluss.row.encode.IndexedRowEncoder;
3230
import com.alibaba.fluss.row.encode.KeyEncoder;
3331
import com.alibaba.fluss.row.indexed.IndexedRow;
@@ -36,7 +34,6 @@
3634
import javax.annotation.Nullable;
3735

3836
import java.util.List;
39-
import java.util.Optional;
4037
import java.util.concurrent.CompletableFuture;
4138

4239
/** The writer to write data to the log table. */
@@ -60,18 +57,8 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
6057
this.bucketKeyEncoder = null;
6158
} else {
6259
RowType rowType = tableInfo.getSchema().getRowType();
63-
Optional<DataLakeFormat> optDataLakeFormat =
64-
tableInfo.getTableConfig().getDataLakeFormat();
65-
this.bucketKeyEncoder =
66-
optDataLakeFormat
67-
.map(
68-
dataLakeFormat ->
69-
LakeKeyEncoderFactory.createKeyEncoder(
70-
dataLakeFormat, rowType, bucketKeys))
71-
.orElseGet(
72-
() ->
73-
CompactedKeyEncoder.createKeyEncoder(
74-
rowType, bucketKeys));
60+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
61+
this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
7562
}
7663

7764
this.logFormat = tableInfo.getTableConfig().getLogFormat();

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,14 @@
1919
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2020
import com.alibaba.fluss.client.write.WriteRecord;
2121
import com.alibaba.fluss.client.write.WriterClient;
22-
import com.alibaba.fluss.lakehouse.DataLakeFormat;
23-
import com.alibaba.fluss.lakehouse.LakeKeyEncoderFactory;
22+
import com.alibaba.fluss.metadata.DataLakeFormat;
2423
import com.alibaba.fluss.metadata.KvFormat;
2524
import com.alibaba.fluss.metadata.TableInfo;
2625
import com.alibaba.fluss.metadata.TablePath;
2726
import com.alibaba.fluss.row.BinaryRow;
2827
import com.alibaba.fluss.row.InternalRow;
2928
import com.alibaba.fluss.row.InternalRow.FieldGetter;
3029
import com.alibaba.fluss.row.compacted.CompactedRow;
31-
import com.alibaba.fluss.row.encode.CompactedKeyEncoder;
3230
import com.alibaba.fluss.row.encode.KeyEncoder;
3331
import com.alibaba.fluss.row.encode.RowEncoder;
3432
import com.alibaba.fluss.row.indexed.IndexedRow;
@@ -38,7 +36,6 @@
3836

3937
import java.util.BitSet;
4038
import java.util.List;
41-
import java.util.Optional;
4239
import java.util.concurrent.CompletableFuture;
4340

4441
/** The writer to write data to the primary key table. */
@@ -67,30 +64,14 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
6764
sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
6865

6966
this.targetColumns = partialUpdateColumns;
70-
Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat();
71-
72-
if (optDataLakeFormat.isPresent()) {
73-
DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
74-
// encode primary key using physical primary key
75-
this.primaryKeyEncoder =
76-
LakeKeyEncoderFactory.createKeyEncoder(
77-
dataLakeFormat, rowType, tableInfo.getPhysicalPrimaryKeys());
78-
this.bucketKeyEncoder =
79-
tableInfo.isDefaultBucketKey()
80-
? primaryKeyEncoder
81-
: LakeKeyEncoderFactory.createKeyEncoder(
82-
dataLakeFormat, rowType, tableInfo.getBucketKeys());
83-
} else {
84-
// encode primary key using physical primary key
85-
this.primaryKeyEncoder =
86-
CompactedKeyEncoder.createKeyEncoder(
87-
rowType, tableInfo.getPhysicalPrimaryKeys());
88-
this.bucketKeyEncoder =
89-
tableInfo.isDefaultBucketKey()
90-
? primaryKeyEncoder
91-
: CompactedKeyEncoder.createKeyEncoder(
92-
rowType, tableInfo.getBucketKeys());
93-
}
67+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
68+
// encode primary key using physical primary key
69+
this.primaryKeyEncoder =
70+
KeyEncoder.of(rowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
71+
this.bucketKeyEncoder =
72+
tableInfo.isDefaultBucketKey()
73+
? primaryKeyEncoder
74+
: KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
9475

9576
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
9677
this.rowEncoder = RowEncoder.create(kvFormat, rowType);

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,16 @@
1818

1919
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2020
import com.alibaba.fluss.client.table.getter.PartitionGetter;
21-
import com.alibaba.fluss.client.write.HashBucketAssigner;
2221
import com.alibaba.fluss.config.ConfigOptions;
2322
import com.alibaba.fluss.exception.IllegalConfigurationException;
2423
import com.alibaba.fluss.exception.PartitionNotExistException;
25-
import com.alibaba.fluss.lakehouse.LakeBucketAssigner;
2624
import com.alibaba.fluss.metadata.PhysicalTablePath;
2725
import com.alibaba.fluss.metadata.TablePath;
2826
import com.alibaba.fluss.row.InternalRow;
2927

3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

33-
import javax.annotation.Nullable;
34-
3531
import java.net.InetSocketAddress;
3632
import java.util.ArrayList;
3733
import java.util.List;
@@ -134,15 +130,4 @@ public static Long getPartitionId(
134130
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
135131
return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath);
136132
}
137-
138-
public static int getBucketId(
139-
byte[] bucketKeyBytes,
140-
@Nullable LakeBucketAssigner lakeBucketAssigner,
141-
int numBuckets) {
142-
if (lakeBucketAssigner == null) {
143-
return HashBucketAssigner.bucketForRowKey(bucketKeyBytes, numBuckets);
144-
} else {
145-
return lakeBucketAssigner.assignBucket(bucketKeyBytes);
146-
}
147-
}
148133
}

fluss-client/src/main/java/com/alibaba/fluss/client/write/HashBucketAssigner.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,26 @@
1717
package com.alibaba.fluss.client.write;
1818

1919
import com.alibaba.fluss.annotation.Internal;
20-
import com.alibaba.fluss.utils.MathUtils;
21-
import com.alibaba.fluss.utils.MurmurHashUtils;
22-
import com.alibaba.fluss.utils.Preconditions;
23-
24-
import static com.alibaba.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
20+
import com.alibaba.fluss.bucketing.BucketingFunction;
21+
import com.alibaba.fluss.bucketing.FlussBucketingFunction;
2522

2623
/** Hash bucket assigner. */
2724
@Internal
2825
public class HashBucketAssigner extends StaticBucketAssigner {
2926

3027
private final int numBuckets;
28+
private final BucketingFunction function;
3129

3230
public HashBucketAssigner(int numBuckets) {
33-
this.numBuckets = numBuckets;
31+
this(numBuckets, new FlussBucketingFunction());
3432
}
3533

36-
public int assignBucket(byte[] bucketKeys) {
37-
return bucketForRowKey(bucketKeys, numBuckets);
34+
public HashBucketAssigner(int numBuckets, BucketingFunction function) {
35+
this.numBuckets = numBuckets;
36+
this.function = function;
3837
}
3938

40-
/**
41-
* If the table contains primary key, the default hashing function to choose a bucket from the
42-
* serialized key bytes.
43-
*/
44-
public static int bucketForRowKey(final byte[] key, final int numBuckets) {
45-
Preconditions.checkArgument(key.length != 0, "Assigned key must not be empty!");
46-
int keyHash = MurmurHashUtils.hashUnsafeBytes(key, BYTE_ARRAY_BASE_OFFSET, key.length);
47-
return MathUtils.murmurHash(keyHash) % numBuckets;
39+
public int assignBucket(byte[] bucketKeys) {
40+
return function.bucketing(bucketKeys, numBuckets);
4841
}
4942
}

0 commit comments

Comments
 (0)