发帖
7 0 0

【电子DIY作品】久坐检测提醒

WT_0213
论坛元老

132

主题

1981

回帖

1万

积分

论坛元老

勤劳的打工人

积分
19474
电子DIY 594 7 2025-10-26 11:28:17
[i=s] 本帖最后由 WT_0213 于 2025-10-26 11:47 编辑 [/i]

考虑到长时间伏案工作,普通的提醒工具只能通过时间提示。如果已经离开办公桌也检测不到。之前申请的Rd-03V2刚好满足,所以就通过Rd-03V2实现感知人体存在。由于主要考虑PC提醒所以这次上位机没有使用单片机,而是使用 USB转串口连接Rd-03V2,使用上报模式来实现较为细致的行为检测。

之间做的都是方方正正的,这次想着美化一下。弄了个机械键盘按键,上面放个键帽。凑合看的吧。还想着加个灯光啥的,手上的led灯比较大。需要把模型增大。就放弃了。

为了演示,视频中示例程序时间改成了15秒,正常设置的是 30 *60 三十分钟 。

石矶娘娘:尔等凡人,久坐如顽石化形~!快起来动动吧。

3.jpg

2.jpg

1.jpg

cgi-bin_mmwebwx-bin_webwxgetmsgimg_&MsgID=4065939803245957212&skey=@crypt_3423.jpg

Snipaste_2025-10-26_10-53-11.png这次小DIY 主要用到 RD03_V2 和 一个串口

Snipaste_2025-10-26_11-22-06.png

Snipaste_2025-10-26_10-56-13.png

PC 端程序:

import sys
import traceback

# 定义全局异常处理函数
def global_exception_handler(exc_type, exc_value, exc_tb):
    # 如果是系统退出的异常,可以忽略
    if exc_type == SystemExit:
        sys.__excepthook__(exc_type, exc_value, exc_tb)
    else:
        # 打印异常信息
        print(f"未处理的异常: {exc_type.__name__}: {exc_value}")
        # 打印详细的堆栈信息
        traceback.print_tb(exc_tb)

# 设置全局异常捕获
sys.excepthook = global_exception_handler
import ctypes
import sys
import time
import binascii
import threading
import serial
import platform
import serial.tools.list_ports
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMainWindow, QVBoxLayout, QMenu, QAction, QWidget, QLabel, QPushButton, QTextEdit, QDesktopWidget, QFileDialog, QComboBox, QMessageBox
from PyQt5.QtGui import QIcon, QMovie
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtNetwork import QLocalServer, QLocalSocket
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os

import tkinter as tk
from tkinter import messagebox

