时间序列分析的重要性
时间序列分析是数据分析中的重要分支,广泛应用于金融、经济、气象、销售等领域。通过分析时间序列数据,我们可以识别趋势、季节性模式、周期性变化,并进行预测和异常检测。Python提供了强大的时间序列分析工具,特别是pandas和statsmodels库。本文将从基础的时间序列操作到高级的建模和预测,全面介绍Python时间序列分析的最佳实践。
时间序列基础
1. 时间序列创建和操作
def time_series_basics_demo():
"""时间序列基础演示"""
print("=== 时间序列基础 ===")
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 1. 创建时间序列
print("1. 创建时间序列:")
def create_time_series():
"""创建各种类型的时间序列"""
# 日频率时间序列(2019年全年)
dates_daily = pd.date_range('2019-01-01', periods=365, freq='D')
daily_ts = pd.Series(
np.random.randn(365).cumsum() + 100, # 随机游走 + 100
index=dates_daily,
name='daily_values'
)
print(f" 日频率序列: {len(daily_ts)} 个数据点")
print(f" 时间范围: {daily_ts.index.min()} 到 {daily_ts.index.max()}")
# 小时频率时间序列(一周)
dates_hourly = pd.date_range('2019-04-01', periods=168, freq='H')
hourly_ts = pd.Series(
np.sin(np.arange(168) * 2 * np.pi / 24) + np.random.randn(168) * 0.1,
index=dates_hourly,
name='hourly_values'
)
print(f" 小时频率序列: {len(hourly_ts)} 个数据点")
# 月度时间序列(2019年)
dates_monthly = pd.date_range('2019-01-01', periods=12, freq='M')
monthly_ts = pd.Series(
np.random.randn(12) * 10 + 50,
index=dates_monthly,
name='monthly_values'
)
print(f" 月度序列: {len(monthly_ts)} 个数据点")
return daily_ts, hourly_ts, monthly_ts
daily_ts, hourly_ts, monthly_ts = create_time_series()
# 2. 时间序列索引操作
print("\n2. 时间序列索引操作:")
def time_series_indexing():
"""时间序列索引操作"""
# 按日期切片
april_data = daily_ts['2019-04']
print(f" 2019年4月数据: {len(april_data)} 个数据点")
# 按日期范围切片
q1_data = daily_ts['2019-01-01':'2019-03-31']
print(f" 2019年第一季度数据: {len(q1_data)} 个数据点")
# 按条件筛选
weekend_data = daily_ts[daily_ts.index.weekday >= 5]
print(f" 周末数据: {len(weekend_data)} 个数据点")
# 提取特定时间组件
daily_ts['year'] = daily_ts.index.year
daily_ts['month'] = daily_ts.index.month
daily_ts['day'] = daily_ts.index.day
daily_ts['weekday'] = daily_ts.index.weekday
print(f" 添加时间组件: year, month, day, weekday")
time_series_indexing()
# 3. 重采样操作
print("\n3. 重采样操作:")
def resampling_operations():
"""重采样操作"""
# 从日频率重采样到周频率
weekly_mean = daily_ts.resample('W').mean()
weekly_sum = daily_ts.resample('W').sum()
print(f" 周频率重采样(均值): {len(weekly_mean)} 个数据点")
print(f" 周频率重采样(求和): {len(weekly_sum)} 个数据点")
# 从小时频率重采样到日频率
daily_from_hourly = hourly_ts.resample('D').agg(['mean', 'min', 'max'])
print(f" 从小时到日重采样: {len(daily_from_hourly)} 个数据点")
# 上采样(插值)
hourly_from_daily = daily_ts.resample('H').interpolate(method='linear')
print(f" 从日到小时上采样: {len(hourly_from_daily)} 个数据点")
return weekly_mean, daily_from_hourly
weekly_data, daily_aggregated = resampling_operations()
return daily_ts, weekly_data
daily_series, weekly_series = time_series_basics_demo()
2. 趋势分析
def trend_analysis_demo():
"""趋势分析演示"""
print("\n=== 趋势分析 ===")
import pandas as pd
import numpy as np
from datetime import datetime
# 1. 移动平均
print("1. 移动平均:")
def moving_averages():
"""移动平均分析"""
# 简单移动平均
sma_7 = daily_series.rolling(window=7).mean()
sma_30 = daily_series.rolling(window=30).mean()
print(f" 7日简单移动平均: {sma_7.dropna().iloc[0]:.2f} 到 {sma_7.dropna().iloc[-1]:.2f}")
print(f" 30日简单移动平均: {sma_30.dropna().iloc[0]:.2f} 到 {sma_30.dropna().iloc[-1]:.2f}")
# 加权移动平均
weights = np.array([0.1, 0.2, 0.3, 0.4]) # 最近的值权重更大
wma = daily_series.rolling(window=4).apply(lambda x: np.average(x, weights=weights))
print(f" 4日加权移动平均: 已完成计算")
# 指数移动平均
ema_7 = daily_series.ewm(span=7).mean()
ema_30 = daily_series.ewm(span=30).mean()
print(f" 7日指数移动平均: {ema_7.iloc[0]:.2f} 到 {ema_7.iloc[-1]:.2f}")
print(f" 30日指数移动平均: {ema_30.iloc[0]:.2f} 到 {ema_30.iloc[-1]:.2f}")
return sma_7, sma_30, ema_7, ema_30
sma_7, sma_30, ema_7, ema_30 = moving_averages()
# 2. 趋势检测
print("\n2. 趋势检测:")
def trend_detection():
"""趋势检测"""
# 线性趋势拟合
from scipy import stats
x = np.arange(len(daily_series))
slope, intercept, r_value, p_value, std_err = stats.linregress(x, daily_series.values)
print(f" 线性趋势斜率: {slope:.4f}")
print(f" 相关系数: {r_value:.4f}")
print(f" p值: {p_value:.4f}")
if slope > 0:
print(" 趋势方向: 上升")
elif slope < 0:
print(" 趋势方向: 下降")
else:
print(" 趋势方向: 平稳")
# 趋势强度判断
if abs(r_value) > 0.7:
print(" 趋势强度: 强")
elif abs(r_value) > 0.4:
print(" 趋势强度: 中等")
else:
print(" 趋势强度: 弱")
return slope, r_value, p_value
trend_slope, trend_correlation, trend_pvalue = trend_detection()
# 3. 趋势分解
print("\n3. 趋势分解:")
def trend_decomposition():
"""趋势分解"""
from statsmodels.tsa.seasonal import seasonal_decompose
# 添加一些季节性和趋势
t = np.arange(len(daily_series))
seasonal = 10 * np.sin(2 * np.pi * t / 365.25) # 年度季节性
trend = 0.01 * t # 线性趋势
noise = np.random.randn(len(daily_series)) * 2
synthetic_ts = daily_series + seasonal + trend + noise
# 季节性分解
decomposition = seasonal_decompose(synthetic_ts, model='additive', period=365)
print(f" 原始序列均值: {synthetic_ts.mean():.2f}")
print(f" 趋势分量均值: {decomposition.trend.mean():.2f}")
print(f" 季节性分量标准差: {decomposition.seasonal.std():.2f}")
print(f" 残差分量标准差: {decomposition.resid.std():.2f}")
return decomposition
decomposition = trend_decomposition()
return sma_30, ema_30, decomposition
trend_analysis_demo()
3. 季节性分析
def seasonal_analysis_demo():
"""季节性分析演示"""
print("\n=== 季节性分析 ===")
import pandas as pd
import numpy as np
# 1. 季节性模式识别
print("1. 季节性模式识别:")
def seasonal_patterns():
"""季节性模式识别"""
# 创建具有季节性的人工数据
dates = pd.date_range('2019-01-01', periods=365, freq='D')
# 年度季节性
annual_seasonal = 20 * np.sin(2 * np.pi * np.arange(365) / 365.25)
# 周季节性
weekly_seasonal = 5 * np.sin(2 * np.pi * np.arange(365) / 7)
# 月度季节性
monthly_seasonal = 3 * np.sin(2 * np.pi * np.arange(365) / 30.44)
# 组合季节性
seasonal_ts = pd.Series(
annual_seasonal + weekly_seasonal + monthly_seasonal + np.random.randn(365) * 2,
index=dates,
name='seasonal_data'
)
print(f" 年度季节性幅度: {annual_seasonal.max() - annual_seasonal.min():.2f}")
print(f" 周季节性幅度: {weekly_seasonal.max() - weekly_seasonal.min():.2f}")
print(f" 月度季节性幅度: {monthly_seasonal.max() - monthly_seasonal.min():.2f}")
return seasonal_ts
seasonal_data = seasonal_patterns()
# 2. 自相关分析
print("\n2. 自相关分析:")
def autocorrelation_analysis():
"""自相关分析"""
from statsmodels.tsa.stattools import acf, pacf
# 计算自相关函数
autocorr = acf(seasonal_data.dropna(), nlags=40)
# 计算偏自相关函数
partial_autocorr = pacf(seasonal_data.dropna(), nlags=40)
print(f" 滞后1期自相关: {autocorr[1]:.4f}")
print(f" 滞后7期自相关: {autocorr[7]:.4f}")
print(f" 滞后30期自相关: {autocorr[30]:.4f}")
# 找到显著的自相关滞后
significant_lags = np.where(np.abs(autocorr) > 0.2)[0]
print(f" 显著自相关滞后: {significant_lags[:5]}") # 显示前5个
return autocorr, partial_autocorr
autocorr_values, pacf_values = autocorrelation_analysis()
# 3. 周期性检测
print("\n3. 周期性检测:")
def periodicity_detection():
"""周期性检测"""
from scipy import signal
# 使用FFT检测周期性
fft = np.fft.fft(seasonal_data.values)
freqs = np.fft.fftfreq(len(seasonal_data))
# 找到主要频率
power_spectrum = np.abs(fft) ** 2
dominant_freq_idx = np.argmax(power_spectrum[1:len(power_spectrum)//2]) + 1
dominant_freq = freqs[dominant_freq_idx]
dominant_period = 1 / abs(dominant_freq)
print(f" 主要周期: {dominant_period:.1f} 天")
# 检测多个周期
sorted_indices = np.argsort(power_spectrum[1:len(power_spectrum)//2])[::-1]
top_periods = []
for idx in sorted_indices[:5]:
period = 1 / abs(freqs[idx + 1])
if 2 <= period <= 365: # 合理的周期范围
top_periods.append(period)
print(f" 主要周期列表: {[f'{p:.1f}天' for p in top_periods[:3]]}")
return dominant_period, top_periods
main_period, detected_periods = periodicity_detection()
return seasonal_data, autocorr_values, main_period
seasonal_analysis_demo()
4. ARIMA模型
def arima_modeling_demo():
"""ARIMA模型演示"""
print("\n=== ARIMA模型 ===")
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from datetime import datetime
# 1. 平稳性检验
print("1. 平稳性检验:")
def stationarity_test():
"""平稳性检验"""
# 使用之前创建的时间序列数据
ts_data = daily_series.dropna()
# ADF检验
adf_result = adfuller(ts_data)
print(f" ADF统计量: {adf_result[0]:.4f}")
print(f" p值: {adf_result[1]:.4f}")
print(f" 临界值:")
for key, value in adf_result[4].items():
print(f" {key}: {value:.4f}")
if adf_result[1] <= 0.05:
print(" 结论: 序列是平稳的")
is_stationary = True
else:
print(" 结论: 序列是非平稳的,需要差分")
is_stationary = False
return is_stationary, ts_data
is_stationary, ts_for_arima = stationarity_test()
# 2. ARIMA模型拟合
print("\n2. ARIMA模型拟合:")
def arima_fitting():
"""ARIMA模型拟合"""
# 如果序列非平稳,进行差分
if not is_stationary:
diff_data = ts_for_arima.diff().dropna()
print(f" 一阶差分后数据点数: {len(diff_data)}")
else:
diff_data = ts_for_arima
# 尝试不同的ARIMA模型
arima_models = [
(1, 0, 0), # AR(1)
(0, 0, 1), # MA(1)
(1, 0, 1), # ARMA(1,1)
(1, 1, 0), # ARIMA(1,1,0)
(0, 1, 1), # ARIMA(0,1,1)
(1, 1, 1), # ARIMA(1,1,1)
]
best_model = None
best_aic = float('inf')
for order in arima_models:
try:
model = ARIMA(diff_data, order=order)
fitted_model = model.fit()
print(f" ARIMA{order} - AIC: {fitted_model.aic:.2f}")
if fitted_model.aic < best_aic:
best_aic = fitted_model.aic
best_model = fitted_model
best_order = order
except Exception as e:
print(f" ARIMA{order} - 拟合失败: {e}")
print(f"\n 最佳模型: ARIMA{best_order}")
print(f" 最佳AIC: {best_aic:.2f}")
return best_model, best_order
arima_model, best_order = arima_fitting()
# 3. 模型诊断
print("\n3. 模型诊断:")
def model_diagnostics():
"""模型诊断"""
# 残差分析
residuals = arima_model.resid
print(f" 残差均值: {residuals.mean():.4f}")
print(f" 残差标准差: {residuals.std():.4f}")
# 残差正态性检验
from scipy import stats
shapiro_stat, shapiro_p = stats.shapiro(residuals[:1000]) # 限制样本大小
print(f" 残差正态性检验 (Shapiro): p值 = {shapiro_p:.4f}")
# Ljung-Box检验(残差自相关)
from statsmodels.stats.diagnostic import acorr_ljungbox
lb_stat, lb_p = acorr_ljungbox(residuals, lags=10, return_df=False)
print(f" Ljung-Box检验: p值 = {lb_p[-1]:.4f}")
if lb_p[-1] > 0.05:
print(" 残差无显著自相关,模型合适")
else:
print(" 残差存在自相关,模型可能需要改进")
return residuals
model_residuals = model_diagnostics()
return arima_model, model_residuals
arima_modeling_demo()
5. 时间序列预测
def time_series_forecasting_demo():
"""时间序列预测演示"""
print("\n=== 时间序列预测 ===")
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 1. 简单预测方法
print("1. 简单预测方法:")
def simple_forecasting():
"""简单预测方法"""
# 使用历史均值预测
historical_mean = daily_series.mean()
naive_forecast = [historical_mean] * 30 # 预测未来30天
print(f" 历史均值预测: {historical_mean:.2f}")
# 最后值预测(朴素方法)
last_value = daily_series.iloc[-1]
naive_last_forecast = [last_value] * 30
print(f" 最后值预测: {last_value:.2f}")
# 线性趋势外推
x = np.arange(len(daily_series))
slope, intercept = np.polyfit(x, daily_series.values, 1)
future_x = np.arange(len(daily_series), len(daily_series) + 30)
linear_forecast = slope * future_x + intercept
print(f" 线性趋势预测范围: {linear_forecast[0]:.2f} 到 {linear_forecast[-1]:.2f}")
return naive_forecast, linear_forecast
naive_pred, linear_pred = simple_forecasting()
# 2. 指数平滑预测
print("\n2. 指数平滑预测:")
def exponential_smoothing_forecast():
"""指数平滑预测"""
from statsmodels.tsa.holtwinters import ExponentialSmoothing
# 简单指数平滑
ses_model = ExponentialSmoothing(daily_series, trend=None, seasonal=None)
ses_fitted = ses_model.fit()
ses_forecast = ses_fitted.forecast(30)
print(f" 简单指数平滑预测范围: {ses_forecast.iloc[0]:.2f} 到 {ses_forecast.iloc[-1]:.2f}")
# 双重指数平滑(Holt方法)
holt_model = ExponentialSmoothing(daily_series, trend='add', seasonal=None)
holt_fitted = holt_model.fit()
holt_forecast = holt_fitted.forecast(30)
print(f" Holt方法预测范围: {holt_forecast.iloc[0]:.2f} 到 {holt_forecast.iloc[-1]:.2f}")
return ses_forecast, holt_forecast
ses_pred, holt_pred = exponential_smoothing_forecast()
# 3. ARIMA预测
print("\n3. ARIMA预测:")
def arima_forecasting():
"""ARIMA预测"""
# 使用之前拟合的ARIMA模型
if 'arima_model' in globals():
arima_forecast = arima_model.forecast(30)
forecast_ci = arima_model.get_forecast(30).conf_int()
print(f" ARIMA预测范围: {arima_forecast.iloc[0]:.2f} 到 {arima_forecast.iloc[-1]:.2f}")
print(f" 预测置信区间宽度: {forecast_ci.iloc[0, 1] - forecast_ci.iloc[0, 0]:.2f}")
else:
print(" ARIMA模型未拟合,跳过ARIMA预测")
arima_forecast = None
forecast_ci = None
return arima_forecast, forecast_ci
arima_pred, arima_ci = arima_forecasting()
# 4. 预测评估
print("\n4. 预测评估:")
def forecast_evaluation():
"""预测评估"""
# 创建评估数据集(使用最后30天作为测试集)
train_data = daily_series[:-30]
test_data = daily_series[-30:]
print(f" 训练集大小: {len(train_data)}")
print(f" 测试集大小: {len(test_data)}")
# 计算简单预测的误差
naive_errors = test_data - train_data.mean()
mae_naive = np.mean(np.abs(naive_errors))
rmse_naive = np.sqrt(np.mean(naive_errors ** 2))
print(f" 朴素方法 MAE: {mae_naive:.2f}")
print(f" 朴素方法 RMSE: {rmse_naive:.2f}")
return mae_naive, rmse_naive
mae, rmse = forecast_evaluation()
return ses_pred, holt_pred, mae, rmse
time_series_forecasting_demo()
总结
时间序列分析的关键要点:
- 基础操作:时间序列创建、索引操作、重采样
- 趋势分析:移动平均、指数平滑、趋势检测、趋势分解
- 季节性分析:季节性模式识别、自相关分析、周期性检测
- ARIMA模型:平稳性检验、模型拟合、模型诊断
- 时间序列预测:简单预测、指数平滑、ARIMA预测
- 模型评估:预测误差计算、模型比较
- 最佳实践:数据预处理、模型选择、参数调优
掌握这些时间序列分析技能,可以深入理解时间序列数据的特征,构建准确的预测模型,为业务决策提供可靠的时间序列洞察。
转载请注明:周志洋的博客 » Python数据分析-时间序列分析详解


