Source code for guardian_ai.fairness.bias_mitigation.sklearn

#!/usr/bin/env python
# -*- coding: utf-8 -*--

# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

"""Sklearn API for bias mitigation"""
from __future__ import annotations
import copy

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

from guardian_ai.fairness.utils.lazy_loader import LazyLoader
from guardian_ai.fairness.metrics import _get_fairness_metric, fairness_metrics_dict
from guardian_ai.utils.exception import GuardianAITypeError, GuardianAIValueError
from guardian_ai.fairness.utils.util import dyn_docstring, _supported_score_metric
from guardian_ai.fairness.metrics.utils import (
    _positive_fairness_names,
    _automl_to_fairlearn_metric_names,
)
from guardian_ai.fairness.metrics.model import (
    _valid_regression_metrics,
)
from guardian_ai.fairness.metrics.utils import (
    _get_rate_scorer,
    _inhouse_metrics,
)


if TYPE_CHECKING:
    import numpy as np
    import optuna
    import pandas as pd
    import plotly.graph_objects as go
    import sklearn.metrics as skl_metrics
    from category_encoders.ordinal import OrdinalEncoder
    from sklearn.base import BaseEstimator
    from sklearn.model_selection import StratifiedShuffleSplit
    from fairlearn.postprocessing import ThresholdOptimizer
else:
    np = LazyLoader("numpy")
    pd = LazyLoader("pandas")
    optuna = LazyLoader("optuna")
    BaseEstimator = LazyLoader("sklearn.base", "BaseEstimator")
    go = LazyLoader("plotly.graph_objects")
    skl_metrics = LazyLoader("sklearn.metrics")
    StratifiedShuffleSplit = LazyLoader(
        "sklearn.model_selection", "StratifiedShuffleSplit"
    )
    OrdinalEncoder = LazyLoader("category_encoders.ordinal", "OrdinalEncoder")
    ThresholdOptimizer = LazyLoader("fairlearn.postprocessing", "ThresholdOptimizer")


