Skip to content
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
0cd32ae
Added clustering and mode tracking
Oct 21, 2025
1f711db
Added clustering and mode tracking
Oct 21, 2025
d34018c
Merge branch 'main' of https://github.com/au650680/example-shm
Oct 21, 2025
a487753
Merge branch 'main' into clustering_and_mode_tracking
Oct 21, 2025
62bddcd
Edits to last pull
Oct 28, 2025
aedc878
Bug fix and small change to plot_cluster
Oct 31, 2025
802139e
Small changes
Nov 5, 2025
9d145fd
Small changes
Nov 5, 2025
ee4f49a
Model update module and other changes
Nov 11, 2025
1e08818
Update README.md
Nov 11, 2025
c8ba534
Changed the typing of some arguments in some functions
Nov 11, 2025
e7e0926
Merge branch 'clustering_and_mode_tracking' into fix_clustering_and_m…
Nov 12, 2025
708df2c
Delete init file
Nov 12, 2025
8e8858a
Multiple changes and fixes
Nov 18, 2025
dedbf22
Simplifications to commit
Nov 21, 2025
23e1f4d
TopicsToPublish placeholder names
Nov 21, 2025
47da3c7
New record and replay data
Nov 21, 2025
bbdc720
Minor changes to comments
Nov 21, 2025
bbf56aa
Updated record and replay functions
Nov 24, 2025
d83cc29
Update model_update.py
Nov 24, 2025
0600840
Minor changes
Nov 25, 2025
afd27e1
Fixes and looping replay function
Nov 25, 2025
f9b3e39
Revert back to "sysid" key in configurations
Nov 25, 2025
8acef14
YAFEM model function is added to constants.py
Nov 27, 2025
67ac538
Refactor MQTT record/replay and change to model update information
Nov 27, 2025
e887bd4
Change recording topic to match with subscribe topic.
Nov 27, 2025
0875f7b
New topic names and origin added to replay config
Nov 28, 2025
32f667a
Hotfix for model update functions
Nov 28, 2025
ad7a8fc
Updated poetry.lock
Nov 28, 2025
c8b7599
Updated USERGUIDE
Nov 28, 2025
ca522ac
Added record and replay guide
Nov 28, 2025
89f3239
Small improvements
Feb 25, 2026
2e5c96b
Replay function edit and beam experiment data added
Mar 5, 2026
d348023
Changes to replay function
Mar 5, 2026
0b8a54e
Merge remote-tracking branch 'upstream/main' into fix_clustering_and_…
Mar 9, 2026
f6b2e21
Updated plot_sysid.py
Mar 9, 2026
0900daa
Added readme file, reference results for beam experiment, bug fixes
Mar 11, 2026
7c34e4e
Update beam import
Mar 11, 2026
14f6c16
replay functionality added as example
Mar 11, 2026
90e5d7a
Included record folder to project
Mar 11, 2026
4fe7578
Change to file path in replay.py
Mar 11, 2026
7962ed8
Removed os library
Mar 11, 2026
270dc7b
Removed os pathing from MU
Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions USERGUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ Inside `examples/run_sysid.py`, `examples/run_mode_clustering.py`, `examples/run
the sample time can be changed: `number_of_minutes`.

* **Parameters**
Inside `method/constans.py` the parameters for system identification, mode clustering, mode tracking and model updating
Inside `method/constants.py` the parameters for system identification, mode clustering, mode tracking and model updating
can be changed.

* **Model**
A digital YAFEM model can be added to `models/<your_model>`.
Inside `method/constans.py` the model paramters can be set together with the paths to the model folder and the model function file.
Inside `method/constants.py` the model paramters can be set together with the paths to the model folder and the model function file.


## Step 1.4 Run examples
Expand Down Expand Up @@ -310,7 +310,7 @@ CONFIG_PATH = "config/replay.json"
RECORDINGS_DIR = "record/mqtt_recordings"
FILE_NAME = "recording.jsonl"

REPLAY_SPEED = 0.1 # Multiplier for replay speed
REPLAY_SPEED = 1 # Multiplier for replay speed
```
At the bottom the number of times to loop the replay function can be stated: `replay_mqtt_messages(loop=10) # Times to loop`

Expand Down
176,889 changes: 176,889 additions & 0 deletions record/mqtt_recordings/recording_beam_reduced.jsonl
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a README explaining

  • the purpose of recording
  • method used for recording data from multiple channels
  • data formats of the jsonl files including the metadata file
  • method used for replaying data at different speeds

Large diffs are not rendered by default.

190 changes: 133 additions & 57 deletions record/replay.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,87 @@
import os
import json
import threading
import time
import struct
from typing import Dict
from datetime import datetime
from paho.mqtt.client import Client as MQTTClient
from data.comm.mqtt import (shutdown, load_config, setup_publish_client)

