Python数据库编程示例

本页面提供了Python数据库编程的实用示例,包括SQLite、MySQL的基本操作、ORM框架使用等内容。这些示例将帮助你更好地理解和应用数据库编程的概念,开发数据库应用程序。

SQLite数据库

学生信息管理系统


import sqlite3
from datetime import datetime

class StudentDB:
    """SQLite学生信息管理系统"""
    
    def __init__(self, db_name='school.db'):
        self.db_name = db_name
        self.init_db()
    
    def init_db(self):
        """初始化数据库"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            
            # 创建学生表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS students (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    age INTEGER,
                    grade TEXT,
                    created_at TIMESTAMP
                )
            ''')
            
            # 创建课程成绩表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS scores (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    student_id INTEGER,
                    subject TEXT,
                    score FLOAT,
                    exam_date DATE,
                    FOREIGN KEY (student_id) REFERENCES students (id)
                )
            ''')
            
            conn.commit()
    
    def add_student(self, name, age, grade):
        """添加学生"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute(
                'INSERT INTO students (name, age, grade, created_at) VALUES (?, ?, ?, ?)',
                (name, age, grade, datetime.now())
            )
            conn.commit()
            return cursor.lastrowid
    
    def add_score(self, student_id, subject, score, exam_date):
        """添加成绩"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute(
                'INSERT INTO scores (student_id, subject, score, exam_date) VALUES (?, ?, ?, ?)',
                (student_id, subject, score, exam_date)
            )
            conn.commit()
    
    def get_student_scores(self, student_id):
        """获取学生所有成绩"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.name, sc.subject, sc.score, sc.exam_date
                FROM students s
                JOIN scores sc ON s.id = sc.student_id
                WHERE s.id = ?
            ''', (student_id,))
            return cursor.fetchall()
    
    def get_class_ranking(self, subject):
        """获取某科目的班级排名"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.name, sc.score
                FROM students s
                JOIN scores sc ON s.id = sc.student_id
                WHERE sc.subject = ?
                ORDER BY sc.score DESC
            ''', (subject,))
            return cursor.fetchall()

# 使用示例
# db = StudentDB()
# student_id = db.add_student("张三", 15, "高一")
# db.add_score(student_id, "数学", 95.5, "2024-03-15")
# scores = db.get_student_scores(student_id)
# rankings = db.get_class_ranking("数学")
                    

MySQL数据库

图书管理系统


import mysql.connector
from datetime import datetime

