Python线程之threading

发布时间:2020-06-23 04:17:37 作者:元婴期
来源:网络 阅读:386

线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程,线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。

Threading模块提供线程相关的操作,Threading模块包含Thread,Lock,RLock,Event,Queue等组件;multiprocess模块完全模仿了threading模块的接口,二者在使用时,时极其相似的。

1、Thread

创建线程的两种方式:

示例1:

import time
from threading import Thread

def func(i):
    time.sleep(1)
    print("hello : %s"%i)

thread_l = []

# 开启多线程
for i in range(10):
    t = Thread(target=func,args=(i,)) #实例化线程对象
    t.start()  # 激活线程
    thread_l.append(t)

# 异步开启阻塞
for j in thread_l:
    j.join()  # 阻塞主线程,子线程执行完毕之后向下执行主线程代码

print("主线程")

结果:

hello : 2
hello : 0
hello : 1
hello : 3
hello : 5
hello : 4
hello : 7hello : 6hello : 9

hello : 8
主线程

示例2:使用类继承的方式创建线程

import time
from threading import Thread

class MyThread(Thread): # 继承Thread类
    count = 0   # 子线程间会共享此静态属性
    def __init__(self,arg1,arg2): # 通过init方法传递参数
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    def run(self):  # 必须实现run方法
        MyThread.count += 1
        time.sleep(1)
        print("%s,hello!%s"%(self.arg1,self.arg2))

thread_l = []
for i in range(10):
    t = MyThread('eric','jonny')
    t.start()
    thread_l.append(t)
for j in thread_l:
    j.join()
print("conut: %s"%MyThread.count)

结果:
1,hello!jonny
0,hello!jonny
5,hello!jonny4,hello!jonny
3,hello!jonny
2,hello!jonny

6,hello!jonny9,hello!jonny

7,hello!jonny
8,hello!jonny
conut: 10

Thread的主要方法:

t.start() :激活线程
t.join():阻塞(等待子线程执行完毕,在向下执行),在每次激活线程后阻塞会使线程变为同步,所以要在线程激活完毕之后阻塞。
t.name :设置或获取线程名
t.getName():获取线程名
t.setName(NAME):设置线程名
t.is_alive() :判断线程是否激活
t.setDaemon() :设置守护线程,在激活线程之前设置,默认值为False
t.isDaemon() : 判断是否为守护线程

2、Lock与RLock

同一个进程内的线程是数据共享的,线程的GIL(全局解释性)锁是锁的线程调用CPU的时间,在第一个线程调用CPU操作共享数据的时候,时间轮转至第二个线程,第二个线程也要操作共享数据,这样就导致了数据的不一致,这是因为GIL不锁数据,这种情况下,线程锁的出现就能解决这个这个问题。
示例1:GIL锁发挥作用

from threading import Thread

def func():
    global n
    n -= 1

n = 1000

thread_l = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    thread_l.append(t)

for j in thread_l:
    j.join()

print(n)
结果是:900

示例2:时间片轮转,GIL锁失效

import time
from threading import Thread

def func():
    global n
    # n -= 1
    temp = n  # 从进程中获取n
    time.sleep(0.01)  # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
    n = temp -1  # 得到结果,再将修改过的数据返回给进程

n = 1000

thread_l = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    thread_l.append(t)

for j in thread_l:
    j.join()

print(n)
结果是:998

示例3:互斥锁Lock,对数据加锁

import time
from threading import Thread
from threading import Lock

def func():
    global n
    # n -= 1
    lock.acquire()   # 上锁
    temp = n  # 从进程中获取n
    time.sleep(0.01)  # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
    n = temp -1  # 得到结果,再将修改过的数据返回给进程
    lock.release()   # 释放

n = 1000
lock = Lock()  # 实例化锁对象
thread_l = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    thread_l.append(t)

for j in thread_l:
    j.join()

print(n)
结果是:900

互斥锁Lock后使数据一致,具有安全性,但是也带来了新的问题,因为锁的缘故,每一个线程都是串行的拿到锁,在释放;整个程序运行变成串行,效率降低。

示例4:递归锁Rlock
Lock在同一线程中只能被acquire一次,下一次的acquire必须等待release之后才可以;而RLock允许在同一线程中被多次acquire,但是acquire和release必须是成对存在。

from threading import Lock
from threading import RLock

lock = Lock() # 实例化出互斥锁对象
lock.acquire()
lock.acquire() # 在第二次acquire时,程序阻塞等待release之后拿到锁
print("死锁")
lock.release()
lock.release()

lock1 = RLock() # 实例化出递归锁对象
lock1.acquire()
lock1.acquire() # 可被多次acquire
print("running")
lock1.release()
lock1.release() # acquire与release成对出现

在多线程并发的情况下,同一个线程中,如果出现多次acquire,就可能发生死锁现象,使用RLock就不会出现死锁问题

3、Semaphore

线程的信号量与进程的信号量使用基本一致;信号量可以允许指定数量的线程操作上锁的数据,即一把锁有多个钥匙。对与有信号量限制的程序来说,信号量有几个任务就开启几个线程,在加锁阶段会限制程序运行数量,并不影响其它代码的并发。
示例

import random
import time
from threading import Semaphore
from threading import Thread

