Python如何利用txt文件对Mysql进行增删改查移

发布时间:2021-12-30 16:06:04 作者:小新
来源:亿速云 阅读:165
# Python如何利用txt文件对MySQL进行增删改查移

## 目录
1. [引言](#引言)
2. [环境准备](#环境准备)
3. [连接MySQL数据库](#连接mysql数据库)
4. [从txt文件读取数据](#从txt文件读取数据)
5. [增删改查移操作详解](#增删改查移操作详解)
   - [5.1 插入数据](#51-插入数据)
   - [5.2 删除数据](#52-删除数据)
   - [5.3 更新数据](#53-更新数据)
   - [5.4 查询数据](#54-查询数据)
   - [5.5 数据迁移](#55-数据迁移)
6. [完整代码示例](#完整代码示例)
7. [性能优化建议](#性能优化建议)
8. [常见问题解决](#常见问题解决)
9. [总结](#总结)

## 引言
在数据处理和管理中,将文本文件(txt)与数据库(如MySQL)结合使用是常见场景。Python凭借其简洁语法和丰富库支持,成为实现这种数据流转的理想工具。本文将详细介绍如何使用Python读取txt文件内容,并对MySQL数据库执行增删改查移(CRUDM)操作。

## 环境准备
在开始前,请确保已安装以下组件:
```python
# 必需Python库
pip install mysql-connector-python pymysql

同时需要: - MySQL服务器(本地或远程) - 具有操作权限的数据库账号 - 文本编辑器或IDE(推荐VS Code/PyCharm)

连接MySQL数据库

首先建立Python与MySQL的连接:

import mysql.connector

def create_connection():
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user="your_username",
            password="your_password",
            database="your_database"
        )
        print("MySQL连接成功")
        return conn
    except mysql.connector.Error as e:
        print(f"连接错误: {e}")
        return None

从txt文件读取数据

假设我们有一个data.txt文件,格式如下:

id,name,age,email
1,张三,25,zhangsan@example.com
2,李四,30,lisi@example.com

读取方法:

def read_txt_file(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8') as file:
        headers = file.readline().strip().split(',')
        for line in file:
            values = line.strip().split(',')
            data.append(dict(zip(headers, values)))
    return data

增删改查移操作详解

5.1 插入数据

批量插入txt文件中的数据到MySQL:

def insert_data(conn, data):
    cursor = conn.cursor()
    sql = "INSERT INTO users (id, name, age, email) VALUES (%s, %s, %s, %s)"
    
    try:
        cursor.executemany(sql, [(d['id'], d['name'], d['age'], d['email']) for d in data])
        conn.commit()
        print(f"成功插入 {cursor.rowcount} 条记录")
    except mysql.connector.Error as e:
        conn.rollback()
        print(f"插入失败: {e}")
    finally:
        cursor.close()

5.2 删除数据

根据txt文件中的ID列表删除记录:

def delete_data(conn, id_list):
    cursor = conn.cursor()
    sql = "DELETE FROM users WHERE id = %s"
    
    try:
        cursor.executemany(sql, [(id,) for id in id_list])
        conn.commit()
        print(f"成功删除 {cursor.rowcount} 条记录")
    except mysql.connector.Error as e:
        conn.rollback()
        print(f"删除失败: {e}")
    finally:
        cursor.close()

5.3 更新数据

使用txt文件数据更新数据库:

def update_data(conn, data):
    cursor = conn.cursor()
    sql = """UPDATE users 
             SET name = %s, age = %s, email = %s 
             WHERE id = %s"""
    
    try:
        cursor.executemany(sql, [(d['name'], d['age'], d['email'], d['id']) for d in data])
        conn.commit()
        print(f"成功更新 {cursor.rowcount} 条记录")
    except mysql.connector.Error as e:
        conn.rollback()
        print(f"更新失败: {e}")
    finally:
        cursor.close()

5.4 查询数据

将查询结果导出到txt文件:

def export_to_txt(conn, output_file):
    cursor = conn.cursor(dictionary=True)
    sql = "SELECT * FROM users"
    
    try:
        cursor.execute(sql)
        results = cursor.fetchall()
        
        with open(output_file, 'w', encoding='utf-8') as f:
            # 写入表头
            if results:
                headers = results[0].keys()
                f.write(','.join(headers) + '\n')
            
            # 写入数据
            for row in results:
                f.write(','.join(str(v) for v in row.values()) + '\n')
        
        print(f"成功导出 {len(results)} 条记录到 {output_file}")
    except mysql.connector.Error as e:
        print(f"导出失败: {e}")
    finally:
        cursor.close()

5.5 数据迁移

将数据从源表迁移到目标表:

def migrate_data(conn, source_table, target_table):
    cursor = conn.cursor()
    
    try:
        # 1. 创建目标表(如果不存在)
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {target_table} LIKE {source_table}")
        
        # 2. 迁移数据
        cursor.execute(f"INSERT INTO {target_table} SELECT * FROM {source_table}")
        conn.commit()
        print(f"成功从 {source_table} 迁移 {cursor.rowcount} 条数据到 {target_table}")
    except mysql.connector.Error as e:
        conn.rollback()
        print(f"迁移失败: {e}")
    finally:
        cursor.close()

完整代码示例

以下是一个整合所有功能的完整示例:

import mysql.connector
from typing import List, Dict

class MySQLTxtHandler:
    def __init__(self, host, user, password, database):
        self.connection = self._create_connection(host, user, password, database)
    
    @staticmethod
    def _create_connection(host, user, password, database):
        try:
            conn = mysql.connector.connect(
                host=host,
                user=user,
                password=password,
                database=database
            )
            print("MySQL连接成功")
            return conn
        except mysql.connector.Error as e:
            print(f"连接错误: {e}")
            return None
    
    def read_txt(self, file_path: str) -> List[Dict]:
        data = []
        with open(file_path, 'r', encoding='utf-8') as file:
            headers = file.readline().strip().split(',')
            for line in file:
                values = line.strip().split(',')
                data.append(dict(zip(headers, values)))
        return data
    
    def write_txt(self, data: List[Dict], output_file: str):
        with open(output_file, 'w', encoding='utf-8') as f:
            if data:
                headers = data[0].keys()
                f.write(','.join(headers) + '\n')
            
            for row in data:
                f.write(','.join(str(v) for v in row.values()) + '\n')
    
    def execute_query(self, query: str, params=None, fetch=True):
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.execute(query, params or ())
            if fetch:
                return cursor.fetchall()
            else:
                self.connection.commit()
                return cursor.rowcount
        except mysql.connector.Error as e:
            self.connection.rollback()
            print(f"查询执行失败: {e}")
            return None
        finally:
            cursor.close()
    
    def close(self):
        if self.connection:
            self.connection.close()
            print("MySQL连接已关闭")

# 使用示例
if __name__ == "__main__":
    handler = MySQLTxtHandler(
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )
    
    # 1. 从txt导入数据
    data = handler.read_txt("input_data.txt")
    insert_sql = "INSERT INTO users (id, name, age, email) VALUES (%(id)s, %(name)s, %(age)s, %(email)s)"
    handler.execute_query(insert_sql, data, fetch=False)
    
    # 2. 导出查询结果到txt
    results = handler.execute_query("SELECT * FROM users WHERE age > 25")
    handler.write_txt(results, "older_users.txt")
    
    # 3. 关闭连接
    handler.close()

性能优化建议

  1. 批量操作:使用executemany()而非循环执行单条SQL
  2. 事务管理:合理使用事务减少I/O开销
  3. 索引优化:确保查询字段已建立适当索引
  4. 连接池:考虑使用mysql-connector-pool处理高并发
  5. 缓冲区调整:增大max_allowed_packet参数处理大数据量

常见问题解决

Q1: 中文乱码问题 - 确保数据库、表和连接都使用UTF-8编码 - 连接字符串添加charset='utf8mb4'参数

Q2: 大文件处理内存不足 - 使用文件逐行读取而非一次性加载

def batch_insert_from_large_file(conn, file_path, batch_size=1000):
    cursor = conn.cursor()
    with open(file_path, 'r', encoding='utf-8') as f:
        headers = f.readline().strip().split(',')
        batch = []
        
        for line in f:
            values = line.strip().split(',')
            batch.append(values)
            
            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                batch = []
        
        if batch:  # 处理剩余记录
            cursor.executemany(insert_sql, batch)
            conn.commit()
    
    cursor.close()

Q3: 数据类型转换问题 - 在读取txt后添加类型转换逻辑:

def convert_types(row):
    return {
        'id': int(row['id']),
        'name': str(row['name']),
        'age': int(row['age']),
        'email': str(row['email'])
    }

总结

本文详细介绍了Python通过txt文件操作MySQL数据库的全流程,包括: 1. 建立可靠数据库连接 2. 高效读写文本文件数据 3. 实现完整的CRUDM操作 4. 处理常见性能问题和异常情况

通过合理组合这些技术,可以构建健壮的数据处理管道,满足各种业务场景需求。实际应用中,建议根据具体需求调整实现细节,并始终注意数据安全和操作原子性。

扩展阅读方向: - 使用ORM工具(如SQLAlchemy)简化操作 - 结合Pandas处理复杂数据转换 - 实现自动化ETL流程 - 数据库备份与恢复策略 “`

推荐阅读:
  1. 如何使用php对txt文件进行修改
  2. 使用python对 MySQL 数据库进行增删改查操作

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python mysql txt

上一篇:C++中Qt如何绘制时钟界面

下一篇:Python如何实现特定场景去除高光算法

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》