企业微信机器人是一种可以在企业微信工作群中执行特定任务的自动化工具。它具备丰富的功能,可以帮助企业提高团队协作效率,简化工作流程,并为员工提供更好的工作体验。

获取企业 ID 信息

企业信息页面链接地址:https://work.weixin.qq.com/wework_admin/frame#profile

在这里插入图片描述

创建企业微信机器人

后台应用管理页面链接地址:https://work.weixin.qq.com/wework_admin/frame#apps

点击创建应用

在这里插入图片描述

填写配置机器人应用信息

在这里插入图片描述

机器人功能配置

在这里插入图片描述

配置密钥 Secret

获取机器人应用 agentId 和 Secret 信息

在这里插入图片描述

发送 Secret 到企业微信查看详细 Secret

在这里插入图片描述

创建 config.ini 配置文件配置企业微信机器人应用的配置信息

corpId=【企业 ID】
corpSecret=【应用密钥】

获取 access_token

API 开发文档:https://developer.work.weixin.qq.com/resource/devtool

获取 access_token 是调用企业微信API接口的第一步,相当于创建了一个登录凭证,其它的业务 API 接口,都需要依赖于 access_token 来鉴权调用者身份。

因此开发者,在使用业务接口前,要明确 access_token 的颁发来源,使用 正确的access_token。

请求方式: GETHTTPS)
请求地址: https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET

权限说明

每个应用有独立的 secret,获取到的 access_token 只能本应用使用,所以每个应用的 access_token 应该分开来获取。

在这里插入图片描述

注意事项

开发者需要缓存access_token,用于后续接口的调用(注意:不能频繁调用gettoken接口,否则会受到频率拦截)。当access_token失效或过期时,需要重新获取。

access_token的有效期通过返回的expires_in来传达,正常情况下为7200秒(2小时),有效期内重复获取返回相同结果,过期后获取会返回新的access_token。
由于企业微信每个应用的access_token是彼此独立的,所以进行缓存时需要区分应用来进行存储。
access_token至少保留512字节的存储空间。
企业微信可能会出于运营需要,提前使access_token失效,开发者应实现access_token失效时重新获取的逻辑。

返回结果

在这里插入图片描述

实现代码展示

import os  
import requests  
  
# 从环境变量中读取 corpid 和 corpsecret  
corpid = os.environ.get('corpid')  
corpsecret = os.environ.get('corpsecret')  
  
# 发送 GET 请求获取 access_token  
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"  
response = requests.get(url)  
  
# 检查响应状态  
if response.status_code == 200:  
    data = response.json()  
    if data.get('errcode') == 0:  
        # 将 access_token 写入文件  
        with open("access_token.txt", "w") as file:  
            file.write(data['access_token'])  
    else:  
        # 输出错误信息  
        print(data)  
else:  
    # 输出请求失败信息  
    print(f"Request failed with status code {response.status_code}")

接收消息服务器配置

为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式。开启接收消息模式的企业,需要提供可用的接收消息服务器 URL(建议使用 https)。

开启接收消息模式后,用户在应用里发送的消息会推送给企业后台。还可配置地理位置上报等事件消息,当事件触发时企业微信会把相应的数据推送到企业的后台。企业后台接收到消息后,可在回复该消息请求的响应包里带上新消息,企业微信会将该被动回复消息推送给用户。

配置消息服务器配置

在这里插入图片描述

企业微信在推送消息给企业时,会对消息内容做 AES 加密,以 XML 格式 POST 到企业应用的 URL 上。企业在被动响应时,也需要对数据加密,以 XML 格式返回给企业微信。

配置说明展示

属性说明
URL企业后台接收企业微信推送请求的访问协议和地址 支持 http 或 https 协议(为了提高安全性,建议使用https)
Token企业任意填写(用于生成签名)
EncodingAESKey用于消息体的加密

加解密方案说明

文档地址:https://developer.work.weixin.qq.com/document/path/90968

术语说明

msg_signature: 消息签名,用于验证请求是否来自企业微信(防止攻击者伪造)。

