欢迎光临易鼎网
详情描述

1. 环境准备

安装必要的库

pip install mysql-connector-python
# 或者
pip install pymysql
# 或者
pip install sqlalchemy

2. 基础连接示例

使用 mysql-connector-python

import mysql.connector
from mysql.connector import Error

def connect_to_mysql():
    try:
        # 创建连接
        connection = mysql.connector.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database',
            port=3306,  # 默认端口
            charset='utf8mb4',
            collation='utf8mb4_unicode_ci'
        )

        if connection.is_connected():
            print("成功连接到MySQL数据库")

            # 获取数据库信息
            db_info = connection.get_server_info()
            print(f"MySQL服务器版本: {db_info}")

            # 创建游标
            cursor = connection.cursor()

            # 执行查询
            cursor.execute("SELECT DATABASE()")
            database = cursor.fetchone()
            print(f"当前数据库: {database[0]}")

        return connection

    except Error as e:
        print(f"连接错误: {e}")
        return None

# 使用连接
connection = connect_to_mysql()
if connection:
    # 执行数据库操作...
    connection.close()

使用 pymysql

import pymysql
import pymysql.cursors

def connect_with_pymysql():
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor  # 返回字典格式的结果
    )

    return connection

3. 完整的CRUD操作示例

class MySQLDatabase:
    def __init__(self, config):
        self.config = config
        self.connection = None

    def connect(self):
        """建立数据库连接"""
        try:
            self.connection = mysql.connector.connect(**self.config)
            print("数据库连接成功")
        except Error as e:
            print(f"连接失败: {e}")
            raise

    def close(self):
        """关闭数据库连接"""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("数据库连接已关闭")

    def create_table(self):
        """创建表"""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            username VARCHAR(50) NOT NULL UNIQUE,
            email VARCHAR(100) NOT NULL UNIQUE,
            age INT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        )
        """

        cursor = self.connection.cursor()
        try:
            cursor.execute(create_table_query)
            self.connection.commit()
            print("表创建成功")
        except Error as e:
            print(f"创建表失败: {e}")
        finally:
            cursor.close()

    def insert_data(self, data):
        """插入数据"""
        insert_query = """
        INSERT INTO users (username, email, age) 
        VALUES (%s, %s, %s)
        """

        cursor = self.connection.cursor()
        try:
            cursor.execute(insert_query, data)
            self.connection.commit()
            print(f"插入数据成功,ID: {cursor.lastrowid}")
        except Error as e:
            self.connection.rollback()
            print(f"插入数据失败: {e}")
        finally:
            cursor.close()

    def query_data(self, user_id=None):
        """查询数据"""
        cursor = self.connection.cursor(dictionary=True)  # 返回字典格式

        try:
            if user_id:
                query = "SELECT * FROM users WHERE id = %s"
                cursor.execute(query, (user_id,))
            else:
                query = "SELECT * FROM users ORDER BY created_at DESC"
                cursor.execute(query)

            results = cursor.fetchall()

            if results:
                for row in results:
                    print(f"ID: {row['id']}, 用户名: {row['username']}, 邮箱: {row['email']}, 年龄: {row['age']}")
            else:
                print("未找到数据")

            return results

        except Error as e:
            print(f"查询失败: {e}")
        finally:
            cursor.close()

    def update_data(self, user_id, update_data):
        """更新数据"""
        update_query = """
        UPDATE users 
        SET username = %s, email = %s, age = %s 
        WHERE id = %s
        """

        cursor = self.connection.cursor()
        try:
            cursor.execute(update_query, (*update_data, user_id))
            self.connection.commit()
            print(f"更新成功,影响行数: {cursor.rowcount}")
        except Error as e:
            self.connection.rollback()
            print(f"更新失败: {e}")
        finally:
            cursor.close()

    def delete_data(self, user_id):
        """删除数据"""
        delete_query = "DELETE FROM users WHERE id = %s"

        cursor = self.connection.cursor()
        try:
            cursor.execute(delete_query, (user_id,))
            self.connection.commit()
            print(f"删除成功,影响行数: {cursor.rowcount}")
        except Error as e:
            self.connection.rollback()
            print(f"删除失败: {e}")
        finally:
            cursor.close()

# 使用示例
config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database'
}

db = MySQLDatabase(config)
db.connect()
db.create_table()
db.insert_data(('john_doe', 'john@example.com', 25))
db.query_data()
db.close()

4. 使用连接池(推荐用于生产环境)

from mysql.connector import pooling
import threading

class ConnectionPool:
    def __init__(self, config, pool_size=5):
        self.pool = pooling.MySQLConnectionPool(
            pool_name="mypool",
            pool_size=pool_size,
            pool_reset_session=True,
            **config
        )

    def get_connection(self):
        """从连接池获取连接"""
        return self.pool.get_connection()

    def execute_query(self, query, params=None):
        """执行查询(自动管理连接)"""
        connection = self.get_connection()
        cursor = connection.cursor(dictionary=True)

        try:
            cursor.execute(query, params or ())
            result = cursor.fetchall()
            connection.commit()
            return result
        except Error as e:
            connection.rollback()
            print(f"查询执行失败: {e}")
            return None
        finally:
            cursor.close()
            connection.close()

# 使用连接池
pool_config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database'
}

pool = ConnectionPool(pool_config)

# 多线程示例
def worker(thread_id):
    result = pool.execute_query("SELECT * FROM users LIMIT %s", (5,))
    print(f"线程 {thread_id} 查询结果: {len(result)} 条记录")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

5. 使用SQLAlchemy(ORM方式)

from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建数据库连接
DATABASE_URL = "mysql+mysqlconnector://username:password@localhost/database_name"
engine = create_engine(
    DATABASE_URL,
    echo=True,  # 显示SQL语句(调试用)
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600  # 连接回收时间
)

Base = declarative_base()
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 定义模型
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

# 创建表
Base.metadata.create_all(bind=engine)

# 使用会话
def get_db():
    """获取数据库会话"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# CRUD操作示例
