Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: upload workdir anywhere for deploy #13141

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/prefect/cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,13 @@ async def apply(
" storage."
),
),
storage_local_path: str = typer.Option(
None,
"--storage-local-path",
help=(
"Uploads this dir all files to storage, default is `pwd`"
),
),
work_queue_concurrency: int = typer.Option(
None,
"--limit",
Expand Down Expand Up @@ -1163,7 +1170,7 @@ async def apply(
deployment.storage
and "put-directory" in deployment.storage.get_block_capabilities()
):
file_count = await deployment.upload_to_storage()
file_count = await deployment.upload_to_storage(local_path=storage_local_path)
if file_count:
app.console.print(
(
Expand Down Expand Up @@ -1383,6 +1390,13 @@ async def build(
" remote storage."
),
),
storage_local_path: str = typer.Option(
None,
"--storage-local-path",
help=(
"Uploads this dir all files to storage, default is `pwd`"
),
),
cron: str = typer.Option(
None,
"--cron",
Expand Down Expand Up @@ -1625,6 +1639,7 @@ async def build(
output=deployment_loc,
skip_upload=skip_upload,
apply=False,
storage_local_path=storage_local_path,
**init_kwargs,
)
app.console.print(
Expand All @@ -1642,7 +1657,7 @@ async def build(
deployment.storage
and "put-directory" in deployment.storage.get_block_capabilities()
):
file_count = await deployment.upload_to_storage()
file_count = await deployment.upload_to_storage(local_path=storage_local_path)
if file_count:
app.console.print(
(
Expand Down
14 changes: 8 additions & 6 deletions src/prefect/deployments/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ async def update(self, ignore_none: bool = False, **kwargs):

@sync_compatible
async def upload_to_storage(
self, storage_block: str = None, ignore_file: str = ".prefectignore"
self, storage_block: str = None, ignore_file: str = ".prefectignore", local_path: str = None,
) -> Optional[int]:
"""
Uploads the workflow this deployment represents using a provided storage block;
Expand All @@ -799,7 +799,7 @@ async def upload_to_storage(

# upload current directory to storage location
file_count = await self.storage.put_directory(
ignore_file=ignore_file, to_path=self.path
ignore_file=ignore_file, local_path=local_path, to_path=self.path
)
elif self.storage:
if "put-directory" not in self.storage.get_block_capabilities():
Expand All @@ -809,7 +809,7 @@ async def upload_to_storage(
)

file_count = await self.storage.put_directory(
ignore_file=ignore_file, to_path=self.path
ignore_file=ignore_file, local_path=local_path, to_path=self.path
)

# persists storage now in case it contains secret values
Expand All @@ -820,7 +820,7 @@ async def upload_to_storage(

@sync_compatible
async def apply(
self, upload: bool = False, work_queue_concurrency: int = None
self, upload: bool = False, work_queue_concurrency: int = None, storage_local_path: str = None,
) -> UUID:
"""
Registers this deployment with the API and returns the deployment's ID.
Expand All @@ -846,7 +846,7 @@ async def apply(
)

if upload:
await self.upload_to_storage()
await self.upload_to_storage(local_path=storage_local_path)

if self.work_queue_name and work_queue_concurrency is not None:
try:
Expand Down Expand Up @@ -937,6 +937,7 @@ async def build_from_flow(
apply: bool = False,
load_existing: bool = True,
schedules: Optional[FlexibleScheduleList] = None,
storage_local_path: str = None,
**kwargs,
) -> "Deployment":
"""
Expand Down Expand Up @@ -964,6 +965,7 @@ async def build_from_flow(
the schedule is active or not.
- An instance of one of the predefined schedule types:
`IntervalSchedule`, `CronSchedule`, or `RRuleSchedule`.
storage_from_path: The path to the storage
**kwargs: other keyword arguments to pass to the constructor for the
`Deployment` class
"""
Expand Down Expand Up @@ -1031,7 +1033,7 @@ async def build_from_flow(
deployment.storage
and "put-directory" in deployment.storage.get_block_capabilities()
):
await deployment.upload_to_storage(ignore_file=ignore_file)
await deployment.upload_to_storage(ignore_file=ignore_file, local_path=storage_local_path)

if output:
await deployment.to_yaml(output)
Expand Down