一、背景

这还是2个月前做的一次接口性能测试,关于locust脚本的单机多核运行,以及主从节点之间的数据通信

先简单交代下背景,在APP上线之前,需要对登录接口进行性能测试。经过评估,我还是优先选择了locust来进行脚本开发,本次用到了locust的单机多核运行能力,只不过这里还涉及到主从节点之间数据通信。现成的可参考的有效文档甚少,所以还是自己摸着官方文档过河比较靠谱。

顺带提一下,学习框架这种东西最好的教程其实还得是官方文档以及框架源码了,这里贴上locust官方文档链接,需要的可以自行学习:https://docs.locust.io/en/stable/what-is-locust.html

二、代码编写

其实脚本代码的编写一大重点就是如何处理测试数据,不同的测试需求对于测试数据的处理是不同的。比如这次的需求,手机号不能重复。另外考虑到长时间的负载压力,数据量还得足够。

最后测试数据还需要处理,那么我使用的测试号段是非真实号码段,测试结束后可以查询对应号段内的手机号,进行相关业务数据的清理。

1. 代码概览

还是老样子,先附上全部代码,然后对其结构进行拆分讲解。


  1. import random

  2. import time

  3. from collections import deque

  4. from locust import HttpUser, task, run_single_user, TaskSet, events

  5. from locust.runners import WorkerRunner, MasterRunner

  6. CURRENT_TIMESTAMP = str(round(time.time() * 1000))

  7. RANDOM = str(random.randint(10000000, 99999999))

  8. MOBILE_HEADER = {

  9. "skip-request-expired": "true",

  10. "skip-auth": "true",

  11. "skip-sign": "true",

  12. "os": "IOS",

  13. "device-id": "198EA6A4677649018708B400F3DF69FB",

  14. "nonce": RANDOM,

  15. "sign": "12333",

  16. "version": "1.2.0",

  17. "timestamp": CURRENT_TIMESTAMP,

  18. "Content-Type": "application/json"

  19. }

  20. last_mobile = ""

  21. worker_mobile_deque = deque()

  22. # 13300120000, 13300160000 新用户注册号段

  23. @events.test_start.add_listener

  24. def on_test_start(environment, **_kwargs):

  25. if not isinstance(environment.runner, WorkerRunner):

  26. mobile_list = []

  27. for i in range(13300120000, 13300160000):

  28. mobile_list.append(i)

  29. mobile_list_length = len(mobile_list)

  30. print("列表已生成,总计数量:", mobile_list_length)

  31. worker_count = environment.runner.worker_count

  32. chunk_size = int(mobile_list_length / worker_count)

  33. print(f"平均每个worker分得的手机号数量:{chunk_size}")

  34. for i, worker in enumerate(environment.runner.clients):

  35. start_index = i * chunk_size

  36. if i + 1 < worker_count:

  37. end_index = start_index + chunk_size

  38. else:

  39. end_index = len(mobile_list)

  40. data = mobile_list[start_index:end_index]

  41. environment.runner.send_message("mobile_list", data, worker)

  42. def setup_mobile_list(environment, msg, **kwargs):

  43. len_msg_data = len(msg.data)

  44. print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")

  45. global worker_mobile_deque

  46. worker_mobile_deque = deque(msg.data)

  47. @events.init.add_listener

  48. def on_locust_init(environment, **_kwargs):

  49. if not isinstance(environment.runner, MasterRunner):

  50. environment.runner.register_message('mobile_list', setup_mobile_list)

  51. class VcodeLoginUser(TaskSet):

  52. # wait_time = between(5, 5)

  53. @task

  54. def vcode_login(self):

  55. test_mobile = worker_mobile_deque.popleft()

  56. print("当前获取的手机号:", test_mobile)

  57. # print("当前队列大小:", len(worker_mobile_deque))

  58. global last_mobile

  59. last_mobile = test_mobile

  60. with self.client.post("/g/sendMobileVcode",

  61. headers=MOBILE_HEADER,

  62. json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:

  63. try:

  64. send_response_json = send_response.json()

  65. if send_response_json["message"] == "success":

  66. params = {"mobile": str(test_mobile), "vcode": "111111"}

  67. # print(test_mobile, "登录请求参数:", params)

  68. with self.client.post("/g/vcodeLogin",

  69. json=params,

  70. headers=MOBILE_HEADER,

  71. catch_response=True) as login_response:

  72. # print(login_response.json)

  73. login_response_json = login_response.json()

  74. if login_response_json["message"] != "success":

  75. login_response.failure("message not equal success")

  76. elif login_response_json["code"] != 0:

  77. login_response.failure("code not equal 0")

  78. elif login_response_json["data"]["rId"] == "":

  79. login_response.failure("rid is null")

  80. elif login_response_json["data"]["mobile"] != str(test_mobile):

  81. login_response.failure("mobile is error,入参手机号{},返回的手机号{}"

  82. .format(test_mobile, login_response.json()["data"]["mobile"]))

  83. # print(test_mobile, "请求结果:", login_response.json())

  84. else:

  85. send_response.failure("{} send code fail".format(test_mobile))

  86. except Exception as e:

  87. send_response.failure("send code fail {}".format(e))

  88. @events.test_stop.add_listener

  89. def on_test_stop(environment, **kwargs):

  90. print("脚本结束")

  91. print("当前队列大小:", len(worker_mobile_deque))

  92. print("最后的手机号:", last_mobile)

  93. class LocustLogin(HttpUser):

  94. tasks = [VcodeLoginUser]

  95. host = "https://qa.test.com"

  96. if __name__ == '__main__':

  97. run_single_user(LocustLogin)

2. 代码拆解-要加必要的断言

首先是基于locust开发的http请求的脚本大结构是不变的,依旧是两大块:HttpUserTaskSet,这里不再对其讲解了,大伙看下官方文档就明白了。

接下来就是类VcodeLoginUser,可以看到在这里面是定义了单个用户的详细动作。注意这里要加上必要的断言。否则仅靠框架的非200外的错误断言还是不够的。

比如我这里关注登录成功后的几个必要字段:coderIdmobile,这些一定是要符合断言的才可以。

果不其然,压测过程中就发现了并发情况下会出现的问题:入参手机号是a,接口返回的手机号是b。并发量越大错误越多。如果我只断言code=0,那么这个问题就不容易发现了,虽然接口返回的code都是成功的,但是业务上已经存在错误了。


  1. ...

  2. with self.client.post("/g/sendMobileVcode",

  3. headers=MOBILE_HEADER,

  4. json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:

  5. try:

  6. send_response_json = send_response.json()

  7. if send_response_json["message"] == "success":

  8. params = {"mobile": str(test_mobile), "vcode": "111111"}

  9. # print(test_mobile, "登录请求参数:", params)

  10. with self.client.post("/g/vcodeLogin",

  11. json=params,

  12. headers=MOBILE_HEADER,

  13. catch_response=True) as login_response:

  14. # print(login_response.json)

  15. login_response_json = login_response.json()

  16. if login_response_json["message"] != "success":

  17. login_response.failure("message not equal success")

  18. elif login_response_json["code"] != 0:

  19. login_response.failure("code not equal 0")

  20. elif login_response_json["data"]["rId"] == "":

  21. login_response.failure("rid is null")

  22. elif login_response_json["data"]["mobile"] != str(test_mobile):

  23. login_response.failure("mobile is error,入参手机号{},返回的手机号{}"

  24. .format(test_mobile, login_response.json()["data"]["mobile"]))

  25. # print(test_mobile, "请求结果:", login_response.json())

  26. else:

  27. send_response.failure("{} send code fail".format(test_mobile))

  28. except Exception as e:

  29. send_response.failure("send code fail {}".format(e))

  30. ...

3. 代码拆解-单机多核处理

接下来就是重点了,如何在单台机器上用到多cpu。最开始的时候我忽略了这点,后来发现负载上不去,一打开资源监视器才发现只有1个cpu在满负载运行。

这里示意图仅供参考,我的win笔记本是12c的。

因为Locust是单进程的,不能充分利用多核CPU,于是需要我们压力机上开启一个master进程,然后再开启多个slave进程,组成一个单机分布式系统即可。

开启的方式也很简单:


  1. # 开启 master

  2. locust -f locustfile.py --master

  3. # 开启 slave

  4. locust -f locustfile.py --slave

这里我们开启 slave 节点的时候可以开启对应多个命令行窗口,当时没截图,借用网上的图片示意一下:

开启后,你的web界面就可以实时看到当前启动的节点数了。

4. 代码拆解-处理主从节点数据通信

开启主从节点倒是很容易,测试数据就需要针对性进行处理了。

因为我的测试登录用的手机号不可以重复,所以要保证不同 slave 节点上同时运行的代码产生的手机号都不可以重复。

继续扒了下官方文档,发现可以通过增加事件监听器来实现我的需求。

这里我加了三个监听器分别来处理不同的事情:

  • @events.init.add_listener:在locust运行初始化的时候执行
  • @events.test_start.add_listener: 在测试代码开始运行的时候执行
  • @events.test_stop.add_listener: 在测试代码结束运行的时候执行

@events.test_start.add_listener 首先,在@events.test_start.add_listener里,我主要处理全量数据的生成,以及把这些手机号平均分配给生成的 slave 节点。


  1. @events.test_start.add_listener

  2. def on_test_start(environment, **_kwargs):

  3. if not isinstance(environment.runner, WorkerRunner):

  4. mobile_list = []

  5. for i in range(13300120000, 13300160000):

  6. mobile_list.append(i)

  7. mobile_list_length = len(mobile_list)

  8. print("列表已生成,总计数量:", mobile_list_length)

  9. worker_count = environment.runner.worker_count

  10. chunk_size = int(mobile_list_length / worker_count)

  11. print(f"平均每个worker分得的手机号数量:{chunk_size}")

  12. for i, worker in enumerate(environment.runner.clients):

  13. start_index = i * chunk_size

  14. if i + 1 < worker_count:

  15. end_index = start_index + chunk_size

  16. else:

  17. end_index = len(mobile_list)

  18. data = mobile_list[start_index:end_index]

  19. environment.runner.send_message("mobile_list", data, worker)


  1. def setup_mobile_list(environment, msg, **kwargs):

  2. len_msg_data = len(msg.data)

  3. print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")

  4. global worker_mobile_deque

  5. worker_mobile_deque = deque(msg.data)

这样,不同的 slave 节点脚步分配到的手机号段就是不同的了,解决测试数据重复的问题。

另外,我定义另一个全局变量worker_mobile_deque,这样不同的 slave 节点接收的数据就可以放到队列里,运行的时候从队列里面取,用一个少一个,直到队列里的数据用完。

@events.init.add_listener 接着就是在@events.init.add_listener里要注册上面定义的数据字段和处理函数。


  1. @events.init.add_listener

  2. def on_locust_init(environment, **_kwargs):

  3. if not isinstance(environment.runner, MasterRunner):

  4. environment.runner.register_message('mobile_list', setup_mobile_list)

@events.test_stop.add_listener 最后,在@events.test_stop.add_listener这里可以做一些后置处理,我是简单起见,只是记录输出了本次测试用到了哪个号码段,这样我下次运行脚本的时候可以从后面的数据开始,最大化测试数据的使用,不浪费。


  1. @events.test_stop.add_listener

  2. def on_test_stop(environment, **kwargs):

  3. print("脚本结束")

  4. print("当前队列大小:", len(worker_mobile_deque))

  5. print("最后的手机号:", last_mobile)

三、小结

脚本调试完后可以稳定运行,接下来就是测试的过程了,进行了服务器单节点、多节点负载能力的测试,水平拓展能力的测试,以及服务动态扩容、长时间高负载测试。测试的角度观察测试报告,服务各项指标的情况。只不过涉及到开发端,调优分析的工作并未能参与很多。不过大概还是那些常见问题,后续有机会可以再单独分享了。

从使用角度来看,locust深得我爱,比起 jemter真的太轻便了,代码灵活度也非常高,单机负载能力也是响当当的,这点比jemeter强太多了。我这个项目不需要非常高的量,所以单机只用了8c就够了。如果有小伙伴需要非常高的并发,locust 也支持多机器分布式,进一步扩大并发能力。

 如果我的博客对你有帮助、如果你喜欢我的博客内容,请 “点赞” “评论” “收藏” 一键三连哦!

最后: 下方这份完整的软件测试视频教程已经整理上传完成,需要的朋友们可以自行领取【保证100%免费】

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