周志洋

个人站

持续学习 才能不被淘汰


Python数据分析-大数据处理详解

大数据处理的重要性

随着数据量的爆炸式增长,传统的数据处理方法已经无法满足大数据分析的需求。Python提供了多种处理大数据的工具和技术,包括Dask、分块处理、内存优化等。这些技术使我们能够在有限的计算资源下处理TB级别的数据。本文将从基础的Dask使用到高级的分布式计算,全面介绍Python大数据处理的最佳实践。

Dask基础

1. Dask DataFrame基础

def dask_basics_demo():
    """Dask基础演示"""
    print("=== Dask基础 ===")
    
    import dask.dataframe as dd
    import pandas as pd
    import numpy as np
    from datetime import datetime
    
    # 1. 创建Dask DataFrame
    print("1. 创建Dask DataFrame:")
    
    def create_dask_dataframe():
        """创建Dask DataFrame"""
        # 创建大型数据集
        n_rows = 1000000  # 100万行数据
        data = {
            'id': range(n_rows),
            'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
            'value': np.random.randn(n_rows),
            'date': pd.date_range('2019-01-01', periods=n_rows, freq='H')[:n_rows],
            'region': np.random.choice(['North', 'South', 'East', 'West'], n_rows)
        }
        
        # 转换为Dask DataFrame
        ddf = dd.from_pandas(pd.DataFrame(data), npartitions=4)
        print(f"   Dask DataFrame创建成功: {len(ddf)} 行数据")
        print(f"   分区数量: {ddf.npartitions}")
        
        # 显示基本信息
        print(f"   数据类型:")
        print(ddf.dtypes)
        
        return ddf
    
    dask_df = create_dask_dataframe()
    
    # 2. 基本操作
    print("\n2. 基本操作:")
    
    def basic_operations():
        """基本操作"""
        # 延迟计算 - 不会立即执行
        print("   延迟计算示例:")
        
        # 筛选操作
        filtered_ddf = dask_df[dask_df['category'] == 'A']
        print(f"   筛选category='A'的记录")
        
        # 分组聚合
        grouped = dask_df.groupby('category').agg({
            'value': ['mean', 'sum', 'count'],
            'id': 'count'
        })
        print(f"   按category分组聚合")
        
        # 排序
        sorted_ddf = dask_df.sort_values('value')
        print(f"   按value排序")
        
        # 计算实际结果
        print("   执行计算...")
        result = grouped.compute()
        print(f"   分组聚合结果:")
        print(result.head())
        
        return result
    
    aggregation_result = basic_operations()
    
    # 3. 内存优化
    print("\n3. 内存优化:")
    
    def memory_optimization():
        """内存优化"""
        # 数据类型优化
        print("   数据类型优化:")
        
        # 原始内存使用
        original_memory = dask_df.memory_usage(deep=True).sum().compute()
        print(f"   原始内存使用: {original_memory / 1024**2:.2f} MB")
        
        # 优化数据类型
        optimized_ddf = dask_df.copy()
        optimized_ddf['category'] = optimized_ddf['category'].astype('category')
        optimized_ddf['region'] = optimized_ddf['region'].astype('category')
        optimized_ddf['value'] = optimized_ddf['value'].astype('float32')
        
        optimized_memory = optimized_ddf.memory_usage(deep=True).sum().compute()
        print(f"   优化后内存使用: {optimized_memory / 1024**2:.2f} MB")
        print(f"   内存节省: {((original_memory - optimized_memory) / original_memory) * 100:.1f}%")
        
        return optimized_ddf
    
    optimized_df = memory_optimization()
    
    return dask_df, optimized_df

dask_dataframe, optimized_dataframe = dask_basics_demo()

2. 大文件处理

