Member

@hantangwangd hantangwangd commented Oct 21, 2025

Description

This PR is the first of several PRs that add support for distributed procedures to Presto. It is split out from the original, complete PR: #22659.

The whole work in this PR includes the following parts:

  1. Refactor the ProcedureRegistry/Procedure data structures to support creating and registering a DistributedProcedure, and make ProcedureRegistry available in the presto-analyzer module and in connectors, so that distributed procedures can be recognized in call statements during the prepare and analyze stages.

  2. Handle call statements on distributed procedures in the preparer stage. In this stage, we determine the type of the procedure named in the call statement and define a new query type, CALL_DISTRIBUTED_PROCEDURE, in BuiltInPreparedQuery for calls to distributed procedures. As a result, such a statement is handled by SqlQueryExecutionFactory, which creates and runs it as a SqlQueryExecution.

  3. Analyze and plan the call statement based on the subtype of the distributed procedure. For the subtype TableDataRewriteDistributedProcedure, the resulting logical plan is:

     OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode

  4. Apply optimization, plan fragmentation, grouped execution tagging, and local planning to the logical plan generated above. The handling logic for CallDistributedProcedureNode is similar to that for TableWriterNode. In addition, a new optimizer, RewriteWriterTarget, is added after all other optimization rules. Once the entire optimization is complete, it updates the TableHandle held in TableFinishNode and CallDistributedProcedureNode based on the underlying TableScanNode, to account for possible filter pushdown.
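
The plan shape from part 3 and the role of RewriteWriterTarget can be sketched with toy plan nodes. This is only an illustration under stated assumptions, not Presto's planner code: the `Node` class, the string table handles, and the `rewriteWriterTarget` method are hypothetical stand-ins. Only the node names and the idea of copying the final TableScanNode handle up to the writer-side nodes come from the description above.

```java
// Toy stand-ins for plan nodes; these are NOT Presto's planner classes.
final class PlanSketch
{
    // A plan node with an optional table handle and at most one source (child).
    static final class Node
    {
        final String kind;
        final String tableHandle; // null when the node holds no handle
        final Node source;        // null for the leaf TableScanNode

        Node(String kind, String tableHandle, Node source)
        {
            this.kind = kind;
            this.tableHandle = tableHandle;
            this.source = source;
        }
    }

    // Builds OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode.
    static Node plan(String writerHandle, String scanHandle)
    {
        Node scan = new Node("TableScanNode", scanHandle, null);
        Node filter = new Node("FilterNode", null, scan);
        Node call = new Node("CallDistributedProcedureNode", writerHandle, filter);
        Node finish = new Node("TableFinishNode", writerHandle, call);
        return new Node("OutputNode", null, finish);
    }

    // Hypothetical analogue of RewriteWriterTarget: copy the leaf scan's handle
    // into the two writer-side nodes, leaving every other node untouched.
    static Node rewriteWriterTarget(Node root)
    {
        Node leaf = root;
        while (leaf.source != null) {
            leaf = leaf.source;
        }
        return rewrite(root, leaf.tableHandle);
    }

    private static Node rewrite(Node node, String scanHandle)
    {
        if (node == null) {
            return null;
        }
        boolean writerSide = node.kind.equals("TableFinishNode")
                || node.kind.equals("CallDistributedProcedureNode");
        return new Node(node.kind, writerSide ? scanHandle : node.tableHandle, rewrite(node.source, scanHandle));
    }

    public static void main(String[] args)
    {
        // Assume an earlier optimizer pushed a filter into the scan, changing its handle.
        Node optimized = plan("orders", "orders[pushed-down filter]");
        for (Node n = rewriteWriterTarget(optimized); n != null; n = n.source) {
            System.out.println(n.kind + " handle=" + n.tableHandle);
        }
    }
}
```

Running the rewrite last matters in this sketch because any earlier rule could still change the scan's handle; copying it only once, at the end, keeps the writer-side nodes consistent with what is actually scanned.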

Motivation and Context

prestodb/rfcs#12

Impact

N/A

Test Plan

  • Add test cases for each phase touched by the procedure architecture extension: creating and registering distributed procedures, and preparing, analyzing, logically planning, and optimizing calls to distributed procedures

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

== RELEASE NOTES ==

General Changes
 * Upgrade the procedure architecture to support distributed procedure execution

Contributor

@sourcery-ai sourcery-ai bot left a comment

Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch 2 times, most recently from 2e0cff9 to dbb5eb0 Compare October 21, 2025 09:26
@hantangwangd hantangwangd marked this pull request as ready for review October 21, 2025 12:13
Contributor

@sourcery-ai sourcery-ai bot left a comment

Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters

@tdcmeehan tdcmeehan self-assigned this Oct 22, 2025
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch 2 times, most recently from 02d3252 to 8bf8be6 Compare October 30, 2025 10:44
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch from 8bf8be6 to 07a4fd9 Compare October 30, 2025 13:05
Contributor

@tdcmeehan tdcmeehan left a comment

Mainly wondering if we can avoid the breaking change on Procedure.

Comment on lines 57 to 62
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
format("Argument `%s` must be string type", SCHEMA));
schemaIndex = i;
}
else if (getArguments().get(i).getName().equals(TABLE_NAME)) {
checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
Contributor

Use StandardTypes

Suggested change
-checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
-format("Argument `%s` must be string type", SCHEMA));
-schemaIndex = i;
-}
-else if (getArguments().get(i).getName().equals(TABLE_NAME)) {
-checkArgument(getArguments().get(i).getType().toString().equalsIgnoreCase("varchar"),
+checkArgument(getArguments().get(i).getType().getBase().equals(VARCHAR),
+format("Argument `%s` must be string type", SCHEMA));
+schemaIndex = i;
+}
+else if (getArguments().get(i).getName().equals(TABLE_NAME)) {
+checkArgument(getArguments().get(i).getType().getBase().equals(VARCHAR),

Member Author

Thanks, fixed!
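
One plausible benefit of the suggested change is that comparing the base type name is less brittle than comparing the full printed type. The sketch below uses a toy `TypeSignature` class and a local `VARCHAR` constant. Both are assumptions meant to mirror the `getBase()` call and the `VARCHAR` constant seen in the suggestion, and they are not the real Presto classes.

```java
// Toy model of a type signature; NOT Presto's TypeSignature.
final class TypeCheckSketch
{
    // Assumed to mirror the VARCHAR constant referenced in the suggestion.
    static final String VARCHAR = "varchar";

    static final class TypeSignature
    {
        private final String base;
        private final Integer length; // null means no length parameter

        TypeSignature(String base, Integer length)
        {
            this.base = base;
            this.length = length;
        }

        String getBase()
        {
            return base;
        }

        @Override
        public String toString()
        {
            return length == null ? base : base + "(" + length + ")";
        }
    }

    // Original style: compares the whole printed type.
    static boolean isStringByToString(TypeSignature type)
    {
        return type.toString().equalsIgnoreCase("varchar");
    }

    // Suggested style: compares only the base type name.
    static boolean isStringByBase(TypeSignature type)
    {
        return type.getBase().equals(VARCHAR);
    }

    public static void main(String[] args)
    {
        TypeSignature bounded = new TypeSignature("varchar", 10);
        System.out.println(isStringByToString(bounded)); // false: printed form is "varchar(10)"
        System.out.println(isStringByBase(bounded));     // true: base is still "varchar"
    }
}
```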

protected static void checkArgument(boolean assertion, String message)
{
if (!assertion) {
throw new IllegalArgumentException(message);
Contributor

I believe a generic IAE will get translated into an uncategorized Presto error. If so, better to use new PrestoException(INVALID_ARGUMENT, ...

Member Author

Sure, I've changed the exception type and the corresponding tests.
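
The concern raised above can be illustrated with a toy error classifier. This is a sketch under the reviewer's stated belief that a plain IllegalArgumentException ends up uncategorized. The `ErrorCode` enum, `CodedException`, and `classify` are hypothetical stand-ins, not Presto's `PrestoException` or its error-code machinery.

```java
// Toy error categorization; NOT Presto's PrestoException or StandardErrorCode.
final class ErrorCodeSketch
{
    enum ErrorCode { INVALID_ARGUMENT, UNCATEGORIZED }

    // Hypothetical exception that carries an explicit error code.
    static final class CodedException extends RuntimeException
    {
        final ErrorCode code;

        CodedException(ErrorCode code, String message)
        {
            super(message);
            this.code = code;
        }
    }

    // Before: a generic exception with no error code attached.
    static void checkArgumentWithIae(boolean assertion, String message)
    {
        if (!assertion) {
            throw new IllegalArgumentException(message);
        }
    }

    // After: the failure is tagged as a user-facing argument error.
    static void checkArgument(boolean assertion, String message)
    {
        if (!assertion) {
            throw new CodedException(ErrorCode.INVALID_ARGUMENT, message);
        }
    }

    // Assumed engine behavior: anything without a code is reported as uncategorized.
    static ErrorCode classify(Throwable failure)
    {
        return failure instanceof CodedException ? ((CodedException) failure).code : ErrorCode.UNCATEGORIZED;
    }

    // Runs the action and returns the classified code, or null if nothing was thrown.
    static ErrorCode codeOf(Runnable action)
    {
        try {
            action.run();
            return null;
        }
        catch (RuntimeException e) {
            return classify(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(codeOf(() -> checkArgumentWithIae(false, "bad schema")));
        System.out.println(codeOf(() -> checkArgument(false, "bad schema")));
    }
}
```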

import static java.util.stream.Collectors.joining;

-public class Procedure
+public abstract class Procedure
Contributor

I wonder if we can avoid a breaking change here. What if we make Procedure extend an abstract type, for example, BaseProcedure, which DistributedProcedure can also extend from?

Member Author

Great idea! I've renamed the current abstract parent class to BaseProcedure and reverted LocalProcedure back to Procedure. Please take a look when you have a chance. Thanks a lot!

Contributor

@hantangwangd I believe there's still a breaking change here--all connectors will be required to migrate to return BaseProcedure. Wondering if it makes sense to keep getProcedures to return Procedure, but add a separate one getDistributedProcedures so folks can opt-in to these procedures without requiring a migration? We can consider a more generic API if a third type of procedure is added?

Member Author

@tdcmeehan thanks for pointing this out. Yes, you are correct that there's still a breaking change here. I'll look into the feasibility of your suggestion. Also, it seems that moving the Argument class into BaseProcedure could break backward compatibility as well. I'll consider both points to figure out a reasonable solution.

Member Author

Hi @tdcmeehan, I've done the following two things to keep full backward compatibility:

  1. Retain the Connector.getProcedures() SPI method as you suggested, and add a new generic method to support other BaseProcedure subtypes such as DistributedProcedure.

  2. Use generics for the Argument class hierarchy to enable shared common logic while maintaining backward compatibility.

After this refactoring, existing in-tree and out-of-tree connectors require no changes unless they intend to support distributed procedures. Please take a look when you get a chance. Thanks a lot!
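
A rough sketch of the generic opt-in method described in point 1, assuming it is added with a default implementation so that existing connectors compile unchanged. The nested types here are toy stand-ins that reuse the names from this thread, and the connector classes and procedure names are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Toy SPI types that borrow names from the discussion; NOT the real Presto SPI.
final class GenericSpiSketch
{
    static class BaseProcedure
    {
        final String name;

        BaseProcedure(String name)
        {
            this.name = name;
        }
    }

    static final class Procedure extends BaseProcedure
    {
        Procedure(String name)
        {
            super(name);
        }
    }

    static final class DistributedProcedure extends BaseProcedure
    {
        DistributedProcedure(String name)
        {
            super(name);
        }
    }

    interface Connector
    {
        // Pre-existing method, signature unchanged.
        default Set<Procedure> getProcedures()
        {
            return Collections.emptySet();
        }

        // New generic method; the empty default is an assumption of this sketch.
        default <T extends BaseProcedure> Set<T> getProcedures(Class<T> type)
        {
            return Collections.emptySet();
        }
    }

    // An existing connector: overrides only the old method and needs no migration.
    static final class LegacyConnector implements Connector
    {
        @Override
        public Set<Procedure> getProcedures()
        {
            return Collections.singleton(new Procedure("legacy_proc"));
        }
    }

    // A connector that opts in to other BaseProcedure subtypes.
    static final class RewritingConnector implements Connector
    {
        private final Set<BaseProcedure> extra =
                Collections.<BaseProcedure>singleton(new DistributedProcedure("rewrite_sketch"));

        @Override
        public <T extends BaseProcedure> Set<T> getProcedures(Class<T> type)
        {
            Set<T> matches = new LinkedHashSet<>();
            for (BaseProcedure procedure : extra) {
                if (type.isInstance(procedure)) {
                    matches.add(type.cast(procedure));
                }
            }
            return matches;
        }
    }

    public static void main(String[] args)
    {
        System.out.println(new LegacyConnector().getProcedures(DistributedProcedure.class).size());
        System.out.println(new RewritingConnector().getProcedures(DistributedProcedure.class).size());
    }
}
```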

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch from 07a4fd9 to bfa4bc0 Compare November 1, 2025 17:13
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch 5 times, most recently from f19a8f8 to d90f700 Compare November 2, 2025 03:05
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch 3 times, most recently from c57f657 to fe00517 Compare November 8, 2025 17:01
Contributor

@tdcmeehan tdcmeehan left a comment

This looks good to me, just some remaining questions on the SPI changes.

requireNonNull(procedures, "Connector %s returned a null procedures set");
this.procedures = ImmutableSet.copyOf(procedures);
proceduresBuilder.addAll(procedures);
Set<DistributedProcedure> distributedProcedures = connector.getProcedures(DistributedProcedure.class);
Contributor

Is there any particular reason to add a generic method here, instead of a simple additional method, getDistributedProcedures, which does this as well? I'm thinking that adding the new method wouldn't require a deprecation cycle for the older getProcedures method; we would simply have two parallel methods that each return a different type of procedure.

Member Author

> We can consider a more generic API if a third type of procedure is added?

@tdcmeehan thanks for the review and suggestion. I might have misunderstood your comment above. I thought you were suggesting we add a more generic API, to support any future addition of a third type of procedure, rather than one that only supports DistributedProcedure.

Sure, I'll change it to the more straightforward and specific getDistributedProcedures as you suggested. If we need to support a third type of procedure in the future, we can simply add a dedicated API method for it.

Member Author

Hi @tdcmeehan, the API method has been changed to getDistributedProcedures. Please take a look when you get a chance, thanks!
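
For comparison, here is a minimal sketch of the two parallel methods and of a registry-side helper that merges their results. It assumes both SPI methods default to an empty set. The nested types, the `collectProcedures` helper, and the procedure names are hypothetical, and only the method names `getProcedures` and `getDistributedProcedures` come from this thread.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Toy SPI types that borrow names from the discussion; NOT the real Presto SPI.
final class DedicatedSpiSketch
{
    static class BaseProcedure
    {
        final String name;

        BaseProcedure(String name)
        {
            this.name = name;
        }
    }

    static final class Procedure extends BaseProcedure
    {
        Procedure(String name)
        {
            super(name);
        }
    }

    static final class DistributedProcedure extends BaseProcedure
    {
        DistributedProcedure(String name)
        {
            super(name);
        }
    }

    interface Connector
    {
        // Existing method: untouched, so no deprecation cycle is needed.
        default Set<Procedure> getProcedures()
        {
            return Collections.emptySet();
        }

        // Parallel opt-in method; the empty default is an assumption of this sketch.
        default Set<DistributedProcedure> getDistributedProcedures()
        {
            return Collections.emptySet();
        }
    }

    // Hypothetical registry-side helper that validates and merges both sets.
    static Set<BaseProcedure> collectProcedures(String connectorId, Connector connector)
    {
        Set<Procedure> procedures = connector.getProcedures();
        Objects.requireNonNull(procedures,
                String.format("Connector %s returned a null procedures set", connectorId));
        Set<DistributedProcedure> distributed = connector.getDistributedProcedures();
        Objects.requireNonNull(distributed,
                String.format("Connector %s returned a null distributed procedures set", connectorId));

        Set<BaseProcedure> all = new LinkedHashSet<>(procedures);
        all.addAll(distributed);
        return Collections.unmodifiableSet(all);
    }

    public static void main(String[] args)
    {
        Connector legacy = new Connector() {};
        System.out.println(collectProcedures("legacy", legacy).size());
    }
}
```

Compared with a generic method keyed by a class token, each dedicated method has a fixed return type, so a connector author sees exactly which procedure kinds are supported without consulting the caller.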

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure_part1 branch from fe00517 to bb7dcd7 Compare November 11, 2025 02:54
@tdcmeehan
Contributor

@hantangwangd I just realized that this framework doesn't appear to support explicit access control. My thinking is that there should be two levels. First, we can have an access control check on the procedure itself, similar to table-level access. Second, for distributed procedures that write data to a table, we should probably require INSERT + DELETE permissions. I think this can be done as a followup.

Separately, I believe we should add user-facing documentation to our website for this framework: instructions for how to create these distributed procedures. This PR is quite large, so I will leave it up to you whether to add them now or later, although I have a slight preference for adding them now.

@hantangwangd
Member Author

> My thinking is that there should be two levels. First, we can have an access control check on the procedure itself, similar to table-level access. Second, for distributed procedures that write data to a table, we should probably require INSERT + DELETE permissions. I think this can be done as a followup.

Thanks for the suggestion. Sure, I will add access control for the procedure framework in a followup PR.
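
The two levels proposed above might look roughly like the following. Everything in this sketch is hypothetical: this PR does not implement access control, and the `AccessControl` interface, the `Privilege` enum, and `checkCanCall` are invented for illustration and are not Presto APIs.

```java
// Hypothetical two-level check; NOT an existing Presto access control API.
final class ProcedureAccessSketch
{
    enum Privilege { EXECUTE, INSERT, DELETE }

    // Invented interface: answers whether a user holds a privilege on an object.
    interface AccessControl
    {
        boolean isAllowed(String user, String object, Privilege privilege);
    }

    // Level 1: the procedure itself. Level 2: INSERT + DELETE on the table it rewrites.
    static void checkCanCall(AccessControl accessControl, String user, String procedure, String targetTable)
    {
        require(accessControl, user, procedure, Privilege.EXECUTE);
        if (targetTable != null) {
            require(accessControl, user, targetTable, Privilege.INSERT);
            require(accessControl, user, targetTable, Privilege.DELETE);
        }
    }

    private static void require(AccessControl accessControl, String user, String object, Privilege privilege)
    {
        if (!accessControl.isAllowed(user, object, privilege)) {
            throw new SecurityException(
                    String.format("Access denied: %s lacks %s on %s", user, privilege, object));
        }
    }

    public static void main(String[] args)
    {
        // A policy that grants everything except DELETE.
        AccessControl noDelete = (user, object, privilege) -> privilege != Privilege.DELETE;
        try {
            checkCanCall(noDelete, "alice", "rewrite_sketch", "orders");
        }
        catch (SecurityException e) {
            System.out.println(e.getMessage());
        }
    }
}
```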

> I believe we should add user-facing documentation to our website for this framework. Instructions for how to create these distributed procedures.

Yes, I have been thinking about this documentation for several days. When I started planning it, I realized that it may need to cover two levels. The first level is how developers can define a new subtype of distributed procedure (see TableDataRewriteDistributedProcedure). The second level is how developers can implement a concrete distributed procedure of a given subtype for a particular connector (see RewriteDataFilesProcedure on Iceberg). Besides, we currently lack developer documentation for the original procedures as well. Therefore, my thought is that once we finalize the design for extending the procedure framework at both of these levels, I can add all of these documents in a dedicated follow-up PR. Does this sound reasonable to you? Also, any suggestions for the document's content would be greatly appreciated!
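
The two levels can be pictured as two layers of a class hierarchy. The sketch below is an assumption-laden illustration: the class names `BaseProcedure`, `DistributedProcedure`, and `TableDataRewriteDistributedProcedure` are borrowed from this PR, but every member, the `"TABLE_DATA_REWRITE"` label, and the concrete `RewriteDataFilesSketch` class are hypothetical and do not reflect the merged API.

```java
// Toy hierarchy; class names borrowed from the PR, all members hypothetical.
final class ProcedureLevelsSketch
{
    abstract static class BaseProcedure
    {
        final String schema;
        final String name;

        BaseProcedure(String schema, String name)
        {
            this.schema = schema;
            this.name = name;
        }

        String qualifiedName()
        {
            return schema + "." + name;
        }
    }

    abstract static class DistributedProcedure extends BaseProcedure
    {
        DistributedProcedure(String schema, String name)
        {
            super(schema, name);
        }

        // Hypothetical hook identifying which planning path the engine would take.
        abstract String subtype();
    }

    // Level 1: a framework developer defines a subtype for a family of procedures.
    abstract static class TableDataRewriteDistributedProcedure extends DistributedProcedure
    {
        TableDataRewriteDistributedProcedure(String schema, String name)
        {
            super(schema, name);
        }

        @Override
        final String subtype()
        {
            return "TABLE_DATA_REWRITE";
        }

        // Hypothetical connector-specific hook.
        abstract String describeRewrite(String table);
    }

    // Level 2: a connector developer implements one concrete procedure of that subtype.
    static final class RewriteDataFilesSketch extends TableDataRewriteDistributedProcedure
    {
        RewriteDataFilesSketch()
        {
            super("system", "rewrite_data_files_sketch");
        }

        @Override
        String describeRewrite(String table)
        {
            return "merge small data files of " + table;
        }
    }

    public static void main(String[] args)
    {
        RewriteDataFilesSketch procedure = new RewriteDataFilesSketch();
        System.out.println(procedure.qualifiedName() + " [" + procedure.subtype() + "]: "
                + procedure.describeRewrite("orders"));
    }
}
```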

tdcmeehan previously approved these changes Nov 14, 2025
Contributor

@tdcmeehan tdcmeehan left a comment

This looks good to me. The abstractions match existing conventions faithfully. I'm looking forward to the extensive tests for Iceberg rewrites.

And expose the procedure registry to the `presto-analyzer` and
`connectors` module
Refactor `Procedure` and `DistributedProcedure` into abstract classes.
Use a subclass `TableDataRewriteDistributedProcedure` for table rewrite
tasks, for example, merge small data files, sort table data,
repartition table data etc. And introduce a new class `LocalProcedure`
to represent the former coordinator-only procedures.

Rename `IProcedureRegistry` to `ProcedureRegistry`, and accordingly
rename previous `ProcedureRegistry` to `BuiltInProcedureRegistry`.
Rename abstract class `Procedure` to `BaseProcedure`, and then rename
`LocalProcedure` back to `Procedure` to maintain backward compatibility

Use `StandardTypes` to check the type of the procedure arguments

Throw a `PrestoException` with error code of `INVALID_ARGUMENTS` rather than an IAE

1. Retain the `Connector.getProcedures()` spi method for backward
compatibility. Add a new generic method to support other
`BaseProcedure` subtypes such as `DistributedProcedure`.

2. Use generics for the `Argument` class hierarchy to enable shared
logic while maintaining backward compatibility.
@hantangwangd
Member Author

@tdcmeehan thank you so much for your review throughout the process. I will add the developer documentation ASAP, and add access control for the procedure architecture in a followup PR.

@hantangwangd hantangwangd merged commit 2f8bbba into prestodb:master Nov 15, 2025
82 of 83 checks passed
@hantangwangd hantangwangd deleted the support_call_distributed_procedure_part1 branch November 15, 2025 05:05