我来帮你完成这个旋转桥梁AI+视觉监测项目。基于实训台操作手册和你的项目需求,我将提供完整的文件架构和代码实现。

一、项目文件架构


bridge_monitoring_project/

├── data/                           # 数据文件夹
│   ├── images/                     # 图片数据
│   │   ├── train/                  # 训练集图片
│   │   │   ├── images/             # 原始图片
│   │   │   └── labels/             # YOLO格式标注文件
│   │   ├── val/                    # 验证集图片
│   │   │   ├── images/
│   │   │   └── labels/
│   │   └── test/                   # 测试图片
│   │
│   ├── models/                     # 模型文件
│   │   ├── yolov8n.pt             # YOLOv8预训练权重
│   │   └── best.pt                # 训练后的最佳权重
│   │
│   ├── aruco/                      # ArUco标记
│   │   └── aruco_marker_4x4.png   # 打印用的ArUco码图片
│   │
│   └── logs/                       # 数据日志
│       └── measurement_log.csv     # 测量数据记录

├── src/                            # 源代码文件夹
│   ├── main.py                     # 主程序入口
│   ├── yolo_detector.py            # YOLO检测模块
│   ├── aruco_measurement.py        # ArUco测量模块
│   ├── hardware_control.py         # 硬件控制模块
│   ├── data_fusion.py              # 数据融合处理
│   └── utils.py                    # 工具函数

├── config/                         # 配置文件夹
│   ├── config.yaml                 # 系统配置文件
│   └── camera_calibration.yaml     # 相机标定参数

├── requirements.txt                # 依赖包列表
└── README.md                       # 项目说明文档

二、所需准备的内容

1. 图片数据(需要你自己准备)

  • 桥梁模型图片:50-80张,不同角度、光照
  • 入侵物体图片:50-80张(人手、工具、障碍物等)
  • 标注文件:使用LabelImg工具,标签为”Bridge”和”Intruder”

2. ArUco标记(我会提供生成代码)

  • 4x4_50规格的ArUco码,打印后贴在桥梁模型上

3. 硬件准备

  • USB摄像头(实训台自带或外接)
  • 桥梁模型(可旋转)
  • 可选:蜂鸣器、双色LED模块(如需硬件联动)

三、项目代码实现

1. 依赖包列表 (requirements.txt)


opencv-python==4.8.0.74
opencv-contrib-python==4.8.0.74
numpy==1.24.3
torch==2.0.1
torchvision==0.15.2
ultralytics==8.0.200
PyYAML==6.0.1
pandas==2.0.3
matplotlib==3.7.2
Pillow==10.0.0

2. 配置文件 (config/config.yaml)


# 相机配置
camera:
  device_id: 0  # 摄像头ID,0为默认摄像头
  width: 640
  height: 480
  fps: 30

# YOLO配置
yolo:
  model_path: "data/models/best.pt"  # 训练后的模型路径
  conf_threshold: 0.5  # 置信度阈值
  iou_threshold: 0.45  # NMS阈值

# ArUco配置
aruco:
  dictionary: "DICT_4X4_50"  # ArUco字典类型
  marker_size: 0.05  # 标记物理尺寸(米)

# 安全阈值
safety:
  max_pitch: 10.0  # 最大俯仰角(度)
  max_roll: 10.0   # 最大横滚角(度)
  max_speed: 2.0    # 最大转速(rad/s)

# 硬件控制(如果使用实训台硬件)
hardware:
  use_hardware: false  # 是否使用硬件
  buzzer_pin: 17      # 蜂鸣器GPIO
  led_red_pin: 17     # 红色LED GPIO
  led_green_pin: 18   # 绿色LED GPIO

# 日志配置
logging:
  save_log: true
  log_path: "data/logs/measurement_log.csv"

3. 生成ArUco标记代码


# generate_aruco.py
import cv2
import cv2.aruco as aruco
import numpy as np

# 生成ArUco标记
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)
marker_size = 200  # 像素大小

# 生成ID为0的标记
marker_id = 0
marker_image = np.zeros((marker_size, marker_size), dtype=np.uint8)
marker_image = aruco.drawMarker(aruco_dict, marker_id, marker_size, marker_image, 1)

# 添加白色边框
border_size = 50
marker_with_border = np.ones((marker_size + 2*border_size, marker_size + 2*border_size), dtype=np.uint8) * 255
marker_with_border[border_size:border_size+marker_size, border_size:border_size+marker_size] = marker_image

