Python网络编程示例

本页面提供 Python 网络编程的实用示例，涵盖 TCP/UDP 通信、HTTP 客户端与服务器、WebSocket 等内容。这些示例将帮助你更好地理解网络编程的概念，并将其应用于网络应用程序的开发。

Socket编程

TCP聊天室


import socket
import threading

class ChatServer:
    """Thread-per-connection TCP chat server.

    Protocol, as implemented below: the first payload a client sends is taken
    as its username; every later payload is relayed to all connected clients
    (the sender included) prefixed with ``"<username>: "``. Payloads are raw
    UTF-8 with no framing.
    """

    def __init__(self, host='localhost', port=9999):
        """Create the listening socket; nothing is bound until ``start()``.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clients = {}  # {client_socket: username}
        # handle_client runs on one thread per connection, so every access to
        # self.clients made by this class goes through this lock.
        self._clients_lock = threading.Lock()

    def start(self):
        """Bind, listen and accept connections until interrupted (blocks)."""
        # Lets the server be restarted without waiting for old connections
        # in TIME_WAIT to expire.
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
        )
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"聊天服务器启动在 {self.host}:{self.port}")

        try:
            while True:
                client_socket, address = self.server_socket.accept()
                print(f"新连接来自: {address}")

                # Daemon threads: a blocked recv() must not keep the process
                # alive after the accept loop has been interrupted.
                worker = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket,),
                    daemon=True,
                )
                worker.start()
        finally:
            self.server_socket.close()

    def handle_client(self, client_socket):
        """Serve one client until it disconnects.

        Args:
            client_socket: Connected socket returned by ``accept()``. It is
                always closed before this method returns.
        """
        username = None
        registered = False
        try:
            username = client_socket.recv(1024).decode()
            if not username:
                # Peer closed before sending a name: nothing to announce.
                return

            with self._clients_lock:
                self.clients[client_socket] = username
            registered = True
            self.broadcast(f"{username} 加入了聊天室")

            while True:
                message = client_socket.recv(1024).decode()
                if not message:
                    break
                self.broadcast(f"{username}: {message}")

        except (OSError, UnicodeDecodeError):
            # A socket error or undecodable payload is treated as a
            # disconnect, as the original best-effort handler did.
            pass

        finally:
            with self._clients_lock:
                self.clients.pop(client_socket, None)
            client_socket.close()
            if registered:
                self.broadcast(f"{username} 离开了聊天室")

    def broadcast(self, message):
        """Send ``message`` to every registered client, skipping failures.

        Args:
            message: Text to send; encoded as UTF-8.
        """
        payload = message.encode()
        # Iterate over a snapshot so other threads can register or
        # unregister clients while we are sending.
        with self._clients_lock:
            recipients = list(self.clients)
        for client in recipients:
            try:
                # sendall: plain send() may transmit only part of the data.
                client.sendall(payload)
            except OSError:
                # The owning handler thread will notice and clean up.
                continue

class ChatClient:
    """Console TCP chat client: sends stdin lines, prints whatever arrives."""

    def __init__(self, host='localhost', port=9999):
        """Connect to the chat server.

        Args:
            host: Server host name or address.
            port: Server TCP port.

        Raises:
            OSError: If the connection cannot be established.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect((host, port))
        except OSError:
            # Do not leak the descriptor when the server is unreachable.
            self.socket.close()
            raise

    def start(self, username):
        """Announce ``username`` and relay stdin lines until ``quit``.

        Args:
            username: Name sent to the server as the first payload.
        """
        self.socket.sendall(username.encode())

        # Incoming messages are printed from a background daemon thread so
        # that the blocking input() below does not delay them.
        receive_thread = threading.Thread(target=self.receive_messages)
        receive_thread.daemon = True
        receive_thread.start()

        try:
            while True:
                message = input()
                if message.lower() == 'quit':
                    break
                self.socket.sendall(message.encode())

        finally:
            self.socket.close()

    def receive_messages(self):
        """Print incoming data until the connection is closed or fails."""
        while True:
            try:
                data = self.socket.recv(1024)
            except OSError:
                break
            if not data:
                # recv() returns b'' once the peer has closed. Without this
                # check the loop would spin forever printing empty lines.
                break
            # A multi-byte character may be split across two recv() calls;
            # replace the broken bytes rather than stop receiving.
            print(data.decode(errors='replace'))

# 使用示例 - 服务器
# server = ChatServer()
# server.start()

# 使用示例 - 客户端
# client = ChatClient()
# client.start("用户名")
                    

UDP文件传输


import socket
import os
import struct