def large_file_processing_demo():
    """大文件处理演示"""
    print("\n=== 大文件处理 ===")
    
    import dask.dataframe as dd
    import pandas as pd
    import numpy as np
    import os
    
    # 1. 创建大型CSV文件
    print("1. 创建大型CSV文件:")
    
    def create_large_csv():
        """创建大型CSV文件用于演示"""
        file_path = "large_dataset_2019.csv"
        
        if not os.path.exists(file_path):
            print("   正在创建大型CSV文件...")
            
            # 分块写入大文件
            chunk_size = 50000
            total_chunks = 20  # 100万行数据
            
            for i in range(total_chunks):
                # 创建数据块
                chunk_data = {
                    'id': range(i * chunk_size, (i + 1) * chunk_size),
                    'timestamp': pd.date_range('2019-01-01', periods=chunk_size, freq='min')[i*chunk_size:(i+1)*chunk_size],
                    'category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], chunk_size),
                    'price': np.random.uniform(10, 1000, chunk_size),
                    'quantity': np.random.randint(1, 100, chunk_size),
                    'customer_id': np.random.randint(1, 10000, chunk_size),
                    'region': np.random.choice(['North', 'South', 'East', 'West', 'Central'], chunk_size)
                }
                
                chunk_df = pd.DataFrame(chunk_data)
                
                # 写入文件
                mode = 'w' if i == 0 else 'a'
                header = True if i == 0 else False
                chunk_df.to_csv(file_path, mode=mode, header=header, index=False)
                
                print(f"     已写入 {i+1}/{total_chunks} 个数据块")
            
            print(f"   大型CSV文件创建完成: {file_path}")
        else:
            print(f"   文件已存在: {file_path}")
        
        return file_path
    
    large_file = create_large_csv()
    
    # 2. 使用Dask读取大文件
    print("\n2. 使用Dask读取大文件:")
    
    def read_large_file_with_dask():
        """使用Dask读取大文件"""
        # 读取大文件(延迟加载)
        ddf = dd.read_csv(large_file, parse_dates=['timestamp'])
        
        print(f"   Dask DataFrame信息:")
        print(f"   行数: {len(ddf)}")
        print(f"   列数: {len(ddf.columns)}")
        print(f"   分区数: {ddf.npartitions}")
        print(f"   数据类型:")
        print(ddf.dtypes)
        
        # 快速预览
        print(f"\n   数据预览:")
        print(ddf.head().compute())
        
        return ddf
    
    large_ddf = read_large_file_with_dask()
    
    # 3. 大数据分析
    print("\n3. 大数据分析:")
    
    def big_data_analysis():
        """大数据分析"""
        # 按类别统计
        category_stats = large_ddf.groupby('category').agg({
            'price': ['mean', 'sum', 'count'],
            'quantity': ['sum', 'mean']
        }).compute()
        
        print("   按类别统计:")
        print(category_stats)
        
        # 按地区统计
        region_stats = large_ddf.groupby('region').agg({
            'price': 'sum',
            'quantity': 'sum'
        }).compute()
        
        print(f"\n   按地区统计:")
        print(region_stats)
        
        # 时间序列分析
        daily_sales = large_ddf.set_index('timestamp').resample('D').agg({
            'price': 'sum',
            'quantity': 'sum'
        }).compute()
        
        print(f"\n   每日销售统计:")
        print(daily_sales.head())
        
        return category_stats, region_stats, daily_sales
    
    cat_stats, reg_stats, daily_stats = big_data_analysis()
    
    return large_ddf, cat_stats

large_file_ddf, file_analysis = large_file_processing_demo()

3. 分块处理

def chunk_processing_demo():
    """分块处理演示"""
    print("\n=== 分块处理 ===")
    
    import pandas as pd
    import numpy as np
    from datetime import datetime
    
    # 1. 基础分块处理
    print("1. 基础分块处理:")
    
    def basic_chunk_processing():
        """基础分块处理"""
        chunk_size = 10000
        output_file = "processed_data_2019.csv"
        
        # 清空输出文件
        if os.path.exists(output_file):
            os.remove(output_file)
        
        processed_rows = 0
        total_processed = 0
        
        print(f"   开始分块处理,块大小: {chunk_size}")
        
        for i, chunk in enumerate(pd.read_csv('large_dataset_2019.csv', chunksize=chunk_size)):
            # 数据清洗和处理
            processed_chunk = chunk.copy()
            
            # 数据类型转换
            processed_chunk['price'] = pd.to_numeric(processed_chunk['price'], errors='coerce')
            processed_chunk['quantity'] = pd.to_numeric(processed_chunk['quantity'], errors='coerce')
            
            # 数据验证
            processed_chunk = processed_chunk.dropna(subset=['price', 'quantity'])
            
            # 添加计算列
            processed_chunk['total_amount'] = processed_chunk['price'] * processed_chunk['quantity']
            processed_chunk['processing_date'] = datetime.now().isoformat()
            
            # 筛选条件
            high_value_orders = processed_chunk[processed_chunk['total_amount'] > 500]
            
            # 保存结果
            mode = 'w' if i == 0 else 'a'
            header = True if i == 0 else False
            high_value_orders.to_csv(output_file, mode=mode, header=header, index=False)
            
            processed_rows += len(high_value_orders)
            total_processed += len(chunk)
            
            if (i + 1) % 5 == 0:  # 每5个块报告一次进度
                print(f"     已处理 {i + 1} 个数据块,筛选出 {processed_rows} 条高价值订单")
        
        print(f"   分块处理完成:")
        print(f"   总处理行数: {total_processed}")
        print(f"   筛选结果行数: {processed_rows}")
        print(f"   筛选率: {processed_rows/total_processed*100:.2f}%")
        
        return processed_rows, total_processed
    
    processed_count, total_count = basic_chunk_processing()
    
    return processed_count, total_count

