Source code for recipies.step
from abc import abstractmethod
from collections import defaultdict
from copy import deepcopy
from typing import Union, Dict
from pandas.core.groupby import DataFrameGroupBy
import polars as pl
from polars.dataframe.group_by import GroupBy
from scipy.sparse import isspmatrix
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from pandas.api.types import is_timedelta64_dtype, is_datetime64_any_dtype
from .ingredients import Ingredients
from enum import Enum
from .selector import (
Selector,
all_predictors,
all_numeric_predictors,
select_groups,
select_sequence,
)
from .constants import Backend
class Step:
"""This class represents a step in a recipe.
Steps are transformations to be executed on selected columns of a DataFrame.
They fit a transformer to the selected columns and afterwards transform the data with the fitted transformer.
Args:
sel: Object that holds information about the selected columns.
Attributes:
columns: List with the names of the selected columns.
"""
    def __init__(
        self,
        sel: Selector = all_predictors(),
        supported_backends: list[Backend] = [Backend.POLARS, Backend.PANDAS],
    ):
self.sel = sel
self.columns = []
self._trained = False
self._group = True
self.supported_backends = supported_backends
@property
def trained(self) -> bool:
return self._trained
@property
def group(self) -> bool:
return self._group
def fit(self, data: Ingredients):
"""This function fits the transformer to the data.
Args:
data: The DataFrame to fit to.
"""
data = self._check_ingredients(data)
self.columns = self.sel(data)
        self.do_fit(data)
        self._trained = True

    def do_fit(self, data: Ingredients):
        """Fit the transformer to the data. No-op by default; steps that need fitting override this."""
        pass
    def _check_ingredients(self, data: Union[Ingredients, GroupBy, DataFrameGroupBy]) -> Ingredients:
"""Check input for allowed types
Args:
data: input to the step
Raises:
            ValueError: If a grouped pd.DataFrame is provided to a step that can't use groups.
            ValueError: If the input is not a (potentially grouped) Ingredients object.
Returns:
Validated input
"""
if self.supported_backends is not None and data.get_backend() not in self.supported_backends:
raise ValueError(f"{data.get_backend()} not supported by this step.")
if isinstance(data, GroupBy) or isinstance(data, DataFrameGroupBy):
if not self._group:
raise ValueError("Step does not accept grouped data.")
# data = data.apply(lambda df: df)
if not isinstance(data, Ingredients):
raise ValueError(f"Expected Ingredients object, got {data.__class__}")
return data
def transform(self, data: Ingredients) -> Ingredients:
"""This function transforms the data with the fitted transformer.
Args:
data: The DataFrame to transform.
Returns:
The transformed DataFrame.
"""
pass
def fit_transform(self, data: Ingredients) -> Ingredients:
self.fit(data)
return self.transform(data)
def __repr__(self) -> str:
repr = self.desc + " for "
if not self.trained:
repr += str(self.sel)
else:
repr += str(self.columns) if len(self.columns) < 3 else str(self.columns[:2] + ["..."]) # FIXME: remove brackets
repr += " [trained]"
return repr
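# A minimal sketch of the fit/transform lifecycle shared by all steps. Names are
# hypothetical: `ing` stands for an Ingredients object prepared elsewhere.
#
#   step = StepImputeFill(strategy="forward")
#   step.fit(ing)              # resolves the selector to concrete column names
#   ing = step.transform(ing)  # applies the (fitted) transformation
#   # or, equivalently, in one call:
#   ing = step.fit_transform(ing)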
class StepImputeFill(Step):
"""For Pandas: uses pandas' internal `nafill` function to replace missing values.
See `pandas.DataFrame.nafill` for a description of the arguments.
"""
def __init__(self, sel=all_predictors(), value=None, strategy=None, limit=None):
super().__init__(sel)
self.desc = f"Impute with {strategy if strategy else value}"
self.value = value
self.strategy = strategy
self.limit = limit
def transform(self, data):
new_data = self._check_ingredients(data)
groups = select_groups(new_data)
if data.get_backend() == Backend.POLARS:
if len(groups) > 0:
new_data.data = data.data.with_columns(
pl.col(self.columns).fill_null(self.value, strategy=self.strategy, limit=self.limit).over(groups)
)
else:
new_data.data = data.data.with_columns(
pl.col(self.columns).fill_null(self.value, strategy=self.strategy, limit=self.limit)
)
        else:
            # Pandas syntax
            func = None
            value = self.value
            if self.strategy == "forward":
                func = pd.core.groupby.SeriesGroupBy.ffill
            elif self.strategy == "backward":
                func = pd.core.groupby.SeriesGroupBy.bfill
            elif self.strategy == "zero":
                value = 0
            elif value is None:
                raise ValueError(f"No valid strategy provided. Strategy was: {self.strategy}")
            if len(groups) > 0:
                df = new_data.groupby(groups)
            else:
                df = new_data.get_df()
            new_df = new_data.get_df()
            if value is not None:
                # Filling with a constant ignores grouping, so unwrap grouped data.
                if isinstance(df, (GroupBy, DataFrameGroupBy)):
                    df = df.obj
                updated_columns = {col: df[col].fillna(value) for col in self.columns}
            else:
                updated_columns = {col: func(df[col]) for col in self.columns}
            # Use pd.concat to update the DataFrame in one go
            new_df = pd.concat([new_df.drop(columns=self.columns), pd.DataFrame(updated_columns)], axis=1)
            new_data.set_df(new_df)
return new_data
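# Illustrative usage, assuming `ing` is an Ingredients object with group roles
# assigned (hypothetical setup, not shown here):
#
#   ing = StepImputeFill(strategy="forward").fit_transform(ing)  # per-group ffill
#   ing = StepImputeFill(value=0).fit_transform(ing)             # constant fill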
class StepImputeFastZeroFill(Step):
"""Quick variant of pandas' internal `nafill(value=0)` for grouped dataframes."""
def __init__(self, sel=all_predictors()):
super().__init__(sel, supported_backends=[Backend.PANDAS])
self.desc = "Impute quickly with 0"
def transform(self, data):
new_data = self._check_ingredients(data)
# Ignore grouping as grouping does not matter for zero fill.
new_data[self.columns] = new_data[self.columns].fillna(0)
return new_data
class StepImputeFastForwardFill(Step):
"""Quick variant of pandas' internal `nafill(method='ffill')` for grouped dataframes.
Note: this variant does not allow for setting a limit.
"""
def __init__(self, sel=all_predictors()):
super().__init__(sel, supported_backends=[Backend.PANDAS])
self.desc = "Impute with fast ffill"
def transform(self, data):
new_data = self._check_ingredients(data)
# Use cumsum (which is optimised for grouped frames) to figure out which
# values should be left at NaN, then ffill on the ungrouped dataframe. Adopted from:
# https://stackoverflow.com/questions/36871783/fillna-forward-fill-on-a-large-dataframe-efficiently-with-groupby
df = new_data.get_df()
nofill = df.copy()
nofill[self.columns] = pd.notnull(nofill[self.columns])
nofill = nofill.groupby(select_groups(new_data))[self.columns].cumsum()
df[self.columns] = df[self.columns].ffill()
for col in self.columns:
df.loc[nofill[col].to_numpy() == 0, col] = np.nan
new_data.set_df(df)
return new_data
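# Standalone sketch of the masked-ffill trick used above, in plain pandas and with
# made-up column names:
#
#   import numpy as np
#   import pandas as pd
#
#   df = pd.DataFrame({
#       "stay_id": [1, 1, 1, 2, 2],
#       "hr": [80.0, np.nan, np.nan, np.nan, 90.0],
#   })
#   # The cumulative sum of the not-null mask is 0 only before a group's first
#   # observation, so exactly those positions must stay NaN after a global ffill.
#   nofill = df["hr"].notna().groupby(df["stay_id"]).cumsum()
#   df["hr"] = df["hr"].ffill()
#   df.loc[nofill.to_numpy() == 0, "hr"] = np.nan
#   # Stay 2's leading NaN is preserved instead of inheriting stay 1's last value.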
class StepImputeModel(Step):
"""Uses a pretrained imputation model to impute missing values.
Args:
model: A function that takes a dataframe and the grouping columns as input and
returns a dataframe with imputed values without the grouping column.
"""
def __init__(self, sel=all_predictors(), model=None):
super().__init__(sel)
self.desc = "Impute with pretrained imputation model"
self.model = model
def transform(self, data):
new_data = self._check_ingredients(data)
if data.get_backend() == Backend.POLARS:
new_data[self.columns] = self.model(new_data[self.columns + select_groups(new_data)], select_groups(new_data))
return new_data
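# A minimal sketch of a compatible model callable (hypothetical mean imputer; the
# model only needs to accept a dataframe plus the grouping columns and return the
# imputed dataframe without the grouping columns):
#
#   def mean_impute(df, group_cols):
#       value_cols = [c for c in df.columns if c not in group_cols]
#       return df.with_columns(
#           pl.col(c).fill_null(pl.col(c).mean().over(group_cols)) for c in value_cols
#       ).drop(group_cols)
#
#   ing = StepImputeModel(model=mean_impute).fit_transform(ing)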
class Accumulator(Enum):
MAX = "max"
MIN = "min"
MEAN = "mean"
MEDIAN = "median"
COUNT = "count"
VAR = "var"
FIRST = "first"
LAST = "last"
class StepHistorical(Step):
"""This step generates columns with a historical accumulator provided by the user.
Args:
fun: Instance of the Accumulator enumerable that signifies which type of historical accumulation
to use (default is MAX).
suffix: Defaults to none. Set the name to have the step generate new columns with this suffix
instead of the default suffix.
role: Defaults to 'predictor'. In case new columns are added, set their role to role.
"""
def __init__(
self,
sel: Selector = all_numeric_predictors(),
fun: Accumulator = Accumulator.MAX,
suffix: str = None,
role: str = "predictor",
):
super().__init__(sel)
self.desc = f"Create historical {fun}"
self.fun = fun
if suffix is None:
try:
suffix = fun.value
except Exception:
raise TypeError(f"Expected Accumulator enum for function, got {self.fun.__class__}")
self.suffix = suffix
self.role = role
def transform(self, data: Ingredients) -> Ingredients:
"""
Raises:
TypeError: If the function is not of type Accumulator
"""
new_data = self._check_ingredients(data)
self.suffix = "_" + self.suffix
new_columns = [c + self.suffix for c in self.columns]
selected = new_data.data
selected_cols = pl.col(self.columns)
id = select_groups(new_data)
if data.get_backend() == Backend.POLARS:
if self.fun is Accumulator.MAX:
res = selected.with_columns(selected_cols.cum_max().over(id).name.suffix(self.suffix))
elif self.fun is Accumulator.MIN:
res = selected.with_columns(selected_cols.cum_min().over(id).name.suffix(self.suffix))
elif self.fun is Accumulator.MEAN:
res = selected.with_columns(
selected_cols.rolling_mean(window_size=selected.height, min_samples=0).over(id).name.suffix(self.suffix)
)
elif self.fun is Accumulator.MEDIAN:
res = selected.with_columns(
selected_cols.rolling_median(window_size=selected.height, min_samples=0).over(id).name.suffix(self.suffix)
)
elif self.fun is Accumulator.COUNT:
res = selected.with_columns(selected_cols.cum_count().over(id).name.suffix(self.suffix))
elif self.fun is Accumulator.VAR:
res = selected.with_columns(
selected_cols.rolling_var(window_size=selected.height, min_samples=0).over(id).name.suffix(self.suffix)
)
else:
raise TypeError(f"Expected Accumulator enum for function, got {self.fun.__class__}")
new_data.set_df(res)
else:
            data = data.groupby(group_cols)
if self.fun is Accumulator.MAX:
res = data[self.columns].cummax(skipna=True)
elif self.fun is Accumulator.MIN:
res = data[self.columns].cummin(skipna=True)
elif self.fun is Accumulator.MEAN:
# Reset index, as we get back a multi-index, and we want a simple rolling index
res = data[self.columns].expanding().mean().reset_index(drop=True)
elif self.fun is Accumulator.MEDIAN:
res = data[self.columns].expanding().median().reset_index(drop=True)
elif self.fun is Accumulator.COUNT:
res = data[self.columns].expanding().count().reset_index(drop=True)
elif self.fun is Accumulator.VAR:
res = data[self.columns].expanding().var().reset_index(drop=True)
else:
raise TypeError(f"Expected Accumulator enum for function, got {self.fun.__class__}")
new_data.set_df(new_data.get_df().assign(**{new_columns[i]: res.iloc[:, i] for i in range(len(new_columns))}))
for nc in new_columns:
new_data.update_role(nc, self.role)
return new_data
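# Illustrative usage, assuming `ing` is an Ingredients object with a group role
# assigned and a numeric "hr" predictor (hypothetical names):
#
#   step = StepHistorical(sel=all_numeric_predictors(), fun=Accumulator.MAX)
#   ing = step.fit_transform(ing)  # adds e.g. a cumulative "hr_max" column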
class StepSklearn(Step):
"""This step takes a transformer from scikit-learn and makes it usable as a step in a recipe.
Args:
sklearn_transformer: Instance of scikit-learn transformer that implements fit() and transform().
columnwise: Defaults to False. Set to True to fit and transform the DF column by column.
in_place: Defaults to True. Set to False to have the step generate new columns
instead of overwriting the existing ones.
        role (str, optional): Defaults to 'predictor'. In case new columns are added, set their role to role.
"""
def __init__(
self,
sklearn_transformer: object,
sel: Selector = all_predictors(),
columnwise: bool = False,
in_place: bool = True,
role: str = "predictor",
):
super().__init__(sel)
self.desc = f"Use sklearn transformer {sklearn_transformer.__class__.__name__}"
self.sklearn_transformer = sklearn_transformer
self.columnwise = columnwise
self.in_place = in_place
self.role = role
self._group = False
def do_fit(self, data: Ingredients) -> Ingredients:
"""
Raises:
ValueError: If the transformer expects a single column but gets multiple.
"""
if self.columnwise:
self._transformers = {
# copy the transformer so we keep the distinct fit for each column and don't just refit
col: deepcopy(self.sklearn_transformer.fit(data[col]))
for col in self.columns
}
else:
try:
self.sklearn_transformer.fit(data[self.columns])
except ValueError as e:
if "should be a 1d array" in str(e) or "Multioutput target data is not supported" in str(e):
                    raise ValueError(
                        "The sklearn transformer expects a 1d array as input. Try running the step with columnwise=True."
                    )
raise
def transform(self, data: Ingredients) -> Ingredients:
"""
Raises:
TypeError: If the transformer returns a sparse matrix.
ValueError: If the transformer returns an unexpected amount of columns.
"""
new_data = self._check_ingredients(data)
if self.columnwise:
for col in self.columns:
new_cols = self._transformers[col].transform(new_data[col])
if self.in_place and new_cols.ndim == 2 and new_cols.shape[1] > 1:
raise ValueError(
"The sklearn transformer returned more than one column. Try running the step with in_place=False."
)
col_names = (
col
if self.in_place
else [f"{self.sklearn_transformer.__class__.__name__}_{col}_{i + 1}" for i in range(new_cols.shape[1])]
)
if data.get_backend() == Backend.POLARS:
if isinstance(col_names, str):
col_names = [col_names]
updated_cols = pl.from_numpy(new_cols, schema=col_names)
new_data.data = new_data.data.with_columns(updated_cols)
else:
df = new_data.get_df()
df[col_names] = new_cols
new_data.set_df(df)
else:
new_cols = self.sklearn_transformer.transform(new_data[self.columns])
if isspmatrix(new_cols):
raise TypeError(
"The sklearn transformer returns a sparse matrix, "
"but recipes expects a dense numpy representation. "
"Try setting sparse_output=False or similar in the transformer initialization."
)
col_names = (
self.columns
if self.in_place
else (
[f"{self.sklearn_transformer.__class__.__name__}_{self.columns[i]}" for i in range(new_cols.shape[1])]
if new_cols.shape[1] == len(self.columns)
else [f"{self.sklearn_transformer.__class__.__name__}_{i + 1}" for i in range(new_cols.shape[1])]
)
)
if new_cols.shape[1] != len(col_names):
raise ValueError(
"The sklearn transformer returned a different amount of columns. Try running the step with in_place=False."
)
new_data[col_names] = new_cols
# set role of new columns
if not self.in_place:
for col in col_names:
new_data.update_role(col, self.role)
return new_data
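# Illustrative usage (hypothetical; `ing` prepared elsewhere):
#
#   from sklearn.preprocessing import MinMaxScaler, LabelEncoder
#
#   ing = StepSklearn(MinMaxScaler(), sel=all_numeric_predictors()).fit_transform(ing)
#   # Transformers that expect a 1d array (e.g. LabelEncoder) need columnwise=True,
#   # here with a hypothetical selector `categorical_sel` for categorical columns:
#   ing = StepSklearn(LabelEncoder(), sel=categorical_sel, columnwise=True).fit_transform(ing)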
class StepResampling(Step):
def __init__(
self,
new_resolution: str = "1h",
accumulator_dict: Dict[Selector, Accumulator] = {all_predictors(): Accumulator.LAST},
default_accumulator: Accumulator = Accumulator.LAST,
):
"""This class represents a resampling step in a recipe.
Args:
new_resolution: Resolution to resample to.
accumulator_dict: Supply dictionary with individual accumulation methods for each Selector.
default_accumulator: Accumulator to use for variables not supplied in dictionary.
"""
super().__init__()
self.new_resolution = new_resolution
self.acc_dict = accumulator_dict
self.default_accumulator = default_accumulator
self._group = True
def transform(self, data):
new_data = self._check_ingredients(data)
        # Check for and save the first sequence role
        sequence_roles = select_sequence(new_data)
        if sequence_roles:
            sequence_role = sequence_roles[0]
        else:
            raise AssertionError("Sequence role has not been assigned, resampling step not possible")
        sequence_datatype = new_data.dtypes[sequence_role]
        if data.get_backend() == Backend.POLARS and not sequence_datatype.is_temporal():
            raise ValueError(f"Expected Timedelta or Timestamp dtype, got {sequence_datatype}")
        if data.get_backend() == Backend.PANDAS and not (
            is_timedelta64_dtype(sequence_datatype) or is_datetime64_any_dtype(sequence_datatype)
        ):
            raise ValueError(f"Expected Timedelta or Timestamp dtype, got {sequence_datatype}")
        # Build a dictionary mapping column name (str) to accumulator name (str)
col_acc_map = {}
# Go through supplied Selector, Accumulator pairs
for selector, accumulator in self.acc_dict.items():
selected_columns = selector(new_data)
# Add variables associated with selector with supplied accumulator
col_acc_map.update({col: accumulator.value for col in selected_columns})
# Add non-specified variables, if not a sequence role
col_acc_map.update(
{
col: self.default_accumulator.value
for col in set(new_data.columns).difference(col_acc_map.keys())
if col not in select_sequence(new_data)
}
)
        if data.get_backend() == Backend.POLARS:
            # Invert the mapping: accumulator name -> list of column names
            acc_col_map = defaultdict(list)
            for col, acc in col_acc_map.items():
                acc_col_map[acc].append(col)
if len(select_groups(new_data)) > 0:
grouping_role = select_groups(new_data)[0]
# Resampling with the functions defined in col_acc_map
new_data.set_df(new_data.get_df().sort(grouping_role, sequence_role).set_sorted(sequence_role))
new_data.set_df(
new_data.get_df()
.upsample(every=self.new_resolution, time_column=sequence_role, group_by=grouping_role)
.with_columns(pl.col(acc_col_map["last"]).fill_null(strategy="forward"))
.with_columns(pl.col(acc_col_map["mean"]).fill_null(strategy="mean"))
.with_columns(pl.col(acc_col_map["max"]).fill_null(strategy="max"))
.with_columns(pl.col(grouping_role).fill_null(strategy="forward"))
)
else:
new_data.set_df(new_data.get_df().sort(sequence_role).set_sorted(sequence_role))
new_data.set_df(
new_data.get_df()
.upsample(every=self.new_resolution, time_column=sequence_role)
.with_columns(pl.col(acc_col_map["last"]).fill_null(strategy="forward"))
.with_columns(pl.col(acc_col_map["mean"]).fill_null(strategy="mean"))
.with_columns(pl.col(acc_col_map["max"]).fill_null(strategy="max"))
)
else:
# Resampling with the functions defined in col_acc_map
if len(select_groups(new_data)) > 0:
df = data.groupby(select_groups(data))
else:
df = data.get_df()
new_data.set_df(df.resample(self.new_resolution, on=sequence_role).agg(col_acc_map))
            # Remove the group levels from the multi-index in case of grouped data
            if isinstance(df, DataFrameGroupBy):
                new_data.set_df(new_data.get_df().droplevel(select_groups(data)))
        return new_data
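# Illustrative usage, assuming `ing` carries a temporal sequence role (e.g. a
# "time" column) and optionally a group role (hypothetical setup):
#
#   step = StepResampling(
#       new_resolution="1h",
#       accumulator_dict={all_numeric_predictors(): Accumulator.MEAN},
#       default_accumulator=Accumulator.LAST,
#   )
#   ing = step.fit_transform(ing)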
class StepScale(StepSklearn):
"""Provides a wrapper for a scaling with StepSklearn.
Note that because SKlearn transforms None (nulls) to NaN, we have to revert.
Args:
with_mean: Defaults to True. If True, center the data before scaling.
with_std: Defaults to True. If True, scale the data to unit variance (or equivalently, unit standard deviation).
in_place: Defaults to True. Set to False to have the step generate new columns instead of overwriting the existing ones.
role (str, optional): Defaults to 'predictor'. Incase new columns are added, set their role to role.
"""
def __init__(self, sel=all_numeric_predictors(), with_mean: bool = True, with_std: bool = True, *args, **kwargs):
super().__init__(
sklearn_transformer=StandardScaler(with_mean=with_mean, with_std=with_std), sel=sel, in_place=True, *args, **kwargs
)
self.desc = "Scale with StandardScaler"
def transform(self, data: Ingredients) -> Ingredients:
data = super().transform(data)
# Revert null to nan conversion done by sklearn
if data.get_backend() == Backend.POLARS:
data.set_df(data.get_df().with_columns(pl.col(self.columns).fill_nan(None)))
return data
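# Illustrative usage (`ing` prepared elsewhere):
#
#   ing = StepScale().fit_transform(ing)  # z-scores all numeric predictors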
class StepFunction(Step):
"""Provides a wrapper for a simple transformation function, without fitting."""
def __init__(self, sel: Selector, function):
super().__init__(sel=sel)
self.function = function
self._trained = True
def transform(self, data: Ingredients) -> Ingredients:
new_data = self._check_ingredients(data)
new_data = self.function(new_data)
return new_data
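# Illustrative usage with a hypothetical "hr" column (`ing` prepared elsewhere,
# polars backend assumed):
#
#   def clip_hr(ingredients):
#       ingredients.data = ingredients.data.with_columns(pl.col("hr").clip(0, 300))
#       return ingredients
#
#   ing = StepFunction(all_numeric_predictors(), clip_hr).fit_transform(ing)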