-->
当前位置:首页 > DayDayUp > 正文内容

[python] SDK连接大华相机,接收卡口抓拍图片,提取人脸,通过1400协议上传平台

Luz1年前 (2023-09-04)DayDayUp3031

相机的坑

平台上加了很多相机,但是发现总有一部分卡口相机没有人脸数据,研究发现这部分相机本身也是支持抓人脸的,但是人脸仅仅只会叠加在卡口图片,使用SDK接收不到,平台也接收不到。于是尝试使用SDK接收相机抓拍图片后,从中分离出人脸图片,再通过GAT1400协议上传平台。



SDK对接相机(camera_conn.py)

官网上下载了python的sdk,是带gui的,稍微调整了一下,使其可以在命令行运行。同时,引入图像拆分、1400对接模块。

# coding=utf-8
import sys
from PyQt5.QtWidgets import QMainWindow,QApplication, QTableWidgetItem, QMessageBox
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPixmap
from PyQt5.QtCore import QThread,pyqtSignal
import image_get
from IntelligentTrafficUI import Ui_MainWindow
from NetSDK.NetSDK import NetClient
from NetSDK.SDK_Struct import *
from NetSDK.SDK_Enum import *
from NetSDK.SDK_Callback import *
import time
from queue import Queue
import subimage_1400
global wnd
callback_num = 0
class TrafficCallBackAlarmInfo:
    def __init__(self):
        self.time_str = ""
        self.plate_number_str = ""
        self.plate_color_str = ""
        self.object_subType_str = ""
        self.vehicle_color_str = ""
    def get_alarm_info(self, alarm_info):
        self.time_str = '{}-{}-{} {}:{}:{}'.format(alarm_info.UTC.dwYear, alarm_info.UTC.dwMonth, alarm_info.UTC.dwDay,
                                                   alarm_info.UTC.dwHour, alarm_info.UTC.dwMinute, alarm_info.UTC.dwSecond)
        self.plate_number_str = str(alarm_info.stTrafficCar.szPlateNumber.decode('gb2312'))
        self.plate_color_str = str(alarm_info.stTrafficCar.szPlateColor, 'utf-8')
        self.object_subType_str = str(alarm_info.stuVehicle.szObjectSubType, 'utf-8')
        self.vehicle_color_str = str(alarm_info.stTrafficCar.szVehicleColor, 'utf-8')
        print(alarm_info.byImageIndex)
class BackUpdateUIThread(QThread):
    # 通过类成员对象定义信号
    update_date = pyqtSignal(int, object, int, bool, bool)
    # 处理业务逻辑
    def run(self):
        pass
@CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p)
def AnalyzerDataCallBack(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence, reserved):
    print('Enter AnalyzerDataCallBack',lAnalyzerHandle,dwAlarmType)
    global callback_num
    local_path = os.path.abspath('.')
    is_global = False
    is_small = False
    show_info = TrafficCallBackAlarmInfo()
    callback_num += 1
    alarm_info = cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).contents
    show_info.get_alarm_info(alarm_info)
    #print('pAlarmInfo',cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).value)
    print('buffer:',pBuffer)
    print(show_info.time_str,show_info.plate_number_str,show_info.plate_color_str,show_info.object_subType_str,show_info.vehicle_color_str)
    if alarm_info.stuObject.bPicEnble:
        is_global = True
        GlobalScene_buf = cast(pBuffer,POINTER(c_ubyte * alarm_info.stuObject.stPicInfo.dwOffSet)).contents
        if not os.path.isdir(os.path.join(local_path, 'Global')):
            os.mkdir(os.path.join(local_path, 'Global'))
        with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
            global_pic.write(bytes(GlobalScene_buf))
            face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg')
            print(callback_num,face_list)
        if (alarm_info.stuObject.stPicInfo.dwFileLenth > 0):
            is_small = True
            small_buf = pBuffer[alarm_info.stuObject.stPicInfo.dwOffSet:alarm_info.stuObject.stPicInfo.dwOffSet +
                                                                    alarm_info.stuObject.stPicInfo.dwFileLenth]
            if not os.path.isdir(os.path.join(local_path, 'Small')):
                os.mkdir(os.path.join(local_path, 'Small'))
            with open('./Small/Small_Img' + str(callback_num) + '.jpg', 'wb+') as small_pic:
                small_pic.write(bytes(small_buf))
    elif (dwBufSize > 0):
        is_global = True
        GlobalScene_buf = cast(pBuffer, POINTER(c_ubyte * dwBufSize)).contents
        if not os.path.isdir(os.path.join(local_path, 'Global')):
            os.mkdir(os.path.join(local_path, 'Global'))
        with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
            global_pic.write(bytes(GlobalScene_buf))
            face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg')
            print(callback_num,face_list)
    cj_path='Global/Global_Img' + str(callback_num) + '.jpg'
    if face_list !=[]:
        for i in face_list:
            subimage_1400.submit_face('33052200001310094777',cj_path,i)
    return
    