# 保存图片
cv2.imwrite('data/aruco/aruco_marker_4x4.png', marker_with_border)
print("ArUco标记已生成并保存到: data/aruco/aruco_marker_4x4.png")
print("请打印此图片并贴在桥梁模型上")

4. 主程序 (src/main.py)


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
旋转桥梁AI+视觉监测主程序
"""

import cv2
import yaml
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from yolo_detector import YOLODetector
from aruco_measurement import ArUcoMeasurement
from hardware_control import HardwareController
from data_fusion import DataFusion
from utils import setup_logger, draw_ui

# 加载配置
with open('config/config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

def main():
    # 初始化日志
    logger = setup_logger(config['logging']['log_path'])

    # 初始化各模块
    print("正在初始化系统...")
    yolo_detector = YOLODetector(config['yolo'])
    aruco_measurement = ArUcoMeasurement(config['aruco'])
    hardware = HardwareController(config['hardware']) if config['hardware']['use_hardware'] else None
    data_fusion = DataFusion(config['safety'])

    # 打开摄像头
    cap = cv2.VideoCapture(config['camera']['device_id'])
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, config['camera']['width'])
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, config['camera']['height'])
    cap.set(cv2.CAP_PROP_FPS, config['camera']['fps'])

    if not cap.isOpened():
        print("错误:无法打开摄像头")
        return

    print("系统初始化完成,开始监测...")
    print("按 'q' 退出程序")

    # 主循环
    last_time = time.time()
    last_yaw = None

    while True:
        ret, frame = cap.read()
        if not ret:
            print("错误:无法读取摄像头画面")
            break

        current_time = time.time()
        dt = current_time - last_time
        last_time = current_time

        # 1. YOLO检测
        detections = yolo_detector.detect(frame)

        # 2. 安全判定
        has_intruder = any(d['label'] == 'Intruder' for d in detections)
        has_bridge = any(d['label'] == 'Bridge' for d in detections)

        # 3. 状态处理
        status = "正常"
        measurement_data = None

        if has_intruder:
            status = "警报:检测到入侵"
            # 触发报警
            if hardware:
                hardware.trigger_alarm()
        else:
            if hardware:
                hardware.stop_alarm()

            if has_bridge:
                # 4. ArUco测量
                aruco_result = aruco_measurement.measure(frame)

                if aruco_result:
                    # 计算转速
                    if last_yaw is not None:
                        yaw_speed = (aruco_result['yaw'] - last_yaw) / dt
                    else:
                        yaw_speed = 0.0
                    last_yaw = aruco_result['yaw']

                    # 数据融合与安全检查
                    measurement_data = {
                        'pitch': aruco_result['pitch'],
                        'roll': aruco_result['roll'],
                        'yaw': aruco_result['yaw'],
                        'speed': yaw_speed,
                        'x': aruco_result['x'],
                        'y': aruco_result['y'],
                        'z': aruco_result['z']
                    }

                    # 检查是否超出安全阈值
                    is_safe, warnings = data_fusion.check_safety(measurement_data)
                    if not is_safe:
                        status = f"警告:{', '.join(warnings)}"
                        if hardware:
                            hardware.set_warning_light()
                    else:
                        if hardware:
                            hardware.set_normal_light()

                    # 记录日志
                    if config['logging']['save_log']:
                        logger.log_measurement(measurement_data, status)
                else:
                    status = "未检测到ArUco标记"
            else:
                status = "未检测到桥梁模型"

        # 5. 可视化
        display_frame = draw_ui(frame, detections, measurement_data, status)

        # 显示画面
        cv2.imshow('桥梁监测系统', display_frame)

        # 按键处理
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break

    # 清理资源
    print("\n正在关闭系统...")
    cap.release()
    cv2.destroyAllWindows()
    if hardware:
        hardware.cleanup()
    print("系统已安全关闭")

if __name__ == '__main__':
    main()

5. YOLO检测模块 (src/yolo_detector.py)


"""
YOLO目标检测模块
"""
from ultralytics import YOLO
import cv2

class YOLODetector:
    def __init__(self, config):
        self.model = YOLO(config['model_path'])
        self.conf_threshold = config['conf_threshold']
        self.iou_threshold = config['iou_threshold']

    def detect(self, frame):
        """执行目标检测"""
        results = self.model(frame, conf=self.conf_threshold, iou=self.iou_threshold)

        detections = []
        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    cls = int(box.cls[0].item())
                    label = self.model.names[cls]

                    detections.append({
                        'bbox': [int(x1), int(y1), int(x2), int(y2)],
                        'confidence': conf,
                        'label': label
                    })

        return detections

6. ArUco测量模块 (src/aruco_measurement.py)


"""
ArUco标记测量模块
"""
import cv2
import cv2.aruco as aruco
import numpy as np

class ArUcoMeasurement:
    def __init__(self, config):
        # 设置ArUco字典
        dict_name = getattr(aruco, config['dictionary'])
        self.aruco_dict = aruco.Dictionary_get(dict_name)
        self.aruco_params = aruco.DetectorParameters_create()
        self.marker_size = config['marker_size']

        # 相机内参(简化版,实际应用需要标定)
        self.camera_matrix = np.array([[640, 0, 320],
                                      [0, 640, 240],
                                      [0, 0, 1]], dtype=float)
        self.dist_coeffs = np.zeros((5, 1))

    def measure(self, frame):
        """检测ArUco标记并计算姿态"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # 检测标记
        corners, ids, rejected = aruco.detectMarkers(
            gray, self.aruco_dict, parameters=self.aruco_params
        )

        if ids is not None and len(ids) > 0:
            # 估计姿态
            rvecs, tvecs, _ = aruco.estimatePoseSingleMarkers(
                corners, self.marker_size, self.camera_matrix, self.dist_coeffs
            )

            # 提取第一个标记的数据
            rvec = rvecs[0][0]
            tvec = tvecs[0][0]

            # 转换旋转向量为欧拉角
            rotation_matrix, _ = cv2.Rodrigues(rvec)
            euler_angles = self.rotation_matrix_to_euler_angles(rotation_matrix)

            # 绘制坐标轴
            aruco.drawAxis(frame, self.camera_matrix, self.dist_coeffs,
                          rvec, tvec, self.marker_size * 0.5)

            return {
                'x': tvec[0],
                'y': tvec[1],
                'z': tvec[2],
                'roll': np.degrees(euler_angles[0]),
                'pitch': np.degrees(euler_angles[1]),
                'yaw': np.degrees(euler_angles[2]),
                'corners': corners[0]
            }

        return None

    def rotation_matrix_to_euler_angles(self, R):
        """旋转矩阵转欧拉角"""
        sy = np.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
        singular = sy < 1e-6

        if not singular:
            x = np.arctan2(R[2, 1], R[2, 2])
            y = np.arctan2(-R[2, 0], sy)
            z = np.arctan2(R[1, 0], R[0, 0])
        else:
            x = np.arctan2(-R[1, 2], R[1, 1])
            y = np.arctan2(-R[2, 0], sy)
            z = 0

        return np.array([x, y, z])

