
Commit c890440

renurajagop authored and auden-woolfson committed
Add support for skip_header_line_count and skip_footer_line_count in hive
1 parent a2c5b8f commit c890440

File tree

7 files changed (+368, -5 lines)

presto-docs/src/main/sphinx/connector/hive.rst

Lines changed: 8 additions & 0 deletions
@@ -253,6 +253,14 @@ Property Name Description
 reading an Avro-formatted table. If specified, Presto will fetch            override schema)
 and use this schema instead of relying on any schema in the
 Metastore.
+
+``skip_header_line_count``    Number of header lines to skip when reading CSV or TEXTFILE tables.    None (ignored if not set). Must be non-negative. Only valid for
+                              When set to ``1``, a header row will be written when creating new      CSV and TEXTFILE formats. Values greater than ``0`` are not
+                              CSV or TEXTFILE tables.                                                 supported for ``CREATE TABLE AS`` or ``INSERT`` operations.
+
+``skip_footer_line_count``    Number of footer lines to skip when reading CSV or TEXTFILE tables.    None (ignored if not set). Must be non-negative. Only valid for
+                              Cannot be used when inserting into a table.                             CSV and TEXTFILE formats. Values greater than ``0`` are not
+                                                                                                      supported for ``CREATE TABLE AS`` or ``INSERT`` operations.
 ======================================================== ============================================================================== ======================================================================================
 
 Hive Metastore Configuration for Avro
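
For orientation, the new table properties are exercised through ordinary DDL. Below is a minimal usage sketch via the Presto JDBC driver; the coordinator URL, user, catalog, schema, and table name are placeholders, not part of this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SkipHeaderExample
{
    public static void main(String[] args)
            throws Exception
    {
        // Placeholder coordinator URL, user, catalog, and schema.
        try (Connection connection = DriverManager.getConnection("jdbc:presto://localhost:8080/hive/default", "demo_user", null);
                Statement statement = connection.createStatement()) {
            // Table over text files whose first line is a header; reads skip that line.
            statement.execute("CREATE TABLE events (name VARCHAR, value BIGINT) " +
                    "WITH (format = 'TEXTFILE', skip_header_line_count = 1)");
            // With a header count of exactly 1, writes stay legal and each new file
            // gets a header row; a footer count, or a header count above 1, makes
            // INSERT and CREATE TABLE AS fail with NOT_SUPPORTED.
            statement.execute("INSERT INTO events VALUES ('clicks', 42)");
        }
    }
}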

presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java

Lines changed: 67 additions & 4 deletions
@@ -235,10 +235,12 @@
 import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
 import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
 import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
+import static com.facebook.presto.hive.HiveStorageFormat.CSV;
 import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
 import static com.facebook.presto.hive.HiveStorageFormat.ORC;
 import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
 import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
+import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
 import static com.facebook.presto.hive.HiveStorageFormat.values;
 import static com.facebook.presto.hive.HiveTableProperties.AVRO_SCHEMA_URL;
 import static com.facebook.presto.hive.HiveTableProperties.BUCKETED_BY_PROPERTY;
@@ -255,6 +257,8 @@
 import static com.facebook.presto.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP;
 import static com.facebook.presto.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.PREFERRED_ORDERING_COLUMNS;
+import static com.facebook.presto.hive.HiveTableProperties.SKIP_FOOTER_LINE_COUNT;
+import static com.facebook.presto.hive.HiveTableProperties.SKIP_HEADER_LINE_COUNT;
 import static com.facebook.presto.hive.HiveTableProperties.SORTED_BY_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.getAvroSchemaUrl;
@@ -265,6 +269,8 @@
 import static com.facebook.presto.hive.HiveTableProperties.getEncryptColumns;
 import static com.facebook.presto.hive.HiveTableProperties.getEncryptTable;
 import static com.facebook.presto.hive.HiveTableProperties.getExternalLocation;
+import static com.facebook.presto.hive.HiveTableProperties.getFooterSkipCount;
+import static com.facebook.presto.hive.HiveTableProperties.getHeaderSkipCount;
 import static com.facebook.presto.hive.HiveTableProperties.getHiveStorageFormat;
 import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterColumns;
 import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterFpp;
@@ -412,6 +418,9 @@ public class HiveMetadata
     private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR;
     private static final String CSV_ESCAPE_KEY = OpenCSVSerde.ESCAPECHAR;
 
+    public static final String SKIP_HEADER_COUNT_KEY = "skip.header.line.count";
+    public static final String SKIP_FOOTER_COUNT_KEY = "skip.footer.line.count";
+
     private static final JsonCodec<MaterializedViewDefinition> MATERIALIZED_VIEW_JSON_CODEC = jsonCodec(MaterializedViewDefinition.class);
 
     private final boolean allowCorruptWritesForTesting;
@@ -753,6 +762,12 @@ private ConnectorTableMetadata getTableMetadata(Optional<Table> table, SchemaTab
             properties.put(AVRO_SCHEMA_URL, avroSchemaUrl);
         }
 
+        // Textfile and CSV specific properties
+        getSerdeProperty(table.get(), SKIP_HEADER_COUNT_KEY)
+                .ifPresent(skipHeaderCount -> properties.put(SKIP_HEADER_LINE_COUNT, Integer.valueOf(skipHeaderCount)));
+        getSerdeProperty(table.get(), SKIP_FOOTER_COUNT_KEY)
+                .ifPresent(skipFooterCount -> properties.put(SKIP_FOOTER_LINE_COUNT, Integer.valueOf(skipFooterCount)));
+
         // CSV specific property
         getCsvSerdeProperty(table.get(), CSV_SEPARATOR_KEY)
                 .ifPresent(csvSeparator -> properties.put(CSV_SEPARATOR, csvSeparator));
@@ -1294,20 +1309,42 @@ private Map<String, String> getEmptyTableProperties(
             tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
         }
 
+        // Textfile and CSV specific properties
+        Set<HiveStorageFormat> csvAndTextFile = ImmutableSet.of(TEXTFILE, CSV);
+        getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 0) {
+                checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_HEADER_LINE_COUNT);
+                tableProperties.put(SKIP_HEADER_COUNT_KEY, String.valueOf(headerSkipCount));
+            }
+            if (headerSkipCount < 0) {
+                throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_HEADER_LINE_COUNT, headerSkipCount));
+            }
+        });
+
+        getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> {
+            if (footerSkipCount > 0) {
+                checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_FOOTER_LINE_COUNT);
+                tableProperties.put(SKIP_FOOTER_COUNT_KEY, String.valueOf(footerSkipCount));
+            }
+            if (footerSkipCount < 0) {
+                throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_FOOTER_LINE_COUNT, footerSkipCount));
+            }
+        });
+
         // CSV specific properties
         getCsvProperty(tableMetadata.getProperties(), CSV_ESCAPE)
                 .ifPresent(escape -> {
-                    checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_ESCAPE);
+                    checkFormatForProperty(hiveStorageFormat, CSV, CSV_ESCAPE);
                     tableProperties.put(CSV_ESCAPE_KEY, escape.toString());
                 });
         getCsvProperty(tableMetadata.getProperties(), CSV_QUOTE)
                 .ifPresent(quote -> {
-                    checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_QUOTE);
+                    checkFormatForProperty(hiveStorageFormat, CSV, CSV_QUOTE);
                     tableProperties.put(CSV_QUOTE_KEY, quote.toString());
                 });
         getCsvProperty(tableMetadata.getProperties(), CSV_SEPARATOR)
                 .ifPresent(separator -> {
-                    checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_SEPARATOR);
+                    checkFormatForProperty(hiveStorageFormat, CSV, CSV_SEPARATOR);
                     tableProperties.put(CSV_SEPARATOR_KEY, separator.toString());
                 });

@@ -1327,6 +1364,13 @@ private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat
         }
     }
 
+    private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat, Set<HiveStorageFormat> expectedStorageFormats, String propertyName)
+    {
+        if (!expectedStorageFormats.contains(actualStorageFormat)) {
+            throw new PrestoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat));
+        }
+    }
+
     private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context)
     {
         try {
@@ -1647,7 +1691,17 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
         if (getAvroSchemaUrl(tableMetadata.getProperties()) != null) {
             throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set");
         }
+        getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 0) {
+                throw new PrestoException(NOT_SUPPORTED, format("Creating Hive table with data with value of %s property greater than 0 is not supported", SKIP_HEADER_COUNT_KEY));
+            }
+        });
 
+        getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> {
+            if (footerSkipCount > 0) {
+                throw new PrestoException(NOT_SUPPORTED, format("Creating Hive table with data with value of %s property greater than 0 is not supported", SKIP_FOOTER_COUNT_KEY));
+            }
+        });
         HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
         List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
         Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
@@ -2016,6 +2070,15 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn
             locationHandle = locationService.forExistingTable(metastore, session, table, tempPathRequired);
         }
 
+        Optional.ofNullable(table.getParameters().get(SKIP_HEADER_COUNT_KEY)).map(Integer::parseInt).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 1) {
+                throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with value of %s property greater than 1 is not supported", tableName, SKIP_HEADER_COUNT_KEY));
+            }
+        });
+        if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) {
+            throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY));
+        }
+
         Optional<? extends TableEncryptionProperties> tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.getParameters(), tableStorageFormat);
 
         HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session);
@@ -3676,7 +3739,7 @@ else if (column.isHidden()) {
 
     private static void validateCsvColumns(ConnectorTableMetadata tableMetadata)
     {
-        if (getHiveStorageFormat(tableMetadata.getProperties()) != HiveStorageFormat.CSV) {
+        if (getHiveStorageFormat(tableMetadata.getProperties()) != CSV) {
             return;
         }
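
The read path in getTableMetadata above relies on getSerdeProperty, a helper that predates this commit and does not appear in the diff. A plausible sketch of such a lookup follows; the precedence between serde-level and table-level parameters here is an assumption, not taken from the commit.

// Hypothetical sketch; the real HiveMetadata helper may reconcile
// table-level and serde-level values differently (e.g. reject conflicts).
private static Optional<String> getSerdeProperty(Table table, String key)
{
    // Prefer the serde parameter from the storage descriptor.
    String serdeValue = table.getStorage().getSerdeParameters().get(key);
    if (serdeValue != null) {
        return Optional.of(serdeValue);
    }
    // Fall back to the table-level parameter map.
    return Optional.ofNullable(table.getParameters().get(key));
}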

presto-hive/src/main/java/com/facebook/presto/hive/HiveTableProperties.java

Lines changed: 14 additions & 0 deletions
@@ -57,6 +57,8 @@ public class HiveTableProperties
     public static final String CSV_SEPARATOR = "csv_separator";
     public static final String CSV_QUOTE = "csv_quote";
    public static final String CSV_ESCAPE = "csv_escape";
+    public static final String SKIP_HEADER_LINE_COUNT = "skip_header_line_count";
+    public static final String SKIP_FOOTER_LINE_COUNT = "skip_footer_line_count";
 
     private final List<PropertyMetadata<?>> tableProperties;

@@ -155,6 +157,8 @@ public HiveTableProperties(TypeManager typeManager, HiveClientConfig config)
                 stringProperty(CSV_SEPARATOR, "CSV separator character", null, false),
                 stringProperty(CSV_QUOTE, "CSV quote character", null, false),
                 stringProperty(CSV_ESCAPE, "CSV escape character", null, false),
+                integerProperty(SKIP_HEADER_LINE_COUNT, "Number of header lines", null, false),
+                integerProperty(SKIP_FOOTER_LINE_COUNT, "Number of footer lines", null, false),
                 new PropertyMetadata<>(
                         ENCRYPT_COLUMNS,
                         "List of key references and columns being encrypted. Example: ARRAY['key1:col1,col2', 'key2:col3,col4']",
@@ -290,4 +294,14 @@ public static ColumnEncryptionInformation getEncryptColumns(Map<String, Object>
         return tableProperties.containsKey(ENCRYPT_COLUMNS) ? (ColumnEncryptionInformation) tableProperties.get(ENCRYPT_COLUMNS) :
                 ColumnEncryptionInformation.fromMap(ImmutableMap.of());
     }
+
+    public static Optional<Integer> getHeaderSkipCount(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT));
+    }
+
+    public static Optional<Integer> getFooterSkipCount(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Integer) tableProperties.get(SKIP_FOOTER_LINE_COUNT));
+    }
 }

presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java

Lines changed: 73 additions & 0 deletions
@@ -45,6 +45,7 @@
 import com.google.common.primitives.Shorts;
 import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HadoopExtendedFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,10 +54,13 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.TextRecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -75,6 +79,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -90,6 +95,7 @@
 import org.apache.hive.common.util.ReflectionUtil;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -117,13 +123,15 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.Float.intBitsToFloat;
+import static java.lang.Integer.parseInt;
 import static java.lang.Math.toIntExact;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.UUID.randomUUID;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.createCompressedStream;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
@@ -159,6 +167,11 @@ private HiveWriteUtils()
     }
 
     public static RecordWriter createRecordWriter(Path target, JobConf conf, Properties properties, String outputFormatName, ConnectorSession session)
+    {
+        return createRecordWriter(target, conf, properties, outputFormatName, session, Optional.empty());
+    }
+
+    public static RecordWriter createRecordWriter(Path target, JobConf conf, Properties properties, String outputFormatName, ConnectorSession session, Optional<TextCSVHeaderWriter> textCSVHeaderWriter)
     {
         try {
             boolean compress = HiveConf.getBoolVar(conf, COMPRESSRESULT);
@@ -168,6 +181,9 @@ public static RecordWriter createRecordWriter(Path target, JobConf conf, Propert
         if (outputFormatName.equals(MapredParquetOutputFormat.class.getName())) {
             return createParquetWriter(target, conf, properties, compress, session);
         }
+        if (outputFormatName.equals(HiveIgnoreKeyTextOutputFormat.class.getName())) {
+            return createTextCsvFileWriter(target, conf, properties, compress, textCSVHeaderWriter);
+        }
         Object writer = Class.forName(outputFormatName).getConstructor().newInstance();
         return ((HiveOutputFormat<?, ?>) writer).getHiveRecordWriter(conf, target, Text.class, compress, properties, Reporter.NULL);
     }
@@ -218,6 +234,63 @@ public void close(boolean abort)
         };
     }
 
+    private static RecordWriter createTextCsvFileWriter(Path target, JobConf conf, Properties properties, boolean compress, Optional<TextCSVHeaderWriter> textCSVHeaderWriter)
+            throws IOException
+    {
+        String rowSeparatorString = properties.getProperty(serdeConstants.LINE_DELIM, "\n");
+
+        int rowSeparatorByte;
+        try {
+            rowSeparatorByte = Byte.parseByte(rowSeparatorString);
+        }
+        catch (NumberFormatException e) {
+            rowSeparatorByte = rowSeparatorString.charAt(0);
+        }
+
+        FSDataOutputStream output = target.getFileSystem(conf).create(target, Reporter.NULL);
+        OutputStream compressedOutput = createCompressedStream(conf, output, compress);
+        TextRecordWriter writer = new TextRecordWriter();
+        writer.initialize(compressedOutput, conf);
+        Optional<String> skipHeaderLine = Optional.ofNullable(properties.getProperty("skip.header.line.count"));
+        if (skipHeaderLine.isPresent()) {
+            if (parseInt(skipHeaderLine.get()) == 1) {
+                textCSVHeaderWriter
+                        .orElseThrow(() -> new IllegalArgumentException("TextCSVHeaderWriter must not be empty when skip.header.line.count is set to 1"))
+                        .write(compressedOutput, rowSeparatorByte);
+            }
+        }
+        int finalRowSeparatorByte = rowSeparatorByte;
+        return new ExtendedRecordWriter()
+        {
+            private long length;
+
+            @Override
+            public long getWrittenBytes()
+            {
+                return length;
+            }
+
+            @Override
+            public void write(Writable value)
+                    throws IOException
+            {
+                BinaryComparable binary = (BinaryComparable) value;
+                compressedOutput.write(binary.getBytes(), 0, binary.getLength());
+                compressedOutput.write(finalRowSeparatorByte);
+            }
+
+            @Override
+            public void close(boolean abort)
+                    throws IOException
+            {
+                writer.close();
+                if (!abort) {
+                    length = target.getFileSystem(conf).getFileStatus(target).getLen();
+                }
+            }
+        };
+    }
+
     public static Serializer initializeSerializer(Configuration conf, Properties properties, String serializerName)
     {
         try {
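
createTextCsvFileWriter delegates header emission to TextCSVHeaderWriter, one of the seven changed files but outside this excerpt; only its write(OutputStream, int) call is visible above. A minimal sketch of what such a header writer might look like, assuming it is constructed with the column names and the field delimiter (both hypothetical here):

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

// Hypothetical shape of the header writer; the commit's actual
// TextCSVHeaderWriter is not shown in this excerpt.
public class TextCSVHeaderWriter
{
    private final List<String> columnNames;
    private final byte fieldDelimiter;

    public TextCSVHeaderWriter(List<String> columnNames, byte fieldDelimiter)
    {
        this.columnNames = requireNonNull(columnNames, "columnNames is null");
        this.fieldDelimiter = fieldDelimiter;
    }

    public void write(OutputStream out, int rowSeparatorByte)
            throws IOException
    {
        // Emit the column names as the first line of the file, before any
        // data rows are written through the record writer.
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) {
                out.write(fieldDelimiter);
            }
            out.write(columnNames.get(i).getBytes(UTF_8));
        }
        out.write(rowSeparatorByte);
    }
}

The real class presumably derives the header from the table schema, possibly through the table's serializer; this sketch simply joins the column names with the field delimiter.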