class LibraryDB:
    """MySQL图书管理系统"""
    
    def __init__(self, host='localhost', user='root', password='', database='library'):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database
        }
        self.init_db()
    
    def init_db(self):
        """初始化数据库"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        
        # 创建图书表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS books (
                id INT AUTO_INCREMENT PRIMARY KEY,
                title VARCHAR(255) NOT NULL,
                author VARCHAR(100),
                isbn VARCHAR(13) UNIQUE,
                category VARCHAR(50),
                status ENUM('available', 'borrowed') DEFAULT 'available',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 创建借阅记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS borrowings (
                id INT AUTO_INCREMENT PRIMARY KEY,
                book_id INT,
                reader_name VARCHAR(100),
                borrow_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                return_date TIMESTAMP NULL,
                FOREIGN KEY (book_id) REFERENCES books(id)
            )
        ''')
        
        conn.commit()
        cursor.close()
        conn.close()
    
    def add_book(self, title, author, isbn, category):
        """添加图书"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO books (title, author, isbn, category)
            VALUES (%s, %s, %s, %s)
        ''', (title, author, isbn, category))
        
        book_id = cursor.lastrowid
        conn.commit()
        cursor.close()
        conn.close()
        return book_id
    
    def borrow_book(self, book_id, reader_name):
        """借阅图书"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        
        try:
            # 检查图书是否可借
            cursor.execute(
                'SELECT status FROM books WHERE id = %s',
                (book_id,)
            )
            result = cursor.fetchone()
            
            if not result or result[0] != 'available':
                raise Exception("图书不可借")
            
            # 更新图书状态
            cursor.execute('''
                UPDATE books SET status = 'borrowed'
                WHERE id = %s
            ''', (book_id,))
            
            # 创建借阅记录
            cursor.execute('''
                INSERT INTO borrowings (book_id, reader_name)
                VALUES (%s, %s)
            ''', (book_id, reader_name))
            
            conn.commit()
            return True
            
        except Exception as e:
            conn.rollback()
            raise e
        
        finally:
            cursor.close()
            conn.close()
    
    def return_book(self, book_id):
        """归还图书"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        
        try:
            # 更新图书状态
            cursor.execute('''
                UPDATE books SET status = 'available'
                WHERE id = %s
            ''', (book_id,))
            
            # 更新借阅记录
            cursor.execute('''
                UPDATE borrowings 
                SET return_date = CURRENT_TIMESTAMP
                WHERE book_id = %s AND return_date IS NULL
            ''', (book_id,))
            
            conn.commit()
            return True
            
        except Exception as e:
            conn.rollback()
            raise e
        
        finally:
            cursor.close()
            conn.close()
    
    def get_book_history(self, book_id):
        """获取图书借阅历史"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor(dictionary=True)
        
        cursor.execute('''
            SELECT b.title, br.reader_name, 
                   br.borrow_date, br.return_date
            FROM books b
            JOIN borrowings br ON b.id = br.book_id
            WHERE b.id = %s
            ORDER BY br.borrow_date DESC
        ''', (book_id,))
        
        history = cursor.fetchall()
        cursor.close()
        conn.close()
        return history

# 使用示例
# db = LibraryDB(password='your_password')
# book_id = db.add_book("Python编程", "张三", "9787111111111", "计算机")
# db.borrow_book(book_id, "李四")
# history = db.get_book_history(book_id)
                    

ORM框架

SQLAlchemy示例


from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

# 创建基类
Base = declarative_base()

class Product(Base):
    """商品表"""
    __tablename__ = 'products'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.now)
    
    # 建立与订单项的关系
    order_items = relationship("OrderItem", back_populates="product")

class Order(Base):
    """订单表"""
    __tablename__ = 'orders'
    
    id = Column(Integer, primary_key=True)
    user_name = Column(String(100), nullable=False)
    total_amount = Column(Float, default=0)
    status = Column(String(20), default='pending')
    created_at = Column(DateTime, default=datetime.now)
    
    # 建立与订单项的关系
    items = relationship("OrderItem", back_populates="order")

class OrderItem(Base):
    """订单项表"""
    __tablename__ = 'order_items'
    
    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer, nullable=False)
    price = Column(Float, nullable=False)
    
    # 建立与订单和商品的关系
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")

class ECommerceDB:
    """电商系统数据库操作"""
    
    def __init__(self, db_url='sqlite:///ecommerce.db'):
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
    
    def add_product(self, name, price, stock):
        """添加商品"""
        session = self.Session()
        try:
            product = Product(name=name, price=price, stock=stock)
            session.add(product)
            session.commit()
            return product.id
        finally:
            session.close()
    
    def create_order(self, user_name, items):
        """创建订单"""
        session = self.Session()
        try:
            # 创建订单
            order = Order(user_name=user_name)
            session.add(order)
            
            total_amount = 0
            # 添加订单项
            for product_id, quantity in items:
                product = session.query(Product).get(product_id)
                if not product or product.stock < quantity:
                    raise Exception(f"商品{product_id}库存不足")
                
                # 创建订单项
                order_item = OrderItem(
                    order=order,
                    product=product,
                    quantity=quantity,
                    price=product.price
                )
                session.add(order_item)
                
                # 更新库存
                product.stock -= quantity
                total_amount += product.price * quantity
            
            # 更新订单总金额
            order.total_amount = total_amount
            session.commit()
            return order.id
            
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()
    
    def get_order_details(self, order_id):
        """获取订单详情"""
        session = self.Session()
        try:
            order = session.query(Order).get(order_id)
            if not order:
                return None
            
            details = {
                'order_id': order.id,
                'user_name': order.user_name,
                'total_amount': order.total_amount,
                'status': order.status,
                'created_at': order.created_at,
                'items': []
            }
            
            for item in order.items:
                details['items'].append({
                    'product_name': item.product.name,
                    'quantity': item.quantity,
                    'price': item.price
                })
            
            return details
        finally:
            session.close()

# 使用示例
# db = ECommerceDB()
# product_id = db.add_product("手机", 2999.00, 100)
# order_id = db.create_order("张三", [(product_id, 2)])
# order_details = db.get_order_details(order_id)
                    

实际应用

数据库连接池


from dbutils.pooled_db import PooledDB
import mysql.connector
import threading

class DatabasePool:
    """数据库连接池实现"""
    
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, host='localhost', user='root', password='', database='test',
                 min_connections=1, max_connections=10):
        if not hasattr(self, 'pool'):
            self.pool = PooledDB(
                creator=mysql.connector,
                maxconnections=max_connections,
                mincached=min_connections,
                host=host,
                user=user,
                password=password,
                database=database,
                autocommit=True
            )
    
    def get_connection(self):
        """获取数据库连接"""
        return self.pool.connection()
    
    def execute_query(self, sql, params=None):
        """执行查询"""
        with self.get_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            cursor.execute(sql, params or ())
            return cursor.fetchall()
    
    def execute_update(self, sql, params=None):
        """执行更新"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params or ())
            conn.commit()
            return cursor.rowcount

# 使用示例
# pool = DatabasePool(password='your_password')
# results = pool.execute_query('SELECT * FROM users WHERE age > %s', (18,))
# affected_rows = pool.execute_update(
#     'UPDATE users SET status = %s WHERE id = %s',
#     ('active', 1)
# )