EncodingAESKey:用于消息体的加密,长度固定为43个字符,从a-z, A-Z, 0-9共62个字符中选取,是AESKey的Base64编码。解码后即为32字节长的AESKey

AESKey=Base64_Decode(EncodingAESKey +=)

AESKey:AES 算法的密钥,长度为 32 字节。AES 采用 CBC 模式,数据采用 PKCS#7 填充至 32 字节的倍数;IV 初始向量大小为 16 字节,取 AESKey 前 16 字节,详见:http://tools.ietf.org/html/rfc2315

msg:为消息体明文,格式为XML

msg_encrypt:明文消息msg加密处理后的Base64编码。

初始化加解密类

WXBizMsgCrypt wxcpt(sToken,sEncodingAESKey,sReceiveId);

要求传参数sToken,sEncodingAESKey,sReceiveId。

sToken,sEncodingAESKey即设置接收消息的参数章节所述配置的Token、EncodingAESKey。

验证 URL 函数

① 签名校验 ② 解密数据包,得到明文消息内容。

int VerifyURL(const string &sMsgSignature, const string &sTimeStamp, const string &sNonce, const string &sEchoStr, string &sReplyEchoStr);

在这里插入图片描述

解密函数

① 签名校验 ② 解密数据包,得到明文消息结构体。

int DecryptMsg(const string &sMsgSignature, const string &sTimeStamp, const string &sNonce, const string &sPostData, string &sMsg);

在这里插入图片描述

加密函数

① 加密明文消息结构体 ② 生成签名 ③ 构造被动响应包

int EncryptMsg(const string &sReplyMsg, const string &sTimeStamp, const string &sNonce, string &sEncryptMsg);

在这里插入图片描述

项目结构目录

在这里插入图片描述

配置文件 config.ini

[wechat]
AgentId = 企业微信应用ID
Secret = 企业微信应用Secret
CorpID = 企业微信ID
Token = 企业微信应用Token
EncodingAESKey = 企业微信应用EncodingAESKey

[dev]
debug = false

requirements.txt

blinker==1.6.3
certifi==2023.7.22
charset-normalizer==3.3.1
click==8.1.7
colorama==0.4.6
configobj==5.0.8
Flask==3.0.0
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
lxml==4.9.3
MarkupSafe==2.1.3
Naked==0.1.32
pycryptodomex==3.19.0
PyYAML==6.0.1
requests==2.31.0
shellescape==3.8.1
six==1.16.0
urllib3==2.0.7
Werkzeug==3.0.0

加解密算法算法库

鉴于加解密算法相对复杂,企业微信提供了算法库。

./callback/ierror.py

WXBizMsgCrypt_OK = 0
WXBizMsgCrypt_ValidateSignature_Error = -40001
WXBizMsgCrypt_ParseXml_Error = -40002
WXBizMsgCrypt_ComputeSignature_Error = -40003
WXBizMsgCrypt_IllegalAesKey = -40004
WXBizMsgCrypt_ValidateCorpid_Error = -40005
WXBizMsgCrypt_EncryptAES_Error = -40006
WXBizMsgCrypt_DecryptAES_Error = -40007
WXBizMsgCrypt_IllegalBuffer = -40008
WXBizMsgCrypt_EncodeBase64_Error = -40009
WXBizMsgCrypt_DecodeBase64_Error = -40010
WXBizMsgCrypt_GenReturnXml_Error = -40011

./callback/WXBizMsgCrypt3.py

import logging
import base64
import random
import hashlib
import time
import struct
from Cryptodome.Cipher import AES
import xml.etree.cElementTree as ET
import socket

from callback import ierror


"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""


class FormatException(Exception):
    pass


def throw_exception(message, exception_class=FormatException):
    """my define raise exception function"""
    raise exception_class(message)


class SHA1:
    """计算企业微信的消息签名接口"""

    def getSHA1(self, token, timestamp, nonce, encrypt):
        """用SHA1算法生成安全签名
        @param token:  票据
        @param timestamp: 时间戳
        @param encrypt: 密文
        @param nonce: 随机字符串
        @return: 安全签名
        """
        try:
            sortlist = [token, timestamp, nonce, encrypt]
            sortlist.sort()
            sha = hashlib.sha1()
            sha.update("".join(sortlist).encode())
            return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ComputeSignature_Error, None