def dingyue(sdk,channel,loginID):
    bNeedPicFile = 1
    dwUser = 0
    attachID = sdk.RealLoadPictureEx(loginID, channel, EM_EVENT_IVS_TYPE.ALL, bNeedPicFile, AnalyzerDataCallBack, dwUser, None)
    if not attachID:
        print('error:' + str(sdk.GetLastError()))
    else:
        print("订阅成功(Subscribe success)",attachID)
def login(ip,username,password,port):
    loginID = C_LLONG()
    playID = C_LLONG()
    freePort = c_int()
    attachID = C_LLONG()
    """
    回调,不涉及
    m_DisConnectCallBack = fDisConnect(DisConnectCallBack)
    m_ReConnectCallBack = fHaveReConnect(ReConnectCallBack)
    m_DecodingCallBack = fDecCBFun(DecodingCallBack)
    m_RealDataCallBack = fRealDataCallBackEx2(RealDataCallBack)
    """
        # 获取NetSDK对象并初始化
    
    sdk = NetClient()
    #sdk.InitEx(m_DisConnectCallBack) #掉线回调,不涉及
    sdk.InitEx()
    #sdk.SetAutoReconnect(m_ReConnectCallBack)
    ip = ip
    port = port
    username = username
    password = password
    stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY()
    stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY)
    stuInParam.szIP = ip.encode()
    stuInParam.nPort = port
    stuInParam.szUserName = username.encode()
    stuInParam.szPassword = password.encode()
    stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP
    stuInParam.pCapParam = None
    stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY()
    stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY)
    loginID, device_info, error_msg = sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam)
    if loginID:
        print(loginID,device_info)
        for i in range(int(device_info.nChanNum)):
            print(i)
            dingyue(sdk,i,loginID)
    else:
        print(error_msg)
    
if __name__ == '__main__':
    login('111.111.111.111','admin','admin123',37777) #连接相机
    while True:
        time.sleep(1) #防止程序退出后收不到回调事件



图像分割(image_get.py)

使用opencv分割图像,取绿色的轮廓(即人脸)。

import cv2
import numpy as np
import os
import hashlib
from functools import partial
fd_path='Global'
file_list=os.listdir(fd_path)
def md5(data,block_size=65536):
    m=hashlib.md5()
    for item in iter(partial(data.read,block_size),b''):
        m.update(item)
    str_md5=m.hexdigest()
    return str_md5
