"""Source code for teras._src.preprocessing.data_transformers.ctgan."""

from keras import ops
from keras import random
from sklearn.preprocessing import OneHotEncoder
from sklearn.mixture import BayesianGaussianMixture
import pandas as pd
from teras._src.preprocessing.data_transformers.data_transformer import DataTransformer
from teras._src.typing import FeaturesNamesType
from teras._src.decorators import assert_fitted
import concurrent.futures
import numpy as np
import json
import pickle


def continuous_feature_transformer(args):
    """
    Fits a Bayesian Gaussian mixture to one continuous feature and samples a
    mode (cluster) for every value of that feature.

    This is the per-feature worker used by `ModeSpecificNormalization.fit`.
    It takes a single tuple so it can be used with `executor.map`.

    Args:
        args: tuple of ``(x, feature_name, weight_threshold, bgm_kwargs)``.
            ``x`` is a pandas DataFrame (``feature_name`` is a column label)
            or a 2-D numpy ndarray (``feature_name`` is a column index).
            ``weight_threshold`` is the minimum mixture weight for a cluster
            to be kept. ``bgm_kwargs`` are passed to
            ``BayesianGaussianMixture``.

    Returns:
        dict with keys:
            ``name``: the given ``feature_name``.
            ``selected_clusters_indices``: one sampled cluster index per row,
                relative to the valid clusters.
            ``num_valid_clusters``: number of clusters kept (Python int).
            ``clusters_means`` / ``clusters_stds``: one entry per valid
                cluster.

    Raises:
        ValueError: If ``x`` is neither a pandas DataFrame nor a numpy
            ndarray.
    """
    x, feature_name, weight_threshold, bgm_kwargs = args
    if isinstance(x, pd.DataFrame):
        feature = x[feature_name].values.reshape(-1, 1)
    elif isinstance(x, np.ndarray):
        feature = x[:, feature_name].reshape(-1, 1)
    else:
        raise ValueError(
            f"`x` must be either a pandas DataFrame or numpy ndarray. "
            f"{type(x)} was given.")

    feature_metadata = {"name": feature_name}
    bay_guass_mix = BayesianGaussianMixture(**bgm_kwargs)
    bay_guass_mix.fit(feature)
    weights = bay_guass_mix.weights_
    n_components = len(weights)

    # The authors use a weight threshold to filter out components in their
    # implementation. For consistency's sake, we use this idea too, with a
    # slight modification.
    # Reference to the official implementation:
    # https://github.com/sdv-dev/CTGAN
    valid_clusters_indicator = weights > weight_threshold
    if not valid_clusters_indicator.any():
        # Keep at least the heaviest cluster so every value gets a mode.
        valid_clusters_indicator[np.argmax(weights)] = True
    num_valid_clusters = int(np.count_nonzero(valid_clusters_indicator))

    # Probability of each value coming from each valid cluster.
    clusters_probs = bay_guass_mix.predict_proba(feature)
    clusters_probs = clusters_probs[:, valid_clusters_indicator]
    # Values far from every valid cluster can underflow to all-zero rows,
    # which would turn into NaN after normalization. A small epsilon keeps
    # each row a valid distribution (the official code adds one as well).
    clusters_probs = np.nan_to_num(clusters_probs, nan=0.0) + 1e-10

    # Sample one cluster per row with a vectorized inverse-CDF draw. This
    # mirrors what `np.random.choice(..., p=row)` does, without a Python call
    # per row. Dividing by the last column makes every row's CDF end at 1.
    cumulative = np.cumsum(clusters_probs, axis=1)
    cumulative /= cumulative[:, -1:]
    draws = np.random.random_sample((cumulative.shape[0], 1))
    selected_clusters_indices = np.minimum(
        np.sum(cumulative <= draws, axis=1), num_valid_clusters - 1)

    # To create the one-hot component, store the selected cluster indices
    # and the number of valid clusters.
    feature_metadata['selected_clusters_indices'] = selected_clusters_indices
    feature_metadata['num_valid_clusters'] = num_valid_clusters

    # Means and standard deviations used to normalize the values.
    # For a single feature, sklearn's `means_` has shape (n_components, 1).
    # `covariances_` has shape (n_components, 1, 1) for "full",
    # (n_components, 1) for "diag", (n_components,) for "spherical" and
    # (1, 1) for "tied". Flattening and broadcasting handles every type,
    # including a single component.
    means = np.asarray(bay_guass_mix.means_).reshape(-1)
    stds = np.sqrt(np.broadcast_to(
        np.asarray(bay_guass_mix.covariances_).reshape(-1), (n_components,)))
    feature_metadata['clusters_means'] = means[valid_clusters_indicator]
    feature_metadata['clusters_stds'] = stds[valid_clusters_indicator]
    return feature_metadata


