
Commit c77d2a9

renurajagop authored and auden-woolfson committed
Add support for skip_header_line_count and skip_footer_line_count in hive
1 parent 84af5a1 commit c77d2a9

6 files changed: +354 -1 lines changed


presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java

Lines changed: 61 additions & 0 deletions
@@ -255,6 +255,8 @@
 import static com.facebook.presto.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP;
 import static com.facebook.presto.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.PREFERRED_ORDERING_COLUMNS;
+import static com.facebook.presto.hive.HiveTableProperties.SKIP_FOOTER_LINE_COUNT;
+import static com.facebook.presto.hive.HiveTableProperties.SKIP_HEADER_LINE_COUNT;
 import static com.facebook.presto.hive.HiveTableProperties.SORTED_BY_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
 import static com.facebook.presto.hive.HiveTableProperties.getAvroSchemaUrl;
@@ -265,6 +267,8 @@
 import static com.facebook.presto.hive.HiveTableProperties.getEncryptColumns;
 import static com.facebook.presto.hive.HiveTableProperties.getEncryptTable;
 import static com.facebook.presto.hive.HiveTableProperties.getExternalLocation;
+import static com.facebook.presto.hive.HiveTableProperties.getFooterSkipCount;
+import static com.facebook.presto.hive.HiveTableProperties.getHeaderSkipCount;
 import static com.facebook.presto.hive.HiveTableProperties.getHiveStorageFormat;
 import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterColumns;
 import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterFpp;
@@ -412,6 +416,9 @@ public class HiveMetadata
     private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR;
     private static final String CSV_ESCAPE_KEY = OpenCSVSerde.ESCAPECHAR;
 
+    public static final String SKIP_HEADER_COUNT_KEY = "skip.header.line.count";
+    public static final String SKIP_FOOTER_COUNT_KEY = "skip.footer.line.count";
+
     private static final JsonCodec<MaterializedViewDefinition> MATERIALIZED_VIEW_JSON_CODEC = jsonCodec(MaterializedViewDefinition.class);
 
     private final boolean allowCorruptWritesForTesting;
@@ -753,6 +760,12 @@ private ConnectorTableMetadata getTableMetadata(Optional<Table> table, SchemaTab
             properties.put(AVRO_SCHEMA_URL, avroSchemaUrl);
         }
 
+        // Textfile and CSV specific properties
+        getSerdeProperty(table.get(), SKIP_HEADER_COUNT_KEY)
+                .ifPresent(skipHeaderCount -> properties.put(SKIP_HEADER_LINE_COUNT, Integer.valueOf(skipHeaderCount)));
+        getSerdeProperty(table.get(), SKIP_FOOTER_COUNT_KEY)
+                .ifPresent(skipFooterCount -> properties.put(SKIP_FOOTER_LINE_COUNT, Integer.valueOf(skipFooterCount)));
+
         // CSV specific property
         getCsvSerdeProperty(table.get(), CSV_SEPARATOR_KEY)
                 .ifPresent(csvSeparator -> properties.put(CSV_SEPARATOR, csvSeparator));
@@ -1294,6 +1307,28 @@ private Map<String, String> getEmptyTableProperties(
             tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
         }
 
+        // Textfile and CSV specific properties
+        Set<HiveStorageFormat> csvAndTextFile = ImmutableSet.of(HiveStorageFormat.TEXTFILE, HiveStorageFormat.CSV);
+        getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 0) {
+                checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_HEADER_LINE_COUNT);
+                tableProperties.put(SKIP_HEADER_COUNT_KEY, String.valueOf(headerSkipCount));
+            }
+            if (headerSkipCount < 0) {
+                throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_HEADER_LINE_COUNT, headerSkipCount));
+            }
+        });
+
+        getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> {
+            if (footerSkipCount > 0) {
+                checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_FOOTER_LINE_COUNT);
+                tableProperties.put(SKIP_FOOTER_COUNT_KEY, String.valueOf(footerSkipCount));
+            }
+            if (footerSkipCount < 0) {
+                throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_FOOTER_LINE_COUNT, footerSkipCount));
+            }
+        });
+
         // CSV specific properties
         getCsvProperty(tableMetadata.getProperties(), CSV_ESCAPE)
                 .ifPresent(escape -> {
@@ -1327,6 +1362,13 @@ private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat
         }
     }
 
+    private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat, Set<HiveStorageFormat> expectedStorageFormats, String propertyName)
+    {
+        if (!expectedStorageFormats.contains(actualStorageFormat)) {
+            throw new PrestoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat));
+        }
+    }
+
     private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context)
     {
         try {
@@ -1647,7 +1689,17 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
         if (getAvroSchemaUrl(tableMetadata.getProperties()) != null) {
             throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set");
         }
+        getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 1) {
+                throw new PrestoException(NOT_SUPPORTED, format("Creating Hive table with data with value of %s property greater than 0 is not supported", SKIP_HEADER_COUNT_KEY));
+            }
+        });
 
+        getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> {
+            if (footerSkipCount > 0) {
+                throw new PrestoException(NOT_SUPPORTED, format("Creating Hive table with data with value of %s property greater than 0 is not supported", SKIP_FOOTER_COUNT_KEY));
+            }
+        });
         HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
         List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
         Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
@@ -2016,6 +2068,15 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn
             locationHandle = locationService.forExistingTable(metastore, session, table, tempPathRequired);
         }
 
+        Optional.ofNullable(table.getParameters().get(SKIP_HEADER_COUNT_KEY)).map(Integer::parseInt).ifPresent(headerSkipCount -> {
+            if (headerSkipCount > 1) {
+                throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with value of %s property greater than 1 is not supported", tableName, SKIP_HEADER_COUNT_KEY));
+            }
+        });
+        if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) {
+            throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY));
+        }
+
         Optional<? extends TableEncryptionProperties> tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.getParameters(), tableStorageFormat);
 
         HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session);

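For reference, here is a minimal, self-contained sketch of the validation pattern used in getEmptyTableProperties above. It is not the commit's code: a plain enum and Map stand in for HiveStorageFormat and the table-property machinery, and IllegalArgumentException stands in for PrestoException with HIVE_INVALID_METADATA / INVALID_TABLE_PROPERTY; the class and method names are illustrative only.

// Hypothetical stand-ins: Format replaces HiveStorageFormat, IllegalArgumentException replaces PrestoException.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public final class SkipCountValidationSketch
{
    enum Format { TEXTFILE, CSV, ORC, PARQUET }

    private static final Set<Format> TEXT_AND_CSV = Set.of(Format.TEXTFILE, Format.CSV);

    // Positive counts are only legal for text-style formats and are copied to the
    // Hive serde key; negative counts are rejected; zero or absent is a no-op.
    static void applyHeaderSkipCount(Format format, Optional<Integer> headerSkipCount, Map<String, String> serdeProperties)
    {
        headerSkipCount.ifPresent(count -> {
            if (count > 0) {
                if (!TEXT_AND_CSV.contains(format)) {
                    throw new IllegalArgumentException("Cannot specify skip_header_line_count for storage format: " + format);
                }
                serdeProperties.put("skip.header.line.count", String.valueOf(count));
            }
            if (count < 0) {
                throw new IllegalArgumentException("Invalid value for skip_header_line_count property: " + count);
            }
        });
    }

    public static void main(String[] args)
    {
        Map<String, String> serdeProperties = new HashMap<>();
        applyHeaderSkipCount(Format.TEXTFILE, Optional.of(1), serdeProperties);
        System.out.println(serdeProperties); // {skip.header.line.count=1}
        applyHeaderSkipCount(Format.ORC, Optional.of(1), serdeProperties); // throws: not a text format
    }
}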
presto-hive/src/main/java/com/facebook/presto/hive/HiveTableProperties.java

Lines changed: 14 additions & 0 deletions
@@ -57,6 +57,8 @@ public class HiveTableProperties
     public static final String CSV_SEPARATOR = "csv_separator";
     public static final String CSV_QUOTE = "csv_quote";
     public static final String CSV_ESCAPE = "csv_escape";
+    public static final String SKIP_HEADER_LINE_COUNT = "skip_header_line_count";
+    public static final String SKIP_FOOTER_LINE_COUNT = "skip_footer_line_count";
 
     private final List<PropertyMetadata<?>> tableProperties;
 
@@ -155,6 +157,8 @@ public HiveTableProperties(TypeManager typeManager, HiveClientConfig config)
                 stringProperty(CSV_SEPARATOR, "CSV separator character", null, false),
                 stringProperty(CSV_QUOTE, "CSV quote character", null, false),
                 stringProperty(CSV_ESCAPE, "CSV escape character", null, false),
+                integerProperty(SKIP_HEADER_LINE_COUNT, "Number of header lines", null, false),
+                integerProperty(SKIP_FOOTER_LINE_COUNT, "Number of footer lines", null, false),
                 new PropertyMetadata<>(
                         ENCRYPT_COLUMNS,
                         "List of key references and columns being encrypted. Example: ARRAY['key1:col1,col2', 'key2:col3,col4']",
@@ -290,4 +294,14 @@ public static ColumnEncryptionInformation getEncryptColumns(Map<String, Object>
         return tableProperties.containsKey(ENCRYPT_COLUMNS) ? (ColumnEncryptionInformation) tableProperties.get(ENCRYPT_COLUMNS) :
                 ColumnEncryptionInformation.fromMap(ImmutableMap.of());
     }
+
+    public static Optional<Integer> getHeaderSkipCount(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT));
+    }
+
+    public static Optional<Integer> getFooterSkipCount(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Integer) tableProperties.get(SKIP_FOOTER_LINE_COUNT));
+    }
 }

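As a quick illustration of the getter semantics added above, a standalone sketch assuming the connector hands property values around as an untyped Map<String, Object>: an absent key yields Optional.empty(), a present key is cast back to the Integer stored by integerProperty(). The class and main method here are illustrative, not part of the commit.

import java.util.Map;
import java.util.Optional;

public final class TablePropertyLookupSketch
{
    public static final String SKIP_HEADER_LINE_COUNT = "skip_header_line_count";

    // Mirrors getHeaderSkipCount from the hunk above.
    static Optional<Integer> getHeaderSkipCount(Map<String, Object> tableProperties)
    {
        return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT));
    }

    public static void main(String[] args)
    {
        System.out.println(getHeaderSkipCount(Map.of()));                          // Optional.empty
        System.out.println(getHeaderSkipCount(Map.of(SKIP_HEADER_LINE_COUNT, 2))); // Optional[2]
    }
}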
presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java

Lines changed: 74 additions & 0 deletions
@@ -45,6 +45,7 @@
 import com.google.common.primitives.Shorts;
 import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HadoopExtendedFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,10 +54,14 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.TextRecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -75,6 +80,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -90,6 +96,7 @@
 import org.apache.hive.common.util.ReflectionUtil;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -117,13 +124,15 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.Float.intBitsToFloat;
+import static java.lang.Integer.parseInt;
 import static java.lang.Math.toIntExact;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.UUID.randomUUID;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.createCompressedStream;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
@@ -159,6 +168,11 @@ private HiveWriteUtils()
     }
 
     public static RecordWriter createRecordWriter(Path target, JobConf conf, Properties properties, String outputFormatName, ConnectorSession session)
+    {
+        return createRecordWriter(target, conf, properties, outputFormatName, session, Optional.empty());
+    }
+
+    public static RecordWriter createRecordWriter(Path target, JobConf conf, Properties properties, String outputFormatName, ConnectorSession session, Optional<TextCSVHeaderWriter> textCSVHeaderWriter)
     {
         try {
             boolean compress = HiveConf.getBoolVar(conf, COMPRESSRESULT);
@@ -168,6 +182,9 @@ public static RecordWriter createRecordWriter(Path target, JobConf conf, Propert
             if (outputFormatName.equals(MapredParquetOutputFormat.class.getName())) {
                 return createParquetWriter(target, conf, properties, compress, session);
             }
+            if (outputFormatName.equals(HiveIgnoreKeyTextOutputFormat.class.getName())) {
+                return createTextCsvFileWriter(target, conf, properties, compress, textCSVHeaderWriter);
+            }
             Object writer = Class.forName(outputFormatName).getConstructor().newInstance();
             return ((HiveOutputFormat<?, ?>) writer).getHiveRecordWriter(conf, target, Text.class, compress, properties, Reporter.NULL);
         }
@@ -218,6 +235,63 @@ public void close(boolean abort)
         };
     }
 
+    private static RecordWriter createTextCsvFileWriter(Path target, JobConf conf, Properties properties, boolean compress, Optional<TextCSVHeaderWriter> textCSVHeaderWriter)
+            throws IOException
+    {
+        String rowSeparatorString = properties.getProperty(serdeConstants.LINE_DELIM, "\n");
+
+        int rowSeparatorByte;
+        try {
+            rowSeparatorByte = Byte.parseByte(rowSeparatorString);
+        }
+        catch (NumberFormatException e) {
+            rowSeparatorByte = rowSeparatorString.charAt(0);
+        }
+
+        FSDataOutputStream output = target.getFileSystem(conf).create(target, Reporter.NULL);
+        OutputStream compressedOutput = createCompressedStream(conf, output, compress);
+        TextRecordWriter writer = new TextRecordWriter();
+        writer.initialize(compressedOutput, conf);
+        Optional<String> skipHeaderLine = Optional.ofNullable(properties.getProperty("skip.header.line.count"));
+        if (skipHeaderLine.isPresent()) {
+            if (parseInt(skipHeaderLine.get()) == 1) {
+                textCSVHeaderWriter
+                        .orElseThrow(() -> new IllegalArgumentException("TextHeaderWriter must not be empty when skip.header.line.count is set to 1"))
+                        .write(compressedOutput, rowSeparatorByte);
+            }
+        }
+        int finalRowSeparatorByte = rowSeparatorByte;
+        return new ExtendedRecordWriter()
+        {
+            private long length;
+
+            @Override
+            public long getWrittenBytes()
+            {
+                return length;
+            }
+
+            @Override
+            public void write(Writable value)
+                    throws IOException
+            {
+                BinaryComparable binary = (BinaryComparable) value;
+                compressedOutput.write(binary.getBytes(), 0, binary.getLength());
+                compressedOutput.write(finalRowSeparatorByte);
+            }
+
+            @Override
+            public void close(boolean abort)
+                    throws IOException
+            {
+                writer.close();
+                if (!abort) {
+                    length = target.getFileSystem(conf).getFileStatus(target).getLen();
+                }
+            }
+        };
+    }
+
     public static Serializer initializeSerializer(Configuration conf, Properties properties, String serializerName)
     {
         try {

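The row-separator handling in createTextCsvFileWriter is worth calling out: Hive may store the line delimiter either as a numeric byte value (for example "10") or as a literal character (for example "\n"), so the code tries Byte.parseByte first and falls back to the first character's code point. A standalone sketch of just that step follows; the class and method names are illustrative, not from the commit.

public final class RowSeparatorSketch
{
    // Same fallback logic as the hunk above, isolated for clarity.
    static int resolveRowSeparator(String rowSeparatorString)
    {
        try {
            return Byte.parseByte(rowSeparatorString);
        }
        catch (NumberFormatException e) {
            return rowSeparatorString.charAt(0);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(resolveRowSeparator("10"));  // 10, numeric form
        System.out.println(resolveRowSeparator("\n"));  // 10, literal newline
        System.out.println(resolveRowSeparator("|"));   // 124
    }
}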
presto-hive/src/main/java/com/facebook/presto/hive/RecordFileWriter.java

Lines changed: 9 additions & 1 deletion
@@ -26,6 +26,7 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -100,11 +101,18 @@ public RecordFileWriter(
             serDe = OptimizedLazyBinaryColumnarSerde.class.getName();
         }
         serializer = initializeSerializer(conf, schema, serDe);
-        recordWriter = createRecordWriter(path, conf, schema, storageFormat.getOutputFormat(), session);
 
         List<ObjectInspector> objectInspectors = getRowColumnInspectors(fileColumnTypes);
         tableInspector = getStandardStructObjectInspector(fileColumnNames, objectInspectors);
 
+        if (storageFormat.getOutputFormat().equals(HiveIgnoreKeyTextOutputFormat.class.getName())) {
+            Optional<TextCSVHeaderWriter> textHeaderWriter = Optional.of(new TextCSVHeaderWriter(serializer, typeManager, session, fileColumnNames));
+            recordWriter = createRecordWriter(path, conf, schema, storageFormat.getOutputFormat(), session, textHeaderWriter);
+        }
+        else {
+            recordWriter = createRecordWriter(path, conf, schema, storageFormat.getOutputFormat(), session, Optional.empty());
+        }
+
         // reorder (and possibly reduce) struct fields to match input
         structFields = ImmutableList.copyOf(inputColumnNames.stream()
                 .map(tableInspector::getStructFieldRef)

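A reduced sketch of the dispatch introduced in RecordFileWriter: only the text output format (shared by TEXTFILE and CSV tables) is handed a header writer, every other format keeps the previous code path. The HeaderWriter interface below is a hypothetical stand-in for TextCSVHeaderWriter, which is part of this commit but not shown in the diff excerpt above.

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;

public final class HeaderWriterDispatchSketch
{
    // Stand-in for TextCSVHeaderWriter: writes one header line followed by the row separator.
    interface HeaderWriter
    {
        void write(OutputStream out, int rowSeparator) throws IOException;
    }

    static final String TEXT_OUTPUT_FORMAT = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";

    // Only text/CSV output gets a header writer; all other formats pass Optional.empty(),
    // matching the if/else added to the RecordFileWriter constructor.
    static Optional<HeaderWriter> headerWriterFor(String outputFormatName, HeaderWriter textHeaderWriter)
    {
        if (outputFormatName.equals(TEXT_OUTPUT_FORMAT)) {
            return Optional.of(textHeaderWriter);
        }
        return Optional.empty();
    }
}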