
一文了解:python 并发、并行、同步、异步、阻塞、非阻塞
python 并发、并行、同步、异步、阻塞、非阻塞
在 Python 中,理解并发(Concurrency)、并行(Parallelism)、同步(Synchronization)、异步(Asynchronous)、阻塞(Blocking)和非阻塞(Non-blocking)是非常重要的,因为它们是构建高性能应用程序的关键概念。
1. 并发(Concurrency)
并发是指程序在同一时间段内可以处理多个任务的能力。具体来说,程序看起来像是同时执行多个任务,但实际上它们是在交替执行。
示例:多线程
import threading
import time
# 多线程 并行
def worker_threads():
print(f"Thread {threading.current_thread().name} started")
time.sleep(2)
print(f"Thread {threading.current_thread().name} finished")
# 多线程 并行
if __name__ == '__main__':
# 创建多个线程
threads = []
for i in range(5):
thread = threading.Thread(target=worker_threads, name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("All threads finished")
运行结果:
Thread Thread-0 started
Thread Thread-1 started
Thread Thread-2 started
Thread Thread-3 started
Thread Thread-4 started
Thread Thread-0 finished
Thread Thread-2 finishedThread Thread-4 finished
Thread Thread-3 finishedThread Thread-1 finished
All threads finished
Process finished with exit code 0
2. 并行(Parallelism)
并行是指程序在同一时间可以真正同时执行多个任务的能力。通常需要硬件支持,例如多核处理器。
示例:多进程
import multiprocessing
def worker_multiprocessing():
print(f"Process {multiprocessing.current_process().name} started")
time.sleep(2)
print(f"Process {multiprocessing.current_process().name} finished")
if __name__ == '__main__':
# 创建多个进程
processes = []
for i in range(5):
multiprocessing.Process(target=worker_multiprocessing, name="dddd")
process = multiprocessing.Process(target=worker_multiprocessing, name=f"Process-{i}")
processes.append(process)
process.start()
# 等待所有进程完成
for process in processes:
process.join()
print("All processes finished")
运行结果:
Process Process-0 started
Process Process-1 started
Process Process-2 started
Process Process-3 started
Process Process-4 started
Process Process-0 finished
Process Process-3 finished
Process Process-1 finished
Process Process-2 finished
Process Process-4 finished
All processes finished
3. 同步(Synchronization)
同步是指在多线程或多进程环境中,通过锁或其他机制确保资源的安全访问。
示例:锁(Lock)
import threading
import time
def worker_sync(lock):
with lock:
print(f"Thread {threading.current_thread().name} started")
time.sleep(2)
print(f"Thread {threading.current_thread().name} finished")
if __name__ == '__main__':
lock = threading.Lock()
# 创建多个线程
threads = []
for i in range(5):
thread = threading.Thread(target=worker_sync, args=(lock,), name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("All threads finished")
运行结果:
Thread Thread-0 started
Thread Thread-0 finished
Thread Thread-1 started
Thread Thread-1 finished
Thread Thread-2 started
Thread Thread-2 finished
Thread Thread-3 started
Thread Thread-3 finished
Thread Thread-4 started
Thread Thread-4 finished
All threads finished
4. 异步(Asynchronous)
异步是指程序可以在等待某个操作完成的同时继续执行其他任务。异步编程通常使用回调函数或协程。
示例:异步 I/O(使用 asyncio)
# 异步
import asyncio
async def worker_async():
print(f"Worker {asyncio.current_task().get_name()} started")
await asyncio.sleep(2)
print(f"Worker {asyncio.current_task().get_name()} finished")
async def main_async():
tasks = []
for i in range(5):
task = asyncio.create_task(worker_async(), name=f"Worker-{i}")
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main_async())
运行结果:
Worker Worker-0 started
Worker Worker-1 started
Worker Worker-2 started
Worker Worker-3 started
Worker Worker-4 started
Worker Worker-0 finished
Worker Worker-1 finished
Worker Worker-2 finished
Worker Worker-3 finished
Worker Worker-4 finished
5. 阻塞(Blocking)
阻塞是指程序在执行某个操作时会暂停执行,直到该操作完成。例如,当执行一个阻塞的 I/O 操作时,程序会等待直到 I/O 操作完成。
示例:阻塞 I/O
import time
def blocking_io():
print("Starting blocking IO")
time.sleep(5)
print("Finished blocking IO")
if __name__ == '__main__':
blocking_io()
6. 非阻塞(Non-blocking)
非阻塞是指程序在执行某个操作时不会暂停执行,而是继续执行其他任务。通常用于网络 I/O 或文件 I/O。
示例:非阻塞 I/O(使用 select)
# 非阻塞的tcp服务器示例 (注意:它监听的是 TCP 连接而不是 HTTP 请求,无法用http请求响应)
import select
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 8001))
server_socket.listen(5)
sockets_list = [server_socket]
def handle_client(client_socket):
request = client_socket.recv(1024)
print(f"Received : {request.decode()}")
response = "Hello, World!\n"
client_socket.send(response.encode())
client_socket.close()
while True:
read_sockets, _, exception_sockets = select.select(sockets_list, [], [])
for notified_socket in read_sockets:
if notified_socket == server_socket:
client_socket, client_address = server_socket.accept()
sockets_list.append(client_socket)
else:
handle_client(notified_socket)
for notified_socket in exception_sockets:
if notified_socket.fileno() >= 0: # Make sure file descriptor is valid
if notified_socket in sockets_list:
sockets_list.remove(notified_socket)
notified_socket.close()
#下面客户端示例调用后,运行结果:
Received : Hello, TCP Server!
要测试非阻塞 TCP 服务器示例的可用性,可以使用一个 TCP 客户端来模拟连接并发送数据给服务器,然后查看服务器是否能够正确响应。TCP 客户端示例代码,可以用来测试非阻塞 TCP 服务器:
import socket
server_address = ('localhost', 8001)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(server_address)
# 发送数据给服务器
client_socket.sendall(b"Hello, TCP Server!")
# 接收服务器的响应
response = client_socket.recv(1024)
print("Received:", response.decode())
client_socket.close()
#运行结果:
Received: Hello, World!
概念解释完啦~ 简单示例也看完啦~
以下是以两个url为例,使用上述不同方式去请求的代码示例:
1. 并发(Concurrency)
示例:多线程发送 HTTP 请求,用于需要并发请求多个api
import threading
import requests
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, json=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 创建线程列表
threads = []
# 创建并启动线程
for test_case in test_cases:
thread = threading.Thread(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("All requests finished")
2. 并行(Parallelism)
示例:多进程发送 HTTP 请求
import multiprocessing
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, json=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 创建进程列表
processes = []
# 创建并启动进程
for test_case in test_cases:
process = multiprocessing.Process(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
processes.append(process)
process.start()
# 等待所有进程完成
for process in processes:
process.join()
print("All requests finished")
3. 同步(Synchronization)
示例:使用锁同步多线程
import threading
import requests
import time
def send_request(lock, url, headers, payload):
with lock:
response = requests.post(url, headers=headers, json=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 创建锁
lock = threading.Lock()
# 创建线程列表
threads = []
# 创建并启动线程
for test_case in test_cases:
thread = threading.Thread(target=send_request, args=(lock, test_case["url"], test_case["headers"], test_case["payload"]))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("All requests finished")
4. 异步(Asynchronous)
示例:使用 asyncio 发送异步 HTTP 请求
import asyncio
import aiohttp
async def send_request(url, headers, payload):
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
print(f"Response status code: {response.status}")
response_text = await response.text()
print(f"Response content: {response_text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
async def main():
tasks = []
for test_case in test_cases:
task = asyncio.create_task(send_request(test_case["url"], test_case["headers"], test_case["payload"]))
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(main())
5. 阻塞(Blocking)
示例:阻塞式发送 HTTP 请求
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, json=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 依次发送请求
for test_case in test_cases:
send_request(test_case["url"], test_case["headers"], test_case["payload"])
print("All requests finished")
6. 非阻塞(Non-blocking)
示例:使用 select 发送非阻塞 HTTP 请求
import select
import socket
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, json=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 测试数据
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 创建套接字列表
sockets_list = []
# 创建并启动套接字
for test_case in test_cases:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 8000))
sockets_list.append(sock)
# 监听套接字
while sockets_list:
ready_to_read, _, _ = select.select(sockets_list, [], [])
for sock in ready_to_read:
send_request(test_cases[sockets_list.index(sock)]["url"], test_cases[sockets_list.index(sock)]["headers"], test_cases[sockets_list.index(sock)]["payload"])
sockets_list.remove(sock)
print("All requests finished")
7. 总结
通过以上示例,我们详细介绍了 Python 中的几个关键概念:
并发(Concurrency):在同一时间段内交替处理多个任务。
并行(Parallelism):在同一时间真正同时执行多个任务(需硬件支持)。
同步(Synchronization):确保多线程或多进程环境下的资源安全访问。
异步(Asynchronous):在等待某个操作完成的同时继续执行其他任务。
阻塞(Blocking):在执行某个操作时会暂停执行,直到该操作完成。
非阻塞(Non-blocking):在执行某个操作时不会暂停执行,而是继续执行其他任务。
更多推荐
所有评论(0)