Skip to content

[CDF-24982] 🧑‍🎨 Interactive select canvas #1673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: canvas-api
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions cognite_toolkit/_cdf_tk/apps/_migrate_app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pathlib import Path
from typing import Annotated, Any
from typing import Annotated, Any, Optional

import typer

from cognite_toolkit._cdf_tk.commands import MigrateTimeseriesCommand, MigrationPrepareCommand
from cognite_toolkit._cdf_tk.commands import MigrateTimeseriesCommand, MigrationCanvasCommand, MigrationPrepareCommand
from cognite_toolkit._cdf_tk.utils.auth import EnvironmentVariables


Expand All @@ -13,6 +13,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.callback(invoke_without_command=True)(self.main)
self.command("prepare")(self.prepare)
self.command("timeseries")(self.timeseries)
# Uncomment when the Canvas migration command is ready
# self.command("canvas")(self.canvas)

def main(self, ctx: typer.Context) -> None:
"""Migrate resources from Asset-Centric to data modeling in CDF."""
Expand Down Expand Up @@ -99,3 +101,49 @@ def timeseries(
verbose=verbose,
)
)

@staticmethod
def canvas(
ctx: typer.Context,
external_id: Annotated[
Optional[list[str]],
typer.Argument(
help="The external ID of the Canvas to migrate. If not provided, and interactive selection will be "
"performed to select the Canvas to migrate."
),
] = None,
dry_run: Annotated[
bool,
typer.Option(
"--dry-run",
"-d",
help="If set, the migration will not be executed, but only a report of "
"what would be done is printed. This is useful for checking that all resources referenced by the Canvas"
"have been migrated to the new data modeling resources in CDF.",
),
] = False,
verbose: Annotated[
bool,
typer.Option(
"--verbose",
"-v",
help="Turn on to get more verbose output when running the command",
),
] = False,
) -> None:
"""Migrate Canvas applications from Asset-Centric to data modeling in CDF.

This command expects that the CogniteMigration data model is already deployed, and that the Mapping view
is populated with the mapping from Asset-Centric resources to the new data modeling resources.
"""
client = EnvironmentVariables.create_from_environment().get_client()

cmd = MigrationCanvasCommand()
cmd.run(
lambda: cmd.migrate_canvas(
client,
external_ids=external_id,
dry_run=dry_run,
verbose=verbose,
)
)
3 changes: 2 additions & 1 deletion cognite_toolkit/_cdf_tk/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._migrate import MigrateTimeseriesCommand, MigrationPrepareCommand
from ._migrate import MigrateTimeseriesCommand, MigrationCanvasCommand, MigrationPrepareCommand
from ._populate import PopulateCommand
from ._profile import ProfileAssetCentricCommand
from ._purge import PurgeCommand
Expand Down Expand Up @@ -27,6 +27,7 @@
"FeatureFlagCommand",
"InitCommand",
"MigrateTimeseriesCommand",
"MigrationCanvasCommand",
"MigrationPrepareCommand",
"ModulesCommand",
"PopulateCommand",
Expand Down
3 changes: 2 additions & 1 deletion cognite_toolkit/_cdf_tk/commands/_migrate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .canvas import MigrationCanvasCommand
from .prepare import MigrationPrepareCommand
from .timeseries import MigrateTimeseriesCommand

__all__ = ["MigrateTimeseriesCommand", "MigrationPrepareCommand"]
__all__ = ["MigrateTimeseriesCommand", "MigrationCanvasCommand", "MigrationPrepareCommand"]
18 changes: 18 additions & 0 deletions cognite_toolkit/_cdf_tk/commands/_migrate/canvas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from rich import print

from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.commands._base import ToolkitCommand
from cognite_toolkit._cdf_tk.utils import humanize_collection
from cognite_toolkit._cdf_tk.utils.interactive_select import InteractiveCanvasSelect


class MigrationCanvasCommand(ToolkitCommand):
def migrate_canvas(
self,
client: ToolkitClient,
external_ids: list[str] | None = None,
dry_run: bool = False,
verbose: bool = False,
) -> None:
external_ids = external_ids or InteractiveCanvasSelect(client).select_external_ids()
print(f"Would migrate {len(external_ids)} canvases: {humanize_collection(external_ids)}")
94 changes: 94 additions & 0 deletions cognite_toolkit/_cdf_tk/utils/interactive_select.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from abc import abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import ClassVar, Literal

import questionary
from cognite.client.data_classes import (
Expand All @@ -11,9 +14,13 @@
EventFilter,
FileMetadataFilter,
TimeSeriesFilter,
filters,
)
from cognite.client.data_classes.data_modeling import NodeList

from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.client.data_classes.canvas import Canvas
from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError


class AssetCentricInteractiveSelect:
Expand Down Expand Up @@ -153,3 +160,90 @@ def _aggregate_count(self, hierarchies: list[str], data_sets: list[str]) -> int:
asset_subtree_ids=[{"externalId": item} for item in hierarchies] or None,
)
)


