diff --git a/docs/advanced/numerical-embeddings.md b/docs/advanced/numerical-embeddings.md index 5ec9263..e791ba7 100644 --- a/docs/advanced/numerical-embeddings.md +++ b/docs/advanced/numerical-embeddings.md @@ -171,6 +171,63 @@ The `NumericalEmbedding` layer processes each numerical feature through two para - Adaptively weights continuous vs. discrete representations - Learns optimal combination per feature and dimension +### Periodic Embeddings (`PeriodicEmbedding`) + +The `PeriodicEmbedding` layer uses trigonometric functions to capture cyclical patterns: + +1. **Frequency Learning**: + - Learns optimal frequencies for each feature + - Supports multiple initialization strategies (uniform, log-uniform, constant) + - Frequencies are constrained to be positive + +2. **Periodic Transformation**: + - Applies sin/cos transformations: `sin(freq * x)` and `cos(freq * x)` + - Captures cyclical patterns and smooth, differentiable representations + - Particularly effective for features with natural periodicity + +3. **Post-Processing**: + - Optional MLP for further feature transformation + - Residual connections for stability + - Batch normalization and dropout for regularization + +### PLE Embeddings (`PLEEmbedding`) + +The `Parameterized Linear Expansion` layer provides learnable piecewise linear transformations: + +1. **Segment Learning**: + - Learns optimal segment boundaries for each feature + - Supports uniform and quantile-based initialization + - Each segment has learnable slope and intercept + +2. **Piecewise Linear Transformation**: + - Applies different linear transformations to different input ranges + - Captures complex non-linear patterns through piecewise approximation + - Supports various activation functions (ReLU, Sigmoid, Tanh) + +3. **Flexible Architecture**: + - Configurable number of segments for precision vs. efficiency trade-off + - Optional MLP and residual connections + - Batch normalization and dropout for regularization + +### Advanced Combined Embeddings (`AdvancedNumericalEmbedding`) + +The `AdvancedNumericalEmbedding` layer combines multiple embedding approaches: + +1. **Multi-Modal Processing**: + - Supports any combination of periodic, PLE, and dual-branch embeddings + - Learnable gates to combine different embedding types + - Adaptive weighting per feature and dimension + +2. **Flexible Configuration**: + - Choose from `['periodic', 'ple', 'dual_branch']` embedding types + - Configure each embedding type independently + - Enable/disable gating mechanism + +3. 
**Optimal Performance**: + - Empirically closes the gap between MLPs/Transformers and tree-based baselines + - Particularly effective on tabular tasks + - Maintains interpretability while improving performance + ``` Input value ┌────────┐ ┌────────┐ @@ -220,6 +277,35 @@ This approach is ideal for: | `numerical_dropout_rate` | float | 0.1 | Dropout rate for regularization | | `numerical_use_batch_norm` | bool | True | Apply batch normalization | +### Periodic Embeddings + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `use_periodic_embedding` | bool | False | Enable periodic embeddings | +| `num_frequencies` | int | 4 | Number of frequency components | +| `frequency_init` | str | "log_uniform" | Frequency initialization method | +| `min_frequency` | float | 1e-4 | Minimum frequency for initialization | +| `max_frequency` | float | 1e2 | Maximum frequency for initialization | +| `use_residual` | bool | True | Use residual connections | + +### PLE Embeddings + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `use_ple_embedding` | bool | False | Enable PLE embeddings | +| `num_segments` | int | 8 | Number of linear segments | +| `segment_init` | str | "uniform" | Segment initialization method | +| `ple_activation` | str | "relu" | Activation function for PLE | +| `use_residual` | bool | True | Use residual connections | + +### Advanced Combined Embeddings + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `use_advanced_combined_embedding` | bool | False | Enable combined embeddings | +| `embedding_types` | list | ["dual_branch"] | List of embedding types to use | +| `use_gating` | bool | True | Use learnable gates to combine embeddings | + ### Global Embeddings | Parameter | Type | Default | Description | @@ -267,24 +353,23 @@ features_specs = { name="income", feature_type=FeatureType.FLOAT_RESCALED, use_embedding=True, + embedding_type="periodic", # Use periodic embedding for income embedding_dim=8, - num_bins=15, - init_min=0, - init_max=1000000 + num_frequencies=4 ), "debt_ratio": NumericalFeature( name="debt_ratio", feature_type=FeatureType.FLOAT_NORMALIZED, use_embedding=True, + embedding_type="ple", # Use PLE for debt ratio embedding_dim=4, - num_bins=8, - init_min=0, - init_max=1 # Ratio typically between 0-1 + num_segments=8 ), "credit_score": NumericalFeature( name="credit_score", feature_type=FeatureType.FLOAT_NORMALIZED, use_embedding=True, + embedding_type="dual_branch", # Traditional dual-branch embedding_dim=6, num_bins=10, init_min=300, @@ -294,21 +379,160 @@ features_specs = { name="payment_history", feature_type=FeatureType.FLOAT_NORMALIZED, use_embedding=True, + embedding_type="combined", # Combined approach embedding_dim=8, - num_bins=5, - init_min=0, - init_max=1 # Simplified score between 0-1 + num_frequencies=4, + num_segments=8 ) } -# Create preprocessing model +# Create preprocessing model with advanced embeddings preprocessor = PreprocessingModel( path_data="data/financial_data.csv", features_specs=features_specs, - use_numerical_embedding=True, - numerical_mlp_hidden_units=16, - numerical_dropout_rate=0.2, # Higher dropout for financial data - numerical_use_batch_norm=True + use_advanced_numerical_embedding=True, + use_periodic_embedding=True, + use_ple_embedding=True, + use_advanced_combined_embedding=True, + embedding_dim=8, + num_frequencies=4, + num_segments=8, + dropout_rate=0.2, # Higher dropout for financial data + 
use_batch_norm=True +) +``` + +### Healthcare Patient Analysis with Periodic Embeddings + +```python +from kdp import PreprocessingModel +from kdp.features import NumericalFeature +from kdp.enums import FeatureType + +# Define patient features with periodic embeddings for cyclical patterns +features_specs = { + "age": NumericalFeature( + name="age", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="periodic", + embedding_dim=8, + num_frequencies=6, # More frequencies for age patterns + kwargs={ + "frequency_init": "constant", + "min_frequency": 1e-3, + "max_frequency": 1e2 + } + ), + "bmi": NumericalFeature( + name="bmi", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="ple", + embedding_dim=6, + num_segments=12, # More segments for BMI precision + kwargs={ + "segment_init": "uniform", + "ple_activation": "relu" + } + ), + "blood_pressure": NumericalFeature( + name="blood_pressure", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="combined", + embedding_dim=10, + num_frequencies=4, + num_segments=8, + kwargs={ + "embedding_types": ["periodic", "ple"], + "use_gating": True + } + ) +} + +# Create preprocessing model +preprocessor = PreprocessingModel( + path_data="data/patient_data.csv", + features_specs=features_specs, + use_advanced_numerical_embedding=True, + use_periodic_embedding=True, + use_ple_embedding=True, + use_advanced_combined_embedding=True, + embedding_dim=8, + num_frequencies=6, + num_segments=12, + frequency_init="constant", + segment_init="uniform", + ple_activation="relu", + use_gating=True +) +``` + +### Time Series Forecasting with PLE Embeddings + +```python +from kdp import PreprocessingModel +from kdp.features import NumericalFeature +from kdp.enums import FeatureType + +# Define time series features with PLE embeddings for trend capture +features_specs = { + "temperature": NumericalFeature( + name="temperature", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="periodic", # Periodic for seasonal patterns + embedding_dim=12, + num_frequencies=8, + kwargs={ + "frequency_init": "log_uniform", + "min_frequency": 1e-4, + "max_frequency": 1e3 + } + ), + "humidity": NumericalFeature( + name="humidity", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="ple", # PLE for humidity trends + embedding_dim=8, + num_segments=16, + kwargs={ + "segment_init": "quantile", + "ple_activation": "sigmoid" + } + ), + "pressure": NumericalFeature( + name="pressure", + feature_type=FeatureType.FLOAT_NORMALIZED, + use_embedding=True, + embedding_type="combined", # Combined for complex patterns + embedding_dim=10, + num_frequencies=6, + num_segments=12, + kwargs={ + "embedding_types": ["periodic", "ple", "dual_branch"], + "use_gating": True + } + ) +} + +# Create preprocessing model +preprocessor = PreprocessingModel( + path_data="data/weather_data.csv", + features_specs=features_specs, + use_advanced_numerical_embedding=True, + use_periodic_embedding=True, + use_ple_embedding=True, + use_advanced_combined_embedding=True, + embedding_dim=10, + num_frequencies=8, + num_segments=16, + frequency_init="log_uniform", + segment_init="quantile", + ple_activation="sigmoid", + use_gating=True ) ``` @@ -349,27 +573,43 @@ preprocessor = PreprocessingModel( 1. 
**Choose the Right Embedding Type** - Use individual embeddings for interpretability and precise control - Use global embeddings for efficiency with many numerical features + - Use periodic embeddings for features with cyclical patterns (time, angles, seasons) + - Use PLE embeddings for features with complex non-linear relationships + - Use combined embeddings for maximum performance on challenging datasets 2. **Distribution-Aware Initialization** - Set `init_min` and `init_max` based on your data's actual distribution - Use domain knowledge to set meaningful boundary points - Initialize closer to anticipated feature range for faster convergence + - For periodic embeddings, use log-uniform initialization for better frequency distribution + - For PLE embeddings, use quantile-based initialization for data-driven segment boundaries 3. **Dimensionality Guidelines** - Start with `embedding_dim` = 4-8 for simple features - Use 8-16 for complex features with non-linear patterns - For global embeddings, scale with the number of features (16-64) + - For periodic embeddings, use 4-8 frequencies for most features + - For PLE embeddings, use 8-16 segments for smooth approximations 4. **Performance Tuning** - Increase `num_bins` for more granular discrete representations - Adjust `mlp_hidden_units` to 2-4x the embedding dimension - Use batch normalization for faster, more stable training - Adjust dropout based on dataset size (higher for small datasets) + - For periodic embeddings, experiment with different frequency ranges + - For PLE embeddings, try different activation functions (relu, sigmoid, tanh) + +5. **Advanced Embedding Strategies** + - **Periodic Embeddings**: Best for time-based features, angles, cyclical patterns + - **PLE Embeddings**: Best for features with piecewise linear relationships + - **Combined Embeddings**: Best for maximum performance, especially on tabular tasks + - **Mixed Strategies**: Use different embedding types for different features based on their characteristics -5. **Combine with Other KDP Features** +6. **Combine with Other KDP Features** - Pair with distribution-aware encoding for optimal numerical handling - Use with tabular attention to learn cross-feature interactions - Combine with feature selection for automatic dimensionality reduction + - Use with transformer blocks for advanced feature interactions ## 🔗 Related Topics diff --git a/kdp/features.py b/kdp/features.py index d3a1b0b..d12c69f 100644 --- a/kdp/features.py +++ b/kdp/features.py @@ -145,6 +145,9 @@ def __init__( use_embedding: bool = False, embedding_dim: int = 8, num_bins: int = 10, + embedding_type: str = "dual_branch", + num_frequencies: int = 4, + num_segments: int = 8, **kwargs, ) -> None: """Initializes a NumericalFeature instance. @@ -156,6 +159,9 @@ def __init__( use_embedding (bool): Whether to use advanced numerical embedding. embedding_dim (int): Dimension of the embedding space. num_bins (int): Number of bins for discretization. + embedding_type (str): Type of embedding to use ('dual_branch', 'periodic', 'ple', 'combined'). + num_frequencies (int): Number of frequency components for periodic embedding. + num_segments (int): Number of segments for PLE embedding. **kwargs: Additional keyword arguments for the feature. 
""" super().__init__(name, feature_type, **kwargs) @@ -164,22 +170,78 @@ def __init__( self.use_embedding = use_embedding self.embedding_dim = embedding_dim self.num_bins = num_bins + self.embedding_type = embedding_type + self.num_frequencies = num_frequencies + self.num_segments = num_segments def get_embedding_layer(self, input_shape: tuple) -> tf.keras.layers.Layer: - """Creates and returns an NumericalEmbedding layer configured for this feature.""" + """Creates and returns an embedding layer configured for this feature.""" # TODO: check why to use input_shape ? - from kdp.layers.numerical_embedding_layer import NumericalEmbedding - - return NumericalEmbedding( - embedding_dim=self.embedding_dim, - mlp_hidden_units=max(16, self.embedding_dim * 2), - num_bins=self.num_bins, - init_min=self.kwargs.get("init_min", -3.0), - init_max=self.kwargs.get("init_max", 3.0), - dropout_rate=self.kwargs.get("dropout_rate", 0.1), - use_batch_norm=self.kwargs.get("use_batch_norm", True), - name=f"{self.name}_embedding", - ) + + if self.embedding_type == "periodic": + from kdp.layers.periodic_embedding_layer import PeriodicEmbedding + return PeriodicEmbedding( + embedding_dim=self.embedding_dim, + num_frequencies=self.num_frequencies, + mlp_hidden_units=max(16, self.embedding_dim * 2), + use_mlp=True, + dropout_rate=self.kwargs.get("dropout_rate", 0.1), + use_batch_norm=self.kwargs.get("use_batch_norm", True), + frequency_init=self.kwargs.get("frequency_init", "log_uniform"), + min_frequency=self.kwargs.get("min_frequency", 1e-4), + max_frequency=self.kwargs.get("max_frequency", 1e2), + use_residual=self.kwargs.get("use_residual", True), + name=f"{self.name}_periodic_embedding", + ) + elif self.embedding_type == "ple": + from kdp.layers.ple_embedding_layer import PLEEmbedding + return PLEEmbedding( + embedding_dim=self.embedding_dim, + num_segments=self.num_segments, + mlp_hidden_units=max(16, self.embedding_dim * 2), + use_mlp=True, + dropout_rate=self.kwargs.get("dropout_rate", 0.1), + use_batch_norm=self.kwargs.get("use_batch_norm", True), + segment_init=self.kwargs.get("segment_init", "uniform"), + use_residual=self.kwargs.get("use_residual", True), + activation=self.kwargs.get("ple_activation", "relu"), + name=f"{self.name}_ple_embedding", + ) + elif self.embedding_type == "combined": + from kdp.layers.advanced_numerical_embedding_layer import AdvancedNumericalEmbedding + return AdvancedNumericalEmbedding( + embedding_dim=self.embedding_dim, + embedding_types=self.kwargs.get("embedding_types", ["periodic", "ple", "dual_branch"]), + num_frequencies=self.num_frequencies, + num_segments=self.num_segments, + mlp_hidden_units=max(16, self.embedding_dim * 2), + num_bins=self.num_bins, + init_min=self.kwargs.get("init_min", -3.0), + init_max=self.kwargs.get("init_max", 3.0), + dropout_rate=self.kwargs.get("dropout_rate", 0.1), + use_batch_norm=self.kwargs.get("use_batch_norm", True), + frequency_init=self.kwargs.get("frequency_init", "log_uniform"), + min_frequency=self.kwargs.get("min_frequency", 1e-4), + max_frequency=self.kwargs.get("max_frequency", 1e2), + segment_init=self.kwargs.get("segment_init", "uniform"), + ple_activation=self.kwargs.get("ple_activation", "relu"), + use_residual=self.kwargs.get("use_residual", True), + use_gating=self.kwargs.get("use_gating", True), + name=f"{self.name}_combined_embedding", + ) + else: + # Default to traditional dual-branch embedding + from kdp.layers.numerical_embedding_layer import NumericalEmbedding + return NumericalEmbedding( + 
embedding_dim=self.embedding_dim, + mlp_hidden_units=max(16, self.embedding_dim * 2), + num_bins=self.num_bins, + init_min=self.kwargs.get("init_min", -3.0), + init_max=self.kwargs.get("init_max", 3.0), + dropout_rate=self.kwargs.get("dropout_rate", 0.1), + use_batch_norm=self.kwargs.get("use_batch_norm", True), + name=f"{self.name}_embedding", + ) class CategoricalFeature(Feature): diff --git a/kdp/layers/advanced_numerical_embedding_layer.py b/kdp/layers/advanced_numerical_embedding_layer.py new file mode 100644 index 0000000..ac15d8f --- /dev/null +++ b/kdp/layers/advanced_numerical_embedding_layer.py @@ -0,0 +1,243 @@ +import tensorflow as tf +import numpy as np + + +@tf.keras.utils.register_keras_serializable(package="kdp.layers") +class AdvancedNumericalEmbedding(tf.keras.layers.Layer): + """Advanced numerical embedding layer combining periodic, PLE, and dual-branch architectures. + + This layer provides a comprehensive numerical embedding solution that combines: + 1. Periodic embeddings using sin/cos expansions + 2. PLE (Parameterized Linear Expansion) embeddings + 3. Traditional dual-branch (continuous + discrete) embeddings + + The layer allows flexible configuration to use any combination of these approaches, + with learnable gates to combine different embedding types. + + Args: + embedding_dim (int): Output embedding dimension per feature. + embedding_types (list): List of embedding types to use ('periodic', 'ple', 'dual_branch'). + num_frequencies (int): Number of frequency components for periodic embedding. + num_segments (int): Number of segments for PLE embedding. + mlp_hidden_units (int): Hidden units for MLPs. + num_bins (int): Number of bins for discrete branch. + init_min (float): Initial minimum for discrete branch. + init_max (float): Initial maximum for discrete branch. + dropout_rate (float): Dropout rate for regularization. + use_batch_norm (bool): Whether to use batch normalization. + frequency_init (str): Initialization method for periodic frequencies. + min_frequency (float): Minimum frequency for periodic embedding. + max_frequency (float): Maximum frequency for periodic embedding. + segment_init (str): Initialization method for PLE segments. + ple_activation (str): Activation function for PLE embedding. + use_residual (bool): Whether to use residual connections. + use_gating (bool): Whether to use learnable gates to combine embeddings. + """ + + def __init__( + self, + embedding_dim: int = 8, + embedding_types: list[str] = None, + num_frequencies: int = 4, + num_segments: int = 8, + mlp_hidden_units: int = 16, + num_bins: int = 10, + init_min: float | list[float] = -3.0, + init_max: float | list[float] = 3.0, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + frequency_init: str = "log_uniform", + min_frequency: float = 1e-4, + max_frequency: float = 1e2, + segment_init: str = "uniform", + ple_activation: str = "relu", + use_residual: bool = True, + use_gating: bool = True, + **kwargs, + ): + """Initialize the AdvancedNumericalEmbedding layer. + + Args: + embedding_dim: Dimension of the output embedding for each feature. + embedding_types: List of embedding types to use. + num_frequencies: Number of frequency components for periodic embedding. + num_segments: Number of segments for PLE embedding. + mlp_hidden_units: Number of hidden units in MLPs. + num_bins: Number of bins for discrete branch. + init_min: Initial minimum for discrete branch. + init_max: Initial maximum for discrete branch. + dropout_rate: Dropout rate for regularization. 
+ use_batch_norm: Whether to use batch normalization. + frequency_init: Initialization method for periodic frequencies. + min_frequency: Minimum frequency for periodic embedding. + max_frequency: Maximum frequency for periodic embedding. + segment_init: Initialization method for PLE segments. + ple_activation: Activation function for PLE embedding. + use_residual: Whether to use residual connections. + use_gating: Whether to use learnable gates to combine embeddings. + **kwargs: Additional layer arguments. + """ + super().__init__(**kwargs) + self.embedding_dim = embedding_dim + self.embedding_types = embedding_types or ["dual_branch"] + self.num_frequencies = num_frequencies + self.num_segments = num_segments + self.mlp_hidden_units = mlp_hidden_units + self.num_bins = num_bins + self.dropout_rate = dropout_rate + self.use_batch_norm = use_batch_norm + self.frequency_init = frequency_init + self.min_frequency = min_frequency + self.max_frequency = max_frequency + self.segment_init = segment_init + self.ple_activation = ple_activation + self.use_residual = use_residual + self.use_gating = use_gating + self.init_min = init_min + self.init_max = init_max + + # Validate embedding types + valid_types = ["periodic", "ple", "dual_branch"] + for embedding_type in self.embedding_types: + if embedding_type not in valid_types: + raise ValueError(f"Invalid embedding type: {embedding_type}. Must be one of {valid_types}") + + def build(self, input_shape): + # input_shape: (batch, num_features) + if hasattr(self, 'embedding_layers'): + return # Already built + + self.num_features = input_shape[-1] + + # Import embedding layers + from kdp.layers.periodic_embedding_layer import PeriodicEmbedding + from kdp.layers.ple_embedding_layer import PLEEmbedding + from kdp.layers.numerical_embedding_layer import NumericalEmbedding + + # Create embedding layers based on configuration + self.embedding_layers = {} + + if "periodic" in self.embedding_types: + self.embedding_layers["periodic"] = PeriodicEmbedding( + embedding_dim=self.embedding_dim, + num_frequencies=self.num_frequencies, + mlp_hidden_units=self.mlp_hidden_units, + use_mlp=True, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + frequency_init=self.frequency_init, + min_frequency=self.min_frequency, + max_frequency=self.max_frequency, + use_residual=self.use_residual, + name="periodic_embedding", + ) + + if "ple" in self.embedding_types: + self.embedding_layers["ple"] = PLEEmbedding( + embedding_dim=self.embedding_dim, + num_segments=self.num_segments, + mlp_hidden_units=self.mlp_hidden_units, + use_mlp=True, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + segment_init=self.segment_init, + use_residual=self.use_residual, + activation=self.ple_activation, + name="ple_embedding", + ) + + if "dual_branch" in self.embedding_types: + self.embedding_layers["dual_branch"] = NumericalEmbedding( + embedding_dim=self.embedding_dim, + mlp_hidden_units=self.mlp_hidden_units, + num_bins=self.num_bins, + init_min=self.init_min, + init_max=self.init_max, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + name="dual_branch_embedding", + ) + + # Build all embedding layers + for layer in self.embedding_layers.values(): + layer.build(input_shape) + + # Create learnable gates if multiple embedding types are used + if len(self.embedding_types) > 1 and self.use_gating: + self.gates = {} + for embedding_type in self.embedding_types: + gate_name = f"gate_{embedding_type}" + self.gates[gate_name] = 
self.add_weight( + name=gate_name, + shape=(self.num_features, self.embedding_dim), + initializer="zeros", + trainable=True, + ) + + super().build(input_shape) + + def call(self, inputs: tf.Tensor, training: bool = False) -> tf.Tensor: + # inputs: (batch, num_features) + inputs_float = tf.cast(inputs, tf.float32) + + # Apply each embedding type + embeddings = {} + for embedding_type, layer in self.embedding_layers.items(): + embeddings[embedding_type] = layer(inputs_float, training=training) + + # If only one embedding type, return it directly + if len(self.embedding_types) == 1: + embedding_type = self.embedding_types[0] + return embeddings[embedding_type] + + # Combine multiple embeddings using learnable gates + if self.use_gating: + # Apply sigmoid to gates + gates = {k: tf.nn.sigmoid(v) for k, v in self.gates.items()} + + # Normalize gates to sum to 1 + gate_sum = sum(gates.values()) + gates = {k: v / (gate_sum + 1e-8) for k, v in gates.items()} + + # Combine embeddings + combined_embedding = tf.zeros_like(embeddings[self.embedding_types[0]]) + for embedding_type in self.embedding_types: + gate_name = f"gate_{embedding_type}" + gate = gates[gate_name] + embedding = embeddings[embedding_type] + combined_embedding += gate * embedding + + return combined_embedding + else: + # Simple concatenation or averaging + # For now, use averaging + return tf.reduce_mean(list(embeddings.values()), axis=0) + + def get_config(self): + config = super().get_config() + config.update( + { + "embedding_dim": self.embedding_dim, + "embedding_types": self.embedding_types, + "num_frequencies": self.num_frequencies, + "num_segments": self.num_segments, + "mlp_hidden_units": self.mlp_hidden_units, + "num_bins": self.num_bins, + "init_min": self.init_min, + "init_max": self.init_max, + "dropout_rate": self.dropout_rate, + "use_batch_norm": self.use_batch_norm, + "frequency_init": self.frequency_init, + "min_frequency": self.min_frequency, + "max_frequency": self.max_frequency, + "segment_init": self.segment_init, + "ple_activation": self.ple_activation, + "use_residual": self.use_residual, + "use_gating": self.use_gating, + } + ) + return config + + @classmethod + def from_config(cls, config): + return cls(**config) \ No newline at end of file diff --git a/kdp/layers/periodic_embedding_layer.py b/kdp/layers/periodic_embedding_layer.py new file mode 100644 index 0000000..189d43c --- /dev/null +++ b/kdp/layers/periodic_embedding_layer.py @@ -0,0 +1,230 @@ +import tensorflow as tf +import numpy as np + + +@tf.keras.utils.register_keras_serializable(package="kdp.layers") +class PeriodicEmbedding(tf.keras.layers.Layer): + """Periodic embedding layer for continuous numerical features using sin/cos expansions. + + This layer embeds continuous numerical features using periodic expansions, which have been + shown to improve performance on tabular tasks by capturing cyclical patterns and providing + smooth, differentiable representations. + + The layer applies periodic transformations using sin/cos functions with learnable frequencies, + followed by optional MLP processing and residual connections. + + Args: + embedding_dim (int): Output embedding dimension per feature. + num_frequencies (int): Number of frequency components to use for periodic expansion. + mlp_hidden_units (int): Hidden units for the post-periodic MLP (optional). + use_mlp (bool): Whether to apply MLP after periodic expansion. + dropout_rate (float): Dropout rate applied to the MLP. + use_batch_norm (bool): Whether to apply batch normalization. 
+ frequency_init (str): Initialization method for frequencies ('uniform', 'log_uniform', 'constant'). + min_frequency (float): Minimum frequency for initialization. + max_frequency (float): Maximum frequency for initialization. + use_residual (bool): Whether to use residual connections. + """ + + def __init__( + self, + embedding_dim: int = 8, + num_frequencies: int = 4, + mlp_hidden_units: int = 16, + use_mlp: bool = True, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + frequency_init: str = "log_uniform", + min_frequency: float = 1e-4, + max_frequency: float = 1e2, + use_residual: bool = True, + **kwargs, + ): + """Initialize the PeriodicEmbedding layer. + + Args: + embedding_dim: Dimension of the output embedding for each feature. + num_frequencies: Number of frequency components for periodic expansion. + mlp_hidden_units: Number of hidden units in the MLP. + use_mlp: Whether to apply MLP after periodic expansion. + dropout_rate: Dropout rate for regularization. + use_batch_norm: Whether to use batch normalization. + frequency_init: Initialization method for frequencies. + min_frequency: Minimum frequency for initialization. + max_frequency: Maximum frequency for initialization. + use_residual: Whether to use residual connections. + **kwargs: Additional layer arguments. + """ + super().__init__(**kwargs) + self.embedding_dim = embedding_dim + self.num_frequencies = num_frequencies + self.mlp_hidden_units = mlp_hidden_units + self.use_mlp = use_mlp + self.dropout_rate = dropout_rate + self.use_batch_norm = use_batch_norm + self.frequency_init = frequency_init + self.min_frequency = min_frequency + self.max_frequency = max_frequency + self.use_residual = use_residual + + # Validate frequency_init in constructor + valid_frequency_inits = ["uniform", "log_uniform", "constant"] + if self.frequency_init not in valid_frequency_inits: + raise ValueError(f"Unknown frequency_init: {self.frequency_init}. 
Must be one of {valid_frequency_inits}") + + def build(self, input_shape): + # input_shape: (batch, num_features) + if hasattr(self, 'frequencies'): + return # Already built + + self.num_features = input_shape[-1] + + # Learnable frequencies for each feature, shape: (num_features, num_frequencies) + if self.frequency_init == "uniform": + initializer = tf.random_uniform_initializer( + self.min_frequency, self.max_frequency + ) + elif self.frequency_init == "log_uniform": + # Log-uniform initialization for better frequency distribution + log_min = np.log(self.min_frequency) + log_max = np.log(self.max_frequency) + # Create log frequencies and then apply exp + log_frequencies = np.linspace(log_min, log_max, self.num_frequencies) + frequencies = np.exp(log_frequencies) + # Create a 2D array by repeating the frequencies for each feature + frequencies_2d = np.tile(frequencies, (self.num_features, 1)) + initializer = tf.constant_initializer(frequencies_2d) + elif self.frequency_init == "constant": + # Constant initialization with evenly spaced frequencies + frequencies = np.logspace( + np.log10(self.min_frequency), + np.log10(self.max_frequency), + self.num_frequencies + ) + # Create a 2D array by repeating the frequencies for each feature + frequencies_2d = np.tile(frequencies, (self.num_features, 1)) + initializer = tf.constant_initializer(frequencies_2d) + else: + raise ValueError(f"Unknown frequency_init: {self.frequency_init}") + + self.frequencies = self.add_weight( + name="frequencies", + shape=(self.num_features, self.num_frequencies), + initializer=initializer, + trainable=True, + constraint=tf.keras.constraints.NonNeg(), # Frequencies should be positive + ) + + + + # Post-periodic MLP (optional) + if self.use_mlp: + self.mlp = tf.keras.Sequential( + [ + tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.mlp_hidden_units, activation="relu") + ), + tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.embedding_dim) + ), + ], + name="post_periodic_mlp", + ) + + self.dropout = ( + tf.keras.layers.Dropout(self.dropout_rate) + if self.dropout_rate > 0 + else lambda x, training: x + ) + + if self.use_batch_norm: + self.batch_norm = tf.keras.layers.TimeDistributed( + tf.keras.layers.BatchNormalization(), name="periodic_batch_norm" + ) + + # Residual projection to match embedding_dim + if self.use_residual: + self.residual_proj = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.embedding_dim, activation=None), + name="residual_proj", + ) + + # Build the sub-layers with dummy input to ensure weights are created + if self.use_mlp: + # Create dummy input for MLP: (batch, num_features, 2 * num_frequencies) + dummy_mlp_input = tf.zeros((1, self.num_features, 2 * self.num_frequencies)) + self.mlp(dummy_mlp_input) + + if self.use_batch_norm: + # Create dummy input for batch norm: (batch, num_features, embedding_dim) + dummy_bn_input = tf.zeros((1, self.num_features, self.embedding_dim)) + self.batch_norm(dummy_bn_input) + + if self.use_residual: + # Create dummy input for residual: (batch, num_features, 1) + dummy_residual_input = tf.zeros((1, self.num_features, 1)) + self.residual_proj(dummy_residual_input) + + super().build(input_shape) + + def call(self, inputs: tf.Tensor, training: bool = False) -> tf.Tensor: + # inputs: (batch, num_features) + inputs_float = tf.cast(inputs, tf.float32) + + # Apply periodic expansion + # Expand inputs for broadcasting: (batch, num_features, 1) + inputs_expanded = tf.expand_dims(inputs_float, axis=-1) + + # Expand frequencies for 
broadcasting: (1, num_features, num_frequencies) + frequencies_expanded = tf.expand_dims(self.frequencies, axis=0) + + # Compute periodic features: (batch, num_features, num_frequencies) + periodic_features = inputs_expanded * frequencies_expanded + + # Apply sin and cos transformations + sin_features = tf.sin(periodic_features) # (batch, num_features, num_frequencies) + cos_features = tf.cos(periodic_features) # (batch, num_features, num_frequencies) + + # Concatenate sin and cos features: (batch, num_features, 2 * num_frequencies) + periodic_embeddings = tf.concat([sin_features, cos_features], axis=-1) + + # Apply optional MLP + if self.use_mlp: + periodic_embeddings = self.mlp(periodic_embeddings) + periodic_embeddings = self.dropout(periodic_embeddings, training=training) + if self.use_batch_norm: + periodic_embeddings = self.batch_norm(periodic_embeddings, training=training) + + # Apply residual connection if enabled + if self.use_residual: + inputs_expanded_for_residual = tf.expand_dims(inputs_float, axis=-1) + residual = self.residual_proj(inputs_expanded_for_residual) + periodic_embeddings = periodic_embeddings + residual + + # If only one feature is provided, squeeze the features axis + if self.num_features == 1: + return tf.squeeze(periodic_embeddings, axis=1) # New shape: (batch, embedding_dim) + + return periodic_embeddings + + def get_config(self): + config = super().get_config() + config.update( + { + "embedding_dim": self.embedding_dim, + "num_frequencies": self.num_frequencies, + "mlp_hidden_units": self.mlp_hidden_units, + "use_mlp": self.use_mlp, + "dropout_rate": self.dropout_rate, + "use_batch_norm": self.use_batch_norm, + "frequency_init": self.frequency_init, + "min_frequency": self.min_frequency, + "max_frequency": self.max_frequency, + "use_residual": self.use_residual, + } + ) + return config + + @classmethod + def from_config(cls, config): + return cls(**config) \ No newline at end of file diff --git a/kdp/layers/ple_embedding_layer.py b/kdp/layers/ple_embedding_layer.py new file mode 100644 index 0000000..8232ace --- /dev/null +++ b/kdp/layers/ple_embedding_layer.py @@ -0,0 +1,282 @@ +import tensorflow as tf +import numpy as np + + +@tf.keras.utils.register_keras_serializable(package="kdp.layers") +class PLEEmbedding(tf.keras.layers.Layer): + """Parameterized Linear Expansion (PLE) embedding layer for continuous numerical features. + + This layer implements Parameterized Linear Expansions, which have been shown to improve + performance on tabular tasks by providing learnable non-linear transformations that can + capture complex patterns in numerical data. + + The layer applies learnable piecewise linear transformations with multiple segments, + followed by optional MLP processing and residual connections. + + Args: + embedding_dim (int): Output embedding dimension per feature. + num_segments (int): Number of linear segments for piecewise approximation. + mlp_hidden_units (int): Hidden units for the post-PLE MLP (optional). + use_mlp (bool): Whether to apply MLP after PLE transformation. + dropout_rate (float): Dropout rate applied to the MLP. + use_batch_norm (bool): Whether to apply batch normalization. + segment_init (str): Initialization method for segment boundaries ('uniform', 'quantile'). + use_residual (bool): Whether to use residual connections. + activation (str): Activation function for the PLE transformation ('relu', 'sigmoid', 'tanh'). 
+ """ + + def __init__( + self, + embedding_dim: int = 8, + num_segments: int = 8, + mlp_hidden_units: int = 16, + use_mlp: bool = True, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + segment_init: str = "uniform", + use_residual: bool = True, + activation: str = "relu", + **kwargs, + ): + """Initialize the PLEEmbedding layer. + + Args: + embedding_dim: Dimension of the output embedding for each feature. + num_segments: Number of linear segments for piecewise approximation. + mlp_hidden_units: Number of hidden units in the MLP. + use_mlp: Whether to apply MLP after PLE transformation. + dropout_rate: Dropout rate for regularization. + use_batch_norm: Whether to use batch normalization. + segment_init: Initialization method for segment boundaries. + use_residual: Whether to use residual connections. + activation: Activation function for the PLE transformation. + **kwargs: Additional layer arguments. + """ + super().__init__(**kwargs) + self.embedding_dim = embedding_dim + self.num_segments = num_segments + self.mlp_hidden_units = mlp_hidden_units + self.use_mlp = use_mlp + self.dropout_rate = dropout_rate + self.use_batch_norm = use_batch_norm + self.segment_init = segment_init + self.use_residual = use_residual + self.activation = activation + + # Validate segment_init in constructor + valid_segment_inits = ["uniform", "quantile"] + if self.segment_init not in valid_segment_inits: + raise ValueError(f"Unknown segment_init: {self.segment_init}. Must be one of {valid_segment_inits}") + + # Validate activation in constructor + valid_activations = ["relu", "leaky_relu", "elu", "selu", "tanh", "sigmoid", "linear"] + if self.activation not in valid_activations: + raise ValueError(f"Unknown activation: {self.activation}. Must be one of {valid_activations}") + + def build(self, input_shape): + # input_shape: (batch, num_features) + if hasattr(self, 'segment_boundaries'): + return # Already built + + self.num_features = input_shape[-1] + + # Learnable segment boundaries for each feature, shape: (num_features, num_segments + 1) + if self.segment_init == "uniform": + # Uniform initialization across the expected input range + boundaries = np.linspace(-3.0, 3.0, self.num_segments + 1) + # Create a 2D array by repeating the boundaries for each feature + boundaries_2d = np.tile(boundaries, (self.num_features, 1)) + initializer = tf.constant_initializer(boundaries_2d) + elif self.segment_init == "quantile": + # Quantile-based initialization (will be updated during training) + boundaries = np.linspace(0.0, 1.0, self.num_segments + 1) + # Create a 2D array by repeating the boundaries for each feature + boundaries_2d = np.tile(boundaries, (self.num_features, 1)) + initializer = tf.constant_initializer(boundaries_2d) + else: + raise ValueError(f"Unknown segment_init: {self.segment_init}") + + self.segment_boundaries = self.add_weight( + name="segment_boundaries", + shape=(self.num_features, self.num_segments + 1), + initializer=initializer, + trainable=True, + ) + + # Learnable slopes for each segment, shape: (num_features, num_segments) + self.segment_slopes = self.add_weight( + name="segment_slopes", + shape=(self.num_features, self.num_segments), + initializer="ones", + trainable=True, + ) + + # Learnable intercepts for each segment, shape: (num_features, num_segments) + self.segment_intercepts = self.add_weight( + name="segment_intercepts", + shape=(self.num_features, self.num_segments), + initializer="zeros", + trainable=True, + ) + + # Post-PLE MLP (optional) + if self.use_mlp: + self.mlp 
= tf.keras.Sequential( + [ + tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.mlp_hidden_units, activation="relu") + ), + tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.embedding_dim) + ), + ], + name="post_ple_mlp", + ) + + self.dropout = ( + tf.keras.layers.Dropout(self.dropout_rate) + if self.dropout_rate > 0 + else lambda x, training: x + ) + + if self.use_batch_norm: + self.batch_norm = tf.keras.layers.TimeDistributed( + tf.keras.layers.BatchNormalization(), name="ple_batch_norm" + ) + + # Residual projection to match embedding_dim + if self.use_residual: + self.residual_proj = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.embedding_dim, activation=None), + name="residual_proj", + ) + + # Build the sub-layers with dummy input to ensure weights are created + if self.use_mlp: + # Create dummy input for MLP: (batch, num_features, num_segments) + dummy_mlp_input = tf.zeros((1, self.num_features, self.num_segments)) + self.mlp(dummy_mlp_input) + + if self.use_batch_norm: + # Create dummy input for batch norm: (batch, num_features, embedding_dim) + dummy_bn_input = tf.zeros((1, self.num_features, self.embedding_dim)) + self.batch_norm(dummy_bn_input) + + if self.use_residual: + # Create dummy input for residual: (batch, num_features, 1) + dummy_residual_input = tf.zeros((1, self.num_features, 1)) + self.residual_proj(dummy_residual_input) + + super().build(input_shape) + + def _apply_ple_transformation(self, inputs: tf.Tensor) -> tf.Tensor: + """Apply Parameterized Linear Expansion transformation. + + Args: + inputs: Input tensor of shape (batch, num_features) + + Returns: + Transformed tensor of shape (batch, num_features, num_segments) + """ + # inputs: (batch, num_features) + batch_size = tf.shape(inputs)[0] + + # Expand inputs for broadcasting: (batch, num_features, 1) + inputs_expanded = tf.expand_dims(inputs, axis=-1) + + # Expand boundaries for broadcasting: (1, num_features, num_segments + 1) + boundaries_expanded = tf.expand_dims(self.segment_boundaries, axis=0) + + # Expand slopes and intercepts for broadcasting: (1, num_features, num_segments) + slopes_expanded = tf.expand_dims(self.segment_slopes, axis=0) + intercepts_expanded = tf.expand_dims(self.segment_intercepts, axis=0) + + # Compute segment activations using piecewise linear functions + # For each segment, compute the linear transformation + ple_outputs = [] + + for i in range(self.num_segments): + # Get the boundaries for this segment + left_boundary = boundaries_expanded[:, :, i] # (batch, num_features) + right_boundary = boundaries_expanded[:, :, i + 1] # (batch, num_features) + + # Get the slope and intercept for this segment + slope = slopes_expanded[:, :, i] # (batch, num_features) + intercept = intercepts_expanded[:, :, i] # (batch, num_features) + + # Compute the linear transformation for this segment + # Apply clipping to ensure inputs are within segment boundaries + clipped_inputs = tf.clip_by_value(inputs, left_boundary, right_boundary) + + # Normalize inputs to [0, 1] within the segment + segment_width = right_boundary - left_boundary + normalized_inputs = (clipped_inputs - left_boundary) / (segment_width + 1e-8) + + # Apply linear transformation: y = slope * x + intercept + segment_output = slope * normalized_inputs + intercept + + # Apply activation function + if self.activation == "relu": + segment_output = tf.nn.relu(segment_output) + elif self.activation == "sigmoid": + segment_output = tf.nn.sigmoid(segment_output) + elif self.activation == "tanh": + 
segment_output = tf.nn.tanh(segment_output) + elif self.activation == "linear": + pass # No activation + else: + raise ValueError(f"Unknown activation: {self.activation}") + + ple_outputs.append(segment_output) + + # Stack all segment outputs: (batch, num_features, num_segments) + ple_embeddings = tf.stack(ple_outputs, axis=-1) + + return ple_embeddings + + def call(self, inputs: tf.Tensor, training: bool = False) -> tf.Tensor: + # inputs: (batch, num_features) + inputs_float = tf.cast(inputs, tf.float32) + + # Apply PLE transformation + ple_embeddings = self._apply_ple_transformation(inputs_float) + + # Apply optional MLP + if self.use_mlp: + ple_embeddings = self.mlp(ple_embeddings) + ple_embeddings = self.dropout(ple_embeddings, training=training) + if self.use_batch_norm: + ple_embeddings = self.batch_norm(ple_embeddings, training=training) + + # Apply residual connection if enabled + if self.use_residual: + inputs_expanded_for_residual = tf.expand_dims(inputs_float, axis=-1) + residual = self.residual_proj(inputs_expanded_for_residual) + ple_embeddings = ple_embeddings + residual + + # If only one feature is provided, squeeze the features axis + if self.num_features == 1: + return tf.squeeze(ple_embeddings, axis=1) # New shape: (batch, embedding_dim) + + return ple_embeddings + + def get_config(self): + config = super().get_config() + config.update( + { + "embedding_dim": self.embedding_dim, + "num_segments": self.num_segments, + "mlp_hidden_units": self.mlp_hidden_units, + "use_mlp": self.use_mlp, + "dropout_rate": self.dropout_rate, + "use_batch_norm": self.use_batch_norm, + "segment_init": self.segment_init, + "use_residual": self.use_residual, + "activation": self.activation, + } + ) + return config + + @classmethod + def from_config(cls, config): + return cls(**config) \ No newline at end of file diff --git a/kdp/layers_factory.py b/kdp/layers_factory.py index 8312b1d..c2f44b6 100644 --- a/kdp/layers_factory.py +++ b/kdp/layers_factory.py @@ -21,6 +21,9 @@ from kdp.layers.variable_selection_layer import VariableSelection from kdp.layers.numerical_embedding_layer import NumericalEmbedding from kdp.layers.global_numerical_embedding_layer import GlobalNumericalEmbedding +from kdp.layers.periodic_embedding_layer import PeriodicEmbedding +from kdp.layers.ple_embedding_layer import PLEEmbedding +from kdp.layers.advanced_numerical_embedding_layer import AdvancedNumericalEmbedding from kdp.layers.gated_linear_unit_layer import GatedLinearUnit from kdp.layers.gated_residual_network_layer import GatedResidualNetwork from kdp.layers.distribution_transform_layer import DistributionTransformLayer @@ -445,6 +448,171 @@ def global_numerical_embedding_layer( **kwargs, ) + @staticmethod + def periodic_embedding_layer( + embedding_dim: int = 8, + num_frequencies: int = 4, + mlp_hidden_units: int = 16, + use_mlp: bool = True, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + frequency_init: str = "log_uniform", + min_frequency: float = 1e-4, + max_frequency: float = 1e2, + use_residual: bool = True, + name: str = "periodic_embedding", + **kwargs: dict, + ) -> tf.keras.layers.Layer: + """Create a PeriodicEmbedding layer. 
+ + Args: + embedding_dim (int): Dimension of the output embedding + num_frequencies (int): Number of frequency components for periodic expansion + mlp_hidden_units (int): Number of hidden units in the MLP + use_mlp (bool): Whether to apply MLP after periodic expansion + dropout_rate (float): Dropout rate for regularization + use_batch_norm (bool): Whether to use batch normalization + frequency_init (str): Initialization method for frequencies + min_frequency (float): Minimum frequency for initialization + max_frequency (float): Maximum frequency for initialization + use_residual (bool): Whether to use residual connections + name (str): Name of the layer + **kwargs: Additional arguments to pass to the layer + + Returns: + PeriodicEmbedding: A PeriodicEmbedding layer instance + """ + return PeriodicEmbedding( + embedding_dim=embedding_dim, + num_frequencies=num_frequencies, + mlp_hidden_units=mlp_hidden_units, + use_mlp=use_mlp, + dropout_rate=dropout_rate, + use_batch_norm=use_batch_norm, + frequency_init=frequency_init, + min_frequency=min_frequency, + max_frequency=max_frequency, + use_residual=use_residual, + name=name, + **kwargs, + ) + + @staticmethod + def ple_embedding_layer( + embedding_dim: int = 8, + num_segments: int = 8, + mlp_hidden_units: int = 16, + use_mlp: bool = True, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + segment_init: str = "uniform", + use_residual: bool = True, + activation: str = "relu", + name: str = "ple_embedding", + **kwargs: dict, + ) -> tf.keras.layers.Layer: + """Create a PLEEmbedding layer. + + Args: + embedding_dim (int): Dimension of the output embedding + num_segments (int): Number of linear segments for piecewise approximation + mlp_hidden_units (int): Number of hidden units in the MLP + use_mlp (bool): Whether to apply MLP after PLE transformation + dropout_rate (float): Dropout rate for regularization + use_batch_norm (bool): Whether to use batch normalization + segment_init (str): Initialization method for segment boundaries + use_residual (bool): Whether to use residual connections + activation (str): Activation function for the PLE transformation + name (str): Name of the layer + **kwargs: Additional arguments to pass to the layer + + Returns: + PLEEmbedding: A PLEEmbedding layer instance + """ + return PLEEmbedding( + embedding_dim=embedding_dim, + num_segments=num_segments, + mlp_hidden_units=mlp_hidden_units, + use_mlp=use_mlp, + dropout_rate=dropout_rate, + use_batch_norm=use_batch_norm, + segment_init=segment_init, + use_residual=use_residual, + activation=activation, + name=name, + **kwargs, + ) + + @staticmethod + def advanced_numerical_embedding_layer( + embedding_dim: int = 8, + embedding_types: list[str] = None, + num_frequencies: int = 4, + num_segments: int = 8, + mlp_hidden_units: int = 16, + num_bins: int = 10, + init_min: float = -3.0, + init_max: float = 3.0, + dropout_rate: float = 0.1, + use_batch_norm: bool = True, + frequency_init: str = "log_uniform", + min_frequency: float = 1e-4, + max_frequency: float = 1e2, + segment_init: str = "uniform", + ple_activation: str = "relu", + use_residual: bool = True, + use_gating: bool = True, + name: str = "advanced_numerical_embedding", + **kwargs: dict, + ) -> tf.keras.layers.Layer: + """Create an AdvancedNumericalEmbedding layer. 
+ + Args: + embedding_dim (int): Dimension of the output embedding + embedding_types (list): List of embedding types to use + num_frequencies (int): Number of frequency components for periodic embedding + num_segments (int): Number of segments for PLE embedding + mlp_hidden_units (int): Number of hidden units in MLPs + num_bins (int): Number of bins for discrete branch + init_min (float): Initial minimum for discrete branch + init_max (float): Initial maximum for discrete branch + dropout_rate (float): Dropout rate for regularization + use_batch_norm (bool): Whether to use batch normalization + frequency_init (str): Initialization method for periodic frequencies + min_frequency (float): Minimum frequency for periodic embedding + max_frequency (float): Maximum frequency for periodic embedding + segment_init (str): Initialization method for PLE segments + ple_activation (str): Activation function for PLE embedding + use_residual (bool): Whether to use residual connections + use_gating (bool): Whether to use learnable gates to combine embeddings + name (str): Name of the layer + **kwargs: Additional arguments to pass to the layer + + Returns: + AdvancedNumericalEmbedding: An AdvancedNumericalEmbedding layer instance + """ + return AdvancedNumericalEmbedding( + embedding_dim=embedding_dim, + embedding_types=embedding_types, + num_frequencies=num_frequencies, + num_segments=num_segments, + mlp_hidden_units=mlp_hidden_units, + num_bins=num_bins, + init_min=init_min, + init_max=init_max, + dropout_rate=dropout_rate, + use_batch_norm=use_batch_norm, + frequency_init=frequency_init, + min_frequency=min_frequency, + max_frequency=max_frequency, + segment_init=segment_init, + ple_activation=ple_activation, + use_residual=use_residual, + use_gating=use_gating, + name=name, + **kwargs, + ) + @staticmethod def gated_linear_unit_layer( units: int, diff --git a/kdp/processor.py b/kdp/processor.py index 4b11925..a9aacde 100644 --- a/kdp/processor.py +++ b/kdp/processor.py @@ -315,6 +315,20 @@ def __init__( feature_moe_freeze_experts: bool = False, feature_moe_use_residual: bool = True, include_passthrough_in_output: bool = True, + # Advanced numerical embedding types + use_periodic_embedding: bool = False, + use_ple_embedding: bool = False, + use_advanced_combined_embedding: bool = False, + embedding_types: list[str] = None, + num_frequencies: int = 4, + num_segments: int = 8, + frequency_init: str = "log_uniform", + min_frequency: float = 1e-4, + max_frequency: float = 1e2, + segment_init: str = "uniform", + ple_activation: str = "relu", + use_residual: bool = True, + use_gating: bool = True, ) -> None: """Initialize a preprocessing model. 
@@ -408,6 +422,21 @@ def __init__( self.global_use_batch_norm = global_use_batch_norm self.global_pooling = global_pooling + # Advanced numerical embedding types control + self.use_periodic_embedding = use_periodic_embedding + self.use_ple_embedding = use_ple_embedding + self.use_advanced_combined_embedding = use_advanced_combined_embedding + self.embedding_types = embedding_types + self.num_frequencies = num_frequencies + self.num_segments = num_segments + self.frequency_init = frequency_init + self.min_frequency = min_frequency + self.max_frequency = max_frequency + self.segment_init = segment_init + self.ple_activation = ple_activation + self.use_residual = use_residual + self.use_gating = use_gating + # MoE control self.use_feature_moe = use_feature_moe self.feature_moe_num_experts = feature_moe_num_experts @@ -1045,21 +1074,78 @@ def _add_advanced_numerical_embedding( feature: Feature object with settings input_layer: Input layer for the feature """ - logger.info(f"Using NumericalEmbedding for {feature_name}") - # Obtain the embedding layer. - embedding_layer = feature.get_embedding_layer(input_shape=input_layer.shape) - preprocessor.add_processing_step( - layer_creator=lambda **kwargs: embedding_layer, - layer_class="NumericalEmbedding", - name=f"advanced_embedding_{feature_name}", - embedding_dim=self.embedding_dim, - mlp_hidden_units=self.mlp_hidden_units, - num_bins=self.num_bins, - init_min=self.init_min, - init_max=self.init_max, - dropout_rate=self.dropout_rate, - use_batch_norm=self.use_batch_norm, - ) + # Determine which embedding type to use + if self.use_advanced_combined_embedding: + logger.info(f"Using AdvancedNumericalEmbedding for {feature_name}") + preprocessor.add_processing_step( + layer_creator=PreprocessorLayerFactory.advanced_numerical_embedding_layer, + name=f"advanced_combined_embedding_{feature_name}", + embedding_dim=self.embedding_dim, + embedding_types=self.embedding_types, + num_frequencies=self.num_frequencies, + num_segments=self.num_segments, + mlp_hidden_units=self.mlp_hidden_units, + num_bins=self.num_bins, + init_min=self.init_min, + init_max=self.init_max, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + frequency_init=self.frequency_init, + min_frequency=self.min_frequency, + max_frequency=self.max_frequency, + segment_init=self.segment_init, + ple_activation=self.ple_activation, + use_residual=self.use_residual, + use_gating=self.use_gating, + ) + elif self.use_periodic_embedding: + logger.info(f"Using PeriodicEmbedding for {feature_name}") + preprocessor.add_processing_step( + layer_creator=PreprocessorLayerFactory.periodic_embedding_layer, + name=f"periodic_embedding_{feature_name}", + embedding_dim=self.embedding_dim, + num_frequencies=self.num_frequencies, + mlp_hidden_units=self.mlp_hidden_units, + use_mlp=True, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + frequency_init=self.frequency_init, + min_frequency=self.min_frequency, + max_frequency=self.max_frequency, + use_residual=self.use_residual, + ) + elif self.use_ple_embedding: + logger.info(f"Using PLEEmbedding for {feature_name}") + preprocessor.add_processing_step( + layer_creator=PreprocessorLayerFactory.ple_embedding_layer, + name=f"ple_embedding_{feature_name}", + embedding_dim=self.embedding_dim, + num_segments=self.num_segments, + mlp_hidden_units=self.mlp_hidden_units, + use_mlp=True, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + segment_init=self.segment_init, + use_residual=self.use_residual, + 
activation=self.ple_activation, + ) + else: + # Default to traditional numerical embedding + logger.info(f"Using NumericalEmbedding for {feature_name}") + # Obtain the embedding layer. + embedding_layer = feature.get_embedding_layer(input_shape=input_layer.shape) + preprocessor.add_processing_step( + layer_creator=lambda **kwargs: embedding_layer, + layer_class="NumericalEmbedding", + name=f"advanced_embedding_{feature_name}", + embedding_dim=self.embedding_dim, + mlp_hidden_units=self.mlp_hidden_units, + num_bins=self.num_bins, + init_min=self.init_min, + init_max=self.init_max, + dropout_rate=self.dropout_rate, + use_batch_norm=self.use_batch_norm, + ) @_monitor_performance def _add_pipeline_categorical( diff --git a/test/layers/test_advanced_numerical_embedding.py b/test/layers/test_advanced_numerical_embedding.py new file mode 100644 index 0000000..40c6a53 --- /dev/null +++ b/test/layers/test_advanced_numerical_embedding.py @@ -0,0 +1,342 @@ +import unittest +import numpy as np +import tensorflow as tf + +from kdp.layers.advanced_numerical_embedding_layer import AdvancedNumericalEmbedding + + +class TestAdvancedNumericalEmbedding(unittest.TestCase): + """Test cases for AdvancedNumericalEmbedding layer.""" + + def setUp(self): + """Set up test fixtures.""" + self.batch_size = 32 + self.num_features = 5 + self.embedding_dim = 8 + self.num_frequencies = 4 + self.num_segments = 8 + + def test_basic_functionality(self): + """Test basic functionality of AdvancedNumericalEmbedding layer.""" + layer = AdvancedNumericalEmbedding( + embedding_dim=self.embedding_dim, + embedding_types=["dual_branch"], + name="test_advanced" + ) + + # Create input data + inputs = tf.random.normal((self.batch_size, self.num_features)) + + # Build the layer + layer.build(inputs.shape) + + # Test forward pass + outputs = layer(inputs) + + # Check output shape + expected_shape = (self.batch_size, self.num_features, self.embedding_dim) + self.assertEqual(outputs.shape, expected_shape) + + # Check that outputs are finite + self.assertTrue(tf.reduce_all(tf.math.is_finite(outputs))) + + def test_single_embedding_type(self): + """Test AdvancedNumericalEmbedding with single embedding type.""" + embedding_types = ["periodic", "ple", "dual_branch"] + + for embedding_type in embedding_types: + with self.subTest(embedding_type=embedding_type): + layer = AdvancedNumericalEmbedding( + embedding_dim=self.embedding_dim, + embedding_types=[embedding_type] + ) + + inputs = tf.random.normal((self.batch_size, self.num_features)) + layer.build(inputs.shape) + outputs = layer(inputs) + + expected_shape = (self.batch_size, self.num_features, self.embedding_dim) + self.assertEqual(outputs.shape, expected_shape) + + def test_multiple_embedding_types(self): + """Test AdvancedNumericalEmbedding with multiple embedding types.""" + layer = AdvancedNumericalEmbedding( + embedding_dim=self.embedding_dim, + embedding_types=["periodic", "ple", "dual_branch"], + use_gating=True + ) + + inputs = tf.random.normal((self.batch_size, self.num_features)) + layer.build(inputs.shape) + outputs = layer(inputs) + + expected_shape = (self.batch_size, self.num_features, self.embedding_dim) + self.assertEqual(outputs.shape, expected_shape) + + # Check that gates exist + self.assertIn("gate_periodic", layer.gates) + self.assertIn("gate_ple", layer.gates) + self.assertIn("gate_dual_branch", layer.gates) + + def test_multiple_embedding_types_no_gating(self): + """Test AdvancedNumericalEmbedding with multiple embedding types without gating.""" + layer = 
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            use_gating=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_single_feature(self):
+        """Test AdvancedNumericalEmbedding with single feature."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["dual_branch"]
+        )
+
+        # Single feature input
+        inputs = tf.random.normal((self.batch_size, 1))
+
+        # Build and test
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Should squeeze the feature dimension for single feature
+        expected_shape = (self.batch_size, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_invalid_embedding_type(self):
+        """Test that invalid embedding type raises an error."""
+        with self.assertRaises(ValueError):
+            AdvancedNumericalEmbedding(
+                embedding_dim=self.embedding_dim,
+                embedding_types=["invalid_type"]
+            )
+
+    def test_serialization(self):
+        """Test that the layer can be serialized and deserialized."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            num_frequencies=self.num_frequencies,
+            num_segments=self.num_segments,
+            use_gating=True
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Get original output
+        original_output = layer(inputs)
+
+        # Serialize and deserialize
+        config = layer.get_config()
+        new_layer = AdvancedNumericalEmbedding.from_config(config)
+        new_layer.build(inputs.shape)
+
+        # Get new output
+        new_output = new_layer(inputs)
+
+        # Check that the structure is preserved (shapes should be the same)
+        self.assertEqual(original_output.shape, new_output.shape)
+
+        # Check that outputs are finite (layer works correctly after deserialization)
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(new_output)))
+
+        # Note: We don't check that outputs are identical because weights are reinitialized
+        # during deserialization, which is the expected behavior in Keras
+
+    def test_gradient_flow(self):
+        """Test that gradients flow through the layer."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            use_gating=True
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        with tf.GradientTape() as tape:
+            outputs = layer(inputs)
+            loss = tf.reduce_mean(outputs)
+
+        # Compute gradients
+        gradients = tape.gradient(loss, layer.trainable_variables)
+
+        # Check that gradients exist and are finite
+        for grad in gradients:
+            self.assertIsNotNone(grad)
+            self.assertTrue(tf.reduce_all(tf.math.is_finite(grad)))
+
+    def test_different_input_shapes(self):
+        """Test AdvancedNumericalEmbedding with different input shapes."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["dual_branch"]
+        )
+
+        # Test different batch sizes
+        for batch_size in [1, 16, 64]:
+            with self.subTest(batch_size=batch_size):
+                inputs = tf.random.normal((batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                expected_shape = (batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+    def test_parameter_count(self):
+        """Test that the layer has the expected number of parameters."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple", "dual_branch"],
+            use_gating=True
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Count trainable parameters
+        total_params = layer.count_params()
+
+        # Should have parameters from all embedding types plus gates
+        self.assertGreater(total_params, 0)
+
+    def test_gate_normalization(self):
+        """Test that gates are properly normalized."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            use_gating=True
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Get gate values
+        gates = {k: tf.nn.sigmoid(v) for k, v in layer.gates.items()}
+
+        # Check that gates are between 0 and 1
+        for gate_name, gate_values in gates.items():
+            self.assertTrue(tf.reduce_all(gate_values >= 0))
+            self.assertTrue(tf.reduce_all(gate_values <= 1))
+
+    def test_embedding_layer_creation(self):
+        """Test that embedding layers are properly created."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple", "dual_branch"]
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Check that embedding layers exist
+        self.assertIn("periodic", layer.embedding_layers)
+        self.assertIn("ple", layer.embedding_layers)
+        self.assertIn("dual_branch", layer.embedding_layers)
+
+    def test_configuration_options(self):
+        """Test various configuration options."""
+        configs = [
+            {
+                "embedding_types": ["periodic"],
+                "num_frequencies": 6,
+                "frequency_init": "constant"
+            },
+            {
+                "embedding_types": ["ple"],
+                "num_segments": 12,
+                "segment_init": "uniform"
+            },
+            {
+                "embedding_types": ["dual_branch"],
+                "num_bins": 15,
+                "init_min": -5.0,
+                "init_max": 5.0
+            }
+        ]
+
+        for config in configs:
+            with self.subTest(config=config):
+                layer = AdvancedNumericalEmbedding(
+                    embedding_dim=self.embedding_dim,
+                    **config
+                )
+
+                inputs = tf.random.normal((self.batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+    def test_training_inference_modes(self):
+        """Test that the layer works in both training and inference modes."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            dropout_rate=0.5
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Test training mode
+        outputs_train = layer(inputs, training=True)
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs_train.shape, expected_shape)
+
+        # Test inference mode
+        outputs_inference = layer(inputs, training=False)
+        self.assertEqual(outputs_inference.shape, expected_shape)
+
+    def test_empty_embedding_types(self):
+        """Test that empty embedding types list uses default."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=None  # Should default to ["dual_branch"]
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+        # Should have dual_branch embedding layer
+        self.assertIn("dual_branch", layer.embedding_layers)
+
+    def test_combined_embedding_behavior(self):
+        """Test that combined embeddings behave correctly."""
+        layer = AdvancedNumericalEmbedding(
+            embedding_dim=self.embedding_dim,
+            embedding_types=["periodic", "ple"],
+            use_gating=True
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Get individual embeddings
+        periodic_embedding = layer.embedding_layers["periodic"](inputs)
+        ple_embedding = layer.embedding_layers["ple"](inputs)
+
+        # Get combined output
+        combined_output = layer(inputs)
+
+        # Combined output should have the same shape as individual embeddings
+        self.assertEqual(combined_output.shape, periodic_embedding.shape)
+        self.assertEqual(combined_output.shape, ple_embedding.shape)
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
diff --git a/test/layers/test_periodic_embedding.py b/test/layers/test_periodic_embedding.py
new file mode 100644
index 0000000..cc1b72f
--- /dev/null
+++ b/test/layers/test_periodic_embedding.py
@@ -0,0 +1,302 @@
+import unittest
+import numpy as np
+import tensorflow as tf
+
+from kdp.layers.periodic_embedding_layer import PeriodicEmbedding
+
+
+class TestPeriodicEmbedding(unittest.TestCase):
+    """Test cases for PeriodicEmbedding layer."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.batch_size = 32
+        self.num_features = 5
+        self.embedding_dim = 8
+        self.num_frequencies = 4
+
+    def test_basic_functionality(self):
+        """Test basic functionality of PeriodicEmbedding layer."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            name="test_periodic"
+        )
+
+        # Create input data
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+
+        # Build the layer
+        layer.build(inputs.shape)
+
+        # Test forward pass
+        outputs = layer(inputs)
+
+        # Check output shape
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+        # Check that outputs are finite
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(outputs)))
+
+    def test_single_feature(self):
+        """Test PeriodicEmbedding with single feature."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies
+        )
+
+        # Single feature input
+        inputs = tf.random.normal((self.batch_size, 1))
+
+        # Build and test
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Should squeeze the feature dimension for single feature
+        expected_shape = (self.batch_size, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_frequency_initialization_methods(self):
+        """Test different frequency initialization methods."""
+        init_methods = ["uniform", "log_uniform", "constant"]
+
+        for init_method in init_methods:
+            with self.subTest(init_method=init_method):
+                layer = PeriodicEmbedding(
+                    embedding_dim=self.embedding_dim,
+                    num_frequencies=self.num_frequencies,
+                    frequency_init=init_method
+                )
+
+                inputs = tf.random.normal((self.batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                # Check output shape
+                expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+                # Check that frequencies are positive
+                frequencies = layer.frequencies
+                self.assertTrue(tf.reduce_all(frequencies > 0))
+
+    def test_without_mlp(self):
+        """Test PeriodicEmbedding without MLP."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            use_mlp=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Without MLP, output should be 2 * num_frequencies
+        expected_shape = (self.batch_size, self.num_features, 2 * self.num_frequencies)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_without_residual(self):
+        """Test PeriodicEmbedding without residual connections."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            use_residual=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_without_batch_norm(self):
+        """Test PeriodicEmbedding without batch normalization."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            use_batch_norm=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_dropout(self):
+        """Test PeriodicEmbedding with dropout."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            dropout_rate=0.5
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Test training mode
+        outputs_train = layer(inputs, training=True)
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs_train.shape, expected_shape)
+
+        # Test inference mode
+        outputs_inference = layer(inputs, training=False)
+        self.assertEqual(outputs_inference.shape, expected_shape)
+
+    def test_frequency_constraints(self):
+        """Test that frequencies are constrained to be positive."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Check that frequencies are positive
+        frequencies = layer.frequencies
+        self.assertTrue(tf.reduce_all(frequencies > 0))
+
+    def test_serialization(self):
+        """Test that the layer can be serialized and deserialized."""
+        # Set random seed for reproducible results
+        tf.random.set_seed(42)
+
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            frequency_init="log_uniform",
+            min_frequency=1e-3,
+            max_frequency=1e3
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Get original output
+        original_output = layer(inputs)
+
+        # Serialize and deserialize
+        config = layer.get_config()
+        new_layer = PeriodicEmbedding.from_config(config)
+        new_layer.build(inputs.shape)
+
+        # Get new output
+        new_output = new_layer(inputs)
+
+        # Check that the structure is preserved (shapes should be the same)
+        self.assertEqual(original_output.shape, new_output.shape)
+
+        # Check that outputs are finite (layer works correctly after deserialization)
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(new_output)))
+
+        # Note: We don't check that outputs are identical because weights are reinitialized
+        # during deserialization, which is the expected behavior in Keras
+
+    def test_invalid_frequency_init(self):
+        """Test that invalid frequency initialization raises an error."""
+        with self.assertRaises(ValueError):
+            PeriodicEmbedding(
+                embedding_dim=self.embedding_dim,
+                num_frequencies=self.num_frequencies,
+                frequency_init="invalid_method"
+            )
+
+    def test_periodic_properties(self):
+        """Test that the layer exhibits periodic properties."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies,
+            use_mlp=False  # Disable MLP to see raw periodic features
+        )
+
+        # Create inputs with known periodicity
+        x1 = tf.constant([[1.0, 2.0, 3.0]])
+        x2 = tf.constant([[1.0 + 2*np.pi, 2.0 + 2*np.pi, 3.0 + 2*np.pi]])
+
+        layer.build(x1.shape)
+
+        # Get outputs
+        y1 = layer(x1)
+        y2 = layer(x2)
+
+        # For some frequencies, outputs should be similar due to periodicity
+        # (Note: this is a basic test, actual periodicity depends on learned frequencies)
+        self.assertEqual(y1.shape, y2.shape)
+
+    def test_gradient_flow(self):
+        """Test that gradients flow through the layer."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        with tf.GradientTape() as tape:
+            outputs = layer(inputs)
+            loss = tf.reduce_mean(outputs)
+
+        # Compute gradients
+        gradients = tape.gradient(loss, layer.trainable_variables)
+
+        # Check that gradients exist and are finite
+        for grad in gradients:
+            self.assertIsNotNone(grad)
+            self.assertTrue(tf.reduce_all(tf.math.is_finite(grad)))
+
+    def test_different_input_shapes(self):
+        """Test PeriodicEmbedding with different input shapes."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies
+        )
+
+        # Test different batch sizes
+        for batch_size in [1, 16, 64]:
+            with self.subTest(batch_size=batch_size):
+                inputs = tf.random.normal((batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                expected_shape = (batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+    def test_parameter_count(self):
+        """Test that the layer has the expected number of parameters."""
+        layer = PeriodicEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_frequencies=self.num_frequencies
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Count trainable parameters
+        total_params = layer.count_params()
+
+        # Expected parameters:
+        # - frequencies: num_features * num_frequencies
+        # - MLP layers: (2 * num_frequencies) * mlp_hidden_units + mlp_hidden_units + mlp_hidden_units * embedding_dim + embedding_dim
+        # - residual projection: 1 * embedding_dim + embedding_dim (weights + bias)
+        # - batch norm parameters (if enabled): 2 * embedding_dim (gamma and beta)
+        expected_frequencies = self.num_features * self.num_frequencies
+        expected_mlp = (2 * self.num_frequencies) * 16 + 16 + 16 * self.embedding_dim + self.embedding_dim
+        expected_residual = 1 * self.embedding_dim + self.embedding_dim
+        expected_batch_norm = 2 * self.embedding_dim  # gamma and beta
+
+        expected_total = expected_frequencies + expected_mlp + expected_residual + expected_batch_norm
+
+        # Add non-trainable parameters from batch norm (moving_mean and moving_variance)
+        # The layer is created with default settings, so use_batch_norm=True
+        expected_total += 2 * self.embedding_dim  # moving_mean and moving_variance
+        self.assertEqual(total_params, expected_total)
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
diff --git a/test/layers/test_ple_embedding.py b/test/layers/test_ple_embedding.py
new file mode 100644
index 0000000..bc78a88
--- /dev/null
+++ b/test/layers/test_ple_embedding.py
@@ -0,0 +1,351 @@
+import unittest
+import numpy as np
+import tensorflow as tf
+
+from kdp.layers.ple_embedding_layer import PLEEmbedding
+
+
+class TestPLEEmbedding(unittest.TestCase):
+    """Test cases for PLEEmbedding layer."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.batch_size = 32
+        self.num_features = 5
+        self.embedding_dim = 8
+        self.num_segments = 8
+
+    def test_basic_functionality(self):
+        """Test basic functionality of PLEEmbedding layer."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            name="test_ple"
+        )
+
+        # Create input data
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+
+        # Build the layer
+        layer.build(inputs.shape)
+
+        # Test forward pass
+        outputs = layer(inputs)
+
+        # Check output shape
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+        # Check that outputs are finite
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(outputs)))
+
+    def test_single_feature(self):
+        """Test PLEEmbedding with single feature."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments
+        )
+
+        # Single feature input
+        inputs = tf.random.normal((self.batch_size, 1))
+
+        # Build and test
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Should squeeze the feature dimension for single feature
+        expected_shape = (self.batch_size, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_segment_initialization_methods(self):
+        """Test different segment initialization methods."""
+        init_methods = ["uniform", "quantile"]
+
+        for init_method in init_methods:
+            with self.subTest(init_method=init_method):
+                layer = PLEEmbedding(
+                    embedding_dim=self.embedding_dim,
+                    num_segments=self.num_segments,
+                    segment_init=init_method
+                )
+
+                inputs = tf.random.normal((self.batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                # Check output shape
+                expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+                # Check that segment boundaries are properly shaped
+                boundaries = layer.segment_boundaries
+                expected_boundary_shape = (self.num_features, self.num_segments + 1)
+                self.assertEqual(boundaries.shape, expected_boundary_shape)
+
+    def test_without_mlp(self):
+        """Test PLEEmbedding without MLP."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            use_mlp=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Without MLP, output should be num_segments
+        expected_shape = (self.batch_size, self.num_features, self.num_segments)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_without_residual(self):
+        """Test PLEEmbedding without residual connections."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            use_residual=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_without_batch_norm(self):
+        """Test PLEEmbedding without batch normalization."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            use_batch_norm=False
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs.shape, expected_shape)
+
+    def test_dropout(self):
+        """Test PLEEmbedding with dropout."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            dropout_rate=0.5
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Test training mode
+        outputs_train = layer(inputs, training=True)
+        expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+        self.assertEqual(outputs_train.shape, expected_shape)
+
+        # Test inference mode
+        outputs_inference = layer(inputs, training=False)
+        self.assertEqual(outputs_inference.shape, expected_shape)
+
+    def test_activation_functions(self):
+        """Test PLEEmbedding with different activation functions."""
+        activations = ["relu", "sigmoid", "tanh", "linear"]
+
+        for activation in activations:
+            with self.subTest(activation=activation):
+                layer = PLEEmbedding(
+                    embedding_dim=self.embedding_dim,
+                    num_segments=self.num_segments,
+                    activation=activation
+                )
+
+                inputs = tf.random.normal((self.batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                expected_shape = (self.batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+    def test_invalid_activation(self):
+        """Test that invalid activation function raises an error."""
+        with self.assertRaises(ValueError):
+            PLEEmbedding(
+                embedding_dim=self.embedding_dim,
+                num_segments=self.num_segments,
+                activation="invalid_activation"
+            )
+
+    def test_invalid_segment_init(self):
+        """Test that invalid segment initialization raises an error."""
+        with self.assertRaises(ValueError):
+            PLEEmbedding(
+                embedding_dim=self.embedding_dim,
+                num_segments=self.num_segments,
+                segment_init="invalid_method"
+            )
+
+    def test_serialization(self):
+        """Test that the layer can be serialized and deserialized."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            segment_init="uniform",
+            activation="relu"
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Get original output
+        original_output = layer(inputs)
+
+        # Serialize and deserialize
+        config = layer.get_config()
+        new_layer = PLEEmbedding.from_config(config)
+        new_layer.build(inputs.shape)
+
+        # Get new output
+        new_output = new_layer(inputs)
+
+        # Check that the structure is preserved (shapes should be the same)
+        self.assertEqual(original_output.shape, new_output.shape)
+
+        # Check that outputs are finite (layer works correctly after deserialization)
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(new_output)))
+
+        # Note: We don't check that outputs are identical because weights are reinitialized
+        # during deserialization, which is the expected behavior in Keras
+
+    def test_piecewise_linear_properties(self):
+        """Test that the layer exhibits piecewise linear properties."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            use_mlp=False  # Disable MLP to see raw PLE features
+        )
+
+        # Create inputs within different segments
+        x1 = tf.constant([[0.5, 1.5, 2.5]])  # Within segment boundaries
+        x2 = tf.constant([[0.6, 1.6, 2.6]])  # Slightly different values
+
+        layer.build(x1.shape)
+
+        # Get outputs
+        y1 = layer(x1)
+        y2 = layer(x2)
+
+        # Outputs should have the same shape
+        self.assertEqual(y1.shape, y2.shape)
+
+    def test_gradient_flow(self):
+        """Test that gradients flow through the layer."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        with tf.GradientTape() as tape:
+            outputs = layer(inputs)
+            loss = tf.reduce_mean(outputs)
+
+        # Compute gradients
+        gradients = tape.gradient(loss, layer.trainable_variables)
+
+        # Check that gradients exist and are finite
+        for grad in gradients:
+            self.assertIsNotNone(grad)
+            self.assertTrue(tf.reduce_all(tf.math.is_finite(grad)))
+
+    def test_different_input_shapes(self):
+        """Test PLEEmbedding with different input shapes."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments
+        )
+
+        # Test different batch sizes
+        for batch_size in [1, 16, 64]:
+            with self.subTest(batch_size=batch_size):
+                inputs = tf.random.normal((batch_size, self.num_features))
+                layer.build(inputs.shape)
+                outputs = layer(inputs)
+
+                expected_shape = (batch_size, self.num_features, self.embedding_dim)
+                self.assertEqual(outputs.shape, expected_shape)
+
+    def test_parameter_count(self):
+        """Test that the layer has the expected number of parameters."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Count trainable parameters
+        total_params = layer.count_params()
+
+        # Expected parameters:
+        # - segment_boundaries: num_features * (num_segments + 1)
+        # - segment_slopes: num_features * num_segments
+        # - segment_intercepts: num_features * num_segments
+        # - MLP layers: num_segments * mlp_hidden_units + mlp_hidden_units + mlp_hidden_units * embedding_dim + embedding_dim
+        # - residual projection: 1 * embedding_dim + embedding_dim (weights + bias)
+        # - batch norm parameters (if enabled): 2 * embedding_dim (gamma and beta)
+        expected_boundaries = self.num_features * (self.num_segments + 1)
+        expected_slopes = self.num_features * self.num_segments
+        expected_intercepts = self.num_features * self.num_segments
+        expected_mlp = self.num_segments * 16 + 16 + 16 * self.embedding_dim + self.embedding_dim
+        expected_residual = 1 * self.embedding_dim + self.embedding_dim
+        expected_batch_norm = 2 * self.embedding_dim  # gamma and beta
+
+        expected_total = (expected_boundaries + expected_slopes + expected_intercepts +
+                          expected_mlp + expected_residual + expected_batch_norm)
+
+        # Add non-trainable parameters from batch norm (moving_mean and moving_variance)
+        expected_total += 2 * self.embedding_dim  # moving_mean and moving_variance
+        self.assertEqual(total_params, expected_total)
+
+    def test_segment_boundaries_ordering(self):
+        """Test that segment boundaries are properly ordered."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments
+        )
+
+        inputs = tf.random.normal((self.batch_size, self.num_features))
+        layer.build(inputs.shape)
+
+        # Check that boundaries are monotonically increasing
+        boundaries = layer.segment_boundaries
+        for i in range(self.num_features):
+            feature_boundaries = boundaries[i]
+            self.assertTrue(tf.reduce_all(feature_boundaries[1:] >= feature_boundaries[:-1]))
+
+    def test_clipping_behavior(self):
+        """Test that inputs are properly clipped to segment boundaries."""
+        layer = PLEEmbedding(
+            embedding_dim=self.embedding_dim,
+            num_segments=self.num_segments,
+            use_mlp=False  # Disable MLP to see raw PLE features
+        )
+
+        # Create inputs outside the expected range
+        inputs = tf.constant([[10.0, -10.0, 5.0, 8.0, -8.0]])  # Values outside [-3, 3] range
+
+        layer.build(inputs.shape)
+        outputs = layer(inputs)
+
+        # Outputs should still be finite and have correct shape
+        expected_shape = (1, self.num_features, self.num_segments)
+        self.assertEqual(outputs.shape, expected_shape)
+        self.assertTrue(tf.reduce_all(tf.math.is_finite(outputs)))
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
diff --git a/test/test_advanced_numerical_embeddings_integration.py b/test/test_advanced_numerical_embeddings_integration.py
new file mode 100644
index 0000000..eb7f524
--- /dev/null
+++ b/test/test_advanced_numerical_embeddings_integration.py
@@ -0,0 +1,390 @@
+import unittest
+import numpy as np
+import pandas as pd
+import tensorflow as tf
+
+from kdp import PreprocessingModel
+from kdp.features import NumericalFeature, FeatureType
+
+
+class TestAdvancedNumericalEmbeddingsIntegration(unittest.TestCase):
+    """Integration tests for advanced numerical embeddings with KDP processor."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        # Create sample data
+        np.random.seed(42)
+        self.n_samples = 100
+        self.data = pd.DataFrame({
+            'age': np.random.normal(35, 10, self.n_samples),
+            'income': np.random.lognormal(10, 0.5, self.n_samples),
+            'credit_score': np.random.uniform(300, 850, self.n_samples),
+            'debt_ratio': np.random.beta(2, 5, self.n_samples),
+            'target': np.random.binomial(1, 0.3, self.n_samples)
+        })
+
+        # Save data to temporary file
+        self.data_path = "temp_test_data.csv"
+        self.data.to_csv(self.data_path, index=False)
+
+    def tearDown(self):
+        """Clean up test fixtures."""
+        import os
+        if os.path.exists(self.data_path):
+            os.remove(self.data_path)
+
+    def test_periodic_embedding_integration(self):
+        """Test integration of periodic embeddings with KDP processor."""
+        # Define features with periodic embedding
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='periodic',
+                embedding_dim=8,
+                num_frequencies=4
+            ),
+            'income': NumericalFeature(
+                name='income',
+                feature_type=FeatureType.FLOAT_RESCALED,
+                use_embedding=True,
+                embedding_type='periodic',
+                embedding_dim=8,
+                num_frequencies=4
+            )
+        }
+
+        # Create preprocessor with periodic embeddings
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            use_periodic_embedding=True,
+            embedding_dim=8,
+            num_frequencies=4,
+            output_mode="dict"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Check that periodic embedding layers are present
+        layer_names = [layer.get('name', '') for layer in model_config['layers']]
+        periodic_layers = [name for name in layer_names if 'periodic' in name]
+        self.assertGreater(len(periodic_layers), 0)
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Check that predictions have the expected structure
+        self.assertIsInstance(predictions, dict)
+        for feature_name in ['age', 'income']:
+            self.assertIn(feature_name, predictions)
+            # Should be 3D tensor (batch, features, embedding_dim)
+            self.assertEqual(len(predictions[feature_name].shape), 3)
+
+    def test_ple_embedding_integration(self):
+        """Test integration of PLE embeddings with KDP processor."""
+        # Define features with PLE embedding
+        features_specs = {
+            'credit_score': NumericalFeature(
+                name='credit_score',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='ple',
+                embedding_dim=8,
+                num_segments=8
+            ),
+            'debt_ratio': NumericalFeature(
+                name='debt_ratio',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='ple',
+                embedding_dim=8,
+                num_segments=8
+            )
+        }
+
+        # Create preprocessor with PLE embeddings
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            use_ple_embedding=True,
+            embedding_dim=8,
+            num_segments=8,
+            output_mode="dict"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Check that PLE embedding layers are present
+        layer_names = [layer.get('name', '') for layer in model_config['layers']]
+        ple_layers = [name for name in layer_names if 'ple' in name]
+        self.assertGreater(len(ple_layers), 0)
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Check that predictions have the expected structure
+        self.assertIsInstance(predictions, dict)
+        for feature_name in ['credit_score', 'debt_ratio']:
+            self.assertIn(feature_name, predictions)
+            # Should be 3D tensor (batch, features, embedding_dim)
+            self.assertEqual(len(predictions[feature_name].shape), 3)
+
+    def test_combined_embedding_integration(self):
+        """Test integration of combined embeddings with KDP processor."""
+        # Define features with combined embedding
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='combined',
+                embedding_dim=8,
+                num_frequencies=4,
+                num_segments=8
+            ),
+            'income': NumericalFeature(
+                name='income',
+                feature_type=FeatureType.FLOAT_RESCALED,
+                use_embedding=True,
+                embedding_type='combined',
+                embedding_dim=8,
+                num_frequencies=4,
+                num_segments=8
+            )
+        }
+
+        # Create preprocessor with combined embeddings
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            use_advanced_combined_embedding=True,
+            embedding_dim=8,
+            embedding_types=['periodic', 'ple', 'dual_branch'],
+            num_frequencies=4,
+            num_segments=8,
+            output_mode="dict"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Check that combined embedding layers are present
+        layer_names = [layer.get('name', '') for layer in model_config['layers']]
+        combined_layers = [name for name in layer_names if 'combined' in name]
+        self.assertGreater(len(combined_layers), 0)
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Check that predictions have the expected structure
+        self.assertIsInstance(predictions, dict)
+        for feature_name in ['age', 'income']:
+            self.assertIn(feature_name, predictions)
+            # Should be 3D tensor (batch, features, embedding_dim)
+            self.assertEqual(len(predictions[feature_name].shape), 3)
+
+    def test_mixed_embedding_types(self):
+        """Test mixing different embedding types in the same model."""
+        # Define features with different embedding types
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='periodic',
+                embedding_dim=8,
+                num_frequencies=4
+            ),
+            'income': NumericalFeature(
+                name='income',
+                feature_type=FeatureType.FLOAT_RESCALED,
+                use_embedding=True,
+                embedding_type='ple',
+                embedding_dim=8,
+                num_segments=8
+            ),
+            'credit_score': NumericalFeature(
+                name='credit_score',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='dual_branch',
+                embedding_dim=8,
+                num_bins=10
+            )
+        }
+
+        # Create preprocessor with mixed embeddings
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            output_mode="dict"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Check that predictions have the expected structure
+        self.assertIsInstance(predictions, dict)
+        for feature_name in ['age', 'income', 'credit_score']:
+            self.assertIn(feature_name, predictions)
+            # Should be 3D tensor (batch, features, embedding_dim)
+            self.assertEqual(len(predictions[feature_name].shape), 3)
+
+    def test_concat_output_mode(self):
+        """Test advanced embeddings with concatenated output mode."""
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='periodic',
+                embedding_dim=8,
+                num_frequencies=4
+            ),
+            'income': NumericalFeature(
+                name='income',
+                feature_type=FeatureType.FLOAT_RESCALED,
+                use_embedding=True,
+                embedding_type='ple',
+                embedding_dim=8,
+                num_segments=8
+            )
+        }
+
+        # Create preprocessor with concatenated output
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            output_mode="concat"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Should be a single tensor with concatenated features
+        self.assertIsInstance(predictions, np.ndarray)
+        # Shape should be (batch, total_embedding_dim)
+        expected_shape = (10, 16)  # 8 + 8 = 16
+        self.assertEqual(predictions.shape, expected_shape)
+
+    def test_embedding_configuration_options(self):
+        """Test various configuration options for embeddings."""
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='periodic',
+                embedding_dim=12,
+                num_frequencies=6,
+                kwargs={
+                    'frequency_init': 'constant',
+                    'min_frequency': 1e-3,
+                    'max_frequency': 1e3,
+                    'dropout_rate': 0.2,
+                    'use_batch_norm': False
+                }
+            )
+        }
+
+        # Create preprocessor with custom configuration
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            use_periodic_embedding=True,
+            embedding_dim=12,
+            num_frequencies=6,
+            frequency_init='constant',
+            min_frequency=1e-3,
+            max_frequency=1e3,
+            dropout_rate=0.2,
+            use_batch_norm=False,
+            output_mode="dict"
+        )
+
+        # Build the preprocessor
+        model_config = preprocessor.build_preprocessor()
+
+        # Test prediction
+        test_data = self.data.head(10)
+        predictions = preprocessor.predict(test_data)
+
+        # Check that predictions have the expected structure
+        self.assertIsInstance(predictions, dict)
+        self.assertIn('age', predictions)
+        # Should be 3D tensor with custom embedding dimension
+        self.assertEqual(predictions['age'].shape, (10, 1, 12))
+
+    def test_model_serialization(self):
+        """Test that models with advanced embeddings can be saved and loaded."""
+        features_specs = {
+            'age': NumericalFeature(
+                name='age',
+                feature_type=FeatureType.FLOAT_NORMALIZED,
+                use_embedding=True,
+                embedding_type='combined',
+                embedding_dim=8,
+                num_frequencies=4,
+                num_segments=8
+            )
+        }
+
+        # Create preprocessor
+        preprocessor = PreprocessingModel(
+            path_data=self.data_path,
+            features_specs=features_specs,
+            use_advanced_numerical_embedding=True,
+            use_advanced_combined_embedding=True,
+            output_mode="dict"
+        )
+
+        # Build and save the model
+        model_config = preprocessor.build_preprocessor()
+
+        # Test prediction before saving
+        test_data = self.data.head(5)
+        predictions_before = preprocessor.predict(test_data)
+
+        # Save the model
+        save_path = "temp_test_model"
+        preprocessor.save_model(save_path)
+
+        # Load the model
+        loaded_preprocessor, loaded_config = PreprocessingModel.load_model(save_path)
+
+        # Test prediction after loading
+        predictions_after = loaded_preprocessor.predict(test_data)
+
+        # Predictions should be the same
+        np.testing.assert_array_almost_equal(
+            predictions_before['age'], predictions_after['age'], decimal=5
+        )
+
+        # Clean up
+        import shutil
+        shutil.rmtree(save_path, ignore_errors=True)
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file