异步错误处理
异步上下文管理器
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class AsyncResource:
    """Stand-in for a resource with asynchronous connect/disconnect/process steps.

    Each coroutine awaits a 0.1 second ``asyncio.sleep`` to simulate latency.
    """

    async def connect(self):
        """Simulate opening the connection, then announce it on stdout."""
        await asyncio.sleep(0.1)  # simulated connect latency
        print("资源已连接")

    async def disconnect(self):
        """Simulate closing the connection, then announce it on stdout."""
        await asyncio.sleep(0.1)  # simulated disconnect latency
        print("资源已断开")

    async def process(self, data):
        """Simulate processing ``data`` and return the formatted result string."""
        await asyncio.sleep(0.1)  # simulated processing latency
        outcome = f"处理结果: {data}"
        return outcome
@asynccontextmanager
async def managed_resource() -> AsyncGenerator[AsyncResource, None]:
    """Async context manager that yields a connected ``AsyncResource``.

    The resource is connected on entry and disconnected on exit, including
    when the ``async with`` body raises.

    Yields:
        The connected ``AsyncResource`` instance.

    Raises:
        Whatever ``AsyncResource.connect`` raises; in that case ``disconnect``
        is not called because nothing was connected.
    """
    resource = AsyncResource()
    # Connect *before* entering the try block: cleanup must only run for a
    # resource that was actually connected.
    await resource.connect()
    try:
        yield resource
    finally:
        await resource.disconnect()
async def process_data_async(data_list):
    """Process every item of ``data_list`` through one managed resource.

    Items whose processing raises are reported on stdout and skipped, so the
    returned list holds only the successful results, in input order.
    """
    collected = []
    async with managed_resource() as resource:
        for data in data_list:
            try:
                collected.append(await resource.process(data))
            except Exception as e:
                # Best effort: one bad item must not abort the whole batch.
                print(f"处理数据 {data} 时出错: {e}")
    return collected
# 使用示例
async def main():
    """Demo entry point: process three sample items and print the results."""
    sample_items = ["item1", "item2", "item3"]
    print(await process_data_async(sample_items))
# asyncio.run(main())
异步重试机制
import asyncio
from functools import wraps
from typing import TypeVar, Callable, Any
T = TypeVar('T')