def create_user(username, email, age):
    db = SessionLocal()
    try:
        user = User(username=username, email=email, age=age)
        db.add(user)
        db.commit()
        db.refresh(user)
        return user
    except Exception as e:
        db.rollback()
        raise e
    finally:
        db.close()

6. 配置文件管理

config.py

# 配置文件
import os
from dotenv import load_dotenv

load_dotenv()  # 加载环境变量

class Config:
    # 数据库配置
    DB_CONFIG = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'user': os.getenv('DB_USER', 'root'),
        'password': os.getenv('DB_PASSWORD', ''),
        'database': os.getenv('DB_NAME', 'test_db'),
        'port': int(os.getenv('DB_PORT', 3306)),
        'charset': 'utf8mb4',
        'use_unicode': True,
        'autocommit': False,
        'pool_size': 10,
        'pool_reset_session': True
    }

    # 连接超时设置
    CONNECTION_TIMEOUT = 30
    READ_TIMEOUT = 30
    WRITE_TIMEOUT = 30

.env文件

DB_HOST=localhost
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=your_database
DB_PORT=3306

7. 安全最佳实践

import mysql.connector
from mysql.connector import Error

class SecureMySQLConnection:
    def __init__(self):
        self.config = {
            'host': 'localhost',
            'user': 'app_user',  # 使用专用应用用户
            'database': 'app_db',
            'ssl_disabled': False,  # 启用SSL
            'connection_timeout': 30
        }

    def safe_execute(self, query, params=None):
        """安全执行SQL(防止SQL注入)"""
        connection = None
        cursor = None

        try:
            # 1. 使用参数化查询
            connection = mysql.connector.connect(**self.config)
            cursor = connection.cursor(prepared=True)  # 预编译语句

            # 2. 执行查询
            cursor.execute(query, params or ())

            # 3. 如果是SELECT,获取结果
            if query.strip().upper().startswith('SELECT'):
                result = cursor.fetchall()
            else:
                connection.commit()
                result = cursor.rowcount

            return result

        except Error as e:
            print(f"SQL执行错误: {e}")
            if connection:
                connection.rollback()
            raise

        finally:
            if cursor:
                cursor.close()
            if connection and connection.is_connected():
                connection.close()

    def validate_input(self, input_data):
        """输入验证"""
        if any(('\'' in str(item) or ';' in str(item)) for item in input_data):
            raise ValueError("输入包含非法字符")
        return input_data

