# How to Download Files in Python
## Table of Contents
1. [Introduction](#introduction)
2. [Basic Method: The urllib Library](#basic-method-the-urllib-library)
3. [More Flexibility: The requests Library](#more-flexibility-the-requests-library)
4. [Handling Large File Downloads](#handling-large-file-downloads)
5. [Asynchronous Downloads: aiohttp](#asynchronous-downloads-aiohttp)
6. [Displaying a Progress Bar](#displaying-a-progress-bar)
7. [Error Handling and Retries](#error-handling-and-retries)
8. [Practical Examples](#practical-examples)
9. [Summary](#summary)
---
## Introduction
Downloading files is one of the most common networking tasks in Python, whether for web scraping, data collection, or automation tooling. This article surveys the main approaches, covering both standard-library and third-party solutions.
---
## Basic Method: The urllib Library
Python's built-in `urllib.request` module provides the simplest way to download a file:
```python
from urllib.request import urlretrieve

url = "http://example.com/file.zip"
filename = "local_file.zip"

# The simplest form
urlretrieve(url, filename)

# With a progress callback
def progress_hook(count, block_size, total_size):
    # total_size is -1 when the server sends no Content-Length
    if total_size > 0:
        percent = min(int(count * block_size * 100 / total_size), 100)
        print(f"Downloaded: {percent}%")

urlretrieve(url, filename, reporthook=progress_hook)
```
**Pros:**
- No extra dependencies to install
- Fine for simple cases

**Cons:**
- Limited functionality
- No support for modern HTTP features
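Even within `urllib`, you get more control by building a `Request` yourself instead of calling `urlretrieve`. A minimal sketch (the URL and header value are placeholders) that adds a custom User-Agent:

```python
from urllib.request import Request, urlopen

url = "http://example.com/file.zip"
# urlretrieve cannot set headers; Request + urlopen can
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})

with urlopen(req) as response, open("local_file.zip", "wb") as f:
    # Copy the body in 8 KB chunks to keep memory usage flat
    while chunk := response.read(8192):
        f.write(chunk)
```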
---
## More Flexibility: The requests Library
`requests` is the most popular HTTP library for Python and offers a far friendlier API:
```python
import requests

url = "https://example.com/largefile.iso"
response = requests.get(url, stream=True)
with open("downloaded_file.iso", "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:  # filter out keep-alive chunks
            f.write(chunk)
```
Key parameters:
- `stream=True`: keeps the connection open and fetches the body incrementally instead of all at once
- `chunk_size`: the buffer size per read, which bounds memory usage
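To make the difference concrete, here is an illustrative contrast (not something to run against a real multi-gigabyte URL; the address is a placeholder):

```python
import requests

url = "https://example.com/largefile.iso"

# Without stream=True, the entire body is buffered in memory at once
whole = requests.get(url)
data = whole.content  # the full file held in RAM

# With stream=True, only about chunk_size bytes are held at a time
with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(chunk_size=8192):
        pass  # process each chunk incrementally
```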
Advanced features:
```python
# Add HTTP headers
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers)

# Handle basic authentication
auth = ("username", "password")
requests.get(url, auth=auth)
```
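When fetching many files from the same host, a `requests.Session` reuses the underlying connection and applies shared settings to every request. A sketch, assuming a hypothetical list of URLs:

```python
import requests

urls = ["https://example.com/a.zip", "https://example.com/b.zip"]  # placeholders

with requests.Session() as session:
    session.headers.update({"User-Agent": "Mozilla/5.0"})
    for url in urls:
        with session.get(url, stream=True) as r:
            r.raise_for_status()
            # Name the local file after the last path segment
            with open(url.rsplit("/", 1)[-1], "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
```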
---
## Handling Large File Downloads
Gigabyte-scale files call for special handling:
```python
import requests
from pathlib import Path

def download_large_file(url, filename):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192 * 8):  # 64 KB chunks
                f.write(chunk)
    return Path(filename).stat().st_size
```
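Since the function returns the bytes written, one sanity check is to compare that figure with the server's `Content-Length` header when one is sent. A sketch of that idea (the helper name is ours, not a library API):

```python
import requests
from pathlib import Path

def download_and_verify(url, filename):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        # 0 means the server did not report a length (e.g., chunked encoding)
        expected = int(r.headers.get("content-length", 0))
        with open(filename, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192 * 8):
                f.write(chunk)
    actual = Path(filename).stat().st_size
    if expected and actual != expected:
        raise IOError(f"Size mismatch: expected {expected}, got {actual}")
    return actual
```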
To resume an interrupted download, send an HTTP `Range` header and append to the partial file (the server must support range requests):

```python
import os
import requests

def resume_download(url, filename):
    headers = {}
    file_size = 0
    if os.path.exists(filename):
        file_size = os.path.getsize(filename)
        headers = {"Range": f"bytes={file_size}-"}
    response = requests.get(url, headers=headers, stream=True)
    # 206 means the server honored the Range header; anything else
    # (e.g., 200) resends the whole file, so start over instead of appending
    mode = "ab" if file_size and response.status_code == 206 else "wb"
    with open(filename, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
```
---
## Asynchronous Downloads: aiohttp
For high-concurrency workloads, an asynchronous approach is more efficient:
```python
import asyncio
import aiohttp

async def async_download(url, filename):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            with open(filename, "wb") as f:
                while True:
                    chunk = await response.content.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)

# Run the example (url and filename as defined earlier)
asyncio.run(async_download(url, filename))
```
Batch download example:
```python
async def batch_download(url_list):
    tasks = []
    for i, url in enumerate(url_list):
        filename = f"file_{i}.zip"
        tasks.append(async_download(url, filename))
    await asyncio.gather(*tasks)
```
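Launching every download at once can overwhelm the server or exhaust local sockets. One common refinement, sketched here with an arbitrary limit of 5, is to bound concurrency with `asyncio.Semaphore` (this reuses `async_download` from above):

```python
import asyncio

async def bounded_batch_download(url_list, limit=5):
    semaphore = asyncio.Semaphore(limit)

    async def guarded(url, filename):
        async with semaphore:  # at most `limit` downloads run at once
            await async_download(url, filename)

    tasks = [guarded(url, f"file_{i}.zip") for i, url in enumerate(url_list)]
    await asyncio.gather(*tasks)
```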
---
## Displaying a Progress Bar
Use the `tqdm` library to add a visual progress bar:
```python
import requests
from tqdm import tqdm

def download_with_progress(url, filename):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get("content-length", 0))
    with open(filename, "wb") as file, tqdm(
        desc=filename,
        total=total_size,
        unit="iB",
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        for data in response.iter_content(chunk_size=8192):
            size = file.write(data)
            bar.update(size)
```
---
## Error Handling and Retries
A robust downloader has to cope with network failures:
```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def robust_download(url, filename):
    try:
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"Download failed: {e}")
        raise
```
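`tenacity` is a third-party package; if you would rather avoid the extra dependency, a minimal hand-rolled retry loop with exponential backoff looks roughly like this:

```python
import time
import requests

def download_with_retries(url, filename, attempts=3):
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(url, stream=True, timeout=10)
            response.raise_for_status()
            with open(filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return
        except requests.exceptions.RequestException as e:
            if attempt == attempts:
                raise
            wait = 2 ** attempt  # exponential backoff: 2 s, 4 s, ...
            print(f"Attempt {attempt} failed ({e}); retrying in {wait}s")
            time.sleep(wait)
```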
Common failure modes to handle (see the sketch below):
- Connection timeouts
- HTTP error status codes
- SSL certificate errors
- Insufficient disk space
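These cases can be told apart by catching the corresponding exception types. A sketch where the handlers just print (a real downloader might log or re-raise):

```python
import requests

def download_carefully(url, filename):
    try:
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    except requests.exceptions.Timeout:
        print("Connection timed out")
    except requests.exceptions.SSLError:
        print("SSL certificate verification failed")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error status: {e.response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Other network error: {e}")
    except OSError as e:
        print(f"Filesystem error (e.g., disk full): {e}")
```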
---
## Practical Examples
A multithreaded batch image downloader:
```python
import os
import requests
from concurrent.futures import ThreadPoolExecutor

def download_image(url, save_dir):
    filename = os.path.join(save_dir, url.split("/")[-1])
    try:
        response = requests.get(url, stream=True, timeout=5)
        response.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in response.iter_content(8192):
                f.write(chunk)
        return True
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return False

def batch_download(url_list, save_dir="images"):
    os.makedirs(save_dir, exist_ok=True)
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(
            lambda url: download_image(url, save_dir),
            url_list,
        ))
    print(f"Successfully downloaded {sum(results)}/{len(url_list)} images")
```
Downloading from an FTP server with the standard library:
```python
from ftplib import FTP

def download_ftp_file(host, username, password, remote_path, local_path):
    with FTP(host) as ftp:
        ftp.login(username, password)
        with open(local_path, "wb") as f:
            ftp.retrbinary(f"RETR {remote_path}", f.write)
```
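If the server supports FTP over TLS, the standard library's `ftplib.FTP_TLS` drops in with one extra call to encrypt the data channel. A hedged sketch of that variant:

```python
from ftplib import FTP_TLS

def download_ftps_file(host, username, password, remote_path, local_path):
    with FTP_TLS(host) as ftps:
        ftps.login(username, password)
        ftps.prot_p()  # switch the data connection to TLS
        with open(local_path, "wb") as f:
            ftps.retrbinary(f"RETR {remote_path}", f.write)
```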
---
## Summary

| Method | Best For | Pros | Cons |
|---|---|---|---|
| urllib | Simple downloads | Built into Python | Limited features |
| requests | Most scenarios | Simple, ergonomic API | Synchronous, blocking |
| aiohttp | High-concurrency downloads | Asynchronous, efficient | More complex code |
| Multithreading | Batch downloads | Faster throughput | Harder to manage |
Best-practice recommendations:
1. For small files, a plain `requests.get` is enough
2. Always pass `stream=True` for large files
3. Use async I/O or threads for batch downloads
4. Add thorough error handling in production
Further topics to explore:
- File integrity checks (MD5/SHA, sketched below)
- Proxy configuration
- Rate limiting
- Cloud storage SDK integration (e.g., boto3)
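As a starting point for the first item, integrity checks need nothing beyond the standard `hashlib` module. A minimal sketch that verifies a download against a known SHA-256 digest (the digest in the usage comment is a placeholder):

```python
import hashlib

def verify_sha256(filename, expected_hex):
    sha = hashlib.sha256()
    with open(filename, "rb") as f:
        # Hash in chunks so large files need not fit in memory
        for block in iter(lambda: f.read(8192), b""):
            sha.update(block)
    return sha.hexdigest() == expected_hex

# Usage (placeholder digest):
# verify_sha256("downloaded_file.iso", "e3b0c44...placeholder...")
```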