diff --git a/pyaer/__about__.py b/pyaer/__about__.py
deleted file mode 100644
index 87498d8..0000000
--- a/pyaer/__about__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-__all__ = ["__version__", "__author__", "__author_email__", "__url__"]
-
-__version__ = "0.2.7a0"
-__author__ = "Yuhuang Hu"
-__author_email__ = "duguyue100@gmail.com"
-__url__ = "https://github.com/duguyue100/pyaer"
diff --git a/pyaer/__init__.py b/pyaer/__init__.py
index 1b8b2cb..362cbd6 100644
--- a/pyaer/__init__.py
+++ b/pyaer/__init__.py
@@ -1,18 +1,8 @@
-from __future__ import absolute_import
-from __future__ import print_function
-
-import os
-
from pyaer import log
-from pyaer.__about__ import __author__ # noqa
-from pyaer.__about__ import __version__ # noqa
-FILE_PATH = os.path.realpath(__file__)
-CURR_PATH = os.path.dirname(os.path.realpath(__file__))
-PKG_PATH = os.path.dirname(CURR_PATH)
-
# System logging level
+
LOG_LEVEL = log.DEBUG
try:
diff --git a/pyaer/constants.py b/pyaer/constants.py
new file mode 100644
index 0000000..3c7d9de
--- /dev/null
+++ b/pyaer/constants.py
@@ -0,0 +1,2 @@
+DISABLE_MAX_CONTAINER_SIZE = 0
+DEFAULT_MAX_PACKET_INTERVAL = 10000 # 10ms
diff --git a/pyaer/device.py b/pyaer/device.py
index 7a1d80a..64b1fc0 100644
--- a/pyaer/device.py
+++ b/pyaer/device.py
@@ -1,18 +1,20 @@
from __future__ import annotations
import abc
+from typing import Any
from pyaer import libcaer
+from pyaer.constants import DEFAULT_MAX_PACKET_INTERVAL
+from pyaer.constants import DISABLE_MAX_CONTAINER_SIZE
-class USBDevice(object):
+class USBDevice:
"""Base class for all USB devices.
This class is the base of DVS128, DAVIS240, DAVIS346 and DYNAPSE.
"""
- def __init__(self):
- """Device."""
+ def __init__(self) -> None:
self.handle = None
# functions for get events number and packet functions
@@ -33,29 +35,27 @@ def __init__(self):
}
@abc.abstractmethod
- def obtain_device_info(self, handle):
- """Obtain device handle.
+ def obtain_device_info(self, handle: Any) -> None:
+ """Obtains device handle.
- This abstract method should be implemented in all derived classes.
- This method collects the general information about the USB device
- such as the width and height of the camera or the serial number
- of the device.
+ This abstract method should be implemented in all derived classes. This method
+ collects the general information about the device such as the width and height
+ of the camera or the serial number of the device.
- # Arguments
- handle: `caerDeviceHandle`
- a valid device handle that can be used with the other
- `libcaer` functions, or `None` on error.
+ # Args:
+ handle: a valid device handle that can be used with the other `libcaer`
+ functions, or `None` on error.
"""
- return
+ raise NotADirectoryError()
@abc.abstractmethod
- def get_event(self):
- """Get Event.
+ def get_event(self) -> None:
+ """Gets Event.
This abstract method should be implemented in all derived classes. This method
returns a packet of events according to the type of the sensor.
"""
- return
+ raise NotImplementedError()
def open(
self,
@@ -106,15 +106,15 @@ def open(
if self.handle is None:
raise ValueError("The device is failed to open.")
- def close(self):
- """Close USB device.
+ def close(self) -> None:
+ """Closes USB device.
This method closes an opened USB device if the respective handle is not None.
"""
if self.handle is not None:
libcaer.caerDeviceClose(self.handle)
- def shutdown(self):
+ def shutdown(self) -> None:
"""Shutdown device.
This method is a combination of `data_stop` and `close`. This is a preferred way
@@ -123,13 +123,12 @@ def shutdown(self):
self.data_stop()
self.close()
- def data_start(self):
- """Start data transmission.
+ def data_start(self) -> bool:
+ """Starts data transmission.
- # Returns
- flag: `bool`
- Return `True` if the data transmission is
- initialized successfully. Otherwise `False`.
+ Returns:
+ flag: Return `True` if the data transmission is initialized successfully.
+ Otherwise `False`.
"""
# TODO figure out the parameter meaning
if self.handle is not None:
@@ -140,25 +139,24 @@ def data_start(self):
else:
return False
- def data_stop(self):
- """Stop data transmission.
+ def data_stop(self) -> None:
+ """Stops data transmission.
This method stops the data transmission only. Note that this method does not
destroy the respective device `handle`.
"""
libcaer.caerDeviceDataStop(self.handle)
- def send_default_config(self):
+ def send_default_config(self) -> bool:
"""Send default configuration.
- Each type of devices has a set of default configurations (e.g. bias)
+ Each type of devices has a set of default configurations (e.g., bias)
that are pre-defined in the `libcaer` library.
- Note that the default configuration might not be suitable for your
- needs.
+ Note that the default configuration might not be suitable for your needs.
- # Returns
- flag: `bool`
- return `True` if the default config is set successfully,
+
+ Returns:
+ flag: Return `True` if the default config is set successfully,
`False` otherwise.
"""
if self.handle is not None:
@@ -167,15 +165,41 @@ def send_default_config(self):
else:
return False
- def set_max_container_packet_size(self, max_packet_size=0):
+ def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool:
+ """Sets configuration.
+
+ The main function of setting configurations (e.g., bias).
+
+ # Args:
+ mod_addr: a module address, used to specify which configuration module one
+ wants to update. Negative addresses are used for host-side
+ configuration, while positive addresses (including zero) are used for
+ device-side configuration.
+ param_addr: a parameter address, to select a specific parameter to update
+ from this particular configuration module. Only positive numbers
+ (including zero) are allowed.
+ param: a configuration parameter's new value.
+
+ # Returns:
+ `True` if the config is set successfully, `False` otherwise.
+ """
+ if self.handle is not None:
+ set_success = libcaer.caerDeviceConfigSet(
+ self.handle, mod_addr, param_addr, param
+ )
+ return set_success
+ else:
+ return False
+
+ def set_max_container_packet_size(
+ self, max_packet_size: int = DISABLE_MAX_CONTAINER_SIZE
+ ) -> bool:
"""Set max container packet size.
- # Arguments
- max_packet_size: `int`
- set the maximum number of events any of a packet container's
- packets may hold before it's made available to the user.
- Set to zero to disable.
- The default is `0`.
+ Args:
+ max_packet_size: set the maximum number of events any of a packet
+ container's packets may hold before it's made available to the user.
+ Set to `pyaer.constants.DISABLE_MAX_CONTAINER_SIZE` to disable.
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_PACKETS,
@@ -183,16 +207,16 @@ def set_max_container_packet_size(self, max_packet_size=0):
max_packet_size,
)
- def set_max_container_interval(self, max_packet_interval=10000):
- """Set max packet interval.
+ def set_max_container_interval(
+ self, max_packet_interval: int = DEFAULT_MAX_PACKET_INTERVAL
+ ) -> bool:
+ """Sets max packet interval.
- # Arguments
- max_packet_interval: `int`
- set the time interval between subsequent packet containers.
- Must be at least 1 microsecond.
- The value is in microseconds, and is checked across all
- types of events contained in the EventPacketContainer.
- The default is `10000` (10ms or 100 packets/s)
+ Args:
+ max_packet_interval: set the time interval between subsequent packet
+ containers. Must be at least 1 microsecond. The value is in
+ microseconds, and is checked across all types of events contained in
+ the EventPacketContainer. The default is `10000` (10ms or 100 packets/s)
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_PACKETS,
@@ -200,18 +224,16 @@ def set_max_container_interval(self, max_packet_interval=10000):
max_packet_interval,
)
- def set_data_exchange_blocking(self, exchange_blocking=True):
- """Set data exchange blocking.
+ def set_data_exchange_blocking(self, exchange_blocking: bool = True) -> bool:
+ """Sets data exchange blocking.
- # Arguments
- exchange_blocking: `bool`
- whether to start all the data producer modules on the device
- (DVS, APS, Mux, ...) automatically when starting the
- data transfer thread with `caerDeviceDataStart()` or not.
- If disabled, be aware you will have to start the right modules
- manually, which can be useful if you need precise control
- over which ones are running at any time.
- The default is `True`.
+ Arg:
+ exchange_blocking: Whether to start all the data producer modules on the
+ device (DVS, APS, Mux, ...) automatically when starting the data
+ transfer thread with `caerDeviceDataStart()` or not. If disabled, be
+ aware you will have to start the right modules manually, which can be
+ useful if you need precise control over which ones are running at any
+ time. The default is `True`.
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_DATAEXCHANGE,
@@ -219,71 +241,32 @@ def set_data_exchange_blocking(self, exchange_blocking=True):
exchange_blocking,
)
- def set_config(self, mod_addr, param_addr, param):
- """Set configuration.
-
- The main function of setting configurations (e.g., bias).
+ def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None:
+ """Gets Configuration.
- # Arguments
- mod_addr: `int`
- a module address, used to specify which configuration module
- one wants to update. Negative addresses are used for host-side
- configuration, while positive addresses (including zero) are
- used for device-side configuration.
- param_addr: `int`
- a parameter address, to select a specific parameter to update
- from this particular configuration module.
- Only positive numbers
+ Args:
+ mod_addr: a module address, used to specify which configuration module one
+ wants to update. Negative addresses are used for host-side
+ configuration, while positive addresses (including zero) are used for
+ device-side configuration.
+ param_addr: a parameter address, to select a specific parameter to update
+ from this particular configuration module. Only positive numbers
(including zero) are allowed.
- param: `int` or `bool`
- a configuration parameter's new value.
- # Returns
- flag: `bool`
- returns `True` if the config is set successfully,
- `False` otherwise.
- """
- if self.handle is not None:
- set_success = libcaer.caerDeviceConfigSet(
- self.handle, mod_addr, param_addr, param
- )
- return set_success
- else:
- return False
-
- def get_config(self, mod_addr, param_addr):
- """Get Configuration.
-
- # Arguments
- mod_addr: `int`
- a module address, used to specify which configuration module
- one wants to update. Negative addresses are used for host-side
- configuration, while positive addresses (including zero) are
- used for device-side configuration.
- param_addr: `int`
- a parameter address, to select a specific parameter to update
- from this particular configuration module.
- Only positive numbers
- (including zero) are allowed.
-
- # Returns
- param: `int` or `bool`
- a configuration parameter's new value. Returns None
- if the handle is not valid.
+ Returns:
+ A configuration parameter's new value. Returns None if the handle is not
+ valid.
"""
if self.handle is not None:
return libcaer.caerDeviceConfigGet(self.handle, mod_addr, param_addr)
else:
return None
- def get_packet_container(self):
- """Get event packet container.
+ def get_packet_container(self) -> tuple[Any | None, int | None]:
+ """Gets event packet container.
- # Returns
- packet_container: `caerEventPacketContainer`
- a container that consists of event packets.
- packet_number: `int`
- number of event packet in the container.
+ Returns:
+ A tuple of `(packet_container, num_event_packets)`.
"""
packet_container = libcaer.caerDeviceDataGet(self.handle)
if packet_container is not None:
@@ -294,20 +277,17 @@ def get_packet_container(self):
else:
return None, None
- def get_packet_header(self, packet_container, idx):
- """Get a single packet header.
+ def get_packet_header(
+ self, packet_container: Any, idx: int
+ ) -> tuple[Any | None, Any | None]:
+ """Gets a single packet header.
- # Arguments
- packet_container: `caerEventPacketContainer`
- the event packet container
- idx: `int`
- the index of the packet header
+ Args:
+ packet_container: the event packet container
+ idx: the index of the packet header
- # Returns
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
- packet_type: `caerEventPacketType`
- the type of the event packet
+ Returns:
+ A tuple of `(packet_header, packet_type)`.
"""
packet_header = libcaer.caerEventPacketContainerGetEventPacket(
packet_container, idx
diff --git a/setup.py b/setup.py
index e169017..ad4ec78 100644
--- a/setup.py
+++ b/setup.py
@@ -25,13 +25,10 @@
License :: OSI Approved :: MIT License
"""
-try:
- from pyaer import __about__
-
- about = __about__.__dict__
-except ImportError:
- about = dict()
- exec(open("pyaer/__about__.py").read(), about)
+__version__ = "0.2.7a0"
+__author__ = "Yuhuang Hu"
+__author_email__ = "duguyue100@gmail.com"
+__url__ = "https://github.com/duguyue100/pyaer"
python_paths = get_paths()
@@ -74,10 +71,10 @@
setup(
name="pyaer",
- version=about["__version__"],
- author=about["__author__"],
- author_email=about["__author_email__"],
- url=about["__url__"],
+ version=__version__,
+ author=__author__,
+ author_email=__author_email__,
+ url=__url__,
install_requires=["numpy"],
packages=find_packages(),
ext_modules=[libcaer_wrap],