class XMLParse:
    """提供提取消息格式中的密文及生成回复消息格式的接口"""

    # xml消息模板
    AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""

    def extract(self, xmltext):
        """提取出xml数据包中的加密消息
        @param xmltext: 待提取的xml字符串
        @return: 提取出的加密消息字符串
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            encrypt = xml_tree.find("Encrypt")
            return ierror.WXBizMsgCrypt_OK, encrypt.text
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None

    def generate(self, encrypt, signature, timestamp, nonce):
        """生成xml消息
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'msg_encrypt': encrypt,
            'msg_signaturet': signature,
            'timestamp': timestamp,
            'nonce': nonce,
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
        return resp_xml


class PKCS7Encoder():
    """提供基于PKCS7算法的加解密接口"""

    block_size = 32

    def encode(self, text):
        """ 对需要加密的明文进行填充补位
        @param text: 需要进行填充补位操作的明文
        @return: 补齐明文字符串
        """
        text_length = len(text)
        # 计算需要填充的位数
        amount_to_pad = self.block_size - (text_length % self.block_size)
        if amount_to_pad == 0:
            amount_to_pad = self.block_size
        # 获得补位所用的字符
        pad = chr(amount_to_pad)
        return text + (pad * amount_to_pad).encode()

    def decode(self, decrypted):
        """删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文
        """
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]


class Prpcrypt(object):
    """提供接收和推送给企业微信消息的加解密接口"""
    def __init__(self, key):
        # self.key = base64.b64decode(key+"=")
        self.key = key
        # 设置加解密模式为AESCBC模式
        self.mode = AES.MODE_CBC

    def encrypt(self, text, receiveid):
        """对明文进行加密
        @param text: 需要加密的明文
        @return: 加密得到的字符串
        """
        # 16位随机字符串添加到明文开头
        text = text.encode()
        text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()

        # 使用自定义的填充方式对明文进行补位填充
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # 加密
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # 使用BASE64对加密后的字符串进行编码
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_EncryptAES_Error, None

    def decrypt(self, text, receiveid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_receiveid = content[xml_len + 4:]
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_IllegalBuffer, None

        if from_receiveid.decode('utf8') != receiveid:
            return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
        return 0, xml_content

    def get_random_str(self):
        """ 随机生成16位字符串
        @return: 16位字符串
        """
        return str(random.randint(1000000000000000, 9999999999999999)).encode()


class WXBizMsgCrypt(object):
    # 构造函数
    def __init__(self, sToken, sEncodingAESKey, sReceiveId):
        try:
            self.key = base64.b64decode(sEncodingAESKey + "=")
            assert len(self.key) == 32
        except:
            throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            # return ierror.py.WXBizMsgCrypt_IllegalAesKey,None
        self.m_sToken = sToken
        self.m_sReceiveId = sReceiveId

        # 验证URL
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sEchoStr: 随机串,对应URL参数的echostr
        # @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
        # @return:成功0,失败返回对应的错误码

    def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)
        return ret, sReplyEchoStr

    def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
        # 将企业回复用户的消息加密打包
        # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
        # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
        # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
        # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
        # return:成功0,sEncryptMsg,失败返回对应的错误码None
        pc = Prpcrypt(self.key)
        ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
        encrypt = encrypt.decode('utf8')
        if ret != 0:
            return ret, None
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 生成安全签名
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        xmlParse = XMLParse()
        return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)

    def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
        # 检验消息的真实性,并且获取解密后的明文
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sPostData: 密文,对应POST请求的数据
        #  xml_content: 解密后的原文,当return返回0时有效
        # @return: 成功0,失败返回对应的错误码
        # 验证安全签名
        xmlParse = XMLParse()
        ret, encrypt = xmlParse.extract(sPostData)
        if ret != 0:
            return ret, None
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)
        return ret, xml_content

原理介绍(可略)

from WXBizMsgCrypt import WXBizMsgCrypt
import xml.etree.cElementTree as ET
import sys

