Python MQTT client hangs without error after a few hundred writes #47

Open
ManuelAlvesDtx opened this issue May 31, 2023 · 2 comments

@ManuelAlvesDtx

I've tested the ThingsBoard API (HTTP, using JMeter) with very good results. Now I've been asked to do the same using MQTT. I'm using the Docker image described in https://thingsboard.io/docs/user-guide/install/docker/

For starters, the examples in https://thingsboard.io/docs/reference/python-client-sdk/ do not work with the latest versions of tb-mqtt-client. By trial and error, I found that version 1.1 was the only one that worked for sending attributes and telemetry (python -m pip install tb-mqtt-client==1.1).
Using the MQTT Python client to create devices, I never managed to provision more than 10,000 devices. It hangs before reaching that number, without any error.
Since what I was asked to test was sending data (attributes and telemetry), I resorted to pycurl to provision the devices and get their tokens so that I could send data to each device.
Following the example in https://github.com/thingsboard/thingsboard-python-client-sdk/blob/master/examples/device/send_telemetry_and_attr.py I wrote the two methods below to send data. They belong to a custom MQTT class of my own, whose properties (such as server address and device token) are populated when the device is provisioned over HTTP.

    def write_device_attribute(self) -> bool:
        attribute = {"withoutHistory": random.randint(-1000000, 1000000)}
        client = TBDeviceMqttClient(host=self.server_address, token=self.device_token)
        client.connect()
        result = client.send_attributes(attribute)
        success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
        client.disconnect()
        del client
        return success
    
    def write_device_telemetry(self) -> bool:
        telemetry = {"withHistory": random.randint(-1000000, 1000000)}
        client = TBDeviceMqttClient(host=self.server_address, token=self.device_token)
        client.connect()
        result = client.send_telemetry(telemetry,0)
        success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
        client.disconnect()
        del client
        return success

These methods are called from several threads, but each thread only writes to its own devices:
#################################################################
def write_attributes(user_n):
    """Write to the user devices attribute

    Args:
        user_n (int): integer identifying the user 1..N_USERS
    """
    try:
        
        for _ in range(N_DATA_POINTS):
            for n in range(N_DEVICES):
                success = MQTT_CLIENTS[user_n][n].write_device_attribute()
                Q_Request_log.put_nowait(f"{now_string()};attr;{success}")
                time.sleep(SLEEP_TIME)
    
    except TB_EXCEPTION as err:
        print(err)
#################################################################
def write_telemetry(user_n):
    """Write telemetry to the user devices

    Args:
        user_n (int): integer identifying the user 1..N_USERS
    """
    try:
        
        for _ in range(N_DATA_POINTS):
            for n in range(N_DEVICES):
                success = MQTT_CLIENTS[user_n][n].write_device_telemetry()
                Q_Request_log.put_nowait(f"{now_string()};ts;{success}")
                time.sleep(SLEEP_TIME)
    
    except TB_EXCEPTION as err:
        print(err)
#################################################################

And the main function:

    # Send telemetry (attribute, no history). Launch a thread for each user.
    start_time = datetime.now()
    for i in range(N_USERS):
        thread_a = threading.Thread(target = write_attributes, args=(i,))
        thread_a.start()
        threads_At.append(thread_a)

    # wait for all threads to finish
    for thread in threads_At:
        thread.join()
        
    end_time = datetime.now()
    print(f"Write attribute;{elapsed_time(start_time, end_time)}")
    
    # Send telemetry (attribute, time series). Launch a thread for each user.
    start_time = datetime.now()
    for i in range(N_USERS):
        thread_a = threading.Thread(target = write_telemetry, args=(i,))
        thread_a.start()
        threads_TS.append(thread_a)

    # wait for all threads to finish
    for thread in threads_TS:
        thread.join()
        
    end_time = datetime.now()
    print(f"Send telemetry;{elapsed_time(start_time, end_time)}")
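
The `thread.join()` loops above wait indefinitely, so a single stuck writer blocks the whole run and hides which thread is at fault. As a minimal sketch (the helper name `join_with_deadline` and the deadline value are assumptions, not part of the original script), the threads could be joined against a shared deadline and the ones still running reported:

```python
import threading
import time


def join_with_deadline(threads, deadline_s):
    """Join every thread until a shared deadline; return those still running."""
    end = time.monotonic() + deadline_s
    for thread in threads:
        # Give each thread only the time left until the shared deadline.
        remaining = end - time.monotonic()
        thread.join(max(0.0, remaining))
    return [thread for thread in threads if thread.is_alive()]
```

In the main function this would replace each join loop, for example `stuck = join_with_deadline(threads_At, 600)`, followed by printing `[t.name for t in stuck]`. Note that the threads would need to be created with `daemon=True` for the script to exit while some of them are still blocked.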

This code runs as expected at first, but after a few hundred attribute writes it gets stuck. Checking the process with ps -u, I see that it is in an interruptible sleep (waiting for an event to complete), specifically state “Sl+”.

Any clue as to why this works fine for low numbers but gets stuck on a long run? The server is almost idle at 2% CPU usage, with plenty of free memory and disk.
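
The “Sl+” state only says that the process is sleeping, not where. One way to find the blocking call is to dump the Python stack of every thread once the script appears stuck. The following is a sketch using only the standard library; the function name `dump_all_thread_stacks` is an assumption for illustration:

```python
import sys
import threading
import traceback


def dump_all_thread_stacks() -> str:
    """Return a text report with the current call stack of every thread."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = []
    # sys._current_frames() maps thread id -> topmost frame of that thread.
    for ident, frame in sys._current_frames().items():
        lines.append(f"--- thread {names.get(ident, '?')} ({ident}) ---")
        lines.extend(entry.rstrip() for entry in traceback.format_stack(frame))
    return "\n".join(lines)
```

It could be triggered from a watchdog, for example `threading.Timer(300, lambda: print(dump_all_thread_stacks())).start()`, so that the report shows which call each writer thread is waiting in. The standard library's `faulthandler.dump_traceback_later` gives similar output without extra code.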

@scholz

scholz commented Jun 9, 2023

Hi, I don't know if this is helpful for your case, but it may be an explanation for the behavior you describe.

We noticed in our application that if the connection goes down while we are using "success = result.get() == TBPublishInfo.TB_ERR_SUCCESS" to check the transmission (result.get() is documented as a blocking call), it will hang indefinitely.

Our current fix is as follows:

  • patch the _on_disconnect method in TBDeviceMqttClient so that self.disconnect() is called to perform a clean disconnect. This probably disables the client's automatic reconnection, but in our case it helped as a workaround:

    ## fix the TBDeviceMqttClient _on_disconnect function
    from tb_device_mqtt import log
    def _on_disconnect_fix(self, client, userdata, result_code):
        prev_level = log.level
        log.setLevel("DEBUG")
        log.debug("Disconnected client: %s, user data: %s, result code: %s", str(client), str(userdata),
                  str(result_code))
        log.setLevel(prev_level)
        # only the following line is added compared to the original fn
        self.disconnect()

    TBDeviceMqttClient._on_disconnect = _on_disconnect_fix

  • remove the result.get() call

  • check client._is_connected() instead to see whether the client is really connected. Note that if a connection is lost, the default timeout is 120 s, after which the _is_disconnected method is called (this can be adjusted in the TBDeviceMqttClient.connect call).

I see that you are connecting and disconnecting the client on every write, so it may not be connection related. However, this situation may still arise in your case if the connection breaks right after it was established (client.connect) for some reason, or if it never worked at all; the result.get() call then simply blocks, producing the reported behavior.

A more general solution would be to add a timeout to result.get() and to improve the disconnect behavior of the MQTT client.
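
Until the library offers such a timeout, the blocking call can be bounded from the outside. The following is only a sketch of that idea: `call_with_timeout` is a hypothetical helper, not part of tb-mqtt-client, and it runs the blocking call in a daemon thread and gives up waiting after a fixed time:

```python
import threading


def call_with_timeout(blocking_call, timeout_s, default=None):
    """Run blocking_call() in a daemon thread; return default on timeout."""
    outcome = {}

    def runner():
        try:
            outcome["value"] = blocking_call()
        except Exception as exc:  # hand the error back to the caller
            outcome["error"] = exc

    worker = threading.Thread(target=runner, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if worker.is_alive():
        # The call is still blocked; stop waiting but leave the thread behind.
        return default
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]
```

With the code from the first post this might be used as `rc = call_with_timeout(result.get, 10)` followed by `success = rc == TBPublishInfo.TB_ERR_SUCCESS`. The blocked worker thread is not cancelled, only abandoned, so this is a stopgap for a test script rather than a fix for the underlying hang.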

@ManuelAlvesDtx
Author

Hi, thanks for your reply. I've managed to get it running by using paho directly and making a few changes (I was missing client.loop_start()). Now it runs with multiple threads without any issue. I'll leave it here for anyone trying to do the same:

```python
import paho.mqtt.client as mqtt
import random, json
....

def write_device_attribute(self) -> int:
    client = mqtt.Client()
    client.username_pw_set(self.device_token)
    client.connect(mqttBroker)
    attribute = {"withoutHistory": random.randint(-1000000, 1000000)}
    attribute = json.dumps(attribute)
    client.loop_start()
    info = client.publish(ATTRIBUTES_TOPIC, attribute)
    info.wait_for_publish()
    client.disconnect()
    return info.rc

def write_device_telemetry(self) -> int:
    client = mqtt.Client()
    client.username_pw_set(self.device_token)
    client.connect(mqttBroker)
    telemetry = {"withHistory": random.randint(-1000000, 1000000)}
    telemetry = json.dumps(telemetry)
    client.loop_start()
    info = client.publish(TELEMETRY_TOPIC, telemetry)
    info.wait_for_publish()
    client.disconnect()
    return info.rc
```
