在 Python 中,理解并发(Concurrency)、并行(Parallelism)、同步(Synchronization)、异步(Asynchronous)、阻塞(Blocking)和非阻塞(Non-blocking)是非常重要的,因为它们是构建高性能应用程序的关键概念。

1. 并发(Concurrency)

并发是指程序在同一时间段内可以处理多个任务的能力。具体来说,程序看起来像是同时执行多个任务,但实际上它们是在交替执行。

示例:多线程

import threading
import time

# 多线程  并行
def worker_threads():
    print(f"Thread {threading.current_thread().name} started")
    time.sleep(2)
    print(f"Thread {threading.current_thread().name} finished")

# 多线程  并行
if __name__ == '__main__':
    # 创建多个线程
    threads = []
    for i in range(5):
        thread = threading.Thread(target=worker_threads, name=f"Thread-{i}")
        threads.append(thread)
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    print("All threads finished")

运行结果:
Thread Thread-0 started
Thread Thread-1 started
Thread Thread-2 started
Thread Thread-3 started
Thread Thread-4 started
Thread Thread-0 finished
Thread Thread-2 finishedThread Thread-4 finished
Thread Thread-3 finishedThread Thread-1 finished


All threads finished

Process finished with exit code 0

2. 并行(Parallelism)

并行是指程序在同一时间可以真正同时执行多个任务的能力。通常需要硬件支持,例如多核处理器。

示例:多进程

import multiprocessing
def worker_multiprocessing():
    print(f"Process {multiprocessing.current_process().name} started")
    time.sleep(2)
    print(f"Process {multiprocessing.current_process().name} finished")

if __name__ == '__main__':
    # 创建多个进程
    processes = []
    for i in range(5):
        multiprocessing.Process(target=worker_multiprocessing, name="dddd")
        process = multiprocessing.Process(target=worker_multiprocessing, name=f"Process-{i}")
        processes.append(process)
        process.start()
    # 等待所有进程完成
    for process in processes:
        process.join()
    print("All processes finished")
运行结果:
Process Process-0 started
Process Process-1 started
Process Process-2 started
Process Process-3 started
Process Process-4 started
Process Process-0 finished
Process Process-3 finished
Process Process-1 finished
Process Process-2 finished
Process Process-4 finished
All processes finished

3. 同步(Synchronization)

同步是指在多线程或多进程环境中,通过锁或其他机制确保资源的安全访问。

示例:锁(Lock)

import threading
import time

def worker_sync(lock):
    with lock:
        print(f"Thread {threading.current_thread().name} started")
        time.sleep(2)
        print(f"Thread {threading.current_thread().name} finished")

if __name__ == '__main__':
    lock = threading.Lock()
    # 创建多个线程
    threads = []
    for i in range(5):
        thread = threading.Thread(target=worker_sync, args=(lock,), name=f"Thread-{i}")
        threads.append(thread)
        thread.start()
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    print("All threads finished")

运行结果:
Thread Thread-0 started
Thread Thread-0 finished
Thread Thread-1 started
Thread Thread-1 finished
Thread Thread-2 started
Thread Thread-2 finished
Thread Thread-3 started
Thread Thread-3 finished
Thread Thread-4 started
Thread Thread-4 finished
All threads finished

4. 异步(Asynchronous)

异步是指程序可以在等待某个操作完成的同时继续执行其他任务。异步编程通常使用回调函数或协程。

示例:异步 I/O(使用 asyncio)

# 异步
import asyncio

async def worker_async():
    print(f"Worker {asyncio.current_task().get_name()} started")
    await asyncio.sleep(2)
    print(f"Worker {asyncio.current_task().get_name()} finished")

async def main_async():
    tasks = []
    for i in range(5):
        task = asyncio.create_task(worker_async(), name=f"Worker-{i}")
        tasks.append(task)
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main_async())

运行结果:
Worker Worker-0 started
Worker Worker-1 started
Worker Worker-2 started
Worker Worker-3 started
Worker Worker-4 started
Worker Worker-0 finished
Worker Worker-1 finished
Worker Worker-2 finished
Worker Worker-3 finished
Worker Worker-4 finished

5. 阻塞(Blocking)

阻塞是指程序在执行某个操作时会暂停执行,直到该操作完成。例如,当执行一个阻塞的 I/O 操作时,程序会等待直到 I/O 操作完成。

示例:阻塞 I/O

import time
def blocking_io():
    print("Starting blocking IO")
    time.sleep(5)
    print("Finished blocking IO")

if __name__ == '__main__':
    blocking_io()

6. 非阻塞(Non-blocking)

非阻塞是指程序在执行某个操作时不会暂停执行,而是继续执行其他任务。通常用于网络 I/O 或文件 I/O。

示例:非阻塞 I/O(使用 select)

# 非阻塞的tcp服务器示例 (注意:它监听的是 TCP 连接而不是 HTTP 请求,无法用http请求响应)
import select
import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 8001))
server_socket.listen(5)
sockets_list = [server_socket]