if __name__ == "__main__":
    # 假设企业在企业微信后台上设置的参数如下
    sToken = "hJqcu3uJ9Tn2gXPmxx2w9kkCkCE2EPYo"
    sEncodingAESKey = "6qkdMrq68nTKduznJYO1A37W2oEgpkMUvkttRToqhUt"
    sCorpID = "ww1436e0e65a779aee"
    '''
     ------------使用示例一:验证回调URL---------------
     *企业开启回调模式时,企业号会向验证url发送一个get请求 
     假设点击验证时,企业收到类似请求:
     * GET /cgi-bin/wxpush?msg_signature=5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3&timestamp=1409659589&nonce=263014780&echostr=P9nAzCzyDtyTWESHep1vC5X9xho%2FqYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp%2B4RPcs8TgAE7OaBO%2BFZXvnaqQ%3D%3D 
     * HTTP/1.1 Host: qy.weixin.qq.com
 
     接收到该请求时,企业应	1.解析出Get请求的参数,包括消息体签名(msg_signature),时间戳(timestamp),随机数字串(nonce)以及企业微信推送过来的随机加密字符串(echostr),
     这一步注意作URL解码。
     2.验证消息体签名的正确性 
     3. 解密出echostr原文,将原文当作Get请求的response,返回给企业微信
     第2,3步可以用企业微信提供的库函数VerifyURL来实现。
    '''
    wxcpt = WXBizMsgCrypt(sToken, sEncodingAESKey, sCorpID)
    # sVerifyMsgSig=HttpUtils.ParseUrl("msg_signature")
    # ret = wxcpt.VerifyAESKey()
    # print ret
    sVerifyMsgSig = "012bc692d0a58dd4b10f8dfe5c4ac00ae211ebeb"
    # sVerifyTimeStamp=HttpUtils.ParseUrl("timestamp")
    sVerifyTimeStamp = "1476416373"
    # sVerifyNonce=HttpUitls.ParseUrl("nonce")
    sVerifyNonce = "47744683"
    # sVerifyEchoStr=HttpUtils.ParseUrl("echostr")
    sVerifyEchoStr = "fsi1xnbH4yQh0+PJxcOdhhK6TDXkjMyhEPA7xB2TGz6b+g7xyAbEkRxN/3cNXW9qdqjnoVzEtpbhnFyq6SVHyA=="
    ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
    if (ret != 0):
        print
        "ERR: VerifyURL ret: " + str(ret)
        sys.exit(1)
    # 验证URL成功,将sEchoStr返回给企业号
    # HttpUtils.SetResponse(sEchoStr)

    '''
    ------------使用示例二:对用户回复的消息解密---------------
    用户回复消息或者点击事件响应时,企业会收到回调消息,此消息是经过企业微信加密之后的密文以post形式发送给企业,密文格式请参考官方文档
    假设企业收到企业微信的回调消息如下:
    POST /cgi-bin/wxpush? msg_signature=477715d11cdb4164915debcba66cb864d751f3e6&timestamp=1409659813&nonce=1372623149 HTTP/1.1
    Host: qy.weixin.qq.com
    Content-Length: 613
    <xml> <ToUserName><![CDATA[wx5823bf96d3bd56c7]]></ToUserName><Encrypt><![CDATA[RypEvHKD8QQKFhvQ6QleEB4J58tiPdvo+rtK1I9qca6aM/wvqnLSV5zEPeusUiX5L5X/0lWfrf0QADHHhGd3QczcdCUpj911L3vg3W/sYYvuJTs3TUUkSUXxaccAS0qhxchrRYt66wiSpGLYL42aM6A8dTT+6k4aSknmPj48kzJs8qLjvd4Xgpue06DOdnLxAUHzM6+kDZ+HMZfJYuR+LtwGc2hgf5gsijff0ekUNXZiqATP7PF5mZxZ3Izoun1s4zG4LUMnvw2r+KqCKIw+3IQH03v+BCA9nMELNqbSf6tiWSrXJB3LAVGUcallcrw8V2t9EL4EhzJWrQUax5wLVMNS0+rUPA3k22Ncx4XXZS9o0MBH27Bo6BpNelZpS+/uh9KsNlY6bHCmJU9p8g7m3fVKn28H3KDYA5Pl/T8Z1ptDAVe0lXdQ2YoyyH2uyPIGHBZZIs2pDBS8R07+qN+E7Q==]]></Encrypt>
    <AgentID><![CDATA[218]]></AgentID>
    </xml>
 
    企业收到post请求之后应该 1.解析出url上的参数,包括消息体签名(msg_signature),时间戳(timestamp)以及随机数字串(nonce)
    2.验证消息体签名的正确性。 3.将post请求的数据进行xml解析,并将<Encrypt>标签的内容进行解密,解密出来的明文即是用户回复消息的明文,明文格式请参考官方文档
    第2,3步可以用企业微信提供的库函数DecryptMsg来实现。
    '''
    # sReqMsgSig = HttpUtils.ParseUrl("msg_signature")
    sReqMsgSig = "0c3914025cb4b4d68103f6bfc8db550f79dcf48e"
    sReqTimeStamp = "1476422779"
    sReqNonce = "1597212914"
    sReqData = "<xml><ToUserName><![CDATA[ww1436e0e65a779aee]]></ToUserName>\n<Encrypt><![CDATA[Kl7kjoSf6DMD1zh7rtrHjFaDapSCkaOnwu3bqLc5tAybhhMl9pFeK8NslNPVdMwmBQTNoW4mY7AIjeLvEl3NyeTkAgGzBhzTtRLNshw2AEew+kkYcD+Fq72Kt00fT0WnN87hGrW8SqGc+NcT3mu87Ha3dz1pSDi6GaUA6A0sqfde0VJPQbZ9U+3JWcoD4Z5jaU0y9GSh010wsHF8KZD24YhmZH4ch4Ka7ilEbjbfvhKkNL65HHL0J6EYJIZUC2pFrdkJ7MhmEbU2qARR4iQHE7wy24qy0cRX3Mfp6iELcDNfSsPGjUQVDGxQDCWjayJOpcwocugux082f49HKYg84EpHSGXAyh+/oxwaWbvL6aSDPOYuPDGOCI8jmnKiypE+]]></Encrypt>\n<AgentID><![CDATA[1000002]]></AgentID>\n</xml>"
    ret, sMsg = wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
    print
    ret, sMsg
    if (ret != 0):
        print
        "ERR: DecryptMsg ret: " + str(ret)
        sys.exit(1)
    # 解密成功,sMsg即为xml格式的明文
    # TODO: 对明文的处理
    # For example:
    xml_tree = ET.fromstring(sMsg)
    content = xml_tree.find("Content").text
    print
    content
    # ...
    # ...

    '''
    ------------使用示例三:企业回复用户消息的加密---------------
    企业被动回复用户的消息也需要进行加密,并且拼接成密文格式的xml串。
    假设企业需要回复用户的明文如下:
    <xml>
    <ToUserName><![CDATA[mycreate]]></ToUserName>
    <FromUserName><![CDATA[wx5823bf96d3bd56c7]]></FromUserName>
    <CreateTime>1348831860</CreateTime>
    <MsgType><![CDATA[text]]></MsgType>
    <Content><![CDATA[this is a test]]></Content>
    <MsgId>1234567890123456</MsgId>
    <AgentID>128</AgentID>
    </xml>
 
    为了将此段明文回复给用户,企业应: 1.自己生成时间时间戳(timestamp),随机数字串(nonce)以便生成消息体签名,也可以直接用从企业微信的post url上解析出的对应值。
    2.将明文加密得到密文。   3.用密文,步骤1生成的timestamp,nonce和企业在企业微信设定的token生成消息体签名。   4.将密文,消息体签名,时间戳,随机数字串拼接成xml格式的字符串,发送给企业号。
    以上2,3,4步可以用企业微信提供的库函数EncryptMsg来实现。
    '''
    sRespData = "<xml><ToUserName>ww1436e0e65a779aee</ToUserName><FromUserName>ChenJiaShun</FromUserName><CreateTime>1476422779</CreateTime><MsgType>text</MsgType><Content>你好</Content><MsgId>1456453720</MsgId><AgentID>1000002</AgentID></xml>"
    ret, sEncryptMsg = wxcpt.EncryptMsg(sRespData, sReqNonce, sReqTimeStamp)
    if (ret != 0):
        print
        "ERR: EncryptMsg ret: " + str(ret)
        sys.exit(1)
    # ret == 0 加密成功,企业需要将sEncryptMsg返回给企业号
    # TODO:
    # HttpUitls.SetResponse(sEncryptMsg)

