您好,登录后才能下订单哦!
这篇文章将为大家详细讲解有关python中怎么实现多线程和多进程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
1、GIL
名称:全局解释器锁。在 CPython 中,Python 的一个线程对应 C 语言中的一个线程。GIL 使得同一时刻只有一个线程在一个 CPU 上执行字节码,无法将多个线程映射到多个 CPU 上;但它只保证单条字节码执行的原子性,并不能保证由多条字节码组成的操作是线程安全的(见下例)。GIL 并不会被一直占有:解释器会根据已执行的字节码行数、时间片或在发生 IO 操作时释放它,因此多线程非常适合 IO 密集型操作。
例如:
# GIL demo: two threads mutate one shared global. Because `+= 1` is not a
# single atomic bytecode, the final value printed is not guaranteed to be 0.
import threading

var_total = 0


def add():
    """Increment the shared counter one million times."""
    global var_total
    for var_i in range(1000000):
        var_total += 1


def desc():
    """Decrement the shared counter one million times."""
    global var_total
    for var_i in range(1000000):
        var_total -= 1


if __name__ == '__main__':
    var_thread1 = threading.Thread(target=add)
    var_thread2 = threading.Thread(target=desc)
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()
    print(var_total)
2、多线程编程
操作系统所能操作和调度的最小单元为线程。线程的调度比进程更加轻量级。对于io操作来说多进程和多线程性能相当。
多线程实现-方法
# Threads created from plain functions. Demonstrates daemon threads
# (killed when the main thread exits) versus join() (main thread waits).
import time
import threading


def getDetailHtml(var_url):
    """Simulate fetching a detail page: ~2 s of blocking I/O."""
    print('get detail html started')
    time.sleep(2)
    print('get detail html end')


def getDetailUrl(var_url):
    """Simulate fetching a listing page: ~4 s of blocking I/O."""
    print('get detail url started')
    time.sleep(4)
    print('get detail url end')


if __name__ == '__main__':
    var_thread1 = threading.Thread(target=getDetailHtml, args=('http://www.baidu.com',))
    var_thread2 = threading.Thread(target=getDetailUrl, args=('http://www.baidu.com',))
    var_start_time = time.time()
    # Mark a thread as a daemon so it is terminated when the main thread
    # exits. Thread.setDaemon() is deprecated (removal planned); assigning
    # the .daemon attribute is the supported spelling.
    # var_thread1.daemon = True
    var_thread2.daemon = True
    var_thread1.start()
    var_thread2.start()
    # join() would block the main thread until the child finishes.
    # var_thread1.join()
    # var_thread2.join()
    print('last time:{}'.format(time.time() - var_start_time))
多线程实现-类
# Threads created by subclassing threading.Thread and overriding run().
import threading
import time


class GetDetailHtml(threading.Thread):
    """Worker that simulates fetching a detail page (~2 s)."""

    def __init__(self, var_name):
        super().__init__(name=var_name)

    def run(self):
        print('get detail html started')
        time.sleep(2)
        print('get detail html end')


class GetDetailUrl(threading.Thread):
    """Worker that simulates fetching a listing page (~4 s)."""

    def run(self):
        print('get detail url started')
        time.sleep(4)
        print('get detail url end')


if __name__ == '__main__':
    var_thread1 = GetDetailHtml('getDetailHtml')
    var_thread2 = GetDetailUrl()
    var_start_time = time.time()
    var_thread1.start()
    var_thread2.start()
    var_thread1.join()
    var_thread2.join()
    print('last time:{}'.format(time.time() - var_start_time))
3、线程间的通信Queue
# Producer/consumer communication between threads via queue.Queue,
# which is thread-safe (put/get do their own locking).
from queue import Queue
import threading
import time


def setDetailUrl(var_detail_queue):
    """Producer: forever push batches of 5 URLs onto the shared queue."""
    while True:
        time.sleep(2)
        for var_i in range(5):
            var_detail_queue.put('https://www.{}.com'.format(var_i))


def getDetailUrl(var_detail_queue, var_thread_name):
    """Consumer: forever pull URLs off the shared queue and report them."""
    while True:
        var_url = var_detail_queue.get()
        time.sleep(2)
        print(var_thread_name, ':', var_url)


