diff --git a/src/power_grid_model/validation/_validation.py b/src/power_grid_model/validation/_validation.py index f83f30e2b..875394386 100644 --- a/src/power_grid_model/validation/_validation.py +++ b/src/power_grid_model/validation/_validation.py @@ -10,6 +10,7 @@ """ import copy +import heapq from collections.abc import Sized as ABCSized from itertools import chain from typing import Literal @@ -66,6 +67,7 @@ from power_grid_model.validation.errors import ( IdNotInDatasetError, InvalidIdError, + InvalidTapRegulatorControlSideError, InvalidVoltageRegulationError, MissingValueError, MultiComponentNotUniqueError, @@ -1240,6 +1242,428 @@ def validate_regulator(data: SingleDataset, component: ComponentType) -> list[Va return errors +def _build_regulated_transformer_mappings( + regulator_data: np.ndarray, +) -> tuple[dict[int, int], dict[int, int]]: + """ + Build mappings of regulated transformers and their regulators. + + Args: + regulator_data: Array of transformer tap regulator data + + Returns: + Tuple of (regulated_transformers, regulator_id_by_transformer) where: + - regulated_transformers maps transformer_id -> control_side + - regulator_id_by_transformer maps transformer_id -> regulator_id + """ + regulated_transformers: dict[int, int] = {} + regulator_id_by_transformer: dict[int, int] = {} + + for regulator in regulator_data: + if regulator["status"] != 0: # Only active regulators + regulated_transformers[regulator["regulated_object"]] = regulator["control_side"] + regulator_id_by_transformer[regulator["regulated_object"]] = regulator["id"] + + return regulated_transformers, regulator_id_by_transformer + + +def _add_transformer_edges_to_graph( + data: SingleDataset, + graph: dict[int, list[tuple[int, str]]], + node_id_to_idx: dict[int, int], + regulated_transformers: dict[int, int], +) -> None: + """ + Add 2-winding transformer edges to the topology graph. + + Args: + data: Input dataset + graph: Adjacency list graph to update + node_id_to_idx: Mapping from node IDs to indices + regulated_transformers: Mapping of transformer_id -> control_side + """ + if ComponentType.transformer not in data: + return + + for transformer in data[ComponentType.transformer]: + if transformer["from_status"] == 0 or transformer["to_status"] == 0: + continue + + from_node_id = transformer["from_node"] + to_node_id = transformer["to_node"] + + if from_node_id not in node_id_to_idx or to_node_id not in node_id_to_idx: + continue + + from_idx = node_id_to_idx[from_node_id] + to_idx = node_id_to_idx[to_node_id] + transformer_id = transformer["id"] + + if transformer_id in regulated_transformers: + # For regulated transformers, add directed edge from non-control side to control side + control_side = regulated_transformers[transformer_id] + # BranchSide: from_side=0, to_side=1 + if control_side == int(BranchSide.from_side): + # Control side is from_side, so edge goes from to_side to from_side + graph[to_idx].append((from_idx, "regulated")) + else: # control_side == int(BranchSide.to_side) + # Control side is to_side, so edge goes from from_side to to_side + graph[from_idx].append((to_idx, "regulated")) + else: + # Unregulated transformers have bidirectional edges + graph[from_idx].append((to_idx, "unregulated")) + graph[to_idx].append((from_idx, "unregulated")) + + +def _add_three_winding_transformer_edges_to_graph( + data: SingleDataset, + graph: dict[int, list[tuple[int, str]]], + node_id_to_idx: dict[int, int], + regulated_transformers: dict[int, int], +) -> None: + """ + Add 3-winding transformer edges to the topology graph. + + Args: + data: Input dataset + graph: Adjacency list graph to update + node_id_to_idx: Mapping from node IDs to indices + regulated_transformers: Mapping of transformer_id -> control_side + """ + if ComponentType.three_winding_transformer not in data: + return + + for transformer3w in data[ComponentType.three_winding_transformer]: + if transformer3w["status_1"] == 0 or transformer3w["status_2"] == 0 or transformer3w["status_3"] == 0: + continue + + node_1_id = transformer3w["node_1"] + node_2_id = transformer3w["node_2"] + node_3_id = transformer3w["node_3"] + + if node_1_id not in node_id_to_idx or node_2_id not in node_id_to_idx or node_3_id not in node_id_to_idx: + continue + + node_1_idx = node_id_to_idx[node_1_id] + node_2_idx = node_id_to_idx[node_2_id] + node_3_idx = node_id_to_idx[node_3_id] + transformer_id = transformer3w["id"] + + if transformer_id in regulated_transformers: + control_side = regulated_transformers[transformer_id] + tap_side = transformer3w["tap_side"] + + # Branch3Side: side_1=0, side_2=1, side_3=2 + # Determine which node is the tap side and which is the control side + tap_node_idx = [node_1_idx, node_2_idx, node_3_idx][tap_side] + control_node_idx = [node_1_idx, node_2_idx, node_3_idx][control_side] + non_control_nodes = [idx for i, idx in enumerate([node_1_idx, node_2_idx, node_3_idx]) if i != control_side] + + if tap_side == control_side: + # Tap at control side: edges from both non-control sides to control side + for non_control_idx in non_control_nodes: + graph[non_control_idx].append((control_node_idx, "regulated")) + # Bidirectional edge between non-control sides + graph[non_control_nodes[0]].append((non_control_nodes[1], "unregulated")) + graph[non_control_nodes[1]].append((non_control_nodes[0], "unregulated")) + else: + # Tap at non-control side: edges from non-tap sides to their respective sides + for i, idx in enumerate([node_1_idx, node_2_idx, node_3_idx]): + if i == tap_side: + # From tap side to control side + graph[tap_node_idx].append((control_node_idx, "regulated")) + elif i != control_side: + # From other non-tap, non-control side to control side + graph[idx].append((control_node_idx, "regulated")) + # Bidirectional edge between tap and the other non-control side + other_non_control = next(idx for idx in non_control_nodes if idx != tap_node_idx) + graph[tap_node_idx].append((other_non_control, "unregulated")) + graph[other_non_control].append((tap_node_idx, "unregulated")) + else: + # Unregulated 3-winding transformers have bidirectional edges between all nodes + graph[node_1_idx].append((node_2_idx, "unregulated")) + graph[node_1_idx].append((node_3_idx, "unregulated")) + graph[node_2_idx].append((node_1_idx, "unregulated")) + graph[node_2_idx].append((node_3_idx, "unregulated")) + graph[node_3_idx].append((node_1_idx, "unregulated")) + graph[node_3_idx].append((node_2_idx, "unregulated")) + + +def _add_branch_edges_to_graph( + data: SingleDataset, graph: dict[int, list[tuple[int, str]]], node_id_to_idx: dict[int, int] +) -> None: + """ + Add line and link edges to the topology graph. + + Args: + data: Input dataset + graph: Adjacency list graph to update + node_id_to_idx: Mapping from node IDs to indices + """ + for branch_type in [ComponentType.line, ComponentType.link]: + if branch_type not in data: + continue + for branch in data[branch_type]: + if branch["from_status"] == 0 or branch["to_status"] == 0: + continue + + from_node_id = branch["from_node"] + to_node_id = branch["to_node"] + + if from_node_id not in node_id_to_idx or to_node_id not in node_id_to_idx: + continue + + from_idx = node_id_to_idx[from_node_id] + to_idx = node_id_to_idx[to_node_id] + + # Branches always have bidirectional edges + graph[from_idx].append((to_idx, "unregulated")) + graph[to_idx].append((from_idx, "unregulated")) + + +def _compute_distances_from_sources( + num_nodes: int, source_nodes: set[int], graph: dict[int, list[tuple[int, str]]] +) -> list[float]: + """ + Compute shortest distances from sources to all nodes using Dijkstra's algorithm. + + Args: + num_nodes: Number of nodes in the graph + source_nodes: Set of source node indices + graph: Adjacency list graph + + Returns: + List of distances from sources to each node (inf if unreachable) + """ + distances = [float("inf")] * num_nodes + + for source_idx in source_nodes: + # Use Dijkstra's algorithm + pq: list[tuple[float, int]] = [(0, source_idx)] + local_distances = [float("inf")] * num_nodes + local_distances[source_idx] = 0 + + while pq: + dist, u = heapq.heappop(pq) + + if dist > local_distances[u]: + continue + + for v, _ in graph[u]: + new_dist = local_distances[u] + 1 + if new_dist < local_distances[v]: + local_distances[v] = new_dist + heapq.heappush(pq, (new_dist, v)) + + # Update global distances with minimum from all sources + for i in range(num_nodes): + distances[i] = min(distances[i], local_distances[i]) + + return distances + + +def _check_two_winding_transformer_validity( + data: SingleDataset, + regulated_transformers: dict[int, int], + regulator_id_by_transformer: dict[int, int], + node_id_to_idx: dict[int, int], + distances: list[float], +) -> list[int]: + """ + Check 2-winding transformers for invalid control side configurations. + + Args: + data: Input dataset + regulated_transformers: Mapping of transformer_id -> control_side + regulator_id_by_transformer: Mapping of transformer_id -> regulator_id + node_id_to_idx: Mapping from node IDs to indices + distances: Distances from sources to each node + + Returns: + List of invalid regulator IDs + """ + invalid_regulator_ids: list[int] = [] + + if ComponentType.transformer not in data: + return invalid_regulator_ids + + for transformer in data[ComponentType.transformer]: + transformer_id = transformer["id"] + + if transformer_id not in regulated_transformers: + continue + + if transformer["from_status"] == 0 or transformer["to_status"] == 0: + continue + + from_node_id = transformer["from_node"] + to_node_id = transformer["to_node"] + + if from_node_id not in node_id_to_idx or to_node_id not in node_id_to_idx: + continue + + from_idx = node_id_to_idx[from_node_id] + to_idx = node_id_to_idx[to_node_id] + control_side = regulated_transformers[transformer_id] + + # Determine non-control side and control side + if control_side == int(BranchSide.from_side): + non_control_idx = to_idx + control_idx = from_idx + else: + non_control_idx = from_idx + control_idx = to_idx + + # Check if either side is unreachable + non_control_dist = distances[non_control_idx] + control_dist = distances[control_idx] + + # If at least one side is unreachable AND at least one is reachable, it's invalid + if (non_control_dist == float("inf") or control_dist == float("inf")) and ( + non_control_dist != float("inf") or control_dist != float("inf") + ): + regulator_id = regulator_id_by_transformer[transformer_id] + invalid_regulator_ids.append(regulator_id) + + return invalid_regulator_ids + + +def _check_three_winding_transformer_validity( + data: SingleDataset, + regulated_transformers: dict[int, int], + regulator_id_by_transformer: dict[int, int], + node_id_to_idx: dict[int, int], + distances: list[float], +) -> list[int]: + """ + Check 3-winding transformers for invalid control side configurations. + + Args: + data: Input dataset + regulated_transformers: Mapping of transformer_id -> control_side + regulator_id_by_transformer: Mapping of transformer_id -> regulator_id + node_id_to_idx: Mapping from node IDs to indices + distances: Distances from sources to each node + + Returns: + List of invalid regulator IDs + """ + invalid_regulator_ids: list[int] = [] + + if ComponentType.three_winding_transformer not in data: + return invalid_regulator_ids + + for transformer3w in data[ComponentType.three_winding_transformer]: + transformer_id = transformer3w["id"] + + if transformer_id not in regulated_transformers: + continue + + if transformer3w["status_1"] == 0 or transformer3w["status_2"] == 0 or transformer3w["status_3"] == 0: + continue + + node_1_id = transformer3w["node_1"] + node_2_id = transformer3w["node_2"] + node_3_id = transformer3w["node_3"] + + if node_1_id not in node_id_to_idx or node_2_id not in node_id_to_idx or node_3_id not in node_id_to_idx: + continue + + node_1_idx = node_id_to_idx[node_1_id] + node_2_idx = node_id_to_idx[node_2_id] + node_3_idx = node_id_to_idx[node_3_id] + + control_side = regulated_transformers[transformer_id] + control_node_idx = [node_1_idx, node_2_idx, node_3_idx][control_side] + non_control_nodes = [idx for i, idx in enumerate([node_1_idx, node_2_idx, node_3_idx]) if i != control_side] + + # Check if control side is reachable and at least one non-control side is unreachable + control_dist = distances[control_node_idx] + non_control_dists = [distances[idx] for idx in non_control_nodes] + + # If any endpoint is unreachable AND at least one is reachable, it's invalid + if (control_dist == float("inf") or any(d == float("inf") for d in non_control_dists)) and not all( + d == float("inf") for d in [control_dist, *non_control_dists] + ): + regulator_id = regulator_id_by_transformer[transformer_id] + invalid_regulator_ids.append(regulator_id) + + return invalid_regulator_ids + + +def validate_tap_regulator_control_side_topology(data: SingleDataset) -> list[ValidationError]: + """ + Validates that transformer tap regulators have valid control side configuration. + A transformer must be controlled from a side that is closer to or at the source, + not from a side that is farther from the source. + + This implements the same logic as the C++ tap_position_optimizer: + 1. Build a directed graph where regulated transformers have edges from non-control side to control side + 2. Use Dijkstra's algorithm to find shortest distances from sources to all nodes + 3. Flag transformers where the non-control side is unreachable from sources but control side is reachable + + Args: + data: A power-grid-model input dataset + + Returns: + A list of InvalidTapRegulatorControlSideError for transformers with invalid control side configuration + """ + # Early validation checks + if ( + ComponentType.transformer_tap_regulator not in data + or data[ComponentType.transformer_tap_regulator].size == 0 + or ComponentType.node not in data + ): + return [] + + regulator_data = data[ComponentType.transformer_tap_regulator] + node_data = data[ComponentType.node] + num_nodes = node_data.size + + # Create node ID to index mapping + node_id_to_idx = {node_id: idx for idx, node_id in enumerate(node_data["id"])} + + # Build adjacency list for the graph + graph: dict[int, list[tuple[int, str]]] = {i: [] for i in range(num_nodes)} + + # Get source nodes + source_nodes = set() + if ComponentType.source in data: + for source in data[ComponentType.source]: + if source["status"] != 0 and source["node"] in node_id_to_idx: + source_nodes.add(node_id_to_idx[source["node"]]) + + # Build mapping of regulated transformers + regulated_transformers, regulator_id_by_transformer = _build_regulated_transformer_mappings(regulator_data) + + # Build the graph by adding edges from all components + _add_transformer_edges_to_graph(data, graph, node_id_to_idx, regulated_transformers) + _add_three_winding_transformer_edges_to_graph(data, graph, node_id_to_idx, regulated_transformers) + _add_branch_edges_to_graph(data, graph, node_id_to_idx) + + # Compute distances from sources to all nodes + distances = _compute_distances_from_sources(num_nodes, source_nodes, graph) + + # Check for invalid configurations + invalid_regulator_ids = _check_two_winding_transformer_validity( + data, regulated_transformers, regulator_id_by_transformer, node_id_to_idx, distances + ) + invalid_regulator_ids.extend( + _check_three_winding_transformer_validity( + data, regulated_transformers, regulator_id_by_transformer, node_id_to_idx, distances + ) + ) + + if invalid_regulator_ids: + return [ + InvalidTapRegulatorControlSideError( + ComponentType.transformer_tap_regulator, "control_side", invalid_regulator_ids + ) + ] + + return [] + + def validate_transformer_tap_regulator(data: SingleDataset) -> list[ValidationError]: errors = validate_regulator(data, ComponentType.transformer_tap_regulator) errors += _all_boolean(data, ComponentType.transformer_tap_regulator, "status") @@ -1271,6 +1695,7 @@ def validate_transformer_tap_regulator(data: SingleDataset) -> list[ValidationEr errors += _all_greater_than_or_equal_to_zero( data, ComponentType.transformer_tap_regulator, "line_drop_compensation_x", 0.0 ) + errors += validate_tap_regulator_control_side_topology(data) return errors diff --git a/src/power_grid_model/validation/errors.py b/src/power_grid_model/validation/errors.py index 137bb9591..16a3b7871 100644 --- a/src/power_grid_model/validation/errors.py +++ b/src/power_grid_model/validation/errors.py @@ -608,3 +608,15 @@ class MissingVoltageAngleMeasurementError(MultiComponentValidationError): "Missing voltage angle measurement for {n} {objects}. " "If a voltage sensor measures the voltage of a terminal, it must also measure the voltage angle." ) + + +class InvalidTapRegulatorControlSideError(SingleFieldValidationError): + """ + The tap regulator has an invalid control side configuration. + The transformer is being controlled from non-source side towards source side. + """ + + _message = ( + "Field {field} contains invalid control side configuration for {n} {objects}. " + "The transformer(s) are being controlled from non-source side towards source side." + ) diff --git a/tests/unit/validation/test_tap_regulator_topology.py b/tests/unit/validation/test_tap_regulator_topology.py new file mode 100644 index 000000000..40258ffa6 --- /dev/null +++ b/tests/unit/validation/test_tap_regulator_topology.py @@ -0,0 +1,527 @@ +# SPDX-FileCopyrightText: Contributors to the Power Grid Model project +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Tests for transformer tap regulator control side topology validation. +""" + +from power_grid_model import initialize_array +from power_grid_model._core.dataset_definitions import ComponentType, DatasetType +from power_grid_model.enum import Branch3Side, BranchSide +from power_grid_model.validation import validate_input_data +from power_grid_model.validation._validation import validate_tap_regulator_control_side_topology +from power_grid_model.validation.errors import InvalidTapRegulatorControlSideError + + +def test_valid_tap_regulator_control_side_simple(): + """Test valid control side configuration: source -> transformer with control on to_side""" + # Setup: source at node 0, transformer from 0 to 1, control from to_side (1) - VALID + node_input = initialize_array(DatasetType.input, ComponentType.node, 2) + node_input["id"] = [0, 1] + node_input["u_rated"] = [150e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [2] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 1) + transformer_input["id"] = [3] + transformer_input["from_node"] = [0] + transformer_input["to_node"] = [1] + transformer_input["from_status"] = [1] + transformer_input["to_status"] = [1] + transformer_input["u1"] = [150e3] + transformer_input["u2"] = [10e3] + transformer_input["sn"] = [1e5] + transformer_input["uk"] = [0.1] + transformer_input["pk"] = [1e3] + transformer_input["i0"] = [1.0e-6] + transformer_input["p0"] = [0.1] + transformer_input["winding_from"] = [1] + transformer_input["winding_to"] = [1] + transformer_input["clock"] = [0] + transformer_input["tap_side"] = [BranchSide.from_side] + transformer_input["tap_pos"] = [0] + transformer_input["tap_nom"] = [0] + transformer_input["tap_min"] = [-1] + transformer_input["tap_max"] = [1] + transformer_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [1] + transformer_tap_regulator_input["control_side"] = [BranchSide.to_side] # Control to_side (downstream) - VALID + transformer_tap_regulator_input["u_set"] = [10e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 0 + + +def test_invalid_tap_regulator_control_side_from_non_source(): + """Test invalid control side: controlling from non-source side (to_side) towards source side (from_side)""" + # Setup: source at node 0, transformer from 0 to 1, but control from to_side (1) - INVALID + node_input = initialize_array(DatasetType.input, ComponentType.node, 2) + node_input["id"] = [0, 1] + node_input["u_rated"] = [150e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [2] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 1) + transformer_input["id"] = [3] + transformer_input["from_node"] = [0] + transformer_input["to_node"] = [1] + transformer_input["from_status"] = [1] + transformer_input["to_status"] = [1] + transformer_input["u1"] = [150e3] + transformer_input["u2"] = [10e3] + transformer_input["sn"] = [1e5] + transformer_input["uk"] = [0.1] + transformer_input["pk"] = [1e3] + transformer_input["i0"] = [1.0e-6] + transformer_input["p0"] = [0.1] + transformer_input["winding_from"] = [1] + transformer_input["winding_to"] = [1] + transformer_input["clock"] = [0] + transformer_input["tap_side"] = [BranchSide.from_side] + transformer_input["tap_pos"] = [0] + transformer_input["tap_nom"] = [0] + transformer_input["tap_min"] = [-1] + transformer_input["tap_max"] = [1] + transformer_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [1] + transformer_tap_regulator_input["control_side"] = [BranchSide.from_side] # Controlling from source side + transformer_tap_regulator_input["u_set"] = [10e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 1 + assert isinstance(errors[0], InvalidTapRegulatorControlSideError) + assert errors[0].component == ComponentType.transformer_tap_regulator + assert errors[0].field == "control_side" + assert errors[0].ids == [4] + + +def test_valid_tap_regulator_cascaded_transformers(): + """Test valid control side in cascaded transformers: source -> T1 -> T2""" + # Setup: source at node 0, T1 from 0 to 1, T2 from 1 to 2 + node_input = initialize_array(DatasetType.input, ComponentType.node, 3) + node_input["id"] = [0, 1, 2] + node_input["u_rated"] = [150e3, 50e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [10] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 2) + transformer_input["id"] = [3, 4] + transformer_input["from_node"] = [0, 1] + transformer_input["to_node"] = [1, 2] + transformer_input["from_status"] = [1, 1] + transformer_input["to_status"] = [1, 1] + transformer_input["u1"] = [150e3, 50e3] + transformer_input["u2"] = [50e3, 10e3] + transformer_input["sn"] = [1e5, 1e5] + transformer_input["uk"] = [0.1, 0.1] + transformer_input["pk"] = [1e3, 1e3] + transformer_input["i0"] = [1.0e-6, 1.0e-6] + transformer_input["p0"] = [0.1, 0.1] + transformer_input["winding_from"] = [1, 1] + transformer_input["winding_to"] = [1, 1] + transformer_input["clock"] = [0, 0] + transformer_input["tap_side"] = [BranchSide.from_side, BranchSide.from_side] + transformer_input["tap_pos"] = [0, 0] + transformer_input["tap_nom"] = [0, 0] + transformer_input["tap_min"] = [-1, -1] + transformer_input["tap_max"] = [1, 1] + transformer_input["tap_size"] = [100, 100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 2) + transformer_tap_regulator_input["id"] = [5, 6] + transformer_tap_regulator_input["regulated_object"] = [3, 4] + transformer_tap_regulator_input["status"] = [1, 1] + # Both control downstream + transformer_tap_regulator_input["control_side"] = [BranchSide.to_side, BranchSide.to_side] + transformer_tap_regulator_input["u_set"] = [50e3, 10e3] + transformer_tap_regulator_input["u_band"] = [200, 200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 0 + + +def test_invalid_tap_regulator_second_transformer_wrong_control(): + """Test invalid control in cascaded transformers: T2 controlling from downstream towards upstream""" + # Setup: source at node 0, T1 from 0 to 1, T2 from 1 to 2, but T2 controls from to_side (wrong direction) + node_input = initialize_array(DatasetType.input, ComponentType.node, 3) + node_input["id"] = [0, 1, 2] + node_input["u_rated"] = [150e3, 50e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [10] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 2) + transformer_input["id"] = [3, 4] + transformer_input["from_node"] = [0, 1] + transformer_input["to_node"] = [1, 2] + transformer_input["from_status"] = [1, 1] + transformer_input["to_status"] = [1, 1] + transformer_input["u1"] = [150e3, 50e3] + transformer_input["u2"] = [50e3, 10e3] + transformer_input["sn"] = [1e5, 1e5] + transformer_input["uk"] = [0.1, 0.1] + transformer_input["pk"] = [1e3, 1e3] + transformer_input["i0"] = [1.0e-6, 1.0e-6] + transformer_input["p0"] = [0.1, 0.1] + transformer_input["winding_from"] = [1, 1] + transformer_input["winding_to"] = [1, 1] + transformer_input["clock"] = [0, 0] + transformer_input["tap_side"] = [BranchSide.from_side, BranchSide.from_side] + transformer_input["tap_pos"] = [0, 0] + transformer_input["tap_nom"] = [0, 0] + transformer_input["tap_min"] = [-1, -1] + transformer_input["tap_max"] = [1, 1] + transformer_input["tap_size"] = [100, 100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 2) + transformer_tap_regulator_input["id"] = [5, 6] + transformer_tap_regulator_input["regulated_object"] = [3, 4] + transformer_tap_regulator_input["status"] = [1, 1] + # T2 controls from_side (wrong!) + transformer_tap_regulator_input["control_side"] = [BranchSide.to_side, BranchSide.from_side] + transformer_tap_regulator_input["u_set"] = [50e3, 10e3] + transformer_tap_regulator_input["u_band"] = [200, 200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 1 + assert isinstance(errors[0], InvalidTapRegulatorControlSideError) + assert errors[0].ids == [6] # Only second regulator is invalid + + +def test_valid_three_winding_transformer_control_side(): + """Test valid control side for three-winding transformer""" + node_input = initialize_array(DatasetType.input, ComponentType.node, 3) + node_input["id"] = [0, 1, 2] + node_input["u_rated"] = [150e3, 50e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [10] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer3w_input = initialize_array(DatasetType.input, ComponentType.three_winding_transformer, 1) + transformer3w_input["id"] = [3] + transformer3w_input["node_1"] = [0] + transformer3w_input["node_2"] = [1] + transformer3w_input["node_3"] = [2] + transformer3w_input["status_1"] = [1] + transformer3w_input["status_2"] = [1] + transformer3w_input["status_3"] = [1] + transformer3w_input["u1"] = [150e3] + transformer3w_input["u2"] = [50e3] + transformer3w_input["u3"] = [10e3] + transformer3w_input["sn_1"] = [1e5] + transformer3w_input["sn_2"] = [1e5] + transformer3w_input["sn_3"] = [1e5] + transformer3w_input["uk_12"] = [0.1] + transformer3w_input["uk_13"] = [0.1] + transformer3w_input["uk_23"] = [0.1] + transformer3w_input["pk_12"] = [1e3] + transformer3w_input["pk_13"] = [1e3] + transformer3w_input["pk_23"] = [1e3] + transformer3w_input["i0"] = [1.0e-6] + transformer3w_input["p0"] = [0.1] + transformer3w_input["winding_1"] = [1] + transformer3w_input["winding_2"] = [1] + transformer3w_input["winding_3"] = [1] + transformer3w_input["clock_12"] = [0] + transformer3w_input["clock_13"] = [0] + transformer3w_input["tap_side"] = [Branch3Side.side_1] + transformer3w_input["tap_pos"] = [0] + transformer3w_input["tap_nom"] = [0] + transformer3w_input["tap_min"] = [-1] + transformer3w_input["tap_max"] = [1] + transformer3w_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [1] + transformer_tap_regulator_input["control_side"] = [Branch3Side.side_2] # Control on side_2 (valid) + transformer_tap_regulator_input["u_set"] = [50e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.three_winding_transformer: transformer3w_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 0 + + +def test_disabled_regulator_not_validated(): + """Test that disabled regulators are not validated""" + node_input = initialize_array(DatasetType.input, ComponentType.node, 2) + node_input["id"] = [0, 1] + node_input["u_rated"] = [150e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [2] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 1) + transformer_input["id"] = [3] + transformer_input["from_node"] = [0] + transformer_input["to_node"] = [1] + transformer_input["from_status"] = [1] + transformer_input["to_status"] = [1] + transformer_input["u1"] = [150e3] + transformer_input["u2"] = [10e3] + transformer_input["sn"] = [1e5] + transformer_input["uk"] = [0.1] + transformer_input["pk"] = [1e3] + transformer_input["i0"] = [1.0e-6] + transformer_input["p0"] = [0.1] + transformer_input["winding_from"] = [1] + transformer_input["winding_to"] = [1] + transformer_input["clock"] = [0] + transformer_input["tap_side"] = [BranchSide.from_side] + transformer_input["tap_pos"] = [0] + transformer_input["tap_nom"] = [0] + transformer_input["tap_min"] = [-1] + transformer_input["tap_max"] = [1] + transformer_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [0] # DISABLED + transformer_tap_regulator_input["control_side"] = [BranchSide.from_side] # Would be invalid if enabled + transformer_tap_regulator_input["u_set"] = [10e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 0 # Disabled regulator should not trigger error + + +def test_integration_with_validate_input_data(): + """Test that the new validation is integrated into validate_input_data""" + # Setup invalid control side configuration + node_input = initialize_array(DatasetType.input, ComponentType.node, 2) + node_input["id"] = [0, 1] + node_input["u_rated"] = [150e3, 10e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [2] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 1) + transformer_input["id"] = [3] + transformer_input["from_node"] = [0] + transformer_input["to_node"] = [1] + transformer_input["from_status"] = [1] + transformer_input["to_status"] = [1] + transformer_input["u1"] = [150e3] + transformer_input["u2"] = [10e3] + transformer_input["sn"] = [1e5] + transformer_input["uk"] = [0.1] + transformer_input["pk"] = [1e3] + transformer_input["i0"] = [1.0e-6] + transformer_input["p0"] = [0.1] + transformer_input["winding_from"] = [1] + transformer_input["winding_to"] = [1] + transformer_input["clock"] = [0] + transformer_input["tap_side"] = [BranchSide.from_side] + transformer_input["tap_pos"] = [0] + transformer_input["tap_nom"] = [0] + transformer_input["tap_min"] = [-1] + transformer_input["tap_max"] = [1] + transformer_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [1] + transformer_tap_regulator_input["control_side"] = [BranchSide.from_side] # Controlling from source side - INVALID + transformer_tap_regulator_input["u_set"] = [10e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_input_data(input_data) + assert errors is not None + assert len(errors) > 0 + # Check that at least one error is InvalidTapRegulatorControlSideError + assert any(isinstance(err, InvalidTapRegulatorControlSideError) for err in errors) + + +def test_no_source_no_error(): + """Test that if there's no source, no error is raised (all nodes unreachable)""" + # Setup: No source, transformer from 0 to 1 + node_input = initialize_array(DatasetType.input, ComponentType.node, 2) + node_input["id"] = [0, 1] + node_input["u_rated"] = [150e3, 10e3] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 1) + transformer_input["id"] = [3] + transformer_input["from_node"] = [0] + transformer_input["to_node"] = [1] + transformer_input["from_status"] = [1] + transformer_input["to_status"] = [1] + transformer_input["u1"] = [150e3] + transformer_input["u2"] = [10e3] + transformer_input["sn"] = [1e5] + transformer_input["uk"] = [0.1] + transformer_input["pk"] = [1e3] + transformer_input["i0"] = [1.0e-6] + transformer_input["p0"] = [0.1] + transformer_input["winding_from"] = [1] + transformer_input["winding_to"] = [1] + transformer_input["clock"] = [0] + transformer_input["tap_side"] = [BranchSide.from_side] + transformer_input["tap_pos"] = [0] + transformer_input["tap_nom"] = [0] + transformer_input["tap_min"] = [-1] + transformer_input["tap_max"] = [1] + transformer_input["tap_size"] = [100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 1) + transformer_tap_regulator_input["id"] = [4] + transformer_tap_regulator_input["regulated_object"] = [3] + transformer_tap_regulator_input["status"] = [1] + transformer_tap_regulator_input["control_side"] = [BranchSide.from_side] + transformer_tap_regulator_input["u_set"] = [10e3] + transformer_tap_regulator_input["u_band"] = [200] + + input_data = { + ComponentType.node: node_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 0 # No source means all nodes unreachable, no partial reachability issue + + +def test_multiple_invalid_regulators(): + """Test multiple transformers with invalid control side configuration""" + node_input = initialize_array(DatasetType.input, ComponentType.node, 4) + node_input["id"] = [0, 1, 2, 3] + node_input["u_rated"] = [150e3, 50e3, 10e3, 5e3] + + source_input = initialize_array(DatasetType.input, ComponentType.source, 1) + source_input["id"] = [10] + source_input["node"] = [0] + source_input["status"] = [1] + source_input["u_ref"] = [1.0] + + transformer_input = initialize_array(DatasetType.input, ComponentType.transformer, 3) + transformer_input["id"] = [20, 21, 22] + transformer_input["from_node"] = [0, 1, 2] + transformer_input["to_node"] = [1, 2, 3] + transformer_input["from_status"] = [1, 1, 1] + transformer_input["to_status"] = [1, 1, 1] + transformer_input["u1"] = [150e3, 50e3, 10e3] + transformer_input["u2"] = [50e3, 10e3, 5e3] + transformer_input["sn"] = [1e5, 1e5, 1e5] + transformer_input["uk"] = [0.1, 0.1, 0.1] + transformer_input["pk"] = [1e3, 1e3, 1e3] + transformer_input["i0"] = [1.0e-6, 1.0e-6, 1.0e-6] + transformer_input["p0"] = [0.1, 0.1, 0.1] + transformer_input["winding_from"] = [1, 1, 1] + transformer_input["winding_to"] = [1, 1, 1] + transformer_input["clock"] = [0, 0, 0] + transformer_input["tap_side"] = [BranchSide.from_side, BranchSide.from_side, BranchSide.from_side] + transformer_input["tap_pos"] = [0, 0, 0] + transformer_input["tap_nom"] = [0, 0, 0] + transformer_input["tap_min"] = [-1, -1, -1] + transformer_input["tap_max"] = [1, 1, 1] + transformer_input["tap_size"] = [100, 100, 100] + + transformer_tap_regulator_input = initialize_array(DatasetType.input, ComponentType.transformer_tap_regulator, 3) + transformer_tap_regulator_input["id"] = [30, 31, 32] + transformer_tap_regulator_input["regulated_object"] = [20, 21, 22] + transformer_tap_regulator_input["status"] = [1, 1, 1] + transformer_tap_regulator_input["control_side"] = [ + BranchSide.to_side, # Valid + BranchSide.from_side, # Invalid - controlling from downstream towards upstream + BranchSide.from_side, # Invalid - controlling from downstream towards upstream + ] + transformer_tap_regulator_input["u_set"] = [50e3, 10e3, 5e3] + transformer_tap_regulator_input["u_band"] = [200, 200, 200] + + input_data = { + ComponentType.node: node_input, + ComponentType.source: source_input, + ComponentType.transformer: transformer_input, + ComponentType.transformer_tap_regulator: transformer_tap_regulator_input, + } + + errors = validate_tap_regulator_control_side_topology(input_data) + assert len(errors) == 1 + assert isinstance(errors[0], InvalidTapRegulatorControlSideError) + assert set(errors[0].ids) == {31} # Only second regulator is invalid (third is in disconnected subgraph)