From e16b06126f53eaea033edf6d7575db76e3b4873e Mon Sep 17 00:00:00 2001 From: agennart Date: Thu, 25 Jul 2024 14:17:11 +0200 Subject: [PATCH 1/3] Reformat file with tab is 4 spaces and use f-string --- scripts/ntrip_ros.py | 375 +++++++++---------- src/ntrip_client/nmea_parser.py | 88 ++--- src/ntrip_client/ntrip_client.py | 611 ++++++++++++++++--------------- src/ntrip_client/rtcm_parser.py | 205 ++++++----- 4 files changed, 650 insertions(+), 629 deletions(-) diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 5dbc4e4..8810b88 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -19,194 +19,197 @@ have_mavros_msgs = False have_rtcm_msgs = False if importlib.util.find_spec(_MAVROS_MSGS_NAME) is not None: - have_mavros_msgs = True - from mavros_msgs.msg import RTCM as mavros_msgs_RTCM + have_mavros_msgs = True + from mavros_msgs.msg import RTCM as mavros_msgs_RTCM if importlib.util.find_spec(_RTCM_MSGS_NAME) is not None: - have_rtcm_msgs = True - from rtcm_msgs.msg import Message as rtcm_msgs_RTCM + have_rtcm_msgs = True + from rtcm_msgs.msg import Message as rtcm_msgs_RTCM class NTRIPRos(Node): - def __init__(self): - # Read a debug flag from the environment that should have been set by the launch file - try: - self._debug = json.loads(os.environ["NTRIP_CLIENT_DEBUG"].lower()) - except: - self._debug = False - - # Init the node and declare params - super().__init__('ntrip_client') - self.declare_parameters( - namespace='', - parameters=[ - ('host', '127.0.0.1'), - ('port', 2101), - ('mountpoint', 'mount'), - ('ntrip_version', 'None'), - ('authenticate', False), - ('username', ''), - ('password', ''), - ('ssl', False), - ('cert', 'None'), - ('key', 'None'), - ('ca_cert', 'None'), - ('rtcm_frame_id', 'odom'), - ('nmea_max_length', NMEA_DEFAULT_MAX_LENGTH), - ('nmea_min_length', NMEA_DEFAULT_MIN_LENGTH), - ('rtcm_message_package', _MAVROS_MSGS_NAME), - ('reconnect_attempt_max', NTRIPClient.DEFAULT_RECONNECT_ATTEMPT_MAX), - ('reconnect_attempt_wait_seconds', NTRIPClient.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS), - ('rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS), - ] - ) - - # Read some mandatory config - host = self.get_parameter('host').value - port = self.get_parameter('port').value - mountpoint = self.get_parameter('mountpoint').value - - # Optionally get the ntrip version from the launch file - ntrip_version = self.get_parameter('ntrip_version').value - if ntrip_version == 'None': - ntrip_version = None - - # Set the log level to debug if debug is true - if self._debug: - rclpy.logging.set_logger_level(self.get_logger().name, rclpy.logging.LoggingSeverity.DEBUG) - - # If we were asked to authenticate, read the username and password - username = None - password = None - if self.get_parameter('authenticate').value: - username = self.get_parameter('username').value - password = self.get_parameter('password').value - if not username: - self.get_logger().error( - 'Requested to authenticate, but param "username" was not set') - sys.exit(1) - if not password: - self.get_logger().error( - 'Requested to authenticate, but param "password" was not set') - sys.exit(1) - - # Read an optional Frame ID from the config - self._rtcm_frame_id = self.get_parameter('rtcm_frame_id').value - - # Determine the type of RTCM message that will be published - rtcm_message_package = self.get_parameter('rtcm_message_package').value - if rtcm_message_package == _MAVROS_MSGS_NAME: - if have_mavros_msgs: - self._rtcm_message_type = mavros_msgs_RTCM - self._create_rtcm_message = self._create_mavros_msgs_rtcm_message - else: - self.get_logger().fatal('The requested RTCM package {} is a valid option, but we were unable to import it. Please make sure you have it installed'.format(rtcm_message_package)) - elif rtcm_message_package == _RTCM_MSGS_NAME: - if have_rtcm_msgs: - self._rtcm_message_type = rtcm_msgs_RTCM - self._create_rtcm_message = self._create_rtcm_msgs_rtcm_message - else: - self.get_logger().fatal('The requested RTCM package {} is a valid option, but we were unable to import it. Please make sure you have it installed'.format(rtcm_message_package)) - else: - self.get_logger().fatal('The RTCM package {} is not a valid option. Please choose between the following packages {}'.format(rtcm_message_package, ','.join([_MAVROS_MSGS_NAME, _RTCM_MSGS_NAME]))) - - # Setup the RTCM publisher - self._rtcm_pub = self.create_publisher(self._rtcm_message_type, 'rtcm', 10) - - # Initialize the client - self._client = NTRIPClient( - host=host, - port=port, - mountpoint=mountpoint, - ntrip_version=ntrip_version, - username=username, - password=password, - logerr=self.get_logger().error, - logwarn=self.get_logger().warning, - loginfo=self.get_logger().info, - logdebug=self.get_logger().debug - ) - - # Get some SSL parameters for the NTRIP client - self._client.ssl = self.get_parameter('ssl').value - self._client.cert = self.get_parameter('cert').value - self._client.key = self.get_parameter('key').value - self._client.ca_cert = self.get_parameter('ca_cert').value - if self._client.cert == 'None': - self._client.cert = None - if self._client.key == 'None': - self._client.key = None - if self._client.ca_cert == 'None': - self._client.ca_cert = None - - # Get some timeout parameters for the NTRIP client - self._client.nmea_parser.nmea_max_length = self.get_parameter('nmea_max_length').value - self._client.nmea_parser.nmea_min_length = self.get_parameter('nmea_min_length').value - self._client.reconnect_attempt_max = self.get_parameter('reconnect_attempt_max').value - self._client.reconnect_attempt_wait_seconds = self.get_parameter('reconnect_attempt_wait_seconds').value - self._client.rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value - - def run(self): - # Connect the client - if not self._client.connect(): - self.get_logger().error('Unable to connect to NTRIP server') - return False - # Setup our subscriber - self._nmea_sub = self.create_subscription(Sentence, 'nmea', self.subscribe_nmea, 10) - - # Start the timer that will check for RTCM data - self._rtcm_timer = self.create_timer(0.1, self.publish_rtcm) - return True - - def stop(self): - self.get_logger().info('Stopping RTCM publisher') - if self._rtcm_timer: - self._rtcm_timer.cancel() - self._rtcm_timer.destroy() - self.get_logger().info('Disconnecting NTRIP client') - self._client.disconnect() - self.get_logger().info('Shutting down node') - self.destroy_node() - - def subscribe_nmea(self, nmea): - # Just extract the NMEA from the message, and send it right to the server - self._client.send_nmea(nmea.sentence) - - def publish_rtcm(self): - for raw_rtcm in self._client.recv_rtcm(): - self._rtcm_pub.publish(self._create_rtcm_message(raw_rtcm)) - - def _create_mavros_msgs_rtcm_message(self, rtcm): - return mavros_msgs_RTCM( - header=Header( - stamp=self.get_clock().now().to_msg(), - frame_id=self._rtcm_frame_id - ), - data=rtcm - ) - - def _create_rtcm_msgs_rtcm_message(self, rtcm): - return rtcm_msgs_RTCM( - header=Header( - stamp=self.get_clock().now().to_msg(), - frame_id=self._rtcm_frame_id - ), - message=rtcm - ) + def __init__(self): + # Read a debug flag from the environment that should have been set by the launch file + try: + self._debug = json.loads(os.environ["NTRIP_CLIENT_DEBUG"].lower()) + except: + self._debug = False + + # Init the node and declare params + super().__init__('ntrip_client') + self.declare_parameters( + namespace='', + parameters=[ + ('host', '127.0.0.1'), + ('port', 2101), + ('mountpoint', 'mount'), + ('ntrip_version', 'None'), + ('authenticate', False), + ('username', ''), + ('password', ''), + ('ssl', False), + ('cert', 'None'), + ('key', 'None'), + ('ca_cert', 'None'), + ('rtcm_frame_id', 'odom'), + ('nmea_max_length', NMEA_DEFAULT_MAX_LENGTH), + ('nmea_min_length', NMEA_DEFAULT_MIN_LENGTH), + ('rtcm_message_package', _MAVROS_MSGS_NAME), + ('reconnect_attempt_max', NTRIPClient.DEFAULT_RECONNECT_ATTEMPT_MAX), + ('reconnect_attempt_wait_seconds', NTRIPClient.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS), + ('rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS), + ] + ) + + # Read some mandatory config + host = self.get_parameter('host').value + port = self.get_parameter('port').value + mountpoint = self.get_parameter('mountpoint').value + + # Optionally get the ntrip version from the launch file + ntrip_version = self.get_parameter('ntrip_version').value + if ntrip_version == 'None': + ntrip_version = None + + # Set the log level to debug if debug is true + if self._debug: + rclpy.logging.set_logger_level(self.get_logger().name, rclpy.logging.LoggingSeverity.DEBUG) + + # If we were asked to authenticate, read the username and password + username = None + password = None + if self.get_parameter('authenticate').value: + username = self.get_parameter('username').value + password = self.get_parameter('password').value + if not username: + self.get_logger().error( + 'Requested to authenticate, but param "username" was not set') + sys.exit(1) + if not password: + self.get_logger().error( + 'Requested to authenticate, but param "password" was not set') + sys.exit(1) + + # Read an optional Frame ID from the config + self._rtcm_frame_id = self.get_parameter('rtcm_frame_id').value + + # Determine the type of RTCM message that will be published + rtcm_message_package = self.get_parameter('rtcm_message_package').value + if rtcm_message_package == _MAVROS_MSGS_NAME: + if have_mavros_msgs: + self._rtcm_message_type = mavros_msgs_RTCM + self._create_rtcm_message = self._create_mavros_msgs_rtcm_message + else: + self.get_logger().fatal( + f'The requested RTCM package {rtcm_message_package} is a valid option, but we were unable to import it. Please make sure you have it installed') + elif rtcm_message_package == _RTCM_MSGS_NAME: + if have_rtcm_msgs: + self._rtcm_message_type = rtcm_msgs_RTCM + self._create_rtcm_message = self._create_rtcm_msgs_rtcm_message + else: + self.get_logger().fatal( + f'The requested RTCM package {rtcm_message_package} is a valid option, but we were unable to import it. Please make sure you have it installed') + else: + self.get_logger().fatal( + f'The RTCM package {rtcm_message_package} is not a valid option. Please choose between the following packages {_MAVROS_MSGS_NAME}, {_RTCM_MSGS_NAME}') + + # Setup the RTCM publisher + self._rtcm_pub = self.create_publisher(self._rtcm_message_type, 'rtcm', 10) + + # Initialize the client + self._client = NTRIPClient( + host=host, + port=port, + mountpoint=mountpoint, + ntrip_version=ntrip_version, + username=username, + password=password, + logerr=self.get_logger().error, + logwarn=self.get_logger().warning, + loginfo=self.get_logger().info, + logdebug=self.get_logger().debug + ) + + # Get some SSL parameters for the NTRIP client + self._client.ssl = self.get_parameter('ssl').value + self._client.cert = self.get_parameter('cert').value + self._client.key = self.get_parameter('key').value + self._client.ca_cert = self.get_parameter('ca_cert').value + if self._client.cert == 'None': + self._client.cert = None + if self._client.key == 'None': + self._client.key = None + if self._client.ca_cert == 'None': + self._client.ca_cert = None + + # Get some timeout parameters for the NTRIP client + self._client.nmea_parser.nmea_max_length = self.get_parameter('nmea_max_length').value + self._client.nmea_parser.nmea_min_length = self.get_parameter('nmea_min_length').value + self._client.reconnect_attempt_max = self.get_parameter('reconnect_attempt_max').value + self._client.reconnect_attempt_wait_seconds = self.get_parameter('reconnect_attempt_wait_seconds').value + self._client.rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value + + def run(self): + # Connect the client + if not self._client.connect(): + self.get_logger().error('Unable to connect to NTRIP server') + return False + # Setup our subscriber + self._nmea_sub = self.create_subscription(Sentence, 'nmea', self.subscribe_nmea, 10) + + # Start the timer that will check for RTCM data + self._rtcm_timer = self.create_timer(0.1, self.publish_rtcm) + return True + + def stop(self): + self.get_logger().info('Stopping RTCM publisher') + if self._rtcm_timer: + self._rtcm_timer.cancel() + self._rtcm_timer.destroy() + self.get_logger().info('Disconnecting NTRIP client') + self._client.disconnect() + self.get_logger().info('Shutting down node') + self.destroy_node() + + def subscribe_nmea(self, nmea): + # Just extract the NMEA from the message, and send it right to the server + self._client.send_nmea(nmea.sentence) + + def publish_rtcm(self): + for raw_rtcm in self._client.recv_rtcm(): + self._rtcm_pub.publish(self._create_rtcm_message(raw_rtcm)) + + def _create_mavros_msgs_rtcm_message(self, rtcm): + return mavros_msgs_RTCM( + header=Header( + stamp=self.get_clock().now().to_msg(), + frame_id=self._rtcm_frame_id + ), + data=rtcm + ) + + def _create_rtcm_msgs_rtcm_message(self, rtcm): + return rtcm_msgs_RTCM( + header=Header( + stamp=self.get_clock().now().to_msg(), + frame_id=self._rtcm_frame_id + ), + message=rtcm + ) if __name__ == '__main__': - # Start the node - rclpy.init() - node = NTRIPRos() - if not node.run(): - sys.exit(1) - try: - # Spin until we are shut down - rclpy.spin(node) - except KeyboardInterrupt: - pass - except BaseException as e: - raise e - finally: - node.stop() - - # Shutdown the node and stop rclpy - rclpy.shutdown() \ No newline at end of file + # Start the node + rclpy.init() + node = NTRIPRos() + if not node.run(): + sys.exit(1) + try: + # Spin until we are shut down + rclpy.spin(node) + except KeyboardInterrupt: + pass + except BaseException as e: + raise e + finally: + node.stop() + + # Shutdown the node and stop rclpy + rclpy.shutdown() diff --git a/src/ntrip_client/nmea_parser.py b/src/ntrip_client/nmea_parser.py index 70630b9..f0e3c72 100644 --- a/src/ntrip_client/nmea_parser.py +++ b/src/ntrip_client/nmea_parser.py @@ -6,51 +6,51 @@ class NMEAParser: - def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): - # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS - self._logerr = logerr - self._logwarn = logwarn - self._loginfo = loginfo - self._logdebug = logdebug + def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): + # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS + self._logerr = logerr + self._logwarn = logwarn + self._loginfo = loginfo + self._logdebug = logdebug - # Save some other config - self.nmea_max_length = NMEA_DEFAULT_MAX_LENGTH - self.nmea_min_length = NMEA_DEFAULT_MIN_LENGTH + # Save some other config + self.nmea_max_length = NMEA_DEFAULT_MAX_LENGTH + self.nmea_min_length = NMEA_DEFAULT_MIN_LENGTH - def is_valid_sentence(self, sentence): - # Simple sanity checks - if len(sentence) > self.nmea_max_length: - self._logwarn('Received invalid NMEA sentence. Max length is {}, but sentence was {} bytes'.format(self.nmea_max_length, len(sentence))) - self._logwarn('Sentence: {}'.format(sentence)) - return False - if len(sentence) < self.nmea_min_length: - self._logwarn('Received invalid NMEA sentence. We need at least {} bytes to parse but got {} bytes'.format(self.nmea_min_length, len(sentence))) - self._logwarn('Sentence: {}'.format(sentence)) - return False - if sentence[0] != '$' and sentence[0] != '!': - self._logwarn('Received invalid NMEA sentence. Sentence should begin with "$" or "!", but instead begins with {}'.format(sentence[0])) - self._logwarn('Sentence: {}'.format(sentence)) - return False - if sentence[-2:] != '\r\n': - self._logwarn('Received invalid NMEA sentence. Sentence should end with \\r\\n, but instead ends with {}'.format(sentence[-2:])) - self._logwarn('Sentence: {}'.format(sentence)) - return False - if _NMEA_CHECKSUM_SEPERATOR not in sentence: - self._logwarn('Received invalid NMEA sentence. Sentence should have a "{}" character to seperate the checksum, but we could not find it.'.format(_NMEA_CHECKSUM_SEPERATOR)) - self._logwarn('Sentence: {}'.format(sentence)) - return False + def is_valid_sentence(self, sentence): + # Simple sanity checks + if len(sentence) > self.nmea_max_length: + self._logwarn(f'Received invalid NMEA sentence. Max length is {self.nmea_max_length}, but sentence was {len(sentence)} bytes') + self._logwarn(f'Sentence: {sentence}') + return False + if len(sentence) < self.nmea_min_length: + self._logwarn(f'Received invalid NMEA sentence. We need at least {self.nmea_min_length} bytes to parse but got {len(sentence)} bytes') + self._logwarn(f'Sentence: {sentence}') + return False + if sentence[0] != '$' and sentence[0] != '!': + self._logwarn(f'Received invalid NMEA sentence. Sentence should begin with "$" or "!", but instead begins with {sentence[0]}') + self._logwarn(f'Sentence: {sentence}') + return False + if sentence[-2:] != '\r\n': + self._logwarn(f'Received invalid NMEA sentence. Sentence should end with \\r\\n, but instead ends with {sentence[-2:]}') + self._logwarn(f'Sentence: {sentence}') + return False + if _NMEA_CHECKSUM_SEPERATOR not in sentence: + self._logwarn(f'Received invalid NMEA sentence. Sentence should have a "{_NMEA_CHECKSUM_SEPERATOR}" character to seperate the checksum, but we could not find it.') + self._logwarn(f'Sentence: {sentence}') + return False - # Checksum check - data, expected_checksum_str = sentence.rsplit(_NMEA_CHECKSUM_SEPERATOR, 1) - expected_checksum = int(expected_checksum_str, 16) - calculated_checksum = 0 - for char in data[1:]: - calculated_checksum ^= ord(char) - if expected_checksum != calculated_checksum: - self._logwarn('Received invalid NMEA sentence. Checksum mismatch'); - self._logwarn('Expected Checksum: 0x{:X}'.format(expected_checksum)) - self._logwarn('Calculated Checksum: 0x{:X}'.format(calculated_checksum)) - return False + # Checksum check + data, expected_checksum_str = sentence.rsplit(_NMEA_CHECKSUM_SEPERATOR, 1) + expected_checksum = int(expected_checksum_str, 16) + calculated_checksum = 0 + for char in data[1:]: + calculated_checksum ^= ord(char) + if expected_checksum != calculated_checksum: + self._logwarn('Received invalid NMEA sentence. Checksum mismatch'); + self._logwarn(f'Expected Checksum: 0x{expected_checksum:X}') + self._logwarn(f'Calculated Checksum: 0x{calculated_checksum:X}') + return False - # Passed all checks - return True + # Passed all checks + return True diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index d9dd5ce..f17f59f 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -12,313 +12,324 @@ _CHUNK_SIZE = 1024 _SOURCETABLE_RESPONSES = [ - 'SOURCETABLE 200 OK' + 'SOURCETABLE 200 OK' ] _SUCCESS_RESPONSES = [ - 'ICY 200 OK', - 'HTTP/1.0 200 OK', - 'HTTP/1.1 200 OK' + 'ICY 200 OK', + 'HTTP/1.0 200 OK', + 'HTTP/1.1 200 OK' ] _UNAUTHORIZED_RESPONSES = [ - '401' + '401' ] class NTRIPClient: - # Public constants - DEFAULT_RECONNECT_ATTEMPT_MAX = 10 - DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5 - DEFAULT_RTCM_TIMEOUT_SECONDS = 4 - - def __init__(self, host, port, mountpoint, ntrip_version, username, password, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): - # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS - self._logerr = logerr - self._logwarn = logwarn - self._loginfo = loginfo - self._logdebug = logdebug - - # Save the server info - self._host = host - self._port = port - self._mountpoint = mountpoint - self._ntrip_version = ntrip_version - if username is not None and password is not None: - self._basic_credentials = base64.b64encode('{}:{}'.format( - username, password).encode('utf-8')).decode('utf-8') - else: - self._basic_credentials = None - - # Initialize this so we don't throw an exception when closing - self._raw_socket = None - self._server_socket = None - - # Setup some parsers to parse incoming messages - self.rtcm_parser = RTCMParser( - logerr=logerr, - logwarn=logwarn, - loginfo=loginfo, - logdebug=logdebug - ) - self.nmea_parser = NMEAParser( - logerr=logerr, - logwarn=logwarn, - loginfo=loginfo, - logdebug=logdebug - ) - - # Public SSL configuration - self.ssl = False - self.cert = None - self.key = None - self.ca_cert = None - - # Setup some state - self._shutdown = False - self._connected = False - - # Private reconnect info - self._reconnect_attempt_count = 0 - self._nmea_send_failed_count = 0 - self._nmea_send_failed_max = 5 - self._read_zero_bytes_count = 0 - self._read_zero_bytes_max = 5 - self._first_rtcm_received = False - self._recv_rtcm_last_packet_timestamp = 0 - - # Public reconnect info - self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX - self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS - self.rtcm_timeout_seconds = self.DEFAULT_RTCM_TIMEOUT_SECONDS - - def connect(self): - # Create a socket object that we will use to connect to the server - self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._server_socket.settimeout(5) - - # Connect the socket to the server - try: - self._server_socket.connect((self._host, self._port)) - except Exception as e: - self._logerr( - 'Unable to connect socket to server at http://{}:{}'.format(self._host, self._port)) - self._logerr('Exception: {}'.format(str(e))) - return False - - # If SSL, wrap the socket - if self.ssl: - # Configre the context based on the config - self._ssl_context = ssl.create_default_context() - if self.cert: - self._ssl_context.load_cert_chain(self.cert, self.key) - if self.ca_cert: - self._ssl_context.load_verify_locations(self.ca_cert) - - # Save the old socket for later just in case, and create a new SSL socket - self._raw_socket = self._server_socket - self._server_socket = self._ssl_context.wrap_socket(self._raw_socket, server_hostname=self._host) - - # Send the HTTP Request - try: - self._server_socket.send(self._form_request()) - except Exception as e: - self._logerr( - 'Unable to send request to server at http://{}:{}'.format(self._host, self._port)) - self._logerr('Exception: {}'.format(str(e))) - return False - - # Get the response from the server - response = '' - try: - response = self._server_socket.recv(_CHUNK_SIZE).decode('ISO-8859-1') - except Exception as e: - self._logerr( - 'Unable to read response from server at http://{}:{}'.format(self._host, self._port)) - self._logerr('Exception: {}'.format(str(e))) - return False - - # Properly handle the response - if any(success in response for success in _SUCCESS_RESPONSES): - self._connected = True - - # Some debugging hints about the kind of error we received - known_error = False - if any(sourcetable in response for sourcetable in _SOURCETABLE_RESPONSES): - self._logwarn('Received sourcetable response from the server. This probably means the mountpoint specified is not valid') - known_error = True - elif any(unauthorized in response for unauthorized in _UNAUTHORIZED_RESPONSES): - self._logwarn('Received unauthorized response from the server. Check your username, password, and mountpoint to make sure they are correct.') - known_error = True - elif not self._connected and (self._ntrip_version == None or self._ntrip_version == ''): - self._logwarn('Received unknown error from the server. Note that the NTRIP version was not specified in the launch file. This is not necesarilly the cause of this error, but it may be worth checking your NTRIP casters documentation to see if the NTRIP version needs to be specified.') - known_error = True - - # Wish we could just return from the above checks, but some casters return both a success and an error in the response - # If we received any known error, even if we received a success it should be considered a failure - if known_error or not self._connected: - self._logerr('Invalid response received from http://{}:{}/{}'.format( - self._host, self._port, self._mountpoint)) - self._logerr('Response: {}'.format(response)) - return False - else: - self._loginfo( - 'Connected to http://{}:{}/{}'.format(self._host, self._port, self._mountpoint)) - return True - - - def disconnect(self): - # Disconnect the socket - self._connected = False - try: - if self._server_socket: - self._server_socket.shutdown(socket.SHUT_RDWR) - if self._raw_socket: - self._raw_socket.shutdown(socket.SHUT_RDWR) - except Exception as e: - self._logdebug('Encountered exception when shutting down the socket. This can likely be ignored') - self._logdebug('Exception: {}'.format(e)) - try: - if self._server_socket: - self._server_socket.close() - if self._raw_socket: - self._raw_socket.close() - except Exception as e: - self._logdebug('Encountered exception when closing the socket. This can likely be ignored') - self._logdebug('Exception: {}'.format(e)) - - def reconnect(self): - if self._connected: - while not self._shutdown: - self._reconnect_attempt_count += 1 - self.disconnect() - connect_success = self.connect() - if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: - self._logerr('Reconnect to http://{}:{} failed. Retrying in {} seconds'.format(self._host, self._port, self.reconnect_attempt_wait_seconds)) - time.sleep(self.reconnect_attempt_wait_seconds) - elif self._reconnect_attempt_count >= self.reconnect_attempt_max: - self._reconnect_attempt_count = 0 - raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count)) - break - elif connect_success: - self._reconnect_attempt_count = 0 - break - else: - self._logdebug('Reconnect called while still connected, ignoring') - - def send_nmea(self, sentence): - if not self._connected: - self._logwarn('NMEA sent before client was connected, discarding NMEA') - return - - # Not sure if this is the right thing to do, but python will escape the return characters at the end of the string, so do this manually - if sentence[-4:] == '\\r\\n': - sentence = sentence[:-4] + '\r\n' - elif sentence[-2:] != '\r\n': - sentence = sentence + '\r\n' - - # Check if it is a valid NMEA sentence - if not self.nmea_parser.is_valid_sentence(sentence): - self._logwarn("Invalid NMEA sentence, not sending to server") - return - - # Encode the data and send it to the socket - try: - self._server_socket.send(sentence.encode('utf-8')) - except Exception as e: - self._logwarn('Unable to send NMEA sentence to server.') - self._logwarn('Exception: {}'.format(str(e))) - self._nmea_send_failed_count += 1 - if self._nmea_send_failed_count >= self._nmea_send_failed_max: - self._logwarn("NMEA sentence failed to send to server {} times, restarting".format(self._nmea_send_failed_count)) - self.reconnect() + # Public constants + DEFAULT_RECONNECT_ATTEMPT_MAX = 10 + DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5 + DEFAULT_RTCM_TIMEOUT_SECONDS = 4 + + def __init__( + self, + host, + port, + mountpoint, + ntrip_version, + username, + password, + logerr=logging.error, + logwarn=logging.warning, + loginfo=logging.info, + logdebug=logging.debug + ): + # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS + self._logerr = logerr + self._logwarn = logwarn + self._loginfo = loginfo + self._logdebug = logdebug + + # Save the server info + self._host = host + self._port = port + self._mountpoint = mountpoint + self._ntrip_version = ntrip_version + if username is not None and password is not None: + self._basic_credentials = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('utf-8') + else: + self._basic_credentials = None + + # Initialize this so we don't throw an exception when closing + self._raw_socket = None + self._server_socket = None + + # Setup some parsers to parse incoming messages + self.rtcm_parser = RTCMParser( + logerr=logerr, + logwarn=logwarn, + loginfo=loginfo, + logdebug=logdebug + ) + self.nmea_parser = NMEAParser( + logerr=logerr, + logwarn=logwarn, + loginfo=loginfo, + logdebug=logdebug + ) + + # Public SSL configuration + self.ssl = False + self._ssl_context = None + self.cert = None + self.key = None + self.ca_cert = None + + # Setup some state + self._shutdown = False + self._connected = False + + # Private reconnect info + self._reconnect_attempt_count = 0 self._nmea_send_failed_count = 0 - self.send_nmea(sentence) # Try sending the NMEA sentence again - - - def recv_rtcm(self): - if not self._connected: - self._logwarn( - 'RTCM requested before client was connected, returning empty list') - return [] - - # If it has been too long since we received an RTCM packet, reconnect - if time.time() - self.rtcm_timeout_seconds >= self._recv_rtcm_last_packet_timestamp and self._first_rtcm_received: - self._logerr('RTCM data not received for {} seconds, reconnecting'.format(self.rtcm_timeout_seconds)) - self.reconnect() - self._first_rtcm_received = False - - # Check if there is any data available on the socket - read_sockets, _, _ = select.select([self._server_socket], [], [], 0) - if not read_sockets: - return [] - - # Since we only ever pass the server socket to the list of read sockets, we can just read from that - # Read all available data into a buffer - data = b'' - while True: - try: - chunk = self._server_socket.recv(_CHUNK_SIZE) - data += chunk - if len(chunk) < _CHUNK_SIZE: - break - except Exception as e: - self._logerr('Error while reading {} bytes from socket'.format(_CHUNK_SIZE)) - if not self._socket_is_open(): - self._logerr('Socket appears to be closed. Reconnecting') - self.reconnect() - return [] - break - self._logdebug('Read {} bytes'.format(len(data))) - - # If 0 bytes were read from the socket even though we were told data is available multiple times, - # it can be safely assumed that we can reconnect as the server has closed the connection - if len(data) == 0: - self._read_zero_bytes_count += 1 - if self._read_zero_bytes_count >= self._read_zero_bytes_max: - self._logwarn('Reconnecting because we received 0 bytes from the socket even though it said there was data available {} times'.format(self._read_zero_bytes_count)) - self.reconnect() + self._nmea_send_failed_max = 5 self._read_zero_bytes_count = 0 - return [] - else: - # Looks like we received valid data, so note when the data was received - self._recv_rtcm_last_packet_timestamp = time.time() - self._first_rtcm_received = True - - # Send the data to the RTCM parser to parse it - return self.rtcm_parser.parse(data) if data else [] - - def shutdown(self): - # Set some state, and then disconnect - self._shutdown = True - self.disconnect() - - def _form_request(self): - if self._ntrip_version != None and self._ntrip_version != '': - request_str = 'GET /{} HTTP/1.0\r\nNtrip-Version: {}\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format( - self._mountpoint, self._ntrip_version) - else: - request_str = 'GET /{} HTTP/1.0\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format( - self._mountpoint) - if self._basic_credentials is not None: - request_str += 'Authorization: Basic {}\r\n'.format( - self._basic_credentials) - request_str += '\r\n' - return request_str.encode('utf-8') - - def _socket_is_open(self): - try: - # this will try to read bytes without blocking and also without removing them from buffer (peek only) - data = self._server_socket.recv(_CHUNK_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK) - if len(data) == 0: - return False - except BlockingIOError: - return True # socket is open and reading from it would block - except ConnectionResetError: - self._logwarn('Connection reset by peer') - return False # socket was closed for some other reason - except socket.timeout: - return True # timeout likely means that the socket is still open - except Exception as e: - self._logwarn('Socket appears to be closed') - self._logwarn('Exception: {}'.format(e)) - return False - return True + self._read_zero_bytes_max = 5 + self._first_rtcm_received = False + self._recv_rtcm_last_packet_timestamp = 0 + + # Public reconnect info + self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX + self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS + self.rtcm_timeout_seconds = self.DEFAULT_RTCM_TIMEOUT_SECONDS + + def connect(self): + # Create a socket object that we will use to connect to the server + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.settimeout(5) + + # Connect the socket to the server + try: + self._server_socket.connect((self._host, self._port)) + except Exception as e: + self._logerr( + f'Unable to connect socket to server at http://{self._host}:{self._port}') + self._logerr(f'Exception: {e}') + return False + + # If SSL, wrap the socket + if self.ssl: + # Configre the context based on the config + self._ssl_context = ssl.create_default_context() + if self.cert: + self._ssl_context.load_cert_chain(self.cert, self.key) + if self.ca_cert: + self._ssl_context.load_verify_locations(self.ca_cert) + + # Save the old socket for later just in case, and create a new SSL socket + self._raw_socket = self._server_socket + self._server_socket = self._ssl_context.wrap_socket(self._raw_socket, server_hostname=self._host) + + # Send the HTTP Request + try: + self._server_socket.send(self._form_request()) + except Exception as e: + self._logerr( + f'Unable to send request to server at http://{self._host}:{self._port}') + self._logerr(f'Exception: {e}') + return False + + # Get the response from the server + response = '' + try: + response = self._server_socket.recv(_CHUNK_SIZE).decode('ISO-8859-1') + except Exception as e: + self._logerr( + f'Unable to read response from server at http://{self._host}:{self._port}') + self._logerr(f'Exception: {e}') + return False + + # Properly handle the response + if any(success in response for success in _SUCCESS_RESPONSES): + self._connected = True + + # Some debugging hints about the kind of error we received + known_error = False + if any(sourcetable in response for sourcetable in _SOURCETABLE_RESPONSES): + self._logwarn( + 'Received sourcetable response from the server. This probably means the mountpoint specified is not valid') + known_error = True + elif any(unauthorized in response for unauthorized in _UNAUTHORIZED_RESPONSES): + self._logwarn( + 'Received unauthorized response from the server. Check your username, password, and mountpoint to make sure they are correct.') + known_error = True + elif not self._connected and self._ntrip_version in (None, ''): + self._logwarn( + 'Received unknown error from the server. Note that the NTRIP version was not specified in the launch file. This is not necesarilly the cause of this error, but it may be worth checking your NTRIP casters documentation to see if the NTRIP version needs to be specified.') + known_error = True + + # Wish we could just return from the above checks, but some casters return both a success and an error in the response + # If we received any known error, even if we received a success it should be considered a failure + if known_error or not self._connected: + self._logerr(f'Invalid response received from http://{self._host}:{self._port}/{self._mountpoint}') + self._logerr(f'Response: {response}') + return False + else: + self._loginfo(f'Connected to http://{self._host}:{self._port}/{self._mountpoint}') + return True + + + def disconnect(self): + # Disconnect the socket + self._connected = False + try: + if self._server_socket: + self._server_socket.shutdown(socket.SHUT_RDWR) + if self._raw_socket: + self._raw_socket.shutdown(socket.SHUT_RDWR) + except Exception as e: + self._logdebug('Encountered exception when shutting down the socket. This can likely be ignored') + self._logdebug(f'Exception: {e}') + try: + if self._server_socket: + self._server_socket.close() + if self._raw_socket: + self._raw_socket.close() + except Exception as e: + self._logdebug('Encountered exception when closing the socket. This can likely be ignored') + self._logdebug(f'Exception: {e}') + + def reconnect(self): + if self._connected: + while not self._shutdown: + self._reconnect_attempt_count += 1 + self.disconnect() + connect_success = self.connect() + if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: + self._logerr( + f'Reconnect to http://{self._host}:{self._port} failed. Retrying in {self.reconnect_attempt_wait_seconds} seconds') + time.sleep(self.reconnect_attempt_wait_seconds) + elif self._reconnect_attempt_count >= self.reconnect_attempt_max: + self._reconnect_attempt_count = 0 + raise Exception(f"Reconnect was attempted {self._reconnect_attempt_count} times, but never succeeded") + elif connect_success: + self._reconnect_attempt_count = 0 + break + else: + self._logdebug('Reconnect called while still connected, ignoring') + + def send_nmea(self, sentence): + if not self._connected: + self._logwarn('NMEA sent before client was connected, discarding NMEA') + return + + # Not sure if this is the right thing to do, but python will escape the return characters at the end of the string, so do this manually + if sentence[-4:] == '\\r\\n': + sentence = sentence[:-4] + '\r\n' + elif sentence[-2:] != '\r\n': + sentence = sentence + '\r\n' + + # Check if it is a valid NMEA sentence + if not self.nmea_parser.is_valid_sentence(sentence): + self._logwarn("Invalid NMEA sentence, not sending to server") + return + + # Encode the data and send it to the socket + try: + self._server_socket.send(sentence.encode('utf-8')) + except Exception as e: + self._logwarn('Unable to send NMEA sentence to server.') + self._logwarn(f'Exception: {e}') + self._nmea_send_failed_count += 1 + if self._nmea_send_failed_count >= self._nmea_send_failed_max: + self._logwarn(f"NMEA sentence failed to send to server {self._nmea_send_failed_count} times, restarting") + self.reconnect() + self._nmea_send_failed_count = 0 + self.send_nmea(sentence) # Try sending the NMEA sentence again + + + def recv_rtcm(self): + if not self._connected: + self._logwarn( + 'RTCM requested before client was connected, returning empty list') + return [] + + # If it has been too long since we received an RTCM packet, reconnect + if time.time() - self.rtcm_timeout_seconds >= self._recv_rtcm_last_packet_timestamp and self._first_rtcm_received: + self._logerr(f'RTCM data not received for {self.rtcm_timeout_seconds} seconds, reconnecting') + self.reconnect() + self._first_rtcm_received = False + + # Check if there is any data available on the socket + read_sockets, _, _ = select.select([self._server_socket], [], [], 0) + if not read_sockets: + return [] + + # Since we only ever pass the server socket to the list of read sockets, we can just read from that + # Read all available data into a buffer + data = b'' + while True: + try: + chunk = self._server_socket.recv(_CHUNK_SIZE) + data += chunk + if len(chunk) < _CHUNK_SIZE: + break + except Exception as e: + self._logerr(f'Error while reading {_CHUNK_SIZE} bytes from socket') + if not self._socket_is_open(): + self._logerr('Socket appears to be closed. Reconnecting') + self.reconnect() + return [] + break + self._logdebug(f'Read {len(data)} bytes') + + # If 0 bytes were read from the socket even though we were told data is available multiple times, + # it can be safely assumed that we can reconnect as the server has closed the connection + if len(data) == 0: + self._read_zero_bytes_count += 1 + if self._read_zero_bytes_count >= self._read_zero_bytes_max: + self._logwarn( + f'Reconnecting because we received 0 bytes from the socket even though it said there was data available {self._read_zero_bytes_count} times') + self.reconnect() + self._read_zero_bytes_count = 0 + return [] + else: + # Looks like we received valid data, so note when the data was received + self._recv_rtcm_last_packet_timestamp = time.time() + self._first_rtcm_received = True + + # Send the data to the RTCM parser to parse it + return self.rtcm_parser.parse(data) if data else [] + + def shutdown(self): + # Set some state, and then disconnect + self._shutdown = True + self.disconnect() + + def _form_request(self): + if self._ntrip_version not in (None, ''): + request_str = f'GET /{self._mountpoint} HTTP/1.0\r\nNtrip-Version: {self._ntrip_version}\r\nUser-Agent: NTRIP ntrip_client_ros\r\n' + else: + request_str = f'GET /{self._mountpoint} HTTP/1.0\r\nUser-Agent: NTRIP ntrip_client_ros\r\n' + if self._basic_credentials is not None: + request_str += f'Authorization: Basic {self._basic_credentials}\r\n' + request_str += '\r\n' + return request_str.encode('utf-8') + + def _socket_is_open(self): + try: + # this will try to read bytes without blocking and also without removing them from buffer (peek only) + data = self._server_socket.recv(_CHUNK_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK) + if len(data) == 0: + return False + except BlockingIOError: + return True # socket is open and reading from it would block + except ConnectionResetError: + self._logwarn('Connection reset by peer') + return False # socket was closed for some other reason + except socket.timeout: + return True # timeout likely means that the socket is still open + except Exception as e: + self._logwarn('Socket appears to be closed') + self._logwarn(f'Exception: {e}') + return False + return True diff --git a/src/ntrip_client/rtcm_parser.py b/src/ntrip_client/rtcm_parser.py index 6989f5b..229d3e7 100644 --- a/src/ntrip_client/rtcm_parser.py +++ b/src/ntrip_client/rtcm_parser.py @@ -2,117 +2,124 @@ _RTCM_3_2_PREAMBLE = 0b11010011 _RTCM_CRC_LOOKUP = [ - 0x000000, 0x864CFB, 0x8AD50D, 0x0C99F6, 0x93E6E1, 0x15AA1A, 0x1933EC, 0x9F7F17, - 0xA18139, 0x27CDC2, 0x2B5434, 0xAD18CF, 0x3267D8, 0xB42B23, 0xB8B2D5, 0x3EFE2E, - 0xC54E89, 0x430272, 0x4F9B84, 0xC9D77F, 0x56A868, 0xD0E493, 0xDC7D65, 0x5A319E, - 0x64CFB0, 0xE2834B, 0xEE1ABD, 0x685646, 0xF72951, 0x7165AA, 0x7DFC5C, 0xFBB0A7, - 0x0CD1E9, 0x8A9D12, 0x8604E4, 0x00481F, 0x9F3708, 0x197BF3, 0x15E205, 0x93AEFE, - 0xAD50D0, 0x2B1C2B, 0x2785DD, 0xA1C926, 0x3EB631, 0xB8FACA, 0xB4633C, 0x322FC7, - 0xC99F60, 0x4FD39B, 0x434A6D, 0xC50696, 0x5A7981, 0xDC357A, 0xD0AC8C, 0x56E077, - 0x681E59, 0xEE52A2, 0xE2CB54, 0x6487AF, 0xFBF8B8, 0x7DB443, 0x712DB5, 0xF7614E, - 0x19A3D2, 0x9FEF29, 0x9376DF, 0x153A24, 0x8A4533, 0x0C09C8, 0x00903E, 0x86DCC5, - 0xB822EB, 0x3E6E10, 0x32F7E6, 0xB4BB1D, 0x2BC40A, 0xAD88F1, 0xA11107, 0x275DFC, - 0xDCED5B, 0x5AA1A0, 0x563856, 0xD074AD, 0x4F0BBA, 0xC94741, 0xC5DEB7, 0x43924C, - 0x7D6C62, 0xFB2099, 0xF7B96F, 0x71F594, 0xEE8A83, 0x68C678, 0x645F8E, 0xE21375, - 0x15723B, 0x933EC0, 0x9FA736, 0x19EBCD, 0x8694DA, 0x00D821, 0x0C41D7, 0x8A0D2C, - 0xB4F302, 0x32BFF9, 0x3E260F, 0xB86AF4, 0x2715E3, 0xA15918, 0xADC0EE, 0x2B8C15, - 0xD03CB2, 0x567049, 0x5AE9BF, 0xDCA544, 0x43DA53, 0xC596A8, 0xC90F5E, 0x4F43A5, - 0x71BD8B, 0xF7F170, 0xFB6886, 0x7D247D, 0xE25B6A, 0x641791, 0x688E67, 0xEEC29C, - 0x3347A4, 0xB50B5F, 0xB992A9, 0x3FDE52, 0xA0A145, 0x26EDBE, 0x2A7448, 0xAC38B3, - 0x92C69D, 0x148A66, 0x181390, 0x9E5F6B, 0x01207C, 0x876C87, 0x8BF571, 0x0DB98A, - 0xF6092D, 0x7045D6, 0x7CDC20, 0xFA90DB, 0x65EFCC, 0xE3A337, 0xEF3AC1, 0x69763A, - 0x578814, 0xD1C4EF, 0xDD5D19, 0x5B11E2, 0xC46EF5, 0x42220E, 0x4EBBF8, 0xC8F703, - 0x3F964D, 0xB9DAB6, 0xB54340, 0x330FBB, 0xAC70AC, 0x2A3C57, 0x26A5A1, 0xA0E95A, - 0x9E1774, 0x185B8F, 0x14C279, 0x928E82, 0x0DF195, 0x8BBD6E, 0x872498, 0x016863, - 0xFAD8C4, 0x7C943F, 0x700DC9, 0xF64132, 0x693E25, 0xEF72DE, 0xE3EB28, 0x65A7D3, - 0x5B59FD, 0xDD1506, 0xD18CF0, 0x57C00B, 0xC8BF1C, 0x4EF3E7, 0x426A11, 0xC426EA, - 0x2AE476, 0xACA88D, 0xA0317B, 0x267D80, 0xB90297, 0x3F4E6C, 0x33D79A, 0xB59B61, - 0x8B654F, 0x0D29B4, 0x01B042, 0x87FCB9, 0x1883AE, 0x9ECF55, 0x9256A3, 0x141A58, - 0xEFAAFF, 0x69E604, 0x657FF2, 0xE33309, 0x7C4C1E, 0xFA00E5, 0xF69913, 0x70D5E8, - 0x4E2BC6, 0xC8673D, 0xC4FECB, 0x42B230, 0xDDCD27, 0x5B81DC, 0x57182A, 0xD154D1, - 0x26359F, 0xA07964, 0xACE092, 0x2AAC69, 0xB5D37E, 0x339F85, 0x3F0673, 0xB94A88, - 0x87B4A6, 0x01F85D, 0x0D61AB, 0x8B2D50, 0x145247, 0x921EBC, 0x9E874A, 0x18CBB1, - 0xE37B16, 0x6537ED, 0x69AE1B, 0xEFE2E0, 0x709DF7, 0xF6D10C, 0xFA48FA, 0x7C0401, - 0x42FA2F, 0xC4B6D4, 0xC82F22, 0x4E63D9, 0xD11CCE, 0x575035, 0x5BC9C3, 0xDD8538 + 0x000000, 0x864CFB, 0x8AD50D, 0x0C99F6, 0x93E6E1, 0x15AA1A, 0x1933EC, 0x9F7F17, + 0xA18139, 0x27CDC2, 0x2B5434, 0xAD18CF, 0x3267D8, 0xB42B23, 0xB8B2D5, 0x3EFE2E, + 0xC54E89, 0x430272, 0x4F9B84, 0xC9D77F, 0x56A868, 0xD0E493, 0xDC7D65, 0x5A319E, + 0x64CFB0, 0xE2834B, 0xEE1ABD, 0x685646, 0xF72951, 0x7165AA, 0x7DFC5C, 0xFBB0A7, + 0x0CD1E9, 0x8A9D12, 0x8604E4, 0x00481F, 0x9F3708, 0x197BF3, 0x15E205, 0x93AEFE, + 0xAD50D0, 0x2B1C2B, 0x2785DD, 0xA1C926, 0x3EB631, 0xB8FACA, 0xB4633C, 0x322FC7, + 0xC99F60, 0x4FD39B, 0x434A6D, 0xC50696, 0x5A7981, 0xDC357A, 0xD0AC8C, 0x56E077, + 0x681E59, 0xEE52A2, 0xE2CB54, 0x6487AF, 0xFBF8B8, 0x7DB443, 0x712DB5, 0xF7614E, + 0x19A3D2, 0x9FEF29, 0x9376DF, 0x153A24, 0x8A4533, 0x0C09C8, 0x00903E, 0x86DCC5, + 0xB822EB, 0x3E6E10, 0x32F7E6, 0xB4BB1D, 0x2BC40A, 0xAD88F1, 0xA11107, 0x275DFC, + 0xDCED5B, 0x5AA1A0, 0x563856, 0xD074AD, 0x4F0BBA, 0xC94741, 0xC5DEB7, 0x43924C, + 0x7D6C62, 0xFB2099, 0xF7B96F, 0x71F594, 0xEE8A83, 0x68C678, 0x645F8E, 0xE21375, + 0x15723B, 0x933EC0, 0x9FA736, 0x19EBCD, 0x8694DA, 0x00D821, 0x0C41D7, 0x8A0D2C, + 0xB4F302, 0x32BFF9, 0x3E260F, 0xB86AF4, 0x2715E3, 0xA15918, 0xADC0EE, 0x2B8C15, + 0xD03CB2, 0x567049, 0x5AE9BF, 0xDCA544, 0x43DA53, 0xC596A8, 0xC90F5E, 0x4F43A5, + 0x71BD8B, 0xF7F170, 0xFB6886, 0x7D247D, 0xE25B6A, 0x641791, 0x688E67, 0xEEC29C, + 0x3347A4, 0xB50B5F, 0xB992A9, 0x3FDE52, 0xA0A145, 0x26EDBE, 0x2A7448, 0xAC38B3, + 0x92C69D, 0x148A66, 0x181390, 0x9E5F6B, 0x01207C, 0x876C87, 0x8BF571, 0x0DB98A, + 0xF6092D, 0x7045D6, 0x7CDC20, 0xFA90DB, 0x65EFCC, 0xE3A337, 0xEF3AC1, 0x69763A, + 0x578814, 0xD1C4EF, 0xDD5D19, 0x5B11E2, 0xC46EF5, 0x42220E, 0x4EBBF8, 0xC8F703, + 0x3F964D, 0xB9DAB6, 0xB54340, 0x330FBB, 0xAC70AC, 0x2A3C57, 0x26A5A1, 0xA0E95A, + 0x9E1774, 0x185B8F, 0x14C279, 0x928E82, 0x0DF195, 0x8BBD6E, 0x872498, 0x016863, + 0xFAD8C4, 0x7C943F, 0x700DC9, 0xF64132, 0x693E25, 0xEF72DE, 0xE3EB28, 0x65A7D3, + 0x5B59FD, 0xDD1506, 0xD18CF0, 0x57C00B, 0xC8BF1C, 0x4EF3E7, 0x426A11, 0xC426EA, + 0x2AE476, 0xACA88D, 0xA0317B, 0x267D80, 0xB90297, 0x3F4E6C, 0x33D79A, 0xB59B61, + 0x8B654F, 0x0D29B4, 0x01B042, 0x87FCB9, 0x1883AE, 0x9ECF55, 0x9256A3, 0x141A58, + 0xEFAAFF, 0x69E604, 0x657FF2, 0xE33309, 0x7C4C1E, 0xFA00E5, 0xF69913, 0x70D5E8, + 0x4E2BC6, 0xC8673D, 0xC4FECB, 0x42B230, 0xDDCD27, 0x5B81DC, 0x57182A, 0xD154D1, + 0x26359F, 0xA07964, 0xACE092, 0x2AAC69, 0xB5D37E, 0x339F85, 0x3F0673, 0xB94A88, + 0x87B4A6, 0x01F85D, 0x0D61AB, 0x8B2D50, 0x145247, 0x921EBC, 0x9E874A, 0x18CBB1, + 0xE37B16, 0x6537ED, 0x69AE1B, 0xEFE2E0, 0x709DF7, 0xF6D10C, 0xFA48FA, 0x7C0401, + 0x42FA2F, 0xC4B6D4, 0xC82F22, 0x4E63D9, 0xD11CCE, 0x575035, 0x5BC9C3, 0xDD8538 ] # If we find the beginning of a packet, but not the end, we will cache up to this number of bytes _MAX_BUFFER_SIZE = 1024 * 10 class RTCMParser: - - def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): - # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS - self._logerr = logerr - self._logwarn = logwarn - self._loginfo = loginfo - self._logdebug = logdebug - # Empty buffer, will be filled out - self._caching_data = False - self._buffer = b'' + def __init__( + self, + logerr=logging.error, + logwarn=logging.warning, + loginfo=logging.info, + logdebug=logging.debug + ): + # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS + self._logerr = logerr + self._logwarn = logwarn + self._loginfo = loginfo + self._logdebug = logdebug - def parse(self, buffer): - # Add any data that we have cached - if self._caching_data: - combined_buffer = self._buffer + buffer - else: - combined_buffer = buffer + # Empty buffer, will be filled out + self._caching_data = False + self._buffer = b'' - # Loop over the passed buffer, and parse all available RTCM packets - index = 0 - rtcm_packets = [] - while index < len(combined_buffer): - # Find the start of the RTCM 3.2 packet - if combined_buffer[index] == _RTCM_3_2_PREAMBLE: - # Make sure we have enough data to find the length - if len(combined_buffer) <= index + 2: - self._logdebug('Found beginning of RTCM packet at {}, but there is not enough data in the buffer to find the message length'.format(index)) - self._caching_data = True - buffer = buffer[index:] - break + def parse(self, buffer): + # Add any data that we have cached + if self._caching_data: + combined_buffer = self._buffer + buffer + else: + combined_buffer = buffer - # Make sure we have enough data in the packet to validate it - message_length = (combined_buffer[index + 1] << 8 | combined_buffer[index + 2]) & 0x03FF - if index + message_length + 6 <= len(combined_buffer): - # Grab the packet from the buffer, and verify that it is valid by comparing checksums - packet = combined_buffer[index:index + message_length + 6] - expected_checksum = packet[-3] << 16 | packet[-2] << 8 | packet[-1] - actual_checksum = self._checksum(packet[:-3]) - if expected_checksum == actual_checksum: - self._logdebug('Found valid packet at {} with length {}'.format(index, message_length)) - rtcm_packets.append(packet) - index += message_length + 6 + # Loop over the passed buffer, and parse all available RTCM packets + index = 0 + rtcm_packets = [] + while index < len(combined_buffer): + # Find the start of the RTCM 3.2 packet + if combined_buffer[index] == _RTCM_3_2_PREAMBLE: + # Make sure we have enough data to find the length + if len(combined_buffer) <= index + 2: + self._logdebug( + f'Found beginning of RTCM packet at {index}, but there is not enough data in the buffer to find the message length') + self._caching_data = True + buffer = buffer[index:] + break - # Remove the packet we just found from the cached buffer - self._caching_data = False - self._buffer = self._buffer[message_length + 5:] - continue - else: - self._logwarn('Found packet, but checksums didn\'t match') - self._logwarn('Expected Checksum: 0x{:X}'.format(expected_checksum)) - self._logwarn('Actual Checksum: 0x{:X}'.format(actual_checksum)) - else: - self._logdebug('Found beginning of RTCM packet at {}, but there is not enough data in the buffer to extract it, caching'.format(index)) - self._caching_data = True + # Make sure we have enough data in the packet to validate it + message_length = (combined_buffer[index + 1] << 8 | combined_buffer[index + 2]) & 0x03FF + if index + message_length + 6 <= len(combined_buffer): + # Grab the packet from the buffer, and verify that it is valid by comparing checksums + packet = combined_buffer[index:index + message_length + 6] + expected_checksum = packet[-3] << 16 | packet[-2] << 8 | packet[-1] + actual_checksum = self._checksum(packet[:-3]) + if expected_checksum == actual_checksum: + self._logdebug(f'Found valid packet at {index} with length {message_length}') + rtcm_packets.append(packet) + index += message_length + 6 + + # Remove the packet we just found from the cached buffer + self._caching_data = False + self._buffer = self._buffer[message_length + 5:] + continue + else: + self._logwarn('Found packet, but checksums didn\'t match') + self._logwarn(f'Expected Checksum: 0x{expected_checksum:X}') + self._logwarn(f'Actual Checksum: 0x{actual_checksum:X}') + else: + self._logdebug(f'Found beginning of RTCM packet at {index}, but there is not enough data in the buffer to extract it, caching') + self._caching_data = True + + # If we didn't find a message, manually move on to the next byte + index += 1 - # If we didn't find a message, manually move on to the next byte - index += 1 + # If we didn't find a full packet, cache this one for next time + if self._caching_data: + self._buffer += buffer - # If we didn't find a full packet, cache this one for next time - if self._caching_data: - self._buffer += buffer + # Throw away old data if we are at our limit + if len(self._buffer) > _MAX_BUFFER_SIZE: + self._logwarn(f"Too much data buffered, trimming to {_MAX_BUFFER_SIZE} bytes.") + self._buffer = self._buffer[:_MAX_BUFFER_SIZE] - # Throw away old data if we are at our limit - if len(self._buffer) > _MAX_BUFFER_SIZE: - self._logwarn("Too much data buffered, trimming to {} bytes.".format(_MAX_BUFFER_SIZE)) - self._buffer = self._buffer[:_MAX_BUFFER_SIZE] + # Return the RTCM packets we found + return rtcm_packets - # Return the RTCM packets we found - return rtcm_packets - - def _checksum(self, packet): - crc = 0 - for byte in packet: - crc = ((crc << 8) & 0xFFFFFF) ^ _RTCM_CRC_LOOKUP[(crc >> 16) ^ byte] - return crc + def _checksum(self, packet): + crc = 0 + for byte in packet: + crc = ((crc << 8) & 0xFFFFFF) ^ _RTCM_CRC_LOOKUP[(crc >> 16) ^ byte] + return crc From 4f3bba20acfed59117924f8e06b6a291fd53faf3 Mon Sep 17 00:00:00 2001 From: agennart Date: Thu, 25 Jul 2024 16:54:12 +0200 Subject: [PATCH 2/3] Reorganize parameters in constructor Instead of updating private members of the NtripClient class after initialization, set them as parameters and add them to the constructor. --- scripts/ntrip_ros.py | 101 +++++++++++++++++-------------- src/ntrip_client/nmea_parser.py | 14 ++++- src/ntrip_client/ntrip_client.py | 25 +++++--- 3 files changed, 86 insertions(+), 54 deletions(-) diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 8810b88..916d1ae 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -59,35 +59,10 @@ def __init__(self): ] ) - # Read some mandatory config - host = self.get_parameter('host').value - port = self.get_parameter('port').value - mountpoint = self.get_parameter('mountpoint').value - - # Optionally get the ntrip version from the launch file - ntrip_version = self.get_parameter('ntrip_version').value - if ntrip_version == 'None': - ntrip_version = None - # Set the log level to debug if debug is true if self._debug: rclpy.logging.set_logger_level(self.get_logger().name, rclpy.logging.LoggingSeverity.DEBUG) - # If we were asked to authenticate, read the username and password - username = None - password = None - if self.get_parameter('authenticate').value: - username = self.get_parameter('username').value - password = self.get_parameter('password').value - if not username: - self.get_logger().error( - 'Requested to authenticate, but param "username" was not set') - sys.exit(1) - if not password: - self.get_logger().error( - 'Requested to authenticate, but param "password" was not set') - sys.exit(1) - # Read an optional Frame ID from the config self._rtcm_frame_id = self.get_parameter('rtcm_frame_id').value @@ -114,6 +89,54 @@ def __init__(self): # Setup the RTCM publisher self._rtcm_pub = self.create_publisher(self._rtcm_message_type, 'rtcm', 10) + self._client = None + self.init_client() + + def init_client(self): + # Read some mandatory config + host = self.get_parameter('host').value + port = self.get_parameter('port').value + mountpoint = self.get_parameter('mountpoint').value + + # Optionally get the ntrip version from the launch file + ntrip_version = self.get_parameter('ntrip_version').value + if ntrip_version == 'None': + ntrip_version = None + + # If we were asked to authenticate, read the username and password + username = None + password = None + if self.get_parameter('authenticate').value: + username = self.get_parameter('username').value + password = self.get_parameter('password').value + if not username: + self.get_logger().error( + 'Requested to authenticate, but param "username" was not set') + sys.exit(1) + if not password: + self.get_logger().error( + 'Requested to authenticate, but param "password" was not set') + sys.exit(1) + + # Get some SSL parameters for the NTRIP client + ssl = self.get_parameter('ssl').value + cert = self.get_parameter('cert').value + key = self.get_parameter('key').value + ca_cert = self.get_parameter('ca_cert').value + if cert == 'None': + cert = None + if key == 'None': + key = None + if ca_cert == 'None': + ca_cert = None + + # Get nmea and rtcm parameters + reconnect_attempt_max = self.get_parameter('reconnect_attempt_max').value + reconnect_attempt_wait_seconds = self.get_parameter('reconnect_attempt_wait_seconds').value + rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value + nmea_min_length = self.get_parameter('nmea_min_length').value + nmea_max_length = self.get_parameter('nmea_max_length').value + # Initialize the client self._client = NTRIPClient( host=host, @@ -122,31 +145,21 @@ def __init__(self): ntrip_version=ntrip_version, username=username, password=password, + reconnect_attempt_max=reconnect_attempt_max, + reconnect_attempt_wait_seconds=reconnect_attempt_wait_seconds, + rtcm_timeout_seconds=rtcm_timeout_seconds, + nmea_min_length=nmea_min_length, + nmea_max_length=nmea_max_length, + use_ssl=ssl, + cert=cert, + key=key, + ca_cert=ca_cert, logerr=self.get_logger().error, logwarn=self.get_logger().warning, loginfo=self.get_logger().info, logdebug=self.get_logger().debug ) - # Get some SSL parameters for the NTRIP client - self._client.ssl = self.get_parameter('ssl').value - self._client.cert = self.get_parameter('cert').value - self._client.key = self.get_parameter('key').value - self._client.ca_cert = self.get_parameter('ca_cert').value - if self._client.cert == 'None': - self._client.cert = None - if self._client.key == 'None': - self._client.key = None - if self._client.ca_cert == 'None': - self._client.ca_cert = None - - # Get some timeout parameters for the NTRIP client - self._client.nmea_parser.nmea_max_length = self.get_parameter('nmea_max_length').value - self._client.nmea_parser.nmea_min_length = self.get_parameter('nmea_min_length').value - self._client.reconnect_attempt_max = self.get_parameter('reconnect_attempt_max').value - self._client.reconnect_attempt_wait_seconds = self.get_parameter('reconnect_attempt_wait_seconds').value - self._client.rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value - def run(self): # Connect the client if not self._client.connect(): diff --git a/src/ntrip_client/nmea_parser.py b/src/ntrip_client/nmea_parser.py index f0e3c72..12fd04c 100644 --- a/src/ntrip_client/nmea_parser.py +++ b/src/ntrip_client/nmea_parser.py @@ -6,7 +6,15 @@ class NMEAParser: - def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): + def __init__( + self, + nmea_max_length=NMEA_DEFAULT_MAX_LENGTH, + nmea_min_length=NMEA_DEFAULT_MIN_LENGTH, + logerr=logging.error, + logwarn=logging.warning, + loginfo=logging.info, + logdebug=logging.debug + ): # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS self._logerr = logerr self._logwarn = logwarn @@ -14,8 +22,8 @@ def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=loggin self._logdebug = logdebug # Save some other config - self.nmea_max_length = NMEA_DEFAULT_MAX_LENGTH - self.nmea_min_length = NMEA_DEFAULT_MIN_LENGTH + self.nmea_max_length = nmea_max_length + self.nmea_min_length = nmea_min_length def is_valid_sentence(self, sentence): # Simple sanity checks diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index f17f59f..260e801 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -38,6 +38,15 @@ def __init__( ntrip_version, username, password, + reconnect_attempt_max, + reconnect_attempt_wait_seconds, + rtcm_timeout_seconds, + nmea_min_length, + nmea_max_length, + use_ssl=False, + cert=None, + key=None, + ca_cert=None, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, @@ -71,6 +80,8 @@ def __init__( logdebug=logdebug ) self.nmea_parser = NMEAParser( + nmea_min_length=nmea_min_length, + nmea_max_length=nmea_max_length, logerr=logerr, logwarn=logwarn, loginfo=loginfo, @@ -78,11 +89,11 @@ def __init__( ) # Public SSL configuration - self.ssl = False + self.ssl = use_ssl self._ssl_context = None - self.cert = None - self.key = None - self.ca_cert = None + self.cert = cert + self.key = key + self.ca_cert = ca_cert # Setup some state self._shutdown = False @@ -98,9 +109,9 @@ def __init__( self._recv_rtcm_last_packet_timestamp = 0 # Public reconnect info - self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX - self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS - self.rtcm_timeout_seconds = self.DEFAULT_RTCM_TIMEOUT_SECONDS + self.reconnect_attempt_max = reconnect_attempt_max + self.reconnect_attempt_wait_seconds = reconnect_attempt_wait_seconds + self.rtcm_timeout_seconds = rtcm_timeout_seconds def connect(self): # Create a socket object that we will use to connect to the server From 53e44508a403636e9e004b841776f838ddedf54a Mon Sep 17 00:00:00 2001 From: agennart Date: Thu, 25 Jul 2024 16:55:31 +0200 Subject: [PATCH 3/3] Add dynamic parameter callback When the ntrip server parameters are updated, the ntrip server is restarted with the new set of parameters. Note that for it to work properly, it is required to call the set parameters service with all the parameters that needs to be overwritten at once. --- scripts/ntrip_ros.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 916d1ae..949ad36 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -9,6 +9,7 @@ from rclpy.node import Node from std_msgs.msg import Header from nmea_msgs.msg import Sentence +from rcl_interfaces.msg import SetParametersResult from ntrip_client.ntrip_client import NTRIPClient from ntrip_client.nmea_parser import NMEA_DEFAULT_MAX_LENGTH, NMEA_DEFAULT_MIN_LENGTH @@ -59,6 +60,8 @@ def __init__(self): ] ) + self.add_post_set_parameters_callback(self.post_set_parameters_callback) + # Set the log level to debug if debug is true if self._debug: rclpy.logging.set_logger_level(self.get_logger().name, rclpy.logging.LoggingSeverity.DEBUG) @@ -160,6 +163,16 @@ def init_client(self): logdebug=self.get_logger().debug ) + def post_set_parameters_callback(self, _params): + self.get_logger().info('Shutdown NTRIP client') + self._client.shutdown() + + self.get_logger().info('Re-initialize NTRIP client') + self.init_client() + + if not self._client.connect(): + self.get_logger().error('Unable to connect to NTRIP server') + def run(self): # Connect the client if not self._client.connect():