国外课栈 - 跨学科知识视角栈

Python并行处理方式

 二维码 81
文章附图

背景知识视频教程

启动并行任务

利用concurrent.futures,这是用于异步执行可调用对象的高级接口。

您将学习如何使用current.futures库在Python中异步运行任务。 它是Python中线程和多处理类的更好替代方法,因为它使用相同的接口实现了线程和进程,该接口由抽象的Executor类定义。

此外,线程类不允许您从可调用函数返回一个值(null除外)。 parallel.futures模块的主要概念在于Executor类。 它是一个抽象类,提供异步执行调用的方法。 而不是直接使用它,我们将使用从其继承的子类:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor

在Python文件顶部添加以下导入声明:

from concurrent.futures import ThreadPoolExecutor
import time

让我们定义一个新函数,将其用作异步调用的可调用函数。我将定义一个简单的函数,该函数休眠两秒钟,然后返回两个输入参数的相乘结果:

def wait_function(x, y):
    print(
'Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print(
'Task(', x,'multiply', y, ') completed')
   
return x * y

单任务

下一步是创建ThreadPoolExecutor对象。 强烈建议将其包装在with上下文管理器中,因为它将自行调用shutdown函数,并在完成执行后释放资源。 它接受以下输入参数。

  • max_workers: 此实例的工作线程数。从3.5版开始,它将默认为计算机上的处理器数量乘以5。从3.8版开始,默认值更改为min(32,os.cpu_count()+ 4)
  • thread_name_prefix:允许用户控制线程。池创建的工作线程的线程名称,以便于调试
  • Initializer:在每个工作进程开始时调用的可选可调用对象
  • initargs:传递给initializer的参数的元组

在此,我将仅使用max_workers参数。 让我们创建一个ThreadPoolExecutor并使用wait_function作为输入可调用函数来调用Submit函数。 请记住,wait_function接受两个输入参数。 我将把它们作为单独的参数而不是元组传递:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)

提交函数将返回一个Future对象,该对象封装了可调用对象的异步执行。 Future对象最常用的功能是:

  • cancel:尝试取消执行。返回一个布尔值,该布尔值指示呼叫是否已成功取消。
  • running:检查是否正在执行调用。返回一个布尔值。
  • done:检查调用是否被取消或完成。返回一个布尔值。
  • result:返回调用返回的值。 如果调用尚未完成,则此方法将等待输入超时参数指定的n秒。 强烈建议在调用结果之前使用done函数进行检查,因为超时会阻止当前执行。
  • add_done_callback:将可调用函数附加到Future对象。当Future取消或完成运行时,将使用Future作为其唯一参数调用此函数。

将以下代码附加在Submit函数的正下方。这只是一个简单的循环,在线程运行时打印一个字符串。完成后,将打印出结果:

while True:
   
if(future.running()):
        print(
"Task 1 running")
   
elif(future.done()):
        print(future.result())
       
break

附加后代码:

from concurrent.futures import ThreadPoolExecutor
import time

def wait_function(x, y):
    print(
'Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print(
'Task(', x,'multiply', y, ') completed')
   
return x * y

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)

   
while True:
       
if(future.running()):
            print(
"Task 1 running")
       
elif(future.done()):
            print(future.result())
           
break

运行Python文件时,您应该看到以下结果:

多任务

我们将向其中添加另一个任务,以便它们都可以并行运行。将您的Python文件中的代码更改为以下内容:

</>

现在,首先将max_workers设置为一。 运行它,您应该注意到任务不是并行运行的。 它将运行第一个任务,然后运行第二个任务。 这主要是因为池中只有一个工作线程。 让我们将max_workers增加到两个,您应该能够看到两个任务正在并行运行。

您可以将回调函数附加到Future对象。 一旦取消或完成执行,它将调用附加的函数。 如果您打算在成功连接到数据库或完成URL请求之后继续进行UI更新,则此函数非常有用。 现在,让我们创建一个简单的回调函数:

</>

在Submit函数下添加以下代码:

</>

完整代码

</>

运行Python文件时,控制台中将显示以下结果:

ProcessPoolExecutor

ProcessPoolExecutor类的工作原理与ThreadPoolExecutor完全相同,但有一些细微的差别。 它使用了多处理模块,从而可以避开全局解释器锁定。 但是,这也意味着只能执行和返回可拾取对象。

此外,它在交互式解释器中不起作用,并且必须具有可由工作程序子进程导入的__main__函数。 max_workers将是计算机中的进程数。 在Windows操作系统上,max_workers必须等于或小于61。

您必须导入ProcessPoolExecutor才能使用它:

from concurrent.futures import ProcessPoolExecutor

使用Joblib并行

multiprocessing并行

使用并行计算快速处理视频

现代并行和分布式Python

Multiprocessing扩展到集群

并行框架

并行处理提取Excel数据

阅读完整文档

文章分类: 编程技艺Python