class ModeSpecificNormalization:
    """
    Mode Specific Normalization as proposed by
    Lei Xu et al. in the paper,
    "Modeling Tabular data using Conditional GAN".

    Each continuous feature is modelled with a sklearn
    ``BayesianGaussianMixture``. Each value is then represented by:

    - a scalar ``alpha``: the value normalized with the mean and standard
      deviation of a sampled mode;
    - a one-hot vector ``beta`` that indicates that mode.

    In the transformed output, each feature occupies
    ``1 + num_valid_clusters`` columns (alpha first, then the betas).

    Reference(s):
        https://arxiv.org/abs/1907.00503

    Args:
        continuous_features: list, List of continuous features names.
            In the case of ndarray, pass a list of continuous column indices.
        max_clusters: int, Maximum clusters. Defaults to 10.
        std_multiplier: int, Multiplies the standard deviation in the
            normalization. Defaults to 4 as proposed in the paper.
        weight_threshold: float, Taken from the official implementation.
            The minimum value a component weight can take to be considered
            a valid component. `weights_` under this value will be ignored.
            Defaults to 0.005.
        covariance_type: str, Parameter for the `BayesianGaussianMixture`
            class of sklearn. Defaults to "full".
        weight_concentration_prior_type: str, Parameter for the
            `BayesianGaussianMixture` class of sklearn. Defaults to
            "dirichlet_process".
        weight_concentration_prior: float, Parameter for the
            `BayesianGaussianMixture` class of sklearn. Defaults to 0.001.
    """
    def __init__(self,
                 continuous_features: FeaturesNamesType,
                 max_clusters: int = 10,
                 std_multiplier: int = 4,
                 weight_threshold: float = 0.005,
                 covariance_type: str = "full",
                 weight_concentration_prior_type: str = "dirichlet_process",
                 weight_concentration_prior: float = 0.001):
        self.continuous_features = continuous_features
        self.max_clusters = max_clusters
        self.std_multiplier = std_multiplier
        self.weight_threshold = weight_threshold
        self.covariance_type = covariance_type
        self.weight_concentration_prior_type = weight_concentration_prior_type
        self.weight_concentration_prior = weight_concentration_prior

        # Features metadata: one entry per feature, plus the global keys
        # "relative_indices_all" and "num_valid_clusters_all".
        # 1. `selected_clusters_indices` is used in `transform` to build the
        #    one-hot vector B(i,j) (B for Beta) for each value c(i,j), the jth
        #    value of the ith continuous feature, following the steps of mode
        #    specific normalization on pages 3-4 of the paper.
        # 2. `num_valid_clusters` sets the width of that one-hot vector.
        # 3. `clusters_means` and `clusters_stds` normalize c(i,j) into
        #    a(i,j) (a for alpha).
        self._metadata = {}

        self._bgm_kwargs = dict(
            n_components=self.max_clusters,
            covariance_type=self.covariance_type,
            weight_concentration_prior_type=self.weight_concentration_prior_type,
            weight_concentration_prior=self.weight_concentration_prior,
        )

        self._fitted = False

    @property
    @assert_fitted
    def metadata(self):
        return self._metadata

    def fit(self, x):
        """
        Fits one Bayesian Gaussian mixture per continuous feature, in
        parallel worker processes. Refitting discards any metadata from a
        previous fit.

        Args:
            x: A pandas DataFrame or numpy ndarray
        """
        metadata = {}
        relative_indices_all = []
        num_valid_clusters_all = []
        relative_index = 0

        # For DataFrames, send each worker only the column it needs instead
        # of pickling the whole frame once per feature.
        feat_args = [
            (x[[feature_name]] if isinstance(x, pd.DataFrame) else x,
             feature_name, self.weight_threshold, self._bgm_kwargs)
            for feature_name in self.continuous_features]
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # `executor.map` yields results in input order, so the relative
            # indices follow the order of `self.continuous_features`.
            for r in executor.map(continuous_feature_transformer, feat_args):
                name = r.pop("name")
                relative_indices_all.append(relative_index)
                # One alpha column plus one beta column per valid cluster.
                relative_index += 1 + r["num_valid_clusters"]
                num_valid_clusters_all.append(r["num_valid_clusters"])
                metadata[name] = r
        metadata["relative_indices_all"] = np.array(relative_indices_all)
        metadata["num_valid_clusters_all"] = num_valid_clusters_all
        self._metadata = metadata
        self._fitted = True

    @assert_fitted
    def transform(self, x):
        """
        Args:
            x: A pandas DataFrame or numpy ndarray. This must be the data
                passed to `fit`: the cluster of every row is sampled during
                `fit` and stored per row.
        Returns:
            A tensor/array of transformed continuous data where each feature
            contributes an alpha column followed by its one-hot betas.
        Raises:
            ValueError: If `x` has an unsupported type, or a different number
                of rows than the data used in `fit`.
        """
        x_cont_normalized = []
        for feature_name in self.continuous_features:
            feature_metadata = self._metadata[feature_name]
            selected_clusters_indices = feature_metadata[
                'selected_clusters_indices']
            num_valid_clusters = feature_metadata['num_valid_clusters']
            if isinstance(x, pd.DataFrame):
                feature = x[feature_name].values
            elif isinstance(x, np.ndarray):
                feature = x[:, feature_name]
            else:
                raise ValueError(f"`x` must be either a pandas DataFrame "
                                 f"or numpy ndarray. {type(x)} was given.")
            if len(feature) != len(selected_clusters_indices):
                raise ValueError(
                    f"Feature {feature_name!r} has {len(feature)} rows but "
                    f"the normalizer was fitted on "
                    f"{len(selected_clusters_indices)} rows. `transform` only "
                    f"supports the data that was passed to `fit`.")
            # One-hot components (betas, as in the paper) for all values.
            betas = np.eye(num_valid_clusters)[selected_clusters_indices]
            means = feature_metadata['clusters_means'][
                selected_clusters_indices]
            stds = feature_metadata['clusters_stds'][
                selected_clusters_indices]
            alphas = (feature - means) / (self.std_multiplier * stds)
            alphas = ops.expand_dims(alphas, 1)
            x_cont_normalized.append(
                ops.concatenate([alphas, betas], axis=1))

        return ops.concatenate(x_cont_normalized, axis=1)

    def fit_transform(self, x):
        """
        Fits and transforms x.
        Returns:
            A normalized copy of x.
        """
        self.fit(x)
        return self.transform(x)

    @assert_fitted
    def reverse_transform(self, x_normalized):
        """
        Converts data laid out like the output of `transform` back to the
        original continuous values.

        For each feature, the alpha column and its beta columns are found via
        `relative_indices_all`. The mode is the argmax of the betas, and the
        value is recovered as ``alpha * std_multiplier * std + mean`` of that
        mode.

        Args:
            x_normalized: 2-D numpy ndarray, backend tensor or pandas
                DataFrame laid out like the output of `transform`.
        Returns:
            A pandas DataFrame with one column per continuous feature.
        """
        if isinstance(x_normalized, pd.DataFrame):
            x_normalized = x_normalized.to_numpy()
        else:
            x_normalized = ops.convert_to_numpy(x_normalized)
        data = {}
        for feature_name, start in zip(self.continuous_features,
                                       self._metadata["relative_indices_all"]):
            feature_metadata = self._metadata[feature_name]
            start = int(start)
            stop = start + 1 + int(feature_metadata['num_valid_clusters'])
            alphas = x_normalized[:, start]
            betas = x_normalized[:, start + 1:stop]
            cluster_indices = np.argmax(betas, axis=1)
            means = np.asarray(
                feature_metadata['clusters_means'])[cluster_indices]
            stds = np.asarray(
                feature_metadata['clusters_stds'])[cluster_indices]
            data[feature_name] = alphas * (self.std_multiplier * stds) + means
        return pd.DataFrame(data)

    @classmethod
    def load_from_preset(cls, metadata, **kwargs):
        """
        Creates an already-fitted instance from previously computed metadata.

        Args:
            metadata: The metadata dict of a fitted instance.
            **kwargs: Constructor arguments (must include
                `continuous_features`).
        """
        c = cls(**kwargs)
        c._metadata = metadata
        # The metadata is everything `fit` produces, so mark as fitted.
        c._fitted = True
        return c