验证 URL 有效性

文档地址:https://developer.work.weixin.qq.com/document/10514

当点击“保存”提交以上信息时,企业微信会发送一条验证消息到填写的 URL,发送方法为 GET。企业的接收消息服务器接收到验证请求后,需要作出正确的响应才能通过 URL 验证。

企业在获取请求时需要做Urldecode处理,否则会验证不成功
你可以访问接口调试工具进行调试,依次选择 建立连接 > 测试回调模式。

假设接收消息地址设置为:http://api.3dept.com/,企业微信将向该地址发送如下验证请求:

请求方式:GET
请求地址:http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS&timestamp=13500001234&nonce=123412323&echostr=ENCRYPT_STR

在这里插入图片描述
企业后台收到请求后,需要做如下操作:

对收到的请求做Urldecode处理
通过参数msg_signature对请求进行校验,确认调用者的合法性。
解密echostr参数得到消息内容(即msg字段)
在1秒内原样返回明文消息内容(不能加引号,不能带bom头,不能带换行符)

Python 实现验证 URL 有效性源码

from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time

from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery

config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')

app = Flask(__name__)


@app.route('/', methods=['GET'])
def receive_callback():
    # 获取参数
    msg_signature = request.args.get('msg_signature')
    timestamp = request.args.get('timestamp')
    nonce = request.args.get('nonce')
    echostr = request.args.get('echostr')
    
    print("msg_signature", msg_signature)
    print("timestamp", timestamp)
    print("nonce", nonce)
    print("echostr", echostr)    
    print(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])

    # 创建 WXBizMsgCrypt 对象
    wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])
    ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    print("sEchoStr", sEchoStr)
    if ret != 0:
        print(str(ret))
        return 'ERR: VerifyURL ret: ' + str(ret)


    return sEchoStr