class FileTransfer:
    """Minimal file transfer over UDP.

    Wire format, as implemented below: one 8-byte big-endian datagram with
    the file size, then one datagram per chunk consisting of a 4-byte
    big-endian sequence number followed by up to ``CHUNK_SIZE`` bytes.
    There are no acknowledgements or retransmissions, so a lost datagram is
    never recovered.
    """

    CHUNK_SIZE = 1024

    @staticmethod
    def send_file(filename, host='localhost', port=9999):
        """Send ``filename`` to ``(host, port)``.

        Args:
            filename: Path of the file to send.
            host: Receiver host.
            port: Receiver UDP port.

        Raises:
            OSError: If the file cannot be read or a datagram cannot be sent.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        destination = (host, port)

        try:
            # Header: total size, so the receiver knows when to stop.
            file_size = os.path.getsize(filename)
            sock.sendto(struct.pack('!Q', file_size), destination)

            with open(filename, 'rb') as f:
                # iter() with a sentinel stops on the first empty read (EOF).
                chunks = iter(lambda: f.read(FileTransfer.CHUNK_SIZE), b'')
                for sequence, chunk in enumerate(chunks):
                    packet = struct.pack('!I', sequence) + chunk
                    sock.sendto(packet, destination)

            print(f"文件 {filename} 发送完成")

        finally:
            sock.close()

    @staticmethod
    def receive_file(filename, host='localhost', port=9999, timeout=None):
        """Receive one file and write it to ``filename``.

        Args:
            filename: Path the received data is written to.
            host: Local interface to bind to.
            port: Local UDP port to bind to.
            timeout: Optional number of seconds to wait for each datagram.
                ``None`` (the default) waits indefinitely, so a single lost
                datagram blocks forever.

        Raises:
            socket.timeout: If ``timeout`` is set and no datagram arrives in
                time.
            struct.error: If the size header is shorter than 8 bytes.
        """
        header_size = 4
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((host, port))
        sock.settimeout(timeout)

        try:
            data, _ = sock.recvfrom(8)
            file_size = struct.unpack('!Q', data)[0]

            received_chunks = {}
            received_size = 0

            while received_size < file_size:
                data, _ = sock.recvfrom(FileTransfer.CHUNK_SIZE + header_size)
                if len(data) <= header_size:
                    # Too short to carry a sequence number plus payload.
                    continue
                sequence = struct.unpack_from('!I', data)[0]
                if sequence in received_chunks:
                    # UDP may duplicate datagrams. Counting one twice would
                    # end the loop early with a chunk still missing.
                    continue

                chunk = data[header_size:]
                received_chunks[sequence] = chunk
                received_size += len(chunk)

            # Datagrams may arrive out of order; write them by sequence.
            with open(filename, 'wb') as f:
                for sequence in sorted(received_chunks):
                    f.write(received_chunks[sequence])

            print(f"文件 {filename} 接收完成")

        finally:
            sock.close()

# 使用示例 - 发送方
# FileTransfer.send_file("test.txt")

# 使用示例 - 接收方
# FileTransfer.receive_file("received.txt")
                    

HTTP编程

简单Web服务器


from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse

class SimpleHTTPServer(BaseHTTPRequestHandler):
    """Tiny JSON REST handler for an in-memory ``/users`` collection.

    Routes:
        GET  /users        -> all users
        GET  /users/<id>   -> one user, or 404
        POST /users        -> create a user from the JSON body, returns its id
        PUT  /users/<id>   -> merge the JSON object body into an existing user
    """

    # Stand-in for a database. It is a class attribute, so it is shared by
    # every request handled in this process.
    users = {}

    def _send_json(self, status, payload):
        """Write ``payload`` as a JSON response with the given status code."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _read_json_body(self):
        """Return the request body parsed as JSON.

        Raises:
            ValueError: If Content-Length is missing or invalid, or the body
                is not valid UTF-8 JSON.
        """
        length = int(self.headers.get('Content-Length') or 0)
        if length < 0:
            # read(-1) would block until the client closes the connection.
            raise ValueError("negative Content-Length")
        return json.loads(self.rfile.read(length).decode())

    def do_GET(self):
        """Handle GET requests for the collection or a single user."""
        path = urllib.parse.urlparse(self.path).path

        if path == '/users':
            self._send_json(200, self.users)

        elif path.startswith('/users/'):
            user_id = path.split('/')[-1]
            if user_id in self.users:
                self._send_json(200, self.users[user_id])
            else:
                # The text goes in ``explain`` (the UTF-8 body). The status
                # line is latin-1 only, so non-ASCII text passed as
                # ``message`` raises UnicodeEncodeError.
                self.send_error(404, explain="用户不存在")

        else:
            self.send_error(404, explain="页面不存在")

    def do_POST(self):
        """Handle POST /users: store the JSON body under a new id."""
        path = urllib.parse.urlparse(self.path).path
        if path != '/users':
            self.send_error(404, explain="接口不存在")
            return

        try:
            user_data = self._read_json_body()
        except ValueError:
            self.send_error(400, explain="请求体不是有效的JSON")
            return

        # Ids are sequential; nothing here deletes users, so the current
        # count never collides with an existing id.
        user_id = str(len(self.users) + 1)
        self.users[user_id] = user_data

        self._send_json(201, {'id': user_id})

    def do_PUT(self):
        """Handle PUT /users/<id>: merge the JSON body into the user."""
        path = urllib.parse.urlparse(self.path).path
        if not path.startswith('/users/'):
            self.send_error(404, explain="接口不存在")
            return

        user_id = path.split('/')[-1]
        if user_id not in self.users:
            self.send_error(404, explain="用户不存在")
            return

        try:
            user_data = self._read_json_body()
        except ValueError:
            self.send_error(400, explain="请求体不是有效的JSON")
            return

        stored = self.users[user_id]
        if not (isinstance(stored, dict) and isinstance(user_data, dict)):
            # dict.update needs a JSON object on both sides.
            self.send_error(400, explain="请求体必须是JSON对象")
            return

        stored.update(user_data)
        self._send_json(200, stored)

