周志洋

个人站

持续学习 才能不被淘汰


Python云计算-微服务架构详解

微服务架构的重要性

微服务架构是现代应用开发的重要模式,通过将单体应用拆分为多个独立服务,实现更好的可扩展性、可维护性和技术多样性。Python应用通过微服务架构可以实现服务解耦、独立部署、技术栈灵活选择等优势。本文将从服务拆分到API网关,全面介绍Python微服务架构的最佳实践。

服务拆分

1. 用户服务

# services/user_service/app.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import os

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'email': self.email,
            'created_at': self.created_at.isoformat()
        }

@app.route('/users', methods=['GET'])
def get_users():
    """获取用户列表"""
    users = User.query.all()
    return jsonify([user.to_dict() for user in users]), 200

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """获取单个用户"""
    user = User.query.get_or_404(user_id)
    return jsonify(user.to_dict()), 200

@app.route('/users', methods=['POST'])
def create_user():
    """创建用户"""
    data = request.json
    user = User(name=data['name'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict()), 201

@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """更新用户"""
    user = User.query.get_or_404(user_id)
    data = request.json
    user.name = data.get('name', user.name)
    user.email = data.get('email', user.email)
    db.session.commit()
    return jsonify(user.to_dict()), 200

@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """删除用户"""
    user = User.query.get_or_404(user_id)
    db.session.delete(user)
    db.session.commit()
    return jsonify({'message': 'User deleted'}), 200

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查"""
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    db.create_all()
    app.run(host='0.0.0.0', port=8001)

2. 订单服务

# services/order_service/app.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import requests
import os

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
USER_SERVICE_URL = os.getenv('USER_SERVICE_URL', 'http://user-service:8001')

class Order(db.Model):
    __tablename__ = 'orders'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, nullable=False)
    total_amount = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(50), default='pending')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def to_dict(self):
        return {
            'id': self.id,
            'user_id': self.user_id,
            'total_amount': self.total_amount,
            'status': self.status,
            'created_at': self.created_at.isoformat()
        }

@app.route('/orders', methods=['POST'])
def create_order():
    """创建订单"""
    data = request.json
    user_id = data['user_id']
    
    # 验证用户是否存在(调用用户服务)
    try:
        response = requests.get(f'{USER_SERVICE_URL}/users/{user_id}')
        if response.status_code != 200:
            return jsonify({'error': 'User not found'}), 404
    except Exception as e:
        return jsonify({'error': 'User service unavailable'}), 503
    
    # 创建订单
    order = Order(
        user_id=user_id,
        total_amount=data['total_amount'],
        status='pending'
    )
    db.session.add(order)
    db.session.commit()
    
    return jsonify(order.to_dict()), 201

@app.route('/orders/<int:order_id>', methods=['GET'])
def get_order(order_id):
    """获取订单"""
    order = Order.query.get_or_404(order_id)
    return jsonify(order.to_dict()), 200

@app.route('/orders/user/<int:user_id>', methods=['GET'])
def get_user_orders(user_id):
    """获取用户订单"""
    orders = Order.query.filter_by(user_id=user_id).all()
    return jsonify([order.to_dict() for order in orders]), 200

if __name__ == '__main__':
    db.create_all()
    app.run(host='0.0.0.0', port=8002)

API网关

1. Flask API网关

# gateway/app.py
from flask import Flask, request, jsonify
import requests
import os

app = Flask(__name__)

# 服务注册表
SERVICES = {
    'users': os.getenv('USER_SERVICE_URL', 'http://user-service:8001'),
    'orders': os.getenv('ORDER_SERVICE_URL', 'http://order-service:8002'),
    'products': os.getenv('PRODUCT_SERVICE_URL', 'http://product-service:8003')
}

def forward_request(service_name, path, method='GET', data=None):
    """转发请求到微服务"""
    service_url = SERVICES.get(service_name)
    if not service_url:
        return None, 404
    
    url = f"{service_url}{path}"
    headers = {'Content-Type': 'application/json'}
    
    try:
        if method == 'GET':
            response = requests.get(url, headers=headers, timeout=5)
        elif method == 'POST':
            response = requests.post(url, json=data, headers=headers, timeout=5)
        elif method == 'PUT':
            response = requests.put(url, json=data, headers=headers, timeout=5)
        elif method == 'DELETE':
            response = requests.delete(url, headers=headers, timeout=5)
        else:
            return None, 405
        return response.json(), response.status_code
    except requests.exceptions.RequestException as e:
        return {'error': 'Service unavailable'}, 503

@app.route('/api/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(service, path):
    """代理请求到微服务"""
    data = request.json if request.is_json else None
    result, status_code = forward_request(
        service,
        f'/{path}',
        method=request.method,
        data=data
    )
    
    return jsonify(result), status_code

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查"""
    services_status = {}
    for service_name, service_url in SERVICES.items():
        try:
            response = requests.get(f'{service_url}/health', timeout=2)
            services_status[service_name] = 'healthy' if response.status_code == 200 else 'unhealthy'
        except:
            services_status[service_name] = 'unavailable'
    
    return jsonify({
        'gateway': 'healthy',
        'services': services_status
    }), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

服务发现

1. 服务注册和发现

# services/service_discovery.py
import requests
import time
import threading
from flask import Flask, jsonify

app = Flask(__name__)

# 服务注册表
service_registry = {}

class ServiceRegistry:
    """服务注册表"""
    
    @staticmethod
    def register(service_name, service_url, health_check_url=None):
        """注册服务"""
        service_registry[service_name] = {
            'url': service_url,
            'health_check_url': health_check_url or f'{service_url}/health',
            'status': 'unknown',
            'last_check': None
        }
        print(f"服务注册: {service_name} -> {service_url}")
    
    @staticmethod
    def get_service(service_name):
        """获取服务URL"""
        service = service_registry.get(service_name)
        if service and service['status'] == 'healthy':
            return service['url']
        return None
    
    @staticmethod
    def check_health(service_name):
        """检查服务健康状态"""
        service = service_registry.get(service_name)
        if not service:
            return False
        try:
            response = requests.get(service['health_check_url'], timeout=2)
            service['status'] = 'healthy' if response.status_code == 200 else 'unhealthy'
            service['last_check'] = time.time()
            return service['status'] == 'healthy'
        except:
            service['status'] = 'unavailable'
            service['last_check'] = time.time()
            return False
    @staticmethod
    def health_check_loop():
        """定期健康检查"""
        while True:
            for service_name in list(service_registry.keys()):
                ServiceRegistry.check_health(service_name)
            time.sleep(10)  # 每10秒检查一次

# 启动健康检查线程
health_check_thread = threading.Thread(target=ServiceRegistry.health_check_loop, daemon=True)
health_check_thread.start()

@app.route('/register', methods=['POST'])
def register_service():
    """注册服务端点"""
    data = request.json
    ServiceRegistry.register(
        data['name'],
        data['url'],
        data.get('health_check_url')
    )
    return jsonify({'message': 'Service registered'}), 201

@app.route('/services/<service_name>', methods=['GET'])
def get_service(service_name):
    """获取服务信息"""
    service = service_registry.get(service_name)
    if service:
        return jsonify(service), 200
    return jsonify({'error': 'Service not found'}), 404

@app.route('/services', methods=['GET'])
def list_services():
    """列出所有服务"""
    return jsonify(service_registry), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8500)

