Skip to content

[prep CDF-24982] 🫅 Canvas industrial API #1683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cognite_toolkit/_cdf_tk/client/_constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DATA_MODELING_MAX_WRITE_WORKERS = 4
DATA_MODELING_MAX_DELETE_WORKERS = 2
35 changes: 33 additions & 2 deletions cognite_toolkit/_cdf_tk/client/api/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,24 @@
from typing import Any, overload

from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.data_modeling import InstanceSort, NodeApplyResultList, NodeId, NodeList
from cognite.client.data_classes.data_modeling import (
EdgeId,
InstanceSort,
NodeApplyResultList,
NodeId,
NodeList,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils.useful_types import SequenceNotStr

from cognite_toolkit._cdf_tk.client.data_classes.canvas import CANVAS_INSTANCE_SPACE, Canvas, CanvasApply
from cognite_toolkit._cdf_tk.client.data_classes.canvas import (
CANVAS_INSTANCE_SPACE,
Canvas,
CanvasApply,
IndustrialCanvas,
IndustrialCanvasApply,
)
from cognite_toolkit._cdf_tk.client.data_classes.instances import InstancesApplyResultList

from .extended_data_modeling import ExtendedInstancesAPI

Expand All @@ -15,6 +28,7 @@ class CanvasAPI:
def __init__(self, instance_api: ExtendedInstancesAPI) -> None:
self._instance_api = instance_api
self.instance_space = CANVAS_INSTANCE_SPACE
self.industrial = IndustrialCanvasAPI(instance_api)

def upsert(self, canvas: CanvasApply | Sequence[CanvasApply]) -> NodeApplyResultList:
return self._instance_api.apply(canvas).nodes
Expand Down Expand Up @@ -49,3 +63,20 @@ def list(
return self._instance_api.list(
instance_type=Canvas, space=self.instance_space, limit=limit, sort=sort, filter=filter
)


class IndustrialCanvasAPI:
def __init__(self, instance_api: ExtendedInstancesAPI) -> None:
self._instance_api = instance_api

def retrieve(self, external_id: str) -> IndustrialCanvas:
query = IndustrialCanvas._create_query(external_id)
result = self._instance_api.query(query)
return IndustrialCanvas._load(result)

def upsert(self, canvas: IndustrialCanvasApply) -> InstancesApplyResultList:
return self._instance_api.apply_fast(canvas.as_instances())

def delete(self, canvas: IndustrialCanvasApply) -> list[NodeId | EdgeId]:
# Solution tags are used by multiple canvases, so we do not include them in the deletion.
return self._instance_api.delete_fast(canvas.as_instance_ids(include_solution_tags=False))
70 changes: 68 additions & 2 deletions cognite_toolkit/_cdf_tk/client/api/extended_data_modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
from cognite.client._api_client import T
from cognite.client._cognite_client import ClientConfig, CogniteClient
from cognite.client._http_client import HTTPClient, HTTPClientConfig, get_global_requests_session
from cognite.client.data_classes.data_modeling import InstanceApply
from cognite.client.data_classes.data_modeling import EdgeId, InstanceApply, NodeId
from cognite.client.exceptions import CogniteConnectionError, CogniteReadTimeout
from cognite.client.utils import _json
from cognite.client.utils._concurrency import execute_tasks
from cognite.client.utils._identifier import InstanceId
from requests import Response

from cognite_toolkit._cdf_tk.client._constants import DATA_MODELING_MAX_WRITE_WORKERS
from cognite_toolkit._cdf_tk.client._constants import DATA_MODELING_MAX_DELETE_WORKERS, DATA_MODELING_MAX_WRITE_WORKERS
from cognite_toolkit._cdf_tk.client.data_classes.instances import InstancesApplyResultList
from cognite_toolkit._cdf_tk.client.utils._concurrency import ToolkitConcurrencySettings
from cognite_toolkit._cdf_tk.client.utils._http_client import ToolkitRetryTracker
Expand Down Expand Up @@ -117,6 +118,71 @@ def str_format_element(el: T) -> str | T:

return InstancesApplyResultList._load(created_resources)

def delete_fast(
self,
instance_ids: Sequence[NodeId | EdgeId],
) -> list[NodeId | EdgeId]:
"""`Delete one or more instances <https://developer.cognite.com/api#tag/Instances/operation/deleteBulk>`_

Args:
instance_ids (Sequence[NodeId | EdgeId]): A sequence of NodeId or EdgeId instances to delete.

Returns:
list[NodeId | EdgeId]: A list of NodeId or EdgeId instances that were successfully deleted. An empty
list is returned if no instances were deleted.
"""
tasks = [
(f"{self._RESOURCE_PATH}/delete", task_items, ToolkitRetryTracker(self._http_client_with_retry.config))
for task_items in self._prepare_item_chunks(
[
{"space": item.space, "externalId": item.external_id, "instanceType": item._instance_type}
for item in instance_ids
],
self._DELETE_LIMIT,
None,
)
]
summary = execute_tasks(
self._post_with_item_reduction_retry,
tasks,
max_workers=min(self._config.max_workers, DATA_MODELING_MAX_DELETE_WORKERS),
executor=ToolkitConcurrencySettings.get_data_modeling_delete_executor(),
)

def unwrap_element(el: T) -> InstanceApply | T:
if isinstance(el, dict) and "instanceType" in el:
if el["instanceType"] == "node":
return NodeId.load(el)
elif el["instanceType"] == "edge":
return EdgeId.load(el)
return el
else:
return el

def str_format_element(el: T) -> str | T:
if isinstance(el, InstanceId):
return f"{el.space}:{el.external_id}"
return el

summary.raise_compound_exception_if_failed_tasks(
task_unwrap_fn=lambda task: task[1]["items"],
task_list_element_unwrap_fn=unwrap_element,
str_format_element_fn=str_format_element,
)
deleted_resources = summary.joined_results(lambda res: res.json()["items"])
result: list[NodeId | EdgeId] = []
for resource in deleted_resources:
if "instanceType" not in resource:
raise ValueError("Resource must contain 'instanceType' key.")
instance_type = resource.get("instanceType")
if instance_type == "node":
result.append(NodeId.load(resource))
elif instance_type == "edge":
result.append(EdgeId.load(resource))
else:
raise TypeError(f"Resource must be a NodeId or EdgeId, not {instance_type}.")
return result

def _post_with_item_reduction_retry(
self,
url_path: str,
Expand Down
233 changes: 231 additions & 2 deletions cognite_toolkit/_cdf_tk/client/data_classes/canvas.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
from collections.abc import Sequence
from datetime import datetime

from cognite.client.data_classes.data_modeling import DirectRelationReference
from cognite.client.data_classes.data_modeling.ids import ViewId
from cognite.client.data_classes.data_modeling import DirectRelationReference, filters, query
from cognite.client.data_classes.data_modeling.ids import EdgeId, NodeId, ViewId
from cognite.client.data_classes.data_modeling.instances import (
EdgeApply,
InstanceApply,
Node,
NodeApply,
NodeListWithCursor,
PropertyOptions,
T_Node,
TypedNode,
TypedNodeApply,
)
from cognite.client.data_classes.data_modeling.query import QueryResult

CANVAS_INSTANCE_SPACE = "IndustrialCanvasInstanceSpace"
SOLUTION_TAG_SPACE = "SolutionTagsInstanceSpace"
CANVAS_SCHEMA_SPACE = "cdf_industrial_canvas"
ANNOTATION_EDGE_TYPE = DirectRelationReference(CANVAS_SCHEMA_SPACE, "referencesCanvasAnnotation")
CONTAINER_REFERENCE_EDGE_TYPE = DirectRelationReference(CANVAS_SCHEMA_SPACE, "referencesContainerReference")
FDM_CONTAINER_REFERENCE_EDGE_TYPE = DirectRelationReference(
CANVAS_SCHEMA_SPACE, "referencesFdmInstanceContainerReference"
)


class _CanvasProperties:
Expand Down Expand Up @@ -749,3 +763,218 @@ def as_write(self) -> FdmInstanceContainerReferenceApply:
existing_version=self.version,
type=self.type,
)


class IndustrialCanvasApply:
"""This class represents the writing format of IndustrialCanvas.
It is used to when data is written to CDF.
Args:
canvas: The Canvas object.
annotations: A list of CanvasAnnotation objects.
container_references: A list of ContainerReference objects.
fdm_instance_container_references: A list of FdmInstanceContainerReference objects.
solution_tags: A list of CogniteSolutionTag objects.
"""

def __init__(
self,
canvas: CanvasApply,
annotations: Sequence[CanvasAnnotationApply] | None = None,
container_references: Sequence[ContainerReferenceApply] | None = None,
fdm_instance_container_references: Sequence[FdmInstanceContainerReferenceApply] | None = None,
solution_tags: Sequence[CogniteSolutionTagApply] | None = None,
) -> None:
self.canvas = canvas
self.annotations = annotations or []
self.container_references = container_references or []
self.fdm_instance_container_references = fdm_instance_container_references or []
self.solution_tags = solution_tags or []

def as_instances(self) -> list[InstanceApply]:
"""Convert the IndustrialCanvasApply to a list of InstanceApply objects."""
instances: list[InstanceApply] = [self.canvas]
instances.extend(self.annotations)
instances.extend(self.container_references)
instances.extend(self.fdm_instance_container_references)
instances.extend(self.solution_tags)
for items, edge_type in [
(self.annotations, ANNOTATION_EDGE_TYPE),
(self.container_references, CONTAINER_REFERENCE_EDGE_TYPE),
(self.fdm_instance_container_references, FDM_CONTAINER_REFERENCE_EDGE_TYPE),
]:
for item in items:
instances.append(
EdgeApply(
space=CANVAS_INSTANCE_SPACE,
external_id=f"{self.canvas.external_id}_{item.external_id}",
start_node=DirectRelationReference(
space=self.canvas.space, external_id=self.canvas.external_id
),
end_node=DirectRelationReference(space=item.space, external_id=item.external_id),
type=edge_type,
)
)

return instances

def as_id(self) -> str:
return self.canvas.external_id

def as_instance_ids(self, include_solution_tags: bool = False) -> list[NodeId | EdgeId]:
"""Return a list of IDs for the instances in the IndustrialCanvasApply."""
instances = self.as_instances()
ids: list[NodeId | EdgeId] = []
for instance in instances:
if isinstance(instance, NodeApply) and (
include_solution_tags or not isinstance(instance, CogniteSolutionTagApply)
):
ids.append(NodeId(instance.space, instance.external_id))
elif isinstance(instance, EdgeApply):
ids.append(EdgeId(instance.space, instance.external_id))
else:
raise TypeError(f"Unexpected instance type: {type(instance)}")
return ids

def dump(self, exclude_existing_version: bool = False) -> dict[str, object]:
"""Dump the IndustrialCanvasApply to a dictionary."""
output: dict[str, object] = {
"canvas": self.canvas.dump(),
"annotations": [annotation.dump() for annotation in self.annotations],
"containerReferences": [container_ref.dump() for container_ref in self.container_references],
"fdmInstanceContainerReferences": [
fdm_instance_container_ref.dump()
for fdm_instance_container_ref in self.fdm_instance_container_references
],
"solutionTags": [solution_tag.dump() for solution_tag in self.solution_tags],
}
if exclude_existing_version:
for key in list(output.keys()):
value = output[key]
if isinstance(value, list):
for item in value:
if isinstance(item, dict) and "existingVersion" in item:
del item["existingVersion"]
elif isinstance(value, dict) and "existingVersion" in value:
del value["existingVersion"]
return output


class IndustrialCanvas:
"""This class represents one instances of the Canvas with all connected data."""

def __init__(
self,
canvas: Canvas,
annotations: NodeListWithCursor[CanvasAnnotation] | None = None,
container_references: NodeListWithCursor[ContainerReference] | None = None,
fdm_instance_container_references: NodeListWithCursor[FdmInstanceContainerReference] | None = None,
solution_tags: NodeListWithCursor[CogniteSolutionTag] | None = None,
) -> None:
self.canvas = canvas
self.annotations = annotations or NodeListWithCursor[CanvasAnnotation]([], None)
self.container_references = container_references or NodeListWithCursor[ContainerReference]([], None)
self.fdm_instance_container_references = fdm_instance_container_references or NodeListWithCursor[
FdmInstanceContainerReference
]([], None)
self.solution_tags = solution_tags or NodeListWithCursor[CogniteSolutionTag]([], None)

@classmethod
def _load(cls, result: QueryResult) -> "IndustrialCanvas":
"""Load an IndustrialCanvas instance from a QueryResult."""
if not ("canvas" in result and isinstance(result["canvas"], NodeListWithCursor) and len(result["canvas"]) == 1):
raise ValueError("QueryResult does not contain a canvas node.")
canvas = Canvas._load(result["canvas"][0].dump())
return cls(
canvas=canvas,
annotations=cls._load_items(result.get("annotations"), CanvasAnnotation),
container_references=cls._load_items(result.get("containerReferences"), ContainerReference),
fdm_instance_container_references=cls._load_items(
result.get("fdmInstanceContainerReferences"), FdmInstanceContainerReference
),
solution_tags=cls._load_items(result.get("solutionTags"), CogniteSolutionTag),
)

@classmethod
def _load_items(
cls, response: NodeListWithCursor[Node] | None, node_cls: type[T_Node]
) -> NodeListWithCursor[T_Node]:
if response is None:
return NodeListWithCursor[T_Node]([], None)
return NodeListWithCursor[T_Node]([node_cls._load(node.dump()) for node in response], response.cursor)

@classmethod
def _create_query(cls, external_id: str) -> query.Query:
return query.Query(
with_={
"canvas": query.NodeResultSetExpression(
filter=filters.InstanceReferences([(CANVAS_INSTANCE_SPACE, external_id)]),
limit=1,
),
"solutionTags": query.NodeResultSetExpression(
from_="canvas",
through=Canvas.get_source().as_property_ref("solutionTags"),
),
"annotationEdges": query.EdgeResultSetExpression(
from_="canvas",
filter=filters.Equals(["edge", "type"], ANNOTATION_EDGE_TYPE.dump()),
node_filter=filters.HasData(views=[CanvasAnnotation.get_source()]),
direction="outwards",
),
"containerReferenceEdges": query.EdgeResultSetExpression(
from_="canvas",
filter=filters.Equals(["edge", "type"], CONTAINER_REFERENCE_EDGE_TYPE.dump()),
node_filter=filters.HasData(views=[ContainerReference.get_source()]),
direction="outwards",
),
"fdmInstanceContainerReferenceEdges": query.EdgeResultSetExpression(
from_="canvas",
filter=filters.Equals(
["edge", "type"],
FDM_CONTAINER_REFERENCE_EDGE_TYPE.dump(),
),
node_filter=filters.HasData(views=[FdmInstanceContainerReference.get_source()]),
direction="outwards",
),
"annotations": query.NodeResultSetExpression(from_="annotationEdges"),
"containerReferences": query.NodeResultSetExpression(from_="containerReferenceEdges"),
"fdmInstanceContainerReferences": query.NodeResultSetExpression(
from_="fdmInstanceContainerReferenceEdges"
),
},
select={
"canvas": query.Select([query.SourceSelector(Canvas.get_source(), properties=["*"])]),
"solutionTags": query.Select([query.SourceSelector(CogniteSolutionTag.get_source(), properties=["*"])]),
"annotations": query.Select([query.SourceSelector(CanvasAnnotation.get_source(), properties=["*"])]),
"containerReferences": query.Select(
[query.SourceSelector(ContainerReference.get_source(), properties=["*"])]
),
"fdmInstanceContainerReferences": query.Select(
[query.SourceSelector(FdmInstanceContainerReference.get_source(), properties=["*"])]
),
},
)

def dump(self) -> dict[str, object]:
"""Dump the IndustrialCanvas to a dictionary."""
return {
"canvas": self.canvas.dump(),
"annotations": [annotation.dump() for annotation in self.annotations],
"containerReferences": [container_ref.dump() for container_ref in self.container_references],
"fdmInstanceContainerReferences": [
fdm_instance_container_ref.dump()
for fdm_instance_container_ref in self.fdm_instance_container_references
],
"solutionTags": [solution_tag.dump() for solution_tag in self.solution_tags],
}

def as_write(self) -> "IndustrialCanvasApply":
return IndustrialCanvasApply(
canvas=self.canvas.as_write(),
annotations=[annotation.as_write() for annotation in self.annotations],
container_references=[container_ref.as_write() for container_ref in self.container_references],
fdm_instance_container_references=[
fdm_instance_container_ref.as_write()
for fdm_instance_container_ref in self.fdm_instance_container_references
],
solution_tags=[solution_tag.as_write() for solution_tag in self.solution_tags],
)
Loading