Subgenerators
A recursive directory-walking generator
import os

def walk_files(directory):
    """Recursively walk a directory, yielding file paths."""
    for entry in os.scandir(directory):
        if entry.is_file():
            yield entry.path
        elif entry.is_dir():
            yield from walk_files(entry.path)

def file_type_filter(files, extensions):
    """Filter file paths by extension."""
    for file in files:
        if any(file.endswith(ext) for ext in extensions):
            yield file

def file_content_reader(files):
    """Yield (path, content) pairs for each readable file."""
    for file in files:
        try:
            with open(file, 'r', encoding='utf-8') as f:
                yield file, f.read()
        except Exception as e:
            print(f"Failed to read file: {file}, error: {e}")

# Usage example
def search_in_files(directory, extensions, keyword):
    """Search file contents for a keyword."""
    files = walk_files(directory)
    filtered_files = file_type_filter(files, extensions)
    for file_path, content in file_content_reader(filtered_files):
        if keyword in content:
            yield file_path, content.count(keyword)
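
A quick usage sketch of the three composed generators; the directory path, extensions, and keyword below are placeholder values:

if __name__ == '__main__':
    # The generators are lazy: nothing is read until the loop pulls results.
    for path, hits in search_in_files('./src', ('.py', '.md'), 'TODO'):
        print(f"{path}: {hits} match(es)")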
A data-stream transformation pipeline
class DataPipeline:
    """A data-stream transformation pipeline."""

    def __init__(self):
        self.transformers = []

    def add_transformer(self, transformer):
        """Add a transformer; returns self so calls can be chained."""
        self.transformers.append(transformer)
        return self

    def process(self, data_stream):
        """Run the stream through each transformer in order."""
        for transformer in self.transformers:
            data_stream = transformer(data_stream)
        yield from data_stream

def split_lines(data_stream):
    """Split incoming chunks into lines, carrying partial lines across chunk boundaries."""
    remainder = ''
    for chunk in data_stream:
        lines = (remainder + chunk).split('\n')
        remainder = lines.pop()  # the last piece may be an incomplete line
        yield from lines
    if remainder:
        yield remainder

def parse_json(data_stream):
    """Parse each line as JSON, skipping lines that fail to parse."""
    import json
    for line in data_stream:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue

def validate_data(data_stream):
    """Keep only records that contain the required keys."""
    for item in data_stream:
        if all(k in item for k in ['id', 'name', 'value']):
            yield item

# Usage example
def process_log_file(file_path):
    def read_chunks(file_path, chunk_size=8192):
        with open(file_path, 'r') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    pipeline = DataPipeline()
    pipeline.add_transformer(split_lines) \
            .add_transformer(parse_json) \
            .add_transformer(validate_data)
    for item in pipeline.process(read_chunks(file_path)):
        print(f"Processed record: {item}")
Generator expressions
Advanced generator expressions
def matrix_operations():
    """Matrix-manipulation examples built from generator expressions."""
    # Build a matrix
    matrix = [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ]
    # Transpose
    transpose = (
        [row[i] for row in matrix]
        for i in range(len(matrix[0]))
    )
    # Square every element
    squared = (
        (x * x for x in row)
        for row in matrix
    )
    # Diagonal elements
    diagonal = (
        matrix[i][i]
        for i in range(len(matrix))
    )
    # Upper-triangular matrix
    upper_triangle = (
        (val if j >= i else 0
         for j, val in enumerate(row))
        for i, row in enumerate(matrix)
    )
    return transpose, squared, diagonal, upper_triangle

# Usage example
def demo_matrix_operations():
    transpose, squared, diagonal, upper = matrix_operations()
    print("Transpose:")
    for row in transpose:
        print(list(row))
    print("\nSquared elements:")
    for row in squared:
        print(list(row))
    print("\nDiagonal elements:")
    print(list(diagonal))
    print("\nUpper triangle:")
    for row in upper:
        print(list(row))
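
One caveat worth remembering: each object returned by matrix_operations (and each inner generator inside squared and upper_triangle) is a one-shot iterator, so it can only be consumed once. A small sketch of that behaviour:

transpose, _, _, _ = matrix_operations()
print([list(row) for row in transpose])  # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
print(list(transpose))                   # [] -- already exhausted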
Chained generator expressions
def chain_expressions(data):
    """Chained generator expressions."""
    from itertools import accumulate
    # Convert digit strings to integers
    numbers = (int(x) for x in data if x.isdigit())
    # Keep only even numbers
    evens = (x for x in numbers if x % 2 == 0)
    # Square them
    squares = (x * x for x in evens)
    # Running (cumulative) sum; itertools.accumulate keeps the chain lazy and single-pass
    running_sum = accumulate(squares)
    return running_sum

# Usage example
def demo_chain_expressions():
    data = ['1', 'a', '2', '3', 'b', '4', '5', '6']
    result = chain_expressions(data)
    print("Running-sum sequence:")
    print(list(result))
Context management
A resource-management generator
from contextlib import contextmanager
import threading

class ResourcePool:
    """A fixed-size resource pool."""

    def __init__(self, size):
        self.resources = [Resource(i) for i in range(size)]
        self.lock = threading.Lock()
        self.available = threading.Semaphore(size)

    @contextmanager
    def acquire(self):
        """Context manager that checks a resource out of the pool."""
        self.available.acquire()
        try:
            with self.lock:
                resource = next(r for r in self.resources if not r.in_use)
                resource.in_use = True
            try:
                yield resource
            finally:
                with self.lock:
                    resource.in_use = False
        finally:
            self.available.release()

class Resource:
    """A mock resource."""

    def __init__(self, id):
        self.id = id
        self.in_use = False

    def use(self):
        return f"Using resource {self.id}"

def resource_user(pool):
    """Generator that repeatedly borrows a resource from the pool."""
    while True:
        with pool.acquire() as resource:
            yield resource.use()

# Usage example
def demo_resource_pool():
    pool = ResourcePool(3)
    # Each generator holds its resource while suspended at the yield,
    # so the number of concurrent users must not exceed the pool size.
    users = [resource_user(pool) for _ in range(3)]
    for _ in range(10):
        for user in users:
            print(next(user))
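
The pool is mainly useful under real concurrency, where acquire() blocks callers once every resource is checked out. A minimal threaded sketch; the worker count and sleep time are arbitrary:

import time

def pool_worker(pool, name):
    # The resource is held only for the duration of the with-block.
    with pool.acquire() as resource:
        print(f"{name}: {resource.use()}")
        time.sleep(0.01)

def demo_threaded_pool():
    pool = ResourcePool(3)
    threads = [threading.Thread(target=pool_worker, args=(pool, f"worker-{i}"))
               for i in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()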
A transaction-management generator
class Transaction:
    """A simple transaction manager."""

    def __init__(self):
        self.operations = []

    def add(self, operation):
        self.operations.append(operation)

    def rollback(self):
        for operation in reversed(self.operations):
            operation.undo()
        self.operations.clear()

    def commit(self):
        self.operations.clear()

@contextmanager
def transaction():
    """Transaction context manager: commit on success, roll back on error."""
    tx = Transaction()
    try:
        yield tx
        tx.commit()
    except Exception:
        tx.rollback()
        raise

class Operation:
    """An operation that can be undone."""

    def __init__(self, data):
        self.data = data
        self.old_value = None

    def execute(self, value):
        self.old_value = self.data['value']
        self.data['value'] = value

    def undo(self):
        if self.old_value is not None:
            self.data['value'] = self.old_value

# Usage example
def update_with_transaction(data):
    with transaction() as tx:
        op = Operation(data)
        op.execute("new value")
        tx.add(op)
        # An operation that may raise
        if data['value'] == "new value":
            raise ValueError("testing rollback")
Parallel processing
Parallel generators
from concurrent.futures import ProcessPoolExecutor

def parallel_processor(data_stream, process_func, max_workers=None):
    """Generator that maps process_func over the stream in worker processes."""
    # process_func must be a picklable, module-level function.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        yield from executor.map(process_func, data_stream)

def parallel_chunk_processor(data_stream, process_func, chunk_size=1000):
    """Collect the stream into chunks and hand each chunk to parallel_processor."""
    current_chunk = []
    for item in data_stream:
        current_chunk.append(item)
        if len(current_chunk) >= chunk_size:
            yield from parallel_processor(current_chunk, process_func)
            current_chunk = []
    if current_chunk:
        yield from parallel_processor(current_chunk, process_func)

# Usage example
def cpu_intensive_task(x):
    """A CPU-bound task."""
    result = 0
    for i in range(1000000):
        result += i * x
    return result

def process_large_dataset():
    data = range(10000)
    results = parallel_chunk_processor(
        data,
        cpu_intensive_task,
        chunk_size=100
    )
    for result in results:
        print(f"Result: {result}")
Parallel file processing
from functools import partial
from collections import Counter

def _read_and_process(file_path, process_func):
    """Read one file and apply process_func to its contents.
    Defined at module level so ProcessPoolExecutor can pickle it."""
    try:
        with open(file_path, 'r') as f:
            content = f.read()
        return process_func(content)
    except Exception as e:
        print(f"Failed to process file: {file_path}, error: {e}")
        return None

def parallel_file_processor(file_paths, process_func, max_workers=None):
    """Generator that processes files in parallel."""
    yield from parallel_processor(
        file_paths,
        partial(_read_and_process, process_func=process_func),
        max_workers
    )

def word_count(text):
    """Word-frequency count for a single file."""
    words = text.lower().split()
    return Counter(words)

def merge_counters(counters):
    """Merge multiple Counter objects."""
    result = Counter()
    for counter in counters:
        if counter is not None:
            result.update(counter)
    return result

# Usage example
def count_words_in_files(directory):
    file_paths = list(walk_files(directory))
    counters = parallel_file_processor(
        file_paths,
        word_count
    )
    total_count = merge_counters(counters)
    return total_count
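
A small usage sketch; the directory path is a placeholder:

if __name__ == '__main__':
    counts = count_words_in_files('./docs')
    for word, n in counts.most_common(10):
        print(f"{word}: {n}")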
Advanced applications
Memory-efficient large-data processing
import mmap
import os

def memory_mapped_reader(file_path, chunk_size=1024*1024):
    """Read a file in chunks through a memory map."""
    file_size = os.path.getsize(file_path)
    with open(file_path, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for offset in range(0, file_size, chunk_size):
                yield mm[offset:min(offset + chunk_size, file_size)]

def incremental_json_parser(byte_stream):
    """Incrementally parse a byte stream of concatenated JSON values."""
    import codecs
    import json
    decoder = json.JSONDecoder()
    # An incremental UTF-8 decoder handles multi-byte characters split across chunks.
    utf8_decoder = codecs.getincrementaldecoder('utf-8')()
    buffer = ''
    for chunk in byte_stream:
        buffer += utf8_decoder.decode(chunk)
        while buffer:
            try:
                obj, index = decoder.raw_decode(buffer)
                yield obj
                buffer = buffer[index:].lstrip()
            except json.JSONDecodeError:
                break

# Usage example
def process_large_json_file(file_path):
    reader = memory_mapped_reader(file_path)
    parser = incremental_json_parser(reader)
    for item in parser:
        process_item(item)  # process_item is a placeholder for application-specific handling
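
A quick end-to-end sketch, writing a small newline-delimited JSON file first; the file name and records are made up for illustration:

import json

def demo_incremental_parsing():
    with open('events.json', 'w', encoding='utf-8') as f:
        for i in range(3):
            f.write(json.dumps({'id': i, 'value': i * 10}) + '\n')
    for obj in incremental_json_parser(memory_mapped_reader('events.json')):
        print(obj)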
A real-time analytics pipeline
import time
from collections import deque
from statistics import mean, median, stdev

class RealTimeAnalytics:
    """Sliding-window statistics over a real-time stream."""

    def __init__(self, window_size=100):
        self.window = deque(maxlen=window_size)

    def add_value(self, value):
        self.window.append(value)

    def get_stats(self):
        data = list(self.window)
        if not data:
            return None
        return {
            'count': len(data),
            'mean': mean(data),
            'median': median(data),
            'std_dev': stdev(data) if len(data) > 1 else 0
        }

def realtime_data_analyzer(data_stream, window_size=100):
    """Generator that emits window statistics for each incoming value."""
    analyzer = RealTimeAnalytics(window_size)
    for value in data_stream:
        analyzer.add_value(value)
        stats = analyzer.get_stats()
        if stats:
            yield stats

def alert_generator(stats_stream, threshold):
    """Generator that emits an alert when the windowed mean exceeds a threshold."""
    for stats in stats_stream:
        if abs(stats['mean']) > threshold:
            yield {
                'type': 'alert',
                'message': f"Value out of range: {stats['mean']:.2f}",
                'stats': stats
            }

# Usage example
def monitor_data_stream():
    def data_source():
        """Simulated data source."""
        import random
        while True:
            yield random.gauss(0, 1)
            time.sleep(0.1)

    data = data_source()
    stats = realtime_data_analyzer(data, window_size=100)
    alerts = alert_generator(stats, threshold=2.0)
    for alert in alerts:
        print(f"Alert: {alert['message']}")