From 3ca67f46a4ef9c4711395ce80a085f54ceabf517 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Tue, 19 Aug 2025 20:14:50 +0500 Subject: [PATCH 01/12] WIP: added migration file. --- .../timed_beliefs_materialized_views.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py diff --git a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py new file mode 100644 index 0000000000..f4aa64e4d5 --- /dev/null +++ b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py @@ -0,0 +1,63 @@ +"""Add materialized view for belief optimization + +Revision ID: c98798csds8c +Revises: b8f3cda5e023 +Create Date: 2025-08-08 04:55:33.722545 + +""" + +from alembic import op + +# revision identifiers +revision = "c98798csds8c" +down_revision = "b8f3cda5e023" +branch_labels = None +depends_on = None + + +def upgrade(): + # Create the materialized view with proper alias + op.execute( + """ + CREATE MATERIALIZED VIEW timed_belief_min_v AS + SELECT * + FROM ( + SELECT + timed_belief.sensor_id, + timed_belief.event_start, + timed_belief.source_id, + MIN(timed_belief.belief_horizon) AS most_recent_belief_horizon + FROM timed_belief + INNER JOIN data_source + ON data_source.id = timed_belief.source_id + GROUP BY + timed_belief.sensor_id, + timed_belief.event_start, + timed_belief.source_id + ) AS belief_mins + GROUP BY + sensor_id, + event_start, + source_id, + most_recent_belief_horizon; + """ + ) + + # Create indexes + op.execute( + """ + CREATE INDEX idx_timed_belief_min_v_sensor_event + ON timed_belief_min_v(sensor_id, event_start); + """ + ) + + op.execute( + """ + CREATE INDEX idx_timed_belief_min_v_event_start + ON timed_belief_min_v(event_start); + """ + ) + + +def downgrade(): + op.execute("DROP MATERIALIZED VIEW IF EXISTS timed_belief_min_v CASCADE;") From 14226badb047b117dbc8c3bb86de37c258fd0250 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Wed, 20 Aug 2025 19:23:11 +0500 Subject: [PATCH 02/12] Added materialized view boolean to backend logic and created CLI command for refresh. --- flexmeasures/api/dev/sensors.py | 5 +++ flexmeasures/api/v3_0/assets.py | 5 +++ flexmeasures/cli/db_ops.py | 29 +++++++++++++++ flexmeasures/data/models/generic_assets.py | 6 +++- flexmeasures/data/models/time_series.py | 10 +++++- .../data/schemas/tests/test_input_schema.py | 2 ++ flexmeasures/utils/validation_utils.py | 35 +++++++++++++++++++ 7 files changed, 90 insertions(+), 2 deletions(-) diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index b8f52669e0..9a5dbadfc9 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -20,6 +20,9 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.ui.utils.view_utils import set_session_variables +from flexmeasures.utils.validation_utils import ( + validate_timed_belief_min_v, +) class SensorAPI(FlaskView): @@ -90,6 +93,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs): "most_recent_beliefs_only": fields.Boolean( required=False, load_default=True ), + "use_materialized_view": fields.Boolean(required=False, load_default=True), }, location="query", ) @@ -110,6 +114,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see :ref:`resolutions`) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ + kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session) return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index 55a13a8e24..4b123e298a 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -47,6 +47,9 @@ from flexmeasures.utils.coding_utils import ( flatten_unique, ) +from flexmeasures.utils.validation_utils import ( + validate_timed_belief_min_v, +) from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access from werkzeug.exceptions import Forbidden, Unauthorized @@ -657,6 +660,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs): "beliefs_after": AwareDateTimeField(format="iso", required=False), "beliefs_before": AwareDateTimeField(format="iso", required=False), "most_recent_beliefs_only": fields.Boolean(required=False), + "use_materialized_view": fields.Boolean(required=False, load_default=True), }, location="query", ) @@ -669,6 +673,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): Data for use in charts (in case you have the chart specs already). """ sensors = flatten_unique(asset.validate_sensors_to_show()) + kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session) return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/cli/db_ops.py b/flexmeasures/cli/db_ops.py index 21173150ef..01c04a41cf 100644 --- a/flexmeasures/cli/db_ops.py +++ b/flexmeasures/cli/db_ops.py @@ -9,6 +9,7 @@ import click from flexmeasures.cli.utils import MsgStyle +from flexmeasures.data import db @click.group("db-ops") @@ -95,4 +96,32 @@ def restore(file: str): click.secho("db restore unsuccessful", **MsgStyle.ERROR) +@fm_db_ops.command("refresh-materialized-views") +@with_appcontext +@click.option("--concurrent", is_flag=True, default=False) +def refresh_materialized_views(concurrent: bool): + """Refresh materialized views for better query performance.""" + from sqlalchemy import text + + refresh_type = "CONCURRENTLY" if concurrent else "" + import time + + start_time = time.time() + click.echo( + f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}..." + ) + try: + db.session.execute( + text(f"REFRESH MATERIALIZED {refresh_type} VIEW timed_belief_min_v;") + ) + db.session.commit() + elapsed_time = time.time() - start_time + click.echo( + f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds" + ) + except Exception as e: + db.session.rollback() + click.echo(f"✗ Error refreshing materialized views: {e}") + + app.cli.add_command(fm_db_ops) diff --git a/flexmeasures/data/models/generic_assets.py b/flexmeasures/data/models/generic_assets.py index 4aae30d47f..564dbc312c 100644 --- a/flexmeasures/data/models/generic_assets.py +++ b/flexmeasures/data/models/generic_assets.py @@ -7,7 +7,7 @@ from flask import current_app from flask_security import current_user import pandas as pd -from sqlalchemy import select +from sqlalchemy import select, Table from sqlalchemy.engine import Row from sqlalchemy.ext.hybrid import hybrid_method from sqlalchemy.sql.expression import func, text @@ -647,6 +647,8 @@ def search_beliefs( most_recent_events_only: bool = False, as_json: bool = False, resolution: timedelta | None = None, + use_materialized_view: bool = True, + timed_belief_min_v: Table | None = None, ) -> BeliefsDataFrame | str: """Search all beliefs about events for all sensors of this asset @@ -686,6 +688,8 @@ def search_beliefs( most_recent_events_only=most_recent_events_only, one_deterministic_belief_per_event_per_source=True, resolution=resolution, + use_materialized_view=use_materialized_view, + timed_belief_min_v=timed_belief_min_v, ) if as_json: from flexmeasures.data.services.time_series import simplify_index diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 803a541053..2b068c57c4 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -8,7 +8,7 @@ from flask import current_app import pandas as pd -from sqlalchemy import select +from sqlalchemy import select, Table from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.schema import UniqueConstraint @@ -354,6 +354,8 @@ def search_beliefs( one_deterministic_belief_per_event_per_source: bool = False, as_json: bool = False, resolution: str | timedelta | None = None, + use_materialized_view: bool = True, + timed_belief_min_v: Table | None = None, ) -> tb.BeliefsDataFrame | str: """Search all beliefs about events for this sensor. @@ -398,6 +400,8 @@ def search_beliefs( one_deterministic_belief_per_event=one_deterministic_belief_per_event, one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source, resolution=resolution, + use_materialized_view=use_materialized_view, + timed_belief_min_v=timed_belief_min_v, ) if as_json: df = bdf.reset_index() @@ -715,6 +719,8 @@ def search( one_deterministic_belief_per_event_per_source: bool = False, resolution: str | timedelta = None, sum_multiple: bool = True, + use_materialized_view: bool = True, + timed_belief_min_v: Table | None = None, ) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]: """Search all beliefs about events for the given sensors. @@ -798,6 +804,8 @@ def search( **most_recent_filters, custom_filter_criteria=source_criteria, custom_join_targets=custom_join_targets, + use_materialized_view=use_materialized_view, + timed_belief_min_v=timed_belief_min_v, ) if use_latest_version_per_event: bdf = keep_latest_version( diff --git a/flexmeasures/data/schemas/tests/test_input_schema.py b/flexmeasures/data/schemas/tests/test_input_schema.py index 37fd6daf10..325c395332 100644 --- a/flexmeasures/data/schemas/tests/test_input_schema.py +++ b/flexmeasures/data/schemas/tests/test_input_schema.py @@ -18,6 +18,8 @@ def test_input_schema(): # These arguments are not mapped to a field at all (state a reason) excluded_arg_names = [ "as_json", # used in Sensor.search_beliefs but not in TimedBelief.search + "timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search + "use_materialized_view", # used in Sensor.search_beliefs but not in ] arg_names_without_associated_fields = [ diff --git a/flexmeasures/utils/validation_utils.py b/flexmeasures/utils/validation_utils.py index 50935ed544..183d967feb 100644 --- a/flexmeasures/utils/validation_utils.py +++ b/flexmeasures/utils/validation_utils.py @@ -1,4 +1,6 @@ import re +from sqlalchemy import MetaData, Table, Column, Integer, DateTime, Interval +from sqlalchemy.sql import text def validate_color_hex(value): @@ -51,3 +53,36 @@ def validate_url(value): ) return value + + +def validate_timed_belief_min_v(session) -> Table | None: + """Define the structure of the timed_belief_min_v materialized view if it exists.""" + # Check if materialized view exists + result = session.execute( + text( + """ + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'timed_belief_min_v' + AND table_schema = 'public' + ); + """ + ) + ) + + exists = result.scalar() + if not exists: + return None + print("Using timed_belief_min_v materialized view for optimized queries.") + + # Only create the table definition if it exists + metadata = MetaData() + timed_belief_min_v = Table( + "timed_belief_min_v", + metadata, + Column("sensor_id", Integer), + Column("event_start", DateTime), + Column("source_id", Integer), + Column("most_recent_belief_horizon", Interval), + ) + return timed_belief_min_v From ce19cb012bd0a3e08969016e989aab74f118c7ec Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Fri, 22 Aug 2025 20:08:26 +0500 Subject: [PATCH 03/12] Added refresh interval, Fixed some bugs. --- flexmeasures/api/dev/sensors.py | 4 +- flexmeasures/api/v3_0/assets.py | 4 +- flexmeasures/ui/templates/base.html | 96 +++++++++++++++++++++++++- flexmeasures/ui/views/assets/views.py | 4 ++ flexmeasures/utils/config_defaults.py | 1 + flexmeasures/utils/validation_utils.py | 22 +----- 6 files changed, 105 insertions(+), 26 deletions(-) diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index 9a5dbadfc9..ec54f821f3 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -21,7 +21,7 @@ from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.ui.utils.view_utils import set_session_variables from flexmeasures.utils.validation_utils import ( - validate_timed_belief_min_v, + get_timed_belief_min_v, ) @@ -114,7 +114,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see :ref:`resolutions`) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ - kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session) + kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session) return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index 4b123e298a..b90a79bb0b 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -48,7 +48,7 @@ flatten_unique, ) from flexmeasures.utils.validation_utils import ( - validate_timed_belief_min_v, + get_timed_belief_min_v, ) from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access @@ -673,7 +673,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): Data for use in charts (in case you have the chart specs already). """ sensors = flatten_unique(asset.validate_sensors_to_show()) - kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session) + kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session) return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/ui/templates/base.html b/flexmeasures/ui/templates/base.html index 8c07697b81..dc98592321 100644 --- a/flexmeasures/ui/templates/base.html +++ b/flexmeasures/ui/templates/base.html @@ -227,7 +227,13 @@ {% endfor %} + {% if active_subpage == "asset_graph" %}
+ + +
+ {% endif %} +
@@ -893,20 +899,106 @@ * @param {String} queryEndDate The end date as a string in ISO format, to be used in the query. * @return {Promise} A promise that resolves with the data. */ - function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate) { + function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, useMaterializedView=true) { if (previousResult && previousResult.start.getTime() === startDate.getTime() && previousResult.end.getTime() === endDate.getTime()){ return Promise.resolve(previousResult.data); } else { {% if active_subpage == "asset_graph" and has_kpis %} getAssetKPIs(); {% endif %} - return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, { + return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&use_materialized_view=' + useMaterializedView, { method: "GET", headers: { "Content-Type": "application/json" }, signal: signal, }).then(response => response.json()); } } + + const useMaterializedViewCheckbox = document.getElementById("useMaterializedView"); + useMaterializedViewCheckbox.addEventListener("change", function() { + toggleMaterializedView(); + }); + + // Toggle materialized view usage for timely beliefs + function toggleMaterializedView() { + // Get the checked status of the checkbox + const isChecked = useMaterializedViewCheckbox.checked; + const pickerStartDate = picker.getStartDate().toJSDate(); + const queryStartDate= encodeURIComponent(toIsoStringWithOffset(pickerStartDate)); + var queryEndDate = picker.getEndDate(); + queryEndDate.setDate(queryEndDate.getDate() + 1); + queryEndDate = encodeURIComponent(toIsoStringWithOffset(queryEndDate.toJSDate())); + // Since we are going to make a call anyway so we'll send these values as null + const startDate = null; + const endDate = null; + previousResult = null; + + $("#spinner").show(); + if (isChecked) { + console.log("Checked.") + // fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true); + Promise.all([ + fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true), + /** + // Fetch annotations + fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, { + method: "GET", + headers: {"Content-Type": "application/json"}, + signal: signal, + }) + .then(function(response) { return response.json(); }), + */ + + // Embed chart + embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate), + ]).then(function(result) { + $("#spinner").hide(); + vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run(); + previousResult = { + start: startDate, + end: endDate, + data: result[0] + }; + checkSourceMasking(previousResult.data); + playBackDataLoadedForKnownDateRange = false; + /** + vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run(); + */ + }).catch(console.error); + } + else { + // fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false); + Promise.all([ + fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false), + /** + // Fetch annotations + fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, { + method: "GET", + headers: {"Content-Type": "application/json"}, + signal: signal, + }) + .then(function(response) { return response.json(); }), + */ + + // Embed chart + embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate), + ]).then(function(result) { + $("#spinner").hide(); + vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run(); + previousResult = { + start: startDate, + end: endDate, + data: result[0] + }; + checkSourceMasking(previousResult.data); + playBackDataLoadedForKnownDateRange = false; + /** + vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run(); + */ + }).catch(console.error); + } + $("#spinner").hide(); + } {% if active_subpage == "asset_graph" and has_kpis %} function getAssetKPIs() { diff --git a/flexmeasures/ui/views/assets/views.py b/flexmeasures/ui/views/assets/views.py index d01cf18501..d513b46057 100644 --- a/flexmeasures/ui/views/assets/views.py +++ b/flexmeasures/ui/views/assets/views.py @@ -363,12 +363,16 @@ def graphs(self, id: str, start_time=None, end_time=None): asset_form = AssetForm() asset_form.with_options() asset_form.process(obj=asset) + mv_refresh_interval = current_app.config.get( + "FLEXMEASURES_MVIEW_UPDATE_INTERVAL", None + ) return render_flexmeasures_template( "assets/asset_graph.html", asset=asset, has_kpis=has_kpis, asset_kpis=asset_kpis, + mv_refresh_interval=mv_refresh_interval, current_page="Graphs", ) diff --git a/flexmeasures/utils/config_defaults.py b/flexmeasures/utils/config_defaults.py index a4afa4a07c..ce323b26e9 100644 --- a/flexmeasures/utils/config_defaults.py +++ b/flexmeasures/utils/config_defaults.py @@ -21,6 +21,7 @@ class Config(object): DEBUG: bool = False LOGGING_LEVEL: int = logging.WARNING SECRET_KEY: str | None = None + FLEXMEASURES_MVIEW_UPDATE_INTERVAL: int | None = None FLEXMEASURES_ENV_DEFAULT = "production" diff --git a/flexmeasures/utils/validation_utils.py b/flexmeasures/utils/validation_utils.py index 183d967feb..5b3aec5eb9 100644 --- a/flexmeasures/utils/validation_utils.py +++ b/flexmeasures/utils/validation_utils.py @@ -1,6 +1,5 @@ import re from sqlalchemy import MetaData, Table, Column, Integer, DateTime, Interval -from sqlalchemy.sql import text def validate_color_hex(value): @@ -55,25 +54,8 @@ def validate_url(value): return value -def validate_timed_belief_min_v(session) -> Table | None: - """Define the structure of the timed_belief_min_v materialized view if it exists.""" - # Check if materialized view exists - result = session.execute( - text( - """ - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'timed_belief_min_v' - AND table_schema = 'public' - ); - """ - ) - ) - - exists = result.scalar() - if not exists: - return None - print("Using timed_belief_min_v materialized view for optimized queries.") +def get_timed_belief_min_v(session) -> Table | None: + """Define the structure of the timed_belief_min_v materialized view.""" # Only create the table definition if it exists metadata = MetaData() From fd41966d54877f74b61b9d539f4e677fc70b0e14 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Mon, 25 Aug 2025 19:59:40 +0500 Subject: [PATCH 04/12] Resolved comments. --- documentation/configuration.rst | 11 +++++++++- flexmeasures/api/dev/sensors.py | 2 +- flexmeasures/api/v3_0/assets.py | 2 +- flexmeasures/cli/db_ops.py | 22 +++++++++++++------ .../timed_beliefs_materialized_views.py | 8 +++++++ .../data/schemas/tests/test_input_schema.py | 2 +- flexmeasures/utils/validation_utils.py | 2 +- 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/documentation/configuration.rst b/documentation/configuration.rst index da8c20c214..9ef69c4481 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -671,4 +671,13 @@ FLEXMEASURES_API_SUNSET_LINK Allow to override the default sunset link for your clients. -Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0) \ No newline at end of file +Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0) + +Perforamance optimizations +---------------------------- + +FLEXMEASURES_MVIEW_UPDATE_INTERVAL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Interval in minutes to refresh the materialized views in the background. + +Default: ``0`` (minutes) diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index 75171eff44..7a452411a9 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -115,7 +115,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see :ref:`resolutions`) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ - kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session) + kwargs["timed_belief_min_v"] = get_timed_belief_min_v() return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index ae460053c2..5f693755d8 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -674,7 +674,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): Data for use in charts (in case you have the chart specs already). """ sensors = flatten_unique(asset.validate_sensors_to_show()) - kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session) + kwargs["timed_belief_min_v"] = get_timed_belief_min_v() return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/cli/db_ops.py b/flexmeasures/cli/db_ops.py index 01c04a41cf..8977acb288 100644 --- a/flexmeasures/cli/db_ops.py +++ b/flexmeasures/cli/db_ops.py @@ -100,28 +100,36 @@ def restore(file: str): @with_appcontext @click.option("--concurrent", is_flag=True, default=False) def refresh_materialized_views(concurrent: bool): - """Refresh materialized views for better query performance.""" + """ + Refresh the materialized views for getting the most recent data. + By default, this locks the materialized view for the duration of the refresh. + Use the --concurrent option to avoid locking, at the cost of higher resource usage and + the requirement that a unique index exists on the materialized view. + """ from sqlalchemy import text refresh_type = "CONCURRENTLY" if concurrent else "" import time start_time = time.time() - click.echo( - f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}..." + click.secho( + f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}...", + **MsgStyle.INFO, ) try: db.session.execute( - text(f"REFRESH MATERIALIZED {refresh_type} VIEW timed_belief_min_v;") + text(f"REFRESH MATERIALIZED VIEW {refresh_type} timed_belief_min_v;") ) db.session.commit() elapsed_time = time.time() - start_time - click.echo( - f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds" + click.secho( + f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds", + **MsgStyle.SUCCESS, ) except Exception as e: db.session.rollback() - click.echo(f"✗ Error refreshing materialized views: {e}") + click.secho(f"✗ Error refreshing materialized views: {e}", **MsgStyle.ERROR) + raise click.Abort() app.cli.add_command(fm_db_ops) diff --git a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py index f4aa64e4d5..3bc5a8657c 100644 --- a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py +++ b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py @@ -58,6 +58,14 @@ def upgrade(): """ ) + # Create a unique index to allow concurrent refreshes + op.execute( + """ + CREATE UNIQUE INDEX idx_timed_belief_min_v_unique + ON timed_belief_min_v(sensor_id, event_start, source_id); + """ + ) + def downgrade(): op.execute("DROP MATERIALIZED VIEW IF EXISTS timed_belief_min_v CASCADE;") diff --git a/flexmeasures/data/schemas/tests/test_input_schema.py b/flexmeasures/data/schemas/tests/test_input_schema.py index 97bcdd6ebb..05df9c1e58 100644 --- a/flexmeasures/data/schemas/tests/test_input_schema.py +++ b/flexmeasures/data/schemas/tests/test_input_schema.py @@ -19,7 +19,7 @@ def test_input_schema(): excluded_arg_names = [ "as_json", # used in Sensor.search_beliefs but not in TimedBelief.search "timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search - "use_materialized_view", # used in Sensor.search_beliefs but not in + "use_materialized_view", # used in Sensor.search_beliefs as well as in TimedBelief.search "compress_json", # used in Sensor.search_beliefs but not in TimedBelief.search ] diff --git a/flexmeasures/utils/validation_utils.py b/flexmeasures/utils/validation_utils.py index 5b3aec5eb9..558180e77e 100644 --- a/flexmeasures/utils/validation_utils.py +++ b/flexmeasures/utils/validation_utils.py @@ -54,7 +54,7 @@ def validate_url(value): return value -def get_timed_belief_min_v(session) -> Table | None: +def get_timed_belief_min_v() -> Table | None: """Define the structure of the timed_belief_min_v materialized view.""" # Only create the table definition if it exists From e483d982367fd1283f9c4e8778ee49378200a23b Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Mon, 25 Aug 2025 20:15:16 +0500 Subject: [PATCH 05/12] Tiny refactor. --- flexmeasures/utils/validation_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/utils/validation_utils.py b/flexmeasures/utils/validation_utils.py index 558180e77e..92832e3090 100644 --- a/flexmeasures/utils/validation_utils.py +++ b/flexmeasures/utils/validation_utils.py @@ -54,7 +54,7 @@ def validate_url(value): return value -def get_timed_belief_min_v() -> Table | None: +def get_timed_belief_min_v() -> Table: """Define the structure of the timed_belief_min_v materialized view.""" # Only create the table definition if it exists From 74d66518d3823cabff8de59dacf15d49d5e046ce Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Tue, 2 Sep 2025 14:03:10 +0500 Subject: [PATCH 06/12] Update documentation/configuration.rst Co-authored-by: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Signed-off-by: Muhammad-Moiz626 --- documentation/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/configuration.rst b/documentation/configuration.rst index 9ef69c4481..63b3691b03 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -673,7 +673,7 @@ Allow to override the default sunset link for your clients. Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0) -Perforamance optimizations +Performance optimizations ---------------------------- FLEXMEASURES_MVIEW_UPDATE_INTERVAL From c65ef8c08c0465a1e0594180dd0df2b7ddb4b46a Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Tue, 2 Sep 2025 20:36:30 +0500 Subject: [PATCH 07/12] Moved get_timed_belief_min_v and some other small changes. --- documentation/configuration.rst | 6 ++-- flexmeasures/api/dev/sensors.py | 6 ++-- flexmeasures/api/v3_0/assets.py | 6 ++-- flexmeasures/data/config.py | 7 ++++- flexmeasures/data/models/time_series.py | 38 ++++++++++++++++++++++++- flexmeasures/ui/views/assets/views.py | 2 +- flexmeasures/utils/config_defaults.py | 2 +- flexmeasures/utils/validation_utils.py | 17 ----------- 8 files changed, 52 insertions(+), 32 deletions(-) diff --git a/documentation/configuration.rst b/documentation/configuration.rst index 9ef69c4481..902c74ea39 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -676,8 +676,8 @@ Default: ``None`` (defaults are set internally for each sunset API version, e.g. Perforamance optimizations ---------------------------- -FLEXMEASURES_MVIEW_UPDATE_INTERVAL +FLEXMEASURES_MVIEW_REFRESH_INTERVAL ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Interval in minutes to refresh the materialized views in the background. +Interval in minutes to refresh the materialized db view which caches the most recent beliefs at a given point in time (for faster queries). -Default: ``0`` (minutes) +Default: None diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index 7a452411a9..d7a153ba96 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -20,9 +20,7 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.ui.utils.view_utils import set_session_variables -from flexmeasures.utils.validation_utils import ( - get_timed_belief_min_v, -) +from flexmeasures.data.config import timed_belief_min_v class SensorAPI(FlaskView): @@ -115,7 +113,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see :ref:`resolutions`) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ - kwargs["timed_belief_min_v"] = get_timed_belief_min_v() + kwargs["timed_belief_min_v"] = timed_belief_min_v return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index 5f693755d8..180661e465 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -47,9 +47,7 @@ from flexmeasures.utils.coding_utils import ( flatten_unique, ) -from flexmeasures.utils.validation_utils import ( - get_timed_belief_min_v, -) +from flexmeasures.data.config import timed_belief_min_v from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access from werkzeug.exceptions import Forbidden, Unauthorized @@ -674,7 +672,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): Data for use in charts (in case you have the chart specs already). """ sensors = flatten_unique(asset.validate_sensors_to_show()) - kwargs["timed_belief_min_v"] = get_timed_belief_min_v() + kwargs["timed_belief_min_v"] = timed_belief_min_v return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/data/config.py b/flexmeasures/data/config.py index f8c98c6c8e..3d3a741613 100644 --- a/flexmeasures/data/config.py +++ b/flexmeasures/data/config.py @@ -16,6 +16,7 @@ ) Base = None # type: ignore session_options = None +timed_belief_min_v = None def init_db(): @@ -32,7 +33,7 @@ def init_db(): def configure_db_for(app: Flask): """Call this to configure the database and the tools we use on it for the Flask app. This should only be called once in the app's lifetime.""" - global db, Base + global db, Base, timed_belief_min_v with app.app_context(): db.init_app(app) @@ -51,6 +52,10 @@ def configure_db_for(app: Flask): forecasting, ) # noqa: F401 + from flexmeasures.data.models.time_series import get_timed_belief_min_v + + timed_belief_min_v = get_timed_belief_min_v(db.session) + # This would create db structure based on models, but you should use `flask db upgrade` for that. # Base.metadata.create_all(bind=db.engine) diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 736168431b..1b43565144 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -8,7 +8,17 @@ from flask import current_app import pandas as pd -from sqlalchemy import select, Table +from sqlalchemy import ( + select, + Table, + Column, + DateTime, + Integer, + MetaData, + text, + Interval, +) +from sqlalchemy.orm import Session from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.schema import UniqueConstraint @@ -47,6 +57,32 @@ from flexmeasures.utils.geo_utils import parse_lat_lng +def get_timed_belief_min_v(session: Session) -> Table | None: + """Define the structure of the timed_belief_min_v materialized view.""" + + timed_belief_min_v = session.execute( + text( + """ + SELECT * + FROM pg_matviews + WHERE matviewname = 'timed_belief_min_v'; + """ + ) + ).fetchone() + if timed_belief_min_v: + metadata = MetaData() + timed_belief_min_v = Table( + "timed_belief_min_v", + metadata, + Column("sensor_id", Integer), + Column("event_start", DateTime), + Column("source_id", Integer), + Column("most_recent_belief_horizon", Interval), + ) + + return timed_belief_min_v + + class Sensor(db.Model, tb.SensorDBMixin, AuthModelMixin): """A sensor measures events.""" diff --git a/flexmeasures/ui/views/assets/views.py b/flexmeasures/ui/views/assets/views.py index 50bf17d310..62a38421b0 100644 --- a/flexmeasures/ui/views/assets/views.py +++ b/flexmeasures/ui/views/assets/views.py @@ -356,7 +356,7 @@ def graphs(self, id: str, start_time=None, end_time=None): asset_form.with_options() asset_form.process(obj=asset) mv_refresh_interval = current_app.config.get( - "FLEXMEASURES_MVIEW_UPDATE_INTERVAL", None + "FLEXMEASURES_MVIEW_REFRESH_INTERVAL", None ) return render_flexmeasures_template( diff --git a/flexmeasures/utils/config_defaults.py b/flexmeasures/utils/config_defaults.py index ce323b26e9..8df4fb6f5b 100644 --- a/flexmeasures/utils/config_defaults.py +++ b/flexmeasures/utils/config_defaults.py @@ -21,7 +21,7 @@ class Config(object): DEBUG: bool = False LOGGING_LEVEL: int = logging.WARNING SECRET_KEY: str | None = None - FLEXMEASURES_MVIEW_UPDATE_INTERVAL: int | None = None + FLEXMEASURES_MVIEW_REFRESH_INTERVAL: int | None = None FLEXMEASURES_ENV_DEFAULT = "production" diff --git a/flexmeasures/utils/validation_utils.py b/flexmeasures/utils/validation_utils.py index 92832e3090..50935ed544 100644 --- a/flexmeasures/utils/validation_utils.py +++ b/flexmeasures/utils/validation_utils.py @@ -1,5 +1,4 @@ import re -from sqlalchemy import MetaData, Table, Column, Integer, DateTime, Interval def validate_color_hex(value): @@ -52,19 +51,3 @@ def validate_url(value): ) return value - - -def get_timed_belief_min_v() -> Table: - """Define the structure of the timed_belief_min_v materialized view.""" - - # Only create the table definition if it exists - metadata = MetaData() - timed_belief_min_v = Table( - "timed_belief_min_v", - metadata, - Column("sensor_id", Integer), - Column("event_start", DateTime), - Column("source_id", Integer), - Column("most_recent_belief_horizon", Interval), - ) - return timed_belief_min_v From 942380cba6c069ec82719ea9290a3d1273b201ae Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Wed, 3 Sep 2025 20:03:12 +0500 Subject: [PATCH 08/12] Moved get_timed_belief_min_v to timely-beliefs repo. --- flexmeasures/data/config.py | 13 +++++++--- flexmeasures/data/models/time_series.py | 33 ------------------------- 2 files changed, 10 insertions(+), 36 deletions(-) diff --git a/flexmeasures/data/config.py b/flexmeasures/data/config.py index 3d3a741613..414f13f814 100644 --- a/flexmeasures/data/config.py +++ b/flexmeasures/data/config.py @@ -52,9 +52,16 @@ def configure_db_for(app: Flask): forecasting, ) # noqa: F401 - from flexmeasures.data.models.time_series import get_timed_belief_min_v - - timed_belief_min_v = get_timed_belief_min_v(db.session) + import timely_beliefs.utils as tb_utils + + try: + timed_belief_min_v = tb_utils.get_timed_belief_min_v(db.session) + except Exception: + timed_belief_min_v = None + app.logger.warning( + "Could not determine timed_belief_min_v. Do you have timely-beliefs installed and is the latest version?" + " Beliefs will be retrieved from the actual table instead of the materialized view.", + ) # This would create db structure based on models, but you should use `flask db upgrade` for that. # Base.metadata.create_all(bind=db.engine) diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index b42fee7839..49d45919e4 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -11,14 +11,7 @@ from sqlalchemy import ( select, Table, - Column, - DateTime, - Integer, - MetaData, - text, - Interval, ) -from sqlalchemy.orm import Session from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.schema import UniqueConstraint @@ -58,32 +51,6 @@ from flexmeasures.utils.geo_utils import parse_lat_lng -def get_timed_belief_min_v(session: Session) -> Table | None: - """Define the structure of the timed_belief_min_v materialized view.""" - - timed_belief_min_v = session.execute( - text( - """ - SELECT * - FROM pg_matviews - WHERE matviewname = 'timed_belief_min_v'; - """ - ) - ).fetchone() - if timed_belief_min_v: - metadata = MetaData() - timed_belief_min_v = Table( - "timed_belief_min_v", - metadata, - Column("sensor_id", Integer), - Column("event_start", DateTime), - Column("source_id", Integer), - Column("most_recent_belief_horizon", Interval), - ) - - return timed_belief_min_v - - class Sensor(db.Model, tb.SensorDBMixin, AuthModelMixin, OrderByIdMixin): """A sensor measures events.""" From b6ca3d457e4d8763274b60c9214241ab857fbaa1 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Thu, 4 Sep 2025 11:12:55 +0500 Subject: [PATCH 09/12] Updated base.html according to materialized view refresh interval --- flexmeasures/ui/templates/base.html | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/flexmeasures/ui/templates/base.html b/flexmeasures/ui/templates/base.html index f2832b1d9d..1172043039 100644 --- a/flexmeasures/ui/templates/base.html +++ b/flexmeasures/ui/templates/base.html @@ -227,15 +227,23 @@ {% endfor %} - {% if active_subpage == "asset_graph" %} -
- - -
- {% endif %} -
- - +
+ {# Option to refresh data from materialized view if applicable #} + {# Note: the checkbox is checked by default, meaning the materialized view is used (for faster loading) #} + {# Unchecking it will reload the graph with fresh data, which may take more time #} + {# This option is only shown when a materialized view refresh interval is configured #} + {% if active_subpage == "asset_graph" and mv_refresh_interval %} +
+ + +
+ {% endif %} +
+ + +
{% endif %} From e2140a16b1efe80bae51f44abb8f8b066f2cca82 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Thu, 4 Sep 2025 11:29:44 +0500 Subject: [PATCH 10/12] Renamed timed_belief_min_v to most_recent_beliefs_mview --- flexmeasures/api/dev/sensors.py | 4 ++-- flexmeasures/api/v3_0/assets.py | 4 ++-- flexmeasures/cli/db_ops.py | 2 +- flexmeasures/data/config.py | 11 ++++++----- .../versions/timed_beliefs_materialized_views.py | 16 ++++++++-------- flexmeasures/data/models/generic_assets.py | 4 ++-- flexmeasures/data/models/time_series.py | 8 ++++---- .../data/schemas/tests/test_input_schema.py | 2 +- 8 files changed, 26 insertions(+), 25 deletions(-) diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index d7a153ba96..b720273ccb 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -20,7 +20,7 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.ui.utils.view_utils import set_session_variables -from flexmeasures.data.config import timed_belief_min_v +from flexmeasures.data.config import most_recent_beliefs_mview class SensorAPI(FlaskView): @@ -113,7 +113,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see :ref:`resolutions`) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ - kwargs["timed_belief_min_v"] = timed_belief_min_v + kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index c562a77afe..363e33628c 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -48,7 +48,7 @@ from flexmeasures.utils.coding_utils import ( flatten_unique, ) -from flexmeasures.data.config import timed_belief_min_v +from flexmeasures.data.config import most_recent_beliefs_mview from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access from werkzeug.exceptions import Forbidden, Unauthorized @@ -710,7 +710,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): Data for use in charts (in case you have the chart specs already). """ sensors = flatten_unique(asset.validate_sensors_to_show()) - kwargs["timed_belief_min_v"] = timed_belief_min_v + kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/cli/db_ops.py b/flexmeasures/cli/db_ops.py index 8977acb288..6bb649eeaa 100644 --- a/flexmeasures/cli/db_ops.py +++ b/flexmeasures/cli/db_ops.py @@ -118,7 +118,7 @@ def refresh_materialized_views(concurrent: bool): ) try: db.session.execute( - text(f"REFRESH MATERIALIZED VIEW {refresh_type} timed_belief_min_v;") + text(f"REFRESH MATERIALIZED VIEW {refresh_type} most_recent_beliefs_mview;") ) db.session.commit() elapsed_time = time.time() - start_time diff --git a/flexmeasures/data/config.py b/flexmeasures/data/config.py index 414f13f814..55a78ed9a7 100644 --- a/flexmeasures/data/config.py +++ b/flexmeasures/data/config.py @@ -16,7 +16,7 @@ ) Base = None # type: ignore session_options = None -timed_belief_min_v = None +most_recent_beliefs_mview = None def init_db(): @@ -33,7 +33,7 @@ def init_db(): def configure_db_for(app: Flask): """Call this to configure the database and the tools we use on it for the Flask app. This should only be called once in the app's lifetime.""" - global db, Base, timed_belief_min_v + global db, Base, most_recent_beliefs_mview with app.app_context(): db.init_app(app) @@ -55,11 +55,12 @@ def configure_db_for(app: Flask): import timely_beliefs.utils as tb_utils try: - timed_belief_min_v = tb_utils.get_timed_belief_min_v(db.session) + most_recent_beliefs_mview = tb_utils.get_most_recent_beliefs_mview( + db.session + ) except Exception: - timed_belief_min_v = None app.logger.warning( - "Could not determine timed_belief_min_v. Do you have timely-beliefs installed and is the latest version?" + "Could not determine most_recent_beliefs_mview. Do you have timely-beliefs installed and is the latest version?" " Beliefs will be retrieved from the actual table instead of the materialized view.", ) diff --git a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py index 3bc5a8657c..d82ed668ff 100644 --- a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py +++ b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py @@ -19,7 +19,7 @@ def upgrade(): # Create the materialized view with proper alias op.execute( """ - CREATE MATERIALIZED VIEW timed_belief_min_v AS + CREATE MATERIALIZED VIEW most_recent_beliefs_mview AS SELECT * FROM ( SELECT @@ -46,26 +46,26 @@ def upgrade(): # Create indexes op.execute( """ - CREATE INDEX idx_timed_belief_min_v_sensor_event - ON timed_belief_min_v(sensor_id, event_start); + CREATE INDEX idx_most_recent_beliefs_mview_sensor_event + ON most_recent_beliefs_mview(sensor_id, event_start); """ ) op.execute( """ - CREATE INDEX idx_timed_belief_min_v_event_start - ON timed_belief_min_v(event_start); + CREATE INDEX idx_most_recent_beliefs_mview_event_start + ON most_recent_beliefs_mview(event_start); """ ) # Create a unique index to allow concurrent refreshes op.execute( """ - CREATE UNIQUE INDEX idx_timed_belief_min_v_unique - ON timed_belief_min_v(sensor_id, event_start, source_id); + CREATE UNIQUE INDEX idx_most_recent_beliefs_mview_unique + ON most_recent_beliefs_mview(sensor_id, event_start, source_id); """ ) def downgrade(): - op.execute("DROP MATERIALIZED VIEW IF EXISTS timed_belief_min_v CASCADE;") + op.execute("DROP MATERIALIZED VIEW IF EXISTS most_recent_beliefs_mview CASCADE;") diff --git a/flexmeasures/data/models/generic_assets.py b/flexmeasures/data/models/generic_assets.py index 78666aee3b..1178ebce9d 100644 --- a/flexmeasures/data/models/generic_assets.py +++ b/flexmeasures/data/models/generic_assets.py @@ -649,7 +649,7 @@ def search_beliefs( # noqa C901 compress_json: bool = False, resolution: timedelta | None = None, use_materialized_view: bool = True, - timed_belief_min_v: Table | None = None, + most_recent_beliefs_mview: Table | None = None, ) -> BeliefsDataFrame | str: """Search all beliefs about events for all sensors of this asset @@ -691,7 +691,7 @@ def search_beliefs( # noqa C901 one_deterministic_belief_per_event_per_source=True, resolution=resolution, use_materialized_view=use_materialized_view, - timed_belief_min_v=timed_belief_min_v, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if as_json and not compress_json: from flexmeasures.data.services.time_series import simplify_index diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 49d45919e4..34b8f6718e 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -360,7 +360,7 @@ def search_beliefs( # noqa: C901 compress_json: bool = False, resolution: str | timedelta | None = None, use_materialized_view: bool = True, - timed_belief_min_v: Table | None = None, + most_recent_beliefs_mview: Table | None = None, ) -> tb.BeliefsDataFrame | str: """Search all beliefs about events for this sensor. @@ -407,7 +407,7 @@ def search_beliefs( # noqa: C901 one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source, resolution=resolution, use_materialized_view=use_materialized_view, - timed_belief_min_v=timed_belief_min_v, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if as_json and not compress_json: df = bdf.reset_index() @@ -812,7 +812,7 @@ def search( resolution: str | timedelta = None, sum_multiple: bool = True, use_materialized_view: bool = True, - timed_belief_min_v: Table | None = None, + most_recent_beliefs_mview: Table | None = None, ) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]: """Search all beliefs about events for the given sensors. @@ -897,7 +897,7 @@ def search( custom_filter_criteria=source_criteria, custom_join_targets=custom_join_targets, use_materialized_view=use_materialized_view, - timed_belief_min_v=timed_belief_min_v, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if use_latest_version_per_event: bdf = keep_latest_version( diff --git a/flexmeasures/data/schemas/tests/test_input_schema.py b/flexmeasures/data/schemas/tests/test_input_schema.py index 05df9c1e58..91e7d847f8 100644 --- a/flexmeasures/data/schemas/tests/test_input_schema.py +++ b/flexmeasures/data/schemas/tests/test_input_schema.py @@ -18,7 +18,7 @@ def test_input_schema(): # These arguments are not mapped to a field at all (state a reason) excluded_arg_names = [ "as_json", # used in Sensor.search_beliefs but not in TimedBelief.search - "timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search + "most_recent_beliefs_mview", # used in Sensor.search_beliefs but not in TimedBelief.search "use_materialized_view", # used in Sensor.search_beliefs as well as in TimedBelief.search "compress_json", # used in Sensor.search_beliefs but not in TimedBelief.search ] From fa522faba3f43804039aaf47fb2845a0189451a0 Mon Sep 17 00:00:00 2001 From: Muhammad-Moiz626 Date: Thu, 4 Sep 2025 11:48:28 +0500 Subject: [PATCH 11/12] Added changelog. --- documentation/changelog.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 78293e1f38..e296371a40 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -15,6 +15,7 @@ New features * Display KPIs for asset sensors with daily event resolution [see `PR #1608 `_, `PR #1634 `_ and `PR #1656 `_] * Improved timestamp on sensor detail page to be more friendly [see `PR #1632 `_] * Asset types support: new API endpoint (`GET /assets/types`), better docs and fix CLI command `flexmeasures show asset-types` [see `PR #1663 `_] +* Added Materialized View for latest beliefs per sensor to speed up data loading [see `PR #1671 `_] Infrastructure / Support ---------------------- From aabd536a87f3488600adad996eaed7302749fb06 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 15 Dec 2025 17:01:45 +0100 Subject: [PATCH 12/12] chore: db merge Signed-off-by: F.N. Claessen --- .../migrations/versions/75a82be1c1d6_merge.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py diff --git a/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py b/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py new file mode 100644 index 0000000000..371a8ea660 --- /dev/null +++ b/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py @@ -0,0 +1,21 @@ +"""merge + +Revision ID: 75a82be1c1d6 +Revises: 6cca6c002135, c98798csds8c +Create Date: 2025-12-15 16:53:05.313298 + +""" + +# revision identifiers, used by Alembic. +revision = "75a82be1c1d6" +down_revision = ["6cca6c002135", "c98798csds8c"] +branch_labels = None +depends_on = None + + +def upgrade(): + pass + + +def downgrade(): + pass