Source code for datawrangler.decorate.decorate

import warnings
import functools
import numpy as np
import pandas as pd
import sklearn.decomposition as decomposition
import sklearn.manifold as manifold
import sklearn.feature_extraction.text as text
import sklearn.mixture as mixture

from sklearn.experimental import enable_iterative_imputer
import sklearn.impute as impute

from ..zoo import wrangle
from ..zoo.text import is_sklearn_model
from ..zoo.dataframe import is_dataframe, is_multiindex_dataframe
from ..zoo.array import is_array
from ..core import get_default_options, apply_defaults, update_dict
from ..util.helpers import depth


# package-wide default options, loaded once at import time
defaults = get_default_options()
# NOTE(review): eval() executes whatever string is stored under ['supported_formats']['types'] in the defaults;
# this is only safe while the defaults come from a trusted source.  Presumably the result is a list of format names
# (funnel iterates over it to build '<name>_kwargs' keys) -- TODO confirm
format_checkers = eval(defaults['supported_formats']['types'])


# import all model-like classes within a sklearn-like module; return a list of model names
# note: this is NOT a decorator-- it's a helper function used to seed functions for the module_checker decorator
def import_sklearn_models(module):
    """
    Given a Python module object, import all of the scikit-learn-style models it contains (i.e., all objects with
    fit_transform attributes) into this module's global namespace

    Parameters
    ----------
    :param module: a Python module object (e.g. sklearn.decomposition)

    Returns
    -------
    :return: a list of the names (strings) of the valid models contained in the module, in dir(module) order
    """
    namespace = globals()
    models = []
    for name in dir(module):
        candidate = getattr(module, name)
        if hasattr(candidate, 'fit_transform'):
            # bind the object directly instead of exec-ing a generated "from <module> import <name>" statement:
            # same resulting global, but no string-built code and no requirement that the module be importable
            # by name through sys.modules
            namespace[name] = candidate
            models.append(name)
    return models


def get_sklearn_model(x):
    """
    Wrangle a scikit-learn model into a consistent format

    Parameters
    ----------
    :param x: a callable scikit-learn model, a string containing a scikit-learn model's name (e.g.,
    'LatentDirichletAllocation'), or a dictionary with the following keys:
      - 'model': a callable scikit-learn model or a string containing a scikit-learn model's name
      - 'args': a list of arguments to pass to the model (this list will be pre-pended with the data the model is
        applied to)
      - 'kwargs': a dictionary of keyword arguments to pass to the model

    Returns
    -------
    :return: A callable scikit-learn model, or None if x cannot be resolved to one
    """
    if is_sklearn_model(x):
        return x  # already a valid model

    if type(x) is dict:
        # test for the *key*: hasattr() inspects attributes, which a dict never has for its keys, so the previous
        # hasattr(x, 'model') check made every dictionary resolve to None
        if 'model' in x:
            return get_sklearn_model(x['model'])
        return None

    if type(x) is str:
        # NOTE(review): eval() resolves the name against this module's namespace (where import_sklearn_models binds
        # the model classes); it will execute arbitrary expressions, so x must not come from untrusted input
        # noinspection PyBroadException
        try:
            return get_sklearn_model(eval(x))
        except Exception:
            # unknown name or invalid expression: treat as "not a model" (KeyboardInterrupt/SystemExit propagate)
            return None

    return None


