[python] SDK连接大华相机,接收卡口抓拍图片,提取人脸,通过1400协议上传平台
相机的坑
平台上加了很多相机,但是发现总有一部分卡口相机没有人脸数据,研究发现这部分相机本身也是支持抓人脸的,但是人脸仅仅只会叠加在卡口图片,使用SDK接收不到,平台也接收不到。于是尝试使用SDK接收相机抓拍图片后,从中分离出人脸图片,再通过GAT1400协议上传平台。
SDK对接相机(camera_conn.py)
官网上下载了python的sdk,是带gui的,稍微调整了一下,使其可以在命令行运行。同时,引入图像拆分、1400对接模块。
# coding=utf-8 import sys from PyQt5.QtWidgets import QMainWindow,QApplication, QTableWidgetItem, QMessageBox from PyQt5.QtCore import Qt from PyQt5.QtGui import QPixmap from PyQt5.QtCore import QThread,pyqtSignal import image_get from IntelligentTrafficUI import Ui_MainWindow from NetSDK.NetSDK import NetClient from NetSDK.SDK_Struct import * from NetSDK.SDK_Enum import * from NetSDK.SDK_Callback import * import time from queue import Queue import subimage_1400 global wnd callback_num = 0 class TrafficCallBackAlarmInfo: def __init__(self): self.time_str = "" self.plate_number_str = "" self.plate_color_str = "" self.object_subType_str = "" self.vehicle_color_str = "" def get_alarm_info(self, alarm_info): self.time_str = '{}-{}-{} {}:{}:{}'.format(alarm_info.UTC.dwYear, alarm_info.UTC.dwMonth, alarm_info.UTC.dwDay, alarm_info.UTC.dwHour, alarm_info.UTC.dwMinute, alarm_info.UTC.dwSecond) self.plate_number_str = str(alarm_info.stTrafficCar.szPlateNumber.decode('gb2312')) self.plate_color_str = str(alarm_info.stTrafficCar.szPlateColor, 'utf-8') self.object_subType_str = str(alarm_info.stuVehicle.szObjectSubType, 'utf-8') self.vehicle_color_str = str(alarm_info.stTrafficCar.szVehicleColor, 'utf-8') print(alarm_info.byImageIndex) class BackUpdateUIThread(QThread): # 通过类成员对象定义信号 update_date = pyqtSignal(int, object, int, bool, bool) # 处理业务逻辑 def run(self): pass @CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p) def AnalyzerDataCallBack(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence, reserved): print('Enter AnalyzerDataCallBack',lAnalyzerHandle,dwAlarmType) global callback_num local_path = os.path.abspath('.') is_global = False is_small = False show_info = TrafficCallBackAlarmInfo() callback_num += 1 alarm_info = cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).contents show_info.get_alarm_info(alarm_info) #print('pAlarmInfo',cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).value) print('buffer:',pBuffer) print(show_info.time_str,show_info.plate_number_str,show_info.plate_color_str,show_info.object_subType_str,show_info.vehicle_color_str) if alarm_info.stuObject.bPicEnble: is_global = True GlobalScene_buf = cast(pBuffer,POINTER(c_ubyte * alarm_info.stuObject.stPicInfo.dwOffSet)).contents if not os.path.isdir(os.path.join(local_path, 'Global')): os.mkdir(os.path.join(local_path, 'Global')) with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic: global_pic.write(bytes(GlobalScene_buf)) face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg') print(callback_num,face_list) if (alarm_info.stuObject.stPicInfo.dwFileLenth > 0): is_small = True small_buf = pBuffer[alarm_info.stuObject.stPicInfo.dwOffSet:alarm_info.stuObject.stPicInfo.dwOffSet + alarm_info.stuObject.stPicInfo.dwFileLenth] if not os.path.isdir(os.path.join(local_path, 'Small')): os.mkdir(os.path.join(local_path, 'Small')) with open('./Small/Small_Img' + str(callback_num) + '.jpg', 'wb+') as small_pic: small_pic.write(bytes(small_buf)) elif (dwBufSize > 0): is_global = True GlobalScene_buf = cast(pBuffer, POINTER(c_ubyte * dwBufSize)).contents if not os.path.isdir(os.path.join(local_path, 'Global')): os.mkdir(os.path.join(local_path, 'Global')) with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic: global_pic.write(bytes(GlobalScene_buf)) face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg') print(callback_num,face_list) cj_path='Global/Global_Img' + str(callback_num) + '.jpg' if face_list !=[]: for i in face_list: subimage_1400.submit_face('33052200001310094777',cj_path,i) return def dingyue(sdk,channel,loginID): bNeedPicFile = 1 dwUser = 0 attachID = sdk.RealLoadPictureEx(loginID, channel, EM_EVENT_IVS_TYPE.ALL, bNeedPicFile, AnalyzerDataCallBack, dwUser, None) if not attachID: print('error:' + str(sdk.GetLastError())) else: print("订阅成功(Subscribe success)",attachID) def login(ip,username,password,port): loginID = C_LLONG() playID = C_LLONG() freePort = c_int() attachID = C_LLONG() """ 回调,不涉及 m_DisConnectCallBack = fDisConnect(DisConnectCallBack) m_ReConnectCallBack = fHaveReConnect(ReConnectCallBack) m_DecodingCallBack = fDecCBFun(DecodingCallBack) m_RealDataCallBack = fRealDataCallBackEx2(RealDataCallBack) """ # 获取NetSDK对象并初始化 sdk = NetClient() #sdk.InitEx(m_DisConnectCallBack) #掉线回调,不涉及 sdk.InitEx() #sdk.SetAutoReconnect(m_ReConnectCallBack) ip = ip port = port username = username password = password stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY() stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY) stuInParam.szIP = ip.encode() stuInParam.nPort = port stuInParam.szUserName = username.encode() stuInParam.szPassword = password.encode() stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP stuInParam.pCapParam = None stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY() stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY) loginID, device_info, error_msg = sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam) if loginID: print(loginID,device_info) for i in range(int(device_info.nChanNum)): print(i) dingyue(sdk,i,loginID) else: print(error_msg) if __name__ == '__main__': login('111.111.111.111','admin','admin123',37777) #连接相机 while True: time.sleep(1) #防止程序退出后收不到回调事件
图像分割(image_get.py)
使用opencv分割图像,取绿色的轮廓(即人脸)。
import cv2 import numpy as np import os import hashlib from functools import partial fd_path='Global' file_list=os.listdir(fd_path) def md5(data,block_size=65536): m=hashlib.md5() for item in iter(partial(data.read,block_size),b''): m.update(item) str_md5=m.hexdigest() return str_md5 def get_face(filepath): # 加载图像 image_path = filepath image = cv2.imread(image_path) # 将图像从 BGR 格式转换为 HSV 格式 hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 定义绿色在HSV颜色空间中的范围 lower_green = np.array([50, 50, 50]) # 你可能需要根据实际情况进行调整 upper_green = np.array([80, 255, 255]) # 创建掩码,通过颜色范围过滤图像 green_mask = cv2.inRange(hsv_image, lower_green, upper_green) # 寻找轮廓 contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) searchflag=0 # 遍历找到的轮廓 i=1 face_list=[] for contour in contours: x, y, w, h = cv2.boundingRect(contour) #print(x,y,w,h) if w >200 and h > 200: # 限制最小方框尺寸 green_box = image[y:y+h, x:x+w] #str_md5=md5(green_box) #print(str_md5) cv2.imwrite('tempfile.jpg', green_box) #cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2) # 在原图上绘制绿色方框 f=open('tempfile.jpg','rb') str_md5=md5(f) print(str_md5) f.close() cv2.imwrite('face//'+str_md5+'.jpg', green_box) face_list.append('face//'+str_md5+'.jpg') searchflag=1 if(searchflag): print(filepath,'提取成功') else: print(filepath,'提取失败') return face_list # 保存绘制了绿色方框的图像 #cv2.imwrite('Small\\'+i, image) """for i in file_list: if 'jpg' in i: get_face(i) """
1400对接(subimage_1400.py)
时间有限,网上也没有找到python实现GAT1400的模块,参考GAT1400的文档,简单做一下注册、保活、人脸图片上传的流程。
import requests import json import base64 import time from datetime import datetime import hashlib import random import threading import base64 base_url='http://111.222.111.222:9304' # 注册URL和认证信息 register_url = base_url+'/VIID/System/Register' keepalive_url = base_url+'/VIID/System/Keepalive' username = 'admin ' password = 'admin123' # 替换为实际的密码 device_code='33052200001320646938' channel_code='33052200001310094777' # 创建注册请求数据 register_data = { "RegisterObject": { "DeviceID": device_code } } # 创建保活请求数据 keepalive_data = { "KeepaliveObject": { "DeviceID": device_code } } # 发送第一次注册请求 headers = { 'Accept': 'application/json,application/*+json', 'Content-Type': 'application/VIID+JSON;charset=UTF-8', 'Connection': 'keepalive', 'User-Identify': device_code, 'User-Agent': 'libghttp/1.0' } def get_img_base(img_path): f=open(img_path,'rb') img=f.read() img_base=base64.b64encode(img).decode('utf-8') print(len(img_base)) return img_base def calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri): # 计算 HA1 ha1_str = f"{username}:{realm}:{password}" ha1 = hashlib.md5(ha1_str.encode()).hexdigest() # 计算 HA2 ha2_str = f"{method}:{uri}" ha2 = hashlib.md5(ha2_str.encode()).hexdigest() # 计算 response if qop: response_str = f"{ha1}:{nonce}:{'00000001'}:{cnonce}:{qop}:{ha2}" else: response_str = f"{ha1}:{nonce}:{ha2}" response = hashlib.md5(response_str.encode()).hexdigest() return response response = requests.post(url=register_url, headers=headers, data=json.dumps(register_data),allow_redirects=False) print(response.text) if response.status_code == 401: # 如果返回未认证,继续进行第二次注册请求 auth_info = response.headers['WWW-Authenticate'] print(auth_info) nonce = auth_info.split('nonce="')[1].split('"')[0] #algorithm = auth_info.split('algorithm="')[1].split('"')[0] algorithm='MD5' realm = auth_info.split('realm')[1].split('"')[0] realm='iasserver' #print(nonce,algorithm,realm) # 创建第二次注册请求数据 qop = "auth" # 或 "auth-int",根据需要选择 cnonce = "your_cnonce" # 替换为生成的 cnonce method = "POST" # 或 "GET",根据实际请求方法选择 uri = "/VIID/System/Register" # 替换为实际的请求 URI cnonce = "luz" # 替换为实际的cnonce nc = "00000001" # 替换为实际的nc值 response_code = calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri) print("计算得到的 response:", response_code) response_data = { "RegisterObject": { "DeviceID": device_code } } auth_header = f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="/VIID/System/Register", algorithm="{algorithm}", cnonce="{cnonce}", nc={nc}, qop={qop}, response="{response_code}"' headers['Authorization'] = auth_header print("第二次参数:",auth_header) # 发送第二次注册请求 response = requests.post(register_url, headers=headers, data=json.dumps(response_data)) # 保活 def keepalive_1400(): while True: response = requests.post(keepalive_url, headers=headers, data=json.dumps(keepalive_data)) if response.status_code == 200: response_json = response.json() print("保活成功,响应内容:") print(response_json) else: print("保活失败,状态码:", response.status_code) # 每隔60秒执行一次保活操作 time.sleep(60) def submit_face(channel_code,cj_path,rl_path): print(cj_path,rl_path) # 定义请求的URL url = base_url+'/VIID/Faces' # 创建请求头 headers = { 'Accept': 'application/json,application/VIID+json', 'Content-Type': 'application/VIID+JSON;charset=UTF-8', 'Connection': 'keepalive', 'User-Identify': device_code, 'User-Agent': 'libghttp/1.0' } # 创建人脸批量增加请求数据 cur_time=str(datetime.now().strftime("%Y%m%d%H%M%S")) print(cur_time) begin_code=channel_code+cur_time+str(random.randrange(10,99)) face_data = { "FaceListObject": { "FaceObject": [{ #"FaceID": "330522000013204130240220200510200408000010600001", "FaceID":begin_code, "InfoKind": 0, "SourceID": begin_code, "DeviceID": channel_code, "LeftTopX": 163, "LeftTopY": 726, "RightBtmX": 334, "RightBtmY": 897, "AgeUpLimit": 0, "AgeLowerLimit": 0, "AccompanyNumber": 0, "IsDriver": 0, "IsForeigner": 0, "IsSuspectedTerrorist": 0, "IsCriminalInvolved": 0, "IsDetainees": 0, "IsVictim": 0, "IsSuspiciousPerson": 0, "Attitude": 0, "m_dSimilaritydegree": 0, "SubImageList": { "SubImageInfoObject": [{ "ImageID": begin_code+'3', "EventSort": 2, "DeviceID": channel_code, "Type": "14", #大图 "FileFormat": "Jpeg", "ShotTime": cur_time, "Width": 1920, "Height": 1080, "FileSize": 156500, "Data": get_img_base(cj_path) }, { "ImageID": begin_code+'2', "EventSort": 3, "DeviceID": channel_code, "Type": "11", #人脸图 "FileFormat": "Jpeg", "ShotTime": cur_time, "Width": 461, "Height": 461, "FileSize": 101984, "Data": get_img_base(rl_path) }] } }] } } # 发送人脸批量增加请求 response = requests.post(url, headers=headers, data=json.dumps(face_data)) # 检查响应 if response.status_code == 200: response_json = response.json() print("人脸批量增加成功,响应内容:") print(response_json) else: print("人脸批量增加失败,状态码:", response.status_code) print(response.text) # 检查注册是否成功 if response.status_code == 200: response_json = response.json() print("注册成功,响应内容:") print(response_json) thread=threading.Thread(target=keepalive_1400) thread.start() #另起一个线程做保活,否则主线程运行会阻塞 else: print("注册失败,状态码:", response.status_code) print(response.text)