if __name__ == '__main__':
    app.run(host="0.0.0.0", debug=debug_mode, port=8888)

使用接收消息

开启接收消息模式后,企业微信会将消息发送给企业填写的URL,企业后台需要做正确的响应。

接收消息协议的说明

企业微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次。如果企业在调试中,发现成员无法收到被动回复的消息,可以检查是否消息处理超时。
当接收成功后,http头部返回200表示接收ok,其他错误码企业微信后台会一律当做失败并发起重试。
关于重试的消息排重,有msgid的消息推荐使用msgid排重。事件类型消息推荐使用FromUserName + CreateTime排重。
假如企业无法保证在五秒内处理并回复,或者不想回复任何内容,可以直接返回200(即以空串为返回包)。企业后续可以使用主动发消息接口进行异步回复。

接收消息请求的说明
假设企业的接收消息的URL设置为http://api.3dept.com。

请求方式:POST
请求地址 :http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS&timestamp=13500001234&nonce=123412323

接收数据格式

<xml> 
   <ToUserName><![CDATA[toUser]]></ToUserName>
   <AgentID><![CDATA[toAgentID]]></AgentID>
   <Encrypt><![CDATA[msg_encrypt]]></Encrypt>
</xml>

在这里插入图片描述
企业收到消息后,需要作如下处理:

对msg_signature进行校验
解密Encrypt,得到明文的消息结构体(消息结构体后面章节会详说)
如果需要被动回复消息,构造被动响应包
正确响应本次请求

被动响应包的数据格式

<xml>
   <Encrypt><![CDATA[msg_encrypt]]></Encrypt>
   <MsgSignature><![CDATA[msg_signature]]></MsgSignature>
   <TimeStamp>timestamp</TimeStamp>
   <Nonce><![CDATA[nonce]]></Nonce>