# 使用示例
db = SecureMySQLConnection()

# 安全插入(使用参数化查询)
user_input = ("john'; DROP TABLE users; --", "john@example.com", 25)
try:
    # 验证输入
    validated_input = db.validate_input(user_input)

    # 安全执行
    result = db.safe_execute(
        "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
        validated_input
    )
except Exception as e:
    print(f"操作失败: {e}")

8. 错误处理和日志记录

import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'db_log_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class DatabaseManager:
    def __init__(self, config):
        self.config = config
        self.retry_count = 3
        self.retry_delay = 2

    def execute_with_retry(self, query, params=None):
        """带重试的查询执行"""
        for attempt in range(self.retry_count):
            try:
                connection = mysql.connector.connect(**self.config)
                cursor = connection.cursor()

                cursor.execute(query, params or ())
                result = cursor.fetchall() if query.strip().upper().startswith('SELECT') else cursor.rowcount

                connection.commit()
                logger.info(f"查询执行成功: {query[:50]}...")

                cursor.close()
                connection.close()

                return result

            except mysql.connector.Error as e:
                logger.error(f"数据库错误 (尝试 {attempt+1}/{self.retry_count}): {e}")

                if attempt == self.retry_count - 1:
                    logger.error("达到最大重试次数,操作失败")
                    raise

                time.sleep(self.retry_delay * (attempt + 1))

            except Exception as e:
                logger.error(f"未知错误: {e}")
                raise
            finally:
                try:
                    if cursor:
                        cursor.close()
                    if connection and connection.is_connected():
                        connection.close()
                except:
                    pass

9. 快速开始模板

"""
MySQL数据库操作模板
使用方法:
1. 修改数据库配置
2. 根据需求选择使用连接池或普通连接
3. 实现具体的业务逻辑
"""

import mysql.connector
from mysql.connector import Error, pooling
from contextlib import contextmanager
import logging

# 配置
DB_CONFIG = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
    'charset': 'utf8mb4'
}

# 日志
logging.basicConfig(level=logging.INFO)

@contextmanager
def get_connection():
    """上下文管理器自动管理连接"""
    connection = None
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        yield connection
    except Error as e:
        logging.error(f"数据库连接错误: {e}")
        raise
    finally:
        if connection and connection.is_connected():
            connection.close()

def execute_query(query, params=None, fetch=True):
    """执行SQL查询"""
    with get_connection() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, params or ())
            if fetch and query.strip().upper().startswith('SELECT'):
                return cursor.fetchall()
            else:
                conn.commit()
                return cursor.rowcount
        except Error as e:
            conn.rollback()
            logging.error(f"SQL执行错误: {e}")
            raise
        finally:
            cursor.close()

# 使用示例
if __name__ == "__main__":
    # 查询示例
    users = execute_query("SELECT * FROM users LIMIT %s", (10,))
    for user in users:
        print(user)

    # 插入示例
    row_count = execute_query(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        ('test_user', 'test@example.com'),
        fetch=False
    )
    print(f"插入了 {row_count} 行")

重要提示

安全注意事项

  • 永远不要拼接SQL字符串
  • 使用参数化查询防止SQL注入
  • 限制数据库用户的权限
  • 生产环境使用SSL连接

性能优化

  • 使用连接池管理连接
  • 合理设置连接超时时间
  • 使用索引优化查询
  • 批量操作时使用executemany()

资源管理

  • 确保连接正确关闭
  • 使用上下文管理器(with语句)
  • 及时释放游标和连接

错误处理

  • 实现完整的异常处理
  • 添加重试机制
  • 记录详细的日志

这个指南涵盖了从基础连接到高级用法的各个方面。根据你的具体需求选择合适的实现方式。对于简单项目,可以使用基础连接;对于生产环境,建议使用连接池和ORM框架。