123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import seaborn as sns
- import os
- import re
- import argparse
- from matplotlib.colors import LinearSegmentedColormap
- from typing import Dict, List, Tuple, Optional
- class HotspotAnalyzer:
- """分析检测报告中的热点区域,生成热力图"""
-
- def __init__(self, csv_path: str, output_dir: str = None):
- """初始化热点分析器
-
- Args:
- csv_path: 检测报告CSV文件路径
- output_dir: 输出目录,默认为CSV文件所在目录
- """
- self.csv_path = csv_path
- self.data = None
- self.camera_data = {}
-
- # 设置输出目录
- if output_dir is None:
- self.output_dir = os.path.dirname(csv_path)
- else:
- self.output_dir = output_dir
- os.makedirs(output_dir, exist_ok=True)
-
- def load_data(self) -> None:
- """加载CSV数据"""
- try:
- self.data = pd.read_csv(self.csv_path)
- print(f"成功加载数据,共 {len(self.data)} 条记录")
- except Exception as e:
- print(f"加载CSV文件失败: {e}")
- raise
-
- def extract_camera_id(self, file_path: str) -> Optional[str]:
- """从文件路径中提取摄像头ID
-
- Args:
- file_path: 图像文件路径
-
- Returns:
- 摄像头ID,如果无法提取则返回None
- """
- # 尝试匹配常见的摄像头ID模式
- # 例如: cam_08_18 或 08_18
- patterns = [
- r'cam_(\d+)_(\d+)', # 匹配 cam_08_18 格式
- r'(\d+)_(\d+)_(\d+)', # 匹配 192_168_210_2_cam_08_18 格式中的 08_18
- r'(\d+)_(\d+)_\d+\.jpg$' # 匹配 01_07_00000.jpg 格式中的 01_07
- ]
-
- for pattern in patterns:
- match = re.search(pattern, file_path)
- if match:
- if len(match.groups()) >= 2:
- return f"{match.group(1)}_{match.group(2)}"
- else:
- return match.group(1)
-
- # 如果无法提取,返回文件名作为ID
- return os.path.basename(file_path).split('.')[0]
-
- def process_data(self) -> None:
- """处理数据,按摄像头ID分组"""
- if self.data is None:
- self.load_data()
-
- # 只处理有检测结果的数据
- detection_data = self.data[self.data['Object Count'] > 0].copy()
-
- if len(detection_data) == 0:
- print("警告: 没有找到任何检测结果")
- return
-
- # 提取摄像头ID并分组
- detection_data['Camera ID'] = detection_data['Image File'].apply(self.extract_camera_id)
-
- # 按摄像头ID分组
- for camera_id, group in detection_data.groupby('Camera ID'):
- self.camera_data[camera_id] = group
- print(f"摄像头 {camera_id}: {len(group)} 个检测结果")
-
- def generate_heatmap(self, camera_id: str = None, resolution: Tuple[int, int] = (1920, 1080),
- grid_size: Tuple[int, int] = (32, 18)) -> None:
- """为指定摄像头生成热力图
-
- Args:
- camera_id: 摄像头ID,如果为None则为所有摄像头生成热力图
- resolution: 图像分辨率,默认为1920x1080
- grid_size: 热力图网格大小,默认为32x18
- """
- if not self.camera_data:
- self.process_data()
-
- if not self.camera_data:
- print("没有可用的检测数据来生成热力图")
- return
-
- # 如果未指定摄像头ID,则为所有摄像头生成热力图
- if camera_id:
- camera_ids = [camera_id]
- else:
- camera_ids = list(self.camera_data.keys())
-
- for cam_id in camera_ids:
- if cam_id not in self.camera_data:
- print(f"未找到摄像头 {cam_id} 的数据")
- continue
-
- # 获取该摄像头的检测数据
- cam_data = self.camera_data[cam_id]
-
- # 创建热力图矩阵
- heatmap = np.zeros(grid_size)
-
- # 填充热力图数据
- for _, row in cam_data.iterrows():
- try:
- # 获取归一化坐标
- if 'Normalized Coordinates' in row and pd.notna(row['Normalized Coordinates']):
- try:
- norm_coords = row['Normalized Coordinates'].split(',')
- if len(norm_coords) >= 2:
- norm_x, norm_y = float(norm_coords[0]), float(norm_coords[1])
- else:
- continue
- except (ValueError, IndexError):
- continue
- else:
- # 如果没有归一化坐标,使用中心点坐标计算
- if pd.notna(row['BBox Center X']) and pd.notna(row['BBox Center Y']):
- center_x, center_y = row['BBox Center X'], row['BBox Center Y']
- norm_x, norm_y = center_x / resolution[0], center_y / resolution[1]
- else:
- continue
-
- # 确保归一化坐标在[0,1]范围内
- norm_x = max(0.0, min(norm_x, 0.999))
- norm_y = max(0.0, min(norm_y, 0.999))
-
- # 将归一化坐标映射到网格
- grid_x = int(norm_x * grid_size[0])
- grid_y = int(norm_y * grid_size[1])
-
- # 确保索引在有效范围内
- if 0 <= grid_x < grid_size[0] and 0 <= grid_y < grid_size[1]:
- # 增加热力值
- heatmap[grid_y, grid_x] += 1
- except Exception as e:
- print(f"处理坐标时出错: {e}, 坐标: {norm_x}, {norm_y}, 网格大小: {grid_size}")
-
- # 绘制热力图
- plt.figure(figsize=(12, 8))
-
- # 设置中文字体
- plt.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体
- plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
- plt.rcParams.update({'font.family':'sans-serif'}) # 刷新字体缓存
-
- # 创建自定义颜色映射
- colors = [(0, 0, 0, 0), (0, 0, 1, 0.5), (0, 1, 0, 0.7), (1, 1, 0, 0.8), (1, 0, 0, 1)]
- cmap = LinearSegmentedColormap.from_list('custom_cmap', colors, N=100)
-
- # 使用seaborn绘制热力图
- ax = sns.heatmap(heatmap, cmap=cmap, annot=False, fmt=".0f",
- cbar_kws={'label': '检测频次'}, square=False)
-
- # 设置标题和标签
- plt.title(f"摄像头 {cam_id} 目标检测热点分布图")
- plt.xlabel("X坐标 (归一化到图像宽度)")
- plt.ylabel("Y坐标 (归一化到图像高度)")
-
- # 反转Y轴,使得原点在左上角
- ax.invert_yaxis()
-
- # 保存图像
- output_path = os.path.join(self.output_dir, f"hotspot_{cam_id}.png")
- plt.savefig(output_path, dpi=300, bbox_inches='tight')
- plt.close()
-
- print(f"已生成摄像头 {cam_id} 的热点分布图: {output_path}")
-
- def generate_all_heatmaps(self, resolution: Tuple[int, int] = (1920, 1080),
- grid_size: Tuple[int, int] = (32, 18)) -> None:
- """为所有摄像头生成热力图"""
- self.generate_heatmap(camera_id=None, resolution=resolution, grid_size=grid_size)
-
- def generate_summary(self) -> None:
- """生成摄像头检测统计摘要"""
- if not self.camera_data:
- self.process_data()
-
- if not self.camera_data:
- print("没有可用的检测数据来生成摘要")
- return
-
- # 创建摘要数据
- summary_data = []
- for camera_id, data in self.camera_data.items():
- # 计算检测频率最高的区域
- if 'BBox Center X' in data.columns and 'BBox Center Y' in data.columns:
- # 使用KMeans聚类找出热点区域
- from sklearn.cluster import KMeans
-
- # 准备坐标数据
- coords = data[['BBox Center X', 'BBox Center Y']].dropna().values
-
- if len(coords) > 0:
- # 确定聚类数量 (根据数据量动态调整)
- n_clusters = min(3, len(coords))
-
- if n_clusters > 0:
- kmeans = KMeans(n_clusters=n_clusters, random_state=42)
- kmeans.fit(coords)
-
- # 获取聚类中心
- centers = kmeans.cluster_centers_
-
- # 计算每个聚类的样本数量
- labels = kmeans.labels_
- counts = np.bincount(labels)
-
- # 找出样本最多的聚类
- max_cluster = np.argmax(counts)
- hotspot_center = centers[max_cluster]
- hotspot_count = counts[max_cluster]
-
- # 计算热点区域占比
- hotspot_ratio = hotspot_count / len(coords)
-
- summary_data.append({
- 'Camera ID': camera_id,
- 'Total Detections': len(data),
- 'Hotspot Center X': round(hotspot_center[0], 2),
- 'Hotspot Center Y': round(hotspot_center[1], 2),
- 'Hotspot Detection Count': int(hotspot_count),
- 'Hotspot Ratio': f"{hotspot_ratio:.2%}"
- })
- else:
- summary_data.append({
- 'Camera ID': camera_id,
- 'Total Detections': len(data),
- 'Hotspot Center X': 'N/A',
- 'Hotspot Center Y': 'N/A',
- 'Hotspot Detection Count': 'N/A',
- 'Hotspot Ratio': 'N/A'
- })
- else:
- summary_data.append({
- 'Camera ID': camera_id,
- 'Total Detections': len(data),
- 'Hotspot Center X': 'N/A',
- 'Hotspot Center Y': 'N/A',
- 'Hotspot Detection Count': 'N/A',
- 'Hotspot Ratio': 'N/A'
- })
-
- # 创建摘要DataFrame并保存为CSV
- if summary_data:
- summary_df = pd.DataFrame(summary_data)
- summary_path = os.path.join(self.output_dir, "hotspot_summary.csv")
- summary_df.to_csv(summary_path, index=False, encoding='utf-8')
- print(f"已生成热点区域摘要报告: {summary_path}")
-
- # 打印摘要信息
- print("\n热点区域摘要:")
- for row in summary_data:
- print(f"摄像头 {row['Camera ID']}: 共 {row['Total Detections']} 个检测结果")
- if row['Hotspot Center X'] != 'N/A':
- print(f" 热点区域中心: ({row['Hotspot Center X']}, {row['Hotspot Center Y']})")
- print(f" 热点区域检测数: {row['Hotspot Detection Count']} ({row['Hotspot Ratio']})")
- print()
- def main():
- parser = argparse.ArgumentParser(description='检测热点区域分析工具')
- parser.add_argument('--csv', type=str, required=True, help='检测报告CSV文件路径')
- parser.add_argument('--output', type=str, default=None, help='输出目录路径')
- parser.add_argument('--resolution', type=str, default='3840x2160', help='图像分辨率,格式为WxH')
- parser.add_argument('--grid', type=str, default='32x18', help='热力图网格大小,格式为WxH')
- parser.add_argument('--camera', type=str, default=None, help='指定摄像头ID进行分析,不指定则分析所有摄像头')
-
- args = parser.parse_args()
-
- # 解析分辨率和网格大小
- try:
- width, height = map(int, args.resolution.split('x'))
- resolution = (width, height)
- except ValueError:
- print(f"无效的分辨率格式: {args.resolution},使用默认值")
- resolution = (1920, 1080)
-
- try:
- grid_width, grid_height = map(int, args.grid.split('x'))
- grid_size = (grid_width, grid_height)
- except ValueError:
- print(f"无效的网格大小格式: {args.grid},使用默认值32x18")
- grid_size = (32, 18)
-
- # 创建分析器并处理数据
- analyzer = HotspotAnalyzer(args.csv, args.output)
- analyzer.load_data()
- analyzer.process_data()
-
- # 生成热力图
- if args.camera:
- analyzer.generate_heatmap(args.camera, resolution, grid_size)
- else:
- analyzer.generate_all_heatmaps(resolution, grid_size)
-
- # 生成摘要报告
- analyzer.generate_summary()
- if __name__ == "__main__":
- main()
|