Python中aiohttp如何使用

发布时间:2023-05-10 16:01:16 作者:iii
来源:亿速云 阅读:114

这篇“Python中aiohttp如何使用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python中aiohttp如何使用”文章吧。

1.定义

aiohttp 是一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,又提供了客户端

2.基本使用

import aiohttp
import asyncio


async def fetch(session, url):
    # 声明一个支持异步的上下文管理器
    async with session.get(url) as response:
        # response.text()是coroutine对象 需要加await
        return await response.text(), response.status


async def main():
    # 声明一个支持异步的上下文管理器
    async with aiohttp.ClientSession() as session:
        html, status = await fetch(session, 'https://cuiqingcai.com')
        print(f'html: {html[:100]}...')
        print(f'status: {status}')


if __name__ == '__main__':
    #  Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
    asyncio.get_event_loop().run_until_complete(main())

3.请求类型

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

4.相应字段

print('status:', response.status) # 状态码
print('headers:', response.headers)# 响应头
print('body:', await response.text())# 响应体
print('bytes:', await response.read())# 响应体二进制内容
print('json:', await response.json())# 响应体json数据

5.超时设置

import aiohttp
import asyncio
async def main():
   #设置 1 秒的超时 
   timeout = aiohttp.ClientTimeout(total=1)
   async with aiohttp.ClientSession(timeout=timeout) as session:
       async with session.get('https://httpbin.org/get') as response:
           print('status:', response.status)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

6.并发限制

import asyncio
import aiohttp
# 声明最大并发量为5
CONCURRENCY = 5
semaphore = asyncio.Semaphore(CONCURRENCY)
URL = 'https://www.baidu.com'

session = None
async def scrape_api():
   async with semaphore:
       print('scraping', URL)
       async with session.get(URL) as response:
           await asyncio.sleep(1)
           return await response.text()
    
async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
   await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

7.实际应用

import asyncio
import aiohttp
import logging
import json
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
INDEX_URL = 'https://dynamic5.scrape.center/api/book/?limit=18&offset={offset}'
DETAIL_URL = 'https://dynamic5.scrape.center/api/book/{id}'
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
   async with semaphore:
       try:
           logging.info('scraping %s', url)
           async with session.get(url) as response:
               return await response.json()
       except aiohttp.ClientError:
           logging.error('error occurred while scraping %s', url, exc_info=True)

async def scrape_index(page):
   url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
   return await scrape_api(url)

async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]
   results = await asyncio.gather(*scrape_index_tasks)
   logging.info('results %s', json.dumps(results, ensure_ascii=False, indent=2))
   

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

以上就是关于“Python中aiohttp如何使用”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道。

推荐阅读:
  1. python中如何使用shell
  2. Python中aiohttp百万并发极限测试的示例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python aiohttp

上一篇:Element组件beforeUpload上传前限制失效问题怎么解决

下一篇:Springboot事件和bean生命周期执行机制是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》