Add a retryable mechanism for SWA features when files get deleted #1127
@@ -9,7 +9,7 @@ import com.linkedin.feathr.offline.anchored.keyExtractor.{MVELSourceKeyExtractor
 import com.linkedin.feathr.offline.client.DataFrameColName
 import com.linkedin.feathr.offline.config.FeatureJoinConfig
 import com.linkedin.feathr.offline.exception.FeathrIllegalStateException
-import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
+import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager}
 import com.linkedin.feathr.offline.join.DataFrameKeyCombiner
 import com.linkedin.feathr.offline.source.DataSource
 import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
@@ -22,11 +22,14 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, JoinStage}
 import com.linkedin.feathr.swj.{FactData, LabelData, SlidingWindowJoin}
 import com.linkedin.feathr.{common, offline}
 import org.apache.logging.log4j.LogManager
+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.util.sketch.BloomFilter

+import java.io.FileNotFoundException
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

 /**
  * Case class containing other SWA handler methods
@@ -65,6 +68,9 @@ private[offline] class SlidingWindowAggregationJoiner(
    * @param obsDF Observation data
    * @param swaObsTimeOpt start and end time of observation data
    * @param failOnMissingPartition whether to fail the data loading if some of the date partitions are missing.
+   * @param swaHandler External SWA library, if any, that should handle the SWA join
+   * @param isRetry If true, features missed because of IOExceptions are collected so they can be
+   *                retried later. Default is true.
    * @return pair of :
    *         1) dataframe with feature column appended to the obsData,
    *         it can be converted to a pair RDD of (observation data record, feature record),
@@ -81,7 +87,9 @@ private[offline] class SlidingWindowAggregationJoiner(
       bloomFilters: Option[Map[Seq[Int], BloomFilter]],
       swaObsTimeOpt: Option[DateTimeInterval],
       failOnMissingPartition: Boolean,
-      swaHandler: Option[SWAHandler]): FeatureDataFrame = {
+      swaHandler: Option[SWAHandler],
+      isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
+    val retryableSwaFeatures = ArrayBuffer.empty[String]
     val joinConfigSettings = joinConfig.settings
     // extract time window settings
     if (joinConfigSettings.isEmpty) {
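With this signature change, the joiner returns a pair: the joined FeatureDataFrame plus the names of any SWA features that were skipped and should be retried. A minimal sketch of how a call site might unpack the new result (`runJoiner` is an illustrative stand-in; this diff does not show the actual caller):

```scala
import com.linkedin.feathr.offline

// Stand-in for the actual joiner invocation, which this diff does not show.
def runJoiner(): (offline.FeatureDataFrame, Seq[String]) = ???

val (featureDataFrame, retryableSwaFeatures) = runJoiner()
// An empty list means every SWA feature joined; a non-empty list names the
// features deferred because a source file disappeared mid-read.
```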
@@ -256,8 +264,19 @@ private[offline] class SlidingWindowAggregationJoiner(
         }
         val origContextObsColumns = labelDataDef.dataSource.columns

-        contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
+        try {
+          // THIS IS FOR LOCAL TESTS ONLY. It induces a Spark exception whose root cause is a FileNotFoundException.
+          if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException())
+
+          contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
+        } catch {
+          // The files being loaded often get deleted midway through the job. Collect this stage's
+          // features so they can all be retried by reloading the datasets.
+          case exception: SparkException => if (isRetry && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
+            val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name))
+            retryableSwaFeatures ++= unjoinedFeatures
+          }
+        }
+
+        val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures
         contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) {
           val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null)))
           contextDF.union(nullDfWithFeatureCols)
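The catch clause above keys off the root cause rather than the SparkException wrapper, because Spark re-wraps executor-side IO failures. A self-contained sketch of the same catch-and-collect pattern, assuming only Spark's public SparkException type (`collectRetryable`, `runJoin`, and `stageFeatureNames` are illustrative names, not from this PR):

```scala
import java.io.FileNotFoundException
import org.apache.spark.SparkException

// Run the join; on a mid-read file deletion, return this stage's feature
// names so the caller can defer them to a retry pass.
def collectRetryable(stageFeatureNames: Seq[String])(runJoin: => Unit): Seq[String] =
  try {
    runJoin
    Seq.empty
  } catch {
    // Spark wraps executor-side failures in a SparkException; the original
    // FileNotFoundException (e.g. a partition deleted mid-read) is its cause.
    case e: SparkException if e.getCause != null && e.getCause.isInstanceOf[FileNotFoundException] =>
      stageFeatureNames
  }
```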
@@ -272,13 +291,13 @@ private[offline] class SlidingWindowAggregationJoiner(
             .asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat)

         val FeatureDataFrame(withFDSFeatureDF, inferredTypes) =
-          SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, joinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
+          SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, finalJoinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
         // apply default on FDS dataset
         val withFeatureContextDF =
-          substituteDefaults(withFDSFeatureDF, defaults.keys.filter(joinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)
+          substituteDefaults(withFDSFeatureDF, defaults.keys.filter(finalJoinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)

         allInferredFeatureTypes ++= inferredTypes
-        contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, joinedFeatures, keyTags.map(keyTagList))
+        contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, finalJoinedFeatures, keyTags.map(keyTagList))
         if (shouldCheckPoint(ss)) {
           // checkpoint complicated dataframe for each stage to avoid Spark failure
           contextDF = contextDF.checkpoint(true)
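Downstream of the catch block, `finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures` is what gets converted to FDS, defaulted, and renamed. Scala's `Seq#diff` removes the deferred names; a small illustration with made-up feature names:

```scala
// Behavior of Seq#diff, which derives finalJoinedFeatures above
// (feature names are invented for illustration):
val joinedFeatures = Seq("f_click_count_1d", "f_view_count_7d", "f_spend_30d")
val retryableSwaFeatures = Seq("f_view_count_7d")
val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures
// finalJoinedFeatures == Seq("f_click_count_1d", "f_spend_30d")
```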
@@ -292,7 +311,7 @@ private[offline] class SlidingWindowAggregationJoiner(
           }
         }
       }})
-    offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap)
+    (offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap), retryableSwaFeatures)
   }

   /**
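Taken together, the contract is: the first pass collects retryable features instead of failing, and a driver can then reload the affected datasets and join again. A hedged sketch of such a driver loop (all names are illustrative; the actual retry orchestration lives outside this file):

```scala
// Illustrative retry driver; `runSwaJoin(isRetry)` wraps the joiner call above.
def joinWithRetry[F](runSwaJoin: Boolean => (F, Seq[String])): F = {
  val (firstPass, retryable) = runSwaJoin(true) // default pass: misses are collected
  if (retryable.isEmpty) firstPass
  else {
    // Reload the affected datasets, then join again. Passing isRetry = false
    // marks this as the retry pass, so misses are not collected a second time.
    val (secondPass, _) = runSwaJoin(false)
    secondPass
  }
}
```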
Review comment: Fix comment