Python Web编程示例

本页面提供了Python Web编程的实用示例,包括Flask、FastAPI等框架的使用示例。这些示例将帮助你更好地理解Web开发的概念,快速构建Web应用。

Flask框架

简单博客系统


from flask import Flask, request, render_template, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import markdown

# Application and database setup.
app = Flask(__name__)
app.config.update(
    # Placeholder value: replace with a private, random key outside of demos.
    SECRET_KEY='your-secret-key',
    SQLALCHEMY_DATABASE_URI='sqlite:///blog.db',
)
db = SQLAlchemy(app)

class Post(db.Model):
    """Blog post model."""
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # The callable itself (not a call result) is handed over as the default.
    created_at = db.Column(db.DateTime, default=datetime.now)

    def to_html(self):
        """Return the post's Markdown content rendered as HTML."""
        rendered = markdown.markdown(self.content)
        return rendered

@app.route('/')
def index():
    """Home page: render the list of all posts, newest first."""
    newest_first = Post.created_at.desc()
    posts = Post.query.order_by(newest_first).all()
    return render_template('index.html', posts=posts)

@app.route('/post/<int:post_id>')
def view_post(post_id):
    """Show a single post.

    The URL rule declares the ``<int:post_id>`` variable so that the value
    is parsed from the path and passed in as ``post_id``; without it the
    view would be invoked with no argument and fail.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Returns:
        The rendered ``post.html`` page. A missing post results in a 404
        response via ``get_or_404``.
    """
    post = Post.query.get_or_404(post_id)
    return render_template('post.html', post=post)

@app.route('/post/new', methods=['GET', 'POST'])
def new_post():
    """Create a new post.

    GET renders the empty form. POST validates the submitted title and
    content, stores the post and redirects to the home page; if either
    field is empty the user is sent back to the form with a flash message.
    """
    if request.method != 'POST':
        return render_template('new_post.html')

    form = request.form
    title = form.get('title')
    content = form.get('content')

    # Both fields are mandatory.
    if not (title and content):
        flash('标题和内容不能为空')
        return redirect(url_for('new_post'))

    db.session.add(Post(title=title, content=content))
    db.session.commit()

    flash('文章发布成功!')
    return redirect(url_for('index'))

@app.route('/post/<int:post_id>/edit', methods=['GET', 'POST'])
def edit_post(post_id):
    """Edit an existing post.

    The URL rule declares ``<int:post_id>`` so the id is parsed from the
    path; ``url_for('edit_post', post_id=...)`` below relies on that
    variable being part of the rule.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Returns:
        On GET, the rendered edit form. On POST, a redirect to the post's
        detail page after saving, or back to the edit form (with a flash
        message) if the title or content is empty. A missing post results
        in a 404 response via ``get_or_404``.
    """
    # Look the post up first so an unknown id yields 404 for both methods.
    post = Post.query.get_or_404(post_id)

    if request.method == 'POST':
        title = request.form.get('title')
        content = request.form.get('content')

        if not title or not content:
            flash('标题和内容不能为空')
            return redirect(url_for('edit_post', post_id=post_id))

        post.title = title
        post.content = content
        db.session.commit()

        flash('文章更新成功!')
        return redirect(url_for('view_post', post_id=post_id))

    return render_template('edit_post.html', post=post)

@app.route('/post/<int:post_id>/delete', methods=['POST'])
def delete_post(post_id):
    """Delete a post.

    The URL rule declares ``<int:post_id>`` so the id is parsed from the
    path and passed in as ``post_id``. Only POST is accepted.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Returns:
        A redirect to the home page after the post is removed. A missing
        post results in a 404 response via ``get_or_404``.
    """
    post = Post.query.get_or_404(post_id)
    db.session.delete(post)
    db.session.commit()

    flash('文章已删除!')
    return redirect(url_for('index'))

# Example HTML template (templates/base.html).
# This bare string is illustrative only; it is not read by the application.
"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>{% block title %}博客{% endblock %}</title>
</head>
<body>
    {% with messages = get_flashed_messages() %}
        {% if messages %}
            {% for message in messages %}
                <div class="flash">{{ message }}</div>
            {% endfor %}
        {% endif %}
    {% endwith %}

    {% block content %}{% endblock %}
</body>
</html>
"""

# Start the application.
if __name__ == '__main__':
    # Create the tables inside an application context before serving.
    with app.app_context():
        db.create_all()
    app.run(debug=True)

FastAPI框架

RESTful API服务


from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional

# Database configuration.
SQLALCHEMY_DATABASE_URL = "sqlite:///./products.db"
# SQLite connections refuse use from a thread other than the one that created
# them unless check_same_thread is disabled. FastAPI runs plain ``def``
# endpoints and dependencies in a thread pool, so one request's session may
# be touched from more than one thread.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for the declarative ORM models.
Base = declarative_base()

class ProductDB(Base):
    """Product database model, mapped to the ``products`` table."""
    __tablename__ = "products"
    
    # Primary key; also indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Product name; indexed.
    name = Column(String, index=True)
    description = Column(String)
    price = Column(Float)
    stock = Column(Integer)
    # The callable itself (not a call result) is handed over as the default.
    created_at = Column(DateTime, default=datetime.now)

# Create the database tables for every model registered on Base.
Base.metadata.create_all(engine)

# Pydantic models
# Fields shared by the request and response schemas below.
class ProductBase(BaseModel):
    name: str
    # Optional; defaults to None when omitted.
    description: Optional[str] = None
    price: float
    stock: int

# Request body schema used by the create and update endpoints; adds no
# fields beyond those inherited from ProductBase.
class ProductCreate(ProductBase):
    pass

# Response schema: the shared fields plus the database-assigned id and
# creation timestamp.
class Product(ProductBase):
    id: int
    created_at: datetime
    
    class Config:
        # Lets the schema be populated from ORM object attributes (the
        # endpoints return ProductDB instances), not only from dicts.
        orm_mode = True

# FastAPI application
app = FastAPI(title="产品管理API")

# Dependency
def get_db():
    """Yield a database session and close it once the caller is done.

    Written as a generator so it can be used with ``Depends``: the code
    after ``yield`` runs as cleanup, even if the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

@app.post("/products/", response_model=Product)
def create_product(product: ProductCreate, db: Session = Depends(get_db)):
    """Create a new product and return the stored row."""
    fields = product.dict()
    record = ProductDB(**fields)
    db.add(record)
    db.commit()
    # Reload so database-generated values (id, created_at) are populated.
    db.refresh(record)
    return record

@app.get("/products/", response_model=List[Product])
def list_products(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db)
):
    """Return one page of products.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The matching ``ProductDB`` rows, ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by the primary key keeps pages stable.
    products = (
        db.query(ProductDB)
        .order_by(ProductDB.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return products

@app.get("/products/{product_id}", response_model=Product)
def get_product(product_id: int, db: Session = Depends(get_db)):
    """Return a single product, or respond with 404 if the id is unknown."""
    by_id = db.query(ProductDB).filter(ProductDB.id == product_id)
    match = by_id.first()
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="Product not found")

@app.put("/products/{product_id}", response_model=Product)
def update_product(
    product_id: int,
    product: ProductCreate,
    db: Session = Depends(get_db)
):
    """Overwrite a product's fields with the submitted values.

    Responds with 404 if no product has the given id; otherwise returns
    the updated row.
    """
    record = db.query(ProductDB).filter(ProductDB.id == product_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # Copy every submitted field onto the ORM object.
    submitted = product.dict()
    for field_name in submitted:
        setattr(record, field_name, submitted[field_name])

    db.commit()
    db.refresh(record)
    return record

@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Delete a product; respond with 404 if the id is unknown."""
    by_id = db.query(ProductDB).filter(ProductDB.id == product_id)
    record = by_id.first()
    if record is None:
        raise HTTPException(status_code=404, detail="Product not found")

    db.delete(record)
    db.commit()
    confirmation = {"message": "Product deleted"}
    return confirmation

# 启动命令:uvicorn main:app --reload
                    

模板引擎

Jinja2模板示例






    
    
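{# base.html #}
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>{% block title %}默认标题{% endblock %}</title>
    {% block extra_css %}{% endblock %}
</head>
<body>
    {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
            {% for category, message in messages %}
                <div class="alert alert-{{ category }}">{{ message }}</div>
            {% endfor %}
        {% endif %}
    {% endwith %}

    {% block content %}{% endblock %}

    <footer>© {{ now.year }} 我的网站. All rights reserved.</footer>

    {% block extra_js %}{% endblock %}
</body>
</html>

{# Article list template (extends base.html) #}
{% extends "base.html" %}
{% block title %}文章列表{% endblock %}
{% block content %}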
{% for article in articles %}
<article>
    <h2>{{ article.title }}</h2>
    <p>作者: {{ article.author }} 发布于: {{ article.created_at|datetime }}</p>
    <div>{{ article.summary|safe }}</div>
    <div>{% for tag in article.tags %}<span>{{ tag }}</span> {% endfor %}</div>
    <a href="#">阅读更多</a>
</article>
{% else %}
<p>暂无文章</p>
{% endfor %}
{% if pagination %}
{% endif %}
{% endblock %}

{# Edit-article template (extends base.html) #}
{% extends "base.html" %}
{% block title %}编辑文章{% endblock %}
{% block content %}
<form method="post">
    {{ form.csrf_token }}
    <div>
        {{ form.title.label }} {{ form.title(class="form-control") }}
        {% if form.title.errors %}
            {% for error in form.title.errors %}<span>{{ error }}</span>{% endfor %}
        {% endif %}
    </div>
    <div>
        {{ form.content.label }} {{ form.content(class="form-control") }}
        {% if form.content.errors %}
            {% for error in form.content.errors %}<span>{{ error }}</span>{% endfor %}
        {% endif %}
    </div>
    <div>
        {{ form.tags.label }} {{ form.tags(class="form-control") }}
        <small>使用逗号分隔多个标签</small>
    </div>
    <input type="submit">
    <a href="#">取消</a>
</form>
{% endblock %}

实际应用

文件上传服务


from flask import Flask, request, send_file, render_template
from werkzeug.utils import secure_filename
import os
from datetime import datetime
import hashlib

# Application setup for the upload service.
app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER='uploads',
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16 MB limit
)

# Make sure the upload directory exists.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

def allowed_file(filename):
    """Return True if the file name ends in one of the permitted extensions.

    The extension is the text after the last dot, compared
    case-insensitively. Names without a dot are rejected.
    """
    ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf', 'txt', 'doc', 'docx'}
    _stem, dot, extension = filename.rpartition('.')
    # rpartition yields an empty separator when the name contains no dot.
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS

def get_file_hash(file_stream):
    """Return the SHA-256 hex digest of a binary stream.

    The stream is read in 4096-byte chunks from its current position until
    it returns ``b''``, then rewound to offset 0 so it can be read again.
    """
    digest = hashlib.sha256()
    while True:
        block = file_stream.read(4096)
        if block == b'':
            break
        digest.update(block)
    file_stream.seek(0)
    return digest.hexdigest()

@app.route('/')
def index():
    """Serve the upload page."""
    page = render_template('upload.html')
    return page

@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle a file upload.

    Expects a multipart form field named ``file``. The file is stored in
    the upload folder under a generated name built from the current
    timestamp, the first 8 hex digits of the content's SHA-256 hash and
    the original extension.

    Returns:
        On success, a dict with the stored name, the sanitized original
        name and a download URL. Otherwise an ``{'error': ...}`` dict with
        status 400 when the field is missing, no file was chosen, or the
        extension is not allowed.
    """
    if 'file' not in request.files:
        return {'error': '没有文件'}, 400

    file = request.files['file']
    if file.filename == '':
        return {'error': '没有选择文件'}, 400

    if not (file and allowed_file(file.filename)):
        return {'error': '不允许的文件类型'}, 400

    # Take the extension from the submitted name, which allowed_file() has
    # just validated. secure_filename() can strip a name down to a string
    # with no dot at all (e.g. a non-ASCII stem leaves only "txt"), so
    # splitting its result on '.' could raise IndexError.
    ext = file.filename.rsplit('.', 1)[1].lower()

    # Sanitized form of the client-supplied name, reported back to the caller.
    filename = secure_filename(file.filename)

    # Build a unique stored name from the timestamp and content hash.
    file_hash = get_file_hash(file)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    new_filename = f"{timestamp}_{file_hash[:8]}.{ext}"

    # Save the file.
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], new_filename)
    file.save(file_path)

    # Report the stored file's details.
    return {
        'message': '文件上传成功',
        'filename': new_filename,
        'original_name': filename,
        'download_url': f'/download/{new_filename}'
    }

@app.route('/download/<filename>')
def download_file(filename):
    """Send a previously uploaded file as an attachment.

    The URL rule declares ``<filename>`` so the requested name is parsed
    from the path and passed in as ``filename``.

    Args:
        filename: Name of the file inside the upload folder.

    Returns:
        The file as a download, or an ``{'error': ...}`` dict with status
        404 if the name is not a plain, safe file name or the file does
        not exist.
    """
    # The name comes straight from the URL: only accept it if sanitizing
    # leaves it unchanged, so values such as ".." never reach the filesystem.
    safe_name = secure_filename(filename)
    if not safe_name or safe_name != filename:
        return {'error': '文件不存在'}, 404

    # Use an absolute path so the lookup targets the same directory the
    # upload handler writes to, however send_file resolves relative paths.
    upload_dir = os.path.abspath(app.config['UPLOAD_FOLDER'])
    try:
        return send_file(
            os.path.join(upload_dir, safe_name),
            as_attachment=True
        )
    except FileNotFoundError:
        return {'error': '文件不存在'}, 404

@app.route('/files')
def list_files():
    """Render a page listing every entry in the upload folder.

    Each entry carries its name, size in bytes, a datetime built from the
    file's ``st_ctime`` and its download URL.
    """
    upload_dir = app.config['UPLOAD_FOLDER']

    def describe(name):
        # Collect the stat-derived details for one directory entry.
        info = os.stat(os.path.join(upload_dir, name))
        return {
            'name': name,
            'size': info.st_size,
            'created_at': datetime.fromtimestamp(info.st_ctime),
            'download_url': f'/download/{name}'
        }

    files = [describe(name) for name in os.listdir(upload_dir)]
    return render_template('files.html', files=files)

# Upload page template.
# This bare string is illustrative only; it is not read by the application.
"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>文件上传</title>
</head>
<body>
    <h1>文件上传</h1>
    <form action="/upload" method="post" enctype="multipart/form-data">
        <input type="file" name="file">
        <input type="submit">
    </form>
    <div id="progress">0%</div>
</body>
</html>
"""

# Start the application.
if __name__ == '__main__':
    app.run(debug=True)