OpenCV常用方法介绍
图像显示图像处理颜色空间转换图像变换滤波与平滑图像阈值与二值化边缘检测形态学操作轮廓检测绘图功能特征检测与匹配视频处理图像金字塔模板匹配Hough变换图像分割与前景提取总结:实用工具函数场景一:工业视觉检测 - 产品缺陷检测 (智能制造 - 工业零件缺陷检测系统)算法融合检测流程检测算法融合策略①、边缘检测(Canny):适用于检测裂纹、边缘破损②、阈值分割:适用于表面污点、凹坑③、模板匹配:适用
OpenCV解决了什么?
解决的时:图像/视频的底层处理和预处理问题(不依赖深度学习)
- 图像I/O与显示:读取、保存、摄像头采集、窗口显示
- 几何变换:缩放、旋转、仿射/投射变换
- 颜色空间转换:RGB-HSV-LAB等
- 图像滤波与增强:高斯模糊、中值滤波、直方图均衡化
- 边缘检测与特征提取:Canny、Sobel;SIFT、ORB、Harris角点
- 形态学操作:腐蚀、膨胀、开闭运算
- 传统视觉算法:光流(稀疏/稠密)、背景建模(MOG2)、模板匹配、轮廓查找
安装OpenCV之前需要先安装numpy,matplotlib
由于一些经典算法被申请了版权,所以选用3.4.3以下版本
# pip install opencv-python==3.4.2.17
# 测试是否安装成功
import cv2
lena = cv2.imread("1.jpg")
cv2.imshow("image",lena)
cv2.waitKey(0)
# 利用SIFT和SURF等进行特征提取,还需要安装
pip in stall opencv-contrib-python==3.4.2.17


图像读取与保存
import numpy as np
import cv2 as cv2
import matplotlib.pyplot as plt
"""
cv.imread() 读取图像 如果加载的路径有错误,不会报错,会返回一个None值
读取方式的标志:
- cv.IMREADCOLOR 以彩色模式加载图像,任何图像的透明度都将被忽略,这是默认参数,可以使用1来替代
- cv.IMREADGRAYSCALE 以灰度模式加载图像,可以用0替代
- cv.IMREAD_UNCHANGED以alpha通道加载图像模式,用-1来替代
"""
# 读取图像,将图像文件读取为numpy数组
img = cv2.imread('image.jpg') # 返回numpy数组
img = cv2.imread('image.jpg', cv2.IMREAD_GRAYSCALE) # 灰度读取
img = cv2.imread('messi2.jpg',0)
# 保存图像,将numpy数组保存为图像文件
cv2.imwrite('output.jpg', img)
# 图像属性(行数、列数和通道数)
height, width = img.shape[:2] # 获取高宽
channels = img.shape[2] if len(img.shape) == 3 else 1 # 通道数
# 获取并修改图像中的像素点(通过行列的坐标值),对于
px = img[100,100] # 获取某个像素点的值
blue = img[100,100,0] # 仅获取蓝色通道的强度值
img[100,100] = [255,255,255] # 修改某个位置的像素值
# 算数操作
x = np.uint8([250])
y = np.uint8([10])
print(cv.add(x,y)) # OpenCV的操作是饱和操作 250+10=260——>255
print(x + y) # Numpy添加是模运算,250+10=260 % 256 = 4
rain = cv2.imread("./image/rain.jpg")
plt.imshow(rain[:,:,::-1])
view = cv.imread("./image/view.jpg")
plt.imshow(view[:,:,::-1])
img1 = cv.add(rain,view)
plt.imshow(img1[:,:,::-1])
img2 = rain + view
plt.imshow(img2[:,:,::-1])
# 图像混合
img3 = cv.addWeighted(img1,0.7,img2,0.3,0)
plt.figure(figsize=(8,8))
plt.imshow(img3[:,:,::-1])
plt.show()


