diff --git a/h2o-core/src/main/java/water/api/FramesHandler.java b/h2o-core/src/main/java/water/api/FramesHandler.java
index 58207669a5ba..5ac14ff7eb10 100644
--- a/h2o-core/src/main/java/water/api/FramesHandler.java
+++ b/h2o-core/src/main/java/water/api/FramesHandler.java
@@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
       if (s.parallel) {
         Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
       }
-      s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression));
+      s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
     } else {
       Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
         .setSeparator(s.separator)
diff --git a/h2o-core/src/main/java/water/api/schemas3/FramesV3.java b/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
index 2f812957f5bd..b95c91458b12 100644
--- a/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
+++ b/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
@@ -50,6 +50,9 @@ public class FramesV3 extends RequestSchemaV3 {
   @API(help="Compression method (default none; gzip, bzip2 and snappy available depending on runtime environment)")
   public String compression;
 
+  @API(help="Specifies whether a checksum should be written next to the data files on export (if supported by the export format).")
+  public boolean write_checksum = true;
+
   @API(help="Field separator (default ',')")
   public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;
 
diff --git a/h2o-core/src/main/java/water/fvec/Frame.java b/h2o-core/src/main/java/water/fvec/Frame.java
index 5ced07405610..9f66399a518a 100644
--- a/h2o-core/src/main/java/water/fvec/Frame.java
+++ b/h2o-core/src/main/java/water/fvec/Frame.java
@@ -1605,7 +1605,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr
     return job.start(t, fr.anyVec().nChunks());
   }
 
-  public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression) {
+  public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) {
     // Validate input
     if (! H2O.getPM().isEmptyDirectoryAllNodes(path)) {
       throw new H2OIllegalArgumentException(path, "exportFrame", "Cannot use path " + path +
@@ -1626,7 +1626,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String
     }
     Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset");
-    H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression);
+    H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum);
     return job.start(t, fr.anyVec().nChunks());
   }
 
diff --git a/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java b/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java
index 9db1d5062bd3..f90fc9803735 100644
--- a/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java
+++ b/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java
@@ -6,7 +6,7 @@
 
 public interface BinaryFormatExporter {
 
-  H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression);
+  H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum);
 
   boolean supports(ExportFileFormat format);
 }
diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
index 28adff1d1be4..8c35cc72c8b8 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
@@ -1,6 +1,8 @@
 package water.parser.parquet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -28,9 +30,19 @@ public class FrameParquetExporter {
 
-  public void export(H2O.H2OCountedCompleter completer, String path, Frame frame, boolean force, String compression) {
+  public void export(H2O.H2OCountedCompleter completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum) {
     File f = new File(path);
-    new FrameParquetExporter.PartExportParquetTask(completer, f.getPath(), generateMessageTypeString(frame), frame.names(), frame.types(), frame.domains(), force, compression).dfork(frame);
+    new FrameParquetExporter.PartExportParquetTask(
+        completer,
+        f.getPath(),
+        generateMessageTypeString(frame),
+        frame.names(),
+        frame.types(),
+        frame.domains(),
+        force,
+        compression,
+        writeChecksum
+    ).dfork(frame);
   }
 
   private static class PartExportParquetTask extends MRTask<PartExportParquetTask> {
@@ -41,9 +53,11 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
     final byte[] _colTypes;
     final String[][] _domains;
     final boolean _force;
+    final boolean _writeChecksum;
 
     PartExportParquetTask(H2O.H2OCountedCompleter completer, String path, String messageTypeString,
-                          String[] colNames, byte[] colTypes, String[][] domains, boolean force, String compression) {
+                          String[] colNames, byte[] colTypes, String[][] domains,
+                          boolean force, String compression, boolean writeChecksum) {
       super(completer);
       _path = path;
       _compressionCodecName = getCompressionCodecName(compression);
@@ -52,6 +66,7 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
       _colTypes = colTypes;
       _domains = domains;
       _force = force;
+      _writeChecksum = writeChecksum;
     }
 
     CompressionCodecName getCompressionCodecName(String compression) {
@@ -82,7 +97,7 @@ public void map(Chunk[] cs) {
       String partPath = _path + "/part-m-" + String.valueOf(100000 + partIdx).substring(1);
       SimpleGroupFactory fact = new SimpleGroupFactory(parseMessageType(_messageTypeString));
 
-      try (ParquetWriter writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force))) {
+      try (ParquetWriter writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force), _writeChecksum)) {
         String currColName;
         byte currColType;
 
@@ -122,34 +137,40 @@ public void map(Chunk[] cs) {
   }
 
   private static String generateMessageTypeString(Frame frame) {
-    String message_txt = "message test { ";
+    StringBuilder mb = new StringBuilder("message export_type { ");
     String currName;
     for (int i = 0; i < frame.numCols(); i++) {
       currName = frame._names[i];
       switch (frame.types()[i]) {
         case (T_TIME):
-          message_txt = message_txt.concat("optional int64 ").concat(currName).concat(" (TIMESTAMP_MILLIS);");
+          mb.append("optional int64 ").append(currName).append(" (TIMESTAMP_MILLIS);");
           break;
         case (T_NUM):
         case (T_BAD):
-          message_txt = message_txt.concat("optional double ").concat(currName).concat("; ");
+          mb.append("optional double ").append(currName).append("; ");
          break;
         case (T_STR):
         case (T_CAT):
-          message_txt = message_txt.concat("optional BINARY ").concat(currName).concat(" (UTF8); ");
+          mb.append("optional BINARY ").append(currName).append(" (UTF8); ");
          break;
         case (T_UUID):
-          message_txt = message_txt.concat("optional fixed_len_byte_array(16) ").concat(currName).concat(" (UUID); ");
+          mb.append("optional fixed_len_byte_array(16) ").append(currName).append(" (UUID); ");
          break;
       }
     }
-    message_txt = message_txt.concat("} ");
-    return message_txt;
+    mb.append("} ");
+    return mb.toString();
   }
 
-  private static ParquetWriter buildWriter(Path file, CompressionCodecName compressionCodecName, Configuration configuration, MessageType _schema, ParquetFileWriter.Mode mode) throws IOException {
-    GroupWriteSupport.setSchema(_schema, configuration);
-    return new ParquetWriter.Builder(file) {
+  private static ParquetWriter buildWriter(Path path, CompressionCodecName compressionCodecName, Configuration configuration, MessageType schema, ParquetFileWriter.Mode mode, boolean writeChecksum) throws IOException {
+    GroupWriteSupport.setSchema(schema, configuration);
+
+    // The filesystem is cached for a given path and configuration, so the following modification of the fs
+    // is a bit hacky, as another process could be using the same instance. However, given the current use case,
+    // and the fact that the change impacts only the way files are written, it should be on the safe side.
+    FileSystem fs = path.getFileSystem(configuration);
+    fs.setWriteChecksum(writeChecksum);
+    return new ParquetWriter.Builder(path) {
       @Override
       protected ParquetWriter.Builder self() {
         return this;
diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
index 6211e6e210b1..76559c20eb15 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
@@ -8,16 +8,13 @@ public class ParquetExporter implements BinaryFormatExporter {
 
   @Override
-  public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression) {
-    return new ExportParquetDriver(frame, path, force, compression);
+  public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum) {
+    return new ExportParquetDriver(frame, path, force, compression, writeChecksum);
   }
 
   @Override
   public boolean supports(ExportFileFormat format) {
-    if (ExportFileFormat.parquet.equals(format)) {
-      return true;
-    }
-    return false;
+    return ExportFileFormat.parquet.equals(format);
   }
 
   private class ExportParquetDriver extends H2O.H2OCountedCompleter<ExportParquetDriver> {
@@ -26,19 +23,21 @@ private class ExportParquetDriver extends H2O.H2OCountedCompleter<ExportParquetDriver> {
 
     Frame _frame;
     String _path;
     boolean _force;
     String _compression;
+    boolean _writeChecksum;
 
-    public ExportParquetDriver(Frame frame, String path, boolean force, String compression) {
+    public ExportParquetDriver(Frame frame, String path, boolean force, String compression, boolean writeChecksum) {
       _frame = frame;
       _path = path;
       _force = force;
       _compression = compression;
+      _writeChecksum = writeChecksum;
     }
 
     @Override
     public void compute2() {
       // multipart export
       FrameParquetExporter parquetExporter = new FrameParquetExporter();
-      parquetExporter.export(this, _path, _frame, _force, _compression);
+      parquetExporter.export(this, _path, _frame, _force, _compression, _writeChecksum);
     }
   }
diff --git a/h2o-r/tests/testdir_misc/runit_export_file.R b/h2o-r/tests/testdir_misc/runit_export_file.R
--- a/h2o-r/tests/testdir_misc/runit_export_file.R
+++ b/h2o-r/tests/testdir_misc/runit_export_file.R
@@ -42,10 +42,14 @@ test.export.file <- function(parts) {
   print(head(H.pred))
   expect_equal(R.pred, H.pred)
+  return(dname)
 }
 
-test.export.file.single <- function() test.export.file(1)
-test.export.file.multipart <- function() test.export.file(2)
+test.export.file.csv.single <- function() test.export.file(1)
+test.export.file.csv.multipart <- function() test.export.file(2)
+
+doSuite("Testing Exporting Files CSV", makeSuite(
+  test.export.file.csv.single,
+  test.export.file.csv.multipart
+))
-doTest("Testing Exporting Files (single file)", test.export.file.single)
-doTest("Testing Exporting Files (part files)", test.export.file.multipart)
diff --git a/h2o-r/tests/testdir_misc/runit_export_file_parquet.R b/h2o-r/tests/testdir_misc/runit_export_file_parquet.R
new file mode 100644
index 000000000000..640df80dd85d
--- /dev/null
+++ b/h2o-r/tests/testdir_misc/runit_export_file_parquet.R
@@ -0,0 +1,55 @@
+setwd(normalizePath(dirname(R.utils::commandArgs(asValues=TRUE)$"f")))
+source("../../scripts/h2o-r-test-setup.R")
+
+
+# Export a file with h2o.exportFile and compare with the R counterpart when re-importing the file to check for parity.
+
+
+test.export.file <- function(path, write_checksum = TRUE) {
+  data <- h2o.uploadFile(locate(path))
+
+  fname <- paste(paste0(sample(letters, 3, replace = TRUE), collapse = ""),
+                 paste0(sample(0:9, 3, replace = TRUE), collapse = ""), paste0(data$id, ".parquet"), sep = "_")
+  dname <- file.path(sandbox(), fname)
+
+  Log.info("Exporting File...")
+  h2o.exportFile(data, dname, format = "parquet", force = TRUE, write_checksum = write_checksum)
+
+  files <- list.files(dname, full.names = TRUE)
+  print(files)
+
+  Log.info("Comparing file with R...")
+  rfiles <- ifelse(length(files) > 1, list.files(dname, full.names = TRUE), dname)
+  Log.info(sprintf("Results stored in files: %s", paste(rfiles, collapse = ", ")))
+
+  imported <- h2o.importFolder(path = dname, pattern = "part-m-")
+
+  if (length(files) == 1) {
+    expect_equal(imported, data)
+  } else {
+    expect_equal(mean(imported), mean(data))
+  }
+  return(dname)
+}
+
+test.export.file.prostate <- function() test.export.file("smalldata/prostate/prostate.csv")
+test.export.file.airquality_train1 <- function() test.export.file("smalldata/testng/airquality_train1.csv")
+test.export.file.autoclaims <- function() test.export.file("smalldata/gbm_test/autoclaims.csv")
+test.export.file.item_demand <- function() test.export.file("smalldata/demos/item_demand.csv")
+
+test.export.file.titanic_expanded <- function() {
+  export_dir <- test.export.file("smalldata/titanic/titanic_expanded.csv")
+  expect_gt(length(list.files(path = export_dir, pattern = "\\.crc$", all.files = TRUE, full.names = FALSE)), 0)
+}
+test.export.file.titanic_expanded.no_checksum <- function() {
+  export_dir <- test.export.file("smalldata/titanic/titanic_expanded.csv", write_checksum = FALSE)
+  expect_equal(length(list.files(path = export_dir, pattern = "\\.crc$", all.files = TRUE, full.names = FALSE)), 0)
+}
+doSuite("Testing Exporting Parquet Files", makeSuite(
+  test.export.file.prostate,
+  test.export.file.airquality_train1,
+  test.export.file.autoclaims,
+  test.export.file.item_demand,
+  test.export.file.titanic_expanded,
+  test.export.file.titanic_expanded.no_checksum
+))
diff --git a/h2o-r/tests/testdir_misc/runit_export_parquet_multipart.R b/h2o-r/tests/testdir_misc/runit_export_parquet_multipart.R
deleted file mode 100644
index fc0fd0bdf478..000000000000
--- a/h2o-r/tests/testdir_misc/runit_export_parquet_multipart.R
+++ /dev/null
@@ -1,45 +0,0 @@
-setwd(normalizePath(dirname(R.utils::commandArgs(asValues=TRUE)$"f")))
-source("../../scripts/h2o-r-test-setup.R")
-
-
-#Export file with h2o.export_file and compare with R counterpart when re importing file to check for parity.
-
-
-test.export.file <- function(path) {
-  data <- h2o.uploadFile(locate(path))
-
-  fname <- paste(paste0(sample(letters, 3, replace = TRUE), collapse = ""),
-                 paste0(sample(0:9, 3, replace = TRUE), collapse = ""), "prostate.parquet", sep = "_")
-  dname <- file.path(sandbox(), fname)
-
-  Log.info("Exporting File...")
-  h2o.exportFile(data, dname, format = "parquet", force=TRUE)
-
-  files <- list.files(dname, full.names = TRUE)
-  print(files)
-
-  Log.info("Comparing file with R...")
-  rfiles <- ifelse( length(files) > 1, list.files(dname, full.names = TRUE), dname)
-  Log.info(sprintf("Results stored in files: %s", paste(rfiles, collapse = ", ")))
-
-  imported <- h2o.importFolder(path = dname, pattern = "part-m-")
-
-  if (length(files) == 1) {
-    expect_equal(imported, data)
-  } else {
-    expect_equal(mean(imported), mean(data))
-  }
-
-}
-
- test.export.file.prostate <- function() test.export.file("smalldata/prostate/prostate.csv")
- test.export.file.titanic_expanded <- function() test.export.file("smalldata/titanic/titanic_expanded.csv")
- test.export.file.airquality_train1 <- function() test.export.file("smalldata/testng/airquality_train1.csv")
- test.export.file.autoclaims <- function() test.export.file("smalldata/gbm_test/autoclaims.csv")
- test.export.file.item_demand <- function() test.export.file("smalldata/demos/item_demand.csv")
-
- doTest("Testing Exporting Parquet Files (prostate)", test.export.file.prostate)
- doTest("Testing Exporting Parquet Files (titanic_expanded)", test.export.file.titanic_expanded)
- doTest("Testing Exporting Parquet Files (airquality_train1)", test.export.file.airquality_train1)
- doTest("Testing Exporting Parquet Files (autoclaims)", test.export.file.autoclaims)
- doTest("Testing Exporting Parquet Files (item_demand)", test.export.file.item_demand)
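---
Usage sketch (not part of the patch): a minimal R illustration of the new flag, assuming the R client wrapper exposes write_checksum on h2o.exportFile the same way the new runit test calls it; the dataset and output paths below are illustrative, not taken from this PR.

    library(h2o)
    h2o.init()

    df <- h2o.importFile("smalldata/prostate/prostate.csv")  # illustrative dataset

    # Default (write_checksum = TRUE): Hadoop-style hidden .crc checksum files are
    # written next to the part-m-* files in the export directory.
    h2o.exportFile(df, "/tmp/prostate_parquet", format = "parquet", force = TRUE)

    # New behavior: suppress the .crc files, e.g. for downstream readers that do
    # not expect checksum siblings next to the parquet parts.
    h2o.exportFile(df, "/tmp/prostate_parquet_nocrc", format = "parquet", force = TRUE,
                   write_checksum = FALSE)

Design note: the flag maps to Hadoop's FileSystem.setWriteChecksum, which is why the diff warns that mutating the cached FileSystem instance is shared state; it only affects how files are written, so the patch treats it as acceptable.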