Python全栈开发之并发编程

发布时间:2020-07-10 09:02:38 作者:灰白世界
来源:网络 阅读:1645

No.1 线程

什么是多任务

就是操作系统可以同时运行多个任务,就是可以一边用浏览器上网,同时又可以听歌,还能再撩个×××姐,这就是多任务,操作系统会轮流把系统调度到每个核心上去执行

并发和并行

并发是指任务数多余cpu核数,通过操作系统的各种任务调度算法,实现多个任务

并行是指任务数小于cpu核数,即任务同时执行

单线程

import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    for i in range(5):
        say_hello(i)

多线程

import threading
import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start() # 当调用start方法时,才会正真的执行线程

主线程会等待子线程

import threading
import time

def say_hello(name):
    for i in range(5):
        print('hello ', i, name)

if __name__ == '__main__':
    say_hello('主线程')
    t1 = threading.Thread(target=say_hello,args=('t1',))
    t2 = threading.Thread(target=say_hello,args=('t2',))
    t1.start()
    t2.start()
    print('end')
# hello  0 主线程
# hello  1 主线程
# hello  2 主线程
# hello  3 主线程
# hello  4 主线程
# hello  0 t1
# hello  1 t1
# hello  2 t1
# hello  3 t1
# hello  4 t1
# hello  0 t2
# hello  1 t2
# hello  2 t2
# hello  3 t2
# hello  4 t2
# end

查看线程数量

import threading
import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    say_hello('主线程')
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start()
        while True:
            length = len(threading.enumerate())
            print('当前运行的线程数为:%d' % length)
            if length <= 1:
                break
# hello  主线程
# hello  0
# 当前运行的线程数为:2
# 当前运行的线程数为:1
# hello  1
# 当前运行的线程数为:1
# hello  2
# 当前运行的线程数为:1
# hello  3
# 当前运行的线程数为:1
# hello  4
# 当前运行的线程数为:1

封装线程

为了让每个线程的封装性更加完整,我们通常会创建一个线程类,让这个线程类继承自threading.Thread,然后重写run方法就可以了

import threading
import time
class MyThread(threading.Thread):

    def run(self):
        for i in range(5):
            time.sleep(1)
            print(self.name + str(i))

if __name__ == '__main__':
    t = MyThread()
    t.start()

Python的threading.Thread类的run方法,用于定义线程的功能函数,可以在我们自己的类中覆盖该方法,当创建自己的线程类对象后,可以start方法,进行调度

线程的执行顺序

线程的执行顺序是不确定的,当执行到sleep语句时,线程将会被阻塞,然后线程进入就绪状态,等待cpu的调度,线程调度自动选择一个线程执行

No.2 多线程

多线程共享全局变量

import threading
import time

num = 100
def demo1():
    global num
    for i in range(3):
        num -= 1
        print('num = ',num)

def demo2():
    print('num = ', num)

if __name__ == '__main__':
    print('线程创建之前num = ',num)
    t1 = threading.Thread(target=demo1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=demo2)
    t2.start()
# 线程创建之前num =  100
# num =  99
# num =  98
# num =  97
# num =  97

在一个进程内的所有线程共享全局变量,能很方便的在多个线程之间共享数据,但是这也带来一个麻烦,就是线程就全局变量的随机修改可能会导致多线程之间对于全局变量的的混乱,即线程非安全

import threading
import time

num = 100
def demo1():
    global num
    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

def demo2():
    global num

    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

if __name__ == '__main__':
    print('线程创建之前num = ',num)
    # lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print('线程执行完毕num = ',num)
# 线程创建之前num =  100
# 线程执行完毕num =  1559954
两个线程分别对线程自增了10次,结果却是122,如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确

同步

对于多线程非安全的问题,可以采用同步的方式来解决,每个线程对数据的修改时,都要先上锁,处理完成后再解锁,在上锁的过程中不允许任何线程打扰,这样就能保证线程安全性了,数据也不会不正确

互斥锁

当多个线程几乎同时修改某一个共享数据的时候,需要进程同步控制,线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁,互斥锁为资源引入一个状态,锁定/非锁定,某个线程要更改共享数据时,此时资源状态为锁定,其他线程不能更改,当该线程修改完毕,将资源设置为非锁定,互斥锁保证了每次只能由一个线程进入写入,从而保证了多线程情况下数据的正确性

# 创建锁
lock = threading.Lock()