[docs] @dyn_docstring( "', '".join(fairness_metrics_dict), "', '".join(_supported_score_metric["binary"]) ) class ModelBiasMitigator: r""" Class to mitigate the bias of an already fitted machine learning model. The mitigation procedure works by multiplying the majority class label by a different scalar for every population subgroup and then rescaling the prediction probabilities, producing tweaked label probabilities. The different multiplying scalars are searched in order to find the best possible trade-offs between any fairness and accuracy metrics passed as input. This object produces a set of optimal fairness-accuracy trade-offs, which can be visualized using the `show_tradeoff` method. A default best multiplier is selected according to parametrizable input constraints. It is possible to select any other multiplier on the trade-off using the ``select_model`` method and inputting the index of the preferred multiplier, as shown when hovering over multipliers in ``show_tradeoff``. Parameters ---------- base_estimator : model object The base estimator on which we want to mitigate bias. protected_attribute_names: str, List[str] The protected attribute names to use to compute fairness metrics. These should always be a part of any input dataset passed. fairness_metric : str, callable The fairness metric to mitigate bias for. - If str, it is the name of the scoring metric. Available metrics are: [%s] - If callable, it has to have the ``fairness_metric(y_true, y_pred, subgroups)`` signature. accuracy_metric : str, callable The accuracy metric to optimize for while mitigating bias. - If str, it is the name of the scoring metric. Available metrics are: [%s] - If callable, it has to have the ``accuracy_metric(y_true, y_pred)`` signature. higher_fairness_is_better : bool, 'auto', default='auto' Whether a higher fairness score with respect to `fairness_metric` is better. Needs to be set to "auto" if `fairness_metric` is a str, in which case it is set automatically. higher_accuracy_is_better : bool, 'auto', default='auto' Whether a higher accuracy score with respect to `accuracy_metric` is better. Needs to be set to "auto" if `accuracy_metric` is a str, in which case it is set automatically. fairness_metric_uses_probas: bool, 'auto', default='auto' Whether or not the fairness metric should be given label probabilities or actual labels as input. Needs to be set to "auto" if `fairness_metric` is a str, in which case it is set automatically. accuracy_metric_uses_probas: bool, 'auto', default='auto' Whether or not the accuracy metric should be given label probabilities or actual labels as input. Needs to be set to "auto" if `accuracy_metric` is a str, in which case it is set automatically. constraint_target: str, default='accuracy' On which metric should the constraint be applied for default model selection. Possible values are ``'fairness'`` and ``'accuracy'``. constraint_type: str, default='relative' Which type of constraint should be used to select the default model. Possible values are: - ``'relative'``: Apply a constraint relative to the best found models. A relative constraint on accuracy with F1 metric would look at the best F1 model found and tolerate a ``constraint_value`` relative deviation to it at max, returning the model with the best fairness within that constraint. - ``'absolute'``: Apply an absolute constraint to best found models. An absolute constraint on fairness with Equalized Odds metric would only consider models with Equalized Odds below ``constraint_value``, returning the model with the best accuracy within that constraint. constraint_value: float, default=0.05 What value to apply the constraint with when selecting the default model. Look at ``constraint_type``'s documentation for more details. base_estimator_uses_protected_attributes: bool, default=True Whether or not ``base_estimator`` uses the protected attributes for inference. If set to ``False``, protected attributes will be removed from any input dataset before being collecting predictions from ``base_estimator``. n_trials_per_group: int, default=100 Number of different multiplying scalars to consider. Scales linearly with the number of groups in the data, i.e. ``n_trials = n_trials_per_group * n_groups``. When both ``n_trials_per_group`` and ``time_limit`` are specified, the first occurrence will stop the search procedure. time_limit: float or None, default=None Number of seconds to spend in search at maximum. ``None`` value means no time limit is set. When both ``n_trials_per_group`` and ``time_limit`` are specified, the first occurrence will stop the search procedure. subsampling: int, default=50000 The number of rows to subsample the dataset to when tuning. This parameter drastically improves running time on large datasets with little decrease in overall performance. Can be deactivated by passing ``numpy.inf``. regularization_factor: float, default=0.001 The amount of regularization to be applied when selecting multipliers. favorable_label_idx: int, default=1 Index of the favorable label to use when computing metrics. random_seed: int, default=0 Random seed to ensure reproducible outcome. Attributes ---------- tradeoff_summary_: pd.DataFrame DataFrame containing the optimal fairness-accuracy trade-off models with only the most relevant information. selected_multipliers_idx_: int Index of the currently selected model for ``self._best_trials_detailed``. selected_multipliers_: pd.DataFrame DataFrame containing the multipliers for each sensitive group that are currently used for inference. constrained_metric_: str Name of the metric on which the constraint is applied. unconstrained_metric_: str Name of the metric on which no constraint is applied. constraint_criterion_value_: float Value of the constraint being currently applied. Raises ------ GuardianAITypeError, GuardianAIValueError Will be raised when one input argument is invalid Examples -------- .. code-block:: python from guardian_ai.fairness.bias_mitigation import ModelBiasMitigator bias_mitigated_model = ModelBiasMitigator(model, protected_attribute_names='sex', fairness_metric='equalized_odds', accuracy_metric='balanced_accuracy') # Scikit-learn like API supported bias_mitigated_model.fit(X_val, y_val) y_pred_proba = bias_mitigated_model.predict_proba(X_test) y_pred_labels = bias_mitigated_model.predict(X_test) # Use show_tradeoff() to display all available models bias_mitigated_model.show_tradeoff() # Can select a specific model manually bias_mitigated_model.select_model(1) # Predictions are now made with new model y_pred_proba = bias_mitigated_model.predict_proba(X_test) y_pred_labels = bias_mitigated_model.predict(X_test) """ # noqa D412 def __init__( self, base_estimator: BaseEstimator, protected_attribute_names: Union[str, List[str]], fairness_metric: Union[str, Callable], accuracy_metric: Union[str, Callable], higher_fairness_is_better: Union[bool, str] = "auto", higher_accuracy_is_better: Union[bool, str] = "auto", fairness_metric_uses_probas: Union[bool, str] = "auto", accuracy_metric_uses_probas: Union[bool, str] = "auto", constraint_target: str = "accuracy", constraint_type: str = "relative", constraint_value: float = 0.05, base_estimator_uses_protected_attributes: bool = True, n_trials_per_group: int = 100, time_limit: Optional[float] = None, subsampling: int = 50000, regularization_factor: float = 1e-3, favorable_label_idx: int = 1, random_seed: int = 0, ): optuna.logging.set_verbosity(optuna.logging.WARNING) self._base_estimator = base_estimator self._protected_attribute_names = protected_attribute_names self._higher_fairness_is_better = higher_fairness_is_better self._higher_accuracy_is_better = higher_accuracy_is_better self._fairness_metric_uses_probas = fairness_metric_uses_probas self._accuracy_metric_uses_probas = accuracy_metric_uses_probas self._constraint_target = constraint_target self._constraint_type = constraint_type self._constraint_value = constraint_value self._base_estimator_uses_protected_attributes = ( base_estimator_uses_protected_attributes ) self._n_trials_per_group = n_trials_per_group self._time_limit = time_limit self._subsampling = subsampling self._regularization_factor = regularization_factor self._favorable_label_idx = favorable_label_idx self._random_seed = random_seed self._warmstart_grid_resolution = 100 self._warmstart = True self._set_metric_names_and_callables(fairness_metric, accuracy_metric) # Public attributes to be set by `fit` self.tradeoff_summary_: Optional[pd.DataFrame] = None self.selected_multipliers_idx_: Optional[int] = None self.constrained_metric_: Optional[str] = None self.unconstrained_metric_: Optional[str] = None self.constraint_criterion_value_: Optional[float] = None # Private attributes to be set by `fit` self._best_trials_detailed: Optional[pd.DataFrame] = None self._accuracy_base_: Optional[float] = None self._fairness_base_: Optional[float] = None self._unique_groups_: Optional[np.ndarray] = None self._unique_group_names_: Optional[list] = None self._multiplier_names_: Optional[List[str]] = None self._admissible_trials_mask_: Optional[pd.DataFrame] = None self._third_objective = True self._third_objective_name = "levelling_down" self._validate_current_state() # Setup the objective function def objective_fn(trial): probas = self._probas_predicted.copy() multipliers = {} for group_name, multiplier_name in zip( self._unique_group_names_, self._multiplier_names_ ): min_val, max_val = self._group_ranges[group_name] multipliers[multiplier_name] = trial.suggest_float( multiplier_name, min_val, max_val, log=True ) penalty_acc, penalty_fairness = self._get_multiplier_penalty( multipliers, self._group_ranges, self._unique_group_names_ ) probas = _apply_multiplier( probas, self._groups, self._unique_groups_, self._unique_group_names_, multipliers, self._favorable_label_idx, ) perf = self._get_accuracy_score(self._y, probas) fairness = self._get_fairness_score(self._y, probas, self._groups) if self.fairness_metric_name in _valid_regression_metrics: adjusted_fairness_regressions = [] base_fairness_trial = self._regression_metric_trials_["base"] adjusted_fairness_trial = self._get_outcome_rates( self._y, probas, self._groups ) self._regression_metric_trials_[trial._trial_id] = ( adjusted_fairness_trial ) for ( group_fairness_name, group_fairness_value, ) in adjusted_fairness_trial.items(): # Cur version: maximize avg_{groups} (min(TPR_group_adjusted - TPR_group_original, 0)) if self.fairness_metric_name in _positive_fairness_names: adjusted_fairness_regressions.append( min( 0, group_fairness_value - base_fairness_trial[group_fairness_name], ) ) else: adjusted_fairness_regressions.append( min( 0, base_fairness_trial[group_fairness_name] - group_fairness_value, ) ) avg_adjusted_fairness_regression = sum( adjusted_fairness_regressions ) / len(adjusted_fairness_regressions) self._avg_fairness_regression_[trial._trial_id] = ( avg_adjusted_fairness_regression ) if ( self.fairness_metric_name in _valid_regression_metrics and self._third_objective ): return ( perf + penalty_acc, fairness + penalty_fairness, avg_adjusted_fairness_regression, ) else: return perf + penalty_acc, fairness + penalty_fairness self._objective_fn = objective_fn def _validate_current_state(self): """ Validate current attributes have valid values. Raises ------ GuardianAITypeError Will be raised when one input argument has invalid type GuardianAIValueError Will be raised when one input argument has invalid value """ if isinstance(self._protected_attribute_names, str): self._protected_attribute_names = [self._protected_attribute_names] if self._higher_accuracy_is_better != "auto": if not isinstance(self._higher_accuracy_is_better, bool): raise GuardianAIValueError( "`higher_accuracy_is_better` should be a bool or 'auto'" f", received {self._higher_accuracy_is_better} instead." ) if self._higher_fairness_is_better != "auto": if not isinstance(self._higher_fairness_is_better, bool): raise GuardianAIValueError( "`higher_fairness_is_better` should be a bool or 'auto'" f", received {self._higher_fairness_is_better} instead." ) if self._fairness_metric_uses_probas != "auto": if not isinstance(self._fairness_metric_uses_probas, bool): raise GuardianAIValueError( "`_fairness_metric_uses_probas` should be a bool or 'auto'" f", received {self._fairness_metric_uses_probas} instead." ) if self._accuracy_metric_uses_probas != "auto": if not isinstance(self._accuracy_metric_uses_probas, bool): raise GuardianAIValueError( "`_accuracy_metric_uses_probas` should be a bool or 'auto'" f", received {self._accuracy_metric_uses_probas} instead." ) supported_constraint_targets = ["accuracy", "fairness"] if self._constraint_target not in supported_constraint_targets: raise GuardianAIValueError( f"Received `{self._constraint_target}` for `constraint_target`. " f"Supported values are {supported_constraint_targets}" ) supported_constraint_types = ["absolute", "relative"] if self._constraint_type not in supported_constraint_types: raise GuardianAIValueError( f"Received `{self._constraint_type}` for `constraint_type`. " f"Supported values are {supported_constraint_types}" ) if not isinstance(self._constraint_value, (float, int)): raise GuardianAITypeError( "`constraint_value` should be a float, received " f"{self._constraint_type} instead." ) if not isinstance(self._base_estimator_uses_protected_attributes, bool): raise GuardianAITypeError( "`base_estimator_uses_protected_attributes` should be a bool" f", received {self._base_estimator_uses_protected_attributes} instead." ) if self._n_trials_per_group is not None: if ( not isinstance(self._n_trials_per_group, int) or self._n_trials_per_group <= 0 ): raise GuardianAIValueError( "`n_trials_per_group` should be a positive integer or None, received " f"{self._n_trials_per_group} instead." ) if self._time_limit is not None: if ( not isinstance(self._time_limit, (float, int)) or self._time_limit <= 0.0 ): raise GuardianAIValueError( "`time_limit` should be a positive float or None, received " f"{self._time_limit} instead." ) if self._n_trials_per_group is None and self._time_limit is None: raise GuardianAIValueError( "`n_trials_per_group` and `time_limit` cannot both be None." ) if not isinstance(self._subsampling, int) or self._subsampling <= 0: if not np.isinf(self._subsampling): raise GuardianAIValueError( "`subsampling` should be a positive integer or `np.inf`, received " f"{self._subsampling} instead." ) if ( not isinstance(self._regularization_factor, (float, int)) or self._regularization_factor < 0 ): raise GuardianAIValueError( "`regularization_factor` should be a non-negative float, received " f"{self._regularization_factor} instead." ) if ( not isinstance(self._favorable_label_idx, int) or self._favorable_label_idx < 0 ): raise GuardianAIValueError( "`favorable_label_idx` should be a non-negative integer, received " f"{self._favorable_label_idx} instead." ) if self._random_seed is not None: if not isinstance(self._random_seed, int) or self._random_seed < 0: raise GuardianAIValueError( "`random_seed` should be a non-negative integer or None, received " f"{self._random_seed} instead." ) def _set_metric_names_and_callables( self, fairness_metric: Union[str, Callable], accuracy_metric: Union[str, Callable], ): """ Grab fairness and accuracy metrics from input arguments. Set values to _callable and _name attributes for fairness and accuracy metrics. Arguments --------- fairness_metric: str, Callable The fairness metric to use. accuracy_metric: str, Callable The accuracy metric to use. Raises ------ GuardianAITypeError Will be raised if one the metrics is not a str or callable GuardianAIValueError Will be raised if there is an invalid combination of a metric and its `higher_is_better` and `uses_probas` attributes. """ self.accuracy_metric_callable: Callable self.accuracy_metric_name: str self.fairness_metric_callable: Callable self.fairness_metric_name: str if isinstance(accuracy_metric, str): if self._higher_accuracy_is_better != "auto": raise GuardianAIValueError( '`higher_accuracy_is_better` should be set to "auto" when' "`accuracy_metric` is a str." ) if self._accuracy_metric_uses_probas != "auto": raise GuardianAIValueError( '`accuracy_metric_uses_probas` should be set to "auto" when' "`accuracy_metric` is a str." ) metric_object = skl_metrics.get_scorer(accuracy_metric) self.accuracy_metric_callable = _PredictionScorer(metric_object) self.accuracy_metric_name = accuracy_metric # Always true because scores are inverted by sklearn when needed self._higher_accuracy_is_better = True """Direct use of `_ProbaScorer` and `_ThresholdScorer` is deprecated after version 1.4 Consider checking response methods dynamically. See: https://github.com/scikit-learn/scikit-learn/pull/26840/files""" self._accuracy_metric_uses_probas = isinstance( metric_object, skl_metrics._scorer._Scorer, ) and ( skl_metrics._scorer._check_response_method( self._base_estimator, metric_object._response_method ).__name__ in ["predict_proba", "decision_function"] ) elif callable(accuracy_metric): if self._higher_accuracy_is_better == "auto": raise GuardianAIValueError( "`higher_accuracy_is_better` should be manually set when" "`accuracy_metric` is a callable." ) if self._accuracy_metric_uses_probas == "auto": raise GuardianAIValueError( "`accuracy_metric_uses_probas` should be manually set when" "`accuracy_metric` is a callable." ) self.accuracy_metric_callable = accuracy_metric self.accuracy_metric_name = accuracy_metric.__name__ else: raise GuardianAITypeError( "`accuracy_metric` should be a `str` or callable. Received " f"{accuracy_metric} instead." ) if isinstance(fairness_metric, str): if self._higher_fairness_is_better != "auto": raise GuardianAIValueError( '`higher_fairness_is_better` should be set to "auto" when' "`fairness_metric` is a str." ) if self._fairness_metric_uses_probas != "auto": raise GuardianAIValueError( '`fairness_metric_uses_probas` should be set to "auto" when' "`fairness_metric` is a str." ) self.fairness_metric_callable = _get_fairness_metric(fairness_metric) self.fairness_metric_name = fairness_metric self._higher_fairness_is_better = False self._fairness_metric_uses_probas = False elif callable(fairness_metric): if self._higher_fairness_is_better == "auto": raise GuardianAIValueError( "`higher_fairness_is_better` should be manually set when" "`fairness_metric` is a callable." ) if self._fairness_metric_uses_probas == "auto": raise GuardianAIValueError( "`fairness_metric_uses_probas` should be manually set when" "`fairness_metric` is a callable." ) self.fairness_metric_callable = fairness_metric self.fairness_metric_name = fairness_metric.__name__ else: raise GuardianAITypeError( "`fairness_metric` should be a `str` or callable. Received " f"{fairness_metric} instead." ) def _get_fairness_score( self, y_true: np.ndarray, y_probas: np.ndarray, groups: pd.DataFrame ) -> float: """ Get fairness score. Arguments --------- y_true: np.ndarray True labels y_probas: np.ndarray Label probabilities groups: pd.DataFrame Protected attribute(s) value(s) for every sample. Returns ------- float: score The fairness score """ if self._fairness_metric_uses_probas: y_pred = y_probas[:, self._favorable_label_idx] else: y_pred = y_probas.argmax(-1) return self.fairness_metric_callable(y_true, y_pred, groups) def _get_accuracy_score(self, y_true: np.ndarray, y_probas: np.ndarray) -> float: """ Get accuracy score. Arguments --------- y_true: np.ndarray True labels y_probas: np.ndarray Label probabilities Returns ------- float: score The accuracy score """ if self._accuracy_metric_uses_probas: y_pred = y_probas else: y_pred = y_probas.argmax(-1) return self.accuracy_metric_callable(y_true, y_pred) def _get_outcome_rates( self, y_true: np.ndarray, y_probas: np.ndarray, groups: pd.DataFrame ) -> Dict: """ Get raw outcome rate scores. Arguments --------- y_true: np.ndarray True labels y_probas: np.ndarray Label probabilities groups: pd.DataFrame Protected attribute(s) value(s) for every sample. Returns ------- Dict: score The fairness score """ if self._fairness_metric_uses_probas: y_pred = y_probas[:, self._favorable_label_idx] else: y_pred = y_probas.argmax(-1) outcome_rates = {} for group_name in self._group_masks: mask = self._group_masks[group_name] outcome_rates[group_name] = self._rate_scorer(y_true[mask], y_pred[mask]) return outcome_rates
[docs] def fit(self, X: pd.DataFrame, y: Union[pd.DataFrame, pd.Series, np.ndarray]): """ Apply bias mitigation to the base estimator given a dataset and labels. Note that it is highly recommended you use a validation set for this method, so as to have a more representative range of probabilities for the model instead of the potentially skewed probabilities on training samples. Parameters ---------- X: pd.DataFrame The dataset on which to mitigate the estimator's bias. y: pd.DataFrame, pd.Series, np.ndarray The labels for which to mitigate the estimator's bias. Returns ------- self: ModelBiasMitigator The fitted ModelBiasMitigator object. Raises ------ GuardianAIValueError Raised when an invalid value is encountered. """ groups, group_names = self._prepare_subgroups(X) X, y, group_names, groups = self._apply_subsampling(X, y, group_names, groups) self._probas_predicted = self._get_base_probas(X) self._groups = groups self._X = X self._y = y # Includes the original model and, depending on the fairness metrics, additional solutions # obtained by leveling up all groups and from the EO paper. default_multipliers, max_multiplier = self._warmstart_multipliers( X, y, groups, ) self._default_multipliers = default_multipliers self._max_multiplier = max_multiplier self._group_ranges = self._get_group_ranges( self._probas_predicted, groups, max_multiplier, ) self._regression_metric_trials_ = {} self._avg_fairness_regression_ = {} self._accuracy_base_ = self._get_accuracy_score(y, self._probas_predicted) self._fairness_base_ = self._get_fairness_score( y, self._probas_predicted, groups ) if self.fairness_metric_name in _valid_regression_metrics: self._regression_metric_trials_["base"] = self._get_outcome_rates( y, self._probas_predicted, groups ) sampler = optuna.samplers.NSGAIISampler(seed=self._random_seed) study = optuna.create_study( directions=self._get_optimization_directions(), sampler=sampler ) if self._unique_group_names_ is None: raise GuardianAIValueError("_unique_group_names cannot be None!") for multipliers in default_multipliers: study.enqueue_trial(multipliers) # Finally, we allow Optuna to use these starting points to search for solutions # that trade-off between these three solutions/objectives. study.optimize( self._objective_fn, n_trials=self._n_trials_per_group * len(self._unique_group_names_), timeout=self._time_limit, show_progress_bar=True, ) self._produce_best_trials_frame(study, self._group_ranges) self._select_best_model_from_constraints() # Otherwise the object cannot be pickled self._objective_fn = None return self
def _prepare_subgroups(self, X: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]: """ Handle protected subgroups logic. Sets the `_unique_groups_`, `_unique_group_names_` and `_multiplier_names_` attributes. Arguments --------- X: pd.DataFrame Dataset to prepare subgroups for. Returns ------- (pd.DataFrame, pd.Series) Tuple containing groups: pd.DataFrame DataFrame mapping every sample to its protected attribute(s) ] value(s). group_names: pd.Series Series mapping every sample to its unique group name. Raises ------ GuardianAIValueError Raised when an invalid value is encountered. """ groups = X[self._protected_attribute_names].astype("category") unique_groups_and_counts = groups.value_counts().reset_index(name="count") unique_groups = unique_groups_and_counts[self._protected_attribute_names] self._unique_groups_ = unique_groups.values unique_groups["name"] = unique_groups.apply( lambda x: "--".join( [f"{attr}={x[attr]}" for attr in self._protected_attribute_names] ), axis=1, ) self._unique_group_names_ = unique_groups["name"].tolist() self._multiplier_names_ = [ f"multiplier_{group_name}" for group_name in self._unique_group_names_ # type: ignore ] if self._unique_group_names_ is None: raise GuardianAIValueError("_unique_groupe_names cannot be None!") if self._multiplier_names_ is None: raise GuardianAIValueError("_multiplier_names cannot be None!") groups["name"] = "" for group, group_name in zip( self._unique_groups_, self._unique_group_names_ # type: ignore ): # type: ignore mask = (groups.drop(columns="name") == group).all(1).to_numpy().squeeze() groups["name"][mask] = group_name return groups.drop(columns="name"), groups["name"] def _apply_subsampling( self, X: pd.DataFrame, y: pd.Series, group_names: pd.Series, groups: pd.DataFrame, ) -> Tuple[pd.DataFrame, pd.Series, pd.Series, pd.DataFrame]: """ Apply subsampling on the input dataset. The subsampling applied is stratified sampling from the groundtruth labels. Arguments --------- X: pd.DataFrame The dataset on which to apply subsampling. y: pd.Series The labels on which to apply subsampling. group_names: pd.Series Series mapping every sample to its unique group name. Used to apply subsampling and also to return subsampled version. groups: pd.DataFrame DataFrame mapping every sample to its protected attribute(s) value(s). Subsampling is applied to it. Returns ------- pd.DataFrame, pd.Series, pd.Series, pd.DataFrame Tuple containing X_subsampled: pd.DataFrame Subsampled version of input X y_subsampled: pd.Series Subsampled version of input y group_names_subsampled: pd.Series Subsampled version of input group_names groups_subsampled: pd.DataFrame Subsampled version of input groups """ if len(X) <= self._subsampling: return X, y, group_names, groups else: n_samples = min(len(X), self._subsampling) sss = StratifiedShuffleSplit( n_splits=1, test_size=n_samples / len(X), random_state=self._random_seed ) stratas = pd.concat((groups, y), axis=1) stratas = OrdinalEncoder().fit_transform(stratas) _, idxs = next( iter(sss.split(np.arange(0, len(X)).reshape(-1, 1), y=stratas)) ) return X.iloc[idxs], y.iloc[idxs], group_names.iloc[idxs], groups.iloc[idxs] def _get_group_ranges( self, probas, groups: pd.DataFrame, max_multiplier, ) -> Dict: """ Return the range for which to search multipliers for each sensitive group. The logic is that if probabilities are constrained to [0.45, 0.55] range, we should be looking at multipliers much closer to 1 than if the probabilities are in [0.05, 0.95] range. In the former case, a multiplier of 1.25 suffices to flip all predictions while in the latter a multiplier of 10 is not enough to flip all predictions. The returned ranges are set to ensure that total prediction flips are possible and that we constrain the search to relevant multipliers (e.g. it's pointless to test a value of 1.5 if 1.25 already suffices to flip all predictions). If there is already a large probability coverage (e.g. [0.05, 0.95]), we constraint the multipliers search range for this group to [0.1, 10] as a reasonable enough default. Arguments --------- probas: pd.DataFrame The probabilities used to collect group-specific probability ranges. groups: pd.DataFrame The groups used to separate samples. Returns ------- Dict: group_ranges Dictionary mapping every group name to its (min, max) range to consider for multipliers. Raises ------ GuardianAIValueError Raised when an invalid value is encountered. """ group_ranges = {} for group, group_name in zip(self._unique_groups_, self._unique_group_names_): mask = (groups == group).all(1).to_numpy().squeeze() min_proba = probas[mask].min() max_proba = probas[mask].max() ratio = max_proba / (min_proba + 1e-6) ratio = min(ratio, max_multiplier) group_ranges[group_name] = (1 / ratio, ratio) return group_ranges def _get_multiplier_penalty( self, multipliers: Dict, group_ranges: Dict, unique_group_names: Optional[List] ) -> Tuple[float, float]: """ Get the multiplier penalty for both the fairness and accuracy metrics. Returned values are already adjusted to be either negative or positive penalties so they can be added directly to the scores. Arguments --------- multipliers: Dict Mapping from multiplier name to multiplier value. group_ranges: Dict Mapping from group name to group range (min_val, max_val). unique_group_names: List or None Array of all unique group names. Returns ------- (float, float) Tuple containing accuracy_penalty: float The penalty to be applied on the accuracy score. fairness_penalty: float The penalty to be applied on the fairness score. Raises ------ GuardianAIValueError Raised when an invalid value is encountered. """ if unique_group_names is None: raise GuardianAIValueError("unique_group_names cannot be None!") multiplier_reg_penalty = [] for group_name in unique_group_names: multiplier = multipliers[f"multiplier_{group_name}"] _, max_val = group_ranges[group_name] penalty = np.abs(np.log(multiplier)) / np.log(max_val) multiplier_reg_penalty.append(penalty) penalty = self._regularization_factor * np.mean(multiplier_reg_penalty) penalty_direction_acc = -1 if self._higher_accuracy_is_better else 1 penalty_direction_fairness = -1 if self._higher_fairness_is_better else 1 return penalty * penalty_direction_acc, penalty * penalty_direction_fairness def _get_pareto_efficient_points(self, metrics): """ Find the pareto-efficient points :param cometricssts: An (n_points, n_costs) array :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient """ is_efficient = np.ones(metrics.shape[0], dtype=bool) for i, m in enumerate(metrics): if is_efficient[i]: is_efficient[is_efficient] = np.any( metrics[is_efficient] > m, axis=1 ) # Keep any point with a higher cost is_efficient[i] = True # And keep self return is_efficient def _produce_best_trials_frame(self, study: optuna.Study, group_ranges: Dict): """ Produce _best_trials_detailed dataframe from optuna Study object. Arguments --------- study: optuna.Study The completed study object. group_ranges: Dict Mapping from group name to group range (min_val, max_val). Raises ------ GuardianAIValueError Raised when an invalid value is encountered. """ if self._multiplier_names_ is None: raise GuardianAIValueError("_multiplier_names_ cannot be None!") regularized_metric_names = [ f"{metric}_regularized" for metric in [self.accuracy_metric_name, self.fairness_metric_name] ] metric_names = copy.deepcopy(regularized_metric_names) if ( self.fairness_metric_name in _valid_regression_metrics and self._third_objective ): metric_names.append("Third Objective: " + self._third_objective_name) if self.fairness_metric_name in _valid_regression_metrics: fairness_trial_names = list( list(self._regression_metric_trials_.items())[0][1].keys() ) _prefix = ( "PPR" if self.fairness_metric_name == "statistical_parity" else self.fairness_metric_name ) fairness_trial_names = [ f"{_prefix}_{name}" for name in fairness_trial_names ] df = pd.DataFrame( [ ( *trial.values, *trial.params.values(), *self._regression_metric_trials_[trial._trial_id].values(), ) for trial in study.best_trials ], columns=metric_names + self._multiplier_names_ + fairness_trial_names, ) else: df = pd.DataFrame( [ (*trial.values, *trial.params.values()) for trial in study.best_trials ], columns=regularized_metric_names + self._multiplier_names_, ) # This is calculated for when objective_fn considers only two metrics: if self.fairness_metric_name in _valid_regression_metrics: avg_fairness_regression_best_trials = [] for trial in study.best_trials: avg_fairness_regression_best_trials.append( self._avg_fairness_regression_[trial._trial_id] ) df[self._third_objective_name] = pd.Series( avg_fairness_regression_best_trials ).values df[self._third_objective_name] = ( np.sign(df[self._third_objective_name]) * df[self._third_objective_name] ) df["Third Objective: " + self._third_objective_name] = df[ self._third_objective_name ] # Unwrap regularization factors regularization_factors = np.array( [ self._get_multiplier_penalty( multipliers, group_ranges, self._unique_group_names_ ) for _, multipliers in df[self._multiplier_names_].iterrows() ], dtype=float, ) df["regularization_accuracy"] = regularization_factors[:, 0] df["regularization_fairness"] = regularization_factors[:, 1] df[self.accuracy_metric_name] = ( df[f"{self.accuracy_metric_name}_regularized"] - df["regularization_accuracy"] ) df[self.fairness_metric_name] = ( df[f"{self.fairness_metric_name}_regularized"] - df["regularization_fairness"] ) # Remove possible multipliers duplicates df = df.drop_duplicates(self._multiplier_names_) # Filter pareto front solutions if ( self.fairness_metric_name in _valid_regression_metrics and self._third_objective ): pareto_columns = [ self.accuracy_metric_name, self.fairness_metric_name, self._third_objective_name, ] else: pareto_columns = [self.accuracy_metric_name, self.fairness_metric_name] datapoints = df[pareto_columns].copy() datapoints[self.fairness_metric_name] = -datapoints[self.fairness_metric_name] datapoints = datapoints.to_numpy() df = df.iloc[np.where(self._get_pareto_efficient_points(datapoints))] # Sort best trials by fairness df = df.sort_values(by=self.fairness_metric_name) # Need to reset index so that it's ordered by fairness df = df.reset_index(drop=True) self._best_trials_detailed = df self.tradeoff_summary_ = df.drop( [col for col in df.columns if self._should_drop_column(col)], axis=1, ) self.tradeoff_summary_ = self.tradeoff_summary_[ self.tradeoff_summary_.columns[::-1] ] def _should_drop_column(self, column: str) -> bool: """Determines if the specified column should be dropped from tradeoff_summary_. Arguments --------- column : str The name of the column to check. Returns ------- bool: True if the column should be dropped, False otherwise. """ unwanted_col_keys = ["regulariz", "Third Objective"] return any(unwanted_key in column for unwanted_key in unwanted_col_keys) def _get_optimization_directions(self) -> List[str]: """ Return optimization direction list used by Optuna to optimize fairness-accuracy trade-off. Returns ------- optimization_directions: List[str] List of str corresponding to optimization directions for the two metrics. """ def _get_optimization_direction(higher_is_better: Union[bool, str]): return "maximize" if higher_is_better else "minimize" if ( self.fairness_metric_name in _valid_regression_metrics and self._third_objective ): return [ _get_optimization_direction(self._higher_accuracy_is_better), _get_optimization_direction(self._higher_fairness_is_better), _get_optimization_direction( True ), # higher outcome regression is better ] else: return [ _get_optimization_direction(self._higher_accuracy_is_better), _get_optimization_direction(self._higher_fairness_is_better), ] def _get_base_probas(self, X: pd.DataFrame) -> np.ndarray: """ Get the probabilities from the base estimator on a dataset. Is in charge of removing the protected attributes if needed. Arguments --------- X: pd.DataFrame The dataset for which to collect label probabilities. Returns ------- probas: np.ndarray Label probabilities for every row in X. """ if self._base_estimator_uses_protected_attributes: return self._base_estimator.predict_proba(X) # type: ignore else: return self._base_estimator.predict_proba( # type: ignore X.drop(columns=self._protected_attribute_names) # type: ignore ) def _select_best_model_from_constraints(self): """ Select best model from the available trade-offs according to constraint. Calls `select_best_model` with best found model. Raises ------ GuardianAIValueError If ``constraint_target`` or ``constraint_type`` have invalid values. """ if self._constraint_target == "fairness": self.constrained_metric_ = self.fairness_metric_name self.unconstrained_metric_ = self.accuracy_metric_name constrained_higher_is_better = self._higher_fairness_is_better unconstrained_higher_is_better = self._higher_accuracy_is_better elif self._constraint_target == "accuracy": self.constrained_metric_ = self.accuracy_metric_name self.unconstrained_metric_ = self.fairness_metric_name constrained_higher_is_better = self._higher_accuracy_is_better unconstrained_higher_is_better = self._higher_fairness_is_better else: raise GuardianAIValueError( "Only `accuracy` and `fairness` are supported for " f"`constraint_target`. Received {self._constraint_target}" ) if self._constraint_type == "relative": if constrained_higher_is_better: ref_val = self._best_trials_detailed[self.constrained_metric_].max() relative_ratio = ( (1 - self._constraint_value) if ref_val > 0 else (1 + self._constraint_value) ) self.constraint_criterion_value_ = relative_ratio * ref_val self._admissible_trials_mask_ = ( self._best_trials_detailed[self.constrained_metric_] >= self.constraint_criterion_value_ ) else: ref_val = self._best_trials_detailed[self.constrained_metric_].min() relative_ratio = ( (1 + self._constraint_value) if ref_val > 0 else (1 - self._constraint_value) ) self.constraint_criterion_value_ = relative_ratio * ref_val self._admissible_trials_mask_ = ( self._best_trials_detailed[self.constrained_metric_] <= self.constraint_criterion_value_ ) elif self._constraint_type == "absolute": self.constraint_criterion_value_ = self._constraint_value if constrained_higher_is_better: self._admissible_trials_mask_ = ( self._best_trials_detailed[self.constrained_metric_] >= self.constraint_criterion_value_ ) else: self._admissible_trials_mask_ = ( self._best_trials_detailed[self.constrained_metric_] <= self.constraint_criterion_value_ ) else: raise GuardianAIValueError( "Only `relative` and `absolute` are supported for " f"`constraint_type`. Received {self._constraint_type}" ) admissible = self._best_trials_detailed[self._admissible_trials_mask_] if unconstrained_higher_is_better: best_model = admissible[self.unconstrained_metric_].idxmax() else: best_model = admissible[self.unconstrained_metric_].idxmin() self.select_model(best_model)
[docs] def predict(self, X: pd.DataFrame) -> np.ndarray: """ Predict class for input dataset X. Parameters ---------- X: pd.DataFrame The dataset for which to collect labels. Returns ------- labels: np.ndarray The labels for every sample. """ return self.predict_proba(X).argmax(-1)
[docs] def predict_proba(self, X: pd.DataFrame) -> np.ndarray: """ Predict class probabilities for input dataset X. Parameters ---------- X: pd.DataFrame The dataset for which to collect label probabilities. Returns ------- probabilities: np.ndarray The label probabilities for every sample. """ probas = self._get_base_probas(X) groups = X[self._protected_attribute_names].astype("category") self._unique_groups_ = cast(np.ndarray, self._unique_groups_) return _apply_multiplier( probas, groups, self._unique_groups_, self._unique_group_names_, self.selected_multipliers_, self._favorable_label_idx, )
[docs] def show_tradeoff(self, hide_inadmissible: bool = False): """ Show the models representing the best fairness-accuracy trade-off found. Arguments --------- hide_inadmissible: bool, default=False Whether or not to hide the models that don't satisfy the constraint. """ metric_is_valid = self.fairness_metric_name in _valid_regression_metrics level_down = "levelling_down: %{customdata:.4f}</br>" if metric_is_valid else "" df = self._best_trials_detailed if hide_inadmissible: df = df[self._admissible_trials_mask_] # type: ignore df = df.reset_index() # type: ignore marker = None if metric_is_valid: marker = dict( color=df[self._third_objective_name], symbol=[ "star" if val == 0.0 else "circle" for val in df[self._third_objective_name] ], colorscale="bluered", colorbar=dict(title="Levelling Down"), showscale=True, cmin=0, cmax=max( max(df[self._third_objective_name]), max(df[self.fairness_metric_name]), ), ) fig = go.Figure() fig.add_trace( go.Scatter( x=df[self.fairness_metric_name], y=df[self.accuracy_metric_name], customdata=df[self._third_objective_name] if metric_is_valid else None, text=df["index"], line_shape="vh" if self._higher_fairness_is_better else "hv", mode="markers" if metric_is_valid else "markers+lines", marker=marker, hovertemplate=f"{self.fairness_metric_name}" + ": %{x:.4f}" + f"<br>{self.accuracy_metric_name}" + ": %{y:.4f}</br>" + level_down + "Index: %{text}</br>", name="Multiplier Tuning (Best Models)", ) ) # dummy traces to show leveling down legend if metric_is_valid: fig.add_trace( go.Scatter( x=[None], y=[None], mode="markers", marker=dict( size=10, color="blue", symbol="star", ), name="No Levelling Down", ) ) fig.add_trace( go.Scatter( x=[self._fairness_base_], y=[self._accuracy_base_], mode="markers", marker_symbol="cross", marker_color="green", marker_size=10, hovertemplate=f"{self.fairness_metric_name}" + ": %{x:.4f}" + f"<br>{self.accuracy_metric_name}" + ": %{y:.4f}</br>", name="Base Estimator", ) ) fig.add_trace( go.Scatter( x=[ self._best_trials_detailed[self.fairness_metric_name].iloc[ # type: ignore self.selected_multipliers_idx_ ] ], y=[ self._best_trials_detailed[self.accuracy_metric_name].iloc[ # type: ignore self.selected_multipliers_idx_ ] ], mode="markers", marker_symbol="circle-open", marker_color="red", marker_line_width=3, marker_size=15, hoverinfo="skip", name="Currently Selected Model", ) ) # Constraint fig.add_trace(self._get_constraint_line(df)) fig.update_xaxes(gridwidth=1, gridcolor="LightGrey", zerolinecolor="LightGrey") fig.update_yaxes(gridwidth=1, gridcolor="LightGrey", zerolinecolor="LightGrey") fig.update_layout( title="Bias Mitigation Best Models Found", xaxis_title=self.fairness_metric_name, yaxis_title=self.accuracy_metric_name, legend_title="Models", legend_orientation="h" if metric_is_valid else "v", plot_bgcolor="rgba(0,0,0,0)", width=None, height=600, margin={"t": 50, "b": 50}, ) fig.show()
def _get_constraint_line(self, df: pd.DataFrame) -> Any: """ Return the Plotly Line object that represents the constraint on the figure. Arguments --------- df: pd.DataFrame DataFrame of trials that will be plotted. Used to determine the range the constraint line has to cover. Returns ------- Any: line The plotly line representing the constraint. """ x_min = min(df[self.fairness_metric_name].min(), self._fairness_base_) x_max = max(df[self.fairness_metric_name].max(), self._fairness_base_) y_min = min(df[self.accuracy_metric_name].min(), self._accuracy_base_) y_max = max(df[self.accuracy_metric_name].max(), self._accuracy_base_) if self._constraint_target == "fairness": x = [self.constraint_criterion_value_] * 2 y = [y_min, y_max] elif self._constraint_target == "accuracy": x = [x_min, x_max] y = [self.constraint_criterion_value_] * 2 if self._constraint_type == "relative": name = f"{self._constraint_value * 100:.1f}% relative {self.constrained_metric_} drop" elif self._constraint_type == "absolute": name = f"{self.constrained_metric_} value of {self.constraint_criterion_value_:.2f}" return go.Scatter( x=x, y=y, mode="lines", line=dict(color="black", width=4, dash="dash"), name=name, )
[docs] def select_model(self, model_idx: int): """ Select the multipliers to use for inference. Arguments --------- model_idx: int The index of the multipliers in `self.best_trials_` to use for inference, as displayed by `show_tradeoff`. Raises ------ GuardianAIValueError Raised when the passed model_idx is invalid. """ if model_idx < 0 or model_idx >= len(self._best_trials_detailed): # type: ignore raise GuardianAIValueError(f"Invalid `model_idx` received: {model_idx}") self.selected_multipliers_idx_ = model_idx
@property def selected_multipliers_(self): # noqa D102 if self._best_trials_detailed is None or self.selected_multipliers_idx_ is None: return None else: return self._best_trials_detailed[self._multiplier_names_].iloc[ self.selected_multipliers_idx_ ] def _warmstart_multipliers(self, X, y, groups): default_multipliers = [] # ----- Multipliers that re-create the original model ----- # good accuracy, bad disparity, good regression default_multipliers.append( { f"multiplier_{group_name}": 1.0 for group_name in self._unique_group_names_ } ) # Unless we calculate a better bound below, assume that multipliers should # never be larger than 10 max_multiplier = 10 # Get masks for filtering data by each group group_masks = {} for group, group_name in zip(self._unique_groups_, self._unique_group_names_): mask = (groups == group).all(1).to_numpy().squeeze() group_masks[group_name] = mask self._group_masks = group_masks if self.fairness_metric_name in _inhouse_metrics: # Get a rate calculator/scorer for the given constraint rate_scorer = _get_rate_scorer(self.fairness_metric_name) self._rate_scorer = rate_scorer # For some metrics, we can calculate additional defaults if self.fairness_metric_name in _inhouse_metrics and self._warmstart: # Check if metric is supported by fairlearn if self.fairness_metric_name in _automl_to_fairlearn_metric_names: # Fit the EO method as implemented in fairlearn adjusted_model = ThresholdOptimizer( estimator=self._base_estimator, constraints=_automl_to_fairlearn_metric_names[ self.fairness_metric_name ], objective=( "accuracy_score" if self.accuracy_metric_name == "accuracy" else "balanced_accuracy_score" ), prefit=True, ) adjusted_model.fit(X, y, sensitive_features=groups) # Estimate the target rate value for the given rate adjusted_predictions = adjusted_model.predict( X, sensitive_features=groups ) target_rate = rate_scorer(y, adjusted_predictions) # Find multipliers that produce the corresponding target rate as closely as possible for each group multipliers_eo = self._find_multipliers_for_rate( y, target_rate, rate_scorer, group_masks, ) default_multipliers.append(multipliers_eo) # Do we want to minimize or maximize the given outcome rate? if self.fairness_metric_name in _positive_fairness_names: best = np.max else: best = np.min # Calculate the outcome rates for each group predictions = self._probas_predicted.argmax(-1) rates_by_group = [] for group_name in self._unique_group_names_: mask = group_masks[group_name] rates_by_group.append(rate_scorer(y[mask], predictions[mask])) # Find the group with the best outcome rate target_rate = best(rates_by_group) # Find multipliers that produce the corresponding target rate as closely as possible for each group multipliers_or = self._find_multipliers_for_rate( y, target_rate, rate_scorer, group_masks, ) default_multipliers.append(multipliers_or) max_multiplier = max( [m if m > 1 else 1 / m for m in multipliers_or.values()] ) # Allow to increase/decrease probabilities by a factor of at most 10, # unless larger is required to allow the trivial solution above. max_multiplier = max(max_multiplier, 10) return default_multipliers, max_multiplier def _find_multipliers_for_rate(self, y, target_rate, rate_scorer, group_masks): multipliers = {} for group, group_name, multiplier_name in zip( self._unique_groups_, self._unique_group_names_, self._multiplier_names_ ): # Get only the data for this group mask = group_masks[group_name] # Unique values of up to grid_resolution quantiles # Always include 0.5 -- the default threshold thresholds = np.unique( list( np.quantile( self._probas_predicted[mask], np.linspace(0, 1, self._warmstart_grid_resolution + 1), method="nearest", ) ) + [0.5] ) # Ensure the thresholds are in (0, 1) (exclusive) thresholds = thresholds[np.logical_and(thresholds > 0, thresholds < 1)] # Calcuate rate for each threshold rates = [ rate_scorer( y[mask], self._probas_predicted[mask][:, self._favorable_label_idx] > threshold, ) for threshold in thresholds ] # Find threshold with closest rate to target rate, break ties towards # thresholds of 0.5 -- i.e., doing nothing ideal_index = np.argmin( (rates - target_rate) ** 2 + np.abs(thresholds - 0.5) * 10**-5 ) ideal_threshold = thresholds[ideal_index] # Closed form solution to convert thresholds to multipliers multipliers[multiplier_name] = (1 - ideal_threshold) / ideal_threshold return multipliers
def _apply_multiplier( probas: np.ndarray, groups: pd.DataFrame, unique_groups: np.ndarray, unique_group_names: pd.DataFrame, multipliers: pd.DataFrame, majority_class_idx: int, ) -> np.ndarray: """ Apply multipliers over an input probas array. Arguments --------- probas: np.ndarray The class probabilities on which to apply multipliers. groups: pd.DataFrame DataFrame representing the different protected attributes' values for every sample. unique_groups: np.ndarray Array of all possible unique groups. unique_group_names: pd.DataFrame Name corresponding to a unique group. Used to map a group to its corresponding multiplier in ``multipliers``. multipliers: pd.DataFrame DataFrame mapping a group name to its multiplying scalar. majority_class_idx: int Which class label in ``probas`` to apply the multiplier on. Returns ------- probas_multiplied: np.ndarray The multiplier class probabilities """ for group, group_name in zip(unique_groups, unique_group_names): mask = (groups == group).all(1).to_numpy().squeeze() multiplier = multipliers[f"multiplier_{group_name}"] probas[:, majority_class_idx][mask] *= multiplier return probas / probas.sum(-1, keepdims=True) class _PredictionReturner: _estimator_type = "classifier" classes_ = np.array([0, 1]) def predict(self, y_pred): self.classes_ = np.unique(y_pred) return y_pred def predict_proba(self, y_pred): self.classes_ = np.arange(y_pred.shape[1]) return y_pred class _PredictionScorer: def __init__(self, scorer): self.scorer = scorer self.dummy_model = _PredictionReturner() def __call__(self, y_true, y_pred): return self.scorer(self.dummy_model, y_pred, y_true)