SQLite数据库
学生信息管理系统
import sqlite3
from datetime import datetime
class StudentDB:
    """Student records and exam scores stored in a SQLite database file.

    Every method opens its own short-lived connection and closes it before
    returning, so an instance holds no open database handle between calls.
    Because of that, ``db_name`` should be a file path: ``':memory:'`` would
    give each call a fresh, empty database.
    """

    def __init__(self, db_name='school.db'):
        """Remember the database path and create the tables if they are missing.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.init_db()

    def _execute(self, sql, params=()):
        """Run one statement in its own transaction on a fresh connection.

        Args:
            sql: SQL text using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            A ``(lastrowid, rows)`` tuple: the cursor's ``lastrowid`` and the
            list of fetched rows (empty for statements that return none).
        """
        conn = sqlite3.connect(self.db_name)
        try:
            # ``with conn`` commits on success and rolls back on error, but it
            # does NOT close the connection; the ``finally`` below does that.
            with conn:
                cursor = conn.execute(sql, params)
                rows = cursor.fetchall()
            return cursor.lastrowid, rows
        finally:
            conn.close()

    def init_db(self):
        """Create the ``students`` and ``scores`` tables if they do not exist."""
        conn = sqlite3.connect(self.db_name)
        try:
            with conn:
                # Student table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS students (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        age INTEGER,
                        grade TEXT,
                        created_at TIMESTAMP
                    )
                ''')
                # Per-subject score table.
                # NOTE: SQLite only enforces the FOREIGN KEY clause when
                # ``PRAGMA foreign_keys = ON`` is issued on the connection;
                # this class does not issue it, so the reference is advisory.
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS scores (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        student_id INTEGER,
                        subject TEXT,
                        score FLOAT,
                        exam_date DATE,
                        FOREIGN KEY (student_id) REFERENCES students (id)
                    )
                ''')
        finally:
            conn.close()

    def add_student(self, name, age, grade):
        """Insert a student row stamped with the current local time.

        Args:
            name: Student name (the column is NOT NULL).
            age: Student age.
            grade: Grade / class label.

        Returns:
            The ``id`` of the newly inserted row.
        """
        # Bind the timestamp as text ("YYYY-MM-DD HH:MM:SS[.ffffff]") instead
        # of a datetime object: this is the same value sqlite3's default
        # datetime adapter stored, without relying on that adapter, which is
        # deprecated since Python 3.12.
        created_at = datetime.now().isoformat(' ')
        student_id, _ = self._execute(
            'INSERT INTO students (name, age, grade, created_at) VALUES (?, ?, ?, ?)',
            (name, age, grade, created_at),
        )
        return student_id

    def add_score(self, student_id, subject, score, exam_date):
        """Insert one exam score for a student.

        Args:
            student_id: ``students.id`` the score belongs to.
            subject: Subject name.
            score: Numeric score.
            exam_date: Exam date value, stored as given.
        """
        self._execute(
            'INSERT INTO scores (student_id, subject, score, exam_date) VALUES (?, ?, ?, ?)',
            (student_id, subject, score, exam_date),
        )

    def get_student_scores(self, student_id):
        """Return every score recorded for one student.

        Args:
            student_id: ``students.id`` to look up.

        Returns:
            A list of ``(name, subject, score, exam_date)`` tuples; empty when
            the student does not exist or has no scores.
        """
        _, rows = self._execute('''
            SELECT s.name, sc.subject, sc.score, sc.exam_date
            FROM students s
            JOIN scores sc ON s.id = sc.student_id
            WHERE s.id = ?
        ''', (student_id,))
        return rows

    def get_class_ranking(self, subject):
        """Return the scores for one subject, highest first.

        Args:
            subject: Subject name to rank.

        Returns:
            A list of ``(name, score)`` tuples ordered by score descending.
        """
        _, rows = self._execute('''
            SELECT s.name, sc.score
            FROM students s
            JOIN scores sc ON s.id = sc.student_id
            WHERE sc.subject = ?
            ORDER BY sc.score DESC
        ''', (subject,))
        return rows
# 使用示例
# db = StudentDB()
# student_id = db.add_student("张三", 15, "高一")
# db.add_score(student_id, "数学", 95.5, "2024-03-15")
# scores = db.get_student_scores(student_id)
# rankings = db.get_class_ranking("数学")
MySQL数据库
图书管理系统
import mysql.connector
from datetime import datetime
class LibraryDB:
    """Book catalogue and borrowing records stored in a MySQL database.

    Every method opens its own connection from ``self.config`` and always
    closes the cursor and connection before returning, including on errors.
    """

    def __init__(self, host='localhost', user='root', password='', database='library'):
        """Store the connection settings and create the tables if missing.

        Args:
            host: MySQL server host.
            user: MySQL user name.
            password: MySQL password.
            database: Name of the database (schema) to use.
        """
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database
        }
        self.init_db()

    def init_db(self):
        """Create the ``books`` and ``borrowings`` tables if they do not exist."""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        try:
            # Book catalogue
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS books (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    title VARCHAR(255) NOT NULL,
                    author VARCHAR(100),
                    isbn VARCHAR(13) UNIQUE,
                    category VARCHAR(50),
                    status ENUM('available', 'borrowed') DEFAULT 'available',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Borrowing records; return_date stays NULL while the book is out
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS borrowings (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    book_id INT,
                    reader_name VARCHAR(100),
                    borrow_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    return_date TIMESTAMP NULL,
                    FOREIGN KEY (book_id) REFERENCES books(id)
                )
            ''')
            conn.commit()
        finally:
            cursor.close()
            conn.close()

    def add_book(self, title, author, isbn, category):
        """Insert a book into the catalogue.

        Args:
            title: Book title (the column is NOT NULL).
            author: Author name.
            isbn: ISBN; the column is UNIQUE, so a duplicate makes the
                driver raise.
            category: Category label.

        Returns:
            The ``id`` of the newly inserted book.
        """
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT INTO books (title, author, isbn, category)
                VALUES (%s, %s, %s, %s)
            ''', (title, author, isbn, category))
            book_id = cursor.lastrowid
            conn.commit()
            return book_id
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()

    def borrow_book(self, book_id, reader_name):
        """Mark a book as borrowed and record who borrowed it.

        Args:
            book_id: ``books.id`` of the book to borrow.
            reader_name: Name of the borrower.

        Returns:
            True when the book was borrowed.

        Raises:
            Exception: If the book does not exist or is not available.
        """
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        try:
            # Claim the book with a single conditional UPDATE. Checking the
            # status with a separate SELECT first would let two concurrent
            # borrowers both see 'available' and both succeed.
            cursor.execute('''
                UPDATE books SET status = 'borrowed'
                WHERE id = %s AND status = 'available'
            ''', (book_id,))
            if cursor.rowcount != 1:
                raise Exception("图书不可借")
            # Create the borrowing record
            cursor.execute('''
                INSERT INTO borrowings (book_id, reader_name)
                VALUES (%s, %s)
            ''', (book_id, reader_name))
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()

    def return_book(self, book_id):
        """Mark a book as available and close its open borrowing record(s).

        Args:
            book_id: ``books.id`` of the book being returned.

        Returns:
            True once both updates have been committed (also when no row
            matched).
        """
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        try:
            # Put the book back on the shelf
            cursor.execute('''
                UPDATE books SET status = 'available'
                WHERE id = %s
            ''', (book_id,))
            # Stamp the return time on the still-open borrowing record(s)
            cursor.execute('''
                UPDATE borrowings
                SET return_date = CURRENT_TIMESTAMP
                WHERE book_id = %s AND return_date IS NULL
            ''', (book_id,))
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()

    def get_book_history(self, book_id):
        """Return the borrowing history of one book, newest first.

        Args:
            book_id: ``books.id`` to look up.

        Returns:
            A list of dicts with the keys ``title``, ``reader_name``,
            ``borrow_date`` and ``return_date``.
        """
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('''
                SELECT b.title, br.reader_name,
                       br.borrow_date, br.return_date
                FROM books b
                JOIN borrowings br ON b.id = br.book_id
                WHERE b.id = %s
                ORDER BY br.borrow_date DESC
            ''', (book_id,))
            return cursor.fetchall()
        finally:
            cursor.close()
            conn.close()
# 使用示例
# db = LibraryDB(password='your_password')
# book_id = db.add_book("Python编程", "张三", "9787111111111", "计算机")
# db.borrow_book(book_id, "李四")
# history = db.get_book_history(book_id)
ORM框架
SQLAlchemy示例
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
# Declarative base class that the ORM model classes inherit from
Base = declarative_base()
class Product(Base):
    """ORM model for the ``products`` table: one sellable item."""

    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    # The callable is passed uncalled, so it is evaluated per insert
    created_at = Column(DateTime, default=datetime.now)

    # Order lines that reference this product (mirror of OrderItem.product)
    order_items = relationship("OrderItem", back_populates="product")
class Order(Base):
    """ORM model for the ``orders`` table: one customer order."""

    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    user_name = Column(String(100), nullable=False)
    total_amount = Column(Float, default=0)
    status = Column(String(20), default='pending')
    # The callable is passed uncalled, so it is evaluated per insert
    created_at = Column(DateTime, default=datetime.now)

    # Lines belonging to this order (mirror of OrderItem.order)
    items = relationship("OrderItem", back_populates="order")
class OrderItem(Base):
    """ORM model for the ``order_items`` table: one product line of an order."""

    __tablename__ = 'order_items'

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer, nullable=False)
    # Stored on the line itself, separately from Product.price
    price = Column(Float, nullable=False)

    # Parent order and referenced product (mirrors of Order.items and
    # Product.order_items)
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")
class ECommerceDB:
    """Database operations for the shop: products, orders and order lines.

    Each method works in its own session, rolls back on failure and always
    closes the session.
    """

    def __init__(self, db_url='sqlite:///ecommerce.db'):
        """Create the engine, the tables and the session factory.

        Args:
            db_url: SQLAlchemy database URL.
        """
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_product(self, name, price, stock):
        """Insert a product.

        Args:
            name: Product name.
            price: Unit price.
            stock: Initial stock quantity.

        Returns:
            The ``id`` of the new product.
        """
        session = self.Session()
        try:
            product = Product(name=name, price=price, stock=stock)
            session.add(product)
            session.commit()
            return product.id
        except Exception:
            # Same failure handling as create_order: undo before re-raising
            session.rollback()
            raise
        finally:
            session.close()

    def create_order(self, user_name, items):
        """Create an order, its lines, and deduct the ordered stock.

        Everything is committed in one transaction; on any failure nothing
        is persisted.

        Args:
            user_name: Name of the ordering customer.
            items: Iterable of ``(product_id, quantity)`` pairs.

        Returns:
            The ``id`` of the new order.

        Raises:
            ValueError: If a quantity is zero or negative.
            Exception: If a product does not exist or has too little stock.
        """
        session = self.Session()
        try:
            order = Order(user_name=user_name)
            session.add(order)
            total_amount = 0
            for product_id, quantity in items:
                # A non-positive quantity would pass the stock check below,
                # *increase* the stock and reduce the order total.
                if quantity <= 0:
                    raise ValueError(f"商品{product_id}数量无效: {quantity}")
                product = session.query(Product).get(product_id)
                if not product or product.stock < quantity:
                    raise Exception(f"商品{product_id}库存不足")
                # Order line carrying the product's current price
                order_item = OrderItem(
                    order=order,
                    product=product,
                    quantity=quantity,
                    price=product.price
                )
                session.add(order_item)
                # Deduct the stock and accumulate the order total
                product.stock -= quantity
                total_amount += product.price * quantity
            order.total_amount = total_amount
            session.commit()
            return order.id
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def get_order_details(self, order_id):
        """Return one order and its lines as plain Python data.

        Args:
            order_id: ``orders.id`` to look up.

        Returns:
            ``None`` when the order does not exist; otherwise a dict with the
            keys ``order_id``, ``user_name``, ``total_amount``, ``status``,
            ``created_at`` and ``items``, where ``items`` is a list of dicts
            with ``product_name``, ``quantity`` and ``price``.
        """
        session = self.Session()
        try:
            order = session.query(Order).get(order_id)
            if not order:
                return None
            return {
                'order_id': order.id,
                'user_name': order.user_name,
                'total_amount': order.total_amount,
                'status': order.status,
                'created_at': order.created_at,
                # Built while the session is still open so the related rows
                # can be loaded.
                'items': [
                    {
                        'product_name': item.product.name,
                        'quantity': item.quantity,
                        'price': item.price
                    }
                    for item in order.items
                ]
            }
        finally:
            session.close()
# 使用示例
# db = ECommerceDB()
# product_id = db.add_product("手机", 2999.00, 100)
# order_id = db.create_order("张三", [(product_id, 2)])
# order_details = db.get_order_details(order_id)
实际应用
数据库连接池
from dbutils.pooled_db import PooledDB
import mysql.connector
import threading
class DatabasePool:
    """Process-wide singleton wrapping a DBUtils ``PooledDB`` MySQL pool.

    ``DatabasePool(...)`` always returns the same instance. The pool is built
    from the arguments of the first successful construction; arguments passed
    to later constructions are ignored.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host='localhost', user='root', password='', database='test',
                 min_connections=1, max_connections=10):
        """Build the connection pool unless this instance already has one.

        Args:
            host: MySQL server host.
            user: MySQL user name.
            password: MySQL password.
            database: Name of the database (schema) to use.
            min_connections: Passed to ``PooledDB`` as ``mincached``.
            max_connections: Passed to ``PooledDB`` as ``maxconnections``.
        """
        # __init__ runs on every DatabasePool(...) call; the lock keeps two
        # threads from each building (and one of them leaking) a pool.
        with self._lock:
            if hasattr(self, 'pool'):
                return
            self.pool = PooledDB(
                creator=mysql.connector,
                maxconnections=max_connections,
                mincached=min_connections,
                host=host,
                user=user,
                password=password,
                database=database,
                autocommit=True
            )

    def get_connection(self):
        """Return a connection obtained from the pool."""
        return self.pool.connection()

    def execute_query(self, sql, params=None):
        """Run a query and return all of its rows.

        Args:
            sql: SQL text using ``%s`` placeholders.
            params: Values bound to the placeholders, or ``None`` for none.

        Returns:
            The result of ``cursor.fetchall()`` on a ``dictionary=True``
            cursor.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
            finally:
                # Close the cursor even when the statement fails
                cursor.close()

    def execute_update(self, sql, params=None):
        """Run a data-modifying statement and commit it.

        Args:
            sql: SQL text using ``%s`` placeholders.
            params: Values bound to the placeholders, or ``None`` for none.

        Returns:
            The cursor's ``rowcount`` for the statement.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params or ())
                conn.commit()
                return cursor.rowcount
            finally:
                # Close the cursor even when the statement fails
                cursor.close()
# 使用示例
# pool = DatabasePool(password='your_password')
# results = pool.execute_query('SELECT * FROM users WHERE age > %s', (18,))
# affected_rows = pool.execute_update(
# 'UPDATE users SET status = %s WHERE id = %s',
# ('active', 1)
# )