if __name__ == '__main__':
    var_detail_queue = Queue(maxsize=10000)
    var_set_detail = threading.Thread(target=setDetailUrl, args=(var_detail_queue,))
    var_get_thread_list = []
    for var_index in range(5):
        var_get_thread = threading.Thread(
            target=getDetailUrl,
            args=(var_detail_queue, 'thread' + str(var_index)))
        var_get_thread_list.append(var_get_thread)
    var_set_detail.start()
    for var_one in var_get_thread_list:
        var_one.start()
    var_set_detail.join()
    for var_one in var_get_thread_list:
        var_one.join()
4、线程间的锁Lock、Rlock
# Same counter race as the GIL demo, now made correct with a Lock.
import threading
from threading import Lock

var_total = 0
var_lock = Lock()


def add():
    """Safely increment the shared counter one million times."""
    global var_total
    for var_i in range(1000000):
        # Locking costs performance, and lock misuse (or resource
        # contention) can deadlock. A plain Lock must not be acquired
        # twice by the same thread — that deadlocks immediately.
        var_lock.acquire()
        var_total += 1
        var_lock.release()


def reduce():
    """Safely decrement the shared counter one million times."""
    global var_total
    for var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1
        var_lock.release()


if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()
    print(var_total)
# Variant of the lock demo using RLock: a reentrant lock that the SAME
# thread may acquire repeatedly, as long as releases match acquires.
import threading
from threading import RLock

var_total = 0
var_lock = RLock()


def add():
    """Increment under a doubly-acquired RLock (reentrancy demo)."""
    global var_total
    for var_i in range(1000000):
        # RLock allows nested acquire() in one thread; the number of
        # release() calls must equal the number of acquire() calls.
        var_lock.acquire()
        var_lock.acquire()
        var_total += 1
        var_lock.release()
        var_lock.release()


def reduce():
    """Decrement under a single acquire/release pair."""
    global var_total
    for var_i in range(1000000):
        var_lock.acquire()
        var_total -= 1
        var_lock.release()


if __name__ == '__main__':
    var_thread_add = threading.Thread(target=add)
    var_thread_reduce = threading.Thread(target=reduce)
    var_thread_add.start()
    var_thread_reduce.start()
    var_thread_add.join()
    var_thread_reduce.join()
    print(var_total)
5、线程同步condition、Semaphore
# Two threads hold a scripted dialogue by alternating wait()/notify()
# on one shared Condition.
from threading import Thread, Condition


class XiaoAi(Thread):
    """The replying side: waits first, then answers each prompt."""

    def __init__(self, var_con):
        super().__init__(name='小爱')
        self.var_con = var_con

    def run(self):
        with self.var_con:
            self.var_con.wait()
            print('{}:在'.format(self.name))
            self.var_con.notify()
            self.var_con.wait()
            print('{}:好啊'.format(self.name))


class TianMao(Thread):
    """The initiating side: speaks first, then waits for each reply."""

    def __init__(self, var_con):
        super().__init__(name='天猫精灵')
        self.var_con = var_con

    def run(self):
        # notify()/wait() must be called while holding the condition's
        # lock, i.e. inside the `with self.var_con` block.
        with self.var_con:
            print('{}:小爱同学'.format(self.name))
            # If this thread ran first, the notify would fire before the
            # other thread was waiting and would be lost.
            self.var_con.notify()
            self.var_con.wait()
            print('{}:我们来对古诗吧'.format(self.name))
            self.var_con.notify()


if __name__ == '__main__':
    var_con = Condition()
    var_tianmao = TianMao(var_con)
    var_xiaoai = XiaoAi(var_con)
    # Start order matters: the waiter must be started before the notifier.
    var_xiaoai.start()
    var_tianmao.start()
# A Semaphore is a lock that admits a bounded number of holders; here it
# caps how many HtmlSpider threads run concurrently.
import threading
import time


class HtmlSpider(threading.Thread):
    """Fetch one URL (simulated), then give the semaphore slot back."""

    def __init__(self, var_url, var_sem):
        super().__init__()
        self.var_url = var_url
        self.var_sem = var_sem

    def run(self):
        time.sleep(2)
        print(self.var_url)
        # Free the slot so the producer may launch another spider.
        self.var_sem.release()


