浙江大华9系平台鉴权与HLS拉流地址获取 python demo
官方文档
https://open-gov.dahuatech.com/#/interfaces?id=141&parentID=135
创建会话
接口描述
通过两次交互,最终完成登录,并获取到token,在用户后续的请求交互中需要携带该token进行身份和权限的校验。
登录流程:
第一次登陆,客户端只传用户名,服务端返回realm、readomKey和encryptType信息。
第二次登录,客户端根据返回的信息,按照指定的加密算法计算签名,再带着用户名和签名登陆一次。
参数说明:
username-value:用户名
passwd-value:明文密码
encryptType-value:服务端返回的哈希算法,可以是MD5,SHA1等,目前只支持MD5,且计算结果的a-f使用小写字母表示。
realm-value:服务端返回的域信息,服务端可以使用集群UID作为域信息
randomKey-value:随机秘钥种子
通用加密流程:(第一次登陆返回没有method字段)
passwd-value0 = encryptType-value(passwd-value) 其中passwd-value是密码明文。
passwd-value1 = encryptType-value(username-value:passwd-value0) 中间无:号
passwd-tmp = encryptType-value(passwd-value1)
encrypted-passwd = encryptType-value(username-value:realm-value:passwd-tmp) 中间含:号
signature = encryptType-value(encrypted-passwd:randomKey-value) 中间含:号
简易加密流程:(第一次登陆返回method字段为simple)
encrypted-passwd = encryptType-value(username-value:realm-value:passwd-value) 中间含:号,passwd-value是密码明文。
signature = encryptType-value(encrypted-passwd:randomKey-value) 中间含:号
请求
请求语法
POST /videoService/accounts/authorize
请求参数
无
第一次交互
请求内容
{ "userName" : "system", "clientType" : "winpc", "ipAddress" : "10.10.10.10" }
请求内容参数
userName: 类型
string
,必填。用户名,最长32字节。clientType: 类型
string
,必填。客户端类型。固定填 winpc 。ipAddress: 类型
string
,选填。客户端的ip地址,用于日志审计。
响应内容
{ "realm" : "TheNextService", "randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3", "encryptType" : "MD5", "method" : "simple" }
响应内容参数
realm: 类型
string
。域信息,加密过程使用。randomKey: 类型
string
。随机密钥种子。encryptType: 类型
string
。加密算法method : 类型
string
。使用哪种加密流程。没有该字段,按通用加密流程计算,"simple"表示按简易加密流程计算。
第二次交互
请求内容
{ "userName" : "system", "signature" : "ad1290902f8bfb0bd7983111d91ddb6b", "randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3", "encryptType" : "MD5", "clientType" : "winpc", "ipAddress" : "10.10.10.10" }
请求内容参数
userName : 类型
string
,必填。用户名,最长32字节。signature : 类型
string
,必填。根据签名计算方法得到的签名值。randomKey : 类型
string
,必填。随机密钥种子。encryptType : 类型
string
,必填。加密算法。clientType : 类型
string
,必填。客户端类型。ipAddress : 类型
string
,选填。客户端ip地址,用于日志审计。
响应内容
{ "duration" : 120, "token" : "S4NbecfYB19QUJHT4T8M7G_Y5524jUq1yNQtu+FruA87hAErnAHoQTEXtUn1AEZz0TKwKi0EIriyrT8ul2ejSGcy7lxWndsP4+GWch2C/fcgy+YAz2GJnbQtRFFbq3XNfi7iAXeEG6O40GKODI2ICTq", "userName": "system", "userId" : "S4NbecfYB19QUJHT4T8M7G", "lastLoginIp": "10.10.10.10" }
响应内容参数
duration : 类型
int
。有效时间,由服务端指定,单位秒,建议3/4 duration时就进行会话更新。token : 类型
string
。返回登陆令牌,令牌字符串由服务端发布。之后的其它请求,在HTTP头的 X-Subject-Token: 带上这个令牌进行鉴权。userName : 类型
string
。创建该会话的用户名。userId : 类型
string
。用户名对应的用户Id,如果需要对该用户信息进行修改,使用用户Id。lastLoginIp : 类型
string
。上次登陆的IP。version : 类型
string
。视频云版本
注意事项
必须设置HTTP头的
Content-Type: application/json;charset=UTF-8
,否则返回415。第一次登录成功,响应返回的http 状态码为401
为便于理解,附上计算通用加密流程签名的Java伪代码片断
JSON response = firstLogin(); String randomKey = response["randomKey"]; String realm = response["realm"]; String userName = xxx; /// 用户名 String password = yyy; /// 该用户的明文密码 /// 一共计算五次MD5 String signature = encrypt(password, "MD5"); signature = encrypt(userName+signature, "MD5"); signature = encrypt(signature, "MD5"); signature = encrypt(userName+":"+realm+":"+signature, "MD5"); signature = encrypt(signature+":"+randomKey, "MD5");
示例
第一次请求示例
POST /videoService/accounts/authorize HTTP/1.1 Content-Type: application/json;charset=UTF-8 Content-Length: 103 { "userName" : "system", "clientType" : "winpc", "ipAddress" : "10.10.10.10" }
第一次响应示例
HTTP/1.1 401 Unauthorized { "realm" : "TheNextService", "randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad", "encryptType" : "MD5" }
第二次请求示例
POST /videoService/accounts/authorize HTTP/1.1 Content-Type: application/json;charset=UTF-8 Content-Length: 217 { "userName" : "system", "signature" : "ad1290902f8bfb0bd7983111d91ddb6b", "randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3", "encryptType" : "MD5", "clientType" : "winpc", "ipAddress" : "10.10.10.10" }
第二次响应示例
HTTP/1.1 200 OK { "duration" : 120, "token" : "S4NbecfYB19QUJHT4T8M7G_Y5524jUq1yNQtu+FruA87hAErnAHoQTEXtUn1AEZz0TKwKi0EIriyrT8ul2ejSGcy7lxWndsP4+GWch2C/fcgy+YAz2GJnbQtRFFbq3XNfi7iAXeEG6O40GKODI2ICTq", "userName": "system", "userId" : "S4NbecfYB19QUJHT4T8M7G", "lastLoginIp": "10.10.10.10" }
demo示例
import hashlib import requests import json API_ENDPOINT = "/videoService/accounts/authorize" baseUrl="http://1.1.1.1:8314" #平台Web地址 def send_first_login_request(): # Prepare and send the first login request to the API endpoint url = baseUrl + API_ENDPOINT # Replace "API_BASE_URL" with the actual base URL of the API payload = { "userName": "system", "clientType": "winpc", "ipAddress": "10.10.10.10" } headers = { "Content-Type": "application/json;charset=UTF-8" } response = requests.post(url, json=payload, headers=headers) return response.json() def calculate_signature(username, password, realm, random_key, encrypt_type, method): password_value_0 = encrypt(password, encrypt_type) password_value_1 = encrypt(username + password_value_0, encrypt_type) password_tmp = encrypt(password_value_1, encrypt_type) encrypted_password = encrypt(username + ":" + realm + ":" + password_tmp, encrypt_type) return encrypt(encrypted_password + ":" + random_key, encrypt_type) def send_second_login_request(username, signature, random_key, encrypt_type): # Prepare and send the second login request to the API endpoint url = baseUrl + API_ENDPOINT # Replace "API_BASE_URL" with the actual base URL of the API payload = { "userName": username, "signature": signature, "randomKey": random_key, "encryptType": encrypt_type, "clientType": "winpc", "ipAddress": "10.10.10.10" } headers = { "Content-Type": "application/json;charset=UTF-8" } response = requests.post(url, json=payload, headers=headers) return response.json() def encrypt(input_string, encrypt_type): hashed = hashlib.new(encrypt_type, input_string.encode()).hexdigest() return hashed # Step 1: Send first login request first_login_response = send_first_login_request() print(first_login_response) # Extract response parameters realm = first_login_response["realm"] random_key = first_login_response["randomKey"] encrypt_type = first_login_response["encryptType"] method = first_login_response.get("method", "") # Step 2: Prepare second login request username = "system" password = "XXXXXX" # Replace with the actual password signature = calculate_signature(username, password, realm, random_key, encrypt_type, method) print(signature) # Step 3: Send second login request second_login_response = send_second_login_request(username, signature, random_key, encrypt_type) print(second_login_response) # Process second login response # Extract the "token", "userName", "userId", "lastLoginIp", and other required fields from the response JSON # and perform the necessary actions. def get_hls_url(channel_id, token): # Prepare and send the request to get HLS URL base_url = baseUrl # Replace with the actual base URL endpoint = "/videoService/realmonitor/uri" params = { "channelId": channel_id, "scheme": "HLS" } headers = { "X-Subject-Token": token } response = requests.get(base_url + endpoint, params=params, headers=headers).text return json.loads(response) # Example usage channel_id = "xxxxx" #测试通道编码 token = second_login_response['token'] print(token) # Get HLS URL hls_url_response = get_hls_url(channel_id, token) print(hls_url_response) # Extract the HLS URL hls_url = hls_url_response["url"] print("HLS URL:", hls_url)