# FIXME: this code is partially duplicated from zoo.text.apply_text-model
def apply_sklearn_model(model, data, *args, mode='fit_transform', return_model=False, **kwargs):
    """
    Apply one or more scikit-learn models to a dataset

    Parameters
    ----------
    :param model: a scikit-learn model (as defined in the *get_sklearn_model* description), or a list of models to be applied
           in sequence
    :param data: a dataset (array or DataFrame)
    :param args: other arguments to pass to the model (after data)
    :param mode: one of 'fit', 'transform', or 'fit_transform' (default: 'fit_transform'); uses scikit-learn syntax
    :param return_model: if True, both the (potentially transformed) data *and* the fitted model (or list of fitted models)
          are returned.  If False, only the (potentially transformed) data is returned.  Default: False
    :param kwargs: other keyword arguments to pass to *all* of the scikit-learn models (in addition to any model-specific
          keyword arguments)

    Returns
    -------
    :return: Either the (potentially transformed) dataset (if return_model == False) or a tuple containing the
    transformed dataset (first element) and the fitted model(s) (second element), if return_model == True.  Each
    fitted model is described by a dictionary with 'model', 'args', and 'kwargs' keys.

    Raises
    ------
    AssertionError if mode is invalid or a model dictionary is missing one of its required keys;
    RuntimeError if the model specification cannot be resolved to a scikit-learn model.
    """
    assert mode in ['fit', 'transform', 'fit_transform']

    if type(model) is list:
        # apply the models in sequence, feeding each model's output to the next one
        models = []
        last = len(model) - 1
        for i, m in enumerate(model):
            # every model except the last must hand transformed data to its successor, so a plain 'fit' is
            # upgraded to 'fit_transform' for the intermediate steps
            if (i < last) and ('transform' not in mode):
                temp_mode = 'fit_transform'
            else:
                temp_mode = mode

            data, m = apply_sklearn_model(m, data, *args, mode=temp_mode, return_model=True, **kwargs)
            models.append(m)

        if return_model:
            return data, models
        return data

    if type(model) is dict:
        assert all([k in model.keys() for k in ['model', 'args', 'kwargs']]), ValueError(f'invalid model: {model}')
        # model-specific args come first; model-specific kwargs are combined with the shared kwargs via update_dict
        return apply_sklearn_model(model['model'], data, *[*model['args'], *args], mode=mode, return_model=return_model,
                                   **update_dict(model['kwargs'], kwargs))

    # keep the caller's specification under its own name so the error message can report it (previously the
    # variable was overwritten before the check, so the message always read "unsupported model: None")
    resolved = get_sklearn_model(model)
    if resolved is None:
        raise RuntimeError(f'unsupported model: {model}')
    fitted = apply_defaults(resolved)(*args, **kwargs)

    transformed_data = getattr(fitted, mode)(data)
    if return_model:
        return transformed_data, {'model': fitted, 'args': args, 'kwargs': kwargs}
    return transformed_data


# names of the supported dimensionality-reduction models.  Each import_sklearn_models call returns the names of the
# objects in the given module that have a fit_transform attribute and also binds those objects in this module's
# globals.
# NOTE(review): 'UMAP' is listed by name only; nothing visible here imports it -- confirm where it is provided
reduce_models = ['UMAP']
reduce_models.extend(import_sklearn_models(decomposition))
reduce_models.extend(import_sklearn_models(manifold))

# names of the fit_transform-capable objects in sklearn.feature_extraction.text
text_vectorizers = import_sklearn_models(text)

# names of the fit_transform-capable objects in sklearn.impute
impute_models = import_sklearn_models(impute)

# interpolation method names
# source: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.interpolate.html
interpolation_models = ['linear', 'time', 'index', 'pad', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'spline',
                        'barycentric', 'polynomial']


def list_generalizer(f):
    """
    A decorator that makes a function work for either a single object or a list of objects by calling the function
    on each element

    Parameters
    ----------
    :param f: the function to decorate, of the form f(data, *args, **kwargs).

    Returns
    -------
    :return: A decorated function that supports lists of data objects (rather than only non-list data objects)
    """
    @functools.wraps(f)
    def wrapped(data, *args, **kwargs):
        # only exact lists are fanned out; tuples, arrays, and list subclasses are passed to f as a single object
        if type(data) is list:
            return [f(d, *args, **kwargs) for d in data]
        return f(data, *args, **kwargs)

    return wrapped
