43进程_multiprocessing_concurrent

发布时间:2020-07-11 23:21:08 作者:chaijowin
来源:网络 阅读:322

 

 

 

目录

multiprocessing模块:... 1

多进程举例:... 2

multiprocessing.Pool,进程池:... 3

concurrent包:... 3

 

 

 

多进程:

由于pyGIL,多线程未必是cpu密集型程序好的选择;

 

multiprocessing模块:

多进程,可在完全独立的进程环境中运行程序,可充分利用多处理器;

但进程本身的隔离带来的数据不共享是个问题;

线程比进程轻量级;

 

Process类:

遵循了Thread类的API,减少了学习难度;

多进程,一定要在__main__()中,否则抛错;

 

p=multiprocessing.Process()

p.pid,进程ID

p.exitcode,进程的退出状态码;

p.terminate(),终止指定的进程;

 

进程间同步:

提供了和线程同步一样的类,使用的方法一样,使用的效果也类似;

不过,进程间代价要高于线程,而且底层实现不同,只不过py屏蔽了这些,让用户能简单使用(py提供的库抹平了它们之间的差别,为方便使用);

 

multiprocessing还提供了共享内存、服务器进程来共享数据;还提供了QueuePipe(上一个进程的标准输出到下一个进程的标准输入)用来进程间通信;

Queue.queue是线程级别的锁;

multiprocessing.Queue可跨进程用,用的少;此时应用第三方工具MQ,单机进程间通信用的也少,一般应用都是跨节点的进程间通信,即RPCRPC框架很多,如swfitdubbo

 

通信方式不同:

多进程就是启动多个解释器进程,进程间通信必须序列化、反序列化;

数据的线程安全性问题,由于每个进程中没有实现多线程,GIL可以说没用了;

 

进程池:

只允许使用计算机这么多资源(不能抢占计算机其它资源);

很多进程要反复创建的情形下,用进程池;

 

多进程、多线程的选择:

CPU密集型,cpython中使用GIL,多线程时锁相互竞争,多核优势不能发挥,用py多进程(多个解释器进程)效率更高;

IO密集型,适合用多线程,减少IO序列化开销,且在IO等待时,切换到其它线程继续执行,效率不错;

 

应用场景:

请求-应答模型;

web应用中常见的处理模型,用多进程+多线程;

如,nginx工作模式:

master启动多个worker工作进程,一般和cpu数目相同;

worker中启动多线程,提高并发处理能力,worker处理用户的请求,往往需要等待数据;

 

nginx应就地本地编译(本地指令集,甚至用指定cpu(如intel)编译器编译,这样可优化指令,性能更高);

 

 

多进程举例:

例:

def calc(i):

    sum = 0

    for _ in range(100000000):

        sum += 1

 

if __name__ == '__main__':   #使用多进程,必须要在main中执行,否则报错

    start = datetime.datetime.now()

 

    lst = []

    for i in range(5):

        p = multiprocessing.Process(target=calc, args=(i,), name='p-{}'.format(i))

        p.start()

        lst.append(p)

    for p in lst:

        p.join()

 

    delta = (datetime.datetime.now()-start).total_seconds()

    print(delta)   #win上查看,有5py进程

输出:

20.037146

 

 

multiprocessing.Pool,进程池:

例,进程池:

def calc(i):

    sum = 0

    for _ in range(100000000):

        sum += 1

 

if __name__ == '__main__':

    start = datetime.datetime.now()

 

    pool = multiprocessing.Pool(5)

    for i in range(5):

        pool.apply_async(calc,args=(i,))

    pool.close()   #重要,要有此步

    pool.join()

 

    delta = (datetime.datetime.now()-start).total_seconds()

    print(delta)

 

 

 

concurrent包:

目前仅有一个concurrent.futures3.2引入;

异步并行任务编程模块,提供一个高级的异步可执行的便利接口;

 

提供了2个池执行器:

ThreadPoolExecutor,异步调用的线程池的Executor

ProcessPoolExecutor,异步调用的进程池的Executor

 

from conncurrent import futures

executor=futures.ThreadPoolExecutor(max_workers=1),池中至多创建max_workers个线程来同时异步执行,默认1个,返回Executor实例;

f=executor.submit(fn,*args,**kwargs),提交执行的函数及其参数,返回Futures实例;

executor.shutdown(wait=True),清理池;

 

Future类(submit()的实例):

f=executor.submit(work,2)

f.result(),可查看调用的返回的结果,函数的return结果;

f.done(),如果调用被成功的取消或执行完成,返回True,是标志,注意不是函数的返回值;

f.cancelled(),如果调用被成功的取消,返回True

f.running(),如果正在运行且不能被取消,返回True

f.cancel(),尝试取消调用,如果已经执行且不能取消,返回False,否则返回True

f.result(timeout=None),取返回的结果,超时为None,一直等待返回;

f.exception(timeout=None),取返回的异常,超时为None,一直等待返回;

 

支持上下文管理:

futures.ThreadPoolExecutor继承自_base.Executor,父类有__enter__()__exit__()方法;

例:

with futures.ThreadPoolExecutor(max_workers=1) as executor:

    future = executor.submit(pow, 2, 3)

    print(future.result())

输出:

8

 

总结:

统一了线程池、进程池调用,简化了编程;

py简单的思想哲学的体现;

唯一的缺点,无法设置线程名称;

 

例,多线程:

def work(n):

    logging.info('working-{}'.format(n))

    time.sleep(5)

    logging.info('end-work-{}'.format(n))

 

executor = futures.ThreadPoolExecutor(3)

 

fs = []

 

for i in range(3):

    f = executor.submit(work, i)

    fs.append(f)

 

for i in range(3):

    f = executor.submit(work, i)

    fs.append(f)

 

for i in range(3):

    f = executor.submit(work, i)

    fs.append(f)

 

while True:

    time.sleep(2)

    logging.info(threading.enumerate())

    flag = True

    for f in fs:

        flag = flag and f.done()

        if flag:

            executor.shutdown()

            logging.info(threading.enumerate())

            break

 

例,多进程:

def work(n):

    logging.info('working-{}'.format(n))

    time.sleep(5)

    logging.info('end-work-{}'.format(n))

 

if __name__ == '__main__':

    executor = futures.ProcessPoolExecutor(3)

    fs = []

 

    for i in range(3):

        f = executor.submit(work, i)

        fs.append(f)

 

    for i in range(3):

        f = executor.submit(work, i)

        fs.append(f)

 

    for i in range(3):

        f = executor.submit(work, i)

        fs.append(f)

 

    while True:

        time.sleep(2)

        logging.info(threading.enumerate())

        flag = True

        for f in fs:

            flag = flag and f.done()

            if flag:

                executor.shutdown()

                # logging.info(threading.enumerate())   #多进程用不上

                break

 

 


推荐阅读:
  1. 父进程和子进程
  2. [社群QA] “专家坐诊”第43期问答汇总

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python multiprocessing concurrent

上一篇:使用kubeadm部署kubernets单节点集群

下一篇:如何保护自己的网站不被劫持

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》