Python高级错误处理示例

本页面展示了Python错误处理的高级应用示例,包括异步错误处理、错误恢复机制、分布式系统错误处理等高级主题。

异步错误处理

异步上下文管理器


import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class AsyncResource:
    """Toy asynchronous resource used by the examples.

    Every operation awaits a short sleep to stand in for real I/O.
    """

    # Seconds each simulated I/O step takes.
    _SIMULATED_LATENCY = 0.1

    async def _simulate_io(self):
        """Pause for the simulated latency."""
        await asyncio.sleep(self._SIMULATED_LATENCY)

    async def connect(self):
        """Simulate opening the resource and print a confirmation."""
        await self._simulate_io()
        print("资源已连接")

    async def disconnect(self):
        """Simulate closing the resource and print a confirmation."""
        await self._simulate_io()
        print("资源已断开")

    async def process(self, data):
        """Simulate processing ``data``.

        Returns:
            A string consisting of a fixed label followed by ``data``.
        """
        await self._simulate_io()
        return f"处理结果: {data}"

@asynccontextmanager
async def managed_resource() -> AsyncGenerator[AsyncResource, None]:
    """Async context manager that yields a connected ``AsyncResource``.

    The resource is connected on entry and disconnected on exit, whether
    the ``async with`` body finishes normally or raises.

    Yields:
        The connected resource.

    Raises:
        Whatever ``connect()`` raises; in that case ``disconnect()`` is not
        called, because there is no established connection to tear down.
    """
    resource = AsyncResource()
    # Connect *before* entering the try block so that a failed connect does
    # not trigger a disconnect on a resource that was never connected.
    await resource.connect()
    try:
        yield resource
    finally:
        await resource.disconnect()

async def process_data_async(data_list):
    """Process every item of ``data_list`` with one managed resource.

    An item whose processing raises is reported on stdout and skipped; the
    remaining items are still processed.

    Returns:
        The list of results for the items that were processed successfully,
        in input order.
    """
    collected = []
    async with managed_resource() as handle:
        for data in data_list:
            try:
                collected.append(await handle.process(data))
            except Exception as e:
                print(f"处理数据 {data} 时出错: {e}")
    return collected

# 使用示例
async def main():
    """Run the async processing example on three sample items and print the results."""
    print(await process_data_async(["item1", "item2", "item3"]))

# asyncio.run(main())
                    

异步重试机制


import asyncio
from functools import wraps
from typing import TypeVar, Callable, Any

# Generic type variable; the visible code below does not reference it.
T = TypeVar('T')