def get_face(filepath):
    # 加载图像
    image_path = filepath
    image = cv2.imread(image_path)
    # 将图像从 BGR 格式转换为 HSV 格式
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    # 定义绿色在HSV颜色空间中的范围
    lower_green = np.array([50, 50, 50])  # 你可能需要根据实际情况进行调整
    upper_green = np.array([80, 255, 255])
    # 创建掩码,通过颜色范围过滤图像
    green_mask = cv2.inRange(hsv_image, lower_green, upper_green)
    # 寻找轮廓
    contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    searchflag=0
    # 遍历找到的轮廓
    i=1
    face_list=[]
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        #print(x,y,w,h)
        if w >200 and h > 200:  # 限制最小方框尺寸
            green_box = image[y:y+h, x:x+w]
            #str_md5=md5(green_box)
            #print(str_md5)
            cv2.imwrite('tempfile.jpg', green_box)
            #cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)  # 在原图上绘制绿色方框
            f=open('tempfile.jpg','rb')
            str_md5=md5(f)
            print(str_md5)
            f.close()
            cv2.imwrite('face//'+str_md5+'.jpg', green_box)
            face_list.append('face//'+str_md5+'.jpg')
            searchflag=1
    if(searchflag):
        print(filepath,'提取成功')
    else:
        print(filepath,'提取失败')
        
    return face_list
    # 保存绘制了绿色方框的图像
    #cv2.imwrite('Small\\'+i, image)
"""for i in file_list:
    if 'jpg' in i:
        get_face(i)
        """


1400对接(subimage_1400.py)

时间有限,网上也没有找到python实现GAT1400的模块,参考GAT1400的文档,简单做一下注册、保活、人脸图片上传的流程。

import requests
import json
import base64
import time
from datetime import datetime
import hashlib
import random
import threading
import base64
base_url='http://111.222.111.222:9304'
# 注册URL和认证信息
register_url = base_url+'/VIID/System/Register'
keepalive_url = base_url+'/VIID/System/Keepalive'
username = 'admin    '
password = 'admin123'  # 替换为实际的密码

device_code='33052200001320646938'
channel_code='33052200001310094777'
# 创建注册请求数据
register_data = {
    "RegisterObject": {
        "DeviceID": device_code
    }
}

# 创建保活请求数据
keepalive_data = {
    "KeepaliveObject": {
        "DeviceID": device_code
    }
}

# 发送第一次注册请求
headers = {
    'Accept': 'application/json,application/*+json',
    'Content-Type': 'application/VIID+JSON;charset=UTF-8',
    'Connection': 'keepalive',
    'User-Identify': device_code,
    'User-Agent': 'libghttp/1.0'
}


def get_img_base(img_path):
    f=open(img_path,'rb')
    img=f.read()
    img_base=base64.b64encode(img).decode('utf-8')
    print(len(img_base))
    return img_base


def calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri):
    # 计算 HA1
    ha1_str = f"{username}:{realm}:{password}"
    ha1 = hashlib.md5(ha1_str.encode()).hexdigest()
    # 计算 HA2
    ha2_str = f"{method}:{uri}"
    ha2 = hashlib.md5(ha2_str.encode()).hexdigest()
    # 计算 response
    if qop:
        response_str = f"{ha1}:{nonce}:{'00000001'}:{cnonce}:{qop}:{ha2}"
    else:
        response_str = f"{ha1}:{nonce}:{ha2}"
    response = hashlib.md5(response_str.encode()).hexdigest()
    return response




response = requests.post(url=register_url, headers=headers, data=json.dumps(register_data),allow_redirects=False)
print(response.text)
if response.status_code == 401:
    # 如果返回未认证,继续进行第二次注册请求
    auth_info = response.headers['WWW-Authenticate']
    print(auth_info)
    nonce = auth_info.split('nonce="')[1].split('"')[0]
    #algorithm = auth_info.split('algorithm="')[1].split('"')[0]
    algorithm='MD5'
    realm = auth_info.split('realm')[1].split('"')[0]
    realm='iasserver'
    #print(nonce,algorithm,realm)
    # 创建第二次注册请求数据
    qop = "auth"  # 或 "auth-int",根据需要选择
    cnonce = "your_cnonce"  # 替换为生成的 cnonce
    method = "POST"  # 或 "GET",根据实际请求方法选择
    uri = "/VIID/System/Register"  # 替换为实际的请求 URI
    cnonce = "luz"  # 替换为实际的cnonce
    nc = "00000001"  # 替换为实际的nc值
    response_code = calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri)
    print("计算得到的 response:", response_code)
    response_data = {
        "RegisterObject": {
            "DeviceID": device_code
        }
    }

    auth_header = f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="/VIID/System/Register", algorithm="{algorithm}", cnonce="{cnonce}", nc={nc}, qop={qop}, response="{response_code}"'
    headers['Authorization'] = auth_header
    print("第二次参数:",auth_header)
    # 发送第二次注册请求
    response = requests.post(register_url, headers=headers, data=json.dumps(response_data))


