hotspot_analyzer.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. import pandas as pd
  2. import numpy as np
  3. import matplotlib.pyplot as plt
  4. import seaborn as sns
  5. import os
  6. import re
  7. import argparse
  8. from matplotlib.colors import LinearSegmentedColormap
  9. from typing import Dict, List, Tuple, Optional
  10. class HotspotAnalyzer:
  11. """分析检测报告中的热点区域,生成热力图"""
  12. def __init__(self, csv_path: str, output_dir: str = None):
  13. """初始化热点分析器
  14. Args:
  15. csv_path: 检测报告CSV文件路径
  16. output_dir: 输出目录,默认为CSV文件所在目录
  17. """
  18. self.csv_path = csv_path
  19. self.data = None
  20. self.camera_data = {}
  21. # 设置输出目录
  22. if output_dir is None:
  23. self.output_dir = os.path.dirname(csv_path)
  24. else:
  25. self.output_dir = output_dir
  26. os.makedirs(output_dir, exist_ok=True)
  27. def load_data(self) -> None:
  28. """加载CSV数据"""
  29. try:
  30. self.data = pd.read_csv(self.csv_path)
  31. print(f"成功加载数据,共 {len(self.data)} 条记录")
  32. except Exception as e:
  33. print(f"加载CSV文件失败: {e}")
  34. raise
  35. def extract_camera_id(self, file_path: str) -> Optional[str]:
  36. """从文件路径中提取摄像头ID
  37. Args:
  38. file_path: 图像文件路径
  39. Returns:
  40. 摄像头ID,如果无法提取则返回None
  41. """
  42. # 尝试匹配常见的摄像头ID模式
  43. # 例如: cam_08_18 或 08_18
  44. patterns = [
  45. r'cam_(\d+)_(\d+)', # 匹配 cam_08_18 格式
  46. r'(\d+)_(\d+)_(\d+)', # 匹配 192_168_210_2_cam_08_18 格式中的 08_18
  47. r'(\d+)_(\d+)_\d+\.jpg$' # 匹配 01_07_00000.jpg 格式中的 01_07
  48. ]
  49. for pattern in patterns:
  50. match = re.search(pattern, file_path)
  51. if match:
  52. if len(match.groups()) >= 2:
  53. return f"{match.group(1)}_{match.group(2)}"
  54. else:
  55. return match.group(1)
  56. # 如果无法提取,返回文件名作为ID
  57. return os.path.basename(file_path).split('.')[0]
  58. def process_data(self) -> None:
  59. """处理数据,按摄像头ID分组"""
  60. if self.data is None:
  61. self.load_data()
  62. # 只处理有检测结果的数据
  63. detection_data = self.data[self.data['Object Count'] > 0].copy()
  64. if len(detection_data) == 0:
  65. print("警告: 没有找到任何检测结果")
  66. return
  67. # 提取摄像头ID并分组
  68. detection_data['Camera ID'] = detection_data['Image File'].apply(self.extract_camera_id)
  69. # 按摄像头ID分组
  70. for camera_id, group in detection_data.groupby('Camera ID'):
  71. self.camera_data[camera_id] = group
  72. print(f"摄像头 {camera_id}: {len(group)} 个检测结果")
  73. def generate_heatmap(self, camera_id: str = None, resolution: Tuple[int, int] = (1920, 1080),
  74. grid_size: Tuple[int, int] = (32, 18)) -> None:
  75. """为指定摄像头生成热力图
  76. Args:
  77. camera_id: 摄像头ID,如果为None则为所有摄像头生成热力图
  78. resolution: 图像分辨率,默认为1920x1080
  79. grid_size: 热力图网格大小,默认为32x18
  80. """
  81. if not self.camera_data:
  82. self.process_data()
  83. if not self.camera_data:
  84. print("没有可用的检测数据来生成热力图")
  85. return
  86. # 如果未指定摄像头ID,则为所有摄像头生成热力图
  87. if camera_id:
  88. camera_ids = [camera_id]
  89. else:
  90. camera_ids = list(self.camera_data.keys())
  91. for cam_id in camera_ids:
  92. if cam_id not in self.camera_data:
  93. print(f"未找到摄像头 {cam_id} 的数据")
  94. continue
  95. # 获取该摄像头的检测数据
  96. cam_data = self.camera_data[cam_id]
  97. # 创建热力图矩阵
  98. heatmap = np.zeros(grid_size)
  99. # 填充热力图数据
  100. for _, row in cam_data.iterrows():
  101. try:
  102. # 获取归一化坐标
  103. if 'Normalized Coordinates' in row and pd.notna(row['Normalized Coordinates']):
  104. try:
  105. norm_coords = row['Normalized Coordinates'].split(',')
  106. if len(norm_coords) >= 2:
  107. norm_x, norm_y = float(norm_coords[0]), float(norm_coords[1])
  108. else:
  109. continue
  110. except (ValueError, IndexError):
  111. continue
  112. else:
  113. # 如果没有归一化坐标,使用中心点坐标计算
  114. if pd.notna(row['BBox Center X']) and pd.notna(row['BBox Center Y']):
  115. center_x, center_y = row['BBox Center X'], row['BBox Center Y']
  116. norm_x, norm_y = center_x / resolution[0], center_y / resolution[1]
  117. else:
  118. continue
  119. # 确保归一化坐标在[0,1]范围内
  120. norm_x = max(0.0, min(norm_x, 0.999))
  121. norm_y = max(0.0, min(norm_y, 0.999))
  122. # 将归一化坐标映射到网格
  123. grid_x = int(norm_x * grid_size[0])
  124. grid_y = int(norm_y * grid_size[1])
  125. # 确保索引在有效范围内
  126. if 0 <= grid_x < grid_size[0] and 0 <= grid_y < grid_size[1]:
  127. # 增加热力值
  128. heatmap[grid_y, grid_x] += 1
  129. except Exception as e:
  130. print(f"处理坐标时出错: {e}, 坐标: {norm_x}, {norm_y}, 网格大小: {grid_size}")
  131. # 绘制热力图
  132. plt.figure(figsize=(12, 8))
  133. # 设置中文字体
  134. plt.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体
  135. plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
  136. plt.rcParams.update({'font.family':'sans-serif'}) # 刷新字体缓存
  137. # 创建自定义颜色映射
  138. colors = [(0, 0, 0, 0), (0, 0, 1, 0.5), (0, 1, 0, 0.7), (1, 1, 0, 0.8), (1, 0, 0, 1)]
  139. cmap = LinearSegmentedColormap.from_list('custom_cmap', colors, N=100)
  140. # 使用seaborn绘制热力图
  141. ax = sns.heatmap(heatmap, cmap=cmap, annot=False, fmt=".0f",
  142. cbar_kws={'label': '检测频次'}, square=False)
  143. # 设置标题和标签
  144. plt.title(f"摄像头 {cam_id} 目标检测热点分布图")
  145. plt.xlabel("X坐标 (归一化到图像宽度)")
  146. plt.ylabel("Y坐标 (归一化到图像高度)")
  147. # 反转Y轴,使得原点在左上角
  148. ax.invert_yaxis()
  149. # 保存图像
  150. output_path = os.path.join(self.output_dir, f"hotspot_{cam_id}.png")
  151. plt.savefig(output_path, dpi=300, bbox_inches='tight')
  152. plt.close()
  153. print(f"已生成摄像头 {cam_id} 的热点分布图: {output_path}")
  154. def generate_all_heatmaps(self, resolution: Tuple[int, int] = (1920, 1080),
  155. grid_size: Tuple[int, int] = (32, 18)) -> None:
  156. """为所有摄像头生成热力图"""
  157. self.generate_heatmap(camera_id=None, resolution=resolution, grid_size=grid_size)
  158. def generate_summary(self) -> None:
  159. """生成摄像头检测统计摘要"""
  160. if not self.camera_data:
  161. self.process_data()
  162. if not self.camera_data:
  163. print("没有可用的检测数据来生成摘要")
  164. return
  165. # 创建摘要数据
  166. summary_data = []
  167. for camera_id, data in self.camera_data.items():
  168. # 计算检测频率最高的区域
  169. if 'BBox Center X' in data.columns and 'BBox Center Y' in data.columns:
  170. # 使用KMeans聚类找出热点区域
  171. from sklearn.cluster import KMeans
  172. # 准备坐标数据
  173. coords = data[['BBox Center X', 'BBox Center Y']].dropna().values
  174. if len(coords) > 0:
  175. # 确定聚类数量 (根据数据量动态调整)
  176. n_clusters = min(3, len(coords))
  177. if n_clusters > 0:
  178. kmeans = KMeans(n_clusters=n_clusters, random_state=42)
  179. kmeans.fit(coords)
  180. # 获取聚类中心
  181. centers = kmeans.cluster_centers_
  182. # 计算每个聚类的样本数量
  183. labels = kmeans.labels_
  184. counts = np.bincount(labels)
  185. # 找出样本最多的聚类
  186. max_cluster = np.argmax(counts)
  187. hotspot_center = centers[max_cluster]
  188. hotspot_count = counts[max_cluster]
  189. # 计算热点区域占比
  190. hotspot_ratio = hotspot_count / len(coords)
  191. summary_data.append({
  192. 'Camera ID': camera_id,
  193. 'Total Detections': len(data),
  194. 'Hotspot Center X': round(hotspot_center[0], 2),
  195. 'Hotspot Center Y': round(hotspot_center[1], 2),
  196. 'Hotspot Detection Count': int(hotspot_count),
  197. 'Hotspot Ratio': f"{hotspot_ratio:.2%}"
  198. })
  199. else:
  200. summary_data.append({
  201. 'Camera ID': camera_id,
  202. 'Total Detections': len(data),
  203. 'Hotspot Center X': 'N/A',
  204. 'Hotspot Center Y': 'N/A',
  205. 'Hotspot Detection Count': 'N/A',
  206. 'Hotspot Ratio': 'N/A'
  207. })
  208. else:
  209. summary_data.append({
  210. 'Camera ID': camera_id,
  211. 'Total Detections': len(data),
  212. 'Hotspot Center X': 'N/A',
  213. 'Hotspot Center Y': 'N/A',
  214. 'Hotspot Detection Count': 'N/A',
  215. 'Hotspot Ratio': 'N/A'
  216. })
  217. # 创建摘要DataFrame并保存为CSV
  218. if summary_data:
  219. summary_df = pd.DataFrame(summary_data)
  220. summary_path = os.path.join(self.output_dir, "hotspot_summary.csv")
  221. summary_df.to_csv(summary_path, index=False, encoding='utf-8')
  222. print(f"已生成热点区域摘要报告: {summary_path}")
  223. # 打印摘要信息
  224. print("\n热点区域摘要:")
  225. for row in summary_data:
  226. print(f"摄像头 {row['Camera ID']}: 共 {row['Total Detections']} 个检测结果")
  227. if row['Hotspot Center X'] != 'N/A':
  228. print(f" 热点区域中心: ({row['Hotspot Center X']}, {row['Hotspot Center Y']})")
  229. print(f" 热点区域检测数: {row['Hotspot Detection Count']} ({row['Hotspot Ratio']})")
  230. print()
  231. def main():
  232. parser = argparse.ArgumentParser(description='检测热点区域分析工具')
  233. parser.add_argument('--csv', type=str, required=True, help='检测报告CSV文件路径')
  234. parser.add_argument('--output', type=str, default=None, help='输出目录路径')
  235. parser.add_argument('--resolution', type=str, default='3840x2160', help='图像分辨率,格式为WxH')
  236. parser.add_argument('--grid', type=str, default='32x18', help='热力图网格大小,格式为WxH')
  237. parser.add_argument('--camera', type=str, default=None, help='指定摄像头ID进行分析,不指定则分析所有摄像头')
  238. args = parser.parse_args()
  239. # 解析分辨率和网格大小
  240. try:
  241. width, height = map(int, args.resolution.split('x'))
  242. resolution = (width, height)
  243. except ValueError:
  244. print(f"无效的分辨率格式: {args.resolution},使用默认值")
  245. resolution = (1920, 1080)
  246. try:
  247. grid_width, grid_height = map(int, args.grid.split('x'))
  248. grid_size = (grid_width, grid_height)
  249. except ValueError:
  250. print(f"无效的网格大小格式: {args.grid},使用默认值32x18")
  251. grid_size = (32, 18)
  252. # 创建分析器并处理数据
  253. analyzer = HotspotAnalyzer(args.csv, args.output)
  254. analyzer.load_data()
  255. analyzer.process_data()
  256. # 生成热力图
  257. if args.camera:
  258. analyzer.generate_heatmap(args.camera, resolution, grid_size)
  259. else:
  260. analyzer.generate_all_heatmaps(resolution, grid_size)
  261. # 生成摘要报告
  262. analyzer.generate_summary()
  263. if __name__ == "__main__":
  264. main()