</xml>

在这里插入图片描述
Python 实现接收回复消息源码

from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time

from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery

config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')

app = Flask(__name__)


@app.route('/', methods=['POST'])
def callback_message():
    # 获取参数
    msg_signature = request.args.get('msg_signature')
    timestamp = request.args.get('timestamp')
    nonce = request.args.get('nonce')

    # 获取 POST 的原始数据
    sReqData = request.data

    # 创建 WXBizMsgCrypt 对象
    wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])

    ret, sMsg = wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)
    if ret != 0:
        return 'ERR: DecryptMsg ret: ' + str(ret)

    # 解析 XML
    xml_tree = etree.fromstring(sMsg)
    print(xml_tree)

    from_user = xml_tree.find("FromUserName").text
    to_user = xml_tree.find("ToUserName").text
    msg_type = xml_tree.find("MsgType").text

    print(from_user)
    print(to_user)
    print(msg_type)

    # 消息被动回复
    if msg_type == 'text':
        content = xml_tree.find("Content").text
        print(content)
        reply_xml = PassiveRecovery(to_user, from_user, msg_type, msg_content=content).text()
    else:
        media_id = xml_tree.find("MediaId").text
        print(media_id)
        reply_xml = PassiveRecovery(to_user, from_user, msg_type, media_id=media_id).image()

    # 加密回复的 XML
    ret, sEncryptMsg = wxcpt.EncryptMsg(reply_xml, nonce, timestamp)
    if ret != 0:
        return 'ERR: EncryptMsg ret: ' + str(ret)

    return sEncryptMsg


if __name__ == '__main__':
    app.run(debug=debug_mode, port=8888)

企业微信服务器 IP 段

企业微信在回调企业指定的URL时,是通过特定的IP发送出去的。如果企业需要做防火墙配置,那么可以通过这个接口获取到所有相关的IP段。

请求方式:GETHTTPS)
请求地址: https://qyapi.weixin.qq.com/cgi-bin/getcallbackip?access_token=ACCESS_TOKEN

在这里插入图片描述
返回结果:

{
	"errcode": 0,
	"errmsg": "ok",
	"ip_list": ["101.226.103.*", "101.226.62.*"]
}

在这里插入图片描述

源码部署服务器

接收消息 ./receive/PassiveRecovery.py

import time


class PassiveRecovery(object):
    # 传入数据 to_user, from_user, msg_type, msg_content, media_id
    def __init__(self, to_user, from_user, msg_type, msg_content=None, media_id=None):
        self.to_user = to_user
        self.from_user = from_user
        self.msg_type = msg_type
        self.msg_content = msg_content
        self.media_id = media_id

    def text(self):
        return f"""<xml>
        <ToUserName><![CDATA[{self.to_user}]]></ToUserName>
        <FromUserName><![CDATA[{self.from_user}]]></FromUserName>
        <CreateTime>{int(time.time())}</CreateTime>
        <MsgType><![CDATA[text]]></MsgType>
        <Content><![CDATA[{self.msg_content}]]></Content>
        </xml>"""

    def image(self):
        return f"""<xml>
        <ToUserName><![CDATA[{self.to_user}]]></ToUserName>
        <FromUserName><![CDATA[{self.from_user}]]></FromUserName>
        <CreateTime>{int(time.time())}</CreateTime>
        <MsgType><![CDATA[image]]></MsgType>
        <Image>
            <MediaId><![CDATA[{self.media_id}]]></MediaId>
        </Image>
        </xml>"""

    def video(self):
        return f"""<xml>
        <ToUserName><![CDATA[{self.to_user}]]></ToUserName>
        <FromUserName><![CDATA[{self.from_user}]]></FromUserName>
        <CreateTime>{int(time.time())}</CreateTime>
        <MsgType><![CDATA[video]]></MsgType>
        <Video>
            <MediaId><![CDATA[{self.media_id}]]></MediaId>
            <Title><![CDATA[title]]></Title>
            <Description><![CDATA[description]]></Description>
        </Video>
        </xml>"""