from data.accel.metadata_constants import DESCRIPTOR_LENGTH_BYTES

# MQTT Configuration
CONFIG_PATH = "config/replay.json"

RECORDINGS_DIR = "record/mqtt_recordings"
FILE_NAME = "recording.jsonl"
FILE_NAME = "recording_beam_reduced.jsonl"

REPLAY_SPEED = 10 # Multiplier for replay speed

BUSY_WAIT_THRESHOLD = 10/1000 # Threshold in seconds for busy waiting (10 ms)
KEEP_UP_TIME = -1 # If delay time (remaining) is lower than this time, warn the user that the replay speed is too fast.
PRINT_INTERVAL = 5

SINCE_START_COUNTER = {}
BATCH_SIZE = 16


REPLAY_SPEED = 0.1 # Multiplier for replay speed
def override_counter_in_payload(topic_key: str, payload_bytes: bytes) -> bytes:
    """
    Overrides the recorded 'samples_from_daq_start' counter with a replay counter.

    The recorded counter restarts at its original values on every loop of the
    recording; replacing it with a monotonically increasing per-topic counter
    (SINCE_START_COUNTER) keeps the sample count from jumping backwards when
    the recording is looped.

    Args:
        topic_key (str): Topic string used as a dict key
        payload_bytes (bytes): Original payload (descriptor followed by raw samples)
    Returns:
        bytes: Payload with the descriptor's counter replaced
    """
    # Descriptor length is stored in the first uint16 of the payload.
    descriptor_length = struct.unpack("<H", payload_bytes[:DESCRIPTOR_LENGTH_BYTES])[0]
    # Re-read the length from the full descriptor; the remaining fields are
    # discarded because the descriptor is rebuilt below with fixed values.
    (descriptor_length, *_rest) = struct.unpack("<HHQQQ", payload_bytes[:descriptor_length])

    # Raw accelerometer samples follow the descriptor. They pass through
    # unchanged, so no unpack/repack round-trip is needed; the unpack below
    # only validates that the payload holds exactly BATCH_SIZE float32 values
    # (raises struct.error otherwise, same as the original round-trip).
    data_payload = payload_bytes[descriptor_length:]
    struct.unpack(f"<{BATCH_SIZE}f", data_payload)

    # Advance the per-topic replay counter and rebuild the descriptor with it.
    # NOTE(review): the rebuilt descriptor hard-codes length 28 and fields
    # (2, 0, 0), discarding the original descriptor's values -- confirm that
    # downstream consumers do not rely on them.
    SINCE_START_COUNTER[topic_key] = SINCE_START_COUNTER.get(topic_key, 0) + BATCH_SIZE
    descriptor = struct.pack("<HHQQQ", 28, 2, 0, 0, SINCE_START_COUNTER[topic_key])
    # Reassemble: new descriptor + unchanged sample bytes
    return descriptor + data_payload

def send_message(publish_client: MQTTClient, PublishTopics: Dict[str,str], line: str, delay: float, total: int, counter: int):
def publish_massage(publish_client: MQTTClient, PublishTopics: Dict[str,str], qos: int, topic_key: str, payload_bytes: bytes) -> None:
"""
Publish message
Args:
publish_client (MQTTClient): Publish client
PublishTopics (Dict[str,str]): Topics to publish
qos (int): Quality of service
topic_key (str): Topic string used as a dict key
        payload_bytes (bytes): Payload in bytes
Return:
None
"""

try:
record = json.loads(line.strip())
payload = record["payload"]
if isinstance(payload, list):
payload_bytes = bytes(payload)
elif isinstance(payload, str):
payload_bytes = bytes.fromhex(payload)
else:
raise ValueError("Invalid payload format")

qos = record.get("qos", 1)
topic_key = record.get("topic")
topic = PublishTopics[topic_key]

publish_client.publish(topic, payload=payload_bytes, qos=qos)
text = (f"[REPLAYED {counter+1}/{total}] → {topic} (len={len(payload_bytes)}, delay={delay:.5f}s) ")
print(text,end="\r")
except KeyboardInterrupt:
raise RuntimeError("Replay interrupted by user.")

def replay_mqtt_messages(loop: int = 1) -> None:
"""
Replay data using jsonl file

Args:
loop (int): Number of times to loop the recorded data
Returns:
None
"""
config = load_config(CONFIG_PATH)
MQTT_config = config["MQTT"]
publish_client = setup_publish_client(MQTT_config)
Expand All @@ -48,56 +93,87 @@ def replay_mqtt_messages(loop: int = 1) -> None:
print(f"[Error] {e}")

try:
with open(path, "r") as f:
total_lines = len(f.readlines())
f.close()
publish_client.loop_start()
for ii in range(loop):
print(f"Replay function iteration {ii+1}/{loop}.")
publish_client.loop_start()
t_start = time.time()
accumulated_delay = 0.0
with open(path, "r") as f:
total_lines = len(f.readlines())
f.close()

with open(path, "r") as f:
prev_timestamp = None
for counter, line in enumerate(f):
# if counter >= 256:
# break
try:
message = json.loads(line.strip())
timestamp = datetime.fromisoformat(message["timestamp"])
if prev_timestamp is None:
delay = 0.0 # Send the first message immediately
else:
delay = (timestamp - prev_timestamp).total_seconds()
if delay < 0:
delay = 0.0 # Prevent negative delay
accumulated_delay += delay
replay_delay = delay / REPLAY_SPEED
threading.Timer(replay_delay, send_message, args=(publish_client,MQTT_config["TopicsToPublish"],line,replay_delay,total_lines,counter,)).start()
prev_timestamp = timestamp
except Exception as e:
print(f"\n[Error] Failed to process line: {e}")
f.close()

time.sleep(2)
print(f"\nWaiting for all messages ({total_lines}msg. {accumulated_delay:.3f}s) to be sent...")
while publish_client._out_messages:
text = f"Remaining messages to be sent: {str(len(publish_client._out_messages)).zfill(len(str(total_lines)))}"
time.sleep(1)
print(text,end="\r")
publish_client.loop_stop()
t_end = time.time()
print(f"\nTime it took to publish: {(t_end-t_start):.3f}s")
t_start = time.perf_counter()
print_t = t_start
prev_timestamp = None
with open(path, "r", encoding="utf-8") as replay_file:
for counter, line in enumerate(replay_file):
record = json.loads(line.strip())
payload = record["payload"]
topic_key = record.get("topic")
qos = record.get("qos", 0)

if isinstance(payload, list):
payload_bytes = bytes(payload)
elif isinstance(payload, str):
payload_bytes = bytes.fromhex(payload)
else:
raise ValueError("Invalid payload format")
if "metadata" not in topic_key:
payload_bytes = override_counter_in_payload(topic_key,payload_bytes)

timestamp = datetime.fromisoformat(record["timestamp"])
if prev_timestamp is None:
delay = 0.0 # Send the first message immediately
else:
delay = (timestamp - prev_timestamp).total_seconds()
if delay < 0:
delay = 0.0 # Prevent negative delay

accumulated_delay += delay
target_time = t_start + accumulated_delay / REPLAY_SPEED
time_now = time.perf_counter()

if (time_now-print_t) > PRINT_INTERVAL:
print(f"[REPLAYED {counter+1}/{total_lines}]")
print_t = time.perf_counter()

sleep_time = target_time - time_now
if sleep_time < KEEP_UP_TIME:
print("[WARNING] Can't keep up. Replay speed to fast.")
print(sleep_time,target_time,time_now)
if sleep_time > BUSY_WAIT_THRESHOLD:
time.sleep(sleep_time - BUSY_WAIT_THRESHOLD)
while time.perf_counter() < target_time:
time.sleep(0) # Yield CPU while busy-waiting for sub-10ms precision

# if sleep_time < KEEP_UP_TIME:
# print("[WARNING] Can't keep up. Replay speed to fast.")
# print(sleep_time,target_time,time_now)
# if sleep_time > BUSY_WAIT_THRESHOLD:
# time.sleep(sleep_time)
publish_massage(publish_client,MQTT_config["TopicsToPublish"],qos,topic_key,payload_bytes)
prev_timestamp = timestamp
replay_file.close()
print(f"[REPLAYED {counter+1}/{total_lines}]")
t_end = time.perf_counter()
print(f"\nTime it took to publish: {(t_end-t_start):.3f}s")
print("Messages per second:",total_lines/(t_end-t_start))
print("since_start_counter final",SINCE_START_COUNTER)
if ii+1 >= loop:
print("Restart replay function.")
while publish_client._out_messages:
remaining_count = len(publish_client._out_messages)
text = (
f"Remaining messages to be sent: "
f"{str(remaining_count).zfill(len(str(total_lines)))}"
)
time.sleep(1)
print(text, end="\r")
publish_client.loop_stop()
except KeyboardInterrupt:
time.sleep(1)
print("Keyboard interrupt.")
shutdown(publish_client)
else:
shutdown(publish_client)
print("[DONE].")

if __name__ == "__main__":
replay_mqtt_messages(loop=10) # Times to loop
replay_mqtt_messages(loop=2) # Times to loop
2 changes: 1 addition & 1 deletion src/functions/plot_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def plot_clusters(clusters: Dict[str,dict],
ax1.set_title("Clustered stabilization diagram")
if legend is True:
ax1.legend(prop={'size': 10})
ax1.set_title(f"Data set: {title_number}")
ax1.set_title(f"Clustered stabilization diagram. Data set: {title_number}")

# # # ............................................................................

Expand Down
6 changes: 5 additions & 1 deletion src/functions/plot_sysid.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def plot_pre_stabilization_diagram(
ax1 = add_scatter_data(ax1,x,y_model_order,cov_freq,error_dir="h")
ax1 = add_plot_standard_flair(ax1,sysid_params)

ax1.set_title("Stabilization diagram (pre-cleaning)")
ax1.set_title("Stabilization diagram (pre-cleaning)")
ax1.set_ylabel("Model order", fontsize=20, color = 'black')
ax1.set_ylim(0, sysid_params['model_order'] + 1)
Expand All @@ -52,6 +53,7 @@ def plot_pre_stabilization_diagram(
ax2 = add_plot_annotation(ax2,x,y,y_model_order)
ax2 = add_plot_standard_flair(ax2,sysid_params)

ax2.set_title("Damping ratios")
ax2.set_title("Damping ratios")
ax2.set_ylabel("Damping ratio", fontsize=20, color = 'black')
ax2.set_ylim(0, max(y[~np.isnan(y)])+0.005)
Expand All @@ -74,6 +76,8 @@ def plot_stabilization_diagram(
Args:
sysid_results (Dict[str, Any]): PyOMA results
sysid_params (Dict[str, Any]): sysid parameters
sysid_results (Dict[str, Any]): PyOMA results
sysid_params (Dict[str, Any]): sysid parameters
Returns:
fig_ax (tuple): fig and ax of plot

Expand Down Expand Up @@ -110,7 +114,7 @@ def plot_stabilization_diagram(
ax2 = add_plot_annotation(ax2,x,y,y_model_order)
ax2 = add_plot_standard_flair(ax2,sysid_params)

ax1.set_title("Damping ratios")
ax2.set_title("Damping ratios")
ax2.set_ylabel("Damping ratio", fontsize=20, color = 'black')
ax2.set_ylim(0, max(y[~np.isnan(y)])+0.005)

Expand Down
16 changes: 10 additions & 6 deletions src/methods/mode_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from methods import sysid as sysID
from methods.mode_clustering_functions.clustering import cluster_func
from functions.util import (convert_numpy_to_list, _convert_list_to_dict_or_array)
from functions.plot_sysid import plot_stabilization_diagram
from functions.plot_sysid import plot_stabilization_diagram, plot_pre_stabilization_diagram
from functions.plot_clusters import plot_clusters

# pylint: disable=C0103, W0603
Expand Down Expand Up @@ -103,15 +103,19 @@ def cluster_plots(plot: List[bool], clusters: Dict[str,Any], sysid_output: Dict[
fig_axes (List[plt.Fig,plt.Axes]): List of figure and axes of plots
"""
if plot[0] == 1:
fig_ax1 = plot_stabilization_diagram(sysid_output,params,fig_ax=fig_axes[0])
fig_ax1 = plot_pre_stabilization_diagram(sysid_output,params,fig_ax=fig_axes[0])
else:
fig_ax1 = None
if plot[1] == 1:
fig_ax2 = plot_clusters(clusters,sysid_output,params,fig_ax=fig_axes[1])
fig_ax2 = plot_stabilization_diagram(sysid_output,params,fig_ax=fig_axes[1])
else:
fig_ax2 = None
if plot[2] == 1:
fig_ax3 = plot_clusters(clusters,sysid_output,params,fig_ax=fig_axes[2])
else:
fig_ax3 = None
plt.show(block=hold)
return [fig_ax1, fig_ax2]
return [fig_ax1, fig_ax2, fig_ax3]

def cluster_from_local_sysid(config_path: str, number_of_minutes: float,
params: Dict[str,Any],
Expand Down Expand Up @@ -184,7 +188,7 @@ def subscribe_and_cluster(config: Dict[str,Any], params: Dict[str,Any]
raise RuntimeError("Keyboard interrupt") from exc

def live_mode_clustering(config: Dict[str,Any], params: Dict[str,Any],
publish: bool = False, plot: List[bool] = [1,1]
publish: bool = False, plot: List[bool] = [1,1,1]
) -> None:
"""
Subscribes to MQTT broker, receives one sysid message, runs mode clustering, plots results.
Expand All @@ -198,7 +202,7 @@ def live_mode_clustering(config: Dict[str,Any], params: Dict[str,Any],

Returns:
"""
fig_axes = [None, None]
fig_axes = [None, None, None]
try:
while True:
(sysid_output, clusters,
Expand Down
Loading
Loading