Python高级文件操作示例

本页面提供了Python高级文件操作的实用示例，包括文件加密解密、大文件处理、文件监控、CSV/JSON处理等进阶操作方法。这些示例将帮助你更好地处理复杂的文件操作场景。

文件安全

简单文件加密


from cryptography.fernet import Fernet
import base64

def generate_key():
    """Create a new encryption key.

    Returns:
        Whatever ``Fernet.generate_key()`` produces; the same value can be
        handed to ``Fernet(key)`` for both encryption and decryption.
    """
    new_key = Fernet.generate_key()
    return new_key

def encrypt_file(file_path, key):
    """Encrypt a file and store the result next to it.

    The whole file is read into memory, encrypted with ``Fernet(key)``, and
    written to ``file_path + '.encrypted'``. The input file is left untouched.

    Args:
        file_path: Path of the file to encrypt.
        key: Key accepted by ``Fernet``.
    """
    cipher = Fernet(key)
    target_path = file_path + '.encrypted'

    with open(file_path, 'rb') as source:
        token = cipher.encrypt(source.read())

    with open(target_path, 'wb') as target:
        target.write(token)

def decrypt_file(file_path, key):
    """Decrypt a file produced by Fernet encryption.

    The output path is derived from ``file_path``: a trailing ``.encrypted``
    suffix is removed when present; otherwise ``.decrypted`` is appended so
    that an unrelated path is never truncated by a fixed character count.

    Args:
        file_path: Path of the encrypted file.
        key: Key accepted by ``Fernet``; must match the one used to encrypt.

    Raises:
        Any exception raised by ``Fernet`` or by opening the files propagates.
        Decryption happens before the output file is opened, so a failed
        decryption does not create or overwrite the output.
    """
    suffix = '.encrypted'
    if file_path.endswith(suffix) and len(file_path) > len(suffix):
        output_path = file_path[:-len(suffix)]
    else:
        # Not a name we produced: do not guess by chopping characters.
        output_path = file_path + '.decrypted'

    f = Fernet(key)
    with open(file_path, 'rb') as file:
        encrypted_data = file.read()

    decrypted_data = f.decrypt(encrypted_data)
    with open(output_path, 'wb') as file:
        file.write(decrypted_data)

# Usage example: generate a key, encrypt secret.txt (the encrypted copy is
# written to secret.txt.encrypted), then decrypt that copy with the same key.
key = generate_key()
encrypt_file('secret.txt', key)
decrypt_file('secret.txt.encrypted', key)
                    

文件校验


import hashlib

def calculate_file_hash(file_path, hash_type='sha256'):
    """Return the hex digest of a file's contents.

    The file is read in 4096-byte blocks so it never has to fit in memory.

    Args:
        file_path: Path of the file to hash.
        hash_type: Name of a ``hashlib`` constructor attribute
            (default ``'sha256'``).

    Returns:
        The hexadecimal digest string.
    """
    digest = getattr(hashlib, hash_type)()
    with open(file_path, 'rb') as stream:
        block = stream.read(4096)
        while block:
            digest.update(block)
            block = stream.read(4096)
    return digest.hexdigest()

def verify_file_integrity(file_path, original_hash, hash_type='sha256'):
    """Check whether a file still matches a previously recorded hash.

    Args:
        file_path: Path of the file to check.
        original_hash: Hex digest recorded earlier. Surrounding whitespace
            and letter case are ignored, since hex digests are often pasted
            in upper case or with a trailing newline.
        hash_type: Algorithm name forwarded to ``calculate_file_hash``; must
            be the one used to produce ``original_hash`` (default
            ``'sha256'``).

    Returns:
        True when the freshly computed digest equals ``original_hash``,
        otherwise False (including when ``original_hash`` is not a string).
    """
    if not isinstance(original_hash, str):
        return False
    current_hash = calculate_file_hash(file_path, hash_type)
    return current_hash.lower() == original_hash.strip().lower()

# Usage example: hash important.txt, print the digest, then verify the file
# against that same digest and print the outcome.
file_hash = calculate_file_hash('important.txt')
print(f"文件哈希值: {file_hash}")
is_valid = verify_file_integrity('important.txt', file_hash)
print(f"文件完整性: {'完整' if is_valid else '已被修改'}")
                    

