diff --git a/setup.py b/setup.py index 99db3f35..b67afed9 100644 --- a/setup.py +++ b/setup.py @@ -552,5 +552,8 @@ def run_stubgen(self): "geopandas", "shapely", "folium", + "pyspark", + "matplotlib", + "pandas" ], ) diff --git a/src/dsf/tsm/__init__.py b/src/dsf/tsm/__init__.py index 6b92ae0d..6fd35383 100644 --- a/src/dsf/tsm/__init__.py +++ b/src/dsf/tsm/__init__.py @@ -1 +1,2 @@ from .tsm import TSM as TSM +from .tsm import _get_or_create_spark as _get_or_create_spark diff --git a/src/dsf/tsm/tsm.py b/src/dsf/tsm/tsm.py index cab2e898..1915409c 100644 --- a/src/dsf/tsm/tsm.py +++ b/src/dsf/tsm/tsm.py @@ -3,18 +3,35 @@ from pathlib import Path from typing import Dict, Optional -import polars as pl +from pyspark.sql import DataFrame, SparkSession, Window +import pyspark.sql.functions as F +import pyspark.sql.types as T + + +def _get_or_create_spark() -> SparkSession: + """Return the active SparkSession or create a local one.""" + return ( + SparkSession.builder + .master("local[*]") + .appName("TSM") + .config("spark.driver.memory", "128g") \ + .config("spark.executor.memory", "128g") \ + .config("spark.sql.shuffle.partitions", "1600") \ + .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ + .getOrCreate() + ) class TSM: """Traffic State Monitoring data. Builds density/flow clusters from per-vehicle detector data stored in a - Polars ``DataFrame``. + PySpark ``DataFrame``. Parameters ---------- - data : pl.DataFrame + data : pyspark.sql.DataFrame Raw detector data. Must contain at least the columns ``detector``, ``timestamp``, and ``speed_kph`` (or names that are mapped to them via *column_mapping*). @@ -27,7 +44,7 @@ class TSM: - ``detector``: unique ID of the traffic detector (e.g. loop sensor). - ``timestamp``: timestamp of the vehicle passage (must be a - Polars datetime type). + PySpark ``TimestampType``). - ``speed_kph``: speed of the vehicle in km/h. Optional target columns: @@ -40,14 +57,15 @@ class TSM: def __init__( self, - data: pl.DataFrame, + data: DataFrame, column_mapping: Optional[Dict[str, str]] = None, ) -> None: if column_mapping is not None: - rename = { - src: eng for src, eng in column_mapping.items() if src in data.columns - } - self._df: pl.DataFrame = data.rename(rename) + df = data + for src, eng in column_mapping.items(): + if src in df.columns: + df = df.withColumnRenamed(src, eng) + self._df: DataFrame = df else: self._df = data @@ -63,7 +81,8 @@ def __init__( f"Available columns: {self._df.columns}" ) - self._result: Optional[pl.DataFrame] = None + self._result: Optional[DataFrame] = None + self._result_intratimes: Optional[DataFrame] = None # ------------------------------------------------------------------ # helpers @@ -83,6 +102,7 @@ def clusterize( self, min_vehicles: int = 5, gap_factor: float = 3.0, + intermediates=False ) -> "TSM": """Run the clustering pipeline. @@ -102,78 +122,101 @@ def clusterize( group = self._group_cols # --- lanes sub-table (only when lane info is available) ----------- - lanes_df: Optional[pl.DataFrame] = None + lanes_df: Optional[DataFrame] = None if self._has_lane: - lanes_df = self._df.group_by(group).agg( - pl.col("lane").n_unique().alias("n_lanes") + lanes_df = self._df.groupBy(group).agg( + F.count_distinct("lane").alias("n_lanes") ) + # --- window for per-detector and direction in ordered operations ---------------------- + w = Window.partitionBy(group).orderBy("timestamp") + # --- main pipeline ------------------------------------------------ + df = self._df + + # delta_t_s: seconds since previous row for the same detector and direction + df = df.withColumn( + "prev_timestamp", F.lag("timestamp").over(w) + ).withColumn( + "delta_t_s", + F.when( + F.col("prev_timestamp").isNotNull(), + F.unix_timestamp(F.col("timestamp")) - F.unix_timestamp(F.col("prev_timestamp")), + ), + ).drop("prev_timestamp") + + # distance_m + df = df.withColumn( + "distance_m", + F.col("speed_kph") * F.col("delta_t_s") / 3.6, + ) + + # new_cluster flag + df = df.withColumn( + "new_cluster", + ( + (F.col("distance_m") > F.lit(gap_factor) * (F.col("speed_kph") / 3.6)) + | F.col("delta_t_s").isNull() + ).cast("int"), + ) + + # cluster_local_id: cumulative sum of new_cluster within each detector and direction + w_unbounded = ( + Window.partitionBy(group) + .orderBy("timestamp") + .rowsBetween(Window.unboundedPreceding, Window.currentRow) + ) + df = df.withColumn( + "cluster_local_id", + F.sum("new_cluster").over(w_unbounded), + ) + df = df.withColumn( + "intra_cluster_intermediate_time_s", + F.when(F.lag("cluster_local_id").over(w) == F.col("cluster_local_id"), F.col("delta_t_s")).otherwise(None), + ) + if intermediates: + self._result_intratimes = df.drop("new_cluster").drop("prev_timestamp") + + # aggregate per cluster result = ( - self._df.sort(group + ["timestamp"]) - .with_columns( - (pl.col("timestamp") - pl.col("timestamp").shift(1)) - .dt.total_seconds() - .over(group) - .alias("delta_t_s") - ) - .with_columns( - (pl.col("speed_kph") * pl.col("delta_t_s") / 3.6).alias("distance_m") - ) - .with_row_index("row_idx") - .with_columns( - ( - (pl.col("distance_m") > gap_factor * (pl.col("speed_kph") / 3.6)) - | pl.col("delta_t_s").is_null() - ).alias("new_cluster") - ) - .with_columns( - pl.col("new_cluster") - .cast(pl.Int32) - .cum_sum() - .over(group) - .alias("cluster_local_id") - ) - .group_by(group + ["cluster_local_id"]) + df.groupBy(group + ["cluster_local_id"]) .agg( - pl.col("speed_kph").mean().alias("mean_speed_kph"), - pl.len().alias("num_vehicles"), - (pl.col("distance_m") * 1e-3).sum().alias("cluster_len_km"), - pl.col("delta_t_s").sum().alias("cluster_dt_s"), + F.mean("speed_kph").alias("mean_speed_kph"), + F.count("*").alias("num_vehicles"), + F.sum(F.col("distance_m") * 1e-3).alias("cluster_len_km"), + F.sum("delta_t_s").alias("cluster_dt_s"), + F.mean("intra_cluster_intermediate_time_s").alias("mean_intra_cluster_dt_s"), ) - .filter(pl.col("num_vehicles") > min_vehicles) + .filter(F.col("num_vehicles") > min_vehicles) ) # --- join lane count & compute density / flow --------------------- if lanes_df is not None: - result = result.join(lanes_df, on=group, how="left").with_columns( - ( - pl.col("num_vehicles") - / pl.col("cluster_len_km") - / pl.col("n_lanes") - ).alias("density"), - ( - pl.col("num_vehicles") - * 3.6e3 - / pl.col("cluster_dt_s") - / pl.col("n_lanes") - ).alias("flow"), + result = result.join(lanes_df, on=group, how="left").withColumn( + "density", + F.try_divide(F.col("num_vehicles"), F.col("cluster_len_km") * F.col("n_lanes")), + ).withColumn( + "flow", + F.try_divide(F.col("num_vehicles") * 3.6e3, F.col("cluster_dt_s") * F.col("n_lanes")), ) else: # Without lane info assume 1 lane - result = result.with_columns( - (pl.col("num_vehicles") / pl.col("cluster_len_km")).alias("density"), - (pl.col("num_vehicles") * 3.6e3 / pl.col("cluster_dt_s")).alias("flow"), + result = result.withColumn( + "density", + F.try_divide(F.col("num_vehicles"), F.col("cluster_len_km")), + ).withColumn( + "flow", + F.try_divide(F.col("num_vehicles") * 3.6e3, F.col("cluster_dt_s")), ) - self._result = result.sort(group + ["cluster_local_id"]) + self._result = result.orderBy(*group, "cluster_local_id") return self # ------------------------------------------------------------------ # accessors # ------------------------------------------------------------------ @property - def result(self) -> pl.DataFrame: + def result(self) -> DataFrame: """Return the clustered result DataFrame. Raises @@ -186,19 +229,64 @@ def result(self) -> pl.DataFrame: return self._result @property - def df(self) -> pl.DataFrame: + def result_intratimes(self) -> DataFrame: + """Return the intermediate times DataFrame. + + Raises + ------ + RuntimeError + If :meth:`clusterize` has not been called yet. + """ + if self._result_intratimes is None: + raise RuntimeError("Call .clusterize() with intermediates=True before accessing .result_intratimes") + return self._result_intratimes + + @property + def df(self) -> DataFrame: """Alias for :attr:`result`.""" return self.result def to_csv(self, path: str | Path, **kwargs) -> None: - """Write the result to a CSV file.""" - self.result.write_csv(path, **kwargs) + """Write the result to a CSV directory (Spark partitioned output). + + Parameters + ---------- + path : str or Path + Destination directory. Spark writes one or more part-* files. + **kwargs + Extra options forwarded to ``DataFrameWriter.csv()``. + """ + self.result.write.option("header", "true").csv(str(path), **kwargs) + + def to_csv_intratimes(self, path: str | Path, **kwargs) -> None: + """Write the intermediate times result to a CSV directory (Spark partitioned output). + + Parameters + ---------- + path : str or Path + Destination directory. Spark writes one or more part-* files. + **kwargs + Extra options forwarded to ``DataFrameWriter.csv()``. + """ + self.result_intratimes.write.option("header", "true").csv(str(path), **kwargs) + def to_parquet(self, path: str | Path, **kwargs) -> None: - """Write the result to a Parquet file.""" - self.result.write_parquet(path, **kwargs) + """Write the result to a Parquet directory (Spark partitioned output). + + Parameters + ---------- + path : str or Path + Destination directory. + **kwargs + Extra options forwarded to ``DataFrameWriter.parquet()``. + """ + self.result.write.parquet(str(path), **kwargs) def __repr__(self) -> str: status = "clusterized" if self._result is not None else "raw" - rows = len(self._result) if self._result is not None else len(self._df) - return f"TSM(status={status}, rows={rows})" + if self._result is not None: + rows = self._result.count() + else: + rows = self._df.count() + return f"TSM(status={status}, rows={rows})" \ No newline at end of file