-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2346 from craddm/allowlist
Adding commands to manipulate allowlists
- Loading branch information
Showing
17 changed files
with
961 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .allowlist import Allowlist | ||
|
||
__all__ = ["Allowlist"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from __future__ import annotations | ||
|
||
from difflib import unified_diff | ||
from typing import Self | ||
|
||
from data_safe_haven.config import Context | ||
from data_safe_haven.exceptions import DataSafeHavenAzureStorageError | ||
from data_safe_haven.external import AzureSdk | ||
from data_safe_haven.infrastructure import SREProjectManager | ||
from data_safe_haven.types import AllowlistRepository | ||
|
||
|
||
class Allowlist: | ||
"""Allowlist for packages.""" | ||
|
||
def __init__( | ||
self, | ||
repository: AllowlistRepository, | ||
sre_stack: SREProjectManager, | ||
allowlist: str | None = None, | ||
): | ||
self.repository = repository | ||
self.sre_resource_group = sre_stack.output("sre_resource_group") | ||
self.storage_account_name = sre_stack.output("data")[ | ||
"storage_account_data_configuration_name" | ||
] | ||
self.share_name = sre_stack.output("allowlist_share_name") | ||
self.filename = sre_stack.output("allowlist_share_filenames")[repository.value] | ||
self.allowlist = str(allowlist) if allowlist else "" | ||
|
||
@classmethod | ||
def from_remote( | ||
cls: type[Self], | ||
*, | ||
context: Context, | ||
repository: AllowlistRepository, | ||
sre_stack: SREProjectManager, | ||
) -> Self: | ||
azure_sdk = AzureSdk(subscription_name=context.subscription_name) | ||
allowlist = cls(repository=repository, sre_stack=sre_stack) | ||
try: | ||
share_file = azure_sdk.download_share_file( | ||
allowlist.filename, | ||
allowlist.sre_resource_group, | ||
allowlist.storage_account_name, | ||
allowlist.share_name, | ||
) | ||
allowlist.allowlist = share_file | ||
return allowlist | ||
except DataSafeHavenAzureStorageError as exc: | ||
msg = f"Storage account '{cls.storage_account_name}' does not exist." | ||
raise DataSafeHavenAzureStorageError(msg) from exc | ||
|
||
@classmethod | ||
def remote_exists( | ||
cls: type[Self], | ||
context: Context, | ||
*, | ||
repository: AllowlistRepository, | ||
sre_stack: SREProjectManager, | ||
) -> bool: | ||
# Get the Azure SDK | ||
azure_sdk = AzureSdk(subscription_name=context.subscription_name) | ||
|
||
allowlist = cls(repository=repository, sre_stack=sre_stack) | ||
|
||
# Get the file share name | ||
share_list_exists = azure_sdk.file_share_exists( | ||
allowlist.filename, | ||
allowlist.sre_resource_group, | ||
allowlist.storage_account_name, | ||
allowlist.share_name, | ||
) | ||
return share_list_exists | ||
|
||
def upload( | ||
self, | ||
context: Context, | ||
) -> None: | ||
# Get the Azure SDK | ||
azure_sdk = AzureSdk(subscription_name=context.subscription_name) | ||
|
||
azure_sdk.upload_file_share( | ||
self.allowlist, | ||
self.filename, | ||
self.sre_resource_group, | ||
self.storage_account_name, | ||
self.share_name, | ||
) | ||
|
||
def diff(self, other: Allowlist) -> list[str]: | ||
diff = list( | ||
unified_diff( | ||
self.allowlist.splitlines(), | ||
other.allowlist.splitlines(), | ||
fromfile="remote", | ||
tofile="local", | ||
) | ||
) | ||
return diff |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
"""Command group for managing package allowlists""" | ||
|
||
from pathlib import Path | ||
from typing import Annotated, Optional | ||
|
||
import typer | ||
|
||
from data_safe_haven import console | ||
from data_safe_haven.allowlist import Allowlist | ||
from data_safe_haven.config import ContextManager, DSHPulumiConfig, SREConfig | ||
from data_safe_haven.exceptions import DataSafeHavenConfigError, DataSafeHavenError | ||
from data_safe_haven.infrastructure import SREProjectManager | ||
from data_safe_haven.logging import get_logger | ||
from data_safe_haven.types import AllowlistRepository | ||
|
||
allowlist_command_group = typer.Typer() | ||
|
||
|
||
@allowlist_command_group.command() | ||
def show( | ||
name: Annotated[ | ||
str, | ||
typer.Argument(help="Name of SRE to show allowlist for."), | ||
], | ||
repository: Annotated[ | ||
AllowlistRepository, | ||
typer.Argument(help="Name of the repository to show the allowlist for."), | ||
], | ||
file: Annotated[ | ||
Optional[str], # noqa: UP007 | ||
typer.Option(help="File path to write the allowlist to."), | ||
] = None, | ||
) -> None: | ||
"""Print the current package allowlist""" | ||
logger = get_logger() | ||
|
||
try: | ||
context = ContextManager.from_file().assert_context() | ||
except DataSafeHavenConfigError as exc: | ||
logger.critical( | ||
"No context is selected. Use `dsh context add` to create a context " | ||
"or `dsh context switch` to select one." | ||
) | ||
raise typer.Exit(1) from exc | ||
|
||
sre_config = SREConfig.from_remote_by_name(context, name) | ||
|
||
# Load Pulumi config | ||
pulumi_config = DSHPulumiConfig.from_remote(context) | ||
|
||
if sre_config.name not in pulumi_config.project_names: | ||
msg = f"Could not load Pulumi settings for '{sre_config.name}'. Have you deployed the SRE?" | ||
logger.error(msg) | ||
raise typer.Exit(1) | ||
|
||
sre_stack = SREProjectManager( | ||
context=context, | ||
config=sre_config, | ||
pulumi_config=pulumi_config, | ||
) | ||
|
||
try: | ||
allowlist = Allowlist.from_remote( | ||
context=context, repository=repository, sre_stack=sre_stack | ||
) | ||
except DataSafeHavenError as exc: | ||
logger.critical( | ||
"No allowlist is configured. Use `dsh allowlist upload` to create one." | ||
) | ||
raise typer.Exit(1) from exc | ||
|
||
if file: | ||
with open(file, "w") as f: | ||
f.write(allowlist.allowlist) | ||
else: | ||
console.print(allowlist.allowlist) | ||
|
||
|
||
@allowlist_command_group.command() | ||
def template( | ||
repository: Annotated[ | ||
AllowlistRepository, | ||
typer.Argument(help="Name of the repository to show the allowlist for."), | ||
], | ||
file: Annotated[ | ||
Optional[Path], # noqa: UP007 | ||
typer.Option(help="File path to write allowlist template to."), | ||
] = None, | ||
) -> None: | ||
"""Print a template for the package allowlist""" | ||
|
||
template_path = Path( | ||
"data_safe_haven/resources", | ||
"software_repositories", | ||
"allowlists", | ||
f"{repository.value}.allowlist", | ||
) | ||
with open(template_path) as f: | ||
example_allowlist = f.read() | ||
if file: | ||
with open(file, "w") as f: | ||
f.write(example_allowlist) | ||
raise typer.Exit() | ||
else: | ||
console.print(example_allowlist) | ||
|
||
|
||
@allowlist_command_group.command() | ||
def upload( | ||
name: Annotated[ | ||
str, | ||
typer.Argument(help="Name of SRE to upload the allowlist for."), | ||
], | ||
file: Annotated[ | ||
Path, | ||
typer.Argument(help="Path to the allowlist file to upload."), | ||
], | ||
repository: Annotated[ | ||
AllowlistRepository, | ||
typer.Argument(help="Repository type of the allowlist."), | ||
], | ||
force: Annotated[ # noqa: FBT002 | ||
bool, | ||
typer.Option(help="Skip check for existing remote allowlist."), | ||
] = False, | ||
) -> None: | ||
"""Upload a package allowlist""" | ||
context = ContextManager.from_file().assert_context() | ||
logger = get_logger() | ||
|
||
if file.is_file(): | ||
with open(file) as f: | ||
allowlist = f.read() | ||
else: | ||
logger.critical(f"Allowlist file '{file}' not found.") | ||
raise typer.Exit(1) | ||
sre_config = SREConfig.from_remote_by_name(context, name) | ||
|
||
# Load Pulumi config | ||
pulumi_config = DSHPulumiConfig.from_remote(context) | ||
|
||
if sre_config.name not in pulumi_config.project_names: | ||
msg = f"Could not load Pulumi settings for '{sre_config.name}'. Have you deployed the SRE?" | ||
logger.error(msg) | ||
raise typer.Exit(1) | ||
|
||
sre_stack = SREProjectManager( | ||
context=context, | ||
config=sre_config, | ||
pulumi_config=pulumi_config, | ||
) | ||
|
||
local_allowlist = Allowlist( | ||
repository=repository, sre_stack=sre_stack, allowlist=allowlist | ||
) | ||
|
||
if not force and Allowlist.remote_exists( | ||
context=context, | ||
repository=repository, | ||
sre_stack=sre_stack, | ||
): | ||
remote_allowlist = Allowlist.from_remote( | ||
context=context, | ||
repository=repository, | ||
sre_stack=sre_stack, | ||
) | ||
if allow_diff := remote_allowlist.diff(local_allowlist): | ||
for line in list(filter(None, "\n".join(allow_diff).splitlines())): | ||
logger.info(line) | ||
if not console.confirm( | ||
f"An allowlist already exists for {repository.name}. Do you want to overwrite it?", | ||
default_to_yes=True, | ||
): | ||
raise typer.Exit() | ||
else: | ||
console.print("No changes, won't upload allowlist.") | ||
raise typer.Exit() | ||
try: | ||
logger.info(f"Uploading allowlist for {repository.name} to {sre_config.name}") | ||
local_allowlist.upload(context=context) | ||
except DataSafeHavenError as exc: | ||
logger.error(f"Failed to upload allowlist: {exc}") | ||
raise typer.Exit(1) from exc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.