-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat: Distributed Procedure Support Part 1/X - core code base changes #26373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Distributed Procedure Support Part 1/X - core code base changes #26373
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters
2e0cff9 to
dbb5eb0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters
02d3252 to
8bf8be6
Compare
8bf8be6 to
07a4fd9
Compare
tdcmeehan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly wondering if we can avoid the breaking change on Procedure.
| checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"), | ||
| format("Argument `%s` must be string type", SCHEMA)); | ||
| schemaIndex = i; | ||
| } | ||
| else if (getArguments().get(i).getName().equals(TABLE_NAME)) { | ||
| checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use StandardTypes
| checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"), | |
| format("Argument `%s` must be string type", SCHEMA)); | |
| schemaIndex = i; | |
| } | |
| else if (getArguments().get(i).getName().equals(TABLE_NAME)) { | |
| checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"), | |
| checkArgument(getArguments().get(i).getType().getBase().equals(VARCHAR), | |
| format("Argument `%s` must be string type", SCHEMA)); | |
| schemaIndex = i; | |
| } | |
| else if (getArguments().get(i).getName().equals(TABLE_NAME)) { | |
| checkArgument(getArguments().get(i).getType().getBase().equals(VARCHAR), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed!
| protected static void checkArgument(boolean assertion, String message) | ||
| { | ||
| if (!assertion) { | ||
| throw new IllegalArgumentException(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe a generic IAE will get translated into an uncategorized Presto error. If so, better to use new PrestoException(INVALID_ARGUMENT, ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I've changed the exception type and the corresponding tests.
| import static java.util.stream.Collectors.joining; | ||
|
|
||
| public class Procedure | ||
| public abstract class Procedure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can avoid a breaking change here. What if we make Procedure extend an abstract type, for example, BaseProcedure, which DistributedProcedure can also extend from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea! I've renamed the current abstract parent class to BaseProcedure and reverting LocalProcedure back to Procedure. Please take a look when you have a chance. Thanks a lot!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hantangwangd I believe there's still a breaking change here--all connectors will be required to migrate to return BaseProcedure. Wondering if it makes sense to keep getProcedures to return Procedure, but add a separate one getDistributedProcedures so folks can opt-in to these procedures without requiring a migration? We can consider a more generic API if a third type of procedure is added?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdcmeehan thanks for pointing out this. Yes, you are correct that there's still a breaking change here. I'll look into the feasibility of your suggestion. Also, it seems moving the Argument class into BaseProcedure could also break the backward compatibility. I'll consider both points to figure out a reasonable solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @tdcmeehan, I've done the following two things to keep entirely backward compatibility:
-
Retain the
Connector.getProcedures()SPI method as you suggested, and add a new generic method to support otherBaseProceduresubtypes such asDistributedProcedure. -
Use generics for the
Argumentclass hierarchy to enable shared common logic while maintaining backward compatibility.
After this refactoring, existing in-tree and out-tree connectors require no changes, unless they intend to support distributed procedures. Please take a look when you get a chance. Thanks a lot!
07a4fd9 to
bfa4bc0
Compare
f19a8f8 to
d90f700
Compare
c57f657 to
fe00517
Compare
tdcmeehan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, just some remaining questions on the SPI changes.
| requireNonNull(procedures, "Connector %s returned a null procedures set"); | ||
| this.procedures = ImmutableSet.copyOf(procedures); | ||
| proceduresBuilder.addAll(procedures); | ||
| Set<DistributedProcedure> distributedProcedures = connector.getProcedures(DistributedProcedure.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any particular reason to add a generic method here, instead of a simple addition method getDistributedProcedures which does this as well? I'm thinking adding the new method wouldn't require a deprecation cycle for the older getProcedures method, we simply have two parallel methods that each return different types of procedures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can consider a more generic API if a third type of procedure is added?
@tdcmeehan thanks for the review and suggestion. I might have misunderstood your comment above. I thought you were suggesting we add a more generic API, to support any future addition of a third type of procedure, rather than one that only supports DistributedProcedure.
Sure, I'll change it to the more straightforward and specific getDistributedProcedures as you suggested. If we need to support a third type of procedure in the future, we can simply add a dedicated API method for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @tdcmeehan, the API method has been changed to getDistributedProcedures. Please take a look when you get a chance, thanks!
fe00517 to
bb7dcd7
Compare
|
@hantangwangd I just realized that this framework doesn't appear to support explicit access control. My thinking is there should be two levels, the first is, we can have an access control check on the procedure itself, similar to table level access. Secondly, for distributed procedures which write data to a table, we should probably have INSERT + DELETE permissions required. I think this can be done as a followup. Secondly, I believe we should add user-facing documentation to our website for this framework. Instructions for how to create these distributed procedures. This PR is quite large so I will leave it up to you on whether or not to add them now or later, although I have a slight preference for adding that now. |
Thanks for the suggestion. Sure, I will add access control for procedures framework in a followup PR.
Yes, I have been thinking about this documentation for several days. When I started thinking about how to write this document, I realized that distributed procedures may need to include two levels. The first level is how developers can define and extend a new subtype of distributed procedure (refer to |
tdcmeehan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. The abstractions match existing conventions faithfully. I'm looking forward to the extensive tests for Iceberg rewrites.
And expose the procedure registry to the `presto-analyzer` and `connectors` module
Refactor `Procedure` and `DistributedProcedure` into abstract classes. Use a subclass `TableDataRewriteDistributedProcedure` for table rewrite tasks, for example, merge small data files, sort table data, repartition table data etc. And introduce a new class `LocalProcedure` to represent the former coordinator-only procedures. Rename `IProcedureRegistry` to `ProcedureRegistry`, and accordingly rename previous `ProcedureRegistry` to `BuiltInProcedureRegistry`.
Rename abstract class `Procedure` to `BaseProcedure`, and then rename `LocalProcedure` back to `Procedure` to maintain backward compatibility
Use `StandardTypes` to check the type of the procedure arguments Throw a `PrestoException` with error code of `INVALID_ARGUMENTS` rather than an IAE
1. Retain the `Connector.getProcedures()` spi method for backward compatibility. Add a new generic method to support other `BaseProcedure` subtypes such as `DistributedProcedure`. 2. Use generics for the `Argument` class hierarchy to enable shared logic while maintaining backward compatibility.
bb7dcd7 to
1c42e20
Compare
|
@tdcmeehan thank you so much for your review throughout the process. I will add the developer documentation ASAP, and add access control for procedure architecture in a followup PR. |
Description
This PR is the first part of many PRs to support distributed procedure into Presto. It is a split of the original entire PR which is located here: #22659.
The whole work in this PR includes the following parts:
Re-factor
ProcedureRegistry/Proceduredata structure to support the creation and register ofDistributedProcedure. And make sureProcedureRegistrybe available inpresto-analyzermodule and connectors, so that we can recognize distributed procedures in call statement during prepare analyze stages.Handle call statement on distributed procedures in preparer stage. In this stage, we figure out the procedure's type in call statement, and define a new query type
CALL_DISTRIBUTED_PROCEDUREforcall distributed procedureinBuiltInPreparedQuery. In this way,call distributed procedurestatement would be handled bySqlQueryExecutionFactory, then be created and handled as aSqlQueryExecution.Analyze and plan the
call distributed procedurestatement based on the subtype of the distributed procedure. For subtypeTableDataRewriteDistributedProcedure, ultimately generate a logical plan for it as follows:CallDistributedProcedureNodeis similar asTableWriterNode. Besides, a new optimizerRewriteWriterTargetis added, which is placed after all optimization rules. It is used to update theTableHandleheld inTableFinishNodeandCallDistributedProcedureNodebased on the underlyingTableScanNodeafter the entire optimization is completed, considering the possible filter pushing down.Motivation and Context
prestodb/rfcs#12
Impact
N/A
Test Plan
Contributor checklist
Release Notes