Basic Examples
Parallel Downloader
import threading
import requests
import time

class DownloadManager:
    """Multi-threaded download manager"""

    def __init__(self):
        self.downloaded_files = []
        self.lock = threading.Lock()

    def download_file(self, url, filename):
        """Download a single file"""
        try:
            response = requests.get(url)
            with open(filename, 'wb') as f:
                f.write(response.content)
            with self.lock:
                self.downloaded_files.append(filename)
                print(f"Downloaded: {filename}")
        except Exception as e:
            print(f"Failed to download {filename}: {str(e)}")

    def parallel_download(self, url_list):
        """Download multiple files in parallel"""
        threads = []
        for i, url in enumerate(url_list):
            filename = f"file_{i+1}.txt"
            thread = threading.Thread(
                target=self.download_file,
                args=(url, filename)
            )
            threads.append(thread)
            thread.start()
        # Wait for all downloads to finish
        for thread in threads:
            thread.join()
        return self.downloaded_files

# Usage example
urls = [
    "https://example.com/file1",
    "https://example.com/file2",
    "https://example.com/file3"
]
downloader = DownloadManager()
downloaded = downloader.parallel_download(urls)
print(f"Files downloaded: {downloaded}")
Timed Task Executor
import threading
import time
from datetime import datetime

class TimedTaskExecutor:
    """Timed task executor"""

    def __init__(self):
        self.tasks = {}
        self.running = True

    def add_task(self, task_name, func, interval):
        """Add a recurring task"""
        def wrapper():
            while self.running:
                func()
                time.sleep(interval)
        thread = threading.Thread(target=wrapper)
        thread.daemon = True
        self.tasks[task_name] = thread
        thread.start()

    def stop_all(self):
        """Stop all tasks"""
        self.running = False

    def list_tasks(self):
        """List all tasks"""
        return list(self.tasks.keys())

# Example task functions
def log_time():
    print(f"Current time: {datetime.now()}")

def check_system():
    print("System check OK")

# Usage example
executor = TimedTaskExecutor()
executor.add_task("time log", log_time, 5)          # log the time every 5 seconds
executor.add_task("system check", check_system, 10)  # check the system every 10 seconds
print(f"Running tasks: {executor.list_tasks()}")
time.sleep(30)  # run for 30 seconds
executor.stop_all()
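One limitation of the executor above is that time.sleep(interval) cannot be interrupted, so stop_all() only takes effect after the current sleep finishes. Waiting on a threading.Event instead lets a stop request cut the wait short. A minimal sketch of that idea, independent of the TimedTaskExecutor class (StoppableTimer is an illustrative name):

import threading
from datetime import datetime

class StoppableTimer:
    """Run func every `interval` seconds; stop() interrupts the wait immediately."""

    def __init__(self, func, interval):
        self.func = func
        self.interval = interval
        self.stopped = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait returns True as soon as stop() sets the event,
        # so shutdown is not delayed by a full sleep interval
        while not self.stopped.wait(self.interval):
            self.func()

    def start(self):
        self.thread.start()

    def stop(self):
        self.stopped.set()
        self.thread.join()

timer = StoppableTimer(lambda: print(f"Current time: {datetime.now()}"), 5)
timer.start()
# ... later
timer.stop()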
Thread Synchronization
Bank Transfer System
import threading
import time

class BankAccount:
    """Bank account class"""

    def __init__(self, account_id, balance):
        self.account_id = account_id
        self.balance = balance
        self.lock = threading.Lock()

    def transfer(self, target_account, amount):
        """Transfer funds to another account"""
        if amount <= 0:
            raise ValueError("Transfer amount must be greater than 0")
        # Acquire both accounts' locks in a fixed order to avoid deadlock
        first_lock = self.lock if id(self) < id(target_account) else target_account.lock
        second_lock = target_account.lock if id(self) < id(target_account) else self.lock
        with first_lock:
            with second_lock:
                if self.balance < amount:
                    raise ValueError("Insufficient balance")
                # Simulate network latency
                time.sleep(0.1)
                self.balance -= amount
                target_account.balance += amount
                print(f"Transferred {amount} from account {self.account_id} to account {target_account.account_id}")
                print(f"Account {self.account_id} balance: {self.balance}")
                print(f"Account {target_account.account_id} balance: {target_account.balance}")

# Usage example
account1 = BankAccount("A001", 1000)
account2 = BankAccount("A002", 1000)

# Create several transfer threads
threads = []
for _ in range(5):
    t1 = threading.Thread(target=account1.transfer, args=(account2, 100))
    t2 = threading.Thread(target=account2.transfer, args=(account1, 50))
    threads.extend([t1, t2])

# Start all threads
for thread in threads:
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()
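The id() comparison inside transfer is the key trick: both threads always take the two locks in the same global order, so a cycle (each thread holding one lock and waiting for the other) cannot form. When several methods need the same rule, it can be factored into a small helper; the sketch below is illustrative, and ordered_locks is a hypothetical name, not a standard-library function.

from contextlib import contextmanager

@contextmanager
def ordered_locks(*objects):
    """Acquire each object's .lock in a fixed global order (by id), release in reverse."""
    ordered = sorted(objects, key=id)
    for obj in ordered:
        obj.lock.acquire()
    try:
        yield
    finally:
        for obj in reversed(ordered):
            obj.lock.release()

# Usage inside a transfer-style method:
#     with ordered_locks(self, target_account):
#         ...move the money...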
Thread Communication
Producer-Consumer Pattern
import threading
import queue
import random
import time

class ProductionLine:
    """Production line system"""

    def __init__(self, max_size=5):
        self.queue = queue.Queue(max_size)
        self.stop_event = threading.Event()

    def producer(self, producer_id):
        """Producer"""
        while not self.stop_event.is_set():
            if not self.queue.full():
                item = random.randint(1, 100)
                self.queue.put(item)
                print(f"Producer {producer_id} produced item: {item}")
            time.sleep(random.random())

    def consumer(self, consumer_id):
        """Consumer"""
        while not self.stop_event.is_set():
            try:
                item = self.queue.get(timeout=1)
                print(f"Consumer {consumer_id} consumed item: {item}")
                self.queue.task_done()
                time.sleep(random.random())
            except queue.Empty:
                continue

    def run_simulation(self, producer_count=2, consumer_count=3, duration=10):
        """Run the simulation"""
        # Create producer threads
        producers = [
            threading.Thread(target=self.producer, args=(i,))
            for i in range(producer_count)
        ]
        # Create consumer threads
        consumers = [
            threading.Thread(target=self.consumer, args=(i,))
            for i in range(consumer_count)
        ]
        # Start all threads
        for thread in producers + consumers:
            thread.start()
        # Run for the specified duration
        time.sleep(duration)
        # Stop the simulation
        self.stop_event.set()
        # Wait for all threads to finish
        for thread in producers + consumers:
            thread.join()

# Usage example
production_line = ProductionLine()
production_line.run_simulation(duration=5)
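The Event plus get(timeout=1) polling above is one way to shut the pipeline down; another common pattern is to push a sentinel value through the queue so each consumer exits exactly once, with no polling. A minimal sketch, independent of the ProductionLine class, using None as the sentinel:

import threading
import queue

q = queue.Queue(maxsize=5)
SENTINEL = None  # signals "no more items"

def producer():
    for item in range(10):
        q.put(item)
    q.put(SENTINEL)  # push one sentinel per consumer

def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        print(f"Consumed item: {item}")

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()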
Thread Pools
Website Crawler
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading
import requests
from urllib.parse import urljoin
import time

class WebCrawler:
    """Website crawler"""

    def __init__(self, base_url, max_workers=5):
        self.base_url = base_url
        self.visited_urls = set()
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.lock = threading.Lock()

    def crawl_page(self, url):
        """Crawl a single page"""
        try:
            response = requests.get(url)
            with self.lock:
                print(f"Crawled page: {url}")
                self.visited_urls.add(url)
            return response.text
        except Exception as e:
            print(f"Failed to crawl {url}: {str(e)}")
            return None

    def extract_links(self, html):
        """Extract links from a page (simplified)"""
        # Simple string search is used here; a real crawler should use an HTML parser
        links = []
        start = 0
        while True:
            start = html.find('href="', start)
            if start == -1:
                break
            end = html.find('"', start + 6)
            if end == -1:
                break
            link = html[start + 6:end]
            if link.startswith('/'):
                link = urljoin(self.base_url, link)
            links.append(link)
            start = end + 1
        return links

    def crawl_site(self, max_pages=10):
        """Crawl the whole site"""
        futures = []
        futures.append(self.pool.submit(self.crawl_page, self.base_url))
        while futures and len(self.visited_urls) < max_pages:
            # Wait until at least one pending page has finished
            done, not_done = concurrent.futures.wait(
                futures,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
            futures = list(not_done)
            for future in done:
                html = future.result()
                if html:
                    links = self.extract_links(html)
                    for link in links:
                        if (link.startswith(self.base_url) and
                                link not in self.visited_urls and
                                len(self.visited_urls) < max_pages):
                            futures.append(
                                self.pool.submit(self.crawl_page, link)
                            )
        self.pool.shutdown()
        return self.visited_urls

# Usage example
crawler = WebCrawler("https://example.com")
visited = crawler.crawl_site(max_pages=5)
print(f"Pages crawled: {visited}")
Practical Application
Image Processing Service
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
import os
import threading

class ImageProcessor:
    """Image processing service"""

    def __init__(self, input_dir, output_dir, max_workers=4):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.processed_count = 0
        self.lock = threading.Lock()
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

    def process_image(self, filename):
        """Process a single image"""
        try:
            input_path = os.path.join(self.input_dir, filename)
            output_path = os.path.join(self.output_dir, filename)
            # Open the image
            with Image.open(input_path) as img:
                # Resize
                resized = img.resize((800, 600))
                # Convert to grayscale
                gray = resized.convert('L')
                # Save the processed image
                gray.save(output_path)
            with self.lock:
                self.processed_count += 1
                print(f"Processed: {filename}")
        except Exception as e:
            print(f"Failed to process image {filename}: {str(e)}")

    def process_directory(self):
        """Process the whole directory"""
        image_files = [
            f for f in os.listdir(self.input_dir)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        # Submit all tasks to the thread pool
        futures = [
            self.pool.submit(self.process_image, filename)
            for filename in image_files
        ]
        # Wait for all tasks to finish
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Task failed: {str(e)}")
        self.pool.shutdown()
        return self.processed_count

# Usage example
processor = ImageProcessor("input_images", "output_images")
processed_count = processor.process_directory()
print(f"Processed {processed_count} images in total")