Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix EmrCreateJobFlowOperator using deferrable mode with wait_for_completion #41561

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

laksh-krishna-sharma
Copy link

@laksh-krishna-sharma laksh-krishna-sharma commented Aug 18, 2024


Summary of Changes

This PR addresses issue #40966 by modifying the logic in the EmrCreateJobFlowOperator to ensure that the deferrable argument only activates the deferral trigger when wait_for_completion is also set to True. Previously, the deferral trigger would be added regardless of the wait_for_completion setting, leading to unintended behavior.

Changes Made:

  • Updated the condition for deferring to check both deferrable and wait_for_completion.
  • Adjusted the condition for synchronous waiting to occur only when wait_for_completion=True and deferrable=False.

Please review the changes, and let me know if further adjustments are needed.

Closes: #40966

Copy link

boring-cyborg bot commented Aug 18, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

Copy link
Contributor

@shahar1 shahar1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Welcome to Apache Airflow and congrats for your first PR!
Good start, I have some comments :)

@@ -824,7 +824,7 @@ def execute(self, context: Context) -> str | None:
job_flow_id=self._job_flow_id,
log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id),
)
if self.deferrable:
if self.deferrable and self.wait_for_completion:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I'd consider refactoring both conditions as follows (a bit more readable IMO):
if self.wait_for_completion:
    if self.deferrable:
       ...
    else:
       ...
  1. Unit tests + docstring update for this behavior would be helpful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is not it the case for 1? But I definitely agree on 2 :)

airflow/providers/amazon/aws/operators/emr.py Show resolved Hide resolved
@laksh-krishna-sharma
Copy link
Author

Thank you for letting me know, @shahar1. I will make the changes and add comments again . I'll update you once the problem is resolved."

@laksh-krishna-sharma
Copy link
Author

@shahar1 sir please review the update.

@vincbeck
Copy link
Contributor

Can you please add unit tests?

@laksh-krishna-sharma
Copy link
Author

@shahar1 sir could you please advise on the appropriate folder for placing unit test cases?

@shahar1
Copy link
Contributor

shahar1 commented Aug 23, 2024

@shahar1 sir could you please advise on the appropriate folder for placing unit test cases?

tests/providers/amazon/aws/operators/, and there check all the files that start with emr_.

@laksh-krishna-sharma
Copy link
Author

@shahar1 sir and @vincbeck sir I've added unit test cases and done my best to cover the expected behaviors. As I'm a beginner, I'd greatly appreciate your feedback and any suggestions for improvement.

Copy link
Contributor

@vincbeck vincbeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job!

@vincbeck
Copy link
Contributor

@shahar1 sir and @vincbeck sir I've added unit test cases and done my best to cover the expected behaviors. As I'm a beginner, I'd greatly appreciate your feedback and any suggestions for improvement.

Thanks for doing it! The tests are pretty good!

@shahar1
Copy link
Contributor

shahar1 commented Aug 26, 2024

Looks great!

@vincbeck
Copy link
Contributor

Some tests are failing, could you check that please?

@vincbeck
Copy link
Contributor

@laksh-krishna-sharma
Copy link
Author

Thank you for your review @vincbeck and @shahar1. I will investigate the issue and address the failing tests as soon as possible. I'll update once the problem is resolved.

@laksh-krishna-sharma
Copy link
Author

@shahar1 sir and @vincbeck sir I've added unit test cases and done my best to cover the expected behaviors. As I'm a beginner, I'd greatly appreciate your feedback and any suggestions for improvement but i am stuck in resolving these errors please sir can you guide me to resolve these issue

@shahar1 shahar1 self-requested a review September 19, 2024 13:56
@shahar1
Copy link
Contributor

shahar1 commented Sep 19, 2024

@shahar1 sir and @vincbeck sir I've added unit test cases and done my best to cover the expected behaviors. As I'm a beginner, I'd greatly appreciate your feedback and any suggestions for improvement but i am stuck in resolving these errors please sir can you guide me to resolve these issue

Hello, here's some guidance:

  1. In test_create_job_flow_deferrable you need to set self.operator.wait_for_completion = True (otherwise it won't pass the first if).
  2. In test_deferrable_and_wait_for_completion, when you call the assert statement you create a new instance of EmrCreateJobFlowTrigger - which will always be different than the instance that is created by the original function.
    I'd suggest to mock EmrCreateJobFlowTrigger (let's call it mock_trigger) and then you could assert on trigger=mock_trigger.return_value.
    You could also create a separate assertion for the attributes of mock_trigger to ensure that all the parameters are as expected.

Also, please notice that you need to resolve conflicts by merging/rebasing from main.
Please let me know if you encounter any issues.

@laksh-krishna-sharma
Copy link
Author

Thank you for your help, @shahar1 sir. I’ve resolved the issue, and all checks have now passed on GitHub. I really appreciate your guidance!

@vincbeck
Copy link
Contributor

There is a conflict now :)

@eladkal eladkal changed the title fixing emr-operator-deferral-logic Fix EmrCreateJobFlowOperator using deferrable mode with wait_for_completion Sep 20, 2024
@laksh-krishna-sharma
Copy link
Author

Sir, I resolved the branch conflict. Please review the updates.

@laksh-krishna-sharma
Copy link
Author

@shahar1 sir and @vincbeck sir I've tried to resolve conflict and done my best to cover the expected behaviors. As I'm a beginner, I'd greatly appreciate your feedback and any suggestions for improvement but i am stuck in resolving these errors please sir can you guide me to resolve these issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:amazon-aws AWS/Amazon - related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

EmrCreateJobFlowOperator should not wait_for_completion if wait_for_completion=false deferrable=true
3 participants