
2024 年最新使用 Python 部署腾讯云服务器搭建企业微信机器人应用详细教程
企业微信机器人是一种可以在企业微信工作群中执行特定任务的自动化工具。它具备丰富的功能,可以帮助企业提高团队协作效率,简化工作流程,并为员工提供更好的工作体验。
企业微信机器人是一种可以在企业微信工作群中执行特定任务的自动化工具。它具备丰富的功能,可以帮助企业提高团队协作效率,简化工作流程,并为员工提供更好的工作体验。
获取企业 ID 信息
企业信息页面链接地址:https://work.weixin.qq.com/wework_admin/frame#profile
创建企业微信机器人
后台应用管理页面链接地址:https://work.weixin.qq.com/wework_admin/frame#apps
点击创建应用
填写配置机器人应用信息
机器人功能配置
配置密钥 Secret
获取机器人应用 agentId 和 Secret 信息
发送 Secret 到企业微信查看详细 Secret
创建 config.ini 配置文件配置企业微信机器人应用的配置信息
corpId=【企业 ID】
corpSecret=【应用密钥】
获取 access_token
API 开发文档:https://developer.work.weixin.qq.com/resource/devtool
获取 access_token 是调用企业微信API接口的第一步,相当于创建了一个登录凭证,其它的业务 API 接口,都需要依赖于 access_token 来鉴权调用者身份。
因此开发者,在使用业务接口前,要明确 access_token 的颁发来源,使用 正确的access_token。
请求方式: GET(HTTPS)
请求地址: https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
权限说明
每个应用有独立的 secret,获取到的 access_token 只能本应用使用,所以每个应用的 access_token 应该分开来获取。
注意事项
开发者需要缓存access_token,用于后续接口的调用(注意:不能频繁调用gettoken接口,否则会受到频率拦截)。当access_token失效或过期时,需要重新获取。
access_token的有效期通过返回的expires_in来传达,正常情况下为7200秒(2小时),有效期内重复获取返回相同结果,过期后获取会返回新的access_token。
由于企业微信每个应用的access_token是彼此独立的,所以进行缓存时需要区分应用来进行存储。
access_token至少保留512字节的存储空间。
企业微信可能会出于运营需要,提前使access_token失效,开发者应实现access_token失效时重新获取的逻辑。
返回结果
实现代码展示
import os
import requests
# 从环境变量中读取 corpid 和 corpsecret
corpid = os.environ.get('corpid')
corpsecret = os.environ.get('corpsecret')
# 发送 GET 请求获取 access_token
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
response = requests.get(url)
# 检查响应状态
if response.status_code == 200:
data = response.json()
if data.get('errcode') == 0:
# 将 access_token 写入文件
with open("access_token.txt", "w") as file:
file.write(data['access_token'])
else:
# 输出错误信息
print(data)
else:
# 输出请求失败信息
print(f"Request failed with status code {response.status_code}")
接收消息服务器配置
为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式。开启接收消息模式的企业,需要提供可用的接收消息服务器 URL(建议使用 https)。
开启接收消息模式后,用户在应用里发送的消息会推送给企业后台。还可配置地理位置上报等事件消息,当事件触发时企业微信会把相应的数据推送到企业的后台。企业后台接收到消息后,可在回复该消息请求的响应包里带上新消息,企业微信会将该被动回复消息推送给用户。
配置消息服务器配置
企业微信在推送消息给企业时,会对消息内容做 AES 加密,以 XML 格式 POST 到企业应用的 URL 上。企业在被动响应时,也需要对数据加密,以 XML 格式返回给企业微信。
配置说明展示
属性 | 说明 |
---|---|
URL | 企业后台接收企业微信推送请求的访问协议和地址 支持 http 或 https 协议(为了提高安全性,建议使用https) |
Token | 企业任意填写(用于生成签名) |
EncodingAESKey | 用于消息体的加密 |
加解密方案说明
文档地址:https://developer.work.weixin.qq.com/document/path/90968
术语说明
msg_signature
: 消息签名,用于验证请求是否来自企业微信(防止攻击者伪造)。
EncodingAESKey
:用于消息体的加密,长度固定为43个字符,从a-z, A-Z, 0-9共62个字符中选取,是AESKey的Base64编码。解码后即为32字节长的AESKey
AESKey=Base64_Decode(EncodingAESKey + “=”)
AESKey
:AES 算法的密钥,长度为 32 字节。AES 采用 CBC 模式,数据采用 PKCS#7 填充至 32 字节的倍数;IV 初始向量大小为 16 字节,取 AESKey 前 16 字节,详见:http://tools.ietf.org/html/rfc2315
msg
:为消息体明文,格式为XML
msg_encrypt
:明文消息msg加密处理后的Base64编码。
初始化加解密类
WXBizMsgCrypt wxcpt(sToken,sEncodingAESKey,sReceiveId);
要求传参数sToken,sEncodingAESKey,sReceiveId。
sToken,sEncodingAESKey即设置接收消息的参数章节所述配置的Token、EncodingAESKey。
验证 URL 函数
① 签名校验 ② 解密数据包,得到明文消息内容。
int VerifyURL(const string &sMsgSignature, const string &sTimeStamp, const string &sNonce, const string &sEchoStr, string &sReplyEchoStr);
解密函数
① 签名校验 ② 解密数据包,得到明文消息结构体。
int DecryptMsg(const string &sMsgSignature, const string &sTimeStamp, const string &sNonce, const string &sPostData, string &sMsg);
加密函数
① 加密明文消息结构体 ② 生成签名 ③ 构造被动响应包
int EncryptMsg(const string &sReplyMsg, const string &sTimeStamp, const string &sNonce, string &sEncryptMsg);
项目结构目录
配置文件 config.ini
[wechat]
AgentId = 企业微信应用ID
Secret = 企业微信应用Secret
CorpID = 企业微信ID
Token = 企业微信应用Token
EncodingAESKey = 企业微信应用EncodingAESKey
[dev]
debug = false
requirements.txt
blinker==1.6.3
certifi==2023.7.22
charset-normalizer==3.3.1
click==8.1.7
colorama==0.4.6
configobj==5.0.8
Flask==3.0.0
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
lxml==4.9.3
MarkupSafe==2.1.3
Naked==0.1.32
pycryptodomex==3.19.0
PyYAML==6.0.1
requests==2.31.0
shellescape==3.8.1
six==1.16.0
urllib3==2.0.7
Werkzeug==3.0.0
加解密算法算法库
鉴于加解密算法相对复杂,企业微信提供了算法库。
./callback/ierror.py
WXBizMsgCrypt_OK = 0
WXBizMsgCrypt_ValidateSignature_Error = -40001
WXBizMsgCrypt_ParseXml_Error = -40002
WXBizMsgCrypt_ComputeSignature_Error = -40003
WXBizMsgCrypt_IllegalAesKey = -40004
WXBizMsgCrypt_ValidateCorpid_Error = -40005
WXBizMsgCrypt_EncryptAES_Error = -40006
WXBizMsgCrypt_DecryptAES_Error = -40007
WXBizMsgCrypt_IllegalBuffer = -40008
WXBizMsgCrypt_EncodeBase64_Error = -40009
WXBizMsgCrypt_DecodeBase64_Error = -40010
WXBizMsgCrypt_GenReturnXml_Error = -40011
./callback/WXBizMsgCrypt3.py
import logging
import base64
import random
import hashlib
import time
import struct
from Cryptodome.Cipher import AES
import xml.etree.cElementTree as ET
import socket
from callback import ierror
"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)
class SHA1:
"""计算企业微信的消息签名接口"""
def getSHA1(self, token, timestamp, nonce, encrypt):
"""用SHA1算法生成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密文
@param nonce: 随机字符串
@return: 安全签名
"""
try:
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update("".join(sortlist).encode())
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
class XMLParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""
# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""
def extract(self, xmltext):
"""提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
return ierror.WXBizMsgCrypt_OK, encrypt.text
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None
def generate(self, encrypt, signature, timestamp, nonce):
"""生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的xml字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_xml
class PKCS7Encoder():
"""提供基于PKCS7算法的加解密接口"""
block_size = 32
def encode(self, text):
""" 对需要加密的明文进行填充补位
@param text: 需要进行填充补位操作的明文
@return: 补齐明文字符串
"""
text_length = len(text)
# 计算需要填充的位数
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 获得补位所用的字符
pad = chr(amount_to_pad)
return text + (pad * amount_to_pad).encode()
def decode(self, decrypted):
"""删除解密后明文的补位字符
@param decrypted: 解密后的明文
@return: 删除补位字符后的明文
"""
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]
class Prpcrypt(object):
"""提供接收和推送给企业微信消息的加解密接口"""
def __init__(self, key):
# self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
self.mode = AES.MODE_CBC
def encrypt(self, text, receiveid):
"""对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
text = text.encode()
text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
def decrypt(self, text, receiveid):
"""对解密后的明文进行补位删除
@param text: 密文
@return: 删除填充补位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64对密文进行解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
pad = plain_text[-1]
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
xml_content = content[4: xml_len + 4]
from_receiveid = content[xml_len + 4:]
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_receiveid.decode('utf8') != receiveid:
return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
return 0, xml_content
def get_random_str(self):
""" 随机生成16位字符串
@return: 16位字符串
"""
return str(random.randint(1000000000000000, 9999999999999999)).encode()
class WXBizMsgCrypt(object):
# 构造函数
def __init__(self, sToken, sEncodingAESKey, sReceiveId):
try:
self.key = base64.b64decode(sEncodingAESKey + "=")
assert len(self.key) == 32
except:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.py.WXBizMsgCrypt_IllegalAesKey,None
self.m_sToken = sToken
self.m_sReceiveId = sReceiveId
# 验证URL
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sEchoStr: 随机串,对应URL参数的echostr
# @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
# @return:成功0,失败返回对应的错误码
def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)
return ret, sReplyEchoStr
def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 将企业回复用户的消息加密打包
# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
# return:成功0,sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
encrypt = encrypt.decode('utf8')
if ret != 0:
return ret, None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全签名
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)
def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明文
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sPostData: 密文,对应POST请求的数据
# xml_content: 解密后的原文,当return返回0时有效
# @return: 成功0,失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
ret, encrypt = xmlParse.extract(sPostData)
if ret != 0:
return ret, None
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)
return ret, xml_content
原理介绍(可略)
from WXBizMsgCrypt import WXBizMsgCrypt
import xml.etree.cElementTree as ET
import sys
if __name__ == "__main__":
# 假设企业在企业微信后台上设置的参数如下
sToken = "hJqcu3uJ9Tn2gXPmxx2w9kkCkCE2EPYo"
sEncodingAESKey = "6qkdMrq68nTKduznJYO1A37W2oEgpkMUvkttRToqhUt"
sCorpID = "ww1436e0e65a779aee"
'''
------------使用示例一:验证回调URL---------------
*企业开启回调模式时,企业号会向验证url发送一个get请求
假设点击验证时,企业收到类似请求:
* GET /cgi-bin/wxpush?msg_signature=5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3×tamp=1409659589&nonce=263014780&echostr=P9nAzCzyDtyTWESHep1vC5X9xho%2FqYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp%2B4RPcs8TgAE7OaBO%2BFZXvnaqQ%3D%3D
* HTTP/1.1 Host: qy.weixin.qq.com
接收到该请求时,企业应 1.解析出Get请求的参数,包括消息体签名(msg_signature),时间戳(timestamp),随机数字串(nonce)以及企业微信推送过来的随机加密字符串(echostr),
这一步注意作URL解码。
2.验证消息体签名的正确性
3. 解密出echostr原文,将原文当作Get请求的response,返回给企业微信
第2,3步可以用企业微信提供的库函数VerifyURL来实现。
'''
wxcpt = WXBizMsgCrypt(sToken, sEncodingAESKey, sCorpID)
# sVerifyMsgSig=HttpUtils.ParseUrl("msg_signature")
# ret = wxcpt.VerifyAESKey()
# print ret
sVerifyMsgSig = "012bc692d0a58dd4b10f8dfe5c4ac00ae211ebeb"
# sVerifyTimeStamp=HttpUtils.ParseUrl("timestamp")
sVerifyTimeStamp = "1476416373"
# sVerifyNonce=HttpUitls.ParseUrl("nonce")
sVerifyNonce = "47744683"
# sVerifyEchoStr=HttpUtils.ParseUrl("echostr")
sVerifyEchoStr = "fsi1xnbH4yQh0+PJxcOdhhK6TDXkjMyhEPA7xB2TGz6b+g7xyAbEkRxN/3cNXW9qdqjnoVzEtpbhnFyq6SVHyA=="
ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
if (ret != 0):
print
"ERR: VerifyURL ret: " + str(ret)
sys.exit(1)
# 验证URL成功,将sEchoStr返回给企业号
# HttpUtils.SetResponse(sEchoStr)
'''
------------使用示例二:对用户回复的消息解密---------------
用户回复消息或者点击事件响应时,企业会收到回调消息,此消息是经过企业微信加密之后的密文以post形式发送给企业,密文格式请参考官方文档
假设企业收到企业微信的回调消息如下:
POST /cgi-bin/wxpush? msg_signature=477715d11cdb4164915debcba66cb864d751f3e6×tamp=1409659813&nonce=1372623149 HTTP/1.1
Host: qy.weixin.qq.com
Content-Length: 613
<xml> <ToUserName><![CDATA[wx5823bf96d3bd56c7]]></ToUserName><Encrypt><![CDATA[RypEvHKD8QQKFhvQ6QleEB4J58tiPdvo+rtK1I9qca6aM/wvqnLSV5zEPeusUiX5L5X/0lWfrf0QADHHhGd3QczcdCUpj911L3vg3W/sYYvuJTs3TUUkSUXxaccAS0qhxchrRYt66wiSpGLYL42aM6A8dTT+6k4aSknmPj48kzJs8qLjvd4Xgpue06DOdnLxAUHzM6+kDZ+HMZfJYuR+LtwGc2hgf5gsijff0ekUNXZiqATP7PF5mZxZ3Izoun1s4zG4LUMnvw2r+KqCKIw+3IQH03v+BCA9nMELNqbSf6tiWSrXJB3LAVGUcallcrw8V2t9EL4EhzJWrQUax5wLVMNS0+rUPA3k22Ncx4XXZS9o0MBH27Bo6BpNelZpS+/uh9KsNlY6bHCmJU9p8g7m3fVKn28H3KDYA5Pl/T8Z1ptDAVe0lXdQ2YoyyH2uyPIGHBZZIs2pDBS8R07+qN+E7Q==]]></Encrypt>
<AgentID><![CDATA[218]]></AgentID>
</xml>
企业收到post请求之后应该 1.解析出url上的参数,包括消息体签名(msg_signature),时间戳(timestamp)以及随机数字串(nonce)
2.验证消息体签名的正确性。 3.将post请求的数据进行xml解析,并将<Encrypt>标签的内容进行解密,解密出来的明文即是用户回复消息的明文,明文格式请参考官方文档
第2,3步可以用企业微信提供的库函数DecryptMsg来实现。
'''
# sReqMsgSig = HttpUtils.ParseUrl("msg_signature")
sReqMsgSig = "0c3914025cb4b4d68103f6bfc8db550f79dcf48e"
sReqTimeStamp = "1476422779"
sReqNonce = "1597212914"
sReqData = "<xml><ToUserName><![CDATA[ww1436e0e65a779aee]]></ToUserName>\n<Encrypt><![CDATA[Kl7kjoSf6DMD1zh7rtrHjFaDapSCkaOnwu3bqLc5tAybhhMl9pFeK8NslNPVdMwmBQTNoW4mY7AIjeLvEl3NyeTkAgGzBhzTtRLNshw2AEew+kkYcD+Fq72Kt00fT0WnN87hGrW8SqGc+NcT3mu87Ha3dz1pSDi6GaUA6A0sqfde0VJPQbZ9U+3JWcoD4Z5jaU0y9GSh010wsHF8KZD24YhmZH4ch4Ka7ilEbjbfvhKkNL65HHL0J6EYJIZUC2pFrdkJ7MhmEbU2qARR4iQHE7wy24qy0cRX3Mfp6iELcDNfSsPGjUQVDGxQDCWjayJOpcwocugux082f49HKYg84EpHSGXAyh+/oxwaWbvL6aSDPOYuPDGOCI8jmnKiypE+]]></Encrypt>\n<AgentID><![CDATA[1000002]]></AgentID>\n</xml>"
ret, sMsg = wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
print
ret, sMsg
if (ret != 0):
print
"ERR: DecryptMsg ret: " + str(ret)
sys.exit(1)
# 解密成功,sMsg即为xml格式的明文
# TODO: 对明文的处理
# For example:
xml_tree = ET.fromstring(sMsg)
content = xml_tree.find("Content").text
print
content
# ...
# ...
'''
------------使用示例三:企业回复用户消息的加密---------------
企业被动回复用户的消息也需要进行加密,并且拼接成密文格式的xml串。
假设企业需要回复用户的明文如下:
<xml>
<ToUserName><![CDATA[mycreate]]></ToUserName>
<FromUserName><![CDATA[wx5823bf96d3bd56c7]]></FromUserName>
<CreateTime>1348831860</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[this is a test]]></Content>
<MsgId>1234567890123456</MsgId>
<AgentID>128</AgentID>
</xml>
为了将此段明文回复给用户,企业应: 1.自己生成时间时间戳(timestamp),随机数字串(nonce)以便生成消息体签名,也可以直接用从企业微信的post url上解析出的对应值。
2.将明文加密得到密文。 3.用密文,步骤1生成的timestamp,nonce和企业在企业微信设定的token生成消息体签名。 4.将密文,消息体签名,时间戳,随机数字串拼接成xml格式的字符串,发送给企业号。
以上2,3,4步可以用企业微信提供的库函数EncryptMsg来实现。
'''
sRespData = "<xml><ToUserName>ww1436e0e65a779aee</ToUserName><FromUserName>ChenJiaShun</FromUserName><CreateTime>1476422779</CreateTime><MsgType>text</MsgType><Content>你好</Content><MsgId>1456453720</MsgId><AgentID>1000002</AgentID></xml>"
ret, sEncryptMsg = wxcpt.EncryptMsg(sRespData, sReqNonce, sReqTimeStamp)
if (ret != 0):
print
"ERR: EncryptMsg ret: " + str(ret)
sys.exit(1)
# ret == 0 加密成功,企业需要将sEncryptMsg返回给企业号
# TODO:
# HttpUitls.SetResponse(sEncryptMsg)
验证 URL 有效性
文档地址:https://developer.work.weixin.qq.com/document/10514
当点击“保存”提交以上信息时,企业微信会发送一条验证消息到填写的 URL,发送方法为 GET。企业的接收消息服务器接收到验证请求后,需要作出正确的响应才能通过 URL 验证。
企业在获取请求时需要做Urldecode处理,否则会验证不成功
你可以访问接口调试工具进行调试,依次选择 建立连接 > 测试回调模式。
假设接收消息地址设置为:http://api.3dept.com/,企业微信将向该地址发送如下验证请求:
请求方式:GET
请求地址:http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS×tamp=13500001234&nonce=123412323&echostr=ENCRYPT_STR
企业后台收到请求后,需要做如下操作:
对收到的请求做Urldecode处理
通过参数msg_signature对请求进行校验,确认调用者的合法性。
解密echostr参数得到消息内容(即msg字段)
在1秒内原样返回明文消息内容(不能加引号,不能带bom头,不能带换行符)
Python 实现验证 URL 有效性源码
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time
from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery
config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')
app = Flask(__name__)
@app.route('/', methods=['GET'])
def receive_callback():
# 获取参数
msg_signature = request.args.get('msg_signature')
timestamp = request.args.get('timestamp')
nonce = request.args.get('nonce')
echostr = request.args.get('echostr')
print("msg_signature", msg_signature)
print("timestamp", timestamp)
print("nonce", nonce)
print("echostr", echostr)
print(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
# 创建 WXBizMsgCrypt 对象
wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
print("sEchoStr", sEchoStr)
if ret != 0:
print(str(ret))
return 'ERR: VerifyURL ret: ' + str(ret)
return sEchoStr
if __name__ == '__main__':
app.run(host="0.0.0.0", debug=debug_mode, port=8888)
使用接收消息
开启接收消息模式后,企业微信会将消息发送给企业填写的URL,企业后台需要做正确的响应。
接收消息协议的说明
企业微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次。如果企业在调试中,发现成员无法收到被动回复的消息,可以检查是否消息处理超时。
当接收成功后,http头部返回200表示接收ok,其他错误码企业微信后台会一律当做失败并发起重试。
关于重试的消息排重,有msgid的消息推荐使用msgid排重。事件类型消息推荐使用FromUserName + CreateTime排重。
假如企业无法保证在五秒内处理并回复,或者不想回复任何内容,可以直接返回200(即以空串为返回包)。企业后续可以使用主动发消息接口进行异步回复。
接收消息请求的说明
假设企业的接收消息的URL设置为http://api.3dept.com。
请求方式:POST
请求地址 :http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS×tamp=13500001234&nonce=123412323
接收数据格式
<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<AgentID><![CDATA[toAgentID]]></AgentID>
<Encrypt><![CDATA[msg_encrypt]]></Encrypt>
</xml>
企业收到消息后,需要作如下处理:
对msg_signature进行校验
解密Encrypt,得到明文的消息结构体(消息结构体后面章节会详说)
如果需要被动回复消息,构造被动响应包
正确响应本次请求
被动响应包的数据格式
<xml>
<Encrypt><![CDATA[msg_encrypt]]></Encrypt>
<MsgSignature><![CDATA[msg_signature]]></MsgSignature>
<TimeStamp>timestamp</TimeStamp>
<Nonce><![CDATA[nonce]]></Nonce>
</xml>
Python 实现接收回复消息源码
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time
from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery
config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')
app = Flask(__name__)
@app.route('/', methods=['POST'])
def callback_message():
# 获取参数
msg_signature = request.args.get('msg_signature')
timestamp = request.args.get('timestamp')
nonce = request.args.get('nonce')
# 获取 POST 的原始数据
sReqData = request.data
# 创建 WXBizMsgCrypt 对象
wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
ret, sMsg = wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)
if ret != 0:
return 'ERR: DecryptMsg ret: ' + str(ret)
# 解析 XML
xml_tree = etree.fromstring(sMsg)
print(xml_tree)
from_user = xml_tree.find("FromUserName").text
to_user = xml_tree.find("ToUserName").text
msg_type = xml_tree.find("MsgType").text
print(from_user)
print(to_user)
print(msg_type)
# 消息被动回复
if msg_type == 'text':
content = xml_tree.find("Content").text
print(content)
reply_xml = PassiveRecovery(to_user, from_user, msg_type, msg_content=content).text()
else:
media_id = xml_tree.find("MediaId").text
print(media_id)
reply_xml = PassiveRecovery(to_user, from_user, msg_type, media_id=media_id).image()
# 加密回复的 XML
ret, sEncryptMsg = wxcpt.EncryptMsg(reply_xml, nonce, timestamp)
if ret != 0:
return 'ERR: EncryptMsg ret: ' + str(ret)
return sEncryptMsg
if __name__ == '__main__':
app.run(debug=debug_mode, port=8888)
企业微信服务器 IP 段
企业微信在回调企业指定的URL时,是通过特定的IP发送出去的。如果企业需要做防火墙配置,那么可以通过这个接口获取到所有相关的IP段。
请求方式:GET(HTTPS)
请求地址: https://qyapi.weixin.qq.com/cgi-bin/getcallbackip?access_token=ACCESS_TOKEN
返回结果:
{
"errcode": 0,
"errmsg": "ok",
"ip_list": ["101.226.103.*", "101.226.62.*"]
}
源码部署服务器
接收消息 ./receive/PassiveRecovery.py
import time
class PassiveRecovery(object):
# 传入数据 to_user, from_user, msg_type, msg_content, media_id
def __init__(self, to_user, from_user, msg_type, msg_content=None, media_id=None):
self.to_user = to_user
self.from_user = from_user
self.msg_type = msg_type
self.msg_content = msg_content
self.media_id = media_id
def text(self):
return f"""<xml>
<ToUserName><![CDATA[{self.to_user}]]></ToUserName>
<FromUserName><![CDATA[{self.from_user}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{self.msg_content}]]></Content>
</xml>"""
def image(self):
return f"""<xml>
<ToUserName><![CDATA[{self.to_user}]]></ToUserName>
<FromUserName><![CDATA[{self.from_user}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[image]]></MsgType>
<Image>
<MediaId><![CDATA[{self.media_id}]]></MediaId>
</Image>
</xml>"""
def video(self):
return f"""<xml>
<ToUserName><![CDATA[{self.to_user}]]></ToUserName>
<FromUserName><![CDATA[{self.from_user}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[video]]></MsgType>
<Video>
<MediaId><![CDATA[{self.media_id}]]></MediaId>
<Title><![CDATA[title]]></Title>
<Description><![CDATA[description]]></Description>
</Video>
</xml>"""
main.py
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time
from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery
config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')
app = Flask(__name__)
@app.route('/', methods=['GET'])
def receive_callback():
# 获取参数
msg_signature = request.args.get('msg_signature')
timestamp = request.args.get('timestamp')
nonce = request.args.get('nonce')
echostr = request.args.get('echostr')
# 创建 WXBizMsgCrypt 对象
wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
if ret != 0:
return 'ERR: VerifyURL ret: ' + str(ret)
return sEchoStr
@app.route('/', methods=['POST'])
def callback_message():
# 获取参数
msg_signature = request.args.get('msg_signature')
timestamp = request.args.get('timestamp')
nonce = request.args.get('nonce')
# 获取 POST 的原始数据
sReqData = request.data
# 创建 WXBizMsgCrypt 对象
wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
ret, sMsg = wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)
if ret != 0:
return 'ERR: DecryptMsg ret: ' + str(ret)
# 解析 XML
xml_tree = etree.fromstring(sMsg)
print(xml_tree)
from_user = xml_tree.find("FromUserName").text
to_user = xml_tree.find("ToUserName").text
msg_type = xml_tree.find("MsgType").text
print(from_user)
print(to_user)
print(msg_type)
# 消息被动回复
if msg_type == 'text':
content = xml_tree.find("Content").text
print(content)
reply_xml = PassiveRecovery(to_user, from_user, msg_type, msg_content=content).text()
else:
media_id = xml_tree.find("MediaId").text
print(media_id)
reply_xml = PassiveRecovery(to_user, from_user, msg_type, media_id=media_id).image()
# 加密回复的 XML
ret, sEncryptMsg = wxcpt.EncryptMsg(reply_xml, nonce, timestamp)
if ret != 0:
return 'ERR: EncryptMsg ret: ' + str(ret)
return sEncryptMsg
if __name__ == '__main__':
app.run(debug=debug_mode, port=8888)
参考项目:https://github.com/Waite0603/enterprise_wechat_bot
执行 main.py
python main.py
使用 Gunicorn 运行 Flask
gevent 是一个基于 libevent 的 Python 协程库,用于实现高性能的并发服务器。与多线程或多进程相比,使用 gevent 可以更有效地处理大量的并发连接,因为它通过协程实现非阻塞 I/O,从而避免了线程切换的开销。
安装必要的库:
首先确保你已经安装了 Flask 和 gevent。你可以使用 pip 来安装它们:
pip install Flask gevent
使用 Gunicorn + gevent worker
对于生产环境,更常见的做法是使用 Gunicorn 作为反向代理服务器,并配置它使用 gevent worker。这样,你可以利用 Gunicorn 的许多高级功能,如进程管理、日志记录等,同时仍然享受 gevent 带来的性能优势。
安装 Gunicorn
pip install gunicorn
使用命令启动你的 Flask 应用:
gunicorn -w 4 -k gevent 'app:main'
这里,-w 4 指定了 4 个 worker 进程,-k gevent 指定了使用 gevent worker。‘app:main’ 是你的 Flask 应用的位置,其中 app 是 Python 模块名,后面的 main 是 Flask 应用实例的变量名。
配置 nginx 服务器
域名解析:http://wxbot.willwaking.com -> 服务器 IP
注意:防止和静态路由冲突需要配置 nginx 请求转发。
location /wxbot/ {
proxy_pass http://localhost:8888/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
测试结果:
配置成功:
更多推荐
所有评论(0)