大文件处理

分块读写大文件


def process_large_file(input_path, output_path, chunk_size=8192):
    """Stream a file to a new file, transforming it block by block.

    Each block of up to ``chunk_size`` bytes is read from ``input_path``,
    upper-cased with ``bytes.upper()`` (a placeholder transformation), and
    written to ``output_path``.

    Args:
        input_path: File to read, opened in binary mode.
        output_path: File to create or overwrite, opened in binary mode.
        chunk_size: Number of bytes requested per read.
    """
    with open(input_path, 'rb') as reader, open(output_path, 'wb') as writer:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: reader.read(chunk_size), b''):
            # Replace this transformation with real processing logic.
            writer.write(block.upper())

def count_lines_efficiently(file_path, buffer_size=8192 * 1024):
    """Count the lines in a file without loading it all into memory.

    Newline bytes are counted block by block. A final line that is not
    terminated by a newline is counted as a line too, so ``b"a\\nb"`` and
    ``b"a\\nb\\n"`` both contain two lines and an empty file contains zero.

    Args:
        file_path: Path of the file to scan (opened in binary mode).
        buffer_size: Number of bytes requested per read (default 8 MiB).

    Returns:
        The number of lines as an int.
    """
    count = 0
    last_byte = b''
    with open(file_path, 'rb') as f:
        for buffer in iter(lambda: f.read(buffer_size), b''):
            count += buffer.count(b'\n')
            last_byte = buffer[-1:]
    # Non-empty file whose last line lacks a terminating newline.
    if last_byte and last_byte != b'\n':
        count += 1
    return count

# Process a large file lazily with a generator.
def read_large_file(file_path):
    """Yield the text of a UTF-8 file in pieces of up to 8192 characters.

    The file is opened when iteration starts and stays open until the
    generator is exhausted or closed.

    Args:
        file_path: Path of the text file to read.

    Yields:
        Consecutive non-empty string chunks of the file's contents.
    """
    with open(file_path, 'r', encoding='utf-8') as stream:
        # The empty-string sentinel ends iteration at end of file.
        yield from iter(lambda: stream.read(8192), '')
                    

内存映射文件


import mmap
import os

def process_with_mmap(file_path, search_term=b'search_term', old=b'old', new=b'new'):
    """Search and patch a file in place through a memory mapping.

    The first occurrence of ``search_term`` is reported on stdout, then every
    non-overlapping occurrence of ``old`` is overwritten with ``new`` directly
    in the mapping, so the file contents are never copied into a Python
    ``bytes`` object as a whole.

    Args:
        file_path: Path of the file to modify; opened in ``'r+b'`` mode.
        search_term: Byte string whose first position is printed if found.
        old: Byte string to replace.
        new: Replacement byte string; must be exactly as long as ``old``
            because a memory mapping cannot change the file's size.

    Raises:
        ValueError: If ``old`` and ``new`` differ in length.
    """
    if len(old) != len(new):
        raise ValueError("old and new must have the same length for in-place replacement")

    with open(file_path, 'r+b') as f:
        # An empty file cannot be memory-mapped; there is nothing to do.
        if os.fstat(f.fileno()).st_size == 0:
            return

        # Length 0 maps the whole file.
        with mmap.mmap(f.fileno(), 0) as mm:
            # Look up the search term.
            position = mm.find(search_term)
            if position != -1:
                print(f"找到内容在位置: {position}")

            # Replace matches left to right, resuming after each match so the
            # result equals bytes.replace() on the original contents.
            if old:
                width = len(old)
                match = mm.find(old)
                while match != -1:
                    mm[match:match + width] = new
                    match = mm.find(old, match + width)
                    

文件监控

实时文件监控


from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class FileChangeHandler(FileSystemEventHandler):
    """Event handler that prints a message for file-level events.

    Each callback ignores events whose ``is_directory`` flag is set and
    otherwise prints the event's ``src_path``.
    """

    def on_modified(self, event):
        """Report a modified file."""
        if event.is_directory:
            return
        print(f"文件被修改: {event.src_path}")
        # Custom handling logic can be added here.

    def on_created(self, event):
        """Report a newly created file."""
        if event.is_directory:
            return
        print(f"新文件创建: {event.src_path}")

    def on_deleted(self, event):
        """Report a deleted file."""
        if event.is_directory:
            return
        print(f"文件被删除: {event.src_path}")

