周志洋

个人站

持续学习 才能不被淘汰


Python云计算-数据服务详解

数据服务的重要性

云数据服务是现代应用架构的核心组件,提供可扩展、高可用的数据存储和查询能力。Python应用通过云数据服务可以实现对象存储、关系数据库、NoSQL数据库和搜索引擎等数据管理需求。本文将从S3对象存储到Elasticsearch搜索,全面介绍Python云数据服务的最佳实践。

S3对象存储

1. S3基础操作

# app/s3_manager.py
import boto3
from botocore.exceptions import ClientError
from io import BytesIO
import json

class S3Manager:
    """S3管理器"""
    
    def __init__(self, bucket_name, region_name='us-west-2'):
        self.bucket_name = bucket_name
        self.s3_client = boto3.client('s3', region_name=region_name)
        self.s3_resource = boto3.resource('s3', region_name=region_name)
    
    def upload_file(self, local_file, s3_key, metadata=None):
        """上传文件到S3"""
        try:
            extra_args = {}
            if metadata:
                extra_args['Metadata'] = metadata
            self.s3_client.upload_file(
                local_file,
                self.bucket_name,
                s3_key,
                ExtraArgs=extra_args
            )
            print(f"文件上传成功: s3://{self.bucket_name}/{s3_key}")
            return True
        except ClientError as e:
            print(f"上传失败: {e}")
            return False
    def download_file(self, s3_key, local_file):
        """从S3下载文件"""
        try:
            self.s3_client.download_file(
                self.bucket_name,
                s3_key,
                local_file
            )
            print(f"文件下载成功: {local_file}")
            return True
        except ClientError as e:
            print(f"下载失败: {e}")
            return False
    def upload_string(self, content, s3_key, content_type='text/plain'):
        """上传字符串内容"""
        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=content.encode('utf-8'),
                ContentType=content_type
            )
            return True
        except ClientError as e:
            print(f"上传失败: {e}")
            return False
    def download_string(self, s3_key):
        """下载字符串内容"""
        try:
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=s3_key
            )
            return response['Body'].read().decode('utf-8')
        except ClientError as e:
            print(f"下载失败: {e}")
            return None
    def list_objects(self, prefix='', max_keys=1000):
        """列出对象"""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix,
                MaxKeys=max_keys
            )
            return [obj['Key'] for obj in response.get('Contents', [])]
        except ClientError as e:
            print(f"列出对象失败: {e}")
            return []
    def delete_object(self, s3_key):
        """删除对象"""
        try:
            self.s3_client.delete_object(
                Bucket=self.bucket_name,
                Key=s3_key
            )
            print(f"对象删除成功: {s3_key}")
            return True
        except ClientError as e:
            print(f"删除失败: {e}")
            return False
    def generate_presigned_url(self, s3_key, expiration=3600):
        """生成预签名URL"""
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': s3_key},
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            print(f"生成URL失败: {e}")
            return None

# 使用示例
if __name__ == "__main__":
    s3 = S3Manager('my-bucket')
    
    # 上传文件
    s3.upload_file('local_file.txt', 'remote_file.txt')
    
    # 下载文件
    s3.download_file('remote_file.txt', 'downloaded_file.txt')
    
    # 上传JSON数据
    data = {'name': '张三', 'age': 30}
    s3.upload_string(json.dumps(data), 'data.json', 'application/json')
    
    # 生成预签名URL
    url = s3.generate_presigned_url('remote_file.txt', expiration=3600)
    print(f"预签名URL: {url}")

DynamoDB NoSQL数据库

1. DynamoDB基础操作

# app/dynamodb_manager.py
import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
from decimal import Decimal
import json