def run_server(host='localhost', port=8000):
    """Serve ``SimpleHTTPServer`` on ``host:port`` until interrupted.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    server_address = (host, port)
    # The context manager closes the listening socket when serve_forever()
    # exits, including on KeyboardInterrupt, which still propagates.
    with HTTPServer(server_address, SimpleHTTPServer) as httpd:
        print(f"服务器运行在 http://{host}:{port}")
        httpd.serve_forever()

# 使用示例
# run_server()
                    

WebSocket

实时聊天服务器


import asyncio
import websockets
import json

class ChatRoom:
    """Set of connected WebSocket clients plus a bounded message history."""

    HISTORY_LIMIT = 100  # messages kept in memory
    REPLAY_COUNT = 50    # most recent messages sent to a new client

    def __init__(self):
        self.clients = set()
        self.messages = []

    async def register(self, websocket):
        """Add ``websocket`` to the room and replay recent history to it.

        Args:
            websocket: Connection object exposing an awaitable ``send``.
        """
        self.clients.add(websocket)
        # The slice is a copy, so a broadcast that runs while we await
        # cannot disturb this loop.
        for message in self.messages[-self.REPLAY_COUNT:]:
            await websocket.send(json.dumps(message))

    async def unregister(self, websocket):
        """Remove ``websocket`` from the room; a no-op if already removed."""
        # discard, not remove: both broadcast() and the connection handler
        # may unregister the same client.
        self.clients.discard(websocket)

    async def broadcast(self, message):
        """Record ``message`` in the history and send it to every client.

        Args:
            message: JSON-serialisable object.

        Raises:
            TypeError: If ``message`` cannot be serialised. Nothing is stored
                or sent in that case.
        """
        # Serialise once, and before touching the history, so an
        # unserialisable message cannot poison later replays.
        payload = json.dumps(message)

        self.messages.append(message)
        if len(self.messages) > self.HISTORY_LIMIT:
            self.messages = self.messages[-self.HISTORY_LIMIT:]

        # Iterate over a snapshot: clients are removed below, and others may
        # join or leave while we await. Mutating the live set during
        # iteration raises RuntimeError.
        for client in list(self.clients):
            try:
                await client.send(payload)
            except websockets.ConnectionClosed:
                await self.unregister(client)

class ChatServer:
    """WebSocket chat server: one ``handler`` coroutine per connection."""

    def __init__(self):
        self.chatroom = ChatRoom()

    async def handler(self, websocket, path=None):
        """Serve one WebSocket connection.

        The first frame must be a JSON object; its optional ``username`` key
        names the user. Every later JSON-object frame is broadcast as a chat
        message using its ``content`` key.

        Args:
            websocket: Connection object supplied by ``websockets.serve``.
            path: Request path. Optional because newer ``websockets``
                releases call the handler with the connection only, while
                older ones also pass the path.
        """
        try:
            # Wait for the client to introduce itself.
            first_frame = await websocket.recv()
            try:
                user_data = json.loads(first_frame)
            except ValueError:
                # Not JSON: returning ends the handler, which closes the
                # connection.
                return
            if not isinstance(user_data, dict):
                return
            username = user_data.get('username', 'Anonymous')

            announced = False
            try:
                await self.chatroom.register(websocket)

                await self.chatroom.broadcast({
                    'type': 'system',
                    'content': f'{username} 加入了聊天室'
                })
                announced = True

                async for message in websocket:
                    try:
                        data = json.loads(message)
                    except ValueError:
                        # One malformed frame should not drop the user.
                        continue
                    if not isinstance(data, dict):
                        continue
                    await self.chatroom.broadcast({
                        'type': 'chat',
                        'username': username,
                        'content': data.get('content', '')
                    })

            finally:
                # Unregister before announcing, so the leave message is not
                # sent to the socket that just closed. The membership check
                # covers a client that a broadcast already removed.
                if websocket in self.chatroom.clients:
                    await self.chatroom.unregister(websocket)
                if announced:
                    await self.chatroom.broadcast({
                        'type': 'system',
                        'content': f'{username} 离开了聊天室'
                    })

        except websockets.ConnectionClosed:
            pass

# 使用示例
async def main():
    """Run the WebSocket chat server on localhost:8765 until cancelled."""
    chat_server = ChatServer()
    listening = websockets.serve(chat_server.handler, 'localhost', 8765)
    async with listening:
        # Nothing ever resolves this future, so the coroutine keeps the
        # server alive until it is interrupted.
        never_done = asyncio.Future()
        await never_done

# asyncio.run(main())

# 客户端示例(JavaScript)
"""
const ws = new WebSocket('ws://localhost:8765');