7. 硬件控制模块 (src/hardware_control.py)


"""
硬件控制模块(基于实训台)
"""
import time

class HardwareController:
    def __init__(self, config):
        self.use_hardware = config['use_hardware']

        if self.use_hardware:
            try:
                import RPi.GPIO as GPIO
                self.GPIO = GPIO
                self.GPIO.setmode(GPIO.BCM)
                self.GPIO.setwarnings(False)

                # 初始化引脚
                self.buzzer_pin = config['buzzer_pin']
                self.led_red_pin = config['led_red_pin']
                self.led_green_pin = config['led_green_pin']

                self.GPIO.setup(self.buzzer_pin, GPIO.OUT)
                self.GPIO.setup(self.led_red_pin, GPIO.OUT)
                self.GPIO.setup(self.led_green_pin, GPIO.OUT)

                # 初始状态
                self.GPIO.output(self.buzzer_pin, GPIO.HIGH)  # 蜂鸣器关闭
                self.GPIO.output(self.led_red_pin, GPIO.LOW)
                self.GPIO.output(self.led_green_pin, GPIO.HIGH)  # 绿灯亮

                self.hardware_available = True
            except ImportError:
                print("警告:未检测到GPIO库,硬件控制功能将被禁用")
                self.hardware_available = False
        else:
            self.hardware_available = False

    def trigger_alarm(self):
        """触发报警"""
        if self.hardware_available:
            self.GPIO.output(self.buzzer_pin, GPIO.LOW)  # 蜂鸣器响
            self.GPIO.output(self.led_red_pin, GPIO.HIGH)  # 红灯亮
            self.GPIO.output(self.led_green_pin, GPIO.LOW)

    def stop_alarm(self):
        """停止报警"""
        if self.hardware_available:
            self.GPIO.output(self.buzzer_pin, GPIO.HIGH)  # 蜂鸣器停
            self.GPIO.output(self.led_red_pin, GPIO.LOW)
            self.GPIO.output(self.led_green_pin, GPIO.HIGH)  # 绿灯亮

    def set_warning_light(self):
        """设置警告灯(黄色效果)"""
        if self.hardware_available:
            self.GPIO.output(self.led_red_pin, GPIO.HIGH)
            self.GPIO.output(self.led_green_pin, GPIO.HIGH)

    def set_normal_light(self):
        """设置正常灯"""
        if self.hardware_available:
            self.GPIO.output(self.led_red_pin, GPIO.LOW)
            self.GPIO.output(self.led_green_pin, GPIO.HIGH)

    def cleanup(self):
        """清理GPIO资源"""
        if self.hardware_available:
            self.GPIO.cleanup()

