{{ article.title }}
{{ article.summary|safe }}
阅读更多
本页面提供了Python Web编程的实用示例,包括Flask、FastAPI等框架的使用示例。这些示例将帮助你更好地理解Web开发的概念,快速构建Web应用。
from flask import Flask, request, render_template, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import markdown
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
db = SQLAlchemy(app)
class Post(db.Model):
"""博客文章模型"""
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.Text, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now)
def to_html(self):
"""将Markdown内容转换为HTML"""
return markdown.markdown(self.content)
@app.route('/')
def index():
"""首页:显示所有文章列表"""
posts = Post.query.order_by(Post.created_at.desc()).all()
return render_template('index.html', posts=posts)
@app.route('/post/')
def view_post(post_id):
"""查看文章详情"""
post = Post.query.get_or_404(post_id)
return render_template('post.html', post=post)
@app.route('/post/new', methods=['GET', 'POST'])
def new_post():
"""创建新文章"""
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
if not title or not content:
flash('标题和内容不能为空')
return redirect(url_for('new_post'))
post = Post(title=title, content=content)
db.session.add(post)
db.session.commit()
flash('文章发布成功!')
return redirect(url_for('index'))
return render_template('new_post.html')
@app.route('/post//edit', methods=['GET', 'POST'])
def edit_post(post_id):
"""编辑文章"""
post = Post.query.get_or_404(post_id)
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
if not title or not content:
flash('标题和内容不能为空')
return redirect(url_for('edit_post', post_id=post_id))
post.title = title
post.content = content
db.session.commit()
flash('文章更新成功!')
return redirect(url_for('view_post', post_id=post_id))
return render_template('edit_post.html', post=post)
@app.route('/post//delete', methods=['POST'])
def delete_post(post_id):
"""删除文章"""
post = Post.query.get_or_404(post_id)
db.session.delete(post)
db.session.commit()
flash('文章已删除!')
return redirect(url_for('index'))
# HTML模板示例 (templates/base.html)
"""
{% block title %}博客{% endblock %}
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
"""
# 启动应用
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional
# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./products.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class ProductDB(Base):
"""产品数据库模型"""
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(String)
price = Column(Float)
stock = Column(Integer)
created_at = Column(DateTime, default=datetime.now)
# 创建数据库表
Base.metadata.create_all(bind=engine)
# Pydantic模型
class ProductBase(BaseModel):
name: str
description: Optional[str] = None
price: float
stock: int
class ProductCreate(ProductBase):
pass
class Product(ProductBase):
id: int
created_at: datetime
class Config:
orm_mode = True
# FastAPI应用
app = FastAPI(title="产品管理API")
# 依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/products/", response_model=Product)
def create_product(product: ProductCreate, db: Session = Depends(get_db)):
"""创建新产品"""
db_product = ProductDB(**product.dict())
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
@app.get("/products/", response_model=List[Product])
def list_products(
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db)
):
"""获取产品列表"""
products = db.query(ProductDB).offset(skip).limit(limit).all()
return products
@app.get("/products/{product_id}", response_model=Product)
def get_product(product_id: int, db: Session = Depends(get_db)):
"""获取单个产品详情"""
product = db.query(ProductDB).filter(ProductDB.id == product_id).first()
if product is None:
raise HTTPException(status_code=404, detail="Product not found")
return product
@app.put("/products/{product_id}", response_model=Product)
def update_product(
product_id: int,
product: ProductCreate,
db: Session = Depends(get_db)
):
"""更新产品信息"""
db_product = db.query(ProductDB).filter(ProductDB.id == product_id).first()
if db_product is None:
raise HTTPException(status_code=404, detail="Product not found")
for key, value in product.dict().items():
setattr(db_product, key, value)
db.commit()
db.refresh(db_product)
return db_product
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
"""删除产品"""
db_product = db.query(ProductDB).filter(ProductDB.id == product_id).first()
if db_product is None:
raise HTTPException(status_code=404, detail="Product not found")
db.delete(db_product)
db.commit()
return {"message": "Product deleted"}
# 启动命令:uvicorn main:app --reload
{% block title %}默认标题{% endblock %}
{% block extra_css %}{% endblock %}
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
{% block extra_js %}{% endblock %}
{% extends "base.html" %}
{% block title %}文章列表{% endblock %}
{% block content %}
{% for article in articles %}
{{ article.title }}
{{ article.summary|safe }}
阅读更多
{% else %}
暂无文章
{% endfor %}
{% if pagination %}
{% if pagination.has_prev %}
« 上一页
{% endif %}
{% for page in pagination.iter_pages() %}
{% if page %}
{% if page != pagination.page %}
{{ page }}
{% else %}
{{ page }}
{% endif %}
{% else %}
...
{% endif %}
{% endfor %}
{% if pagination.has_next %}
下一页 »
{% endif %}
{% endif %}
{% endblock %}
{% extends "base.html" %}
{% block title %}编辑文章{% endblock %}
{% block content %}
{% endblock %}
from flask import Flask, request, send_file, render_template
from werkzeug.utils import secure_filename
import os
from datetime import datetime
import hashlib
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB限制
# 确保上传目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def allowed_file(filename):
"""检查文件类型是否允许"""
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf', 'txt', 'doc', 'docx'}
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_file_hash(file_stream):
"""计算文件哈希值"""
sha256_hash = hashlib.sha256()
for chunk in iter(lambda: file_stream.read(4096), b''):
sha256_hash.update(chunk)
file_stream.seek(0)
return sha256_hash.hexdigest()
@app.route('/')
def index():
"""上传页面"""
return render_template('upload.html')
@app.route('/upload', methods=['POST'])
def upload_file():
"""处理文件上传"""
if 'file' not in request.files:
return {'error': '没有文件'}, 400
file = request.files['file']
if file.filename == '':
return {'error': '没有选择文件'}, 400
if file and allowed_file(file.filename):
# 安全的文件名
filename = secure_filename(file.filename)
# 使用时间戳和哈希值创建唯一文件名
file_hash = get_file_hash(file)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
ext = filename.rsplit('.', 1)[1].lower()
new_filename = f"{timestamp}_{file_hash[:8]}.{ext}"
# 保存文件
file_path = os.path.join(app.config['UPLOAD_FOLDER'], new_filename)
file.save(file_path)
# 返回文件信息
return {
'message': '文件上传成功',
'filename': new_filename,
'original_name': filename,
'download_url': f'/download/{new_filename}'
}
return {'error': '不允许的文件类型'}, 400
@app.route('/download/')
def download_file(filename):
"""下载文件"""
try:
return send_file(
os.path.join(app.config['UPLOAD_FOLDER'], filename),
as_attachment=True
)
except FileNotFoundError:
return {'error': '文件不存在'}, 404
@app.route('/files')
def list_files():
"""列出所有上传的文件"""
files = []
for filename in os.listdir(app.config['UPLOAD_FOLDER']):
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file_stat = os.stat(file_path)
files.append({
'name': filename,
'size': file_stat.st_size,
'created_at': datetime.fromtimestamp(file_stat.st_ctime),
'download_url': f'/download/{filename}'
})
return render_template('files.html', files=files)
# 上传页面模板
"""
文件上传
文件上传
"""
if __name__ == '__main__':
app.run(debug=True)