diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
index c743546ca..31ca4dc2b 100644
--- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java
@@ -13,4 +13,11 @@ public static SparkSession withConfAndKryo(SparkConf conf) {
.config(SparkConfHelper.withKryo(conf))
.getOrCreate();
}
+
+ public static SparkSession withHiveEnabledConfAndKryo(SparkConf conf) {
+ return SparkSession.builder()
+ .config(SparkConfHelper.withKryo(conf))
+ .enableHiveSupport()
+ .getOrCreate();
+ }
}
diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
index bef8d45d6..d15cd60fc 100644
--- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
+++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java
@@ -36,6 +36,24 @@ public static void runWithSparkSession(SparkConf conf,
job);
}
+ /**
+ * Runs a given job using a SparkSession created with the default builder and the supplied SparkConf. Enables Hive support.
+ * Stops the SparkSession unless it is shared, e.g. created in tests. Allows to reuse a SparkSession created externally.
+ *
+ * @param conf SparkConf instance
+ * @param isSparkSessionShared When true will not stop SparkSession
+ * @param job Job using constructed SparkSession
+ */
+ public static void runWithHiveEnabledSparkSession(SparkConf conf,
+ Boolean isSparkSessionShared,
+ Job job) {
+ runWithSparkSession(
+ SparkSessionFactory::withHiveEnabledConfAndKryo,
+ conf,
+ isSparkSessionShared,
+ job);
+ }
+
/**
* Runs a given job using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
* unless it is shared, e.g. created in tests. Allows to reuse SparkSession created externally.
diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
index bb7b20593..0723d0385 100644
--- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
+++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java
@@ -6,6 +6,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
+import org.apache.spark.sql.SparkSession.Builder;
+
import java.util.Objects;
/**
@@ -15,7 +17,22 @@
public class TestWithSharedSparkSession {
private static SparkSession _spark;
protected boolean initialized = false;
+ protected final boolean hiveSupportEnabled;
+ /**
+ * Default constructor with hive support in spark session disabled.
+ */
+ public TestWithSharedSparkSession() {
+ this(false);
+ }
+
+ /**
+ * @param hiveSupportEnabled indicates whether hive support should be enabled in spark session
+ */
+ public TestWithSharedSparkSession(boolean hiveSupportEnabled) {
+ this.hiveSupportEnabled = hiveSupportEnabled;
+ }
+
public SparkSession spark() {
return _spark;
}
@@ -31,7 +48,11 @@ public void beforeEach() {
conf.set("spark.driver.host", "localhost");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator");
- _spark = SparkSession.builder().config(conf).getOrCreate();
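+ // enable Hive support on demand so subclasses can run HiveQL against the session's local metastore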
+ Builder builder = SparkSession.builder().config(conf);
+ if (hiveSupportEnabled) {
+ builder = builder.enableHiveSupport();
+ }
+ _spark = builder.getOrCreate();
}
}
diff --git a/iis-wf/iis-wf-import/pom.xml b/iis-wf/iis-wf-import/pom.xml
index a3af82cb7..f6c86dac2 100644
--- a/iis-wf/iis-wf-import/pom.xml
+++ b/iis-wf/iis-wf-import/pom.xml
@@ -132,6 +132,18 @@
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-avro_2.11</artifactId>
		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-hive_2.11</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>eigenbase</groupId>
+					<artifactId>eigenbase-properties</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
new file mode 100644
index 000000000..687158df1
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java
@@ -0,0 +1,109 @@
+package eu.dnetlib.iis.wf.importer.content;
+
+import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithHiveEnabledSparkSession;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.concat;
+import static org.apache.spark.sql.functions.lit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.iis.common.InfoSpaceConstants;
+import eu.dnetlib.iis.common.java.io.HdfsUtils;
+import eu.dnetlib.iis.common.report.ReportEntryFactory;
+import eu.dnetlib.iis.common.schemas.ReportEntry;
+import eu.dnetlib.iis.common.spark.SparkConfHelper;
+import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport;
+import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl;
+import pl.edu.icm.sparkutils.avro.SparkAvroSaver;
+
+
+/**
+ * {@link DocumentContentUrl} importer reading data from a hive table that is part of the PDF aggregation subsystem.
+ *
+ * @author mhorst
+ *
+ */
+public class HiveBasedDocumentContentUrlImporterJob {
+
+ private static SparkAvroSaver avroSaver = new SparkAvroSaver();
+
+ private static final String COUNTER_IMPORTED_RECORDS_TOTAL = "import.content.urls.fromAggregator";
+
+ public static void main(String[] args) throws Exception {
+
+ HiveBasedDocumentContentUrlImporterJobParameters params = new HiveBasedDocumentContentUrlImporterJobParameters();
+ JCommander jcommander = new JCommander(params);
+ jcommander.parse(args);
+
+ SparkConf conf = SparkConfHelper.withKryo(new SparkConf());
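+ // override hive.metastore.uris only when a value was provided; an empty value leaves the session on its default (local) metastore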
+ if (!StringUtils.isEmpty(params.hiveMetastoreUris)) {
+ conf.set("hive.metastore.uris", params.hiveMetastoreUris);
+ }
+
+ runWithHiveEnabledSparkSession(conf, params.isSparkSessionShared, sparkSession -> {
+
+ HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath);
+ HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath);
+
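+ // read payload records from the aggregation table, skipping rows without a content location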
+ Dataset<Row> result = sparkSession.sql("select id, location, mimetype, size, hash from "
+ + params.inputTableName + " where location is not null");
+
+ JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession);
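+ // cache the output records: the RDD is both counted for the report and written to the avro datastore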
+ documentContentUrl.cache();
+
+ JavaRDD<ReportEntry> reports = generateReportEntries(sparkSession, documentContentUrl.count());
+
+ avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath);
+ avroSaver.saveJavaRDD(reports, ReportEntry.SCHEMA$, params.outputReportPath);
+ });
+ }
+
+ private static JavaRDD<ReportEntry> generateReportEntries(SparkSession sparkSession, long recordsCount) {
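+ // single counter entry holding the total number of imported content URLs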
+ return sparkSession.createDataset(
+ Lists.newArrayList(
+ ReportEntryFactory.createCounterReportEntry(COUNTER_IMPORTED_RECORDS_TOTAL, recordsCount)),
+ Encoders.kryo(ReportEntry.class)).javaRDD();
+ }
+
+ private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source, SparkSession spark) {
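+ // map aggregation table columns onto the DocumentContentUrl schema: prefix ids with the result row prefix and convert the size from bytes to kilobytes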
+ Dataset<Row> resultDs = source.select(
+ concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"),
+ col("location").as("url"),
+ col("mimetype").as("mimeType"),
+ col("size").cast("long").divide(1024).as("contentSizeKB"),
+ col("hash").as("contentChecksum")
+ );
+ return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD();
+ }
+
+ @Parameters(separators = "=")
+ private static class HiveBasedDocumentContentUrlImporterJobParameters {
+
+ @Parameter(names = "-sharedSparkSession")
+ private Boolean isSparkSessionShared = Boolean.FALSE;
+
+ @Parameter(names = "-inputTableName", required = true)
+ private String inputTableName;
+
+ @Parameter(names = "-hiveMetastoreUris", required = true)
+ private String hiveMetastoreUris;
+
+ @Parameter(names = "-outputPath", required = true)
+ private String outputPath;
+
+ @Parameter(names = "-outputReportPath", required = true)
+ private String outputReportPath;
+ }
+
+}
diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
index 72c9899d8..6cb19bab5 100644
--- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
+++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt
@@ -1,5 +1,6 @@
## This is a classpath-based import file (this header is required)
import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core/oozie_app
+import_content_url_core_parquet classpath eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app
import_content_url_dedup classpath eu/dnetlib/iis/wf/importer/content_url/dedup/oozie_app
transformers_idreplacer classpath eu/dnetlib/iis/wf/transformers/idreplacer/oozie_app
transformers_common_existencefilter classpath eu/dnetlib/iis/wf/transformers/common/existencefilter/oozie_app
diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml
index 1ff3212ff..e0d417adb 100644
--- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml
+++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml
@@ -2,25 +2,40 @@
+		<property>
+			<name>input_table_name</name>
+			<value>$UNDEFINED$</value>
+			<description>hive input table name</description>
+		</property>
+		<property>
+			<name>hive_metastore_uris</name>
+			<value>$UNDEFINED$</value>
+			<description>hive metastore locations</description>
+		</property>
		<property>
			<name>objectstore_facade_factory_classname</name>
			<value>eu.dnetlib.iis.wf.importer.facade.WebServiceObjectStoreFacadeFactory</value>
			<description>ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ObjectStoreFacade</description>
		</property>
-		<property>
-			<name>objectstore_service_location</name>
-			<description>object store service location to retrieve PDF/text contents from</description>
-		</property>
-		<property>
-			<name>approved_objectstores_csv</name>
-			<value>$UNDEFINED$</value>
-			<description>CSV of approved object stores</description>
-		</property>
-		<property>
-			<name>blacklisted_objectstores_csv</name>
-			<value>$UNDEFINED$</value>
-			<description>CSV of blacklisted object stores</description>
-		</property>
+		<property>
+			<name>objectstore_service_location</name>
+			<value>$UNDEFINED$</value>
+			<description>object store service location to retrieve PDF/text contents from</description>
+		</property>
+		<property>
+			<name>approved_objectstores_csv</name>
+			<value>$UNDEFINED$</value>
+			<description>CSV of approved object stores</description>
+		</property>
+		<property>
+			<name>blacklisted_objectstores_csv</name>
+			<value>$UNDEFINED$</value>
+			<description>CSV of blacklisted object stores</description>
+		</property>
+
mimetypes_pdf
pdf mime types
@@ -146,9 +161,16 @@
org.apache.avro.Schema.Type.STRING
-
+
+
+
+
+ ${input_table_name eq "$UNDEFINED$"}
+
+
+
@@ -169,6 +191,25 @@
+
+
+ ${wf:appPath()}/import_content_url_core_parquet
+
+
+
+ workingDir
+ ${workingDir}/import_content_url_core/working_dir
+
+
+ output
+ ${workingDir}/imported-urls
+
+
+
+
+
+
+
eu.dnetlib.iis.common.java.ProcessWrapper
@@ -239,7 +280,7 @@
- ${input_id eq "$UNDEFINED$"}
+ ${input_id eq "$UNDEFINED$"}
diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml
new file mode 100644
index 000000000..d60bb44e9
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml
@@ -0,0 +1,116 @@
+
+
+
+
+ input_table_name
+ hive input table name
+
+
+ hive_metastore_uris
+ hive metastore locations
+
+
+
+ output
+ DocumentContentUrl avro datastore output
+
+
+ output_report_root_path
+ base directory for storing reports
+
+
+ output_report_relative_path
+ import_content_url_from_aggregator
+ directory for storing report (relative to output_report_root_path)
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ import-content-url_core
+ eu.dnetlib.iis.wf.importer.content.HiveBasedDocumentContentUrlImporterJob
+ ${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ -inputTableName=${input_table_name}
+ -hiveMetastoreUris=${hive_metastore_uris}
+ -outputPath=${output}
+ -outputReportPath=${output_report_root_path}/${output_report_relative_path}
+
+
+
+
+
+
+ Unfortunately, the process failed -- error message:
+ [${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java
new file mode 100644
index 000000000..8ed3129ae
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java
@@ -0,0 +1,128 @@
+package eu.dnetlib.iis.wf.importer.content;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import eu.dnetlib.iis.common.ClassPathResourceProvider;
+import eu.dnetlib.iis.common.schemas.ReportEntry;
+import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession;
+import eu.dnetlib.iis.common.utils.AvroAssertTestUtil;
+import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl;
+
+/**
+ * @author mhorst
+ *
+ */
+public class HiveBasedDocumentContentUrlImporterJobTest extends TestWithSharedSparkSession {
+
+ @TempDir
+ public Path workingDir;
+
+ private Path outputDir;
+ private Path outputReportDir;
+
+ private HiveBasedDocumentContentUrlImporterJobTest() {
+ // enabling hive support in spark session instantiated for this set of tests
+ super(true);
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ super.beforeEach();
+
+ outputDir = workingDir.resolve("output");
+ outputReportDir = workingDir.resolve("output_report");
+ }
+
+ @Test
+ public void testImportFromNonExistingTable() throws Exception {
+ // when
+ String inputTableName = "nonExistingTable";
+ String hiveMetastoreUris = "localhost";
+
+ assertThrows(RuntimeException.class, () -> HiveBasedDocumentContentUrlImporterJob.main(new String[]{
+ "-sharedSparkSession",
+ "-inputTableName", inputTableName,
+ "-hiveMetastoreUris", hiveMetastoreUris,
+ "-outputPath", outputDir.toString(),
+ "-outputReportPath", outputReportDir.toString(),
+ }));
+ }
+
+ @Test
+ public void testImportFromExistingLocalTable() throws Exception {
+ // when
+ String inputDbName = "test_aggregation_import_db";
+ String inputTableName = "agg_payload";
+ String hiveMetastoreUris = "";
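+ // empty metastore URI: the job relies on the shared hive-enabled spark session and its local metastore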
+
+ // initializing test database
+ initializeLocalDatabase(inputDbName, inputTableName);
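+ // single payload row; the 1024 byte size maps to contentSizeKB=1 in the expected output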
+ spark().sql("INSERT INTO TABLE " + inputDbName + '.' + inputTableName +
+ " VALUES ('od______2367::0001a50c6388e9bfcb791a924ec4b837', "
+ + "'s3://some_location', "
+ + "'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612')");
+
+ HiveBasedDocumentContentUrlImporterJob.main(new String[]{
+ "-sharedSparkSession",
+ "-inputTableName", inputDbName + '.' + inputTableName,
+ "-hiveMetastoreUris", hiveMetastoreUris,
+ "-outputPath", outputDir.toString(),
+ "-outputReportPath", outputReportDir.toString(),
+ });
+
+ // then
+ String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json");
+ String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json");
+
+ AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class);
+ AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class);
+
+ }
+
+ @Test
+ public void testImportFromEmptyLocalTable() throws Exception {
+ // when
+ String inputDbName = "test_aggregation_import_db";
+ String inputTableName = "agg_payload";
+ String hiveMetastoreUris = "";
+
+ // initializing test database
+ initializeLocalDatabase(inputDbName, inputTableName);
+
+ HiveBasedDocumentContentUrlImporterJob.main(new String[]{
+ "-sharedSparkSession",
+ "-inputTableName", inputDbName + '.' + inputTableName,
+ "-hiveMetastoreUris", hiveMetastoreUris,
+ "-outputPath", outputDir.toString(),
+ "-outputReportPath", outputReportDir.toString(),
+ });
+
+ // then
+ String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json");
+ String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json");
+
+ AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class);
+ AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class);
+
+ }
+
+ /**
+ * Initializes local hive database.
+ * @param inputDbName database name to be initialized
+ * @param inputTableName name of the table to be recreated
+ */
+ private void initializeLocalDatabase(String inputDbName, String inputTableName) {
+ spark().sql("CREATE DATABASE IF NOT EXISTS " + inputDbName);
+ spark().sql("DROP TABLE IF EXISTS " + inputDbName + '.' + inputTableName);
+ spark().sql("CREATE TABLE " + inputDbName + '.' + inputTableName +
+ "(id string, location string, mimetype string, size string, hash string)"
+ + " STORED AS PARQUET");
+ }
+
+}
diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java
index 8b6c0abf4..cecb34ecc 100644
--- a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java
+++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java
@@ -2,6 +2,8 @@
import eu.dnetlib.iis.common.AbstractOozieWorkflowTestCase;
import eu.dnetlib.iis.common.OozieWorkflowTestConfiguration;
+
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -13,10 +15,17 @@
public class WorkflowTest extends AbstractOozieWorkflowTestCase {
@Test
- public void testImportContentUrlWorkflow() {
+ public void testImportContentUrlWorkflowWithObjectStoreAsBackend() {
OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration();
wfConf.setTimeoutInSeconds(720);
- testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/sampletest", wfConf);
+ testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer", wfConf);
+ }
+
+ @Test
+ @Disabled("TODO reenable this test once Hive is installed on CI Test hadoop cluster")
+ public void testImportContentUrlWorkflowWithHiveBasedAggregationSubsystemAsBackend() {
+ OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration();
+ wfConf.setTimeoutInSeconds(1440);
+ testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer", wfConf);
}
@Test
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json
new file mode 100644
index 000000000..0823f9e2c
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json
@@ -0,0 +1 @@
+{"id": "50|od______2367::0001a50c6388e9bfcb791a924ec4b837", "url": "s3://some_location", "mimeType": "application/pdf", "contentChecksum": "3a0fb023de6c8735a52ac9c1edace612", "contentSizeKB": 1}
\ No newline at end of file
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json
new file mode 100644
index 000000000..a878e26e6
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json
@@ -0,0 +1 @@
+{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "1"}
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json
new file mode 100644
index 000000000..6a23fd53f
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json
@@ -0,0 +1 @@
+{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "0"}
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt
similarity index 100%
rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt
rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q
new file mode 100644
index 000000000..a2d330944
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q
@@ -0,0 +1,11 @@
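+-- recreates the test payload table; DB_NAME and TABLE_NAME are substituted by the invoking Oozie workflow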
+CREATE DATABASE IF NOT EXISTS ${DB_NAME};
+
+DROP TABLE IF EXISTS ${DB_NAME}.${TABLE_NAME};
+
+CREATE TABLE ${DB_NAME}.${TABLE_NAME} (id string, location string, mimetype string, size string, hash string) STORED AS PARQUET;
+
+INSERT INTO TABLE ${DB_NAME}.${TABLE_NAME} VALUES
+ ('doaj10829873::0092cca2055a02f51c0b21b332395b09', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750&objectId=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750', 'text/html', '38912', '19a53e9ebb0c878ad088568a8cf23dc6'),
+ ('od______2367::0001a50c6388e9bfcb791a924ec4b837', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70&objectId=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70', 'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612'),
+ ('dedup1______::000015093c397516c0b1b000f38982de', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=webcrawl____%3A%3A000015093c397516c0b1b000f38982de&objectId=webcrawl____%3A%3A000015093c397516c0b1b000f38982de', 'file::WoS', '6144', 'a9af919ba339a1491a922a7fe7b2580f'),
+ ('od_______908::00005ab6b308ff8f0f5415be03d8cce9', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9&objectId=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9', 'xml', '7168', '464ac8be9b33e109e98596799cf9d05d');
\ No newline at end of file
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q
new file mode 100644
index 000000000..13cee61fc
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q
@@ -0,0 +1,2 @@
+DROP TABLE ${DB_NAME}.${TABLE_NAME};
+DROP DATABASE ${DB_NAME};
\ No newline at end of file
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml
new file mode 100644
index 000000000..53f27519c
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml
@@ -0,0 +1,195 @@
+
+
+
+
+
+ localDbName
+ content_agg_test_db
+
+
+ localTableName
+ payload
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+
+
+
+
+
+
+
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000
+
+ DB_NAME=${localDbName}
+ TABLE_NAME=${localTableName}
+
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.iis.common.java.ProcessWrapper
+ eu.dnetlib.iis.common.java.jsonworkflownodes.Producer
+
+ -C{id,
+ eu.dnetlib.iis.common.schemas.Identifier,
+ eu/dnetlib/iis/wf/importer/content_url/data/input/id.json}
+
+ -C{id_mapping,
+ eu.dnetlib.iis.common.schemas.IdentifierMapping,
+ eu/dnetlib/iis/wf/importer/content_url/data/input/id_mapping.json}
+ -Oid=${workingDir}/producer/id
+ -Oid_mapping=${workingDir}/producer/id_mapping
+
+
+
+
+
+
+
+ ${wf:appPath()}/import_content_url
+
+
+
+ input_table_name
+ ${localDbName}.${localTableName}
+
+
+
+ hive_metastore_uris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ mimetypes_pdf
+ application/pdf
+
+
+ mimetypes_html
+ text/html
+
+
+ mimetypes_xml_pmc
+ xml
+
+
+ mimetypes_wos
+ file::WoS
+
+
+ input_id
+ ${workingDir}/producer/id
+
+
+ input_id_mapping
+ ${workingDir}/producer/id_mapping
+
+
+ output_root
+ ${workingDir}/out
+
+
+ output_report_root_path
+ ${workingDir}/report
+
+
+ output_report_relative_path
+ import_content_url
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.iis.common.java.ProcessWrapper
+
+ eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer
+
+ -C{content_url_html,
+ eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl,
+ eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_html.json}
+ -C{content_url_pdf,
+ eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl,
+ eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_pdf.json}
+ -C{content_url_wos,
+ eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl,
+ eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_wos.json}
+ -C{content_url_xml,
+ eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl,
+ eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_xml.json}
+
+
+ -Icontent_url_html=${workingDir}/out/html
+ -Icontent_url_pdf=${workingDir}/out/pdf
+ -Icontent_url_wos=${workingDir}/out/wos
+ -Icontent_url_xml=${workingDir}/out/xmlpmc
+
+
+
+
+
+
+
+ eu.dnetlib.iis.common.java.ProcessWrapper
+ eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer
+ -C{report,eu.dnetlib.iis.common.schemas.ReportEntry,eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json}
+ -Ireport=${workingDir}/report/import_content_url
+
+
+
+
+
+
+
+
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000
+
+ DB_NAME=${localDbName}
+ TABLE_NAME=${localTableName}
+
+
+
+
+
+
+
+
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000
+
+ DB_NAME=${localDbName}
+ TABLE_NAME=${localTableName}
+
+
+
+
+
+
+ Unfortunately, the process failed -- error message: [${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
\ No newline at end of file
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt
new file mode 100644
index 000000000..ab9386b5f
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt
@@ -0,0 +1,2 @@
+## This is a classpath-based import file (this header is required)
+import_content_url classpath eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml
similarity index 99%
rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml
rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml
index f8485e19d..c25669981 100644
--- a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml
@@ -1,5 +1,5 @@
-
+
${jobTracker}
@@ -17,7 +17,7 @@
-
+
diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json
new file mode 100644
index 000000000..950ce481e
--- /dev/null
+++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json
@@ -0,0 +1,5 @@
+{"key":"import.content.urls.bytype.pdf", "type":"COUNTER", "value": "1"}
+{"key":"import.content.urls.bytype.html", "type":"COUNTER", "value": "1"}
+{"key":"import.content.urls.bytype.jats", "type":"COUNTER", "value": "1"}
+{"key":"import.content.urls.bytype.wos", "type":"COUNTER", "value": "1"}
+{"key": "import.content.urls.fromAggregator", "type": "COUNTER", "value": "4"}
diff --git a/iis-wf/iis-wf-metadataextraction/pom.xml b/iis-wf/iis-wf-metadataextraction/pom.xml
index 472a176e3..358a71b1f 100644
--- a/iis-wf/iis-wf-metadataextraction/pom.xml
+++ b/iis-wf/iis-wf-metadataextraction/pom.xml
@@ -38,6 +38,23 @@
			<version>${project.version}</version>
			<type>test-jar</type>
			<scope>test</scope>
		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-annotations</artifactId>
+		</dependency>
+
diff --git a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml
index a3d63b2eb..7977e7bc6 100644
--- a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml
+++ b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml
@@ -20,15 +20,14 @@
			<value>import.metadataExtraction</value>
			<description>cache report properties prefix, to be set dynamically according to internal app_path</description>
		</property>
-		<property>
-			<name>objectstore_service_location</name>
-			<description>object store service location required for content retrieval</description>
-		</property>
-		<property>
-			<name>approved_objectstores_csv</name>
-			<description>CSV list of object stores identifiers to be processed</description>
-		</property>
+		<property>
+			<name>input_table_name</name>
+			<description>hive input table name</description>
+		</property>
+		<property>
+			<name>hive_metastore_uris</name>
+			<description>hive metastore locations</description>
+		</property>
		<property>
			<name>mimetypes_pdf</name>
			<value>pdf,application/pdf</value>
index 5df7025f4..681bfdc11 100644
--- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml
+++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml
@@ -41,6 +41,7 @@
			<name>objectstore_service_location</name>
+			<value>$UNDEFINED$</value>
			<description>object store service location required for content retrieval</description>
diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml
index aff140de0..9462dd86e 100644
--- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml
+++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml
@@ -94,7 +94,21 @@
		<property>
			<name>import_infospace_graph_location</name>
			<description>local InformationSpace graph location</description>
		</property>
-
+
+		<property>
+			<name>import_content_pdfaggregation_table_name</name>
+			<value>$UNDEFINED$</value>
+			<description>Hive table name which is a part of the PDF aggregation subsystem,
+				defined in 'dbname.tablename' format to indicate both the database and the table.
+				This property value dictates the underlying importer module to be used: when set to the $UNDEFINED$ value the old
+				ObjectStore based importer is activated, otherwise the PDF aggregation subsystem based importer is used.</description>
+		</property>
+		<property>
+			<name>import_content_pdfaggregation_hive_metastore_uris</name>
+			<value>$UNDEFINED$</value>
+			<description>URIs pointing to the hive metastore utilized by the PDF aggregation subsystem</description>
+		</property>
+
		<property>
			<name>import_content_object_store_location</name>
			<value>$UNDEFINED$</value>
@@ -491,7 +505,16 @@
				<property>
					<name>project_concepts_context_ids_csv</name>
					<value>${import_project_concepts_context_ids_csv}</value>
				</property>
-
+
+				<property>
+					<name>input_table_name</name>
+					<value>${import_content_pdfaggregation_table_name}</value>
+				</property>
+				<property>
+					<name>hive_metastore_uris</name>
+					<value>${import_content_pdfaggregation_hive_metastore_uris}</value>
+				</property>
+
				<property>
					<name>objectstore_service_location</name>
					<value>${import_content_object_store_location}</value>
@@ -500,6 +523,7 @@
objectstore_s3_keystore_location
${import_content_objectstore_s3_keystore_location}
+
objectstore_s3_endpoint
${import_content_objectstore_s3_endpoint}
diff --git a/pom.xml b/pom.xml
index 03bfe686a..221bf82f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -338,6 +338,13 @@
			<version>${iis.spark.version}</version>
			<scope>provided</scope>
		</dependency>
+
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-hive_2.11</artifactId>
+			<version>${iis.spark.version}</version>
+			<scope>provided</scope>
+		</dependency>
		<dependency>
			<groupId>pl.edu.icm.spark-utils</groupId>