8. 数据融合模块 (src/data_fusion.py)


"""
数据融合与安全判定模块
"""
import numpy as np

class DataFusion:
    def __init__(self, config):
        self.max_pitch = config['max_pitch']
        self.max_roll = config['max_roll']
        self.max_speed = config['max_speed']

        # 数据平滑缓存
        self.history_size = 5
        self.pitch_history = []
        self.roll_history = []
        self.speed_history = []

    def check_safety(self, measurement_data):
        """检查安全状态"""
        warnings = []

        # 更新历史数据
        self._update_history(measurement_data)

        # 获取平滑后的数据
        smooth_pitch = self._get_smooth_value(self.pitch_history)
        smooth_roll = self._get_smooth_value(self.roll_history)
        smooth_speed = self._get_smooth_value(self.speed_history)

        # 检查各项指标
        if abs(smooth_pitch) > self.max_pitch:
            warnings.append(f"俯仰角过大({smooth_pitch:.1f}°)")

        if abs(smooth_roll) > self.max_roll:
            warnings.append(f"横滚角过大({smooth_roll:.1f}°)")

        if abs(smooth_speed) > self.max_speed:
            warnings.append(f"转速过快({smooth_speed:.2f} rad/s)")

        is_safe = len(warnings) == 0
        return is_safe, warnings

    def _update_history(self, data):
        """更新历史数据"""
        self.pitch_history.append(data['pitch'])
        self.roll_history.append(data['roll'])
        self.speed_history.append(data['speed'])

        # 保持固定长度
        if len(self.pitch_history) > self.history_size:
            self.pitch_history.pop(0)
        if len(self.roll_history) > self.history_size:
            self.roll_history.pop(0)
        if len(self.speed_history) > self.history_size:
            self.speed_history.pop(0)

    def _get_smooth_value(self, history):
        """获取平滑值"""
        if len(history) == 0:
            return 0.0

        # 去除异常值后取平均
        if len(history) >= 3:
            sorted_values = sorted(history)
            # 去掉最大最小值
            return np.mean(sorted_values[1:-1])
        else:
            return np.mean(history)

9. 工具函数模块 (src/utils.py)


"""
工具函数模块
"""
import cv2
import pandas as pd
from datetime import datetime
import os

