Socket编程
TCP聊天室
import socket
import threading
class ChatServer:
    """Threaded TCP chat server that relays every message to all clients.

    One thread is started per accepted connection. The first payload a
    client sends is taken as its username; each later payload is broadcast
    to every registered client, prefixed with that username.
    """

    def __init__(self, host='localhost', port=9999):
        """Create the (not yet bound) listening socket.

        Args:
            host: Interface to bind to when ``start`` is called.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clients = {}  # {client_socket: username}
        # handle_client runs in one thread per connection, so every change
        # to ``clients`` made by this class goes through this lock.
        self._clients_lock = threading.Lock()

    def start(self):
        """Bind, listen and accept connections until interrupted.

        Blocks forever; the listening socket is closed when the accept loop
        is left through an exception (for example ``KeyboardInterrupt``).
        """
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"聊天服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, address = self.server_socket.accept()
                print(f"新连接来自: {address}")
                # Daemon threads: a client stuck in recv() must not keep the
                # process alive after the accept loop has stopped.
                thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket,),
                    daemon=True,
                )
                thread.start()
        finally:
            self.server_socket.close()

    def handle_client(self, client_socket):
        """Serve one client until it disconnects.

        Args:
            client_socket: Connected socket returned by ``accept``. It is
                always closed before this method returns.
        """
        username = None
        try:
            # errors='replace': a multi-byte character split across two
            # recv() calls must not raise and drop the connection.
            username = client_socket.recv(1024).decode(errors='replace')
            if not username:
                # Peer closed before sending a name: nothing to register.
                return
            with self._clients_lock:
                self.clients[client_socket] = username
            self.broadcast(f"{username} 加入了聊天室")
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break
                message = data.decode(errors='replace')
                self.broadcast(f"{username}: {message}")
        except OSError:
            # Connection reset / aborted: treated like a normal disconnect.
            pass
        finally:
            with self._clients_lock:
                was_registered = self.clients.pop(client_socket, None) is not None
            client_socket.close()
            if was_registered:
                self.broadcast(f"{username} 离开了聊天室")

    def broadcast(self, message):
        """Send ``message`` to every registered client (best effort).

        Args:
            message: Text to send; encoded with the default codec (UTF-8).

        Clients whose socket fails are skipped; they are removed by their
        own ``handle_client`` thread.
        """
        payload = message.encode()
        # Iterate over a snapshot so that client threads joining or leaving
        # during the send cannot invalidate the iteration.
        with self._clients_lock:
            recipients = list(self.clients)
        for client in recipients:
            try:
                # sendall: send() may transmit only part of the payload.
                client.sendall(payload)
            except OSError:
                continue
class ChatClient:
    """Console TCP chat client: sends typed lines, prints whatever arrives."""

    def __init__(self, host='localhost', port=9999):
        """Connect to the chat server.

        Args:
            host: Server host name or address.
            port: Server TCP port.

        Raises:
            OSError: If the connection cannot be established; the socket is
                closed before the error propagates.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect((host, port))
        except OSError:
            self.socket.close()
            raise

    def start(self, username):
        """Announce ``username`` and forward stdin lines until ``quit``.

        Args:
            username: Name sent as the first payload on the connection.

        The socket is closed when the loop ends, whether through ``quit`` or
        through an exception.
        """
        self.socket.sendall(username.encode())
        # Incoming messages are printed by a background daemon thread so
        # that input() can block in this one.
        receive_thread = threading.Thread(
            target=self.receive_messages,
            daemon=True,
        )
        receive_thread.start()
        try:
            while True:
                message = input()
                if message.lower() == 'quit':
                    break
                self.socket.sendall(message.encode())
        finally:
            self.socket.close()

    def receive_messages(self):
        """Print incoming messages until the connection ends."""
        while True:
            try:
                data = self.socket.recv(1024)
            except OSError:
                # Socket closed locally or connection reset.
                break
            if not data:
                # recv() returns b'' once the server has closed the
                # connection; without this check the loop would spin
                # forever printing empty lines.
                break
            print(data.decode(errors='replace'))
