周志洋

个人站

持续学习 才能不被淘汰


Python数据分析-时间序列分析详解

时间序列分析的重要性

时间序列分析是数据分析中的重要分支,广泛应用于金融、经济、气象、销售等领域。通过分析时间序列数据,我们可以识别趋势、季节性模式、周期性变化,并进行预测和异常检测。Python提供了强大的时间序列分析工具,特别是pandas和statsmodels库。本文将从基础的时间序列操作到高级的建模和预测,全面介绍Python时间序列分析的最佳实践。

时间序列基础

1. 时间序列创建和操作

def time_series_basics_demo():
    """时间序列基础演示"""
    print("=== 时间序列基础 ===")
    
    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    
    # 1. 创建时间序列
    print("1. 创建时间序列:")
    
    def create_time_series():
        """创建各种类型的时间序列"""
        # 日频率时间序列(2019年全年)
        dates_daily = pd.date_range('2019-01-01', periods=365, freq='D')
        daily_ts = pd.Series(
            np.random.randn(365).cumsum() + 100,  # 随机游走 + 100
            index=dates_daily,
            name='daily_values'
        )
        print(f"   日频率序列: {len(daily_ts)} 个数据点")
        print(f"   时间范围: {daily_ts.index.min()}{daily_ts.index.max()}")
        
        # 小时频率时间序列(一周)
        dates_hourly = pd.date_range('2019-04-01', periods=168, freq='H')
        hourly_ts = pd.Series(
            np.sin(np.arange(168) * 2 * np.pi / 24) + np.random.randn(168) * 0.1,
            index=dates_hourly,
            name='hourly_values'
        )
        print(f"   小时频率序列: {len(hourly_ts)} 个数据点")
        
        # 月度时间序列(2019年)
        dates_monthly = pd.date_range('2019-01-01', periods=12, freq='M')
        monthly_ts = pd.Series(
            np.random.randn(12) * 10 + 50,
            index=dates_monthly,
            name='monthly_values'
        )
        print(f"   月度序列: {len(monthly_ts)} 个数据点")
        
        return daily_ts, hourly_ts, monthly_ts
    
    daily_ts, hourly_ts, monthly_ts = create_time_series()
    
    # 2. 时间序列索引操作
    print("\n2. 时间序列索引操作:")
    
    def time_series_indexing():
        """时间序列索引操作"""
        # 按日期切片
        april_data = daily_ts['2019-04']
        print(f"   2019年4月数据: {len(april_data)} 个数据点")
        
        # 按日期范围切片
        q1_data = daily_ts['2019-01-01':'2019-03-31']
        print(f"   2019年第一季度数据: {len(q1_data)} 个数据点")
        
        # 按条件筛选
        weekend_data = daily_ts[daily_ts.index.weekday >= 5]
        print(f"   周末数据: {len(weekend_data)} 个数据点")
        
        # 提取特定时间组件
        daily_ts['year'] = daily_ts.index.year
        daily_ts['month'] = daily_ts.index.month
        daily_ts['day'] = daily_ts.index.day
        daily_ts['weekday'] = daily_ts.index.weekday
        
        print(f"   添加时间组件: year, month, day, weekday")
    
    time_series_indexing()
    
    # 3. 重采样操作
    print("\n3. 重采样操作:")
    
    def resampling_operations():
        """重采样操作"""
        # 从日频率重采样到周频率
        weekly_mean = daily_ts.resample('W').mean()
        weekly_sum = daily_ts.resample('W').sum()
        print(f"   周频率重采样(均值): {len(weekly_mean)} 个数据点")
        print(f"   周频率重采样(求和): {len(weekly_sum)} 个数据点")
        
        # 从小时频率重采样到日频率
        daily_from_hourly = hourly_ts.resample('D').agg(['mean', 'min', 'max'])
        print(f"   从小时到日重采样: {len(daily_from_hourly)} 个数据点")
        
        # 上采样(插值)
        hourly_from_daily = daily_ts.resample('H').interpolate(method='linear')
        print(f"   从日到小时上采样: {len(hourly_from_daily)} 个数据点")
        
        return weekly_mean, daily_from_hourly
    
    weekly_data, daily_aggregated = resampling_operations()
    
    return daily_ts, weekly_data

