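# Shell commands (run in a terminal, not inside Python). Install the driver you need:
pip install mysql-connector-python
# or
pip install pymysql
# SQLAlchemy is an ORM layer, not a driver: install it in addition to one of the drivers above
pip install sqlalchemy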
import mysql.connector
from mysql.connector import Error
def connect_to_mysql():
    """Open a MySQL connection, print basic server details, and return it.

    Returns:
        The open connection on success (the caller is responsible for
        closing it), or ``None`` if connecting or the sanity query fails.
    """
    connection = None
    try:
        # Create the connection
        connection = mysql.connector.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database',
            port=3306,  # default port
            charset='utf8mb4',
            collation='utf8mb4_unicode_ci',
        )
        if not connection.is_connected():
            return None

        print("成功连接到MySQL数据库")
        # Report the server version
        db_info = connection.get_server_info()
        print(f"MySQL服务器版本: {db_info}")

        # Run a sanity query; the cursor is closed even if the query fails.
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT DATABASE()")
            database = cursor.fetchone()
        finally:
            cursor.close()
        print(f"当前数据库: {database[0]}")
        return connection
    except Error as e:
        print(f"连接错误: {e}")
        # The caller only ever receives None here, so nobody else can close
        # a connection that was opened before the failure.
        if connection is not None and connection.is_connected():
            connection.close()
        return None
# Use the connection; it is closed even if the database work raises.
connection = connect_to_mysql()
if connection:
    try:
        pass  # Perform database operations here...
    finally:
        connection.close()
import pymysql
import pymysql.cursors
def connect_with_pymysql():
    """Open and return a PyMySQL connection configured with ``DictCursor``."""
    settings = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
        'charset': 'utf8mb4',
        # Rows come back as dictionaries
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
class MySQLDatabase:
    """CRUD helper for the ``users`` table over one mysql.connector connection.

    Call :meth:`connect` before any other operation and :meth:`close` when
    done, or use the instance as a context manager to do both automatically.
    Except in :meth:`connect`, ``mysql.connector.Error`` is printed, not raised.
    """

    def __init__(self, config):
        """Store *config*, the keyword arguments later passed to ``connect``."""
        self.config = config
        self.connection = None

    def __enter__(self):
        """Connect and return this helper for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()
        return False

    def connect(self):
        """Establish the database connection.

        Raises:
            mysql.connector.Error: re-raised after printing when connecting fails.
        """
        try:
            self.connection = mysql.connector.connect(**self.config)
            print("数据库连接成功")
        except Error as e:
            print(f"连接失败: {e}")
            raise

    def close(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("数据库连接已关闭")

    def create_table(self):
        """Create the ``users`` table if it does not exist yet."""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(50) NOT NULL UNIQUE,
        email VARCHAR(100) NOT NULL UNIQUE,
        age INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        )
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(create_table_query)
            self.connection.commit()
            print("表创建成功")
        except Error as e:
            print(f"创建表失败: {e}")
        finally:
            cursor.close()

    def insert_data(self, data):
        """Insert one user row.

        Args:
            data: a ``(username, email, age)`` sequence bound to the
                placeholders of the INSERT statement.
        """
        insert_query = """
        INSERT INTO users (username, email, age)
        VALUES (%s, %s, %s)
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(insert_query, data)
            self.connection.commit()
            print(f"插入数据成功,ID: {cursor.lastrowid}")
        except Error as e:
            self.connection.rollback()
            print(f"插入数据失败: {e}")
        finally:
            cursor.close()

    def query_data(self, user_id=None):
        """Fetch and print users.

        Args:
            user_id: when not ``None``, only the row with this id is queried;
                otherwise all rows are returned, newest ``created_at`` first.

        Returns:
            A list of row dictionaries (possibly empty), or ``None`` when the
            query fails.
        """
        cursor = self.connection.cursor(dictionary=True)  # rows as dictionaries
        try:
            # Compare against None explicitly so that an id of 0 is still
            # treated as a filter instead of silently selecting every row.
            if user_id is not None:
                query = "SELECT * FROM users WHERE id = %s"
                cursor.execute(query, (user_id,))
            else:
                query = "SELECT * FROM users ORDER BY created_at DESC"
                cursor.execute(query)
            results = cursor.fetchall()
            if results:
                for row in results:
                    print(f"ID: {row['id']}, 用户名: {row['username']}, 邮箱: {row['email']}, 年龄: {row['age']}")
            else:
                print("未找到数据")
            return results
        except Error as e:
            print(f"查询失败: {e}")
            return None
        finally:
            cursor.close()

    def update_data(self, user_id, update_data):
        """Overwrite username, email and age of the row with id *user_id*.

        Args:
            user_id: id of the row to update.
            update_data: a ``(username, email, age)`` sequence.
        """
        update_query = """
        UPDATE users
        SET username = %s, email = %s, age = %s
        WHERE id = %s
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(update_query, (*update_data, user_id))
            self.connection.commit()
            print(f"更新成功,影响行数: {cursor.rowcount}")
        except Error as e:
            self.connection.rollback()
            print(f"更新失败: {e}")
        finally:
            cursor.close()

    def delete_data(self, user_id):
        """Delete the row with id *user_id*."""
        delete_query = "DELETE FROM users WHERE id = %s"
        cursor = self.connection.cursor()
        try:
            cursor.execute(delete_query, (user_id,))
            self.connection.commit()
            print(f"删除成功,影响行数: {cursor.rowcount}")
        except Error as e:
            self.connection.rollback()
            print(f"删除失败: {e}")
        finally:
            cursor.close()
# Usage example
config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database'
}
db = MySQLDatabase(config)
db.connect()
try:
    db.create_table()
    db.insert_data(('john_doe', 'john@example.com', 25))
    db.query_data()
finally:
    # Release the connection even if one of the operations above raises.
    db.close()
from mysql.connector import pooling
import threading
class ConnectionPool:
    """Small wrapper around ``mysql.connector.pooling.MySQLConnectionPool``."""

    def __init__(self, config, pool_size=5, pool_name="mypool"):
        """Create the underlying pool.

        Args:
            config: connection keyword arguments forwarded to the pool.
            pool_size: value passed as the pool's ``pool_size``.
            pool_name: value passed as the pool's ``pool_name``; defaults to
                the name that was previously hard-coded.
        """
        self.pool = pooling.MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            pool_reset_session=True,
            **config
        )

    def get_connection(self):
        """Return a connection obtained from the pool."""
        return self.pool.get_connection()

    def execute_query(self, query, params=None):
        """Run *query* on a pooled connection and return all fetched rows.

        Args:
            query: SQL text using ``%s`` placeholders.
            params: values for the placeholders, or ``None`` for no values.

        Returns:
            The rows as dictionaries, or ``None`` when a
            ``mysql.connector.Error`` occurs (the transaction is rolled back
            and the error is printed).
        """
        connection = self.get_connection()
        cursor = None
        try:
            # Create the cursor inside the try block so that a failure here
            # still reaches the finally clause and releases the connection.
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, params or ())
            result = cursor.fetchall()
            connection.commit()
            return result
        except Error as e:
            connection.rollback()
            print(f"查询执行失败: {e}")
            return None
        finally:
            if cursor is not None:
                cursor.close()
            connection.close()
# Build the shared connection pool
pool_config = dict(
    host='localhost',
    user='your_username',
    password='your_password',
    database='your_database',
)
pool = ConnectionPool(pool_config)
# Multithreading example
def worker(thread_id):
    """Query up to five users through the shared pool and report the count.

    Args:
        thread_id: label used in the printed message.
    """
    result = pool.execute_query("SELECT * FROM users LIMIT %s", (5,))
    # execute_query returns None when the query fails; len(None) would raise
    # TypeError inside the thread.
    if result is None:
        print(f"线程 {thread_id} 查询失败")
        return
    print(f"线程 {thread_id} 查询结果: {len(result)} 条记录")
# Start three workers, then wait for all of them to finish.
threads = []
for i in range(3):
    threads.append(threading.Thread(target=worker, args=(i,)))
    threads[-1].start()
for t in threads:
    t.join()
from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Build the engine, the declarative base and the session factory
DATABASE_URL = "mysql+mysqlconnector://username:password@localhost/database_name"
engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600,  # connection recycle time
    echo=True,  # show SQL statements (for debugging)
)
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# Model definition
class User(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    email = Column(String(100), nullable=False, unique=True)
    age = Column(Integer)
    # Both timestamps default to func.now(); updated_at also uses it on update.
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now(), default=func.now())
# Create the tables registered on Base using the engine
Base.metadata.create_all(engine)
# Session provider
def get_db():
    """Yield a fresh session and close it once the consumer is finished."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# CRUD example
def create_user(username, email, age):
    """Insert a new ``User`` row and return the refreshed instance.

    Args:
        username: value for the ``username`` column.
        email: value for the ``email`` column.
        age: value for the ``age`` column.

    Returns:
        The ``User`` object after commit and refresh.

    Raises:
        Exception: whatever the session raised; the transaction is rolled
            back first and the session is always closed.
    """
    db = SessionLocal()
    try:
        user = User(username=username, email=email, age=age)
        db.add(user)
        db.commit()
        db.refresh(user)
        return user
    except Exception:
        db.rollback()
        # Bare raise re-raises the active exception without adding a frame.
        raise
    finally:
        db.close()
# 配置文件
import os
from dotenv import load_dotenv
load_dotenv() # 加载环境变量
class Config:
    """Database settings, read from environment variables at import time."""

    # Database configuration. Environment variables override the defaults.
    # NOTE(review): 'pool_size' / 'pool_reset_session' are presumably consumed
    # by the connector's pooling support - confirm against the caller.
    DB_CONFIG = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'user': os.getenv('DB_USER', 'root'),
        'password': os.getenv('DB_PASSWORD', ''),
        'database': os.getenv('DB_NAME', 'test_db'),
        # `or` also covers DB_PORT being set but empty, which int('') rejects
        # with ValueError at import time.
        'port': int(os.getenv('DB_PORT') or 3306),
        'charset': 'utf8mb4',
        'use_unicode': True,
        'autocommit': False,
        'pool_size': 10,
        'pool_reset_session': True
    }

    # Timeout settings (presumably seconds - TODO confirm); they are not
    # referenced by DB_CONFIG above.
    CONNECTION_TIMEOUT = 30
    READ_TIMEOUT = 30
    WRITE_TIMEOUT = 30
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=your_database
DB_PORT=3306
import mysql.connector
from mysql.connector import Error
class SecureMySQLConnection:
    """Runs parameterized statements, opening one connection per call."""

    def __init__(self):
        """Set the connection arguments used by :meth:`safe_execute`."""
        self.config = {
            'host': 'localhost',
            'user': 'app_user',  # dedicated application user
            'database': 'app_db',
            'ssl_disabled': False,  # keep SSL enabled
            'connection_timeout': 30
        }

    def safe_execute(self, query, params=None):
        """Execute *query* with bound parameters on a fresh connection.

        Args:
            query: SQL text with placeholders; values must be supplied via
                *params* and never formatted into the string.
            params: values for the placeholders, or ``None`` for no values.

        Returns:
            The fetched rows when the statement text starts with ``SELECT``;
            otherwise the cursor's ``rowcount`` after committing.

        Raises:
            mysql.connector.Error: re-raised after printing; a rollback is
                attempted first if the connection is still usable.
        """
        connection = None
        cursor = None
        try:
            # 1. Use a parameterized query
            connection = mysql.connector.connect(**self.config)
            cursor = connection.cursor(prepared=True)  # prepared statement
            # 2. Execute the statement
            cursor.execute(query, params or ())
            # 3. Fetch rows for SELECT, otherwise commit
            if query.strip().upper().startswith('SELECT'):
                result = cursor.fetchall()
            else:
                connection.commit()
                result = cursor.rowcount
            return result
        except Error as e:
            print(f"SQL执行错误: {e}")
            # Only roll back on a live connection: a rollback attempt on a
            # dropped connection would raise and mask the original error.
            if connection and connection.is_connected():
                connection.rollback()
            raise
        finally:
            if cursor:
                cursor.close()
            if connection and connection.is_connected():
                connection.close()

    def validate_input(self, input_data):
        """Reject values whose string form contains ``'`` or ``;``.

        This is a coarse extra check only: it also rejects legitimate values
        such as ``O'Brien``, and the actual injection defence is the bound
        parameters used by :meth:`safe_execute`.

        Args:
            input_data: iterable of values, or ``None`` (returned unchanged,
                matching ``safe_execute`` which accepts ``params=None``).

        Returns:
            *input_data* itself when every item passes.

        Raises:
            ValueError: if any item contains a single quote or a semicolon.
        """
        if input_data is None:
            return input_data
        if any(('\'' in str(item) or ';' in str(item)) for item in input_data):
            raise ValueError("输入包含非法字符")
        return input_data
# Usage example
db = SecureMySQLConnection()
# Input carrying an injection attempt; validate_input raises ValueError for it
# because it contains a quote and semicolons.
user_input = ("john'; DROP TABLE users; --", "john@example.com", 25)
try:
    # Validate first, then execute with bound parameters
    validated_input = db.validate_input(user_input)
    result = db.safe_execute(
        "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
        validated_input,
    )
except Exception as e:
    print(f"操作失败: {e}")
import logging
from datetime import datetime
# Logging setup: one dated log file plus console output
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler(datetime.now().strftime('db_log_%Y%m%d.log')),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Executes statements on a fresh connection, retrying on database errors."""

    def __init__(self, config):
        """Store connection arguments and the retry policy.

        Args:
            config: keyword arguments passed to ``mysql.connector.connect``.
        """
        self.config = config
        self.retry_count = 3
        self.retry_delay = 2

    def execute_with_retry(self, query, params=None):
        """Execute *query*, retrying on ``mysql.connector.Error``.

        Up to ``retry_count`` attempts are made. After a failed attempt the
        method sleeps ``retry_delay * attempt_number`` before the next one.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: values for the placeholders, or ``None`` for no values.

        Returns:
            Fetched rows when the statement text starts with ``SELECT``,
            otherwise the cursor's ``rowcount``.

        Raises:
            mysql.connector.Error: when the last attempt also fails.
            Exception: any other error, re-raised immediately without retry.
        """
        # Local import: the file's import block does not bring `time` into
        # scope, so the original time.sleep call raised NameError.
        import time

        for attempt in range(self.retry_count):
            # Reset per attempt so cleanup never sees an unbound name or a
            # stale object left over from the previous attempt.
            connection = None
            cursor = None
            try:
                connection = mysql.connector.connect(**self.config)
                cursor = connection.cursor()
                cursor.execute(query, params or ())
                if query.strip().upper().startswith('SELECT'):
                    result = cursor.fetchall()
                else:
                    result = cursor.rowcount
                connection.commit()
                logger.info(f"查询执行成功: {query[:50]}...")
                return result
            except mysql.connector.Error as e:
                logger.error(f"数据库错误 (尝试 {attempt+1}/{self.retry_count}): {e}")
                if attempt == self.retry_count - 1:
                    logger.error("达到最大重试次数,操作失败")
                    raise
            except Exception as e:
                logger.error(f"未知错误: {e}")
                raise
            finally:
                # Best-effort cleanup: a failure while closing is logged but
                # must not replace the outcome of the attempt itself.
                try:
                    if cursor:
                        cursor.close()
                    if connection and connection.is_connected():
                        connection.close()
                except mysql.connector.Error as cleanup_error:
                    logger.warning(f"关闭数据库资源失败: {cleanup_error}")
            # Only reached after a retryable failure (success returns, other
            # paths raise). Sleeping here, after cleanup, avoids holding the
            # connection open during the back-off.
            time.sleep(self.retry_delay * (attempt + 1))
"""
MySQL数据库操作模板
使用方法:
1. 修改数据库配置
2. 根据需求选择使用连接池或普通连接
3. 实现具体的业务逻辑
"""
import mysql.connector
from mysql.connector import Error, pooling
from contextlib import contextmanager
import logging
# Configuration
DB_CONFIG = dict(
    host='localhost',
    user='your_username',
    password='your_password',
    database='your_database',
    charset='utf8mb4',
)
# Logging
logging.basicConfig(level=logging.INFO)
@contextmanager
def get_connection():
    """Context manager that opens a connection and always closes it.

    Yields:
        A connection created from ``DB_CONFIG``.

    Raises:
        mysql.connector.Error: when connecting fails (logged first).
        Exceptions raised inside the ``with`` body propagate unchanged.
    """
    # Keep the connect call in its own try block: errors raised by the code
    # inside the `with` body are not connection errors and must not be
    # logged as such.
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
    except Error as e:
        logging.error(f"数据库连接错误: {e}")
        raise
    try:
        yield connection
    finally:
        if connection.is_connected():
            connection.close()
def execute_query(query, params=None, fetch=True):
    """Run one SQL statement on a short-lived connection.

    Returns the fetched rows (as dictionaries) when *fetch* is true and the
    statement text starts with ``SELECT``; otherwise commits and returns the
    cursor's ``rowcount``. A ``mysql.connector.Error`` triggers a rollback,
    is logged, and is re-raised.
    """
    with get_connection() as connection:
        cur = connection.cursor(dictionary=True)
        try:
            cur.execute(query, params or ())
            wants_rows = fetch and query.strip().upper().startswith('SELECT')
            if not wants_rows:
                connection.commit()
                return cur.rowcount
            return cur.fetchall()
        except Error as e:
            connection.rollback()
            logging.error(f"SQL执行错误: {e}")
            raise
        finally:
            cur.close()
# Usage example
if __name__ == "__main__":
    # Read: print up to ten users, one per line
    users = execute_query("SELECT * FROM users LIMIT %s", (10,))
    for user in users:
        print(user)

    # Write: insert one user and report the affected row count
    row_count = execute_query(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        ('test_user', 'test@example.com'),
        fetch=False,
    )
    print(f"插入了 {row_count} 行")
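安全注意事项:始终使用参数化查询(`%s` 占位符)防止SQL注入;数据库凭据通过环境变量或 `.env` 文件加载,不要硬编码在代码中;使用专用的应用用户并启用SSL。
性能优化:多线程或高并发场景下使用连接池复用连接;使用SQLAlchemy时为引擎设置 `pool_size`、`max_overflow` 和 `pool_recycle`。
资源管理:用完后务必关闭游标和连接,推荐使用 `try/finally` 或上下文管理器,保证出错时也能释放资源。
错误处理:捕获 `mysql.connector.Error`,写操作失败时回滚事务并记录日志;对临时性故障可按递增的等待时间重试。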
这个指南涵盖了从基础连接到高级用法的各个方面。根据你的具体需求选择合适的实现方式。对于简单项目,可以使用基础连接;对于生产环境,建议使用连接池和ORM框架。