class CTGANDataTransformer(DataTransformer):
    """
    Data Transformation class based on the data transformation
    in the official CTGAN paper and implementation.

    Continuous features are encoded with `ModeSpecificNormalization`: per
    feature, one alpha column followed by one-hot beta columns. Categorical
    features are one-hot encoded. In the transformed array, all continuous
    feature blocks come first, followed by the categorical blocks.

    Reference(s):
        https://arxiv.org/abs/1907.00503
        https://github.com/sdv-dev/CTGAN/

    Args:
        continuous_features: list, List of continuous features names in
            the dataset.
        categorical_features: list, List of categorical features names in
            the dataset.
        max_clusters: int, Maximum Number of clusters to use in
            `ModeSpecificNormalization`. Defaults to 10.
        std_multiplier: int, Multiplies the standard deviation in the
            normalization. Defaults to 4.
        weight_threshold: float, The minimum value a component weight can
            take to be considered a valid component. `weights_` under this
            value will be ignored. (Taken from the official implementation.)
            Defaults to 0.005.
        covariance_type: str, Parameter for the `BayesianGaussianMixture`
            class of sklearn. Defaults to "full".
        weight_concentration_prior_type: str, Parameter for the
            `BayesianGaussianMixture` class of sklearn.
            Defaults to "dirichlet_process".
        weight_concentration_prior: float, Parameter for the
            `BayesianGaussianMixture` class of sklearn. Defaults to 0.001.
    """

    def __init__(self,
                 continuous_features: FeaturesNamesType = None,
                 categorical_features: FeaturesNamesType = None,
                 max_clusters: int = 10,
                 std_multiplier: int = 4,
                 weight_threshold: float = 0.005,
                 covariance_type: str = "full",
                 weight_concentration_prior_type: str = "dirichlet_process",
                 weight_concentration_prior: float = 0.001):
        super().__init__()
        self.continuous_features = (
            list(continuous_features) if continuous_features is not None
            else [])
        self.categorical_features = (
            list(categorical_features) if categorical_features is not None
            else [])
        self.max_clusters = max_clusters
        self.std_multiplier = std_multiplier
        self.weight_threshold = weight_threshold
        self.covariance_type = covariance_type
        self.weight_concentration_prior_type = weight_concentration_prior_type
        self.weight_concentration_prior = weight_concentration_prior

        # Count the normalized lists: the raw arguments may be `None`.
        self._num_categorical_features = len(self.categorical_features)
        self._num_continuous_features = len(self.continuous_features)
        self._metadata["num_categorical"] = self._num_categorical_features
        self._metadata["num_continuous"] = self._num_continuous_features

        self.mode_specific_normalizer = None
        if self._num_continuous_features > 0:
            self.mode_specific_normalizer = ModeSpecificNormalization(
                continuous_features=self.continuous_features,
                max_clusters=self.max_clusters,
                std_multiplier=self.std_multiplier,
                weight_threshold=self.weight_threshold,
                covariance_type=self.covariance_type,
                weight_concentration_prior_type=(
                    self.weight_concentration_prior_type),
                weight_concentration_prior=self.weight_concentration_prior)
        self._one_hot_enc = OneHotEncoder()
        self._fitted = False

    def fit(self, x):
        """
        Fits the continuous and categorical transformations on `x`.

        Args:
            x: A pandas DataFrame containing all configured features.
        Raises:
            ValueError: If `x` is not a pandas DataFrame.
        """
        if not isinstance(x, pd.DataFrame):
            raise ValueError(
                f"`x` must be a pandas dataframe. Received {type(x)}")
        # Each part is optional; the normalizer only exists when there are
        # continuous features.
        if self._num_continuous_features > 0:
            self._fit_continuous(x[self.continuous_features])
        if self._num_categorical_features > 0:
            self._fit_categorical(x[self.categorical_features])
        self._update_layout_metadata()
        self._fitted = True

    def _fit_continuous(self, x_cont):
        self.mode_specific_normalizer.fit(x_cont)
        self._metadata["continuous"] = self.mode_specific_normalizer.metadata

    def _transform_continuous(self, x_cont):
        return self.mode_specific_normalizer.transform(x_cont)

    def _fit_categorical(self, x_cat):
        """
        Fits the one-hot encoder and computes per-feature category metadata.

        Categories and their sampling probabilities are stored in the
        encoder's category order. So index ``j`` of a feature's categories
        always refers to the ``j``-th one-hot column of that feature.
        """
        # To speed up computation of the conditional vector later, each
        # feature gets a relative index. For instance, given three categorical
        # columns Gender (2 categories), City (4 categories) and EconomicClass
        # (5 categories), the relative indices are:
        #   gender_relative_index: 0
        #   city_relative_index: 0 + 2 = 2
        #   economic_class_relative_index: 2 + 4 = 6
        # Lists (rather than dicts keyed by name) let later code fetch the
        # metadata of several features at once by index.
        self._one_hot_enc.fit(x_cat)

        relative_indices_all = []
        num_categories_all = []
        # Sampling probabilities over each feature's categories.
        categories_probs_all = []
        # Nested list: the categories of each feature.
        categories_all = []
        feature_relative_index = 0
        for feature_name, categories in zip(self.categorical_features,
                                            self._one_hot_enc.categories_):
            num_categories = len(categories)
            num_categories_all.append(num_categories)
            relative_indices_all.append(feature_relative_index)
            feature_relative_index += num_categories
            counts = (x_cat[feature_name]
                      .value_counts(dropna=False)
                      .reindex(categories, fill_value=0)
                      .to_numpy(dtype=np.float64))
            # Log-frequency as in the official CTGAN: log(freq + 1). A plain
            # log(freq) gives categories seen once zero probability, and NaN
            # when every category is seen once.
            log_freqs = np.log1p(counts)
            categories_probs_all.append(log_freqs / log_freqs.sum())
            categories_all.append(categories.tolist())

        categorical_metadata = dict()
        categorical_metadata["total_num_categories"] = sum(num_categories_all)
        categorical_metadata["num_categories_all"] = num_categories_all
        categorical_metadata["relative_indices_all"] = ops.array(
            relative_indices_all)
        categorical_metadata["categories_probs_all"] = categories_probs_all
        categorical_metadata["categories_all"] = categories_all
        self._metadata["categorical"] = categorical_metadata

    def _transform_categorical(self, x_cat):
        return self._one_hot_enc.transform(x_cat)

    def _feature_offsets(self):
        """Returns (continuous offsets, categorical offsets, total width) of
        the feature blocks in the transformed array."""
        continuous_offsets = []
        categorical_offsets = []
        offset = 0
        if self._num_continuous_features > 0:
            for num_clusters in (
                    self._metadata["continuous"]["num_valid_clusters_all"]):
                continuous_offsets.append(offset)
                # One alpha column plus one beta column per valid cluster.
                offset += 1 + int(num_clusters)
        if self._num_categorical_features > 0:
            # Categorical blocks come after all continuous blocks.
            for num_categories in (
                    self._metadata["categorical"]["num_categories_all"]):
                categorical_offsets.append(offset)
                offset += int(num_categories)
        return continuous_offsets, categorical_offsets, offset

    def _update_layout_metadata(self):
        """Stores the overall feature offsets and total width in metadata."""
        cont_offsets, cat_offsets, total = self._feature_offsets()
        self._metadata["relative_indices_all"] = cont_offsets + cat_offsets
        self._metadata["total_transformed_features"] = total

    @assert_fitted
    def transform(self, x):
        """
        Args:
            x: A pandas DataFrame with the fitted features.
        Returns:
            The continuous feature blocks followed by the one-hot categorical
            columns, concatenated along axis 1.
        """
        parts = []
        if self._num_continuous_features > 0:
            parts.append(
                self._transform_continuous(x[self.continuous_features]))
        if self._num_categorical_features > 0:
            parts.append(self._transform_categorical(
                x[self.categorical_features]).toarray())
        self._update_layout_metadata()
        if not parts:
            return np.zeros((len(x), 0))
        return ops.concatenate(parts, axis=1)

    @assert_fitted
    def reverse_transform(self, x_generated):
        """
        Reverses transforms the generated data to the original data format.

        Args:
            x_generated: Generated dataset, laid out like the output of
                `transform` (numpy ndarray, backend tensor or DataFrame).
        Returns:
            Generated data in the original data format, as a DataFrame with
            the continuous features followed by the categorical features.
        """
        if isinstance(x_generated, pd.DataFrame):
            x_generated = x_generated.to_numpy()
        else:
            x_generated = ops.convert_to_numpy(x_generated)
        cont_offsets, cat_offsets, _ = self._feature_offsets()
        data = {}
        if self._num_continuous_features > 0:
            cont_metadata = self._metadata["continuous"]
            for feature_name, start, num_clusters in zip(
                    self.continuous_features, cont_offsets,
                    cont_metadata["num_valid_clusters_all"]):
                alphas = x_generated[:, start]
                # Betas are the one-hot encoded cluster number.
                betas = x_generated[:, start + 1:start + 1 + int(num_clusters)]
                cluster_indices = np.argmax(betas, axis=1)
                # Pick the mean/std of each row's cluster.
                means = np.asarray(
                    cont_metadata[feature_name]["clusters_means"])[
                    cluster_indices]
                stds = np.asarray(
                    cont_metadata[feature_name]["clusters_stds"])[
                    cluster_indices]
                data[feature_name] = (
                    alphas * (self.std_multiplier * stds) + means)
        if self._num_categorical_features > 0:
            num_categories_all = (
                self._metadata["categorical"]["num_categories_all"])
            for cat_index, (feature_name, start) in enumerate(
                    zip(self.categorical_features, cat_offsets)):
                raw_feature_parts = x_generated[
                    :, start:start + int(num_categories_all[cat_index])]
                categories = self._one_hot_enc.categories_[cat_index]
                data[feature_name] = categories[
                    np.argmax(raw_feature_parts, axis=1)]
        return pd.DataFrame(data)

    @staticmethod
    def _json_default(obj):
        """`json.dump` fallback that converts numpy values and backend
        tensors into plain Python types."""
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        try:
            return np.asarray(ops.convert_to_numpy(obj)).tolist()
        except (TypeError, ValueError) as e:
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON "
                f"serializable") from e

    @staticmethod
    def _encoder_filename(filename):
        """Path of the pickled `OneHotEncoder` saved next to `filename`."""
        name = str(filename)
        # Strip only a trailing ".json". The result never equals `filename`,
        # so the JSON file cannot be overwritten.
        if name.endswith(".json"):
            name = name[:-len(".json")]
        return name + "_encoder.pkl"

    def _restore_metadata_arrays(self):
        """Converts JSON-loaded lists back into numpy arrays and hands the
        continuous metadata to the normalizer."""
        cont = self._metadata.get("continuous")
        if cont is not None:
            for feature_name in self.continuous_features:
                feature_metadata = cont.get(feature_name)
                if feature_metadata is None:
                    continue
                for key in ("selected_clusters_indices", "clusters_means",
                            "clusters_stds"):
                    if key in feature_metadata:
                        feature_metadata[key] = np.asarray(
                            feature_metadata[key])
            if "relative_indices_all" in cont:
                cont["relative_indices_all"] = np.asarray(
                    cont["relative_indices_all"])
            if self.mode_specific_normalizer is not None:
                self.mode_specific_normalizer._metadata = cont
                self.mode_specific_normalizer._fitted = self._fitted
        cat = self._metadata.get("categorical")
        if cat is not None:
            if "relative_indices_all" in cat:
                cat["relative_indices_all"] = np.asarray(
                    cat["relative_indices_all"])
            if "categories_probs_all" in cat:
                cat["categories_probs_all"] = [
                    np.asarray(p) for p in cat["categories_probs_all"]]

    def save(self, filename):
        """
        Saves the fitted state of `CTGANDataTransformer` instance for
        portability, in the `json` format.
        It also pickles the `OneHotEncoder` to a file with the same name,
        with "_encoder.pkl" in place of the ".json" extension.

        Args:
            filename: Filename or file path ending in `.json` extension.
        """
        args = {
            "categorical_features": self.categorical_features,
            "continuous_features": self.continuous_features,
            "max_clusters": self.max_clusters,
            "std_multiplier": self.std_multiplier,
            "weight_threshold": self.weight_threshold,
            "covariance_type": self.covariance_type,
            "weight_concentration_prior_type":
                self.weight_concentration_prior_type,
            "weight_concentration_prior": self.weight_concentration_prior,
        }
        attrs = {
            "_metadata": self._metadata,
            "_fitted": self._fitted,
        }
        state = {
            "args": args,
            "attrs": attrs,
        }
        with open(filename, "w") as f:
            json.dump(state, f, default=self._json_default)

        with open(self._encoder_filename(filename), "wb") as f:
            pickle.dump(self._one_hot_enc, f)

    @classmethod
    def load(cls, filename):
        """
        Loads the saved state of `CTGANDataTransformer` from the `json` file.
        It also loads the pickled `OneHotEncoder` instance.

        Args:
            filename: Filename or file path ending in `.json` extension.
        Returns:
            An instance of `CTGANDataTransformer` with state stored in the
            `filename` json file.
        """
        with open(filename, "r") as f:
            state = json.load(f)
        # `save` writes the constructor arguments under "args". "params" (the
        # key the previous `load` read) is accepted as a fallback.
        args = state["args"] if "args" in state else state["params"]
        c = cls(**args)
        for name, value in state["attrs"].items():
            setattr(c, name, value)
        c._restore_metadata_arrays()

        # SECURITY: `pickle.load` can execute arbitrary code; only load
        # encoder files from a trusted source.
        with open(cls._encoder_filename(filename), "rb") as f:
            c._one_hot_enc = pickle.load(f)
        return c