数据处理生成器
数据转换生成器
def csv_reader(file_path, delimiter=','):
    """Lazily read a delimited text file, yielding one dict per data line.

    The first line is treated as the header. Every following line is split
    on ``delimiter`` and zipped with the header, so surplus values are
    dropped and missing trailing values leave their keys absent.

    Only the line terminator is removed from each line. Field whitespace and
    trailing empty fields are preserved, which matters for tab-delimited
    data. Quoted fields are not interpreted.

    Args:
        file_path: Path of the UTF-8 encoded file to read.
        delimiter: Separator string placed between fields.

    Yields:
        dict mapping header names to the string values of one line.
        An empty file yields nothing.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        header_line = next(f, None)
        if header_line is None:
            # Empty file: finish quietly. A bare next(f) here would raise
            # StopIteration inside the generator, which surfaces as
            # RuntimeError.
            return
        header = header_line.rstrip('\r\n').split(delimiter)
        for line in f:
            # Turn each line into a dict keyed by the header names.
            values = line.rstrip('\r\n').split(delimiter)
            yield dict(zip(header, values))
def data_transformer(data_iterator):
    """Cast the numeric fields of each record and pass the record along.

    ``age`` is converted with ``int`` and ``salary`` with ``float`` when the
    key is present. Records are modified in place and then yielded.

    Args:
        data_iterator: Iterable of mutable mapping records.

    Yields:
        The same record objects, after conversion.
    """
    # Field name -> converter, applied in this order.
    casts = (('age', int), ('salary', float))
    for record in data_iterator:
        for field, cast in casts:
            if field in record:
                record[field] = cast(record[field])
        yield record
def data_filter(data_iterator, conditions):
    """Yield only the records that satisfy every condition.

    Args:
        data_iterator: Iterable of records exposing ``get``.
        conditions: Mapping of field name to required value. A record
            passes when ``record.get(field) == value`` for all entries.

    Yields:
        Each matching record, unchanged.
    """
    for record in data_iterator:
        for field, expected in conditions.items():
            if not record.get(field) == expected:
                break
        else:
            # No condition failed (or there were none).
            yield record
# 使用示例
def process_employee_data(file_path):
    """Print every record of ``file_path`` whose department is 'IT'.

    Chains ``csv_reader``, ``data_transformer`` and ``data_filter`` into one
    lazy pipeline and prints each record that comes out of it.

    Args:
        file_path: Path handed to ``csv_reader``.
    """
    wanted = {'department': 'IT'}
    # Build the pipeline from the innermost stage outwards.
    pipeline = data_filter(data_transformer(csv_reader(file_path)), wanted)
    # Consume the pipeline.
    for record in pipeline:
        print(record)
数据聚合生成器
from collections import defaultdict
def group_by(data_iterator, key):
    """Group items by ``item[key]`` and yield ``(value, items)`` pairs.

    The whole input is consumed before the first pair is produced. Groups
    come out in order of first appearance.

    Args:
        data_iterator: Iterable of subscriptable items.
        key: Subscript used to read the grouping value from each item.

    Yields:
        Tuples of (grouping value, list of items with that value).
    """
    buckets = defaultdict(list)
    for item in data_iterator:
        buckets[item[key]].append(item)
    yield from buckets.items()
def calculate_statistics(data_iterator):
    """Yield one summary dict per ``(key, group)`` pair.

    Args:
        data_iterator: Iterable of (key, group) pairs, where group is a
            sized collection of items with 'age' and 'salary' entries.

    Yields:
        dict with 'group', 'count', 'age_avg' (rounded to 2 places) and
        'salary_sum'.
    """
    for label, members in data_iterator:
        # Per-group aggregates.
        size = len(members)
        mean_age = sum(member['age'] for member in members) / size
        total_salary = sum(member['salary'] for member in members)
        summary = {
            'group': label,
            'count': size,
            'age_avg': round(mean_age, 2),
            'salary_sum': total_salary,
        }
        yield summary
# 使用示例
def analyze_employee_data(records):
    """Print per-department statistics for ``records``.

    Groups the records by their 'department' entry with ``group_by``, then
    prints the figures produced by ``calculate_statistics`` for each group.

    Args:
        records: Iterable of records passed to ``group_by``.
    """
    # Group by department, then summarise each group.
    for stat in calculate_statistics(group_by(records, 'department')):
        print(f"部门: {stat['group']}")
        print(f"人数: {stat['count']}")
        print(f"平均年龄: {stat['age_avg']}")
        print(f"总薪资: {stat['salary_sum']}")
        print("-" * 20)
内存优化生成器
大文件处理生成器
def chunk_reader(file_path, chunk_size=8192):
    """Yield the contents of a file as consecutive byte chunks.

    Args:
        file_path: Path of the file, opened in binary mode.
        chunk_size: Maximum number of bytes per chunk.

    Yields:
        bytes objects of up to ``chunk_size`` bytes, stopping at the first
        empty read.
    """
    with open(file_path, 'rb') as f:
        # Two-argument iter() keeps calling read() until it returns b''.
        yield from iter(lambda: f.read(chunk_size), b'')
def line_reader(file_path, encoding='utf-8'):
    """Yield the lines of a file without loading it all into memory.

    Bytes come from ``chunk_reader`` and are decoded incrementally, so a
    multi-byte character split across two chunks is decoded correctly
    rather than raising ``UnicodeDecodeError``.

    Lines are split on '\\n' only and yielded without the terminator.
    Trailing text with no final newline is yielded as the last line.

    Args:
        file_path: Path passed to ``chunk_reader``.
        encoding: Text encoding of the file.

    Yields:
        str, one line at a time.

    Raises:
        UnicodeDecodeError: If the file holds bytes that are invalid for
            ``encoding``, including a truncated character at end of file.
    """
    import codecs

    decoder = codecs.getincrementaldecoder(encoding)()
    pending = ''
    for chunk in chunk_reader(file_path):
        # The decoder holds back an incomplete trailing byte sequence until
        # the next chunk supplies the rest.
        pending += decoder.decode(chunk)
        if '\n' in pending:
            # Split once per chunk; the last piece is the unfinished line.
            *lines, pending = pending.split('\n')
            yield from lines
    # Flush the decoder so a truncated final character is reported.
    pending += decoder.decode(b'', final=True)
    if pending:
        yield pending
# 使用示例
def process_large_file(file_path):
    """Walk through the lines of a file, printing progress periodically.

    Lines come from ``line_reader``. A progress message is printed after
    every 100000 lines. Nothing is returned.

    Args:
        file_path: Path passed to ``line_reader``.
    """
    # enumerate(start=1) gives the running line count directly.
    for total_lines, _line in enumerate(line_reader(file_path), start=1):
        # Per-line processing would go here.
        if total_lines % 100000 == 0:
            print(f"已处理 {total_lines} 行")
内存分页生成器
def paginate(items, page_size=10):
    """Yield consecutive slices of ``items`` of length ``page_size``.

    Args:
        items: Sequence supporting ``len`` and slicing.
        page_size: Number of elements per page. The last page may be
            shorter.

    Yields:
        Slices of ``items``, in order.
    """
    starts = range(0, len(items), page_size)
    yield from (items[start:start + page_size] for start in starts)
def query_with_pagination(query, page_size=100):
    """Yield successive pages obtained from ``query``.

    Each page is ``query.limit(page_size).offset(n)`` with ``n`` increasing
    by ``page_size``. Iteration ends at the first falsy page.

    Args:
        query: Object providing chained ``limit(...).offset(...)`` calls.
        page_size: Number of rows requested per page.

    Yields:
        Whatever ``offset`` returns for each page, while it is truthy.
    """
    start = 0
    # Simulated paged database query.
    page = query.limit(page_size).offset(start)
    while page:
        yield page
        start += page_size
        page = query.limit(page_size).offset(start)
# 使用示例
def process_data_in_batches(items):
    """Feed ``items`` to ``process_batch`` in pages of 100.

    Args:
        items: Sequence passed to ``paginate``.
    """
    batches = paginate(items, page_size=100)
    for batch in batches:
        # Handle one batch at a time.
        process_batch(batch)
def process_query_results(query):
    """Call ``process_item`` on every item of every page of ``query``.

    Args:
        query: Query object passed to ``query_with_pagination``.
    """
    pages = query_with_pagination(query)
    # Flatten the pages lazily; items are handled in page order.
    for item in (entry for page in pages for entry in page):
        process_item(item)
流式处理生成器
实时数据处理生成器
import time
from datetime import datetime
def sensor_data_generator(interval=1):
    """Produce simulated sensor readings forever.

    Args:
        interval: Seconds to sleep after each reading is consumed.

    Yields:
        dict with 'timestamp' (``datetime.now()``), 'temperature' (uniform
        in 20-30) and 'humidity' (uniform in 40-60).
    """
    # No import of `random` is visible in the file, so the original body
    # failed with NameError on the first reading. Import it locally.
    import random

    while True:
        # Simulated sensor sample.
        yield {
            'timestamp': datetime.now(),
            'temperature': random.uniform(20, 30),
            'humidity': random.uniform(40, 60),
        }
        time.sleep(interval)
def moving_average(data_stream, window_size=5):
    """Yield the moving average of 'temperature' over a sliding window.

    Nothing is yielded until ``window_size`` samples have been seen. After
    that, one result is yielded per incoming sample.

    Args:
        data_stream: Iterable of dicts with 'timestamp' and 'temperature'.
        window_size: Number of most recent samples to average; at least 1.

    Yields:
        dict with the 'timestamp' of the newest sample and the
        'avg_temperature' of the current window.

    Raises:
        ValueError: If ``window_size`` is less than 1 (raised when
            iteration starts).
    """
    from collections import deque

    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    # A bounded deque drops the oldest sample in O(1); list.pop(0) shifts
    # the whole window on every sample.
    window = deque(maxlen=window_size)
    for data in data_stream:
        window.append(data['temperature'])
        if len(window) == window_size:
            yield {
                'timestamp': data['timestamp'],
                'avg_temperature': sum(window) / window_size
            }
# 使用示例
def monitor_temperature():
    """Print the moving average temperature of the simulated sensor.

    Feeds ``sensor_data_generator()`` into ``moving_average()`` and prints
    each result. The sensor generator never finishes, so neither does this
    function.
    """
    for avg in moving_average(sensor_data_generator()):
        print(
            f"时间: {avg['timestamp']}, "
            f"平均温度: {avg['avg_temperature']:.2f}°C"
        )
事件流处理生成器
def event_stream():
    """Coroutine that prints login and logout events sent to it.

    Prime it with ``next()`` and then ``send()`` event dicts that have
    'type' and 'user' entries. Events of other types are ignored.
    """
    while True:
        # Wait for the next event to be sent in.
        event = yield
        kind = event['type']
        if kind == 'user_login':
            print(f"用户登录: {event['user']}")
            continue
        if kind == 'user_logout':
            print(f"用户登出: {event['user']}")
def event_filter(handler, event_type):
    """Coroutine that forwards only events of one type to ``handler``.

    Args:
        handler: Object with a ``send`` method (for example a primed
            coroutine) that receives matching events.
        event_type: Value an event's 'type' entry must equal to be
            forwarded.
    """
    while True:
        event = yield
        if not event['type'] == event_type:
            continue
        handler.send(event)
# 使用示例
def handle_events():
    """Send three sample events through a login-only filter.

    Builds an ``event_stream`` handler, wraps it in an ``event_filter`` for
    'user_login', and sends each sample event to the filter.
    """
    # Event handler; next() advances it to its first yield.
    stream = event_stream()
    next(stream)

    # Filter that forwards login events to the handler.
    login_filter = event_filter(stream, 'user_login')
    next(login_filter)

    # Sample events to dispatch.
    events = (
        {'type': 'user_login', 'user': 'alice'},
        {'type': 'user_logout', 'user': 'bob'},
        {'type': 'user_login', 'user': 'charlie'},
    )
    for event in events:
        login_filter.send(event)
数据管道生成器
数据处理管道
class Pipeline:
    """Ordered chain of single-argument callables applied to a value."""

    def __init__(self):
        # Stages in the order they were added.
        self.functions = []

    def pipe(self, func):
        """Append ``func`` as the next stage and return ``self``.

        Returning ``self`` lets calls be chained.
        """
        self.functions.append(func)
        return self

    def process(self, data):
        """Run ``data`` through every stage and return the final result.

        A stage that returns a lazy iterator (for example a generator) has
        its result collected into a list before the next stage runs.
        Containers such as list, dict or ``Counter`` are passed on
        unchanged. Previously every iterable was converted with ``list()``,
        which reduced a ``Counter`` or dict to just its keys.

        Args:
            data: Initial input handed to the first stage.

        Returns:
            The output of the last stage, or ``data`` itself when no stages
            are registered.
        """
        from collections.abc import Iterator

        for func in self.functions:
            data = func(data)
            # Only one-shot iterators are collected; str and bytes are not
            # iterators and so pass through untouched.
            if isinstance(data, Iterator):
                data = list(data)
        return data
def normalize_text(text):
    """Return ``text`` lower-cased, with surrounding whitespace removed."""
    lowered = text.lower()
    return lowered.strip()
def tokenize(text):
    """Split ``text`` on runs of whitespace and return the pieces as a list."""
    words = text.split()
    return words
def remove_stopwords(tokens):
    """Return a generator over ``tokens`` with the stop words left out.

    Args:
        tokens: Iterable of hashable tokens.

    Returns:
        A lazy generator yielding the tokens not in the built-in stop-word
        set, in their original order.
    """
    stopwords = {'的', '了', '和', '是', '在'}
    kept = (token for token in tokens if token not in stopwords)
    return kept
def count_words(tokens):
    """Return a ``collections.Counter`` of how often each token occurs."""
    import collections
    return collections.Counter(tokens)
# 使用示例
def process_text(text):
    """Run ``text`` through the text pipeline and return the result.

    Stages, in order: ``normalize_text``, ``tokenize``,
    ``remove_stopwords``, ``count_words``.

    Args:
        text: Input string.

    Returns:
        Whatever ``Pipeline.process`` returns after the last stage.
    """
    pipeline = Pipeline()
    for stage in (normalize_text, tokenize, remove_stopwords, count_words):
        pipeline.pipe(stage)
    return pipeline.process(text)
图像处理管道
def load_image(path):
    """Yield a single message describing the (simulated) load of ``path``."""
    # Stand-in for real image loading.
    message = f"加载图像: {path}"
    yield message
def resize_image(image_stream, size):
    """Yield a (simulated) resize message for each image in the stream.

    Args:
        image_stream: Iterable of upstream image descriptions.
        size: Target size, formatted into the message as-is.
    """
    yield from (f"缩放图像到 {size}: {image}" for image in image_stream)
def apply_filter(image_stream, filter_name):
    """Yield a (simulated) filter message for each image in the stream.

    Args:
        image_stream: Iterable of upstream image descriptions.
        filter_name: Name of the filter, formatted into the message.
    """
    yield from (f"应用滤镜 {filter_name}: {image}" for image in image_stream)
def save_image(image_stream, output_path):
    """Yield a (simulated) save message for each image in the stream.

    Args:
        image_stream: Iterable of upstream image descriptions.
        output_path: Destination path, formatted into the message.
    """
    yield from (f"保存图像到 {output_path}: {image}" for image in image_stream)
# 使用示例
def process_image(input_path, output_path):
    """Run the simulated image pipeline and print each resulting step.

    Stages, in order: ``load_image``, ``resize_image`` to (800, 600),
    ``apply_filter`` with 'sharpen', ``save_image`` to ``output_path``.

    Args:
        input_path: Path given to ``load_image``.
        output_path: Path given to ``save_image``.
    """
    # Assemble the pipeline one stage at a time.
    loaded = load_image(input_path)
    resized = resize_image(loaded, (800, 600))
    sharpened = apply_filter(resized, 'sharpen')
    pipeline = save_image(sharpened, output_path)

    # Drive the pipeline.
    for step in pipeline:
        print(step)
协程应用生成器
任务调度器
def task_scheduler():
    """Coroutine implementing a small in-memory task registry.

    Prime it with ``next()`` and then ``send()`` command tuples:

    * ``("add", task_id, task)`` stores the task and prints a message.
    * ``("remove", task_id)`` deletes the task if present and prints a
      message.
    * ``("get", task_id)`` makes that ``send()`` return the stored task, or
      ``None`` when the id is unknown.

    ``send()`` returns ``None`` for every command other than "get".

    All commands are received at one yield point. The earlier version had a
    second ``yield`` in the "get" branch, which swallowed the command sent
    right after a "get".
    """
    tasks = {}
    result = None
    while True:
        # Hand back the previous command's result and wait for the next.
        action, *args = yield result
        result = None
        if action == "add":
            task_id, task = args
            tasks[task_id] = task
            print(f"添加任务: {task_id}")
        elif action == "remove":
            task_id = args[0]
            if task_id in tasks:
                del tasks[task_id]
                print(f"移除任务: {task_id}")
        elif action == "get":
            task_id = args[0]
            result = tasks.get(task_id)
# 使用示例
def run_scheduler():
    """Demonstrate ``task_scheduler``: add two tasks, fetch one, remove it."""
    scheduler = task_scheduler()
    next(scheduler)  # advance to the first yield so send() can be used

    # Register the tasks.
    for task_id, task in ((1, "任务1"), (2, "任务2")):
        scheduler.send(("add", task_id, task))

    # Look one up.
    result = scheduler.send(("get", 1))
    print(f"获取任务1: {result}")

    # Remove it again.
    scheduler.send(("remove", 1))
状态机生成器
def state_machine():
    """Coroutine implementing a four-state machine.

    States are 'INIT', 'RUNNING', 'PAUSED' and 'STOPPED'. ``next()`` returns
    the initial state; each ``send(event)`` prints the current state and the
    event, applies the matching transition if there is one, and returns the
    resulting state. Events with no transition leave the state unchanged.
    """
    # state -> (event, next state) pairs, checked in order.
    transitions = {
        'INIT': (('start', 'RUNNING'),),
        'RUNNING': (('pause', 'PAUSED'), ('stop', 'STOPPED')),
        'PAUSED': (('resume', 'RUNNING'), ('stop', 'STOPPED')),
        'STOPPED': (('reset', 'INIT'),),
    }
    state = 'INIT'
    while True:
        event = yield state
        print(f"当前状态: {state}, 收到事件: {event}")
        for trigger, target in transitions.get(state, ()):
            if event == trigger:
                state = target
                break
# 使用示例
def run_state_machine():
    """Drive ``state_machine`` through a fixed list of events.

    Prints the state returned after each event.
    """
    sm = state_machine()
    current_state = next(sm)  # advance to the first yield

    # Feed the events one by one.
    for event in ('start', 'pause', 'resume', 'stop', 'reset'):
        current_state = sm.send(event)
        print(f"状态机状态: {current_state}")
设计模式生成器
观察者模式
class Observable:
    """Subject that pushes events to registered observers via ``send``."""

    def __init__(self):
        # Registered observers, in registration order.
        self.observers = []

    def add_observer(self, observer):
        """Register ``observer``; it must provide a ``send(event)`` method."""
        self.observers.append(observer)

    def remove_observer(self, observer):
        """Unregister ``observer``.

        Raises:
            ValueError: If ``observer`` is not registered.
        """
        self.observers.remove(observer)

    def notify_observers(self, event):
        """Send ``event`` to every observer registered when the call starts.

        A snapshot of the list is iterated, so an observer that is added or
        removed while being notified does not cause another observer to be
        skipped.
        """
        for observer in list(self.observers):
            observer.send(event)
def create_observer(name):
    """Coroutine that prints every event sent to it, tagged with ``name``.

    Prime it with ``next()`` before the first ``send()``.

    Args:
        name: Label included in each printed message.
    """
    while True:
        event = yield
        message = f"观察者 {name} 收到事件: {event}"
        print(message)
# 使用示例
def demo_observer_pattern():
    """Demonstrate ``Observable`` with two ``create_observer`` coroutines."""
    # The subject being observed.
    subject = Observable()

    # Create, prime and register each observer.
    for name in ("观察者1", "观察者2"):
        observer = create_observer(name)
        next(observer)  # advance to the first yield
        subject.add_observer(observer)

    # Broadcast one event.
    subject.notify_observers("测试事件")
迭代器模式
class TreeNode:
    """Binary tree node holding a value and optional left/right children."""

    def __init__(self, value):
        self.value = value
        # Children start out absent.
        self.left, self.right = None, None
def inorder_traversal(root):
    """Yield node values in in-order: left subtree, node, right subtree.

    Args:
        root: Node with ``value``, ``left`` and ``right`` attributes, or a
            falsy value for an empty tree.
    """
    if not root:
        return
    yield from inorder_traversal(root.left)
    yield root.value
    yield from inorder_traversal(root.right)
def preorder_traversal(root):
    """Yield node values in pre-order: node, left subtree, right subtree.

    Args:
        root: Node with ``value``, ``left`` and ``right`` attributes, or a
            falsy value for an empty tree.
    """
    if not root:
        return
    yield root.value
    yield from preorder_traversal(root.left)
    yield from preorder_traversal(root.right)
def postorder_traversal(root):
    """Yield node values in post-order: left subtree, right subtree, node.

    Args:
        root: Node with ``value``, ``left`` and ``right`` attributes, or a
            falsy value for an empty tree.
    """
    if not root:
        return
    yield from postorder_traversal(root.left)
    yield from postorder_traversal(root.right)
    yield root.value
# 使用示例
def traverse_tree():
    """Build a five-node sample tree and print its three traversals."""
    # Sample binary tree: 1 at the root, 2 and 3 below it, 4 and 5 under 2.
    root = TreeNode(1)
    root.left = TreeNode(2)
    root.right = TreeNode(3)
    root.left.left = TreeNode(4)
    root.left.right = TreeNode(5)

    # (heading, traversal) pairs, printed in this order.
    walks = (
        ("中序遍历:", inorder_traversal),
        ("\n前序遍历:", preorder_traversal),
        ("\n后序遍历:", postorder_traversal),
    )
    for title, walk in walks:
        print(title)
        for value in walk(root):
            print(value, end=' ')