根据历史销售数据训练预测模型
评估以下模型,找出契合度最高的
- '移动平均'
- '自回归模型 (AR)'
- 'ARIMA 模型'
- '简单指数平滑法'
- '季节性 ARIMA 模型 (SARIMAX)'
根据预测模型预测下一阶段的销售趋势
ps.想要用Rust重写,但评估了Rust的学习曲线后,果断放弃。以后有时间再考虑吧~
import pandas as pd
from rich.logging import RichHandler
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
from prophet import Prophet
from sklearn.metrics import mean_squared_error
import warnings
import logging
from time_counter import timeit # 导入本地的 time_counter.py 中的 timeit 装饰器
import itertools
import numpy as np
from statsmodels.tsa.seasonal import STL # 新增导入
logging.basicConfig(
level="NOTSET",
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler()]
)
log = logging.getLogger("rich")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 忽略警告
warnings.filterwarnings("ignore")
# 配置参数
DATA_FILE = '20210401_20250430.xlsx'
DATE_COLUMN = 'Created On(Delivery)'
PRODUCT_NAME_COLUMN = 'Material Short Text'
QUANTITY_COLUMN = '发货单数量'
START_DATE = '2021-01-01'
FREQ = 'MS'
# 将 methods 字典定义为全局变量
methods = {
'移动平均': lambda data: data.rolling(window=3).mean().iloc[-1],
'自回归模型 (AR)': lambda data: AutoReg(data, lags=3).fit().forecast(steps=5),
'ARIMA 模型': lambda data: ARIMA(data, order=(1, 1, 1)).fit().forecast(steps=5),
'简单指数平滑法': lambda data: SimpleExpSmoothing(data).fit().forecast(steps=5),
'季节性 ARIMA 模型 (SARIMAX)': lambda data: SARIMAX(data, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)).fit().forecast(steps=5),
'Prophet 模型': lambda data: Prophet().fit(pd.DataFrame({'ds': pd.date_range(start=START_DATE, periods=len(data), freq=FREQ), 'y': data})).make_future_dataframe(periods=5, freq=FREQ).tail(5)['yhat']
}
@timeit
def read_and_preprocess_data(product_name):
"""
读取并预处理数据
:param product_name: 产品名称
:return: 预处理后的数据
"""
try:
logging.info(f"正在读取 Excel 文件: {DATA_FILE}")
df = pd.read_excel(DATA_FILE)
logging.info("Excel 文件读取完成")
logging.info(f"读取到的数据总行数: {len(df)}")
except FileNotFoundError:
logging.error(f"未找到文件: {DATA_FILE}")
return None
df[PRODUCT_NAME_COLUMN] = df[PRODUCT_NAME_COLUMN].str.strip().str.upper()
product_name = product_name.strip().upper()
logging.info(f"正在筛选 {PRODUCT_NAME_COLUMN} 为 {product_name} 的数据...")
filtered_df = df[df[PRODUCT_NAME_COLUMN] == product_name].copy()
logging.info(f"筛选后的数据总行数: {len(filtered_df)}")
if len(filtered_df) == 0:
logging.error(f"未找到 {product_name} 相关数据,请检查数据文件。")
return None
logging.info("数据筛选完成")
try:
logging.info(f"正在将 {DATE_COLUMN} 转换为日期序列...")
filtered_df[DATE_COLUMN] = pd.to_datetime(filtered_df[DATE_COLUMN])
logging.info("日期序列转换完成")
except ValueError:
logging.error(f"无法将 {DATE_COLUMN} 转换为日期序列,请检查数据格式。")
return None
# 提取年 - 月信息
filtered_df['YearMonth'] = filtered_df[DATE_COLUMN].dt.to_period('M')
logging.info("正在提取年、月信息...")
filtered_df['Year'] = filtered_df[DATE_COLUMN].dt.year
filtered_df['Month'] = filtered_df[DATE_COLUMN].dt.month
logging.info("年、月信息提取完成")
logging.info("正在汇总年、月的发货单数量并转换为整数...")
aggregated_df = filtered_df.groupby(['YearMonth', 'Year', 'Month'])[QUANTITY_COLUMN].sum().astype(int).reset_index()
aggregated_df.rename(columns={QUANTITY_COLUMN: 'Actual'}, inplace=True)
logging.info("发货单数量汇总完成")
# 检查日期连续性(补充缺失月份)
logging.info("正在检查日期连续性并补充缺失月份...")
full_date_range = pd.date_range(start=START_DATE, end=aggregated_df['YearMonth'].dt.to_timestamp().max(), freq=FREQ)
aggregated_df['YearMonth'] = aggregated_df['YearMonth'].dt.to_timestamp()
aggregated_df = aggregated_df.set_index('YearMonth').reindex(full_date_range).fillna(0).reset_index()
aggregated_df.rename(columns={'index': 'YearMonth'}, inplace=True)
aggregated_df['Year'] = aggregated_df['YearMonth'].dt.year
aggregated_df['Month'] = aggregated_df['YearMonth'].dt.month
logging.info("日期连续性检查和缺失月份补充完成")
# 新增季节性分解代码
stl = STL(aggregated_df['Actual'], period=12).fit()
aggregated_df = aggregated_df.assign(
trend=stl.trend,
seasonal=stl.seasonal,
residual=stl.resid
)
logging.info("季节性分解完成")
return aggregated_df
@timeit
def train_and_predict(method_name, train_data, steps):
"""
训练模型并进行预测
:param method_name: 预测方法名称
:param train_data: 训练数据
:param steps: 预测步数
:return: 预测结果
"""
if method_name == '移动平均':
return [methods[method_name](train_data)] * steps
elif method_name == '自回归模型 (AR)':
# 定义参数范围
lags_range = range(1, 10)
best_aic = float('inf')
best_lags = None
for lags in lags_range:
try:
model = AutoReg(train_data, lags=lags).fit()
if model.aic < best_aic:
best_aic = model.aic
best_lags = lags
except:
continue
model = AutoReg(train_data, lags=best_lags).fit()
return model.forecast(steps=steps)
elif method_name == 'ARIMA 模型':
# 定义参数范围
p = d = q = range(0, 2)
pdq = list(itertools.product(p, d, q))
best_aic = float('inf')
best_pdq = None
for param in pdq:
try:
model = ARIMA(train_data, order=param).fit()
if model.aic < best_aic:
best_aic = model.aic
best_pdq = param
except:
continue
model = ARIMA(train_data, order=best_pdq).fit()
return model.forecast(steps=steps)
elif method_name == '简单指数平滑法':
# 定义参数范围
smoothing_levels = np.linspace(0.1, 1, 10)
best_mse = float('inf')
best_smoothing_level = None
for smoothing_level in smoothing_levels:
try:
model = SimpleExpSmoothing(train_data).fit(smoothing_level=smoothing_level)
forecast = model.forecast(steps=steps)
mse = mean_squared_error(train_data, forecast[:len(train_data)])
if mse < best_mse:
best_mse = mse
best_smoothing_level = smoothing_level
except:
continue
model = SimpleExpSmoothing(train_data).fit(smoothing_level=best_smoothing_level)
return model.forecast(steps=steps)
elif method_name == '季节性 ARIMA 模型 (SARIMAX)':
# 定义参数范围
p = d = q = range(0, 2)
P = D = Q = range(0, 2)
pdq = list(itertools.product(p, d, q))
seasonal_pdq = [(x[0], x[1], x[2], 12) for x in list(itertools.product(P, D, Q))]
best_aic = float('inf')
best_pdq = None
best_seasonal_pdq = None
for param in pdq:
for param_seasonal in seasonal_pdq:
try:
model = SARIMAX(train_data, order=param, seasonal_order=param_seasonal).fit()
if model.aic < best_aic:
best_aic = model.aic
best_pdq = param
best_seasonal_pdq = param_seasonal
except:
continue
model = SARIMAX(train_data, order=best_pdq, seasonal_order=best_seasonal_pdq).fit()
return model.forecast(steps=steps)
elif method_name == 'Prophet 模型':
train_df = pd.DataFrame({'ds': pd.date_range(start=START_DATE, periods=len(train_data), freq=FREQ), 'y': train_data})
model = Prophet()
model.fit(train_df)
future = model.make_future_dataframe(periods=steps, freq=FREQ)
return model.predict(future)['yhat'][-steps:]
@timeit
def evaluate_forecasting_methods(aggregated_df, train_size):
"""
评估不同的预测方法
:param aggregated_df: 汇总后的数据
:param train_size: 训练集大小
:return: 最佳预测方法和最佳均方误差
"""
test_size = len(aggregated_df) - train_size
train_data = aggregated_df['Actual'].iloc[:train_size]
test_data = aggregated_df['Actual'].iloc[train_size:]
logging.info(f"训练集数据量: {len(train_data)},测试集数据量: {len(test_data)}")
logging.info("训练集和测试集划分完成")
logging.info("开始评估每种预测方法...")
best_method = None
best_mse = float('inf')
for method_name in methods:
logging.info(f"正在使用 {method_name} 方法进行预测...")
try:
forecast = train_and_predict(method_name, train_data, test_size)
mse = mean_squared_error(test_data, forecast)
logging.info(f"{method_name} 方法的均方误差 (MSE): {mse}")
if mse < best_mse:
best_mse = mse
best_method = method_name
except Exception as e:
logging.error(f"{method_name} 计算时出现错误: {e}")
logging.info("预测方法评估完成")
return best_method, best_mse
def generate_future_dates(aggregated_df, forecast_steps):
"""
生成未来多个月的年和月信息
:param aggregated_df: 汇总后的数据
:param forecast_steps: 预测步数
:return: 未来日期列表
"""
last_year = aggregated_df['Year'].iloc[-1]
last_month = aggregated_df['Month'].iloc[-1]
future_dates = []
for i in range(1, forecast_steps + 1):
next_month = (last_month + i) % 12
if next_month == 0:
next_month = 12
next_year = last_year + (last_month + i - 1) // 12
future_dates.append((next_year, next_month))
return future_dates
@timeit
def make_final_forecast(aggregated_df, best_method, product_name, forecast_steps):
if best_method:
print(f"正在使用 {best_method} 方法对未来 {forecast_steps} 个月度周期进行预测...")
if best_method == 'Prophet 模型':
train_df = pd.DataFrame({'ds': pd.date_range(start='2021-01-01', periods=len(aggregated_df['Actual']), freq='MS'), 'y': aggregated_df['Actual']})
model = Prophet()
model.fit(train_df)
future = model.make_future_dataframe(periods=forecast_steps, freq='MS')
final_forecast = model.predict(future)['yhat'][-forecast_steps:]
else:
final_forecast = (methods[best_method])(aggregated_df['Actual'])
# 处理移动平均方法返回单一数值的情况
if best_method == '移动平均':
final_forecast = [final_forecast] * forecast_steps
# 将预测结果转换为整数类型
final_forecast = pd.Series(final_forecast).astype(int)
# 获取最后一个已知月份的年和月
last_year = aggregated_df['Year'].iloc[-1]
last_month = aggregated_df['Month'].iloc[-1]
# 生成未来多个月的年和月信息
future_dates = []
for i in range(1, forecast_steps + 1):
next_month = (last_month + i) % 12
if next_month == 0:
next_month = 12
next_year = last_year + (last_month + i - 1) // 12
future_dates.append((next_year, next_month))
# 创建一个包含预测结果和对应年月的 DataFrame
forecast_df = pd.DataFrame({
'Year': [year for year, _ in future_dates],
'Month': [month for _, month in future_dates],
'Actual': [None] * forecast_steps, # 预测部分实际值为空
'Forecast': final_forecast
})
# 合并历史数据和预测数据
combined_df = pd.concat([aggregated_df, forecast_df], ignore_index=True)
# 打印包含年和月信息的预测结果
print(f"未来 {forecast_steps} 个月度周期的预测结果:")
for (year, month), forecast in zip(future_dates, final_forecast):
print(f"{year}年{month}月: {forecast}")
# 保存为 CSV 文件,文件名包含 product_name
csv_filename = f'{product_name}_forecast.csv'
combined_df.to_csv(csv_filename, index=False)
print(f"预测结果已保存到 {csv_filename}")
if __name__ == "__main__":
product_name = input("请输入要预测的产品名称: ").strip()
# product_name = 'IUP27 硒鼓' # 可修改为其他产品名称
forecast_steps = 12 # 预测周期
train_percentage_minmax = (0.5, 0.9) # 训练集百分比的最小值和最大值,这里以 70% 到 90% 为例
aggregated_df = read_and_preprocess_data(product_name)
best_overall_mse = float('inf')
best_overall_method = None
best_train_percentage = None
# 遍历不同的训练集百分比
for train_percentage in [i / 100 for i in range(int(train_percentage_minmax[0] * 100), int(train_percentage_minmax[1] * 100) + 1)]:
train_size = int(len(aggregated_df) * train_percentage)
print(f"\n正在评估训练集百分比为 {train_percentage * 100}% 的情况...")
best_method, best_mse = evaluate_forecasting_methods(aggregated_df, train_size)
if best_mse < best_overall_mse:
best_overall_mse = best_mse
best_overall_method = best_method
best_train_percentage = train_percentage
print(f"\n最佳整体训练集百分比: {best_train_percentage * 100}%")
print(f"最佳整体预测方法: {best_overall_method},均方误差: {best_overall_mse}")
best_train_size = int(len(aggregated_df) * best_train_percentage)
make_final_forecast(aggregated_df, best_overall_method, product_name, forecast_steps)