class SerialMonitorThread(QThread):

    request_check_devices = pyqtSignal()
    sensor_person_present = pyqtSignal(bool)
  
    def __init__(self, port_name, baudrate=115200):
        super().__init__()
        self.port_name = port_name
        self.baudrate = baudrate
        self.serial_connection = None
        self.is_running = True
        self.buffer = b''
        self.count = 0
        self.open_cmd = bytearray([0xfd, 0xfc, 0xfb, 0xfa, 0x04, 0x00, 0xff, 0x00, 0x01, 0x00, 0x04, 0x03, 0x02, 0x01])
        self.report = bytearray([0xFD, 0xFC, 0xFB, 0xFA, 0x08, 0x00, 0x12, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04, 0x03, 0x02, 0x01])
        self.save_confif = bytearray([0xFD, 0xFC, 0xFB, 0xFA, 0x02, 0x00, 0xFD, 0x00, 0x04, 0x03, 0x02, 0x01])
        self.close_cmd = bytearray([0xFD, 0xFC, 0xFB, 0xFA, 0x02, 0x00, 0xFE, 0x00, 0x04, 0x03, 0x02, 0x01])

    def run(self):
        while self.is_running:
            try:
                if not self.serial_connection or not self.serial_connection.is_open:
                    self.serial_connection = serial.Serial(self.port_name, baudrate=self.baudrate, timeout=0.05)
                    print(f"已连接: {self.port_name} ")
                    print(f"波特率: {self.baudrate} ")
                    print(f"已连接: {self.port_name} ")
                    self.serial_connection.write(self.open_cmd)
                    self.serial_connection.flush()
                    self.serial_connection.write(self.report)
                    self.serial_connection.flush()
                    self.serial_connection.write(self.save_confif)
                    self.serial_connection.flush()
                    self.serial_connection.write(self.close_cmd)
                    self.serial_connection.flush()

                if self.serial_connection.in_waiting > 0:
                    data = self.serial_connection.read(self.serial_connection.in_waiting)
                    self.process_data(data)
                else:
                    QThread.msleep(10)
            except Exception as e:
                print(f"错误: {str(e)}")
                if self.serial_connection:
                    self.serial_connection.close()
                    self.serial_connection = None
                self.request_check_devices.emit()
                QThread.msleep(1000)  # Wait before retrying
  
    def process_data(self, data):
        try:
            self.buffer += data
            start = self.buffer.find(b"\xf4\xf3\xf2\xf1")
            end = self.buffer.find(b"\xf8\xf7\xf6\xf5")
            print("帧头:", start, end)
            if start  != -1 and end !=-1:
                real_data = self.buffer[start: end + len(b"\xf8\xf7\xf6\xf5") ]
        
                frame_header = real_data[:4]
                # print(f"检测到人体感应数据: {hex_data}")
                print("帧头:", frame_header)
                # 帧尾 F8 F7 F6 F5
                frame_footer = real_data[-4:]
                print("帧尾:", frame_footer)
                # 数据长度 2字节
                data_length = int.from_bytes(real_data[4:6], byteorder='little')  # 假设数据长度是小端
                print("数据长度:", data_length)

                # 检测结果 1字节
                detection_result = real_data[6]
                print("检测结果:", detection_result)
        
                if detection_result == 0:
                    self.count += 1
                else:
                    self.count = 0

                # 目标距离 2字节
                target_distance = int.from_bytes(real_data[7:9], byteorder='little')
                print("目标距离:", target_distance)

                # 各距离门能量(128字节)
                distance_energy = real_data[9:137]  # 128字节的能量数据
                print("各距离门能量(前16字节示例):", distance_energy[:16])
                if detection_result == 1:
                    self.sensor_person_present.emit(True)
                else:
                    if self.count > 3:
                        self.sensor_person_present.emit(False)
                self.buffer = b''
        except Exception as e:
            print(f"{e}")
            self.buffer = b''

    def stop(self):
        self.is_running = False


class SerialDeviceHandler(FileSystemEventHandler):
    def __init__(self, main_window):
        super().__init__()
        self.main_window = main_window

    def on_created(self, event):
        if event.is_directory:
            return
        if "tty" in event.src_path.lower():
            self.main_window.check_serial_devices()


class SerialDeviceMonitorThread(QThread):
    request_check_devices = pyqtSignal()

    def __init__(self, interval=3000):
        super().__init__()
        self.interval = interval
        self.is_running = True

    def run(self):
        while self.is_running:
            self.request_check_devices.emit()
            QThread.msleep(self.interval)

    def stop(self):
        self.is_running = False


