123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- """图像预处理模块"""
- import cv2
- import numpy as np
- from typing import Optional, Tuple
- from config.config import ModelConfig
class ImageProcessor:
    """Load an image file and convert it into a normalized NCHW array.

    Resizing and BGR->RGB conversion go through OpenCV's CUDA module when
    OpenCV reports at least one CUDA device, and through the CPU routines
    otherwise. The size of the most recently loaded image is remembered so
    that callers can query it with ``get_original_size``.
    """

    def __init__(self, config: "ModelConfig"):
        """Store the configuration and probe for CUDA support.

        Args:
            config: Model configuration providing ``input_size``, ``mean``
                and ``std``.
        """
        self.config = config
        # Passed unchanged to cv2.resize / cv2.cuda.resize as ``dsize``;
        # presumably (width, height) -- confirm against ModelConfig.
        self.input_size = config.input_size
        self.mean = np.array(config.mean, dtype=np.float32)
        self.std = np.array(config.std, dtype=np.float32)

        # Take the CUDA path only when OpenCV reports a usable device.
        self.use_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
        # A single GpuMat is reused as the upload target for every image.
        self.gpu_frame = cv2.cuda_GpuMat() if self.use_cuda else None

        # Size of the most recently loaded image; zero until one is loaded.
        self.orig_h = 0
        self.orig_w = 0

    def preprocess(self, image_path: str) -> Optional[np.ndarray]:
        """Preprocess the image stored at ``image_path``.

        Args:
            image_path: Path of the image file to load.

        Returns:
            A float32 array of shape (1, C, H, W), or ``None`` when the
            image cannot be loaded or any processing step raises.
        """
        # Clear the previous image's size first, so that a failed load can
        # never make get_original_size() report another image's dimensions.
        self.orig_h = 0
        self.orig_w = 0

        try:
            if self.use_cuda:
                return self._preprocess_gpu(image_path)
            return self._preprocess_cpu(image_path)
        except Exception as e:
            # Best-effort boundary: report the problem and signal failure
            # with None instead of propagating to the caller.
            print(f"预处理图像 {image_path} 时出错: {e}")
            return None

    def _load_image(self, image_path: str) -> Optional[np.ndarray]:
        """Read an image with OpenCV and record its original size.

        Returns:
            The decoded image as returned by ``cv2.imread``, or ``None``
            (after printing a message) when the file cannot be read.
        """
        image = cv2.imread(image_path)
        if image is None:
            print(f"无法加载图像:{image_path}")
            return None

        self.orig_h, self.orig_w = image.shape[:2]
        return image

    def _preprocess_gpu(self, image_path: str) -> Optional[np.ndarray]:
        """Resize and colour-convert on the GPU, then normalize on the CPU."""
        cpu_image = self._load_image(image_path)
        if cpu_image is None:
            return None

        self.gpu_frame.upload(cpu_image)
        if self.gpu_frame.empty():
            print(f"无法上传图像到GPU:{image_path}")
            return None

        # GPU stage: resize to the model input size, then BGR -> RGB.
        gpu_resized = cv2.cuda.resize(self.gpu_frame, self.input_size)
        gpu_rgb = cv2.cuda.cvtColor(gpu_resized, cv2.COLOR_BGR2RGB)

        # Normalization is done with NumPy, so bring the result back to host.
        return self._normalize_image(gpu_rgb.download())

    def _preprocess_cpu(self, image_path: str) -> Optional[np.ndarray]:
        """Resize and colour-convert on the CPU, then normalize."""
        image = self._load_image(image_path)
        if image is None:
            return None

        resized = cv2.resize(image, self.input_size)
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        return self._normalize_image(rgb)

    def _normalize_image(self, image: np.ndarray) -> np.ndarray:
        """Scale, standardize and reorder an H x W x C image.

        Args:
            image: Array of shape (H, W, C); the last axis must be
                broadcast-compatible with ``self.mean`` and ``self.std``.
                The input array is not modified.

        Returns:
            A C-contiguous float32 array of shape (1, C, H, W).
        """
        # astype() copies, so the in-place arithmetic below never touches
        # the caller's array.
        scaled = image.astype(np.float32) / 255.0

        # Per-channel standardization, broadcast over the last axis.
        scaled -= self.mean[None, None, :]
        scaled /= self.std[None, None, :]

        # transpose() only returns a strided view; materialize it so the
        # underlying buffer really is laid out channel-first for consumers
        # that read the raw memory.
        chw = np.ascontiguousarray(np.transpose(scaled, (2, 0, 1)))

        # Prepend the batch dimension.
        return np.expand_dims(chw, axis=0)

    def get_original_size(self) -> Tuple[int, int]:
        """Return the size of the most recently loaded image.

        Returns:
            ``(width, height)``; ``(0, 0)`` if no image has been loaded yet
            or the most recent ``preprocess`` call failed to load one.
        """
        return self.orig_w, self.orig_h
|