# 锁定
lock.acquire()

# 释放
lock.release()

使用互斥锁对两个线程对同一个全局变量各加1亿次

import threading
import time

num = 100
def demo1():
    global num
    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

def demo2():
    global num

    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

if __name__ == '__main__':
    print('线程创建之前num = ',num)
    lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print('线程执行完毕num = ',num)
# 线程创建之前num =  100
# 线程执行完毕num =  200000100

上锁解锁过程

当一个线程调用所的acquire方法获得锁时,锁就进入locked状态,每次只能有一个线程的锁,如果此时有另外一个线程试图获得锁,那么此时这个锁就会进入阻塞状态,直到拥有锁的线程调用release解锁之后,锁进入unlocked状态,其他线程就可以获得锁了

锁的优缺点

确保了某段关键代码只能由一个线程完整执行,确保了数据的完整性,阻止了多线程并发,使得包含的锁的代码只能以单线程执行,效率就大大降低了,还可能发生死锁

死锁

在线程共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会形成死锁

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # lockA上锁
        lockA.acquire()
        # 延时1秒,等待另外那个线程 把lockB上锁
        print(self.name+' A start')
        time.sleep(1)
        # 堵塞,因为这个lockB已经被另外的线程抢先上锁了
        lockB.acquire()
        print(self.name+' B end')
        lockB.release()
        # lockA解锁
        lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        # lockB上锁
        lockB.acquire()
        # 延时1秒,等待另外那个线程 把lockA上锁
        print(self.name+' B start')
        time.sleep(1)
        # 堵塞,lockA已经被另外的线程抢先上锁了
        lockA.acquire()
        print(self.name+' A end')
        lockA.release()
        # lockB解锁
        lockB.release()

if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
# Thread-1 A start
# Thread-2 B start

如何避免死锁

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        lockA.acquire()
        print(self.name, 'A', 'start')
        time.sleep(1)
        # 如果在规定时间内可以上锁就返回True,反之。。。
        result = lockB.acquire(timeout=1)
        if result:
            print(self.name, 'B', 'start')
            lockA.release()
            print(self.name, 'A', 'end')
            lockB.release()
            print(self.name, 'B', 'end')
        else:
            lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        lockB.acquire()
        print(self.name, 'B', 'start')
        time.sleep(1)
        lockA.acquire()
        print(self.name, 'A', 'start')
        lockA.release()
        print(self.name, 'A', 'end')
        lockB.release()
        print(self.name, 'B', 'end')

if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

GIL

Python语言和GIL没有半毛钱关系,仅仅是由于历史原因在Cpython虚拟机,难以移除GIL

GIL,全局解释器锁,每个线程在执行的过程都需要先获取GIL,保证同一时刻只有一个线程可以执行代码

线程释放GIL锁的情况: 在IO操作等可能会引起阻塞的system call之前,可以暂时释放GIL,但在执行完毕后,必须重新获取GIL

Python使用多进程是可以利用多核的CPU资源的

多线程爬取比单线程性能有提升,因为遇到IO阻塞会自动释放GIL锁

No.3 进程

什么是进程?

一个程序在运行期间,代码和程序运行所需的资源称为进程

进程的状态

工作中,任务数往往大于cpu核心数,所以一定有一些任务在执行,另外一部分是处于等待状态
就绪态,运行的条件已经满足,等待cpu执行
执行态,cpu正在执行该任务
等待态,等待某些条件满足

创建进程

from multiprocessing import Process
import time

def run_proc():
    while True:
        print('子线程')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print('主线程')
        time.sleep(1)

pid

from multiprocessing import Process
import time
import os
def run_proc():
    # 获取当前进程的pid
    print('子进程 pid = ',os.getpid())

if __name__ == '__main__':
    print('父进程 pid = ',os.getpid())
    p = Process(target=run_proc)
    p.start()

Process语法结构

Process([group [, target [, name [, args [, kwargs]]]]])
target 如果传递了函数的引用,可以让子进程执行内部代码
args 给target指定的函数传递参数,元组方式
kwargs 给target指定的函数传递命名参数
name 给进程设定一个名字
group 指定进程组
常用方法
start() 启动子进程
is_alive() 判断进程是否还在运行
join([timeout]) 是否等待子进程执行结束
terminate() 不管任务是否执行完毕,直接结束

进程不能共享全局变量

from multiprocessing import Process
import time
NUM_LIST = [11,22,33]