class DynamoDBManager:
    """DynamoDB管理器"""
    
    def __init__(self, table_name, region_name='us-west-2'):
        self.table_name = table_name
        self.dynamodb = boto3.resource('dynamodb', region_name=region_name)
        self.table = self.dynamodb.Table(table_name)
        self.client = boto3.client('dynamodb', region_name=region_name)
    
    def put_item(self, item):
        """插入或更新项目"""
        try:
            # 转换Decimal类型
            item = self._convert_decimals(item)
            self.table.put_item(Item=item)
            print(f"项目插入成功: {item}")
            return True
        except ClientError as e:
            print(f"插入失败: {e}")
            return False
    def get_item(self, key):
        """获取项目"""
        try:
            response = self.table.get_item(Key=key)
            if 'Item' in response:
                return self._convert_from_decimals(response['Item'])
            return None
        except ClientError as e:
            print(f"获取失败: {e}")
            return None
    def update_item(self, key, update_expression, expression_values):
        """更新项目"""
        try:
            # 转换Decimal类型
            expression_values = self._convert_decimals(expression_values)
            
            self.table.update_item(
                Key=key,
                UpdateExpression=update_expression,
                ExpressionAttributeValues=expression_values,
                ReturnValues='UPDATED_NEW'
            )
            return True
        except ClientError as e:
            print(f"更新失败: {e}")
            return False
    def query(self, key_condition_expression, filter_expression=None):
        """查询项目"""
        try:
            kwargs = {'KeyConditionExpression': key_condition_expression}
            if filter_expression:
                kwargs['FilterExpression'] = filter_expression
            response = self.table.query(**kwargs)
            items = [self._convert_from_decimals(item) for item in response['Items']]
            return items
        except ClientError as e:
            print(f"查询失败: {e}")
            return []
    def scan(self, filter_expression=None, limit=None):
        """扫描表"""
        try:
            kwargs = {}
            if filter_expression:
                kwargs['FilterExpression'] = filter_expression
            if limit:
                kwargs['Limit'] = limit
            response = self.table.scan(**kwargs)
            items = [self._convert_from_decimals(item) for item in response['Items']]
            return items
        except ClientError as e:
            print(f"扫描失败: {e}")
            return []
    def delete_item(self, key):
        """删除项目"""
        try:
            self.table.delete_item(Key=key)
            print(f"项目删除成功: {key}")
            return True
        except ClientError as e:
            print(f"删除失败: {e}")
            return False
    def _convert_decimals(self, obj):
        """转换Decimal类型"""
        if isinstance(obj, dict):
            return {k: self._convert_decimals(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._convert_decimals(item) for item in obj]
        elif isinstance(obj, float):
            return Decimal(str(obj))
        return obj
    def _convert_from_decimals(self, obj):
        """从Decimal转换"""
        if isinstance(obj, dict):
            return {k: self._convert_from_decimals(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._convert_from_decimals(item) for item in obj]
        elif isinstance(obj, Decimal):
            return float(obj)
        return obj

# 使用示例
if __name__ == "__main__":
    db = DynamoDBManager('Users')
    
    # 插入用户
    user = {
        'id': 'user-123',
        'name': '张三',
        'email': 'zhangsan@example.com',
        'age': 30
    }
    db.put_item(user)
    
    # 查询用户
    user = db.get_item({'id': 'user-123'})
    print(f"用户信息: {user}")
    
    # 更新用户
    db.update_item(
        {'id': 'user-123'},
        'SET age = :age',
        {':age': 31}
    )
    
    # 查询所有用户
    users = db.scan()
    print(f"所有用户: {users}")

RDS关系数据库

1. RDS连接和操作

# app/rds_manager.py
import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
import os

class RDSManager:
    """RDS管理器"""
    
    def __init__(self, db_config):
        self.db_config = db_config
        self.connection_pool = None
        self._create_pool()
    
    def _create_pool(self):
        """创建连接池"""
        try:
            self.connection_pool = pool.SimpleConnectionPool(
                1, 20,
                host=self.db_config['host'],
                port=self.db_config.get('port', 5432),
                database=self.db_config['database'],
                user=self.db_config['user'],
                password=self.db_config['password']
            )
        except Exception as e:
            print(f"创建连接池失败: {e}")
    
    @contextmanager
    def get_connection(self):
        """获取数据库连接"""
        conn = self.connection_pool.getconn()
        try:
            yield conn
        finally:
            self.connection_pool.putconn(conn)
    
    def execute_query(self, query, params=None):
        """执行查询"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()
    
    def execute_update(self, query, params=None):
        """执行更新"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                conn.commit()
                return cur.rowcount

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': os.getenv('DB_HOST'),
        'database': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD')
    }
    
    rds = RDSManager(db_config)
    
    # 查询数据
    users = rds.execute_query('SELECT * FROM users WHERE age > %s', (25,))
    print(f"用户列表: {users}")
    
    # 插入数据
    rds.execute_update(
        'INSERT INTO users (name, email) VALUES (%s, %s)',
        ('李四', 'lisi@example.com')
    )

Elasticsearch搜索

1. Elasticsearch操作

# app/elasticsearch_manager.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticsearchManager:
    """Elasticsearch管理器"""
    
    def __init__(self, hosts=['localhost:9200']):
        self.es = Elasticsearch(hosts)
    
    def create_index(self, index_name, mapping=None):
        """创建索引"""
        if not self.es.indices.exists(index=index_name):
            body = {}
            if mapping:
                body['mappings'] = mapping
            self.es.indices.create(index=index_name, body=body)
            print(f"索引创建成功: {index_name}")
            return True
        return False
    def index_document(self, index_name, doc_id, document):
        """索引文档"""
        self.es.index(index=index_name, id=doc_id, body=document)
        print(f"文档索引成功: {doc_id}")
    
    def search(self, index_name, query, size=10):
        """搜索文档"""
        response = self.es.search(index=index_name, body=query, size=size)
        return [hit['_source'] for hit in response['hits']['hits']]
    
    def bulk_index(self, index_name, documents):
        """批量索引"""
        actions = [
            {
                '_index': index_name,
                '_id': doc.get('id'),
                '_source': doc
            }
            for doc in documents
        ]
        bulk(self.es, actions)
        print(f"批量索引完成: {len(documents)} 个文档")

# 使用示例
if __name__ == "__main__":
    es = ElasticsearchManager()
    
    # 创建索引
    mapping = {
        'properties': {
            'name': {'type': 'text'},
            'age': {'type': 'integer'}
        }
    }
    es.create_index('users', mapping)
    
    # 索引文档
    es.index_document('users', '1', {'name': '张三', 'age': 30})
    
    # 搜索
    query = {'query': {'match': {'name': '张三'}}}
    results = es.search('users', query)
    print(f"搜索结果: {results}")

总结

数据服务的关键要点:

  1. S3对象存储:文件上传下载、预签名URL、批量操作
  2. DynamoDB:NoSQL数据库、键值查询、扫描操作
  3. RDS:关系数据库、连接池、事务处理
  4. Elasticsearch:全文搜索、索引管理、批量操作
  5. 数据同步:跨服务数据同步、数据备份
  6. 性能优化:连接池、批量操作、索引优化
  7. 错误处理:异常捕获、重试机制、降级策略

掌握这些数据服务技能,可以实现高效的数据存储、查询和管理,为Python应用提供强大的云数据支持。


转载请注明:周志洋的博客 » Python云计算-数据服务详解

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