Dev #426 (Merged)
9 changes: 3 additions & 6 deletions config/task_generator/process_conf.json
@@ -556,19 +556,16 @@
"valid_to": "",
"gate_open": "P1DT5H",
"gate_close": "P1DT4H",
"run_at": "30 22 3 * 5",
"run_at": "00 23 17 * *",
"time_frame": "M-1",
"data_timestamps": "30 7 * * *",
"data_timestamps": "30 9 * * *",
"data_resolution": "PT1H",
"tags": [],
"properties": {
"merge_type": "BA",
"merging_entity": "BALTICRCC",
"included": [
"AST",
"PSE",
"ELERING",
"LITGRID"
"PSE"
],
"excluded": [],
"local_import": [
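Not part of the PR: the new schedules above can be sanity-checked with croniter, which is already a project dependency. A minimal sketch, assuming the scheduler evaluates the expressions in local time:

```python
# Sanity-check the new cron schedules from process_conf.json with croniter.
from datetime import datetime
from croniter import croniter

run_at = "00 23 17 * *"          # new value: 23:00 on the 17th of every month
data_timestamps = "30 9 * * *"   # new value: 09:30 every day

base = datetime(2024, 1, 1)
print(croniter(run_at, base).get_next(datetime))           # 2024-01-17 23:00:00
print(croniter(data_timestamps, base).get_next(datetime))  # 2024-01-01 09:30:00
```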
46 changes: 36 additions & 10 deletions emf/common/loadflow_tool/settings_manager.py
@@ -5,7 +5,10 @@
 from copy import deepcopy
 import pypowsybl
 import logging
+import config
+from emf.common.config_parser import parse_app_properties
 from enum import Enum as _PyEnum
+from elasticsearch import Elasticsearch
 from emf.common.loadflow_tool import loadflow_settings

 try:
@@ -15,6 +18,8 @@

 logger = logging.getLogger(__name__)

+parse_app_properties(globals(), config.paths.integrations.elastic)
+

 class LoadflowSettingsManager:
     """Class-based settings manager for pypowsybl load flow parameters.
@@ -39,31 +44,52 @@ class LoadflowSettingsManager:
     _KNOWN_PARAM_FIELDS = [f for f in _KNOWN_PARAM_FIELDS if f != "provider_parameters"]

     def __init__(self,
+                 elastic_server: str = ELK_SERVER,
+                 elastic_username: str | None = None,
+                 elastic_password: str | None = None,
+                 elastic_index: str = 'config-lf-parameters',
                  settings_keyword: str = 'EU_DEFAULT',
                  override_path: str | None = None,
                  ):

+        self.elastic_server = elastic_server
+        self.elastic_username = elastic_username
+        self.elastic_password = elastic_password
+        self.elastic_index = elastic_index
         self.settings_keyword = settings_keyword

         # Decide override path from arg or env
         env_path = os.environ.get('LOADFLOW_CONFIG_OVERRIDE_PATH')
         self.override_path = Path(override_path or env_path) if (override_path or env_path) else None
         if self.override_path:
             logger.info(f"Loadflow settings override path: {self.override_path}")
         else:
             logger.info(f"Using settings from default definitions: {self.settings_keyword}")

-        # Build defaults snapshot (dict-based), then merge overrides if any
-        _default_settings = getattr(loadflow_settings, self.settings_keyword)
-        base = {
-            'LF_PROVIDER': deepcopy(_default_settings.provider_parameters),
-            'LF_PARAMETERS': self._extract_params_dict(_default_settings),
-        }
+        # First try to get loadflow parameters from Elastic as the primary source, otherwise fall back to the repository
+        try:
+            base = self._get_defaults_from_elastic()
+        except Exception as err:
+            logger.warning(f"Failed to retrieve loadflow settings from Elastic: {err}")
+            logger.warning(f"Using default settings from repository with key: {self.settings_keyword}")
+            _default_settings = getattr(loadflow_settings, self.settings_keyword)
+            base = {
+                'LF_PROVIDER': deepcopy(_default_settings.provider_parameters),
+                'LF_PARAMETERS': self._extract_params_dict(_default_settings),
+            }

+        # Handle overrides if defined
         overrides = self._load_override_file(self.override_path) if self.override_path else {}
         self.config = self._deep_merge(base, overrides)

     # ----------------- I/O -----------------
-    def _load_override_file(self, path: Path | None) -> dict:
+    def _get_defaults_from_elastic(self) -> dict:
+        client = Elasticsearch(self.elastic_server)
+        logger.info(f"Retrieving base loadflow settings from Elasticsearch with key: {self.settings_keyword}")
+        response = client.get(index=self.elastic_index, id=self.settings_keyword)
+
+        return response.raw["_source"]
+
+    @staticmethod
+    def _load_override_file(path: Path | None) -> dict:
         if not path:
             return {}
         if not path.exists():
@@ -262,7 +288,7 @@ def export_config(self, plain: bool = True, enum_repr: str = 'name') -> dict:
     print(mgr.config)
     pp_mgr = mgr.build_pypowsybl_parameters()
     # Test accessors
-    print('Sample read:', mgr.get('LF_PARAMETERS.write_slack_bus', None))
+    print('Sample read:', mgr.get('LF_PARAMETERS.connected_component_mode', None))
     # mgr.set('LF_PROVIDER.maxNewtonRaphsonIterations', '25')
     # print('After set:', mgr.get('LF_PROVIDER.maxNewtonRaphsonIterations'))

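The new constructor makes Elasticsearch the primary source of the base settings, with the in-repo defaults as a fallback. A condensed sketch of that pattern, not part of the PR; the index name and document id mirror the diff, while `fetch_base_settings` is a hypothetical standalone helper and the known-field extraction is simplified away:

```python
# Condensed sketch of the Elastic-first lookup with repository fallback.
# Assumption: loadflow_settings exposes the keyword ('EU_DEFAULT') as an attribute.
from copy import deepcopy
from elasticsearch import Elasticsearch
from emf.common.loadflow_tool import loadflow_settings

def fetch_base_settings(server: str,
                        index: str = 'config-lf-parameters',
                        keyword: str = 'EU_DEFAULT') -> dict:
    try:
        # Primary source: one Elasticsearch document per settings keyword
        response = Elasticsearch(server).get(index=index, id=keyword)
        return response.raw["_source"]
    except Exception:
        # Fallback: defaults shipped with the repository
        defaults = getattr(loadflow_settings, keyword)
        return {'LF_PROVIDER': deepcopy(defaults.provider_parameters),
                'LF_PARAMETERS': {}}  # the real class extracts known fields here
```

Whichever base is obtained, file-based overrides (via the constructor argument or the LOADFLOW_CONFIG_OVERRIDE_PATH environment variable) are still merged on top, so the override path keeps working unchanged.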
12 changes: 6 additions & 6 deletions emf/model_retriever/model_retriever.py
@@ -26,7 +26,7 @@ def __init__(self):
             logger.error(f"Failed to connect to OPDM: {e}")
             time.sleep(60)  # wait 60 seconds before retry

-    def handle(self, message: bytes, properties: dict, **kwargs):
+    def handle(self, message: bytes, properties: object, **kwargs):
         # Load from binary to json
         opdm_objects = json.loads(message)

@@ -41,8 +41,8 @@ def handle(self, message: bytes, properties: dict, **kwargs):
                opdm_object["data-source"] = "OPDM"
            else:
                logger.warning(f"{party} and {time_horizon} message not processed due to configured filtering")  # if out of filter, flag as unsuccessful and move on
-                properties.header['success'] = False
-                return opdm_objects, properties
+                properties.headers['success'] = False
+                return opdm_objects, properties

        return opdm_objects, properties

@@ -52,7 +52,7 @@ class HandlerModelsFromBytesIO:
     def __init__(self):
         pass

-    def handle(self, message: bytes, properties: dict, **kwargs):
+    def handle(self, message: bytes, properties: object, **kwargs):

         message_content = BytesIO(message)
         message_content.name = 'unknown.zip'
@@ -74,7 +74,7 @@ class HandlerModelsToMinio:
     def __init__(self):
         self.minio_service = minio_api.ObjectStorage()

-    def handle(self, message: bytes, properties: dict, **kwargs):
+    def handle(self, message: bytes, properties: object, **kwargs):

         opdm_objects = message

@@ -121,7 +121,7 @@ class HandlerModelsToValidator:
     def __init__(self):
         pass

-    def handle(self, message: bytes, properties: dict, **kwargs):
+    def handle(self, message: bytes, properties: object, **kwargs):

         # Load from binary to json
         opdm_objects = json.loads(message)
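The repeated signature change from `properties: dict` to `properties: object` reflects that RabbitMQ delivery properties arrive as a pika `BasicProperties` object, whose custom per-message fields live under `.headers` (not `.header`, the bug fixed above). A minimal sketch, not part of the PR, with a hypothetical stand-in for the handler's configured filtering:

```python
# Custom per-message flags belong in pika.BasicProperties.headers.
import pika

def passes_filter(party: str, time_horizon: str) -> bool:
    # Hypothetical stand-in for the handler's configured filtering
    return party == "ELERING" and time_horizon == "1D"

properties = pika.BasicProperties(headers={"success": True})
if not passes_filter("PSE", "YR"):
    properties.headers["success"] = False  # '.header' would raise AttributeError
print(properties.headers)  # {'success': False}
```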
40 changes: 17 additions & 23 deletions pyproject.toml
@@ -5,30 +5,24 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"aniso8601>=10.0.1",
"croniter>=6.0.0",
"edx>=0.0.16",
"elasticsearch>=9.0.1",
"lxml>=5.4.0",
"minio>=7.2.15",
"ndjson>=0.3.1",
"opdm-api>=0.1.3",
"pandas==2.2.2",
"pika>=1.3.2",
"pypowsybl==1.9.0",
"pytz>=2025.2",
"requests>=2.32.3",
"saxonche>=12.7.0",
"triplets>=0.0.10",
"pandas",
"numpy",
"pypowsybl",
"aniso8601",
"lxml",
"edx",
"opdm-api",
"triplets",
"ndjson",
"elasticsearch",
"minio",
"pytz",
"requests",
"pika",
"saxonche",
"croniter",
]

[[tool.uv.index]]
name = "pypi"
url = "https://pypi.org/simple"

[[tool.uv.index]]
name = "rcc-pypi"
url = "https://artifactory.elering.sise/artifactory/api/pypi/rcc-pypi/simple"

[tool.uv]
allow-insecure-host = ["artifactory.elering.sise"]
url = "https://pypi.org/simple"