class MeasurementLogger:
    def __init__(self, log_path):
        self.log_path = log_path
        os.makedirs(os.path.dirname(log_path), exist_ok=True)

        # 初始化CSV文件
        if not os.path.exists(log_path):
            df = pd.DataFrame(columns=['timestamp', 'pitch', 'roll', 'yaw',
                                     'speed', 'x', 'y', 'z', 'status'])
            df.to_csv(log_path, index=False)

    def log_measurement(self, data, status):
        """记录测量数据"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

        log_entry = {
            'timestamp': timestamp,
            'pitch': data.get('pitch', 0),
            'roll': data.get('roll', 0),
            'yaw': data.get('yaw', 0),
            'speed': data.get('speed', 0),
            'x': data.get('x', 0),
            'y': data.get('y', 0),
            'z': data.get('z', 0),
            'status': status
        }

        df = pd.DataFrame([log_entry])
        df.to_csv(self.log_path, mode='a', header=False, index=False)

def setup_logger(log_path):
    """设置日志记录器"""
    return MeasurementLogger(log_path)

def draw_ui(frame, detections, measurement_data, status):
    """绘制UI界面"""
    display_frame = frame.copy()

    # 1. 绘制YOLO检测框
    for det in detections:
        x1, y1, x2, y2 = det['bbox']
        label = det['label']
        conf = det['confidence']

        # 选择颜色
        if label == 'Bridge':
            color = (255, 0, 0)  # 蓝色
        elif label == 'Intruder':
            color = (0, 0, 255)  # 红色
        else:
            color = (0, 255, 0)  # 绿色

        # 绘制边框
        cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)

        # 绘制标签
        label_text = f"{label}: {conf:.2f}"
        label_size = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
        cv2.rectangle(display_frame, (x1, y1-20), (x1+label_size[0], y1), color, -1)
        cv2.putText(display_frame, label_text, (x1, y1-5),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    # 2. 绘制数据面板背景
    panel_height = 150
    overlay = display_frame.copy()
    cv2.rectangle(overlay, (0, 0), (display_frame.shape[1], panel_height),
                 (0, 0, 0), -1)
    cv2.addWeighted(overlay, 0.7, display_frame, 0.3, 0, display_frame)

    # 3. 显示测量数据
    y_offset = 25
    if measurement_data:
        cv2.putText(display_frame, f"Pitch: {measurement_data['pitch']:.1f} deg",
                   (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(display_frame, f"Roll: {measurement_data['roll']:.1f} deg",
                   (10, y_offset+25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(display_frame, f"Yaw Speed: {measurement_data['speed']:.2f} rad/s",
                   (10, y_offset+50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(display_frame, f"Position: X={measurement_data['x']:.3f}, Y={measurement_data['y']:.3f}, Z={measurement_data['z']:.3f}",
                   (10, y_offset+75), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    else:
        cv2.putText(display_frame, "等待测量数据...",
                   (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

    # 4. 显示系统状态
    status_color = (0, 255, 0) if "正常" in status else (0, 0, 255)
    cv2.putText(display_frame, f"状态: {status}",
               (10, y_offset+100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2)

    # 5. 显示时间戳
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    cv2.putText(display_frame, timestamp,
               (display_frame.shape[1]-200, y_offset),
               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    return display_frame

10. YOLO训练脚本 (train_yolo.py)


"""
YOLO模型训练脚本
"""
from ultralytics import YOLO
import yaml

# 创建数据集配置文件
dataset_config = {
    'path': './data',
    'train': 'images/train',
    'val': 'images/val',
    'names': {
        0: 'Bridge',
        1: 'Intruder'
    }
}

with open('data/dataset.yaml', 'w') as f:
    yaml.dump(dataset_config, f)

# 训练模型
model = YOLO('yolov8n.pt')  # 加载预训练模型

# 开始训练
results = model.train(
    data='data/dataset.yaml',
    epochs=50,
    imgsz=640,
    batch=8,
    name='bridge_detection',
    patience=10,
    save=True,
    device='0'  # 使用GPU,如果没有GPU则改为'cpu'
)

print("训练完成!最佳模型保存在 runs/detect/bridge_detection/weights/best.pt")

四、使用步骤

1. 环境准备


# 创建项目文件夹结构
mkdir -p bridge_monitoring_project/{data/{images/{train/{images,labels},val/{images,labels},test},models,aruco,logs},src,config}

# 安装依赖
cd bridge_monitoring_project
pip install -r requirements.txt

2. 准备数据

  1. 生成并打印ArUco标记:

python generate_aruco.py
  1. 采集并标注数据:
  • 使用LabelImg标注工具
  • 标签设置为:Bridge, Intruder
  • 将标注好的数据放入对应文件夹

3. 训练YOLO模型


python train_yolo.py

4. 运行主程序


cd src
python main.py

五、调试建议

  1. 先测试各模块
  • 单独运行ArUco检测
  • 单独测试YOLO推理
  • 最后整合
  1. 相机标定(可选但推荐):
  • 使用棋盘格标定相机
  • 获得准确的相机内参
  1. 性能优化
  • 如果运行卡顿,降低相机分辨率
  • 使用更轻量的YOLO模型
  1. 数据增强
  • 多角度拍摄
  • 不同光照条件
  • 添加噪声和模糊

这个项目框架完整且模块化,你可以根据实际情况调整参数和功能。记得先从简单的功能开始测试,逐步完善整个系统。


0 条评论

发表回复

Avatar placeholder

您的邮箱地址不会被公开。 必填项已用 * 标注