This page looks best with JavaScript enabled

多线程、协程与高并发(2)

 ·  ☕ 5 min read · 👀... views

0x10 线程池与并发未来

不论是进程还是线程,均不能无限的开启,因此当我们用多线程解决并发问题时,我们常常使用构建线程池的方式来解决。在Python3.2及以后的版本中,内置了concurrent_futures(并发未来)模块,其可以实现线程池,进程池,不必再自己使用管道传数据造成死锁的问题。并且这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能,但是平时用的最多的还是用来构建线程池和进程池。

在线程池中,主线程可以获取任意一个线程的状态以及返回结果,并且当一个线程完成后,主线程能立即得到结果。下面来介绍一下concurrent_futures模块的组成部分:

  1. concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。但是别这样导入,这个类是一个抽象类,抽象类的目的是规范他的子类必须有某种方法(并且抽象类的方法必须实现),但是抽象类不能被实例化。

  2. p = concurrent.futures.ThreadPoolExecutor(max_works)创建一个线程池对象,对于线程池如果不写max_works:默认的是cpu的数目*5

  3. submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。 相当于apply_async异步方法。

  4. map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。

  5. shutdown(Wait=True): 发出让执行者释放所有资源的信号。

  6. concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

  7. done():比如t1.done()判断任务t1是否完成,没完成则返回False。

  8. cancel():比如t1.cancel(),取消线程执行,当线程正在执行和执行完毕后是没法取消执行的,但是如果一个线程没有启动的话,是可以取消t1线程执行的

0x11 通用模板

线程池用起来非常方便,基本上是照着模板往里面套即可(模板摘自互联网,稍作修改):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# coding:utf-8
from concurrent.futures import ThreadPoolExecutor
# 导入线程池模块
import threading
# 导入线程模块,作用是获取当前线程的名称
import os,time

def task(n):
    print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
    # 打印当前线程名和运行的进程id号码
    time.sleep(2)
    return n**2
    # 返回传入参数的二次幂

if __name__ == '__main__':
    p=ThreadPoolExecutor()
    #实例化线程池,设置线程池的数量,不填则默认为cpu的个数*5
    l=[]
    # 用来保存返回的数据,做计算总计
    for i in range(10):
        obj=p.submit(task,i)
        # 这里的obj其实是futures的对象,使用obj.result()获取他的结果
        # 传入的参数是要执行的函数和该函数接受的参数
        # submit是非堵塞的
        # 这里执行的方式是异步执行
        # -----------------------------------
        # # p.submit(task,i).result()即同步执行
        # -----------------------------------
        # 上面的方法使用range循环有个高级的写法,即map内置函数
        # p = ThreadPoolExecutor()
        # objs=p.map(task,range(10))
        # 这里的objs的值就是直接返回的所有计算结果,不属于futures对象,是一个生成器对象
        #  可以用list(objs)的方式获取到所有结果,相当于为其中所有对象执行result()方法,这是一个阻塞式过程。
        # ----------------------------------
        l.append(obj)
        # 把返回的结果保存在空的列表中,做总计算
    p.shutdown()
    # 所有计划运行完毕,关闭结束线程池,默认有个参数wite=True (相当于close和join)

    print('='*30)
    print([obj.result() for obj in l])

#线程池支持上下文管理协议,因此上面方法也可写成下面的方法
    # with ThreadPoolExecutor() as p:   #类似打开文件,可省去.shutdown()
    #     future_tasks = [p.submit(task, i) for i in range(10)]
    # print('=' * 30)
    # print([obj.result() for obj in future_tasks])

0x12 submit与map的区别

线程池虽然简单,但是还是有一些概念容易混淆,就比如这里需要我们理清submit与map的区别:

  1. submit返回的是一个futures的对象,使用.result()才能获取他的运行结果
  2. map返回的是所有线程执行完毕后返回的结果,注意!是一个保存着所有结果的生成器对象!正因为它返回的是生成器对象而不是futures对象,所以后面介绍的futures对象的方法对其都不适用。
  3. submit接受的对象是函数加一个固定的参数
  4. map接受的对象是函数加一个传入函数的集合列表
  5. 他们都能提前获取先执行完的结果
  6. map比submit简单好用
  7. map返回的结果是按列表传入参数的顺序返回结果,submit返回结果是哪个线程先执行完就返回哪个线程的结果

0x13 判断是否执行完毕

1
2
3
task1 = p.submit(task,10)
print(task1.done())
# 如果执行完毕,返回True

0x14 取消执行线程

1
2
task1 = p.submit(task,10)
task1.cancel()

当线程正在执行和执行完毕后是没法取消执行的,但是如果一个线程没有启动的话,是可以取消t1线程执行的。

0x15 阻塞线程

多线程中使用jion堵塞线程,在线程池中也可以堵塞等待某一些子线程的线程池执行完毕后再执行主线程。

1
2
from concurrent.futures import wait
# wait(all_tasks,return_when=FIRST_COMPLETED)

wait用在你的线程池下面,比如:

1
2
all_tasks = [p.submit(task,obj) for obj in l]
wait(all_tasks)

这里只有等待all_tasks里面的线程执行完毕后才能继续执行,里面的可以加上参数等待线程池中只要有一个线程执行结束就执行后面的代码。

0x16 获取部分执行完成的结果

as_completed函数只会返回以完成的线程对象,其返回结果是一个生成器

比如:

1
2
3
4
from concurrent.futures import as_completed
all_tasks = [p.submit(task,obj) for obj in l]
for f in as_completed(all_tasks):
    data = f.result()

0x17 线程池设计理念

线程池优秀的设计理念在于:他返回的结果并不是执行完毕后的结果,而是futures的对象,这个对象会在未来存储线程执行完毕的结果,这一点也是异步编程的核心。python为了提高与统一可维护性,多线程多进程和协程的异步编程都是采取同样的方式。

下一节:多线程、协程与高并发(3)

Share on

Qfrost
WRITTEN BY
Qfrost
CTFer, Anti-Cheater, LLVM Committer