飞腾派坐姿检测

张开发
2026/5/19 8:14:01 15 分钟阅读
飞腾派坐姿检测
硬件飞腾派、USB 免驱摄像头实现原理PaddleHub 实现人体骨骼关键点识别根据关键点判断坐姿状态1.判断歪头 根据 head_top 坐标和 right_shoulder、left_shoulder 中心点的连线偏移角度来判定头部倾斜角度2.判断低头 根据头顶到脖子的距离和脖子到胸腔的距离进行比较部署模型•创建虚拟环境python3 -m venv ~/paddle_env•激活虚拟环境source ~/paddle_env/bin/activate•安装 PaddlePaddlepip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple•安装 PaddleHubpip install --upgrade paddlehub -i https://pypi.tuna.tsinghua.edu.cn/simple•使用 PaddleHub 安装骨骼关键点识别模型hub install human_pose_estimation_resnet50_mpii程序开发import os import sys import time import math import signal import json import glob from collections import deque from datetime import datetime import cv2 import paddlehub as hub CAMERA_DEVICE /dev/video0 INTERVAL 15 MQTT_TOPIC ctrl/lyh SAVE_DIR ./imgs MAX_SAVED_IMAGES 5 STATUS_FAIL 0 # 失败/异常 STATUS_NORMAL 1 # 正常 STATUS_HEAD_DOWN 2 # 低头 STATUS_HEAD_TILT 3 # 歪头/头部倾斜 running True saved_image_paths deque(maxlenMAX_SAVED_IMAGES)print([系统] 正在加载模型...) try: pose_estimation hub.Module(namehuman_pose_estimation_resnet50_mpii) print([系统] 模型加载完成) except Exception as e: print(f[系统] 模型加载失败: {e}) sys.exit(1)def Center_Point(point_1, point_2): 计算中心点 center_x math.fabs((point_1[0] - point_2[0]) / 2) min(point_1[0], point_2[0]) center_y math.fabs((point_1[1] - point_2[1]) / 2) min(point_1[1], point_2[1]) return [center_x, center_y] def Cal_Ang(point_1, point_2, point_3): 计算夹角 a math.sqrt((point_2[0] - point_3[0])**2 (point_2[1] - point_3[1])**2) b math.sqrt((point_1[0] - point_3[0])**2 (point_1[1] - point_3[1])**2) c math.sqrt((point_1[0] - point_2[0])**2 (point_1[1] - point_2[1])**2) if a * c 0: return 90.0 cos_value (b*b - a*a - c*c) / (-2 * a * c) cos_value max(-1.0, min(1.0, cos_value)) return math.degrees(math.acos(cos_value))def detect_pose_from_frame(frame): 检测单帧图像返回状态码 0: 失败, 1: 正常, 2: 低头, 3: 歪头 try: temp_path /tmp/pose_temp.jpg cv2.imwrite(temp_path, frame) result pose_estimation.keypoint_detection( paths[temp_path], visualizationFalse ) if os.path.exists(temp_path): os.remove(temp_path) if not result or len(result) 0: return STATUS_FAIL, 未检测到人体, None data result[0][data] left_shoulder data.get(left_shoulder) right_shoulder data.get(right_shoulder) head_top data.get(head_top) upper_neck data.get(upper_neck) thorax data.get(thorax) if not all([left_shoulder, right_shoulder, head_top, upper_neck, thorax]): return STATUS_FAIL, 关键点不完整, None head_neck math.fabs(head_top[1] - upper_neck[1]) neck_thorax math.fabs(thorax[1] - upper_neck[1]) is_head_down (head_neck / 3.0 neck_thorax) center_point Center_Point(left_shoulder, right_shoulder) head_angle Cal_Ang(head_top, center_point, right_shoulder) is_head_tilt (head_angle 80 or head_angle 100) if is_head_tilt: return STATUS_HEAD_TILT, 歪头, data elif is_head_down: return STATUS_HEAD_DOWN, 低头, data else: return STATUS_NORMAL, 正常, data except Exception as e: return STATUS_FAIL, f检测异常: {str(e)}, None def save_annotated_image(frame, keypoints_data, status_code): 保存带标注的图像带关键点和连线 global saved_image_paths os.makedirs(SAVE_DIR, exist_okTrue) annotated frame.copy() # 颜色定义 color_keypoint (0, 255, 0) # 绿色 - 关键点 color_line (255, 0, 0) # 蓝色 - 连线 color_text (0, 0, 255) # 红色 - 文字 # 关键点映射 keypoints_map { head_top: keypoints_data.get(head_top), upper_neck: keypoints_data.get(upper_neck), thorax: keypoints_data.get(thorax), left_shoulder: keypoints_data.get(left_shoulder), right_shoulder: keypoints_data.get(right_shoulder), left_elbow: keypoints_data.get(left_elbow), right_elbow: keypoints_data.get(right_elbow), left_wrist: keypoints_data.get(left_wrist), right_wrist: keypoints_data.get(right_wrist), } # 骨架连接关系 connections [ (head_top, upper_neck), (upper_neck, thorax), (thorax, left_shoulder), (thorax, right_shoulder), (left_shoulder, left_elbow), (left_elbow, left_wrist), (right_shoulder, right_elbow), (right_elbow, right_wrist), ] # 绘制连线 for p1_name, p2_name in connections: p1 keypoints_map.get(p1_name) p2 keypoints_map.get(p2_name) if p1 and p2: pt1 (int(p1[0]), int(p1[1])) pt2 (int(p2[0]), int(p2[1])) cv2.line(annotated, pt1, pt2, color_line, 2) # 绘制关键点 for name, point in keypoints_map.items(): if point: x, y int(point[0]), int(point[1]) cv2.circle(annotated, (x, y), 4, color_keypoint, -1) cv2.circle(annotated, (x, y), 4, (0, 0, 0), 1) # 添加状态文字 status_texts {0: FAIL, 1: NORMAL, 2: HEAD DOWN, 3: HEAD TILT} text fStatus: {status_code} ({status_texts.get(status_code, UNKNOWN)}) cv2.putText(annotated, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color_text, 2) # 添加时间戳 timestamp datetime.now().strftime(%H:%M:%S) cv2.putText(annotated, timestamp, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color_text, 1) # 保存文件 filename fpose_{datetime.now().strftime(%H%M%S)}_{status_code}.jpg filepath os.path.join(SAVE_DIR, filename) cv2.imwrite(filepath, annotated) saved_image_paths.append(filepath) # 清理旧文件 all_files sorted(glob.glob(os.path.join(SAVE_DIR, pose_*.jpg)), keyos.path.getctime, reverseTrue) for old_file in all_files[MAX_SAVED_IMAGES:]: try: os.remove(old_file) except: pass print(f[图像] 已保存: {filepath}) return filepathdef main(): global running print(f[系统] 正在打开摄像头: {CAMERA_DEVICE}) cap cv2.VideoCapture(CAMERA_DEVICE) if not cap.isOpened(): print(f[系统] 无法打开 {CAMERA_DEVICE}尝试索引 0...) cap cv2.VideoCapture(0) if not cap.isOpened(): print([系统] 无法打开任何摄像头) return cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) print(f[系统] 摄像头就绪 ({int(cap.get(3))}x{int(cap.get(4))})) print(f[系统] 检测间隔: {INTERVAL}秒) print( * 60) print(按 CtrlC 停止) print( * 60) last_detection_time 0 frame_count 0 while running: ret, frame cap.read() if not ret: time.sleep(0.1) continue frame_count 1 current_time time.time() if current_time - last_detection_time INTERVAL: timestamp datetime.now().strftime(%H:%M:%S) status_code, status_text, keypoints_data detect_pose_from_frame(frame) if keypoints_data is not None: save_annotated_image(frame, keypoints_data, status_code) print(f[{timestamp}] 帧{frame_count} | 姿势: {status_code}({status_text}), end) last_detection_time current_time time.sleep(0.1) cap.release() print([系统] 已停止) if __name__ __main__: main()运行结果

更多文章