python中怎么实现多线程和多进程

发布时间:2021-06-22 17:30:34 作者:Leah
来源:亿速云 阅读:183

这篇文章将为大家详细讲解有关python中怎么实现多线程和多进程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1、GIL

名称:全局解释器锁。在cpython中python中一个线程对应c语言中的一个线程。GIL使得同一时刻只有一个线程运行在同一个cpu上,无法将多个线程映射到  多个cpu上,从而保证了线程在某种时刻是安全的。GIL并不会一直占有,会根据执行的字节码行数、时间片或io操作时释放。非常适用io操作。

例如:

import threading

var_total = 0def add():global var_totalfor var_i in range(1000000):
        var_total += 1def desc():global var_totalfor var_i in range(1000000):
        var_total -= 1if __name__ == '__main__':
    var_thread1 = threading.Thread(target=add)
    var_thread2 = threading.Thread(target=desc)
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()print(var_total)

2、多线程编程

操作系统所能操作和调度的最小单元为线程。线程的调度比进程更加轻量级。对于io操作来说多进程和多线程性能相当。

多线程实现-方法

import timeimport threadingdef getDetailHtml(var_url):print('get detail html started')
    time.sleep(2)print('get detail html end')def getDetailUrl(var_url):print('get detail url started')
    time.sleep(4)print('get detail url end')if __name__ == '__main__':
    var_thread1 = threading.Thread(target=getDetailHtml,args=('http://www.baidu.com',))
    var_thread2 = threading.Thread(target=getDetailUrl, args=('http://www.baidu.com',))
    var_start_time = time.time()#设置当主线程退出,子线程终结,把线程设置成守护线程    #var_thread1.setDaemon(True)    var_thread2.setDaemon(True)
    var_thread1.start()
    var_thread2.start()#线程阻塞,等待子线程执行完成,主线程退出    #var_thread1.join()    #var_thread2.join()    print('last time:{}'.format(time.time() - var_start_time))

多线程实现-类

import threadingimport timeclass GetDetailHtml(threading.Thread):def __init__(self,var_name):super().__init__(name = var_name)def run(self):print('get detail html started')
        time.sleep(2)print('get detail html end')class GetDetailUrl(threading.Thread):def run(self):print('get detail url started')
        time.sleep(4)print('get detail url end')if __name__ == '__main__':
    var_thread1 = GetDetailHtml('getDetailHtml')
    var_thread2 = GetDetailUrl()
    var_start_time = time.time()
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()print('last time:{}'.format(time.time() - var_start_time))

3、线程间的通信Queue

from queue import Queueimport threadingimport timedef setDetailUrl(var_detail_queue):while True:
        time.sleep(2)for var_i in range(5):
            var_detail_queue.put('https://www.{}.com'.format(var_i))def getDetailUrl(var_detail_queue,var_thread_name):while True:
        var_url = var_detail_queue.get()
        time.sleep(2)print(var_thread_name,':',var_url)if __name__ == '__main__':
    var_detail_queue = Queue(maxsize=10000)
    var_set_detail = threading.Thread(target=setDetailUrl,args=(var_detail_queue,))

    var_get_thread_list= []for var_index in range(5):
        var_get_thread = threading.Thread(target=getDetailUrl,args=(var_detail_queue,'thread'+str(var_index)))
        var_get_thread_list.append(var_get_thread)

    var_set_detail.start()for var_one in var_get_thread_list:
        var_one.start()

    var_set_detail.join()for var_one in var_get_thread_list:
        var_one.join()

4、线程间的锁Lock、Rlock

import threadingfrom threading import Lock

var_total = 0var_lock = Lock()def add():global var_totalfor var_i in range(1000000):#使用锁会影响性能,使用锁会出现死锁(资源竞争也会出现死锁),lock不能连续acquire多次,否则出现死锁        var_lock.acquire()
        var_total += 1        var_lock.release()def reduce():global var_totalfor var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1        var_lock.release()if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()print(var_total)
import threadingfrom threading import RLock

var_total = 0var_lock = RLock()def add():global var_totalfor var_i in range(1000000):#RLock在同一个线程里面可以连续调用多次acquire,acquire的次数要和release的次数相等        var_lock.acquire()
        var_lock.acquire()
        var_total += 1        var_lock.release()
        var_lock.release()def reduce():global var_totalfor var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1        var_lock.release()if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()print(var_total)

5、线程同步condition、Semaphore