# 读取彩色图像
img = cv2.imread('image.jpg')
# 读取灰度图像(节省内存和处理时间)
img_gray = cv2.imread('image.jpg',cv2.IMREAD_GRAYSCALE)
# 读取原始通道(包括alpha通道)
img_unchanged = cv2.imread('image.png',cv2.IMREAD_UNCHANGED)
# 保存为JPEG(有损压缩,较小文件)
cv2.imwrite('output.jpg',img,[cv2.IMWRITE_JPEG_QUALITY,95])
# 保存为PNG(无损压缩,支持透明度)
cv2.imwrite('output.png',img,[cv2.IMWRITE_PNG_COMPRESSION,9])
# 图像通道的拆分与合并
# 有时需要在B,G,R通道图像上单独工作,需要将BGR图像分割为单个通道,或者在其他情况下将这些通道合并到BGR图像
b,g,r = cv.split(img)
img = cv.merge((b,g,r))
# 色彩空间的改变
# OpenCV中又150多种颜色空间转换方法,最广泛使用的有两种,BGR<->Gray和BGR<->HSV
cv.cvtColor(input_image,flag)
图像显示
cv2.imshow('窗口标题', img) # 显示图像
cv2.waitKey(0) # 等待按键(0为无限等待,>0为毫秒数),给图像绘制留下时间,否则窗口会出现无响应情况,并且图像无法显示出来
cv2.destroyAllWindows() # 关闭所有窗口
# 创建/调整窗口
cv2.namedWindow('window', cv2.WINDOW_NORMAL)
cv2.resizeWindow('window', width, height)
# 调试和实时查看处理效果,创建窗口显示图像,等待用户交互
cv2.imshow('Original',img)
key = cv2.waitKey(0) # 0表示无限等待,直到按键
if key == ord('q'): # 按q键退出
cv2.destroyAllWindows()
elif key == ord('s'): # 按s键保存
cv2.imwrite('saved.jpg', img)
图像处理
颜色空间转换
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # BGR转灰度(人脸检测、边缘检测等)
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) # BGR转HSV(颜色分割、跟踪)
rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # BGR转RGB(使用matplotlib显示时)
plt.imshow(rgb) # matplotlib使用RGB格式
# 提取特定颜色范围
lower_red = np.array([0, 50, 50])
upper_red = np.array([10, 255, 255])
mask = cv2.inRange(hsv, lower_red, upper_red)
# 将图像的像素点移动(50,100)的距离
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
img1 = cv.imread("./image/image.jpg")
rows,cols = img1.shape(:2)
M = M = nplfloat32([[1,0,100],[0,1,50]])
dst = cv.warpAffine(img1,M,(cols,rows))
fig.axes = plt.subplots(nrow-1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img1[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(dst[:,:,::-1])
axes[1].set_title("平移后的结果")
plt.show()
图像变换
# 尺寸调整
resized = cv2.resize(img, (width, height)) # 指定目标尺寸
resized = cv2.resize(img, None, fx=0.5, fy=0.5) # 按比例缩放,一半
# 指定插值方法(影响质量)
resized_nearest = cv2.resize(img,(300,200),interpolation=cv2.INTER_NEAREST) # 最近邻,速度快
resized_cubic = cv2.resize(img, (300, 200),interpolation=cv2.INTER_CUBIC) #
# 旋转
M = cv2.getRotationMatrix2D(center, angle, scale) # 获取旋转矩阵
rotated = cv2.warpAffine(img, M, (w, h))
(h,w) = img.shape[:2] # 旋转图像
center = (w // 2,h // 2)
M = cv2.getRotationMatrix2D(center,45,1.0) # 旋转45度
rotated = cv2.warpAffine(img,M,(w,h))
# 仿射变换/透视变换
M = cv2.getPerspectiveTransform(src_pts, dst_pts)
warped = cv2.warpPerspective(img, M, (w, h))
import cv2 as cv
img1 = cv.imread("./image/dog.jpeg")
"""
cv2.INTER_LINEAR 双线性插值法
cv2.INTER_NEAREST 最近邻插值
cv2.INTER_AREA 像素区域重采样(默认)
cv2.INTER_CUBIC 双三次插值
"""
# 图像缩放,绝对尺寸
rows,cols = cv.resize(imgq,(2*cols,2*rows),interpolation=cv.INTER_CUBIC)
res = cv.resize(img1,(2*cols,2*rows),interpolation=cv.INTER_CUBIC)
# 相对尺寸
res1 = cv.resize(img1,None,fx=0.5,fy=0.5)
# 图像显示(opencv显示图像,不推荐)
cv.imshow("orignal",img1)
cv.imshow("enlarge",res)
cv.imshow("shrink",res1)
cv.waitKey(0)
# matplotlib显示图像
fig,axes = plt.subplots(nrows=1,ncols=3,figsize(10,8),dpi=100)
axes[0].imshow(res[:,:,::-1])
axes[0].set_title("绝对尺度(放大)")
axes[1].imshow(img1[:,:,::-1])
axes[1].set_title("原图")
axes[2].imshow(res1[:,:,::-1])
axes[2].set_title("相对尺度(缩小)")
plt.show()
"""
旋转案例
"""
img = cv.imread("./image.image2.jpg")
rows,cols = img.shape(:2)
M = cv.getRotationMatrix2D((cols/2,rows/2),90,1) # 生成旋转矩阵
dst = cv.warpAffine(img,M,(cols,rows)) # 进行旋转变换
fig,axes = plt.subplots(nrow=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(res[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(img1[:,:,::-1])
axes[1].set_title("旋转后结果")
plt.show()
"""
仿射变换
"""
img = cv.imread("./iamge/image2.jpg")
rows,cols = img.shape[:2]
pts1 = np.float32([[50,50],[200,50],[50,200]]) # 创建三个点
pts2 = np.float32([[100,100],[200,50],[100,250]])
M = cv.getAffineTransform(pts1,pts2)
dst = cv.warpAffine(img,M,(cols,rows))
fig,axes = plt.subplots(nrow=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(res[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(img1[:,:,::-1])
axes[1].set_title("仿射后结果")
plt.show()
"""
图像的透射变换
"""
img = cv.imread("./image/image2.jpg")
rows,cols = img.shape[:2]
pts1 = np.float32([[56,65],[368,52],[28,387],[389,390]]) # 4个点
pts2 = np.float32([[100,145],[300,100],[80,290],[310,300]])
T = cv.getPerspectiveTransform(pts1,pts2) # 转换矩阵
dst = cv.warpPerspective(img,T,(cols,rows))
fig,axes = plt.subplots(nrow=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(res[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(img1[:,:,::-1])
axes[1].set_title("仿射后结果")
plt.show()
滤波与平滑
blur = cv2.GaussianBlur(img, (5,5), 0) # 高斯模糊,核大小为(5,5)标准差自动计算
#在边缘检测前适用,减少噪声影响
blurred = cv2.GaussianBlur(gray,(3,3),0) # 进行高斯模糊减少噪声
edges = cv2.Canny(blurred,50,150)
# 去除椒盐噪声,用中指替代中心像素值
median = cv2.medianBlur(img, 5) # 中值滤波 (核大小应为奇数)
# 需要保留边缘的平滑处理,在平滑的同时保持边缘清晰
bilateral = cv2.bilateralFilter(img, 9, 75, 75) # 双边滤波(参数:图像,邻域直径,颜色空间标准差,坐标空间标准差)
"""
图像噪声类型
椒盐噪声,随机出现的白点或者黑点
高斯噪声,噪声密度函数服从高斯分布
去除噪声的方案——图像平滑,去除高频信息,保留低频信息
- 均值滤波
- 高斯滤波
- 种植滤波
- 双边滤波
"""
import cv2 as cv
import numpy as np
from matploglib import pyplot as plt
img = cv.imread("./image/dogsp.jpeg")
blur = cv.blur(img,(5,5)) # 均值滤波
plt.figure(figsize=(10,8),dpi=100)
plt.subplot(121),plt.imshow(img[:,:,::-1]),plt.title('原图')
plt.xticks([]),plt.yticks([])
plt.subplot(122),plt.imshow(blur[:,:,::-1]),plt.title("均值滤波后结果")
plt.xticks([]),plt.yticks([])
plt.show()
blur = cv.GaussianBlur(img,(3,3),1) # 高斯滤波
plt.figure(figsize=(10,8),dpi=100)
plt.subplot(121),plt.imshow(img[:,:,::-1]),plt.title('原图')
plt.sticks([]),plt.yticks([])
plt.subplot(122),plt.imshow(blur[:,:,::-1]),plt.title('高斯滤波后结果')
plt.xticks([]),plt.yticks([])
plt.show()
blur = cv.medianBlur(img,5) # 中值滤波





图像阈值与二值化
# 全局阈值--适用于光照均匀的场景
ret, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
# Otsu阈值--自动寻找最佳阈值
ret,thresh = cv2.threshold(gray,0,255,cv2.THRESH_BINARY + cv2.THRESH_OTSU)
# 自适应阈值(适用于光照不均匀的场景)
thresh = cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2)
直方图

"""
对数据进行统计的一种方法,并且将统计值组织到一系列定义好的bin当中
灰度直方图:直方图是根据灰度图进行绘制,而不是彩色图像
参数中掩膜,提取感兴趣区域,屏蔽作用,结构特征提取,特殊形状图像制作
"""
import numpy as np
from cv2 as cv
from matplotlib import pyplot as plt
img = cv.imread('./image/cat.jpeg',0) # 直接以灰度图的方式读入
plt.imshow(img,cmap=plt.cm.gray) # 显示灰度图
histr = cv.calcHist([img],[0],None,[256],[0,256]) # 统计灰度图(完整图像的直方图)
plt.figure(figsize=(10,6),dpi=100)
plt.plot(histr) # 折线图
plt.grid()
plt.show()
# 如果要查找图像某些区域的直方图,只需要在要查找的直方图的区域上创建一个白色的掩膜图像,否则创建黑色,然后将其作为掩码mask传递即可
# 创建蒙版
mask = np.zeros(img.shape[:2],np.uint8)
mask[400:650,200:500] = 255
# 掩膜
masked_img = cv.bitwise_and(img,img,mask=mask)
#统计掩膜后图像的灰度图
mask_histr = cv.calcHist([img],[0],mask,[256],[1,256])
fig,axes = plt.subplots(nrows=2,ncols=2,figsize=(10,8))
axes[0,0].imshow(img,cmap=plt.cm.gray)
axes[0,0].set_title('原图')
axes[0,1].imshow(mask,cmpa=plt.cm.gray)
axes[0,1].set_title('蒙版数据')
axes[1,0].imshow(masked_img,cmap=plt.cm.gray)
axes[1,0].set_title("掩膜后数据")
axes[1,1].plot(mask_histr)
axes[1,1].grid()
axes[1,1].set_title("灰度直方图")
plt.show()
dst = cv.equalizeHist(img) # 均衡化处理



"""
自适应均衡化
"""
import numpy as np
import cv2 as cv
img = cv.imread('./image/cat.jpeg',0)
clahe = cv.createCLAHE(clipLimit=2.0,tileGridSize=(0,0))
cl1 = clahe.apply(img)
fig,axes = plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img,cmap=plt.cm.gray)
axes[0].set_title("原图")
axes[1].imshow(cl1,cmap=plt.cm.gray)
axes[1].set_title("自适应均衡化的结果")
plt.show()

边缘检测

# threshold1: 低阈值,threshold2: 高阈值
# 低于threshold1的丢弃,高于threshold2的保留,中间值看是否连接
edges = cv2.Canny(gray, threshold1=50, threshold2=150) # Canny边缘检测
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=5) # Sobel算子
laplacian = cv2.Laplacian(gray, cv2.CV_64F) # Laplacian算子
"""
图像边缘检测大幅度减少了数据量,并且剔除了可以认为不相关的信息,保留了图像的重要结构属性
用于边检检测的方法分为两类:基于搜索核基于零穿越
- 基于搜索,通过寻找图像一阶导数中的最大值来检测边界,然后利用计算结果估计边缘局部方向,通常采用梯度的方向,并利用此方向找到局部梯度模的最大值,代表算法Sobel算子和Scharr算子(利用一阶导数的最大值获取边界)
- 基于零穿越,通过寻找图像二阶导数零穿越来寻找边界,代表算法Laplacian算子(利用二阶导数为0获取边界)
Sobel算子,基于搜索的方法获取边界
- cv.sobel()
- cv.convertScaleAbs()
- cv.addweights()
Laplacian算子,基于零穿越获取边界
- cv.Laplacian()
Canny算法流程:(4步)
1. 噪声去除:高斯滤波
2. 计算图像梯度:sobel算子,计算梯度大小和方向
3. 非极大值抑制:利用梯度方向像素来判断当前像素是否为边界点
4. 滞后阈值:设置两个阈值,确定最终边界
"""
import cv2 as cv
import numpy as np
from matplotlib import pyplot as plt
img = cv.imread('./image/horse.jpg',0)
x = cv.Sobel(img,cv.CV_165,1,0) # 计算Sobel卷积结果
y = cv.Sobel(img,cv.CV_165,0,1)
Scale_absX = cv.covertScaleAbs(x) # convert转换scale缩放
Scale_absY = cv.convertScaleAbs(y)
result = cv.addWeighted(Scale_absX,0.5,Scale_absY,0.5,0)
plt.figure(figsize=(10,8),dpi=100)
plt.subplot(121),plt.imshow(img,cmap=plt.cm.gray),plt.title('原图')
plt.xticks([]),plt.yticks([])
plt.subplot(122),plt.imshow(result,cmap=plt.cm.gray),plt.title('Sobel滤波后结果')
plt.xticks([]),plt.yticks([])
plt.show()
# 将上述代码中计算sobel算子的部分中将ksize设为-1,就是利用Scharr进行边缘检测
x = cv.Sobel(img,cv.CV_165,1,0,ksize=-1)
y = cv.Sobel(img,cv.CV_165,0,1,ksize=-1)
result = cv.Laplacian(img,cv.CV_165)
Scale_abs = cv.convertScaleAbs(result)



"""
边缘检测算法4步构成:
1. 噪声去除,由于边缘检测容易收到噪声影像,所以首先使用5*5高斯滤波去除噪声
2. 计算图像梯度,对平滑后的图像使用Sobel算子计算水平方向和竖直方向的一阶导数,根据得到的两幅梯度图找到边界梯度和方向
3. 非极大值抑制
4. 滞后阈值
"""
res = cv.Canny(img,0,100)
plt.imshow(res,cmap=plt.cm.gray)

形态学操作



kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5))
eroded = cv2.erode(img, kernel) # 腐蚀 消除小的白色噪点
dilated = cv2.dilate(img, kernel) # 膨胀 连接断开区域
opened = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel) # 开运算 先腐蚀后膨胀,去除小物体
closed = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel) # 闭运算 先膨胀后腐蚀,填充小孔
"""
膨胀就是使图像高亮部分扩张,效果图拥有比原图更大的高亮区域(求局部最大值的操作)
腐蚀使原图中高亮区域被蚕食,效果图拥有比原图更小的高亮区域(求局部最小值的操作)
"""
# 使用5*5卷积核实现腐蚀和膨胀的运算
import numpy as np
import cv2 as cv
import matploglib.pyplot as plt
img = cv.imread("./image/image.png")
kernel = np.ones((5,5),np.uint8) # 创建核结构
erosion = cv.erode(img,kernel) # 腐蚀
dilate = cv.dilate(img,kernel) # 膨胀
fig,axes = plt.subplots(nrows1,ncols=3,figsize=(10,8),dpi=100)
axes[0].imshow(img)
axes[0].set_title("原图")
axes[1].imshow(erosion)
axes[1].set_title("腐蚀后结果")
axes[2].imshow(dilate)
axes[2].set_title("膨胀后结果")
plt.show()
"""
开运算:先腐蚀,后膨胀。作用是分离物体,消除小区域,去除噪点,去除小的干扰块,而不影响原来的图像
闭运算:先膨胀,后腐蚀。作用是消除闭合物体里面的孔洞,可以填充闭合区域
"""
img1 = cv.imread("./image/image5.png")
img2 = cv.imread("./image/image6.png")
kernel = np.ones((10,10),np.unit8) # 创建核
cvOpen = cv.morphologyEx(img,cv.MORPH_OPEN,kernel)
cvClose = cv.morphologyEx(img,cv.MORPH_CLOSE,kernel)
fig,axes = plt.subplots(nrows=2,ncols=2,figsize=(10,8))
axes[0,0].imshow(img1)
axes[0,0].set_title("原图")
axes[0,1].imshow(cvOpen)
axes[0,1].set_title("开运算结果")
axes[1,0].imshow(img2)
axes[1,0].set_title("原图")
axes[1,1].imshow(cvClose)
axes[1,1].set_title("闭运算结果")
plt.show()
"""
礼帽:原图像与“开运算”的结果图之差
开运算带来的结果是放大了裂缝或者局部低亮度的区域,因此从原图中减去开运算后的图得到的效果突出了原图轮廓周围的区域更明亮的区域,这一操作的核的大小相关
礼帽运算用来分离比邻近点亮一些的斑块,当一幅图像具有大幅的背景的时候,而微小物品比较有规律的情况下,可以使用礼帽运算进行背景提取。
黑帽:原图与“闭运算”的结果图之差
黑帽运算后的效果突出了比原图轮廓周围的区域更暗的区域,这一操作与选择的核大小相关
黑帽运算用来分离比邻近点暗一些的斑块
"""
img1 = cv.imread()
img2 = cv.imread()
kernel = np.ones()
cvOpen = cv.morphologyEx(img1,cv.MORPH_TOPHAT,kernel) # 礼帽运算
cvClose = cv.morphologyEx(img2,cv.MORPH_BLACKHAT,kernel) # 黑帽运算
fig,axes = plt.subplots(nrows=2,ncols=2,figsize=(10,8))
axes[0,0].imshow(img1)
axes[0,0].set_title("原图")
axes[0,1].imshow(cvOpen)
axes[0,1].set_title("礼帽运算结果")
axes[1,0].imshow(img2)
axes[1,0].set_title("原图")
axes[1,1].imshow(cvClose)
axes[1,1].set_title("黑帽运算结果")
plt.show()
轮廓检测
# 查找轮廓
# RETR_EXTERNAL: 只检测外轮廓
# RETR_TREE: 检测所有轮廓并建立层级关系
# CHAIN_APPROX_SIMPLE: 压缩水平、垂直和对角线段,只保留端点
contours, hierarchy = cv2.findContours(thresh,
cv2.RETR_TREE,
cv2.CHAIN_APPROX_SIMPLE)
# 遍历轮廓
for i, cnt in enumerate(contours):
area = cv2.contourArea(cnt)
if area > 100: # 过滤小面积轮廓
# 绘制轮廓
cv2.drawContours(img, [cnt], 0, (0, 255, 0), 2)
# 获取外接矩形
x, y, w, h = cv2.boundingRect(cnt)
cv2.rectangle(img, (x, y), (x+w, y+h), (255, 0, 0), 2)
# 获取最小外接矩形(可旋转)
rect = cv2.minAreaRect(cnt)
box = cv2.boxPoints(rect)
box = np.int0(box)
cv2.drawContours(img, [box], 0, (0, 0, 255), 2)
# 绘制轮廓
cv2.drawContours(img, contours, -1, (0,255,0), 2)
# 轮廓特征
area = cv2.contourArea(cnt) # 面积
perimeter = cv2.arcLength(cnt, True) # 周长
x,y,w,h = cv2.boundingRect(cnt) # 外接矩形
绘图功能
"""
cv.line(img,start,end,color,thickness) 绘制直线
- img:要绘制直线图像
- start,end:直线的起点和终点
- color:线条颜色
- Thickness:线条宽度
cv.circle(img,centerpoint,r,color,thickness) 绘制圆形
- img:要绘制圆形的图像
- centerpoint,r:圆心和半径
- color:线条的颜色
- Thickness:线条宽度,为-1时生成闭合图案并填充颜色
cv.rectangle(img,leftupper,rightdown,color,thickness) 绘制矩形
- img:要绘制矩形的图像
- Leftupper,rightdown:矩形的左上角和右下角坐标
- color:线条的颜色
- Thickness:线条宽度
cv.putText(img,text,station,font,fontsize,color,thickness,cv.LINE_AA) 向图像中添加文字
- img:图像
- text:要写入的文本数据
- station:文本的放置位置
- font:字体
- Fontsize:字体大小
"""
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
img = np.zeros((512,512,3),np.uint8)
# 创建图形
cv.line(img,(0,0),(511,511),(255,0,0),5)
cv.circle(img,(256,256),60,(0,0,255),4)
cv.rectangle(img,(100,100),(400,400),(0,255,0),5)
cv.putText(img,"hello",(100,150),cv.FONT_HERSHEY_COMPLEX,5,(255,255,255),3)
# 显示结果
plt.imshow(img[:,:,::-1])
plt.show()
特征检测
Harris算法
- 通过图像的局部小窗口观察图像,角点的特征是窗口沿任意方向移动都会导致图像灰度明显变化
- API:cv.cornerHarris()
Shi-Tomasi算法
- 对Harris算法的改进,能够更好地检测角点
- API:cv.goodFeatureToTrack()
# 关键点检测
sift = cv2.SIFT_create() # 初始化SIFT检测器
kp, des = sift.detectAndCompute(gray, None) # 检测关键点和计算描述符
# 关键点绘制
img_kp = cv2.drawKeypoints(img, kp, None)
img_kp2 = cv2.drawKeypoints(img, keypoints, None,
flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
# 特征匹配
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
matches = bf.match(des1, des2)
# 按距离排序
matches = sorted(matches,key=lambda x:x.distance)
# 绘制最佳匹配
img_matches = cv2.drawMatches(img1, kp1, img2, kp2,
matches[:10], None,
flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
"""
角点检测
"""
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
img = cv.imread('./image/chessboard.jpg')
gray = cv.cvtColor(img,cv.COLOR_BGR2GRAY)
gray = np.float32(gray) # 角点检测,输入图像必须讲过float32
dst = cv.cornerHarris(gray,2,3,0.04) # 最后一个参数在0.04-0.05之间
img[dst>0.001*dst.max()] = [0,0,255] # 设置阈值,将角点绘制出来,阈值根据图像进行选择
plt.figure(figsize=(10,8),dpi=100)
plt.imshow(img[:,:,::-1]),plt.title("Harris角点检测")
plt.xticks([]),plt.yticks([])
plt.show()

"""
Shi-Tomas算法
"""
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
img = cv.imread('./image/tv.jpg')
gray = cv.cvtColor(img,cv.COLOR_BGR2GRAY)
corners = cv.goodFeaturesToTrack(gray,1000,0.01,10) # 角点检测
# 绘制角点
for i in corners:
x,y = i.ravel()
cv.circle(img,(x,y),2,(0,0,255),-1)
plt.figure(figsize=(10,8),dpi=100)
plt.imshow(img[:,:,::-1]),plt.title('shi-tomasi角点检测')
plt.xticks([]),plt.yticks([])
plt.show()

上述的Harris和Shi-Tomasi角点检测算法具有旋转不变性,但不具有尺度不变性
SIFT算法在不同尺度空间上查找关键点(特征点),并计算出关键点的方向
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
img = cv.imread('./image/tv.jpg')
gray = cv.cvtColor(img,cv.COLOR_BGR2GRAY)
sift = cv.xfeatures2d.SIFT_create() # 实例化sift对象
kp,des = sift.detectAndCompute(gray,None) # 关键点检测,包括方向、尺度,位置信息
# 在图像上绘制关键点的检测结果
cv.drawKeypoints(img,kp,img,flags=cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
plt.figure(figsize=(8,6),dpi-100)
plt.imshow(img[:,:,::-1]),plt.title('sift检测')
plt.xticks(),plt.yticks()
plt.show()

FAST和ORB算法
上述几个特征检测器效果都很好,特别是SIFT和SURF算法,但是从实时角度来看,效率太低
Fast算法
- 若一个像素周围有一定数量的像素与该点像素值不同,则认为其为角点
- cv.FastFeatureDetector_create()
ORB算法
- 是FAST算法和BRIEF算法结合
- cv.ORB_create()
FAST算法,拥挤角点检测,取图像中监测点,以该点为圆心的周围邻域内像素点判断监测点是否为角点,若一个像素周围有一定数量的像素与该点像素值不同,则认为其为角点
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
img = cv.imread('./image/tv.jpg')
fast = cv.FastFeatureDetector_create(threshold=30) # 创建一个Fast对象,传入阈值(可以处理彩色空间图像)
kp = fast.detect(img,None) # 检测图像上的关键点
img2 = cv.drawKeypoints(img,kp,None,color=(0,0,255))
print("Threshold:{}".format(fast.getThreshold()))
print("nonmaxSuppression:{}".format(fast.getNonmaxSuppression()))
print("neighborhood:{}".format(fast.getType()))
print("Total Keypoints with nonmaxSuppression:{}".format(len(kp)))
# 关闭非极大值抑制
fast.setNonmaxSuppression(0)
kp = fast.detect(img,None)
print("Total Keypoints without nonmaxSuppression:{}".format(len(kp)))
# 绘制为进行非极大值抑制的结果
img3 = cv.drawKeypoints(img,kp,None,color=(0,0,255))
fig.axes = plt.subplots(nrow=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img2[:,:,::-1])
axes[0].set_title("加入非极大值抑制")
axes[1].imshow(img3[:,:,::-1])
axes[1].set_title("未加入非极大值抑制")
plt.show()

SIFT和SURF算法受专利保护,使用需要付费
ORB不需要,所以用来对图像中关键点快速创建特征向量,并用这些向量来识别图像中的对象
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
img = cv.imread('./image/tv.jpg')
orb = cv.ORB_create(nfeatures=500)
kb,des = orb.detectAndCompute(img,None)
print(des.shape)
img2 = cv.drawKeypoints(img,kp,None,color=(0,0,255),flags=0)
plt.figure(figsize=(10,8),dpi=100)
plt.imshow(img2[:,:,::-1])
plt.xticks([]),plt.yticks([])
plt.show()

视频处理
读取视频
- 读取视频:cap = cv.VideoCapture()
- 判断读取成功:cap.isOpened()
- 读取每一帧图像:ret,frame = cap.read()
- 读取属性:cap.get(proid)
- 设置属性:cap.set(proid,value)
- 资源释放:cap.release()
保存视频
- 保存视频:out = cv.VideoWrite()
- 视频写入:out.write()
- 资源释放:out.release()
# 视频捕获
cap = cv2.VideoCapture(0) # 0为摄像头索引 (默认摄像头)
# 设置摄像头参数
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
while True:
# 读取一帧
ret, frame = cap.read()
if not ret:
break
# 处理帧
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 显示结果
cv2.imshow('Live Video', gray)
# 按键退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
ret, frame = cap.read()
# 视频属性
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# 视频写入
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('output.avi', fourcc, 20.0, (640,480))
# 释放资源
cap.release()
cv2.destroyAllWindows()
import numpy as np
import cv2 as cv
cap = cv.VideoCauture("DOG.wmv") # 获取视频对象
while(cap.isOpened()): # 判断是否读取成功
ret,frame = cap.read() # 获取每一帧图像
if ret == True: # 获取成功显示图像
cv.imshow('frame',frame)
if cv.waitKey(25) & 0xFF == ord('q'): # 每一帧间隔为25ms
break
cap.release() # 释放视频对象
cv.destoryAllwindows()
"""
读取视频之后,保存视频
"""
# 获取图像的属性(宽和高),并将其转换为整数
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
# 创建保存视频的对象,设置编码格式、帧率、图像的宽高等
out = cv.VideoWriter('outpy.avi',cv.VideoWriter_fourcc('M','J','P','G'),10,(frame_width,frame_height))
while(True):
ret,frame = cap.read()
if reg == True:
out.write(frame)
else:
break
cap.release()
out.release()
cv.destrbyAllWindows()
图像金字塔
# 高斯金字塔
lower = cv2.pyrDown(img) # 下采样
higher = cv2.pyrUp(img) # 上采样
"""
图像多尺度表达的一种,用于图像分割,以多分辨率来解释图像的有效但概念简单的结构
图像金字塔用于机器视觉和图像压缩,一幅图像的金字塔是一系列以金字塔形状排列的分辨率逐步降低,且来源于同一种原始图像集合
"""
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
img = cv.imread("./iamge/image2.jpg")
up_img = cv.pyrUp(img)
img_1 = cv.pyrDown(img)
cv.imshow('enlarge',up_img)
cv.imshow('original',img)
cv.imshow('shrink',img_1)
cv.waitKey(0)
cv.destroyAllWindows()
模板匹配
模板匹配
- 原理:在给定的图片中查找和模板最相似的区域
- API:利用cv.matchTemplate()进行模板匹配,然后使用cv.minMaxLoc()搜索最匹配的位置
霍夫线检测
- 原理:将要检测的内容转换到霍夫空间,利用累加器统计最优解,将检测结果表示处理
- API:cv2.HoughLines()
- 该方法输入的是二值化图像,在进行检测前要将图像进行二值化处理
霍夫圆检测
- 霍夫梯度法
- API:cv.HoughCircles()
res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
# 在图像中查找特定模板,如Logo检测、屏幕定位
# 在输入图像中搜索模板图像
# 加载图像和模板
img = cv2.imread('scene.jpg', 0)
template = cv2.imread('template.jpg', 0)
w,h = template.shape(::-1)
# 执行模板匹配
# TM_CCOEFF_NORMED: 归一化相关系数匹配,返回最佳匹配位置
res = cv2.matchTemplate(img,template,cv2.TM_CCOEFF_NORMED)
# 设置阈值
threshold = 0.8
loc = np.where(res >= threshold)
# 绘制匹配区域
for pt in zip(*loc[::-1]):
cv2.rectangle(img, pt, (pt[0] + w, pt[1] + h), (0, 255, 0), 2)

import cv2 as cv
import numpy as np
from matplotlib import pyplot as plt
img = cv.imread('./image/wulin2.jpg')
template = cv.imread('./image/wulin.jpeg')
h,w,l = template.shape
res = cv.matchTemplate(img,template,cv.TM_CCORR)
min_val,max_val,min_loc,max_loc = cv.minMaxLoc(res)
top_left = max_loc
bottom_right = (top_left[0] + w,top_left[1] + h)
cv.rectangle(img,top_left,bottom_right,(0,255,0),2)
plt.imshow(img[:,:,::-1])
plt.title('匹配结果'),plt.xticks([]),plt.yticks([])
plt.show()

霍夫变换

import numpy as np
import random
import cv2 as cv
import matplotlib.pyplot as plt
img = cv.imread('./image/rili.jpg')
gray = cv.cvtColor(img,cv.COLOR_BGR2GRAY)
edges = cv.Canny(gray,50,150)
lines = cv.HoughLines(eges,0.8,np.pi / 180,150) # 霍夫直线变换
for line in lines: # 将检测的线绘制在图像上,(注意是极坐标)
rho,theta = line[0]
a = np.cos(theta)
b = np.sin(theta)
x0 = a * rho
y0 = b * rho
x1 = int(x0 + 1000 * (-b))
y1 = int(y0 + 1000 * (a))
x2 = int(x0 - 1000 * (-b))
y2 = int(y0 - 1000 * (a))
cv.line(img,(x1,y1),(x2,y2),(0,255,0))
plt.figure(figsize=(10,8),dpi=100)
plt.imshow(img[:,:,::-1],plt.title("霍夫变换线检测"))
plt.xticks([]),plt.yticks([])
plt.show()

# 由于霍夫圆对噪声比较敏感,所以首先对图像进行中值滤波
import cv2 as cv
import numpy as np
import matploglib.pyplot as plt
planets = cv.imread("./image/star.jpeg")
gay_img = cv.cvtColor(planets,cv.COLOR_BGRA2GRAY)
img = cv.medianBlur(gay_img,7) # 进行中值模糊,去噪点
circles = cv.HoughCircles(img,cv.HOUGH_GRADIENT,1,200,param1=100,param2=30,minRadius=0,maxRadius=100)
for i in circles[0,:]: # 遍历矩阵每一行数据
cv.circle(planets,(i[0],i[1]),i[2],(0,255,0),2) # 绘制圆形
cv.circle(planets,(i[0],i[1]),2,(0,0,255),3) # 绘制圆心
plt.figure(figsize=(),dip=100)
plt.imshow(planets[:,:,::-1]),plt.title('霍夫变换圆检测')
plt.xticks([]),plt.yticks([])
plt.show()

Hough变换
# 直线检测
lines = cv2.HoughLines(edges, rho, theta, threshold)
lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100)
if lines is not None:
for line in lines:
rho,theta = line[0]
a = np.cos(theta)
b = np.sin(theta)
x0 = a * rho
y0 = b * rho
x1 = int(x0 + 1000 * (-b))
y1 = int(y0 + 1000 * (a))
x2 = int(x0 - 1000 * (-b))
y2 = int(y0 - 1000 * (a))
cv2.line(img, (x1, y1), (x2, y2), (0, 0, 255), 2)
# 圆检测
circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, dp, minDist)
circles = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=1,
minDist=20, param1=50, param2=30,
minRadius=0, maxRadius=0)
if circles is not None:
circles = np.uint16(np.around(circles))
for circle in circles[0, :]:
cv2.circle(img, (circle[0], circle[1]), circle[2], (0, 255, 0), 2)
图像分割与前景提取
# GrabCut算法
mask = np.zeros(img.shape[:2], np.uint8) # 初始化掩码
# 初始化背景和前景模型
bgdModel = np.zeros((1,65), np.float64)
fgdModel = np.zeros((1,65), np.float64)
# 定义矩形区域(包含前景物体)
rect = (50,50,450,290) # (x, y, width, height)
cv2.grabCut(img, mask, rect, bgdModel, fgdModel, 5, cv2.GC_INIT_WITH_RECT)
# 创建掩码:0=背景,1=前景,2=可能的背景,3=可能的前景
mask2 = np.where((mask == 2) | (mask == 0), 0, 1)).astype('uint8')
# 应用掩码
result = img * mask2[:, :, np.newaxis]
总结:
# OCR预处理完整示例
def preprocess_for_ocr(image_path):
# 1.读取图像
img = cv2.imread(image_path)
# 2.转换为灰度
gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
# 3.去噪
denoised = cv2.medianBlur(gray,3)
# 4.自适应阈值(处理光照不均)
binary = cv2.adaptiveThreshold(denoised, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2)
# 5.形态学操作去除噪点
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
# 6. 倾斜校正(可选)
# ... 使用霍夫变换检测文本角度
return cleaned
# 使用
processed_img = preprocess_for_ocr('document.jpg')
meanshift和Camshift进行目标追踪
import numpy as np
import cv2 as cv
cap = cv.VideoCapture('DOG.wmv')
ret,frame = cap.read() # 获取第一帧图像,并指定目标位置
r,h,c,w = 197,141,0,208 # 目标位置(行,高,列,宽)
track_window = (c,r,w,h)
roi = frame[f:f+h,c:c+w] # 指定目标感兴趣区域
hsv_roi = cv.cvtColor(roi,cv.COLOR_BGR2HSV) # 转换色彩空间(HSV)
# mask = cv.inRange(hsv_roi,np.array([0.,60.,32.]),np.array((180.,255.,255.)))
roi_hist = cv.calcHist([hsv_roi],[0],None,[180],[0,180]) # 计算直方图
cv.normalize(roi_hist,roi_hist,0,255,cv.NORM_MINMAX) # 归一化
# 目标追踪
term_crit = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT,10,1) # 设置窗口搜索终止条件,最大迭代次数,窗口中心漂移最小值
while(True):
ret,frame = cap.read() # 获取每一帧图像
if ret == True:
# 计算直方图反向投影
hsv = cv.cvtColor(frame,cv.COLOR_BGR2HSV)
dst = cv.calcBackProject([hsv],[0],roi_hist,[0,180],1)
# 进行meanshift追踪
ret,track_window = cv.meanShift(dst,track_window,term_crit)
x,y,w,h = track_window
img2 = cv.rectangle(frame,(x,y),(x+w,y+h),255,2)
cv.imshow('frame',img2)
if cv.waitKey() & 0xFF == ord('q'):
break
else:
break
cap.release()
cv.destroyAllWindows()

上述方式检测窗口是大小固定的,而视频中由远及近是一个逐渐变小的过程,固定窗口不合适
所以需要根据目标大小和角度进行对窗口进行修正,CamShift解决
"""
CamShift算法(连续自适应算法),是对MeanShift算法的改进,可随着追踪目标的大小变化实时调整搜索窗口的大小
将meanshift函数改为Canshift即可
"""
# Camshift
ret,track_window = cv.meanShift(dst,track_window,term_crit)
x,y,w,h = track_window
img2 = cv.rectangle(frame,(x,y),(x+w,y+h),255,2)
# 改为:
ret,track_window = v.CamShift(dst,track_window,term_crit)
pts = cv.boxPoints(ret)
pts = np.int0(pts)
img2 = cv.polylines(frame,[pts],True,255,2)
人脸检测Haar特征
import cv2 as cv
print(cv.__file__)

检测流程如下:
①、读取图片,并转换成灰度图
②、实例化人脸和眼睛检测的分类器对象
# 实例化级联分类器
classifier = cv.CascadeClassifier("haarcascade_frontalface_default.xml")
# 加载分类图
classifier.load("haarcascade_frontalface_defalut.xml")
③、进行人脸和眼睛的检测
"""
Gray:要进行检测的人脸图像
scaleFactor:前后两次扫描中,搜索窗口的比例系数
minneighbors:目标至少被检测到minNeighbors次才会被认为是目标
minsize和maxsize:目标的最小尺寸和最大尺寸
"""
rect = classifier.detectHultiScale(gray,scaleFactor,minNeighbors,minSize,maxsize)
④、将检测结果绘制出来
# 主程序
import cv2 as cv
import matplotlib.pyplot as plt
img = cv.imread("16.jpg")
gray = cv.cvtColor(img,cv.COLOR_BGR2GRAY)
face_cas = cv.CascadeClassifier("haarcascade_frontalface_defalult.xml")
face_cas.load("haarcascade_eye.xml")
eyes_cas = cv.CascadeClassifier("haarcascade_eye.xml")
eyes_Cas.load("haarcascade_eye.xml")
# 调用识别人脸
faceRects = face_cas.detecMultiScale(gray,scaleFactor=1.2,minNieghbors=3,minSize=(32,32))
for faceRect in faceRects:
x,y,w,h = faceRect
# 框出人脸
cv.rectangle(img,(x,y),(x+h,y+w),(0,255,0),3)
# 在识别出人脸中进行眼睛的检测
roi_color = img[y:y+h,x:x+w]
roi_gray = gray[y:y+h,x:x+w]
eyes = eyes_cas.detectMultiScale(roi_gray)
for(ex,ey,ew,eh) in eyes:
cv.rectangle(rol_color,(ex,ey),(ex+ew,ey+eh),(0,255,0),2)
plt.figure(figsize=(8,6),dpi=100)
plt.imshow(img[:,:,::-1]),plt.title("检测结果")
plt.xticks([]),plt.yticks([])
plt.show()

"""
视频中进行人脸检测
"""
import cv2 as cv
import matplotlib.pyplot as plt
cap = cv.VideoCapture("movie.mp4")
# 在每一帧数据中进行人脸识别
while(cap.isOpened()):
ret,frame = cap.read()
if ret == True:
gray = cv.cvtColor(frame,cv.COLOR_BGR2GRAY)
face_cas = cv.CascadeClassifier("haarcascade_frontalface_default.xml")
face_cas.load("haarcascade_frontalface_default.xml")
faceRects = face_cas.detectMultiScale(gray,scaleFactor=1.2,minNeighbors=3,minsize=(32,32))
for faceRect in faceRects:
x,y,w,h = faceRect
cv.rectangle(frame,(x,y),(x+h,y+w),(0,255,0),3)
cv.imshow("frame",frame)
if cv.waitKey() & 0xFF == ord("q")
break
cap.release()
cv.destroyAllWindow()
实用工具函数
# 通道分离与合并
b, g, r = cv2.split(img)
merged = cv2.merge([b, g, r])
# 图像拼接
h_stack = cv2.hconcat([img1, img2]) # 水平拼接
v_stack = cv2.vconcat([img1, img2]) # 垂直拼接
# 添加边界
bordered = cv2.copyMakeBorder(img, top, bottom, left, right,
cv2.BORDER_CONSTANT, value=[0,0,0])
场景一:工业视觉检测 - 产品缺陷检测 (智能制造 - 工业零件缺陷检测系统)
算法融合检测流程
输入图像 → 预处理 → 多方法并行检测 → 结果融合 → 质量判定 → 报告生成
↓ ↓ ↓ ↓ ↓ ↓
图像读取 → 灰度化+降噪 → 1.边缘检测法 → 轮廓合并 → 缺陷特征 → 可视化报告
→ 2.阈值分割法 → 过滤噪声 → 严重度评分 → JSON数据
→ 3.模板匹配法 → 形状验证 → 合格判定 → 数据库存储
检测算法融合策略
①、边缘检测(Canny):适用于检测裂纹、边缘破损
1. 高斯滤波 → 2. 梯度计算 → 3. 非极大值抑制 → 4. 双阈值检测
②、阈值分割:适用于表面污点、凹坑
自适应阈值 → 形态学操作 → 轮廓提取 → 区域筛选
③、模板匹配:适用于尺寸和形状验证
标准模板 → 多尺度匹配 → 相似度评分 → 形状合规性
import cv2
import numpy as np
import os
import json
from datetime import datetime
from pathlib import Path
import logging
class IndestrialDefectDetector:
"""工业零件缺陷检测系统-用于汽车零部件生产线的质量检测"""
def __init__(self,config_path="config.json"):
# 企业级项目需要使用配置文件和日志系统
self.config = self._load_config(config_path)
self.setup_logging()
self.setup_directories()
# 初始化模型参数(实际项目中可能加载预训练模型)
self.min_contour_area = self.config.get("min_contour_area", 100)
self.max_defect_size = self.config.get("max_defect_size", 5000)
self.gaussian_kernel = tuple(self.config.get("gaussian_kernel", (5, 5)))
# 统计信息
self.total_inspected = 0
self.defective_count = 0
self.production_log = []
logging.info("工业零件缺陷检测系统初始化完成")
def _load_config(self,config_path):
"""加载配置文件--企业项目中需要统一配置管理"""
default_config = {
"input_dir": "./input_parts",
"output_dir": "./output_results",
"log_dir": "./logs",
"min_contour_area": 100,
"max_defect_size": 5000,
"gaussian_kernel": [5, 5],
"threshold_value": 127,
"canny_thresholds": [50, 150],
"morphology_kernel": [3, 3],
"save_debug_images": True
}
if os.path.exists(config_path):
with open(config_path, 'r') as f:
config = json.load(f)
return {**default_config, **config}
return default_config
def setup_logging(self):
"""设置日志系统--企业项目必须记录运行状态"""
log_dir = self.config("log_dir")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir,f"defect_detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler() # 同时输出到控制台
]
)
def process_single_part(self, image_path, part_id):
"""
处理单个零件图像
Args:
image_path: 零件图像路径
part_id: 零件唯一标识符
Returns:
检测结果字典
"""
logging.info(f"开始检测零件 {part_id}")
try:
# 1.读取图像,使用cv2.imdecode处理中文路径和网络传输场景
if isinstance(image_path,(str,Path)):
img_array = np.fromfile(str(image_path),dtype=np.uint8)
img = cv2.imdecode(img_array,cv2.IMREAD_COLOR)
else:
#支持直接传入numpy数组(用于实时摄像头)
img = image_path.copy()
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
original_img = img.copy()
# 2.图像预处理(标准化处理流程)
# 转换为灰度图-减少计算复杂度,大多数缺陷检测只需要亮度信息
gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
# 高斯模糊-减少噪声影响,避免误检
blurred = cv2.GaussianBlur(gray,self.gaussian_kernel,0)
# 3.缺陷检测核心算法
# 方法一:边缘检测法(检测边缘异常)
edges = self._edge_based_defect_detection(blurred)
# 方法二:阈值分割法(检测表面缺陷)
binary_mask = self._hreshold_based_defect_detection(blurred)
# 方法三:模板匹配法(检测形状缺陷)
template_score = self._template_matching_detection(gray)
# 4.结果融合与决策
defects = self._fuse_detection_results(edges, binary_mask, original_img)
# 5.计算缺陷特征
defect_features = self._calculate_defect_features(defects)
# 6.质量判定
is_defective, quality_score = self._quality_judgement(defect_features, template_score)
# 7.生成可视化报告
result_img = self._generate_result_visualization(
original_img, defects, defect_features, is_defective
)
# 8.保存结果
result = self._save_results(
part_id, result_img, defect_features,
is_defective, quality_score
)
# 9.更新生产统计
self._update_production_stats(part_id, is_defective, quality_score)
logging.info(f"零件 {part_id} 检测完成: {'合格' if not is_defective else '不合格'}")
return result
except Exception as e:
logging.error(f"零件 {part_id} 检测失败: {str(e)}")
return {
"part_id": part_id,
"status": "error",
"error_message": str(e),
"timestamp": datetime.now().isoformat()
}
def _edge_based_defect_detection(self, blurred_img):
"""
基于边缘的缺陷检测
适用于检测裂纹、边缘破损等缺陷
"""
# Canny边缘检测
# 双阈值策略:地域50的丢弃,高于150的确认为边缘,中间值看连接性
edges = cv2.Canny(blurred_img,self.config["canny_thresholds"][0],self.config["canny_thresholds"][1])
#形态学操作增强边缘
kernel = np.ones(tuple(self.config["morphology_kernel"]),np.uint8)
edges = cv2.dilate(edges,kernel,iterations=1)
edges = cv2.erode(edges,kernel,iterations=1)
return edges
def _threshold_based_defect_detection(self,blurred_img):
"""
基于阈值的缺陷检测,适用于检测表面污点、凹坑等缺陷
"""
# 自适应阈值处理-应对光照不均的工业环境
binary = cv2.adaptiveThreshold(
blurred_img,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
11, # 邻域大小
2 # 常数C
)
# 开运算去除噪声
kernel = np.ones(tuple(self.config["morphology_kernel"]),np.uint8)
binary = cv2.morphologyEx(binary,cv2.MORPH_OPEN,kernel)
return binary
def _template_matching_detection(self,gray_img):
"""
模板匹配检测,用于检测形状和尺寸是否合规
实际项目中,使用余弦训练好的模板
"""
# 这里简化实现,实际项目会加载标准模板
# 假设我们已经有了一个标准零件的模板
template_size = (100,100)
template = np.ones(template_size,dtype=np.uint8) * 128
# 如果图像太小,调整模板大小
if gray_img.shape[0] < template_size[0] or gray_img.shape[1] < template_size[1]:
#使用图像本身作为模板(简化)
h,w = gray_img.shape
template = gray_img[h//4:3*h//4, w//4:3*w//4]
else:
# 从图像中心裁剪模板区域
h,w = gray_img.shape
center_h,center_w = h//2,w//2
template = gray_img[center_h-50:center_h+50, center_w-50:center_w+50]
# 执行模板匹配
res = cv2.matchTemplate(gray_img,tempalte,cv2.TM_CCOEFF_NORMED)
# 获取最佳匹配分数
min_val,max_val,min_loc,max_loc = cv2.minMaxLoc(res)
return max_val # 返回匹配度
def _fuse_detection_results(self,edges,binary_mask,original_img):
"""
融合多种检测结果
企业级项目通常使用多算法融合提高准确性
"""
# 合并边缘和阈值检测结果
combined = cv2.bitwise_or(edges,binary_mask)
# 寻找轮廓
contours,hierarchy = cv2.findContours(
combined,
cv2.RETR_EXTERNAL, # 只检测外部轮廓
cv2.CHAIN_APPROX_SIMPLE # 压缩轮廓点
)
# 过滤小轮廓(噪声)
valid_contours = []
for contour in contours:
area = cv2.contourArea(contour)
if self.min_contour_area < area < self.max_defect_size:
valid_contours.append(contour)
return valid_contours
def _calculate_defect_features(self,contours):
"""
计算缺陷特征
为后续的质量分析和过程控制提供数据
"""
features = []
for i,contour in enumerate(contours):
# 基本特征
area = cv2.contourArea(contour)
perimeter = cv2.arcLength(contour,True)
# 形状特征
x,y,w,h = cv2.boundingRect(contour)
aspect_ratio = w / h if h > 0 else 0
# 圆度(衡量形状接近圆的程度)
if perimeter > 0:
circularity = 4 * np.pi * area / (perimeter * perimeter)
else:
circularity = 0
# 凸性检测
hull = cv2.convexHull(contour)
hull_area = cv2.contourArea(hull)
convexity = area / hull_area if hull_area > 0 else 0
# 最小外接矩形
rect = cv2.minAreaRect(contour)
box = cv2.boxPoints(rect)
box = np.int0(box)
# 缺陷类型初步判断
defect_type = self._classify_defect_type(area, circularity, aspect_ratio)
features.append({
"id": i,
"area": float(area),
"perimeter": float(perimeter),
"bounding_box": [int(x), int(y), int(w), int(h)],
"aspect_ratio": float(aspect_ratio),
"circularity": float(circularity),
"convexity": float(convexity),
"defect_type": defect_type,
"min_area_rect": box.tolist()
})
return features
def _classify_defect_type(self, area, circularity, aspect_ratio):
"""
根据特征初步分类缺陷类型
实际项目中会使用机器学习模型
"""
if area < 200:
return "小点状缺陷" # 可能是气泡或小杂质
elif circularity > 0.7:
return "圆形缺陷" # 可能是凹坑
elif aspect_ratio > 3:
return "线性缺陷" # 可能是裂纹
else:
return "不规则缺陷"
def _quality_judgement(self, defect_features, template_score):
"""
质量判定逻辑
基于缺陷特征和模板匹配分数
"""
# 如果没有缺陷,直接判定合格
if not defect_features:
return False, 1.0
# 计算缺陷总面积
total_defect_area = sum(feat["area"] for feat in defect_features)
# 判断是否超过阈值(这里简化,实际会更复杂)
max_allowed_area = self.config.get("max_defect_area", 1000)
# 质量评分:基于缺陷面积和模板匹配度
area_score = max(0, 1 - total_defect_area / max_allowed_area)
final_score = area_score * 0.7 + template_score * 0.3
# 判定是否合格
quality_threshold = self.config.get("quality_threshold", 0.8)
is_defective = final_score < quality_threshold
return is_defective, float(final_score)
def _generate_result_visualization(self, original_img, contours, features, is_defective):
"""
生成可视化检测结果
用于人工复核和报告生成
"""
# 创建结果图像
result_img = original_img.copy()
# 绘制所有缺陷轮廓
cv2.drawContours(result_img, contours, -1, (0, 0, 255), 2) # 红色轮廓
# 为每个缺陷添加标注
for i, (contour, feat) in enumerate(zip(contours, features)):
# 绘制边界框
x, y, w, h = feat["bounding_box"]
cv2.rectangle(result_img, (x, y), (x+w, y+h), (255, 0, 0), 2)
# 添加文本标注
label = f"D{i}: {feat['defect_type']}"
cv2.putText(result_img, label, (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
# 绘制最小外接矩形
box = np.array(feat["min_area_rect"], dtype=np.int32)
cv2.drawContours(result_img, [box], 0, (0, 255, 0), 1)
# 添加整体判定结果
status_text = "DEFECTIVE" if is_defective else "PASS"
status_color = (0, 0, 255) if is_defective else (0, 255, 0)
cv2.putText(result_img, f"Status: {status_text}", (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 3)
# 添加时间戳和生产信息
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cv2.putText(result_img, timestamp, (20, result_img.shape[0] - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return result_img
def _save_results(self, part_id, result_img, features, is_defective, quality_score):
"""
保存检测结果
包括图像、JSON数据和数据库记录
"""
# 创建时间戳
timestamp = datetime.now()
timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S")
# 保存结果图像
output_dir = Path(self.config["output_dir"])
img_filename = f"{part_id}_{timestamp_str}_{'NG' if is_defective else 'OK'}.jpg"
img_path = output_dir / "images" / img_filename
os.makedirs(img_path.parent, exist_ok=True)
cv2.imwrite(str(img_path), result_img)
# 保存JSON结果数据
result_data = {
"part_id": part_id,
"timestamp": timestamp.isoformat(),
"status": "defective" if is_defective else "ok",
"quality_score": quality_score,
"defect_count": len(features),
"defect_features": features,
"result_image": str(img_path),
"inspection_time": datetime.now().isoformat()
}
json_filename = f"{part_id}_{timestamp_str}.json"
json_path = output_dir / "data" / json_filename
os.makedirs(json_path.parent, exist_ok=True)
with open(json_path, 'w') as f:
json.dump(result_data, f, indent=2, ensure_ascii=False)
# 在实际项目中,这里还会保存到数据库
# self._save_to_database(result_data)
logging.info(f"零件 {part_id} 结果已保存: {img_path}")
return result_data
def _update_production_stats(self, part_id, is_defective, quality_score):
"""更新生产统计信息"""
self.total_inspected += 1
if is_defective:
self.defective_count += 1
self.production_log.append({
"part_id": part_id,
"timestamp": datetime.now().isoformat(),
"defective": is_defective,
"quality_score": quality_score
})
# 计算实时合格率
if self.total_inspected > 0:
pass_rate = 1 - (self.defective_count / self.total_inspected)
logging.info(f"当前合格率: {pass_rate:.2%}")
def generate_production_report(self):
"""生成生产报告"""
if not self.production_log:
return None
report = {
"report_time": datetime.now().isoformat(),
"total_inspected": self.total_inspected,
"defective_count": self.defective_count,
"pass_rate": 1 - (self.defective_count / self.total_inspected) if self.total_inspected > 0 else 0,
"inspection_log": self.production_log[-100:], # 最近100条记录
"system_uptime": str(datetime.now() - datetime.fromisoformat(self.production_log[0]["timestamp"]))
}
# 保存报告
report_dir = Path(self.config["output_dir"]) / "reports"
os.makedirs(report_dir, exist_ok=True)
report_file = report_dir / f"production_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
return report
def batch_process_parts(self):
"""批量处理零件 - 实际生产环境中的连续处理"""
input_dir = Path(self.config["input_dir"])
if not input_dir.exists():
logging.warning(f"输入目录不存在: {input_dir}")
return
# 支持多种图像格式
image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']
image_files = []
for ext in image_extensions:
image_files.extend(input_dir.glob(f"*{ext}"))
image_files.extend(input_dir.glob(f"*{ext.upper()}"))
logging.info(f"找到 {len(image_files)} 个零件图像")
results = []
for img_file in image_files:
# 从文件名提取零件ID
part_id = img_file.stem
# 处理单个零件
result = self.process_single_part(img_file, part_id)
results.append(result)
# 生成中间报告(每处理10个零件)
if len(results) % 10 == 0:
logging.info(f"已处理 {len(results)}/{len(image_files)} 个零件")
# 生成最终报告
final_report = self.generate_production_report()
logging.info("批量处理完成")
return results, final_report
# 使用示例
if __name__ == "__main__":
# 初始化检测系统
detector = IndustrialDefectDetector("config.json")
# 批量处理零件
results, report = detector.batch_process_parts()
# 打印摘要
if report:
print(f"检测完成!")
print(f"总计检测: {report['total_inspected']} 个零件")
print(f"不合格数: {report['defective_count']} 个")
print(f"合格率: {report['pass_rate']:.2%}")
场景二:安防监控 - 实时移动物体检测与跟踪(智能安防 - 实时入侵检测与报警系统)
主控中心
│
┌───────────┼───────────┐
↓ ↓ ↓
摄像头1 摄像头2 摄像头N
│ │ │
↓ ↓ ↓
视频流捕获 → 帧缓冲队列 → 运动检测
│ │
↓ ↓
实时显示 异常判定
│
↓
多级报警触发
┌───────┴───────┐
↓ ↓
现场报警 远程通知
(声音) (邮件/短信)
背景减除算法流程:
1. 初始化背景模型(MOG2):
- 学习率:控制背景更新速度
- 方差阈值:区分前景/背景
- 历史帧数:影响背景稳定性
2. 每帧处理:
- 应用背景减除 → 获取前景掩码
- 二值化处理 → 形态学操作(去噪、连接)
- 轮廓检测 → 区域筛选(面积阈值)
3. 行为分析:
- 连续帧检测 → 排除瞬时干扰
- 区域跟踪 → 判断运动轨迹
- 活动热图 → 识别常发区域
import cv2
import numpy as np
import threading
import queue
import time
import json
from datetime import datetime
import os
from pathlib import Path
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
import pygame # 用于声音报警
class SecurityMonitoringSystem:
"""智能安防监控系统 - 实时入侵检测与报警"""
def __init__(self,config_path="security_config.json"):
# 加载配置
self.config = self._load_config(config_path)
self.setup_logging() # 设置安全日志
self.setup_directories() #创建安全存储目录
# 视频流配置
self.alarm_active = False
self.alarm_cooldown = 0
# 报警状态
self.alarm_active = False
self.alarm_cooldown = 0
# 运动检测参数
self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,#历史帧数
varThreshold=16,#方差阈值
detectShadows=False #不检测阴影
)
# 初始化报警声音
if self.config.get("enable_audio_alarm", False):
pygame.mixer.init()
self.alarm_soud = pygame.mixer.Sound("alarm.wav")# 需要准备音频文件
# 统计信息
self.motion_detected_count = 0
self.alarms_triggered = 0
self.start_time = time.time()
logging.info("智能安防监控系统初始化完成")
def _load_config(self,config_path):
"""加载安全配置"""
default_config = {
"video_sources": [
{"id": 0, "name": "Main Entrance", "type": "camera", "url": 0},
{"id": 1, "name": "Backyard", "type": "camera", "url": 1}
],
"alert_emails": ["security@company.com"],
"min_motion_area": 1000,
"alarm_cooldown_seconds": 30,
"save_frames_on_alarm": True,
"frame_save_count": 10,
"enable_email_alerts": True,
"enable_audio_alarm": False,
"smtp_server": "smtp.company.com",
"smtp_port": 587,
"email_username": "security@company.com",
"email_password": "your_password"
}
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return {**default_config, **json.load(f)}
return default_config
def setup_logging(self):
"""设置安全日志"""
log_dir = Path("security_logs")
log_dir.mkdir(exist_ok = True)
logging.basicConfig(
level = logging.INFO,
format = '%(asctime)s - SECURITY - %(levelname)s - %(message)s',
handlers = [
logging.FileHandler(log_dir / f"security_{datetime.now().strftime('%Y%m%d')}.log"),
logging.StreamHandler()
]
)
def setup_directories(self):
"""创建安全存储目录"""
dirs = ["security_footage", "alarm_images", "motion_logs"]
for dir_name in dirs:
Path(dir_name).mkdir(exist_ok=True)
def capture_video_stream(self,camera_config):
"""捕获视频流线程"""
camera_id = camera_config["id"]
camera_name = camera_config["name"]
source = camera_config["url"]
logging.info(f"启动摄像头 {camera_name} (ID: {camera_id})")
cap = cv2.VideoCapture(source)
#设置摄像头参数
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
frame_count = 0
while True:
try:
ret,frame = cap.read()
if not ret:
logging.error(f"摄像头 {camera_name} 读取失败")
time.sleep(1)
continue
# 每3帧处理一次以减轻计算压力
if frame_count % 3 == 0:
timestamp = datetime.now()
#预处理帧
processed_frame = self._preprocess_frame(frame)
#放入处理队列
try:
self.frame_queue.put_nowait({
"camera_id": camera_id,
"camera_name": camera_name,
"frame": processed_frame,
"original_frame": frame.copy(),
"timestamp": timestamp,
"frame_count": frame_count
})
except queue.Full:
#队列满时丢弃旧帧
try:
self.frame_queue.get_nowait()
except queue.Empty:
pass
frame_count += 1
# 显示实时画面(调试用)
if self.config.get("show_live_feed", False):
cv2.imshow(camera_name, frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except Excetion as e:
logging.error(f"摄像头 {camera_name} 捕获错误: {e}")
time.sleep(1)
cap.release()
def _preprocess_frame(self,frame):
"""帧预处理"""
# 调整大小以加快处理速度
processed = cv2.resize(frame,(320,240))
# 转换为灰度图
gray = cv2.cvtColor(processed,cv2.COLOR_BGR2GRAY)
# 高斯模糊减少噪声
blurred = cv2.GaussianBlur(gray,(5,5),0)
return blurred
def monion_detection_worker(self):
"""运动检测工作流程"""
logging.info("运动检测线程启动")
motion_history = []
consecutive_motion_frames = 0
while True:
try:
# 从队列获取帧
frame_data = self.frame_queue.get(timeout=1)
frame = frame_data["frame"]
camera_name = frame_data["camera_name"]
timestamp = frame_data["timestamp"]
#应用背景减除
fg_mask = self.bg_subtractor.apply(frame)
#二值化
_,thresh = cv2.threshold(fg_mask, 25, 255, cv2.THRESH_BINARY)
# 形态学操作去除噪声
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
thresh = cv2.dilate(thresh, kernel, iterations=2)
# 查找轮廓
contours,_ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
motion_detected = False
motion_areas = []
for contour in contours:
area = cv2.contourArea()
if area > self.config["min_motion_area"]:
motion_detected = True
motion_areas.append(area)
#绘制检测框
x,y,w,h = cv2.boundingRect(contour)
frame_data["original_frame"] = cv2.rectangle(
frame_data["original_frame"],
(x*2, y*2), # 恢复到原图尺寸
((x+w)*2, (y+h)*2),
(0, 0, 255), 2
)
# 运动检测逻辑
if motion_detected:
self.motion_detected_count += 1
consecutive_motion_frames += 1
# 记录运动事件
motion_event = {
"camera": camera_name,
"timestamp": timestamp.isoformat(),
"motion_areas": motion_areas,
"total_area": sum(motion_areas),
"consecutive_frames": consecutive_motion_frames
}
motion_history.append(motion_event)
# 如果连续检测到运动超过5帧,触发报警
if consecutive_motion_frames >= 5 and not self.alarm_active:
logging.warning(f"检测到持续运动!摄像头: {camera_name}")
self.trigger_alarm(frame_data, motion_event)
# 保存运动帧用于分析
if self.config.get("save_motion_frames", False):
self._save_motion_frame(frame_data, motion_event)
else:
consecutive_motion_frames = 0
# 更新运动历史(保持最近100条记录)
if len(motion_history) > 100:
motion_history = motion_history[-100:]
# 显示处理结果(调试)
if self.config.get("show_processing", False):
display_frame = cv2.resize(frame_data["original_frame"], (640, 480))
cv2.putText(display_frame, f"Motion: {motion_detected}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,
(0, 255, 0) if not motion_detected else (0, 0, 255), 2)
cv2.imshow(f"Processed - {camera_name}", display_frame)
except queue.Empty:
continue
except Exception as e:
logging.error(f"运动检测错误: {e}")
def trigger_alarm(self, frame_data, motion_event):
"""触发报警"""
if self.alarm_active:
return
self.alarm_active = True
self.alarms_triggered += 1
self.alarm_cooldown = time.time()
# 记录报警
alarm_data = {
"alarm_id": self.alarms_triggered,
"timestamp": datetime.now().isoformat(),
"camera": frame_data["camera_name"],
"motion_event": motion_event,
"frame_count": frame_data["frame_count"]
}
logging.critical(f"🚨 报警触发!ID: {self.alarms_triggered}, 摄像头: {frame_data['camera_name']}")
# 保存报警帧
if self.config["save_frames_on_alarm"]:
self._save_alarm_frames(frame_data, alarm_data)
# 发送邮件报警
if self.config["enable_email_alerts"]:
threading.Thread(target=self.send_email_alert,
args=(frame_data, alarm_data)).start()
# 播放声音报警
if self.config.get("enable_audio_alarm", False):
threading.Thread(target=self.play_alarm_sound).start()
# 保存报警记录
self._save_alarm_record(alarm_data)
def _save_alarm_frames(self, frame_data, alarm_data):
"""保存报警前后的帧"""
alarm_dir = Path("alarm_images") / f"alarm_{alarm_data['alarm_id']}"
alarm_dir.mkdir(exist_ok=True)
# 保存当前帧
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
frame_path = alarm_dir / f"{timestamp_str}.jpg"
cv2.imwrite(str(frame_path), frame_data["original_frame"])
# 保存报警数据
alarm_json_path = alarm_dir / "alarm_data.json"
with open(alarm_json_path, 'w') as f:
json.dump(alarm_data, f, indent=2)
def send_email_alert(self, frame_data, alarm_data):
"""发送邮件报警"""
try:
# 创建邮件
msg = MIMEMultipart()
msg['Subject'] = f'🚨 安全报警 - 摄像头 {frame_data["camera_name"]}'
msg['From'] = self.config["email_username"]
msg['To'] = ', '.join(self.config["alert_emails"])
# 邮件正文
body = f"""
安全系统检测到异常活动!
报警详情:
- 时间: {alarm_data['timestamp']}
- 摄像头: {frame_data['camera_name']}
- 报警ID: {alarm_data['alarm_id']}
- 运动区域: {alarm_data['motion_event']['total_area']} 像素
请立即查看安全监控系统。
"""
msg.attach(MIMEText(body, 'plain'))
# 附加报警图片
with open(f"alarm_images/alarm_{alarm_data['alarm_id']}/{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg", 'rb') as f:
img = MIMEImage(f.read())
img.add_header('Content-Disposition', 'attachment',
filename=f"alarm_{alarm_data['alarm_id']}.jpg")
msg.attach(img)
# 发送邮件
with smtplib.SMTP(self.config["smtp_server"], self.config["smtp_port"]) as server:
server.starttls()
server.login(self.config["email_username"], self.config["email_password"])
server.send_message(msg)
logging.info(f"报警邮件发送成功: {alarm_data['alarm_id']}")
except Exception as e:
logging.error(f"发送报警邮件失败: {e}")
def play_alarm_sound(self):
"""播放报警声音"""
try:
self.alarm_sound.play()
time.sleep(5) # 播放5秒
self.alarm_sound.stop()
except Exception as e:
logging.error(f"播放报警声音失败: {e}")
def _save_alarm_record(self, alarm_data):
"""保存报警记录到文件"""
alarms_file = Path("security_logs") / "alarms.json"
alarms = []
if alarms_file.exists():
with open(alarms_file, 'r') as f:
alarms = json.load(f)
alarms.append(alarm_data)
# 只保留最近1000条报警记录
if len(alarms) > 1000:
alarms = alarms[-1000:]
with open(alarms_file, 'w') as f:
json.dump(alarms, f, indent=2)
def alarm_cooldown_manager(self):
"""报警冷却管理"""
while True:
if self.alarm_active and time.time() - self.alarm_cooldown > self.config["alarm_cooldown_seconds"]:
self.alarm_active = False
logging.info("报警系统已重置")
time.sleep(1)
def generate_security_report(self):
"""生成安全报告"""
report = {
"report_time": datetime.now().isoformat(),
"system_uptime": time.time() - self.start_time,
"cameras_monitoring": len(self.video_sources),
"motion_detected_count": self.motion_detected_count,
"alarms_triggered": self.alarms_triggered,
"current_status": "ACTIVE" if not self.alarm_active else "ALARM ACTIVE"
}
report_file = Path("security_logs") / f"security_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2)
return report
def run(self):
"""运行安全监控系统"""
logging.info("启动安全监控系统...")
# 启动摄像头捕获线程
capture_threads = []
for camera_config in self.video_sources:
thread = threading.Thread(
target=self.capture_video_stream,
args=(camera_config,),
daemon=True
)
thread.start()
capture_threads.append(thread)
# 启动运动检测线程
motion_thread = threading.Thread(
target=self.motion_detection_worker,
daemon=True
)
motion_thread.start()
# 启动报警冷却管理
cooldown_thread = threading.Thread(
target=self.alarm_cooldown_manager,
daemon=True
)
cooldown_thread.start()
# 主循环
try:
while True:
# 每小时生成报告
if int(time.time()) % 3600 == 0:
self.generate_security_report()
time.sleep(1)
except KeyboardInterrupt:
logging.info("安全监控系统正在关闭...")
self.generate_security_report()
logging.info("安全监控系统已关闭")
# 使用示例
if __name__ == "__main__":
# 初始化安全系统
security_system = SecurityMonitoringSystem("security_config.json")
# 运行监控系统
security_system.run()
场景三:文档数字化 - 自动文档扫描与OCR预处理(医疗影像 - X光片骨折自动检测系统)
医疗影像处理流程
DICOM文件 → 医学图像处理 → 多算法骨折检测 → 特征量化 → AI辅助诊断 → 结构化报告
↓ ↓ ↓ ↓ ↓ ↓
元数据提取 → 窗宽窗位调整 → 1.边缘检测法 → 几何特征 → 严重度评分 → 临床建议
→ 图像增强 → 2.纹理分析法 → 统计特征 → 置信度评估 → 随访建议
→ 骨骼增强 → 3.轮廓分析法 → 形态特征 → 风险评估 → 会诊建议
多算法协同检测
基于边缘的方法:
骨骼区域分割 → 多尺度边缘检测 → 长直线段提取 → 骨折线识别
基于纹理的方法:
局部纹理分析 → 纹理不均匀性检测 → 异常区域标记
基于轮廓的方法:
骨骼轮廓提取 → 凸包计算 → 凸性缺陷分析 → 凹陷区域检测
import cv2
import numpy as np
import pydicom # DICOM医疗影像格式
import os
import json
from datetime import datetime
from pathlib import Path
import logging
from skimage import exposure
import matplotlib.pyplot as plt
from scipy import ndimage
class FractureDetectionSystem:
"""X光片骨折自动检测系统 - 辅助医生诊断"""
def __init__(self, config_path="medical_config.json"):
# 加载医疗配置
self.config = self._load_config(config_path)
self.setup_logging()
self.setup_directories()
# 医疗图像处理参数
self.clahe_clip_limit = self.config.get("clahe_clip_limit", 2.0)
self.clahe_grid_size = tuple(self.config.get("clahe_grid_size", (8, 8)))
# 骨折检测参数
self.min_fracture_length = self.config.get("min_fracture_length", 10)
self.edge_thresholds = tuple(self.config.get("edge_thresholds", [30, 100]))
self.bone_density_range = tuple(self.config.get("bone_density_range", [100, 200]))
# 患者数据库(简化版)
self.patient_records = {}
# 统计信息
self.processed_count = 0
self.fracture_detected_count = 0
logging.info("X光片骨折检测系统初始化完成")
def _load_config(self, config_path):
"""加载医疗配置"""
default_config = {
"input_dir": "./dicom_images",
"output_dir": "./diagnosis_results",
"log_dir": "./medical_logs",
"clahe_clip_limit": 2.0,
"clahe_grid_size": [8, 8],
"min_fracture_length": 10,
"edge_thresholds": [30, 100],
"bone_density_range": [100, 200],
"save_annotated_images": True,
"generate_pdf_report": True,
"doctor_review_required": True
}
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return {**default_config, **json.load(f)}
return default_config
def setup_logging(self):
"""设置医疗日志系统"""
log_dir = Path(self.config["log_dir"])
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - MEDICAL - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / f"medical_{datetime.now().strftime('%Y%m%d')}.log"),
logging.StreamHandler()
]
)
def setup_directories(self):
"""创建医疗影像目录结构"""
dirs = [
"dicom_images",
"diagnosis_results",
"medical_logs",
"reports",
"annotated_images",
"patient_records"
]
for dir_name in dirs:
Path(dir_name).mkdir(exist_ok=True)
def load_dicom_image(self, dicom_path):
"""
加载DICOM格式的医疗影像
DICOM是医疗影像的标准格式,包含丰富的元数据
"""
try:
# 读取DICOM文件
dicom_data = pydicom.dcmread(dicom_path)
# 提取患者信息
patient_info = {
"patient_id": getattr(dicom_data, 'PatientID', 'Unknown'),
"patient_name": getattr(dicom_data, 'PatientName', 'Unknown'),
"study_date": getattr(dicom_data, 'StudyDate', 'Unknown'),
"modality": getattr(dicom_data, 'Modality', 'Unknown'),
"body_part": getattr(dicom_data, 'BodyPartExamined', 'Unknown'),
"study_description": getattr(dicom_data, 'StudyDescription', 'Unknown')
}
# 获取图像数据
img_array = dicom_data.pixel_array
# 应用DICOM窗宽窗位(Window Width/Window Center)
if hasattr(dicom_data, 'WindowWidth') and hasattr(dicom_data, 'WindowCenter'):
window_width = dicom_data.WindowWidth
window_center = dicom_data.WindowCenter
if isinstance(window_width, pydicom.multival.MultiValue):
window_width = window_width[0]
if isinstance(window_center, pydicom.multival.MultiValue):
window_center = window_center[0]
# 应用窗宽窗位调整
img_array = self._apply_window_level(img_array, window_width, window_center)
# 转换为8位灰度图像
img_array = exposure.rescale_intensity(img_array, out_range=(0, 255))
img_array = img_array.astype(np.uint8)
logging.info(f"成功加载DICOM图像: {dicom_path}")
return img_array, patient_info, dicom_data
except Exception as e:
logging.error(f"加载DICOM图像失败: {dicom_path}, 错误: {e}")
return None, None, None
def _apply_window_level(self, image, window_width, window_level):
"""应用DICOM窗宽窗位调整"""
min_value = window_level - window_width // 2
max_value = window_level + window_width // 2
# 裁剪值域
image = np.clip(image, min_value, max_value)
# 线性拉伸到0-255
image = ((image - min_value) / (max_value - min_value) * 255).astype(np.uint8)
return image
def preprocess_xray_image(self, image, patient_info):
"""
X光片预处理
医疗影像需要特殊的预处理来增强细节
"""
# 1. 图像增强 - CLAHE(对比度受限的自适应直方图均衡化)
# 特别适用于X光片,可以增强局部对比度而不放大噪声
clahe = cv2.createCLAHE(
clipLimit=self.clahe_clip_limit,
tileGridSize=self.clahe_grid_size
)
enhanced = clahe.apply(image)
# 2. 高斯模糊去除噪声
blurred = cv2.GaussianBlur(enhanced, (3, 3), 0)
# 3. 骨骼区域增强
# 通过阈值分割提取骨骼区域
_, bone_mask = cv2.threshold(
blurred,
self.bone_density_range[0],
self.bone_density_range[1],
cv2.THRESH_BINARY
)
# 4. 边缘保留滤波
bilateral = cv2.bilateralFilter(blurred, 9, 75, 75)
# 5. 多尺度细节增强
# 拉普拉斯金字塔增强细节
gaussian_pyramid = [bilateral]
for i in range(3):
blurred_down = cv2.pyrDown(gaussian_pyramid[-1])
gaussian_pyramid.append(blurred_down)
laplacian_pyramid = []
for i in range(len(gaussian_pyramid)-1):
size = (gaussian_pyramid[i].shape[1], gaussian_pyramid[i].shape[0])
gaussian_expanded = cv2.pyrUp(gaussian_pyramid[i+1], dstsize=size)
laplacian = cv2.subtract(gaussian_pyramid[i], gaussian_expanded)
laplacian_pyramid.append(laplacian)
# 增强拉普拉斯层
for i in range(len(laplacian_pyramid)):
laplacian_pyramid[i] = cv2.multiply(laplacian_pyramid[i], 1.5)
# 重建增强图像
reconstructed = gaussian_pyramid[-1]
for i in range(len(laplacian_pyramid)-1, -1, -1):
size = (laplacian_pyramid[i].shape[1], laplacian_pyramid[i].shape[0])
reconstructed = cv2.pyrUp(reconstructed, dstsize=size)
reconstructed = cv2.add(reconstructed, laplacian_pyramid[i])
# 合并结果
final_image = cv2.addWeighted(bilateral, 0.7, reconstructed, 0.3, 0)
return {
"original": image,
"enhanced": enhanced,
"bone_mask": bone_mask,
"bilateral": bilateral,
"final": final_image,
"patient_info": patient_info
}
def detect_fractures(self, processed_images):
"""
骨折检测核心算法
使用多方法融合提高检测准确性
"""
image = processed_images["final"]
bone_mask = processed_images["bone_mask"]
fractures = {
"edge_based": [],
"texture_based": [],
"contour_based": []
}
# 方法1:基于边缘的骨折检测
edge_fractures = self._edge_based_fracture_detection(image, bone_mask)
fractures["edge_based"] = edge_fractures
# 方法2:基于纹理的骨折检测
texture_fractures = self._texture_based_fracture_detection(image)
fractures["texture_based"] = texture_fractures
# 方法3:基于轮廓的骨折检测
contour_fractures = self._contour_based_fracture_detection(image, bone_mask)
fractures["contour_based"] = contour_fractures
# 融合检测结果
all_fractures = self._fuse_fracture_detections(fractures)
# 计算骨折特征
fracture_features = self._calculate_fracture_features(all_fractures, image)
return fracture_features
def _edge_based_fracture_detection(self, image, bone_mask):
"""基于边缘的骨折检测"""
# 1. 在骨骼区域应用Canny边缘检测
bone_region = cv2.bitwise_and(image, image, mask=bone_mask)
# 2. 多尺度边缘检测
edges_coarse = cv2.Canny(bone_region, 20, 60)
edges_medium = cv2.Canny(bone_region, 40, 100)
edges_fine = cv2.Canny(bone_region, 60, 150)
# 3. 合并多尺度边缘
combined_edges = cv2.bitwise_or(edges_coarse, edges_medium)
combined_edges = cv2.bitwise_or(combined_edges, edges_fine)
# 4. 形态学操作连接断裂边缘
kernel_line = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 5))
connected_edges = cv2.morphologyEx(combined_edges, cv2.MORPH_CLOSE, kernel_line)
# 5. 查找长直线(可能是骨折线)
lines = cv2.HoughLinesP(
connected_edges,
rho=1,
theta=np.pi/180,
threshold=30,
minLineLength=self.min_fracture_length,
maxLineGap=10
)
fractures = []
if lines is not None:
for line in lines:
x1, y1, x2, y2 = line[0]
length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
# 只保留足够长的线
if length >= self.min_fracture_length:
fractures.append({
"type": "edge_line",
"points": [(int(x1), int(y1)), (int(x2), int(y2))],
"length": float(length),
"method": "edge_based"
})
return fractures
def _texture_based_fracture_detection(self, image):
"""基于纹理的骨折检测"""
fractures = []
# 1. 计算局部二值模式(LBP)纹理特征
radius = 3
n_points = 8 * radius
# 简化LBP计算
height, width = image.shape
lbp_image = np.zeros_like(image, dtype=np.uint8)
for i in range(radius, height - radius):
for j in range(radius, width - radius):
center = image[i, j]
binary_code = 0
for n in range(n_points):
angle = 2 * np.pi * n / n_points
x = int(j + radius * np.cos(angle))
y = int(i + radius * np.sin(angle))
if image[y, x] >= center:
binary_code |= (1 << (n_points - n - 1))
lbp_image[i, j] = binary_code
# 2. 计算纹理不均匀区域(可能是骨折)
# 使用局部标准差检测纹理变化
window_size = 7
texture_variance = ndimage.generic_filter(
image,
np.std,
size=window_size
)
# 3. 阈值分割高纹理变化区域
_, texture_mask = cv2.threshold(
texture_variance.astype(np.uint8),
15, 255, cv2.THRESH_BINARY
)
# 4. 查找轮廓
contours, _ = cv2.findContours(
texture_mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE
)
for contour in contours:
area = cv2.contourArea(contour)
if area > 50: # 足够大的纹理变化区域
x, y, w, h = cv2.boundingRect(contour)
fractures.append({
"type": "texture_anomaly",
"bbox": [int(x), int(y), int(w), int(h)],
"area": float(area),
"method": "texture_based"
})
return fractures
def _contour_based_fracture_detection(self, image, bone_mask):
"""基于轮廓的骨折检测"""
fractures = []
# 1. 骨骼区域轮廓分析
contours, hierarchy = cv2.findContours(
bone_mask,
cv2.RETR_TREE,
cv2.CHAIN_APPROX_SIMPLE
)
for contour in contours:
# 2. 计算轮廓的凸性缺陷
hull = cv2.convexHull(contour, returnPoints=False)
if len(hull) > 3:
# 查找凸性缺陷(可能是骨折部位)
defects = cv2.convexityDefects(contour, hull)
if defects is not None:
for i in range(defects.shape[0]):
s, e, f, d = defects[i, 0]
# d是深度,值越大缺陷越深
if d > 1000: # 阈值需要根据图像调整
start = tuple(contour[s][0])
end = tuple(contour[e][0])
far = tuple(contour[f][0])
fractures.append({
"type": "contour_defect",
"points": [start, end, far],
"depth": float(d / 256.0), # OpenCV的d需要除以256
"method": "contour_based"
})
return fractures
def _fuse_fracture_detections(self, fractures_dict):
"""融合多种检测方法的结果"""
all_fractures = []
# 合并所有检测结果
for method, fractures in fractures_dict.items():
all_fractures.extend(fractures)
# 去除重复检测(基于位置和大小)
merged_fractures = []
used_indices = set()
for i, frac1 in enumerate(all_fractures):
if i in used_indices:
continue
similar_fractures = [frac1]
# 查找相似的骨折检测
for j, frac2 in enumerate(all_fractures[i+1:], i+1):
if j in used_indices:
continue
# 计算两个骨折之间的距离/重叠度
if self._are_fractures_similar(frac1, frac2):
similar_fractures.append(frac2)
used_indices.add(j)
# 合并相似骨折
merged_fracture = self._merge_similar_fractures(similar_fractures)
merged_fractures.append(merged_fracture)
used_indices.add(i)
return merged_fractures
def _are_fractures_similar(self, frac1, frac2):
"""判断两个骨折检测是否相似"""
# 简化实现:基于边界框重叠
if "bbox" in frac1 and "bbox" in frac2:
x1, y1, w1, h1 = frac1["bbox"]
x2, y2, w2, h2 = frac2["bbox"]
# 计算IoU(交并比)
inter_x1 = max(x1, x2)
inter_y1 = max(y1, y2)
inter_x2 = min(x1 + w1, x2 + w2)
inter_y2 = min(y1 + h1, y2 + h2)
if inter_x2 < inter_x1 or inter_y2 < inter_y1:
return False
inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
union_area = w1 * h1 + w2 * h2 - inter_area
iou = inter_area / union_area if union_area > 0 else 0
return iou > 0.3 # 如果重叠超过30%,认为是同一骨折
return False
def _merge_similar_fractures(self, fractures):
"""合并相似的骨折检测"""
if not fractures:
return None
# 使用第一个骨折作为基准
merged = fractures[0].copy()
# 合并方法信息
methods = set()
for frac in fractures:
if "method" in frac:
methods.add(frac["method"])
merged["detection_methods"] = list(methods)
merged["confidence"] = len(fractures) # 被多个方法检测到,置信度更高
return merged
def _calculate_fracture_features(self, fractures, image):
"""计算骨折特征"""
for fracture in fractures:
# 计算骨折位置
if "bbox" in fracture:
x, y, w, h = fracture["bbox"]
fracture["center"] = [int(x + w/2), int(y + h/2)]
fracture["size"] = [int(w), int(h)]
# 计算骨折方向
if "points" in fracture and len(fracture["points"]) >= 2:
points = fracture["points"]
if len(points) == 2:
# 直线骨折
x1, y1 = points[0]
x2, y2 = points[1]
angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
fracture["angle"] = float(angle)
# 计算骨折严重程度评分
fracture["severity_score"] = self._calculate_severity_score(fracture, image)
return fractures
def _calculate_severity_score(self, fracture, image):
"""计算骨折严重程度评分"""
score = 0.0
# 基于长度/大小
if "length" in fracture:
score += min(fracture["length"] / 100, 1.0) * 0.4
if "area" in fracture:
score += min(fracture["area"] / 1000, 1.0) * 0.3
# 基于检测方法数量
if "detection_methods" in fracture:
method_count = len(fracture["detection_methods"])
score += (method_count / 3) * 0.3
return min(score, 1.0)
def generate_diagnosis_report(self, patient_info, fractures, processed_images):
"""生成诊断报告"""
# 创建报告目录
report_dir = Path(self.config["output_dir"]) / "reports" / patient_info["patient_id"]
report_dir.mkdir(parents=True, exist_ok=True)
# 基础报告信息
report = {
"report_id": f"DX_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"patient_info": patient_info,
"examination_date": datetime.now().isoformat(),
"radiologist": "AI辅助诊断系统",
"findings": [],
"impression": "",
"recommendations": []
}
# 分析骨折
fracture_count = len(fractures)
if fracture_count == 0:
report["findings"].append("未发现明确骨折征象。")
report["impression"] = "未见明显骨折。"
report["recommendations"].append("建议临床随访。")
else:
report["findings"].append(f"发现 {fracture_count} 处可疑骨折区域。")
for i, fracture in enumerate(fractures):
finding = f"骨折区域 {i+1}: "
if fracture.get("type") == "edge_line":
finding += f"线性骨折,长度约{fracture.get('length', 0):.1f}像素。"
elif fracture.get("type") == "texture_anomaly":
finding += f"纹理异常区域,面积约{fracture.get('area', 0):.1f}平方像素。"
elif fracture.get("type") == "contour_defect":
finding += f"轮廓缺陷,深度约{fracture.get('depth', 0):.1f}。"
if "severity_score" in fracture:
severity = fracture["severity_score"]
if severity > 0.7:
finding += "(高度可疑)"
elif severity > 0.4:
finding += "(中度可疑)"
else:
finding += "(低度可疑)"
report["findings"].append(finding)
# 总体印象
total_severity = sum(f.get("severity_score", 0) for f in fractures) / fracture_count
if total_severity > 0.7:
report["impression"] = "高度怀疑存在骨折,请立即临床评估。"
report["recommendations"].append("建议立即进行CT检查确认。")
report["recommendations"].append("建议骨科急诊就诊。")
elif total_severity > 0.4:
report["impression"] = "可疑骨折,需要进一步评估。"
report["recommendations"].append("建议专科医生会诊。")
report["recommendations"].append("建议一周后复查X光片。")
else:
report["impression"] = "骨折可能性较低,但不能完全排除。"
report["recommendations"].append("建议临床随访观察。")
report["recommendations"].append("本报告为AI辅助诊断,最终诊断需由执业医师确认。")
# 保存报告
report_file = report_dir / f"diagnosis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
# 生成可视化图像
self._generate_annotated_image(processed_images, fractures, report_dir)
# 更新统计
self.processed_count += 1
if fracture_count > 0:
self.fracture_detected_count += 1
logging.info(f"为患者 {patient_info['patient_id']} 生成诊断报告")
return report
def _generate_annotated_image(self, processed_images, fractures, report_dir):
"""生成标注图像"""
original = processed_images["original"]
enhanced = processed_images["enhanced"]
# 创建合并图像
annotated = cv2.cvtColor(original, cv2.COLOR_GRAY2BGR)
# 绘制骨折区域
colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)] # 红、绿、蓝
for i, fracture in enumerate(fractures):
color = colors[i % len(colors)]
if fracture.get("type") == "edge_line" and "points" in fracture:
# 绘制骨折线
points = fracture["points"]
if len(points) == 2:
cv2.line(annotated, points[0], points[1], color, 2)
elif fracture.get("type") == "texture_anomaly" and "bbox" in fracture:
# 绘制边界框
x, y, w, h = fracture["bbox"]
cv2.rectangle(annotated, (x, y), (x+w, y+h), color, 2)
elif fracture.get("type") == "contour_defect" and "points" in fracture:
# 绘制三角形表示缺陷
points = fracture["points"]
if len(points) == 3:
pts = np.array(points, np.int32)
cv2.polylines(annotated, [pts], True, color, 2)
# 添加标签
if "center" in fracture:
cv2.putText(annotated, f"F{i+1}",
tuple(fracture["center"]),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# 添加患者信息
patient_info = processed_images["patient_info"]
info_text = f"Patient: {patient_info.get('patient_id', 'Unknown')}"
cv2.putText(annotated, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# 保存图像
image_file = report_dir / f"annotated_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(str(image_file), annotated)
# 保存增强对比图像
enhanced_file = report_dir / f"enhanced_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(str(enhanced_file), enhanced)
def process_patient_xray(self, dicom_path):
"""处理单个患者的X光片"""
logging.info(f"处理X光片: {dicom_path}")
# 1. 加载DICOM图像
image, patient_info, dicom_data = self.load_dicom_image(dicom_path)
if image is None:
return None
# 2. 图像预处理
processed_images = self.preprocess_xray_image(image, patient_info)
# 3. 骨折检测
fractures = self.detect_fractures(processed_images)
# 4. 生成诊断报告
report = self.generate_diagnosis_report(patient_info, fractures, processed_images)
# 5. 保存到患者记录
self._update_patient_record(patient_info, fractures, report)
return {
"patient_info": patient_info,
"fractures": fractures,
"report": report,
"processed_images": processed_images
}
def _update_patient_record(self, patient_info, fractures, report):
"""更新患者记录"""
patient_id = patient_info.get("patient_id")
if patient_id not in self.patient_records:
self.patient_records[patient_id] = {
"patient_info": patient_info,
"examinations": []
}
examination = {
"date": datetime.now().isoformat(),
"fracture_count": len(fractures),
"fractures": fractures,
"report_id": report["report_id"],
"severity_scores": [f.get("severity_score", 0) for f in fractures]
}
self.patient_records[patient_id]["examinations"].append(examination)
# 保存到文件
record_file = Path("patient_records") / f"{patient_id}.json"
with open(record_file, 'w', encoding='utf-8') as f:
json.dump(self.patient_records[patient_id], f, indent=2, ensure_ascii=False)
def batch_process_xrays(self):
"""批量处理X光片"""
input_dir = Path(self.config["input_dir"])
if not input_dir.exists():
logging.error(f"输入目录不存在: {input_dir}")
return []
# 查找DICOM文件
dicom_files = list(input_dir.glob("*.dcm")) + list(input_dir.glob("*.dicom"))
if not dicom_files:
logging.warning(f"在 {input_dir} 中未找到DICOM文件")
return []
logging.info(f"找到 {len(dicom_files)} 个DICOM文件")
results = []
for dicom_file in dicom_files:
try:
result = self.process_patient_xray(dicom_file)
if result:
results.append(result)
# 打印简要结果
patient_id = result["patient_info"]["patient_id"]
fracture_count = len(result["fractures"])
logging.info(f"患者 {patient_id}: 检测到 {fracture_count} 处骨折")
except Exception as e:
logging.error(f"处理文件 {dicom_file} 失败: {e}")
# 生成统计报告
self._generate_statistics_report()
return results
def _generate_statistics_report(self):
"""生成统计报告"""
stats = {
"report_date": datetime.now().isoformat(),
"total_patients_processed": self.processed_count,
"patients_with_fractures": self.fracture_detected_count,
"detection_rate": self.fracture_detected_count / self.processed_count if self.processed_count > 0 else 0,
"patient_records": len(self.patient_records),
"system_version": "1.0.0"
}
stats_file = Path(self.config["output_dir"]) / f"statistics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(stats_file, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
logging.info(f"统计报告已生成: {stats_file}")
return stats
# 使用示例
if __name__ == "__main__":
# 初始化骨折检测系统
fracture_system = FractureDetectionSystem("medical_config.json")
# 批量处理X光片
results = fracture_system.batch_process_xrays()
# 打印摘要
print("\n" + "="*60)
print("X光片骨折检测系统 - 处理完成")
print("="*60)
if results:
total_patients = len(results)
patients_with_fractures = sum(1 for r in results if r["fractures"])
print(f"总计处理患者: {total_patients}")
print(f"发现骨折患者: {patients_with_fractures}")
print(f"检测率: {patients_with_fractures/total_patients*100:.1f}%")
# 显示第一个患者的结果
if results:
first_result = results[0]
print(f"\n示例患者: {first_result['patient_info']['patient_id']}")
print(f"骨折数量: {len(first_result['fractures'])}")
if first_result['fractures']:
print("骨折详情:")
for i, fracture in enumerate(first_result['fractures']):
print(f" 骨折{i+1}: {fracture.get('type', '未知')}, "
f"严重度: {fracture.get('severity_score', 0):.2f}")
print("\n诊断报告保存在: diagnosis_results/ 目录中")
更多推荐
所有评论(0)