冷眸

一文教你面对高并发任务时如何选择:多进(线)程 VS 异步函数

· 冷眸

face_img

在现代软件开发中,处理高并发和网络I/O密集型任务是一个常见的挑战。Python提供了多种方法来处理并发,其中最常用的是多进(线)程和异步编程。本文将探讨这两种技术在实际应用中的性能差异,并通过实验来比较它们在处理大量网络请求时的效率。

1、多进(线)程和异步函数

1.1、多进(线)程

多进(线)程允许多个任务在同一程序中并行运行。每个线程占用一定的系统资源,如CPU时间和内存。多进(线)程适合于同时执行多个独立任务,尤其是在多核CPU上。

优点

  • 可以实现真正的并行执行。
  • 在多核处理器上,可以显著提高程序的执行效率。

缺点

  • 线程管理需要消耗额外的资源。
  • 线程之间的同步和通信可能导致复杂的竞态条件和死锁问题。

1.2、异步函数

异步编程是一种单线程的任务调度方式,它通过事件循环来管理任务的执行。这种方式非常适合处理I/O密集型任务,如网络请求和文件操作。

优点

  • 高效的I/O处理能力,不会阻塞主线程。
  • 减少了线程创建和上下文切换的开销。

缺点

  • 编程模型相对复杂,需要理解事件循环和回调机制。
  • 在CPU密集型任务中表现不佳,因为所有任务都在同一个线程中执行。

2、性能对比

异步编程和多进(线)程都是实现并发的有效手段,但它们各有优势和适用场景。异步编程通常用于I/O密集型任务,如文件操作和网络请求,而多进(线)程则可以同时处理多个任务,尤其是在多核处理器上。

2.1、多进(线)程

在使用多进(线)程的时候,一定要注意Python的全局解释器锁(Global Interpreter Lock,简称GIL)机制。我们先看一个多线程的例子,通过实验来直观的说明。

机器配置:Mac-Pro, Apple M2, 10核

2.1.1、Threading

以下是一个使用threading实现多线程的例子

import threading
from datetime import datetime

def cpu_bound_task(idx):
    # 执行一个计算密集型任务
    count = 0
    for i in range(100000000):
        count += i

# 任务数
num_tasks = 1

# 创建多于CPU核心数的线程
threads = []
for i in range(num_tasks):
    thread = threading.Thread(target=cpu_bound_task, args=(i,))
    threads.append(thread)

start_time = datetime.now()

# 启动所有线程
for thread in threads:
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

end_time = datetime.now()
print(f"Time: {(end_time-start_time).total_seconds()} seconds")

时间消耗和任务数之间的关系:

Tasks Time (s)
1 2.469811
2 4.875715
3 7.155812
5 11.53091
10 24.403462

可以看到,完成所有任务的总时间和任务数几乎呈线性增长关系,明明是多线程并发执行,为什么时间会成倍增长呢。

因为在Python中,由于全局解释器锁(Global Interpreter Lock,简称GIL),在任何给定时刻只允许一个线程执行Python字节码。这意味着即使你的机器有多个CPU核心,使用Python的标准threading库进行多线程编程时,这些线程在执行计算密集型任务时实际上并不会并行执行,而是会在单个核心上交替执行。

2.1.2、Multiprocessing

我们换一种方式,使用Multiprocessing的Process来试试呢。

from multiprocessing import Process
from datetime import datetime

def cpu_bound_task(idx):
    count = 0
    for i in range(100000000):
        count += i

if __name__ == '__main__':
    num_tasks = 1
    processes = []
    for i in range(num_tasks):
        process = Process(target=cpu_bound_task, args=(i,))
        processes.append(process)

    start_time = datetime.now()

    # 启动所有进程
    for process in processes:
        process.start()

    # 等待所有进程完成
    for process in processes:
        process.join()

    end_time = datetime.now()
    print(f"Time: {(end_time - start_time).total_seconds()} seconds")

时间消耗和任务数之间的关系:

Tasks Time (s)
1 2.983705
2 3.308298
3 3.138572
5 3.76091
10 5.613843

可以看到,不论多少水个任务,时间消耗在3-5s之间,这个小的波动是因为进程越多,进程之间的资源调度和切换需要时间,并且个人机器上还有其他程序在运行,会占用部分核,所以这个波动是正常的,实现了并行的处理。

2.1.3、Concurrent.Futures

1. concurrent.futures 之 ThreadPoolExecutor

我们继续使用Multiprocessing中的ThreadPoolExecutor来尝试。

import concurrent.futures
from datetime import datetime

def task_function(idx):
    count = 0
    for i in range(100000000):
        count += i

def main():
    num_tasks = 1  # 你想要运行的任务数量
    start_time = datetime.now()  # 开始计时

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # 使用executor.map来并发执行
        executor.map(task_function, range(num_tasks))

    end_time = datetime.now()  # 结束计时
    total_duration = (end_time - start_time).total_seconds()
    print(f"Total duration: {total_duration} seconds")

if __name__ == "__main__":
    main()

时间消耗和任务数之间的关系:

Tasks num_workers Time (s)
1 1 2.392556
2 1 5.000087
3 1 7.298594
5 1 12.322661
10 1 24.470803
2 2 4.907088
3 3 7.498068
5 5 12.665615
5 10 24.979614

又出现了这种问题,当num_workers为1的时候,总时长随着任务数增加而成倍增长,这个很正常,因为只有一个worker,所有任务串行处理。可是当任务数和worker数一样的时候,总时长依然是随着任务数成倍增长,这里依然是python的GIL机制在作怪。