# 保活 

def keepalive_1400():
    while True:
        response = requests.post(keepalive_url, headers=headers, data=json.dumps(keepalive_data))
        
        if response.status_code == 200:
            response_json = response.json()
            print("保活成功,响应内容:")
            print(response_json)
        else:
            print("保活失败,状态码:", response.status_code)
        
        # 每隔60秒执行一次保活操作
        time.sleep(60)

def submit_face(channel_code,cj_path,rl_path):
    print(cj_path,rl_path)
    # 定义请求的URL
    url = base_url+'/VIID/Faces'

    # 创建请求头
    headers = {
        'Accept': 'application/json,application/VIID+json',
        'Content-Type': 'application/VIID+JSON;charset=UTF-8',
        'Connection': 'keepalive',
        'User-Identify': device_code,
        'User-Agent': 'libghttp/1.0'
    }

    # 创建人脸批量增加请求数据
    cur_time=str(datetime.now().strftime("%Y%m%d%H%M%S"))
    print(cur_time)
    begin_code=channel_code+cur_time+str(random.randrange(10,99))
    face_data = {
        "FaceListObject": {
            "FaceObject": [{
                #"FaceID":   "330522000013204130240220200510200408000010600001",
                "FaceID":begin_code,
                "InfoKind": 0,
                "SourceID": begin_code,
                "DeviceID": channel_code,
                "LeftTopX": 163,
                "LeftTopY": 726,
                "RightBtmX": 334,
                "RightBtmY": 897,
                "AgeUpLimit": 0,
                "AgeLowerLimit": 0,
                "AccompanyNumber": 0,
                "IsDriver": 0,
                "IsForeigner": 0,
                "IsSuspectedTerrorist": 0,
                "IsCriminalInvolved": 0,
                "IsDetainees": 0,
                "IsVictim": 0,
                "IsSuspiciousPerson": 0,
                "Attitude": 0,
                "m_dSimilaritydegree": 0,
                "SubImageList": {
                    "SubImageInfoObject": [{
                        "ImageID": begin_code+'3',
                        "EventSort": 2,
                        "DeviceID": channel_code,
                        "Type": "14", #大图
                        "FileFormat": "Jpeg",
                        "ShotTime": cur_time,
                        "Width": 1920,
                        "Height": 1080,
                        "FileSize": 156500,
                        "Data": get_img_base(cj_path)
                    }, {
                        "ImageID": begin_code+'2',
                        "EventSort": 3,
                        "DeviceID": channel_code,
                        "Type": "11", #人脸图
                        "FileFormat": "Jpeg",
                        "ShotTime": cur_time,
                        "Width": 461,
                        "Height": 461,
                        "FileSize": 101984,
                        "Data": get_img_base(rl_path)
                    }]
                }
            }]
        }
    }

    # 发送人脸批量增加请求
    response = requests.post(url, headers=headers, data=json.dumps(face_data))

    # 检查响应
    if response.status_code == 200:
        response_json = response.json()
        print("人脸批量增加成功,响应内容:")
        print(response_json)
    else:
        print("人脸批量增加失败,状态码:", response.status_code)
        print(response.text)



# 检查注册是否成功
if response.status_code == 200:
    response_json = response.json()
    print("注册成功,响应内容:")
    print(response_json)
    thread=threading.Thread(target=keepalive_1400)
    thread.start() #另起一个线程做保活,否则主线程运行会阻塞
else:
    print("注册失败,状态码:", response.status_code)
    print(response.text)



运行效果

image.png

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。