TimeSeries forecasting template with sktime¶
In [ ]:
!pip install sktime
!pip install holidays
!pip install psycopg2
!pip install sqlalchemy
!pip install xgboost
!pip install scikit-learn
!pip install mlflow
!pip install hyperopt
In [ ]:
import pandas as pd
import numpy as np
import sqlalchemy
import os
import requests
import time
import datetime
import copy
import plotly.io as pio
import plotly.express as px
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
%load_ext sql
%sql $engine.url
# Else graph won't render
pio.renderers.default='jupyterlab'
Loading¶
DB¶
In [ ]:
engine = sqlalchemy.create_engine(os.environ.get('DB'))
In [ ]:
sql = """
select data.*
from
data
"""
In [ ]:
df = pd.read_sql(sql, con=engine)
df.head()
API¶
In [ ]:
url = 'http://url/to/api'
headers = {'Accept': 'application/json'}
In [ ]:
data = requests.get(url, headers=headers).json()
df = pd.DataFrame.from_dict(data)
File¶
In [ ]:
df = pd.read_csv('data.csv')
Clean Data¶
Smooth outliers¶
In [ ]:
threshold = np.nanpercentile(df.y.values,99.999)
df.loc[df.y >= threshold,'y'] = np.nan
In [ ]:
from sktime.transformations.series.outlier_detection import HampelFilter
smoother = HampelFilter(window_length=10)
df.y = smoother.fit_predict(df.y)
Time series resampling¶
upsample¶
In [ ]:
df = df.set_index('time')
df = df.resample('H').asfreq()
downsample¶
In [ ]:
df = df.resample('H').mean() # or median()
In [ ]:
df.index.freq = df.index.inferred_freq
Feature Engineering¶
time¶
In [ ]:
df['time'] = pd.to_datetime(df.time)
df['dow'] = df.time.dt.dayofweek
df['month'] = df.time.dt.month
df['hour'] = df.time.dt.hour
In [ ]:
def make_harmonic_features(value, period=24):
value *= 2 * np.pi / period
return [np.cos(value), np.sin(value)]
df[['hour_cos','hour_sin']] = df[['hour']].apply(lambda x: make_harmonic_features(x.hour),result_type='expand',axis=1)
weekends & holidays¶
In [ ]:
import holidays
nl_holidays = holidays.NL(years=[2022])
In [ ]:
df['holidays'] = df.dow.apply(lambda x: False if x < 5 else True)
df['holidays'] = df[['time','holidays']].apply(lambda row: True if x['time'].date() in nl_holidays else row['holidays'],axis=1)
merge exogenous¶
exact time¶
In [ ]:
df = df.merge(exo_df,how='left',left_on='time',right_on='time')
between time bracket¶
In [ ]:
import sqlite3
#Make the db in memory
conn = sqlite3.connect(':memory:')
#write the tables
df.to_sql('df', conn, index=False)
exo_df.to_sql('exo_df', conn, index=False)
qry = '''
select
df.*,
exo_df.*
from
df left join exo_df on
time between exo_start_time and exo_end_time
'''
df = pd.read_sql_query(qry, conn)
Fix dtypes¶
In [ ]:
df.time = pd.to_datetime(df.time)
df = pd.concat([df.select_dtypes([], ['object']),df.select_dtypes(['object']).apply(pd.Series.astype, dtype='category')], axis=1)
df.bool = df.bool.astype("float32")
df.int = df.int.astype("int")
df.float = df.holidays.astype("float64")
df.cat = df.cat.astype("category")
Analysis¶
Seasonality¶
In [ ]:
from statsmodels.tsa.seasonal import seasonal_decompose
result = seasonal_decompose(df.y, model='additive')
print(result.trend)
print(result.seasonal)
print(result.resid)
print(result.observed)
In [ ]:
mstl = MSTL(timeseries["y"], periods=(24, 24 * 7), stl_kwargs={"seasonal_deg": 0})
res = mstl.fit()
Correlations¶
Numerical¶
In [ ]:
corr_matrix = df.select_dtypes(['float']).corr()
sns.heatmap(corr_matrix,annot=True,annot_kws={"fontsize":6})
Categorical¶
In [ ]:
from scipy.stats import f_oneway
In [ ]:
anova_data = {}
for col in df.select_dtypes(include=["category","boolean"]).columns:
anova_data[col] = df[[col]+['y']].dropna().groupby(col).agg(list)['y'].values.tolist()
In [ ]:
fig, ax = plt.subplots(nrows=2, ncols=3, figsize=[15, 10], sharey=True)
ax = ax.flatten()
sns_blue = sns.color_palette(as_cmap=True)[0]
for ix, col in enumerate(df.select_dtypes(include=["category","boolean"]).columns):
sns.boxplot(ax =ax[ix],x=col, y='y', data=df[[col]+['y']].dropna(), showfliers=False)
try:
_, anova = f_oneway(*anova_data[col])
except TypeError:
anova = 'nan'
ax[ix].set_title(col+' p:'+str(anova))
fig.tight_layout()
Prediction¶
In [ ]:
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.pipeline import Pipeline
from sktime.transformations.hierarchical.aggregate import Aggregator
from sktime.transformations.panel.compose import ColumnTransformer
from sktime.transformations.series.adapt import TabularToSeriesAdaptor, PandasTransformAdaptor
Feature preparation: missing, scaling, one-hot¶
In [ ]:
X,y = df[df.columns[~df.columns.isin(["y"])]], df[['y']]
X preprocessing¶
In [ ]:
categorical_pipeline = Pipeline(steps=[
('impute', TabularToSeriesAdaptor(SimpleImputer(strategy='most_frequent',missing_values=np.nan)))
('one-hot', TabularToSeriesAdaptor(OneHotEncoder(handle_unknown='ignore', sparse=False)))
])
In [ ]:
numeric_pipeline = Pipeline(steps=[
('impute', TabularToSeriesAdaptor(SimpleImputer(strategy='mean',missing_values=np.nan))),
('scale', TabularToSeriesAdaptor(MinMaxScaler()))
])
In [ ]:
X_preprocess = ColumnTransformer(transformers=[
('boolean_impute', PandasTransformAdaptor("fillna", kwargs={'value':-1}), X.select_dtypes(include='float32').columns.tolist()),
('category', categorical_pipeline, X.select_dtypes(include='category').columns.tolist()),
('numeric', numeric_pipeline, X.select_dtypes(exclude=['category','float32']).columns.tolist())
])
In [ ]:
X_trans = X_processor.fit_transform(X)
y preprocessing¶
In [ ]:
y_preprocess = SimpleImputer(strategy='mean',missing_values=np.nan) * PandasTransformAdaptor("replace", kwargs={'to_replace':0, 'value':1})
In [ ]:
y_trans = y_transformer.fit_transform(y)
Forecast (XGBRegressor)¶
In [ ]:
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.utils.plotting import plot_series
from sktime.transformations.series.boxcox import BoxCoxTransformer, LogTransformer
from xgboost import XGBRegressor
from sktime.forecasting.model_selection import ForecastingGridSearchCV, SlidingWindowSplitter, ExpandingWindowSplitter
from sktime.forecasting.compose import make_reduction, TransformedTargetForecaster
from sktime.forecasting.model_evaluation import evaluate
from sktime.performance_metrics.forecasting import MeanAbsoluteError,MeanAbsolutePercentageError,MeanSquaredError,MeanSquaredPercentageError, MeanSquaredScaledError
Split¶
In [ ]:
y_train, y_test, X_train, X_test = temporal_train_test_split(y_trans, X_trans,test_size=24)
plot_series(y_train, y_test, labels=["y_train", "y_test"])
Pick estimator¶
In [ ]:
estimator = XGBRegressor(objective='reg:squarederror', random_state=42,missing=np.nan)
Simple¶
forecast normal data¶
In [ ]:
forecaster = make_reduction(estimator, window_length=8, strategy="recursive")
forecaster.fit(y=y_train, X=X_train)
y_pred = forecaster.predict(X=X_test, fh=list(range(1,24+1)))
forecast skewed data¶
In [ ]:
forecaster = TransformedTargetForecaster(
[
("transformer", BoxCoxTransformer(method='mle')),
("forecast", make_reduction(estimator, window_length=8, strategy="recursive")),
]
)
forecaster.fit(y=y_train, X=X_train)
y_pred = forecaster.predict(X=X_test, fh=list(range(1,24+1)))
CV¶
In [ ]:
forecaster = TransformedTargetForecaster(
[
("transformer", BoxCoxTransformer(method='mle')),
("forecast", make_reduction(estimator, window_length=8, strategy="recursive")),
]
)
cv = ExpandingWindowSplitter(initial_window=30*24, fh=list(range(1,24+1)), step_length=8*24)
train_windows, test_windows = get_windows(y_train, cv)
plot_windows(y_train, train_windows, test_windows)
In [ ]:
results = evaluate(forecaster=forecaster, y=y_train, X=X_train, cv=cv, strategy="refit", scoring= MeanSquaredError())
GridSearchCV¶
In [ ]:
param_grid = {"strategy" : ["last", "mean", "drift"]}
gscv = ForecastingGridSearchCV(
forecaster=forecaster,
param_grid=param_grid,
cv=cv)
gscv.fit(y=y_test, X=X_test)
y_pred = gscv.predict(X=X_test, fh=list(range(1,24+1)))
In [ ]:
from hyperopt import tpe, hp, fmin, STATUS_OK,SparkTrials, space_eval
from hyperopt.pyll import scope
import mlflow
from sktime.utils import mlflow_sktime
from warnings import simplefilter
import xgboost as xgb
xgb.set_config(verbosity=0)
set hyperopt objective function¶
In [ ]:
def objective(params):
simplefilter("ignore", category=UserWarning)
estimator = XGBRegressor(objective='reg:squarederror', random_state=42,missing=np.nan)
forecaster = TransformedTargetForecaster(
[
("transformer", BoxCoxTransformer(method='mle')),
("forecast", make_reduction(estimator, window_length=8, strategy="recursive")),
]
).set_params(**params)
cv = ExpandingWindowSplitter(initial_window=30*24, fh=list(range(1,24+1)), step_length=8*24)
cv_results = evaluate(forecaster=forecaster, y=y_train, X=X_train, cv=cv, strategy="refit", scoring= MeanSquaredError())
mse = cv_results.iloc[:,0].mean()
mlflow.log_param("hyper-parameters", params)
mlflow.log_param("features", X_train.columns)
mlflow.log_metric("rmse", mse)
mlflow.sklearn.log_model(forecaster.fit(y=y_train, X=X_train), "model")
modelpath = "micro_{}_{}".format(mod,valdate)
mlflow_sktime.save_model( sktime_model=forecaster,path=modelpath)
return {'loss': mse, 'params': params, 'status': STATUS_OK}
set hypreopt search space¶
In [ ]:
space = {
'transformer__method': hp.choice('transformer__method',['mle']),
'forecast__window_length':scope.int(hp.quniform('forecast__window_length', 5, 10, q=1)),
'forecast__estimator__nthread':hp.choice('forecast__estimator__nthread',[4]), #when use hyperthread, xgboost may become slower
'forecast__estimator__objective':hp.choice('forecast__estimator__objective',['reg:squarederror']),
'forecast__estimator__learning_rate':hp.choice('forecast__estimator__learning_rate', [.03, 0.05, .07]), #so called `eta` value
'forecast__estimator__max_depth':hp.choice('forecast__estimator__max_depth', [5, 6, 7]),
'forecast__estimator__min_child_weight':hp.choice('forecast__estimator__min_child_weight', [4]),
'forecast__estimator__silent':hp.choice( 'forecast__estimator__silent', [1]),
'forecast__estimator__subsample':hp.choice('forecast__estimator__subsample', [0.7]),
'forecast__estimator__colsample_bytree':hp.choice( 'forecast__estimator__colsample_bytree', [0.7]),
'forecast__estimator__n_estimators':hp.choice('forecast__estimator__n_estimators', [500])
}
In [ ]:
#best_params = fmin(fn = objective, space = space, algo = tpe.suggest, max_evals = 10, rstate=np.random.RandomState(123),
#trials=spark_trials)
#trials = SparkTrials(parallelism=4)
with mlflow.start_run(run_name='initial_search') as run:
best_params = fmin(fn = objective, space = space, algo = tpe.suggest, max_evals = 10, rstate=np.random.RandomState(123),
#trials=spark_trials)
In [ ]:
best_params = space_eval(space, best_params)
forecaster = forecaster.set_params(**best_params)
forecaster.fit(y=y_train, X=X_train)
y_pred = forecaster.predict(X=X_test, fh=list(range(1,24+1)))
Hierarchical Reconciliation¶
Works only on multiindex dataframe
In [ ]:
from sktime.forecasting.reconcile import ReconcilerForecaster
forecaster = TransformedTargetForecaster(
[
("transformer", BoxCoxTransformer(method='mle')),
("forecast", ForecastByLevel(estimator,groupby="panel"))
]
)
reconciler = ReconcilerForecaster(forecaster, method="ols")
Plots¶
In [ ]:
avg = copy.deepcopy(y_train)
avg['dow'] = avg.index.dayofweek
avg['hour'] = avg.index.hour
avg = avg.groupby(['dow','hour']).median()
avg = avg.loc[zip(y_test.index.dayofweek,y_test.index.hour)]
avg.index = y_test.index
rmse = MeanSquaredError(square_root=True)
rmse_pred = rmse(y_test, y_pred)
rmse_avg = rmse(y_test, avg)
plot_series( y_test, y_pred,avg, labels=[ "y_test", "y_pred","avg"], title='RMSE: '+str(rmse_pred), x_label='Date', y_label='y');
# Evaluate
print('RMSE: %.2f' % rmse_pred)
print('RMSE: %.2f' % rmse_avg)
Utility functions¶
plot train / test windows for CV¶
In [ ]:
from warnings import simplefilter
from matplotlib.ticker import MaxNLocator
def plot_windows(y, train_windows, test_windows, title=""):
"""Visualize training and test windows"""
simplefilter("ignore", category=UserWarning)
def get_y(length, split):
# Create a constant vector based on the split for y-axis."""
return np.ones(length) * split
n_splits = len(train_windows)
n_timepoints = len(y)
len_test = len(test_windows[0])
train_color, test_color = sns.color_palette("colorblind")[:2]
fig, ax = plt.subplots(figsize=plt.figaspect(0.3))
for i in range(n_splits):
train = train_windows[i]
test = test_windows[i]
ax.plot(
np.arange(n_timepoints), get_y(n_timepoints, i), marker="o", c="lightgray"
)
ax.plot(
train,
get_y(len(train), i),
marker="o",
c=train_color,
label="Window",
)
ax.plot(
test,
get_y(len_test, i),
marker="o",
c=test_color,
label="Forecasting horizon",
)
ax.invert_yaxis()
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
ax.set(
title=title,
ylabel="Window number",
xlabel="Time",
xticklabels=y.index,
)
# remove duplicate labels/handles
handles, labels = [(leg[:2]) for leg in ax.get_legend_handles_labels()]
ax.legend(handles, labels);
def get_windows(y, cv):
"""Generate windows"""
train_windows = []
test_windows = []
for i, (train, test) in enumerate(cv.split(y)):
train_windows.append(train)
test_windows.append(test)
return train_windows, test_windows