我来帮你完成这个旋转桥梁AI+视觉监测项目。基于实训台操作手册和你的项目需求,我将提供完整的文件架构和代码实现。
一、项目文件架构
bridge_monitoring_project/
│
├── data/ # 数据文件夹
│ ├── images/ # 图片数据
│ │ ├── train/ # 训练集图片
│ │ │ ├── images/ # 原始图片
│ │ │ └── labels/ # YOLO格式标注文件
│ │ ├── val/ # 验证集图片
│ │ │ ├── images/
│ │ │ └── labels/
│ │ └── test/ # 测试图片
│ │
│ ├── models/ # 模型文件
│ │ ├── yolov8n.pt # YOLOv8预训练权重
│ │ └── best.pt # 训练后的最佳权重
│ │
│ ├── aruco/ # ArUco标记
│ │ └── aruco_marker_4x4.png # 打印用的ArUco码图片
│ │
│ └── logs/ # 数据日志
│ └── measurement_log.csv # 测量数据记录
│
├── src/ # 源代码文件夹
│ ├── main.py # 主程序入口
│ ├── yolo_detector.py # YOLO检测模块
│ ├── aruco_measurement.py # ArUco测量模块
│ ├── hardware_control.py # 硬件控制模块
│ ├── data_fusion.py # 数据融合处理
│ └── utils.py # 工具函数
│
├── config/ # 配置文件夹
│ ├── config.yaml # 系统配置文件
│ └── camera_calibration.yaml # 相机标定参数
│
├── requirements.txt # 依赖包列表
└── README.md # 项目说明文档
二、所需准备的内容
1. 图片数据(需要你自己准备)
- 桥梁模型图片:50-80张,不同角度、光照
- 入侵物体图片:50-80张(人手、工具、障碍物等)
- 标注文件:使用LabelImg工具,标签为”Bridge”和”Intruder”
2. ArUco标记(我会提供生成代码)
- 4x4_50规格的ArUco码,打印后贴在桥梁模型上
3. 硬件准备
- USB摄像头(实训台自带或外接)
- 桥梁模型(可旋转)
- 可选:蜂鸣器、双色LED模块(如需硬件联动)
三、项目代码实现
1. 依赖包列表 (requirements.txt)
opencv-python==4.8.0.74
opencv-contrib-python==4.8.0.74
numpy==1.24.3
torch==2.0.1
torchvision==0.15.2
ultralytics==8.0.200
PyYAML==6.0.1
pandas==2.0.3
matplotlib==3.7.2
Pillow==10.0.0
2. 配置文件 (config/config.yaml)
# 相机配置
camera:
device_id: 0 # 摄像头ID,0为默认摄像头
width: 640
height: 480
fps: 30
# YOLO配置
yolo:
model_path: "data/models/best.pt" # 训练后的模型路径
conf_threshold: 0.5 # 置信度阈值
iou_threshold: 0.45 # NMS阈值
# ArUco配置
aruco:
dictionary: "DICT_4X4_50" # ArUco字典类型
marker_size: 0.05 # 标记物理尺寸(米)
# 安全阈值
safety:
max_pitch: 10.0 # 最大俯仰角(度)
max_roll: 10.0 # 最大横滚角(度)
max_speed: 2.0 # 最大转速(rad/s)
# 硬件控制(如果使用实训台硬件)
hardware:
use_hardware: false # 是否使用硬件
buzzer_pin: 17 # 蜂鸣器GPIO
led_red_pin: 17 # 红色LED GPIO
led_green_pin: 18 # 绿色LED GPIO
# 日志配置
logging:
save_log: true
log_path: "data/logs/measurement_log.csv"
3. 生成ArUco标记代码
# generate_aruco.py
import cv2
import cv2.aruco as aruco
import numpy as np
# 生成ArUco标记
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)
marker_size = 200 # 像素大小
# 生成ID为0的标记
marker_id = 0
marker_image = np.zeros((marker_size, marker_size), dtype=np.uint8)
marker_image = aruco.drawMarker(aruco_dict, marker_id, marker_size, marker_image, 1)
# 添加白色边框
border_size = 50
marker_with_border = np.ones((marker_size + 2*border_size, marker_size + 2*border_size), dtype=np.uint8) * 255
marker_with_border[border_size:border_size+marker_size, border_size:border_size+marker_size] = marker_image
# 保存图片
cv2.imwrite('data/aruco/aruco_marker_4x4.png', marker_with_border)
print("ArUco标记已生成并保存到: data/aruco/aruco_marker_4x4.png")
print("请打印此图片并贴在桥梁模型上")
4. 主程序 (src/main.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
旋转桥梁AI+视觉监测主程序
"""
import cv2
import yaml
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from yolo_detector import YOLODetector
from aruco_measurement import ArUcoMeasurement
from hardware_control import HardwareController
from data_fusion import DataFusion
from utils import setup_logger, draw_ui
# 加载配置
with open('config/config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
def main():
# 初始化日志
logger = setup_logger(config['logging']['log_path'])
# 初始化各模块
print("正在初始化系统...")
yolo_detector = YOLODetector(config['yolo'])
aruco_measurement = ArUcoMeasurement(config['aruco'])
hardware = HardwareController(config['hardware']) if config['hardware']['use_hardware'] else None
data_fusion = DataFusion(config['safety'])
# 打开摄像头
cap = cv2.VideoCapture(config['camera']['device_id'])
cap.set(cv2.CAP_PROP_FRAME_WIDTH, config['camera']['width'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, config['camera']['height'])
cap.set(cv2.CAP_PROP_FPS, config['camera']['fps'])
if not cap.isOpened():
print("错误:无法打开摄像头")
return
print("系统初始化完成,开始监测...")
print("按 'q' 退出程序")
# 主循环
last_time = time.time()
last_yaw = None
while True:
ret, frame = cap.read()
if not ret:
print("错误:无法读取摄像头画面")
break
current_time = time.time()
dt = current_time - last_time
last_time = current_time
# 1. YOLO检测
detections = yolo_detector.detect(frame)
# 2. 安全判定
has_intruder = any(d['label'] == 'Intruder' for d in detections)
has_bridge = any(d['label'] == 'Bridge' for d in detections)
# 3. 状态处理
status = "正常"
measurement_data = None
if has_intruder:
status = "警报:检测到入侵"
# 触发报警
if hardware:
hardware.trigger_alarm()
else:
if hardware:
hardware.stop_alarm()
if has_bridge:
# 4. ArUco测量
aruco_result = aruco_measurement.measure(frame)
if aruco_result:
# 计算转速
if last_yaw is not None:
yaw_speed = (aruco_result['yaw'] - last_yaw) / dt
else:
yaw_speed = 0.0
last_yaw = aruco_result['yaw']
# 数据融合与安全检查
measurement_data = {
'pitch': aruco_result['pitch'],
'roll': aruco_result['roll'],
'yaw': aruco_result['yaw'],
'speed': yaw_speed,
'x': aruco_result['x'],
'y': aruco_result['y'],
'z': aruco_result['z']
}
# 检查是否超出安全阈值
is_safe, warnings = data_fusion.check_safety(measurement_data)
if not is_safe:
status = f"警告:{', '.join(warnings)}"
if hardware:
hardware.set_warning_light()
else:
if hardware:
hardware.set_normal_light()
# 记录日志
if config['logging']['save_log']:
logger.log_measurement(measurement_data, status)
else:
status = "未检测到ArUco标记"
else:
status = "未检测到桥梁模型"
# 5. 可视化
display_frame = draw_ui(frame, detections, measurement_data, status)
# 显示画面
cv2.imshow('桥梁监测系统', display_frame)
# 按键处理
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
# 清理资源
print("\n正在关闭系统...")
cap.release()
cv2.destroyAllWindows()
if hardware:
hardware.cleanup()
print("系统已安全关闭")
if __name__ == '__main__':
main()
5. YOLO检测模块 (src/yolo_detector.py)
"""
YOLO目标检测模块
"""
from ultralytics import YOLO
import cv2
class YOLODetector:
def __init__(self, config):
self.model = YOLO(config['model_path'])
self.conf_threshold = config['conf_threshold']
self.iou_threshold = config['iou_threshold']
def detect(self, frame):
"""执行目标检测"""
results = self.model(frame, conf=self.conf_threshold, iou=self.iou_threshold)
detections = []
for r in results:
boxes = r.boxes
if boxes is not None:
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = box.conf[0].item()
cls = int(box.cls[0].item())
label = self.model.names[cls]
detections.append({
'bbox': [int(x1), int(y1), int(x2), int(y2)],
'confidence': conf,
'label': label
})
return detections
6. ArUco测量模块 (src/aruco_measurement.py)
"""
ArUco标记测量模块
"""
import cv2
import cv2.aruco as aruco
import numpy as np
class ArUcoMeasurement:
def __init__(self, config):
# 设置ArUco字典
dict_name = getattr(aruco, config['dictionary'])
self.aruco_dict = aruco.Dictionary_get(dict_name)
self.aruco_params = aruco.DetectorParameters_create()
self.marker_size = config['marker_size']
# 相机内参(简化版,实际应用需要标定)
self.camera_matrix = np.array([[640, 0, 320],
[0, 640, 240],
[0, 0, 1]], dtype=float)
self.dist_coeffs = np.zeros((5, 1))
def measure(self, frame):
"""检测ArUco标记并计算姿态"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 检测标记
corners, ids, rejected = aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
if ids is not None and len(ids) > 0:
# 估计姿态
rvecs, tvecs, _ = aruco.estimatePoseSingleMarkers(
corners, self.marker_size, self.camera_matrix, self.dist_coeffs
)
# 提取第一个标记的数据
rvec = rvecs[0][0]
tvec = tvecs[0][0]
# 转换旋转向量为欧拉角
rotation_matrix, _ = cv2.Rodrigues(rvec)
euler_angles = self.rotation_matrix_to_euler_angles(rotation_matrix)
# 绘制坐标轴
aruco.drawAxis(frame, self.camera_matrix, self.dist_coeffs,
rvec, tvec, self.marker_size * 0.5)
return {
'x': tvec[0],
'y': tvec[1],
'z': tvec[2],
'roll': np.degrees(euler_angles[0]),
'pitch': np.degrees(euler_angles[1]),
'yaw': np.degrees(euler_angles[2]),
'corners': corners[0]
}
return None
def rotation_matrix_to_euler_angles(self, R):
"""旋转矩阵转欧拉角"""
sy = np.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = np.arctan2(R[2, 1], R[2, 2])
y = np.arctan2(-R[2, 0], sy)
z = np.arctan2(R[1, 0], R[0, 0])
else:
x = np.arctan2(-R[1, 2], R[1, 1])
y = np.arctan2(-R[2, 0], sy)
z = 0
return np.array([x, y, z])
7. 硬件控制模块 (src/hardware_control.py)
"""
硬件控制模块(基于实训台)
"""
import time
class HardwareController:
def __init__(self, config):
self.use_hardware = config['use_hardware']
if self.use_hardware:
try:
import RPi.GPIO as GPIO
self.GPIO = GPIO
self.GPIO.setmode(GPIO.BCM)
self.GPIO.setwarnings(False)
# 初始化引脚
self.buzzer_pin = config['buzzer_pin']
self.led_red_pin = config['led_red_pin']
self.led_green_pin = config['led_green_pin']
self.GPIO.setup(self.buzzer_pin, GPIO.OUT)
self.GPIO.setup(self.led_red_pin, GPIO.OUT)
self.GPIO.setup(self.led_green_pin, GPIO.OUT)
# 初始状态
self.GPIO.output(self.buzzer_pin, GPIO.HIGH) # 蜂鸣器关闭
self.GPIO.output(self.led_red_pin, GPIO.LOW)
self.GPIO.output(self.led_green_pin, GPIO.HIGH) # 绿灯亮
self.hardware_available = True
except ImportError:
print("警告:未检测到GPIO库,硬件控制功能将被禁用")
self.hardware_available = False
else:
self.hardware_available = False
def trigger_alarm(self):
"""触发报警"""
if self.hardware_available:
self.GPIO.output(self.buzzer_pin, GPIO.LOW) # 蜂鸣器响
self.GPIO.output(self.led_red_pin, GPIO.HIGH) # 红灯亮
self.GPIO.output(self.led_green_pin, GPIO.LOW)
def stop_alarm(self):
"""停止报警"""
if self.hardware_available:
self.GPIO.output(self.buzzer_pin, GPIO.HIGH) # 蜂鸣器停
self.GPIO.output(self.led_red_pin, GPIO.LOW)
self.GPIO.output(self.led_green_pin, GPIO.HIGH) # 绿灯亮
def set_warning_light(self):
"""设置警告灯(黄色效果)"""
if self.hardware_available:
self.GPIO.output(self.led_red_pin, GPIO.HIGH)
self.GPIO.output(self.led_green_pin, GPIO.HIGH)
def set_normal_light(self):
"""设置正常灯"""
if self.hardware_available:
self.GPIO.output(self.led_red_pin, GPIO.LOW)
self.GPIO.output(self.led_green_pin, GPIO.HIGH)
def cleanup(self):
"""清理GPIO资源"""
if self.hardware_available:
self.GPIO.cleanup()
8. 数据融合模块 (src/data_fusion.py)
"""
数据融合与安全判定模块
"""
import numpy as np
class DataFusion:
def __init__(self, config):
self.max_pitch = config['max_pitch']
self.max_roll = config['max_roll']
self.max_speed = config['max_speed']
# 数据平滑缓存
self.history_size = 5
self.pitch_history = []
self.roll_history = []
self.speed_history = []
def check_safety(self, measurement_data):
"""检查安全状态"""
warnings = []
# 更新历史数据
self._update_history(measurement_data)
# 获取平滑后的数据
smooth_pitch = self._get_smooth_value(self.pitch_history)
smooth_roll = self._get_smooth_value(self.roll_history)
smooth_speed = self._get_smooth_value(self.speed_history)
# 检查各项指标
if abs(smooth_pitch) > self.max_pitch:
warnings.append(f"俯仰角过大({smooth_pitch:.1f}°)")
if abs(smooth_roll) > self.max_roll:
warnings.append(f"横滚角过大({smooth_roll:.1f}°)")
if abs(smooth_speed) > self.max_speed:
warnings.append(f"转速过快({smooth_speed:.2f} rad/s)")
is_safe = len(warnings) == 0
return is_safe, warnings
def _update_history(self, data):
"""更新历史数据"""
self.pitch_history.append(data['pitch'])
self.roll_history.append(data['roll'])
self.speed_history.append(data['speed'])
# 保持固定长度
if len(self.pitch_history) > self.history_size:
self.pitch_history.pop(0)
if len(self.roll_history) > self.history_size:
self.roll_history.pop(0)
if len(self.speed_history) > self.history_size:
self.speed_history.pop(0)
def _get_smooth_value(self, history):
"""获取平滑值"""
if len(history) == 0:
return 0.0
# 去除异常值后取平均
if len(history) >= 3:
sorted_values = sorted(history)
# 去掉最大最小值
return np.mean(sorted_values[1:-1])
else:
return np.mean(history)
9. 工具函数模块 (src/utils.py)
"""
工具函数模块
"""
import cv2
import pandas as pd
from datetime import datetime
import os
class MeasurementLogger:
def __init__(self, log_path):
self.log_path = log_path
os.makedirs(os.path.dirname(log_path), exist_ok=True)
# 初始化CSV文件
if not os.path.exists(log_path):
df = pd.DataFrame(columns=['timestamp', 'pitch', 'roll', 'yaw',
'speed', 'x', 'y', 'z', 'status'])
df.to_csv(log_path, index=False)
def log_measurement(self, data, status):
"""记录测量数据"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
log_entry = {
'timestamp': timestamp,
'pitch': data.get('pitch', 0),
'roll': data.get('roll', 0),
'yaw': data.get('yaw', 0),
'speed': data.get('speed', 0),
'x': data.get('x', 0),
'y': data.get('y', 0),
'z': data.get('z', 0),
'status': status
}
df = pd.DataFrame([log_entry])
df.to_csv(self.log_path, mode='a', header=False, index=False)
def setup_logger(log_path):
"""设置日志记录器"""
return MeasurementLogger(log_path)
def draw_ui(frame, detections, measurement_data, status):
"""绘制UI界面"""
display_frame = frame.copy()
# 1. 绘制YOLO检测框
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = det['label']
conf = det['confidence']
# 选择颜色
if label == 'Bridge':
color = (255, 0, 0) # 蓝色
elif label == 'Intruder':
color = (0, 0, 255) # 红色
else:
color = (0, 255, 0) # 绿色
# 绘制边框
cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)
# 绘制标签
label_text = f"{label}: {conf:.2f}"
label_size = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
cv2.rectangle(display_frame, (x1, y1-20), (x1+label_size[0], y1), color, -1)
cv2.putText(display_frame, label_text, (x1, y1-5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 2. 绘制数据面板背景
panel_height = 150
overlay = display_frame.copy()
cv2.rectangle(overlay, (0, 0), (display_frame.shape[1], panel_height),
(0, 0, 0), -1)
cv2.addWeighted(overlay, 0.7, display_frame, 0.3, 0, display_frame)
# 3. 显示测量数据
y_offset = 25
if measurement_data:
cv2.putText(display_frame, f"Pitch: {measurement_data['pitch']:.1f} deg",
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(display_frame, f"Roll: {measurement_data['roll']:.1f} deg",
(10, y_offset+25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(display_frame, f"Yaw Speed: {measurement_data['speed']:.2f} rad/s",
(10, y_offset+50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(display_frame, f"Position: X={measurement_data['x']:.3f}, Y={measurement_data['y']:.3f}, Z={measurement_data['z']:.3f}",
(10, y_offset+75), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
else:
cv2.putText(display_frame, "等待测量数据...",
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
# 4. 显示系统状态
status_color = (0, 255, 0) if "正常" in status else (0, 0, 255)
cv2.putText(display_frame, f"状态: {status}",
(10, y_offset+100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2)
# 5. 显示时间戳
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cv2.putText(display_frame, timestamp,
(display_frame.shape[1]-200, y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return display_frame
10. YOLO训练脚本 (train_yolo.py)
"""
YOLO模型训练脚本
"""
from ultralytics import YOLO
import yaml
# 创建数据集配置文件
dataset_config = {
'path': './data',
'train': 'images/train',
'val': 'images/val',
'names': {
0: 'Bridge',
1: 'Intruder'
}
}
with open('data/dataset.yaml', 'w') as f:
yaml.dump(dataset_config, f)
# 训练模型
model = YOLO('yolov8n.pt') # 加载预训练模型
# 开始训练
results = model.train(
data='data/dataset.yaml',
epochs=50,
imgsz=640,
batch=8,
name='bridge_detection',
patience=10,
save=True,
device='0' # 使用GPU,如果没有GPU则改为'cpu'
)
print("训练完成!最佳模型保存在 runs/detect/bridge_detection/weights/best.pt")
四、使用步骤
1. 环境准备
# 创建项目文件夹结构
mkdir -p bridge_monitoring_project/{data/{images/{train/{images,labels},val/{images,labels},test},models,aruco,logs},src,config}
# 安装依赖
cd bridge_monitoring_project
pip install -r requirements.txt
2. 准备数据
- 生成并打印ArUco标记:
python generate_aruco.py
- 采集并标注数据:
- 使用LabelImg标注工具
- 标签设置为:Bridge, Intruder
- 将标注好的数据放入对应文件夹
3. 训练YOLO模型
python train_yolo.py
4. 运行主程序
cd src
python main.py
五、调试建议
- 先测试各模块:
- 单独运行ArUco检测
- 单独测试YOLO推理
- 最后整合
- 相机标定(可选但推荐):
- 使用棋盘格标定相机
- 获得准确的相机内参
- 性能优化:
- 如果运行卡顿,降低相机分辨率
- 使用更轻量的YOLO模型
- 数据增强:
- 多角度拍摄
- 不同光照条件
- 添加噪声和模糊
这个项目框架完整且模块化,你可以根据实际情况调整参数和功能。记得先从简单的功能开始测试,逐步完善整个系统。
0 条评论