Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic propery assignment #4

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aiko_services/__tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CLI_TOKEN = "_cli"
SEPARATOR_TOKEN = "_SEP_"
16 changes: 8 additions & 8 deletions aiko_services/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@
import aiko_services.event as event
from aiko_services.pipeline import Pipeline, load_pipeline_definition
from aiko_services.utilities import get_logger, load_module
from aiko_services.__tokens import CLI_TOKEN, SEPARATOR_TOKEN

MATCH_CAMEL_CASE = re.compile(r"(?<!^)(?=[A-Z])")
DEFAULT_PIPELINE_NAME = "pipeline_definition"
DEFAULT_PIPELINE_FRAME_RATE = 0.05 # 20 FPS, 0 for flat-out!
SEP = "_SEP_"
pipeline_definition = []


Expand Down Expand Up @@ -157,10 +157,10 @@ def decorator(f):
component_name = ele["name"]

# Required cli params
params = {k:v for k,v in ele["parameters"].items() if not k.endswith("_cli")}
cli_attrs = {k:v for k,v in ele["parameters"].items() if k.endswith("_cli")}
params = {k:v for k,v in ele["parameters"].items() if not k.endswith(CLI_TOKEN)}
cli_attrs = {k:v for k,v in ele["parameters"].items() if k.endswith(CLI_TOKEN)}
for param_name, value in params.items():
attributes = cli_attrs.pop(f"{param_name}_cli", {})
attributes = cli_attrs.pop(f"{param_name}{CLI_TOKEN}", {})
if attributes.get("hidden", False):
continue
validate_attributes(attributes, component_name, param_name)
Expand All @@ -179,7 +179,7 @@ def decorator(f):
flags = attributes.get("name", infered_flag).split()

validate_value_type(value, attributes, component_name, param_name)
variable_name = f"{component_name}{SEP}{param_name}"
variable_name = f"{component_name}{SEPARATOR_TOKEN}{param_name}"
flags += [variable_name]
click.option(*flags, **attributes)(f)
return f
Expand All @@ -190,7 +190,7 @@ def clean_cli_params(pipeline_definition):
for component in [e for e in pipeline_definition if "parameters" in e]:
param_names = list(component["parameters"].keys())
for param_name in param_names:
if param_name.endswith("_cli"):
if param_name.endswith(CLI_TOKEN):
component["parameters"].pop(param_name)
return pipeline_definition

Expand Down Expand Up @@ -228,8 +228,8 @@ def main(**kwargs):
pipeline = Pipeline(_pipeline_def, kwargs["pipeline_frame_rate"])

for k,v in kwargs.items():
if SEP in k:
node_name, param_name = k.split(SEP)
if SEPARATOR_TOKEN in k:
node_name, param_name = k.split(SEPARATOR_TOKEN)
pipeline.update_node_parameter(node_name, param_name, v)

if kwargs["show"]:
Expand Down
1 change: 1 addition & 0 deletions aiko_services/elements/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .image_io import *
from .video_io import *
from .simple import *
112 changes: 85 additions & 27 deletions aiko_services/elements/image_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,75 @@
from pathlib import Path
from PIL import Image

__all__ = ["ImageAnnotate1", "ImageAnnotate2", "ImageOverlay", "ImageReadFile", "ImageResize", "ImageWriteFile"]
__all__ = [
"ImageAnnotate1",
"ImageAnnotate2",
"ImageOverlay",
"ImageReadFile",
"ImageResize",
"ImageWriteFile",
]


class ImageAnnotate1(StreamElement):
def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
image = swag[self.predecessor]["image"]
expected_parameters = ()
expected_inputs = ("image",)
expected_outputs(
"image",
)

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
image = inputs.image
return True, {"image": image}


class ImageAnnotate2(StreamElement):
def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
image = swag[self.predecessor]["image"]
expected_parameters = ()
expected_inputs = ("image",)
expected_outputs(
"image",
)

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
image = inputs.image
return True, {"image": image}


class ImageOverlay(StreamElement):
def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
image = swag[self.predecessor]["image"]
expected_parameters = ()
expected_inputs = ("image",)
expected_outputs(
"image",
)

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
image = inputs.image
return True, {"image": image}


class ImageReadFile(StreamElement):
def stream_start_handler(self, stream_id, frame_id, swag):
expected_parameters = ("image_pathname",)
expected_inputs = ()
expected_outputs("images,")