# 使用示例 - 服务器
# server = ChatServer()
# server.start()
# 使用示例 - 客户端
# client = ChatClient()
# client.start("用户名")
UDP文件传输
import socket
import os
import struct
class FileTransfer:
    """Minimal file transfer over UDP.

    Wire format: one 8-byte big-endian file size datagram, followed by data
    datagrams made of a 4-byte big-endian sequence number and up to
    ``CHUNK_SIZE`` bytes of file content. There are no acknowledgements or
    retransmissions, so a lost datagram is never recovered.
    """

    CHUNK_SIZE = 1024

    @staticmethod
    def send_file(filename, host='localhost', port=9999):
        """Send ``filename`` to ``host:port``.

        Args:
            filename: Path of the file to read.
            host: Receiver host.
            port: Receiver UDP port.

        Raises:
            OSError: If the file cannot be read or a datagram cannot be sent.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            file_size = os.path.getsize(filename)
            # Header datagram: total size as an unsigned 64-bit integer.
            sock.sendto(struct.pack('!Q', file_size), (host, port))
            with open(filename, 'rb') as f:
                sequence = 0
                while True:
                    chunk = f.read(FileTransfer.CHUNK_SIZE)
                    if not chunk:
                        break
                    # Prefix each chunk with its sequence number so the
                    # receiver can reorder datagrams.
                    packet = struct.pack('!I', sequence) + chunk
                    sock.sendto(packet, (host, port))
                    sequence += 1
            print(f"文件 {filename} 发送完成")
        finally:
            sock.close()

    @staticmethod
    def receive_file(filename, host='localhost', port=9999, timeout=None):
        """Receive one file and write it to ``filename``.

        Args:
            filename: Path the received content is written to.
            host: Local interface to bind.
            port: Local UDP port to bind.
            timeout: Optional number of seconds to wait for each datagram.
                ``None`` (the default) waits indefinitely.

        Raises:
            OSError: If binding fails, or if ``timeout`` elapses before all
                chunks arrived (``socket.timeout``).
            struct.error: If the header datagram is shorter than 8 bytes.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # bind() is inside the try so the socket is closed if it fails.
            sock.bind((host, port))
            sock.settimeout(timeout)
            data, _ = sock.recvfrom(8)
            file_size = struct.unpack('!Q', data)[0]
            chunk_size = FileTransfer.CHUNK_SIZE
            # Completion is tracked per chunk, not per byte: counting bytes
            # would let a duplicated datagram stand in for a missing one.
            expected_chunks = (file_size + chunk_size - 1) // chunk_size
            received_chunks = {}
            while len(received_chunks) < expected_chunks:
                data, _ = sock.recvfrom(chunk_size + 4)
                if len(data) < 4:
                    continue  # too short to carry a sequence number
                sequence = struct.unpack('!I', data[:4])[0]
                if sequence >= expected_chunks or sequence in received_chunks:
                    continue  # out-of-range or duplicate datagram
                received_chunks[sequence] = data[4:]
            # Every index below expected_chunks is present at this point.
            with open(filename, 'wb') as f:
                for index in range(expected_chunks):
                    f.write(received_chunks[index])
            print(f"文件 {filename} 接收完成")
        finally:
            sock.close()
