# How to Download Files in Python

## Table of Contents
1. [Introduction](#introduction)
2. [The Basics: the urllib Library](#the-basics-the-urllib-library)
3. [The More Flexible requests Library](#the-more-flexible-requests-library)
4. [Handling Large File Downloads](#handling-large-file-downloads)
5. [Asynchronous Downloads: aiohttp](#asynchronous-downloads-aiohttp)
6. [Progress Bar Display](#progress-bar-display)
7. [Error Handling and Retry Mechanisms](#error-handling-and-retry-mechanisms)
8. [Practical Examples](#practical-examples)
9. [Summary](#summary)

---

## Introduction
Downloading files is a common networking task in Python: web scraping projects, data collection pipelines, and automation tools all rely on it. This article systematically introduces several ways to download files in Python, covering both standard-library and third-party solutions.

---

## The Basics: the urllib Library
Python's built-in `urllib.request` module offers the simplest way to download a file:

```python
from urllib.request import urlretrieve

url = "http://example.com/file.zip"
filename = "local_file.zip"

# The simplest approach
urlretrieve(url, filename)

# Add a progress callback
def progress_hook(count, block_size, total_size):
    # total_size is -1 when the server sends no Content-Length header
    if total_size > 0:
        percent = min(int(count * block_size * 100 / total_size), 100)
        print(f"Downloaded: {percent}%")

urlretrieve(url, filename, reporthook=progress_hook)
```

Pros:
- No extra dependencies to install
- Well suited to simple scenarios

Cons:
- Limited functionality
- No support for modern HTTP features
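Even without third-party libraries, `urllib` can send custom headers and stream a response to disk. A minimal sketch (the URL and filename are placeholders) built on `urllib.request.Request`, `urlopen`, and `shutil.copyfileobj`:

```python
import shutil
from urllib.request import Request, urlopen

# Build a request carrying a custom User-Agent header (placeholder URL)
req = Request(
    "http://example.com/file.zip",
    headers={"User-Agent": "Mozilla/5.0"},
)

# urlopen returns a file-like object; copy it to disk in chunks
with urlopen(req) as response, open("local_file.zip", "wb") as f:
    shutil.copyfileobj(response, f)
```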


---

## The More Flexible requests Library

`requests` is the most popular HTTP library and offers a friendlier API:

```python
import requests

url = "https://example.com/largefile.iso"
response = requests.get(url, stream=True)

with open("downloaded_file.iso", "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:  # filter out keep-alive chunks
            f.write(chunk)
```

Key parameters (for contrast, the sketch below shows a non-streaming download):
- `stream=True`: keeps the connection open and fetches the body incrementally
- `chunk_size`: buffer size that controls memory usage
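A quick sketch (placeholder URL) of what omitting `stream=True` means: `requests` buffers the entire body in memory before `.content` returns, which is acceptable only for small files:

```python
import requests

# Without stream=True, the full response body is held in memory
response = requests.get("https://example.com/small.pdf")
with open("small.pdf", "wb") as f:
    f.write(response.content)
```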

Advanced features:

```python
# Add HTTP headers
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers)

# Handle authentication
auth = ("username", "password")
requests.get(url, auth=auth)
```

---

## Handling Large File Downloads

GB-scale files call for special handling:

1. Chunked download:

```python
import requests
from pathlib import Path

def download_large_file(url, filename):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192 * 8):  # 64 KB chunks
                f.write(chunk)
    return Path(filename).stat().st_size
```
2. Resumable download (the server must support HTTP `Range` requests; a support-check sketch follows the code):

```python
import os
import requests

def resume_download(url, filename):
    headers = {}
    file_size = 0

    # If a partial file exists, request only the remaining bytes
    if os.path.exists(filename):
        file_size = os.path.getsize(filename)
        headers = {"Range": f"bytes={file_size}-"}

    response = requests.get(url, headers=headers, stream=True)

    # Append to the partial file, or start from scratch
    mode = "ab" if file_size else "wb"
    with open(filename, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
```
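The resume logic above only works if the server honors `Range` requests. A sketch (placeholder URL) that probes for support via a HEAD request and confirms the server actually answered a ranged request with `206 Partial Content` rather than silently resending the whole file:

```python
import requests

def supports_resume(url):
    """Check whether the server advertises byte-range support."""
    head = requests.head(url, allow_redirects=True, timeout=10)
    return head.headers.get("Accept-Ranges", "").lower() == "bytes"

url = "https://example.com/largefile.iso"  # placeholder URL
if supports_resume(url):
    response = requests.get(
        url, headers={"Range": "bytes=1024-"}, stream=True, timeout=10
    )
    # 206 means the server honored the range; 200 means it resent everything
    print("Partial content:", response.status_code == 206)
```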

---

## Asynchronous Downloads: aiohttp

For high-concurrency download workloads, an asynchronous approach is more efficient:

```python
import aiohttp
import asyncio

async def async_download(url, filename):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            with open(filename, "wb") as f:
                while True:
                    chunk = await response.content.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)

# Example invocation (placeholder URL and filename)
asyncio.run(async_download("https://example.com/file.zip", "file.zip"))
```

Batch download example (a concurrency-capped variant follows):

```python
async def batch_download(url_list):
    tasks = []
    for i, url in enumerate(url_list):
        filename = f"file_{i}.zip"
        tasks.append(async_download(url, filename))
    await asyncio.gather(*tasks)
```
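To run the batch while keeping the number of simultaneous connections bounded, one common pattern (a sketch assuming the `async_download` coroutine defined above is in scope) uses `asyncio.Semaphore`:

```python
import asyncio

async def bounded_batch_download(url_list, limit=5):
    # Cap concurrency so we do not open too many sockets at once
    semaphore = asyncio.Semaphore(limit)

    async def bounded(url, filename):
        async with semaphore:
            await async_download(url, filename)

    await asyncio.gather(
        *(bounded(url, f"file_{i}.zip") for i, url in enumerate(url_list))
    )

# Example usage (placeholder URL):
# asyncio.run(bounded_batch_download(["https://example.com/a.zip"]))
```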

---

## Progress Bar Display

Use the `tqdm` library to add a visual progress bar:

```python
import requests
from tqdm import tqdm

def download_with_progress(url, filename):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get("content-length", 0))

    with open(filename, "wb") as file, tqdm(
        desc=filename,
        total=total_size,
        unit="iB",
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        for data in response.iter_content(chunk_size=8192):
            size = file.write(data)
            bar.update(size)
```

---

## Error Handling and Retry Mechanisms

A robust downloader needs to handle network failures:

```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def robust_download(url, filename):
    try:
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()

        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"Download failed: {e}")
        raise
```

Common failure modes to handle, each caught individually in the sketch below:
- Connection timeouts
- HTTP error status codes
- SSL certificate errors
- Insufficient disk space
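When different failures need different responses, these cases can be caught separately. A sketch using real `requests` exception classes (local disk problems, such as a full disk, surface as `OSError`):

```python
import requests

def download_with_diagnostics(url, filename):
    try:
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    except requests.exceptions.Timeout:
        print("Connection timed out")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error status: {e.response.status_code}")
    except requests.exceptions.SSLError:
        print("SSL certificate verification failed")
    except requests.exceptions.RequestException as e:
        print(f"Other network error: {e}")
    except OSError as e:
        # Local I/O failures, e.g. insufficient disk space (ENOSPC)
        print(f"Local file error: {e}")
```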


---

## Practical Examples

### Example 1: Batch Image Downloader

```python
import os
import requests
from concurrent.futures import ThreadPoolExecutor

def download_image(url, save_dir):
    filename = os.path.join(save_dir, url.split("/")[-1])
    try:
        response = requests.get(url, stream=True, timeout=5)
        response.raise_for_status()
        with open(filename, "wb") as f:
            for chunk in response.iter_content(8192):
                f.write(chunk)
        return True
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return False

def batch_download(url_list, save_dir="images"):
    os.makedirs(save_dir, exist_ok=True)
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(
            lambda url: download_image(url, save_dir),
            url_list,
        ))
    print(f"Successfully downloaded {sum(results)}/{len(url_list)} images")
```

### Example 2: FTP File Download

```python
from ftplib import FTP

def download_ftp_file(host, username, password, remote_path, local_path):
    with FTP(host) as ftp:
        ftp.login(username, password)
        with open(local_path, "wb") as f:
            ftp.retrbinary(f"RETR {remote_path}", f.write)
```
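Usage is then a single call; the host, credentials, and paths below are placeholders:

```python
download_ftp_file(
    host="ftp.example.com",
    username="user",
    password="secret",
    remote_path="/pub/data.csv",
    local_path="data.csv",
)
```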

---

## Summary

| Method | Best For | Pros | Cons |
|---|---|---|---|
| urllib | Simple downloads | Built into the standard library | Limited features |
| requests | Most scenarios | Simple and easy to use | Synchronous, blocking |
| aiohttp | High-concurrency downloads | Asynchronous and efficient | More complex code |
| Multithreading | Batch downloads | Higher throughput | Harder to manage |

Best-practice recommendations:
1. Use `requests` directly for small files
2. Always use `stream=True` for large files
3. Consider async or multithreading for batch downloads
4. Add thorough error handling in production

Directions for further study (a checksum-verification sketch for the first item follows):
- File verification (MD5/SHA checksums)
- Proxy configuration
- Rate limiting
- Cloud storage SDK integration (e.g. boto3)
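As a starting point for the file-verification direction, a standard-library sketch that hashes a downloaded file in chunks and compares it to a known SHA-256 digest (the expected digest below is a placeholder):

```python
import hashlib

def verify_sha256(filename, expected_hex):
    """Hash the file in chunks and compare to the expected digest."""
    sha256 = hashlib.sha256()
    with open(filename, "rb") as f:
        for block in iter(lambda: f.read(8192), b""):
            sha256.update(block)
    return sha256.hexdigest() == expected_hex

# Example usage (placeholder digest):
# verify_sha256("downloaded_file.iso", "e3b0c44298fc1c149afbf4c8996fb924...")
```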

