Advanced Python Generator Examples

This page demonstrates advanced uses of Python generators, covering complex application scenarios such as asynchronous generators, subgenerators, and generator expressions. These examples will help you develop a deeper understanding of what generators can do.

Asynchronous Generators

Asynchronous Data-Stream Generator


import asyncio
import aiohttp

async def fetch_data(url):
    """Make an asynchronous HTTP request"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def async_data_stream(urls):
    """Asynchronous data-stream generator"""
    for url in urls:
        try:
            data = await fetch_data(url)
            yield data
        except Exception as e:
            print(f"Failed to fetch data: {url}, error: {e}")

async def process_data_stream(urls):
    """Process the asynchronous data stream"""
    async for data in async_data_stream(urls):
        # Process each item as it arrives
        process_result = await process_single_data(data)
        yield process_result

async def process_single_data(data):
    """Process a single item"""
    await asyncio.sleep(0.1)  # Simulate processing time
    return {
        'status': 'processed',
        'data': data
    }

# Usage example
async def main():
    urls = [
        'http://api.example.com/data/1',
        'http://api.example.com/data/2',
        'http://api.example.com/data/3'
    ]
    
    async for result in process_data_stream(urls):
        print(f"Result: {result}")

# asyncio.run(main())

Asynchronous Batch-Processing Generator


async def batch_processor(data_stream, batch_size=10):
    """Collect items from an async stream into fixed-size batches"""
    batch = []
    async for item in data_stream:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

async def process_batch(batch):
    """Process a batch of items concurrently"""
    tasks = [process_single_data(item) for item in batch]
    return await asyncio.gather(*tasks)

async def stream_with_backpressure(data_stream, max_buffer=100):
    """Stream-processing generator with backpressure"""
    buffer = asyncio.Queue(maxsize=max_buffer)
    
    # Producer coroutine: fills the bounded queue, blocking when it is full
    async def producer():
        async for item in data_stream:
            await buffer.put(item)
        await buffer.put(None)  # Sentinel marking the end of the stream
    
    # Start the producer; keep a reference so the task is not garbage-collected
    producer_task = asyncio.create_task(producer())
    
    # Consume items until the sentinel arrives
    while True:
        item = await buffer.get()
        if item is None:
            break
        yield item

# Usage example
async def demo_batch_processing():
    async def data_generator():
        for i in range(100):
            await asyncio.sleep(0.01)
            yield {'id': i, 'value': f'data_{i}'}
    
    async for batch in batch_processor(data_generator(), 10):
        results = await process_batch(batch)
        print(f"Processed batch of {len(results)} items")
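
The stream_with_backpressure generator above is not exercised by demo_batch_processing, so here is a minimal consumption sketch; the demo function name and the max_buffer value are illustrative assumptions, not part of the original example.

async def demo_backpressure():
    async def data_generator():
        for i in range(100):
            await asyncio.sleep(0.01)
            yield {'id': i, 'value': f'data_{i}'}
    
    # The internal asyncio.Queue holds at most max_buffer items, so a slow
    # consumer naturally limits how far ahead the producer can run
    async for item in stream_with_backpressure(data_generator(), max_buffer=20):
        print(f"Consumed item {item['id']}")

# asyncio.run(demo_backpressure())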

Subgenerators

Recursive Directory Traversal Generator


import os
from pathlib import Path

def walk_files(directory):
    """Recursively walk a directory, yielding file paths"""
    for entry in os.scandir(directory):
        if entry.is_file():
            yield entry.path
        elif entry.is_dir():
            yield from walk_files(entry.path)

def file_type_filter(files, extensions):
    """Filter files by extension"""
    for file in files:
        if any(file.endswith(ext) for ext in extensions):
            yield file

def file_content_reader(files):
    """Generator yielding (path, contents) pairs for each file"""
    for file in files:
        try:
            with open(file, 'r', encoding='utf-8') as f:
                yield file, f.read()
        except Exception as e:
            print(f"Failed to read file: {file}, error: {e}")

# Usage example
def search_in_files(directory, extensions, keyword):
    """Search file contents for a keyword"""
    files = walk_files(directory)
    filtered_files = file_type_filter(files, extensions)
    
    for file_path, content in file_content_reader(filtered_files):
        if keyword in content:
            yield file_path, content.count(keyword)
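
A minimal call of the composed generators above; the directory, extensions, and keyword below are placeholder assumptions used only for illustration.

def demo_search(directory='./src'):
    # Each match is yielded lazily as (path, occurrence_count)
    for file_path, count in search_in_files(directory, ['.py', '.txt'], 'TODO'):
        print(f"{file_path}: {count} occurrence(s)")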

Data-Stream Transformation Pipeline


class DataPipeline:
    """Composable data-stream transformation pipeline"""
    
    def __init__(self):
        self.transformers = []
    
    def add_transformer(self, transformer):
        """Register a transformer; returns self so calls can be chained"""
        self.transformers.append(transformer)
        return self
    
    def process(self, data_stream):
        """Run the stream through each registered transformer in order"""
        for transformer in self.transformers:
            data_stream = transformer(data_stream)
        yield from data_stream

def split_lines(data_stream):
    """Split text chunks into individual lines"""
    for chunk in data_stream:
        yield from chunk.splitlines()

def parse_json(data_stream):
    """Parse each line as JSON, skipping invalid lines"""
    import json
    for line in data_stream:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue

def validate_data(data_stream):
    """Keep only items that contain the required keys"""
    for item in data_stream:
        if all(k in item for k in ['id', 'name', 'value']):
            yield item

# Usage example
def process_log_file(file_path):
    def read_chunks(file_path, chunk_size=8192):
        with open(file_path, 'r') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    
    pipeline = (DataPipeline()
                .add_transformer(split_lines)
                .add_transformer(parse_json)
                .add_transformer(validate_data))
    
    for item in pipeline.process(read_chunks(file_path)):
        print(f"Processed item: {item}")

Generator Expressions

Advanced Generator Expressions


def matrix_operations():
    """Matrix-operation examples built from generator expressions"""
    # Create a matrix
    matrix = [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ]
    
    # Transpose the matrix
    transpose = (
        [row[i] for row in matrix]
        for i in range(len(matrix[0]))
    )
    
    # Square every element
    squared = (
        (x * x for x in row)
        for row in matrix
    )
    
    # Diagonal elements
    diagonal = (
        matrix[i][i]
        for i in range(len(matrix))
    )
    
    # Upper-triangular matrix. Each inner generator looks up `i` lazily,
    # so consume every row before advancing the outer generator
    # (as demo_matrix_operations does below)
    upper_triangle = (
        (val if j >= i else 0
         for j, val in enumerate(row))
        for i, row in enumerate(matrix)
    )
    
    return transpose, squared, diagonal, upper_triangle

# Usage example
def demo_matrix_operations():
    transpose, squared, diagonal, upper = matrix_operations()
    
    print("Transposed matrix:")
    for row in transpose:
        print(list(row))
    
    print("\nSquared matrix:")
    for row in squared:
        print(list(row))
    
    print("\nDiagonal elements:")
    print(list(diagonal))
    
    print("\nUpper-triangular matrix:")
    for row in upper:
        print(list(row))

Chained Generator Expressions


from itertools import accumulate

def chain_expressions(data):
    """Chained generator expression example"""
    # Keep only digit strings and convert them to integers
    numbers = (int(x) for x in data if x.isdigit())
    
    # Keep only even numbers
    evens = (x for x in numbers if x % 2 == 0)
    
    # Square each value
    squares = (x * x for x in evens)
    
    # Running (cumulative) sum -- accumulate consumes the squares
    # generator lazily instead of indexing into it, which a generator
    # does not support
    running_sum = accumulate(squares)
    
    return running_sum

# Usage example
def demo_chain_expressions():
    data = ['1', 'a', '2', '3', 'b', '4', '5', '6']
    result = chain_expressions(data)
    print("Running-sum sequence:")
    print(list(result))

Context Management

Resource Management Generator


from contextlib import contextmanager
import threading

class ResourcePool:
    """Fixed-size pool of reusable resources"""
    def __init__(self, size):
        self.resources = [Resource(i) for i in range(size)]
        self.lock = threading.Lock()
        self.available = threading.Semaphore(size)
    
    @contextmanager
    def acquire(self):
        """Context manager that checks a resource out of the pool"""
        self.available.acquire()
        try:
            with self.lock:
                resource = next(r for r in self.resources if not r.in_use)
                resource.in_use = True
            try:
                yield resource
            finally:
                with self.lock:
                    resource.in_use = False
        finally:
            self.available.release()

class Resource:
    """Mock resource"""
    def __init__(self, id):
        self.id = id
        self.in_use = False
    
    def use(self):
        return f"Using resource {self.id}"

def resource_user(pool):
    """Generator that repeatedly borrows a resource from the pool"""
    while True:
        with pool.acquire() as resource:
            result = resource.use()
        # Yield outside the with block so a suspended generator does not
        # keep a pool slot occupied between calls to next(); otherwise the
        # demo below (5 users, 3 resources) would deadlock
        yield result

# Usage example
def demo_resource_pool():
    pool = ResourcePool(3)
    users = [resource_user(pool) for _ in range(5)]
    
    for _ in range(10):
        for user in users:
            print(next(user))

Transaction Management Generator


from contextlib import contextmanager

class Transaction:
    """Simple transaction manager that records undoable operations"""
    def __init__(self):
        self.operations = []
    
    def add(self, operation):
        self.operations.append(operation)
    
    def rollback(self):
        for operation in reversed(self.operations):
            operation.undo()
        self.operations.clear()
    
    def commit(self):
        self.operations.clear()

@contextmanager
def transaction():
    """Transaction context manager: commit on success, roll back on failure"""
    tx = Transaction()
    try:
        yield tx
        tx.commit()
    except Exception:
        tx.rollback()
        raise

class Operation:
    """An operation that can be rolled back"""
    def __init__(self, data):
        self.data = data
        self.old_value = None
    
    def execute(self, value):
        self.old_value = self.data['value']
        self.data['value'] = value
    
    def undo(self):
        if self.old_value is not None:
            self.data['value'] = self.old_value

# Usage example
def update_with_transaction(data):
    with transaction() as tx:
        op = Operation(data)
        op.execute("new value")
        tx.add(op)
        # An operation that may raise an exception
        if data['value'] == "new value":
            raise ValueError("trigger rollback for demonstration")
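
A short demo sketch (an assumption added here, not part of the original page) showing the rollback in action: once the ValueError escapes the with block, the dictionary holds its original value again.

def demo_transaction_rollback():
    data = {'value': 'old value'}
    try:
        update_with_transaction(data)
    except ValueError:
        pass
    # rollback() ran before the exception propagated, so the old value is back
    print(data['value'])  # -> old value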

Parallel Processing

Parallel Generators


import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def parallel_processor(data_stream, process_func, max_workers=None):
    """Parallel-processing generator backed by a process pool"""
    # process_func must be picklable (e.g. defined at module level) so it can
    # be dispatched to the worker processes
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        yield from executor.map(process_func, data_stream)

def parallel_chunk_processor(data_stream, process_func, chunk_size=1000):
    """Collect items into chunks and process each chunk in parallel"""
    current_chunk = []
    for item in data_stream:
        current_chunk.append(item)
        if len(current_chunk) >= chunk_size:
            yield from parallel_processor(current_chunk, process_func)
            current_chunk = []
    
    if current_chunk:
        yield from parallel_processor(current_chunk, process_func)

# Usage example
def cpu_intensive_task(x):
    """CPU-intensive task"""
    result = 0
    for i in range(1000000):
        result += i * x
    return result

def process_large_dataset():
    data = range(10000)
    results = parallel_chunk_processor(
        data,
        cpu_intensive_task,
        chunk_size=100
    )
    
    for result in results:
        print(f"Result: {result}")

Parallel File Processing


from functools import partial

def read_and_process(file_path, process_func):
    """Read one file and apply process_func to its contents"""
    try:
        with open(file_path, 'r') as f:
            content = f.read()
        return process_func(content)
    except Exception as e:
        print(f"Failed to process file: {file_path}, error: {e}")
        return None

def parallel_file_processor(file_paths, process_func, max_workers=None):
    """Parallel file-processing generator"""
    # read_and_process is defined at module level so that the partial object
    # below stays picklable and can be sent to worker processes
    yield from parallel_processor(
        file_paths,
        partial(read_and_process, process_func=process_func),
        max_workers
    )

from collections import Counter

def word_count(text):
    """Count word frequencies in a single file's text"""
    words = text.lower().split()
    return Counter(words)

def merge_counters(counters):
    """Merge multiple Counter objects into one"""
    result = Counter()
    for counter in counters:
        if counter is not None:
            result.update(counter)
    return result

# Usage example
def count_words_in_files(directory):
    file_paths = list(walk_files(directory))
    counters = parallel_file_processor(
        file_paths,
        word_count
    )
    
    total_count = merge_counters(counters)
    return total_count
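
Since count_words_in_files returns one merged Counter, the standard Counter API applies to the result. The sketch below is an illustrative assumption (the directory path is a placeholder); the __main__ guard matters because ProcessPoolExecutor workers re-import the module on platforms that spawn new processes.

if __name__ == '__main__':
    totals = count_words_in_files('./documents')
    # most_common is a standard Counter method
    for word, freq in totals.most_common(10):
        print(f"{word}: {freq}")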

Advanced Applications

Memory-Optimized Big Data Processing


import mmap
import os

def memory_mapped_reader(file_path, chunk_size=1024*1024):
    """Yield chunks of a file through a memory map"""
    file_size = os.path.getsize(file_path)
    with open(file_path, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for offset in range(0, file_size, chunk_size):
                yield mm[offset:min(offset + chunk_size, file_size)]

def incremental_json_parser(byte_stream):
    """Incrementally parse concatenated JSON objects from a byte stream"""
    import codecs
    import json
    decoder = json.JSONDecoder()
    # An incremental UTF-8 decoder copes with multi-byte characters that are
    # split across chunk boundaries
    utf8_decoder = codecs.getincrementaldecoder('utf-8')()
    buffer = ''
    
    for chunk in byte_stream:
        buffer += utf8_decoder.decode(chunk)
        while buffer:
            try:
                obj, index = decoder.raw_decode(buffer)
                yield obj
                buffer = buffer[index:].lstrip()
            except json.JSONDecodeError:
                break

# Usage example
def process_large_json_file(file_path):
    reader = memory_mapped_reader(file_path)
    parser = incremental_json_parser(reader)
    
    for item in parser:
        # process_item is an application-specific handler assumed to be defined elsewhere
        process_item(item)

Real-Time Data Analysis Pipeline


import time
from collections import deque
from statistics import mean, median, stdev

class RealTimeAnalytics:
    """Sliding-window statistics over a real-time stream"""
    
    def __init__(self, window_size=100):
        self.window = deque(maxlen=window_size)
    
    def add_value(self, value):
        self.window.append(value)
    
    def get_stats(self):
        data = list(self.window)
        if not data:
            return None
        
        return {
            'count': len(data),
            'mean': mean(data),
            'median': median(data),
            'std_dev': stdev(data) if len(data) > 1 else 0
        }

def realtime_data_analyzer(data_stream, window_size=100):
    """Generator yielding rolling statistics for a data stream"""
    analyzer = RealTimeAnalytics(window_size)
    
    for value in data_stream:
        analyzer.add_value(value)
        stats = analyzer.get_stats()
        if stats:
            yield stats

def alert_generator(stats_stream, threshold):
    """Anomaly-detection generator"""
    for stats in stats_stream:
        if abs(stats['mean']) > threshold:
            yield {
                'type': 'alert',
                'message': f"Value exceeded threshold: {stats['mean']:.2f}",
                'stats': stats
            }

# Usage example
def monitor_data_stream():
    def data_source():
        """Simulated data source"""
        import random
        while True:
            yield random.gauss(0, 1)
            time.sleep(0.1)
    
    data = data_source()
    stats = realtime_data_analyzer(data, window_size=100)
    alerts = alert_generator(stats, threshold=2.0)
    
    for alert in alerts:
        print(f"Alert: {alert['message']}")
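
Note that monitor_data_stream() runs indefinitely; a hedged usage note, mirroring the commented asyncio.run(main()) call earlier on this page:

# if __name__ == '__main__':
#     monitor_data_stream()  # blocks forever; interrupt with Ctrl+C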