# Python Crawling Techniques for Millions of Records
In today's big-data era, efficiently acquiring massive amounts of data has become a core requirement for many projects. This article walks through the key techniques for crawling millions of records with Python, from basic environment setup to advanced optimization.
## 1. Basic Environment Setup
### 1.1 Choosing the Right HTTP Library
- **Requests**: well suited to simple synchronous requests
```python
import requests

response = requests.get('https://example.com', timeout=10)
```

- **aiohttp**: asynchronous requests for high-concurrency crawling

```python
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```
### 1.2 Installing the Core Dependencies

```bash
pip install requests aiohttp beautifulsoup4 scrapy selenium
```
## 2. High-Concurrency Fetching

### 2.1 Thread Pool

For I/O-bound downloads, a thread pool is the simplest way to parallelize requests (a sketch of the helper it calls follows the block):

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=20) as executor:
    executor.map(fetch_data, url_list)
```
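Here `fetch_data` and `url_list` are assumed to exist elsewhere (a retry-wrapped `fetch_data` appears in the error-handling section below); a minimal, illustrative sketch of what they might look like:

```python
import requests

# Placeholder URL list; in practice these come from a seed file or queue
url_list = [f'https://example.com/page{i}' for i in range(1, 1001)]

def fetch_data(url):
    # Download one page; real code would add headers, proxies and error handling
    return requests.get(url, timeout=10).text
```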
### 2.2 Asynchronous Coroutines

Coroutines scale much further than threads for network-bound crawling:

```python
import asyncio
import aiohttp

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
```
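The `fetch` coroutine referenced above is defined in the complete example at the end of this article. When launching hundreds of coroutines at once it is usually wise to cap concurrency; a minimal sketch using `asyncio.Semaphore` (the `crawl` wrapper name, the concurrency cap, and the timeout value are illustrative assumptions):

```python
import asyncio
import aiohttp

async def crawl(urls, max_concurrency=500):
    # Bound the number of simultaneous requests with a semaphore
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch(session, url):
        async with semaphore:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                return await response.text()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # return_exceptions=True keeps one failed URL from cancelling the rest
        return await asyncio.gather(*tasks, return_exceptions=True)
```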
## 3. Anti-Scraping Countermeasures

### 3.1 Request Rate Control

Throttling requests keeps the crawler below the target site's rate limits:

```python
import time

class RateLimiter:
    def __init__(self, calls_per_second):
        self.period = 1.0 / calls_per_second
        self.last_call = 0

    def __call__(self):
        # Sleep just long enough to respect the configured rate
        elapsed = time.time() - self.last_call
        if elapsed < self.period:
            time.sleep(self.period - elapsed)
        self.last_call = time.time()
```
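A quick usage sketch of the limiter above (the 2-requests-per-second figure and the `url_list` variable are illustrative assumptions):

```python
import requests

limiter = RateLimiter(calls_per_second=2)   # at most ~2 requests per second

for url in url_list:
    limiter()                                # blocks until the next slot is free
    response = requests.get(url, timeout=10)
```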
### 3.2 Request Header Camouflage

Sending browser-like headers makes requests harder to distinguish from normal traffic:

```python
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept-Language': 'en-US,en;q=0.9',
    'Referer': 'https://www.google.com/',
    'X-Requested-With': 'XMLHttpRequest'
}
```
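Beyond a fixed header set, rotating the User-Agent per request is a common refinement. A minimal sketch that reuses the `headers` dict above; the UA strings and the helper name are illustrative assumptions:

```python
import random
import requests

USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
]

def random_headers():
    # Copy the base headers, swapping in a randomly chosen User-Agent
    return {**headers, 'User-Agent': random.choice(USER_AGENTS)}

response = requests.get('https://example.com', headers=random_headers(), timeout=10)
```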
### 3.3 Proxy IP Pool

Rotating outgoing IPs through a proxy pool avoids per-IP bans:

```python
class ProxyPool:
    def __init__(self):
        self.proxies = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:3128'
        ]
        self.current = 0

    def get(self):
        # Round-robin over the configured proxies
        proxy = self.proxies[self.current]
        self.current = (self.current + 1) % len(self.proxies)
        return {'http': proxy, 'https': proxy}
```
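A usage sketch combining the pool with `requests` (the target URL is a placeholder):

```python
import requests

pool = ProxyPool()

response = requests.get(
    'https://example.com',
    proxies=pool.get(),   # dict in requests' {'http': ..., 'https': ...} format
    timeout=10
)
```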
## 4. Faster Parsing

### 4.1 Choose an Efficient Parser

```python
from bs4 import BeautifulSoup

# lxml is more than 10x faster than html.parser
soup = BeautifulSoup(html, 'lxml')
```

### 4.2 Precompiled Regular Expressions

```python
import re

pattern = re.compile(r'<div class="price">¥(\d+)</div>')
prices = pattern.findall(html)
```

### 4.3 XPath Optimization

```python
# Absolute paths are about 30% faster than relative ones
tree.xpath('/html/body/div[2]/div[3]/span/text()')
```
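The `tree` object above is not constructed anywhere in the snippet; a minimal sketch of how it would typically be built with lxml, assuming `html` holds the page source as a string:

```python
from lxml import etree

tree = etree.HTML(html)   # parse the raw HTML into an element tree
spans = tree.xpath('/html/body/div[2]/div[3]/span/text()')
```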
## 5. Storage Strategies

### 5.1 Batch Database Inserts

```python
# Batch insert into MySQL instead of one INSERT per row
cursor.executemany(
    "INSERT INTO products VALUES (%s,%s,%s)",
    [(1, 'Product A', 99), (2, 'Product B', 199)]
)
```
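The `cursor` above comes from an existing database connection; a minimal sketch using PyMySQL, where the host, credentials, database name and table schema are all placeholders:

```python
import pymysql

conn = pymysql.connect(host='localhost', user='crawler',
                       password='secret', database='spider_db',
                       charset='utf8mb4')
try:
    with conn.cursor() as cursor:
        cursor.executemany(
            "INSERT INTO products VALUES (%s,%s,%s)",
            [(1, 'Product A', 99), (2, 'Product B', 199)]
        )
    conn.commit()   # executemany does not commit by itself
finally:
    conn.close()
```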
### 5.2 Appending to CSV in Chunks

```python
import csv

# csv.writer's writerows method writes a whole chunk in one call
with open('data.csv', 'a', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(data_chunk)  # batch write
```
## 6. Caching Repeated Work

Parsing the same page twice is wasted effort; `functools.lru_cache` memoizes results keyed by the HTML string:

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def parse_html(html):
    # Parsing logic goes here; identical HTML strings are parsed only once
    ...
```
## 7. Error Handling and Logging

### 7.1 Catch Network Exceptions

Wrap every request so that timeouts and connection errors are logged instead of crashing the crawler:

```python
import logging
import requests

logger = logging.getLogger(__name__)

def safe_get(url):
    try:
        return requests.get(url, timeout=15)
    except (requests.Timeout, requests.ConnectionError) as e:
        logger.error(f"Request failed: {url} - {e}")
        return None
```
### 7.2 Automatic Retries

The `tenacity` library retries transient failures transparently:

```python
import requests
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def fetch_data(url):
    return requests.get(url).json()
```
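A common refinement is to add exponential backoff between attempts; a sketch using tenacity's `wait_exponential`, where the multiplier, cap, and function name are illustrative choices:

```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, max=30))
def fetch_data_with_backoff(url):
    # Waits ~1s, ~2s, ~4s ... (capped at 30s) between retries
    response = requests.get(url, timeout=15)
    response.raise_for_status()   # treat HTTP errors as retryable too
    return response.json()
```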
### 7.3 Unified Logging

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('spider.log'),
        logging.StreamHandler()
    ]
)
```
## 8. Distributed Crawling

### 8.1 Scrapy-Redis

Sharing the scheduler queue and duplicate filter through Redis lets multiple Scrapy nodes crawl cooperatively:

```python
# settings.py
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
```
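The scheduler and dupe filter also need to know where Redis lives; a minimal sketch of the additional `settings.py` entries, assuming a local Redis instance:

```python
# settings.py (continued)
REDIS_URL = 'redis://localhost:6379'   # assumed local Redis instance
SCHEDULER_PERSIST = True               # keep the request queue between runs
```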
### 8.2 Celery Task Queue

```python
import requests

@app.task(bind=True, max_retries=3)
def crawl_task(self, url):
    try:
        return requests.get(url).content
    except Exception as exc:
        raise self.retry(exc=exc)
```
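The `app` object used as a decorator above is the Celery application; a minimal sketch, assuming Redis as the broker (the broker URL and application name are placeholders):

```python
from celery import Celery

# Celery application referenced by the @app.task decorator above
app = Celery('spider', broker='redis://localhost:6379/0')
```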
### 8.3 Message Queue (RabbitMQ)

```python
import pika

connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare(queue='url_queue')
```
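To actually move URLs through the queue, producers publish to it and workers consume from it. A sketch building on the `channel` declared above; the example URLs and the callback body are illustrative:

```python
# Producer: push URLs onto the queue
for url in ['https://example.com/page1', 'https://example.com/page2']:
    channel.basic_publish(exchange='', routing_key='url_queue', body=url)

# Consumer: pull URLs off the queue and hand them to the crawler
def on_url(ch, method, properties, body):
    print('crawling', body.decode())

channel.basic_consume(queue='url_queue', on_message_callback=on_url, auto_ack=True)
channel.start_consuming()
```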
## 9. Performance Comparison

| Approach | Throughput (QPS) | CPU usage | Memory usage |
|---|---|---|---|
| Single thread | 5 | 15% | 50 MB |
| Multithreading (20 threads) | 80 | 85% | 200 MB |
| Async I/O (500 concurrent) | 300 | 60% | 150 MB |
## 10. Complete Example

A minimal end-to-end asynchronous crawler combining the pieces above:

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def parse(html):
    soup = BeautifulSoup(html, 'lxml')
    return soup.title.text

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        htmls = await asyncio.gather(*tasks)
        return [await parse(html) for html in htmls]

if __name__ == '__main__':
    urls = ['https://example.com/page{}'.format(i) for i in range(1, 101)]
    results = asyncio.run(main(urls))
    print(results)
```
By combining the techniques above, you can build an efficient and stable system for crawling millions of records. In real projects, choose the mix of approaches that fits your specific requirements and keep monitoring and tuning performance.