def funnel(f):
    """
    A decorator that coerces any data passed into the function into a pandas DataFrame or a list of DataFrames

    Parameters
    ----------
    :param f: a function of the form f(data, *args, **kwargs) that assumes data is either a DataFrame or a list of
      DataFrames

    Returns
    -------
    :return: A decorated function that supports any wrangle-able data format.  The decorated function consumes the
      'wrangle_kwargs' keyword argument and one '<format>_kwargs' keyword argument per entry in format_checkers;
      these are passed to wrangle and are not forwarded to f.
    """
    @functools.wraps(f)
    def wrapped(data, *args, **kwargs):
        # work on a copy so the per-format entries added below never leak into the caller's dictionary
        wrangle_kwargs = dict(kwargs.pop('wrangle_kwargs', {}))
        for fc in format_checkers:
            wrangle_kwargs[f'{fc}_kwargs'] = kwargs.pop(f'{fc}_kwargs', {})
        return f(wrangle(data, **wrangle_kwargs), *args, **kwargs)

    return wrapped
def interpolate(f):
    """
    A decorator that fills in missing data by imputing and/or interpolating missing values

    Parameters
    ----------
    :param f: a function of the form f(data, *args, **kwargs) that assumes the data are formatted as either a
      DataFrame or a list of DataFrames, with no missing (numpy.nan) values

    Returns
    -------
    :return: A decorated function that supports any wrangle-able datatype.  Pass the keyword argument
      interp_kwargs (a dictionary) to control how missing data are filled in.  Its entries are:
        impute_kwargs: a dictionary containing one or more scikit-learn imputation models (e.g.,
          {'model': 'IterativeImputer'}).  The 'model' can be specified as defined in the *apply_sklearn_model*
          function; if it is omitted the model named in the package defaults is used.  The remaining entries are
          passed to the model as keyword arguments.
        any other keywords are passed to pandas.DataFrame.interpolate; e.g. method='linear' will apply linear
          interpolation to fill in missing values.  A full list of supported arguments may be found here:
          https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.interpolate.html
      If no other keyword arguments are specified, no interpolation is performed.
    """
    @funnel
    def fill_missing(data, return_model=False, **kwargs):
        impute_kwargs = kwargs.pop('impute_kwargs', {})
        if impute_kwargs:
            # copy before popping 'model' so that a dictionary reused across calls keeps its model entry
            impute_kwargs = dict(impute_kwargs)
            if 'model' in impute_kwargs:
                model = impute_kwargs.pop('model')
            else:
                # NOTE(review): eval() of a string taken from the package defaults; only evaluated when needed
                model = eval(defaults['impute']['model'])
            imputed_data, model = apply_sklearn_model(model, data, return_model=True, **impute_kwargs)
            # the imputer's output is re-wrapped with the original row and column labels
            data = pd.DataFrame(data=imputed_data, index=data.index, columns=data.columns)
        else:
            model = None

        if kwargs:
            kwargs = update_dict(defaults['interpolate'], kwargs, from_config=True)
            data = data.interpolate(**kwargs)

        if return_model:
            return data, {'model': model, 'args': [], 'kwargs': kwargs}
        return data

    @functools.wraps(f)
    def wrapped(data, *args, **kwargs):
        interp_kwargs = kwargs.pop('interp_kwargs', {})
        # positional arguments belong to f; they were previously handed to fill_missing, where the first one was
        # silently bound to return_model and none of them ever reached f
        return f(fill_missing(data, **interp_kwargs), *args, **kwargs)

    return wrapped