def demo1(num_list):
    num_list.append(44)
    print(num_list)

def demo2(num_list):
    num_list.append(55)
    print(num_list)

if __name__ == '__main__':
    p1 = Process(target=demo1,args=(NUM_LIST,))
    p2 = Process(target=demo2,args=(NUM_LIST,))
    p1.start()
    p2.start()
    print(NUM_LIST)
# [11, 22, 33]
# [11, 22, 33, 44]
# [11, 22, 33, 55]

进程、线程对比

定义

进程,能够完成多任务,例如,在一台电脑上登录多个QQ客户端

线程,能够完成多任务,例如,在一台电脑上和多个妹子聊天

区别

一个程序至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程,是的多线程的并发高于多进程,进程在执行过程中拥有独立的内存单元,而线程却是共享的,线程的运行开销小,但是不安全,进程和它相反,所以我们要根据不同的场景选择适合的

进程池

当需要创建的线程数量不多时,我们可以直接利用Process动态生成进程,但是当进程数量上百甚至上千时,我们再采用Process创建进程就可以猝死了,此时可以使用pool,初始化pool时,可以指定一个最大进程数,当有新的请求提交到pool时,如果线程池没有满,那么会创建一个新的线程来执行该请求,否则,等待其他进程结束

from multiprocessing import Pool
import time

def func(arg):
  print(arg)
  time.sleep(1)

if __name__ == '__main__':
  pool = Pool(5)
  for i in range(30):
      pool.apply_async(func=func,args=(i,))
  pool.close() # 所有任务执行完毕
  pool.join() # 等待pool中所有子进程执行完成,必须放在close语句之后

pool函数解析

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表
close():关闭Pool,使其不再接受新的任务;
terminate():不管任务是否完成,立即终止;
join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

进程池中的queue

