Python Multithreading Examples

This page provides practical examples of multithreaded programming in Python, covering basic thread creation, thread synchronization, inter-thread communication, and thread pools. These examples should help you understand and apply multithreading concepts. Note that because of CPython's global interpreter lock (GIL), threads mainly speed up I/O-bound work such as network requests and file access, which is what the examples below focus on.

Basic Examples
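
Creating a Thread

Before the larger examples, here is a minimal sketch of creating and joining a single thread with the standard threading module (the greet function and its argument are illustrative):


import threading

def greet(name):
    """Work to run on a separate thread (illustrative)."""
    print(f"Hello from {name}")

# target is the callable to run; args are its positional arguments
thread = threading.Thread(target=greet, args=("worker-1",))
thread.start()   # begin running greet on the new thread
thread.join()    # wait for it to finish
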

Parallel Downloader


import threading
import requests
import time

class DownloadManager:
    """Multithreaded download manager."""
    
    def __init__(self):
        self.downloaded_files = []
        self.lock = threading.Lock()
    
    def download_file(self, url, filename):
        """Download a single file."""
        try:
            # Use a timeout so a stalled request can't hang the thread forever
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            with open(filename, 'wb') as f:
                f.write(response.content)
            
            with self.lock:
                self.downloaded_files.append(filename)
                print(f"Downloaded: {filename}")
        
        except Exception as e:
            print(f"Failed to download {filename}: {e}")
    
    def parallel_download(self, url_list):
        """并行下载多个文件"""
        threads = []
        
        for i, url in enumerate(url_list):
            filename = f"file_{i+1}.txt"
            thread = threading.Thread(
                target=self.download_file,
                args=(url, filename)
            )
            threads.append(thread)
            thread.start()
        
        # Wait for all downloads to finish
        for thread in threads:
            thread.join()
        
        return self.downloaded_files

# Usage example
urls = [
    "https://example.com/file1",
    "https://example.com/file2",
    "https://example.com/file3"
]

downloader = DownloadManager()
downloaded = downloader.parallel_download(urls)
print(f"下载完成的文件: {downloaded}")

Timed Task Executor


import threading
import time
from datetime import datetime

class TimedTaskExecutor:
    """Runs functions repeatedly at fixed intervals on daemon threads."""
    
    def __init__(self):
        self.tasks = {}
        self.running = True
    
    def add_task(self, task_name, func, interval):
        """添加定时任务"""
        def wrapper():
            while self.running:
                func()
                time.sleep(interval)
        
        thread = threading.Thread(target=wrapper)
        thread.daemon = True  # daemon thread: won't block interpreter exit
        self.tasks[task_name] = thread
        thread.start()
    
    def stop_all(self):
        """Signal all tasks to stop; each thread exits after its current sleep."""
        self.running = False
    
    def list_tasks(self):
        """List the names of all registered tasks."""
        return list(self.tasks.keys())

# Example task functions
def log_time():
    print(f"当前时间: {datetime.now()}")

def check_system():
    print("系统检查正常")

# Usage example
executor = TimedTaskExecutor()
executor.add_task("时间日志", log_time, 5)  # 每5秒记录一次时间
executor.add_task("系统检查", check_system, 10)  # 每10秒检查一次系统

print(f"运行中的任务: {executor.list_tasks()}")
time.sleep(30)  # 运行30秒
executor.stop_all()
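
For one-shot delayed execution, the standard library's threading.Timer avoids the loop-and-sleep pattern entirely. A minimal sketch (the 5-second delay is illustrative):


import threading
from datetime import datetime

def log_time():
    print(f"Current time: {datetime.now()}")

timer = threading.Timer(5, log_time)  # run log_time once, 5 seconds from now
timer.start()
# timer.cancel() would abort it before it fires
timer.join()
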

Thread Synchronization

Bank Transfer System


import threading
import time

class BankAccount:
    """Bank account with a per-account lock."""
    
    def __init__(self, account_id, balance):
        self.account_id = account_id
        self.balance = balance
        self.lock = threading.Lock()
    
    def transfer(self, target_account, amount):
        """Transfer money to another account."""
        if amount <= 0:
            raise ValueError("Transfer amount must be positive")
        
        # Acquire the two locks in a consistent (id-based) order, so two
        # opposite transfers can never deadlock waiting on each other
        first_lock = self.lock if id(self) < id(target_account) else target_account.lock
        second_lock = target_account.lock if id(self) < id(target_account) else self.lock
        
        with first_lock:
            with second_lock:
                if self.balance < amount:
                    raise ValueError("Insufficient balance")
                
                # Simulate network latency
                time.sleep(0.1)
                
                self.balance -= amount
                target_account.balance += amount
                
                print(f"Transferred {amount} from account {self.account_id} to account {target_account.account_id}")
                print(f"Account {self.account_id} balance: {self.balance}")
                print(f"Account {target_account.account_id} balance: {target_account.balance}")

# Usage example
account1 = BankAccount("A001", 1000)
account2 = BankAccount("A002", 1000)

# Create several transfer threads in both directions
threads = []
for _ in range(5):
    t1 = threading.Thread(target=account1.transfer, args=(account2, 100))
    t2 = threading.Thread(target=account2.transfer, args=(account1, 50))
    threads.extend([t1, t2])

# Start all threads
for thread in threads:
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()
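
Because each transfer updates both balances while holding both locks, money only moves between the two accounts and is never created or destroyed. A quick check after the joins makes that invariant visible:


# Combined balance must equal the sum of the starting balances (1000 + 1000)
print(f"Combined balance: {account1.balance + account2.balance}")  # expected: 2000
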

Thread Communication

Producer-Consumer Pattern


import threading
import queue
import random
import time

class ProductionLine:
    """Producer-consumer simulation built on a bounded queue."""
    
    def __init__(self, max_size=5):
        self.queue = queue.Queue(max_size)
        self.stop_event = threading.Event()
    
    def producer(self, producer_id):
        """Producer: put random items on the queue."""
        while not self.stop_event.is_set():
            item = random.randint(1, 100)
            try:
                # Block for at most 1 second; avoids busy-waiting on a full queue
                self.queue.put(item, timeout=1)
            except queue.Full:
                continue
            print(f"Producer {producer_id} produced item: {item}")
            time.sleep(random.random())
    
    def consumer(self, consumer_id):
        """Consumer: take items off the queue."""
        while not self.stop_event.is_set():
            try:
                item = self.queue.get(timeout=1)
                print(f"Consumer {consumer_id} consumed item: {item}")
                self.queue.task_done()
                time.sleep(random.random())
            except queue.Empty:
                continue
    
    def run_simulation(self, producer_count=2, consumer_count=3, duration=10):
        """运行模拟"""
        # 创建生产者线程
        producers = [
            threading.Thread(
                target=self.producer,
                args=(i,)
            )
            for i in range(producer_count)
        ]
        
        # Create consumer threads
        consumers = [
            threading.Thread(
                target=self.consumer,
                args=(i,)
            )
            for i in range(consumer_count)
        ]
        
        # Start all threads
        for thread in producers + consumers:
            thread.start()
        
        # Run for the requested duration
        time.sleep(duration)
        
        # Signal all producers and consumers to stop
        self.stop_event.set()
        
        # Wait for all threads to exit
        for thread in producers + consumers:
            thread.join()

# Usage example
production_line = ProductionLine()
production_line.run_simulation(duration=5)
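
The Event-plus-timeout shutdown above is one option; another common pattern is to enqueue one sentinel per consumer, so consumers drain all real work and then exit deterministically. A minimal sketch, separate from the class above (the None sentinel and worker function are illustrative):


import queue
import threading

SENTINEL = None  # illustrative end-of-work marker

def worker(q):
    while True:
        item = q.get()
        if item is SENTINEL:   # no more work: acknowledge and exit
            q.task_done()
            break
        print(f"processed {item}")
        q.task_done()

q = queue.Queue()
consumers = [threading.Thread(target=worker, args=(q,)) for _ in range(3)]
for t in consumers:
    t.start()

for item in range(10):
    q.put(item)
for _ in consumers:            # one sentinel per consumer
    q.put(SENTINEL)

q.join()                       # blocks until every item is acknowledged
for t in consumers:
    t.join()
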

Thread Pools

Web Crawler


import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading
import requests
from urllib.parse import urljoin

class WebCrawler:
    """Simple multithreaded web crawler."""
    
    def __init__(self, base_url, max_workers=5):
        self.base_url = base_url
        self.visited_urls = set()
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.lock = threading.Lock()
    
    def crawl_page(self, url):
        """Fetch a single page."""
        try:
            response = requests.get(url, timeout=10)
            with self.lock:
                print(f"Crawled page: {url}")
                self.visited_urls.add(url)
            return response.text
        except Exception as e:
            print(f"Failed to crawl {url}: {e}")
            return None
    
    def extract_links(self, html):
        """提取页面中的链接(简化版)"""
        # 这里使用简单的字符串查找,实际应该使用HTML解析器
        links = []
        start = 0
        while True:
            start = html.find('href="', start)
            if start == -1:
                break
            end = html.find('"', start + 6)
            if end == -1:
                break
            link = html[start + 6:end]
            if link.startswith('/'):
                link = urljoin(self.base_url, link)
            links.append(link)
            start = end + 1
        return links
    
    def crawl_site(self, max_pages=10):
        """Crawl the site breadth-first, up to max_pages pages."""
        futures = [self.pool.submit(self.crawl_page, self.base_url)]
        
        while futures and len(self.visited_urls) < max_pages:
            # wait() returns (done, not_done) sets; keep the pending ones as a list
            done, pending = concurrent.futures.wait(
                futures,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
            futures = list(pending)
            
            for future in done:
                html = future.result()
                if html:
                    links = self.extract_links(html)
                    for link in links:
                        if (link.startswith(self.base_url) and
                            link not in self.visited_urls and
                            len(self.visited_urls) < max_pages):
                            futures.append(
                                self.pool.submit(self.crawl_page, link)
                            )
        
        self.pool.shutdown()
        return self.visited_urls

# Usage example
crawler = WebCrawler("https://example.com")
visited = crawler.crawl_site(max_pages=5)
print(f"已爬取的页面: {visited}")

Practical Applications

Image Processing Service


from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
import os
import threading

class ImageProcessor:
    """Image processing service."""
    
    def __init__(self, input_dir, output_dir, max_workers=4):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.processed_count = 0
        self.lock = threading.Lock()
        
        # Create the output directory if needed
        os.makedirs(output_dir, exist_ok=True)
    
    def process_image(self, filename):
        """Process a single image."""
        try:
            input_path = os.path.join(self.input_dir, filename)
            output_path = os.path.join(self.output_dir, filename)
            
            # Open the image
            with Image.open(input_path) as img:
                # Resize
                resized = img.resize((800, 600))
                # Convert to grayscale
                gray = resized.convert('L')
                # Save the processed image
                gray.save(output_path)
            
            with self.lock:
                self.processed_count += 1
                print(f"Processed: {filename}")
        
        except Exception as e:
            print(f"Failed to process {filename}: {e}")
    
    def process_directory(self):
        """Process every image in the input directory."""
        image_files = [
            f for f in os.listdir(self.input_dir)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        
        # Submit all tasks to the thread pool
        futures = [
            self.pool.submit(self.process_image, filename)
            for filename in image_files
        ]
        
        # Wait for all tasks to complete
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"任务执行失败: {str(e)}")
        
        self.pool.shutdown()
        return self.processed_count

# Usage example
processor = ImageProcessor("input_images", "output_images")
processed_count = processor.process_directory()
print(f"共处理了 {processed_count} 张图片")