# noinspection PyIncorrectDocstring
def pandas_stack(data, names=None, keys=None, verify_integrity=False, sort=False, copy=True, ignore_index=False,
                 levels=None):
    """
    Take a list of DataFrames with the same number of columns and (optionally) a list of keys (of the same length
    as the original list; default: range(len(data))).  Return a single MultiIndex DataFrame where the original
    DataFrames are stacked vertically, with the keys as their outermost index level and their original indices as
    the inner level(s).

    Parameters
    ----------
    :param data: A single DataFrame or a list of DataFrames (or Series) with matching columns.  A MultiIndex
      DataFrame is returned unchanged.
    :param names: names for the levels in the resulting hierarchical index.  (Default: None, which names the
      outermost level 'ID')
    :param keys: if multiple levels passed, should contain tuples. Construct hierarchical index using the passed
      keys as the outermost level.  Must be a list or array with one entry per DataFrame.  (Default: None, which
      uses 0, ..., len(data) - 1)
    :param verify_integrity: check whether the new concatenated axis contains duplicates. This can be very expensive
      relative to the actual data concatenation.  (Default: False)
    :param sort: sort non-concatenation axis if it is not already aligned when join is 'outer'.  (Default: False)
    :param copy: if False, do not copy data unnecessarily.  (Default: True)
    :param ignore_index: if True, do not use the index values along the concatenation axis. The resulting axis will
      be labeled 0, ..., n - 1. This is useful if you are concatenating objects where the concatenation axis does
      not have meaningful indexing information. Note the index values on the other axes are still respected in the
      join.  (Default: False)
    :param levels: specific levels (unique values) to use for constructing a MultiIndex. Otherwise they will be
      inferred from the keys.  (Default: None)

    Returns
    -------
    :return: a single MultiIndex DataFrame (or None if data is empty)
    """
    if is_multiindex_dataframe(data):
        return data  # already stacked
    elif is_dataframe(data):
        data = [data]
    elif len(data) == 0:
        return None

    # ensure that Series objects are cast into DataFrames
    data = [pd.DataFrame(d) for d in data]

    assert len(np.unique([d.shape[1] for d in data])) == 1, 'All DataFrames must have the same number of columns'
    for i, d1 in enumerate(data):
        template = d1.columns.values
        for d2 in data[(i + 1):]:
            assert np.all([(c in template) for c in d2.columns.values]), 'All DataFrames must have the same columns'

    if keys is None:
        keys = np.arange(len(data), dtype=int)

    assert is_array(keys) or (type(keys) is list), 'keys must be None or a list or array of length len(data)'
    assert len(keys) == len(data), 'keys must be None or a list or array of length len(data)'

    if names is None:
        # 'ID' for the new outermost level, 'ID1', 'ID2', ... for all but one of the existing index levels, and
        # None for the remaining one
        names = ['ID', *[f'ID{i}' for i in range(1, len(data[0].index.names))], None]

    return pd.concat(data, axis=0, join='outer', names=names, keys=keys, verify_integrity=verify_integrity,
                     sort=sort, copy=copy, ignore_index=ignore_index, levels=levels)


def pandas_unstack(x):
    """
    Turn a MultiIndex DataFrame into a list of DataFrames, using the unique outermost index values to divide the
    data.

    Parameters
    ----------
    :param x: a single DataFrame, a Series, a MultiIndex DataFrame, or a list of DataFrames

    Returns
    -------
    :return: for a MultiIndex DataFrame, a list with one DataFrame per unique outermost index value (in the order
      produced by DataFrame.groupby), each indexed by the remaining inner level(s).  Other supported inputs are
      passed through: a "regular" DataFrame or a list of DataFrames is returned un-modified, and a Series is
      returned as a single-row DataFrame.

    Raises
    ------
    Exception if x is not one of the supported datatypes
    """
    if not is_multiindex_dataframe(x):
        if is_dataframe(x):
            return x
        elif issubclass(type(x), pd.Series):
            return pd.DataFrame(x).T
        elif type(x) is list and all([is_dataframe(d) for d in x]):
            return x
        else:
            raise Exception(f'Unsupported datatype: {type(x)}')

    n_levels = x.index.nlevels
    unstacked = []
    # group on the outermost level by *position*: this needs no renaming of the caller's index (the previous
    # implementation renamed it in place) and cannot collide with a column that shares the level's name
    for _, group in x.groupby(level=0):
        # rebuild the inner index from each group's own rows; reusing the first group's index for every group
        # mislabels (or fails on) groups whose inner indices differ
        if n_levels > 2:
            inner = pd.MultiIndex.from_arrays([group.index.get_level_values(i) for i in range(1, n_levels)])
        else:
            inner = group.index.get_level_values(n_levels - 1)
        unstacked.append(group.set_index(inner))
    return unstacked