class HealthReminderApp(QMainWindow):

    time_update_signal = pyqtSignal(int)  # 用于更新时间标签的信号

    def __init__(self):
        super().__init__()

        self.mVIdPIds = ["VID:1A86/PID:7523", "VID:1A86/PID:55D3", "VID:1A86/PID:55E9"]

        # 设置窗口置顶
        self.setWindowFlags(self.windowFlags() | Qt.WindowStaysOnTopHint)

        # 获取屏幕尺寸
        screen_geometry = QApplication.desktop().availableGeometry()
  
        # 设置窗口位置为右下角
        offset_x = 50
        offset_y = 50
        window_width = 250
        window_height = 230
        self.setGeometry(screen_geometry.width() - window_width - offset_x, screen_geometry.height() - window_height - offset_y, window_width, window_height)

        # 设置应用窗口
        self.setWindowTitle('健康提醒')
        self.icon = QIcon(self.resource_path("icon.ico"))
        self.setWindowIcon(self.icon)
        self.tray = QSystemTrayIcon()
        self.tray.setIcon(self.icon)
        self.tray.setVisible(True)
  
        menu = QMenu()
        show_action = QAction("显示窗口", menu)
        config_action = QAction("设置", menu)
        quit_action = QAction("退出程序", menu)
        # 添加分隔线
        menu.addAction(show_action)
        menu.addSeparator()
        menu.addAction(quit_action)
  
        show_action.triggered.connect(self.show_window)
        quit_action.triggered.connect(self.exit_app)
        self.tray.setContextMenu(menu)
  
        self.tray.activated.connect(self.on_tray_activated)
  
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
  
        self.layout = QVBoxLayout()
        self.central_widget.setLayout(self.layout)

         # 创建显示GIF的标签
        self.movieLabel = QLabel(self)
        self.movieLabel.setAlignment(Qt.AlignCenter)
        self.movieLabel.setFixedSize(230, 201)
        self.layout.addWidget(self.movieLabel)

        # 加载GIF文件
        self.movie = QMovie(self.resource_path('sport.gif'))
        # 检查动画是否有效
        if self.movie.isValid():
            # 设置动画到标签
            self.movieLabel.setMovie(self.movie)
    
            # 关键设置:使动画充满标签
            self.movieLabel.setScaledContents(True)
    
            # 或者使用setScaledSize精确控制
            # self.movie.setScaledSize(self.movieLabel.size())
    
            # 开始播放动画
            self.movie.start()
        else:
            self.movieLabel.setText("动画文件加载失败")

        # Create label to display elapsed time
        self.time_label = QLabel("已久坐时间: 0秒", self)
        self.time_label.setFixedWidth(200)  # 设置固定宽度,防止文本被截断
        # self.time_label.setAlignment(Qt.AlignCenter)
        # self.time_label.setStyleSheet("font-size: 16px;")
        # self.time_label.setFixedSize(200, 50)
        # self.time_label.move(100, 10)
        self.layout.addWidget(self.time_label)
  
        # 设置久坐提醒时长
        self.reminder_interval = 30 * 60   # 默认30分钟
        self.timer = None
        self.is_person_present = True
        self.elapsed_time = 0  # 累计时间
        self.timer_running = True
        self.monitor_sensor_runing = True

        # 启动检测线程
        self.sensor_thread = threading.Thread(target=self.monitor_sensor)
        self.sensor_thread.start()

        # 启动定时器线程
        self.timer_thread = threading.Thread(target=self.start_timer)
        self.timer_thread.start()

        self.device_monitor_thread = None
        self.serial_thread = None

        # 连接信号到更新方法
        self.time_update_signal.connect(self.update_time_label)

        self.start_serial_device_monitor()

    def resource_path(self, relative_path):
        if hasattr(sys, "_MEIPASS"):
            return os.path.join(sys._MEIPASS, relative_path)
        return os.path.join(os.path.abspath("."), relative_path)


    def show_window(self):
        print("show_window")
        self.show()
        self.raise_()
        self.activateWindow()
  
    def exit_app(self):
        self.tray.hide()
        if self.serial_thread:
            self.serial_thread.stop()
            self.serial_thread.wait()
        if self.device_monitor_thread:
            self.device_monitor_thread.stop()
            self.device_monitor_thread.wait()
        if self.timer_thread:
            self.stop_timer()
        if self.sensor_thread:
            self.stop_monitor_sensor()
        QApplication.quit()

    def on_tray_activated(self, reason):
        if reason == QSystemTrayIcon.DoubleClick:
            print(f"on_tray_activated")
            self.show_window()
  

    def start_timer(self):
        while self.timer_running:
            if self.is_person_present:
                self.elapsed_time += 1
                if self.elapsed_time >= self.reminder_interval:
                    self.safe_show_alert("提示", f"该活动一下,站起来走动走动吧!")
                    self.elapsed_time = 0  # 重置计时器
                self.time_update_signal.emit(self.elapsed_time)  # 发射信号来更新时间标签
            time.sleep(1)

    def stop_timer(self):
        self.timer_running = False

    def update_time_label(self, elapsed_time):
        # 计算已久坐时间的分钟和秒数
        minutes = elapsed_time // 60
        seconds = elapsed_time % 60
        # 计算当前百分比
        percentage = (elapsed_time / self.reminder_interval) * 100
  
        # 根据百分比调整文字颜色
        if percentage >= 90:
            color = "red"  # 红色
        elif percentage >= 80:
            color = "orange"  # 橙色
        else:
            color = "green"  # 绿色

        if self.is_person_present:
            # 更新标签内容和颜色
            self.time_label.setText(f"已久坐时间: {minutes}分{seconds}秒")
            self.time_label.setStyleSheet(f"color: {color};")
        else:
            self.time_label.setText(f"非常棒,运动一下吧。")


    def start_serial_device_monitor(self):
        if self.device_monitor_thread:
            self.device_monitor_thread.stop()
            self.device_monitor_thread.wait()
        self.device_monitor_thread = SerialDeviceMonitorThread()
        self.device_monitor_thread.request_check_devices.connect(self.check_serial_devices)
        self.device_monitor_thread.start()

    def monitor_sensor(self):
        while self.monitor_sensor_runing:
            if not self.is_person_present:
                self.elapsed_time = 0  # 重置计时器
            time.sleep(1)
  
    def stop_monitor_sensor(self):
        self.monitor_sensor_runing = False

    def safe_show_alert(self, title, message):
        try:
            # 创建隐藏根窗口
            root = tk.Tk()
            root.withdraw()
            root.attributes('-topmost', True)  # 确保窗口在前台
    
            # 显示消息框
            messagebox.showinfo(title, message)
    
            # 及时销毁窗口释放资源
            root.after(100, root.destroy)
            root.mainloop()
    
        except Exception as e:
            print(f"弹窗显示失败: {e}")
            # 备用方案:使用系统通知
            import subprocess
            subprocess.run(['osascript', '-e', f'display notification "{message}" with title "{title}"'])

    def show_notification(self, title, text):
        os.system(f"""osascript -e 'display notification "{text}" with title "{title}"'""")

  
    def lock_computer(self):
        os.system('rundll32.exe user32.dll,LockWorkStation')  # 锁定计算机

    def check_serial_devices(self):
        ports = serial.tools.list_ports.comports()
        device_found = False
        for port in ports:
            mVidPid = None
            vid = port.vid
            pid = port.pid
            if vid != None and pid != None:
                mVidPid = "VID:{:04X}/PID:{:04X}".format(vid, pid)
                print(mVidPid)
            if mVidPid in self.mVIdPIds:
                print(f"状态: 等待连接 {port.device} ...")
                print(f"找到串行设备: {port.device} 端口")
                print(f"   -- {port.description}")
                print(f"   -- {mVidPid}")
                self.start_serial_monitor(port.device)
                device_found = True
                if self.device_monitor_thread:
                    self.device_monitor_thread.stop()
                    self.device_monitor_thread.wait()
                    self.device_monitor_thread = None
                break
  
        if not device_found:
            print("状态: 设备未连接")
            if self.serial_thread:
                self.serial_thread.stop()
                self.serial_thread.wait()
                self.serial_thread = None

    def start_serial_monitor(self, port_name):
        if self.serial_thread:
            self.serial_thread.stop()
            self.serial_thread.wait()

        # baudrate = int(self.baudrate_combo.currentText())
        self.serial_thread = SerialMonitorThread(port_name)
        self.serial_thread.request_check_devices.connect(self.start_serial_device_monitor)
        self.serial_thread.sensor_person_present.connect(self.on_sensor_person_present)
        self.serial_thread.start()

    def on_sensor_person_present(self, is_person_present):
        if is_person_present:
            if not self.is_person_present:
                self.elapsed_time = 0  # 重置计时器
            self.is_person_present = True
        else:
            if self.is_person_present:
                self.is_person_present = False
                self.elapsed_time = 0  # 重置计时器
                # 锁定屏幕的Windows API
                ctypes.windll.user32.LockWorkStation()

    def closeEvent(self, event):
        print("closeEvent")
        event.ignore()
        self.hide()
        if platform.system() == "Windows":
            self.tray.showMessage(
                "托盘提示",
                "程序已经最小化到托盘,双击图标或右键菜单显示窗口",
                QSystemTrayIcon.Information,
                500
            )

