# How to Implement Processes, Threads, and Coroutines in Python
## Introduction
In modern programming, concurrency is one of the most important ways to improve program performance. Python, as a popular language, offers several ways to achieve concurrency, chiefly processes, threads, and coroutines. This article explains the concepts behind these three concurrency models, how they differ, and how to implement each of them in Python.
## 1. Basic Concepts: Processes, Threads, and Coroutines
### 1.1 Processes
A process is the operating system's basic unit of resource allocation. Each process has its own independent memory space and system resources, so communication between processes must go through dedicated mechanisms such as pipes or message queues.
Key characteristics:
- Isolation: processes are independent of one another; one process crashing does not affect the others
- High resource cost: creating and destroying processes incurs significant system overhead
- Complex communication: inter-process communication (IPC) requires special mechanisms
### 1.2 Threads
A thread is an execution unit within a process. A process can contain multiple threads, all of which share the process's memory space and resources.
Key characteristics:
- Shared memory: threads within the same process share one address space
- Lightweight: creating and switching threads costs far less than processes
- GIL constraint: in CPython, the Global Interpreter Lock (GIL) prevents multiple threads from executing Python bytecode in parallel (see the sketch after this list)
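To make the GIL constraint concrete, here is a minimal timing sketch (the function name `count_down` and the iteration count are illustrative, and exact timings vary by machine): for a purely CPU-bound loop on CPython, two threads take roughly as long as doing the same work sequentially.
```python
import threading
import time

def count_down(n):
    # Pure CPU-bound work: the GIL lets only one thread
    # execute Python bytecode at any moment.
    while n > 0:
        n -= 1

N = 10_000_000

# Sequential baseline.
start = time.perf_counter()
count_down(N)
count_down(N)
print(f'Sequential:  {time.perf_counter() - start:.2f}s')

# Two threads: on CPython this usually takes about as long as
# (or longer than) the sequential run, due to GIL contention.
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f'Two threads: {time.perf_counter() - start:.2f}s')
```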
### 1.3 Coroutines
A coroutine is a lightweight, user-space unit of execution. Switching between coroutines is controlled explicitly in code by the programmer rather than scheduled by the operating system.
Key characteristics:
- Extremely lightweight: switching between coroutines costs almost nothing
- Non-preemptive: the programmer decides when to yield control
- Highly concurrent: a single thread can run a very large number of coroutines
## 2. Multiprocessing in Python
### 2.1 The multiprocessing module
Python's multiprocessing module provides an API for creating processes. Because each process runs its own interpreter, it bypasses the GIL and achieves true parallelism on multiple cores.
#### Basic usage
```python
import multiprocessing
import os

def worker():
    print(f'Worker process {os.getpid()}')

if __name__ == '__main__':
    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=worker)
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
```
The `if __name__ == '__main__'` guard is required on platforms that spawn a fresh interpreter for each child (such as Windows); without it, child processes would re-import the module and recursively spawn more processes.
#### Process pools
```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:
        results = p.map(square, range(10))
        print(results)
```
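Besides `map`, `Pool` also supports non-blocking submission through `apply_async`, which returns an `AsyncResult` handle; a minimal sketch (the function name `cube` is illustrative):
```python
from multiprocessing import Pool

def cube(x):
    return x ** 3

if __name__ == '__main__':
    with Pool(4) as p:
        # Submit work without blocking; collect AsyncResult handles.
        async_results = [p.apply_async(cube, (i,)) for i in range(5)]
        # .get() blocks until each individual result is ready.
        print([r.get() for r in async_results])
```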
#### Inter-process communication with Queue
```python
from multiprocessing import Process, Queue

def producer(q):
    q.put('Hello from producer')

def consumer(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
#### Inter-process communication with Pipe
```python
from multiprocessing import Process, Pipe

def sender(conn):
    conn.send('Hello from sender')
    conn.close()

def receiver(conn):
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
## 3. Multithreading in Python
### 3.1 The threading module
Python's threading module provides thread-related operations.
#### Basic usage
```python
import threading
import time

def worker(num):
    print(f'Thread {num} started')
    time.sleep(1)
    print(f'Thread {num} finished')

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
```
#### Thread synchronization with Lock
```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    # The lock makes the read-modify-write sequence atomic.
    with lock:
        temp = counter
        temp += 1
        counter = temp

threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(f'Final counter value: {counter}')
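```
For contrast, here is a minimal sketch of the same counter *without* the lock (the iteration count is illustrative). Spreading the read-modify-write over many iterations makes lost updates likely, so the final value usually falls short of the expected total:
```python
import threading

counter = 0

def increment_unsafe(n):
    global counter
    for _ in range(n):
        # Unprotected read-modify-write: threads can interleave
        # between the read and the write, losing updates.
        temp = counter
        temp += 1
        counter = temp

threads = [threading.Thread(target=increment_unsafe, args=(100_000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Expected 400000; with the race, the printed value is often smaller.
print(f'Final counter value: {counter}')
```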
#### Limiting concurrency with Semaphore
```python
import threading
import time

semaphore = threading.Semaphore(3)  # allow at most 3 threads in at once

def access_resource(thread_id):
    with semaphore:
        print(f'Thread {thread_id} accessing resource')
        time.sleep(1)
        print(f'Thread {thread_id} releasing resource')

threads = []
for i in range(5):
    t = threading.Thread(target=access_resource, args=(i,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
```
#### Thread pools with ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    print(f'Task {name} started')
    time.sleep(1)
    print(f'Task {name} finished')
    return f'Result of {name}'

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, ['A', 'B', 'C', 'D', 'E'])
    for result in results:
        print(result)
```
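`executor.map` yields results in submission order. When you would rather consume results as they finish, the standard-library companion is `submit` together with `as_completed`; a minimal sketch (the `task` function here is illustrative):
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(name):
    time.sleep(1)
    return f'Result of {name}'

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() returns a Future immediately; as_completed() yields
    # futures in completion order rather than submission order.
    futures = [executor.submit(task, name) for name in 'ABCDE']
    for future in as_completed(futures):
        print(future.result())
```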
## 4. Coroutines in Python
### 4.1 Generator-based coroutines
Before Python 3.4, simple coroutines could be implemented with generators.
```python
def coroutine():
    print("Coroutine started")
    while True:
        x = yield
        print("Received:", x)

c = coroutine()
next(c)  # prime the coroutine so it runs to the first yield
c.send(10)
c.send(20)
```
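Generator coroutines can also hand values back to the caller through `yield`. A classic minimal sketch is a running-average coroutine (the name `averager` is illustrative):
```python
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # yield hands the current average back to the caller
        # and suspends until the next send().
        value = yield average
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime: run to the first yield
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(30))  # 20.0
```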
### 4.2 asyncio coroutines
Python 3.4 introduced the asyncio module, which (together with the async/await syntax added in Python 3.5) provides much more complete coroutine support.
```python
import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    # Run three coroutines concurrently within a single thread.
    await asyncio.gather(say_hello(), say_hello(), say_hello())

asyncio.run(main())
```
#### Running coroutines concurrently with tasks
```python
import asyncio

async def fetch_data():
    print("Start fetching")
    await asyncio.sleep(2)
    print("Done fetching")
    return {'data': 1}

async def print_numbers():
    for i in range(10):
        print(i)
        await asyncio.sleep(0.25)

async def main():
    # create_task() schedules the coroutine to run concurrently.
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(print_numbers())
    value = await task1
    print(value)
    await task2

asyncio.run(main())
```
#### Working with futures
```python
import asyncio

async def set_after(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Schedule a task that fills in the future after 1 second.
    loop.create_task(set_after(fut, 1, '... world'))
    print('hello ...')
    print(await fut)  # suspends until the future has a result

asyncio.run(main())
```
## 5. Comparing Processes, Threads, and Coroutines

| Feature | Process | Thread | Coroutine |
|---|---|---|---|
| Creation overhead | High | Medium | Low |
| Switching overhead | High | Medium | Minimal |
| Memory footprint | High | Low | Minimal |
| True parallelism | Yes (multi-core) | Limited (GIL) | No |
| Data sharing | Complex (IPC) | Simple (shared memory) | Simple |
## 6. Practical Examples
### 6.1 Multi-process image downloader
This example downloads several images in parallel (it relies on the third-party `requests` package):
```python
import os
import requests
from multiprocessing import Pool

def download_image(url):
    print(f"Downloading {url}")
    response = requests.get(url, stream=True)
    filename = os.path.basename(url)
    with open(filename, 'wb') as f:
        for chunk in response.iter_content(1024):
            f.write(chunk)
    print(f"Finished {url}")

if __name__ == '__main__':
    urls = [
        'https://example.com/image1.jpg',
        'https://example.com/image2.jpg',
        'https://example.com/image3.jpg'
    ]
    with Pool(3) as p:
        p.map(download_image, urls)
```
### 6.2 Multi-threaded web crawler
This crawler combines a task queue with worker threads (and uses the third-party `requests` and `beautifulsoup4` packages). A lock protects the shared `visited` set so that two workers cannot claim the same URL at once:
```python
import threading
import queue
import requests
from bs4 import BeautifulSoup

class Crawler:
    def __init__(self, base_url, num_threads=5):
        self.base_url = base_url
        self.num_threads = num_threads
        self.queue = queue.Queue()
        self.visited = set()
        self.visited_lock = threading.Lock()  # protects self.visited

    def crawl(self):
        self.queue.put(self.base_url)
        threads = []
        for _ in range(self.num_threads):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        self.queue.join()
        # Send one sentinel per worker to shut them all down.
        for _ in range(self.num_threads):
            self.queue.put(None)
        for t in threads:
            t.join()

    def worker(self):
        while True:
            url = self.queue.get()
            if url is None:
                break
            # Atomically check-and-mark so no URL is crawled twice.
            with self.visited_lock:
                if url in self.visited:
                    self.queue.task_done()
                    continue
                self.visited.add(url)
            print(f"Crawling {url}")
            try:
                response = requests.get(url)
                soup = BeautifulSoup(response.text, 'html.parser')
                for link in soup.find_all('a'):
                    href = link.get('href')
                    if href and href.startswith('http'):
                        self.queue.put(href)
            except Exception as e:
                print(f"Error crawling {url}: {e}")
            self.queue.task_done()

if __name__ == '__main__':
    crawler = Crawler('https://example.com')
    crawler.crawl()
```
### 6.3 Concurrent HTTP requests with coroutines
This example issues 30 requests concurrently from a single thread, using the third-party `aiohttp` package:
```python
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://example.com',
        'https://example.org',
        'https://example.net'
    ] * 10
    async with aiohttp.ClientSession() as session:
        # Launch all requests concurrently and gather the results.
        tasks = [fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for url, content in zip(urls, responses):
            print(f"{url}: {len(content)} bytes")

start = time.time()
asyncio.run(main())
print(f"Time taken: {time.time() - start:.2f} seconds")
```
## 7. Common Problems and Solutions
- Problem: inter-process communication is expensive. Fix: minimize the data passed between processes, or use shared memory (multiprocessing.Value / multiprocessing.Array) instead of serializing large objects.
- Problem: child processes never exit. Fix: always join() child processes, or mark background workers as daemon processes so they terminate with the parent.
- Problem: the GIL limits performance. Fix: use multiprocessing for CPU-bound work; threads only help when tasks spend most of their time blocked on I/O.
- Problem: thread-safety bugs (race conditions). Fix: protect shared state with Lock/RLock, or exchange data through the thread-safe queue.Queue.
- Problem: blocking calls stall the event loop. Fix: never call blocking functions directly inside a coroutine; offload them to a worker thread, as sketched after this list.
- Problem: a coroutine never runs. Fix: calling an async function only creates a coroutine object; it must be awaited, wrapped in asyncio.create_task(), or passed to asyncio.run().
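As an example of the blocking-call fix, here is a minimal sketch using `asyncio.to_thread` (available since Python 3.9; `blocking_io` is an illustrative stand-in for any blocking library call):
```python
import asyncio
import time

def blocking_io():
    # Stand-in for a blocking library call (file I/O, a driver, ...).
    time.sleep(2)
    return 'blocking result'

async def ticker():
    for i in range(4):
        print(f'event loop is still responsive: tick {i}')
        await asyncio.sleep(0.5)

async def main():
    # to_thread() runs the blocking function in a worker thread,
    # so the event loop keeps running other coroutines meanwhile.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), ticker())
    print(result)

asyncio.run(main())
```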
## Summary
Python offers a rich set of concurrency tools, and developers can choose the model that fits the workload: processes for CPU-bound parallelism, threads for moderate amounts of blocking I/O, and coroutines for large numbers of concurrent I/O-bound tasks.
In practice these techniques can also be combined. A common pattern is "multiprocessing + coroutines", which exploits multiple cores while each process sustains high concurrency internally; a sketch of this combination follows.
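Here is a minimal sketch of that combination, assuming a CPU-bound function `cpu_task` (the name and workload are illustrative): an asyncio event loop dispatches work to a `ProcessPoolExecutor` via `run_in_executor`, so coroutines coordinate the work while separate processes execute it in parallel.
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    # CPU-bound work: would block a coroutine if awaited directly,
    # and would be throttled by the GIL if run in threads.
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor bridges coroutines and worker processes:
        # each call runs in its own process, in parallel.
        jobs = [loop.run_in_executor(pool, cpu_task, 2_000_000)
                for _ in range(4)]
        results = await asyncio.gather(*jobs)
    print(results)

if __name__ == '__main__':
    asyncio.run(main())
```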
As Python continues to evolve, its support for asynchronous programming keeps maturing and the asyncio ecosystem keeps growing, so coroutines are likely to remain the mainstream choice for high-concurrency Python programming.