负载均衡

1. 简单负载均衡器

# gateway/load_balancer.py
import random
import requests
from collections import defaultdict

class LoadBalancer:
    """负载均衡器"""
    
    def __init__(self):
        self.services = defaultdict(list)
        self.request_counts = defaultdict(int)
    
    def add_service(self, service_name, service_url):
        """添加服务实例"""
        self.services[service_name].append(service_url)
        print(f"添加服务实例: {service_name} -> {service_url}")
    
    def get_service_url(self, service_name, strategy='round_robin'):
        """获取服务URL(负载均衡)"""
        instances = self.services.get(service_name)
        if not instances:
            return None
        if strategy == 'round_robin':
            # 轮询
            index = self.request_counts[service_name] % len(instances)
            self.request_counts[service_name] += 1
            return instances[index]
        elif strategy == 'random':
            # 随机
            return random.choice(instances)
        elif strategy == 'least_connections':
            # 最少连接(简化版)
            return instances[0]  # 实际应该跟踪连接数
        else:
            return instances[0]
    
    def forward_request(self, service_name, path, method='GET', data=None, strategy='round_robin'):
        """转发请求(带负载均衡)"""
        service_url = self.get_service_url(service_name, strategy)
        if not service_url:
            return None, 404
        url = f"{service_url}{path}"
        headers = {'Content-Type': 'application/json'}
        
        try:
            if method == 'GET':
                response = requests.get(url, headers=headers, timeout=5)
            elif method == 'POST':
                response = requests.post(url, json=data, headers=headers, timeout=5)
            else:
                return None, 405
            return response.json(), response.status_code
        except requests.exceptions.RequestException as e:
            # 如果请求失败,尝试下一个实例
            if len(self.services[service_name]) > 1:
                return self.forward_request(service_name, path, method, data, strategy)
            return {'error': 'Service unavailable'}, 503

# 使用示例
if __name__ == "__main__":
    lb = LoadBalancer()
    
    # 添加多个服务实例
    lb.add_service('users', 'http://user-service-1:8001')
    lb.add_service('users', 'http://user-service-2:8001')
    lb.add_service('users', 'http://user-service-3:8001')
    
    # 转发请求(自动负载均衡)
    result, status = lb.forward_request('users', '/users', 'GET')
    print(f"响应: {result}, 状态码: {status}")

总结

微服务架构的关键要点:

  1. 服务拆分:按业务领域拆分服务,保持服务独立性
  2. API网关:统一入口、路由转发、请求聚合
  3. 服务发现:服务注册、健康检查、动态发现
  4. 负载均衡:轮询、随机、最少连接等策略
  5. 服务通信:REST API、消息队列、gRPC
  6. 数据管理:每个服务独立数据库、数据同步
  7. 监控追踪:分布式追踪、服务监控、日志聚合

掌握这些微服务技能,可以实现高可扩展、高可维护的Python应用架构,为大型应用提供强大的微服务支持。


转载请注明:周志洋的博客 » Python云计算-微服务架构详解

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