def async_retry(
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """Decorator factory that retries a coroutine function on failure.

    Args:
        retries: Total number of attempts (not additional retries); must be >= 1.
        delay: Seconds to sleep before the second attempt.
        backoff: Factor the sleep is multiplied by after each failed attempt.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator that wraps a coroutine function with the retry loop.

    Raises:
        ValueError: If ``retries`` is less than 1. Previously such a value made
            the wrapper return ``None`` without ever calling the function.
    """
    if retries < 1:
        raise ValueError(f"retries must be >= 1, got {retries}")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == retries:
                        # Out of attempts: a bare raise keeps the original traceback.
                        raise
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# 使用示例
class AsyncAPI:
    """Demo API client whose fetch method is wrapped by ``async_retry``."""

    @async_retry(retries=3, delay=1, exceptions=(ConnectionError,))
    async def fetch_data(self, url: str) -> dict:
        """Return a canned payload, or raise ``ConnectionError`` for ``"bad_url"``."""
        if url != "bad_url":
            return {"data": "success"}
        raise ConnectionError("连接失败")
async def test_retry():
    """Demo: call the failing URL and report once the call finally raises."""
    client = AsyncAPI()
    try:
        await client.fetch_data("bad_url")
    except ConnectionError:
        print("所有重试都失败了")
错误恢复机制
事务回滚
from typing import List, Optional
from dataclasses import dataclass
import logging
@dataclass
class Operation:
    """A single named step of a transaction that can be executed and undone."""

    # Label used in the printed and logged messages.
    name: str
    # Payload attached to the step; not read by the methods below.
    data: dict
    # True between a successful execute() and the matching rollback().
    completed: bool = False

    def execute(self) -> bool:
        """Run the (simulated) step; return True on success, False on error."""
        try:
            # Placeholder for the real work.
            print(f"执行操作: {self.name}")
            self.completed = True
        except Exception as e:
            logging.error(f"操作 {self.name} 执行失败: {e}")
            return False
        return True

    def rollback(self) -> bool:
        """Undo the step if it had completed; return False only on error."""
        try:
            if self.completed:
                print(f"回滚操作: {self.name}")
                self.completed = False
        except Exception as e:
            logging.error(f"操作 {self.name} 回滚失败: {e}")
            return False
        return True
class TransactionManager:
    """Runs a list of operations in order and rolls them back on failure.

    ``current_index`` is the index of the last operation the current
    ``execute_all`` run has reached, or -1 when none has been reached.
    """

    def __init__(self):
        self.operations: List["Operation"] = []
        self.current_index: int = -1

    def add_operation(self, operation: "Operation"):
        """Append ``operation`` to the end of the transaction."""
        self.operations.append(operation)

    def execute_all(self) -> bool:
        """Execute every operation in order.

        Returns:
            True when all operations succeeded. False when one reported
            failure or raised; in that case the operations reached by *this*
            run are rolled back in reverse order.
        """
        # Forget the progress of any earlier run. Without this reset, a failure
        # on the first operation would roll back operations that a previous,
        # successful run had committed.
        self.current_index = -1
        try:
            for i, operation in enumerate(self.operations):
                if not operation.execute():
                    # Include the failing operation itself in the rollback.
                    self.current_index = i
                    self.rollback()
                    return False
                self.current_index = i
            return True
        except Exception as e:
            logging.error(f"执行事务时发生错误: {e}")
            self.rollback()
            return False

    def rollback(self):
        """Roll back operations from ``current_index`` down to 0, newest first."""
        for i in range(self.current_index, -1, -1):
            self.operations[i].rollback()
# 使用示例
def process_order():
    """Demo: run the four order steps as one transaction and print the outcome."""
    tm = TransactionManager()
    # (operation name, payload) pairs, in execution order.
    steps = (
        ("验证库存", {"product_id": 1}),
        ("扣减库存", {"product_id": 1, "quantity": 1}),
        ("创建订单", {"order_id": "123"}),
        ("支付订单", {"order_id": "123"}),
    )
    for step_name, payload in steps:
        tm.add_operation(Operation(step_name, payload))
    succeeded = tm.execute_all()
    print("订单处理成功" if succeeded else "订单处理失败,已回滚")
状态恢复
from typing import Dict, Any
import json
import os
class StateManager:
    """Key/value state persisted to a JSON checkpoint file.

    The checkpoint is loaded on construction and rewritten after every
    ``update_state`` call. Load and save failures are logged, never raised.
    """

    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = checkpoint_file
        self.state: Dict[str, Any] = {}
        self.load_checkpoint()

    def save_checkpoint(self):
        """Write the current state to the checkpoint file atomically.

        The JSON is written to a sibling ``.tmp`` file and then moved over the
        real checkpoint with ``os.replace``. A failed or interrupted save
        therefore leaves the previous checkpoint intact instead of truncated.
        """
        tmp_path = f"{self.checkpoint_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f)
            os.replace(tmp_path, self.checkpoint_file)
        except Exception as e:
            # Deliberately broad: saving is best effort and must not raise.
            logging.error(f"保存检查点失败: {e}")
            try:
                os.remove(tmp_path)  # drop the partial temp file, if any
            except OSError:
                pass  # nothing was written, or it cannot be removed

    def load_checkpoint(self):
        """Load state from the checkpoint file if it exists and is valid.

        A missing file is not an error. Unreadable files, invalid JSON, or a
        JSON document that is not an object are logged and the current state
        is kept.
        """
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return  # first run: no checkpoint yet
        except Exception as e:
            logging.error(f"加载检查点失败: {e}")
            return
        if isinstance(loaded, dict):
            self.state = loaded
        else:
            # get_state() relies on dict.get, so reject any other JSON type.
            logging.error(
                f"加载检查点失败: expected a JSON object, got {type(loaded).__name__}"
            )

    def update_state(self, key: str, value: Any):
        """Set ``key`` to ``value`` and persist the checkpoint immediately."""
        self.state[key] = value
        self.save_checkpoint()

    def get_state(self, key: str, default: Any = None) -> Any:
        """Return the stored value for ``key``, or ``default`` when absent."""
        return self.state.get(key, default)
# 使用示例
def process_large_file(filename: str):
    """Process ``filename`` line by line, checkpointing the offset after each line.

    The starting offset is read from the ``"position"`` key of
    ``process_state.json`` (0 when absent). Any exception is logged and
    swallowed; the last saved offset stays in the checkpoint.
    """
    state_manager = StateManager("process_state.json")
    resume_at = state_manager.get_state("position", 0)
    try:
        with open(filename, 'r') as f:
            f.seek(resume_at)
            # readline() returns '' only at end of file, which ends the loop.
            for line in iter(f.readline, ''):
                process_line(line)
                # Record the offset just past the line that was handled.
                state_manager.update_state("position", f.tell())
    except Exception as e:
        logging.error(f"处理文件时发生错误: {e}")
        # The next call continues from the last saved position.
分布式错误处理
分布式锁
import redis
from contextlib import contextmanager
import time
import uuid
class DistributedLock:
    """Redis-backed mutual-exclusion lock.

    The lock is a Redis key holding a token unique to this instance. It is
    created with ``SET NX EX`` and removed only while it still holds that
    token. ``timeout`` is both the maximum time to wait for the lock and the
    key's expiry in seconds.
    """

    # Runs atomically inside Redis: delete the key only if it still holds our
    # token, so another client's lock is never removed.
    _RELEASE_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    def __init__(self, redis_client: "redis.Redis", lock_name: str, timeout: int = 10):
        self.redis = redis_client
        self.lock_name = lock_name
        self.timeout = timeout
        self.lock_id = str(uuid.uuid4())

    @contextmanager
    def acquire(self):
        """Context manager that holds the lock for the duration of the block.

        Raises:
            TimeoutError: If the lock could not be obtained within ``timeout``
                seconds. Nothing is released in that case, because the key
                belongs to another holder.
        """
        if not self._acquire_lock():
            raise TimeoutError("无法获取锁")
        try:
            yield
        finally:
            self._release_lock()

    def _acquire_lock(self) -> bool:
        """Poll every 0.1 s until the key is set or ``timeout`` seconds elapse."""
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            if self.redis.set(
                self.lock_name,
                self.lock_id,
                ex=self.timeout,
                nx=True
            ):
                return True
            time.sleep(0.1)
        return False

    def _release_lock(self) -> bool:
        """Delete the key if, and only if, this instance still owns it.

        Returns:
            True when the key was deleted, False when it was absent or held a
            different token (for example after expiry and re-acquisition).
        """
        deleted = self.redis.eval(
            self._RELEASE_SCRIPT, 1, self.lock_name, self.lock_id
        )
        return bool(deleted)
# 使用示例
def process_shared_resource():
    """Demo: do some work while holding the ``shared_resource_lock`` lock."""
    client = redis.Redis(host='localhost', port=6379)
    resource_lock = DistributedLock(client, "shared_resource_lock")
    try:
        with resource_lock.acquire():
            # Critical section: work on the shared resource goes here.
            print("正在处理共享资源")
    except TimeoutError:
        print("无法获取资源锁")
分布式事务
from enum import Enum
from typing import List, Dict
import time
class TransactionState(Enum):
    """States a transaction participant can be in."""

    INIT = "init"                # assigned when a participant is created
    PREPARED = "prepared"        # assigned by a successful prepare
    COMMITTED = "committed"      # assigned by a successful commit
    ROLLED_BACK = "rolled_back"  # assigned by a successful rollback
class Participant:
    """One party in a distributed transaction; tracks its own state."""

    def __init__(self, name: str):
        self.state = TransactionState.INIT
        self.name = name

    def prepare(self) -> bool:
        """Move to PREPARED; return True on success, False if an error occurred."""
        try:
            print(f"{self.name} 准备中...")
            self.state = TransactionState.PREPARED
        except Exception as e:
            print(f"{self.name} 准备失败: {e}")
            return False
        return True

    def commit(self) -> bool:
        """Move to COMMITTED; return True on success, False if an error occurred."""
        try:
            print(f"{self.name} 提交中...")
            self.state = TransactionState.COMMITTED
        except Exception as e:
            print(f"{self.name} 提交失败: {e}")
            return False
        return True

    def rollback(self) -> bool:
        """Move to ROLLED_BACK; return True on success, False if an error occurred."""
        try:
            print(f"{self.name} 回滚中...")
            self.state = TransactionState.ROLLED_BACK
        except Exception as e:
            print(f"{self.name} 回滚失败: {e}")
            return False
        return True
class TwoPhaseCommit:
    """Coordinator that drives participants through prepare, then commit."""

    def __init__(self):
        self.participants: List[Participant] = []

    def add_participant(self, participant: Participant):
        """Register ``participant`` with this coordinator."""
        self.participants.append(participant)

    def execute(self) -> bool:
        """Run the prepare phase, then the commit phase.

        If either phase reports failure, every participant is asked to roll
        back and False is returned; otherwise True.
        """
        for run_phase in (self._prepare_phase, self._commit_phase):
            if not run_phase():
                self._rollback_all()
                return False
        return True

    def _prepare_phase(self) -> bool:
        """Ask participants to prepare, stopping at the first refusal."""
        # all() short-circuits, so participants after a failure are not asked.
        return all(member.prepare() for member in self.participants)

    def _commit_phase(self) -> bool:
        """Ask participants to commit, stopping at the first failure."""
        return all(member.commit() for member in self.participants)

    def _rollback_all(self):
        """Call rollback() on every registered participant, in registration order."""
        for member in self.participants:
            member.rollback()
# 使用示例
def distributed_transaction():
    """Demo: run a two-phase commit across three participants and print the result."""
    coordinator = TwoPhaseCommit()
    for participant_name in ("数据库A", "数据库B", "消息队列"):
        coordinator.add_participant(Participant(participant_name))
    succeeded = coordinator.execute()
    print("分布式事务执行成功" if succeeded else "分布式事务执行失败")
错误监控系统
错误聚合器
from collections import defaultdict
from datetime import datetime, timedelta
import threading
from typing import Dict, List, Optional
class ErrorAggregator:
    """Counts errors per type inside a sliding time window.

    Timestamps are kept per error type in ``errors`` and guarded by ``lock``.
    A daemon thread started by the constructor prunes expired timestamps once
    a minute.
    """

    def __init__(self, window_size: timedelta = timedelta(minutes=5)):
        self.window_size = window_size
        self.errors: Dict[str, List[datetime]] = defaultdict(list)
        self.lock = threading.Lock()
        # Start the background cleanup thread.
        self._start_cleanup_thread()

    def record_error(self, error_type: str):
        """Record one occurrence of ``error_type`` at the current time."""
        with self.lock:
            self.errors[error_type].append(datetime.now())

    def get_error_count(self, error_type: str) -> int:
        """Return how many ``error_type`` errors fall inside the window."""
        with self.lock:
            self._cleanup_old_errors(error_type)
            # .get() rather than indexing: indexing the defaultdict would
            # insert a permanent empty entry for every type ever queried.
            return len(self.errors.get(error_type, ()))

    def get_error_rate(self, error_type: str) -> float:
        """Return errors per second averaged over the window.

        Raises:
            ZeroDivisionError: If ``window_size`` is zero.
        """
        count = self.get_error_count(error_type)
        return count / self.window_size.total_seconds()

    def _cleanup_old_errors(self, error_type: str):
        """Drop expired timestamps for ``error_type``; caller must hold ``lock``.

        A type left with no timestamps is removed from ``errors`` so the
        mapping cannot grow without bound.
        """
        timestamps = self.errors.get(error_type)
        if timestamps is None:
            return
        cutoff_time = datetime.now() - self.window_size
        recent = [t for t in timestamps if t > cutoff_time]
        if recent:
            self.errors[error_type] = recent
        else:
            del self.errors[error_type]

    def _start_cleanup_thread(self):
        """Start the daemon thread that prunes every error type once a minute."""
        def cleanup():
            while True:
                with self.lock:
                    # Snapshot the keys: pruning may delete entries.
                    for error_type in list(self.errors.keys()):
                        self._cleanup_old_errors(error_type)
                time.sleep(60)  # sleep outside the lock so callers are not blocked
        thread = threading.Thread(target=cleanup, daemon=True)
        thread.start()
class ErrorMonitor:
    """Records errors and prints an alert when a type's rate exceeds its threshold.

    ``alerts[error_type]`` is True while that type is in the alerting state.
    One alert is printed on entering the state; the flag is cleared once the
    rate is observed at or below the threshold, so a later spike alerts again.
    """

    def __init__(self):
        self.aggregator = ErrorAggregator()
        self.alerts: Dict[str, bool] = defaultdict(bool)
        self.thresholds: Dict[str, float] = {}

    def set_threshold(self, error_type: str, threshold: float):
        """Set the alert threshold, in errors per second, for ``error_type``."""
        self.thresholds[error_type] = threshold

    def handle_error(self, error: Exception):
        """Record ``error`` and update the alert state for its type.

        The type name of ``error`` is the aggregation key. Types without a
        configured threshold are recorded but never alert.
        """
        error_type = type(error).__name__
        self.aggregator.record_error(error_type)
        if error_type not in self.thresholds:
            return
        error_rate = self.aggregator.get_error_rate(error_type)
        if error_rate > self.thresholds[error_type]:
            self._trigger_alert(error_type, error_rate)
        else:
            # Back under the threshold: re-arm so the next spike is reported.
            # Previously the flag stayed True forever and only the first
            # spike was ever reported.
            self.alerts[error_type] = False

    def _trigger_alert(self, error_type: str, error_rate: float):
        """Print the alert once per alerting period and mark the type as alerting."""
        if not self.alerts[error_type]:
            print(f"告警: {error_type} 错误率 ({error_rate:.2f}/s) 超过阈值")
            self.alerts[error_type] = True
# 使用示例
def monitor_application():
    """Demo: configure a threshold, then feed one simulated failure to the monitor."""
    monitor = ErrorMonitor()
    # Alert above 0.1 connection errors per second.
    monitor.set_threshold("ConnectionError", 0.1)
    try:
        # Stand-in for the application's real work.
        raise ConnectionError("连接失败")
    except Exception as caught:
        monitor.handle_error(caught)
错误处理设计模式
错误处理链
from abc import ABC, abstractmethod
from typing import Optional, Any
class ErrorHandler(ABC):
    """Abstract link in a chain of error handlers."""

    def __init__(self):
        # Successor in the chain; None marks the end of the chain.
        self.next_handler: Optional[ErrorHandler] = None

    def set_next(self, handler: 'ErrorHandler') -> 'ErrorHandler':
        """Attach ``handler`` as successor and return it, so calls can be chained."""
        self.next_handler = successor = handler
        return successor

    @abstractmethod
    def handle(self, error: Exception) -> Optional[Any]:
        """Handle ``error`` and return a result; subclasses must implement this."""
        ...
class RetryHandler(ErrorHandler):
    """Handler that retries on connection and timeout errors."""

    def __init__(self, max_retries: int = 3):
        super().__init__()
        self.max_retries = max_retries

    def handle(self, error: Exception) -> Optional[Any]:
        """Retry for ``ConnectionError``/``TimeoutError``; otherwise delegate.

        Returns the retry result on success. If the error is not retryable or
        every attempt failed, returns the next handler's result, or None when
        there is no next handler.
        """
        retryable = isinstance(error, (ConnectionError, TimeoutError))
        attempt = 0
        while retryable and attempt < self.max_retries:
            try:
                print(f"重试第 {attempt + 1} 次")
                # Placeholder for re-running the failed operation.
                return "重试成功"
            except Exception:
                attempt += 1
        if self.next_handler:
            return self.next_handler.handle(error)
        return None
class LogHandler(ErrorHandler):
    """Handler that prints the error and always passes it on."""

    def handle(self, error: Exception) -> Optional[Any]:
        """Print ``error``; return the next handler's result, or None at chain end."""
        print(f"记录错误: {error}")
        if self.next_handler:
            return self.next_handler.handle(error)
        return None
class NotificationHandler(ErrorHandler):
    """Handler that sends a notification for runtime and system errors."""

    def handle(self, error: Exception) -> Optional[Any]:
        """Notify for ``RuntimeError``/``SystemError``; otherwise delegate.

        Other errors go to the next handler, or yield None at chain end.
        """
        if not isinstance(error, (RuntimeError, SystemError)):
            if self.next_handler:
                return self.next_handler.handle(error)
            return None
        print(f"发送错误通知: {error}")
        return "已通知管理员"
# 使用示例
def setup_error_handling():
    """Build the retry -> log -> notification chain and return its first handler."""
    chain_head = RetryHandler()
    log_link = LogHandler()
    notify_link = NotificationHandler()
    chain_head.set_next(log_link)
    log_link.set_next(notify_link)
    return chain_head
def process_request():
    """Demo: raise a connection error and pass it through the handler chain."""
    handler = setup_error_handling()
    try:
        raise ConnectionError("连接超时")
    except Exception as caught:
        result = handler.handle(caught)
        if not result:
            # No handler in the chain produced a result.
            print("错误无法处理")
        else:
            print(f"错误处理结果: {result}")
错误处理测试
异常测试框架
import unittest
from typing import Type, Callable, Any
from contextlib import contextmanager
class ErrorTestCase(unittest.TestCase):
    """Base ``TestCase`` with extra assertions for error-handling code."""

    @contextmanager
    def assertRaisesWithMessage(
        self,
        expected_exception: Type[Exception],
        expected_message: str
    ):
        """Assert the ``with`` body raises ``expected_exception`` with an exact message.

        The comparison is ``str(exception) == expected_message``.
        """
        with self.assertRaises(expected_exception) as cm:
            yield
        self.assertEqual(str(cm.exception), expected_message)

    def assertErrorHandler(
        self,
        handler: Callable,
        input_data: Any,
        expected_error: Type[Exception],
        expected_result: Any
    ):
        """Assert ``handler(input_data)`` returns the expected result or raises the expected error.

        Args:
            handler: Callable under test, called with ``input_data``.
            input_data: The single argument passed to ``handler``.
            expected_error: Exception type accepted if ``handler`` raises.
            expected_result: Value required if ``handler`` returns.
        """
        # Only the handler call is inside the try. The result check lives in
        # the else branch so its AssertionError cannot be caught below and
        # mistaken for an exception raised by the handler.
        try:
            result = handler(input_data)
        except Exception as e:
            self.assertIsInstance(e, expected_error)
        else:
            self.assertEqual(result, expected_result)
# 测试示例
class TestErrorHandling(ErrorTestCase):
    """Example tests that exercise the ``ErrorTestCase`` helpers."""

    def test_value_validation(self):
        """A validator accepts a normal age and rejects out-of-range ages."""
        def validate_age(age: int) -> str:
            if age < 0:
                raise ValueError("年龄不能为负数")
            if age > 150:
                raise ValueError("年龄不能超过150岁")
            return "有效年龄"

        # Valid value.
        self.assertEqual(validate_age(25), "有效年龄")

        # Invalid values, each paired with its exact error message.
        rejected = (
            (-1, "年龄不能为负数"),
            (200, "年龄不能超过150岁"),
        )
        for bad_age, message in rejected:
            with self.assertRaisesWithMessage(ValueError, message):
                validate_age(bad_age)

    def test_error_handler(self):
        """A parser that maps bad input to -1 satisfies ``assertErrorHandler``."""
        def process_number(value: str) -> int:
            try:
                parsed = int(value)
            except ValueError:
                return -1
            return parsed

        self.assertErrorHandler(process_number, "abc", ValueError, -1)
# Run the unittest runner only when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()