Add a retryable mechanism for SWA features when files get deleted #1127

Open
wants to merge 4 commits into base: main
Changes from 1 commit
@@ -17,6 +17,9 @@ import org.apache.spark.sql.SparkSession
*/
object LocalFeatureJoinJob {

// This is a config for local tests only, used to induce a FileNotFoundException.
var shouldRetryAddingSWAFeatures = false

// for user convenience, create the spark session here, so the user does not need to create one
// this also ensures it has the same settings as the real feathr join job
val ss: SparkSession = createSparkSession(enableHiveSupport = true)
@@ -327,7 +327,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
offline.FeatureDataFrame(obsToJoinWithFeatures, Map())
} else {
val swaJoiner = new SlidingWindowAggregationJoiner(featureGroups.allWindowAggFeatures, anchorToDataSourceMapper)
swaJoiner.joinWindowAggFeaturesAsDF(
val (featureDataFrame, retryableFeatureNames) = swaJoiner.joinWindowAggFeaturesAsDF(
ss,
obsToJoinWithFeatures,
joinConfig,
@@ -338,6 +338,25 @@
swaObsTime,
failOnMissingPartition,
swaHandler)

// We will retry the SWA features which could not be added because of a FileNotFoundException
// (e.g., when the underlying files get deleted mid-read).
Collaborator comment:
Fix comment
val retryableErasedEntityTaggedFeatures = requiredWindowAggFeatures.filter(x => retryableFeatureNames.contains(x.getFeatureName))
if (retryableFeatureNames.nonEmpty) {
swaJoiner.joinWindowAggFeaturesAsDF(
ss,
featureDataFrame.df,
joinConfig,
keyTagIntsToStrings,
windowAggFeatureStages,
retryableErasedEntityTaggedFeatures,
bloomFilters,
swaObsTime,
failOnMissingPartition,
swaHandler,
false)._1
} else {
featureDataFrame
}
}
}
}
@@ -9,7 +9,7 @@ import com.linkedin.feathr.offline.anchored.keyExtractor.{MVELSourceKeyExtractor
import com.linkedin.feathr.offline.client.DataFrameColName
import com.linkedin.feathr.offline.config.FeatureJoinConfig
import com.linkedin.feathr.offline.exception.FeathrIllegalStateException
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager}
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner
import com.linkedin.feathr.offline.source.DataSource
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
@@ -22,11 +22,14 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, JoinStage}
import com.linkedin.feathr.swj.{FactData, LabelData, SlidingWindowJoin}
import com.linkedin.feathr.{common, offline}
import org.apache.logging.log4j.LogManager
import org.apache.spark.SparkException
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.sketch.BloomFilter

import java.io.FileNotFoundException
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Case class containing other SWA handler methods
@@ -65,6 +68,9 @@ private[offline] class SlidingWindowAggregationJoiner(
* @param obsDF Observation data
* @param swaObsTimeOpt start and end time of observation data
* @param failOnMissingPartition whether to fail the data loading if some of the date partitions are missing.
* @param swaHandler Optional external SWA handler; if defined, it performs the SWA join instead of the built-in SlidingWindowJoin
* @param isRetry Whether SWA features that fail to join because of IOExceptions (e.g., FileNotFoundException) may be retried.
*                Defaults to true; the retry invocation passes false so the join is not retried more than once.
* @return pair of :
* 1) dataframe with feature column appended to the obsData,
* it can be converted to a pair RDD of (observation data record, feature record),
@@ -81,7 +87,9 @@
bloomFilters: Option[Map[Seq[Int], BloomFilter]],
swaObsTimeOpt: Option[DateTimeInterval],
failOnMissingPartition: Boolean,
swaHandler: Option[SWAHandler]): FeatureDataFrame = {
swaHandler: Option[SWAHandler],
isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
Collaborator comment:
isRetry should be shouldRetry?

val retryableSwaFeatures = ArrayBuffer.empty[String]
val joinConfigSettings = joinConfig.settings
// extract time window settings
if (joinConfigSettings.isEmpty) {
@@ -256,8 +264,19 @@
}
val origContextObsColumns = labelDataDef.dataSource.columns

contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
try {
// THIS IS FOR LOCAL TESTS ONLY. It induces a SparkException whose root cause is a FileNotFoundException.
if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException())
Collaborator @jaymo001 commented on Mar 29, 2023:
It would be better to avoid using LocalFeatureJoinJob in non-testing code, e.g. create a class that reads the data as a DataFrame, then inject a test implementation at runtime for local tests.
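A minimal sketch of that suggestion (the SwaDataLoader trait and both implementations below are hypothetical names, not part of this PR or of Feathr): production code would depend on a small loader interface, and the local test would inject a failing implementation instead of checking a test-only flag such as LocalFeatureJoinJob.shouldRetryAddingSWAFeatures.

import java.io.FileNotFoundException
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, SparkSession}

// Production code depends only on this trait.
trait SwaDataLoader {
  def loadAsDF(ss: SparkSession, path: String): DataFrame
}

// Default implementation used by the real join job.
class DefaultSwaDataLoader extends SwaDataLoader {
  override def loadAsDF(ss: SparkSession, path: String): DataFrame = ss.read.parquet(path)
}

// Test-only implementation injected by local tests to simulate files deleted mid-read.
class FailingSwaDataLoader extends SwaDataLoader {
  override def loadAsDF(ss: SparkSession, path: String): DataFrame =
    throw new SparkException("file not found", new FileNotFoundException(path))
}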

contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
} catch {
// The files being loaded sometimes get deleted mid-read. Retry all the features at this stage by reloading the datasets.
case exception: SparkException => if (isRetry && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name))
retryableSwaFeatures ++= unjoinedFeatures
}
}

val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures
contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) {
val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null)))
contextDF.union(nullDfWithFeatureCols)
@@ -272,13 +291,13 @@
.asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat)

val FeatureDataFrame(withFDSFeatureDF, inferredTypes) =
SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, joinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, finalJoinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
// apply default on FDS dataset
val withFeatureContextDF =
substituteDefaults(withFDSFeatureDF, defaults.keys.filter(joinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)
substituteDefaults(withFDSFeatureDF, defaults.keys.filter(finalJoinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)

allInferredFeatureTypes ++= inferredTypes
contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, joinedFeatures, keyTags.map(keyTagList))
contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, finalJoinedFeatures, keyTags.map(keyTagList))
if (shouldCheckPoint(ss)) {
// checkpoint complicated dataframe for each stage to avoid Spark failure
contextDF = contextDF.checkpoint(true)
@@ -292,7 +311,7 @@
}
}
}})
offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap)
(offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap), retryableSwaFeatures)
}

/**
@@ -1,6 +1,7 @@
package com.linkedin.feathr.offline

import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows}
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import com.linkedin.feathr.offline.util.FeathrUtils
import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, SKIP_MISSING_FEATURE, setFeathrJobParam}
import org.apache.spark.sql.Row
@@ -328,13 +329,88 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
setFeathrJobParam(FILTER_NULLS, "false")
}

/**
* Test SWA with a dense vector feature when a retry is induced.
* The induced failure should be handled by the SWA retry mechanism.
*/
@Test
def testLocalAnchorSWAWithDenseVectorWithRetry(): Unit = {
LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
val res = runLocalFeatureJoinForTest(
"""
| settings: {
| joinTimeSettings: {
| timestampColumn: {
| def: "timestamp"
| format: "yyyy-MM-dd"
| }
| simulateTimeDelay: 1d
| }
|}
|
|features: [
| {
| key: [mId],
| featureList: ["aEmbedding", "memberEmbeddingAutoTZ"]
| }
|]
""".stripMargin,
"""
|sources: {
| swaSource: {
| location: { path: "generation/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
| timestampColumnFormat: "yyyy-MM-dd"
| }
| }
|}
|
|anchors: {
| swaAnchor: {
| source: "swaSource"
| key: "x"
| features: {
| aEmbedding: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| }
| memberEmbeddingAutoTZ: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| type: {
| type: TENSOR
| tensorCategory: SPARSE
| dimensionType: [INT]
| valType: FLOAT
| }
| }
| }
| }
|}
""".stripMargin,
observationDataPath = "slidingWindowAgg/csvTypeTimeFile1.csv").data

val featureList = res.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("mId") else "null")

assertEquals(featureList.size, 2)
assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
}

/**
* Test SWA with a dense vector feature.
* The feature dataset generation/daily has different but compatible schemas for different partitions;
* this is supported by fuzzyUnion.
*/
@Test
def testLocalAnchorSWAWithDenseVector(): Unit = {
LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
val res = runLocalFeatureJoinForTest(
"""
| settings: {
@@ -399,6 +475,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
}

/**
Expand Down