From d68f897e01538bcd292063f2c09518bdb63d9659 Mon Sep 17 00:00:00 2001 From: Yuhuang Hu Date: Sat, 16 Mar 2024 02:44:12 +0100 Subject: [PATCH] simplify --- pyaer/__about__.py | 6 -- pyaer/__init__.py | 12 +-- pyaer/constants.py | 2 + pyaer/device.py | 238 +++++++++++++++++++++------------------------ setup.py | 19 ++-- 5 files changed, 120 insertions(+), 157 deletions(-) delete mode 100644 pyaer/__about__.py create mode 100644 pyaer/constants.py diff --git a/pyaer/__about__.py b/pyaer/__about__.py deleted file mode 100644 index 87498d8..0000000 --- a/pyaer/__about__.py +++ /dev/null @@ -1,6 +0,0 @@ -__all__ = ["__version__", "__author__", "__author_email__", "__url__"] - -__version__ = "0.2.7a0" -__author__ = "Yuhuang Hu" -__author_email__ = "duguyue100@gmail.com" -__url__ = "https://github.com/duguyue100/pyaer" diff --git a/pyaer/__init__.py b/pyaer/__init__.py index 1b8b2cb..362cbd6 100644 --- a/pyaer/__init__.py +++ b/pyaer/__init__.py @@ -1,18 +1,8 @@ -from __future__ import absolute_import -from __future__ import print_function - -import os - from pyaer import log -from pyaer.__about__ import __author__ # noqa -from pyaer.__about__ import __version__ # noqa -FILE_PATH = os.path.realpath(__file__) -CURR_PATH = os.path.dirname(os.path.realpath(__file__)) -PKG_PATH = os.path.dirname(CURR_PATH) - # System logging level + LOG_LEVEL = log.DEBUG try: diff --git a/pyaer/constants.py b/pyaer/constants.py new file mode 100644 index 0000000..3c7d9de --- /dev/null +++ b/pyaer/constants.py @@ -0,0 +1,2 @@ +DISABLE_MAX_CONTAINER_SIZE = 0 +DEFAULT_MAX_PACKET_INTERVAL = 10000 # 10ms diff --git a/pyaer/device.py b/pyaer/device.py index 7a1d80a..64b1fc0 100644 --- a/pyaer/device.py +++ b/pyaer/device.py @@ -1,18 +1,20 @@ from __future__ import annotations import abc +from typing import Any from pyaer import libcaer +from pyaer.constants import DEFAULT_MAX_PACKET_INTERVAL +from pyaer.constants import DISABLE_MAX_CONTAINER_SIZE -class USBDevice(object): +class USBDevice: """Base class for all USB devices. This class is the base of DVS128, DAVIS240, DAVIS346 and DYNAPSE. """ - def __init__(self): - """Device.""" + def __init__(self) -> None: self.handle = None # functions for get events number and packet functions @@ -33,29 +35,27 @@ def __init__(self): } @abc.abstractmethod - def obtain_device_info(self, handle): - """Obtain device handle. + def obtain_device_info(self, handle: Any) -> None: + """Obtains device handle. - This abstract method should be implemented in all derived classes. - This method collects the general information about the USB device - such as the width and height of the camera or the serial number - of the device. + This abstract method should be implemented in all derived classes. This method + collects the general information about the device such as the width and height + of the camera or the serial number of the device. - # Arguments - handle: `caerDeviceHandle`
- a valid device handle that can be used with the other - `libcaer` functions, or `None` on error. + # Args: + handle: a valid device handle that can be used with the other `libcaer` + functions, or `None` on error. """ - return + raise NotADirectoryError() @abc.abstractmethod - def get_event(self): - """Get Event. + def get_event(self) -> None: + """Gets Event. This abstract method should be implemented in all derived classes. This method returns a packet of events according to the type of the sensor. """ - return + raise NotImplementedError() def open( self, @@ -106,15 +106,15 @@ def open( if self.handle is None: raise ValueError("The device is failed to open.") - def close(self): - """Close USB device. + def close(self) -> None: + """Closes USB device. This method closes an opened USB device if the respective handle is not None. """ if self.handle is not None: libcaer.caerDeviceClose(self.handle) - def shutdown(self): + def shutdown(self) -> None: """Shutdown device. This method is a combination of `data_stop` and `close`. This is a preferred way @@ -123,13 +123,12 @@ def shutdown(self): self.data_stop() self.close() - def data_start(self): - """Start data transmission. + def data_start(self) -> bool: + """Starts data transmission. - # Returns - flag: `bool`
- Return `True` if the data transmission is - initialized successfully. Otherwise `False`. + Returns: + flag: Return `True` if the data transmission is initialized successfully. + Otherwise `False`. """ # TODO figure out the parameter meaning if self.handle is not None: @@ -140,25 +139,24 @@ def data_start(self): else: return False - def data_stop(self): - """Stop data transmission. + def data_stop(self) -> None: + """Stops data transmission. This method stops the data transmission only. Note that this method does not destroy the respective device `handle`. """ libcaer.caerDeviceDataStop(self.handle) - def send_default_config(self): + def send_default_config(self) -> bool: """Send default configuration. - Each type of devices has a set of default configurations (e.g. bias) + Each type of devices has a set of default configurations (e.g., bias) that are pre-defined in the `libcaer` library. - Note that the default configuration might not be suitable for your - needs. + Note that the default configuration might not be suitable for your needs. - # Returns - flag: `bool`
- return `True` if the default config is set successfully, + + Returns: + flag: Return `True` if the default config is set successfully, `False` otherwise. """ if self.handle is not None: @@ -167,15 +165,41 @@ def send_default_config(self): else: return False - def set_max_container_packet_size(self, max_packet_size=0): + def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool: + """Sets configuration. + + The main function of setting configurations (e.g., bias). + + # Args: + mod_addr: a module address, used to specify which configuration module one + wants to update. Negative addresses are used for host-side + configuration, while positive addresses (including zero) are used for + device-side configuration. + param_addr: a parameter address, to select a specific parameter to update + from this particular configuration module. Only positive numbers + (including zero) are allowed. + param: a configuration parameter's new value. + + # Returns: + `True` if the config is set successfully, `False` otherwise. + """ + if self.handle is not None: + set_success = libcaer.caerDeviceConfigSet( + self.handle, mod_addr, param_addr, param + ) + return set_success + else: + return False + + def set_max_container_packet_size( + self, max_packet_size: int = DISABLE_MAX_CONTAINER_SIZE + ) -> bool: """Set max container packet size. - # Arguments - max_packet_size: `int`
- set the maximum number of events any of a packet container's - packets may hold before it's made available to the user. - Set to zero to disable.
- The default is `0`. + Args: + max_packet_size: set the maximum number of events any of a packet + container's packets may hold before it's made available to the user. + Set to `pyaer.constants.DISABLE_MAX_CONTAINER_SIZE` to disable. """ return self.set_config( libcaer.CAER_HOST_CONFIG_PACKETS, @@ -183,16 +207,16 @@ def set_max_container_packet_size(self, max_packet_size=0): max_packet_size, ) - def set_max_container_interval(self, max_packet_interval=10000): - """Set max packet interval. + def set_max_container_interval( + self, max_packet_interval: int = DEFAULT_MAX_PACKET_INTERVAL + ) -> bool: + """Sets max packet interval. - # Arguments - max_packet_interval: `int`
- set the time interval between subsequent packet containers. - Must be at least 1 microsecond. - The value is in microseconds, and is checked across all - types of events contained in the EventPacketContainer.
- The default is `10000` (10ms or 100 packets/s) + Args: + max_packet_interval: set the time interval between subsequent packet + containers. Must be at least 1 microsecond. The value is in + microseconds, and is checked across all types of events contained in + the EventPacketContainer. The default is `10000` (10ms or 100 packets/s) """ return self.set_config( libcaer.CAER_HOST_CONFIG_PACKETS, @@ -200,18 +224,16 @@ def set_max_container_interval(self, max_packet_interval=10000): max_packet_interval, ) - def set_data_exchange_blocking(self, exchange_blocking=True): - """Set data exchange blocking. + def set_data_exchange_blocking(self, exchange_blocking: bool = True) -> bool: + """Sets data exchange blocking. - # Arguments - exchange_blocking: `bool`
- whether to start all the data producer modules on the device - (DVS, APS, Mux, ...) automatically when starting the - data transfer thread with `caerDeviceDataStart()` or not. - If disabled, be aware you will have to start the right modules - manually, which can be useful if you need precise control - over which ones are running at any time.
- The default is `True`. + Arg: + exchange_blocking: Whether to start all the data producer modules on the + device (DVS, APS, Mux, ...) automatically when starting the data + transfer thread with `caerDeviceDataStart()` or not. If disabled, be + aware you will have to start the right modules manually, which can be + useful if you need precise control over which ones are running at any + time. The default is `True`. """ return self.set_config( libcaer.CAER_HOST_CONFIG_DATAEXCHANGE, @@ -219,71 +241,32 @@ def set_data_exchange_blocking(self, exchange_blocking=True): exchange_blocking, ) - def set_config(self, mod_addr, param_addr, param): - """Set configuration. - - The main function of setting configurations (e.g., bias). + def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None: + """Gets Configuration. - # Arguments - mod_addr: `int`
- a module address, used to specify which configuration module - one wants to update. Negative addresses are used for host-side - configuration, while positive addresses (including zero) are - used for device-side configuration. - param_addr: `int`
- a parameter address, to select a specific parameter to update - from this particular configuration module. - Only positive numbers + Args: + mod_addr: a module address, used to specify which configuration module one + wants to update. Negative addresses are used for host-side + configuration, while positive addresses (including zero) are used for + device-side configuration. + param_addr: a parameter address, to select a specific parameter to update + from this particular configuration module. Only positive numbers (including zero) are allowed. - param: `int` or `bool`
- a configuration parameter's new value. - # Returns - flag: `bool`
- returns `True` if the config is set successfully, - `False` otherwise. - """ - if self.handle is not None: - set_success = libcaer.caerDeviceConfigSet( - self.handle, mod_addr, param_addr, param - ) - return set_success - else: - return False - - def get_config(self, mod_addr, param_addr): - """Get Configuration. - - # Arguments - mod_addr: `int`
- a module address, used to specify which configuration module - one wants to update. Negative addresses are used for host-side - configuration, while positive addresses (including zero) are - used for device-side configuration. - param_addr: `int`
- a parameter address, to select a specific parameter to update - from this particular configuration module. - Only positive numbers - (including zero) are allowed. - - # Returns - param: `int` or `bool`
- a configuration parameter's new value. Returns None - if the handle is not valid. + Returns: + A configuration parameter's new value. Returns None if the handle is not + valid. """ if self.handle is not None: return libcaer.caerDeviceConfigGet(self.handle, mod_addr, param_addr) else: return None - def get_packet_container(self): - """Get event packet container. + def get_packet_container(self) -> tuple[Any | None, int | None]: + """Gets event packet container. - # Returns - packet_container: `caerEventPacketContainer`
- a container that consists of event packets. - packet_number: `int`
- number of event packet in the container. + Returns: + A tuple of `(packet_container, num_event_packets)`. """ packet_container = libcaer.caerDeviceDataGet(self.handle) if packet_container is not None: @@ -294,20 +277,17 @@ def get_packet_container(self): else: return None, None - def get_packet_header(self, packet_container, idx): - """Get a single packet header. + def get_packet_header( + self, packet_container: Any, idx: int + ) -> tuple[Any | None, Any | None]: + """Gets a single packet header. - # Arguments - packet_container: `caerEventPacketContainer`
- the event packet container - idx: `int`
- the index of the packet header + Args: + packet_container: the event packet container + idx: the index of the packet header - # Returns - packet_header: `caerEventPacketHeader`
- the header that represents a event packet - packet_type: `caerEventPacketType`
- the type of the event packet + Returns: + A tuple of `(packet_header, packet_type)`. """ packet_header = libcaer.caerEventPacketContainerGetEventPacket( packet_container, idx diff --git a/setup.py b/setup.py index e169017..ad4ec78 100644 --- a/setup.py +++ b/setup.py @@ -25,13 +25,10 @@ License :: OSI Approved :: MIT License """ -try: - from pyaer import __about__ - - about = __about__.__dict__ -except ImportError: - about = dict() - exec(open("pyaer/__about__.py").read(), about) +__version__ = "0.2.7a0" +__author__ = "Yuhuang Hu" +__author_email__ = "duguyue100@gmail.com" +__url__ = "https://github.com/duguyue100/pyaer" python_paths = get_paths() @@ -74,10 +71,10 @@ setup( name="pyaer", - version=about["__version__"], - author=about["__author__"], - author_email=about["__author_email__"], - url=about["__url__"], + version=__version__, + author=__author__, + author_email=__author_email__, + url=__url__, install_requires=["numpy"], packages=find_packages(), ext_modules=[libcaer_wrap],