@dataclass
class CanvasFilter:
visibility: Literal["public", "private"] | None = None
created_by: Literal["user"] | None = None
select_all: bool = False

def as_dms_filter(self) -> filters.Filter:
canvas_id = Canvas.get_source()
leaf_filters: list[filters.Filter] = [
filters.Not(filters.Equals(canvas_id.as_property_ref("isArchived"), True)),
# When sourceCanvasId is not set, we get the newest version of the canvas
filters.Not(filters.Exists(canvas_id.as_property_ref("sourceCanvasId"))),
]
if self.visibility is not None:
leaf_filters.append(filters.Equals(canvas_id.as_property_ref("visibility"), self.visibility))

return filters.And(*leaf_filters)


class InteractiveCanvasSelect:
opening_choices: ClassVar[list[questionary.Choice]] = [
questionary.Choice(title="All public Canvases", value=CanvasFilter(visibility="public", select_all=True)),
questionary.Choice(title="Selected public Canvases", value=CanvasFilter(visibility="public", select_all=False)),
questionary.Choice(title="All by given user", value=CanvasFilter(created_by="user", select_all=True)),
questionary.Choice(title="Selected by given user", value=CanvasFilter(created_by="user", select_all=False)),
questionary.Choice(title="All Canvases", value=CanvasFilter(visibility=None, select_all=True)),
]

def __init__(self, client: ToolkitClient) -> None:
self.client = client

def select_external_ids(self) -> list[str]:
select_filter = self._select_filter()

return self._select_external_ids(select_filter)

@classmethod
def _select_filter(cls) -> CanvasFilter:
user_response = questionary.select(
"Which Canvases do you want to select?",
choices=cls.opening_choices,
).ask()
if user_response is None:
raise ToolkitValueError("No Canvas selection made. Aborting.")
return user_response

def _select_external_ids(self, select_filter: CanvasFilter) -> list[str]:
available_canvases = self.client.canvas.list(filter=select_filter.as_dms_filter(), limit=-1)
if select_filter.select_all and select_filter.created_by is None:
return [canvas.external_id for canvas in available_canvases]
users = self.client.iam.user_profiles.list(limit=-1)
display_name_by_user_identifier = {user.user_identifier: user.display_name or "missing" for user in users}
if select_filter.created_by == "user":
canvas_by_user: dict[str, list[Canvas]] = defaultdict(list)
for canvas in available_canvases:
canvas_by_user[canvas.created_by].append(canvas)

user_response = questionary.select(
"Which user do you want to select Canvases for?",
choices=[
questionary.Choice(
title=f"{user.display_name} ({user.user_identifier})",
value=canvas_by_user[user.user_identifier],
)
for user in users
if user.user_identifier in canvas_by_user
],
).ask()
available_canvases = NodeList[Canvas](user_response)

if select_filter.select_all:
return [canvas.external_id for canvas in available_canvases]

selected_canvases = questionary.checkbox(
"Select Canvases",
choices=[
questionary.Choice(
title=f"{canvas.name} (Created by {display_name_by_user_identifier[canvas.created_by]!r}, last updated {canvas.updated_at!r})",
value=canvas.external_id,
)
for canvas in available_canvases
],
).ask()

return selected_canvases or []
4 changes: 2 additions & 2 deletions tests/auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class EnvironmentVariables:
LOGIN_FLOW: _LOGIN_FLOW = "infer"
IDP_CLIENT_ID: str | None = None
IDP_CLIENT_SECRET: str | None = None
TOKEN: str | None = None
CDF_TOKEN: str | None = None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small bug in the get_toolkit_client which is not part of the Toolkit package, but used in manual testing.


IDP_TENANT_ID: str | None = None
IDP_TOKEN_URL: str | None = None
Expand Down Expand Up @@ -149,7 +149,7 @@ def create_from_environ(cls) -> "EnvironmentVariables":
LOGIN_FLOW=os.environ.get("LOGIN_FLOW", "infer"), # type: ignore[arg-type]
IDP_CLIENT_ID=os.environ.get("IDP_CLIENT_ID"),
IDP_CLIENT_SECRET=os.environ.get("IDP_CLIENT_SECRET"),
TOKEN=os.environ.get("TOKEN"),
CDF_TOKEN=os.environ.get("CDF_TOKEN"),
CDF_URL=os.environ.get("CDF_URL"),
IDP_TOKEN_URL=os.environ.get("IDP_TOKEN_URL"),
IDP_TENANT_ID=os.environ.get("IDP_TENANT_ID"),
Expand Down
130 changes: 130 additions & 0 deletions tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
from datetime import datetime

import pytest
from cognite.client.data_classes import (
Asset,
CountAggregate,
DataSet,
UserProfile,
UserProfileList,
)
from cognite.client.data_classes.data_modeling import NodeList
from questionary import Choice

