Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.sql.tree.ExistsPredicate;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FieldReference;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.GroupingOperation;
import com.facebook.presto.sql.tree.Identifier;
Expand Down Expand Up @@ -197,6 +198,7 @@ public class Analysis
private Optional<TableHandle> analyzeTarget = Optional.empty();

private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();
private Optional<MergeAnalysis> mergeAnalysis = Optional.empty();

// for describe input and describe output
private final boolean isDescribe;
Expand Down Expand Up @@ -233,6 +235,9 @@ public class Analysis
private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
private final Set<NodeRef<TableFunctionInvocation>> polymorphicTableFunctions = new LinkedHashSet<>();

// Row id field used for MERGE INTO command.
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();

public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
{
this.root = root;
Expand Down Expand Up @@ -445,6 +450,16 @@ public Expression getJoinCriteria(Join join)
return joins.get(NodeRef.of(join));
}

public void setRowIdField(Table table, FieldReference field)
{
rowIdField.put(NodeRef.of(table), field);
}

public FieldReference getRowIdField(Table table)
{
return rowIdField.get(NodeRef.of(table));
}

public void recordSubqueries(Node node, ExpressionAnalysis expressionAnalysis)
{
NodeRef<Node> key = NodeRef.of(node);
Expand Down Expand Up @@ -778,6 +793,16 @@ public Optional<List<ColumnMetadata>> getUpdatedColumns()
return updatedColumns;
}

public Optional<MergeAnalysis> getMergeAnalysis()
{
return mergeAnalysis;
}

public void setMergeAnalysis(MergeAnalysis mergeAnalysis)
{
this.mergeAnalysis = Optional.of(mergeAnalysis);
}

public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis)
{
this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis);
Expand Down Expand Up @@ -1817,4 +1842,76 @@ public ConnectorTransactionHandle getTransactionHandle()
return transactionHandle;
}
}