def monitor_directory(path, recursive=False):
    """Watch a directory and report file changes until interrupted.

    A ``FileChangeHandler`` is scheduled on a new ``Observer`` and the calling
    thread then sleeps in one-second steps. Ctrl+C (``KeyboardInterrupt``)
    ends the loop quietly. The observer is stopped and joined on every exit
    path, so an unexpected exception does not leave it running.

    Args:
        path: Directory to watch.
        recursive: Passed to ``Observer.schedule``; when true, subdirectories
            are watched as well (default False).
    """
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=recursive)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Normal way to end monitoring; fall through to the cleanup below.
        pass
    finally:
        observer.stop()
        observer.join()

# Usage example: watch ./watched_folder. The call loops until a
# KeyboardInterrupt (Ctrl+C) is raised.
monitor_directory("./watched_folder")
                    

日志文件实时监控


def tail_file(file_path, interval=1.0):
    """Follow a text file and yield each complete line appended to it.

    Reading starts at the current end of the file, so existing content is
    skipped. The generator never finishes on its own; close it (or stop
    iterating) to release the file.

    A line is only yielded once its terminating newline has been read. If
    the writer has flushed only part of a line, that fragment is buffered and
    joined with the rest when it arrives, instead of being yielded as a
    truncated line.

    Args:
        file_path: Path of the file to follow.
        interval: Seconds to sleep when no new data is available.

    Yields:
        Each new line with leading and trailing whitespace stripped.
    """
    with open(file_path, 'r') as f:
        # Jump to the end of the file (whence=2).
        f.seek(0, 2)
        pending = ''
        while True:
            piece = f.readline()
            if not piece:
                time.sleep(interval)
                continue
            pending += piece
            if not pending.endswith('\n'):
                # Partial line: wait for the writer to finish it.
                continue
            complete, pending = pending, ''
            yield complete.strip()

# Usage example
def monitor_log():
    """Follow app.log via ``tail_file`` and print lines containing 'ERROR'."""
    for entry in tail_file('app.log'):
        if 'ERROR' not in entry:
            continue
        print(f"发现错误: {entry}")
        # Alerting logic can be added here.
                    

特殊格式处理

CSV文件处理


import csv
import pandas as pd

def process_csv_with_pandas(file_path, output_path='processed.csv', min_age=25):
    """Derive a column, filter rows by age, and save the result as CSV.

    The input must contain the columns ``column1``, ``column2`` and ``age``.
    A ``new_column`` holding ``column1 + column2`` is added, rows whose
    ``age`` is not greater than ``min_age`` are dropped, and the remaining
    rows are written without the index.

    Args:
        file_path: CSV file to read.
        output_path: Where to write the filtered CSV (default
            ``'processed.csv'`` in the current working directory).
        min_age: Exclusive lower bound for the ``age`` column (default 25).

    Raises:
        KeyError: If one of the required columns is missing.
    """
    # Load the CSV.
    df = pd.read_csv(file_path)

    # Derived column.
    df['new_column'] = df['column1'] + df['column2']

    # Keep only rows strictly above the age threshold.
    filtered_df = df[df['age'] > min_age]

    # Save the result.
    filtered_df.to_csv(output_path, index=False)