from cognite_toolkit._cdf_tk.client.data_classes.canvas import CANVAS_INSTANCE_SPACE, Canvas
from cognite_toolkit._cdf_tk.client.testing import monkeypatch_toolkit_client
from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError
from cognite_toolkit._cdf_tk.utils.interactive_select import (
AssetInteractiveSelect,
EventInteractiveSelect,
FileMetadataInteractiveSelect,
InteractiveCanvasSelect,
TimeSeriesInteractiveSelect,
)
from tests.test_unit.utils import MockQuestionary
Expand Down Expand Up @@ -156,3 +165,124 @@ def select_hierarchy(choices: list[Choice]) -> list[str]:

assert selected_hierarchy == ["Root2"]
assert selected_dataset == ["dataset3"]


class TestInteractiveCanvasSelect:
@pytest.mark.parametrize(
"selected_cdf, answers, expected_selected, expected_options",
[
pytest.param({}, ["All Canvases"], [], None, id="No canvases in CDF"),
pytest.param(
{"Public1", "Public2", "Private1", "Private2"},
["All Canvases"],
["Public1", "Public2", "Private1", "Private2"],
None,
id="All canvases selected",
),
pytest.param(
{"Public1", "Public2"},
["All public Canvases"],
["Public1", "Public2"],
None,
id="All public canvases selected",
),
pytest.param(
{"Public1", "Public2"},
["Selected public Canvases", {"Public1"}],
["Public1"],
2,
id="Selected public canvases",
),
pytest.param(
{"Public1", "Public2", "Private1", "Private2"},
["All by given user", "Marge Simpson (marge)"],
["Public2", "Private1"],
None,
id="All by given user",
),
pytest.param(
{"Public1", "Public2", "Private1", "Private2"},
["Selected by given user", "Marge Simpson (marge)", {"Public2"}],
["Public2"],
2,
id="Selected by given user",
),
],
)
def test_interactive_selection(
self,
selected_cdf: set[str],
answers: list,
expected_selected: list[str],
expected_options: int | None,
monkeypatch,
) -> None:
default_args = dict(
space=CANVAS_INSTANCE_SPACE,
version=1,
last_updated_time=1,
created_time=1,
updated_by="Irrelevant",
updated_at=datetime.now(),
)
cdf_canvases = [
Canvas(external_id="Public1", name="Canvas 1", visibility="public", created_by="homer", **default_args),
Canvas(external_id="Public2", name="Canvas 2", visibility="public", created_by="marge", **default_args),
Canvas(external_id="Private1", name="Private 1", visibility="private", created_by="marge", **default_args),
Canvas(external_id="Private2", name="Private 2", visibility="private", created_by="homer", **default_args),
]
first_answer_by_choice_title = {c.title: c.value for c in InteractiveCanvasSelect.opening_choices}
assert len(answers) >= 1, "At least one answer is required to select a canvas"
assert answers[0] in first_answer_by_choice_title, "Bug in test data. First answer must be a choice title"
if "user" in answers[0] and len(answers) >= 2:
user_title = answers[1]

def select_user(choices: list[Choice]) -> str:
assert len(choices) == 2
user_choice = next((c for c in choices if c.title == user_title), None)
assert user_choice is not None, f"Bug in test data. User choice '{user_title}' not found in choices"
return user_choice.value

answers[1] = select_user
answers[0] = first_answer_by_choice_title[answers[0]]

if expected_options is not None:
# Last question is which canvases to select.
last_selection = answers[-1]

def select_canvases(choices: list[Choice]) -> list[str]:
assert len(choices) == expected_options, (
f"Expected {expected_options} choices, but got {len(choices)} choices: {choices}"
)
return [choice.value for choice in choices if choice.value in last_selection]

answers[-1] = select_canvases

with (
monkeypatch_toolkit_client() as client,
MockQuestionary(InteractiveCanvasSelect.__module__, monkeypatch, answers),
):
client.canvas.list.return_value = NodeList[Canvas](
[canvas for canvas in cdf_canvases if canvas.external_id in selected_cdf]
)
client.iam.user_profiles.list.return_value = UserProfileList(
[
UserProfile(user_identifier="homer", display_name="Homer Simpson", last_updated_time=1),
UserProfile(user_identifier="marge", display_name="Marge Simpson", last_updated_time=1),
]
)
selector = InteractiveCanvasSelect(client)
selected_external_ids = selector.select_external_ids()

assert selected_external_ids == expected_selected

def test_interactive_abort_selection(self, monkeypatch) -> None:
answers = [None]
with (
monkeypatch_toolkit_client() as client,
MockQuestionary(InteractiveCanvasSelect.__module__, monkeypatch, answers),
):
selector = InteractiveCanvasSelect(client)
with pytest.raises(ToolkitValueError) as exc_info:
selector.select_external_ids()
assert str(exc_info.value) == "No Canvas selection made. Aborting."