# How to Run a Website Crawler on a Schedule in Python
## Table of Contents
1. [Introduction](#introduction)
2. [Basic Crawler Implementation](#basic-crawler-implementation)
   - [2.1 requests + BeautifulSoup](#21-requests--beautifulsoup)
   - [2.2 Scrapy Framework Example](#22-scrapy-framework-example)
3. [Scheduling Options](#scheduling-options)
   - [3.1 Simple time.sleep Loop](#31-simple-timesleep-loop)
   - [3.2 The schedule Library](#32-the-schedule-library)
   - [3.3 Advanced Scheduling with APScheduler](#33-advanced-scheduling-with-apscheduler)
   - [3.4 OS-Level Scheduled Tasks](#34-os-level-scheduled-tasks)
4. [Production Deployment](#production-deployment)
   - [4.1 Logging](#41-logging)
   - [4.2 Exception Handling](#42-exception-handling)
   - [4.3 Distributed Scaling](#43-distributed-scaling)
5. [Dealing with Anti-Crawling Measures](#dealing-with-anti-crawling-measures)
6. [Complete Project Example](#complete-project-example)
7. [Summary](#summary)
## Introduction
Scheduled crawlers are an effective way to collect data that is updated continuously. This article walks through the main options for running a website crawler on a schedule in Python, moving from simple to more sophisticated setups, with working code examples for each.
## Basic Crawler Implementation
### 2.1 requests + BeautifulSoup
```python
import requests
from bs4 import BeautifulSoup
import pandas as pd

def simple_crawler(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Example: extract all <h2> titles
        titles = [h2.text for h2 in soup.find_all('h2')]
        # Save the results
        pd.DataFrame(titles, columns=['title']).to_csv('result.csv', index=False)
        print(f"Successfully scraped {len(titles)} items")
    except Exception as e:
        print(f"Scraping failed: {str(e)}")

if __name__ == '__main__':
    target_url = "https://example.com/news"
    simple_crawler(target_url)
```
### 2.2 Scrapy Framework Example

```python
# After creating a Scrapy project, edit spiders/example_spider.py
import scrapy

class NewsSpider(scrapy.Spider):
    name = 'news'
    start_urls = ['https://example.com/news']
    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'CONCURRENT_REQUESTS': 1
    }

    def parse(self, response):
        # Yield one item per article block on the page
        for article in response.css('div.article'):
            yield {
                'title': article.css('h2::text').get(),
                'date': article.css('.date::text').get()
            }
```
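Scrapy spiders are normally launched with the `scrapy crawl news` command, but they can also be driven from plain Python, which is handy when the run has to be triggered by one of the schedulers discussed below. A minimal sketch using Scrapy's `CrawlerProcess` (the function name `run_news_spider` is an illustrative choice, not part of the original project):

```python
# Sketch: launch the spider programmatically instead of via `scrapy crawl news`
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

def run_news_spider():
    process = CrawlerProcess(get_project_settings())
    process.crawl(NewsSpider)
    process.start()  # blocks until the crawl finishes
```

Note that a `CrawlerProcess` cannot be restarted inside the same Python process once its Twisted reactor has stopped, so for repeated scheduled runs it is common to spawn `scrapy crawl news` as a subprocess instead.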
## Scheduling Options

### 3.1 Simple time.sleep Loop

```python
import time

while True:
    simple_crawler("https://example.com/news")
    # Run once every 6 hours
    time.sleep(6 * 60 * 60)
```
Pros and cons:

- ✅ Trivial to implement
- ❌ Blocks the main thread
- ❌ No recovery when an exception occurs (see the sketch below)
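The last drawback can be softened even within this naive approach: wrapping each run in a try/except keeps the loop alive when a single crawl fails. A minimal sketch, reusing `simple_crawler` from section 2.1:

```python
import time

while True:
    try:
        simple_crawler("https://example.com/news")
    except Exception as e:
        # In production this would go to a logger (see section 4.1)
        print(f"Run failed, retrying at the next interval: {e}")
    time.sleep(6 * 60 * 60)  # run once every 6 hours
```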
### 3.2 The schedule Library

```python
import schedule
import time

def job():
    print("Starting crawler job...")
    simple_crawler("https://example.com/news")

# Define the scheduling rules
schedule.every().day.at("09:30").do(job)
schedule.every(2).hours.do(job)

while True:
    schedule.run_pending()
    time.sleep(60)  # check once per minute
```
Advanced configuration example:
```python
from schedule import every, repeat

@repeat(every(10).minutes)
def advanced_job():
    pass
```
### 3.3 Advanced Scheduling with APScheduler

```python
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

sched = BlockingScheduler()

@sched.scheduled_job('cron', hour='*/3', minute=30)
def timed_job():
    print(f'Run time: {datetime.now()}')
    simple_crawler("https://example.com/news")

# Add a one-off job (args supplies the URL parameter)
sched.add_job(simple_crawler, 'date', run_date='2023-12-31',
              args=['https://example.com/news'])

sched.start()
```
Scheduler types at a glance:

1. BlockingScheduler: blocks the calling thread
2. BackgroundScheduler: runs in a background thread
3. AsyncIOScheduler: integrates with asyncio
4. GeventScheduler: built on Gevent
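If the crawler has to share a process with other work, the non-blocking variant is usually the better fit. A minimal sketch with `BackgroundScheduler`, reusing the `job` function defined in section 3.2:

```python
import time
from apscheduler.schedulers.background import BackgroundScheduler

# Jobs fire from a background thread, leaving the main thread free
scheduler = BackgroundScheduler()
scheduler.add_job(job, 'interval', hours=2)
scheduler.start()

try:
    while True:
        time.sleep(5)  # keep the process alive; a real app would do other work here
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
```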
### 3.4 OS-Level Scheduled Tasks

**Linux (cron)**

```bash
# Edit the crontab
crontab -e

# Run every day at 08:30
30 8 * * * /usr/bin/python3 /path/to/spider.py >> /var/log/spider.log 2>&1
```

**Windows Task Scheduler**

1. Create a basic task
2. Set a daily trigger
3. Choose "Start a program" as the action
4. Fill in the path to the Python interpreter and the script
## Production Deployment

### 4.1 Logging

```python
import logging
from logging.handlers import TimedRotatingFileHandler

def init_logger():
    logger = logging.getLogger('spider')
    # Rotate the log file at midnight and keep the last 7 files
    handler = TimedRotatingFileHandler(
        'logs/spider.log',
        when='midnight',
        backupCount=7
    )
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```
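A short usage sketch: create the logger once at module level and reuse it inside every job, so the retry helper in section 4.2 can refer to the same `logger` object (the function name `logged_job` is illustrative):

```python
# Create once; calling init_logger() repeatedly would attach duplicate handlers
logger = init_logger()

def logged_job():
    logger.info("Crawler run started")
    simple_crawler("https://example.com/news")
    logger.info("Crawler run finished")
```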
### 4.2 Exception Handling

```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, min=4, max=10))
def safe_crawler(url):
    try:
        # Crawler logic goes here, e.g. the request/parse steps from section 2.1
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error: {str(e)}")
        raise
```
### 4.3 Distributed Scaling

Celery + Redis approach:

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True)
def crawl_task(self, url):
    try:
        result = simple_crawler(url)
        return {'status': 'success', 'result': result}
    except Exception as e:
        # Retry the task after 60 seconds
        raise self.retry(exc=e, countdown=60)
```
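To make the workers run on a schedule rather than only on demand, Celery's built-in beat scheduler can fire `crawl_task` periodically. A minimal sketch, assuming the code above lives in a module called `tasks`:

```python
# Beat configuration: trigger crawl_task every 6 hours for one URL
app.conf.beat_schedule = {
    'crawl-news-every-6-hours': {
        'task': 'tasks.crawl_task',
        'schedule': 6 * 60 * 60,  # seconds
        'args': ('https://example.com/news',),
    },
}
```

Start a worker together with the beat process, for example `celery -A tasks worker --beat --loglevel=info`.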
## Dealing with Anti-Crawling Measures

Send realistic request headers:

```python
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept-Language': 'en-US,en;q=0.9',
    'Referer': 'https://google.com'
}
```

Rotate proxies:

```python
import random

proxies = [
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080'
]

def get_random_proxy():
    return {'http': random.choice(proxies)}
```
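Putting the two together in a single request might look like the sketch below, which assumes the `headers` dict and `get_random_proxy()` helper from above (the function name `fetch_with_evasion` is illustrative):

```python
import requests

def fetch_with_evasion(url):
    # Combine browser-like headers with a randomly chosen proxy per request
    response = requests.get(
        url,
        headers=headers,
        proxies=get_random_proxy(),
        timeout=10
    )
    response.raise_for_status()
    return response.text
```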
## Complete Project Example

Project structure:

```
/spider_project
├── /spiders
│   ├── news_spider.py
│   └── product_spider.py
├── scheduler.py
├── config.yaml
└── requirements.txt
```
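The `config.yaml` file keeps URLs, schedules and other settings out of the code. A minimal loading sketch with PyYAML (the actual keys in the file are project-specific and not shown here):

```python
import yaml

def load_config(path='config.yaml'):
    # Read project settings (target URLs, schedules, proxies, ...) from YAML
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
```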
Core code of `scheduler.py`:
```python
import time
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from spiders import news_spider, product_spider

def init_scheduler():
    scheduler = BackgroundScheduler()
    # News crawler: run at 09:00, 15:00 and 21:00
    scheduler.add_job(
        news_spider.run,
        'cron',
        hour='9,15,21',
        misfire_grace_time=300
    )
    # Product crawler: run every 6 hours
    scheduler.add_job(
        product_spider.run,
        'interval',
        hours=6,
        start_date='2023-01-01 00:00:00'
    )
    return scheduler

if __name__ == '__main__':
    scheduler = init_scheduler()
    try:
        scheduler.start()
        while True:
            time.sleep(5)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
```
## Summary

This article has covered the full range of options for building scheduled crawlers in Python. The key points:

- Match the approach to the complexity of the task: a plain `time.sleep` loop or the `schedule` library is enough for quick scripts, APScheduler and OS-level cron/Task Scheduler suit standalone deployments, and Celery + Redis handles distributed workloads.
- Best practices: add logging, retries and exception handling before going to production, and be prepared for anti-crawling measures with realistic headers and proxy rotation.
- Directions for extension: proxy pools, distributed task queues and more crawlers driven by the same scheduler.

Applied sensibly, these techniques make it possible to build a stable, efficient scheduled data-collection system that keeps feeding fresh data into business decisions.