class UrlProducer(threading.Thread):
    """Launch 20 spiders, at most 5 in flight at any moment."""

    def __init__(self, var_sem):
        super().__init__()
        self.var_sem = var_sem

    def run(self):
        for var_i in range(20):
            # Blocks once 5 spiders hold the semaphore.
            self.var_sem.acquire()
            var_thread = HtmlSpider('https://www.baidu.com{}'.format(var_i), self.var_sem)
            var_thread.start()


if __name__ == '__main__':
    var_sem = threading.Semaphore(5)
    var_url_producer = UrlProducer(var_sem)
    var_url_producer.start()
6、线程池
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
from concurrent.futures import Future

"""
With a thread pool the main thread can query a task's state and return
value, learn as soon as any one task completes, and — via futures — share
one coding interface between multithreading and multiprocessing.
"""


def getHtml(var_times):
    """Pretend to download a page for var_times seconds; return var_times."""
    time.sleep(var_times)
    print('get page {} success'.format(var_times))
    return var_times


if __name__ == '__main__':
    var_execute = ThreadPoolExecutor(max_workers=1)
    # submit() is non-blocking: it hands the callable to the pool and
    # immediately returns a Future.
    var_task1 = var_execute.submit(getHtml, 2)
    var_task2 = var_execute.submit(getHtml, 3)
    # done() reports whether the task has finished.
    print(var_task1.done())
    print(var_task2.done())
    # A task that has not started running yet can still be cancelled.
    print(var_task2.cancel())
    time.sleep(3)
    print(var_task1.done())
    print(var_task2.done())
    # result() blocks until the return value is available.
    print(var_task1.result())
    print('*' * 20)

    # Collect return values from several tasks at once.
    var_execute_01 = ThreadPoolExecutor(max_workers=2)
    var_times = [3, 2, 5, 6, 4]
    var_tasks = [var_execute_01.submit(getHtml, var_one_time)
                 for var_one_time in var_times]
    # wait() blocks until every listed future is done.
    wait(var_tasks)
    print('wait')
    # as_completed() is a generator yielding futures in completion order.
    for var_one_complete_task in as_completed(var_tasks):
        print(var_one_complete_task.result())
    print('*' * 20)
    # map() yields results in the same order as var_times.
    var_tasks_data = var_execute_01.map(getHtml, var_times)
    for var_one_tasks_data in var_tasks_data:
        print(var_one_tasks_data)
7、多进程和多线程比较
# CPU-bound work suits multiprocessing; I/O-bound work suits
# multithreading — a process switch costs more than a thread switch.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(var_n):
    """Naive recursive Fibonacci — the CPU-bound workload."""
    if var_n <= 2:
        return 1
    return fib(var_n - 1) + fib(var_n - 2)


def threadPool():
    """Time fib(25..34) on a 3-worker thread pool."""
    with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, var_num) for var_num in range(25, 35)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('thread run time is {} s'.format(time.time() - var_start_time))


def processPool():
    """Time fib(25..34) on a 3-worker process pool."""
    with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(fib, var_num) for var_num in range(25, 35)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('process run time is {} s'.format(time.time() - var_start_time))


def randomSleep(var_n):
    """Sleep var_n seconds — the I/O-bound workload — and return var_n."""
    time.sleep(var_n)
    return var_n


def threadIoPool():
    """Time sleeps of 5..9 s on a 3-worker thread pool."""
    with ThreadPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, var_num) for var_num in range(5, 10)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('thread io run time is {} s'.format(time.time() - var_start_time))


def processIoPool():
    """Time sleeps of 5..9 s on a 3-worker process pool."""
    with ProcessPoolExecutor(3) as var_executor:
        var_all_tasks = [var_executor.submit(randomSleep, var_num) for var_num in range(5, 10)]
        var_start_time = time.time()
        for var_future in as_completed(var_all_tasks):
            print('result: {}'.format(var_future.result()))
        print('process io run time is {} s'.format(time.time() - var_start_time))


if __name__ == '__main__':
    threadPool()
    processPool()
    threadIoPool()
    processIoPool()
8、fork案例
import os
import time

print('bobby')
# os.fork() exists only on Linux/Unix. It clones the current process: the
# child receives a copy of the parent's data, and BOTH processes continue
# executing the code after the fork() call. fork() returns 0 in the child
# and the child's pid in the parent.
var_pid = os.fork()
print('bobby1')
if var_pid == 0:
    print('子进程:{},父进程是:{}'.format(os.getpid(), os.getppid()))
else:
    print('我是父进程:{}'.format(var_pid))
    time.sleep(2)
