File Security
Simple File Encryption
from cryptography.fernet import Fernet

def generate_key():
    """Generate an encryption key."""
    return Fernet.generate_key()

def encrypt_file(file_path, key):
    """Encrypt a file."""
    f = Fernet(key)
    with open(file_path, 'rb') as file:
        file_data = file.read()
    encrypted_data = f.encrypt(file_data)
    with open(file_path + '.encrypted', 'wb') as file:
        file.write(encrypted_data)

def decrypt_file(file_path, key):
    """Decrypt a file."""
    f = Fernet(key)
    with open(file_path, 'rb') as file:
        encrypted_data = file.read()
    decrypted_data = f.decrypt(encrypted_data)
    # Strip the '.encrypted' suffix to recover the original name (Python 3.9+)
    with open(file_path.removesuffix('.encrypted'), 'wb') as file:
        file.write(decrypted_data)

# Usage example
key = generate_key()
encrypt_file('secret.txt', key)
decrypt_file('secret.txt.encrypted', key)
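The key is the only way to decrypt the data, so it must be persisted somewhere safe; generating a new key per run would make earlier ciphertext unrecoverable. A minimal sketch for saving and reloading a key (the secret.key filename is an arbitrary choice for illustration):

import os
from cryptography.fernet import Fernet

def save_key(key, key_path='secret.key'):
    """Write the raw key bytes to disk and restrict permissions."""
    with open(key_path, 'wb') as f:
        f.write(key)
    os.chmod(key_path, 0o600)  # owner read/write only (POSIX)

def load_key(key_path='secret.key'):
    """Read the key bytes back from disk."""
    with open(key_path, 'rb') as f:
        return f.read()

# Usage example
save_key(Fernet.generate_key())
key = load_key()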
File Verification
import hashlib

def calculate_file_hash(file_path, hash_type='sha256'):
    """Compute a file's hash digest, reading in 4 KB chunks."""
    hash_func = getattr(hashlib, hash_type)()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            hash_func.update(chunk)
    return hash_func.hexdigest()

def verify_file_integrity(file_path, original_hash, hash_type='sha256'):
    """Verify that a file still matches a previously recorded hash."""
    current_hash = calculate_file_hash(file_path, hash_type)
    return current_hash == original_hash

# Usage example
file_hash = calculate_file_hash('important.txt')
print(f"File hash: {file_hash}")
is_valid = verify_file_integrity('important.txt', file_hash)
print(f"File integrity: {'intact' if is_valid else 'modified'}")
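The same helper scales to whole directories: record one hash per file, then re-check the manifest later to find what changed. A minimal sketch reusing calculate_file_hash (the manifest structure is an illustrative choice, not a standard format):

import os

def build_manifest(directory):
    """Map each file's relative path to its SHA-256 digest."""
    manifest = {}
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            manifest[os.path.relpath(path, directory)] = calculate_file_hash(path)
    return manifest

def check_manifest(directory, manifest):
    """Return relative paths that are missing or whose content changed."""
    changed = []
    for rel, digest in manifest.items():
        path = os.path.join(directory, rel)
        if not os.path.exists(path) or calculate_file_hash(path) != digest:
            changed.append(rel)
    return changed

# Usage example
manifest = build_manifest('data/')
print(check_manifest('data/', manifest))  # [] if nothing changed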
Large File Handling
Reading and Writing Large Files in Chunks
def process_large_file(input_path, output_path, chunk_size=8192):
    """Process a large file chunk by chunk."""
    with open(input_path, 'rb') as source:
        with open(output_path, 'wb') as dest:
            while True:
                chunk = source.read(chunk_size)
                if not chunk:
                    break
                # Insert per-chunk processing logic here
                processed_chunk = chunk.upper()  # Example: convert to uppercase
                dest.write(processed_chunk)

def count_lines_efficiently(file_path):
    """Count lines efficiently by scanning large binary buffers."""
    count = 0
    with open(file_path, 'rb') as f:
        while True:
            buffer = f.read(8192 * 1024)  # 8 MB per read
            if not buffer:
                break
            # Counts newline characters; a final line without a
            # trailing newline is not counted
            count += buffer.count(b'\n')
    return count

# Reading a large file with a generator
def read_large_file(file_path):
    """Yield a large file's content in fixed-size chunks."""
    with open(file_path, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(8192)
            if not chunk:
                break
            yield chunk
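Because read_large_file is a generator, a caller holds only one chunk in memory at a time, no matter how large the file is. A minimal usage sketch (huge.log and the search term are illustrative; note a match straddling a chunk boundary would be missed, which is usually acceptable for line-based logs):

# Usage example: count occurrences without loading the whole file
total = sum(chunk.count('ERROR') for chunk in read_large_file('huge.log'))
print(f"Occurrences of ERROR: {total}")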
Memory-Mapped Files
import mmap
import os

def process_with_mmap(file_path):
    """Process a large file through a memory map."""
    with open(file_path, 'r+b') as f:
        # Get the file size
        size = os.path.getsize(file_path)
        # Create the memory map
        with mmap.mmap(f.fileno(), size) as mm:
            # Search for specific content
            position = mm.find(b'search_term')
            if position != -1:
                print(f"Found content at position: {position}")
            # Replace content; the replacement must be the same length,
            # because a memory map cannot grow or shrink the file
            mm.seek(0)
            content = mm.read().replace(b'old', b'new')
            mm.seek(0)
            mm.write(content)
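When no writes are needed, opening the map read-only avoids accidental modification and works on files opened with plain 'rb'. A minimal sketch (big.bin is an illustrative filename):

import mmap

def find_in_file(file_path, needle):
    """Return the offset of needle in the file, or -1, via a read-only map."""
    with open(file_path, 'rb') as f:
        # Length 0 maps the whole file; ACCESS_READ forbids writes
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            return mm.find(needle)

# Usage example
print(find_in_file('big.bin', b'search_term'))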
File Monitoring
Real-Time File Monitoring
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"File modified: {event.src_path}")
            # Add custom handling logic here

    def on_created(self, event):
        if not event.is_directory:
            print(f"File created: {event.src_path}")

    def on_deleted(self, event):
        if not event.is_directory:
            print(f"File deleted: {event.src_path}")

def monitor_directory(path):
    """Watch a directory for changes."""
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# Usage example
monitor_directory("./watched_folder")
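watchdog also ships a PatternMatchingEventHandler that filters events by glob pattern, which avoids hand-written extension checks in each callback. A minimal sketch, assuming only .log files are of interest:

from watchdog.events import PatternMatchingEventHandler

class LogFileHandler(PatternMatchingEventHandler):
    def __init__(self):
        # Only react to .log files; skip directory events entirely
        super().__init__(patterns=["*.log"], ignore_directories=True)

    def on_modified(self, event):
        print(f"Log file modified: {event.src_path}")

# Usage: observer.schedule(LogFileHandler(), path, recursive=False)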
Real-Time Log File Monitoring
import time

def tail_file(file_path, interval=1.0):
    """Follow a log file in real time, yielding new lines as they appear."""
    with open(file_path, 'r') as f:
        # Move to the end of the file
        f.seek(0, 2)
        while True:
            line = f.readline()
            if not line:
                time.sleep(interval)
                continue
            yield line.strip()

# Usage example
def monitor_log():
    for line in tail_file('app.log'):
        if 'ERROR' in line:
            print(f"Error found: {line}")
            # Add alerting logic here
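Plain tail-following breaks when the log is rotated, because the open handle keeps pointing at the old file. One common workaround is to reopen the file when its inode changes; a minimal POSIX-specific sketch (an illustration of the idea, not a production-grade follower):

import os
import time

def tail_with_rotation(file_path, interval=1.0):
    """Follow a file, reopening it if log rotation swaps the inode."""
    f = open(file_path, 'r')
    f.seek(0, 2)
    inode = os.fstat(f.fileno()).st_ino
    try:
        while True:
            line = f.readline()
            if line:
                yield line.strip()
                continue
            time.sleep(interval)
            try:
                if os.stat(file_path).st_ino != inode:
                    # The path now points at a new file: reopen it
                    f.close()
                    f = open(file_path, 'r')
                    inode = os.fstat(f.fileno()).st_ino
            except FileNotFoundError:
                pass  # Rotation in progress; retry on the next tick
    finally:
        f.close()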
Special Format Handling
CSV File Handling
import csv
import pandas as pd

def process_csv_with_pandas(file_path):
    """Process a CSV file with pandas."""
    # Read the CSV
    df = pd.read_csv(file_path)
    # Derive a new column
    df['new_column'] = df['column1'] + df['column2']
    # Filter rows
    filtered_df = df[df['age'] > 25]
    # Save the result
    filtered_df.to_csv('processed.csv', index=False)

def process_csv_manually(input_path, output_path):
    """Process a CSV file with the standard csv module."""
    with open(input_path, 'r', newline='', encoding='utf-8') as infile:
        with open(output_path, 'w', newline='', encoding='utf-8') as outfile:
            reader = csv.DictReader(infile)
            # Append a new field to the header
            fieldnames = reader.fieldnames + ['new_field']
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            for row in reader:
                # Process each row
                row['new_field'] = process_row(row)
                writer.writerow(row)

def process_row(row):
    """Example row-processing function."""
    return f"Processed_{row['name']}"
JSON File Handling
import json

def update_json_file(file_path, updates):
    """Apply updates to a JSON file."""
    # Read the JSON
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    # Merge in the updates (one level of nested dicts)
    for key, value in updates.items():
        if isinstance(value, dict):
            data.setdefault(key, {}).update(value)
        else:
            data[key] = value
    # Write the result back
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

# Usage example
updates = {
    'name': '张三',
    'settings': {
        'theme': 'dark',
        'language': 'zh-CN'
    }
}
update_json_file('config.json', updates)
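Rewriting a config file in place risks leaving it truncated if the process dies mid-write. A common defense is to write to a temporary file in the same directory and atomically swap it in with os.replace; a minimal sketch:

import json
import os
import tempfile

def write_json_atomically(file_path, data):
    """Write JSON to a temp file, then atomically replace the target."""
    directory = os.path.dirname(os.path.abspath(file_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        os.replace(tmp_path, file_path)  # atomic on the same filesystem
    except BaseException:
        os.unlink(tmp_path)  # clean up the partial temp file
        raise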
Compressed Files
ZIP File Operations
import zipfile
import os

def create_zip(zip_path, files_to_zip):
    """Create a ZIP archive."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in files_to_zip:
            zipf.write(file, os.path.basename(file))

def extract_zip(zip_path, extract_path):
    """Extract a ZIP archive."""
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        zipf.extractall(extract_path)

def add_to_zip(zip_path, file_path):
    """Append a file to an existing ZIP archive."""
    with zipfile.ZipFile(zip_path, 'a') as zipf:
        zipf.write(file_path, os.path.basename(file_path))

# Usage example
files = ['file1.txt', 'file2.txt']
create_zip('archive.zip', files)
extract_zip('archive.zip', 'extracted/')
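An archive can also be inspected, or a single member read, without extracting anything to disk. A minimal sketch using the standard zipfile API:

import zipfile

def list_zip_contents(zip_path):
    """Print each member's name, compressed size, and original size."""
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        for info in zipf.infolist():
            print(f"{info.filename}: {info.compress_size} -> {info.file_size} bytes")

def read_member(zip_path, member_name):
    """Return one member's bytes without extracting the archive."""
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        return zipf.read(member_name)

# Usage example
list_zip_contents('archive.zip')
print(read_member('archive.zip', 'file1.txt'))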
Advanced Applications
File Backup System
import shutil
import datetime
import os

class BackupSystem:
    def __init__(self, source_dir, backup_dir):
        self.source_dir = source_dir
        self.backup_dir = backup_dir

    def create_backup(self):
        """Create a timestamped backup."""
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = os.path.join(
            self.backup_dir,
            f'backup_{timestamp}'
        )
        try:
            # Copy the whole source tree
            shutil.copytree(self.source_dir, backup_path)
            print(f"Backup created: {backup_path}")
            return backup_path
        except Exception as e:
            print(f"Backup failed: {e}")
            return None

    def restore_backup(self, backup_path):
        """Restore from a backup."""
        try:
            if os.path.exists(self.source_dir):
                shutil.rmtree(self.source_dir)
            shutil.copytree(backup_path, self.source_dir)
            print("Backup restored")
        except Exception as e:
            print(f"Restore failed: {e}")

# Usage example
backup_system = BackupSystem('source_folder', 'backups')
backup_path = backup_system.create_backup()
if backup_path:
    backup_system.restore_backup(backup_path)
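Timestamped backups accumulate indefinitely, so a retention policy is usually needed. Because the backup_YYYYMMDD_HHMMSS naming sorts lexically in chronological order, pruning reduces to keeping the last N names. A minimal sketch (keep_last=5 is an arbitrary choice):

import os
import shutil

def prune_backups(backup_dir, keep_last=5):
    """Delete all but the newest keep_last backup_* directories."""
    backups = sorted(
        name for name in os.listdir(backup_dir)
        if name.startswith('backup_')
        and os.path.isdir(os.path.join(backup_dir, name))
    )
    # Lexical order equals chronological order for this naming scheme
    for name in backups[:-keep_last]:
        shutil.rmtree(os.path.join(backup_dir, name))

# Usage example
prune_backups('backups', keep_last=5)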
File Sync Tool
import filecmp
import os
import shutil

class FolderSync:
    def __init__(self, dir1, dir2):
        self.dir1 = dir1
        self.dir2 = dir2

    def sync_folders(self):
        """One-way sync: make dir2 contain everything in dir1."""
        comparison = filecmp.dircmp(self.dir1, self.dir2)
        self._sync_folders_recursive(comparison)

    def _sync_folders_recursive(self, comparison):
        # Copy entries that exist only on the left
        for name in comparison.left_only:
            path1 = os.path.join(comparison.left, name)
            path2 = os.path.join(comparison.right, name)
            if os.path.isfile(path1):
                shutil.copy2(path1, path2)
            else:
                shutil.copytree(path1, path2)
        # Overwrite files whose contents differ
        for name in comparison.diff_files:
            path1 = os.path.join(comparison.left, name)
            path2 = os.path.join(comparison.right, name)
            shutil.copy2(path1, path2)
        # Recurse into common subdirectories
        for subdir in comparison.common_dirs:
            left_subdir = os.path.join(comparison.left, subdir)
            right_subdir = os.path.join(comparison.right, subdir)
            sub_comparison = filecmp.dircmp(left_subdir, right_subdir)
            self._sync_folders_recursive(sub_comparison)

# Usage example
sync_tool = FolderSync('folder1', 'folder2')
sync_tool.sync_folders()
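As written, the sync never deletes anything: entries that exist only on the right are left in place. To turn it into a true mirror, right-only entries can be removed as well; a minimal sketch of the extra step, to be called inside _sync_folders_recursive (destructive, so treat it as an illustration and test on disposable data first):

import os
import shutil

def remove_right_only(comparison):
    """Delete entries that exist only in the right-hand directory."""
    for name in comparison.right_only:
        path = os.path.join(comparison.right, name)
        if os.path.isfile(path):
            os.remove(path)
        else:
            shutil.rmtree(path)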