if __name__ == '__main__':
    try:
        app = QApplication(sys.argv)
        health_reminder = HealthReminderApp()
        health_reminder.show()
        sys.exit(app.exec_())
    except KeyboardInterrupt:
        print("程序已停止")
        sys.exit(0)  # Gracefully exit the program

pyinstaller 打包程序

pyinstaller --clean --debug=all --hidden-import=pkg_resources --hidden-import=importlib_resources --hidden-import asyncio -F -w -i "%cd%\icon.ico" --add-data="icon.ico;." --add-data="sport.gif;." main.py

打包后exe有点大,直接把源码放上来吧

upload 附件:health-reminder.zip

upload 附件:久坐检测提醒.zip

──── 0人觉得很赞 ────

使用道具 举报

2025-10-26 12:27:06
点赞
点赞
2025-10-26 20:53:33
石矶娘娘:留得青山在,不怕没柴烧😁
2025-10-27 08:35:03
iiv 发表于 2025-10-26 20:53
石矶娘娘:留得青山在,不怕没柴烧😁

你小子给我等着
2025-10-27 09:04:25
妙啊
2025-10-27 14:00:01

😄 玛丽哥表挑逗七哥,七哥是正经的人。
2025-10-27 14:01:07
iiv 发表于 2025-10-26 20:53
石矶娘娘:留得青山在,不怕没柴烧😁

最近在看浪浪山了🤭
您需要登录后才可以回帖 立即登录
高级模式
返回
统计信息
  • 会员数: 30596 个
  • 话题数: 44719 篇