-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_handler_simulation3.py
119 lines (94 loc) · 3.7 KB
/
message_handler_simulation3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import can
import threading
import time
import random
import matplotlib.pyplot as plt
# Create a CAN bus
bus = can.interface.Bus(channel='vcan0', bustype='socketcan')
# Define message IDs
EMERGENCY_MESSAGE_ID = 0x100
ENGINE_HEAT_MESSAGE_ID = 0x200
# Lists to store data for visualization
time_points = []
temperature_data = []
emergency_data = []
# Enable Matplotlib interactive mode
plt.ion()
# Function to clear data
def clear_data():
global time_points, temperature_data, emergency_data
time_points = []
temperature_data = []
emergency_data = []
# Function to send messages from Node 1 (Engine Heat Monitor)
def engine_heat_monitor():
while True:
heat_data = random.uniform(80, 100) # Simulate temperature data between 80°C and 100°C
msg = can.Message(arbitration_id=ENGINE_HEAT_MESSAGE_ID, data=[int(heat_data)])
bus.send(msg)
# Record data for visualization
current_time = time.time()
time_points.append(current_time)
temperature_data.append(heat_data)
time.sleep(1)
# Function to send messages from Node 2 (Obstacle Finder)
def obstacle_finder():
while True:
# Simulate an "emergency situation" by sending an "emergency" message occasionally
if random.random() < 0.1:
msg = can.Message(arbitration_id=EMERGENCY_MESSAGE_ID, data=[1])
bus.send(msg)
# Record emergency data for visualization
current_time = time.time()
emergency_data.append((current_time, 1))
time.sleep(0.5)
# Function to handle received messages in Node 3 (Main Control)
def main_control():
while True:
message = bus.recv()
if message.arbitration_id == EMERGENCY_MESSAGE_ID:
print("Main Control: Received an emergency message.")
print("Main Control: Handling the emergency situation.")
# Handle the emergency situation here
elif message.arbitration_id == ENGINE_HEAT_MESSAGE_ID:
temperature = message.data[0]
print(f"Main Control: Received engine temperature data: {temperature} °C")
# Handle regular engine heat data here
# Start the threads for each node
engine_heat_thread = threading.Thread(target=engine_heat_monitor)
obstacle_finder_thread = threading.Thread(target=obstacle_finder)
main_control_thread = threading.Thread(target=main_control)
engine_heat_thread.start()
obstacle_finder_thread.start()
main_control_thread.start()
# Clear data at the beginning of each run
clear_data()
# Visualization
def plot_data():
plt.figure(figsize=(10, 6))
plt.title("Engine Temperature and Emergency Situations Over Time")
# Plot temperature data
plt.plot(time_points, temperature_data, label="Engine Temperature (°C)")
# Plot emergency situations as vertical lines
for time_point, _ in emergency_data:
plt.axvline(x=time_point, color='r', linestyle='--', label="Emergency Situation")
plt.xlabel("Time (s)")
plt.ylabel("Temperature (°C)")
plt.legend()
plt.grid(True)
plt.pause(0.01) # Add a pause to allow interactive updates
# Start the data visualization thread
visualization_thread = threading.Thread(target=plot_data)
visualization_thread.start()
# Controlled "emergency situation" simulation
def simulate_emergency():
time.sleep(10) # Simulate an emergency situation after 10 seconds
msg = can.Message(arbitration_id=EMERGENCY_MESSAGE_ID, data=[1])
bus.send(msg)
print("Simulated Emergency: An emergency situation has occurred.")
# Start the controlled "emergency situation" simulation
emergency_simulation_thread = threading.Thread(target=simulate_emergency)
emergency_simulation_thread.start()
# Keep the script running
while True:
pass