from multiprocessing import Pool,Manager
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print('put %s to queue' %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print('get %s to queue' %value)
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    q = Manager().Queue()
    pool = Pool()
    pool.apply_async(write,args=(q,))
    # time.sleep(1)
    pool.apply_async(read,args=(q,))
    pool.close()
    pool.join()

进程间通信

进程间有时也需要通信,可以使用multiprocessing模块的Queue实现

初始化Queue()对象时,若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限
Queue.qsize() 返回当前队列包含的消息数量
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列中的一条消息,然后将其从列队中移除,block默认值为True,如果block使用默认值,且没有设置timeout,消息列队如果为空,此时程序将被阻塞,直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则出"Queue.Empty"异常,如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item,[block[, timeout]] 将item消息写入队列,block默认值为True,如果block使用默认值,且没有设置timeout,消息列队如果已经没有空间可写入,此时程序将被阻塞,直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常,如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常
Queue.put_nowait(item) 相当Queue.put(item, False)

栗子

from multiprocessing import Process,Queue
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print('put %s to queue' %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print('get %s to queue' %value)
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    q = Queue()
    t1 = Process(target=write,args=(q,))
    t2 = Process(target=read,args=(q,))
    t1.start()
    t1.join()
    t2.start()
    t2.join()

No.4 迭代器

迭代是遍历序列的一种方式,它可以记住序列的遍历位置,迭代器从第一个元素开始访问,只能向前,不能后退

可迭代对象

可以通过for...in...这类语句迭代的对象称为可迭代对象

如何判断一个对象是否可以迭代

可以使用inistance(obj,Iterable)判断一个对象是不是iterable对象

可迭代对象的本质

可迭代对象进行迭代的时候,我们发现没迭代一次,都会返回对象的中的下一条数据,一直往后读取数据直到数据全部迭代完成,那么,这个负责记录数据迭代到的索引的机制叫做迭代器,可迭代对象通过iter方法向我们提供一个迭代器,我们在迭代对象的时候,实际上就是调用该方法获取了一个迭代器,然后根据迭代器来获取数据的,也就说,具有iter方法 的对象称为可迭代对象

from collections import Iterable

class MyInt(int):
    def __iter__(self):
        pass

if __name__ == '__main__':
    myint = MyInt()
    print(isinstance(myint, Iterable)) # True

iter函数与next函数

可迭代对象通过iter函数获取迭代器,我们可以对获取到的迭代器不停的使用next函数来获取下一条数据,当我们对迭代器使用iter函数就是调用了可迭代对象的iter函数,注意,当我们迭代玩最后一个数据时,再次调用next函数会抛出StopIterable异常

迭代器iterable

当我们对迭代器使用next方法的时候,实际上时调用的next函数(Python2是next函数),所以,想构建一个迭代器,就要实现它的nextiter函数,实现了这两个函数,就是迭代器

class MyIterator(object):
    """自定义的供上面可迭代对象使用的一个迭代器"""
    def __init__(self):
        self.items = []
        self.current = 0 # current用来记录当前访问到的位置
    def add(self,value):
        self.items.append(value)

    def __next__(self):
        if self.current < len(self.items):
            item = self.items[self.current]
            self.current += 1
            return item
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == '__main__':
    mi = MyIterator()
    mi.add(1)
    mi.add(2)
    mi.add(3)
    mi.add(4)
    mi.add(5)
    for num in mi:
        print(num)

for...in...本质

本质就是先通过iter获取迭代器,在通过迭代器不断的调用next方法,当遇到异常退出

迭代器应用场景

每次返回的数据不是在一个已有的数据集合中读取,而是程序通过一定的规律计算生成的,也就是说不用将所有要迭代的数据一次存储下来提供后续读取,这样可以节省很大空间

class FibIterator(object):
    def __init__(self, n):
        self.n = n
        self.current = 0
        self.num1 = 0
        self.num2 = 1

    def __next__(self):
        if self.current < self.n:
            num = self.num1
            self.num1, self.num2 = self.num2, self.num1+self.num2
            self.current += 1
            return num
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == '__main__':
    fib = FibIterator(10)
    for num in fib:
        print(num, end=" ")

No.5 生成器

生成器

一边循环一边计算的机制,称为生成器,生成器是一种特殊的迭代器

创建生成器

将列表生成式的定界符改成()

G = ( x*2 for x in range(5))
G
<generator object <genexpr> at 0x000001E86BC993B8>

创建生成式和生成器的区别仅在于定界符,L是一个列表,G是一个生成器,我们可以直接打印出列表的每个元素,而对于生成其,我们可以按照迭代器的使用方法来使用

使用yield

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == '__main__':
    f = fib(10)
    for i in f:
        print(i)

使用了yield关键字的函数就是生成器,yield的作用有两点,一是保存当前运行状态,暂停执行,挂起生成器,二是将yield后面的表达式的值作为返回值返回,可以使用next函数让生成器从断点处继续执行,唤醒生成器

使用send唤醒

我们除了可以使用next函数来唤醒生成器,还可以使用send函数,使用send还可以在唤醒的同时从间断点传入一个附加数据

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == '__main__':
    f = fib(10)
    print(next(f))
    print(f.send('...'))

No.7 协程

协程概念

协程是什么

协程是Python中另外一种实现多任务的方式,只不是比比线程更小的执行单元,协程自带CPU上下文,只要在合适的时机,我们可以把一个协程切换到另一个协程,这要在这个过程中保存或恢复CPU上下文那么程序还是可以运行的,说到这,小伙伴们是不是想到了上文介绍的yield

协程和线程差异

在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住

简单实现协程

import time

def work1():
    while True:
        print("----work1---")
        yield
        time.sleep(0.5)

def work2():
    while True:
        print("----work2---")
        yield
        time.sleep(0.5)

def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)

if __name__ == "__main__":
    main()

greenlet

安装greenlet pip3 install greenlet

from greenlet import greenlet
import time

def demo1():
    while True:
        print('Demo1 is running')
        gl2.switch()
        time.sleep(1)

def demo2():
    while True:
        print('Demo2 is running')
        gl1.switch()
        time.sleep(1)

if __name__ == '__main__':
    gl1 = greenlet(demo1)
    gl2 = greenlet(demo2)
    gl1.switch() # 切换到gl1执行

gevent

交替运行

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)

if __name__ == '__main__':
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

自动切换

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)
        gevent.sleep(1)

if __name__ == '__main__':
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

No.7 线程、进程、协程区别

进程是资源分配的单位

线程是操作系统调度的单位

进程切换需要的资源很最大,效率很低

线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)

协程切换任务资源很小,效率高

多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中,所以是并发

推荐阅读:
  1. Python全栈开发之Git
  2. Python全栈开发之常用模块

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python 全栈开发 并发编程

上一篇:多行同时请求与串行请求之——promise.all/promise.race

下一篇:批量删除nagios的停机计划

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》