# 使用示例 - 发送方
# FileTransfer.send_file("test.txt")
# 使用示例 - 接收方
# FileTransfer.receive_file("received.txt")
HTTP编程
简单Web服务器
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse
class SimpleHTTPServer(BaseHTTPRequestHandler):
    """Tiny JSON REST handler for a ``/users`` collection.

    Routes:
        GET  /users        -> all users
        GET  /users/<id>   -> one user, or 404
        POST /users        -> create a user from the JSON body, returns its id
        PUT  /users/<id>   -> merge the JSON object body into a user
    """

    # In-memory stand-in for a database; a class attribute, so it is shared
    # by every request handled in this process.
    users = {}

    def _send_json(self, status, payload):
        """Send ``payload`` as a JSON response with the given status code."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(body)

    def _read_json_body(self):
        """Read ``Content-Length`` bytes from the request and parse them.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the header is missing or not a non-negative
                integer, or if the body is not valid UTF-8 JSON.
        """
        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            raise ValueError('missing Content-Length')
        length = int(raw_length)
        if length < 0:
            raise ValueError('negative Content-Length')
        return json.loads(self.rfile.read(length).decode())

    def do_GET(self):
        """Handle GET: list all users or return a single one."""
        path = urllib.parse.urlparse(self.path).path
        if path == '/users':
            self._send_json(200, self.users)
        elif path.startswith('/users/'):
            user_id = path.split('/')[-1]
            if user_id in self.users:
                self._send_json(200, self.users[user_id])
            else:
                # The text is passed as ``explain`` (response body), not as
                # ``message``: the message goes into the status line, which
                # http.server encodes as latin-1 and which therefore cannot
                # carry Chinese text.
                self.send_error(404, explain="用户不存在")
        else:
            self.send_error(404, explain="页面不存在")

    def do_POST(self):
        """Handle POST /users: store the JSON body under a new id."""
        if urllib.parse.urlparse(self.path).path != '/users':
            self.send_error(404, explain="接口不存在")
            return
        try:
            user_data = self._read_json_body()
        except ValueError:
            self.send_error(400, explain="请求体不是有效的JSON")
            return
        # Ids are sequential; this is collision-free because users are
        # never deleted.
        user_id = str(len(self.users) + 1)
        self.users[user_id] = user_data
        self._send_json(201, {'id': user_id})

    def do_PUT(self):
        """Handle PUT /users/<id>: merge the JSON object into the user."""
        path = urllib.parse.urlparse(self.path).path
        if not path.startswith('/users/'):
            self.send_error(404, explain="接口不存在")
            return
        user_id = path.split('/')[-1]
        if user_id not in self.users:
            self.send_error(404, explain="用户不存在")
            return
        try:
            user_data = self._read_json_body()
        except ValueError:
            self.send_error(400, explain="请求体不是有效的JSON")
            return
        stored = self.users[user_id]
        # dict.update needs mappings on both sides; POST accepts any JSON
        # value, so the stored entry is checked as well.
        if not isinstance(user_data, dict) or not isinstance(stored, dict):
            self.send_error(400, explain="请求体不是有效的JSON")
            return
        stored.update(user_data)
        self._send_json(200, stored)
def run_server(host='localhost', port=8000):
    """Serve ``SimpleHTTPServer`` on ``host:port`` until interrupted.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Blocks in ``serve_forever``; any exception that ends it (for example
    ``KeyboardInterrupt``) propagates to the caller.
    """
    server_address = (host, port)
    # Used as a context manager so the listening socket is closed even when
    # serve_forever() is interrupted.
    with HTTPServer(server_address, SimpleHTTPServer) as httpd:
        print(f"服务器运行在 http://{host}:{port}")
        httpd.serve_forever()
# 使用示例
# run_server()
WebSocket
实时聊天服务器
import asyncio
import websockets
import json
class ChatRoom:
    """Set of connected WebSocket clients plus a bounded message history."""

    def __init__(self):
        """Start with no clients and an empty history."""
        self.clients = set()
        self.messages = []

    async def register(self, websocket):
        """Add ``websocket`` to the room and replay recent history to it.

        Args:
            websocket: Connection object exposing an awaitable ``send``.
        """
        self.clients.add(websocket)
        # Replay at most the 50 most recent messages.
        for message in self.messages[-50:]:
            await websocket.send(json.dumps(message))

    async def unregister(self, websocket):
        """Remove ``websocket`` from the room; a no-op if it is not present.

        Args:
            websocket: Connection previously passed to ``register``.
        """
        # discard, not remove: broadcast() may already have dropped this
        # client before its own handler unregisters it.
        self.clients.discard(websocket)

    async def broadcast(self, message):
        """Record ``message`` in the history and send it to every client.

        Args:
            message: JSON-serialisable object.

        Clients whose connection is closed are removed from the room.
        """
        self.messages.append(message)
        if len(self.messages) > 100:  # keep only the 100 newest messages
            self.messages = self.messages[-100:]
        payload = json.dumps(message)
        # Iterate over a snapshot: the set can change during the awaits
        # below (clients joining or leaving, or the unregister call here),
        # and mutating a set while iterating it raises RuntimeError.
        for client in list(self.clients):
            try:
                await client.send(payload)
            except websockets.ConnectionClosed:
                await self.unregister(client)
class ChatServer:
    """WebSocket chat server that relays client messages through a ChatRoom."""

    def __init__(self):
        """Create the single room shared by all connections."""
        self.chatroom = ChatRoom()

    @staticmethod
    def _decode_object(raw):
        """Return ``raw`` parsed as a JSON object, or None if it is not one."""
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            return None
        return data if isinstance(data, dict) else None

    async def handler(self, websocket, path=None):
        """Serve one WebSocket connection until it closes.

        Args:
            websocket: The connection object provided by ``websockets``.
            path: Request path. Unused; optional so the handler also works
                when the library calls it with the connection only.

        The first frame is expected to be a JSON object carrying
        ``username``; if it is missing or malformed the user is called
        ``Anonymous``. Later frames that are not JSON objects are ignored
        instead of terminating the connection.
        """
        try:
            hello = self._decode_object(await websocket.recv()) or {}
            username = hello.get('username', 'Anonymous')
            joined = False
            try:
                await self.chatroom.register(websocket)
                await self.chatroom.broadcast({
                    'type': 'system',
                    'content': f'{username} 加入了聊天室'
                })
                joined = True
                async for message in websocket:
                    data = self._decode_object(message)
                    if data is None:
                        continue
                    await self.chatroom.broadcast({
                        'type': 'chat',
                        'username': username,
                        'content': data.get('content', '')
                    })
            finally:
                # Unregister before announcing the departure so the message
                # is not sent to the connection that just closed. The
                # membership test keeps this safe if the room has already
                # dropped the client.
                if websocket in self.chatroom.clients:
                    await self.chatroom.unregister(websocket)
                if joined:
                    await self.chatroom.broadcast({
                        'type': 'system',
                        'content': f'{username} 离开了聊天室'
                    })
        except websockets.ConnectionClosed:
            pass
# 使用示例
async def main():
    """Serve the chat handler on localhost:8765 until interrupted."""
    chat_server = ChatServer()
    listener = websockets.serve(chat_server.handler, 'localhost', 8765)
    async with listener:
        # Nothing in this function ever resolves this future, so awaiting
        # it keeps the server running until the coroutine is interrupted.
        await asyncio.Future()
# asyncio.run(main())
# 客户端示例(JavaScript)
"""
const ws = new WebSocket('ws://localhost:8765');
// 连接后发送用户名
ws.onopen = () => {
ws.send(JSON.stringify({
username: 'Alice'
}));
};
// 接收消息
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log(message);
};
// 发送消息
function sendMessage(content) {
ws.send(JSON.stringify({
content: content
}));
}
"""
异步网络
异步HTTP客户端
import aiohttp
import asyncio
import time
class AsyncWebClient:
    """Thin asynchronous HTTP client built on one shared aiohttp session.

    Use it as an async context manager; the session exists only inside the
    ``async with`` block.
    """

    def __init__(self):
        """Create the client without opening a session yet."""
        self.session = None

    async def __aenter__(self):
        """Open the aiohttp session and return the client."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the aiohttp session."""
        await self.session.close()

    async def fetch_url(self, url):
        """Return the response body of ``url`` as text.

        Args:
            url: Address to request with GET.

        Returns:
            The body text, or None if the request failed (the failure is
            printed, not raised).
        """
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            # Deliberately broad: this runs as one task of a gather(), and a
            # single failing URL must not abort the whole batch.
            print(f"获取 {url} 失败: {str(e)}")
            return None

    async def fetch_all(self, urls):
        """Fetch all ``urls`` concurrently.

        Args:
            urls: Iterable of addresses.

        Returns:
            List of results in the order of ``urls`` (None for failures).
        """
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)

    @staticmethod
    async def download_files(urls, output_dir):
        """Download all ``urls`` concurrently into ``output_dir``.

        Args:
            urls: Iterable of file addresses.
            output_dir: Target directory; created if it does not exist.

        Each file is named after the last segment of the URL path (the
        query string is not part of the name); ``download`` is used when
        that segment is empty.
        """
        import os
        import urllib.parse

        # Without this, every open() below fails on a fresh directory.
        os.makedirs(output_dir, exist_ok=True)
        async with AsyncWebClient() as client:
            tasks = []
            for url in urls:
                url_path = urllib.parse.urlsplit(url).path
                filename = url_path.split('/')[-1] or 'download'
                target = os.path.join(output_dir, filename)
                tasks.append(client.download_file(url, target))
            await asyncio.gather(*tasks)

    async def download_file(self, url, filename):
        """Stream ``url`` into the file ``filename``.

        Args:
            url: Address to request with GET.
            filename: Path of the file to write.

        Failures (including HTTP error statuses) are printed, not raised.
        """
        try:
            async with self.session.get(url) as response:
                # Checked before opening the file so an HTTP error page is
                # never saved as if it were the requested file.
                response.raise_for_status()
                with open(filename, 'wb') as f:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        f.write(chunk)
            print(f"下载完成: {filename}")
        except Exception as e:
            # Broad for the same reason as in fetch_url().
            print(f"下载 {url} 失败: {str(e)}")
