Skip to content

Commit 14ae7d4

Browse files
authored
[common] Change class name DefaultLogRecord to IndexedLogRecord (#394)
1 parent 71ebe1b commit 14ae7d4

File tree

10 files changed

+30
-27
lines changed

10 files changed

+30
-27
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteRecord.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import com.alibaba.fluss.metadata.PhysicalTablePath;
2121
import com.alibaba.fluss.record.DefaultKvRecord;
2222
import com.alibaba.fluss.record.DefaultKvRecordBatch;
23-
import com.alibaba.fluss.record.DefaultLogRecord;
2423
import com.alibaba.fluss.record.DefaultLogRecordBatch;
24+
import com.alibaba.fluss.record.IndexedLogRecord;
2525
import com.alibaba.fluss.row.BinaryRow;
2626
import com.alibaba.fluss.row.InternalRow;
2727
import com.alibaba.fluss.row.indexed.IndexedRow;
@@ -84,7 +84,7 @@ public static WriteRecord forIndexedAppend(
8484
PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] bucketKey) {
8585
checkNotNull(row);
8686
int estimatedSizeInBytes =
87-
DefaultLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
87+
IndexedLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
8888
return new WriteRecord(
8989
tablePath,
9090
null,

fluss-client/src/test/java/com/alibaba/fluss/client/write/IndexedLogWriteBatchTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.alibaba.fluss.memory.MemorySegment;
2020
import com.alibaba.fluss.memory.PreAllocatedPagedOutputView;
2121
import com.alibaba.fluss.metadata.TableBucket;
22-
import com.alibaba.fluss.record.DefaultLogRecord;
2322
import com.alibaba.fluss.record.DefaultLogRecordBatch;
23+
import com.alibaba.fluss.record.IndexedLogRecord;
2424
import com.alibaba.fluss.record.LogRecord;
2525
import com.alibaba.fluss.record.LogRecordBatch;
2626
import com.alibaba.fluss.record.LogRecordReadContext;
@@ -52,7 +52,7 @@ public class IndexedLogWriteBatchTest {
5252
@BeforeEach
5353
void setup() {
5454
row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
55-
estimatedSizeInBytes = DefaultLogRecord.sizeOf(row);
55+
estimatedSizeInBytes = IndexedLogRecord.sizeOf(row);
5656
}
5757

5858
@Test

fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.alibaba.fluss.metadata.TableInfo;
3232
import com.alibaba.fluss.metadata.TablePath;
3333
import com.alibaba.fluss.record.DefaultKvRecord;
34-
import com.alibaba.fluss.record.DefaultLogRecord;
34+
import com.alibaba.fluss.record.IndexedLogRecord;
3535
import com.alibaba.fluss.record.LogRecord;
3636
import com.alibaba.fluss.record.LogRecordBatch;
3737
import com.alibaba.fluss.record.LogRecordReadContext;
@@ -391,7 +391,7 @@ void testAppendWithStickyBucketAssigner() throws Exception {
391391
void testPartialDrain() throws Exception {
392392
IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
393393
RecordAccumulator accum = createTestRecordAccumulator(1024, 10L * 1024);
394-
int appends = 1024 / DefaultLogRecord.sizeOf(row) + 1;
394+
int appends = 1024 / IndexedLogRecord.sizeOf(row) + 1;
395395
List<TableBucket> buckets = Arrays.asList(tb1, tb2);
396396
for (TableBucket tb : buckets) {
397397
for (int i = 0; i < appends; i++) {
@@ -597,7 +597,7 @@ private int expectedNumAppends(IndexedRow row, int batchSize) {
597597
int size = RECORD_BATCH_HEADER_SIZE;
598598
int offsetDelta = 0;
599599
while (true) {
600-
int recordSize = DefaultLogRecord.sizeOf(row);
600+
int recordSize = IndexedLogRecord.sizeOf(row);
601601
if (size + recordSize > batchSize) {
602602
return offsetDelta;
603603
}

fluss-common/src/main/java/com/alibaba/fluss/record/DefaultKvRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.nio.ByteBuffer;
3737

3838
/**
39-
* This class is an immutable kv record. Different from {@link DefaultLogRecord}, it isn't designed
39+
* This class is an immutable kv record. Different from {@link IndexedLogRecord}, it isn't designed
4040
* for persistence. The schema is as follows:
4141
*
4242
* <ul>

fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ private CloseableIterator<LogRecord> rowRecordIterator(RowType rowType, long tim
280280

281281
@Override
282282
protected LogRecord readNext(long baseOffset) {
283-
DefaultLogRecord logRecord =
284-
DefaultLogRecord.readFrom(
283+
IndexedLogRecord logRecord =
284+
IndexedLogRecord.readFrom(
285285
segment, position, baseOffset + rowId, timestamp, fieldTypes);
286286
rowId++;
287287
position += logRecord.getSizeInBytes();

fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecord.java renamed to fluss-common/src/main/java/com/alibaba/fluss/record/IndexedLogRecord.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@
5454
* @since 0.1
5555
*/
5656
@PublicEvolving
57-
// TODO: should rename to IndexedLogRecord as it only indexed row?
58-
public class DefaultLogRecord implements LogRecord {
57+
public class IndexedLogRecord implements LogRecord {
5958

6059
private static final int ATTRIBUTES_LENGTH = 1;
6160

@@ -67,7 +66,7 @@ public class DefaultLogRecord implements LogRecord {
6766
private int offset;
6867
private int sizeInBytes;
6968

70-
DefaultLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
69+
IndexedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
7170
this.logOffset = logOffset;
7271
this.fieldTypes = fieldTypes;
7372
this.timestamp = timestamp;
@@ -93,7 +92,7 @@ public boolean equals(Object o) {
9392
return false;
9493
}
9594

96-
DefaultLogRecord that = (DefaultLogRecord) o;
95+
IndexedLogRecord that = (IndexedLogRecord) o;
9796
return sizeInBytes == that.sizeInBytes
9897
&& segment.equalTo(that.segment, offset, that.offset, sizeInBytes);
9998
}
@@ -149,14 +148,14 @@ public static int writeTo(OutputView outputView, RowKind rowKind, IndexedRow row
149148
return sizeInBytes + LENGTH_LENGTH;
150149
}
151150

152-
public static DefaultLogRecord readFrom(
151+
public static IndexedLogRecord readFrom(
153152
MemorySegment segment,
154153
int position,
155154
long logOffset,
156155
long logTimestamp,
157156
DataType[] colTypes) {
158157
int sizeInBytes = segment.getInt(position);
159-
DefaultLogRecord logRecord = new DefaultLogRecord(logOffset, logTimestamp, colTypes);
158+
IndexedLogRecord logRecord = new IndexedLogRecord(logOffset, logTimestamp, colTypes);
160159
logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH);
161160
return logRecord;
162161
}

fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public static MemoryLogRecordsIndexedBuilder builder(
109109
* appended, then this returns true.
110110
*/
111111
public boolean hasRoomFor(IndexedRow row) {
112-
return sizeInBytes + DefaultLogRecord.sizeOf(row) <= writeLimit;
112+
return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit;
113113
}
114114

115115
public void append(RowKind rowKind, IndexedRow row) throws Exception {
@@ -122,7 +122,7 @@ private void appendRecord(RowKind rowKind, IndexedRow row) throws IOException {
122122
"Tried to append a record, but MemoryLogRecordsBuilder is closed for record appends");
123123
}
124124

125-
int recordByteSizes = DefaultLogRecord.writeTo(pagedOutputView, rowKind, row);
125+
int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView, rowKind, row);
126126
currentRecordNumber++;
127127
sizeInBytes += recordByteSizes;
128128
}

fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordTest.java renamed to fluss-common/src/test/java/com/alibaba/fluss/record/IndexedLogRecordTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929

3030
import static org.assertj.core.api.Assertions.assertThat;
3131

32-
/** Test for {@link DefaultLogRecord}. */
33-
class DefaultLogRecordTest extends LogTestBase {
32+
/** Test for {@link IndexedLogRecord}. */
33+
class IndexedLogRecordTest extends LogTestBase {
3434

3535
@Test
3636
void testBase() throws IOException {
@@ -43,10 +43,10 @@ void testBase() throws IOException {
4343
writer.writeString(BinaryString.fromString("abc"));
4444
row.pointTo(writer.segment(), 0, writer.position());
4545

46-
DefaultLogRecord.writeTo(outputView, RowKind.APPEND_ONLY, row);
46+
IndexedLogRecord.writeTo(outputView, RowKind.APPEND_ONLY, row);
4747
// Test read from.
48-
DefaultLogRecord defaultLogRecord =
49-
DefaultLogRecord.readFrom(
48+
IndexedLogRecord defaultLogRecord =
49+
IndexedLogRecord.readFrom(
5050
MemorySegment.wrap(outputView.getCopyOfBuffer()),
5151
0,
5252
1000,
@@ -64,13 +64,13 @@ void testBase() throws IOException {
6464
void testWriteToAndReadFromWithRandomData() throws IOException {
6565
// Test write to.
6666
IndexedRow row = TestInternalRowGenerator.genIndexedRowForAllType();
67-
DefaultLogRecord.writeTo(outputView, RowKind.APPEND_ONLY, row);
67+
IndexedLogRecord.writeTo(outputView, RowKind.APPEND_ONLY, row);
6868
DataType[] allColTypes =
6969
TestInternalRowGenerator.createAllRowType().getChildren().toArray(new DataType[0]);
7070

7171
// Test read from.
7272
LogRecord defaultLogRecord =
73-
DefaultLogRecord.readFrom(
73+
IndexedLogRecord.readFrom(
7474
MemorySegment.wrap(outputView.getCopyOfBuffer()),
7575
0,
7676
1000,

fluss-common/src/test/java/com/alibaba/fluss/record/LogTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ protected void assertIndexedLogRecordBatchAndRowEquals(
8383
CloseableIterator<LogRecord> expectIter = expected.records(readContext)) {
8484
int i = 0;
8585
while (actualIter.hasNext() && expectIter.hasNext()) {
86-
DefaultLogRecord actualRecord = (DefaultLogRecord) actualIter.next();
87-
DefaultLogRecord expectedRecord = (DefaultLogRecord) expectIter.next();
86+
IndexedLogRecord actualRecord = (IndexedLogRecord) actualIter.next();
87+
IndexedLogRecord expectedRecord = (IndexedLogRecord) expectIter.next();
8888
assertIndexedRecordEquals(actualRecord, expectedRecord, rows.get(i), i);
8989
i++;
9090
}

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ static void afterAll() {
124124

125125
@BeforeEach
126126
void beforeEach() throws Exception {
127+
// First check if database exists, and drop it if it does
128+
if (catalog.databaseExists(DEFAULT_DB)) {
129+
catalog.dropDatabase(DEFAULT_DB, true, true);
130+
}
127131
try {
128132
catalog.createDatabase(
129133
DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);

0 commit comments

Comments
 (0)