您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 怎么用Python处理上百个表格
在数据分析和业务处理场景中,我们经常需要批量处理大量表格文件(如Excel、CSV)。本文将详细介绍如何使用Python高效处理上百个表格文件,涵盖以下核心场景:
## 一、典型应用场景
1. **财务部门**:每月需要合并100+分公司报表
2. **电商运营**:分析各平台店铺的每日销售数据
3. **科研实验**:处理不同实验组的重复测量数据
4. **物联网**:解析多设备生成的日志表格
## 二、环境准备
推荐使用以下工具组合:
```python
# 基础工具库
import pandas as pd
import numpy as np
import os
# 性能优化
from joblib import Parallel, delayed
import dask.dataframe as dd
# 可视化
import matplotlib.pyplot as plt
def read_files_basic(folder):
all_data = []
for file in os.listdir(folder):
if file.endswith(('.xlsx', '.csv')):
filepath = os.path.join(folder, file)
df = pd.read_csv(filepath) if file.endswith('.csv') else pd.read_excel(filepath)
df['source_file'] = file # 标记数据来源
all_data.append(df)
return pd.concat(all_data, ignore_index=True)
from concurrent.futures import ThreadPoolExecutor
def read_file_parallel(file):
if file.endswith('.csv'):
return pd.read_csv(os.path.join(folder, file))
else:
return pd.read_excel(os.path.join(folder, file))
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(read_file_parallel,
[f for f in os.listdir(folder) if f.endswith(('.csv','.xlsx'))]))
combined_df = pd.concat(results)
ddf = dd.read_csv('folder/*.csv') # 支持通配符
result = ddf.groupby('category').sum().compute()
def standardize_columns(df):
df.columns = df.columns.str.lower().str.replace(' ', '_')
return df
# 应用示例
standardized_dfs = [standardize_columns(df) for df in raw_dfs]
def optimize_memory(df):
for col in df.select_dtypes(include=['int64']):
df[col] = pd.to_numeric(df[col], downcast='integer')
for col in df.select_dtypes(include=['float64']):
df[col] = pd.to_numeric(df[col], downcast='float')
return df
def detect_outliers(df, threshold=3):
numeric_cols = df.select_dtypes(include=[np.number]).columns
z_scores = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()
return df[(z_scores > threshold).any(axis=1)]
假设有180家门店的每日销售数据(CSV格式),需要完成: 1. 计算各门店月销售额 2. 识别异常交易 3. 生成可视化报告
# 1. 数据加载
sales_data = []
for file in glob.glob('sales_data/*.csv'):
store_id = file.split('_')[-1].replace('.csv','')
df = pd.read_csv(file, parse_dates=['date'])
df['store_id'] = store_id
sales_data.append(df)
all_sales = pd.concat(sales_data)
# 2. 数据透视
monthly_sales = all_sales.groupby(['store_id', pd.Grouper(key='date', freq='M')])['amount'].sum().unstack()
# 3. 异常检测
anomalies = detect_outliers(all_sales)
# 4. 可视化
plt.figure(figsize=(12,6))
monthly_sales.mean().plot(kind='bar')
plt.title('Average Monthly Sales by Store')
plt.savefig('sales_report.png')
方案 | 10个文件 | 100个文件 | 1000个文件 |
---|---|---|---|
单线程 | 12s | 110s | 18min |
多线程(8) | 3s | 25s | 4min |
Dask集群 | 8s | 35s | 6min |
优化建议: 1. 对于<1GB数据:多线程+内存优化 2. 对于1-10GB数据:Dask单机 3. 对于>10GB数据:Spark集群
def safe_read(file):
try:
return pd.read_csv(file)
except UnicodeDecodeError:
return pd.read_csv(file, encoding='gbk')
# 分块读取
chunk_iter = pd.read_csv('large.csv', chunksize=100000)
for chunk in chunk_iter:
process(chunk)
def read_with_schema(file, expected_columns):
df = pd.read_csv(file)
return df.reindex(columns=expected_columns)
def generate_report(df):
report = {
'missing_values': df.isnull().sum(),
'unique_counts': df.nunique(),
'data_types': df.dtypes
}
return pd.DataFrame(report)
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/db')
all_sales.to_sql('sales_data', engine, if_exists='append')
数据类型_日期_版本.csv
格式manifest.json
记录文件信息logging
模块记录处理过程# 增量处理示例
processed_files = set()
if os.path.exists('processed.log'):
with open('processed.log') as f:
processed_files.update(f.read().splitlines())
new_files = [f for f in os.listdir() if f not in processed_files]
通过以上方法,您可以构建健壮的批量表格处理流程。实际应用中建议先从100个文件测试开始,逐步优化处理逻辑,最终实现上千文件的自动化处理。 “`
(全文约1950字)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。