// 连接后发送用户名
ws.onopen = () => {
    ws.send(JSON.stringify({
        username: 'Alice'
    }));
};

// 接收消息
ws.onmessage = (event) => {
    const message = JSON.parse(event.data);
    console.log(message);
};

// 发送消息
function sendMessage(content) {
    ws.send(JSON.stringify({
        content: content
    }));
}
"""
                    

异步网络

异步HTTP客户端


import aiohttp
import asyncio
import time

class AsyncWebClient:
    """Asynchronous HTTP client built on one shared ``aiohttp`` session.

    Use it as an async context manager; the session exists only inside the
    ``async with`` block.
    """

    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Guarded so that exiting without a live session is harmless.
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def fetch_url(self, url):
        """Return the body of ``url`` as text, or ``None`` on failure.

        The HTTP status is not checked, so an error page is returned as text
        like any other body. Failures are printed, never raised.
        """
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            # Deliberately broad: one bad URL must not abort fetch_all().
            print(f"获取 {url} 失败: {str(e)}")
            return None

    async def fetch_all(self, urls):
        """Fetch ``urls`` concurrently.

        Returns:
            A list with one entry per URL, in the same order; failed fetches
            are ``None``.
        """
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)

    @staticmethod
    async def download_files(urls, output_dir):
        """Download ``urls`` concurrently into ``output_dir``.

        Args:
            urls: URLs to download. The file name is the last component of
                each URL's path.
            output_dir: Target directory; created if it does not exist.
        """
        import os
        from urllib.parse import urlparse

        # Without this every open() fails when the directory is missing.
        os.makedirs(output_dir, exist_ok=True)

        async with AsyncWebClient() as client:
            tasks = []
            for index, url in enumerate(urls):
                # Use only the path, so a query string does not end up in
                # the name. Fall back to a generated name for URLs ending
                # in '/'.
                name = os.path.basename(urlparse(url).path)
                if not name:
                    name = f"download_{index}"
                target = os.path.join(output_dir, name)
                tasks.append(client.download_file(url, target))

            await asyncio.gather(*tasks)

    async def download_file(self, url, filename):
        """Stream ``url`` into ``filename``; failures are printed, not raised.

        A download that fails midway can leave a partial file behind.
        """
        try:
            async with self.session.get(url) as response:
                # Do not save an HTTP error page as if it were the file.
                response.raise_for_status()
                with open(filename, 'wb') as f:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        f.write(chunk)
                print(f"下载完成: {filename}")
        except Exception as e:
            print(f"下载 {url} 失败: {str(e)}")

# 使用示例
async def main():
    """Demo: fetch three URLs concurrently, then download three files."""
    api_endpoints = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3'
    ]
    pdf_links = [
        'https://example.com/file1.pdf',
        'https://example.com/file2.pdf',
        'https://example.com/file3.pdf'
    ]

    began_at = time.time()

    async with AsyncWebClient() as web:
        bodies = await web.fetch_all(api_endpoints)
        # Report only the endpoints that returned a non-empty body.
        fetched = (
            endpoint
            for endpoint, body in zip(api_endpoints, bodies)
            if body
        )
        for endpoint in fetched:
            print(f"成功获取 {endpoint}")

    elapsed = time.time() - began_at
    print(f"总耗时: {elapsed:.2f}秒")

    # Second part of the demo: concurrent file downloads.
    await AsyncWebClient.download_files(pdf_links, 'downloads')

# asyncio.run(main())
                    

实际应用

天气预报API客户端


import requests
import json
from datetime import datetime
import time

class WeatherClient:
    """Client for the weatherapi.com v1 HTTP API with a small response cache."""

    def __init__(self, api_key, timeout=10):
        """Store credentials and set up the cache.

        Args:
            api_key: API key sent as the ``key`` query parameter.
            timeout: Seconds to wait for each HTTP request. Without a
                timeout ``requests`` can block indefinitely.
        """
        self.api_key = api_key
        self.base_url = "https://api.weatherapi.com/v1"
        self.timeout = timeout
        self.cache = {}
        self.cache_duration = 1800  # cache entries live for 30 minutes

    def _fetch_json(self, endpoint, params):
        """GET ``endpoint`` with ``params`` and return the decoded JSON.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
            ValueError: If the response body is not valid JSON.
        """
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_weather(self, city):
        """Return current weather for ``city``, or ``None`` on failure.

        Results are cached per city for ``cache_duration`` seconds.

        Args:
            city: Location query passed as the ``q`` parameter.

        Returns:
            A dict with the keys ``city``, ``country``, ``temperature``,
            ``condition``, ``humidity``, ``wind_speed`` and ``updated``, or
            ``None`` if the request failed or the response was malformed.
        """
        cache_key = f"weather_{city}"
        cached = self.cache.get(cache_key)
        if cached and time.time() - cached['timestamp'] < self.cache_duration:
            return cached['data']

        params = {
            'key': self.api_key,
            'q': city,
            'lang': 'zh'
        }

        try:
            data = self._fetch_json("current.json", params)

            location = data['location']
            current = data['current']
            weather_info = {
                'city': location['name'],
                'country': location['country'],
                'temperature': current['temp_c'],
                'condition': current['condition']['text'],
                'humidity': current['humidity'],
                'wind_speed': current['wind_kph'],
                'updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

        # KeyError, TypeError and ValueError cover a response that is not
        # the expected JSON shape; it is reported like any other failure.
        except (requests.exceptions.RequestException,
                KeyError, TypeError, ValueError) as e:
            print(f"获取天气信息失败: {str(e)}")
            return None

        self.cache[cache_key] = {
            'timestamp': time.time(),
            'data': weather_info
        }
        return weather_info

    def get_forecast(self, city, days=3):
        """Return the forecast for ``city``, or ``None`` on failure.

        Args:
            city: Location query passed as the ``q`` parameter.
            days: Number of forecast days to request.

        Returns:
            A list with one dict per day (keys ``date``, ``max_temp``,
            ``min_temp``, ``condition``, ``rain_chance``), or ``None`` if the
            request failed or the response was malformed. Forecasts are not
            cached.
        """
        params = {
            'key': self.api_key,
            'q': city,
            'days': days,
            'lang': 'zh'
        }

        try:
            data = self._fetch_json("forecast.json", params)

            return [
                {
                    'date': day['date'],
                    'max_temp': day['day']['maxtemp_c'],
                    'min_temp': day['day']['mintemp_c'],
                    'condition': day['day']['condition']['text'],
                    'rain_chance': day['day']['daily_chance_of_rain']
                }
                for day in data['forecast']['forecastday']
            ]

        except (requests.exceptions.RequestException,
                KeyError, TypeError, ValueError) as e:
            print(f"获取天气预报失败: {str(e)}")
            return None

# 使用示例
"""
client = WeatherClient('your_api_key')

# 获取当前天气
weather = client.get_weather('北京')
if weather:
    print(f"{weather['city']}当前天气:")
    print(f"温度: {weather['temperature']}°C")
    print(f"天气: {weather['condition']}")
    print(f"湿度: {weather['humidity']}%")
    print(f"风速: {weather['wind_speed']}km/h")
    print(f"更新时间: {weather['updated']}")

# 获取天气预报
forecast = client.get_forecast('北京')
if forecast:
    print("\n未来三天天气预报:")
    for day in forecast:
        print(f"\n{day['date']}:")
        print(f"最高温度: {day['max_temp']}°C")
        print(f"最低温度: {day['min_temp']}°C")
        print(f"天气: {day['condition']}")
        print(f"降雨概率: {day['rain_chance']}%")
"""