Python Automation Tool Examples

This page provides practical examples of Python automation tools, covering file processing, web scraping, and automated testing. The examples illustrate common automation patterns and can help you work more efficiently.

File Processing

Batch File Processing Tool


import shutil
from pathlib import Path
from datetime import datetime
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict

class FileProcessor:
    """文件批量处理工具"""
    
    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)
        self.setup_logging()
    
    def setup_logging(self):
        """配置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('file_processor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def scan_files(self, pattern: str = "*") -> List[Path]:
        """Scan for files recursively.
        pattern: glob pattern such as "*.txt" or "*.jpg"
        """
        try:
            files = list(self.base_dir.rglob(pattern))
            self.logger.info(f"Found {len(files)} files")
            return files
        except Exception as e:
            self.logger.error(f"Error scanning files: {e}")
            return []
    
    def organize_by_extension(self, target_dir: str = None):
        """Organize files into folders by extension"""
        target_dir = Path(target_dir) if target_dir else self.base_dir / "organized"
        target_dir.mkdir(exist_ok=True)
        
        try:
            for file in self.scan_files():
                if file.is_file():
                    # Get the file extension (lowercased, without the dot)
                    ext = file.suffix.lower()[1:] or "no_extension"
                    # Create a directory for the extension
                    ext_dir = target_dir / ext
                    ext_dir.mkdir(exist_ok=True)
                    # Copy the file (copy2 preserves metadata; originals stay in place)
                    shutil.copy2(file, ext_dir / file.name)
            
            self.logger.info("Files organized by extension")
        except Exception as e:
            self.logger.error(f"Error organizing files: {e}")
    
    def organize_by_date(self, target_dir: str = None):
        """Organize files into folders by modification date"""
        target_dir = Path(target_dir) if target_dir else self.base_dir / "by_date"
        target_dir.mkdir(exist_ok=True)
        
        try:
            for file in self.scan_files():
                if file.is_file():
                    # Get the file's modification time
                    mtime = datetime.fromtimestamp(file.stat().st_mtime)
                    # Create a year/month directory
                    date_dir = target_dir / f"{mtime.year}" / f"{mtime.month:02d}"
                    date_dir.mkdir(parents=True, exist_ok=True)
                    # Copy the file (copy2 preserves metadata)
                    shutil.copy2(file, date_dir / file.name)
            
            self.logger.info("Organizing by date complete")
        except Exception as e:
            self.logger.error(f"Error organizing files by date: {e}")
    
    def rename_files(self, pattern: str, new_pattern: str):
        """Batch-rename files.
        pattern: glob pattern matching the original file names
        new_pattern: new name template supporting {index}, {date}, {original} and {ext} placeholders
        """
        try:
            for index, file in enumerate(self.scan_files(pattern), 1):
                if file.is_file():
                    # Values available to the template
                    placeholders = {
                        'index': f"{index:04d}",
                        'date': datetime.now().strftime('%Y%m%d'),
                        'original': file.stem,
                        'ext': file.suffix
                    }
                    # Build the new name; keep the original suffix unless the template sets one
                    new_name = new_pattern.format(**placeholders)
                    if '{ext}' not in new_pattern:
                        new_name += file.suffix
                    # Rename the file in place
                    file.rename(file.parent / new_name)
            
            self.logger.info("File renaming complete")
        except Exception as e:
            self.logger.error(f"Error renaming files: {e}")
    
    def calculate_file_hash(self, file_path: Path) -> str:
        """Compute a file's MD5 hash (fine for duplicate detection, not for security)"""
        hash_md5 = hashlib.md5()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except Exception as e:
            self.logger.error(f"Error hashing file: {e}")
            return ""
    
    def find_duplicates(self) -> Dict[str, List[Path]]:
        """Find duplicate files by content hash"""
        hash_dict = {}
        
        try:
            files = [f for f in self.scan_files() if f.is_file()]
            # Hash files in a thread pool; executor.map preserves input order
            with ThreadPoolExecutor() as executor:
                for file, file_hash in zip(files, executor.map(self.calculate_file_hash, files)):
                    if file_hash:
                        hash_dict.setdefault(file_hash, []).append(file)
            
            # Keep only hashes shared by more than one file
            duplicates = {k: v for k, v in hash_dict.items() if len(v) > 1}
            self.logger.info(f"Found {len(duplicates)} groups of duplicate files")
            return duplicates
        except Exception as e:
            self.logger.error(f"Error finding duplicates: {e}")
            return {}
    
    def batch_process_images(self, target_dir: str = None, size: tuple = None):
        """Batch-process images.
        size: (width, height) to resize to
        """
        try:
            from PIL import Image
            target_dir = Path(target_dir) if target_dir else self.base_dir / "processed_images"
            target_dir.mkdir(exist_ok=True)
            
            for file in self.scan_files("*.jpg") + self.scan_files("*.png"):
                if file.is_file():
                    with Image.open(file) as img:
                        # Resize if a target size was given
                        if size:
                            img = img.resize(size, Image.LANCZOS)
                        # Save the processed image (quality applies to JPEG output)
                        new_path = target_dir / file.name
                        img.save(new_path, quality=95, optimize=True)
            
            self.logger.info("Image processing complete")
        except ImportError:
            self.logger.error("Please install Pillow to process images")
        except Exception as e:
            self.logger.error(f"Error processing images: {e}")
    
    def compress_files(self, output_file: str, files_pattern: str = "*"):
        """Compress files into a ZIP archive.
        output_file: name of the output archive
        files_pattern: glob pattern of files to include
        """
        import zipfile
        
        try:
            with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file in self.scan_files(files_pattern):
                    if file.is_file():
                        # Store paths relative to base_dir so same-named files in
                        # different subdirectories do not overwrite each other
                        zipf.write(file, file.relative_to(self.base_dir))
            
            self.logger.info(f"Files compressed into {output_file}")
        except Exception as e:
            self.logger.error(f"Error compressing files: {e}")

# Usage example
def file_processing_example():
    """File processing example"""
    # Create a processor instance
    processor = FileProcessor("./files")
    
    # Organize files by extension
    processor.organize_by_extension("./organized_files")
    
    # Organize files by date
    processor.organize_by_date("./files_by_date")
    
    # Batch-rename files
    processor.rename_files("*.jpg", "photo_{date}_{index}")
    
    # Find duplicate files
    duplicates = processor.find_duplicates()
    for hash_value, file_list in duplicates.items():
        print(f"\nDuplicate files (hash: {hash_value}):")
        for file in file_list:
            print(f"  - {file}")
    
    # Batch-process images
    processor.batch_process_images("./processed_images", size=(800, 600))
    
    # Compress files
    processor.compress_files("archive.zip", "*.txt")

# Run the example
if __name__ == "__main__":
    file_processing_example()
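
The groups returned by find_duplicates can feed a cleanup step. Below is a minimal sketch, assuming you want to keep the first file in each group and review the rest before deleting; remove_duplicates and its dry_run flag are illustrative helpers, not part of FileProcessor.

def remove_duplicates(processor: FileProcessor, dry_run: bool = True):
    """Delete all but the first copy in each group of duplicates."""
    for file_hash, file_list in processor.find_duplicates().items():
        # Keep file_list[0]; the remaining entries have identical content
        for extra in file_list[1:]:
            if dry_run:
                print(f"Would delete: {extra}")  # review this output before running for real
            else:
                extra.unlink()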

Web Scraping

General-Purpose Web Spider


import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
import time
import random
from urllib.parse import urljoin
import logging
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import aiohttp
import asyncio

class WebSpider:
    """通用网页爬虫"""
    
    def __init__(self, base_url: str, headers: Dict = None):
        self.base_url = base_url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.session = requests.Session()
        self.setup_logging()
    
    def setup_logging(self):
        """配置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('spider.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_page(self, url: str) -> str:
        """Fetch page content"""
        try:
            response = self.session.get(
                url,
                headers=self.headers,
                timeout=10
            )
            response.raise_for_status()
            return response.text
        except Exception as e:
            self.logger.error(f"Failed to fetch page: {url}, error: {e}")
            return ""
    
    def parse_html(self, html: str) -> BeautifulSoup:
        """Parse HTML"""
        return BeautifulSoup(html, 'html.parser')
    
    def extract_links(self, soup: BeautifulSoup, pattern: str = None) -> List[str]:
        """Extract links, optionally filtered by a substring pattern"""
        links = []
        try:
            for link in soup.find_all('a', href=True):
                url = urljoin(self.base_url, link['href'])
                if pattern is None or pattern in url:
                    links.append(url)
        except Exception as e:
            self.logger.error(f"Failed to extract links: {e}")
        return links
    
    def extract_data(self, soup: BeautifulSoup, selectors: Dict[str, str]) -> Dict[str, str]:
        """Extract data from a page.
        selectors: {'field_name': 'CSS selector'}
        """
        data = {}
        try:
            for field, selector in selectors.items():
                element = soup.select_one(selector)
                data[field] = element.text.strip() if element else ""
        except Exception as e:
            self.logger.error(f"Failed to extract data: {e}")
        return data
    
    def save_to_csv(self, data: List[Dict], filename: str):
        """Save data to CSV"""
        try:
            df = pd.DataFrame(data)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            self.logger.info(f"Data saved to: {filename}")
        except Exception as e:
            self.logger.error(f"Failed to save CSV: {e}")
    
    def save_to_json(self, data: List[Dict], filename: str):
        """Save data to JSON"""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            self.logger.info(f"Data saved to: {filename}")
        except Exception as e:
            self.logger.error(f"Failed to save JSON: {e}")
    
    def crawl_with_threads(self, urls: List[str], max_workers: int = 5) -> List[str]:
        """Crawl pages with a thread pool"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_url = {
                executor.submit(self.get_page, url): url
                for url in urls
            }
            for future in future_to_url:
                url = future_to_url[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    self.logger.error(f"Crawl failed: {url}, error: {e}")
        return results
    
    async def async_get_page(self, url: str, session: aiohttp.ClientSession) -> str:
        """Fetch a page asynchronously"""
        try:
            async with session.get(url, headers=self.headers) as response:
                return await response.text()
        except Exception as e:
            self.logger.error(f"Async fetch failed: {url}, error: {e}")
            return ""
    
    async def crawl_with_asyncio(self, urls: List[str]) -> List[str]:
        """Crawl pages asynchronously"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.async_get_page(url, session)
                for url in urls
            ]
            return await asyncio.gather(*tasks)
    
    def random_delay(self, min_delay: float = 1, max_delay: float = 3):
        """Sleep for a random interval to avoid hammering the server"""
        time.sleep(random.uniform(min_delay, max_delay))

class NewsSpider(WebSpider):
    """新闻爬虫示例"""
    
    def __init__(self, base_url: str):
        super().__init__(base_url)
        self.data = []
    
    def parse_news_list(self, html: str) -> List[str]:
        """解析新闻列表页"""
        soup = self.parse_html(html)
        return self.extract_links(soup, '/news/')
    
    def parse_news_detail(self, html: str) -> Dict[str, str]:
        """解析新闻详情页"""
        soup = self.parse_html(html)
        selectors = {
            'title': 'h1.news-title',
            'content': 'div.news-content',
            'date': 'span.news-date',
            'author': 'span.news-author'
        }
        return self.extract_data(soup, selectors)
    
    def crawl_news(self, max_pages: int = 5):
        """Crawl news articles"""
        try:
            # Crawl the news list pages
            for page in range(1, max_pages + 1):
                url = f"{self.base_url}/news/list?page={page}"
                html = self.get_page(url)
                news_links = self.parse_news_list(html)
                
                # Crawl each news detail page
                for link in news_links:
                    self.random_delay()
                    detail_html = self.get_page(link)
                    news_data = self.parse_news_detail(detail_html)
                    news_data['url'] = link
                    self.data.append(news_data)
                
                self.logger.info(f"Finished page {page}")
            
            # Save the data
            self.save_to_csv(self.data, 'news.csv')
            self.save_to_json(self.data, 'news.json')
            
        except Exception as e:
            self.logger.error(f"Failed to crawl news: {e}")

class EcommerceSpider(WebSpider):
    """电商爬虫示例"""
    
    def __init__(self, base_url: str):
        super().__init__(base_url)
        self.data = []
    
    def parse_product_list(self, html: str) -> List[str]:
        """解析商品列表页"""
        soup = self.parse_html(html)
        return self.extract_links(soup, '/product/')
    
    def parse_product_detail(self, html: str) -> Dict[str, str]:
        """解析商品详情页"""
        soup = self.parse_html(html)
        selectors = {
            'title': 'h1.product-title',
            'price': 'span.product-price',
            'description': 'div.product-description',
            'category': 'span.product-category'
        }
        return self.extract_data(soup, selectors)
    
    async def crawl_products(self, max_pages: int = 5):
        """Crawl products asynchronously"""
        try:
            # Collect all product links
            product_links = []
            for page in range(1, max_pages + 1):
                url = f"{self.base_url}/products/list?page={page}"
                html = self.get_page(url)
                links = self.parse_product_list(html)
                product_links.extend(links)
                self.logger.info(f"Collected product links from page {page}")
            
            # Fetch product detail pages asynchronously
            async with aiohttp.ClientSession() as session:
                tasks = [
                    self.async_get_page(url, session)
                    for url in product_links
                ]
                pages = await asyncio.gather(*tasks)
                
                # Parse the product details
                for url, html in zip(product_links, pages):
                    product_data = self.parse_product_detail(html)
                    product_data['url'] = url
                    self.data.append(product_data)
            
            # Save the data
            self.save_to_csv(self.data, 'products.csv')
            self.save_to_json(self.data, 'products.json')
            
        except Exception as e:
            self.logger.error(f"Failed to crawl products: {e}")

# Usage example
def spider_example():
    """Spider examples"""
    # News spider example
    news_spider = NewsSpider('https://example.com')
    news_spider.crawl_news(max_pages=3)
    
    # E-commerce spider example
    ecommerce_spider = EcommerceSpider('https://shop.example.com')
    asyncio.run(ecommerce_spider.crawl_products(max_pages=3))

# Run the examples
if __name__ == "__main__":
    spider_example()
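
Since get_page returns an empty string on failure, transient network errors silently drop pages. Below is a minimal retry sketch with exponential backoff, written as a standalone helper rather than a method of WebSpider; the function name and parameter defaults are illustrative.

import time

def get_page_with_retry(spider: WebSpider, url: str, retries: int = 3, backoff: float = 2.0) -> str:
    """Retry a failed fetch, waiting exponentially longer between attempts."""
    for attempt in range(retries):
        html = spider.get_page(url)
        if html:
            return html
        wait = backoff ** attempt  # 1s, 2s, 4s, ...
        spider.logger.warning(f"Retry {attempt + 1}/{retries} for {url} in {wait:.0f}s")
        time.sleep(wait)
    return ""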

Automated Testing

Web Application Testing Framework


import unittest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import pytest
from typing import Any
from urllib.parse import urljoin  # needed by navigate_to below
import logging
from datetime import datetime

class WebTestFramework:
    """Web应用测试框架"""
    
    def __init__(self, base_url: str, browser: str = 'chrome'):
        self.base_url = base_url
        self.browser = browser
        self.driver = None
        self.setup_logging()
    
    def setup_logging(self):
        """配置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('test.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def start_browser(self):
        """Start the browser"""
        try:
            if self.browser == 'chrome':
                options = webdriver.ChromeOptions()
                options.add_argument('--headless')  # headless mode
                self.driver = webdriver.Chrome(options=options)
            elif self.browser == 'firefox':
                options = webdriver.FirefoxOptions()
                options.add_argument('--headless')
                self.driver = webdriver.Firefox(options=options)
            
            self.driver.implicitly_wait(10)
            self.logger.info(f"Started {self.browser} browser")
        except Exception as e:
            self.logger.error(f"Failed to start browser: {e}")
            raise
    
    def stop_browser(self):
        """Close the browser"""
        if self.driver:
            self.driver.quit()
            self.logger.info("Browser closed")
    
    def navigate_to(self, url: str):
        """Navigate to a page"""
        try:
            full_url = urljoin(self.base_url, url)
            self.driver.get(full_url)
            self.logger.info(f"Navigated to: {full_url}")
        except Exception as e:
            self.logger.error(f"Navigation failed: {e}")
            raise
    
    def find_element(self, by: str, value: str, timeout: int = 10) -> Any:
        """Find an element, waiting for it to appear"""
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((by, value))
            )
            return element
        except TimeoutException:
            self.logger.error(f"Timed out finding element: {by}={value}")
            raise
        except Exception as e:
            self.logger.error(f"Failed to find element: {e}")
            raise
    
    def click_element(self, by: str, value: str):
        """Click an element"""
        try:
            element = self.find_element(by, value)
            element.click()
            self.logger.info(f"Clicked element: {by}={value}")
        except Exception as e:
            self.logger.error(f"Failed to click element: {e}")
            raise
    
    def input_text(self, by: str, value: str, text: str):
        """Type text into an element"""
        try:
            element = self.find_element(by, value)
            element.clear()
            element.send_keys(text)
            self.logger.info(f"Entered text: {text}")
        except Exception as e:
            self.logger.error(f"Failed to enter text: {e}")
            raise
    
    def get_text(self, by: str, value: str) -> str:
        """Get an element's text"""
        try:
            element = self.find_element(by, value)
            return element.text
        except Exception as e:
            self.logger.error(f"Failed to get text: {e}")
            raise
    
    def take_screenshot(self, name: str):
        """Take a screenshot"""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"screenshot_{name}_{timestamp}.png"
            self.driver.save_screenshot(filename)
            self.logger.info(f"Screenshot saved: {filename}")
        except Exception as e:
            self.logger.error(f"Screenshot failed: {e}")
    
    def execute_script(self, script: str, *args) -> Any:
        """Execute JavaScript"""
        try:
            return self.driver.execute_script(script, *args)
        except Exception as e:
            self.logger.error(f"Script execution failed: {e}")
            raise

class LoginTest(unittest.TestCase):
    """登录功能测试示例"""
    
    @classmethod
    def setUpClass(cls):
        cls.framework = WebTestFramework('https://example.com')
        cls.framework.start_browser()
    
    @classmethod
    def tearDownClass(cls):
        cls.framework.stop_browser()
    
    def setUp(self):
        self.framework.navigate_to('/login')
    
    def test_successful_login(self):
        """Test a successful login"""
        # Enter username and password
        self.framework.input_text(By.ID, 'username', 'testuser')
        self.framework.input_text(By.ID, 'password', 'password123')
        
        # Click the login button
        self.framework.click_element(By.ID, 'login-button')
        
        # Verify login succeeded
        welcome_text = self.framework.get_text(By.CLASS_NAME, 'welcome-message')
        self.assertIn('Welcome', welcome_text)
    
    def test_failed_login(self):
        """Test a failed login"""
        # Enter an invalid username and password
        self.framework.input_text(By.ID, 'username', 'wronguser')
        self.framework.input_text(By.ID, 'password', 'wrongpass')
        
        # Click the login button
        self.framework.click_element(By.ID, 'login-button')
        
        # Verify the error message
        error_text = self.framework.get_text(By.CLASS_NAME, 'error-message')
        self.assertIn('Invalid credentials', error_text)
        
        # Save a screenshot of the failure
        self.framework.take_screenshot('login_failed')

@pytest.fixture
def web_framework():
    """Pytest fixture for web framework"""
    framework = WebTestFramework('https://example.com')
    framework.start_browser()
    yield framework
    framework.stop_browser()

class TestRegistration:
    """注册功能测试示例(使用pytest)"""
    
    def test_successful_registration(self, web_framework):
        """Test a successful registration"""
        web_framework.navigate_to('/register')
        
        # Fill in the registration form
        web_framework.input_text(By.ID, 'username', 'newuser')
        web_framework.input_text(By.ID, 'email', '[email protected]')
        web_framework.input_text(By.ID, 'password', 'password123')
        web_framework.input_text(By.ID, 'confirm-password', 'password123')
        
        # Accept the terms
        web_framework.click_element(By.ID, 'terms-checkbox')
        
        # Submit the registration
        web_framework.click_element(By.ID, 'register-button')
        
        # Verify registration succeeded
        success_text = web_framework.get_text(By.CLASS_NAME, 'success-message')
        assert 'Registration successful' in success_text
    
    def test_duplicate_username(self, web_framework):
        """Test registering with a duplicate username"""
        web_framework.navigate_to('/register')
        
        # Register with a username that already exists
        web_framework.input_text(By.ID, 'username', 'existinguser')
        web_framework.input_text(By.ID, 'email', '[email protected]')
        web_framework.input_text(By.ID, 'password', 'password123')
        web_framework.input_text(By.ID, 'confirm-password', 'password123')
        
        # Accept the terms
        web_framework.click_element(By.ID, 'terms-checkbox')
        
        # Submit the registration
        web_framework.click_element(By.ID, 'register-button')
        
        # Verify the error message
        error_text = web_framework.get_text(By.CLASS_NAME, 'error-message')
        assert 'Username already exists' in error_text
        
        # Save a screenshot of the failure
        web_framework.take_screenshot('registration_failed')

def run_tests():
    """Run the tests"""
    # Run the unittest tests
    unittest.main(argv=[''], verbosity=2, exit=False)
    
    # Run the pytest tests
    pytest.main(['-v'])

if __name__ == '__main__':
    run_tests()
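
Note that find_element waits only for presence in the DOM, which does not guarantee the element can be interacted with yet. Below is a small sketch of a stricter wait against the same WebTestFramework; wait_until_clickable is an illustrative helper, not part of the framework above.

from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

def wait_until_clickable(framework: WebTestFramework, by: str, value: str, timeout: int = 10):
    """Wait until the element is visible and enabled, then return it."""
    return WebDriverWait(framework.driver, timeout).until(
        EC.element_to_be_clickable((by, value))
    )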

API Testing Framework


import requests
import pytest
import json
import logging
from typing import Any

class APITestFramework:
    """API测试框架"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()
        self.setup_logging()
    
    def setup_logging(self):
        """配置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('api_test.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def send_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Send a request"""
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.request(method, url, **kwargs)
            self.logger.info(f"{method} {url}")
            self.logger.debug(f"Request: {kwargs}")
            self.logger.debug(f"Response: {response.text}")
            return response
        except Exception as e:
            self.logger.error(f"Request failed: {e}")
            raise
    
    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """GET request"""
        return self.send_request('GET', endpoint, **kwargs)
    
    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """POST request"""
        return self.send_request('POST', endpoint, **kwargs)
    
    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """PUT request"""
        return self.send_request('PUT', endpoint, **kwargs)
    
    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """DELETE request"""
        return self.send_request('DELETE', endpoint, **kwargs)
    
    def assert_status_code(self, response: requests.Response, expected_code: int):
        """断言状态码"""
        assert response.status_code == expected_code, \
            f"Expected status code {expected_code}, but got {response.status_code}"
    
    def assert_json_value(self, response: requests.Response, key: str, expected_value: Any):
        """断言JSON值"""
        data = response.json()
        assert key in data, f"Key '{key}' not found in response"
        assert data[key] == expected_value, \
            f"Expected '{key}' to be {expected_value}, but got {data[key]}"
    
    def save_response(self, response: requests.Response, filename: str):
        """Save a response to disk"""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(response.json(), f, ensure_ascii=False, indent=2)
            self.logger.info(f"Response saved to: {filename}")
        except Exception as e:
            self.logger.error(f"Failed to save response: {e}")

@pytest.fixture
def api_framework():
    """Pytest fixture for API framework"""
    return APITestFramework('https://api.example.com')

class TestUserAPI:
    """用户API测试示例"""
    
    def test_get_user(self, api_framework):
        """Test fetching a user"""
        # Send the request
        response = api_framework.get('/users/1')
        
        # Verify the response
        api_framework.assert_status_code(response, 200)
        api_framework.assert_json_value(response, 'username', 'testuser')
        
        # Save the response
        api_framework.save_response(response, 'user_response.json')
    
    def test_create_user(self, api_framework):
        """Test creating a user"""
        # Prepare the payload
        data = {
            'username': 'newuser',
            'email': '[email protected]',
            'password': 'password123'
        }
        
        # Send the request
        response = api_framework.post('/users', json=data)
        
        # Verify the response
        api_framework.assert_status_code(response, 201)
        api_framework.assert_json_value(response, 'message', 'User created successfully')
    
    def test_update_user(self, api_framework):
        """Test updating a user"""
        # Prepare the payload
        data = {
            'email': '[email protected]'
        }
        
        # Send the request
        response = api_framework.put('/users/1', json=data)
        
        # Verify the response
        api_framework.assert_status_code(response, 200)
        api_framework.assert_json_value(response, 'email', '[email protected]')
    
    def test_delete_user(self, api_framework):
        """Test deleting a user"""
        # Send the request
        response = api_framework.delete('/users/1')
        
        # Verify the response
        api_framework.assert_status_code(response, 204)

class TestAuthAPI:
    """Auth API test examples"""
    
    def test_login(self, api_framework):
        """Test logging in"""
        # Prepare the payload
        data = {
            'username': 'testuser',
            'password': 'password123'
        }
        
        # Send the request
        response = api_framework.post('/auth/login', json=data)
        
        # Verify the response
        api_framework.assert_status_code(response, 200)
        assert 'token' in response.json()
        
        # Store the token for subsequent requests
        token = response.json()['token']
        api_framework.session.headers.update({'Authorization': f'Bearer {token}'})
    
    def test_refresh_token(self, api_framework):
        """Test refreshing the token"""
        # Send the request
        response = api_framework.post('/auth/refresh')
        
        # Verify the response
        api_framework.assert_status_code(response, 200)
        assert 'token' in response.json()
    
    def test_logout(self, api_framework):
        """Test logging out"""
        # Send the request
        response = api_framework.post('/auth/logout')
        
        # Verify the response
        api_framework.assert_status_code(response, 200)
        api_framework.assert_json_value(response, 'message', 'Logged out successfully')

def run_api_tests():
    """运行API测试"""
    pytest.main(['-v'])

if __name__ == '__main__':
    run_api_tests()
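
Beyond field-level checks like assert_json_value, latency is another common API assertion. Below is a small sketch using the elapsed attribute that requests populates on every Response; the function name and the one-second default are illustrative.

def assert_response_time(response: requests.Response, max_seconds: float = 1.0):
    """Fail if the round trip exceeded the limit."""
    # elapsed measures the time between sending the request and receiving the headers
    elapsed = response.elapsed.total_seconds()
    assert elapsed <= max_seconds, \
        f"Request took {elapsed:.2f}s, limit is {max_seconds:.2f}s"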