-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhealth_monitor.py
109 lines (89 loc) · 3.01 KB
/
health_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import docker
import time
import psutil
import serial
print("Starting health monitor script")
# initialize Docker client
try:
client = docker.from_env()
print("Docker client initialized")
except Exception as e:
print(f"Error initializing Docker client: {e}")
# serial settings for NodeMCU
SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 9600
def get_docker_statuses():
print("Getting Docker statuses")
active_count = 0
inactive_count = 0
try:
containers = client.containers.list(all=True) # get all containers
active_count = sum(
1 for container in containers if container.status == "running"
)
inactive_count = len(containers) - active_count
except Exception as e:
print(f"Error getting Docker statuses: {e}")
return active_count, inactive_count
def get_system_metrics():
print("Getting system metrics")
temperatures = psutil.sensors_temperatures()
print(f"Temperatures: {temperatures}")
coretemp = temperatures.get("coretemp", [])
print(f"Coretemp: {coretemp}")
if coretemp and isinstance(coretemp, list):
coretemp = coretemp[0]
metrics = {
"T": coretemp.current if hasattr(coretemp, "current") else None,
"C": psutil.cpu_percent(interval=0),
"R": psutil.virtual_memory().percent,
}
else:
metrics = {
"T": None,
"C": psutil.cpu_percent(interval=0),
"R": psutil.virtual_memory().percent,
}
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = int(uptime_seconds // (24 * 3600))
uptime_hours = int((uptime_seconds % (24 * 3600)) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
uptime_str = f"{uptime_days}d{uptime_hours}h{uptime_minutes}m"
print(f"Metrics: {metrics}")
return metrics, uptime_str
def format_data_as_string(active_count, inactive_count, metrics, uptime):
# format the Docker container statuses
docker_statuses_str = f"ON{active_count}OFF{inactive_count}"
# round numbers
temp_str = f"{round(metrics['T'])}" if metrics["T"] is not None else "N/A"
cpu_str = f"{round(metrics['C'])}"
ram_str = f"{round(metrics['R'])}"
metrics_str = f"T{temp_str}C{cpu_str}R{ram_str}"
# combine uptime and metrics into a single string
final_string = f"(Docker:{docker_statuses_str} Metrics:{metrics_str} Uptime:{uptime})\n"
print(
f"Formatted data string: {final_string}"
) # print the exact data string (debugging)
return final_string
def send_data_to_nodemcu(data):
try:
with serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) as ser:
ser.reset_input_buffer()
ser.reset_output_buffer()
ser.write(data.encode("utf-8")) # send serial data
print("Data sent successfully")
except Exception as e:
print(f"Error sending data: {e}")
if __name__ == "__main__":
while True:
try:
active_count, inactive_count = get_docker_statuses()
metrics, uptime = get_system_metrics()
data_string = format_data_as_string(
active_count, inactive_count, metrics, uptime
)
send_data_to_nodemcu(data_string)
time.sleep(1) # collect data every 1 second
except Exception as e:
print(f"Error in main loop: {e}")
time.sleep(1) # wait before retrying