Upload files to "python"
This commit is contained in:
parent
f49a7f8a0a
commit
70195d8948
188
python/ble_gui_circles.py
Normal file
188
python/ble_gui_circles.py
Normal file
@ -0,0 +1,188 @@
|
||||
import tkinter as tk
|
||||
import serial
|
||||
import threading
|
||||
import csv
|
||||
import serial.tools.list_ports
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
import os
|
||||
|
||||
# Set timezone for Amsterdam
|
||||
AMSTERDAM_TZ = pytz.timezone("Europe/Amsterdam")
|
||||
|
||||
# Globals
|
||||
recording = False
|
||||
csv_file = None
|
||||
csv_writer = None
|
||||
ser = None
|
||||
|
||||
# Circle display parameters
|
||||
min_radius = 10
|
||||
max_radius = 80
|
||||
canvas_width = 400
|
||||
canvas_height = 300
|
||||
circle_coords = [
|
||||
(200, 100), # Sensor 1 (Blue)
|
||||
(135, 200), # Sensor 2 (Red)
|
||||
(265, 200) # Sensor 3 (Green)
|
||||
]
|
||||
|
||||
circle_items = []
|
||||
|
||||
def get_amsterdam_time():
|
||||
return datetime.now(AMSTERDAM_TZ).strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||
|
||||
def get_timestamped_filename():
|
||||
timestamp = datetime.now(AMSTERDAM_TZ).strftime("%Y-%m-%d_%H-%M-%S")
|
||||
return f"sensor_data_{timestamp}.csv"
|
||||
|
||||
def list_serial_ports():
|
||||
return [port.device for port in serial.tools.list_ports.comports()]
|
||||
|
||||
def connect_serial():
|
||||
global ser
|
||||
selected_port = port_var.get()
|
||||
if selected_port:
|
||||
try:
|
||||
ser = serial.Serial(selected_port, 115200, timeout=1)
|
||||
lbl_status.config(text=f"Connected to {selected_port}", fg="green")
|
||||
threading.Thread(target=read_serial, daemon=True).start()
|
||||
except Exception as e:
|
||||
lbl_status.config(text=f"Error: {e}", fg="red")
|
||||
|
||||
def pressure_to_radius_and_brightness(pressure):
|
||||
#radius = max_radius - (pressure / 4095.0) * (max_radius - min_radius)
|
||||
radius = min_radius + (pressure / 4095.0) * (max_radius - min_radius)
|
||||
brightness = int(100 + (pressure / 4095.0) * 150) # range: 105 to 250
|
||||
return radius, brightness
|
||||
|
||||
def update_circles(pressures):
|
||||
for i in range(3):
|
||||
x, y = circle_coords[i]
|
||||
radius, brightness = pressure_to_radius_and_brightness(int(pressures[i]))
|
||||
color = ["blue", "red", "green"][i]
|
||||
color_hex = adjust_color_brightness(color, brightness)
|
||||
|
||||
# Update existing circle
|
||||
canvas.itemconfig(circle_items[i], fill=color_hex)
|
||||
canvas.coords(circle_items[i], x - radius, y - radius, x + radius , y + radius)
|
||||
|
||||
def adjust_color_brightness(color, brightness):
|
||||
base = {
|
||||
"blue": (0, 0, brightness),
|
||||
"red": (brightness, 0, 0),
|
||||
"green": (0, brightness, 0)
|
||||
}
|
||||
r, g, b = base[color]
|
||||
return f"#{r:02x}{g:02x}{b:02x}"
|
||||
|
||||
def read_serial():
|
||||
global csv_writer, csv_file
|
||||
while ser and ser.is_open:
|
||||
try:
|
||||
line = ser.readline().decode('utf-8').strip()
|
||||
values = line.split(",")
|
||||
if len(values) == 4:
|
||||
timestamp = get_amsterdam_time()
|
||||
|
||||
frame_id = values[0]
|
||||
val2, val3, val4 = values[1], values[2], values[3]
|
||||
formatted = [f"{int(v):>6}" for v in [val2, val3, val4]]
|
||||
|
||||
lbl_id.config(text=f"ESP32 ID: {frame_id}")
|
||||
lbl_value1.config(text=formatted[0])
|
||||
lbl_value2.config(text=formatted[1])
|
||||
lbl_value3.config(text=formatted[2])
|
||||
|
||||
update_circles([val2, val3, val4])
|
||||
|
||||
if recording and csv_writer:
|
||||
csv_writer.writerow([timestamp] + values)
|
||||
csv_file.flush()
|
||||
except:
|
||||
pass
|
||||
|
||||
def toggle_recording():
|
||||
global recording, csv_file, csv_writer
|
||||
if not recording:
|
||||
btn_record.config(text="Stop Recording", fg="red")
|
||||
filename = get_timestamped_filename()
|
||||
csv_file = open(filename, "w", newline="")
|
||||
csv_writer = csv.writer(csv_file)
|
||||
csv_writer.writerow(["Timestamp", "ID", "Sensor1", "Sensor2", "Sensor3"])
|
||||
lbl_status.config(text=f"Recording to {filename}", fg="blue")
|
||||
recording = True
|
||||
else:
|
||||
btn_record.config(text="Start Recording", fg="black")
|
||||
recording = False
|
||||
if csv_file:
|
||||
csv_file.close()
|
||||
csv_file = None
|
||||
lbl_status.config(text="Recording Stopped", fg="black")
|
||||
|
||||
# --- GUI Setup ---
|
||||
root = tk.Tk()
|
||||
root.title("BLE Pressure Sensor Monitor")
|
||||
root.geometry("500x700")
|
||||
root.resizable(False, False)
|
||||
|
||||
frame = tk.Frame(root, padx=20, pady=10)
|
||||
frame.pack(pady=5)
|
||||
|
||||
tk.Label(frame, text="Select Serial Port:", font=("Arial", 12)).grid(row=0, column=0, sticky="w", padx=5)
|
||||
port_var = tk.StringVar(root)
|
||||
ports = list_serial_ports()
|
||||
port_dropdown = tk.OptionMenu(frame, port_var, *ports)
|
||||
port_dropdown.grid(row=0, column=1, padx=5)
|
||||
port_var.set(ports[0] if ports else "No ports found")
|
||||
|
||||
btn_connect = tk.Button(frame, text="Connect", font=("Arial", 12), command=connect_serial)
|
||||
btn_connect.grid(row=0, column=2, padx=5)
|
||||
|
||||
lbl_status = tk.Label(root, text="Not connected", font=("Arial", 10), fg="red")
|
||||
lbl_status.pack(pady=5)
|
||||
|
||||
# Start/Stop Recording Button
|
||||
btn_record = tk.Button(root, text="Start Recording", font=("Arial", 12), command=toggle_recording)
|
||||
btn_record.pack(pady=10)
|
||||
|
||||
# Frame ID label
|
||||
lbl_id = tk.Label(root, text="ESP32 ID: ----", font=("Arial", 12, "bold"))
|
||||
lbl_id.pack(pady=(10, 0))
|
||||
|
||||
monospace_font = ("Courier", 14, "bold")
|
||||
tk.Label(frame, text="Sensor 1:", font=("Arial", 12)).grid(row=1, column=0, sticky="w", padx=10)
|
||||
lbl_value1 = tk.Label(frame, text="------", font=monospace_font, fg="blue", width=6, anchor="e")
|
||||
lbl_value1.grid(row=1, column=1, sticky="w")
|
||||
|
||||
tk.Label(frame, text="Sensor 2:", font=("Arial", 12)).grid(row=2, column=0, sticky="w", padx=10)
|
||||
lbl_value2 = tk.Label(frame, text="------", font=monospace_font, fg="red", width=6, anchor="e")
|
||||
lbl_value2.grid(row=2, column=1, sticky="w")
|
||||
|
||||
tk.Label(frame, text="Sensor 3:", font=("Arial", 12)).grid(row=3, column=0, sticky="w", padx=10)
|
||||
lbl_value3 = tk.Label(frame, text="------", font=monospace_font, fg="green", width=6, anchor="e")
|
||||
lbl_value3.grid(row=3, column=1, sticky="w")
|
||||
|
||||
# --- Circle Canvas ---
|
||||
canvas = tk.Canvas(root, width=canvas_width, height=canvas_height, bg="white")
|
||||
canvas.pack(pady=20)
|
||||
|
||||
# Create initial circles
|
||||
for i in range(3):
|
||||
x, y = circle_coords[i]
|
||||
r = min_radius
|
||||
color = ["blue", "red", "green"][i]
|
||||
color_hex = adjust_color_brightness(color, 100)
|
||||
circle = canvas.create_oval(x - r, y - r, x + r, y + r, fill=color_hex, outline="")
|
||||
circle_items.append(circle)
|
||||
|
||||
# Circle labels
|
||||
x, y = circle_coords[0]
|
||||
canvas.create_text(x, y- max_radius-10, text="Sensor 1", font=("Arial", 10), fill="blue")
|
||||
x, y = circle_coords[1]
|
||||
canvas.create_text(x, y+ max_radius+10, text="Sensor 2", font=("Arial", 10), fill="red")
|
||||
x, y = circle_coords[2]
|
||||
canvas.create_text(x, y+ max_radius+10, text="Sensor 3", font=("Arial", 10), fill="green")
|
||||
|
||||
|
||||
root.mainloop()
|
256
python/ble_gui_graph.py
Normal file
256
python/ble_gui_graph.py
Normal file
@ -0,0 +1,256 @@
|
||||
import tkinter as tk
|
||||
import serial
|
||||
import threading
|
||||
import csv
|
||||
import serial.tools.list_ports
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
import os
|
||||
from collections import deque
|
||||
import matplotlib
|
||||
matplotlib.use("TkAgg")
|
||||
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# Set timezone for Amsterdam
|
||||
AMSTERDAM_TZ = pytz.timezone("Europe/Amsterdam")
|
||||
|
||||
# Globals
|
||||
after_id = None
|
||||
recording = False
|
||||
csv_file = None
|
||||
csv_writer = None
|
||||
ser = None
|
||||
|
||||
# Circle display parameters
|
||||
min_radius = 10
|
||||
max_radius = 80
|
||||
canvas_width = 400
|
||||
canvas_height = 300
|
||||
circle_coords = [
|
||||
(200, 100), # Sensor 1 (Blue)
|
||||
(135, 200), # Sensor 2 (Red)
|
||||
(265, 200) # Sensor 3 (Green)
|
||||
]
|
||||
|
||||
circle_items = []
|
||||
|
||||
# Live plotting data
|
||||
plot_buffer = 100
|
||||
plot_data = {
|
||||
"sensor1": deque([0]*plot_buffer, maxlen=plot_buffer),
|
||||
"sensor2": deque([0]*plot_buffer, maxlen=plot_buffer),
|
||||
"sensor3": deque([0]*plot_buffer, maxlen=plot_buffer),
|
||||
"x": deque(range(-plot_buffer+1, 1), maxlen=plot_buffer)
|
||||
}
|
||||
|
||||
# Matplotlib setup
|
||||
fig, ax = plt.subplots(figsize=(5, 2.5))
|
||||
line1, = ax.plot([], [], color='blue', label='Sensor 1')
|
||||
line2, = ax.plot([], [], color='red', label='Sensor 2')
|
||||
line3, = ax.plot([], [], color='green', label='Sensor 3')
|
||||
ax.set_ylim(0, 4200)
|
||||
ax.set_xlim(-plot_buffer+1, 0)
|
||||
ax.set_ylabel("Pressure")
|
||||
ax.set_xlabel("Frames")
|
||||
ax.legend(loc='upper right')
|
||||
|
||||
|
||||
def get_amsterdam_time():
|
||||
return datetime.now(AMSTERDAM_TZ).strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||
|
||||
def get_timestamped_filename():
|
||||
timestamp = datetime.now(AMSTERDAM_TZ).strftime("%Y-%m-%d_%H-%M-%S")
|
||||
return f"sensor_data_{timestamp}.csv"
|
||||
|
||||
def list_serial_ports():
|
||||
return [port.device for port in serial.tools.list_ports.comports()]
|
||||
|
||||
def connect_serial():
|
||||
global ser
|
||||
selected_port = port_var.get()
|
||||
if selected_port:
|
||||
try:
|
||||
ser = serial.Serial(selected_port, 115200, timeout=1)
|
||||
lbl_status.config(text=f"Connected to {selected_port}", fg="green")
|
||||
threading.Thread(target=read_serial, daemon=True).start()
|
||||
except Exception as e:
|
||||
lbl_status.config(text=f"Error: {e}", fg="red")
|
||||
|
||||
def pressure_to_radius_and_brightness(pressure):
|
||||
radius = min_radius + (pressure / 4095.0) * (max_radius - min_radius)
|
||||
brightness = int(100 + (pressure / 4095.0) * 150) # range: 55 to 255
|
||||
return radius, brightness
|
||||
|
||||
def update_circles(pressures):
|
||||
for i in range(3):
|
||||
x, y = circle_coords[i]
|
||||
radius, brightness = pressure_to_radius_and_brightness(int(pressures[i]))
|
||||
color = ["blue", "red", "green"][i]
|
||||
color_hex = adjust_color_brightness(color, brightness)
|
||||
|
||||
# Update existing circle
|
||||
canvas.itemconfig(circle_items[i], fill=color_hex)
|
||||
canvas.coords(circle_items[i], x - radius, y - radius, x + radius, y + radius)
|
||||
|
||||
def adjust_color_brightness(color, brightness):
|
||||
base = {
|
||||
"blue": (0, 0, brightness),
|
||||
"red": (brightness, 0, 0),
|
||||
"green": (0, brightness, 0)
|
||||
}
|
||||
r, g, b = base[color]
|
||||
return f"#{r:02x}{g:02x}{b:02x}"
|
||||
|
||||
def schedule_update_plot():
|
||||
global after_id
|
||||
update_plot()
|
||||
after_id = root.after(100, schedule_update_plot)
|
||||
|
||||
def update_plot():
|
||||
line1.set_data(plot_data["x"], plot_data["sensor1"])
|
||||
line2.set_data(plot_data["x"], plot_data["sensor2"])
|
||||
line3.set_data(plot_data["x"], plot_data["sensor3"])
|
||||
ax.set_xlim(min(plot_data["x"]), max(plot_data["x"]))
|
||||
canvas_plot.draw()
|
||||
|
||||
def read_serial():
|
||||
global csv_writer, csv_file
|
||||
while ser and ser.is_open:
|
||||
try:
|
||||
line = ser.readline().decode('utf-8').strip()
|
||||
values = line.split(",")
|
||||
if len(values) == 4:
|
||||
frame_id = values[0]
|
||||
try:
|
||||
val2 = int(values[1])
|
||||
val3 = int(values[2])
|
||||
val4 = int(values[3])
|
||||
except ValueError:
|
||||
continue # skip bad data
|
||||
|
||||
timestamp = get_amsterdam_time()
|
||||
|
||||
root.after(0, lambda fid=frame_id, v2=val2, v3=val3, v4=val4: update_gui(fid, v2, v3, v4))
|
||||
|
||||
if recording and csv_writer:
|
||||
csv_writer.writerow([timestamp, frame_id, val2, val3, val4])
|
||||
csv_file.flush()
|
||||
except:
|
||||
pass
|
||||
|
||||
def update_gui(frame_id, val2, val3, val4):
|
||||
lbl_id.config(text=f"Frame ID: {frame_id}")
|
||||
lbl_value1.config(text=f"{val2:>6}")
|
||||
lbl_value2.config(text=f"{val3:>6}")
|
||||
lbl_value3.config(text=f"{val4:>6}")
|
||||
update_circles([val2, val3, val4])
|
||||
plot_data["x"].append(plot_data["x"][-1] + 1)
|
||||
plot_data["sensor1"].append(val2)
|
||||
plot_data["sensor2"].append(val3)
|
||||
plot_data["sensor3"].append(val4)
|
||||
|
||||
def toggle_recording():
|
||||
global recording, csv_file, csv_writer
|
||||
if not recording:
|
||||
btn_record.config(text="Stop Recording", fg="red")
|
||||
filename = get_timestamped_filename()
|
||||
csv_file = open(filename, "w", newline="")
|
||||
csv_writer = csv.writer(csv_file)
|
||||
csv_writer.writerow(["Timestamp", "ID", "Sensor1", "Sensor2", "Sensor3"])
|
||||
lbl_status.config(text=f"Recording to {filename}", fg="blue")
|
||||
recording = True
|
||||
else:
|
||||
btn_record.config(text="Start Recording", fg="black")
|
||||
recording = False
|
||||
if csv_file:
|
||||
csv_file.close()
|
||||
csv_file = None
|
||||
lbl_status.config(text="Recording Stopped", fg="black")
|
||||
|
||||
# --- GUI Setup ---
|
||||
root = tk.Tk()
|
||||
root.title("BLE Pressure Sensor Monitor")
|
||||
root.geometry("600x950")
|
||||
root.resizable(False, False)
|
||||
|
||||
frame = tk.Frame(root, padx=20, pady=10)
|
||||
frame.pack(pady=5)
|
||||
|
||||
tk.Label(frame, text="Select Serial Port:", font=("Arial", 12)).grid(row=0, column=0, sticky="w", padx=5)
|
||||
port_var = tk.StringVar(root)
|
||||
ports = list_serial_ports()
|
||||
port_dropdown = tk.OptionMenu(frame, port_var, *ports)
|
||||
port_dropdown.grid(row=0, column=1, padx=5)
|
||||
port_var.set(ports[0] if ports else "No ports found")
|
||||
|
||||
btn_connect = tk.Button(frame, text="Connect", font=("Arial", 12), command=connect_serial)
|
||||
btn_connect.grid(row=0, column=2, padx=5)
|
||||
|
||||
lbl_status = tk.Label(root, text="Not connected", font=("Arial", 10), fg="red")
|
||||
lbl_status.pack(pady=5)
|
||||
|
||||
# Start/Stop Recording Button
|
||||
btn_record = tk.Button(root, text="Start Recording", font=("Arial", 12), command=toggle_recording)
|
||||
btn_record.pack(pady=10)
|
||||
|
||||
# Frame ID label
|
||||
lbl_id = tk.Label(root, text="Frame ID: ----", font=("Arial", 12, "bold"))
|
||||
lbl_id.pack(pady=(10, 0))
|
||||
|
||||
monospace_font = ("Courier", 14, "bold")
|
||||
tk.Label(frame, text="Sensor 1:", font=("Arial", 12)).grid(row=1, column=0, sticky="w", padx=10)
|
||||
lbl_value1 = tk.Label(frame, text="------", font=monospace_font, fg="blue", width=6, anchor="e")
|
||||
lbl_value1.grid(row=1, column=1, sticky="w")
|
||||
|
||||
tk.Label(frame, text="Sensor 2:", font=("Arial", 12)).grid(row=2, column=0, sticky="w", padx=10)
|
||||
lbl_value2 = tk.Label(frame, text="------", font=monospace_font, fg="red", width=6, anchor="e")
|
||||
lbl_value2.grid(row=2, column=1, sticky="w")
|
||||
|
||||
tk.Label(frame, text="Sensor 3:", font=("Arial", 12)).grid(row=3, column=0, sticky="w", padx=10)
|
||||
lbl_value3 = tk.Label(frame, text="------", font=monospace_font, fg="green", width=6, anchor="e")
|
||||
lbl_value3.grid(row=3, column=1, sticky="w")
|
||||
|
||||
# --- Circle Canvas ---
|
||||
canvas = tk.Canvas(root, width=canvas_width, height=canvas_height, bg="white")
|
||||
canvas.pack(pady=20)
|
||||
|
||||
# Create initial circles
|
||||
for i in range(3):
|
||||
x, y = circle_coords[i]
|
||||
r = min_radius
|
||||
color = ["blue", "red", "green"][i]
|
||||
color_hex = adjust_color_brightness(color, 100)
|
||||
circle = canvas.create_oval(x - r, y - r, x + r, y + r, fill=color_hex, outline="")
|
||||
circle_items.append(circle)
|
||||
|
||||
# Circle labels
|
||||
x, y = circle_coords[0]
|
||||
canvas.create_text(x, y- max_radius-10, text="Sensor 1", font=("Arial", 10), fill="blue")
|
||||
x, y = circle_coords[1]
|
||||
canvas.create_text(x, y+ max_radius+10, text="Sensor 2", font=("Arial", 10), fill="red")
|
||||
x, y = circle_coords[2]
|
||||
canvas.create_text(x, y+ max_radius+10, text="Sensor 3", font=("Arial", 10), fill="green")
|
||||
|
||||
# --- Matplotlib Chart ---
|
||||
canvas_plot = FigureCanvasTkAgg(fig, master=root)
|
||||
canvas_plot.get_tk_widget().pack(pady=10)
|
||||
canvas_plot.draw()
|
||||
|
||||
# Schedule periodic plot updates
|
||||
schedule_update_plot()
|
||||
|
||||
def on_exit():
|
||||
global ser, csv_file, after_id
|
||||
if after_id is not None:
|
||||
root.after_cancel(after_id)
|
||||
if ser and ser.is_open:
|
||||
ser.close()
|
||||
if csv_file:
|
||||
csv_file.close()
|
||||
root.destroy()
|
||||
|
||||
btn_exit = tk.Button(root, text="Exit", font=("Arial", 12), command=on_exit)
|
||||
btn_exit.pack(pady=10)
|
||||
|
||||
root.mainloop()
|
Loading…
x
Reference in New Issue
Block a user