Python Generator Examples

This page provides a collection of practical Python generator examples showing how generators are used for data processing, memory optimization, stream processing, and similar scenarios. These examples should help you understand generators and put them to use.

Data Processing Generators

Data Transformation Generators


def csv_reader(file_path, delimiter=','):
    """Generator that reads a CSV file row by row."""
    with open(file_path, 'r', encoding='utf-8') as f:
        header = next(f).strip().split(delimiter)
        for line in f:
            # Turn each row into a dict keyed by the header
            values = line.strip().split(delimiter)
            yield dict(zip(header, values))

def data_transformer(data_iterator):
    """Generator that converts and cleans each record."""
    for record in data_iterator:
        # Convert numeric fields to proper types
        if 'age' in record:
            record['age'] = int(record['age'])
        if 'salary' in record:
            record['salary'] = float(record['salary'])
        yield record

def data_filter(data_iterator, conditions):
    """Generator that yields only records matching all conditions."""
    for record in data_iterator:
        if all(
            record.get(k) == v
            for k, v in conditions.items()
        ):
            yield record

# Usage example
def process_employee_data(file_path):
    # Build the data processing pipeline
    records = csv_reader(file_path)
    transformed_records = data_transformer(records)
    filtered_records = data_filter(
        transformed_records,
        {'department': 'IT'}
    )

    # Consume the filtered records
    for record in filtered_records:
        print(record)
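
To see this pipeline end to end, here is a minimal self-contained sketch run on made-up data; the file name employees_demo.csv and the column values are assumptions for illustration only:

import csv

def demo_employee_pipeline(tmp_path='employees_demo.csv'):
    # Write a tiny, hypothetical CSV so the pipeline has something to read.
    rows = [
        {'name': 'alice', 'age': '30', 'salary': '8000', 'department': 'IT'},
        {'name': 'bob', 'age': '35', 'salary': '9000', 'department': 'HR'},
    ]
    with open(tmp_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['name', 'age', 'salary', 'department'])
        writer.writeheader()
        writer.writerows(rows)

    # Only the IT record should be printed:
    # {'name': 'alice', 'age': 30, 'salary': 8000.0, 'department': 'IT'}
    process_employee_data(tmp_path)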

Data Aggregation Generators


from collections import defaultdict

def group_by(data_iterator, key):
    """Generator that groups records by a key."""
    # Note: all records are buffered before the first group is yielded.
    groups = defaultdict(list)
    for item in data_iterator:
        groups[item[key]].append(item)

    for key, group in groups.items():
        yield key, group

def calculate_statistics(data_iterator):
    """Generator that computes per-group statistics."""
    for key, group in data_iterator:
        # Compute statistics for each group
        count = len(group)
        age_avg = sum(item['age'] for item in group) / count
        salary_sum = sum(item['salary'] for item in group)

        yield {
            'group': key,
            'count': count,
            'age_avg': round(age_avg, 2),
            'salary_sum': salary_sum
        }

# Usage example
def analyze_employee_data(records):
    # Group by department
    grouped_data = group_by(records, 'department')
    # Compute statistics per department
    statistics = calculate_statistics(grouped_data)

    for stat in statistics:
        print(f"Department: {stat['group']}")
        print(f"Headcount: {stat['count']}")
        print(f"Average age: {stat['age_avg']}")
        print(f"Total salary: {stat['salary_sum']}")
        print("-" * 20)
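
The grouping and statistics steps compose directly with the earlier CSV pipeline; a minimal sketch, reusing the hypothetical employees_demo.csv from the previous example:

def demo_department_report(file_path='employees_demo.csv'):
    # Feed the cleaned records straight into the per-department analysis.
    records = data_transformer(csv_reader(file_path))
    analyze_employee_data(records)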

Memory Optimization Generators

Large File Processing Generators


import codecs

def chunk_reader(file_path, chunk_size=8192):
    """Generator that reads a large file in fixed-size chunks."""
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def line_reader(file_path, encoding='utf-8'):
    """Memory-friendly line reader built on top of chunk_reader."""
    # An incremental decoder avoids breaking multi-byte characters
    # that straddle a chunk boundary.
    decoder = codecs.getincrementaldecoder(encoding)()
    buffer = ''
    for chunk in chunk_reader(file_path):
        buffer += decoder.decode(chunk)
        while '\n' in buffer:
            line, buffer = buffer.split('\n', 1)
            yield line
    buffer += decoder.decode(b'', final=True)
    if buffer:
        yield buffer

# Usage example
def process_large_file(file_path):
    total_lines = 0
    for line in line_reader(file_path):
        # Process each line here
        total_lines += 1
        if total_lines % 100000 == 0:
            print(f"Processed {total_lines} lines")
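
chunk_reader is useful beyond line counting; a small sketch that hashes a large file incrementally (the file name is hypothetical):

import hashlib

def file_sha256(file_path):
    """Compute a file's SHA-256 without loading it all into memory."""
    digest = hashlib.sha256()
    for chunk in chunk_reader(file_path):
        digest.update(chunk)
    return digest.hexdigest()

# print(file_sha256('big_dump.bin'))  # hypothetical file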

Pagination Generators


def paginate(items, page_size=10):
    """Generator that yields a sequence page by page."""
    for i in range(0, len(items), page_size):
        yield items[i:i + page_size]

def query_with_pagination(query, page_size=100):
    """Generator that pages through a database query."""
    offset = 0
    while True:
        # Assumes a SQLAlchemy-style query object; .all() materializes one page
        results = query.limit(page_size).offset(offset).all()
        if not results:
            break
        yield results
        offset += page_size

# Usage example
def process_data_in_batches(items):
    for batch in paginate(items, 100):
        # Process one batch of data
        process_batch(batch)

def process_query_results(query):
    for page in query_with_pagination(query):
        # Process each page of results
        for item in page:
            process_item(item)
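
query_with_pagination assumes a SQLAlchemy-style query object with limit/offset (and .all() to materialize a page). To exercise it without a database, a purely illustrative stand-in could look like this:

class FakeQuery:
    """Tiny stand-in that mimics a limit/offset query over a list."""
    def __init__(self, rows, limit=None, offset=0):
        self._rows = rows
        self._limit = limit
        self._offset = offset

    def limit(self, n):
        return FakeQuery(self._rows, n, self._offset)

    def offset(self, n):
        return FakeQuery(self._rows, self._limit, n)

    def all(self):
        end = self._offset + self._limit if self._limit else None
        return self._rows[self._offset:end]

# for page in query_with_pagination(FakeQuery(list(range(250))), page_size=100):
#     print(len(page))   # 100, 100, 50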

Stream Processing Generators

Real-Time Data Processing Generators


import random
import time
from datetime import datetime

def sensor_data_generator(interval=1):
    """Generator that simulates a stream of sensor readings."""
    while True:
        timestamp = datetime.now()
        # Simulated sensor reading
        data = {
            'timestamp': timestamp,
            'temperature': random.uniform(20, 30),
            'humidity': random.uniform(40, 60)
        }
        yield data
        time.sleep(interval)

def moving_average(data_stream, window_size=5):
    """Generator that computes a moving average over the stream."""
    window = []
    for data in data_stream:
        window.append(data['temperature'])
        if len(window) > window_size:
            window.pop(0)
        if len(window) == window_size:
            yield {
                'timestamp': data['timestamp'],
                'avg_temperature': sum(window) / window_size
            }

# Usage example
def monitor_temperature():
    sensor_data = sensor_data_generator()
    averages = moving_average(sensor_data)

    for avg in averages:
        print(
            f"Time: {avg['timestamp']}, "
            f"average temperature: {avg['avg_temperature']:.2f}°C"
        )
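
monitor_temperature runs forever; for a quick demo you can cap the stream with itertools.islice, for example:

import itertools

def monitor_temperature_briefly(readings=3):
    # Take only a few averaged readings instead of looping forever.
    averages = moving_average(sensor_data_generator(interval=0.1))
    for avg in itertools.islice(averages, readings):
        print(f"{avg['timestamp']}: {avg['avg_temperature']:.2f}°C")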

Event Stream Processing Generators


def event_stream():
    """Event-handling coroutine."""
    while True:
        # Wait for the next event
        event = yield
        # Handle the event
        if event['type'] == 'user_login':
            print(f"User logged in: {event['user']}")
        elif event['type'] == 'user_logout':
            print(f"User logged out: {event['user']}")

def event_filter(handler, event_type):
    """Coroutine that only forwards events of a given type."""
    while True:
        event = yield
        if event['type'] == event_type:
            handler.send(event)

# Usage example
def handle_events():
    # Create the event handler
    stream = event_stream()
    next(stream)  # prime the coroutine

    # Create the event filter
    login_filter = event_filter(stream, 'user_login')
    next(login_filter)

    # Send events; only 'user_login' events reach the handler
    events = [
        {'type': 'user_login', 'user': 'alice'},
        {'type': 'user_logout', 'user': 'bob'},
        {'type': 'user_login', 'user': 'charlie'}
    ]

    for event in events:
        login_filter.send(event)
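
The same coroutine style extends to fan-out. A sketch of a broadcast step (the broadcast helper is an illustrative addition, not part of the original example):

def broadcast(targets):
    """Coroutine that forwards every received event to several target coroutines."""
    while True:
        event = yield
        for target in targets:
            target.send(event)

# stream = event_stream(); next(stream)
# login_filter = event_filter(stream, 'user_login'); next(login_filter)
# logout_filter = event_filter(stream, 'user_logout'); next(logout_filter)
# hub = broadcast([login_filter, logout_filter]); next(hub)
# hub.send({'type': 'user_logout', 'user': 'bob'})   # now logouts are handled too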

Data Pipeline Generators

Data Processing Pipeline


class Pipeline:
    """Composable data processing pipeline."""

    def __init__(self):
        self.functions = []

    def pipe(self, func):
        """Add a processing function to the pipeline."""
        self.functions.append(func)
        return self

    def process(self, data):
        """Run the data through every function in order."""
        # Each function receives the previous function's output;
        # generator stages flow through lazily.
        for func in self.functions:
            data = func(data)
        return data

def normalize_text(text):
    """Normalize text."""
    return text.lower().strip()

def tokenize(text):
    """Split text into tokens."""
    return text.split()

def remove_stopwords(tokens):
    """Filter out stopwords (lazily, as a generator)."""
    stopwords = {'的', '了', '和', '是', '在'}  # common Chinese stopwords
    return (token for token in tokens if token not in stopwords)

def count_words(tokens):
    """Count word frequencies."""
    from collections import Counter
    return Counter(tokens)

# Usage example
def process_text(text):
    pipeline = Pipeline()
    pipeline.pipe(normalize_text)\
            .pipe(tokenize)\
            .pipe(remove_stopwords)\
            .pipe(count_words)

    result = pipeline.process(text)
    return result
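
With the Chinese stopword list above, a quick check of the pipeline on an illustrative sentence:

counts = process_text("Python 是 简单 的 Python")
print(counts)                   # Counter({'python': 2, '简单': 1})
print(counts.most_common(1))    # [('python', 2)]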

Image Processing Pipeline


def load_image(path):
    """Image loading generator (simulated)."""
    # Simulate loading an image
    yield f"Loaded image: {path}"

def resize_image(image_stream, size):
    """Image resizing generator (simulated)."""
    for image in image_stream:
        yield f"Resized to {size}: {image}"

def apply_filter(image_stream, filter_name):
    """Filter application generator (simulated)."""
    for image in image_stream:
        yield f"Applied filter {filter_name}: {image}"

def save_image(image_stream, output_path):
    """Image saving generator (simulated)."""
    for image in image_stream:
        yield f"Saved to {output_path}: {image}"

# Usage example
def process_image(input_path, output_path):
    # Build the image processing pipeline
    pipeline = (
        save_image(
            apply_filter(
                resize_image(
                    load_image(input_path),
                    (800, 600)
                ),
                'sharpen'
            ),
            output_path
        )
    )

    # Run the pipeline
    for step in pipeline:
        print(step)

Coroutine-Style Generators

Task Scheduler


def task_scheduler():
    """Simple task scheduler implemented as a coroutine."""
    tasks = {}
    result = None
    while True:
        # Yield the result of the previous command (None for add/remove)
        # so that every send() resumes at the same yield point.
        action, *args = yield result
        result = None
        if action == "add":
            task_id, task = args
            tasks[task_id] = task
            print(f"Task added: {task_id}")
        elif action == "remove":
            task_id = args[0]
            if task_id in tasks:
                del tasks[task_id]
                print(f"Task removed: {task_id}")
        elif action == "get":
            task_id = args[0]
            result = tasks.get(task_id)

# Usage example
def run_scheduler():
    scheduler = task_scheduler()
    next(scheduler)  # prime the scheduler

    # Add tasks
    scheduler.send(("add", 1, "Task 1"))
    scheduler.send(("add", 2, "Task 2"))

    # Get a task
    result = scheduler.send(("get", 1))
    print(f"Got task 1: {result}")

    # Remove a task
    scheduler.send(("remove", 1))

State Machine Generator


def state_machine():
    """Simple state machine as a generator."""
    state = 'INIT'
    while True:
        event = yield state
        print(f"Current state: {state}, received event: {event}")

        if state == 'INIT':
            if event == 'start':
                state = 'RUNNING'
        elif state == 'RUNNING':
            if event == 'pause':
                state = 'PAUSED'
            elif event == 'stop':
                state = 'STOPPED'
        elif state == 'PAUSED':
            if event == 'resume':
                state = 'RUNNING'
            elif event == 'stop':
                state = 'STOPPED'
        elif state == 'STOPPED':
            if event == 'reset':
                state = 'INIT'

# Usage example
def run_state_machine():
    sm = state_machine()
    current_state = next(sm)  # prime the state machine

    # Send events
    events = ['start', 'pause', 'resume', 'stop', 'reset']
    for event in events:
        current_state = sm.send(event)
        print(f"State machine is now: {current_state}")
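
For reference, run_state_machine() should walk the machine through these transitions:

# start  -> RUNNING
# pause  -> PAUSED
# resume -> RUNNING
# stop   -> STOPPED
# reset  -> INIT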

Design Patterns with Generators

Observer Pattern


class Observable:
    """Observable subject."""

    def __init__(self):
        self.observers = []

    def add_observer(self, observer):
        self.observers.append(observer)

    def remove_observer(self, observer):
        self.observers.remove(observer)

    def notify_observers(self, event):
        for observer in self.observers:
            observer.send(event)

def create_observer(name):
    """Create an observer coroutine."""
    while True:
        event = yield
        print(f"Observer {name} received event: {event}")

# Usage example
def demo_observer_pattern():
    # Create the observable subject
    subject = Observable()

    # Create the observers
    observer1 = create_observer("observer 1")
    observer2 = create_observer("observer 2")
    next(observer1)  # prime the observers
    next(observer2)

    # Register the observers
    subject.add_observer(observer1)
    subject.add_observer(observer2)

    # Publish an event
    subject.notify_observers("test event")

Iterator Pattern


class TreeNode:
    """Binary tree node."""
    def __init__(self, value):
        self.value = value
        self.left = None
        self.right = None

def inorder_traversal(root):
    """In-order traversal generator."""
    if root:
        yield from inorder_traversal(root.left)
        yield root.value
        yield from inorder_traversal(root.right)

def preorder_traversal(root):
    """Pre-order traversal generator."""
    if root:
        yield root.value
        yield from preorder_traversal(root.left)
        yield from preorder_traversal(root.right)

def postorder_traversal(root):
    """Post-order traversal generator."""
    if root:
        yield from postorder_traversal(root.left)
        yield from postorder_traversal(root.right)
        yield root.value

# Usage example
def traverse_tree():
    # Build a binary tree
    root = TreeNode(1)
    root.left = TreeNode(2)
    root.right = TreeNode(3)
    root.left.left = TreeNode(4)
    root.left.right = TreeNode(5)

    print("In-order traversal:")
    for value in inorder_traversal(root):
        print(value, end=' ')

    print("\nPre-order traversal:")
    for value in preorder_traversal(root):
        print(value, end=' ')

    print("\nPost-order traversal:")
    for value in postorder_traversal(root):
        print(value, end=' ')
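
For the tree built above, traverse_tree() should print:

# In-order traversal:
# 4 2 5 1 3
# Pre-order traversal:
# 1 2 4 5 3
# Post-order traversal:
# 4 5 2 3 1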