def sing(i,sem):
    sem.acquire() # 加锁
    print('%s enter the ktv'%i)
    time.sleep(random.randint(1,10))
    print('%s leave the ktv'%i)
    sem.release() # 释放

sem = Semaphore(4)
for i in range(20):
    t = Thread(target=sing, args=(i, sem))
    t.start()

4、Event

事件:线程之间状态标记通信,使用方法与进程的基本一致
主要方法:
e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

示例:连接数据库

'''
连接数据库
每0.5秒连一次,连接三次
用事件来标志数据库的连接情况
如果连接成功,显示成功
否则报错
'''

import time
import random
from threading import Thread
from threading import Event

# 模拟检查连接,检查连接成功使事件标志位为非阻塞
def check_conn():
    time.sleep(random.randint(1,3))
    e.set()

# 在还没检查成功前等待,接到非阻塞信号则连接数据库
def conn_mysql():
    count = 1
    while not e.is_set():
        if count > 3:
            raise TimeoutError

        print("尝试第 %s 次连接" % count)
        count += 1
        e.wait(0.5)
    print("连接成功")

e = Event()  # 实例化事件对象

Thread(target=check_conn).start()
Thread(target=conn_mysql).start()

5、Timer

定时器:定时开启一个线程,执行一个任务
示例:

from threading import Timer

def func():
    print("hello")

'''
必须有两个参数
第一个是时间,单位为秒
第二个是要执行的函数
'''
Timer(1,func).start()

6、Condition

条件变量:条件包含递归锁RLock和事件Event中的wait()方法的功能。
五个方法:
acquire(): 递归锁
release(): 释放锁
wait(timeout): 等待通知,或者等到设定的超时时间;才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError异常。
notify(n=1): 通知其他线程,传入的参数必须时int类型的,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程

示例:

from threading import Condition
from threading import Thread

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s"%n)
    con.release()

if __name__ == '__main__':
    con = Condition()
    for i in range(10):
        t = Thread(target=run,args=(i,))
        t.start()

    while True:
        msg = input(">>> ")
        if msg == 'q':
            break
        con.acquire() # 递归锁
        if msg == 'all':
            con.notify_all()     # 放行所有线程
        else:
            con.notify(int(msg)) # 传递信号,放行线程,参数是int类型的
        con.release() # 释放锁

7、Queue模块

queue模块就是线程的队列,它是数据安全的。
主要方法:
q.put(1) # 将传入的数据放入队列
q.get() # 根据对象所属类的不同,取出队列中的数据
q.join() # 等队列为空时,在执行别的操作
q.qsize() # 返回队列的大小,不一定准确
q.empty() # 队列为空时,返回True,否则返回False,不一定准确
q.full() # 队列满时,返回True,否则返回False,不一定准确

Queue类的使用:先进先出

import queue

q = queue.Queue()  # 实例化一个队列对象,可给出队列长度,先进先出
q.put(1)           # 将传入的数据放入队列
q.put(2)
print(q.get())     # 先进先出,取出队列的第一个值

LifoQueue类的主要方法:后进先出

import queue

lfq = queue.LifoQueue() # 实例化一个对象,可给出长度,后进先出
lfq.put(1)
lfq.put(2)
print(lfq.get()) #后进先出,取出2

PriorityQueue类的主要方法:优先级
import queue

pq = queue.PriorityQueue()  # 实例化一个队列对象,优先级队列,优先级值越小越优先
pq.put((10,'a'))
pq.put((5,'b'))
pq.put((1,'c'))
pq.put((15,'d'))

for i in range(4):
    print(pq.get())
结果:
(1, 'c')
(5, 'b')
(10, 'a')
(15, 'd')

8、concurrent模块之futures

concurrent是用来操作池的模块,这个模块可创建进程池和线程池,其使用方法完全一致,统一了入口和方法,使用池更便捷,且python内置,导入便可使用。

主要方法:
submit(FUNC,ARGS):创建线程对象和激活线程,FUNC是要执行的任务,ARGS是参数
shutdown():shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
result():result方法取线程执行的函数返回值
map(FUNC,iter):map方法异步执行,需传入要执行的任务FUNC,以及一个可迭代对象iter,map方法无返回值
add_done_callback(call):回调函数

示例1:

import time
import random
from concurrent import futures

def func(n):
    time.sleep(random.randint(1,3))
    print(n)
    return n*"*"

thread_pool = futures.ThreadPoolExecutor(20)  # 实例化一个线程池对象,一般开启CPU核数*5
f_l = []
for i in range(10):
    t = thread_pool.submit(func,i)    # submit方法合并了创建线程对象和激活线程的功能
    f_l.append(t)
thread_pool.shutdown()    # shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束

for j in f_l:
    print(j.result())   # result方法取线程执行的函数返回值

示例2:回调

from concurrent import futures

def func(n):
    print(n)
    return n*"*"

def call(args):
    print(args.result())

thread_pool = futures.ThreadPoolExecutor(20)
thread_pool.submit(func,1).add_done_callback(call)
推荐阅读:
  1. 多线程 threading模块___python
  2. threading包

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

threading queue concurrent

上一篇:OGG维护优化脚本(二十二)-部署准备篇--脚本部署准备

下一篇:python 函数声明和调用(18)

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》