main.py

from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import time

from configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecovery

config = ConfigObj('./config.ini', encoding='utf-8')
debug_mode = config['dev'].as_bool('debug')

app = Flask(__name__)


@app.route('/', methods=['GET'])
def receive_callback():
    # 获取参数
    msg_signature = request.args.get('msg_signature')
    timestamp = request.args.get('timestamp')
    nonce = request.args.get('nonce')
    echostr = request.args.get('echostr')

    # 创建 WXBizMsgCrypt 对象
    wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])

    ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    if ret != 0:
        return 'ERR: VerifyURL ret: ' + str(ret)

    return sEchoStr


@app.route('/', methods=['POST'])
def callback_message():
    # 获取参数
    msg_signature = request.args.get('msg_signature')
    timestamp = request.args.get('timestamp')
    nonce = request.args.get('nonce')

    # 获取 POST 的原始数据
    sReqData = request.data

    # 创建 WXBizMsgCrypt 对象
    wxcpt = WXBizMsgCrypt(config['wechat']['Token'], config['wechat']['EncodingAESKey'], config['wechat']['CorpID'])

    ret, sMsg = wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)
    if ret != 0:
        return 'ERR: DecryptMsg ret: ' + str(ret)

    # 解析 XML
    xml_tree = etree.fromstring(sMsg)
    print(xml_tree)

    from_user = xml_tree.find("FromUserName").text
    to_user = xml_tree.find("ToUserName").text
    msg_type = xml_tree.find("MsgType").text

    print(from_user)
    print(to_user)
    print(msg_type)

    # 消息被动回复
    if msg_type == 'text':
        content = xml_tree.find("Content").text
        print(content)
        reply_xml = PassiveRecovery(to_user, from_user, msg_type, msg_content=content).text()
    else:
        media_id = xml_tree.find("MediaId").text
        print(media_id)
        reply_xml = PassiveRecovery(to_user, from_user, msg_type, media_id=media_id).image()

    # 加密回复的 XML
    ret, sEncryptMsg = wxcpt.EncryptMsg(reply_xml, nonce, timestamp)
    if ret != 0:
        return 'ERR: EncryptMsg ret: ' + str(ret)

    return sEncryptMsg


if __name__ == '__main__':
    app.run(debug=debug_mode, port=8888)

参考项目:https://github.com/Waite0603/enterprise_wechat_bot

执行 main.py

python main.py

使用 Gunicorn 运行 Flask

gevent 是一个基于 libevent 的 Python 协程库,用于实现高性能的并发服务器。与多线程或多进程相比,使用 gevent 可以更有效地处理大量的并发连接,因为它通过协程实现非阻塞 I/O,从而避免了线程切换的开销。

安装必要的库:

首先确保你已经安装了 Flask 和 gevent。你可以使用 pip 来安装它们:

pip install Flask gevent

使用 Gunicorn + gevent worker

对于生产环境,更常见的做法是使用 Gunicorn 作为反向代理服务器,并配置它使用 gevent worker。这样,你可以利用 Gunicorn 的许多高级功能,如进程管理、日志记录等,同时仍然享受 gevent 带来的性能优势。

安装 Gunicorn

pip install gunicorn

使用命令启动你的 Flask 应用:

gunicorn -w 4 -k gevent 'app:main'

这里,-w 4 指定了 4 个 worker 进程,-k gevent 指定了使用 gevent worker。‘app:main’ 是你的 Flask 应用的位置,其中 app 是 Python 模块名,后面的 main 是 Flask 应用实例的变量名。

配置 nginx 服务器

域名解析:http://wxbot.willwaking.com -> 服务器 IP

在这里插入图片描述

注意:防止和静态路由冲突需要配置 nginx 请求转发。

location /wxbot/ {  
    proxy_pass http://localhost:8888/;  
    proxy_set_header Host $host;  
    proxy_set_header X-Real-IP $remote_addr;  
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;  
    proxy_set_header X-Forwarded-Proto $scheme;  
}

测试结果:

在这里插入图片描述
配置成功:

在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