Skip to content

Commit

Permalink
Merge branch 'dptool' of github.com:garthwatney/fprime-gds into dptool
Browse files Browse the repository at this point in the history
  • Loading branch information
watney committed May 16, 2024
2 parents c365954 + fd0a903 commit b3bbe93
Show file tree
Hide file tree
Showing 20 changed files with 2,778 additions and 245 deletions.
1 change: 1 addition & 0 deletions .github/actions/spelling/excludes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ ignore$
\.ico$
pyproject.toml
setup.py
dictionary.xml
3 changes: 3 additions & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ hidecallgraph
hideinitializer
hlp
HLTH
Hookimpl
hookspec
hostname
hpaulson
hpp
Expand Down Expand Up @@ -329,6 +331,7 @@ ints
ip
ipc
ipp
isabstract
isalnum
isclass
isdir
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ fprime_test_api = "fprime_gds.common.testing_fw.pytest_integration"
# - https://setuptools.pypa.io/en/latest/userguide/pyproject_config.html
# - https://setuptools.pypa.io/en/latest/userguide/package_discovery.html#src-layout
####
[tool.pytest.ini_options]
markers =[
"gds_cli"
]
3 changes: 0 additions & 3 deletions pytest.ini

This file was deleted.

88 changes: 30 additions & 58 deletions src/fprime_gds/common/communication/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class representing the core features of the adapter class that must be implement
@author lestarch
"""
import abc
from typing import Type
from fprime_gds.plugin.definitions import gds_plugin_implementation, gds_plugin_specification


class BaseAdapter(abc.ABC):
Expand Down Expand Up @@ -47,71 +49,41 @@ def write(self, frame):
"""

@classmethod
@abc.abstractmethod
def get_arguments(cls):
"""
Returns a set of arguments consumed by this adapter. This will be consumed by the CLI layer in order to provide
command line arguments to the user. Note: these should be globally unique args, e.g. --ip-address
@gds_plugin_specification
def register_communication_plugin(cls) -> Type["BaseAdapter"]:
"""Register a communications adapter
:return: dictionary, keys of tuple of arg flags and value of list of other arguments to argparse's add_argument
"""
Plugin hook for registering a plugin that supplies an adapter to the communications interface (radio, uart, i2c,
etc). This interface is expected to read and write bytes from a wire and will be provided to the framing system.
@classmethod
@abc.abstractmethod
def check_arguments(cls, args):
"""
Code that should check arguments of this adapter. If there is a problem with this code, then a "ValueError"
should be raised describing the problem with these arguments.
Note: users should return the class, not an instance of the class. Needed arguments for instantiation are
determined from class methods, solicited via the command line, and provided at construction time to the chosen
instantiation.
:param args: arguments as dictionary
Returns:
BaseAdapter subclass
"""
raise NotImplementedError()


class NoneAdapter(BaseAdapter):
""" None adapter used to turn off the comm script """

@classmethod
def get_adapters(cls):
"""
Get all known adapters of this base class. These must be imported into the comm-layer to be available to the
system, however; they only need to be imported. Once imported this function will find them and make them
available to the comm-layer in the standard way.
def get_name(cls):
""" Get name of the non-adapter """
return "none"

:return: list of all imported comm adapters.
"""
adapter_map = {}
for adapter in cls.__subclasses__():
# Get two versions of names
adapter_name = adapter.__module__
adapter_short = adapter_name.split(".")[-1]
# Check to use long or short name
if adapter_short not in adapter_map:
adapter_name = adapter_short
adapter_map[adapter_name] = adapter
return adapter_map

@staticmethod
def process_arguments(clazz, args):
"""
Process arguments incoming from the command line and construct a dictionary of kwargs to supply to the chosen
adapter at creation time. This will allow the user to choose any imported adapters at runtime.
def read(self, timeout=0.500):
""" Raise exception if this is called"""
raise NotImplementedError()

:param args: arguments to process
:return: dictionary of constructor keyword arguments
"""
return {
value["dest"]: getattr(args, value["dest"])
for value in clazz.get_arguments().values()
}
def write(self, frame):
""" Raise exception if this is called"""
raise NotImplementedError()

@classmethod
def construct_adapter(cls, adapter_name, args):
"""
Constructs a new adapter, from the given adapter name and the given namespace of argument inputs. This is a
wrapper of "get_adapters" and "process_arguments" to help build a new, fully configured, adapter. This is a
factory method.
:param adapter_name: name of the adapter to build
:param args: namespace of arg value to help build an adapter
:return: newly constructed adapter
"""
if adapter_name == "none":
return None
adapter = cls.get_adapters()[adapter_name]
return adapter(**cls.process_arguments(adapter, args))
@gds_plugin_implementation
def register_communication_plugin(cls):
""" Register this as a plugin """
return cls
28 changes: 23 additions & 5 deletions src/fprime_gds/common/communication/adapters/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import fprime_gds.common.communication.adapters.base
import fprime_gds.common.logger

from fprime_gds.plugin.definitions import gds_plugin_implementation

LOGGER = logging.getLogger("ip_adapter")


Expand Down Expand Up @@ -114,7 +116,7 @@ def th_handler(self, handler):

def write(self, frame):
"""
Send a given framed bit of data by sending it out the serial interface. It will attempt to reconnect if there is
Send a given framed bit of data by sending it out the serial interface. It will attempt to reconnect if there
was a problem previously. This function will return true on success, or false on error.
:param frame: framed data packet to send out
Expand Down Expand Up @@ -151,6 +153,11 @@ def th_alive(self, interval):
self.write(IpAdapter.KEEPALIVE_DATA)
time.sleep(interval)