def stream_start_handler(self, stream_id, frame_id, inputs):
self.logger.debug(f"stream_start_handler(): stream_id: {stream_id}")
self.image_pathname = self.parameters["image_pathname"]
image_directory = Path(self.image_pathname).parent
if not image_directory.exists:
self.logger.error(f"Couldn't find directory: {image_directory}")
return False, None
return True, None

def stream_frame_handler(self, stream_id, frame_id, swag):
def stream_frame_handler(self, stream_id, frame_id, inputs):
image_path = self.image_pathname.format(frame_id)
try:
pil_image = Image.open(image_path)
Expand All @@ -48,36 +85,57 @@ def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug("End of images")
return False, None

self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
if frame_id % 10 == 0:
print(f"Frame Id: {frame_id}", end="\r")
return True, {"image": image}


class ImageResize(StreamElement):
def stream_start_handler(self, stream_id, frame_id, swag):
expected_parameters = (
"new_height",
"new_width",
)
expected_inputs = ("image",)
expected_outputs(
"image",
)

def stream_start_handler(self, stream_id, frame_id, inputs):
self.logger.debug(f"stream_start_handler(): stream_id: {stream_id}")
self.new_height = self.parameters["new_height"]
self.new_width = self.parameters["new_width"]
self.new_height = self.new_height
self.new_width = self.new_width
return True, None

def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
pil_image = Image.fromarray(swag[self.predecessor]["image"])
def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
pil_image = Image.fromarray(inputs.image)
pil_image = pil_image.resize((self.new_width, self.new_height))
image = np.asarray(pil_image, dtype=np.uint8)
return True, {"image": image}


class ImageWriteFile(StreamElement):
def stream_start_handler(self, stream_id, frame_id, swag):
expected_parameters = ("image_pathname",)
expected_inputs = ("image",)
expected_outputs()

def stream_start_handler(self, stream_id, frame_id, inputs):
self.logger.debug(f"stream_start_handler(): stream_id: {stream_id}")
self.image_pathname = self.parameters["image_pathname"]
# TODO: Error handling
self.image_pathname = self.image_pathname
# TODO: Error handling
Path(self.image_pathname).parent.mkdir(exist_ok=True, parents=True)
return True, None

def stream_frame_handler(self, stream_id, frame_id, swag):
self.logger.debug(f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}")
pil_image = Image.fromarray(swag[self.predecessor]["image"])
# TODO: Error handling: 1) format image, 2) save image
def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
pil_image = Image.fromarray(inputs.image)
# TODO: Error handling: 1) format image, 2) save image
pil_image.save(self.image_pathname.format(frame_id))
return True, None
86 changes: 86 additions & 0 deletions aiko_services/elements/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from aiko_services.stream import StreamElement

import numpy as np
from pathlib import Path

__all__ = ["MathList", "RandInt", "Print"]


class MathList(StreamElement):
"""Adds the numbers in inputs["numbers"]"""

expected_parameters = ("operation",) # "add", "multiply"
expected_inputs = ("numbers",)
expected_outputs = ("result",)

def stream_start_handler(self, stream_id, frame_id, inputs):
self.logger.debug(f"stream_start_handler(): stream_id: {stream_id}")

if self.operation == "add":
self.fn = np.sum
elif self.operation == "product":
self.fn = np.product
else:
self.logger.error(f"Unsupported operation: '{self.operation}'")
return False, None

return True, None

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
result = {"result": self.fn(inputs.numbers)}
return True, result


class RandInt(StreamElement):
"""Generates list of ints in 'range'. Result of length 'list_len' for
'iterations' loops
"""

expected_parameters = (
("list_len", 10),
("iterations", 10),
("min", 0),
("max", 10),
)
expected_inputs = ()
expected_outputs = ("list",)

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
if frame_id == self.iterations:
self.logger.info("Number of iterations completed. Exiting")
return False, None

result = {
"list": list(np.random.randint(self.min, self.max, size=(self.list_len)))
}
return True, result


class Print(StreamElement):
"""Prints:
f'{message}{inputs.to_print}'
"""

expected_parameters = (
"message_1",
"message_2",
)
expected_inputs = (
"to_print_1",
"to_print_2",
)
expected_outputs = ()

def stream_frame_handler(self, stream_id, frame_id, inputs):
self.logger.debug(
f"stream_frame_handler(): stream_id: {stream_id}, frame_id: {frame_id}"
)
self.logger.info(f"{self.message_1}{inputs.to_print_1}")
self.logger.info(f"{self.message_2}{inputs.to_print_2}")
return True, None
Loading