Scikit-learn pipeline

In [1]:
import datetime
import multiprocessing
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from sklearn.pipeline import FeatureUnion, Pipeline 
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
import sys
print(sys.version)
print(pd.__version__)
print(multiprocessing.cpu_count())
3.7.12 (default, Sep 10 2021, 00:21:48) 
[GCC 7.5.0]
1.1.5
2

Load raw stock price data

In [2]:
df = pd.read_csv(
    "/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/SPY_yahoo_finance.csv",
    header=0
)
df.columns = [x.lower().replace(" ", "_") for x in df.columns]
df.head(5)
Out[2]:
date open high low close adj_close volume
0 1993-01-29 43.96875 43.96875 43.75000 43.93750 25.799770 1003200
1 1993-02-01 43.96875 44.25000 43.96875 44.25000 25.983273 480500
2 1993-02-02 44.21875 44.37500 44.12500 44.34375 26.038315 201300
3 1993-02-03 44.40625 44.84375 44.37500 44.81250 26.313566 529400
4 1993-02-04 44.96875 45.09375 44.46875 45.00000 26.423655 531500
In [3]:
df.describe(include="all")
Out[3]:
date open high low close adj_close volume
count 7193 7193.000000 7193.000000 7193.000000 7193.000000 7193.000000 7.193000e+03
unique 7193 NaN NaN NaN NaN NaN NaN
top 2000-10-09 NaN NaN NaN NaN NaN NaN
freq 1 NaN NaN NaN NaN NaN NaN
mean NaN 149.569786 150.446365 148.596184 149.573008 124.130425 8.432958e+07
std NaN 80.710651 81.049916 80.339373 80.732359 86.543832 9.571367e+07
min NaN 43.343750 43.531250 42.812500 43.406250 25.487831 5.200000e+03
25% NaN 100.739998 101.593750 99.790001 100.699997 71.142609 8.162800e+06
50% NaN 128.125000 128.860001 127.269997 128.187500 93.903046 5.864900e+07
75% NaN 190.369995 191.820007 188.789993 190.300003 168.407654 1.195754e+08
max NaN 445.589996 447.109985 445.070007 446.970001 446.970001 8.710263e+08
In [4]:
df.dtypes
Out[4]:
date          object
open         float64
high         float64
low          float64
close        float64
adj_close    float64
volume         int64
dtype: object
In [5]:
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values(by="date", ascending=True)
df.describe(include="all")
/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py:3: FutureWarning: Treating datetime data as categorical rather than numeric in `.describe` is deprecated and will be removed in a future version of pandas. Specify `datetime_is_numeric=True` to silence this warning and adopt the future behavior now.
  This is separate from the ipykernel package so we can avoid doing imports until
Out[5]:
date open high low close adj_close volume
count 7193 7193.000000 7193.000000 7193.000000 7193.000000 7193.000000 7.193000e+03
unique 7193 NaN NaN NaN NaN NaN NaN
top 2007-09-10 00:00:00 NaN NaN NaN NaN NaN NaN
freq 1 NaN NaN NaN NaN NaN NaN
first 1993-01-29 00:00:00 NaN NaN NaN NaN NaN NaN
last 2021-08-20 00:00:00 NaN NaN NaN NaN NaN NaN
mean NaN 149.569786 150.446365 148.596184 149.573008 124.130425 8.432958e+07
std NaN 80.710651 81.049916 80.339373 80.732359 86.543832 9.571367e+07
min NaN 43.343750 43.531250 42.812500 43.406250 25.487831 5.200000e+03
25% NaN 100.739998 101.593750 99.790001 100.699997 71.142609 8.162800e+06
50% NaN 128.125000 128.860001 127.269997 128.187500 93.903046 5.864900e+07
75% NaN 190.369995 191.820007 188.789993 190.300003 168.407654 1.195754e+08
max NaN 445.589996 447.109985 445.070007 446.970001 446.970001 8.710263e+08
In [6]:
df.dtypes
Out[6]:
date         datetime64[ns]
open                float64
high                float64
low                 float64
close               float64
adj_close           float64
volume                int64
dtype: object
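As an aside, the datetime conversion can also happen at load time; a minimal sketch, assuming the raw Yahoo Finance header names the column "Date" (i.e., before the lowercase rename above):

df = pd.read_csv(
    "/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/SPY_yahoo_finance.csv",
    header=0,
    parse_dates=["Date"]  # parsed to datetime64[ns] during the read
)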

Illustrative feature engineering

In [7]:
df["open_close_delta"] = df["close"] / df["open"]
df.head(5)
Out[7]:
date open high low close adj_close volume open_close_delta
0 1993-01-29 43.96875 43.96875 43.75000 43.93750 25.799770 1003200 0.999289
1 1993-02-01 43.96875 44.25000 43.96875 44.25000 25.983273 480500 1.006397
2 1993-02-02 44.21875 44.37500 44.12500 44.34375 26.038315 201300 1.002827
3 1993-02-03 44.40625 44.84375 44.37500 44.81250 26.313566 529400 1.009148
4 1993-02-04 44.96875 45.09375 44.46875 45.00000 26.423655 531500 1.000695
In [8]:
df["day_of_week"] = df["date"].dt.dayofweek
df["day_of_week"] = df["day_of_week"].apply(lambda x: "monday" if x == 0 else x)
df.head(5)
Out[8]:
date open high low close adj_close volume open_close_delta day_of_week
0 1993-01-29 43.96875 43.96875 43.75000 43.93750 25.799770 1003200 0.999289 4
1 1993-02-01 43.96875 44.25000 43.96875 44.25000 25.983273 480500 1.006397 monday
2 1993-02-02 44.21875 44.37500 44.12500 44.34375 26.038315 201300 1.002827 1
3 1993-02-03 44.40625 44.84375 44.37500 44.81250 26.313566 529400 1.009148 2
4 1993-02-04 44.96875 45.09375 44.46875 45.00000 26.423655 531500 1.000695 3

Define label

In [9]:
df["return"] = df["adj_close"] / df["adj_close"].shift(1)
df["label"] = df["return"].shift(-1) # today's features are used to forecast tomorrow's return
# setup label as a classification problem {0, 1}
df["label"] = df["label"].apply(lambda x: 1.0 if x > 1.005 else 0.0)
print(df.loc[:, ["date", "adj_close", "return", "label"]].head(5))
print(df["label"].value_counts(ascending=False))
        date  adj_close    return  label
0 1993-01-29  25.799770       NaN    1.0
1 1993-02-01  25.983273  1.007113    0.0
2 1993-02-02  26.038315  1.002118    1.0
3 1993-02-03  26.313566  1.010571    0.0
4 1993-02-04  26.423655  1.004184    0.0
0.0    5144
1.0    2049
Name: label, dtype: int64
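The classes are imbalanced (roughly 28% positive), which is one reason the grid search below scores with ROC AUC rather than accuracy. A quick check:

print(df["label"].mean())  # ~0.285: 2049 positives out of 7193 rows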

Train/test split

This is a time-series dataset, so split into train and test by date rather than at random to avoid lookahead leakage.

In [10]:
train_df = df.loc[df["date"] <= pd.Timestamp("2016-12-31"), :]
print(len(train_df))
print(train_df["date"].describe())
test_df = df.loc[df["date"] > pd.Timestamp("2016-12-31"), :]
print(len(test_df))
print(test_df["date"].describe())
6026
count                    6026
unique                   6026
top       1999-08-23 00:00:00
freq                        1
first     1993-01-29 00:00:00
last      2016-12-30 00:00:00
Name: date, dtype: object
1167
count                    1167
unique                   1167
top       2017-10-30 00:00:00
freq                        1
first     2017-01-03 00:00:00
last      2021-08-20 00:00:00
Name: date, dtype: object
/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py:3: FutureWarning: Treating datetime data as categorical rather than numeric in `.describe` is deprecated and will be removed in a future version of pandas. Specify `datetime_is_numeric=True` to silence this warning and adopt the future behavior now.
  This is separate from the ipykernel package so we can avoid doing imports until
/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py:6: FutureWarning: Treating datetime data as categorical rather than numeric in `.describe` is deprecated and will be removed in a future version of pandas. Specify `datetime_is_numeric=True` to silence this warning and adopt the future behavior now.
  

Feature transformation pipeline

In [11]:
numerical_features = ["volume", "open_close_delta"]
categorical_features = ["day_of_week"]
In [12]:
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Select a fixed subset of dataframe columns as a pipeline step."""
    def __init__(self, feature_names):
        self.feature_names = feature_names
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        return X.loc[:, self.feature_names].copy(deep=True)
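The selector simply narrows the incoming frame to the named columns, letting each branch of the feature union below work on its own slice. A quick sketch of what it returns:

selected = FeatureSelector(["open", "close"]).fit_transform(df)
print(selected.columns.tolist())  # ['open', 'close']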
In [13]:
numerical_pipeline = Pipeline(steps=[
    ("num_selector", FeatureSelector(numerical_features)),
    ("imputer", SimpleImputer(strategy="median")),
    ("std_scaler", StandardScaler())
])
In [14]:
categorical_pipeline = Pipeline(steps=[
    ("cat_selector", FeatureSelector(categorical_features)),
    ("ohe", OneHotEncoder(
        handle_unknown="ignore", 
        sparse=False,
        categories=[
            df["day_of_week"].unique()
        ])
    ) 
])
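Passing categories explicitly (instead of letting OneHotEncoder infer them from whatever split it is fit on) pins the output width to the full set of values seen in df, so a cross-validation fold that happens to miss a weekday still yields the same matrix shape. A sketch of the resulting feature names:

categorical_pipeline.fit(train_df)
print(categorical_pipeline.named_steps["ohe"].get_feature_names())
# ['x0_4', 'x0_monday', 'x0_1', 'x0_2', 'x0_3'], following the unique() order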

Example feature engineering inside pipeline

In [15]:
class DailyTrendFeature(BaseEstimator, TransformerMixin):
    """Engineer a categorical 'daily_trend' feature from the open and close prices."""
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X.loc[:, "open_close_delta"] = X["close"] / X["open"]
        def daily_trend(row):
            if row["open_close_delta"] < 0.99: # 'down' day: close more than 1% below the open
                row["daily_trend"] = "down"
            elif row["open_close_delta"] > 1.01: # 'up' day: close more than 1% above the open
                row["daily_trend"] = "up"
            else:
                row["daily_trend"] = "flat"
            return row
        return X.apply(daily_trend, axis=1)

daily_trend_feature_pipeline = Pipeline(steps=[
    ("selector", FeatureSelector(["open", "close"])),
    ("feature_engineering", DailyTrendFeature()),
    ("selector_new", FeatureSelector(["daily_trend"])),
    ("ohe", OneHotEncoder(
        handle_unknown="ignore", 
        sparse=False,
        categories=[
            ["up", "down", "flat"],
        ])
    ) 
])
In [16]:
def test_new_feature_pipeline():
    sample_df = train_df.sample(5).copy(deep=True).reset_index()  # local sample; avoid shadowing the global test_df
    print(sample_df.loc[:, ["return"]])
    sample_transforms = daily_trend_feature_pipeline.fit_transform(
        sample_df,
        sample_df["label"]
    )
    print(pd.DataFrame(
        sample_transforms, 
        columns=daily_trend_feature_pipeline.named_steps["ohe"].get_feature_names()
    ))
test_new_feature_pipeline()
     return
0  0.991621
1  0.997277
2  0.981978
3  0.996962
4  1.012702
   x0_up  x0_down  x0_flat
0    0.0      0.0      1.0
1    0.0      0.0      1.0
2    0.0      0.0      1.0
3    0.0      1.0      0.0
4    0.0      0.0      1.0
In [17]:
feature_pipeline = FeatureUnion(
    n_jobs=-1, 
    transformer_list=[ 
        ("numerical_pipeline", numerical_pipeline),
        ("categorical_pipeline", categorical_pipeline),
        ("daily_trend_feature_pipeline", daily_trend_feature_pipeline),
    ]
)
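FeatureUnion fits its transformers in parallel (n_jobs=-1) and concatenates their outputs side by side in transformer_list order, which is what the manual column naming in the next cell relies on. A quick shape check:

features = feature_pipeline.fit_transform(train_df)
print(features.shape)  # expected (6026, 10): 2 numerical + 5 weekday + 3 trend columns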
In [18]:
def test_feature_pipeline():
    sample_df = train_df.sample(5).copy(deep=True).reset_index()  # again, avoid shadowing the global test_df
    display(sample_df)
    feature_pipeline.fit(sample_df, sample_df["label"])
    display(pd.DataFrame(feature_pipeline.transform(sample_df),
            columns = (
                numerical_features 
                + list(feature_pipeline.transformer_list[1][1]["ohe"].get_feature_names())
                + list(feature_pipeline.transformer_list[2][1]["ohe"].get_feature_names())
            )
        )
    )
test_feature_pipeline()
index date open high low close adj_close volume open_close_delta day_of_week return label
0 5215 2013-10-14 169.210007 171.080002 169.080002 170.940002 147.378723 112106000 1.010224 monday 1.003994 0.0
1 5924 2016-08-08 218.399994 218.520004 217.740005 218.050003 198.728592 39906500 0.998397 monday 0.999404 0.0
2 821 1996-04-30 65.437500 65.562500 65.125000 65.390625 41.525272 184400 0.999284 1 0.999284 0.0
3 973 1996-12-04 74.875000 75.062500 74.093750 74.953125 48.096256 2365100 1.001043 2 1.002717 0.0
4 2043 2001-03-05 124.150002 124.779999 123.809998 124.739998 84.509819 5293200 1.004752 monday 1.009142 1.0
volume open_close_delta x0_4 x0_monday x0_1 x0_2 x0_3 x0_up x0_down x0_flat
0 1.880130 1.728391 0.0 1.0 0.0 0.0 0.0 1.0 0.0 0.0
1 0.186182 -1.002947 0.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0
2 -0.745779 -0.798280 0.0 0.0 1.0 0.0 0.0 0.0 0.0 1.0
3 -0.694616 -0.391867 0.0 0.0 0.0 1.0 0.0 0.0 0.0 1.0
4 -0.625917 0.464703 0.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0

Model

In [19]:
model_pipeline = Pipeline(steps=[
    ("feature_pipeline", feature_pipeline),
    ("model", LogisticRegression())
])
param_grid = [
    # each dict swaps in a different final "model" step; the model__ parameters
    # apply only within their own dict
    {
        "feature_pipeline__numerical_pipeline__imputer__strategy": ["mean", "median"],
        "model": [LogisticRegression()],
        "model__C": [0.1, 1.0, 10],
    },
    {
        "feature_pipeline__numerical_pipeline__imputer__strategy": ["mean", "median"],
        "model": [RandomForestClassifier()],
        "model__max_depth": [3, 5, 7],  # max_depth expects integers
    }
]
grid_search = GridSearchCV(
    model_pipeline, 
    param_grid, 
    cv=TimeSeriesSplit(n_splits=5),
    scoring="roc_auc",
    refit=True,
    n_jobs=-1
)
# grid_search
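TimeSeriesSplit produces expanding-window folds in which each validation fold lies strictly after its training data, so cross-validation respects the causal ordering just as the train/test split did. A small sketch to visualize the fold boundaries, assuming train_df from above:

for fold, (train_idx, valid_idx) in enumerate(TimeSeriesSplit(n_splits=5).split(train_df)):
    print(
        fold,
        train_df["date"].iloc[train_idx[-1]].date(),  # last training day in the fold
        train_df["date"].iloc[valid_idx[0]].date(),   # first validation day in the fold
    )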
In [ ]:
now = datetime.datetime.now()
grid_search.fit(train_df, train_df["label"])
print(datetime.datetime.now() - now)
In [ ]:
print(f"Best params: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_}")

Metrics

In [ ]:
metrics.roc_auc_score(
    y_true=train_df["label"],
    y_score=grid_search.predict_proba(train_df)[:, 1],  # ROC AUC needs scores, not hard 0/1 predictions
    average="weighted"
)
In [ ]:
metrics.roc_auc_score(
    y_true=test_df["label"],
    y_score=grid_search.predict_proba(test_df)[:, 1],
    average="weighted"
)
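ROC AUC summarizes ranking quality only; a hard-threshold view can complement it. An illustrative sketch using the default 0.5 decision threshold:

print(metrics.classification_report(
    y_true=test_df["label"],
    y_pred=grid_search.predict(test_df)
))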

Export notebook as HTML

In [ ]:
%%shell
jupyter nbconvert --to html '/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/sklearn_pipe.ipynb'
In [ ]:
%%shell
# ### html with outputs only (no code cells or prompts)
# jupyter nbconvert --to html --no-input --no-prompt '/content/drive/My Drive/Colab Notebooks/custom_scikit_pipeline/sklearn_pipe.ipynb'