From 692128adc2af93b6ed1a7c28b2666a458370b6d7 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Mon, 2 Aug 2021 16:37:24 +0200 Subject: [PATCH 1/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Initial implementation, proof of concept. --- ...iveBasedDocumentContentUrlImporterJob.java | 104 +++++++++++++++ .../core_parquet/oozie_app/workflow.xml | 121 ++++++++++++++++++ 2 files changed, 225 insertions(+) create mode 100644 iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java create mode 100644 iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java new file mode 100644 index 000000000..bf5aac16e --- /dev/null +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -0,0 +1,104 @@ +package eu.dnetlib.iis.wf.importer.content; + +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.concat; +import static org.apache.spark.sql.functions.lit; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; + +import eu.dnetlib.iis.common.schemas.IdentifierMapping; +import eu.dnetlib.iis.common.spark.SparkConfHelper; +import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; + + +/** + * {@link DocumentContentUrl} importer reading data from hive table being a part of pdf aggregation subsystem. 
+ * + * @author mhorst + * + */ +public class HiveBasedDocumentContentUrlImporterJob { + + private static SparkAvroSaver avroSaver = new SparkAvroSaver(); + + public static void main(String[] args) throws Exception { + + HiveBasedDocumentContentUrlImporterJobParameters params = new HiveBasedDocumentContentUrlImporterJobParameters(); + JCommander jcommander = new JCommander(params); + jcommander.parse(args); + + SparkConf conf = SparkConfHelper.withKryo(new SparkConf()); + // conf.registerKryoClasses(OafModelUtils.provideOafClasses()); + conf.set("hive.metastore.uris", params.hiveMetastoreUris); + + SparkSession sparkSession = SparkSession.builder() + //.config(SparkConfHelper.withKryo(conf)) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + + // sparkSession.sql("show databases").show(); + // sparkSession.sql("show tables").show(); + + // this shows "hive" so the hive is set properly, not in-memory due to the lack of hive libs on classpath - this is fine + // System.out.println("catalog impl: " + sparkSession.conf().get("spark.sql.catalogImplementation")); + + // just to debug, shows proper number for openaire_prod but 0 for pdfaggregation + sparkSession.sql("select id from openaire_prod_20210716.publication limit 10").show(); + + // testing on a non empty table + Dataset pubsResult = sparkSession.sql("select id, originalid from openaire_prod_20210716.publication"); + pubsResult.cache(); + Dataset idResults = pubsResult.select(pubsResult.col("id")); + System.out.println("number of imported publication results: " + idResults.count()); + idResults.write().csv(params.outputPath+"_csv_test"); + // end of testing + + Dataset result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " + params.inputTableName); + // number of imported records is 0; + System.out.println("number of imported results: " + result.count()); + + JavaRDD documentContentUrl = buildOutputRecord(result, sparkSession); + + avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath); + } + + private static JavaRDD buildOutputRecord(Dataset source, SparkSession spark) { + Dataset resultDs = source.select( + concat(lit("50|"), col("id")).as("id"), + col("actual_url").as("url"), + col("mimetype").as("mimeType"), + col("size").cast("long").divide(1024).as("contentSizeKB"), + col("hash").as("contentChecksum") + ); + return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD(); + } + + @Parameters(separators = "=") + private static class HiveBasedDocumentContentUrlImporterJobParameters { + + @Parameter(names = "-inputTableName", required = true) + private String inputTableName; + + @Parameter(names = "-hiveMetastoreUris", required = true) + private String hiveMetastoreUris; + + @Parameter(names = "-outputPath", required = true) + private String outputPath; + + @Parameter(names = "-outputReportPath", required = true) + private String outputReportPath; + } + +} diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml new file mode 100644 index 000000000..a47982880 --- /dev/null +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml @@ -0,0 +1,121 @@ + + + + + input_table_name + hive input table name + + + hive_warehouse_dir + /user/hive/warehouse + + + hive_metastore_uris + 
thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + + output + DocumentContentUrl avro datastore output + + + output_report_root_path + base directory for storing reports + + + output_report_relative_path + import_content_url_hive + directory for storing report (relative to output_report_root_path) + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + yarn-cluster + cluster + import-content-url + eu.dnetlib.iis.wf.importer.content.HiveBasedDocumentContentUrlImporterJob + ${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${hive_warehouse_dir} + + -inputTableName=${input_table_name} + -hiveMetastoreUris=${hive_metastore_uris} + -outputPath=${output} + -outputReportPath=${output_report_root_path}/${output_report_relative_path} + + + + + + + Unfortunately, the process failed -- error message: + [${wf:errorMessage(wf:lastErrorNode())}] + + + + + + From 90b86d0e49c08e183bc115033e1c7ace7fa97e10 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Tue, 3 Aug 2021 16:27:31 +0200 Subject: [PATCH 2/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Refined implementation. Subworkflow still to be integrated with IIS main workflow. 
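For reference, the core of this job is a single Spark SQL projection reshaping rows of the aggregation table into the DocumentContentUrl avro schema. Below is a minimal, self-contained sketch of that mapping; the pdfaggregation_db.payload table name is only a placeholder, and the literal "50|" prefix stands in for the InfoSpaceConstants.ROW_PREFIX_RESULT constant this patch introduces:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.concat;
    import static org.apache.spark.sql.functions.lit;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ContentUrlMappingSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("content-url-mapping-sketch")
                    .enableHiveSupport()
                    .getOrCreate();
            // raw content metadata exposed by the PDF aggregation subsystem (placeholder table name)
            Dataset<Row> source = spark.sql(
                    "select id, actual_url, mimetype, size, hash from pdfaggregation_db.payload");
            // reshape the columns to match the DocumentContentUrl avro schema fields
            Dataset<Row> mapped = source.select(
                    concat(lit("50|"), col("id")).as("id"),
                    col("actual_url").as("url"),
                    col("mimetype").as("mimeType"),
                    col("size").cast("long").divide(1024).as("contentSizeKB"), // bytes -> kilobytes
                    col("hash").as("contentChecksum"));
            mapped.show(false);
            spark.stop();
        }
    }
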
--- ...iveBasedDocumentContentUrlImporterJob.java | 67 +++++++++---------- .../core_parquet/oozie_app/workflow.xml | 13 ++-- 2 files changed, 36 insertions(+), 44 deletions(-) diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java index bf5aac16e..0ff377a46 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -7,18 +7,22 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.functions; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; +import com.google.common.collect.Lists; -import eu.dnetlib.iis.common.schemas.IdentifierMapping; +import eu.dnetlib.iis.common.InfoSpaceConstants; +import eu.dnetlib.iis.common.java.io.HdfsUtils; +import eu.dnetlib.iis.common.report.ReportEntryFactory; +import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.SparkConfHelper; -import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl; import pl.edu.icm.sparkutils.avro.SparkAvroSaver; @@ -32,6 +36,8 @@ public class HiveBasedDocumentContentUrlImporterJob { private static SparkAvroSaver avroSaver = new SparkAvroSaver(); + private static final String COUNTER_IMPORTED_RECORDS_TOTAL = "import.content.urls.fromAggregator"; + public static void main(String[] args) throws Exception { HiveBasedDocumentContentUrlImporterJobParameters params = new HiveBasedDocumentContentUrlImporterJobParameters(); @@ -39,44 +45,35 @@ public static void main(String[] args) throws Exception { jcommander.parse(args); SparkConf conf = SparkConfHelper.withKryo(new SparkConf()); - // conf.registerKryoClasses(OafModelUtils.provideOafClasses()); conf.set("hive.metastore.uris", params.hiveMetastoreUris); - SparkSession sparkSession = SparkSession.builder() - //.config(SparkConfHelper.withKryo(conf)) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - - // sparkSession.sql("show databases").show(); - // sparkSession.sql("show tables").show(); - - // this shows "hive" so the hive is set properly, not in-memory due to the lack of hive libs on classpath - this is fine - // System.out.println("catalog impl: " + sparkSession.conf().get("spark.sql.catalogImplementation")); - - // just to debug, shows proper number for openaire_prod but 0 for pdfaggregation - sparkSession.sql("select id from openaire_prod_20210716.publication limit 10").show(); - - // testing on a non empty table - Dataset pubsResult = sparkSession.sql("select id, originalid from openaire_prod_20210716.publication"); - pubsResult.cache(); - Dataset idResults = pubsResult.select(pubsResult.col("id")); - System.out.println("number of imported publication results: " + idResults.count()); - idResults.write().csv(params.outputPath+"_csv_test"); - // end of testing - - Dataset result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " + 
params.inputTableName); - // number of imported records is 0; - System.out.println("number of imported results: " + result.count()); - - JavaRDD documentContentUrl = buildOutputRecord(result, sparkSession); - - avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath); + try (SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()) { + + HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath); + HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath); + + Dataset result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " + params.inputTableName); + + JavaRDD documentContentUrl = buildOutputRecord(result, sparkSession); + documentContentUrl.cache(); + + JavaRDD reports = generateReportEntries(sparkSession, documentContentUrl.count()); + + avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath); + avroSaver.saveJavaRDD(reports, ReportEntry.SCHEMA$, params.outputReportPath); + } + } + + private static JavaRDD generateReportEntries(SparkSession sparkSession, long recordsCount) { + return sparkSession.createDataset( + Lists.newArrayList( + ReportEntryFactory.createCounterReportEntry(COUNTER_IMPORTED_RECORDS_TOTAL, recordsCount)), + Encoders.kryo(ReportEntry.class)).javaRDD(); } private static JavaRDD buildOutputRecord(Dataset source, SparkSession spark) { Dataset resultDs = source.select( - concat(lit("50|"), col("id")).as("id"), + concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"), col("actual_url").as("url"), col("mimetype").as("mimeType"), col("size").cast("long").divide(1024).as("contentSizeKB"), diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml index a47982880..55962fee9 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml @@ -1,17 +1,13 @@ - + input_table_name hive input table name - - hive_warehouse_dir - /user/hive/warehouse - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + hive metastore locations @@ -24,7 +20,7 @@ output_report_relative_path - import_content_url_hive + import_content_url_from_aggregator directory for storing report (relative to output_report_root_path) @@ -88,7 +84,7 @@ yarn-cluster cluster - import-content-url + import-content-url_core eu.dnetlib.iis.wf.importer.content.HiveBasedDocumentContentUrlImporterJob ${oozieTopWfApplicationPath}/lib/iis-wf-import-${projectVersion}.jar @@ -99,7 +95,6 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${hive_warehouse_dir} -inputTableName=${input_table_name} -hiveMetastoreUris=${hive_metastore_uris} From e94c5bb16e64aaa5bddc294a09c1b93076b4bee2 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Tue, 19 Apr 2022 14:47:19 +0200 Subject: [PATCH 3/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Limiting entries to the ones with actual_url not 
null in order not to violate the DocumentContentUrl schema constraints. --- .../content/HiveBasedDocumentContentUrlImporterJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java index 0ff377a46..45a802e65 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -52,7 +52,8 @@ public static void main(String[] args) throws Exception { HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath); HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath); - Dataset<Row> result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " + params.inputTableName); + Dataset<Row> result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " + + params.inputTableName + " where actual_url is not null"); JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession); documentContentUrl.cache(); From 3031c790b38f876f2d102b1161f4309d4e5fe9c9 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Tue, 19 Apr 2022 16:47:46 +0200 Subject: [PATCH 4/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Renaming `actual_url` to `location` as a column reference pointing to S3 content location. --- .../content/HiveBasedDocumentContentUrlImporterJob.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java index 45a802e65..afa2b9bef 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -52,8 +52,8 @@ public static void main(String[] args) throws Exception { HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath); HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath); - Dataset<Row> result = sparkSession.sql("select id, actual_url, mimetype, size, hash from " - + params.inputTableName + " where actual_url is not null"); + Dataset<Row> result = sparkSession.sql("select id, location, mimetype, size, hash from " + + params.inputTableName + " where location is not null"); JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession); documentContentUrl.cache(); @@ -75,7 +75,7 @@ private static JavaRDD<ReportEntry> generateReportEntries(SparkSession sparkSess private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source, SparkSession spark) { Dataset<Row> resultDs = source.select( concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"), - col("actual_url").as("url"), + col("location").as("url"), col("mimetype").as("mimeType"), col("size").cast("long").divide(1024).as("contentSizeKB"), col("hash").as("contentChecksum") From 36606ab6352a94dd4cf8128d769d914e8d50bb37 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 20 Apr 2022
11:01:11 +0200 Subject: [PATCH 5/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Enabling the new hive-based content metadata importer with content_url importer uber workflow and metadataextraction cache builder. --- .../content_url/chain/oozie_app/import.txt | 2 +- .../content_url/chain/oozie_app/workflow.xml | 23 +++++-------------- .../cache/builder/oozie_app/workflow.xml | 17 +++++++------- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt index 72c9899d8..f2d1db2e8 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt @@ -1,5 +1,5 @@ ## This is a classpath-based import file (this header is required) -import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core/oozie_app +import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app import_content_url_dedup classpath eu/dnetlib/iis/wf/importer/content_url/dedup/oozie_app transformers_idreplacer classpath eu/dnetlib/iis/wf/transformers/idreplacer/oozie_app transformers_common_existencefilter classpath eu/dnetlib/iis/wf/transformers/common/existencefilter/oozie_app diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml index 1ff3212ff..bf458842f 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml @@ -3,24 +3,13 @@ - objectstore_facade_factory_classname - eu.dnetlib.iis.wf.importer.facade.WebServiceObjectStoreFacadeFactory - ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ObjectStoreFacade + input_table_name + hive input table name + + + hive_metastore_uris + hive metastore locations - - objectstore_service_location - object store service location to retrieve PDF/text contents from - - - approved_objectstores_csv - $UNDEFINED$ - CSV of approved object stores - - - blacklisted_objectstores_csv - $UNDEFINED$ - CSV of blacklisted object stores - mimetypes_pdf pdf mime types diff --git a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml index a3d63b2eb..7977e7bc6 100644 --- a/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-metadataextraction/src/main/resources/eu/dnetlib/iis/wf/metadataextraction/cache/builder/oozie_app/workflow.xml @@ -20,15 +20,14 @@ import.metadataExtraction cache report properties prefix, to be set dynamically according to internal app_path - - - objectstore_service_location - object store service location required for content retrieval - - - approved_objectstores_csv - CSV list of object stores 
identifiers to be processed - + + input_table_name + hive input table name + + + hive_metastore_uris + hive metastore locations + mimetypes_pdf pdf,application/pdf From 8cf1b38bf1534b82353e859a41d474700cd97050 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 20 Apr 2022 12:15:46 +0200 Subject: [PATCH 6/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Excluding conflicting jackson libraries. --- iis-wf/iis-wf-metadataextraction/pom.xml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/iis-wf/iis-wf-metadataextraction/pom.xml b/iis-wf/iis-wf-metadataextraction/pom.xml index 472a176e3..358a71b1f 100644 --- a/iis-wf/iis-wf-metadataextraction/pom.xml +++ b/iis-wf/iis-wf-metadataextraction/pom.xml @@ -38,6 +38,23 @@ ${project.version} test-jar test + + + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-annotations + + From 2439e414f8b591fc81df65a952b2813c726fa5c8 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 20 Apr 2022 14:20:27 +0200 Subject: [PATCH 7/9] Closes #1342: Fix content_url_chain subworkflow actions order for cache builder workflow --- .../iis/wf/importer/content_url/chain/oozie_app/workflow.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml index bf458842f..9179762ad 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml @@ -228,7 +228,7 @@ - ${input_id eq "$UNDEFINED$"} + ${input_id eq "$UNDEFINED$"} From d481c39efdea4f3b414130f03264c69c0105d83d Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 29 Jun 2022 11:35:06 +0200 Subject: [PATCH 8/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service The following new properties are supported: * `import_content_pdfaggregation_table_name` to be specified at runtime in 'dbname.tablename' format to indicate both hive database and table * `import_content_pdfaggregation_hive_metastore_uris` with URIs pointing to the hive metastore utilized by the PDF aggregation subsystem, to be defined statically in the IIS environment (default-config.xml file) New hive-based PDF aggregation service support is automatically enabled by providing an explicit `import_content_pdfaggregation_table_name` parameter value at runtime. When the parameter is unspecified, the content importer module works in legacy mode (objectstore compatible). 
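At runtime the switch ultimately reduces to the arguments handed to the hive-based importer job by the oozie spark action. A minimal sketch of an equivalent direct invocation follows; every value in it is a placeholder rather than a setting from any real IIS environment:

    import eu.dnetlib.iis.wf.importer.content.HiveBasedDocumentContentUrlImporterJob;

    public class HiveImporterInvocationSketch {
        public static void main(String[] args) throws Exception {
            // 'dbname.tablename' identifies both the hive database and the table;
            // metastore URI and output paths below are illustrative placeholders
            HiveBasedDocumentContentUrlImporterJob.main(new String[] {
                    "-inputTableName=pdfaggregation_db.payload",
                    "-hiveMetastoreUris=thrift://example-metastore:9083",
                    "-outputPath=/tmp/import_content_url/output",
                    "-outputReportPath=/tmp/import_content_url/report"
            });
        }
    }

When -hiveMetastoreUris is passed as an empty string, the job skips setting hive.metastore.uris and falls back to the metastore provided by the ambient configuration, which is what the local unit tests rely on.
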
--- .../iis/common/spark/SparkSessionFactory.java | 7 + .../iis/common/spark/SparkSessionSupport.java | 18 ++ .../spark/TestWithSharedSparkSession.java | 23 ++- iis-wf/iis-wf-import/pom.xml | 12 ++ ...iveBasedDocumentContentUrlImporterJob.java | 13 +- .../content_url/chain/oozie_app/import.txt | 3 +- .../content_url/chain/oozie_app/workflow.xml | 54 ++++- .../core_parquet/oozie_app/workflow.xml | 2 +- ...asedDocumentContentUrlImporterJobTest.java | 128 ++++++++++++ .../iis/wf/importer/content/WorkflowTest.java | 12 +- .../unit_test/document_content_url.json | 1 + .../unit_test/document_content_url_empty.json | 0 .../content/output/unit_test/report.json | 1 + .../output/unit_test/report_empty.json | 1 + .../oozie_app/import.txt | 0 .../oozie_app/initdb.q | 11 + .../oozie_app/teardowndb.q | 2 + .../oozie_app/workflow.xml | 195 ++++++++++++++++++ .../oozie_app/import.txt | 2 + .../oozie_app/workflow.xml | 4 +- .../output/report_from_agg_subsystem.json | 5 + .../wf/primary/import/oozie_app/workflow.xml | 1 + .../wf/primary/main/oozie_app/workflow.xml | 28 ++- pom.xml | 7 + 24 files changed, 517 insertions(+), 13 deletions(-) create mode 100644 iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json rename iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/{sampletest => agg_subsystem_based_importer}/oozie_app/import.txt (100%) create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt rename iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/{sampletest => objectstore_based_importer}/oozie_app/workflow.xml (99%) create mode 100644 iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java index c743546ca..31ca4dc2b 100644 --- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionFactory.java @@ -13,4 +13,11 @@ public static SparkSession withConfAndKryo(SparkConf conf) { .config(SparkConfHelper.withKryo(conf)) .getOrCreate(); } + + public static SparkSession withHiveEnabledConfAndKryo(SparkConf conf) { + return SparkSession.builder() + .config(SparkConfHelper.withKryo(conf)) 
+ .enableHiveSupport() + .getOrCreate(); + } } diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java index bef8d45d6..d15cd60fc 100644 --- a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/SparkSessionSupport.java @@ -36,6 +36,24 @@ public static void runWithSparkSession(SparkConf conf, job); } + /** + * Runs a given job using SparkSession created using default builder and supplied SparkConf. Enables Hive support. + * Stops SparkSession when SparkSession is shared e.g. created in tests. Allows to reuse SparkSession created externally. + * + * @param conf SparkConf instance + * @param isSparkSessionShared When true will not stop SparkSession + * @param job Job using constructed SparkSession + */ + public static void runWithHiveEnabledSparkSession(SparkConf conf, + Boolean isSparkSessionShared, + Job job) { + runWithSparkSession( + SparkSessionFactory::withHiveEnabledConfAndKryo, + conf, + isSparkSessionShared, + job); + } + /** * Runs a given job using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession * when SparkSession is shared e.g. created in tests. Allows to reuse SparkSession created externally. diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java index bb7b20593..0723d0385 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/TestWithSharedSparkSession.java @@ -6,6 +6,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; +import org.apache.spark.sql.SparkSession.Builder; + import java.util.Objects; /** @@ -15,7 +17,22 @@ public class TestWithSharedSparkSession { private static SparkSession _spark; protected boolean initialized = false; + protected final boolean hiveSupportEnabled; + /** + * Default constructor with hive support in spark session disabled. 
+ */ + public TestWithSharedSparkSession() { + this(false); + } + + /** + * @param hiveSupportEnabled indicates whether hive support should be enabled in spark session + */ + public TestWithSharedSparkSession(boolean hiveSupportEnabled) { + this.hiveSupportEnabled = hiveSupportEnabled; + } + public SparkSession spark() { return _spark; } @@ -31,7 +48,11 @@ public void beforeEach() { conf.set("spark.driver.host", "localhost"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator"); - _spark = SparkSession.builder().config(conf).getOrCreate(); + Builder builder = SparkSession.builder().config(conf); + if (hiveSupportEnabled) { + builder = builder.enableHiveSupport(); + } + _spark = builder.getOrCreate(); } } diff --git a/iis-wf/iis-wf-import/pom.xml b/iis-wf/iis-wf-import/pom.xml index a3af82cb7..f6c86dac2 100644 --- a/iis-wf/iis-wf-import/pom.xml +++ b/iis-wf/iis-wf-import/pom.xml @@ -132,6 +132,18 @@ org.apache.spark spark-avro_2.11 + + + org.apache.spark + spark-hive_2.11 + + + + eigenbase + eigenbase-properties + + + org.apache.httpcomponents diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java index afa2b9bef..687158df1 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -1,9 +1,11 @@ package eu.dnetlib.iis.wf.importer.content; +import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithHiveEnabledSparkSession; import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.concat; import static org.apache.spark.sql.functions.lit; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; @@ -45,9 +47,11 @@ public static void main(String[] args) throws Exception { jcommander.parse(args); SparkConf conf = SparkConfHelper.withKryo(new SparkConf()); - conf.set("hive.metastore.uris", params.hiveMetastoreUris); + if (!StringUtils.isEmpty(params.hiveMetastoreUris)) { + conf.set("hive.metastore.uris", params.hiveMetastoreUris); + } - try (SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()) { + runWithHiveEnabledSparkSession(conf, params.isSparkSessionShared, sparkSession -> { HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath); HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath); @@ -62,7 +66,7 @@ public static void main(String[] args) throws Exception { avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath); avroSaver.saveJavaRDD(reports, ReportEntry.SCHEMA$, params.outputReportPath); - } + }); } private static JavaRDD generateReportEntries(SparkSession sparkSession, long recordsCount) { @@ -86,6 +90,9 @@ private static JavaRDD buildOutputRecord(Dataset source @Parameters(separators = "=") private static class HiveBasedDocumentContentUrlImporterJobParameters { + @Parameter(names = "-sharedSparkSession") + private Boolean isSparkSessionShared = Boolean.FALSE; + @Parameter(names = "-inputTableName", 
required = true) private String inputTableName; diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt index f2d1db2e8..6cb19bab5 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/import.txt @@ -1,5 +1,6 @@ ## This is a classpath-based import file (this header is required) -import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app +import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core/oozie_app +import_content_url_core_parquet classpath eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app import_content_url_dedup classpath eu/dnetlib/iis/wf/importer/content_url/dedup/oozie_app transformers_idreplacer classpath eu/dnetlib/iis/wf/transformers/idreplacer/oozie_app transformers_common_existencefilter classpath eu/dnetlib/iis/wf/transformers/common/existencefilter/oozie_app diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml index 9179762ad..e0d417adb 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app/workflow.xml @@ -2,14 +2,40 @@ + input_table_name + $UNDEFINED$ hive input table name hive_metastore_uris + $UNDEFINED$ hive metastore locations + + + + objectstore_facade_factory_classname + eu.dnetlib.iis.wf.importer.facade.WebServiceObjectStoreFacadeFactory + ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ObjectStoreFacade + + + objectstore_service_location + $UNDEFINED$ + object store service location to retrieve PDF/text contents from + + + approved_objectstores_csv + $UNDEFINED$ + CSV of approved object stores + + + blacklisted_objectstores_csv + $UNDEFINED$ + CSV of blacklisted object stores + + mimetypes_pdf pdf mime types @@ -135,9 +161,16 @@ org.apache.avro.Schema.Type.STRING - + + + + + ${input_table_name eq "$UNDEFINED$"} + + + @@ -158,6 +191,25 @@ + + + ${wf:appPath()}/import_content_url_core_parquet + + + + workingDir + ${workingDir}/import_content_url_core/working_dir + + + output + ${workingDir}/imported-urls + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper diff --git a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml index 55962fee9..d60bb44e9 100644 --- a/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/main/resources/eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java new file mode 100644 index 000000000..8ed3129ae 
--- /dev/null +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJobTest.java @@ -0,0 +1,128 @@ +package eu.dnetlib.iis.wf.importer.content; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.file.Path; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import eu.dnetlib.iis.common.ClassPathResourceProvider; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.utils.AvroAssertTestUtil; +import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl; + +/** + * @author mhorst + * + */ +public class HiveBasedDocumentContentUrlImporterJobTest extends TestWithSharedSparkSession { + + @TempDir + public Path workingDir; + + private Path outputDir; + private Path outputReportDir; + + private HiveBasedDocumentContentUrlImporterJobTest() { + // enabling hive support in spark session instantiated for this set of tests + super(true); + } + + @BeforeEach + public void beforeEach() { + super.beforeEach(); + + outputDir = workingDir.resolve("output"); + outputReportDir = workingDir.resolve("output_report"); + } + + @Test + public void testImportFromNonExistingTable() throws Exception { + // when + String inputTableName = "nonExistingTable"; + String hiveMetastoreUris = "localhost"; + + assertThrows(RuntimeException.class, () -> HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + })); + } + + @Test + public void testImportFromExistingLocalTable() throws Exception { + // when + String inputDbName = "test_aggregation_import_db"; + String inputTableName = "agg_payload"; + String hiveMetastoreUris = ""; + + // initializing test database + initializeLocalDatabase(inputDbName, inputTableName); + spark().sql("INSERT INTO TABLE " + inputDbName + '.' + inputTableName + + " VALUES ('od______2367::0001a50c6388e9bfcb791a924ec4b837', " + + "'s3://some_location', " + + "'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612')"); + + HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputDbName + '.' 
+ inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + }); + + // then + String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json"); + String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json"); + + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class); + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class); + + } + + @Test + public void testImportFromEmptyLocalTable() throws Exception { + // when + String inputDbName = "test_aggregation_import_db"; + String inputTableName = "agg_payload"; + String hiveMetastoreUris = ""; + + // initializing test database + initializeLocalDatabase(inputDbName, inputTableName); + + HiveBasedDocumentContentUrlImporterJob.main(new String[]{ + "-sharedSparkSession", + "-inputTableName", inputDbName + '.' + inputTableName, + "-hiveMetastoreUris", hiveMetastoreUris, + "-outputPath", outputDir.toString(), + "-outputReportPath", outputReportDir.toString(), + }); + + // then + String expectedOutputPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json"); + String expectedReportPath = ClassPathResourceProvider.getResourcePath("eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json"); + + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputDir.toString(), expectedOutputPath, DocumentContentUrl.class); + AvroAssertTestUtil.assertEqualsWithJsonIgnoreOrder(outputReportDir.toString(), expectedReportPath, ReportEntry.class); + + } + + /** + * Initializes local hive database. + * @param inputDbName database name to be initialized + * @param inputTableName table to name to be recreated + */ + private void initializeLocalDatabase(String inputDbName, String inputTableName) { + spark().sql("CREATE DATABASE IF NOT EXISTS " + inputDbName); + spark().sql("DROP TABLE IF EXISTS " + inputDbName + '.' + inputTableName); + spark().sql("CREATE TABLE " + inputDbName + '.' 
+ inputTableName + + "(id string, location string, mimetype string, size string, hash string)" + + " STORED AS PARQUET"); + } + +} diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java index 8b6c0abf4..3834ad2e9 100644 --- a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java @@ -13,10 +13,18 @@ public class WorkflowTest extends AbstractOozieWorkflowTestCase { @Test - public void testImportContentUrlWorkflow() { + public void testImportContentUrlWorkflowWithObjectStoreAsBackend() { OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); wfConf.setTimeoutInSeconds(720); - testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/sampletest", wfConf); + testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer", wfConf); + } + + //@Test + //TODO reenable this test once Hive is installed on CI Test hadoop cluster + public void testImportContentUrlWorkflowWithHiveBasedAggregationSubsystemAsBackend() { + OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); + wfConf.setTimeoutInSeconds(1440); + testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer", wfConf); } @Test diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json new file mode 100644 index 000000000..0823f9e2c --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url.json @@ -0,0 +1 @@ +{"id": "50|od______2367::0001a50c6388e9bfcb791a924ec4b837", "url": "s3://some_location", "mimeType": "application/pdf", "contentChecksum": "3a0fb023de6c8735a52ac9c1edace612", "contentSizeKB": 1} \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/document_content_url_empty.json new file mode 100644 index 000000000..e69de29bb diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json new file mode 100644 index 000000000..a878e26e6 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report.json @@ -0,0 +1 @@ +{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "1"} diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json new file mode 100644 index 000000000..6a23fd53f --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content/output/unit_test/report_empty.json @@ -0,0 +1 @@ +{"key":"import.content.urls.fromAggregator", "type":"COUNTER", "value": "0"} diff --git 
a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt similarity index 100% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/import.txt rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/import.txt diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q new file mode 100644 index 000000000..a2d330944 --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/initdb.q @@ -0,0 +1,11 @@ +CREATE DATABASE IF NOT EXISTS ${DB_NAME}; + +DROP TABLE IF EXISTS ${DB_NAME}.${TABLE_NAME}; + +CREATE TABLE ${DB_NAME}.${TABLE_NAME} (id string, location string, mimetype string, size string, hash string) STORED AS PARQUET; + +INSERT INTO TABLE ${DB_NAME}.${TABLE_NAME} VALUES + ('doaj10829873::0092cca2055a02f51c0b21b332395b09', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750&objectId=doaj10829873%3A%3A0092cca2055a02f51c0b21b332395b09%3A%3A2ed5ca4d0d8f77aaf2ef4fbf12418750', 'text/html', '38912', '19a53e9ebb0c878ad088568a8cf23dc6'), + ('od______2367::0001a50c6388e9bfcb791a924ec4b837', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70&objectId=od______2367%3A%3A0001a50c6388e9bfcb791a924ec4b837%3A%3A80aedf56c7fedf81758414e3b5af1f70', 'application/pdf', '1024', '3a0fb023de6c8735a52ac9c1edace612'), + ('dedup1______::000015093c397516c0b1b000f38982de', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=webcrawl____%3A%3A000015093c397516c0b1b000f38982de&objectId=webcrawl____%3A%3A000015093c397516c0b1b000f38982de', 'file::WoS', '6144', 'a9af919ba339a1491a922a7fe7b2580f'), + ('od_______908::00005ab6b308ff8f0f5415be03d8cce9', 'http://localhost:8280/is/mvc/objectStore/retrieve.do?objectStore=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9&objectId=od_______908%3A%3A00005ab6b308ff8f0f5415be03d8cce9', 'xml', '7168', '464ac8be9b33e109e98596799cf9d05d'); \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q new file mode 100644 index 000000000..13cee61fc --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/teardowndb.q @@ -0,0 +1,2 @@ +DROP TABLE ${DB_NAME}.${TABLE_NAME}; +DROP DATABASE ${DB_NAME}; \ No newline at end of file diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml 
b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml new file mode 100644 index 000000000..53f27519c --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/agg_subsystem_based_importer/oozie_app/workflow.xml @@ -0,0 +1,195 @@ + + + + + + localDbName + content_agg_test_db + + + localTableName + payload + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.java.jsonworkflownodes.Producer + + -C{id, + eu.dnetlib.iis.common.schemas.Identifier, + eu/dnetlib/iis/wf/importer/content_url/data/input/id.json} + + -C{id_mapping, + eu.dnetlib.iis.common.schemas.IdentifierMapping, + eu/dnetlib/iis/wf/importer/content_url/data/input/id_mapping.json} + -Oid=${workingDir}/producer/id + -Oid_mapping=${workingDir}/producer/id_mapping + + + + + + + + ${wf:appPath()}/import_content_url + + + + input_table_name + ${localDbName}.${localTableName} + + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + mimetypes_pdf + application/pdf + + + mimetypes_html + text/html + + + mimetypes_xml_pmc + xml + + + mimetypes_wos + file::WoS + + + input_id + ${workingDir}/producer/id + + + input_id_mapping + ${workingDir}/producer/id_mapping + + + output_root + ${workingDir}/out + + + output_report_root_path + ${workingDir}/report + + + output_report_relative_path + import_content_url + + + + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + + -C{content_url_html, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_html.json} + -C{content_url_pdf, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_pdf.json} + -C{content_url_wos, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_wos.json} + -C{content_url_xml, + eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl, + eu/dnetlib/iis/wf/importer/content_url/data/output/document_content_url_xml.json} + + + -Icontent_url_html=${workingDir}/out/html + -Icontent_url_pdf=${workingDir}/out/pdf + -Icontent_url_wos=${workingDir}/out/wos + -Icontent_url_xml=${workingDir}/out/xmlpmc + + + + + + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + -C{report,eu.dnetlib.iis.common.schemas.ReportEntry,eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json} + -Ireport=${workingDir}/report/import_content_url + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + + + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + DB_NAME=${localDbName} + TABLE_NAME=${localTableName} + + + + + + + Unfortunately, the process failed -- error message: [${wf:errorMessage(wf:lastErrorNode())}] + + + + \ No newline at end of file diff --git 
a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt new file mode 100644 index 000000000..ab9386b5f --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/import.txt @@ -0,0 +1,2 @@ +## This is a classpath-based import file (this header is required) +import_content_url classpath eu/dnetlib/iis/wf/importer/content_url/chain/oozie_app diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml similarity index 99% rename from iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml rename to iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml index f8485e19d..c25669981 100644 --- a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/sampletest/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer/oozie_app/workflow.xml @@ -1,5 +1,5 @@ - + ${jobTracker} @@ -17,7 +17,7 @@ - + diff --git a/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json new file mode 100644 index 000000000..950ce481e --- /dev/null +++ b/iis-wf/iis-wf-import/src/test/resources/eu/dnetlib/iis/wf/importer/content_url/data/output/report_from_agg_subsystem.json @@ -0,0 +1,5 @@ +{"key":"import.content.urls.bytype.pdf", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.html", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.jats", "type":"COUNTER", "value": "1"} +{"key":"import.content.urls.bytype.wos", "type":"COUNTER", "value": "1"} +{"key": "import.content.urls.fromAggregator", "type": "COUNTER", "value": "4"} diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml index 5df7025f4..681bfdc11 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/import/oozie_app/workflow.xml @@ -41,6 +41,7 @@ objectstore_service_location + $UNDEFINED$ object store service location required for content retrieval diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml index aff140de0..9462dd86e 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml @@ -94,7 +94,21 @@ import_infospace_graph_location local InformationSpace graph location - + + + import_content_pdfaggregation_table_name 
+ $UNDEFINED$ + Hive table name which is a part of the PDF aggregation subsystem, + defined in 'dbname.tablename' format to indicate both database and table. + This property value dictates the underlying importer module to be used: when set to the $UNDEFINED$ value, the old + ObjectStore based importer is activated; otherwise the PDF aggregation subsystem based importer is used. + + + import_content_pdfaggregation_hive_metastore_uris + $UNDEFINED$ + URIs pointing to the hive metastore utilized by the PDF aggregation subsystem + + import_content_object_store_location $UNDEFINED$ @@ -491,7 +505,16 @@ project_concepts_context_ids_csv ${import_project_concepts_context_ids_csv} - + + + input_table_name + ${import_content_pdfaggregation_table_name} + + + hive_metastore_uris + ${import_content_pdfaggregation_hive_metastore_uris} + + objectstore_service_location ${import_content_object_store_location} @@ -500,6 +523,7 @@ objectstore_s3_keystore_location ${import_content_objectstore_s3_keystore_location} + objectstore_s3_endpoint ${import_content_objectstore_s3_endpoint} diff --git a/pom.xml b/pom.xml index 03bfe686a..221bf82f6 100644 --- a/pom.xml +++ b/pom.xml @@ -338,6 +338,13 @@ ${iis.spark.version} provided + + + org.apache.spark + spark-hive_2.11 + ${iis.spark.version} + provided + pl.edu.icm.spark-utils From 4c1b3a2ae80c380d8d3bd2165a09d0f60b1d56c4 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 29 Jun 2022 18:13:32 +0200 Subject: [PATCH 9/9] Closes #1298: Refactor the way IIS imports contents by dumping ObjectStore in favour of newly introduced PDF aggregation service Applying code review fixes. --- .../eu/dnetlib/iis/wf/importer/content/WorkflowTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java index 3834ad2e9..cecb34ecc 100644 --- a/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java +++ b/iis-wf/iis-wf-import/src/test/java/eu/dnetlib/iis/wf/importer/content/WorkflowTest.java @@ -2,6 +2,8 @@ import eu.dnetlib.iis.common.AbstractOozieWorkflowTestCase; import eu.dnetlib.iis.common.OozieWorkflowTestConfiguration; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -19,8 +21,7 @@ public void testImportContentUrlWorkflowWithObjectStoreAsBackend() { testWorkflow("eu/dnetlib/iis/wf/importer/content_url/chain/objectstore_based_importer", wfConf); } - //@Test - //TODO reenable this test once Hive is installed on CI Test hadoop cluster + @Disabled("TODO reenable this test once Hive is installed on CI Test hadoop cluster") public void testImportContentUrlWorkflowWithHiveBasedAggregationSubsystemAsBackend() { OozieWorkflowTestConfiguration wfConf = new OozieWorkflowTestConfiguration(); wfConf.setTimeoutInSeconds(1440);