# 使用示例
async def main():
    """Demo: fetch three API URLs concurrently, then download three files."""
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3'
    ]
    started = time.time()
    async with AsyncWebClient() as client:
        results = await client.fetch_all(urls)
        # Report only the URLs that produced a non-empty result.
        succeeded = [url for url, body in zip(urls, results) if body]
        for url in succeeded:
            print(f"成功获取 {url}")
    print(f"总耗时: {time.time() - started:.2f}秒")

    # Second part: concurrent file downloads into ./downloads.
    download_urls = [
        'https://example.com/file1.pdf',
        'https://example.com/file2.pdf',
        'https://example.com/file3.pdf'
    ]
    await AsyncWebClient.download_files(download_urls, 'downloads')
# asyncio.run(main())
实际应用
天气预报API客户端
import requests
import json
from datetime import datetime
import time
class WeatherClient:
    """Client for the weatherapi.com v1 HTTP API with a small result cache."""

    def __init__(self, api_key, timeout=10):
        """Store the credentials and set up the cache.

        Args:
            api_key: API key sent with every request.
            timeout: Seconds to wait for the server before a request is
                abandoned. Without a timeout a stalled connection would
                block the caller indefinitely.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.base_url = "https://api.weatherapi.com/v1"
        self.cache = {}
        self.cache_duration = 1800  # cache entries are valid for 30 minutes

    def _request_json(self, endpoint, params):
        """GET ``endpoint`` with ``params`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: On network errors or an
                HTTP error status.
            ValueError: If the body is not valid JSON.
        """
        url = f"{self.base_url}/{endpoint}"
        response = requests.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def get_weather(self, city):
        """Return the current weather for ``city``.

        Args:
            city: Location query passed to the API as ``q``.

        Returns:
            A dict with the keys ``city``, ``country``, ``temperature``,
            ``condition``, ``humidity``, ``wind_speed`` and ``updated``, or
            None if the request failed or the response had an unexpected
            shape (the failure is printed).
        """
        cache_key = f"weather_{city}"
        cached = self.cache.get(cache_key)
        if cached and time.time() - cached['timestamp'] < self.cache_duration:
            return cached['data']
        params = {
            'key': self.api_key,
            'q': city,
            'lang': 'zh'
        }
        try:
            data = self._request_json('current.json', params)
            location = data['location']
            current = data['current']
            weather_info = {
                'city': location['name'],
                'country': location['country'],
                'temperature': current['temp_c'],
                'condition': current['condition']['text'],
                'humidity': current['humidity'],
                'wind_speed': current['wind_kph'],
                'updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
        except (requests.exceptions.RequestException,
                KeyError, TypeError, ValueError) as e:
            # KeyError/TypeError/ValueError: the server answered, but not
            # with the JSON structure read above.
            print(f"获取天气信息失败: {str(e)}")
            return None
        self.cache[cache_key] = {
            'timestamp': time.time(),
            'data': weather_info
        }
        return weather_info

    def get_forecast(self, city, days=3):
        """Return the daily forecast for ``city``.

        Args:
            city: Location query passed to the API as ``q``.
            days: Number of forecast days to request.

        Returns:
            A list of dicts with the keys ``date``, ``max_temp``,
            ``min_temp``, ``condition`` and ``rain_chance``, or None if the
            request failed or the response had an unexpected shape (the
            failure is printed). Forecasts are not cached.
        """
        params = {
            'key': self.api_key,
            'q': city,
            'days': days,
            'lang': 'zh'
        }
        try:
            data = self._request_json('forecast.json', params)
            return [
                {
                    'date': day['date'],
                    'max_temp': day['day']['maxtemp_c'],
                    'min_temp': day['day']['mintemp_c'],
                    'condition': day['day']['condition']['text'],
                    'rain_chance': day['day']['daily_chance_of_rain']
                }
                for day in data['forecast']['forecastday']
            ]
        except (requests.exceptions.RequestException,
                KeyError, TypeError, ValueError) as e:
            print(f"获取天气预报失败: {str(e)}")
            return None
# 使用示例
"""
client = WeatherClient('your_api_key')
# 获取当前天气
weather = client.get_weather('北京')
if weather:
print(f"{weather['city']}当前天气:")
print(f"温度: {weather['temperature']}°C")
print(f"天气: {weather['condition']}")
print(f"湿度: {weather['humidity']}%")
print(f"风速: {weather['wind_speed']}km/h")
print(f"更新时间: {weather['updated']}")
# 获取天气预报
forecast = client.get_forecast('北京')
if forecast:
print("\n未来三天天气预报:")
for day in forecast:
print(f"\n{day['date']}:")
print(f"最高温度: {day['max_temp']}°C")
print(f"最低温度: {day['min_temp']}°C")
print(f"天气: {day['condition']}")
print(f"降雨概率: {day['rain_chance']}%")
"""