def handle_client(client_socket):
    request = client_socket.recv(1024)
    print(f"Received : {request.decode()}")
    response = "Hello, World!\n"
    client_socket.send(response.encode())
    client_socket.close()

while True:
    read_sockets, _, exception_sockets = select.select(sockets_list, [], [])

    for notified_socket in read_sockets:
        if notified_socket == server_socket:
            client_socket, client_address = server_socket.accept()
            sockets_list.append(client_socket)
        else:
            handle_client(notified_socket)

    for notified_socket in exception_sockets:
        if notified_socket.fileno() >= 0:  # Make sure file descriptor is valid
            if notified_socket in sockets_list:
                sockets_list.remove(notified_socket)
                notified_socket.close()

#下面客户端示例调用后,运行结果:
Received : Hello, TCP Server!

要测试非阻塞 TCP 服务器示例的可用性,可以使用一个 TCP 客户端来模拟连接并发送数据给服务器,然后查看服务器是否能够正确响应。TCP 客户端示例代码,可以用来测试非阻塞 TCP 服务器:

import socket

server_address = ('localhost', 8001)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(server_address)

# 发送数据给服务器
client_socket.sendall(b"Hello, TCP Server!")

# 接收服务器的响应
response = client_socket.recv(1024)
print("Received:", response.decode())

client_socket.close()

#运行结果:
Received: Hello, World!

概念解释完啦~ 简单示例也看完啦~


以下是以两个url为例,使用上述不同方式去请求的代码示例:

1. 并发(Concurrency)

示例:多线程发送 HTTP 请求,用于需要并发请求多个api

import threading
import requests

def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, json=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

# 创建线程列表
threads = []

# 创建并启动线程
for test_case in test_cases:
    thread = threading.Thread(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print("All requests finished")

2. 并行(Parallelism)

示例:多进程发送 HTTP 请求

import multiprocessing
import requests
import time

def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, json=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

# 创建进程列表
processes = []

# 创建并启动进程
for test_case in test_cases:
    process = multiprocessing.Process(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
    processes.append(process)
    process.start()

# 等待所有进程完成
for process in processes:
    process.join()

print("All requests finished")

3. 同步(Synchronization)

示例:使用锁同步多线程

import threading
import requests
import time

def send_request(lock, url, headers, payload):
    with lock:
        response = requests.post(url, headers=headers, json=payload)
        print(f"Response status code: {response.status_code}")
        print(f"Response content: {response.text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

# 创建锁
lock = threading.Lock()

# 创建线程列表
threads = []

# 创建并启动线程
for test_case in test_cases:
    thread = threading.Thread(target=send_request, args=(lock, test_case["url"], test_case["headers"], test_case["payload"]))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print("All requests finished")

4. 异步(Asynchronous)

示例:使用 asyncio 发送异步 HTTP 请求

import asyncio
import aiohttp

async def send_request(url, headers, payload):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            print(f"Response status code: {response.status}")
            response_text = await response.text()
            print(f"Response content: {response_text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

async def main():
    tasks = []
    for test_case in test_cases:
        task = asyncio.create_task(send_request(test_case["url"], test_case["headers"], test_case["payload"]))
        tasks.append(task)
    await asyncio.gather(*tasks)

asyncio.run(main())

5. 阻塞(Blocking)

示例:阻塞式发送 HTTP 请求

import requests
import time

def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, json=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

# 依次发送请求
for test_case in test_cases:
    send_request(test_case["url"], test_case["headers"], test_case["payload"])

print("All requests finished")

6. 非阻塞(Non-blocking)

示例:使用 select 发送非阻塞 HTTP 请求

import select
import socket
import requests
import time

def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, json=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")

# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]

# 创建套接字列表
sockets_list = []

# 创建并启动套接字
for test_case in test_cases:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("localhost", 8000))
    sockets_list.append(sock)

# 监听套接字
while sockets_list:
    ready_to_read, _, _ = select.select(sockets_list, [], [])
    for sock in ready_to_read:
        send_request(test_cases[sockets_list.index(sock)]["url"], test_cases[sockets_list.index(sock)]["headers"], test_cases[sockets_list.index(sock)]["payload"])
        sockets_list.remove(sock)

print("All requests finished")

7. 总结

通过以上示例,我们详细介绍了 Python 中的几个关键概念:

并发(Concurrency):在同一时间段内交替处理多个任务。

并行(Parallelism):在同一时间真正同时执行多个任务(需硬件支持)。

同步(Synchronization):确保多线程或多进程环境下的资源安全访问。

异步(Asynchronous):在等待某个操作完成的同时继续执行其他任务。

阻塞(Blocking):在执行某个操作时会暂停执行,直到该操作完成。

非阻塞(Non-blocking):在执行某个操作时不会暂停执行,而是继续执行其他任务。

0

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