def async_retry(
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """Build a decorator that retries a coroutine function on failure.

    Args:
        retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        delay: Seconds to wait before the second attempt.
        backoff: Factor the wait is multiplied by after each failed attempt.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator for ``async def`` functions. The decorated coroutine
        returns the first successful result, or re-raises the exception of
        the final attempt once all attempts are used up.
    """
    # Guard against retries <= 0, which would otherwise skip the call
    # entirely and make the wrapper silently return None.
    attempts = max(1, retries)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        # Out of attempts: a bare raise re-raises the active
                        # exception with its original traceback.
                        raise
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator

# 使用示例
class AsyncAPI:
    """Demo API client whose fetch is wrapped by the async retry decorator."""

    @async_retry(retries=3, delay=1, exceptions=(ConnectionError,))
    async def fetch_data(self, url: str) -> dict:
        """Pretend to fetch data from ``url``.

        Returns:
            A fixed success payload for any URL other than ``"bad_url"``.

        Raises:
            ConnectionError: When ``url`` is ``"bad_url"``.
        """
        if url != "bad_url":
            return {"data": "success"}
        raise ConnectionError("连接失败")

async def test_retry():
    """Call the always-failing URL and report when every retry has failed."""
    client = AsyncAPI()
    try:
        await client.fetch_data("bad_url")
    except ConnectionError:
        print("所有重试都失败了")
                    

错误恢复机制

事务回滚


from typing import List, Optional
from dataclasses import dataclass
import logging

@dataclass
class Operation:
    """One step of a transaction, with a flag recording whether it has run."""
    # Label used in the printed and logged messages.
    name: str
    # Payload attached to the operation; the methods here do not read it.
    data: dict
    # True after execute() succeeds, reset to False by rollback().
    completed: bool = False

    def execute(self) -> bool:
        """Run the (simulated) operation and mark it completed.

        Returns:
            True on success; False if an exception was raised, in which
            case the error is logged.
        """
        try:
            # Stand-in for the real work.
            print(f"执行操作: {self.name}")
            self.completed = True
        except Exception as e:
            logging.error(f"操作 {self.name} 执行失败: {e}")
            return False
        return True

    def rollback(self) -> bool:
        """Undo the operation if it had completed.

        Returns:
            True when there was nothing to undo or the undo succeeded;
            False if an exception was raised, in which case it is logged.
        """
        try:
            if not self.completed:
                return True
            print(f"回滚操作: {self.name}")
            self.completed = False
        except Exception as e:
            logging.error(f"操作 {self.name} 回滚失败: {e}")
            return False
        return True

class TransactionManager:
    """Runs a list of operations in order and undoes them if one fails."""

    def __init__(self):
        # Operations in the order they will be executed.
        self.operations: List[Operation] = []
        # Index of the last operation attempted without raising; -1 = none.
        self.current_index: int = -1

    def add_operation(self, operation: Operation):
        """Append ``operation`` to the end of the transaction."""
        self.operations.append(operation)

    def execute_all(self) -> bool:
        """Execute every operation in order.

        Stops at the first operation that reports failure or raises, rolls
        back, and returns False. Returns True when all operations succeed.
        """
        try:
            for position, step in enumerate(self.operations):
                succeeded = step.execute()
                # Recorded for both outcomes so rollback() starts here.
                self.current_index = position
                if not succeeded:
                    self.rollback()
                    return False
        except Exception as e:
            logging.error(f"执行事务时发生错误: {e}")
            self.rollback()
            return False
        return True

    def rollback(self):
        """Roll back operations from ``current_index`` down to the first, newest first."""
        position = self.current_index
        while position >= 0:
            self.operations[position].rollback()
            position -= 1

# 使用示例
def process_order():
    """Build a four-step order transaction, run it, and print the outcome."""
    manager = TransactionManager()

    # (operation name, payload) pairs in execution order.
    steps = (
        ("验证库存", {"product_id": 1}),
        ("扣减库存", {"product_id": 1, "quantity": 1}),
        ("创建订单", {"order_id": "123"}),
        ("支付订单", {"order_id": "123"}),
    )
    for step_name, payload in steps:
        manager.add_operation(Operation(step_name, payload))

    if not manager.execute_all():
        print("订单处理失败,已回滚")
    else:
        print("订单处理成功")
                    

状态恢复


from typing import Dict, Any
import json
import os

class StateManager:
    """Key/value state persisted to a JSON checkpoint file.

    The state is loaded from ``checkpoint_file`` on construction (if the
    file exists) and rewritten on every ``update_state`` call. Load and
    save failures are logged rather than raised, so checkpointing is
    best-effort and never interrupts the caller.
    """

    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = checkpoint_file
        self.state: Dict[str, Any] = {}
        self.load_checkpoint()

    def save_checkpoint(self):
        """Write the current state to the checkpoint file.

        The data is written to a sibling ``.tmp`` file and then moved over
        the real file with ``os.replace``, so a crash in the middle of a
        write cannot leave a truncated checkpoint behind. Failures are
        logged and the previous checkpoint is left untouched.
        """
        tmp_path = f"{self.checkpoint_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f)
            os.replace(tmp_path, self.checkpoint_file)
        except Exception as e:
            logging.error(f"保存检查点失败: {e}")
            # Best-effort removal of a partially written temp file; it may
            # not exist at all, so a failure here is deliberately ignored.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def load_checkpoint(self):
        """Load state from the checkpoint file if one exists.

        A missing file is not an error (first run). Any other failure,
        including a checkpoint that is not a JSON object, is logged and
        the in-memory state is left as it was.
        """
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return
        except Exception as e:
            logging.error(f"加载检查点失败: {e}")
            return

        if isinstance(loaded, dict):
            self.state = loaded
        else:
            # get_state() relies on dict semantics; reject anything else.
            logging.error("加载检查点失败: 检查点内容不是JSON对象")

    def update_state(self, key: str, value: Any):
        """Set ``key`` to ``value`` and immediately persist the checkpoint."""
        self.state[key] = value
        self.save_checkpoint()

    def get_state(self, key: str, default: Any = None) -> Any:
        """Return the stored value for ``key``, or ``default`` if absent."""
        return self.state.get(key, default)

# 使用示例
def process_large_file(filename: str):
    """Process ``filename`` line by line, resuming from the last checkpoint.

    The file offset reached after each processed line is stored under the
    ``"position"`` key of the checkpoint, so a later call starts where this
    one stopped. Any exception is logged instead of being raised.
    """
    checkpoint = StateManager("process_state.json")
    resume_at = checkpoint.get_state("position", 0)

    try:
        with open(filename, 'r') as f:
            f.seek(resume_at)
            # Read through readline() rather than iterating the file object:
            # text-file iteration disables tell(), which is needed below.
            for line in iter(f.readline, ''):
                process_line(line)
                checkpoint.update_state("position", f.tell())
    except Exception as e:
        # The checkpoint already holds the last good offset, so the next
        # run continues from there.
        logging.error(f"处理文件时发生错误: {e}")
        # 下次从断点继续处理
                    

分布式错误处理

分布式锁


import redis
from contextlib import contextmanager
import time
import uuid

class DistributedLock:
    """Redis-backed mutual-exclusion lock.

    The lock is a single Redis key holding a token unique to this instance.
    It is taken with ``SET ... NX EX`` and released only when the key still
    holds this instance's token, so one client can never delete a lock that
    is held by another.

    Args:
        redis_client: Client exposing ``set(name, value, ex=..., nx=...)``
            and ``eval(script, numkeys, *keys_and_args)``.
        lock_name: Redis key used for the lock.
        timeout: Seconds to keep trying to acquire; also used as the key's
            expiry so a crashed holder does not block others forever.
    """

    # Compare-and-delete executed atomically on the server: remove the key
    # only when it still contains our token.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client: "redis.Redis", lock_name: str, timeout: int = 10):
        self.redis = redis_client
        self.lock_name = lock_name
        self.timeout = timeout
        self.lock_id = str(uuid.uuid4())

    @contextmanager
    def acquire(self):
        """Context manager holding the lock for the duration of the block.

        Raises:
            TimeoutError: If the lock could not be obtained within
                ``timeout`` seconds. Nothing is released in that case.
        """
        # Acquire outside the try/finally: when acquisition fails we do not
        # own the key and must not attempt to release it.
        if not self._acquire_lock():
            raise TimeoutError("无法获取锁")
        try:
            yield
        finally:
            self._release_lock()

    def _acquire_lock(self) -> bool:
        """Poll until the lock is obtained or the timeout elapses.

        At least one attempt is always made. Returns True on success.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock jumps.
        deadline = time.monotonic() + self.timeout
        while True:
            if self.redis.set(
                self.lock_name,
                self.lock_id,
                ex=self.timeout,
                nx=True
            ):
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.1)

    def _release_lock(self):
        """Release the lock if this instance still owns it.

        Returns:
            True if the key held our token and was deleted, False otherwise
            (never acquired, already expired, or now owned by someone else).
        """
        deleted = self.redis.eval(
            self._RELEASE_SCRIPT, 1, self.lock_name, self.lock_id
        )
        return bool(deleted)

# 使用示例
def process_shared_resource():
    """Work on a shared resource under a distributed lock, reporting a lock timeout."""
    client = redis.Redis(host='localhost', port=6379)
    resource_lock = DistributedLock(client, "shared_resource_lock")
    guard = resource_lock.acquire()

    try:
        with guard:
            # Critical section: only the lock holder reaches this point.
            print("正在处理共享资源")
    except TimeoutError:
        print("无法获取资源锁")
                    

分布式事务


from enum import Enum
from typing import List, Dict
import time

class TransactionState(Enum):
    """States of a participant in a distributed transaction."""
    INIT = "init"  # no phase has run yet
    PREPARED = "prepared"  # prepare phase succeeded
    COMMITTED = "committed"  # commit phase succeeded
    ROLLED_BACK = "rolled_back"  # participant has been rolled back

class Participant:
    """A named party in a two-phase commit that tracks its own state."""

    def __init__(self, name: str):
        self.name = name
        self.state = TransactionState.INIT

    def prepare(self) -> bool:
        """Run the prepare phase and move to PREPARED.

        Returns:
            True on success; False if an exception was raised (reported on stdout).
        """
        try:
            print(f"{self.name} 准备中...")
            self.state = TransactionState.PREPARED
        except Exception as e:
            print(f"{self.name} 准备失败: {e}")
            return False
        else:
            return True

    def commit(self) -> bool:
        """Run the commit phase and move to COMMITTED.

        Returns:
            True on success; False if an exception was raised (reported on stdout).
        """
        try:
            print(f"{self.name} 提交中...")
            self.state = TransactionState.COMMITTED
        except Exception as e:
            print(f"{self.name} 提交失败: {e}")
            return False
        else:
            return True

    def rollback(self) -> bool:
        """Roll back and move to ROLLED_BACK.

        Returns:
            True on success; False if an exception was raised (reported on stdout).
        """
        try:
            print(f"{self.name} 回滚中...")
            self.state = TransactionState.ROLLED_BACK
        except Exception as e:
            print(f"{self.name} 回滚失败: {e}")
            return False
        else:
            return True

class TwoPhaseCommit:
    """Coordinator that drives its participants through prepare, then commit."""

    def __init__(self):
        self.participants: List[Participant] = []

    def add_participant(self, participant: Participant):
        """Register ``participant`` with this coordinator."""
        self.participants.append(participant)

    def execute(self) -> bool:
        """Run the prepare phase followed by the commit phase.

        Returns:
            True when both phases succeed. If either phase fails, every
            participant is rolled back and False is returned.
        """
        # Short-circuit: the commit phase only runs after a successful prepare.
        if self._prepare_phase() and self._commit_phase():
            return True
        self._rollback_all()
        return False

    def _prepare_phase(self) -> bool:
        """Prepare participants in order, stopping at the first failure."""
        return all(member.prepare() for member in self.participants)

    def _commit_phase(self) -> bool:
        """Commit participants in order, stopping at the first failure."""
        return all(member.commit() for member in self.participants)

    def _rollback_all(self):
        """Call ``rollback()`` on every registered participant."""
        for member in self.participants:
            member.rollback()

# 使用示例
def distributed_transaction():
    """Run a two-phase commit across three sample participants and print the outcome."""
    coordinator = TwoPhaseCommit()

    for participant_name in ("数据库A", "数据库B", "消息队列"):
        coordinator.add_participant(Participant(participant_name))

    succeeded = coordinator.execute()
    if not succeeded:
        print("分布式事务执行失败")
    else:
        print("分布式事务执行成功")
                    

错误监控系统

错误聚合器


from collections import defaultdict
from datetime import datetime, timedelta
import threading
from typing import Dict, List, Optional

class ErrorAggregator:
    """Sliding-window counter of error occurrences, keyed by error type.

    Timestamps older than ``window_size`` are discarded on every query and
    once a minute by a background daemon thread. Error types with no
    remaining timestamps are removed entirely, so the mapping does not grow
    with every type that was ever recorded or queried.

    Args:
        window_size: Length of the sliding window.
    """

    def __init__(self, window_size: timedelta = timedelta(minutes=5)):
        self.window_size = window_size
        self.errors: Dict[str, List[datetime]] = defaultdict(list)
        # Guards every access to self.errors (callers and cleanup thread).
        self.lock = threading.Lock()

        # Start the periodic cleanup thread.
        self._start_cleanup_thread()

    def record_error(self, error_type: str):
        """Record one occurrence of ``error_type`` at the current time."""
        with self.lock:
            self.errors[error_type].append(datetime.now())

    def get_error_count(self, error_type: str) -> int:
        """Return how many ``error_type`` errors fall inside the window."""
        with self.lock:
            self._cleanup_old_errors(error_type)
            # .get() so that querying an unknown type does not insert a key.
            return len(self.errors.get(error_type, ()))

    def get_error_rate(self, error_type: str) -> float:
        """Return the error count divided by the window length in seconds."""
        count = self.get_error_count(error_type)
        return count / self.window_size.total_seconds()

    def _cleanup_old_errors(self, error_type: str):
        """Drop expired timestamps for ``error_type``.

        Must be called with ``self.lock`` held. Removes the key itself once
        no timestamps remain.
        """
        timestamps = self.errors.get(error_type)
        if timestamps is None:
            return

        cutoff_time = datetime.now() - self.window_size
        recent = [t for t in timestamps if t > cutoff_time]
        if recent:
            self.errors[error_type] = recent
        else:
            del self.errors[error_type]

    def _start_cleanup_thread(self):
        """Start a daemon thread that purges expired entries every 60 seconds."""
        def cleanup():
            while True:
                with self.lock:
                    # Iterate over a snapshot: cleanup may delete keys.
                    for error_type in list(self.errors):
                        self._cleanup_old_errors(error_type)
                time.sleep(60)  # one cleanup pass per minute

        thread = threading.Thread(target=cleanup, daemon=True)
        thread.start()

class ErrorMonitor:
    """Raises an alert when an error type's rate exceeds its threshold.

    An alert is printed once when the rate first rises above the threshold.
    It is re-armed as soon as a later error of that type arrives while the
    rate is back at or below the threshold, so a subsequent spike is
    reported again instead of being suppressed forever.
    """

    def __init__(self):
        self.aggregator = ErrorAggregator()
        # True while an alert for the error type is currently active.
        self.alerts: Dict[str, bool] = defaultdict(bool)
        # Alert thresholds in errors per second, keyed by error type name.
        self.thresholds: Dict[str, float] = {}

    def set_threshold(self, error_type: str, threshold: float):
        """Set the alert threshold (errors per second) for ``error_type``."""
        self.thresholds[error_type] = threshold

    def handle_error(self, error: Exception):
        """Record ``error`` and alert if its type is over threshold.

        The error type is the exception's class name. Types without a
        configured threshold are recorded but never alert.
        """
        error_type = type(error).__name__
        self.aggregator.record_error(error_type)

        if error_type not in self.thresholds:
            return

        error_rate = self.aggregator.get_error_rate(error_type)
        if error_rate > self.thresholds[error_type]:
            self._trigger_alert(error_type, error_rate)
        else:
            # Rate has recovered: clear the latch so the next breach alerts.
            self.alerts[error_type] = False

    def _trigger_alert(self, error_type: str, error_rate: float):
        """Print the alert unless one is already active for this type."""
        if not self.alerts[error_type]:
            print(f"告警: {error_type} 错误率 ({error_rate:.2f}/s) 超过阈值")
            self.alerts[error_type] = True

# 使用示例
def monitor_application():
    """Set up a monitor with one threshold and feed it a simulated connection error."""
    watcher = ErrorMonitor()
    # Alert above 0.1 connection errors per second.
    watcher.set_threshold("ConnectionError", 0.1)

    simulated_failure = ConnectionError("连接失败")
    try:
        # Stand-in for the application's real work.
        raise simulated_failure
    except Exception as e:
        watcher.handle_error(e)
                    

错误处理设计模式

错误处理链


from abc import ABC, abstractmethod
from typing import Optional, Any

class ErrorHandler(ABC):
    """Abstract link in a chain of error handlers."""

    def __init__(self):
        # Handler to delegate to; None marks the end of the chain.
        self.next_handler: Optional[ErrorHandler] = None

    def set_next(self, handler: 'ErrorHandler') -> 'ErrorHandler':
        """Attach ``handler`` after this one.

        Returns:
            ``handler`` itself, so calls can be chained.
        """
        self.next_handler = handler
        return self.next_handler

    @abstractmethod
    def handle(self, error: Exception) -> Optional[Any]:
        """Handle ``error``; concrete subclasses must implement this."""

class RetryHandler(ErrorHandler):
    """Handler that answers connection and timeout errors with a retry."""

    def __init__(self, max_retries: int = 3):
        super().__init__()
        self.max_retries = max_retries

    def handle(self, error: Exception) -> Optional[Any]:
        """Retry on ConnectionError/TimeoutError, otherwise delegate.

        Returns:
            A success marker from the first retry attempt, or the next
            handler's result (None at the end of the chain) when the error
            is not retryable or no attempt succeeds.
        """
        if isinstance(error, (ConnectionError, TimeoutError)):
            for attempt_number in range(1, self.max_retries + 1):
                try:
                    print(f"重试第 {attempt_number} 次")
                    # Placeholder for re-running the failed operation.
                    return "重试成功"
                except Exception:
                    continue

        if self.next_handler:
            return self.next_handler.handle(error)
        return None

class LogHandler(ErrorHandler):
    """Handler that prints every error and always passes it along the chain."""

    def handle(self, error: Exception) -> Optional[Any]:
        """Print ``error`` and return the next handler's result (None at the end)."""
        print(f"记录错误: {error}")
        if self.next_handler:
            return self.next_handler.handle(error)
        return None

class NotificationHandler(ErrorHandler):
    """Handler that sends a notification for RuntimeError and SystemError."""

    def handle(self, error: Exception) -> Optional[Any]:
        """Notify for runtime/system errors, otherwise delegate.

        Returns:
            A confirmation string when a notification was sent, else the
            next handler's result (None at the end of the chain).
        """
        needs_notification = isinstance(error, (RuntimeError, SystemError))
        if not needs_notification:
            if self.next_handler:
                return self.next_handler.handle(error)
            return None

        print(f"发送错误通知: {error}")
        return "已通知管理员"

# 使用示例
def setup_error_handling():
    """Build the chain retry -> log -> notification and return its head."""
    head = RetryHandler()
    logger_link = LogHandler()
    notifier_link = NotificationHandler()

    head.set_next(logger_link)
    logger_link.set_next(notifier_link)
    return head

def process_request():
    """Simulate a failing request and run the error through the handler chain."""
    chain = setup_error_handling()

    try:
        raise ConnectionError("连接超时")
    except Exception as e:
        outcome = chain.handle(e)
        if not outcome:
            print("错误无法处理")
        else:
            print(f"错误处理结果: {outcome}")
                    

错误处理测试

异常测试框架


import unittest
from typing import Type, Callable, Any
from contextlib import contextmanager

class ErrorTestCase(unittest.TestCase):
    """Base test case with extra assertions for error-handling code."""

    @contextmanager
    def assertRaisesWithMessage(
        self,
        expected_exception: Type[Exception],
        expected_message: str
    ):
        """Assert that the block raises ``expected_exception`` with an exact message.

        Use as a context manager. Fails if nothing is raised, if a different
        exception type is raised, or if ``str(exception)`` does not equal
        ``expected_message``.
        """
        with self.assertRaises(expected_exception) as cm:
            yield
        self.assertEqual(str(cm.exception), expected_message)

    def assertErrorHandler(
        self,
        handler: Callable,
        input_data: Any,
        expected_error: Type[Exception],
        expected_result: Any
    ):
        """Assert that ``handler(input_data)`` behaves as expected.

        The assertion passes when the call returns ``expected_result`` or
        when it raises an instance of ``expected_error``.

        Args:
            handler: Callable under test, invoked with ``input_data``.
            input_data: Single argument passed to ``handler``.
            expected_error: Exception type accepted if the handler raises.
            expected_result: Value required if the handler returns.
        """
        try:
            result = handler(input_data)
        except Exception as e:
            self.assertIsInstance(e, expected_error)
        else:
            # Compare outside the try block: an AssertionError raised here
            # must fail the test instead of being caught above and mistaken
            # for an exception raised by the handler.
            self.assertEqual(result, expected_result)

# 测试示例
class TestErrorHandling(ErrorTestCase):
    """Example tests built on the custom error assertions."""

    def test_value_validation(self):
        """A valid age is accepted; out-of-range ages raise with exact messages."""
        def validate_age(age: int) -> str:
            if age < 0:
                raise ValueError("年龄不能为负数")
            if age > 150:
                raise ValueError("年龄不能超过150岁")
            return "有效年龄"

        # Valid input.
        self.assertEqual(validate_age(25), "有效年龄")

        # Invalid inputs, each paired with the message it must produce.
        invalid_cases = (
            (-1, "年龄不能为负数"),
            (200, "年龄不能超过150岁"),
        )
        for bad_age, message in invalid_cases:
            with self.assertRaisesWithMessage(ValueError, message):
                validate_age(bad_age)

    def test_error_handler(self):
        """A parser that maps unparsable input to -1 satisfies the handler assertion."""
        def process_number(value: str) -> int:
            try:
                parsed = int(value)
            except ValueError:
                parsed = -1
            return parsed

        self.assertErrorHandler(
            handler=process_number,
            input_data="abc",
            expected_error=ValueError,
            expected_result=-1
        )

# Run the unittest test runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()