public static class MergeAnalysis
{
private final Table targetTable;
private final List<ColumnMetadata> targetColumnsMetadata;
private final List<ColumnHandle> targetColumnHandles;
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
private final Set<ColumnHandle> nonNullableColumnHandles;
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
private final Scope targetTableScope;
private final Scope joinScope;

public MergeAnalysis(
Table targetTable,
List<ColumnMetadata> targetColumnsMetadata,
List<ColumnHandle> targetColumnHandles,
List<List<ColumnHandle>> mergeCaseColumnHandles,
Set<ColumnHandle> nonNullableTargetColumnHandles,
Map<ColumnHandle, Integer> targetColumnHandleFieldNumbers,
Scope targetTableScope,
Scope joinScope)
{
this.targetTable = requireNonNull(targetTable, "targetTable is null");
this.targetColumnsMetadata = requireNonNull(targetColumnsMetadata, "targetColumnsMetadata is null");
this.targetColumnHandles = requireNonNull(targetColumnHandles, "targetColumnHandles is null");
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
this.nonNullableColumnHandles = requireNonNull(nonNullableTargetColumnHandles, "nonNullableTargetColumnHandles is null");
this.columnHandleFieldNumbers = requireNonNull(targetColumnHandleFieldNumbers, "targetColumnHandleFieldNumbers is null");
this.targetTableScope = requireNonNull(targetTableScope, "targetTableScope is null");
this.joinScope = requireNonNull(joinScope, "joinScope is null");
}

public Table getTargetTable()
{
return targetTable;
}

public List<ColumnMetadata> getTargetColumnsMetadata()
{
return targetColumnsMetadata;
}

public List<ColumnHandle> getTargetColumnHandles()
{
return targetColumnHandles;
}

public List<List<ColumnHandle>> getMergeCaseColumnHandles()
{
return mergeCaseColumnHandles;
}

public Set<ColumnHandle> getNonNullableColumnHandles()
{
return nonNullableColumnHandles;
}

public Map<ColumnHandle, Integer> getColumnHandleFieldNumbers()
{
return columnHandleFieldNumbers;
}

public Scope getJoinScope()
{
return joinScope;
}

public Scope getTargetTableScope()
{
return targetTableScope;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,27 @@ static Block[] ensureBlocksAreLoaded(Block[] blocks)
// No newly loaded blocks
return blocks;
}

static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
{
int desiredLength = offsetBase + positionCount + 1;
boolean[] newIsNull = new boolean[desiredLength];
if (isNull != null) {
checkArrayRange(isNull, offsetBase, positionCount);
System.arraycopy(isNull, 0, newIsNull, 0, desiredLength - 1);
}
// mark the last element to append null
newIsNull[desiredLength - 1] = true;
return newIsNull;
}

static int[] copyOffsetsAndAppendNull(int[] offsets, int offsetBase, int positionCount)
{
int desiredLength = offsetBase + positionCount + 2;
checkArrayRange(offsets, offsetBase, positionCount + 1);
int[] newOffsets = Arrays.copyOf(offsets, desiredLength);
// Null element does not move the offset forward
newOffsets[desiredLength - 1] = newOffsets[desiredLength - 2];
return newOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,33 @@ public Block getLoadedBlock()
return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
}

public Block createProjection(Block newDictionary)
{
if (newDictionary.getPositionCount() != dictionary.getPositionCount()) {
throw new IllegalArgumentException("newDictionary must have the same position count");
}

// if the new dictionary is lazy be careful to not materialize it
if (newDictionary instanceof LazyBlock) {
return new LazyBlock(positionCount, (block) -> {
Block newDictionaryBlock = newDictionary.getBlock(0);
Block newBlock = createProjection(newDictionaryBlock);
block.setBlock(newBlock);
});
}
if (newDictionary instanceof RunLengthEncodedBlock) {
RunLengthEncodedBlock rle = (RunLengthEncodedBlock) newDictionary;
return new RunLengthEncodedBlock(rle.getValue(), positionCount);
}

// unwrap dictionary in dictionary
int[] newIds = new int[positionCount];
for (int position = 0; position < positionCount; position++) {
newIds[position] = getIdUnchecked(position);
}
return new DictionaryBlock(0, positionCount, newDictionary, newIds, false, randomDictionaryId());
}

public Block getDictionary()
{
return dictionary;
Expand All @@ -533,6 +560,11 @@ public int getId(int position)
return ids[position + idsOffset];
}

private int getIdUnchecked(int position)
{
return ids[position + idsOffset];
}

public DictionaryId getDictionarySourceId()
{
return dictionarySourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;

import static com.facebook.presto.common.block.BlockUtil.ensureBlocksAreLoaded;
import static io.airlift.slice.SizeOf.sizeOf;
Expand Down Expand Up @@ -248,6 +250,39 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
consumer.accept(this, INSTANCE_SIZE);
}

/**
* Returns the row fields from the specified block. The block maybe a LazyBlock, RunLengthEncodedBlock, or
* DictionaryBlock, but the underlying block must be a RowBlock. The returned field blocks will be the same
* length as the specified block, which means they are not null suppressed.
*/
public static List<Block> getRowFieldsFromBlock(Block block)
{
// if the block is lazy, be careful to not materialize the nested blocks
if (block instanceof LazyBlock) {
LazyBlock lazyBlock = (LazyBlock) block;
block = lazyBlock.getBlock(0);
}

if (block instanceof RunLengthEncodedBlock) {
RunLengthEncodedBlock runLengthEncodedBlock = (RunLengthEncodedBlock) block;
RowBlock rowBlock = (RowBlock) runLengthEncodedBlock.getValue();
return Arrays.stream(rowBlock.fieldBlocks)
.map(fieldBlock -> new RunLengthEncodedBlock(fieldBlock, runLengthEncodedBlock.getPositionCount()))
.collect(Collectors.toList());
}
if (block instanceof DictionaryBlock) {
DictionaryBlock dictionaryBlock = (DictionaryBlock) block;
RowBlock rowBlock = (RowBlock) dictionaryBlock.getDictionary();
return Arrays.stream(rowBlock.fieldBlocks)
.map(dictionaryBlock::createProjection)
.collect(Collectors.toList());
}
if (block instanceof RowBlock) {
return Arrays.asList(((RowBlock) block).fieldBlocks);
}
throw new IllegalArgumentException("Unexpected block type: " + block.getClass().getSimpleName());
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public byte getByte(Block block, int position)
{
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public long getLong(Block block, int position)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public byte getByte(Block block, int position)
{
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public long getLong(Block block, int position)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public long getLong(Block block, int position)
return (long) block.getByte(position);
}

@Override
public byte getByte(Block block, int position)
{
return block.getByte(position);
}

@Override
public long getLongUnchecked(UncheckedBlock block, int internalPosition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ default boolean equalValuesAreIdentical()
*/
boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition);

/**
* Gets the value at the {@code block} {@code position} as a byte.
*/
byte getByte(Block block, int position);

/**
* Gets the value at the {@code block} {@code position} as a long.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition)
return type.getBooleanUnchecked(block, internalPosition);
}

@Override
public byte getByte(Block block, int position)
{
return type.getByte(block, position);
}

@Override
public long getLong(Block block, int position)
{
Expand Down
20 changes: 16 additions & 4 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,8 @@ SQL Operation Presto Java Presto C++ Comments
``DESCRIBE`` Yes Yes

``UPDATE`` Yes No

``MERGE`` Yes No
============================== ============= ============ ============================================================================

The Iceberg connector supports querying and manipulating Iceberg tables and schemas
Expand Down Expand Up @@ -1727,11 +1729,11 @@ For example, ``DESCRIBE`` from the partitioned Iceberg table ``customer``:
comment | varchar | |
(3 rows)

UPDATE
^^^^^^
UPDATE and MERGE
^^^^^^^^^^^^^^^^

The Iceberg connector supports :doc:`../sql/update` operations on Iceberg
tables. Only some tables support updates. These tables must be at minimum format
The Iceberg connector supports :doc:`../sql/update` and :doc:`../sql/merge` operations on Iceberg
tables. Only some tables support them. These tables must be at minimum format
version 2, and the ``write.update.mode`` must be set to `merge-on-read`.

.. code-block:: sql
Expand All @@ -1751,6 +1753,16 @@ updates.

Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read

Iceberg tables do not support running multiple ``MERGE`` statements on the same table in parallel. If two or more ``MERGE`` operations are executed concurrently on the same Iceberg table:

* The first operation to complete will succeed.
* Subsequent operations will fail due to conflicting writes and will return the following error:

.. code-block:: text

Failed to commit Iceberg update to table: <table name>
Found conflicting files that can contain records matching true

Schema Evolution
----------------

Expand Down
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This chapter describes the SQL syntax used in Presto.
sql/grant
sql/grant-roles
sql/insert
sql/merge
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this commit, the merge into only supports parsing and not the full functionality. Let's move the documentation to the last commit when it's fully supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it's done.

sql/prepare
sql/refresh-materialized-view
sql/reset-session
Expand Down
Loading
Loading