-
Notifications
You must be signed in to change notification settings - Fork 260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a retryable mechanism for SWA features when files get deleted #1127
base: main
Are you sure you want to change the base?
Conversation
contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList) | ||
try { | ||
// THIS IS FOR LOCAL TEST ONLY. It is to induce a spark exception with the root cause of FileNotFoundException. | ||
if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid using LocalFeatureJoinJob in non-testing code would be better, e.g. create a class to read as DF, then inject the implementation for local test at runtime.
@@ -338,6 +338,25 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d | |||
swaObsTime, | |||
failOnMissingPartition, | |||
swaHandler) | |||
|
|||
// We will retry the SWA features which could not added because of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix comment
@@ -81,7 +87,9 @@ private[offline] class SlidingWindowAggregationJoiner( | |||
bloomFilters: Option[Map[Seq[Int], BloomFilter]], | |||
swaObsTimeOpt: Option[DateTimeInterval], | |||
failOnMissingPartition: Boolean, | |||
swaHandler: Option[SWAHandler]): FeatureDataFrame = { | |||
swaHandler: Option[SWAHandler], | |||
isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isRetry should be shouldRetry?
The retry will happen only once to ensure that the time to fail is not very high.