2. Concurrent.Futures 之 ProcessPoolExecutor

我们再使用concurrent.futures中的ProcessPoolExecutor来尝试。

import concurrent.futures
from datetime import datetime

def task_function(idx):
    count = 0
    for i in range(100000000):
        count += i

def main():
    num_tasks = 1  # 你想要运行的任务数量
    start_time = datetime.now()  # 开始计时

    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        # 使用executor.map来并发执行
        results = executor.map(task_function, range(num_tasks))

    end_time = datetime.now()  # 结束计时
    total_duration = (end_time - start_time).total_seconds()
    print(f"Total duration: {total_duration} seconds")

if __name__ == "__main__":
    main()

时间消耗和任务数之间的关系:

Tasks num_workers Time (s)
1 1 3.018315
2 2 3.198022
3 3 3.391358
5 5 3.84788
5 10 5.737346

很显然,当woker数和任务数一样的时候,随着任务数的增长,总的消耗时间基本相同,实现了并行处理。

使用Threading和concurrent.futures中的ThreadPoolExecutor容易受到GIL机制的影响,有时候并不能实现真正的并行。而Multiprocessing的Process,concurrent.futures中的ProcessPoolExecutor则能绕过GIL机制实现并行处理。

注意的是,GIL机制对计算密集型任务有较为明显的影响,但对于网络IO型任务的影响基本上可以忽略。以下是一个网络IO型任务的例子,总时长受任务数量基本可以忽略不计。

import threading
import requests
import time

urls = [
           'https://jsonplaceholder.typicode.com/posts/1',
           'https://jsonplaceholder.typicode.com/posts/2',
           'https://jsonplaceholder.typicode.com/posts/3',
           'https://jsonplaceholder.typicode.com/posts/4',
           'https://jsonplaceholder.typicode.com/posts/5'
       ] * 10

def fetch_data(url):
    response = requests.get(url)

start_time = time.time()

threads = []
for url in urls:
    thread = threading.Thread(target=fetch_data, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

end_time = time.time()
print(f"Time taken with multithreading: {end_time - start_time} seconds")

2.2、异步函数

即使GIL机制对网络IO型任务影响不大,但是在有大量网络IO型任务需要并发的时候,仍然建议采用异步函数的方式来实现。因为多线程或者多进程机制需要为每一个任务创建一个线程或进程,会大大消耗机器的资源,如果不是计算密集型任务,这种资源的消耗是完全没有必要的。

而异步编程通常使用单线程事件循环来管理多个I/O操作,这样可以避免线程上下文切换的开销。对于大量小而频繁的I/O操作,异步编程可以更高效地利用系统资源。下面是一个异步并发的例子。

import aiohttp
import asyncio
import time

urls = [
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2',
    'https://jsonplaceholder.typicode.com/posts/3',
    'https://jsonplaceholder.typicode.com/posts/4',
    'https://jsonplaceholder.typicode.com/posts/5'
] * 100

async def fetch_data(session, url):
    async with session.get(url) as response:
        pass

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        await asyncio.gather(*tasks)

start_time = time.time()

asyncio.run(main())

end_time = time.time()
print(f"Time taken with async: {end_time - start_time} seconds")

3、使用建议

3.1、异步编程的优势

  1. 资源效率:异步编程通常使用单线程事件循环来管理多个I/O操作,这样可以避免线程上下文切换的开销。对于大量小而频繁的I/O操作,异步编程可以更高效地利用系统资源。

  2. 简单的并发模型:异步编程通过回调、async/await等机制来实现并发,避免了多线程编程中的一些复杂问题,如死锁、竞态条件等。

  3. 可扩展性:在处理大量并发连接(如高并发的网络服务器)时,异步编程通常比多线程更具可扩展性,因为它不需要为每个连接创建一个线程。

3.2、多线程编程的优势

  1. 多核利用:多线程编程可以利用多核CPU的优势,真正实现并行计算。对于CPU密集型任务,多线程编程通常比异步编程更有效。

  2. 直观的编程模型:对于一些开发者来说,多线程编程的模型可能更直观,因为它与顺序编程的思维方式更接近。

  3. 现有库的兼容性:有些库和框架可能不支持异步编程,但可以很好地与多线程结合使用。

3.3、具体场景的选择

  1. I/O密集型任务:如果你的任务主要是I/O密集型(如网络请求、文件读写等),异步编程通常会表现得更好,尤其是在高并发场景下。

2.CPU密集型任务:如果你的任务主要是CPU密集型(如复杂计算、数据处理等),多线程编程可能会更合适,因为它可以利用多核CPU的优势。

  1. 混合任务:对于既有I/O密集型任务又有CPU密集型任务的场景,可以考虑混合使用异步和多线程编程。例如,使用异步编程处理I/O操作,使用线程池处理CPU密集型任务。

3.4、实际应用中的考虑

在实际应用中,选择异步编程还是多线程编程,通常需要根据具体的需求和场景来决定。你可以通过以下步骤来做出选择:

  • 分析任务类型:确定你的任务是I/O密集型还是CPU密集型,或者是两者的混合。
  • 性能测试:对比异步编程和多线程编程在你的具体场景下的性能表现。
  • 代码复杂度:考虑哪种编程模型更容易实现和维护。
  • 系统资源:评估系统资源的利用情况,选择更高效的方案。

总之,异步编程和多线程编程各有优缺点,选择哪种方式需要根据具体的应用场景和需求来决定。希望这些信息能帮助你更好地理解和选择适合的编程模型。