def apply_stacked(f):
    """
    Decorate a function to adjust how it handles data as follows:
      - Wrangle the data into DataFrames (the resulting DataFrames must all have the same number of columns).
        MultiIndex DataFrames are also supported (and can represent already-stacked datasets)
      - Vertically concatenate the wrangled data
      - Apply the function to the "stacked" dataset, treating the combined data as a "single" DataFrame
      - If the original dataset was provided in "unstacked" format, unstack the result into a list of DataFrames
      - Return the resulting (stacked or unstacked) DataFrame(s)

    Parameters
    ----------
    :param f: a function of the form f(data, *args, **kwargs) that assumes data is a single DataFrame, and that
      returns a single DataFrame as output.  If called with return_model=True, f is expected to return a
      (DataFrame, model) pair instead.

    Returns
    -------
    :return: a decorated function that supports any wrangle-able data types, applies the original function to the
      full list of datasets simultaneously, and then returns the result(s) as a new DataFrame or list of
      DataFrames (or a (result, model) tuple if return_model=True).
    """
    @funnel
    @functools.wraps(f)
    def wrapped(data, *args, **kwargs):
        return_model = kwargs.get('return_model', False)  # read only: f still receives the flag
        stack_result = is_multiindex_dataframe(data)

        transformed = f(pandas_stack(data), *args, **kwargs)

        # split the (data, model) pair by indexing rather than assigning into it: f may return a tuple, which
        # does not support the item assignment used previously
        if return_model:
            result, model = transformed[0], transformed[1]
        else:
            result, model = transformed, None

        if not stack_result:
            result = pandas_unstack(result)

        # a single DataFrame in should give a single DataFrame out, not a one-element list
        if is_dataframe(data) and type(result) is list and len(result) == 1:
            result = result[0]

        if return_model:
            return result, model
        return result

    return wrapped
def apply_unstacked(f):
    """
    Decorate a function to adjust how it handles data as follows:
      - Wrangle the data into a list of DataFrames.  MultiIndex DataFrames are also supported (and can represent
        stacked datasets)
      - Apply the function (individually) to each DataFrame in the resulting list
      - If the original dataset was provided in "stacked" format, stack the result into a MultiIndex DataFrame
      - Return the resulting (stacked or unstacked) DataFrame(s)

    Parameters
    ----------
    :param f: a function of the form f(data, *args, **kwargs) that assumes data is a single DataFrame, and that
      returns a single DataFrame as output.  If called with return_model=True, f is expected to return a
      (DataFrame, model) pair instead.

    Returns
    -------
    :return: A decorated function that supports any wrangle-able data types, applies the original function to the
      full list of datasets separately, and then returns the result(s) as a new DataFrame or list of DataFrames
      (or a (result, model(s)) tuple if return_model=True).
    """
    @funnel
    @functools.wraps(f)
    def wrapped(data, *args, **kwargs):
        stack_result = is_multiindex_dataframe(data)
        unstacked_data = pandas_unstack(data)

        # noinspection PyArgumentList
        transformed = list_generalizer(f)(unstacked_data, *args, **kwargs)

        return_model = kwargs.get('return_model', False)
        model = None
        if return_model:
            if type(unstacked_data) is list:
                # one (data, model) pair per dataset
                # noinspection PyTypeChecker
                model = [t[1] for t in transformed]
                # noinspection PyTypeChecker
                transformed = [t[0] for t in transformed]
            else:
                # a single un-stacked DataFrame yields a single (data, model) pair; iterating over it as if it
                # were a list of pairs would index into the data and the model themselves
                transformed, model = transformed[0], transformed[1]

        if stack_result:
            transformed = pandas_stack(transformed)

        if return_model:
            return transformed, model
        return transformed

    return wrapped