from threading import Thread,Conditionclass XiaoAi(Thread):def __init__(self, var_con):super().__init__(name='小爱')self.var_con = var_condef run(self):with self.var_con:self.var_con.wait()print('{}:在'.format(self.name))self.var_con.notify()self.var_con.wait()print('{}:好啊'.format(self.name))class TianMao(Thread):def __init__(self, var_con):super().__init__(name='天猫精灵')self.var_con = var_condef run(self):with self.var_con:#notify,wait方法必须在with语句之后调用            print('{}:小爱同学'.format(self.name))#天猫先启动,notify已发出但是小爱未启动时未接收到            self.var_con.notify()self.var_con.wait()print('{}:我们来对古诗吧'.format(self.name))self.var_con.notify()if __name__ == '__main__':
    var_con = Condition()
    var_tianmao = TianMao(var_con)
    var_xiaoai = XiaoAi(var_con)#注意启动顺序    var_xiaoai.start()
    var_tianmao.start()
#Semaphore用于控制进入数量的锁import threadingimport timeclass HtmlSpider(threading.Thread):def __init__(self, var_url, var_sem):super().__init__()self.var_url = var_urlself.var_sem = var_semdef run(self):
        time.sleep(2)print(self.var_url)self.var_sem.release()class UrlProducer(threading.Thread):def __init__(self, var_sem):super().__init__()self.var_sem = var_semdef run(self):for var_i in range(20):self.var_sem.acquire()
            var_thread = HtmlSpider('https://www.baidu.com{}'.format(var_i), self.var_sem)
            var_thread.start()if __name__ == '__main__':
    var_sem = threading.Semaphore(5)
    var_url_producer = UrlProducer(var_sem)
    var_url_producer.start()

6、线程池

from concurrent.futures import ThreadPoolExecutor, as_completed, waitimport timefrom concurrent.futures import Future'''线程池,主线程中可以获取某一个线程的状态或者某一任务的状态,以及返回值当一个线程完成时主线程可以立即知道futures可以让多线程和多进程编码接口一致'''def getHtml(var_times):
    time.sleep(var_times)print('get page {} success'.format(var_times))return var_timesif __name__ == '__main__':
    var_execute = ThreadPoolExecutor(max_workers=1)# 通过submit函数提交执行的函数到线程池中,submit非阻塞的,立即返回    var_task1 = var_execute.submit(getHtml,(2))
    var_task2 = var_execute.submit(getHtml,(3))#done方法用于判定任务是否执行成功    print(var_task1.done())print(var_task2.done())#在未执行的时候可以取消任务    print(var_task2.cancel())
    time.sleep(3)print(var_task1.done())print(var_task2.done())#返回执行结果    print(var_task1.result())print('*'*20)#获取已经成功的task返回值    var_execute_01 = ThreadPoolExecutor(max_workers=2)
    var_times = [3,2,5,6,4]#list推导式    var_tasks = [var_execute_01.submit(getHtml, var_one_time) for var_one_time in var_times ]#阻塞线程    wait(var_tasks)print('wait')#as_completed 是一个生成器,谁先完成就打印谁    for var_one_complete_task in as_completed(var_tasks):print(var_one_complete_task.result())print('*' * 20)#使用map函数,打印顺序和var_times一致    var_tasks_data = var_execute_01.map(getHtml, var_times)for var_one_tasks_data in var_tasks_data:print(var_one_tasks_data)

7、多进程和多线程比较

#耗cpu的操作选用多进程编程,对于IO操作选用多线程编程,进程切换代价要高于线程import timefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom concurrent.futures import ProcessPoolExecutordef fib(var_n):if var_n <= 2:return 1    return fib(var_n - 1) + fib(var_n - 2)def threadPool():with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, (var_num)) for var_num in range(25,35)]
        var_start_time = time.time()for var_future in as_completed(var_all_tasks):print('result: {}'.format(var_future.result()))print('thread run time is {} s'.format(time.time() - var_start_time))def processPool():with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, (var_num)) for var_num in range(25,35)]
        var_start_time = time.time()for var_future in as_completed(var_all_tasks):print('result: {}'.format(var_future.result()))print('process run time is {} s'.format(time.time() - var_start_time))def randomSleep(var_n):
    time.sleep(var_n)return var_ndef threadIoPool():with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, (var_num)) for var_num in range(5,10)]
        var_start_time = time.time()for var_future in as_completed(var_all_tasks):print('result: {}'.format(var_future.result()))print('thread io run time is {} s'.format(time.time() - var_start_time))def processIoPool():with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, (var_num)) for var_num in range(5,10)]
        var_start_time = time.time()for var_future in as_completed(var_all_tasks):print('result: {}'.format(var_future.result()))print('process io run time is {} s'.format(time.time() - var_start_time))if __name__ == '__main__':
    threadPool()
    processPool()

    threadIoPool()
    processIoPool()

