Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
duguyue100 committed Mar 16, 2024
1 parent 8d4e3e8 commit d68f897
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 157 deletions.
6 changes: 0 additions & 6 deletions pyaer/__about__.py

This file was deleted.

12 changes: 1 addition & 11 deletions pyaer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
from __future__ import absolute_import
from __future__ import print_function

import os

from pyaer import log
from pyaer.__about__ import __author__ # noqa
from pyaer.__about__ import __version__ # noqa


FILE_PATH = os.path.realpath(__file__)
CURR_PATH = os.path.dirname(os.path.realpath(__file__))
PKG_PATH = os.path.dirname(CURR_PATH)

# System logging level

LOG_LEVEL = log.DEBUG

try:
Expand Down
2 changes: 2 additions & 0 deletions pyaer/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DISABLE_MAX_CONTAINER_SIZE = 0
DEFAULT_MAX_PACKET_INTERVAL = 10000 # 10ms
238 changes: 109 additions & 129 deletions pyaer/device.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from __future__ import annotations

import abc
from typing import Any

from pyaer import libcaer
from pyaer.constants import DEFAULT_MAX_PACKET_INTERVAL
from pyaer.constants import DISABLE_MAX_CONTAINER_SIZE


class USBDevice(object):
class USBDevice:
"""Base class for all USB devices.
This class is the base of DVS128, DAVIS240, DAVIS346 and DYNAPSE.
"""

def __init__(self):
"""Device."""
def __init__(self) -> None:
self.handle = None

# functions for get events number and packet functions
Expand All @@ -33,29 +35,27 @@ def __init__(self):
}

@abc.abstractmethod
def obtain_device_info(self, handle):
"""Obtain device handle.
def obtain_device_info(self, handle: Any) -> None:
"""Obtains device handle.
This abstract method should be implemented in all derived classes.
This method collects the general information about the USB device
such as the width and height of the camera or the serial number
of the device.
This abstract method should be implemented in all derived classes. This method
collects the general information about the device such as the width and height
of the camera or the serial number of the device.
# Arguments
handle: `caerDeviceHandle`<br/>
a valid device handle that can be used with the other
`libcaer` functions, or `None` on error.
# Args:
handle: a valid device handle that can be used with the other `libcaer`
functions, or `None` on error.
"""
return
raise NotADirectoryError()

@abc.abstractmethod
def get_event(self):
"""Get Event.
def get_event(self) -> None:
"""Gets Event.
This abstract method should be implemented in all derived classes. This method
returns a packet of events according to the type of the sensor.
"""
return
raise NotImplementedError()

