数据服务的重要性
云数据服务是现代应用架构的核心组件,提供可扩展、高可用的数据存储和查询能力。Python应用通过云数据服务可以实现对象存储、关系数据库、NoSQL数据库和搜索引擎等数据管理需求。本文将从S3对象存储到Elasticsearch搜索,全面介绍Python云数据服务的最佳实践。
S3对象存储
1. S3基础操作
# app/s3_manager.py
import boto3
from botocore.exceptions import ClientError
from io import BytesIO
import json
class S3Manager:
"""S3管理器"""
def __init__(self, bucket_name, region_name='us-west-2'):
self.bucket_name = bucket_name
self.s3_client = boto3.client('s3', region_name=region_name)
self.s3_resource = boto3.resource('s3', region_name=region_name)
def upload_file(self, local_file, s3_key, metadata=None):
"""上传文件到S3"""
try:
extra_args = {}
if metadata:
extra_args['Metadata'] = metadata
self.s3_client.upload_file(
local_file,
self.bucket_name,
s3_key,
ExtraArgs=extra_args
)
print(f"文件上传成功: s3://{self.bucket_name}/{s3_key}")
return True
except ClientError as e:
print(f"上传失败: {e}")
return False
def download_file(self, s3_key, local_file):
"""从S3下载文件"""
try:
self.s3_client.download_file(
self.bucket_name,
s3_key,
local_file
)
print(f"文件下载成功: {local_file}")
return True
except ClientError as e:
print(f"下载失败: {e}")
return False
def upload_string(self, content, s3_key, content_type='text/plain'):
"""上传字符串内容"""
try:
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=s3_key,
Body=content.encode('utf-8'),
ContentType=content_type
)
return True
except ClientError as e:
print(f"上传失败: {e}")
return False
def download_string(self, s3_key):
"""下载字符串内容"""
try:
response = self.s3_client.get_object(
Bucket=self.bucket_name,
Key=s3_key
)
return response['Body'].read().decode('utf-8')
except ClientError as e:
print(f"下载失败: {e}")
return None
def list_objects(self, prefix='', max_keys=1000):
"""列出对象"""
try:
response = self.s3_client.list_objects_v2(
Bucket=self.bucket_name,
Prefix=prefix,
MaxKeys=max_keys
)
return [obj['Key'] for obj in response.get('Contents', [])]
except ClientError as e:
print(f"列出对象失败: {e}")
return []
def delete_object(self, s3_key):
"""删除对象"""
try:
self.s3_client.delete_object(
Bucket=self.bucket_name,
Key=s3_key
)
print(f"对象删除成功: {s3_key}")
return True
except ClientError as e:
print(f"删除失败: {e}")
return False
def generate_presigned_url(self, s3_key, expiration=3600):
"""生成预签名URL"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': s3_key},
ExpiresIn=expiration
)
return url
except ClientError as e:
print(f"生成URL失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
s3 = S3Manager('my-bucket')
# 上传文件
s3.upload_file('local_file.txt', 'remote_file.txt')
# 下载文件
s3.download_file('remote_file.txt', 'downloaded_file.txt')
# 上传JSON数据
data = {'name': '张三', 'age': 30}
s3.upload_string(json.dumps(data), 'data.json', 'application/json')
# 生成预签名URL
url = s3.generate_presigned_url('remote_file.txt', expiration=3600)
print(f"预签名URL: {url}")
DynamoDB NoSQL数据库
1. DynamoDB基础操作
# app/dynamodb_manager.py
import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
from decimal import Decimal
import json
class DynamoDBManager:
"""DynamoDB管理器"""
def __init__(self, table_name, region_name='us-west-2'):
self.table_name = table_name
self.dynamodb = boto3.resource('dynamodb', region_name=region_name)
self.table = self.dynamodb.Table(table_name)
self.client = boto3.client('dynamodb', region_name=region_name)
def put_item(self, item):
"""插入或更新项目"""
try:
# 转换Decimal类型
item = self._convert_decimals(item)
self.table.put_item(Item=item)
print(f"项目插入成功: {item}")
return True
except ClientError as e:
print(f"插入失败: {e}")
return False
def get_item(self, key):
"""获取项目"""
try:
response = self.table.get_item(Key=key)
if 'Item' in response:
return self._convert_from_decimals(response['Item'])
return None
except ClientError as e:
print(f"获取失败: {e}")
return None
def update_item(self, key, update_expression, expression_values):
"""更新项目"""
try:
# 转换Decimal类型
expression_values = self._convert_decimals(expression_values)
self.table.update_item(
Key=key,
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values,
ReturnValues='UPDATED_NEW'
)
return True
except ClientError as e:
print(f"更新失败: {e}")
return False
def query(self, key_condition_expression, filter_expression=None):
"""查询项目"""
try:
kwargs = {'KeyConditionExpression': key_condition_expression}
if filter_expression:
kwargs['FilterExpression'] = filter_expression
response = self.table.query(**kwargs)
items = [self._convert_from_decimals(item) for item in response['Items']]
return items
except ClientError as e:
print(f"查询失败: {e}")
return []
def scan(self, filter_expression=None, limit=None):
"""扫描表"""
try:
kwargs = {}
if filter_expression:
kwargs['FilterExpression'] = filter_expression
if limit:
kwargs['Limit'] = limit
response = self.table.scan(**kwargs)
items = [self._convert_from_decimals(item) for item in response['Items']]
return items
except ClientError as e:
print(f"扫描失败: {e}")
return []
def delete_item(self, key):
"""删除项目"""
try:
self.table.delete_item(Key=key)
print(f"项目删除成功: {key}")
return True
except ClientError as e:
print(f"删除失败: {e}")
return False
def _convert_decimals(self, obj):
"""转换Decimal类型"""
if isinstance(obj, dict):
return {k: self._convert_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._convert_decimals(item) for item in obj]
elif isinstance(obj, float):
return Decimal(str(obj))
return obj
def _convert_from_decimals(self, obj):
"""从Decimal转换"""
if isinstance(obj, dict):
return {k: self._convert_from_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._convert_from_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj)
return obj
# 使用示例
if __name__ == "__main__":
db = DynamoDBManager('Users')
# 插入用户
user = {
'id': 'user-123',
'name': '张三',
'email': 'zhangsan@example.com',
'age': 30
}
db.put_item(user)
# 查询用户
user = db.get_item({'id': 'user-123'})
print(f"用户信息: {user}")
# 更新用户
db.update_item(
{'id': 'user-123'},
'SET age = :age',
{':age': 31}
)
# 查询所有用户
users = db.scan()
print(f"所有用户: {users}")
RDS关系数据库
1. RDS连接和操作
# app/rds_manager.py
import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
import os
class RDSManager:
"""RDS管理器"""
def __init__(self, db_config):
self.db_config = db_config
self.connection_pool = None
self._create_pool()
def _create_pool(self):
"""创建连接池"""
try:
self.connection_pool = pool.SimpleConnectionPool(
1, 20,
host=self.db_config['host'],
port=self.db_config.get('port', 5432),
database=self.db_config['database'],
user=self.db_config['user'],
password=self.db_config['password']
)
except Exception as e:
print(f"创建连接池失败: {e}")
@contextmanager
def get_connection(self):
"""获取数据库连接"""
conn = self.connection_pool.getconn()
try:
yield conn
finally:
self.connection_pool.putconn(conn)
def execute_query(self, query, params=None):
"""执行查询"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchall()
def execute_update(self, query, params=None):
"""执行更新"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params)
conn.commit()
return cur.rowcount
# 使用示例
if __name__ == "__main__":
db_config = {
'host': os.getenv('DB_HOST'),
'database': os.getenv('DB_NAME'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD')
}
rds = RDSManager(db_config)
# 查询数据
users = rds.execute_query('SELECT * FROM users WHERE age > %s', (25,))
print(f"用户列表: {users}")
# 插入数据
rds.execute_update(
'INSERT INTO users (name, email) VALUES (%s, %s)',
('李四', 'lisi@example.com')
)
Elasticsearch搜索
1. Elasticsearch操作
# app/elasticsearch_manager.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ElasticsearchManager:
"""Elasticsearch管理器"""
def __init__(self, hosts=['localhost:9200']):
self.es = Elasticsearch(hosts)
def create_index(self, index_name, mapping=None):
"""创建索引"""
if not self.es.indices.exists(index=index_name):
body = {}
if mapping:
body['mappings'] = mapping
self.es.indices.create(index=index_name, body=body)
print(f"索引创建成功: {index_name}")
return True
return False
def index_document(self, index_name, doc_id, document):
"""索引文档"""
self.es.index(index=index_name, id=doc_id, body=document)
print(f"文档索引成功: {doc_id}")
def search(self, index_name, query, size=10):
"""搜索文档"""
response = self.es.search(index=index_name, body=query, size=size)
return [hit['_source'] for hit in response['hits']['hits']]
def bulk_index(self, index_name, documents):
"""批量索引"""
actions = [
{
'_index': index_name,
'_id': doc.get('id'),
'_source': doc
}
for doc in documents
]
bulk(self.es, actions)
print(f"批量索引完成: {len(documents)} 个文档")
# 使用示例
if __name__ == "__main__":
es = ElasticsearchManager()
# 创建索引
mapping = {
'properties': {
'name': {'type': 'text'},
'age': {'type': 'integer'}
}
}
es.create_index('users', mapping)
# 索引文档
es.index_document('users', '1', {'name': '张三', 'age': 30})
# 搜索
query = {'query': {'match': {'name': '张三'}}}
results = es.search('users', query)
print(f"搜索结果: {results}")
总结
数据服务的关键要点:
- S3对象存储:文件上传下载、预签名URL、批量操作
- DynamoDB:NoSQL数据库、键值查询、扫描操作
- RDS:关系数据库、连接池、事务处理
- Elasticsearch:全文搜索、索引管理、批量操作
- 数据同步:跨服务数据同步、数据备份
- 性能优化:连接池、批量操作、索引优化
- 错误处理:异常捕获、重试机制、降级策略
掌握这些数据服务技能,可以实现高效的数据存储、查询和管理,为Python应用提供强大的云数据支持。
转载请注明:周志洋的博客 » Python云计算-数据服务详解


