Source code for guardian_ai.privacy_estimation.attack_runner

#!/usr/bin/env python
# -*- coding: utf-8 -*--

# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from typing import Dict, List

from sklearn.linear_model import LogisticRegression

from guardian_ai.privacy_estimation.attack import (
    AttackType,
    ConfidenceBasedBlackBoxAttack,
    ExpectedConfidenceBasedBlackBoxAttack,
    ExpectedLossBasedBlackBoxAttack,
    LossBasedBlackBoxAttack,
    ThresholdClassifier,
)
from guardian_ai.privacy_estimation.combined_attacks import (
    CombinedBlackBoxAttack,
    CombinedWithMerlinBlackBoxAttack,
)
from guardian_ai.privacy_estimation.dataset import (
    AttackModelData,
    ClassificationDataset,
    TargetModelData,
)
from guardian_ai.privacy_estimation.merlin_attack import MerlinAttack
from guardian_ai.privacy_estimation.model import TargetModel
from guardian_ai.privacy_estimation.morgan_attack import MorganAttack, MorganClassifier


[docs] class AttackRunner: """ Class that can run the specified attacks against specified target models using the given dataset """ def __init__( self, dataset: ClassificationDataset, target_models: List[TargetModel], attacks: List[AttackType], threshold_grids, ): """ Initialize AttackRunner. Parameters ---------- dataset: ClassificationDataset Dataset that has been split and prepared for running the attacks target_models: List[TargetModel] Target models to run the attacks against attacks: Dict[str:List[float]], List of attacks to run. Use the pattern AttackType.LossBasedBlackBoxAttack.name Returns ------- AttackRunner """ self.dataset = dataset assert self.dataset.target_model_data is not None assert self.dataset.attack_model_data is not None self.target_models = target_models self.attacks = attacks self.threshold_grids = threshold_grids self.target_model_result_strings = {} self.attack_cache = {} def train_target_models(self): for target_model in self.target_models: print("Target Model: " + target_model.get_model_name()) target_model_data: TargetModelData = self.dataset.target_model_data classifier = target_model.train_model( target_model_data.X_target_train, target_model_data.y_target_train ) print("Target Model Train Evaluation: ") target_model.test_model( target_model_data.X_target_train, target_model_data.y_target_train ) train_f1 = target_model.get_f1( target_model_data.X_target_train, target_model_data.y_target_train ) print("Target Model Test Evaluation: ") target_model.test_model( target_model_data.X_target_test, target_model_data.y_target_test ) test_f1 = target_model.get_f1( target_model_data.X_target_test, target_model_data.y_target_test ) result_string = ( target_model.get_model_name() + "\t" + str(train_f1) + "\t" + str(test_f1) ) self.target_model_result_strings[target_model.get_model_name()] = result_string def _get_attack_object( self, attack_type: AttackType, target_model: TargetModel, # need this for Morgan Attack use_cache: bool = False, ): """ Instantiate the attack object of the specified attack_type. Some complex attack types may require training simpler attacks first if they have not been cached. Parameters ---------- attack_type: AttackType Type of the attack to instantiate target_model: TargetModel Target model is required to train simpler attacks as needed use_cache: bool Use attacks previously cached Returns ------- Attack Attack object """ attack = None if attack_type == AttackType.LossBasedBlackBoxAttack: attack = LossBasedBlackBoxAttack(ThresholdClassifier()) elif attack_type == AttackType.ExpectedLossBasedBlackBoxAttack: attack = ExpectedLossBasedBlackBoxAttack(LogisticRegression()) elif attack_type == AttackType.ConfidenceBasedBlackBoxAttack: attack = ConfidenceBasedBlackBoxAttack(ThresholdClassifier()) elif attack_type == AttackType.ExpectedConfidenceBasedBlackBoxAttack: attack = ExpectedConfidenceBasedBlackBoxAttack(LogisticRegression()) elif attack_type == AttackType.MerlinAttack: attack = MerlinAttack(ThresholdClassifier()) elif attack_type == AttackType.CombinedBlackBoxAttack: if use_cache: loss_attack = self.attack_cache[AttackType.LossBasedBlackBoxAttack] confidence_attack = self.attack_cache[AttackType.ConfidenceBasedBlackBoxAttack] attack = CombinedBlackBoxAttack( LogisticRegression(), loss_attack=loss_attack, confidence_attack=confidence_attack, ) else: attack = CombinedBlackBoxAttack(LogisticRegression()) elif attack_type == AttackType.CombinedWithMerlinBlackBoxAttack: if use_cache: loss_attack = self.attack_cache[AttackType.LossBasedBlackBoxAttack] confidence_attack = self.attack_cache[AttackType.ConfidenceBasedBlackBoxAttack] merlin_attack = self.attack_cache[AttackType.MerlinAttack] attack = CombinedWithMerlinBlackBoxAttack( LogisticRegression(), loss_attack=loss_attack, confidence_attack=confidence_attack, merlin_attack=merlin_attack, ) else: merlin_attack = MerlinAttack(ThresholdClassifier()) """ Note that we don't need to train the Merlin attack for this to work. We just need the noise parameters etc. from Merlin attack to calculate the ratio """ attack = CombinedWithMerlinBlackBoxAttack( LogisticRegression(), merlin_attack=merlin_attack ) elif attack_type == AttackType.MorganAttack: if use_cache: loss_attack = self.attack_cache[AttackType.LossBasedBlackBoxAttack] merlin_attack = self.attack_cache[AttackType.MerlinAttack] else: attack_model_data = self.dataset.attack_model_data # tune the loss-based attack and get the lower loss based threshold loss_attack = LossBasedBlackBoxAttack(ThresholdClassifier()) loss_attack.train_attack_model( target_model, attack_model_data.X_attack_train, attack_model_data.y_attack_train, attack_model_data.y_membership_train, self.threshold_grids[AttackType.LossBasedBlackBoxAttack.name], ) # Similarly, train Merlin attack too merlin_attack = MerlinAttack(ThresholdClassifier()) merlin_attack.train_attack_model( target_model, attack_model_data.X_attack_train, attack_model_data.y_attack_train, attack_model_data.y_membership_train, self.threshold_grids[AttackType.MerlinAttack.name], ) # careful, don't just cache the inputs here, because you'll also need to cache the test set by running eval. Might be better to just use fresh values. loss_lower_threshold = loss_attack.attack_model.threshold merlin_threshold = merlin_attack.attack_model.threshold attack = MorganAttack( MorganClassifier( loss_lower_threshold=loss_lower_threshold, merlin_threshold=merlin_threshold, ), loss_attack=loss_attack, merlin_attack=merlin_attack, ) else: raise Exception("This attack type is not supported.") return attack
[docs] def run_attack( self, target_model: TargetModel, attack_type: AttackType, metric_functions: List[str], print_roc_curve: bool = False, cache_input: bool = False, ): """ Instantiate the specified attack, trains and evaluates it, and prints out the result of the attack to an output result file, if provided. Parameters ---------- target_model: TargetModel Target model being attacked. attack_type: AttackType Type of the attack to run metric_functions: List[str] List of metric functions that we care about for evaluating the success of these attacks. Supports all sklearn.metrics that are relevant to binary classification, since the attack model is almost always a binary classifier. print_roc_curve: bool Print out the values of the tpr and fpr. Only works for trained attack classifiers for now. ache_input: bool Should we cache the input values - useful for expensive feature calculations like the merlin ratio. Returns ------- str Result string """ # figure out if we can use any of the previously cached values loss_exists = AttackType.LossBasedBlackBoxAttack in self.attack_cache.keys() confidence_exists = AttackType.ConfidenceBasedBlackBoxAttack in self.attack_cache.keys() merlin_ratio_exists = AttackType.MerlinAttack in self.attack_cache.keys() use_cache = False if attack_type == AttackType.MorganAttack: use_cache = loss_exists and merlin_ratio_exists if attack_type == AttackType.CombinedBlackBoxAttack: use_cache = loss_exists and confidence_exists if attack_type == AttackType.CombinedWithMerlinBlackBoxAttack: use_cache = loss_exists and confidence_exists and merlin_ratio_exists # Now, get the attack object attack = self._get_attack_object(attack_type, target_model, use_cache) # And, get the data needed to run the attack attack_model_data: AttackModelData = self.dataset.attack_model_data # train the attack attack.train_attack_model( target_model, attack_model_data.X_attack_train, attack_model_data.y_attack_train, attack_model_data.y_membership_train, threshold_grid=self.threshold_grids.get(attack.name, None), cache_input=cache_input, use_cache=use_cache, ) if cache_input: # then cache the full attack self.attack_cache[attack.name] = attack # Evaluate the attack print("Running " + attack.name + " against target model " + target_model.get_model_name()) print("Attack Metrics:") attack_metrics = attack.evaluate_attack( target_model, attack_model_data.X_attack_test, attack_model_data.y_attack_test, attack_model_data.y_membership_test, metric_functions, print_roc_curve=print_roc_curve, cache_input=cache_input, ) # Prepare the result string result_str = attack.name for i in range(len(attack_metrics)): result_str = result_str + "\t" + str(attack_metrics[i]) result_str = result_str + "\n" print(result_str) return result_str