Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a retryable mechanism for SWA features when files get deleted #1127

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

rakeshkashyap123
Copy link
Collaborator

The retry will happen only once to ensure that the time to fail is not very high.

@rakeshkashyap123 rakeshkashyap123 added the safe to test Tag to execute build pipeline for a PR from forked repo label Mar 29, 2023
contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
try {
// THIS IS FOR LOCAL TEST ONLY. It is to induce a spark exception with the root cause of FileNotFoundException.
if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException())
Copy link
Collaborator

@jaymo001 jaymo001 Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using LocalFeatureJoinJob in non-testing code would be better, e.g. create a class to read as DF, then inject the implementation for local test at runtime.

@@ -338,6 +338,25 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
swaObsTime,
failOnMissingPartition,
swaHandler)

// We will retry the SWA features which could not added because of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix comment

@@ -81,7 +87,9 @@ private[offline] class SlidingWindowAggregationJoiner(
bloomFilters: Option[Map[Seq[Int], BloomFilter]],
swaObsTimeOpt: Option[DateTimeInterval],
failOnMissingPartition: Boolean,
swaHandler: Option[SWAHandler]): FeatureDataFrame = {
swaHandler: Option[SWAHandler],
isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRetry should be shouldRetry?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
safe to test Tag to execute build pipeline for a PR from forked repo
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants