-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_handler_simulation2.py
97 lines (76 loc) · 2.95 KB
/
message_handler_simulation2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import can
import threading
import time
import random
import matplotlib.pyplot as plt
# Create a CAN bus.
# All three simulated nodes share this one bus object. A SocketCAN socket
# does not receive the frames it sent itself unless asked to, so
# receive_own_messages=True is required for main_control() to see the traffic
# produced by the sender threads. `interface=` replaces the deprecated
# `bustype=` keyword.
bus = can.interface.Bus(
    channel='vcan0',
    interface='socketcan',
    receive_own_messages=True,
)
# Define message IDs (a lower arbitration ID wins CAN bus arbitration, so the
# emergency frame has priority over the engine heat frame)
EMERGENCY_MESSAGE_ID = 0x100
ENGINE_HEAT_MESSAGE_ID = 0x200
# Lists to store data for visualization
time_points = []       # time.time() stamp of each temperature sample
temperature_data = []  # temperature samples, parallel to time_points
emergency_data = []    # (timestamp, 1) tuples, one per emergency frame sent
# Enable Matplotlib interactive mode
plt.ion()
# Node 1 (Engine Heat Monitor): periodic temperature sender
def engine_heat_monitor():
    """Send a simulated engine temperature frame once per second, forever.

    Each iteration draws a random temperature in the range 80-100, sends its
    integer part as a single data byte under ENGINE_HEAT_MESSAGE_ID, and
    records the timestamp and the unrounded reading in the module-level
    ``time_points`` and ``temperature_data`` lists. Never returns.
    """
    while True:
        # Simulated temperature between 80°C and 100°C
        reading = random.uniform(80, 100)
        bus.send(
            can.Message(
                arbitration_id=ENGINE_HEAT_MESSAGE_ID,
                data=[int(reading)],
            )
        )
        # Record the sample for visualization (timestamp first, then value)
        time_points.append(time.time())
        temperature_data.append(reading)
        time.sleep(1)
# Node 2 (Obstacle Finder): occasional emergency sender
def obstacle_finder():
    """Poll every half second and occasionally raise an emergency, forever.

    On each poll there is a 10% chance of sending a one-byte frame under
    EMERGENCY_MESSAGE_ID; when that happens a ``(timestamp, 1)`` tuple is
    appended to the module-level ``emergency_data`` list. Never returns.
    """
    while True:
        emergency_now = random.random() < 0.1
        if emergency_now:
            bus.send(
                can.Message(arbitration_id=EMERGENCY_MESSAGE_ID, data=[1])
            )
            # Record the emergency for visualization
            emergency_data.append((time.time(), 1))
        time.sleep(0.5)
# Node 3 (Main Control): receiver and dispatcher
def main_control():
    """Receive frames from the bus and report them by message ID, forever.

    Blocks on ``bus.recv()`` for each frame. Emergency frames and engine heat
    frames each print a status line; frames with any other arbitration ID are
    ignored. Never returns.
    """
    while True:
        frame = bus.recv()
        frame_id = frame.arbitration_id
        if frame_id == EMERGENCY_MESSAGE_ID:
            print("Main Control: Received an emergency message.")
            # Handle the emergency situation here
            continue
        if frame_id == ENGINE_HEAT_MESSAGE_ID:
            # The temperature is carried in the first data byte
            temperature = frame.data[0]
            print(f"Main Control: Received engine temperature data: {temperature} °C")
            # Handle regular engine heat data here
# Start the threads for each node.
# The node functions loop forever, so the threads are marked as daemons:
# otherwise the interpreter would wait on them indefinitely and the script
# could not exit after the main thread is interrupted (e.g. Ctrl+C).
engine_heat_thread = threading.Thread(target=engine_heat_monitor, daemon=True)
obstacle_finder_thread = threading.Thread(target=obstacle_finder, daemon=True)
main_control_thread = threading.Thread(target=main_control, daemon=True)
engine_heat_thread.start()
obstacle_finder_thread.start()
main_control_thread.start()
# Visualization
def plot_data():
    """Draw one snapshot of the recorded temperatures and emergencies.

    Reads the module-level ``time_points``, ``temperature_data`` and
    ``emergency_data`` lists, plots temperature against timestamp as a line,
    and marks each recorded emergency with a dashed red vertical line.
    The figure is drawn once per call; the function then returns None.

    NOTE(review): this function is launched from a worker thread elsewhere in
    this file; Matplotlib GUI backends generally expect to be driven from the
    main thread - confirm this works with the backend in use.
    """
    # Work on snapshots, because the producer threads keep appending while we
    # plot. engine_heat_monitor() appends the timestamp before the value, so
    # copying the values first and trimming the timestamps to the same length
    # guarantees matching x/y sizes.
    temperatures = list(temperature_data)
    timestamps = time_points[:len(temperatures)]
    emergencies = list(emergency_data)

    plt.figure(figsize=(10, 6))
    plt.title("Engine Temperature and Emergency Situations Over Time")
    # Plot temperature data
    plt.plot(timestamps, temperatures, label="Engine Temperature (°C)")
    # Plot emergency situations as vertical lines. Only the first line gets a
    # legend label; labels starting with an underscore are skipped by the
    # legend, which avoids one duplicate entry per emergency.
    for index, (time_point, _) in enumerate(emergencies):
        plt.axvline(
            x=time_point,
            color='r',
            linestyle='--',
            label="Emergency Situation" if index == 0 else "_nolegend_",
        )
    plt.xlabel("Time (s)")
    plt.ylabel("Temperature (°C)")
    plt.legend()
    plt.grid(True)
    plt.pause(0.01)  # Add a pause to allow interactive updates
# Start the data visualization thread
visualization_thread = threading.Thread(target=plot_data)
visualization_thread.start()
# Keep the script running. Sleep between wake-ups instead of spinning on
# `pass`: a busy loop pins a CPU core and competes with the worker threads
# for the interpreter lock.
while True:
    time.sleep(1)