chunk_processing_demo()

4. 内存优化技术

def memory_optimization_demo():
    """内存优化演示"""
    print("\n=== 内存优化技术 ===")
    
    import pandas as pd
    import numpy as np
    import gc
    
    # 1. 数据类型优化
    print("1. 数据类型优化:")
    
    def data_type_optimization():
        """数据类型优化"""
        # 创建测试数据
        n_rows = 100000
        data = {
            'id': np.random.randint(0, 1000000, n_rows),
            'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
            'score': np.random.uniform(0, 100, n_rows),
            'flag': np.random.choice([True, False], n_rows),
            'timestamp': pd.date_range('2019-01-01', periods=n_rows, freq='min')
        }
        
        df_original = pd.DataFrame(data)
        
        # 原始内存使用
        original_memory = df_original.memory_usage(deep=True).sum()
        print(f"   原始内存使用: {original_memory / 1024**2:.2f} MB")
        
        # 优化数据类型
        df_optimized = df_original.copy()
        
        # 整数类型优化
        df_optimized['id'] = pd.to_numeric(df_optimized['id'], downcast='integer')
        
        # 浮点类型优化
        df_optimized['score'] = pd.to_numeric(df_optimized['score'], downcast='float')
        
        # 类别类型优化
        df_optimized['category'] = df_optimized['category'].astype('category')
        
        # 布尔类型优化
        df_optimized['flag'] = df_optimized['flag'].astype('bool')
        
        # 优化后内存使用
        optimized_memory = df_optimized.memory_usage(deep=True).sum()
        print(f"   优化后内存使用: {optimized_memory / 1024**2:.2f} MB")
        print(f"   内存节省: {((original_memory - optimized_memory) / original_memory) * 100:.1f}%")
        
        return df_optimized
    
    optimized_df = data_type_optimization()
    
    # 2. 流式处理
    print("\n2. 流式处理:")
    
    def streaming_processing():
        """流式处理"""
        def process_streaming_data():
            """流式处理数据"""
            chunk_size = 1000
            total_processed = 0
            running_sum = 0
            running_count = 0
            
            print(f"   开始流式处理...")
            
            for i, chunk in enumerate(pd.read_csv('large_dataset_2019.csv', chunksize=chunk_size)):
                # 流式计算统计量
                chunk_sum = chunk['price'].sum()
                chunk_count = len(chunk)
                
                running_sum += chunk_sum
                running_count += chunk_count
                total_processed += chunk_count
                
                # 每100个块报告一次
                if (i + 1) % 100 == 0:
                    avg_price = running_sum / running_count
                    print(f"     已处理 {total_processed} 行,平均价格: {avg_price:.2f}")
                
                # 限制处理数量(演示用)
                if total_processed >= 100000:
                    break
            
            final_avg = running_sum / running_count
            print(f"   流式处理完成:")
            print(f"   总处理行数: {total_processed}")
            print(f"   最终平均价格: {final_avg:.2f}")
            
            return final_avg
        
        streaming_result = process_streaming_data()
        return streaming_result
    
    stream_result = streaming_processing()
    
    return optimized_df, stream_result

memory_optimization_demo()

5. 分布式计算

