diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
index c743546ca..31ca4dc2b 100644
--- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
@@ -13,4 +13,11 @@ public static SparkSession withConfAndKryo(SparkConf conf) {
                 .config(SparkConfHelper.withKryo(conf))
                 .getOrCreate();
     }
+
+    public static SparkSession withHiveEnabledConfAndKryo(SparkConf conf) {
+        return SparkSession.builder()
+                .config(SparkConfHelper.withKryo(conf))
+                .enableHiveSupport()
+                .getOrCreate();
+    }
 }
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
index bef8d45d6..d15cd60fc 100644
--- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
@@ -36,6 +36,24 @@ public static void runWithSparkSession(SparkConf conf,
                 job);
     }
 
+    /**
+     * Runs a given job using SparkSession created using default builder and supplied SparkConf. Enables Hive support.
+     * Stops SparkSession when SparkSession is shared e.g. created in tests. Allows to reuse SparkSession created externally.
+     *
+     * @param conf SparkConf instance
+     * @param isSparkSessionShared When true will not stop SparkSession
+     * @param job Job using constructed SparkSession
+     */
+    public static void runWithHiveEnabledSparkSession(SparkConf conf,
+                                                      Boolean isSparkSessionShared,
+                                                      Job job) {
+        runWithSparkSession(
+                SparkSessionFactory::withHiveEnabledConfAndKryo,
+                conf,
+                isSparkSessionShared,
+                job);
+    }
+
     /**
      * Runs a given job using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
      * when SparkSession is shared e.g. created in tests. Allows to reuse SparkSession created externally.
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
index bb7b20593..0723d0385 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
@@ -6,6 +6,8 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.apache.spark.sql.SparkSession.Builder;
+
 import java.util.Objects;
 
 /**
@@ -15,7 +17,22 @@ public class TestWithSharedSparkSession {
     private static SparkSession _spark;
 
     protected boolean initialized = false;
+    protected final boolean hiveSupportEnabled;
 
+    /**
+     * Default constructor with hive support in spark session disabled.
+     */
+    public TestWithSharedSparkSession() {
+        this(false);
+    }
+
+    /**
+     * @param hiveSupportEnabled indicates whether hive support should be enabled in spark session
+     */
+    public TestWithSharedSparkSession(boolean hiveSupportEnabled) {
+        this.hiveSupportEnabled = hiveSupportEnabled;
+    }
+
     public SparkSession spark() {
         return _spark;
     }
@@ -31,7 +48,11 @@ public void beforeEach() {
         conf.set("spark.driver.host", "localhost");
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator");
-        _spark = SparkSession.builder().config(conf).getOrCreate();
+        Builder builder = SparkSession.builder().config(conf);
+        if (hiveSupportEnabled) {
+            builder = builder.enableHiveSupport();
+        }
+        _spark = builder.getOrCreate();
     }
 }
diff --git a/iis-wf/iis-wf-import/pom.xml b/iis-wf/iis-wf-import/pom.xml
index a3af82cb7..f6c86dac2 100644
--- a/iis-wf/iis-wf-import/pom.xml
+++ b/iis-wf/iis-wf-import/pom.xml
@@ -132,6 +132,18 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-avro_2.11</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.11</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>eigenbase</groupId>
+                    <artifactId>eigenbase-properties</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
new file mode 100644
index 000000000..687158df1
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
@@ -0,0 +1,109 @@
+package eu.dnetlib.iis.wf.importer.content;
+
+import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithHiveEnabledSparkSession;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.concat;
+import static org.apache.spark.sql.functions.lit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.iis.common.InfoSpaceConstants;
+import eu.dnetlib.iis.common.java.io.HdfsUtils;
+import eu.dnetlib.iis.common.report.ReportEntryFactory;
+import eu.dnetlib.iis.common.schemas.ReportEntry;
+import eu.dnetlib.iis.common.spark.SparkConfHelper;
+import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport;
+import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl;
+import pl.edu.icm.sparkutils.avro.SparkAvroSaver;
+
+
+/**
+ * {@link DocumentContentUrl} importer reading data from a hive table being a part of the pdf aggregation subsystem.
+ *
+ * @author mhorst
+ *
+ */
+public class HiveBasedDocumentContentUrlImporterJob {
+
+    private static SparkAvroSaver avroSaver = new SparkAvroSaver();
+
+    private static final String COUNTER_IMPORTED_RECORDS_TOTAL = "import.content.urls.fromAggregator";
+
+    public static void main(String[] args) throws Exception {
+
+        HiveBasedDocumentContentUrlImporterJobParameters params = new HiveBasedDocumentContentUrlImporterJobParameters();
+        JCommander jcommander = new JCommander(params);
+        jcommander.parse(args);
+
+        SparkConf conf = SparkConfHelper.withKryo(new SparkConf());
+        if (!StringUtils.isEmpty(params.hiveMetastoreUris)) {
+            conf.set("hive.metastore.uris", params.hiveMetastoreUris);
+        }
+
+        runWithHiveEnabledSparkSession(conf, params.isSparkSessionShared, sparkSession -> {
+
+            HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath);
+            HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath);
+
+            Dataset<Row> result = sparkSession.sql("select id, location, mimetype, size, hash from " +
+                    params.inputTableName + " where location is not null");
+
+            JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession);
+            documentContentUrl.cache();
+
+            JavaRDD<ReportEntry> reports = generateReportEntries(sparkSession, documentContentUrl.count());
+
+            avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath);
+            avroSaver.saveJavaRDD(reports, ReportEntry.SCHEMA$, params.outputReportPath);
+        });
+    }
+
+    private static JavaRDD<ReportEntry> generateReportEntries(SparkSession sparkSession, long recordsCount) {
+        return sparkSession.createDataset(
+                Lists.newArrayList(
+                        ReportEntryFactory.createCounterReportEntry(COUNTER_IMPORTED_RECORDS_TOTAL, recordsCount)),
+                Encoders.kryo(ReportEntry.class)).javaRDD();
+    }
+
+    private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source, SparkSession spark) {
+        Dataset<Row> resultDs = source.select(
+                concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"),
+                col("location").as("url"),
+                col("mimetype").as("mimeType"),
+                col("size").cast("long").divide(1024).as("contentSizeKB"),
+                col("hash").as("contentChecksum")
+        );
+        return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD();
+    }
+
+    @Parameters(separators = "=")
+    private static class HiveBasedDocumentContentUrlImporterJobParameters {
+
+        @Parameter(names = "-sharedSparkSession")
+        private Boolean isSparkSessionShared = Boolean.FALSE;
+
+        @Parameter(names = "-inputTableName", required = true)
+        private String inputTableName;
+
+        @Parameter(names = "-hiveMetastoreUris", required = true)
+        private String hiveMetastoreUris;
+
+        @Parameter(names = "-outputPath", required = true)
+        private String outputPath;
+
+        @Parameter(names = "-outputReportPath", required = true)
+        private String outputReportPath;
+    }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
index 72c9899d8..6cb19bab5 100644
--- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
+++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
@@ -1,5 +1,6 @@
 ## This is a classpath-based import file (this header is required)
 import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core/oozie_app
+import_content_url_core_parquet classpath
eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app import_content_url_dedup classpath eu/dnetlib/iis/wf/importer/content_url/dedup/oozie_app transformers_idreplacer classpath eu/dnetlib/iis/wf/transformers/idreplacer/oozie_app transformers_common_existencefilter classpath eu/dnetlib/iis/wf/transformers/common/existencefilter/oozie_app diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml index 1ff3212ff..e0d417adb 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml @@ -2,25 +2,40 @@ + + + input_table_name + $UNDEFINED$ + hive input table name + + + hive_metastore_uris + $UNDEFINED$ + hive metastore locations + + + objectstore_facade_factory_classname eu.dnetlib.iis.wf.importer.facade.WebServiceObjectStoreFacadeFactory ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ObjectStoreFacade - - objectstore_service_location - object store service location to retrieve PDF/text contents from - - - approved_objectstores_csv - $UNDEFINED$ - CSV of approved object stores - - - blacklisted_objectstores_csv - $UNDEFINED$ - CSV of blacklisted object stores - + + objectstore_service_location + $UNDEFINED$ + object store service location to retrieve PDF/text contents from + + + approved_objectstores_csv + $UNDEFINED$ + CSV of approved object stores + + + blacklisted_objectstores_csv + $UNDEFINED$ + CSV of blacklisted object stores + + mimetypes_pdf pdf mime types @@ -146,9 +161,16 @@ org.apache.avro.Schema.Type.STRING - + + + + + ${input_table_name eq "$UNDEFINED$"} + + + @@ -169,6 +191,25 @@ + + + ${wf:appPath()}/import_content_url_core_parquet + + + + workingDir + ${workingDir}/import_content_url_core/working_dir + + + output + ${workingDir}/imported-urls + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper @@ -239,7 +280,7 @@ - ${input_id eq "$UNDEFINED$"} + ${input_id eq "$UNDEFINED$"} diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml new file mode 100644 index 000000000..d60bb44e9 --- /dev/null +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml @@ -0,0 +1,116 @@ + + + + + input_table_name + hive input table name + + + hive_metastore_uris + hive metastore locations + + + + output + DocumentContentUrl avro datastore output + + + output_report_root_path + base directory for storing reports + + + output_report_relative_path + import_content_url_from_aggregator + directory for storing report (relative to output_report_root_path) + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + 
spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + yarn-cluster + cluster + import-content-url_core + eu.dnetlib.iis.wf.importer.content.HiveBasedDocumentContentUrlImporterJob + ${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + -inputTableName=${input_table_name} + -hiveMetastoreUris=${hive_metastore_uris} + -outputPath=${output} + -outputReportPath=${output_report_root_path}/${output_report_relative_path} + + + + + + + Unfortunately, the process failed -- error message: + [${wf:errorMessage(wf:lastErrorNode())}] + + + + + + diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java new file mode 100644 index 000000000..8ed3129ae --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java @@ -0,0 +1,128 @@ +package eu.dnetlib.iis.wf.importer.content; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.file.Path; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import eu.dnetlib.iis.common.ClassPathResourceProvider; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.utils.AvroAssertTestUtil; +import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl; + +/** + * @author mhorst + * + */ +public class HiveBasedDocumentContentUrlImporterJobTest extends TestWithSharedSparkSession { + + @TempDir + public Path workingDir; + + private Path outputDir; + private Path outputReportDir; + + private HiveBasedDocumentContentUrlImporterJobTest() { + // enabling hive support in spark session instantiated for this set of tests + super(true); + } + + @BeforeEach + public void beforeEach() { + super.beforeEach(); + + outputDir = workingDir.resolve("output"); + outputReportDir = workingDir.resolve("output_report"); + } + + @Test + public void testImportFromNonExistingTable() throws Exception { + // when + String inputTableName = "nonExistingTable"; + String hiveMetastoreUris = "localhost"; + + assertThrows(RuntimeException.class, () -> HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + })); + } + + @Test + public void testImportFromExistingLocalTable() throws Exception { + // when + String inputDbName = "test_aggregation_import_db"; + String inputTableName = "agg_payload"; + String hiveMetastoreUris = ""; + + // 
initializing test database + initializeLocalDatabase(inputDbName, inputTableName); + spark().sql("INSERT INTO TABLE " + inputDbName + '.' + inputTableName + + " VALUES ('od______2367::0001a50c6388e9bfcb791a924ec4b837', " + + "'s3://some_location', " + + "'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612')"); + + HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputDbName + '.' + inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + }); + + // then + String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json"); + String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json"); + + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class); + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class); + + } + + @Test + public void testImportFromEmptyLocalTable() throws Exception { + // when + String inputDbName = "test_aggregation_import_db"; + String inputTableName = "agg_payload"; + String hiveMetastoreUris = ""; + + // initializing test database + initializeLocalDatabase(inputDbName, inputTableName); + + HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputDbName + '.' + inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + }); + + // then + String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json"); + String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json"); + + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class); + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class); + + } + + /** + * Initializes local hive database. + * @param inputDbName database name to be initialized + * @param inputTableName table to name to be recreated + */ + private void initializeLocalDatabase(String inputDbName, String inputTableName) { + spark().sql("CREATE DATABASE IF NOT EXISTS " + inputDbName); + spark().sql("DROP TABLE IF EXISTS " + inputDbName + '.' + inputTableName); + spark().sql("CREATE TABLE " + inputDbName + '.' 
+ inputTableName + + "(id string, location string, mimetype string, size string, hash string)" + + " STORED AS PARQUET"); + } + +} diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java index 8b6c0abf4..cecb34ecc 100644 --- a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java @@ -2,6 +2,8 @@ import eu.dnetlib.iis.common.AbstractOozieWorkflowTestCase; import eu.dnetlib.iis.common.OozieWorkflowTestConfiguration; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -13,10 +15,17 @@ public class WorkflowTest extends AbstractOozieWorkflowTestCase { @Test - public void testImportContentUrlWorkflow() { + public void testImportContentUrlWorkflowWithObjectStoreAsBackend() { OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); wfConf.setTimeoutInSeconds(720); - testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/sampletest", wfConf); + testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer", wfConf); + } + + @Disabled("TODO reenable this test once Hive is installed on CI Test hadoop cluster") + public void testImportContentUrlWorkflowWithHiveBasedAggregationSubsystemAsBackend() { + OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); + wfConf.setTimeoutInSeconds(1440); + testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer", wfConf); } @Test diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json new file mode 100644 index 000000000..0823f9e2c --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json @@ -0,0 +1 @@ +{"id": "50|od______2367::0001a50c6388e9bfcb791a924ec4b837", "url": "s3://some_location", "mimeType": "application/pdf", "contentChecksum": "3a0fb023de6c8735a52ac9c1edace612", "contentSizeKB": 1} \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json new file mode 100644 index 000000000..e69de29bb diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json new file mode 100644 index 000000000..a878e26e6 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json @@ -0,0 +1 @@ +{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "1"} diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json new file mode 100644 index 000000000..6a23fd53f --- /dev/null +++ 
b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json @@ -0,0 +1 @@ +{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "0"} diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt similarity index 100% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q new file mode 100644 index 000000000..a2d330944 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q @@ -0,0 +1,11 @@ +CREATE DATABASE IF NOT EXISTS ${DB_NAME}; + +DROP TABLE IF EXISTS ${DB_NAME}.${TABLE_NAME}; + +CREATE TABLE ${DB_NAME}.${TABLE_NAME} (id string, location string, mimetype string, size string, hash string) STORED AS PARQUET; + +INSERT INTO TABLE ${DB_NAME}.${TABLE_NAME} VALUES + ('doaj10829873::0092cca2055a02f51c0b21b332395b09', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750&objectId=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750', 'text/html', '38912', '19a53e9ebb0c878ad088568a8cf23dc6'), + ('od______2367::0001a50c6388e9bfcb791a924ec4b837', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70&objectId=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70', 'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612'), + ('dedup1______::000015093c397516c0b1b000f38982de', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=webcrawl____%3A%3A000015093c397516c0b1b000f38982de&objectId=webcrawl____%3A%3A000015093c397516c0b1b000f38982de', 'file::WoS', '6144', 'a9af919ba339a1491a922a7fe7b2580f'), + ('od_______908::00005ab6b308ff8f0f5415be03d8cce9', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9&objectId=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9', 'xml', '7168', '464ac8be9b33e109e98596799cf9d05d'); \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q new file mode 100644 index 000000000..13cee61fc --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q @@ -0,0 +1,2 @@ +DROP TABLE ${DB_NAME}.${TABLE_NAME}; +DROP DATABASE ${DB_NAME}; \ No newline at end of file diff --git 
a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml new file mode 100644 index 000000000..53f27519c --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml @@ -0,0 +1,195 @@ + + + + + + localDbName + content_agg_test_db + + + localTableName + payload + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.java.jsonworkflownodes.Producer + + -C{id, + eu.dnetlib.iis.common.schemas.Identifier, + eu/dnetlib/iis/wf/importer/content_url/data/input/id.json} + + -C{id_mapping, + eu.dnetlib.iis.common.schemas.IdentifierMapping, + eu/dnetlib/iis/wf/importer/content_url/data/input/id_mapping.json} + -Oid=${workingDir}/producer/id + -Oid_mapping=${workingDir}/producer/id_mapping + + + + + + + + ${wf:appPath()}/import_content_url + + + + input_table_name + ${localDbName}.${localTableName} + + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + mimetypes_pdf + application/pdf + + + mimetypes_html + text/html + + + mimetypes_xml_pmc + xml + + + mimetypes_wos + file::WoS + + + input_id + ${workingDir}/producer/id + + + input_id_mapping + ${workingDir}/producer/id_mapping + + + output_root + ${workingDir}/out + + + output_report_root_path + ${workingDir}/report + + + output_report_relative_path + import_content_url + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + + -C{content_url_html, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_html.json} + -C{content_url_pdf, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_pdf.json} + -C{content_url_wos, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_wos.json} + -C{content_url_xml, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_xml.json} + + + -Icontent_url_html=${workingDir}/out/html + -Icontent_url_pdf=${workingDir}/out/pdf + -Icontent_url_wos=${workingDir}/out/wos + -Icontent_url_xml=${workingDir}/out/xmlpmc + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + -C{report,eu.dnetlib.iis.common.schemas.ReportEntry,eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json} + -Ireport=${workingDir}/report/import_content_url + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + Unfortunately, the process failed -- error message: 
[${wf:errorMessage(wf:lastErrorNode())}] + + + + \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt new file mode 100644 index 000000000..ab9386b5f --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt @@ -0,0 +1,2 @@ +## This is a classpath-based import file (this header is required) +import_content_url classpath eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml similarity index 99% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml index f8485e19d..c25669981 100644 --- a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml @@ -1,5 +1,5 @@ - + ${jobTracker} @@ -17,7 +17,7 @@ - + diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json new file mode 100644 index 000000000..950ce481e --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json @@ -0,0 +1,5 @@ +{"key":"import.content.urls.bytype.pdf", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.html", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.jats", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.wos", "type":"COUNTER", "value": "1"} +{"key": "import.content.urls.fromAggregator", "type": "COUNTER", "value": "4"} diff --git a/iis-wf/iis-wf-metadataextraction/pom.xml b/iis-wf/iis-wf-metadataextraction/pom.xml index 472a176e3..358a71b1f 100644 --- a/iis-wf/iis-wf-metadataextraction/pom.xml +++ b/iis-wf/iis-wf-metadataextraction/pom.xml @@ -38,6 +38,23 @@ ${project.version} test-jar test + + + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-annotations + + diff --git a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml index a3d63b2eb..7977e7bc6 100644 --- a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml @@ -20,15 +20,14 @@ import.metadataExtraction cache 
report properties prefix, to be set dynamically according to internal app_path - - - objectstore_service_location - object store service location required for content retrieval - - - approved_objectstores_csv - CSV list of object stores identifiers to be processed - + + input_table_name + hive input table name + + + hive_metastore_uris + hive metastore locations + mimetypes_pdf pdf,application/pdf diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml index 5df7025f4..681bfdc11 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml @@ -41,6 +41,7 @@ objectstore_service_location + $UNDEFINED$ object store service location required for content retrieval diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml index aff140de0..9462dd86e 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml @@ -94,7 +94,21 @@ import_infospace_graph_location local InformationSpace graph location - + + + import_content_pdfaggregation_table_name + $UNDEFINED$ + Hive table name which is a part of the PDF aggregation subsystem, + defined in 'dbname.tablename' format to indicate both database and table. + This property value dictates the underlying importer module to be used: when set to $UNDEFINED$ value the old + ObjectStore based importer is activated otherwise PDF aggregation subsystem based importer is used. + + + import_content_pdfaggregation_hive_metastore_uris + $UNDEFINED$ + URIs pointing to the hive metastore utilized by the PDF aggregation subsystem + + import_content_object_store_location $UNDEFINED$ @@ -491,7 +505,16 @@ project_concepts_context_ids_csv ${import_project_concepts_context_ids_csv} - + + + input_table_name + ${import_content_pdfaggregation_table_name} + + + hive_metastore_uris + ${import_content_pdfaggregation_hive_metastore_uris} + + objectstore_service_location ${import_content_object_store_location} @@ -500,6 +523,7 @@ objectstore_s3_keystore_location ${import_content_objectstore_s3_keystore_location} + objectstore_s3_endpoint ${import_content_objectstore_s3_endpoint} diff --git a/pom.xml b/pom.xml index 03bfe686a..221bf82f6 100644 --- a/pom.xml +++ b/pom.xml @@ -338,6 +338,13 @@ ${iis.spark.version} provided + + + org.apache.spark + spark-hive_2.11 + ${iis.spark.version} + provided + pl.edu.icm.spark-utils
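
For context only (not part of the patch), a minimal usage sketch of the SparkSessionSupport.runWithHiveEnabledSparkSession helper introduced above; the class name, table name and metastore URI below are placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import eu.dnetlib.iis.common.spark.SparkSessionSupport;

public class HiveReadSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // point the session at a remote metastore; placeholder URI
        conf.set("hive.metastore.uris", "thrift://example-metastore:9083");

        // Boolean.FALSE: the session is not shared, so the helper stops it when the job completes
        SparkSessionSupport.runWithHiveEnabledSparkSession(conf, Boolean.FALSE, session -> {
            // the helper builds a Kryo-configured, Hive-enabled SparkSession and passes it to the job lambda
            Dataset<Row> rows = session.sql("select id, location from my_db.my_table where location is not null");
            rows.show();
        });
    }
}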