daily_series, weekly_series = time_series_basics_demo()

2. 趋势分析

def trend_analysis_demo():
    """趋势分析演示"""
    print("\n=== 趋势分析 ===")
    
    import pandas as pd
    import numpy as np
    from datetime import datetime
    
    # 1. 移动平均
    print("1. 移动平均:")
    
    def moving_averages():
        """移动平均分析"""
        # 简单移动平均
        sma_7 = daily_series.rolling(window=7).mean()
        sma_30 = daily_series.rolling(window=30).mean()
        
        print(f"   7日简单移动平均: {sma_7.dropna().iloc[0]:.2f}{sma_7.dropna().iloc[-1]:.2f}")
        print(f"   30日简单移动平均: {sma_30.dropna().iloc[0]:.2f}{sma_30.dropna().iloc[-1]:.2f}")
        
        # 加权移动平均
        weights = np.array([0.1, 0.2, 0.3, 0.4])  # 最近的值权重更大
        wma = daily_series.rolling(window=4).apply(lambda x: np.average(x, weights=weights))
        print(f"   4日加权移动平均: 已完成计算")
        
        # 指数移动平均
        ema_7 = daily_series.ewm(span=7).mean()
        ema_30 = daily_series.ewm(span=30).mean()
        
        print(f"   7日指数移动平均: {ema_7.iloc[0]:.2f}{ema_7.iloc[-1]:.2f}")
        print(f"   30日指数移动平均: {ema_30.iloc[0]:.2f}{ema_30.iloc[-1]:.2f}")
        
        return sma_7, sma_30, ema_7, ema_30
    
    sma_7, sma_30, ema_7, ema_30 = moving_averages()
    
    # 2. 趋势检测
    print("\n2. 趋势检测:")
    
    def trend_detection():
        """趋势检测"""
        # 线性趋势拟合
        from scipy import stats
        
        x = np.arange(len(daily_series))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, daily_series.values)
        
        print(f"   线性趋势斜率: {slope:.4f}")
        print(f"   相关系数: {r_value:.4f}")
        print(f"   p值: {p_value:.4f}")
        
        if slope > 0:
            print("   趋势方向: 上升")
        elif slope < 0:
            print("   趋势方向: 下降")
        else:
            print("   趋势方向: 平稳")
        
        # 趋势强度判断
        if abs(r_value) > 0.7:
            print("   趋势强度: 强")
        elif abs(r_value) > 0.4:
            print("   趋势强度: 中等")
        else:
            print("   趋势强度: 弱")
        
        return slope, r_value, p_value
    
    trend_slope, trend_correlation, trend_pvalue = trend_detection()
    
    # 3. 趋势分解
    print("\n3. 趋势分解:")
    
    def trend_decomposition():
        """趋势分解"""
        from statsmodels.tsa.seasonal import seasonal_decompose
        
        # 添加一些季节性和趋势
        t = np.arange(len(daily_series))
        seasonal = 10 * np.sin(2 * np.pi * t / 365.25)  # 年度季节性
        trend = 0.01 * t  # 线性趋势
        noise = np.random.randn(len(daily_series)) * 2
        
        synthetic_ts = daily_series + seasonal + trend + noise
        
        # 季节性分解
        decomposition = seasonal_decompose(synthetic_ts, model='additive', period=365)
        
        print(f"   原始序列均值: {synthetic_ts.mean():.2f}")
        print(f"   趋势分量均值: {decomposition.trend.mean():.2f}")
        print(f"   季节性分量标准差: {decomposition.seasonal.std():.2f}")
        print(f"   残差分量标准差: {decomposition.resid.std():.2f}")
        
        return decomposition
    
    decomposition = trend_decomposition()
    
    return sma_30, ema_30, decomposition

trend_analysis_demo()

3. 季节性分析

