Skip to content

Commit 96cf05d

Browse files
authored
fix: add option to specify executeProject (#650)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. Run unit tests and ensure that they are passing 2. If your change introduces any API changes, make sure to update the e2e tests 3. Make sure documentation is updated for your PR! --> # Description <!-- Briefly describe the motivation for the change. Please include illustrations where appropriate. --> * Allow users to specify executeProject in options of MaxComputeSource * For example: ``` mc_source = MaxComputeSource( table="some_other_project.data_science_platform_playground.batch_prediction_test_3", features=["sepal_length", "sepal_width", "petal_length", "petal_width"], endpoint="https://service.ap-southeast-5.maxcompute.aliyun.com/api", options={"execute_project": "project_a"} ) ``` This will `project_a` to execute the maxcompute job, even if the table being accessed is in `some_other_project` cc @mbruner # Modifications <!-- Summarize the key code changes. --> # Tests <!-- Besides the existing / updated automated tests, what specific scenarios should be tested? Consider the backward compatibility of the changes, whether corner cases are covered, etc. Please describe the tests and check the ones that have been completed. Eg: - [x] Deploying new and existing standard models - [ ] Deploying PyFunc models --> # Checklist - [x] Added PR label - [ ] Added unit test, integration, and/or e2e tests - [ ] Tested locally - [ ] Updated documentation - [ ] Update Swagger spec if the PR introduce API changes - [ ] Regenerated Golang and Python client if the PR introduces API changes # Release Notes <!-- Does this PR introduce a user-facing change? If no, just write "NONE" in the release-note block below. If yes, a release note is required. Enter your extended release note in the block below. If the PR requires additional action from users switching to the new release, include the string "action required". For more information about release notes, see kubernetes' guide here: http://git.k8s.io/community/contributors/guide/release-notes.md --> ```release-note ```
1 parent fbbdd3a commit 96cf05d

File tree

3 files changed

+22
-3
lines changed

3 files changed

+22
-3
lines changed

python/batch-predictor/merlinpyspark/config.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ class MaxComputeSourceConfig(SourceConfig):
124124

125125
def __init__(self, mc_src_proto: MaxComputeSource):
126126
self._proto = mc_src_proto
127-
self._project, self._schema, self._table = self._proto.table.split(".")
127+
self._project, self._schema, _ = self._proto.table.split(".")
128+
self._table = self._proto.table
128129

129130
def source_type(self) -> str:
130131
return self.TYPE
@@ -140,6 +141,11 @@ def table(self) -> str:
140141

141142
def project(self) -> str:
142143
return self._project
144+
145+
def execute_project(self) -> str:
146+
if self.options() is not None and "execute_project" in self.options():
147+
return self.options()["execute_project"]
148+
return ""
143149

144150
def schema(self) -> str:
145151
return self._schema
@@ -232,6 +238,8 @@ class MaxComputeSinkConfig(SinkConfig):
232238

233239
def __init__(self, mc_sink_proto: MaxComputeSink):
234240
self._proto = mc_sink_proto
241+
# NOTE: the table for the sink is not in the format of `project.schema.table`
242+
# because the INSERT statement in MaxCompute only supports `table`
235243
self._project, self._schema, self._table = self._proto.table.split(".")
236244

237245
def sink_type(self) -> str:
@@ -246,6 +254,11 @@ def result_column(self) -> str:
246254
def options(self) -> MutableMapping[str, str]:
247255
return self._proto.options
248256

257+
def execute_project(self) -> str:
258+
if self.options() is not None and "execute_project" in self.options():
259+
return self.options()["execute_project"]
260+
return ""
261+
249262
def save_mode(self) -> str:
250263
return SaveMode.Name(self._proto.save_mode).lower()
251264

python/batch-predictor/merlinpyspark/sink.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ def __init__(self, spark_session: SparkSession, config: MaxComputeSinkConfig):
7373
self._spark = spark_session
7474

7575
def get_jdbc_url(self):
76-
return f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false"
76+
url = f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&alwaysFallback=true&enableOdpsLogger=true"
77+
if self._config.execute_project() is not None:
78+
url += f"&executeProject={self._config.execute_project()}"
79+
return url
7780

7881
def get_query_timeout(self):
7982
return self._config.options().get("queryTimeout", "300")

python/batch-predictor/merlinpyspark/source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ def features(self) -> Iterable[str]:
154154
return self._config.features()
155155

156156
def get_jdbc_url(self):
157-
return f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&autoSelectLimit={self.auto_select_limit}&alwaysFallback=true&enableOdpsLogger=true"
157+
url = f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&autoSelectLimit={self.auto_select_limit}&alwaysFallback=true&enableOdpsLogger=true"
158+
if self._config.execute_project() is not None:
159+
url += f"&executeProject={self._config.execute_project()}"
160+
return url
158161

159162
def get_query_timeout(self):
160163
return self._config.options().get("queryTimeout", "300")

0 commit comments

Comments
 (0)