Python生成器详解

生成器是Python中一种优雅的迭代器实现方式,它能够高效地处理大量数据并节省内存。本教程将帮助你深入理解生成器的工作原理和应用场景。

生成器简介

什么是生成器?

生成器是一种特殊的迭代器,它能够"记住"上一次执行的状态,并在下一次调用时从该位置继续执行。生成器具有以下特点:

  • 惰性求值:只在需要时才生成值
  • 状态保持:能够记住上次执行的位置
  • 内存效率:不会一次性加载所有数据到内存
  • 单向迭代:只能向前迭代,不能回退

为什么使用生成器?

  • 内存友好:处理大数据集时不会耗尽内存
  • 执行效率:避免一次性计算所有值
  • 代码简洁:简化迭代器的实现
  • 流式处理:适合处理实时数据流

生成器函数

基本语法


# 使用yield关键字创建生成器函数
def count_up_to(n):
    i = 1
    while i <= n:
        yield i
        i += 1

# 使用生成器
counter = count_up_to(5)
print(next(counter))  # 1
print(next(counter))  # 2
print(next(counter))  # 3

# 使用for循环遍历
for num in count_up_to(5):
    print(num)  # 1, 2, 3, 4, 5
                    

多个yield语句


def fibonacci():
    """无限斐波那契数列生成器"""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# 使用示例
fib = fibonacci()
for _ in range(10):
    print(next(fib))  # 0, 1, 1, 2, 3, 5, 8, 13, 21, 34
                    

生成器表达式

基本用法


# 生成器表达式语法:(表达式 for 变量 in 可迭代对象)
squares = (x**2 for x in range(10))
print(next(squares))  # 0
print(next(squares))  # 1

# 带条件的生成器表达式
even_squares = (x**2 for x in range(10) if x % 2 == 0)
print(list(even_squares))  # [0, 4, 16, 36, 64]

# 链式生成器
numbers = (x for x in range(100))
filtered = (x for x in numbers if x % 2 == 0)
squared = (x**2 for x in filtered)
                    

注意事项

  • 生成器只能遍历一次
  • 不支持索引和切片操作
  • 不能直接获取长度(len())

高级特性

send()和yield表达式


def counter_with_step():
    """可以动态调整步长的计数器"""
    count = 0
    step = 1
    while True:
        # yield可以接收send()发送的值
        new_step = yield count
        if new_step is not None:
            step = new_step
        count += step

# 使用示例
c = counter_with_step()
print(next(c))      # 0
print(c.send(2))    # 2
print(c.send(3))    # 5
print(c.send(1))    # 6
                    

子生成器(yield from)


def sub_gen():
    yield 1
    yield 2
    yield 3

def main_gen():
    yield 'a'
    yield from sub_gen()  # 委托给子生成器
    yield 'b'

# 使用示例
for item in main_gen():
    print(item)  # a, 1, 2, 3, b
                    

实际应用

文件读取


def read_large_file(file_path, chunk_size=1024):
    """分块读取大文件"""
    with open(file_path, 'r') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# 使用示例
for chunk in read_large_file('large_file.txt'):
    process_chunk(chunk)
                    

数据流处理


def process_log_file(file_path):
    """处理日志文件的生成器"""
    def parse_line(line):
        # 解析日志行
        return line.strip().split(',')
    
    def filter_errors(entries):
        # 只保留错误日志
        for entry in entries:
            if entry[1] == 'ERROR':
                yield entry
    
    def format_output(entries):
        # 格式化输出
        for entry in entries:
            yield f"{entry[0]}: {entry[2]}"
    
    # 构建处理管道
    lines = (line for line in open(file_path))
    parsed = (parse_line(line) for line in lines)
    errors = filter_errors(parsed)
    return format_output(errors)

# 使用示例
for error in process_log_file('app.log'):
    print(error)
                    

性能优化

内存使用对比


import sys

# 列表方式
numbers_list = [x**2 for x in range(1000000)]
print(f"列表占用内存: {sys.getsizeof(numbers_list) / 1024:.2f} KB")

# 生成器方式
numbers_gen = (x**2 for x in range(1000000))
print(f"生成器占用内存: {sys.getsizeof(numbers_gen) / 1024:.2f} KB")

# 性能测试
import time

def measure_time(func):
    start = time.time()
    func()
    return time.time() - start

def process_list():
    return sum([x**2 for x in range(1000000)])

def process_generator():
    return sum(x**2 for x in range(1000000))

print(f"列表处理时间: {measure_time(process_list):.2f}秒")
print(f"生成器处理时间: {measure_time(process_generator):.2f}秒")
                    

优化建议

  • 处理大数据集时优先使用生成器
  • 避免多次遍历同一个生成器
  • 合理使用生成器表达式链
  • 注意生成器的惰性求值特性