def seasonal_analysis_demo():
    """季节性分析演示"""
    print("\n=== 季节性分析 ===")
    
    import pandas as pd
    import numpy as np
    
    # 1. 季节性模式识别
    print("1. 季节性模式识别:")
    
    def seasonal_patterns():
        """季节性模式识别"""
        # 创建具有季节性的人工数据
        dates = pd.date_range('2019-01-01', periods=365, freq='D')
        
        # 年度季节性
        annual_seasonal = 20 * np.sin(2 * np.pi * np.arange(365) / 365.25)
        
        # 周季节性
        weekly_seasonal = 5 * np.sin(2 * np.pi * np.arange(365) / 7)
        
        # 月度季节性
        monthly_seasonal = 3 * np.sin(2 * np.pi * np.arange(365) / 30.44)
        
        # 组合季节性
        seasonal_ts = pd.Series(
            annual_seasonal + weekly_seasonal + monthly_seasonal + np.random.randn(365) * 2,
            index=dates,
            name='seasonal_data'
        )
        
        print(f"   年度季节性幅度: {annual_seasonal.max() - annual_seasonal.min():.2f}")
        print(f"   周季节性幅度: {weekly_seasonal.max() - weekly_seasonal.min():.2f}")
        print(f"   月度季节性幅度: {monthly_seasonal.max() - monthly_seasonal.min():.2f}")
        
        return seasonal_ts
    
    seasonal_data = seasonal_patterns()
    
    # 2. 自相关分析
    print("\n2. 自相关分析:")
    
    def autocorrelation_analysis():
        """自相关分析"""
        from statsmodels.tsa.stattools import acf, pacf
        
        # 计算自相关函数
        autocorr = acf(seasonal_data.dropna(), nlags=40)
        
        # 计算偏自相关函数
        partial_autocorr = pacf(seasonal_data.dropna(), nlags=40)
        
        print(f"   滞后1期自相关: {autocorr[1]:.4f}")
        print(f"   滞后7期自相关: {autocorr[7]:.4f}")
        print(f"   滞后30期自相关: {autocorr[30]:.4f}")
        
        # 找到显著的自相关滞后
        significant_lags = np.where(np.abs(autocorr) > 0.2)[0]
        print(f"   显著自相关滞后: {significant_lags[:5]}")  # 显示前5个
        
        return autocorr, partial_autocorr
    
    autocorr_values, pacf_values = autocorrelation_analysis()
    
    # 3. 周期性检测
    print("\n3. 周期性检测:")
    
    def periodicity_detection():
        """周期性检测"""
        from scipy import signal
        
        # 使用FFT检测周期性
        fft = np.fft.fft(seasonal_data.values)
        freqs = np.fft.fftfreq(len(seasonal_data))
        
        # 找到主要频率
        power_spectrum = np.abs(fft) ** 2
        dominant_freq_idx = np.argmax(power_spectrum[1:len(power_spectrum)//2]) + 1
        dominant_freq = freqs[dominant_freq_idx]
        dominant_period = 1 / abs(dominant_freq)
        
        print(f"   主要周期: {dominant_period:.1f} 天")
        
        # 检测多个周期
        sorted_indices = np.argsort(power_spectrum[1:len(power_spectrum)//2])[::-1]
        top_periods = []
        for idx in sorted_indices[:5]:
            period = 1 / abs(freqs[idx + 1])
            if 2 <= period <= 365:  # 合理的周期范围
                top_periods.append(period)
        
        print(f"   主要周期列表: {[f'{p:.1f}' for p in top_periods[:3]]}")
        
        return dominant_period, top_periods
    
    main_period, detected_periods = periodicity_detection()
    
    return seasonal_data, autocorr_values, main_period

seasonal_analysis_demo()

4. ARIMA模型

def arima_modeling_demo():
    """ARIMA模型演示"""
    print("\n=== ARIMA模型 ===")
    
    import pandas as pd
    import numpy as np
    from statsmodels.tsa.arima.model import ARIMA
    from statsmodels.tsa.stattools import adfuller
    from datetime import datetime
    
    # 1. 平稳性检验
    print("1. 平稳性检验:")
    
    def stationarity_test():
        """平稳性检验"""
        # 使用之前创建的时间序列数据
        ts_data = daily_series.dropna()
        
        # ADF检验
        adf_result = adfuller(ts_data)
        
        print(f"   ADF统计量: {adf_result[0]:.4f}")
        print(f"   p值: {adf_result[1]:.4f}")
        print(f"   临界值:")
        for key, value in adf_result[4].items():
            print(f"     {key}: {value:.4f}")
        
        if adf_result[1] <= 0.05:
            print("   结论: 序列是平稳的")
            is_stationary = True
        else:
            print("   结论: 序列是非平稳的,需要差分")
            is_stationary = False
        
        return is_stationary, ts_data
    
    is_stationary, ts_for_arima = stationarity_test()
    
    # 2. ARIMA模型拟合
    print("\n2. ARIMA模型拟合:")
    
    def arima_fitting():
        """ARIMA模型拟合"""
        # 如果序列非平稳,进行差分
        if not is_stationary:
            diff_data = ts_for_arima.diff().dropna()
            print(f"   一阶差分后数据点数: {len(diff_data)}")
        else:
            diff_data = ts_for_arima
        
        # 尝试不同的ARIMA模型
        arima_models = [
            (1, 0, 0),  # AR(1)
            (0, 0, 1),  # MA(1)
            (1, 0, 1),  # ARMA(1,1)
            (1, 1, 0),  # ARIMA(1,1,0)
            (0, 1, 1),  # ARIMA(0,1,1)
            (1, 1, 1),  # ARIMA(1,1,1)
        ]
        
        best_model = None
        best_aic = float('inf')
        
        for order in arima_models:
            try:
                model = ARIMA(diff_data, order=order)
                fitted_model = model.fit()
                
                print(f"   ARIMA{order} - AIC: {fitted_model.aic:.2f}")
                
                if fitted_model.aic < best_aic:
                    best_aic = fitted_model.aic
                    best_model = fitted_model
                    best_order = order
                    
            except Exception as e:
                print(f"   ARIMA{order} - 拟合失败: {e}")
        
        print(f"\n   最佳模型: ARIMA{best_order}")
        print(f"   最佳AIC: {best_aic:.2f}")
        
        return best_model, best_order
    
    arima_model, best_order = arima_fitting()
    
    # 3. 模型诊断
    print("\n3. 模型诊断:")
    
    def model_diagnostics():
        """模型诊断"""
        # 残差分析
        residuals = arima_model.resid
        
        print(f"   残差均值: {residuals.mean():.4f}")
        print(f"   残差标准差: {residuals.std():.4f}")
        
        # 残差正态性检验
        from scipy import stats
        shapiro_stat, shapiro_p = stats.shapiro(residuals[:1000])  # 限制样本大小
        print(f"   残差正态性检验 (Shapiro): p值 = {shapiro_p:.4f}")
        
        # Ljung-Box检验(残差自相关)
        from statsmodels.stats.diagnostic import acorr_ljungbox
        lb_stat, lb_p = acorr_ljungbox(residuals, lags=10, return_df=False)
        print(f"   Ljung-Box检验: p值 = {lb_p[-1]:.4f}")
        
        if lb_p[-1] > 0.05:
            print("   残差无显著自相关,模型合适")
        else:
            print("   残差存在自相关,模型可能需要改进")
        
        return residuals
    
    model_residuals = model_diagnostics()
    
    return arima_model, model_residuals

arima_modeling_demo()

5. 时间序列预测

def time_series_forecasting_demo():
    """时间序列预测演示"""
    print("\n=== 时间序列预测 ===")
    
    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    
    # 1. 简单预测方法
    print("1. 简单预测方法:")
    
    def simple_forecasting():
        """简单预测方法"""
        # 使用历史均值预测
        historical_mean = daily_series.mean()
        naive_forecast = [historical_mean] * 30  # 预测未来30天
        
        print(f"   历史均值预测: {historical_mean:.2f}")
        
        # 最后值预测(朴素方法)
        last_value = daily_series.iloc[-1]
        naive_last_forecast = [last_value] * 30
        
        print(f"   最后值预测: {last_value:.2f}")
        
        # 线性趋势外推
        x = np.arange(len(daily_series))
        slope, intercept = np.polyfit(x, daily_series.values, 1)
        
        future_x = np.arange(len(daily_series), len(daily_series) + 30)
        linear_forecast = slope * future_x + intercept
        
        print(f"   线性趋势预测范围: {linear_forecast[0]:.2f}{linear_forecast[-1]:.2f}")
        
        return naive_forecast, linear_forecast
    
    naive_pred, linear_pred = simple_forecasting()
    
    # 2. 指数平滑预测
    print("\n2. 指数平滑预测:")
    
    def exponential_smoothing_forecast():
        """指数平滑预测"""
        from statsmodels.tsa.holtwinters import ExponentialSmoothing
        
        # 简单指数平滑
        ses_model = ExponentialSmoothing(daily_series, trend=None, seasonal=None)
        ses_fitted = ses_model.fit()
        ses_forecast = ses_fitted.forecast(30)
        
        print(f"   简单指数平滑预测范围: {ses_forecast.iloc[0]:.2f}{ses_forecast.iloc[-1]:.2f}")
        
        # 双重指数平滑(Holt方法)
        holt_model = ExponentialSmoothing(daily_series, trend='add', seasonal=None)
        holt_fitted = holt_model.fit()
        holt_forecast = holt_fitted.forecast(30)
        
        print(f"   Holt方法预测范围: {holt_forecast.iloc[0]:.2f}{holt_forecast.iloc[-1]:.2f}")
        
        return ses_forecast, holt_forecast
    
    ses_pred, holt_pred = exponential_smoothing_forecast()
    
    # 3. ARIMA预测
    print("\n3. ARIMA预测:")
    
    def arima_forecasting():
        """ARIMA预测"""
        # 使用之前拟合的ARIMA模型
        if 'arima_model' in globals():
            arima_forecast = arima_model.forecast(30)
            forecast_ci = arima_model.get_forecast(30).conf_int()
            
            print(f"   ARIMA预测范围: {arima_forecast.iloc[0]:.2f}{arima_forecast.iloc[-1]:.2f}")
            print(f"   预测置信区间宽度: {forecast_ci.iloc[0, 1] - forecast_ci.iloc[0, 0]:.2f}")
        else:
            print("   ARIMA模型未拟合,跳过ARIMA预测")
            arima_forecast = None
            forecast_ci = None
        
        return arima_forecast, forecast_ci
    
    arima_pred, arima_ci = arima_forecasting()
    
    # 4. 预测评估
    print("\n4. 预测评估:")
    
    def forecast_evaluation():
        """预测评估"""
        # 创建评估数据集(使用最后30天作为测试集)
        train_data = daily_series[:-30]
        test_data = daily_series[-30:]
        
        print(f"   训练集大小: {len(train_data)}")
        print(f"   测试集大小: {len(test_data)}")
        
        # 计算简单预测的误差
        naive_errors = test_data - train_data.mean()
        mae_naive = np.mean(np.abs(naive_errors))
        rmse_naive = np.sqrt(np.mean(naive_errors ** 2))
        
        print(f"   朴素方法 MAE: {mae_naive:.2f}")
        print(f"   朴素方法 RMSE: {rmse_naive:.2f}")
        
        return mae_naive, rmse_naive
    
    mae, rmse = forecast_evaluation()
    
    return ses_pred, holt_pred, mae, rmse

time_series_forecasting_demo()

总结

时间序列分析的关键要点:

  1. 基础操作:时间序列创建、索引操作、重采样
  2. 趋势分析:移动平均、指数平滑、趋势检测、趋势分解
  3. 季节性分析:季节性模式识别、自相关分析、周期性检测
  4. ARIMA模型:平稳性检验、模型拟合、模型诊断
  5. 时间序列预测:简单预测、指数平滑、ARIMA预测
  6. 模型评估:预测误差计算、模型比较
  7. 最佳实践:数据预处理、模型选择、参数调优

掌握这些时间序列分析技能,可以深入理解时间序列数据的特征,构建准确的预测模型,为业务决策提供可靠的时间序列洞察。


转载请注明:周志洋的博客 » Python数据分析-时间序列分析详解

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