8、fork案例

import osimport timeprint('bobby')#fork只能用于linux/unix中,用于创建子进程,子进程会把父进程中的数据原样拷贝至子进程中,子进程会运行fork之后的代码var_pid = os.fork()print('bobby1')if var_pid == 0:print('子进程:{},父进程是:{}'.format(os.getpid(), os.getppid()))else:print('我是父进程:{}'.format(var_pid))

time.sleep(2)'''bobbybobby1我是父进程:951bobby1子进程:951,父进程是:950'''

9、多进程使用

import multiprocessingimport timedef getSleep(var_n):
    time.sleep(var_n)print('The child process ran successfully')return var_nif __name__ == '__main__':
    var_process = multiprocessing.Process(target=getSleep, args=(2,))print(var_process.pid)
    var_process.start()print(var_process.pid)
    var_process.join()print('The main process runs successfully')

10、进程池

import multiprocessingimport timedef getSleep(var_n):
    time.sleep(var_n)print('The child process ran successfully')return var_nif __name__ == '__main__':#使用进程池    var_pool = multiprocessing.Pool(multiprocessing.cpu_count())'''    var_result = var_pool.apply_async(getSleep, args=(3,))    #关闭pool不在接收新的任务    var_pool.close()    #等待所有的任务完成    var_pool.join()    #打印结果    print(var_result.get())    '''    '''    #imap方法,完成顺序和添加顺序一致    for var_one_result in var_pool.imap(getSleep, [3,2,1]):        print(var_one_result)    '''    #imap_unordered方法,先执行完成先打印    for var_one_result in var_pool.imap_unordered(getSleep, [3, 2, 1]):print(var_one_result)

11、进程间通信Queue、Manager

import timefrom multiprocessing import Process, Queue, Pool, Managerdef producer(var_queue):
    var_queue.put('a')
    time.sleep(2)def consumer(var_queue):
    time.sleep(2)
    var_data = var_queue.get()print(var_data)if __name__ == '__main__':'''    var_queue = Queue(10)    var_producer = Process(target=producer, args=(var_queue,))    var_consumer = Process(target=consumer, args=(var_queue,))    var_producer.start()    var_consumer.start()    var_producer.join()    var_consumer.join()    '''    #queue不能用于进程池    '''    var_queue = Queue(10)    var_pool = Pool(2)    var_pool.apply_async(producer, args=(var_queue,))    var_pool.apply_async(consumer, args=(var_queue,))    var_pool.close()    var_pool.join()    '''    #Manager可以用于进程之间的通信    var_queue = Manager().Queue(10)
    var_pool = Pool(2)
    var_pool.apply_async(producer, args=(var_queue,))
    var_pool.apply_async(consumer, args=(var_queue,))
    var_pool.close()
    var_pool.join()
from multiprocessing import Manager , Processdef addDict(var_dict, var_key, var_value):
    var_dict[var_key] = var_valueif __name__ == '__main__':
    var_dict = Manager().dict()
    first_process = Process(target=addDict, args=(var_dict, 'a', 'a'))
    second_process = Process(target=addDict, args=(var_dict, 'b', 'b'))
    first_process.start()
    second_process.start()
    first_process.join()
    second_process.join()print(var_dict)

12、进程间的通信Pipe

#pipe只能用于两个指定的进程之间的通信,pipe性能高于queueimport timefrom multiprocessing import Process, Pipedef producer(var_pipe):
    var_pipe.send('a')
    time.sleep(2)def consumer(var_pipe):
    time.sleep(2)
    var_data = var_pipe.recv()print(var_data)if __name__ == '__main__':
    var_recv, var_send = Pipe()
    var_producer = Process(target=producer, args=(var_send,))
    var_consumer = Process(target=consumer, args=(var_recv,))
    var_producer.start()
    var_consumer.start()
    var_producer.join()
    var_consumer.join()

关于python中怎么实现多线程和多进程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. 多线程和多进程的选择
  2. 关于python多线程和多进程的区别有哪些

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python

上一篇:使用Datagrip怎么从csv文件中导入数据

下一篇:Java中AQS的原理及作用是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》