diff --git a/documentation/changelog.rst b/documentation/changelog.rst index b11957f534..edaac58c89 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -9,6 +9,7 @@ v0.31.0 | February XX, 2026 New features ------------- +* Added Materialized View for latest beliefs per sensor to speed up data loading [see `PR #1671 `_] Infrastructure / Support ---------------------- diff --git a/documentation/configuration.rst b/documentation/configuration.rst index bf57f037d6..f440cf5010 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -725,6 +725,16 @@ Allow to override the default sunset link for your clients. Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0) +Performance optimizations +---------------------------- + +FLEXMEASURES_MVIEW_REFRESH_INTERVAL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Interval in minutes to refresh the materialized db view which caches the most recent beliefs at a given point in time (for faster queries). + +Default: None + + .. _reporters-config: Reporters @@ -741,4 +751,4 @@ Extend this list if you want to permit additional pseudo-methods in reporter pip .. note:: Only add trusted pseudo-methods here. Since these methods bypass Python signature validation, loosening this list unnecessarily can reduce safety guarantees in your data processing pipeline. -Default: ``["get_attribute"]`` \ No newline at end of file +Default: ``["get_attribute"]`` diff --git a/flexmeasures/api/dev/sensors.py b/flexmeasures/api/dev/sensors.py index 9b31699c4f..d189d9f535 100644 --- a/flexmeasures/api/dev/sensors.py +++ b/flexmeasures/api/dev/sensors.py @@ -20,6 +20,7 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.ui.utils.view_utils import set_session_variables +from flexmeasures.data.config import most_recent_beliefs_mview class SensorAPI(FlaskView): @@ -93,6 +94,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs): "most_recent_beliefs_only": fields.Boolean( required=False, load_default=True ), + "use_materialized_view": fields.Boolean(required=False, load_default=True), "compress_json": fields.Boolean(required=False), }, location="query", @@ -114,6 +116,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs): - "resolution" (see [docs about describing timing](https://flexmeasures.readthedocs.io/latest/api/notation.html#frequency-and-resolution)) - "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true) """ + kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview return sensor.search_beliefs(as_json=True, **kwargs) @route("//chart_annotations", strict_slashes=False) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index ec62911355..31a7eeb322 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -59,6 +59,7 @@ from flexmeasures.utils.coding_utils import ( flatten_unique, ) +from flexmeasures.data.config import most_recent_beliefs_mview from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access from flexmeasures.data.schemas.sensors import ( @@ -816,6 +817,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs): required=False, load_default=False ), "most_recent_beliefs_only": fields.Boolean(required=False), + "use_materialized_view": fields.Boolean(required=False, load_default=True), "compress_json": fields.Boolean(required=False), }, location="query", @@ -857,6 +859,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): - Assets """ sensors = flatten_unique(asset.validate_sensors_to_show()) + kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/cli/db_ops.py b/flexmeasures/cli/db_ops.py index 21173150ef..6bb649eeaa 100644 --- a/flexmeasures/cli/db_ops.py +++ b/flexmeasures/cli/db_ops.py @@ -9,6 +9,7 @@ import click from flexmeasures.cli.utils import MsgStyle +from flexmeasures.data import db @click.group("db-ops") @@ -95,4 +96,40 @@ def restore(file: str): click.secho("db restore unsuccessful", **MsgStyle.ERROR) +@fm_db_ops.command("refresh-materialized-views") +@with_appcontext +@click.option("--concurrent", is_flag=True, default=False) +def refresh_materialized_views(concurrent: bool): + """ + Refresh the materialized views for getting the most recent data. + By default, this locks the materialized view for the duration of the refresh. + Use the --concurrent option to avoid locking, at the cost of higher resource usage and + the requirement that a unique index exists on the materialized view. + """ + from sqlalchemy import text + + refresh_type = "CONCURRENTLY" if concurrent else "" + import time + + start_time = time.time() + click.secho( + f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}...", + **MsgStyle.INFO, + ) + try: + db.session.execute( + text(f"REFRESH MATERIALIZED VIEW {refresh_type} most_recent_beliefs_mview;") + ) + db.session.commit() + elapsed_time = time.time() - start_time + click.secho( + f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds", + **MsgStyle.SUCCESS, + ) + except Exception as e: + db.session.rollback() + click.secho(f"✗ Error refreshing materialized views: {e}", **MsgStyle.ERROR) + raise click.Abort() + + app.cli.add_command(fm_db_ops) diff --git a/flexmeasures/data/config.py b/flexmeasures/data/config.py index f8c98c6c8e..55a78ed9a7 100644 --- a/flexmeasures/data/config.py +++ b/flexmeasures/data/config.py @@ -16,6 +16,7 @@ ) Base = None # type: ignore session_options = None +most_recent_beliefs_mview = None def init_db(): @@ -32,7 +33,7 @@ def init_db(): def configure_db_for(app: Flask): """Call this to configure the database and the tools we use on it for the Flask app. This should only be called once in the app's lifetime.""" - global db, Base + global db, Base, most_recent_beliefs_mview with app.app_context(): db.init_app(app) @@ -51,6 +52,18 @@ def configure_db_for(app: Flask): forecasting, ) # noqa: F401 + import timely_beliefs.utils as tb_utils + + try: + most_recent_beliefs_mview = tb_utils.get_most_recent_beliefs_mview( + db.session + ) + except Exception: + app.logger.warning( + "Could not determine most_recent_beliefs_mview. Do you have timely-beliefs installed and is the latest version?" + " Beliefs will be retrieved from the actual table instead of the materialized view.", + ) + # This would create db structure based on models, but you should use `flask db upgrade` for that. # Base.metadata.create_all(bind=db.engine) diff --git a/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py b/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py new file mode 100644 index 0000000000..371a8ea660 --- /dev/null +++ b/flexmeasures/data/migrations/versions/75a82be1c1d6_merge.py @@ -0,0 +1,21 @@ +"""merge + +Revision ID: 75a82be1c1d6 +Revises: 6cca6c002135, c98798csds8c +Create Date: 2025-12-15 16:53:05.313298 + +""" + +# revision identifiers, used by Alembic. +revision = "75a82be1c1d6" +down_revision = ["6cca6c002135", "c98798csds8c"] +branch_labels = None +depends_on = None + + +def upgrade(): + pass + + +def downgrade(): + pass diff --git a/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py new file mode 100644 index 0000000000..d82ed668ff --- /dev/null +++ b/flexmeasures/data/migrations/versions/timed_beliefs_materialized_views.py @@ -0,0 +1,71 @@ +"""Add materialized view for belief optimization + +Revision ID: c98798csds8c +Revises: b8f3cda5e023 +Create Date: 2025-08-08 04:55:33.722545 + +""" + +from alembic import op + +# revision identifiers +revision = "c98798csds8c" +down_revision = "b8f3cda5e023" +branch_labels = None +depends_on = None + + +def upgrade(): + # Create the materialized view with proper alias + op.execute( + """ + CREATE MATERIALIZED VIEW most_recent_beliefs_mview AS + SELECT * + FROM ( + SELECT + timed_belief.sensor_id, + timed_belief.event_start, + timed_belief.source_id, + MIN(timed_belief.belief_horizon) AS most_recent_belief_horizon + FROM timed_belief + INNER JOIN data_source + ON data_source.id = timed_belief.source_id + GROUP BY + timed_belief.sensor_id, + timed_belief.event_start, + timed_belief.source_id + ) AS belief_mins + GROUP BY + sensor_id, + event_start, + source_id, + most_recent_belief_horizon; + """ + ) + + # Create indexes + op.execute( + """ + CREATE INDEX idx_most_recent_beliefs_mview_sensor_event + ON most_recent_beliefs_mview(sensor_id, event_start); + """ + ) + + op.execute( + """ + CREATE INDEX idx_most_recent_beliefs_mview_event_start + ON most_recent_beliefs_mview(event_start); + """ + ) + + # Create a unique index to allow concurrent refreshes + op.execute( + """ + CREATE UNIQUE INDEX idx_most_recent_beliefs_mview_unique + ON most_recent_beliefs_mview(sensor_id, event_start, source_id); + """ + ) + + +def downgrade(): + op.execute("DROP MATERIALIZED VIEW IF EXISTS most_recent_beliefs_mview CASCADE;") diff --git a/flexmeasures/data/models/generic_assets.py b/flexmeasures/data/models/generic_assets.py index 07c1cf2bc7..b50951c3ba 100644 --- a/flexmeasures/data/models/generic_assets.py +++ b/flexmeasures/data/models/generic_assets.py @@ -7,7 +7,7 @@ from flask import current_app from flask_security import current_user import pandas as pd -from sqlalchemy import select, and_ +from sqlalchemy import select, and_, Table from sqlalchemy.ext.hybrid import hybrid_method from sqlalchemy.sql.expression import func from sqlalchemy.ext.mutable import MutableDict, MutableList @@ -709,6 +709,8 @@ def search_beliefs( # noqa C901 as_json: bool = False, compress_json: bool = False, resolution: timedelta | None = None, + use_materialized_view: bool = True, + most_recent_beliefs_mview: Table | None = None, ) -> BeliefsDataFrame | str: """Search all beliefs about events for all sensors of this asset @@ -751,6 +753,8 @@ def search_beliefs( # noqa C901 most_recent_events_only=most_recent_events_only, one_deterministic_belief_per_event_per_source=True, resolution=resolution, + use_materialized_view=use_materialized_view, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if as_json and not compress_json: from flexmeasures.data.services.time_series import simplify_index diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 78fc018246..879e094dd5 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -8,7 +8,10 @@ from flask import current_app import pandas as pd -from sqlalchemy import select +from sqlalchemy import ( + select, + Table, +) from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.schema import UniqueConstraint @@ -404,6 +407,8 @@ def search_beliefs( # noqa: C901 as_json: bool = False, compress_json: bool = False, resolution: str | timedelta | None = None, + use_materialized_view: bool = True, + most_recent_beliefs_mview: Table | None = None, ) -> tb.BeliefsDataFrame | str: """Search all beliefs about events for this sensor. @@ -449,6 +454,8 @@ def search_beliefs( # noqa: C901 one_deterministic_belief_per_event=one_deterministic_belief_per_event, one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source, resolution=resolution, + use_materialized_view=use_materialized_view, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if as_json and not compress_json: df = bdf.reset_index() @@ -859,6 +866,8 @@ def search( one_deterministic_belief_per_event_per_source: bool = False, resolution: str | timedelta = None, sum_multiple: bool = True, + use_materialized_view: bool = True, + most_recent_beliefs_mview: Table | None = None, ) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]: """Search all beliefs about events for the given sensors. @@ -942,6 +951,8 @@ def search( **most_recent_filters, custom_filter_criteria=source_criteria, custom_join_targets=custom_join_targets, + use_materialized_view=use_materialized_view, + most_recent_beliefs_mview=most_recent_beliefs_mview, ) if use_latest_version_per_event: bdf = keep_latest_version( diff --git a/flexmeasures/data/schemas/tests/test_input_schema.py b/flexmeasures/data/schemas/tests/test_input_schema.py index 809f5d7ed6..91e7d847f8 100644 --- a/flexmeasures/data/schemas/tests/test_input_schema.py +++ b/flexmeasures/data/schemas/tests/test_input_schema.py @@ -18,6 +18,8 @@ def test_input_schema(): # These arguments are not mapped to a field at all (state a reason) excluded_arg_names = [ "as_json", # used in Sensor.search_beliefs but not in TimedBelief.search + "most_recent_beliefs_mview", # used in Sensor.search_beliefs but not in TimedBelief.search + "use_materialized_view", # used in Sensor.search_beliefs as well as in TimedBelief.search "compress_json", # used in Sensor.search_beliefs but not in TimedBelief.search ] diff --git a/flexmeasures/ui/templates/base.html b/flexmeasures/ui/templates/base.html index fa68870773..42c1ffc1fb 100644 --- a/flexmeasures/ui/templates/base.html +++ b/flexmeasures/ui/templates/base.html @@ -227,10 +227,22 @@ {% endfor %} -
-
+
+ {# Option to refresh data from materialized view if applicable #} + {# Note: the checkbox is checked by default, meaning the materialized view is used (for faster loading) #} + {# Unchecking it will reload the graph with fresh data, which may take more time #} + {# This option is only shown when a materialized view refresh interval is configured #} + {% if active_subpage == "asset_graph" and mv_refresh_interval %} +
+ + +
+ {% endif %} +
- +
{% if breadcrumb_info['current_asset_view'] == 'Graphs' %}
@@ -596,7 +608,7 @@ {% endif %} function createPicker(startDate, endDate, customRanges) { - + picker = new Litepicker({ element: document.getElementById('datepicker'), plugins: ['ranges', 'keyboardnav'], @@ -997,14 +1009,14 @@ * @param {String} queryEndDate The end date as a string in ISO format, to be used in the query. * @return {Promise} A promise that resolves with the data. */ - function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate) { + function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, useMaterializedView=true) { if (previousResult && previousResult.start.getTime() === startDate.getTime() && previousResult.end.getTime() === endDate.getTime()){ return Promise.resolve(previousResult.data); } else { {% if active_subpage == "asset_graph" and has_kpis %} getAssetKPIs(); {% endif %} - return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&compress_json=true', { + return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&compress_json=true' + '&use_materialized_view=' + useMaterializedView, { method: "GET", headers: { "Content-Type": "application/json" }, signal: signal, @@ -1014,6 +1026,92 @@ } } + const useMaterializedViewCheckbox = document.getElementById("useMaterializedView"); + useMaterializedViewCheckbox.addEventListener("change", function() { + toggleMaterializedView(); + }); + + // Toggle materialized view usage for timely beliefs + function toggleMaterializedView() { + // Get the checked status of the checkbox + const isChecked = useMaterializedViewCheckbox.checked; + const pickerStartDate = picker.getStartDate().toJSDate(); + const queryStartDate= encodeURIComponent(toIsoStringWithOffset(pickerStartDate)); + var queryEndDate = picker.getEndDate(); + queryEndDate.setDate(queryEndDate.getDate() + 1); + queryEndDate = encodeURIComponent(toIsoStringWithOffset(queryEndDate.toJSDate())); + // Since we are going to make a call anyway so we'll send these values as null + const startDate = null; + const endDate = null; + previousResult = null; + + $("#spinner").show(); + if (isChecked) { + console.log("Checked.") + // fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true); + Promise.all([ + fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true), + /** + // Fetch annotations + fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, { + method: "GET", + headers: {"Content-Type": "application/json"}, + signal: signal, + }) + .then(function(response) { return response.json(); }), + */ + + // Embed chart + embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate), + ]).then(function(result) { + $("#spinner").hide(); + vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run(); + previousResult = { + start: startDate, + end: endDate, + data: result[0] + }; + checkSourceMasking(previousResult.data); + playBackDataLoadedForKnownDateRange = false; + /** + vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run(); + */ + }).catch(console.error); + } + else { + // fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false); + Promise.all([ + fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false), + /** + // Fetch annotations + fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, { + method: "GET", + headers: {"Content-Type": "application/json"}, + signal: signal, + }) + .then(function(response) { return response.json(); }), + */ + + // Embed chart + embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate), + ]).then(function(result) { + $("#spinner").hide(); + vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run(); + previousResult = { + start: startDate, + end: endDate, + data: result[0] + }; + checkSourceMasking(previousResult.data); + playBackDataLoadedForKnownDateRange = false; + /** + vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run(); + */ + }).catch(console.error); + } + $("#spinner").hide(); + } + {% if active_subpage == "asset_graph" and has_kpis %} function getAssetKPIs() { const pickerStartDate = picker.getStartDate().toJSDate(); @@ -1413,7 +1511,7 @@
{% endblock about %} - + {% block credits %} Credits. @@ -1455,21 +1553,21 @@

Icons from Flaticon
- + {% block support %} - + {% if FLEXMEASURES_SUPPORT_PAGE %} Get Support. {% endif %} - + {% endblock support %} {% block termsofservice %} - + {% if FLEXMEASURES_TOS_PAGE %} Terms of Service. {% endif %} - + {% endblock termsofservice %} diff --git a/flexmeasures/ui/views/assets/views.py b/flexmeasures/ui/views/assets/views.py index daec0ed067..19c03488d3 100644 --- a/flexmeasures/ui/views/assets/views.py +++ b/flexmeasures/ui/views/assets/views.py @@ -346,12 +346,16 @@ def graphs(self, id: str, start_time=None, end_time=None): asset_form = AssetForm() asset_form.with_options() asset_form.process(obj=asset) + mv_refresh_interval = current_app.config.get( + "FLEXMEASURES_MVIEW_REFRESH_INTERVAL", None + ) return render_flexmeasures_template( "assets/asset_graph.html", asset=asset, has_kpis=has_kpis, asset_kpis=asset_kpis, + mv_refresh_interval=mv_refresh_interval, current_page="Graphs", ) diff --git a/flexmeasures/utils/config_defaults.py b/flexmeasures/utils/config_defaults.py index 4d7b2fbb87..e7cb6444a0 100644 --- a/flexmeasures/utils/config_defaults.py +++ b/flexmeasures/utils/config_defaults.py @@ -21,6 +21,7 @@ class Config(object): DEBUG: bool = False LOGGING_LEVEL: int = logging.WARNING SECRET_KEY: str | None = None + FLEXMEASURES_MVIEW_REFRESH_INTERVAL: int | None = None FLEXMEASURES_ENV_DEFAULT = "production"