一文教你面对高并发任务时如何选择:多进(线)程 VS 异步函数
在现代软件开发中,处理高并发和网络I/O密集型任务是一个常见的挑战。Python提供了多种方法来处理并发,其中最常用的是多进(线)程和异步编程。本文将探讨这两种技术在实际应用中的性能差异,并通过实验来比较它们在处理大量网络请求时的效率。
1、多进(线)程和异步函数
1.1、多进(线)程
多进(线)程允许多个任务在同一程序中并行运行。每个线程占用一定的系统资源,如CPU时间和内存。多进(线)程适合于同时执行多个独立任务,尤其是在多核CPU上。
优点
- 可以实现真正的并行执行。
- 在多核处理器上,可以显著提高程序的执行效率。
缺点
- 线程管理需要消耗额外的资源。
- 线程之间的同步和通信可能导致复杂的竞态条件和死锁问题。
1.2、异步函数
异步编程是一种单线程的任务调度方式,它通过事件循环来管理任务的执行。这种方式非常适合处理I/O密集型任务,如网络请求和文件操作。
优点
- 高效的I/O处理能力,不会阻塞主线程。
- 减少了线程创建和上下文切换的开销。
缺点
- 编程模型相对复杂,需要理解事件循环和回调机制。
- 在CPU密集型任务中表现不佳,因为所有任务都在同一个线程中执行。
2、性能对比
异步编程和多进(线)程都是实现并发的有效手段,但它们各有优势和适用场景。异步编程通常用于I/O密集型任务,如文件操作和网络请求,而多进(线)程则可以同时处理多个任务,尤其是在多核处理器上。
2.1、多进(线)程
在使用多进(线)程的时候,一定要注意Python的全局解释器锁(Global Interpreter Lock,简称GIL)机制。我们先看一个多线程的例子,通过实验来直观的说明。
机器配置:Mac-Pro, Apple M2, 10核
2.1.1、Threading
以下是一个使用threading实现多线程的例子
import threading
from datetime import datetime
def cpu_bound_task(idx):
# 执行一个计算密集型任务
count = 0
for i in range(100000000):
count += i
# 任务数
num_tasks = 1
# 创建多于CPU核心数的线程
threads = []
for i in range(num_tasks):
thread = threading.Thread(target=cpu_bound_task, args=(i,))
threads.append(thread)
start_time = datetime.now()
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = datetime.now()
print(f"Time: {(end_time-start_time).total_seconds()} seconds")
时间消耗和任务数之间的关系:
Tasks | Time (s) |
---|---|
1 | 2.469811 |
2 | 4.875715 |
3 | 7.155812 |
5 | 11.53091 |
10 | 24.403462 |
可以看到,完成所有任务的总时间和任务数几乎呈线性增长关系,明明是多线程并发执行,为什么时间会成倍增长呢。
因为在Python中,由于全局解释器锁(Global Interpreter Lock,简称GIL),在任何给定时刻只允许一个线程执行Python字节码。这意味着即使你的机器有多个CPU核心,使用Python的标准threading库进行多线程编程时,这些线程在执行计算密集型任务时实际上并不会并行执行,而是会在单个核心上交替执行。
2.1.2、Multiprocessing
我们换一种方式,使用Multiprocessing的Process来试试呢。
from multiprocessing import Process
from datetime import datetime
def cpu_bound_task(idx):
count = 0
for i in range(100000000):
count += i
if __name__ == '__main__':
num_tasks = 1
processes = []
for i in range(num_tasks):
process = Process(target=cpu_bound_task, args=(i,))
processes.append(process)
start_time = datetime.now()
# 启动所有进程
for process in processes:
process.start()
# 等待所有进程完成
for process in processes:
process.join()
end_time = datetime.now()
print(f"Time: {(end_time - start_time).total_seconds()} seconds")
时间消耗和任务数之间的关系:
Tasks | Time (s) |
---|---|
1 | 2.983705 |
2 | 3.308298 |
3 | 3.138572 |
5 | 3.76091 |
10 | 5.613843 |
可以看到,不论多少水个任务,时间消耗在3-5s之间,这个小的波动是因为进程越多,进程之间的资源调度和切换需要时间,并且个人机器上还有其他程序在运行,会占用部分核,所以这个波动是正常的,实现了并行的处理。
2.1.3、Concurrent.Futures
1. concurrent.futures 之 ThreadPoolExecutor
我们继续使用Multiprocessing中的ThreadPoolExecutor来尝试。
import concurrent.futures
from datetime import datetime
def task_function(idx):
count = 0
for i in range(100000000):
count += i
def main():
num_tasks = 1 # 你想要运行的任务数量
start_time = datetime.now() # 开始计时
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# 使用executor.map来并发执行
executor.map(task_function, range(num_tasks))
end_time = datetime.now() # 结束计时
total_duration = (end_time - start_time).total_seconds()
print(f"Total duration: {total_duration} seconds")
if __name__ == "__main__":
main()
时间消耗和任务数之间的关系:
Tasks | num_workers | Time (s) |
---|---|---|
1 | 1 | 2.392556 |
2 | 1 | 5.000087 |
3 | 1 | 7.298594 |
5 | 1 | 12.322661 |
10 | 1 | 24.470803 |
2 | 2 | 4.907088 |
3 | 3 | 7.498068 |
5 | 5 | 12.665615 |
5 | 10 | 24.979614 |
又出现了这种问题,当num_workers为1的时候,总时长随着任务数增加而成倍增长,这个很正常,因为只有一个worker,所有任务串行处理。可是当任务数和worker数一样的时候,总时长依然是随着任务数成倍增长,这里依然是python的GIL机制在作怪。
2. Concurrent.Futures 之 ProcessPoolExecutor
我们再使用concurrent.futures中的ProcessPoolExecutor来尝试。
import concurrent.futures
from datetime import datetime
def task_function(idx):
count = 0
for i in range(100000000):
count += i
def main():
num_tasks = 1 # 你想要运行的任务数量
start_time = datetime.now() # 开始计时
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
# 使用executor.map来并发执行
results = executor.map(task_function, range(num_tasks))
end_time = datetime.now() # 结束计时
total_duration = (end_time - start_time).total_seconds()
print(f"Total duration: {total_duration} seconds")
if __name__ == "__main__":
main()
时间消耗和任务数之间的关系:
Tasks | num_workers | Time (s) |
---|---|---|
1 | 1 | 3.018315 |
2 | 2 | 3.198022 |
3 | 3 | 3.391358 |
5 | 5 | 3.84788 |
5 | 10 | 5.737346 |
很显然,当woker数和任务数一样的时候,随着任务数的增长,总的消耗时间基本相同,实现了并行处理。
使用Threading和concurrent.futures中的ThreadPoolExecutor容易受到GIL机制的影响,有时候并不能实现真正的并行。而Multiprocessing的Process,concurrent.futures中的ProcessPoolExecutor则能绕过GIL机制实现并行处理。
注意的是,GIL机制对计算密集型任务有较为明显的影响,但对于网络IO型任务的影响基本上可以忽略。以下是一个网络IO型任务的例子,总时长受任务数量基本可以忽略不计。
import threading
import requests
import time
urls = [
'https://jsonplaceholder.typicode.com/posts/1',
'https://jsonplaceholder.typicode.com/posts/2',
'https://jsonplaceholder.typicode.com/posts/3',
'https://jsonplaceholder.typicode.com/posts/4',
'https://jsonplaceholder.typicode.com/posts/5'
] * 10
def fetch_data(url):
response = requests.get(url)
start_time = time.time()
threads = []
for url in urls:
thread = threading.Thread(target=fetch_data, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
end_time = time.time()
print(f"Time taken with multithreading: {end_time - start_time} seconds")
2.2、异步函数
即使GIL机制对网络IO型任务影响不大,但是在有大量网络IO型任务需要并发的时候,仍然建议采用异步函数的方式来实现。因为多线程或者多进程机制需要为每一个任务创建一个线程或进程,会大大消耗机器的资源,如果不是计算密集型任务,这种资源的消耗是完全没有必要的。
而异步编程通常使用单线程事件循环来管理多个I/O操作,这样可以避免线程上下文切换的开销。对于大量小而频繁的I/O操作,异步编程可以更高效地利用系统资源。下面是一个异步并发的例子。
import aiohttp
import asyncio
import time
urls = [
'https://jsonplaceholder.typicode.com/posts/1',
'https://jsonplaceholder.typicode.com/posts/2',
'https://jsonplaceholder.typicode.com/posts/3',
'https://jsonplaceholder.typicode.com/posts/4',
'https://jsonplaceholder.typicode.com/posts/5'
] * 100
async def fetch_data(session, url):
async with session.get(url) as response:
pass
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
await asyncio.gather(*tasks)
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"Time taken with async: {end_time - start_time} seconds")
3、使用建议
3.1、异步编程的优势
-
资源效率:异步编程通常使用单线程事件循环来管理多个I/O操作,这样可以避免线程上下文切换的开销。对于大量小而频繁的I/O操作,异步编程可以更高效地利用系统资源。
-
简单的并发模型:异步编程通过回调、async/await等机制来实现并发,避免了多线程编程中的一些复杂问题,如死锁、竞态条件等。
-
可扩展性:在处理大量并发连接(如高并发的网络服务器)时,异步编程通常比多线程更具可扩展性,因为它不需要为每个连接创建一个线程。
3.2、多线程编程的优势
-
多核利用:多线程编程可以利用多核CPU的优势,真正实现并行计算。对于CPU密集型任务,多线程编程通常比异步编程更有效。
-
直观的编程模型:对于一些开发者来说,多线程编程的模型可能更直观,因为它与顺序编程的思维方式更接近。
-
现有库的兼容性:有些库和框架可能不支持异步编程,但可以很好地与多线程结合使用。
3.3、具体场景的选择
- I/O密集型任务:如果你的任务主要是I/O密集型(如网络请求、文件读写等),异步编程通常会表现得更好,尤其是在高并发场景下。
2.CPU密集型任务:如果你的任务主要是CPU密集型(如复杂计算、数据处理等),多线程编程可能会更合适,因为它可以利用多核CPU的优势。
- 混合任务:对于既有I/O密集型任务又有CPU密集型任务的场景,可以考虑混合使用异步和多线程编程。例如,使用异步编程处理I/O操作,使用线程池处理CPU密集型任务。
3.4、实际应用中的考虑
在实际应用中,选择异步编程还是多线程编程,通常需要根据具体的需求和场景来决定。你可以通过以下步骤来做出选择:
- 分析任务类型:确定你的任务是I/O密集型还是CPU密集型,或者是两者的混合。
- 性能测试:对比异步编程和多线程编程在你的具体场景下的性能表现。
- 代码复杂度:考虑哪种编程模型更容易实现和维护。
- 系统资源:评估系统资源的利用情况,选择更高效的方案。
总之,异步编程和多线程编程各有优缺点,选择哪种方式需要根据具体的应用场景和需求来决定。希望这些信息能帮助你更好地理解和选择适合的编程模型。