def process_csv_manually(input_path, output_path):
    """Copy a CSV file row by row, adding a computed ``new_field`` column.

    Rows are streamed through ``csv.DictReader`` / ``csv.DictWriter``; the
    value of ``new_field`` for each row comes from ``process_row(row)``.

    An input file with no header row (for example an empty file) produces an
    output containing only the ``new_field`` header instead of failing, and a
    header that already contains ``new_field`` is not duplicated.

    Args:
        input_path: UTF-8 CSV file to read.
        output_path: UTF-8 CSV file to create or overwrite.
    """
    with open(input_path, 'r', newline='', encoding='utf-8') as infile, \
            open(output_path, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.DictReader(infile)
        # fieldnames is None when the input has no header row.
        fieldnames = list(reader.fieldnames or [])
        if 'new_field' not in fieldnames:
            fieldnames.append('new_field')
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            # Compute the extra field for this row.
            row['new_field'] = process_row(row)
            writer.writerow(row)

def process_row(row):
    """Example row transformer: build a label from the row's ``name`` value.

    Args:
        row: Mapping that contains a ``'name'`` key.

    Returns:
        The string ``'Processed_'`` followed by the formatted name.
    """
    name = row['name']
    return "Processed_{}".format(name)
                    

JSON文件处理


import json

def update_json_file(file_path, updates):
    """Apply updates to the top-level object of a JSON file in place.

    For each key in ``updates``: when both the new value and the existing
    value are dicts, the existing dict is updated one level deep; in every
    other case (key missing, or either side not a dict) the new value
    replaces the old one.

    The updated document is serialized completely before the file is
    reopened for writing, so a value that cannot be serialized leaves the
    original file untouched instead of truncating it.

    Args:
        file_path: UTF-8 JSON file whose top-level value is an object.
        updates: Mapping of keys to new values.

    Raises:
        TypeError: If the file's top-level value is not a JSON object, or if
            a value in ``updates`` is not JSON-serializable.
    """
    # Load the current document.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not isinstance(data, dict):
        raise TypeError("top-level JSON value must be an object")

    # Merge the updates.
    for key, value in updates.items():
        existing = data.get(key)
        if isinstance(value, dict) and isinstance(existing, dict):
            existing.update(value)
        else:
            data[key] = value

    # Serialize first; only then overwrite the file.
    serialized = json.dumps(data, indent=4, ensure_ascii=False)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(serialized)

# Usage example: set a scalar key and merge a nested settings dict into
# config.json.
updates = {
    'name': '张三',
    'settings': {
        'theme': 'dark',
        'language': 'zh-CN'
    }
}
update_json_file('config.json', updates)
                    

压缩文件

ZIP文件操作


import zipfile
import os

def create_zip(zip_path, files_to_zip):
    """Create a deflate-compressed ZIP archive from a list of files.

    Each file is stored under its base name only, so directory components of
    the source paths are not preserved inside the archive.

    Args:
        zip_path: Archive to create or overwrite.
        files_to_zip: Iterable of paths of the files to add.
    """
    archive = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for source in files_to_zip:
            member_name = os.path.basename(source)
            archive.write(source, member_name)

def extract_zip(zip_path, extract_path):
    """Unpack every member of a ZIP archive into a directory.

    Args:
        zip_path: Archive to read.
        extract_path: Destination directory passed to ``extractall``.
    """
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_path)

