import datetime
import multiprocessing
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
import sys
print(sys.version)
print(pd.__version__)
print(multiprocessing.cpu_count())
df = pd.read_csv(
    "/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/SPY_yahoo_finance.csv",
    header=0,
)
df.columns = [x.lower().replace(" ", "_") for x in df.columns]
df.head(5)
df.describe(include="all")
df.dtypes
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values(by="date", ascending=True)
df.describe(include="all")
df.dtypes
df["open_close_delta"] = df["close"] / df["open"]
df.head(5)
df["day_of_week"] = df["date"].dt.dayofweek
df["day_of_week"] = df["day_of_week"].apply(lambda x: "monday" if x == 0 else x)
df.head(5)
df["return"] = df["adj_close"] / df["adj_close"].shift(1)
df["label"] = df["return"].shift(-1) # today's features are used to forecast tomorrow's return
# setup label as a classification problem {0, 1}
df["label"] = df["label"].apply(lambda x: 1.0 if x > 1.005 else 0.0)
print(df.loc[:, ["date", "adj_close", "return", "label"]].head(5))
print(df["label"].value_counts(ascending=False))
Time-series dataset: train/test split by date to avoid lookahead leakage
train_df = df.loc[df["date"] <= pd.Timestamp("2016-12-31"), :]
print(len(train_df))
print(train_df["date"].describe())
test_df = df.loc[df["date"] > pd.Timestamp("2016-12-31"), :]
print(len(test_df))
print(test_df["date"].describe())
numerical_features = ["volume", "open_close_delta"]
categorical_features = ["day_of_week"]
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Select a subset of DataFrame columns so each pipeline branch
    sees only the features it needs."""

    def __init__(self, feature_names):
        self.feature_names = feature_names

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X.loc[:, self.feature_names].copy(deep=True)
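A minimal usage sketch of FeatureSelector on toy data (illustrative only):
toy = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
print(FeatureSelector(["a", "c"]).fit_transform(toy))  # returns a copy with columns a, c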
numerical_pipeline = Pipeline(steps=[
    ("num_selector", FeatureSelector(numerical_features)),
    ("imputer", SimpleImputer(strategy="median")),
    ("std_scaler", StandardScaler()),
])
categorical_pipeline = Pipeline(steps=[
    ("cat_selector", FeatureSelector(categorical_features)),
    ("ohe", OneHotEncoder(
        handle_unknown="ignore",
        sparse=False,  # on scikit-learn >= 1.2 use sparse_output=False instead
        categories=[df["day_of_week"].unique()],
    )),
])
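A spot check of the categorical branch in isolation (extra cell); each row becomes a one-hot vector over the observed weekday names:
print(categorical_pipeline.fit_transform(train_df.head(3)))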
class DailyTrendFeature(BaseEstimator, TransformerMixin):
    """Derive a categorical 'daily_trend' feature from the open/close ratio."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X.copy(deep=True)  # avoid mutating the caller's frame
        X.loc[:, "open_close_delta"] = X["close"] / X["open"]

        def daily_trend(row):
            if row["open_close_delta"] < 0.99:  # 'down' day: close more than 1% below open
                row["daily_trend"] = "down"
            elif row["open_close_delta"] > 1.01:  # 'up' day: close more than 1% above open
                row["daily_trend"] = "up"
            else:
                row["daily_trend"] = "flat"
            return row

        return X.apply(daily_trend, axis=1)
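A toy illustration of the three trend buckets (synthetic prices, for exposition only):
toy = pd.DataFrame({"open": [100.0, 100.0, 100.0], "close": [98.0, 100.5, 102.0]})
print(DailyTrendFeature().fit_transform(toy)["daily_trend"].tolist())  # ['down', 'flat', 'up']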
daily_trend_feature_pipeline = Pipeline(steps=[
    ("selector", FeatureSelector(["open", "close"])),
    ("feature_engineering", DailyTrendFeature()),
    ("selector_new", FeatureSelector(["daily_trend"])),
    ("ohe", OneHotEncoder(
        handle_unknown="ignore",
        sparse=False,  # sparse_output=False on scikit-learn >= 1.2
        categories=[["up", "down", "flat"]],
    )),
])
def test_new_feature_pipeline():
    # sample_df, not test_df: avoid shadowing the global test split
    sample_df = train_df.sample(5).copy(deep=True).reset_index()
    print(sample_df.loc[:, ["return"]])
    sample_transforms = daily_trend_feature_pipeline.fit_transform(
        sample_df,
        sample_df["label"]
    )
    print(pd.DataFrame(
        sample_transforms,
        # use get_feature_names() instead on scikit-learn < 1.0
        columns=daily_trend_feature_pipeline.named_steps["ohe"].get_feature_names_out()
    ))
test_new_feature_pipeline()
feature_pipeline = FeatureUnion(
    n_jobs=-1,
    transformer_list=[
        ("numerical_pipeline", numerical_pipeline),
        ("categorical_pipeline", categorical_pipeline),
        ("daily_trend_feature_pipeline", daily_trend_feature_pipeline),
    ]
)
def test_feature_pipeline():
    sample_df = train_df.sample(5).copy(deep=True).reset_index()
    display(sample_df)
    feature_pipeline.fit(sample_df, sample_df["label"])
    display(pd.DataFrame(
        feature_pipeline.transform(sample_df),
        columns=(
            numerical_features
            + list(feature_pipeline.transformer_list[1][1]["ohe"].get_feature_names_out())
            + list(feature_pipeline.transformer_list[2][1]["ohe"].get_feature_names_out())
        )
    ))
test_feature_pipeline()
model_pipeline = Pipeline(steps=[
    ("feature_pipeline", feature_pipeline),
    ("model", LogisticRegression()),
])
param_grid = [
    {
        "feature_pipeline__numerical_pipeline__imputer__strategy": ["mean", "median"],
        "model": [LogisticRegression()],
        "model__C": [0.1, 1.0, 10],
    },
    {
        "feature_pipeline__numerical_pipeline__imputer__strategy": ["mean", "median"],
        "model": [RandomForestClassifier()],
        "model__max_depth": [3, 5, 7],  # max_depth must be an int
    },
]
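The double-underscore keys route parameters through nested steps (step name, then sub-step, then parameter). To discover valid names, inspect the composite pipeline's params (illustrative cell):
print([k for k in model_pipeline.get_params() if k.endswith("strategy")])
# e.g. ['feature_pipeline__numerical_pipeline__imputer__strategy']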
grid_search = GridSearchCV(
    model_pipeline,
    param_grid,
    cv=TimeSeriesSplit(n_splits=5),  # ordered folds: training always precedes validation
    scoring="roc_auc",
    refit=True,
    n_jobs=-1
)
now = datetime.datetime.now()
grid_search.fit(train_df, train_df["label"])
print(datetime.datetime.now() - now)
print(f"Best params: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_}")
# ROC AUC expects a continuous score; predict() returns hard labels,
# so use the positive-class probability instead
print(metrics.roc_auc_score(
    y_true=train_df["label"],
    y_score=grid_search.predict_proba(train_df)[:, 1],
    average="weighted"
))  # in-sample AUC: expect this to be optimistic
print(metrics.roc_auc_score(
    y_true=test_df["label"],
    y_score=grid_search.predict_proba(test_df)[:, 1],
    average="weighted"
))  # out-of-sample AUC on the held-out date range
%%shell
jupyter nbconvert --to html '/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/sklearn_pipe.ipynb'
%%shell
# html with outputs only (code cells and prompts hidden)
# jupyter nbconvert --to html --no-input --no-prompt '/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/sklearn_pipe.ipynb'