Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.spi.data.TimeGranularitySpec;

import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
Expand All @@ -52,15 +53,20 @@ private PinotColumnUtils()

public static List<PinotColumn> getPinotColumnsForPinotSchema(Schema pinotTableSchema, boolean inferDateType, boolean inferTimestampType)
{
return getPinotColumnsForPinotSchema(pinotTableSchema, inferDateType, inferTimestampType, false);
return getPinotColumnsForPinotSchema(pinotTableSchema, inferDateType, inferTimestampType, false, false);
}

public static List<PinotColumn> getPinotColumnsForPinotSchema(Schema pinotTableSchema, boolean inferDateType, boolean inferTimestampType, boolean nullHandlingEnabled)
public static List<PinotColumn> getPinotColumnsForPinotSchema(
Schema pinotTableSchema,
boolean inferDateType,
boolean inferTimestampType,
boolean nullHandlingEnabled,
boolean isCaseSensitiveNameMatchingEnabled)
{
return pinotTableSchema.getColumnNames().stream()
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in SQL
.map(columnName -> new PinotColumn(
columnName,
isCaseSensitiveNameMatchingEnabled ? columnName : columnName.toLowerCase(Locale.ROOT),
getPrestoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName), inferDateType, inferTimestampType),
isNullableColumnFromPinotType(pinotTableSchema.getFieldSpecFor(columnName), nullHandlingEnabled),
getCommentFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public List<PinotColumn> load(String tableName)
throws Exception
{
Schema tablePinotSchema = pinotClusterInfoFetcher.getTableSchema(tableName);
return PinotColumnUtils.getPinotColumnsForPinotSchema(tablePinotSchema, pinotConfig.isInferDateTypeInSchema(), pinotConfig.isInferTimestampTypeInSchema(), nullHandlingEnabled);
return PinotColumnUtils.getPinotColumnsForPinotSchema(tablePinotSchema, pinotConfig.isInferDateTypeInSchema(),
pinotConfig.isInferTimestampTypeInSchema(),
nullHandlingEnabled,
pinotConfig.isCaseSensitiveNameMatchingEnabled());
}
}, executor));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ public List<String> listSchemaNames(ConnectorSession session)
return ImmutableList.of("default");
}

private String getPinotTableNameFromPrestoTableName(String prestoTableName)
private String getPinotTableNameFromPrestoTableName(ConnectorSession session, String prestoTableName)
{
List<String> allTables = pinotPrestoConnection.getTableNames();
String normalizedPrestoTableName = normalizeIdentifier(session, prestoTableName);
for (String pinotTableName : allTables) {
if (prestoTableName.equals(pinotTableName)) {
String normalizedPinotTableName = normalizeIdentifier(session, pinotTableName);
if (normalizedPrestoTableName.equals(normalizedPinotTableName)) {
return pinotTableName;
}
}
Expand All @@ -77,7 +79,7 @@ private String getPinotTableNameFromPrestoTableName(String prestoTableName)
@Override
public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
String pinotTableName = getPinotTableNameFromPrestoTableName(tableName.getTableName());
String pinotTableName = getPinotTableNameFromPrestoTableName(session, tableName.getTableName());
return new PinotTableHandle(connectorId, tableName.getSchemaName(), pinotTableName);
}

Expand Down Expand Up @@ -107,7 +109,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
checkArgument(pinotTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
SchemaTableName tableName = new SchemaTableName(pinotTableHandle.getSchemaName(), pinotTableHandle.getTableName());

return getTableMetadata(tableName);
return getTableMetadata(session, tableName);
}

@Override
Expand All @@ -126,7 +128,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
PinotTableHandle pinotTableHandle = (PinotTableHandle) tableHandle;
checkArgument(pinotTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");

String pinotTableName = getPinotTableNameFromPrestoTableName(pinotTableHandle.getTableName());
String pinotTableName = getPinotTableNameFromPrestoTableName(session, pinotTableHandle.getTableName());
PinotTable table = pinotPrestoConnection.getTable(pinotTableName);
if (table == null) {
throw new TableNotFoundException(pinotTableHandle.toSchemaTableName());
Expand All @@ -145,7 +147,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
requireNonNull(prefix, "prefix is null");
ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName tableName : listTables(session, prefix)) {
ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableName);
// table can disappear during listing operation
if (tableMetadata != null) {
columns.put(tableName, tableMetadata.getColumns());
Expand All @@ -154,9 +156,9 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.build();
}

private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
{
String pinotTableName = getPinotTableNameFromPrestoTableName(tableName.getTableName());
String pinotTableName = getPinotTableNameFromPrestoTableName(session, tableName.getTableName());
PinotTable table = pinotPrestoConnection.getTable(pinotTableName);
if (table == null) {
return null;
Expand Down
Loading