微服务架构的重要性
微服务架构是现代应用开发的重要模式,通过将单体应用拆分为多个独立服务,实现更好的可扩展性、可维护性和技术多样性。Python应用通过微服务架构可以实现服务解耦、独立部署、技术栈灵活选择等优势。本文将从服务拆分到API网关,全面介绍Python微服务架构的最佳实践。
服务拆分
1. 用户服务
# services/user_service/app.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import os
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'email': self.email,
'created_at': self.created_at.isoformat()
}
@app.route('/users', methods=['GET'])
def get_users():
"""获取用户列表"""
users = User.query.all()
return jsonify([user.to_dict() for user in users]), 200
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""获取单个用户"""
user = User.query.get_or_404(user_id)
return jsonify(user.to_dict()), 200
@app.route('/users', methods=['POST'])
def create_user():
"""创建用户"""
data = request.json
user = User(name=data['name'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict()), 201
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""更新用户"""
user = User.query.get_or_404(user_id)
data = request.json
user.name = data.get('name', user.name)
user.email = data.get('email', user.email)
db.session.commit()
return jsonify(user.to_dict()), 200
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""删除用户"""
user = User.query.get_or_404(user_id)
db.session.delete(user)
db.session.commit()
return jsonify({'message': 'User deleted'}), 200
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
return jsonify({'status': 'healthy'}), 200
if __name__ == '__main__':
db.create_all()
app.run(host='0.0.0.0', port=8001)
2. 订单服务
# services/order_service/app.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import requests
import os
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
USER_SERVICE_URL = os.getenv('USER_SERVICE_URL', 'http://user-service:8001')
class Order(db.Model):
__tablename__ = 'orders'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, nullable=False)
total_amount = db.Column(db.Float, nullable=False)
status = db.Column(db.String(50), default='pending')
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'user_id': self.user_id,
'total_amount': self.total_amount,
'status': self.status,
'created_at': self.created_at.isoformat()
}
@app.route('/orders', methods=['POST'])
def create_order():
"""创建订单"""
data = request.json
user_id = data['user_id']
# 验证用户是否存在(调用用户服务)
try:
response = requests.get(f'{USER_SERVICE_URL}/users/{user_id}')
if response.status_code != 200:
return jsonify({'error': 'User not found'}), 404
except Exception as e:
return jsonify({'error': 'User service unavailable'}), 503
# 创建订单
order = Order(
user_id=user_id,
total_amount=data['total_amount'],
status='pending'
)
db.session.add(order)
db.session.commit()
return jsonify(order.to_dict()), 201
@app.route('/orders/<int:order_id>', methods=['GET'])
def get_order(order_id):
"""获取订单"""
order = Order.query.get_or_404(order_id)
return jsonify(order.to_dict()), 200
@app.route('/orders/user/<int:user_id>', methods=['GET'])
def get_user_orders(user_id):
"""获取用户订单"""
orders = Order.query.filter_by(user_id=user_id).all()
return jsonify([order.to_dict() for order in orders]), 200
if __name__ == '__main__':
db.create_all()
app.run(host='0.0.0.0', port=8002)
API网关
1. Flask API网关
# gateway/app.py
from flask import Flask, request, jsonify
import requests
import os
app = Flask(__name__)
# 服务注册表
SERVICES = {
'users': os.getenv('USER_SERVICE_URL', 'http://user-service:8001'),
'orders': os.getenv('ORDER_SERVICE_URL', 'http://order-service:8002'),
'products': os.getenv('PRODUCT_SERVICE_URL', 'http://product-service:8003')
}
def forward_request(service_name, path, method='GET', data=None):
"""转发请求到微服务"""
service_url = SERVICES.get(service_name)
if not service_url:
return None, 404
url = f"{service_url}{path}"
headers = {'Content-Type': 'application/json'}
try:
if method == 'GET':
response = requests.get(url, headers=headers, timeout=5)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=5)
elif method == 'PUT':
response = requests.put(url, json=data, headers=headers, timeout=5)
elif method == 'DELETE':
response = requests.delete(url, headers=headers, timeout=5)
else:
return None, 405
return response.json(), response.status_code
except requests.exceptions.RequestException as e:
return {'error': 'Service unavailable'}, 503
@app.route('/api/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(service, path):
"""代理请求到微服务"""
data = request.json if request.is_json else None
result, status_code = forward_request(
service,
f'/{path}',
method=request.method,
data=data
)
return jsonify(result), status_code
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
services_status = {}
for service_name, service_url in SERVICES.items():
try:
response = requests.get(f'{service_url}/health', timeout=2)
services_status[service_name] = 'healthy' if response.status_code == 200 else 'unhealthy'
except:
services_status[service_name] = 'unavailable'
return jsonify({
'gateway': 'healthy',
'services': services_status
}), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
服务发现
1. 服务注册和发现
# services/service_discovery.py
import requests
import time
import threading
from flask import Flask, jsonify
app = Flask(__name__)
# 服务注册表
service_registry = {}
class ServiceRegistry:
"""服务注册表"""
@staticmethod
def register(service_name, service_url, health_check_url=None):
"""注册服务"""
service_registry[service_name] = {
'url': service_url,
'health_check_url': health_check_url or f'{service_url}/health',
'status': 'unknown',
'last_check': None
}
print(f"服务注册: {service_name} -> {service_url}")
@staticmethod
def get_service(service_name):
"""获取服务URL"""
service = service_registry.get(service_name)
if service and service['status'] == 'healthy':
return service['url']
return None
@staticmethod
def check_health(service_name):
"""检查服务健康状态"""
service = service_registry.get(service_name)
if not service:
return False
try:
response = requests.get(service['health_check_url'], timeout=2)
service['status'] = 'healthy' if response.status_code == 200 else 'unhealthy'
service['last_check'] = time.time()
return service['status'] == 'healthy'
except:
service['status'] = 'unavailable'
service['last_check'] = time.time()
return False
@staticmethod
def health_check_loop():
"""定期健康检查"""
while True:
for service_name in list(service_registry.keys()):
ServiceRegistry.check_health(service_name)
time.sleep(10) # 每10秒检查一次
# 启动健康检查线程
health_check_thread = threading.Thread(target=ServiceRegistry.health_check_loop, daemon=True)
health_check_thread.start()
@app.route('/register', methods=['POST'])
def register_service():
"""注册服务端点"""
data = request.json
ServiceRegistry.register(
data['name'],
data['url'],
data.get('health_check_url')
)
return jsonify({'message': 'Service registered'}), 201
@app.route('/services/<service_name>', methods=['GET'])
def get_service(service_name):
"""获取服务信息"""
service = service_registry.get(service_name)
if service:
return jsonify(service), 200
return jsonify({'error': 'Service not found'}), 404
@app.route('/services', methods=['GET'])
def list_services():
"""列出所有服务"""
return jsonify(service_registry), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8500)
负载均衡
1. 简单负载均衡器
# gateway/load_balancer.py
import random
import requests
from collections import defaultdict
class LoadBalancer:
"""负载均衡器"""
def __init__(self):
self.services = defaultdict(list)
self.request_counts = defaultdict(int)
def add_service(self, service_name, service_url):
"""添加服务实例"""
self.services[service_name].append(service_url)
print(f"添加服务实例: {service_name} -> {service_url}")
def get_service_url(self, service_name, strategy='round_robin'):
"""获取服务URL(负载均衡)"""
instances = self.services.get(service_name)
if not instances:
return None
if strategy == 'round_robin':
# 轮询
index = self.request_counts[service_name] % len(instances)
self.request_counts[service_name] += 1
return instances[index]
elif strategy == 'random':
# 随机
return random.choice(instances)
elif strategy == 'least_connections':
# 最少连接(简化版)
return instances[0] # 实际应该跟踪连接数
else:
return instances[0]
def forward_request(self, service_name, path, method='GET', data=None, strategy='round_robin'):
"""转发请求(带负载均衡)"""
service_url = self.get_service_url(service_name, strategy)
if not service_url:
return None, 404
url = f"{service_url}{path}"
headers = {'Content-Type': 'application/json'}
try:
if method == 'GET':
response = requests.get(url, headers=headers, timeout=5)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=5)
else:
return None, 405
return response.json(), response.status_code
except requests.exceptions.RequestException as e:
# 如果请求失败,尝试下一个实例
if len(self.services[service_name]) > 1:
return self.forward_request(service_name, path, method, data, strategy)
return {'error': 'Service unavailable'}, 503
# 使用示例
if __name__ == "__main__":
lb = LoadBalancer()
# 添加多个服务实例
lb.add_service('users', 'http://user-service-1:8001')
lb.add_service('users', 'http://user-service-2:8001')
lb.add_service('users', 'http://user-service-3:8001')
# 转发请求(自动负载均衡)
result, status = lb.forward_request('users', '/users', 'GET')
print(f"响应: {result}, 状态码: {status}")
总结
微服务架构的关键要点:
- 服务拆分:按业务领域拆分服务,保持服务独立性
- API网关:统一入口、路由转发、请求聚合
- 服务发现:服务注册、健康检查、动态发现
- 负载均衡:轮询、随机、最少连接等策略
- 服务通信:REST API、消息队列、gRPC
- 数据管理:每个服务独立数据库、数据同步
- 监控追踪:分布式追踪、服务监控、日志聚合
掌握这些微服务技能,可以实现高可扩展、高可维护的Python应用架构,为大型应用提供强大的微服务支持。
转载请注明:周志洋的博客 » Python云计算-微服务架构详解