def add_to_zip(zip_path, file_path):
    """Append a file to a ZIP archive, creating the archive if needed.

    The file is stored under its base name and compressed with
    ``ZIP_DEFLATED``, matching the compression used by ``create_zip``.
    Without an explicit compression argument, members appended in ``'a'``
    mode would be stored uncompressed.

    Args:
        zip_path: Archive to append to.
        file_path: File to add.
    """
    with zipfile.ZipFile(zip_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(file_path, os.path.basename(file_path))

# Usage example: pack two files into archive.zip, then unpack the archive
# into the extracted/ directory.
files = ['file1.txt', 'file2.txt']
create_zip('archive.zip', files)
extract_zip('archive.zip', 'extracted/')
                    

高级应用

文件备份系统


import shutil
import datetime
import os

class BackupSystem:
    """Directory-level backup and restore built on ``shutil.copytree``.

    Failures are reported on stdout rather than raised, so callers should
    check the return value of ``create_backup``.
    """

    def __init__(self, source_dir, backup_dir):
        """Remember the directory to protect and where backups are stored.

        Args:
            source_dir: Directory that is backed up and restored.
            backup_dir: Parent directory that receives ``backup_<timestamp>``
                copies.
        """
        self.source_dir = source_dir
        self.backup_dir = backup_dir

    def create_backup(self):
        """Copy ``source_dir`` into a new timestamped backup directory.

        The directory is named ``backup_YYYYMMDD_HHMMSS``. If that name is
        already taken (two backups within the same second), ``_1``, ``_2``,
        ... is appended so the second backup does not fail.

        Returns:
            The path of the new backup, or None when copying failed with an
            ``OSError`` (the error is printed).
        """
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        base_path = os.path.join(self.backup_dir, f'backup_{timestamp}')

        # Find a name that is not taken yet.
        backup_path = base_path
        attempt = 1
        while os.path.exists(backup_path):
            backup_path = f'{base_path}_{attempt}'
            attempt += 1

        try:
            shutil.copytree(self.source_dir, backup_path)
        except OSError as e:  # shutil.Error is an OSError subclass
            print(f"备份失败: {e}")
            return None

        print(f"备份创建成功: {backup_path}")
        return backup_path

    def restore_backup(self, backup_path):
        """Replace ``source_dir`` with the contents of a backup.

        The backup is validated and copied to a staging directory next to
        ``source_dir`` before the current source is removed. A missing or
        unreadable backup therefore leaves the existing source intact
        instead of deleting it first and failing afterwards.

        Args:
            backup_path: Backup directory, as returned by ``create_backup``.
        """
        target = os.path.normpath(self.source_dir)
        staging = target + '.restoring'
        try:
            if not os.path.isdir(backup_path):
                raise FileNotFoundError(f"backup not found: {backup_path}")

            # Clear leftovers from an earlier interrupted restore.
            if os.path.exists(staging):
                shutil.rmtree(staging)
            shutil.copytree(backup_path, staging)

            # The copy succeeded; only now is it safe to drop the old source.
            if os.path.exists(target):
                shutil.rmtree(target)
            os.replace(staging, target)
            print("备份恢复成功")
        except OSError as e:
            print(f"恢复失败: {e}")

# Usage example: back up source_folder into backups/, then, if a backup path
# was returned, restore source_folder from it.
backup_system = BackupSystem('source_folder', 'backups')
backup_path = backup_system.create_backup()
if backup_path:
    backup_system.restore_backup(backup_path)
                    

文件同步工具


import filecmp
import os
import shutil

class FolderSync:
    """One-way synchronisation from ``dir1`` (source) to ``dir2`` (target).

    Entries that exist only in ``dir1`` are copied, files that differ are
    overwritten in ``dir2``, and common subdirectories are processed
    recursively. Nothing is ever deleted from ``dir2`` and nothing is copied
    back to ``dir1``.
    """

    def __init__(self, dir1, dir2):
        """Store the source (``dir1``) and target (``dir2``) directories."""
        self.dir1 = dir1
        self.dir2 = dir2

    def sync_folders(self):
        """Bring ``dir2`` up to date with ``dir1``.

        The target directory is created when it does not exist yet, so a
        first-time sync into a fresh location works.

        Raises:
            FileNotFoundError: If ``dir1`` is not an existing directory.
        """
        if not os.path.isdir(self.dir1):
            raise FileNotFoundError(f"source directory not found: {self.dir1}")
        os.makedirs(self.dir2, exist_ok=True)

        comparison = filecmp.dircmp(self.dir1, self.dir2)
        self._sync_folders_recursive(comparison)

    def _sync_folders_recursive(self, comparison):
        """Apply one ``dircmp`` result, then descend into common subdirs.

        Args:
            comparison: ``filecmp.dircmp`` whose left side is the source and
                whose right side is the target.
        """
        # Copy entries that exist only on the source side.
        for name in comparison.left_only:
            path1 = os.path.join(comparison.left, name)
            path2 = os.path.join(comparison.right, name)
            if os.path.isfile(path1):
                shutil.copy2(path1, path2)
            else:
                shutil.copytree(path1, path2)

        # Overwrite files that exist on both sides but differ.
        for name in comparison.diff_files:
            path1 = os.path.join(comparison.left, name)
            path2 = os.path.join(comparison.right, name)
            shutil.copy2(path1, path2)

        # dircmp already provides a comparison object per common subdirectory.
        for sub_comparison in comparison.subdirs.values():
            self._sync_folders_recursive(sub_comparison)

# Usage example: synchronise folder1 into folder2.
sync_tool = FolderSync('folder1', 'folder2')
sync_tool.sync_folders()