@classmethod
def get_name(cls):
""" Get the name of this adapter """
return "ip"

@classmethod
def get_arguments(cls):
"""
Expand All @@ -163,13 +170,13 @@ def get_arguments(cls):
"dest": "address",
"type": str,
"default": "0.0.0.0",
"help": "Address of the IP adapter server. Default: %(default)s",
"help": "Address of the IP adapter server.",
},
("--ip-port",): {
"dest": "port",
"type": int,
"default": 50000,
"help": "Port of the IP adapter server. Default: %(default)s",
"help": "Port of the IP adapter server.",
},
("--ip-client",): {
# dest is "server" since it is handled in BaseAdapter.construct_adapter and passed with the same
Expand All @@ -182,14 +189,25 @@ def get_arguments(cls):
}

@classmethod
def check_arguments(cls, args):
@gds_plugin_implementation
def register_communication_plugin(cls):
""" Register this as a communication plugin """
return cls

@classmethod
def check_arguments(cls, address, port, server=True):
"""
Code that should check arguments of this adapter. If there is a problem with this code, then a "ValueError"
should be raised describing the problem with these arguments.
:param args: arguments as dictionary
"""
check_port(args["address"], args["port"])
try:
if server:
check_port(address, port)
except OSError as os_error:
raise ValueError(f"{os_error}")



class IpHandler(abc.ABC):
Expand Down
27 changes: 20 additions & 7 deletions src/fprime_gds/common/communication/adapters/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import fprime_gds.common.communication.adapters.base

from fprime_gds.plugin.definitions import gds_plugin_implementation

LOGGER = logging.getLogger("serial_adapter")


Expand Down Expand Up @@ -151,35 +153,46 @@ def get_arguments(cls):
"dest": "device",
"type": str,
"default": default,
"help": "UART device representing the FSW. Default: %(default)s",
"help": "UART device representing the FSW.",
},
("--uart-baud",): {
"dest": "baud",
"type": int,
"default": 9600,
"help": "Baud rate of the serial device. Default: %(default)s",
"help": "Baud rate of the serial device.",
},
}

@classmethod
def check_arguments(cls, args):
def get_name(cls):
""" Get name of the adapter """
return "uart"

@classmethod
@gds_plugin_implementation
def register_communication_plugin(cls):
""" Register this as a communication plugin """
return cls

@classmethod
def check_arguments(cls, device, baud):
"""
Code that should check arguments of this adapter. If there is a problem with this code, then a "ValueError"
should be raised describing the problem with these arguments.
:param args: arguments as dictionary
"""
ports = map(lambda info: info.device, list_ports.comports(include_links=True))
if args["device"] not in ports:
msg = f"Serial port '{args['device']}' not valid. Available ports: {ports}"
if device not in ports:
msg = f"Serial port '{device}' not valid. Available ports: {ports}"
raise ValueError(
msg
)
# Note: baud rate may not *always* work. These are a superset
try:
baud = int(args["baud"])
baud = int(baud)
except ValueError:
msg = f"Serial baud rate '{args['baud']}' not integer. Use one of: {SerialAdapter.BAUDS}"
msg = f"Serial baud rate '{baud}' not integer. Use one of: {SerialAdapter.BAUDS}"
raise ValueError(
msg
)
Expand Down
4 changes: 1 addition & 3 deletions src/fprime_gds/common/communication/checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ def crc_calculation(data: bytes):
return zlib.crc32(data) & 0xFFFFFFFF


CHECKSUM_SELECTION = "crc32"
CHECKSUM_MAPPING = {
"fixed": lambda data: 0xCAFECAFE,
"crc32": crc_calculation,
"default": crc_calculation,
}


def calculate_checksum(data: bytes):
def calculate_checksum(data: bytes, selected_checksum: str):
"""Calculates the checksum of bytes"""
selected_checksum = CHECKSUM_SELECTION
hash_fn = CHECKSUM_MAPPING.get(selected_checksum, CHECKSUM_MAPPING.get("default"))
return hash_fn(data)
Loading

0 comments on commit b3bbe93

Please sign in to comment.