Counting Cars OpenCV + Python Issue

I have been trying to count cars as they cross a line in the video, and it works, but the problem is that a single car gets counted multiple times, which is ridiculous because it should only be counted once.

Here is the code I am using:

    import cv2
    import numpy as np

    bgsMOG = cv2.BackgroundSubtractorMOG()
    cap = cv2.VideoCapture("traffic.avi")
    counter = 0

    if cap:
        while True:
            ret, frame = cap.read()
            if ret:
                fgmask = bgsMOG.apply(frame, None, 0.01)
                cv2.line(frame, (0,60), (160,60), (255,255,0), 1)

                # To find the contours of the cars
                contours, hierarchy = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

                try:
                    hierarchy = hierarchy[0]
                except:
                    hierarchy = []

                for contour, hier in zip(contours, hierarchy):
                    (x, y, w, h) = cv2.boundingRect(contour)

                    if w > 20 and h > 20:
                        cv2.rectangle(frame, (x,y), (x+w,y+h), (255, 0, 0), 1)

                        # To find the centroid of the car
                        x1 = w/2
                        y1 = h/2

                        cx = x+x1
                        cy = y+y1
    ##                    print "cy=", cy
    ##                    print "cx=", cx
                        centroid = (cx,cy)
    ##                    print "centroid=", centroid

                        # Draw the circle of the centroid
                        cv2.circle(frame, (int(cx),int(cy)), 2, (0,0,255), -1)

                        # To make sure the car crosses the line
    ##                    dy = cy-108
    ##                    print "dy", dy
                        if centroid > (27, 38) and centroid < (134, 108):
    ##                    if (cx <= 132) and (cx >= 20):
                            counter += 1
    ##                        print "counter=", counter

    ##                    if cy > 10 and cy < 160:
                        cv2.putText(frame, str(counter), (x,y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)

    ##            cv2.namedWindow('Output', cv2.cv.CV_WINDOW_NORMAL)
                cv2.imshow('Output', frame)
    ##            cv2.imshow('FGMASK', fgmask)

                key = cv2.waitKey(60)
                if key == 27:
                    break

        cap.release()
        cv2.destroyAllWindows()

The video is on my GitHub page at https://github.com/Tes3awy/MatLab-Tutorials, named traffic.avi; it is also one of the built-in videos in the Matlab library.

Any help so that each car is counted only once?


EDIT: The individual frames of the video look as follows:

Preparation

In order to understand what is happening, and eventually solve our problem, we first need to improve the script a little.

I've added logging of the important steps of the algorithm, refactored the code a little, added saving of the mask and processed images, added the ability to run the script using the individual frame images, along with some other modifications.

This is what the script looks like at this point:

    import logging
    import logging.handlers
    import os
    import time
    import sys

    import cv2
    import numpy as np

    from vehicle_counter import VehicleCounter

    # ============================================================================

    IMAGE_DIR = "images"
    IMAGE_FILENAME_FORMAT = IMAGE_DIR + "/frame_%04d.png"

    # Support either video file or individual frames
    CAPTURE_FROM_VIDEO = False
    if CAPTURE_FROM_VIDEO:
        IMAGE_SOURCE = "traffic.avi" # Video file
    else:
        IMAGE_SOURCE = IMAGE_FILENAME_FORMAT # Image sequence

    # Time to wait between frames, 0=forever
    WAIT_TIME = 1 # 250 # ms

    LOG_TO_FILE = True

    # Colours for drawing on processed frames
    DIVIDER_COLOUR = (255, 255, 0)
    BOUNDING_BOX_COLOUR = (255, 0, 0)
    CENTROID_COLOUR = (0, 0, 255)

    # ============================================================================

    def init_logging():
        main_logger = logging.getLogger()

        formatter = logging.Formatter(
            fmt='%(asctime)s.%(msecs)03d %(levelname)-8s [%(name)s] %(message)s'
            , datefmt='%Y-%m-%d %H:%M:%S')

        handler_stream = logging.StreamHandler(sys.stdout)
        handler_stream.setFormatter(formatter)
        main_logger.addHandler(handler_stream)

        if LOG_TO_FILE:
            handler_file = logging.handlers.RotatingFileHandler("debug.log"
                , maxBytes = 2**24
                , backupCount = 10)
            handler_file.setFormatter(formatter)
            main_logger.addHandler(handler_file)

        main_logger.setLevel(logging.DEBUG)

        return main_logger

    # ============================================================================

    def save_frame(file_name_format, frame_number, frame, label_format):
        file_name = file_name_format % frame_number
        label = label_format % frame_number

        log.debug("Saving %s as '%s'", label, file_name)
        cv2.imwrite(file_name, frame)

    # ============================================================================

    def get_centroid(x, y, w, h):
        x1 = int(w / 2)
        y1 = int(h / 2)

        cx = x + x1
        cy = y + y1

        return (cx, cy)

    # ============================================================================

    def detect_vehicles(fg_mask):
        log = logging.getLogger("detect_vehicles")

        MIN_CONTOUR_WIDTH = 21
        MIN_CONTOUR_HEIGHT = 21

        # Find the contours of any vehicles in the image
        contours, hierarchy = cv2.findContours(fg_mask
            , cv2.RETR_EXTERNAL
            , cv2.CHAIN_APPROX_SIMPLE)

        log.debug("Found %d vehicle contours.", len(contours))

        matches = []
        for (i, contour) in enumerate(contours):
            (x, y, w, h) = cv2.boundingRect(contour)
            contour_valid = (w >= MIN_CONTOUR_WIDTH) and (h >= MIN_CONTOUR_HEIGHT)

            log.debug("Contour #%d: pos=(x=%d, y=%d) size=(w=%d, h=%d) valid=%s"
                , i, x, y, w, h, contour_valid)

            if not contour_valid:
                continue

            centroid = get_centroid(x, y, w, h)

            matches.append(((x, y, w, h), centroid))

        return matches

    # ============================================================================

    def filter_mask(fg_mask):
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

        # Fill any small holes
        closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
        # Remove noise
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

        # Dilate to merge adjacent blobs
        dilation = cv2.dilate(opening, kernel, iterations = 2)

        return dilation

    # ============================================================================

    def process_frame(frame_number, frame, bg_subtractor, car_counter):
        log = logging.getLogger("process_frame")

        # Create a copy of source frame to draw into
        processed = frame.copy()

        # Draw dividing line -- we count cars as they cross this line.
        cv2.line(processed, (0, car_counter.divider), (frame.shape[1], car_counter.divider), DIVIDER_COLOUR, 1)

        # Remove the background
        fg_mask = bg_subtractor.apply(frame, None, 0.01)
        fg_mask = filter_mask(fg_mask)

        save_frame(IMAGE_DIR + "/mask_%04d.png"
            , frame_number, fg_mask, "foreground mask for frame #%d")

        matches = detect_vehicles(fg_mask)

        log.debug("Found %d valid vehicle contours.", len(matches))
        for (i, match) in enumerate(matches):
            contour, centroid = match

            log.debug("Valid vehicle contour #%d: centroid=%s, bounding_box=%s", i, centroid, contour)

            x, y, w, h = contour

            # Mark the bounding box and the centroid on the processed frame
            # NB: Fixed the off-by one in the bottom right corner
            cv2.rectangle(processed, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1)
            cv2.circle(processed, centroid, 2, CENTROID_COLOUR, -1)

        log.debug("Updating vehicle count...")
        car_counter.update_count(matches, processed)

        return processed

    # ============================================================================

    def main():
        log = logging.getLogger("main")

        log.debug("Creating background subtractor...")
        bg_subtractor = cv2.BackgroundSubtractorMOG()

        log.debug("Pre-training the background subtractor...")
        default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
        bg_subtractor.apply(default_bg, None, 1.0)

        car_counter = None # Will be created after first frame is captured

        # Set up image source
        log.debug("Initializing video capture device #%s...", IMAGE_SOURCE)
        cap = cv2.VideoCapture(IMAGE_SOURCE)

        frame_width = cap.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH)
        frame_height = cap.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT)
        log.debug("Video capture frame size=(w=%d, h=%d)", frame_width, frame_height)

        log.debug("Starting capture loop...")
        frame_number = -1
        while True:
            frame_number += 1
            log.debug("Capturing frame #%d...", frame_number)
            ret, frame = cap.read()
            if not ret:
                log.error("Frame capture failed, stopping...")
                break

            log.debug("Got frame #%d: shape=%s", frame_number, frame.shape)

            if car_counter is None:
                # We do this here, so that we can initialize with actual frame size
                log.debug("Creating vehicle counter...")
                car_counter = VehicleCounter(frame.shape[:2], frame.shape[0] / 2)

            # Archive raw frames from video to disk for later inspection/testing
            if CAPTURE_FROM_VIDEO:
                save_frame(IMAGE_FILENAME_FORMAT
                    , frame_number, frame, "source frame #%d")

            log.debug("Processing frame #%d...", frame_number)
            processed = process_frame(frame_number, frame, bg_subtractor, car_counter)

            save_frame(IMAGE_DIR + "/processed_%04d.png"
                , frame_number, processed, "processed frame #%d")

            cv2.imshow('Source Image', frame)
            cv2.imshow('Processed Image', processed)

            log.debug("Frame #%d processed.", frame_number)

            c = cv2.waitKey(WAIT_TIME)
            if c == 27:
                log.debug("ESC detected, stopping...")
                break

        log.debug("Closing video capture device...")
        cap.release()
        cv2.destroyAllWindows()
        log.debug("Done.")

    # ============================================================================

    if __name__ == "__main__":
        log = init_logging()

        if not os.path.exists(IMAGE_DIR):
            log.debug("Creating image directory `%s`...", IMAGE_DIR)
            os.makedirs(IMAGE_DIR)

        main()

The script is responsible for processing the stream of images, and identifying all the vehicles in each frame; I refer to them as matches in the code.


The task of counting the detected vehicles is delegated to the VehicleCounter class. The reason why I chose to make it a class will become evident as we progress. I did not implement your vehicle-counting algorithm, because it will not work, for reasons that will become evident once we dig into this deeper.

The file vehicle_counter.py contains the following code:

    import logging

    # ============================================================================

    class VehicleCounter(object):
        def __init__(self, shape, divider):
            self.log = logging.getLogger("vehicle_counter")

            self.height, self.width = shape
            self.divider = divider

            self.vehicle_count = 0

        def update_count(self, matches, output_image = None):
            self.log.debug("Updating count using %d matches...", len(matches))

    # ============================================================================

Finally, I wrote a script that stitches all the generated images together, so it's easier to inspect them:

    import cv2
    import numpy as np

    # ============================================================================

    INPUT_WIDTH = 160
    INPUT_HEIGHT = 120

    OUTPUT_TILE_WIDTH = 10
    OUTPUT_TILE_HEIGHT = 12

    TILE_COUNT = OUTPUT_TILE_WIDTH * OUTPUT_TILE_HEIGHT

    # ============================================================================

    def stitch_images(input_format, output_filename):
        output_shape = (INPUT_HEIGHT * OUTPUT_TILE_HEIGHT
            , INPUT_WIDTH * OUTPUT_TILE_WIDTH
            , 3)
        output = np.zeros(output_shape, np.uint8)

        for i in range(TILE_COUNT):
            img = cv2.imread(input_format % i)
            cv2.rectangle(img, (0, 0), (INPUT_WIDTH - 1, INPUT_HEIGHT - 1), (0, 0, 255), 1)
            # Draw the frame number
            cv2.putText(img, str(i), (2, 10)
                , cv2.FONT_HERSHEY_PLAIN, 0.7, (255, 255, 255), 1)
            x = i % OUTPUT_TILE_WIDTH * INPUT_WIDTH
            y = i / OUTPUT_TILE_WIDTH * INPUT_HEIGHT
            output[y:y+INPUT_HEIGHT, x:x+INPUT_WIDTH,:] = img

        cv2.imwrite(output_filename, output)

    # ============================================================================

    stitch_images("images/frame_%04d.png", "stitched_frames.png")
    stitch_images("images/mask_%04d.png", "stitched_masks.png")
    stitch_images("images/processed_%04d.png", "stitched_processed.png")

Analysis

In order to solve this problem, we should have some idea about what results we expect to get. We should also label all the distinct cars in the video, so it's easier to talk about them.

All 10 vehicles from the video

If we run our script, and stitch the images together, we get a number of useful files that help us analyze the problem:

  • An image containing a mosaic of the input frames
  • An image containing a mosaic of the foreground masks
  • An image containing a mosaic of the processed frames
  • The debug log of the run

Upon inspecting those, a number of issues become evident:

  • The foreground masks tend to be noisy. We should do some filtering (erode/dilate?) to get rid of the noise and narrow the gaps.
  • We sometimes miss vehicles (the grey ones).
  • Some vehicles get detected twice in a single frame.
  • Vehicles are rarely detected in the upper portion of the frame.
  • The same vehicle is often detected in consecutive frames. We need to figure out a way of tracking vehicles across consecutive frames, and counting each one only once.

1. Seeding the Background Subtractor

Our video is quite short, only 120 frames. With a learning rate of 0.01, it takes a substantial part of the video for the background detector to stabilize.

Fortunately, the last frame of the video (frame number 119) is completely devoid of vehicles, so we can use it as our initial background image. (Other options for obtaining a suitable image are mentioned in the notes and comments.)

The background image

To use this initial background image, we simply load it, and apply it to the background subtractor with a learning factor of 1.0:

    bg_subtractor = cv2.BackgroundSubtractorMOG()
    default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
    bg_subtractor.apply(default_bg, None, 1.0)

When we look at the new mosaic of masks, we can see that we get less noise, and the vehicle detection works better in the early frames.

2. Cleaning Up the Foreground Mask

A simple approach to improving our foreground mask is to apply a few morphological transformations:

    def filter_mask(fg_mask):
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

        # Fill any small holes
        closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
        # Remove noise
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

        # Dilate to merge adjacent blobs
        dilation = cv2.dilate(opening, kernel, iterations = 2)

        return dilation

By inspecting the masks, the processed frames and the log file generated with filtering in place, we can see that we now detect vehicles more reliably, and have mitigated the issue of different parts of one vehicle being detected as separate objects.

3. Tracking Vehicles Between Frames

At this point, we need to go through our log file, and collect all the centroid coordinates for each vehicle. This will allow us to plot and inspect the path each vehicle traces across the image, and to develop an algorithm that does this automatically. To make this process easier, we can create a reduced log by filtering out just the relevant entries.
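Extracting the raw centroid coordinates from the debug log is easy to automate; assigning them to individual vehicles was then done by hand. Here is a minimal sketch of the extraction step (not part of the scripts above), assuming the log format produced by the formatter in the main script:

    import re

    # Matches the "Valid vehicle contour ..." lines emitted by process_frame()
    CENTROID_RE = re.compile(r"centroid=\((\d+), (\d+)\)")

    def extract_centroids(log_path):
        """Return the (cx, cy) tuples in the order they appear in the log."""
        centroids = []
        with open(log_path) as f:
            for line in f:
                match = CENTROID_RE.search(line)
                if match:
                    centroids.append((int(match.group(1)), int(match.group(2))))
        return centroids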

The lists of centroid coordinates:

    traces = {
        'A': [(112, 36), (112, 45), (112, 52), (112, 54), (112, 63), (111, 73), (111, 86), (111, 91), (111, 97), (110, 105)]
        , 'B': [(119, 37), (120, 42), (121, 54), (121, 55), (123, 64), (124, 74), (125, 87), (127, 94), (125, 100), (126, 108)]
        , 'C': [(93, 23), (91, 27), (89, 31), (87, 36), (85, 42), (82, 49), (79, 59), (74, 71), (70, 82), (62, 86), (61, 92), (55, 101)]
        , 'D': [(118, 30), (124, 83), (125, 90), (116, 101), (122, 100)]
        , 'E': [(77, 27), (75, 30), (73, 33), (70, 37), (67, 42), (63, 47), (59, 53), (55, 59), (49, 67), (43, 75), (36, 85), (27, 92), (24, 97), (20, 102)]
        , 'F': [(119, 30), (120, 34), (120, 39), (122, 59), (123, 60), (124, 70), (125, 82), (127, 91), (126, 97), (128, 104)]
        , 'G': [(88, 37), (87, 41), (85, 48), (82, 55), (79, 63), (76, 74), (72, 87), (67, 92), (65, 98), (60, 106)]
        , 'H': [(124, 35), (123, 40), (125, 45), (127, 59), (126, 59), (128, 67), (130, 78), (132, 88), (134, 93), (135, 99), (135, 107)]
        , 'I': [(98, 26), (97, 30), (96, 34), (94, 40), (92, 47), (90, 55), (87, 64), (84, 77), (79, 87), (74, 93), (73, 102)]
        , 'J': [(123, 60), (125, 63), (125, 81), (127, 93), (126, 98), (125, 100)]
    }

Individual vehicle traces plotted on the background:

Traces of the detected vehicles

Combined enlarged image of all the vehicle traces:

All the traces together on an enlarged background

Vectors

In order to analyze the movement, we need to work with vectors (i.e. the distance and direction moved). The diagram below shows how the angles correspond to the movement of vehicles in the image.

We can use the following function to calculate the vector between two points:

    def get_vector(a, b):
        """Calculate vector (distance, angle in degrees) from point a to point b.

        Angle ranges from -180 to 180 degrees.
        Vector with angle 0 points straight down on the image.
        Values increase in clockwise direction.
        """
        dx = float(b[0] - a[0])
        dy = float(b[1] - a[1])

        distance = math.sqrt(dx**2 + dy**2)

        if dy > 0:
            angle = math.degrees(math.atan(-dx/dy))
        elif dy == 0:
            if dx < 0:
                angle = 90.0
            elif dx > 0:
                angle = -90.0
            else:
                angle = 0.0
        else:
            if dx < 0:
                angle = 180 - math.degrees(math.atan(dx/dy))
            elif dx > 0:
                angle = -180 - math.degrees(math.atan(dx/dy))
            else:
                angle = 180.0

        return distance, angle
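For example, feeding in two consecutive positions of vehicle A from the traces above:

    distance, angle = get_vector((112, 36), (112, 45))
    print distance, angle   # 9.0 0.0 -- i.e. 9 pixels straight down the image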

Categorization

One way we can look for patterns that could be used to categorize the movements as valid/invalid is to make a scatter plot (angle vs. distance); a sketch of how such a plot can be generated follows the list below:

Plot of angle vs. distance

  • Green points represent valid movement, which we determined using the lists of points for each vehicle.
  • Red points represent invalid movement: vectors between points in adjacent traffic lanes.
  • I plotted two blue curves that we can use to separate the two types of movement. Any point that lies below either curve can be considered valid. The curves are:
    • distance = -0.008 * angle**2 + 0.4 * angle + 25.0
    • distance = 10.0
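For reference, a minimal sketch of how the green points and the separating curves of such a plot could be generated from the traces dictionary and get_vector above (matplotlib is assumed to be available; the red points would additionally require vectors computed between vehicles in adjacent lanes):

    import numpy as np
    import matplotlib.pyplot as plt

    # Valid movements: vectors between consecutive positions of the same vehicle
    valid = [get_vector(a, b)
        for trace in traces.values()
        for a, b in zip(trace, trace[1:])]
    distances, angles = zip(*valid)

    plt.scatter(angles, distances, c='g', label='valid movement')

    # The two separating curves
    xs = np.linspace(-180, 180, 361)
    plt.plot(xs, -0.008 * xs**2 + 0.4 * xs + 25.0, 'b')
    plt.plot(xs, [10.0] * len(xs), 'b')

    plt.xlabel('angle (degrees)')
    plt.ylabel('distance (pixels)')
    plt.legend()
    plt.show()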

We can use the following function to categorize the movement vectors:

    def is_valid_vector(a):
        distance, angle = a
        threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
        return (distance <= threshold_distance)
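To illustrate, using positions from the traces collected earlier:

    # Consecutive positions of vehicle A -- plausible movement:
    print is_valid_vector(get_vector((112, 36), (112, 45)))  # True  (~9.0 px, 0.0 deg)

    # A jump from vehicle A to vehicle C in the adjacent lane:
    print is_valid_vector(get_vector((112, 36), (93, 23)))   # False (~23.0 px, ~124 deg)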

Note: There is one outlier, which occurs because we lose track of vehicle D in frames 43..48.

Algorithm

We will use the class Vehicle to store information about each tracked vehicle:

  • Some kind of identifier
  • A list of positions, with the most recent one last
  • A last-seen counter: the number of frames since we last saw this vehicle
  • A flag marking whether the vehicle has been counted or not

The VehicleCounter class will store a list of the currently tracked vehicles, and keep track of the total count. On each frame, it will use the list of bounding boxes and positions of the identified vehicles (the candidate list) to update its state:

  1. Update the currently tracked Vehicles:
    • For each vehicle:
      • If there is a valid match for the given vehicle, update its position and reset its last-seen counter. Remove the match from the candidate list.
      • Otherwise, increase the last-seen counter for that vehicle.
  2. Create new Vehicles for any remaining matches.
  3. Update the vehicle count:
    • For each vehicle:
      • If the vehicle is past the divider and has not been counted yet, update the total count and mark the vehicle as counted.
  4. Remove vehicles that are no longer visible:
    • For each vehicle:
      • If the last-seen counter exceeds a threshold, remove the vehicle.

4. Solution

We can reuse the main script with the final version of vehicle_counter.py, which contains the implementation of our counting algorithm:

    import logging
    import math

    import cv2
    import numpy as np

    # ============================================================================

    CAR_COLOURS = [ (0,0,255), (0,106,255), (0,216,255), (0,255,182), (0,255,76)
        , (144,255,0), (255,255,0), (255,148,0), (255,0,178), (220,0,255) ]

    # ============================================================================

    class Vehicle(object):
        def __init__(self, id, position):
            self.id = id
            self.positions = [position]
            self.frames_since_seen = 0
            self.counted = False

        @property
        def last_position(self):
            return self.positions[-1]

        def add_position(self, new_position):
            self.positions.append(new_position)
            self.frames_since_seen = 0

        def draw(self, output_image):
            car_colour = CAR_COLOURS[self.id % len(CAR_COLOURS)]
            for point in self.positions:
                cv2.circle(output_image, point, 2, car_colour, -1)
                cv2.polylines(output_image, [np.int32(self.positions)]
                    , False, car_colour, 1)

    # ============================================================================

    class VehicleCounter(object):
        def __init__(self, shape, divider):
            self.log = logging.getLogger("vehicle_counter")

            self.height, self.width = shape
            self.divider = divider

            self.vehicles = []
            self.next_vehicle_id = 0
            self.vehicle_count = 0
            self.max_unseen_frames = 7

        @staticmethod
        def get_vector(a, b):
            """Calculate vector (distance, angle in degrees) from point a to point b.

            Angle ranges from -180 to 180 degrees.
            Vector with angle 0 points straight down on the image.
            Values increase in clockwise direction.
            """
            dx = float(b[0] - a[0])
            dy = float(b[1] - a[1])

            distance = math.sqrt(dx**2 + dy**2)

            if dy > 0:
                angle = math.degrees(math.atan(-dx/dy))
            elif dy == 0:
                if dx < 0:
                    angle = 90.0
                elif dx > 0:
                    angle = -90.0
                else:
                    angle = 0.0
            else:
                if dx < 0:
                    angle = 180 - math.degrees(math.atan(dx/dy))
                elif dx > 0:
                    angle = -180 - math.degrees(math.atan(dx/dy))
                else:
                    angle = 180.0

            return distance, angle

        @staticmethod
        def is_valid_vector(a):
            distance, angle = a
            threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
            return (distance <= threshold_distance)

        def update_vehicle(self, vehicle, matches):
            # Find if any of the matches fits this vehicle
            for i, match in enumerate(matches):
                contour, centroid = match

                vector = self.get_vector(vehicle.last_position, centroid)
                if self.is_valid_vector(vector):
                    vehicle.add_position(centroid)
                    self.log.debug("Added match (%d, %d) to vehicle #%d. vector=(%0.2f,%0.2f)"
                        , centroid[0], centroid[1], vehicle.id, vector[0], vector[1])
                    return i

            # No matches fit...
            vehicle.frames_since_seen += 1
            self.log.debug("No match for vehicle #%d. frames_since_seen=%d"
                , vehicle.id, vehicle.frames_since_seen)

            return None

        def update_count(self, matches, output_image = None):
            self.log.debug("Updating count using %d matches...", len(matches))

            # First update all the existing vehicles
            for vehicle in self.vehicles:
                i = self.update_vehicle(vehicle, matches)
                if i is not None:
                    del matches[i]

            # Add new vehicles based on the remaining matches
            for match in matches:
                contour, centroid = match
                new_vehicle = Vehicle(self.next_vehicle_id, centroid)
                self.next_vehicle_id += 1
                self.vehicles.append(new_vehicle)
                self.log.debug("Created new vehicle #%d from match (%d, %d)."
                    , new_vehicle.id, centroid[0], centroid[1])

            # Count any uncounted vehicles that are past the divider
            for vehicle in self.vehicles:
                if not vehicle.counted and (vehicle.last_position[1] > self.divider):
                    self.vehicle_count += 1
                    vehicle.counted = True
                    self.log.debug("Counted vehicle #%d (total count=%d)."
                        , vehicle.id, self.vehicle_count)

            # Optionally draw the vehicles on an image
            if output_image is not None:
                for vehicle in self.vehicles:
                    vehicle.draw(output_image)

                cv2.putText(output_image, ("%02d" % self.vehicle_count), (142, 10)
                    , cv2.FONT_HERSHEY_PLAIN, 0.7, (127, 255, 255), 1)

            # Remove vehicles that have not been seen long enough
            removed = [ v.id for v in self.vehicles
                if v.frames_since_seen >= self.max_unseen_frames ]
            self.vehicles[:] = [ v for v in self.vehicles
                if not v.frames_since_seen >= self.max_unseen_frames ]
            for id in removed:
                self.log.debug("Removed vehicle #%d.", id)

            self.log.debug("Count updated, tracking %d vehicles.", len(self.vehicles))

    # ============================================================================

The program now draws the historical paths of all the currently tracked vehicles into the output image, along with the vehicle count. Each vehicle is assigned one of 10 colours.

Notice that vehicle D ends up being tracked twice; however, it is counted only once, since we lose track of it before it crosses the divider. Ideas on how to resolve this are mentioned in the appendix.

The last processed frame generated by the script:

The last processed frame

The total count of vehicles is 10. That is the correct result.

More details can be found in the output the script generated:

  • The full debug log
  • The filtered vehicle-counter log
  • A mosaic of the processed frames


A. Potential Improvements

  • Refactor, add unit tests.
  • Improve the filtering/preprocessing of the foreground mask:
    • Multiple iterations of filtering, filling holes using cv2.drawContours with CV_FILLED? (See the sketch after this list.)
    • Watershed algorithm?
  • Improve the categorization of movement vectors:
    • Create a predictor to estimate the initial movement angle when vehicles are created (and only one position is known)... in order to be able to
    • use change of direction rather than direction alone (which would, I think, bring the angles of valid motion vectors close to zero).
  • Improve the vehicle tracking:
    • Predict the position for frames where a vehicle is not seen.
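For illustration, here is a minimal, untested sketch of the hole-filling idea mentioned above, assuming the same imports (cv2, numpy as np) and OpenCV 2.4.x API as the rest of the code; it could be applied as an extra step inside filter_mask:

    def fill_holes(fg_mask):
        # Find the outer contours and paint their interiors solid white.
        # NB: findContours modifies its input, hence the copy.
        contours, hierarchy = cv2.findContours(fg_mask.copy()
            , cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        filled = np.zeros_like(fg_mask)
        cv2.drawContours(filled, contours, -1, 255, -1) # thickness=-1 == CV_FILLED
        return filled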

B. Notes

  • It seems to be impossible to directly extract the current background image from BackgroundSubtractorMOG in Python (at least in OpenCV 2.4.x), but there is a way to do it with a little work.
  • As suggested by Henrik, we can obtain a good estimate of the background using median blending. A minimal sketch of that idea follows below.
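The following sketch of median blending is not part of the solution above; it assumes the raw frames were archived as images/frame_*.png by the main script:

    import glob

    import cv2
    import numpy as np

    # Moving vehicles cover any given pixel in only a minority of the frames,
    # so the per-pixel median over time converges on the empty road.
    frames = [cv2.imread(f) for f in sorted(glob.glob("images/frame_*.png"))]
    background = np.median(np.array(frames), axis=0).astype(np.uint8)
    cv2.imwrite("median_background.png", background)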