Skip to content

Commit

Permalink
Merge pull request #1280 from cloudsufi/csm1779
Browse files Browse the repository at this point in the history
PLUGIN-1671 - Time Partitioning Type Support in BigQuery Sink Plugin
  • Loading branch information
vikasrathee-cs authored Aug 28, 2023
2 parents b46c5df + e5a0706 commit 698be58
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 16 deletions.
3 changes: 3 additions & 0 deletions docs/BigQueryTable-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ is ignored if the table already exists.
* When this is set to Integer, table will be created with range partitioning.
* When this is set to None, table will be created without time partitioning.

**Time Partitioning Type**: Specifies the time partitioning type. Can be Daily, Hourly, Monthly, or Yearly.
Default is Daily. Ignored when the table already exists.

**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
exist already, and partitioning type is set to Integer.
* The start value is inclusive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ public void commitJob(JobContext jobContext) throws IOException {
LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
LOG.debug("Create Partitioned Table type: '{}'", partitionType);
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType = conf.getEnum(
BigQueryConstants.CONFIG_TIME_PARTITIONING_TYPE, com.google.cloud.bigquery.TimePartitioning.Type.DAY
);
Range range = partitionType == PartitionType.INTEGER ? createRangeForIntegerPartitioning(conf) : null;
String partitionByField = conf.get(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, null);
LOG.debug("Partition Field: '{}'", partitionByField);
Expand All @@ -254,7 +257,7 @@ public void commitJob(JobContext jobContext) throws IOException {

try {
importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
writeDisposition, sourceUris, partitionType, range, partitionByField,
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, conf);
} catch (Exception e) {
throw new IOException("Failed to import GCS into BigQuery. ", e);
Expand Down Expand Up @@ -298,8 +301,9 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOExcepti
*/
private void importFromGcs(String projectId, TableReference tableRef, @Nullable TableSchema schema,
@Nullable String kmsKeyName, BigQueryFileFormat sourceFormat, String writeDisposition,
List<String> gcsPaths, PartitionType partitionType, @Nullable Range range,
@Nullable String partitionByField, boolean requirePartitionFilter,
List<String> gcsPaths, PartitionType partitionType,
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType,
@Nullable Range range, @Nullable String partitionByField, boolean requirePartitionFilter,
List<String> clusteringOrderList, boolean tableExists, Configuration conf)
throws IOException, InterruptedException {
LOG.info("Importing into table '{}' from {} paths; path[0] is '{}'; awaitCompletion: {}",
Expand Down Expand Up @@ -357,7 +361,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
if (!tableExists) {
switch (partitionType) {
case TIME:
TimePartitioning timePartitioning = createTimePartitioning(partitionByField, requirePartitionFilter);
TimePartitioning timePartitioning = createTimePartitioning(partitionByField, requirePartitionFilter,
timePartitioningType);
loadConfig.setTimePartitioning(timePartitioning);
break;
case INTEGER:
Expand Down Expand Up @@ -756,9 +761,10 @@ private Range createRangeForIntegerPartitioning(Configuration conf) {
}

private TimePartitioning createTimePartitioning(
@Nullable String partitionByField, boolean requirePartitionFilter) {
@Nullable String partitionByField, boolean requirePartitionFilter,
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType) {
TimePartitioning timePartitioning = new TimePartitioning();
timePartitioning.setType("DAY");
timePartitioning.setType(timePartitioningType.name());
if (partitionByField != null) {
timePartitioning.setField(partitionByField);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
Expand Down Expand Up @@ -316,6 +317,9 @@ private void configureBigQuerySink() {
PartitionType partitioningType = getConfig().getPartitioningType();
baseConfiguration.setEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, partitioningType);

TimePartitioning.Type timePartitioningType = getConfig().getTimePartitioningType();
baseConfiguration.setEnum(BigQueryConstants.CONFIG_TIME_PARTITIONING_TYPE, timePartitioningType);

if (config.getRangeStart() != null) {
baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, config.getRangeStart());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
public static final String NAME_OPERATION = "operation";
public static final String PARTITION_FILTER = "partitionFilter";
public static final String NAME_PARTITIONING_TYPE = "partitioningType";
public static final String NAME_TIME_PARTITIONING_TYPE = "timePartitioningType";
public static final String NAME_RANGE_START = "rangeStart";
public static final String NAME_RANGE_END = "rangeEnd";
public static final String NAME_RANGE_INTERVAL = "rangeInterval";
Expand Down Expand Up @@ -102,6 +103,13 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
+ "Ignored when table already exists")
protected String partitioningType;

@Name(NAME_TIME_PARTITIONING_TYPE)
@Macro
@Nullable
@Description("Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly. "
+ "Ignored when table already exists")
protected String timePartitioningType;

@Name(NAME_RANGE_START)
@Macro
@Nullable
Expand Down Expand Up @@ -269,6 +277,11 @@ public PartitionType getPartitioningType() {
: PartitionType.valueOf(partitioningType.toUpperCase());
}

/**
 * Returns the time partitioning type configured for the sink.
 *
 * @return {@link TimePartitioning.Type#DAY} when the property is null or empty; otherwise the constant whose
 *   name matches the configured value, ignoring case
 * @throws IllegalArgumentException if the configured value does not name a {@link TimePartitioning.Type} constant
 */
public TimePartitioning.Type getTimePartitioningType() {
  if (Strings.isNullOrEmpty(timePartitioningType)) {
    return TimePartitioning.Type.DAY;
  }
  // Locale.ROOT keeps the upper-casing independent of the JVM's default locale, so enum lookup is stable.
  return TimePartitioning.Type.valueOf(timePartitioningType.toUpperCase(java.util.Locale.ROOT));
}

/**
* @return the schema of the dataset
*/
Expand Down Expand Up @@ -458,7 +471,7 @@ private void validateColumnForPartition(@Nullable String columnName, @Nullable S
Schema fieldSchema = field.getSchema();
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
if (partitioningType == PartitionType.TIME) {
validateTimePartitioningColumn(columnName, collector, fieldSchema);
validateTimePartitioningColumn(columnName, collector, fieldSchema, getTimePartitioningType());
} else if (partitioningType == PartitionType.INTEGER) {
validateIntegerPartitioningColumn(columnName, collector, fieldSchema);
validateIntegerPartitioningRange(getRangeStart(), getRangeEnd(), getRangeInterval(), collector);
Expand All @@ -474,15 +487,30 @@ private void validateIntegerPartitioningColumn(String columnName, FailureCollect
}
}

private void validateTimePartitioningColumn(String columnName, FailureCollector collector, Schema fieldSchema) {
private void validateTimePartitioningColumn(String columnName, FailureCollector collector,
Schema fieldSchema, TimePartitioning.Type timePartitioningType) {

Schema.LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != LogicalType.DATE && logicalType != LogicalType.TIMESTAMP_MICROS
&& logicalType != LogicalType.TIMESTAMP_MILLIS) {

boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
boolean isDate = logicalType == LogicalType.DATE;
boolean isTimestampOrDate = isTimestamp || isDate;

// HOUR partitioning requires a timestamp column (TIMESTAMP_MICROS or TIMESTAMP_MILLIS); DATE is not allowed.
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()),
"Partition column must be a date or timestamp.")
.withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);

// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
} else if (!isTimestampOrDate) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,9 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s
* @return Time Partitioning configuration
*/
protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
// Create time partitioning based on DAY
TimePartitioning.Builder timePartitioningBuilder = TimePartitioning.newBuilder(TimePartitioning.Type.DAY);

// Default partitioning type is DAY if not specified
TimePartitioning.Builder timePartitioningBuilder = TimePartitioning.newBuilder(config.getTimePartitioningType());

// Set partition field if specified
if (config.getPartitionByField() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public interface BigQueryConstants {
String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project";
String CONFIG_VIEW_MATERIALIZATION_DATASET = "cdap.bq.source.view.materialization.dataset";
String CONFIG_PARTITION_TYPE = "cdap.bq.sink.partition.type";
String CONFIG_TIME_PARTITIONING_TYPE = "cdap.bq.sink.time.partitioning.type";
String CONFIG_PARTITION_INTEGER_RANGE_START = "cdap.bq.sink.partition.integer.range.start";
String CONFIG_PARTITION_INTEGER_RANGE_END = "cdap.bq.sink.partition.integer.range.end";
String CONFIG_PARTITION_INTEGER_RANGE_INTERVAL = "cdap.bq.sink.partition.integer.range.interval";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.TimePartitioning;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Tests for the time-partitioning column validation in {@link BigQuerySinkConfig}.
 *
 * <p>The method under test, {@code validateTimePartitioningColumn}, is private, so it is looked up once per test
 * in {@link #setup()} and invoked reflectively.
 */
public class BigQuerySinkConfigTest {
  private static final String COLUMN_NAME = "partitionFrom";
  private static final String TIMESTAMP_ONLY_ACTION = "Partition column must be a timestamp.";
  private static final String DATE_OR_TIMESTAMP_ACTION = "Partition column must be a date or timestamp.";

  private MockFailureCollector collector;
  private BigQuerySinkConfig config;
  private Method validateTimePartitioningColumnMethod;

  @Before
  public void setup() throws NoSuchMethodException {
    collector = new MockFailureCollector();
    config = BigQuerySinkConfig.builder().build();
    validateTimePartitioningColumnMethod = BigQuerySinkConfig.class
      .getDeclaredMethod("validateTimePartitioningColumn", String.class, FailureCollector.class,
                         Schema.class, TimePartitioning.Type.class);
    validateTimePartitioningColumnMethod.setAccessible(true);
  }

  @Test
  public void testValidateTimePartitioningColumnWithHourAndDate() throws
    InvocationTargetException, IllegalAccessException {
    // Use a real DATE schema so the HOUR-specific rejection is what gets exercised.
    Schema fieldSchema = Schema.of(Schema.LogicalType.DATE);

    validate(fieldSchema, TimePartitioning.Type.HOUR);

    assertSingleFailure(fieldSchema, TIMESTAMP_ONLY_ACTION);
  }

  @Test
  public void testValidateTimePartitioningColumnWithHourAndTimestamp() throws
    InvocationTargetException, IllegalAccessException {
    validate(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS), TimePartitioning.Type.HOUR);

    Assert.assertEquals(0, collector.getValidationFailures().size());
  }

  @Test
  public void testValidateTimePartitioningColumnWithDayAndString() throws
    InvocationTargetException, IllegalAccessException {
    Schema fieldSchema = Schema.of(Schema.Type.STRING);

    validate(fieldSchema, TimePartitioning.Type.DAY);

    assertSingleFailure(fieldSchema, DATE_OR_TIMESTAMP_ACTION);
  }

  @Test
  public void testValidateTimePartitioningColumnWithDayAndDate() throws
    InvocationTargetException, IllegalAccessException {
    validate(Schema.of(Schema.LogicalType.DATE), TimePartitioning.Type.DAY);

    Assert.assertEquals(0, collector.getValidationFailures().size());
  }

  @Test
  public void testValidateTimePartitioningColumnWithNullAndDate() throws
    InvocationTargetException, IllegalAccessException {
    // A null type is not HOUR, so the date-or-timestamp rule applies and a DATE column is accepted.
    validate(Schema.of(Schema.LogicalType.DATE), null);

    Assert.assertEquals(0, collector.getValidationFailures().size());
  }

  /**
   * Reflectively invokes the private validation method for {@link #COLUMN_NAME} with the shared collector.
   */
  private void validate(Schema fieldSchema, TimePartitioning.Type timePartitioningType) throws
    InvocationTargetException, IllegalAccessException {
    validateTimePartitioningColumnMethod.invoke(config, COLUMN_NAME, collector, fieldSchema, timePartitioningType);
  }

  /**
   * Asserts that exactly one failure was collected, with the invalid-type message for the given schema and the
   * expected corrective action. Both rejection paths share the same message, so the corrective action is what
   * tells them apart.
   */
  private void assertSingleFailure(Schema fieldSchema, String expectedCorrectiveAction) {
    Assert.assertEquals(1, collector.getValidationFailures().size());
    Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.",
                                      COLUMN_NAME, fieldSchema.getDisplayName()),
                        collector.getValidationFailures().get(0).getMessage());
    Assert.assertEquals(expectedCorrectiveAction,
                        collector.getValidationFailures().get(0).getCorrectiveAction());
  }
}
39 changes: 39 additions & 0 deletions widgets/BigQueryTable-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,33 @@
]
}
},
{
"widget-type": "radio-group",
"label": "Time Partitioning Type",
"name": "timePartitioningType",
"widget-attributes": {
"layout": "inline",
"default": "DAY",
"options": [
{
"id": "DAY",
"label": "Daily"
},
{
"id": "HOUR",
"label": "Hourly"
},
{
"id": "MONTH",
"label": "Monthly"
},
{
"id": "YEAR",
"label": "Yearly"
}
]
}
},
{
"widget-type": "Number",
"label": "Range Start (inclusive)",
Expand Down Expand Up @@ -375,6 +402,18 @@
}
]
},
{
"name": "PartitioningTimeFieldsFilter",
"condition": {
"expression": "partitioningType == 'TIME'"
},
"show": [
{
"type": "property",
"name": "timePartitioningType"
}
]
},
{
"name": "PartitionFieldFilter",
"condition": {
Expand Down

0 comments on commit 698be58

Please sign in to comment.