def open(
self,
Expand Down Expand Up @@ -106,15 +106,15 @@ def open(
if self.handle is None:
raise ValueError("The device is failed to open.")

def close(self):
"""Close USB device.
def close(self) -> None:
"""Closes USB device.
This method closes an opened USB device if the respective handle is not None.
"""
if self.handle is not None:
libcaer.caerDeviceClose(self.handle)

def shutdown(self):
def shutdown(self) -> None:
"""Shutdown device.
This method is a combination of `data_stop` and `close`. This is a preferred way
Expand All @@ -123,13 +123,12 @@ def shutdown(self):
self.data_stop()
self.close()

def data_start(self):
"""Start data transmission.
def data_start(self) -> bool:
"""Starts data transmission.
# Returns
flag: `bool`<br/>
Return `True` if the data transmission is
initialized successfully. Otherwise `False`.
Returns:
flag: Return `True` if the data transmission is initialized successfully.
Otherwise `False`.
"""
# TODO figure out the parameter meaning
if self.handle is not None:
Expand All @@ -140,25 +139,24 @@ def data_start(self):
else:
return False

def data_stop(self):
"""Stop data transmission.
def data_stop(self) -> None:
"""Stops data transmission.
This method stops the data transmission only. Note that this method does not
destroy the respective device `handle`.
"""
libcaer.caerDeviceDataStop(self.handle)

def send_default_config(self):
def send_default_config(self) -> bool:
"""Send default configuration.
Each type of devices has a set of default configurations (e.g. bias)
Each type of devices has a set of default configurations (e.g., bias)
that are pre-defined in the `libcaer` library.
Note that the default configuration might not be suitable for your
needs.
Note that the default configuration might not be suitable for your needs.
# Returns
flag: `bool`<br/>
return `True` if the default config is set successfully,
Returns:
flag: Return `True` if the default config is set successfully,
`False` otherwise.
"""
if self.handle is not None:
Expand All @@ -167,123 +165,108 @@ def send_default_config(self):
else:
return False

def set_max_container_packet_size(self, max_packet_size=0):
def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool:
"""Sets configuration.
The main function of setting configurations (e.g., bias).
# Args:
mod_addr: a module address, used to specify which configuration module one
wants to update. Negative addresses are used for host-side
configuration, while positive addresses (including zero) are used for
device-side configuration.
param_addr: a parameter address, to select a specific parameter to update
from this particular configuration module. Only positive numbers
(including zero) are allowed.
param: a configuration parameter's new value.
# Returns:
`True` if the config is set successfully, `False` otherwise.
"""
if self.handle is not None:
set_success = libcaer.caerDeviceConfigSet(
self.handle, mod_addr, param_addr, param
)
return set_success
else:
return False

def set_max_container_packet_size(
self, max_packet_size: int = DISABLE_MAX_CONTAINER_SIZE
) -> bool:
"""Set max container packet size.
# Arguments
max_packet_size: `int`<br/>
set the maximum number of events any of a packet container's
packets may hold before it's made available to the user.
Set to zero to disable.<br/>
The default is `0`.
Args:
max_packet_size: set the maximum number of events any of a packet
container's packets may hold before it's made available to the user.
Set to `pyaer.constants.DISABLE_MAX_CONTAINER_SIZE` to disable.
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_PACKETS,
libcaer.CAER_HOST_CONFIG_PACKETS_MAX_CONTAINER_PACKET_SIZE,
max_packet_size,
)

def set_max_container_interval(self, max_packet_interval=10000):
"""Set max packet interval.
def set_max_container_interval(
self, max_packet_interval: int = DEFAULT_MAX_PACKET_INTERVAL
) -> bool:
"""Sets max packet interval.
# Arguments
max_packet_interval: `int`<br/>
set the time interval between subsequent packet containers.
Must be at least 1 microsecond.
The value is in microseconds, and is checked across all
types of events contained in the EventPacketContainer.<br/>
The default is `10000` (10ms or 100 packets/s)
Args:
max_packet_interval: set the time interval between subsequent packet
containers. Must be at least 1 microsecond. The value is in
microseconds, and is checked across all types of events contained in
the EventPacketContainer. The default is `10000` (10ms or 100 packets/s)
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_PACKETS,
libcaer.CAER_HOST_CONFIG_PACKETS_MAX_CONTAINER_INTERVAL,
max_packet_interval,
)

def set_data_exchange_blocking(self, exchange_blocking=True):
"""Set data exchange blocking.
def set_data_exchange_blocking(self, exchange_blocking: bool = True) -> bool:
"""Sets data exchange blocking.
# Arguments
exchange_blocking: `bool`<br/>
whether to start all the data producer modules on the device
(DVS, APS, Mux, ...) automatically when starting the
data transfer thread with `caerDeviceDataStart()` or not.
If disabled, be aware you will have to start the right modules
manually, which can be useful if you need precise control
over which ones are running at any time.<br/>
The default is `True`.
Arg:
exchange_blocking: Whether to start all the data producer modules on the
device (DVS, APS, Mux, ...) automatically when starting the data
transfer thread with `caerDeviceDataStart()` or not. If disabled, be
aware you will have to start the right modules manually, which can be
useful if you need precise control over which ones are running at any
time. The default is `True`.
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_DATAEXCHANGE,
libcaer.CAER_HOST_CONFIG_DATAEXCHANGE_BLOCKING,
exchange_blocking,
)

def set_config(self, mod_addr, param_addr, param):
"""Set configuration.
The main function of setting configurations (e.g., bias).
def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None:
"""Gets Configuration.
# Arguments
mod_addr: `int`<br/>
a module address, used to specify which configuration module
one wants to update. Negative addresses are used for host-side
configuration, while positive addresses (including zero) are
used for device-side configuration.
param_addr: `int`<br/>
a parameter address, to select a specific parameter to update
from this particular configuration module.
Only positive numbers
Args:
mod_addr: a module address, used to specify which configuration module one
wants to update. Negative addresses are used for host-side
configuration, while positive addresses (including zero) are used for
device-side configuration.
param_addr: a parameter address, to select a specific parameter to update
from this particular configuration module. Only positive numbers
(including zero) are allowed.
param: `int` or `bool`<br/>
a configuration parameter's new value.
# Returns
flag: `bool`<br/>
returns `True` if the config is set successfully,
`False` otherwise.
"""
if self.handle is not None:
set_success = libcaer.caerDeviceConfigSet(
self.handle, mod_addr, param_addr, param
)
return set_success
else:
return False

def get_config(self, mod_addr, param_addr):
"""Get Configuration.
# Arguments
mod_addr: `int`<br/>
a module address, used to specify which configuration module
one wants to update. Negative addresses are used for host-side
configuration, while positive addresses (including zero) are
used for device-side configuration.
param_addr: `int`<br/>
a parameter address, to select a specific parameter to update
from this particular configuration module.
Only positive numbers
(including zero) are allowed.
# Returns
param: `int` or `bool`<br/>
a configuration parameter's new value. Returns None
if the handle is not valid.
Returns:
A configuration parameter's new value. Returns None if the handle is not
valid.
"""
if self.handle is not None:
return libcaer.caerDeviceConfigGet(self.handle, mod_addr, param_addr)
else:
return None

def get_packet_container(self):
"""Get event packet container.
def get_packet_container(self) -> tuple[Any | None, int | None]:
"""Gets event packet container.
# Returns
packet_container: `caerEventPacketContainer`<br/>
a container that consists of event packets.
packet_number: `int`<br/>
number of event packet in the container.
Returns:
A tuple of `(packet_container, num_event_packets)`.
"""
packet_container = libcaer.caerDeviceDataGet(self.handle)
if packet_container is not None:
Expand All @@ -294,20 +277,17 @@ def get_packet_container(self):
else:
return None, None

def get_packet_header(self, packet_container, idx):
"""Get a single packet header.
def get_packet_header(
self, packet_container: Any, idx: int
) -> tuple[Any | None, Any | None]:
"""Gets a single packet header.
# Arguments
packet_container: `caerEventPacketContainer`<br/>
the event packet container
idx: `int`<br/>
the index of the packet header
Args:
packet_container: the event packet container
idx: the index of the packet header
# Returns
packet_header: `caerEventPacketHeader`<br/>
the header that represents a event packet
packet_type: `caerEventPacketType`<br/>
the type of the event packet
Returns:
A tuple of `(packet_header, packet_type)`.
"""
packet_header = libcaer.caerEventPacketContainerGetEventPacket(
packet_container, idx
Expand Down
Loading

0 comments on commit d68f897

Please sign in to comment.