Skip to content

Commit 84506b8

Browse files
committed
Add option to specify executeProject
1 parent fbbdd3a commit 84506b8

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

python/batch-predictor/merlinpyspark/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ def table(self) -> str:
140140

141141
def project(self) -> str:
142142
return self._project
143+
144+
def execute_project(self) -> str:
145+
if self.options() is not None and "execute_project" in self.options():
146+
return self.options()["execute_project"]
147+
return ""
143148

144149
def schema(self) -> str:
145150
return self._schema
@@ -246,6 +251,11 @@ def result_column(self) -> str:
246251
def options(self) -> MutableMapping[str, str]:
247252
return self._proto.options
248253

254+
def execute_project(self) -> str:
255+
if self.options() is not None and "execute_project" in self.options():
256+
return self.options()["execute_project"]
257+
return ""
258+
249259
def save_mode(self) -> str:
250260
return SaveMode.Name(self._proto.save_mode).lower()
251261

python/batch-predictor/merlinpyspark/sink.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ def __init__(self, spark_session: SparkSession, config: MaxComputeSinkConfig):
7373
self._spark = spark_session
7474

7575
def get_jdbc_url(self):
76-
return f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false"
76+
url = f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&alwaysFallback=true&enableOdpsLogger=true"
77+
if self._config.execute_project() is not None:
78+
url += f"&executeProject={self._config.execute_project()}"
79+
return url
7780

7881
def get_query_timeout(self):
7982
return self._config.options().get("queryTimeout", "300")

python/batch-predictor/merlinpyspark/source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ def features(self) -> Iterable[str]:
154154
return self._config.features()
155155

156156
def get_jdbc_url(self):
157-
return f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&autoSelectLimit={self.auto_select_limit}&alwaysFallback=true&enableOdpsLogger=true"
157+
url = f"jdbc:odps:{self._config.endpoint()}?project={self._config.project()}&accessId={self.get_access_id()}&accessKey={self.get_access_key()}&interactiveMode={self.get_interactive_mode()}&odpsNamespaceSchema=true&schema={self._config.schema()}&enableLimit=false&autoSelectLimit={self.auto_select_limit}&alwaysFallback=true&enableOdpsLogger=true"
158+
if self._config.execute_project() is not None:
159+
url += f"&executeProject={self._config.execute_project()}"
160+
return url
158161

159162
def get_query_timeout(self):
160163
return self._config.options().get("queryTimeout", "300")

0 commit comments

Comments
 (0)