'''
bobby
bobby1
我是父进程:951
bobby1
子进程:951,父进程是:950
'''
9、多进程使用
import multiprocessing
import time


def getSleep(var_n):
    """Sleep var_n seconds in the child process and return var_n."""
    time.sleep(var_n)
    print('The child process ran successfully')
    return var_n


if __name__ == '__main__':
    var_process = multiprocessing.Process(target=getSleep, args=(2,))
    # Before start() the process has no pid yet (prints None).
    print(var_process.pid)
    var_process.start()
    print(var_process.pid)
    var_process.join()
    print('The main process runs successfully')
10、进程池
import multiprocessing
import time


def getSleep(var_n):
    """Sleep var_n seconds in a worker process and return var_n."""
    time.sleep(var_n)
    print('The child process ran successfully')
    return var_n


if __name__ == '__main__':
    # A process pool sized to the machine's CPU count.
    var_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    '''
    var_result = var_pool.apply_async(getSleep, args=(3,))
    # close(): stop accepting new tasks.
    var_pool.close()
    # join(): wait until every submitted task has finished.
    var_pool.join()
    # Fetch the task's return value.
    print(var_result.get())
    '''
    '''
    # imap yields results in submission order.
    for var_one_result in var_pool.imap(getSleep, [3, 2, 1]):
        print(var_one_result)
    '''
    # imap_unordered yields results as tasks complete, fastest first.
    for var_one_result in var_pool.imap_unordered(getSleep, [3, 2, 1]):
        print(var_one_result)
11、进程间通信Queue、Manager
import time
from multiprocessing import Process, Queue, Pool, Manager


def producer(var_queue):
    """Put one item on the shared queue, then linger briefly."""
    var_queue.put('a')
    time.sleep(2)


def consumer(var_queue):
    """Take one item off the shared queue and print it."""
    time.sleep(2)
    var_data = var_queue.get()
    print(var_data)


if __name__ == '__main__':
    '''
    # multiprocessing.Queue works between plain Process objects.
    var_queue = Queue(10)
    var_producer = Process(target=producer, args=(var_queue,))
    var_consumer = Process(target=consumer, args=(var_queue,))
    var_producer.start()
    var_consumer.start()
    var_producer.join()
    var_consumer.join()
    '''
    # multiprocessing.Queue cannot be handed to pool workers.
    '''
    var_queue = Queue(10)
    var_pool = Pool(2)
    var_pool.apply_async(producer, args=(var_queue,))
    var_pool.apply_async(consumer, args=(var_queue,))
    var_pool.close()
    var_pool.join()
    '''
    # Manager().Queue() does work for communication across pool processes.
    var_queue = Manager().Queue(10)
    var_pool = Pool(2)
    var_pool.apply_async(producer, args=(var_queue,))
    var_pool.apply_async(consumer, args=(var_queue,))
    var_pool.close()
    var_pool.join()
from multiprocessing import Manager, Process


def addDict(var_dict, var_key, var_value):
    """Store var_value under var_key in the given (possibly shared) mapping."""
    var_dict[var_key] = var_value


if __name__ == '__main__':
    # Manager().dict() returns a proxy mapping that child processes can
    # mutate; changes are visible to the parent after join().
    var_dict = Manager().dict()
    first_process = Process(target=addDict, args=(var_dict, 'a', 'a'))
    second_process = Process(target=addDict, args=(var_dict, 'b', 'b'))
    first_process.start()
    second_process.start()
    first_process.join()
    second_process.join()
    print(var_dict)
12、进程间的通信Pipe
# A Pipe connects exactly two designated processes; for that pairwise
# case it outperforms Queue.
import time
from multiprocessing import Process, Pipe


def producer(var_pipe):
    """Send one message through our end of the pipe."""
    var_pipe.send('a')
    time.sleep(2)


def consumer(var_pipe):
    """Receive one message from our end of the pipe and print it."""
    time.sleep(2)
    var_data = var_pipe.recv()
    print(var_data)


if __name__ == '__main__':
    var_recv, var_send = Pipe()
    var_producer = Process(target=producer, args=(var_send,))
    var_consumer = Process(target=consumer, args=(var_recv,))
    var_producer.start()
    var_consumer.start()
    var_producer.join()
    var_consumer.join()
关于python中怎么实现多线程和多进程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。