From ce394b381ea45b45228c59c607d5041afc79b0a8 Mon Sep 17 00:00:00 2001
From: Grufoony
Date: Tue, 24 Feb 2026 17:25:42 +0000
Subject: [PATCH 1/5] Transform from a Polars-dependent version to a PySpark version

---
 src/dsf/tsm/tsm.py | 185 +++++++++++++++++++++++++++++----------------
 1 file changed, 119 insertions(+), 66 deletions(-)

diff --git a/src/dsf/tsm/tsm.py b/src/dsf/tsm/tsm.py
index cab2e898..a8e6c585 100644
--- a/src/dsf/tsm/tsm.py
+++ b/src/dsf/tsm/tsm.py
@@ -3,18 +3,35 @@
 from pathlib import Path
 from typing import Dict, Optional
 
-import polars as pl
+from pyspark.sql import DataFrame, SparkSession, Window
+import pyspark.sql.functions as F
+import pyspark.sql.types as T
+
+
+def _get_or_create_spark() -> SparkSession:
+    """Return the active SparkSession or create a local one."""
+    return (
+        SparkSession.builder
+        .master("local[*]")
+        .appName("TSM")
+        .config("spark.driver.memory", "128g")
+        .config("spark.executor.memory", "128g")
+        .config("spark.sql.shuffle.partitions", "1600")
+        .config("spark.sql.adaptive.enabled", "true")
+        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
+        .getOrCreate()
+    )
 
 
 class TSM:
     """Traffic State Monitoring data.
 
     Builds density/flow clusters from per-vehicle detector data stored in a
-    Polars ``DataFrame``.
+    PySpark ``DataFrame``.
 
     Parameters
     ----------
-    data : pl.DataFrame
+    data : pyspark.sql.DataFrame
         Raw detector data.  Must contain at least the columns
         ``detector``, ``timestamp``, and ``speed_kph`` (or names that are
         mapped to them via *column_mapping*).
@@ -27,7 +44,7 @@ class TSM:
 
         - ``detector``: unique ID of the traffic detector (e.g. loop sensor).
        - ``timestamp``: timestamp of the vehicle passage (must be a
-          Polars datetime type).
+          PySpark ``TimestampType``).
         - ``speed_kph``: speed of the vehicle in km/h.
 
         Optional target columns:
@@ -40,14 +57,15 @@ class TSM:
 
     def __init__(
         self,
-        data: pl.DataFrame,
+        data: DataFrame,
         column_mapping: Optional[Dict[str, str]] = None,
     ) -> None:
         if column_mapping is not None:
-            rename = {
-                src: eng for src, eng in column_mapping.items() if src in data.columns
-            }
-            self._df: pl.DataFrame = data.rename(rename)
+            df = data
+            for src, eng in column_mapping.items():
+                if src in df.columns:
+                    df = df.withColumnRenamed(src, eng)
+            self._df: DataFrame = df
         else:
             self._df = data
 
@@ -63,7 +81,7 @@ def __init__(
                 f"Available columns: {self._df.columns}"
             )
 
-        self._result: Optional[pl.DataFrame] = None
+        self._result: Optional[DataFrame] = None
 
     # ------------------------------------------------------------------
     # helpers
@@ -102,78 +120,94 @@ def clusterize(
         group = self._group_cols
 
         # --- lanes sub-table (only when lane info is available) -----------
-        lanes_df: Optional[pl.DataFrame] = None
+        lanes_df: Optional[DataFrame] = None
         if self._has_lane:
-            lanes_df = self._df.group_by(group).agg(
-                pl.col("lane").n_unique().alias("n_lanes")
+            lanes_df = self._df.groupBy(group).agg(
+                F.count_distinct("lane").alias("n_lanes")
             )
 
+        # --- timestamp-ordered window per detector / direction group ------
+        w = Window.partitionBy(group).orderBy("timestamp")
+
         # --- main pipeline ------------------------------------------------
+        df = self._df
+
+        # delta_t_s: seconds since the previous row for the same detector and direction
+        df = df.withColumn(
+            "prev_timestamp", F.lag("timestamp").over(w)
+        ).withColumn(
+            "delta_t_s",
+            F.when(
+                F.col("prev_timestamp").isNotNull(),
+                F.col("timestamp").cast("long") - F.col("prev_timestamp").cast("long"),
+            ),
+        ).drop("prev_timestamp")
+
+        # distance_m
+        df = df.withColumn(
+            "distance_m",
+            F.col("speed_kph") * F.col("delta_t_s") / 3.6,
+        )
+
+        # new_cluster flag
+        df = df.withColumn(
+            "new_cluster",
+            (
+                (F.col("distance_m") > F.lit(gap_factor) * (F.col("speed_kph") / 3.6))
+                | F.col("delta_t_s").isNull()
+            ).cast("int"),
+        )
+
+        # cluster_local_id: cumulative sum of new_cluster within each detector and direction
+        w_unbounded = (
+            Window.partitionBy(group)
+            .orderBy("timestamp")
+            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+        )
+        df = df.withColumn(
+            "cluster_local_id",
+            F.sum("new_cluster").over(w_unbounded),
+        )
+
+        # aggregate per cluster
         result = (
-            self._df.sort(group + ["timestamp"])
-            .with_columns(
-                (pl.col("timestamp") - pl.col("timestamp").shift(1))
-                .dt.total_seconds()
-                .over(group)
-                .alias("delta_t_s")
-            )
-            .with_columns(
-                (pl.col("speed_kph") * pl.col("delta_t_s") / 3.6).alias("distance_m")
-            )
-            .with_row_index("row_idx")
-            .with_columns(
-                (
-                    (pl.col("distance_m") > gap_factor * (pl.col("speed_kph") / 3.6))
-                    | pl.col("delta_t_s").is_null()
-                ).alias("new_cluster")
-            )
-            .with_columns(
-                pl.col("new_cluster")
-                .cast(pl.Int32)
-                .cum_sum()
-                .over(group)
-                .alias("cluster_local_id")
-            )
-            .group_by(group + ["cluster_local_id"])
+            df.groupBy(group + ["cluster_local_id"])
             .agg(
-                pl.col("speed_kph").mean().alias("mean_speed_kph"),
-                pl.len().alias("num_vehicles"),
-                (pl.col("distance_m") * 1e-3).sum().alias("cluster_len_km"),
-                pl.col("delta_t_s").sum().alias("cluster_dt_s"),
+                F.mean("speed_kph").alias("mean_speed_kph"),
+                F.count("*").alias("num_vehicles"),
+                F.sum(F.col("distance_m") * 1e-3).alias("cluster_len_km"),
+                F.sum("delta_t_s").alias("cluster_dt_s"),
             )
-            .filter(pl.col("num_vehicles") > min_vehicles)
+            .filter(F.col("num_vehicles") > min_vehicles)
         )
 
         # --- join lane count & compute density / flow ---------------------
         if lanes_df is not None:
-            result = result.join(lanes_df, on=group, how="left").with_columns(
-                (
-                    pl.col("num_vehicles")
-                    / pl.col("cluster_len_km")
-                    / pl.col("n_lanes")
-                ).alias("density"),
-                (
-                    pl.col("num_vehicles")
-                    * 3.6e3
-                    / pl.col("cluster_dt_s")
-                    / pl.col("n_lanes")
-                ).alias("flow"),
+            result = result.join(lanes_df, on=group, how="left").withColumn(
+                "density",
+                F.col("num_vehicles") / F.col("cluster_len_km") / F.col("n_lanes"),
+            ).withColumn(
+                "flow",
+                F.col("num_vehicles") * 3.6e3 / F.col("cluster_dt_s") / F.col("n_lanes"),
             )
         else:
             # Without lane info assume 1 lane
-            result = result.with_columns(
-                (pl.col("num_vehicles") / pl.col("cluster_len_km")).alias("density"),
-                (pl.col("num_vehicles") * 3.6e3 / pl.col("cluster_dt_s")).alias("flow"),
+            result = result.withColumn(
+                "density",
+                F.col("num_vehicles") / F.col("cluster_len_km"),
+            ).withColumn(
+                "flow",
+                F.col("num_vehicles") * 3.6e3 / F.col("cluster_dt_s"),
             )
 
-        self._result = result.sort(group + ["cluster_local_id"])
+        self._result = result.orderBy(*group, "cluster_local_id")
         return self
 
     # ------------------------------------------------------------------
     # accessors
     # ------------------------------------------------------------------
     @property
-    def result(self) -> pl.DataFrame:
+    def result(self) -> DataFrame:
         """Return the clustered result DataFrame.
 
         Raises
@@ -186,19 +220,38 @@ def result(self) -> pl.DataFrame:
         return self._result
 
     @property
-    def df(self) -> pl.DataFrame:
+    def df(self) -> DataFrame:
         """Alias for :attr:`result`."""
         return self.result
 
     def to_csv(self, path: str | Path, **kwargs) -> None:
-        """Write the result to a CSV file."""
-        self.result.write_csv(path, **kwargs)
+        """Write the result to a CSV directory (Spark partitioned output).
+
+        Parameters
+        ----------
+        path : str or Path
+            Destination directory. Spark writes one or more part-* files.
+        **kwargs
+            Extra options forwarded to ``DataFrameWriter.csv()``.
+        """
+        self.result.write.option("header", "true").csv(str(path), **kwargs)
 
     def to_parquet(self, path: str | Path, **kwargs) -> None:
-        """Write the result to a Parquet file."""
-        self.result.write_parquet(path, **kwargs)
+        """Write the result to a Parquet directory (Spark partitioned output).
+
+        Parameters
+        ----------
+        path : str or Path
+            Destination directory.
+        **kwargs
+            Extra options forwarded to ``DataFrameWriter.parquet()``.
+        """
+        self.result.write.parquet(str(path), **kwargs)
 
     def __repr__(self) -> str:
         status = "clusterized" if self._result is not None else "raw"
-        rows = len(self._result) if self._result is not None else len(self._df)
+        if self._result is not None:
+            rows = self._result.count()
+        else:
+            rows = self._df.count()
         return f"TSM(status={status}, rows={rows})"
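A minimal usage sketch of the PySpark-based API introduced above (illustrative only, not part of the patch series). The parquet path and the source column names passed to column_mapping are hypothetical; the data only needs to end up with the documented detector, timestamp and speed_kph columns:

    from dsf.tsm.tsm import TSM, _get_or_create_spark

    spark = _get_or_create_spark()

    # Hypothetical input path; any Spark-readable source works.
    raw = spark.read.parquet("/data/detector_passages.parquet")

    tsm = TSM(
        raw,
        column_mapping={"sensor_id": "detector", "passage_ts": "timestamp", "v_kph": "speed_kph"},
    )
    tsm.clusterize(min_vehicles=5, gap_factor=3.0)

    tsm.result.show(5)                    # per-cluster density / flow
    tsm.to_parquet("/tmp/tsm_clusters")   # Spark writes a directory of part-* files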
From d562ea99f2f6ab9b3a87403ddc2ee660c28c7d55 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Niccol=C3=B2=20Barbieri?=
Date: Wed, 25 Feb 2026 17:04:36 +0000
Subject: [PATCH 2/5] Add PySpark, matplotlib, and pandas dependencies

---
 setup.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/setup.py b/setup.py
index 99db3f35..b67afed9 100644
--- a/setup.py
+++ b/setup.py
@@ -552,5 +552,8 @@ def run_stubgen(self):
         "geopandas",
         "shapely",
         "folium",
+        "pyspark",
+        "matplotlib",
+        "pandas",
     ],
 )

From 374a1fd4dfb302a27959c1f43a25f34ef43bbdca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Niccol=C3=B2=20Barbieri?=
Date: Wed, 25 Feb 2026 17:05:04 +0000
Subject: [PATCH 3/5] Add _get_or_create_spark import

---
 src/dsf/tsm/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/dsf/tsm/__init__.py b/src/dsf/tsm/__init__.py
index 6b92ae0d..6fd35383 100644
--- a/src/dsf/tsm/__init__.py
+++ b/src/dsf/tsm/__init__.py
@@ -1 +1,2 @@
 from .tsm import TSM as TSM
+from .tsm import _get_or_create_spark as _get_or_create_spark

From 3e08ce5b67f61d71c53e883e6170c568e56e7423 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Niccol=C3=B2=20Barbieri?=
Date: Wed, 25 Feb 2026 17:05:15 +0000
Subject: [PATCH 4/5] Refactor density and flow calculations to use F.try_divide for safer division

---
 src/dsf/tsm/tsm.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/dsf/tsm/tsm.py b/src/dsf/tsm/tsm.py
index a8e6c585..0d5fbd66 100644
--- a/src/dsf/tsm/tsm.py
+++ b/src/dsf/tsm/tsm.py
@@ -139,7 +139,7 @@ def clusterize(
             "delta_t_s",
             F.when(
                 F.col("prev_timestamp").isNotNull(),
-                F.col("timestamp").cast("long") - F.col("prev_timestamp").cast("long"),
+                F.unix_timestamp(F.col("timestamp")) - F.unix_timestamp(F.col("prev_timestamp")),
             ),
         ).drop("prev_timestamp")
 
@@ -185,19 +185,19 @@ def clusterize(
         if lanes_df is not None:
             result = result.join(lanes_df, on=group, how="left").withColumn(
                 "density",
-                F.col("num_vehicles") / F.col("cluster_len_km") / F.col("n_lanes"),
+                F.try_divide(F.col("num_vehicles"), F.col("cluster_len_km") * F.col("n_lanes")),
             ).withColumn(
                 "flow",
-                F.col("num_vehicles") * 3.6e3 / F.col("cluster_dt_s") / F.col("n_lanes"),
+                F.try_divide(F.col("num_vehicles") * 3.6e3, F.col("cluster_dt_s") * F.col("n_lanes")),
             )
         else:
             # Without lane info assume 1 lane
             result = result.withColumn(
                 "density",
-                F.col("num_vehicles") / F.col("cluster_len_km"),
+                F.try_divide(F.col("num_vehicles"), F.col("cluster_len_km")),
             ).withColumn(
                 "flow",
-                F.col("num_vehicles") * 3.6e3 / F.col("cluster_dt_s"),
+                F.try_divide(F.col("num_vehicles") * 3.6e3, F.col("cluster_dt_s")),
             )
 
         self._result = result.orderBy(*group, "cluster_local_id")
@@ -254,4 +254,4 @@ def __repr__(self) -> str:
             rows = self._result.count()
         else:
             rows = self._df.count()
-        return f"TSM(status={status}, rows={rows})"
+        return f"TSM(status={status}, rows={rows})"
\ No newline at end of file
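Context for the "safer division" claim above (illustrative only, not part of the patch series): F.try_divide returns NULL when the divisor is zero, instead of raising a DIVIDE_BY_ZERO error under ANSI SQL mode, so degenerate clusters with cluster_len_km == 0 or cluster_dt_s == 0 simply get NULL density/flow. Note that try_divide requires a recent PySpark (3.5+), which may matter for the unpinned pyspark dependency added in PATCH 2/5. A standalone sketch:

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("try-divide-demo").getOrCreate()

    # Two toy clusters: the second has zero length, which a plain division
    # handles less gracefully (an error when ANSI SQL mode is enabled).
    clusters = spark.createDataFrame(
        [(10, 2.0), (5, 0.0)],
        ["num_vehicles", "cluster_len_km"],
    )

    clusters.select(
        F.try_divide(F.col("num_vehicles"), F.col("cluster_len_km")).alias("density")
    ).show()
    # density is 5.0 for the first row and NULL for the second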
From 823c9bc3b1e96fec4af3454ba64ef2d8fa8f1df1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Niccol=C3=B2=20Barbieri?=
Date: Mon, 2 Mar 2026 13:23:11 +0000
Subject: [PATCH 5/5] Add the per-vehicle DataFrame labeled with intra-cluster times

---
 src/dsf/tsm/tsm.py | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/src/dsf/tsm/tsm.py b/src/dsf/tsm/tsm.py
index 0d5fbd66..1915409c 100644
--- a/src/dsf/tsm/tsm.py
+++ b/src/dsf/tsm/tsm.py
@@ -82,6 +82,7 @@ def __init__(
             )
 
         self._result: Optional[DataFrame] = None
+        self._result_intratimes: Optional[DataFrame] = None
 
     # ------------------------------------------------------------------
     # helpers
@@ -101,6 +102,7 @@ def clusterize(
         self,
         min_vehicles: int = 5,
         gap_factor: float = 3.0,
+        intermediates: bool = False,
     ) -> "TSM":
         """Run the clustering pipeline.
 
@@ -168,7 +170,13 @@ def clusterize(
             "cluster_local_id",
             F.sum("new_cluster").over(w_unbounded),
         )
-
+        df = df.withColumn(
+            "intra_cluster_intermediate_time_s",
+            F.when(F.lag("cluster_local_id").over(w) == F.col("cluster_local_id"), F.col("delta_t_s")).otherwise(None),
+        )
+        if intermediates:
+            self._result_intratimes = df.drop("new_cluster").drop("prev_timestamp")
+
         # aggregate per cluster
         result = (
             df.groupBy(group + ["cluster_local_id"])
@@ -177,6 +185,7 @@ def clusterize(
                 F.count("*").alias("num_vehicles"),
                 F.sum(F.col("distance_m") * 1e-3).alias("cluster_len_km"),
                 F.sum("delta_t_s").alias("cluster_dt_s"),
+                F.mean("intra_cluster_intermediate_time_s").alias("mean_intra_cluster_dt_s"),
             )
             .filter(F.col("num_vehicles") > min_vehicles)
         )
@@ -219,6 +228,19 @@ def result(self) -> DataFrame:
             raise RuntimeError("Call .clusterize() before accessing .result")
         return self._result
 
+    @property
+    def result_intratimes(self) -> DataFrame:
+        """Return the per-vehicle DataFrame labeled with intra-cluster times.
+
+        Raises
+        ------
+        RuntimeError
+            If :meth:`clusterize` has not been called with ``intermediates=True``.
+        """
+        if self._result_intratimes is None:
+            raise RuntimeError("Call .clusterize() with intermediates=True before accessing .result_intratimes")
+        return self._result_intratimes
+
     @property
     def df(self) -> DataFrame:
         """Alias for :attr:`result`."""
@@ -236,6 +258,19 @@ def to_csv(self, path: str | Path, **kwargs) -> None:
         """
         self.result.write.option("header", "true").csv(str(path), **kwargs)
 
+    def to_csv_intratimes(self, path: str | Path, **kwargs) -> None:
+        """Write the intra-cluster times result to a CSV directory (Spark partitioned output).
+
+        Parameters
+        ----------
+        path : str or Path
+            Destination directory. Spark writes one or more part-* files.
+        **kwargs
+            Extra options forwarded to ``DataFrameWriter.csv()``.
+        """
+        self.result_intratimes.write.option("header", "true").csv(str(path), **kwargs)
+
+
     def to_parquet(self, path: str | Path, **kwargs) -> None:
         """Write the result to a Parquet directory (Spark partitioned output).
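Finally, a sketch of how the intermediates flow added in this last patch fits together end to end (illustrative only, not part of the patch series; the input and output paths are hypothetical):

    from dsf.tsm import TSM, _get_or_create_spark

    spark = _get_or_create_spark()
    events = spark.read.parquet("/data/detector_passages.parquet")  # hypothetical path

    tsm = TSM(events).clusterize(min_vehicles=5, gap_factor=3.0, intermediates=True)

    # Per-cluster aggregates now include mean_intra_cluster_dt_s
    tsm.result.select("cluster_local_id", "num_vehicles", "mean_intra_cluster_dt_s").show(5)

    # Per-vehicle rows labeled with intra_cluster_intermediate_time_s; only
    # populated when clusterize() was called with intermediates=True
    tsm.to_csv_intratimes("/tmp/tsm_intratimes")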