python concurrent.futures
本文内容纲要:
python因为其全局解释器锁GIL而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,IO密集型vs.计算密集型。
IO密集型:读取文件,读取网络套接字频繁。
计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。
而concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。
核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
第一章concurrent.futures性能阐述
- 最大公约数
这个函数是一个计算密集型的函数。
#-*-coding:utf-8-*-
#求最大公约数
defgcd(pair):
a,b=pair
low=min(a,b)
foriinrange(low,0,-1):
ifa%i==0andb%i==0:
returni
numbers=[
(1963309,2265973),(1879675,2493670),(2030677,3814172),
(1551645,2229620),(1988912,4736670),(2198964,7876293)
]
-
不使用多线程/多进程
importtime
start=time.time() results=list(map(gcd,numbers)) end=time.time() print'Took%.3fseconds.'%(end-start)
Took2.507seconds.
消耗时间是:2.507。
-
多线程ThreadPoolExecutor
importtime fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor
start=time.time() pool=ThreadPoolExecutor(max_workers=2) results=list(pool.map(gcd,numbers)) end=time.time() print'Took%.3fseconds.'%(end-start)
Took2.840seconds.
消耗时间是:2.840。
上面说过gcd是一个计算密集型函数,因为GIL的原因,多线程是无法提升效率的。同时,线程启动的时候,有一定的开销,与线程池进行通信,也会有开销,所以这个程序使用了多线程反而更慢了。
-
多进程ProcessPoolExecutor
importtime fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor
start=time.time() pool=ProcessPoolExecutor(max_workers=2) results=list(pool.map(gcd,numbers)) end=time.time() print'Took%.3fseconds.'%(end-start)
Took1.861seconds.
消耗时间:1.861。
在两个CPU核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor类会利用multiprocessing模块所提供的底层机制,完成下列操作:
1)把numbers列表中的每一项输入数据都传给map。
2)用pickle模块对数据进行序列化,将其变成二进制形式。
3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。
4)在子进程中,用pickle对二进制数据进行反序列化,将其还原成python对象。
5)引入包含gcd函数的python模块。
6)各个子进程并行的对各自的输入数据进行计算。
7)对运行的结果进行序列化操作,将其转变成字节。
8)将这些字节通过socket复制到主进程之中。
9)主进程对这些字节执行反序列化操作,将其还原成python对象。
10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。
multiprocessing开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。
第二章concurrent.futures源码分析
- Executor
可以任务Executor是一个抽象类,提供了如下抽象方法submit,map(上面已经使用过),shutdown。值得一提的是Executor实现了__enter__和__exit__使得其对象可以使用with操作符。关于上下文管理和with操作符详细请参看这篇博客http://www.cnblogs.com/kangoroo/p/7627167.html
ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。
classExecutor(object):
"""Thisisanabstractbaseclassforconcreteasynchronousexecutors."""
defsubmit(self,fn,*args,**kwargs):
"""Submitsacallabletobeexecutedwiththegivenarguments.
Schedulesthecallabletobeexecutedasfn(*args,**kwargs)andreturns
aFutureinstancerepresentingtheexecutionofthecallable.
Returns:
AFuturerepresentingthegivencall.
"""
raiseNotImplementedError()
defmap(self,fn,*iterables,**kwargs):
"""Returnsaiteratorequivalenttomap(fn,iter).
Args:
fn:Acallablethatwilltakeasmanyargumentsasthereare
passediterables.
timeout:Themaximumnumberofsecondstowait.IfNone,thenthere
isnolimitonthewaittime.
Returns:
Aniteratorequivalentto:map(func,*iterables)butthecallsmay
beevaluatedout-of-order.
Raises:
TimeoutError:Iftheentireresultiteratorcouldnotbegenerated
beforethegiventimeout.
Exception:Iffn(*args)raisesforanyvalues.
"""
timeout=kwargs.get('timeout')
iftimeoutisnotNone:
end_time=timeout+time.time()
fs=[self.submit(fn,*args)forargsinitertools.izip(*iterables)]
#Yieldmustbehiddeninclosuresothatthefuturesaresubmitted
#beforethefirstiteratorvalueisrequired.
defresult_iterator():
try:
forfutureinfs:
iftimeoutisNone:
yieldfuture.result()
else:
yieldfuture.result(end_time-time.time())
finally:
forfutureinfs:
future.cancel()
returnresult_iterator()
defshutdown(self,wait=True):
"""Clean-uptheresourcesassociatedwiththeExecutor.
Itissafetocallthismethodseveraltimes.Otherwise,noother
methodscanbecalledafterthisone.
Args:
wait:IfTruethenshutdownwillnotreturnuntilallrunning
futureshavefinishedexecutingandtheresourcesusedbythe
executorhavebeenreclaimed.
"""
pass
def__enter__(self):
returnself
def__exit__(self,exc_type,exc_val,exc_tb):
self.shutdown(wait=True)
returnFalse
下面我们以线程ProcessPoolExecutor的方式说明其中的各个方法。
-
map
map(self,fn,*iterables,**kwargs)
map方法的实例我们上面已经实现过,值得注意的是,返回的results列表是有序的,顺序和*iterables迭代器的顺序一致。
这里我们使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。
importtime
fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor
start=time.time()
withProcessPoolExecutor(max_workers=2)aspool:
results=list(pool.map(gcd,numbers))
print'results:%s'%results
end=time.time()
print'Took%.3fseconds.'%(end-start)
产出结果是:
results:[1,5,1,5,2,3]
Took1.617seconds.
-
submit
submit(self,fn,*args,**kwargs)
submit方法用于提交一个可并行的方法,submit方法同时返回一个future实例。
future对象标识这个线程/进程异步进行,并在未来的某个时间执行完成。future实例表示线程/进程状态的回调。
importtime
fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor
start=time.time()
futures=list()
withProcessPoolExecutor(max_workers=2)aspool:
forpairinnumbers:
future=pool.submit(gcd,pair)
futures.append(future)
print'results:%s'%[future.result()forfutureinfutures]
end=time.time()
print'Took%.3fseconds.'%(end-start)
产出结果是:
results:[1,5,1,5,2,3]
Took2.289seconds.
- future
submit函数返回future对象,future提供了跟踪任务执行状态的方法。比如判断任务是否执行中future.running(),判断任务是否执行完成future.done()等等。
as_completed方法传入futures迭代器和timeout两个参数
默认timeout=None,阻塞等待任务执行完成,并返回执行完成的future对象迭代器,迭代器是通过yield实现的。
timeout>0,等待timeout时间,如果timeout时间到仍有任务未能完成,不再执行并抛出异常TimeoutError
importtime
fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor,as_completed
start=time.time()
withProcessPoolExecutor(max_workers=2)aspool:
futures=[pool.submit(gcd,pair)forpairinnumbers]
forfutureinfutures:
print'执行中:%s,已完成:%s'%(future.running(),future.done())
print'####分界线####'
forfutureinas_completed(futures,timeout=2):
print'执行中:%s,已完成:%s'%(future.running(),future.done())
end=time.time()
print'Took%.3fseconds.'%(end-start)
- wait
wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。
使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED,FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。
importtime
fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor,Executor,as_completed,wait,ALL_COMPLETED,FIRST_COMPLETED,FIRST_EXCEPTION
start=time.time()
withProcessPoolExecutor(max_workers=2)aspool:
futures=[pool.submit(gcd,pair)forpairinnumbers]
forfutureinfutures:
print'执行中:%s,已完成:%s'%(future.running(),future.done())
print'####分界线####'
done,unfinished=wait(futures,timeout=2,return_when=ALL_COMPLETED)
fordindone:
print'执行中:%s,已完成:%s'%(d.running(),d.done())
printd.result()
end=time.time()
print'Took%.3fseconds.'%(end-start)
由于设置了ALL_COMPLETED,所以wait等待所有的task执行完成,可以看到6个任务都执行完成了。
执行中:True,已完成:False
执行中:True,已完成:False
执行中:True,已完成:False
执行中:True,已完成:False
执行中:False,已完成:False
执行中:False,已完成:False
####分界线####
执行中:False,已完成:True
执行中:False,已完成:True
执行中:False,已完成:True
执行中:False,已完成:True
执行中:False,已完成:True
执行中:False,已完成:True
Took1.518seconds.
如果我们将配置改为FIRST_COMPLETED,wait会等待直到第一个任务执行完成,返回当时所有执行成功的任务。这里并没有做并发控制。
重跑,结构如下,可以看到执行了2个任务。
执行中:True,已完成:False
执行中:True,已完成:False
执行中:True,已完成:False
执行中:True,已完成:False
执行中:False,已完成:False
执行中:False,已完成:False
####分界线####
执行中:False,已完成:True
执行中:False,已完成:True
Took1.517seconds.
本文内容总结:
原文链接:https://www.cnblogs.com/kangoroo/p/7628092.html