Integrate new aggregation subsystem based content importer module #1361

Closed
@@ -13,4 +13,11 @@ public static SparkSession withConfAndKryo(SparkConf conf) {
.config(SparkConfHelper.withKryo(conf))
.getOrCreate();
}

public static SparkSession withHiveEnabledConfAndKryo(SparkConf conf) {
return SparkSession.builder()
.config(SparkConfHelper.withKryo(conf))
.enableHiveSupport()
.getOrCreate();
}
}
@@ -36,6 +36,24 @@ public static void runWithSparkSession(SparkConf conf,
job);
}

/**
* Runs a given job using a SparkSession created with the default builder and the supplied SparkConf, with Hive support enabled.
* The SparkSession is not stopped when it is shared, e.g. created in tests, which allows reusing a SparkSession created externally.
*
* @param conf SparkConf instance
* @param isSparkSessionShared when true the SparkSession will not be stopped
* @param job job using the constructed SparkSession
*/
public static void runWithHiveEnabledSparkSession(SparkConf conf,
Boolean isSparkSessionShared,
Job job) {
runWithSparkSession(
SparkSessionFactory::withHiveEnabledConfAndKryo,
conf,
isSparkSessionShared,
job);
}
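
A minimal usage sketch of the new helper, assuming a caller outside this diff; the wrapper class and the query below are hypothetical, not part of this PR:

// Hypothetical caller; Kryo is applied inside withHiveEnabledConfAndKryo, so a plain SparkConf is enough here.
import org.apache.spark.SparkConf;
import eu.dnetlib.iis.common.spark.SparkSessionSupport;

public class HiveSessionUsageExample {
    public static void main(String[] args) throws Exception {
        SparkSessionSupport.runWithHiveEnabledSparkSession(new SparkConf(), Boolean.FALSE,
                // the callback receives a Hive-enabled SparkSession
                sparkSession -> sparkSession.sql("show databases").show());
    }
}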

/**
* Runs a given job using a SparkSession created with the supplied builder and SparkConf. The SparkSession is not stopped
* when it is shared, e.g. created in tests, which allows reusing a SparkSession created externally.
@@ -6,6 +6,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;

import org.apache.spark.sql.SparkSession.Builder;

import java.util.Objects;

/**
@@ -15,7 +17,22 @@
public class TestWithSharedSparkSession {
private static SparkSession _spark;
protected boolean initialized = false;
protected final boolean hiveSupportEnabled;

/**
* Default constructor with Hive support disabled in the Spark session.
*/
public TestWithSharedSparkSession() {
this(false);
}

/**
* @param hiveSupportEnabled indicates whether Hive support should be enabled in the Spark session
*/
public TestWithSharedSparkSession(boolean hiveSupportEnabled) {
this.hiveSupportEnabled = hiveSupportEnabled;
}

public SparkSession spark() {
return _spark;
}
@@ -31,7 +48,11 @@ public void beforeEach() {
conf.set("spark.driver.host", "localhost");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator");
_spark = SparkSession.builder().config(conf).getOrCreate();
Builder builder = SparkSession.builder().config(conf);
if (hiveSupportEnabled) {
builder = builder.enableHiveSupport();
}
_spark = builder.getOrCreate();
}
}
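
A sketch of how a test could opt into the Hive-enabled shared session via the new constructor; the test class and its query are hypothetical:

// Hypothetical test subclass enabling Hive support in the shared SparkSession.
import org.junit.jupiter.api.Test;

public class SampleHiveBackedTest extends TestWithSharedSparkSession {

    public SampleHiveBackedTest() {
        super(true); // request a Hive-enabled SparkSession
    }

    @Test
    public void shouldQueryHiveCatalog() {
        // spark() returns the shared session built in beforeEach() with enableHiveSupport()
        spark().sql("show databases").show();
    }
}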

12 changes: 12 additions & 0 deletions iis-wf/iis-wf-import/pom.xml
@@ -132,6 +132,18 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<exclusions>
<exclusion>
<!-- dependency unavailable in the Cloudera repo -->
<groupId>eigenbase</groupId>
<artifactId>eigenbase-properties</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
@@ -0,0 +1,109 @@
package eu.dnetlib.iis.wf.importer.content;

import static eu.dnetlib.iis.common.spark.SparkSessionSupport.runWithHiveEnabledSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.collect.Lists;

import eu.dnetlib.iis.common.InfoSpaceConstants;
import eu.dnetlib.iis.common.java.io.HdfsUtils;
import eu.dnetlib.iis.common.report.ReportEntryFactory;
import eu.dnetlib.iis.common.schemas.ReportEntry;
import eu.dnetlib.iis.common.spark.SparkConfHelper;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport;
import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl;
import pl.edu.icm.sparkutils.avro.SparkAvroSaver;


/**
* {@link DocumentContentUrl} importer reading data from a Hive table that is part of the PDF aggregation subsystem.
*
* @author mhorst
*
*/
public class HiveBasedDocumentContentUrlImporterJob {

private static SparkAvroSaver avroSaver = new SparkAvroSaver();

private static final String COUNTER_IMPORTED_RECORDS_TOTAL = "import.content.urls.fromAggregator";

public static void main(String[] args) throws Exception {

HiveBasedDocumentContentUrlImporterJobParameters params = new HiveBasedDocumentContentUrlImporterJobParameters();
JCommander jcommander = new JCommander(params);
jcommander.parse(args);

SparkConf conf = SparkConfHelper.withKryo(new SparkConf());
if (!StringUtils.isEmpty(params.hiveMetastoreUris)) {
conf.set("hive.metastore.uris", params.hiveMetastoreUris);
}

runWithHiveEnabledSparkSession(conf, params.isSparkSessionShared, sparkSession -> {

HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputPath);
HdfsUtils.remove(sparkSession.sparkContext().hadoopConfiguration(), params.outputReportPath);

Dataset<Row> result = sparkSession.sql("select id, location, mimetype, size, hash from "
+ params.inputTableName + " where location is not null");

JavaRDD<DocumentContentUrl> documentContentUrl = buildOutputRecord(result, sparkSession);
documentContentUrl.cache();

JavaRDD<ReportEntry> reports = generateReportEntries(sparkSession, documentContentUrl.count());

avroSaver.saveJavaRDD(documentContentUrl, DocumentContentUrl.SCHEMA$, params.outputPath);
avroSaver.saveJavaRDD(reports, ReportEntry.SCHEMA$, params.outputReportPath);
});
}

private static JavaRDD<ReportEntry> generateReportEntries(SparkSession sparkSession, long recordsCount) {
return sparkSession.createDataset(
Lists.newArrayList(
ReportEntryFactory.createCounterReportEntry(COUNTER_IMPORTED_RECORDS_TOTAL, recordsCount)),
Encoders.kryo(ReportEntry.class)).javaRDD();
}

private static JavaRDD<DocumentContentUrl> buildOutputRecord(Dataset<Row> source, SparkSession spark) {
Dataset<Row> resultDs = source.select(
concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"),
col("location").as("url"),
col("mimetype").as("mimeType"),
col("size").cast("long").divide(1024).as("contentSizeKB"),
col("hash").as("contentChecksum")
);
return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD();
}

@Parameters(separators = "=")
private static class HiveBasedDocumentContentUrlImporterJobParameters {

@Parameter(names = "-sharedSparkSession")
private Boolean isSparkSessionShared = Boolean.FALSE;

@Parameter(names = "-inputTableName", required = true)
private String inputTableName;

@Parameter(names = "-hiveMetastoreUris", required = true)
private String hiveMetastoreUris;

@Parameter(names = "-outputPath", required = true)
private String outputPath;

@Parameter(names = "-outputReportPath", required = true)
private String outputReportPath;
}

}
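
For reference, a hedged sketch of how the job might be launched outside the Oozie workflow; all argument values below are placeholders:

// Hypothetical launcher; table name, metastore URI and output paths are placeholders.
public class HiveBasedImporterLauncherExample {
    public static void main(String[] args) throws Exception {
        HiveBasedDocumentContentUrlImporterJob.main(new String[] {
                "-inputTableName=aggregation_db.document_payload",
                "-hiveMetastoreUris=thrift://example-host:9083",
                "-outputPath=/tmp/imported-content-urls",
                "-outputReportPath=/tmp/imported-content-urls-report"
        });
    }
}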
@@ -1,5 +1,6 @@
## This is a classpath-based import file (this header is required)
import_content_url_core classpath eu/dnetlib/iis/wf/importer/content_url/core/oozie_app
import_content_url_core_parquet classpath eu/dnetlib/iis/wf/importer/content_url/core_parquet/oozie_app
import_content_url_dedup classpath eu/dnetlib/iis/wf/importer/content_url/dedup/oozie_app
transformers_idreplacer classpath eu/dnetlib/iis/wf/transformers/idreplacer/oozie_app
transformers_common_existencefilter classpath eu/dnetlib/iis/wf/transformers/common/existencefilter/oozie_app
@@ -2,25 +2,40 @@

<parameters>
<!-- input -->
<!-- import backed by the new aggregation subsystem -->
<property>
<name>input_table_name</name>
<value>$UNDEFINED$</value>
<description>hive input table name</description>
</property>
<property>
<name>hive_metastore_uris</name>
<value>$UNDEFINED$</value>
<description>hive metastore locations</description>
</property>

<!-- import backed by the old objectstore -->
<property>
<name>objectstore_facade_factory_classname</name>
<value>eu.dnetlib.iis.wf.importer.facade.WebServiceObjectStoreFacadeFactory</value>
<description>ServiceFacadeFactory implementation class name producing eu.dnetlib.iis.wf.importer.facade.ObjectStoreFacade</description>
</property>
<property>
<name>objectstore_service_location</name>
<description>object store service location to retrieve PDF/text contents from</description>
</property>
<property>
<name>approved_objectstores_csv</name>
<value>$UNDEFINED$</value>
<description>CSV of approved object stores</description>
</property>
<property>
<name>blacklisted_objectstores_csv</name>
<value>$UNDEFINED$</value>
<description>CSV of blacklisted object stores</description>
</property>
<property>
<name>objectstore_service_location</name>
<value>$UNDEFINED$</value>
<description>object store service location to retrieve PDF/text contents from</description>
</property>
<property>
<name>approved_objectstores_csv</name>
<value>$UNDEFINED$</value>
<description>CSV of approved object stores</description>
</property>
<property>
<name>blacklisted_objectstores_csv</name>
<value>$UNDEFINED$</value>
<description>CSV of blacklisted object stores</description>
</property>

<property>
<name>mimetypes_pdf</name>
<description>pdf mime types</description>
@@ -146,9 +161,16 @@
<arg>org.apache.avro.Schema.Type.STRING</arg>
<capture-output />
</java>
<ok to="objectstore-content-url-importer" />
<ok to="decision-content-url-import-mode" />
<error to="fail" />
</action>

<decision name="decision-content-url-import-mode">
<switch>
<case to="objectstore-content-url-importer">${input_table_name eq "$UNDEFINED$"}</case>
<default to="parquet-content-url-importer"/>
</switch>
</decision>

<action name="objectstore-content-url-importer">
<sub-workflow>
@@ -169,6 +191,25 @@
<error to="fail" />
</action>

<action name="parquet-content-url-importer">
<sub-workflow>
<app-path>${wf:appPath()}/import_content_url_core_parquet</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>workingDir</name>
<value>${workingDir}/import_content_url_core/working_dir</value>
</property>
<property>
<name>output</name>
<value>${workingDir}/imported-urls</value>
</property>
</configuration>
</sub-workflow>
<ok to="input_id_mapping-path-setter" />
<error to="fail" />
</action>

<action name='input_id_mapping-path-setter'>
<java>
<main-class>eu.dnetlib.iis.common.java.ProcessWrapper</main-class>
@@ -239,7 +280,7 @@

<decision name="decision-existence-filter">
<switch>
<case to="content-url-dispatcher">${input_id eq "$UNDEFINED$"}</case>
Member Author:

This particular change could be confusing, but it is bound to a specific commit that closes the #1342 issue, which was spotted while working on #1298.

Maybe it is not worth elevating it to a dedicated PR, but we should find a clear way to indicate this solution in the commit message once the commits get squashed before the merge.

Contributor:

Yes, I was surprised at first, but I found the reason for this change.

I personally prefer just not squashing, but adding "Also closes #1342, which was found during this work, by going to content-url-dedup from decision-existence-filter in the content_url_chain" should be enough, I think.

<case to="content-url-dedup">${input_id eq "$UNDEFINED$"}</case>
<default to="transformers_common_existencefilter"/>
</switch>
</decision>