def distributed_computing_demo():
    """分布式计算演示"""
    print("\n=== 分布式计算 ===")
    
    import dask
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster
    
    # 1. 本地集群设置
    print("1. 本地集群设置:")
    
    def setup_local_cluster():
        """设置本地集群"""
        # 创建本地集群
        cluster = LocalCluster(
            n_workers=2,  # 2个工作进程
            threads_per_worker=2,  # 每个进程2个线程
            memory_limit='1GB'  # 每个工作进程内存限制
        )
        
        # 创建客户端
        client = Client(cluster)
        
        print(f"   集群信息:")
        print(f"   工作进程数: {len(cluster.workers)}")
        print(f"   总线程数: {cluster.total_workers * cluster.threads_per_worker}")
        print(f"   内存限制: {cluster.memory_limit}")
        
        return client, cluster
    
    client, cluster = setup_local_cluster()
    
    # 2. 分布式数据处理
    print("\n2. 分布式数据处理:")
    
    def distributed_data_processing():
        """分布式数据处理"""
        # 读取数据
        ddf = dd.read_csv('large_dataset_2019.csv')
        
        # 分布式计算
        print("   执行分布式计算...")
        
        # 复杂的数据处理任务
        result = (ddf
                 .groupby(['category', 'region'])
                 .agg({
                     'price': ['mean', 'sum', 'count'],
                     'quantity': ['sum', 'mean']
                 })
                 .compute())  # 触发分布式计算
        
        print(f"   分布式计算完成:")
        print(f"   结果形状: {result.shape}")
        print(f"   结果预览:")
        print(result.head())
        
        return result
    
    distributed_result = distributed_data_processing()
    
    # 3. 性能比较
    print("\n3. 性能比较:")
    
    def performance_comparison():
        """性能比较"""
        import time
        
        # 单线程处理
        print("   单线程处理...")
        start_time = time.time()
        
        single_thread_result = (dd.read_csv('large_dataset_2019.csv')
                               .groupby('category')
                               .agg({'price': 'mean'})
                               .compute(scheduler='single-threaded'))
        
        single_thread_time = time.time() - start_time
        print(f"   单线程处理时间: {single_thread_time:.2f} 秒")
        
        # 多线程处理
        print("   多线程处理...")
        start_time = time.time()
        
        multi_thread_result = (dd.read_csv('large_dataset_2019.csv')
                              .groupby('category')
                              .agg({'price': 'mean'})
                              .compute(scheduler='threads'))
        
        multi_thread_time = time.time() - start_time
        print(f"   多线程处理时间: {multi_thread_time:.2f} 秒")
        
        # 分布式处理
        print("   分布式处理...")
        start_time = time.time()
        
        distributed_result = (dd.read_csv('large_dataset_2019.csv')
                             .groupby('category')
                             .agg({'price': 'mean'})
                             .compute())
        
        distributed_time = time.time() - start_time
        print(f"   分布式处理时间: {distributed_time:.2f} 秒")
        
        # 性能提升
        if multi_thread_time > 0:
            speedup_threads = single_thread_time / multi_thread_time
            print(f"   多线程加速比: {speedup_threads:.2f}x")
        
        if distributed_time > 0:
            speedup_distributed = single_thread_time / distributed_time
            print(f"   分布式加速比: {speedup_distributed:.2f}x")
        
        return single_thread_time, multi_thread_time, distributed_time
    
    perf_results = performance_comparison()
    
    # 清理资源
    print("\n4. 清理资源:")
    client.close()
    cluster.close()
    print("   集群已关闭")
    
    return distributed_result, perf_results

distributed_computing_demo()

总结

大数据处理的关键要点:

  1. Dask基础:延迟计算、分区管理、基本操作
  2. 大文件处理:分块读取、内存优化、数据预处理
  3. 分块处理:基础分块、并行处理、流式处理
  4. 内存优化:数据类型优化、内存监控、垃圾回收
  5. 分布式计算:本地集群、分布式数据处理、性能优化
  6. 性能调优:分区策略、计算调度、资源管理
  7. 最佳实践:数据预处理、错误处理、监控告警

掌握这些大数据处理技能,可以高效处理TB级别的数据,构建可扩展的数据分析管道,为大规模数据分析和机器学习提供强大的计算支持。


转载请注明:周志洋的博客 » Python数据分析-大数据处理详解

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