大华ICC登录鉴权python实现
官方文档
对接模式
平台API对接分【客户端模式】和【用户密码模式】两种,对接开发时需要根据场景选择正确模式
客户端模式:用于第三方平台和ICC平台间对接使用,和平台用户无关,对接交互不涉及用户数据权限过滤,如第三方平台从ICC平台同步全量基础数据。
用户密码模式:用于平台用户操作中需要和ICC平台交互的场景,对接交互中ICC接口会根据当前用户权限对相关数据进行数据权限过滤。
请求头设置
头部参数 | 解释 |
---|---|
Authorization | 格式:token_type + 空格 + access_token, token_type一般为固定值bearer 客户端认证模式示例:bearer 704dc1bc-b05e-4ed5-a889-0261584919f6 用户密码认证模式示例:bearer 1:bb951116-d963-4528-80e1-91c2e94c3e75 |
User-Id | 客户端认证模式必须携带且设置为1 |
用户密码认证模式(推荐)
通过ICC用户账号、用户密码、clientId 和 clientSecret 访问鉴权,实现资源调用
具体流程,[获取公钥]->[RSA密码加密]->[获取access_token]->[用户保活]->[刷新access_token]
公钥只能使用一次,用后失效
密码认证方式订阅事件,必须 进行[用户保活]
只有密码认证模式有刷新access_token,客户端模式直接重新获取
申请访问凭证非web_client/web_client时,用户密码模式access_token有效期2h
申请访问凭证是web_client/web_client时,用户密码模式access_token有效期5分钟(ICC5.0.6部分版本及之前版本,可能不支持申请访问凭证,故可能用到web_client/web_client作为凭证)
用户保活接口必须30分钟内做保活,保证终端用户在线
refresh_token有效期为30分钟,30分钟内需调刷新token保证refresh_token有效
建议在3~4分钟分钟内调保活和刷新token接口
获取公钥
GET /evo-apigw/evo-oauth/1.0.0/oauth/public-key HTTP/1.1Host: 10.80.3.101
RSA加密密码
/** * RSA公钥加密 * * @param publicKey 公钥 * @param password 待加密的密码 * @return 密文 */ public static String encrypt(String publicKey,String password){ try { byte[] decoded = java.util.Base64.getDecoder().decode(publicKey.getBytes("UTF-8")); RSAPublicKey pubKey = (RSAPublicKey) KeyFactory.getInstance("RSA").generatePublic(new X509EncodedKeySpec(decoded)); // RSA加密 Cipher cipher = Cipher.getInstance("RSA"); cipher.init(Cipher.ENCRYPT_MODE, pubKey); //**此处Base64编码,开发者可以使用自己的库** String outStr = java.util.Base64.getEncoder().encodeToString(cipher.doFinal(password.getBytes("UTF-8"))); //outStr 是加密密文 System.out.println(outStr); return outStr; } catch (Exception e) { e.printStackTrace(); } return null; }
获取token
POST /evo-apigw/evo-oauth/1.0.0/oauth/extend/token HTTP/1.1Content-Type: application/json;charset=UTF-8Host: 10.35.121.63:443{ "grant_type":"password", "username":"用户账号", "password":"RSA加密后的密码", "client_id":"申请的clientId", "client_secret":"申请的clientSecret", "public_key":"获取的公钥"}---------------------------------------------------------------------{ "success": true, "data": { "access_token": "10000:5cb47cd80-35d9-406d-a8d3-d87dd672d5d3", "token_type": "bearer", "refresh_token": "b83403e1-5ea1-41fd-81fc-e86fe5252c93", "expires_in": 86399, "scope": "*", "userId": "10000", "magicId": "1868a7db-adeb-42de-8064-43b3706ae407" }, "code": "0", "errMsg": ""}
python实现
import requests import base64 import json from Crypto.Cipher import PKCS1_v1_5 as Cipher_pksc1_v1_5 from Crypto.PublicKey import RSA import urllib3 import time import datetime import ssl from twilio.rest import Client from qcloudsms_py import SmsMultiSender, SmsSingleSender from qcloudsms_py.httpclient import HTTPError urllib3.disable_warnings() url="ICC登录地址" def _encrpt(string, public_key): #RSA加密 rsakey = RSA.importKey(public_key) cipher = Cipher_pksc1_v1_5.new(rsakey) # 因为encryptor.encrypt方法其内部就实现了加密再次Base64加密的过程,所以这里实际是通过下面的1和2完成了JSEncrypt的加密方法 encrypt_text = cipher.encrypt(string.encode()) # 1.对账号密码组成的字符串加密 cipher_text_tmp = base64.b64encode(encrypt_text) # 2.对加密后的字符串base64加密 return cipher_text_tmp.decode() def gen_body(pwd, public_key): #加密 key = '-----BEGIN PUBLIC KEY-----\n' + public_key + '\n-----END PUBLIC KEY-----' encrypt_res = _encrpt(pwd, key) return encrypt_res def get_public_key(): #第一步获取公钥 a=json.loads(requests.get(url+"/evo-apigw/evo-oauth/1.0.0/oauth/public-key",verify=False).text)['data']['publicKey'] return a def login(): pub=get_public_key() print("pubilc_key:\t",pub) b=gen_body("用户密码",pub) print("encrypt_pass:\t"+b) data={ "grant_type":"password",#认证模式 密码认证 "username":"system", #登录用户 "password":b, #加密的密码 "client_id":"Luz", #随便填 "client_secret":"052e9086-1cf7-487a-b2ba-5948f9e766f6", #随便填 "public_key":pub } token=json.loads(requests.post(url+"/evo-apigw/evo-oauth/1.0.0/oauth/extend/token",json=data,verify=False).text)['data']['access_token'] #从返回json中提取token print(token) return token token=login() headers={ "Authorization":"bearer "+token, "User-Id":"1" } #接口调用测试,查询通道十分钟内的录像 t1=int(time.time())-600 t2=t1+600 f=open("channel.txt") a=f.read() b=a.split('\n') for i in b: c=i.split('\t') data2={ "data":{ "channelId":c[1], "recordSource":"2", "startTime":t1, "endTime":t2, "streamType":"1", "recordType":"0" } } #print(111) d=requests.post(url+"/evo-apigw/admin/API/SS/Record/QueryRecords",headers=headers,json=data2,verify=False).text #d=requests.post(url+"/evo-apigw/admin/API/MTS/Video/StartVideo",headers=headers,json=data2,verify=False).text hf=json.loads(d)['data']['records'] #print(d) #print(d) chann_name=c[0] mat = "{:40}\t{:28}" sms="" if hf==[]: print(mat.format(chann_name,"error")) else: print(mat.format(chann_name,"success"))