From 9e1f2fc38c2e24f99d29c0393a659024b0598e5a Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Wed, 19 Nov 2025 17:52:58 +0200 Subject: [PATCH 1/6] basic implementation of deepstream_pipeline_cpu.py --- .../helpers/plant_msg_meta_builder.py | 68 +++++++ .../pipelines/deepstream_pipeline_cpu.py | 177 ++++++++++++++++++ .../cpu/background_removal_probe.py | 48 +++++ .../probes/db_message_meta_probe.py | 51 +++++ 4 files changed, 344 insertions(+) create mode 100644 src/deepstream/helpers/plant_msg_meta_builder.py create mode 100644 src/deepstream/pipelines/deepstream_pipeline_cpu.py create mode 100644 src/deepstream/probes/background_removal/cpu/background_removal_probe.py create mode 100644 src/deepstream/probes/db_message_meta_probe.py diff --git a/src/deepstream/helpers/plant_msg_meta_builder.py b/src/deepstream/helpers/plant_msg_meta_builder.py new file mode 100644 index 0000000..a6ac2db --- /dev/null +++ b/src/deepstream/helpers/plant_msg_meta_builder.py @@ -0,0 +1,68 @@ +import sys +from pydantic import BaseModel + +import gi +gi.require_version("Gst", "1.0") + +from gi.repository import Gst +import pyds + +from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction + +class PlantEvent(BaseModel): + frame_id: int + plant_id: str + prediction: ClassificationPrediction + + +class NvdsPlantEventBuilder: + """ + For the reference, please check + https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/blob/9b27f02ffea46a3ded2ad26b3eea27ef3e2dfded/apps/deepstream-test4/deepstream_test_4.py + """ + + def build(self, batch_meta, frame_meta, event: PlantEvent) -> None: + user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta) + msg_meta = pyds.NvDsEventMsgMeta.cast(pyds.alloc_nvds_event_msg_meta(user_event_meta)) + + # Generic IDs / bookkeeping + msg_meta.frameId = event.frame_id + msg_meta.objectId = event.plant_id + msg_meta.trackingId = event.frame_id + msg_meta.objClassId = event.prediction.class_id + msg_meta.confidence = float(event.prediction.confidence) + + # Link to sensor0 / place0 / analytics0 from msgconv config + msg_meta.sensorId = 0 + msg_meta.placeId = 0 + msg_meta.moduleId = 0 + + msg_meta.type = pyds.NvDsEventType.NVDS_EVENT_CUSTOM + msg_meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON + + # Timestamp + msg_meta.ts = pyds.alloc_buffer(32) + pyds.generate_ts_rfc3339(msg_meta.ts, 32) + + # Dummy bbox (positive; you can plug in real ROIs later) + msg_meta.bbox.top = -1 + msg_meta.bbox.left = -1 + msg_meta.bbox.width = 0 + msg_meta.bbox.height = 0 + + # Attach person object repurposed to carry plant info + obj = pyds.alloc_nvds_person_object() + person = pyds.NvDsPersonObject.cast(obj) + + person.age = 0 + person.gender = "plant" + person.hair = "none" + person.cap = "none" + person.apparel = event.prediction.class_name + + msg_meta.extMsg = obj + msg_meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject) + + user_event_meta.user_meta_data = msg_meta + user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META + pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta) diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py new file mode 100644 index 0000000..ea7de84 --- /dev/null +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -0,0 +1,177 @@ +import os +from datetime import datetime +from typing import Any, Dict, List +import gi +import numpy as np +import cv2 +import pyds + +gi.require_version("Gst", "1.0") +from gi.repository import GLib, Gst + +from frame_comparison.frame_change_detector import FrameChangeDetector +from deepstream.helpers.meta_tensor_extractor import TensorExtractor +from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent + +from deepstream.probes.background_removal.cpu.background_removal_probe import BackgroundRemovalProbe +from deepstream.probes.db_message_meta_probe import DbMessageMetaProbe +from deepstream.probes.frame_comparison.cpu.frame_skip_probe import FrameSkipProbe + +# Configuration +RTSP_PORT = os.environ.get("RTSP_PORT", "8554") +RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test" + +CONFIG_FILE: str = "/workspace/configs/resnet18.txt" +MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt" +MQTT_CONN_STR = "172.17.0.1;1883;agstream-client" +MQTT_TOPIC = "deepstream/predictions" +PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt" + +# Load class labels +def load_class_labels() -> List[str]: + try: + with open(PLANT_LABELS, "r") as f: + return [line.strip() for line in f.readlines()] + except: + return [f"class_{i}" for i in range(83)] + +CLASS_LABELS = load_class_labels() + +tensor_extractor = TensorExtractor() +classifier = SoftmaxTopKClassifier(CLASS_LABELS) +plant_event_builder = NvdsPlantEventBuilder() +db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder) + +detector = FrameChangeDetector() +frame_skip_probe = FrameSkipProbe(detector) + +background_removal_probe = BackgroundRemovalProbe() + +# Pipeline construction +def build_pipeline() -> Gst.Pipeline: + """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" + pipeline = Gst.Pipeline.new("classification-pipeline") + + # Elements + rtspsrc = Gst.ElementFactory.make("rtspsrc", "source") + depay = Gst.ElementFactory.make("rtph264depay", "depay") + parse = Gst.ElementFactory.make("h264parse", "parse") + decode = Gst.ElementFactory.make("decodebin", "decode") + convert = Gst.ElementFactory.make("videoconvert", "convert") + nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert") + capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter") + streammux = Gst.ElementFactory.make("nvstreammux", "streammux") + nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer") + nvosd = Gst.ElementFactory.make("nvdsosd", "osd") + nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv") + nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker") + + for e in [ + rtspsrc, + depay, + parse, + decode, + convert, + nvvideoconvert, + capsfilter, + streammux, + nvinfer, + nvosd, + nvmsgconv, + nvmsgbroker, + ]: + assert e is not None, f"Failed to create element {e}" + pipeline.add(e) + + # Configure elements + rtspsrc.set_property("location", RTSP_URL) + rtspsrc.set_property("latency", 200) + streammux.set_property("batch-size", 1) + streammux.set_property("width", 640) + streammux.set_property("height", 480) + nvinfer.set_property("config-file-path", CONFIG_FILE) + caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") + capsfilter.set_property("caps", caps) + nvmsgconv.set_property("config", MSGCONV_CONFIG) + nvmsgconv.set_property("payload-type", 0) + nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so") + nvmsgbroker.set_property("conn-str", MQTT_CONN_STR) + nvmsgbroker.set_property("topic", MQTT_TOPIC) + nvmsgbroker.set_property("sync", False) + + # Dynamic pad linking + def on_pad_added_rtspsrc(src: Any, pad: Any) -> None: + sinkpad = depay.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + rtspsrc.connect("pad-added", on_pad_added_rtspsrc) + + def on_pad_added_decode(src: Any, pad: Any) -> None: + sinkpad = convert.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + decode.connect("pad-added", on_pad_added_decode) + + # Link capsfilter → streammux + depay.link(parse) + parse.link(decode) + convert.link(nvvideoconvert) + nvvideoconvert.link(capsfilter) + srcpad = capsfilter.get_static_pad("src") + sinkpad = streammux.get_request_pad("sink_0") + srcpad.link(sinkpad) + + streammux.link(nvinfer) + nvinfer.link(nvosd) + nvosd.link(nvmsgconv) + nvmsgconv.link(nvmsgbroker) + + # Add probes + streammux_src_pad = streammux.get_static_pad("src") + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe.handle_probe) + + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, background_removal_probe.handle_probe) + + nvinfer_src_pad = nvinfer.get_static_pad("src") + nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.handle_probe) + + return pipeline + + +# Run pipeline +def run_pipeline(pipeline: Gst.Pipeline) -> None: + """Run the pipeline and handle messages.""" + loop = GLib.MainLoop() + bus = pipeline.get_bus() + bus.add_signal_watch() + + def on_message(bus: Any, msg: Any) -> None: + if msg.type == Gst.MessageType.ERROR: + err, _ = msg.parse_error() + print("ERROR:", err) + loop.quit() + elif msg.type == Gst.MessageType.EOS: + print("End of stream.") + loop.quit() + + bus.connect("message", on_message) + pipeline.set_state(Gst.State.PLAYING) + print(f"🚀 Pipeline started (publishing to {MQTT_TOPIC} via {MQTT_CONN_STR})") + + try: + loop.run() + except KeyboardInterrupt: + pass + finally: + pipeline.set_state(Gst.State.NULL) + print("Pipeline stopped.") + + +# Main +if __name__ == "__main__": + Gst.init(None) + pipeline = build_pipeline() + run_pipeline(pipeline) diff --git a/src/deepstream/probes/background_removal/cpu/background_removal_probe.py b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py new file mode 100644 index 0000000..ce46f63 --- /dev/null +++ b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py @@ -0,0 +1,48 @@ +from typing import Any +import numpy as np +import cv2 +import pyds +from gi.repository import Gst + + +class BackgroundRemovalProbe: + def __init__(self, morph_kernel: int = 34): + self.morph_kernel = morph_kernel + + def remove_background(self, frame_bgr: np.ndarray) -> np.ndarray: + rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) + R, G, B = rgb[..., 0], rgb[..., 1], rgb[..., 2] + + exg = 2.0 * G - R - B + exg_norm = cv2.normalize(exg, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) + + _, mask = cv2.threshold(exg_norm, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + + kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, + (self.morph_kernel, self.morph_kernel) + ) + mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + + out = np.zeros_like(frame_bgr) + out[mask > 0] = frame_bgr[mask > 0] + return out + + def handle_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: + gst_buffer = info.get_buffer() + if not gst_buffer: + return Gst.PadProbeReturn.OK + + try: + surface = pyds.get_nvds_buf_surface(hash(gst_buffer), 0) + frame_rgba = np.array(surface, copy=False, order="C") + frame_bgr = cv2.cvtColor(frame_rgba, cv2.COLOR_RGBA2BGR) + + masked_bgr = self.remove_background(frame_bgr) + frame_rgba_new = cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2RGBA) + + np.copyto(frame_rgba, frame_rgba_new) + except Exception as e: + raise RuntimeError("BackgroundRemovalProbe: failed to fetch/map NvBufSurface") from e + + return Gst.PadProbeReturn.OK diff --git a/src/deepstream/probes/db_message_meta_probe.py b/src/deepstream/probes/db_message_meta_probe.py new file mode 100644 index 0000000..4b98582 --- /dev/null +++ b/src/deepstream/probes/db_message_meta_probe.py @@ -0,0 +1,51 @@ +from typing import Any, List +import pyds +from gi.repository import Gst + +from deepstream.helpers.meta_tensor_extractor import TensorExtractor +from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent + + +class DbMessageMetaProbe: + def __init__(self, tensor_extractor : TensorExtractor, classifier: SoftmaxTopKClassifier, plant_event_builder: NvdsPlantEventBuilder): + self.tensor_extractor = tensor_extractor + self.classifier = classifier + self.plant_event_builder = plant_event_builder + + def extract_tensor_output(self, frame_meta) -> List[ClassificationPrediction]: + tensor_meta = self.tensor_extractor.find_tensor_meta(frame_meta) + if tensor_meta is None: + return [] + + logits = self.tensor_extractor.extract_logits(tensor_meta) + return self.classifier.predict_from_logits(logits) + + def attach_msgmeta(self, batch_meta, frame_meta, frame_number: int, predictions: List[ClassificationPrediction]) -> None: + best = predictions[0] + event = PlantEvent( + frame_id=frame_number, + plant_id=f"plant-{frame_number}", + prediction=best, + ) + self.plant_event_builder.build(batch_meta, frame_meta, event) + + def handle_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: + buffer = info.get_buffer() + if not buffer: + return Gst.PadProbeReturn.OK + + batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer)) + l_frame = batch_meta.frame_meta_list + + while l_frame: + frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data) + frame_number = frame_meta.frame_num + + predictions = self.extract_tensor_output(frame_meta) + if predictions: + self.attach_msgmeta(batch_meta, frame_meta, frame_number, predictions) + + l_frame = l_frame.next + + return Gst.PadProbeReturn.OK From 075587f1c35e77733e0c06149b8aa9f8a409e0ed Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Thu, 20 Nov 2025 12:22:31 +0200 Subject: [PATCH 2/6] full implementation for square videos --- .../helpers/plant_msg_meta_builder.py | 2 +- src/deepstream/helpers/remove_background.py | 23 +++++ .../pipelines/deepstream_pipeline_cpu.py | 88 +++++++------------ .../cpu/background_removal_probe.py | 52 ++++------- .../probes/db_message_meta_probe.py | 14 ++- .../cpu/frame_skipping_probe.py | 4 - 6 files changed, 83 insertions(+), 100 deletions(-) create mode 100644 src/deepstream/helpers/remove_background.py diff --git a/src/deepstream/helpers/plant_msg_meta_builder.py b/src/deepstream/helpers/plant_msg_meta_builder.py index a6ac2db..e886768 100644 --- a/src/deepstream/helpers/plant_msg_meta_builder.py +++ b/src/deepstream/helpers/plant_msg_meta_builder.py @@ -7,7 +7,7 @@ from gi.repository import Gst import pyds -from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction class PlantEvent(BaseModel): frame_id: int diff --git a/src/deepstream/helpers/remove_background.py b/src/deepstream/helpers/remove_background.py new file mode 100644 index 0000000..f5e08d3 --- /dev/null +++ b/src/deepstream/helpers/remove_background.py @@ -0,0 +1,23 @@ +import numpy as np +import cv2 + +MORPH_KERNEL = 34 + +def remove_background(frame_bgr: np.ndarray) -> np.ndarray: + rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) + R, G, B = rgb[..., 0], rgb[..., 1], rgb[..., 2] + + exg = 2.0 * G - R - B + exg_norm = cv2.normalize(exg, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) + + _, mask = cv2.threshold(exg_norm, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + + kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, + (MORPH_KERNEL, MORPH_KERNEL) + ) + mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + + out = np.zeros_like(frame_bgr) + out[mask > 0] = frame_bgr[mask > 0] + return out diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py index ea7de84..bdfe696 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -9,14 +9,17 @@ gi.require_version("Gst", "1.0") from gi.repository import GLib, Gst -from frame_comparison.frame_change_detector import FrameChangeDetector -from deepstream.helpers.meta_tensor_extractor import TensorExtractor -from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier -from deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.frame_comparison.frame_change_detector import FrameChangeDetector +from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.deepstream.helpers.pipeline_runner import run_pipeline -from deepstream.probes.background_removal.cpu.background_removal_probe import BackgroundRemovalProbe -from deepstream.probes.db_message_meta_probe import DbMessageMetaProbe -from deepstream.probes.frame_comparison.cpu.frame_skip_probe import FrameSkipProbe +from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe +from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe +from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe + +from src.model_conversion.onnx_to_trt import build_engine_if_missing # Configuration RTSP_PORT = os.environ.get("RTSP_PORT", "8554") @@ -37,21 +40,21 @@ def load_class_labels() -> List[str]: return [f"class_{i}" for i in range(83)] CLASS_LABELS = load_class_labels() +def on_new_pad(decodebin, pad, streammux): + caps = pad.get_current_caps() + s = caps.get_structure(0) + width = s.get_int("width").value + height = s.get_int("height").value -tensor_extractor = TensorExtractor() -classifier = SoftmaxTopKClassifier(CLASS_LABELS) -plant_event_builder = NvdsPlantEventBuilder() -db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder) - -detector = FrameChangeDetector() -frame_skip_probe = FrameSkipProbe(detector) + streammux.set_property("width", width) + streammux.set_property("height", height) -background_removal_probe = BackgroundRemovalProbe() + pad.link(streammux.get_request_pad("sink_0")) # Pipeline construction def build_pipeline() -> Gst.Pipeline: """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" - pipeline = Gst.Pipeline.new("classification-pipeline") + pipeline = Gst.Pipeline.new("final-cpu-pipeline") # Elements rtspsrc = Gst.ElementFactory.make("rtspsrc", "source") @@ -63,7 +66,6 @@ def build_pipeline() -> Gst.Pipeline: capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter") streammux = Gst.ElementFactory.make("nvstreammux", "streammux") nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer") - nvosd = Gst.ElementFactory.make("nvdsosd", "osd") nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv") nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker") @@ -77,7 +79,6 @@ def build_pipeline() -> Gst.Pipeline: capsfilter, streammux, nvinfer, - nvosd, nvmsgconv, nvmsgbroker, ]: @@ -90,6 +91,8 @@ def build_pipeline() -> Gst.Pipeline: streammux.set_property("batch-size", 1) streammux.set_property("width", 640) streammux.set_property("height", 480) + # streammux.set_property("width", 1280) + # streammux.set_property("height", 720) nvinfer.set_property("config-file-path", CONFIG_FILE) caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") capsfilter.set_property("caps", caps) @@ -107,6 +110,7 @@ def on_pad_added_rtspsrc(src: Any, pad: Any) -> None: pad.link(sinkpad) rtspsrc.connect("pad-added", on_pad_added_rtspsrc) + # decode.connect("pad-added", on_new_pad, streammux) def on_pad_added_decode(src: Any, pad: Any) -> None: sinkpad = convert.get_static_pad("sink") @@ -125,53 +129,29 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: srcpad.link(sinkpad) streammux.link(nvinfer) - nvinfer.link(nvosd) - nvosd.link(nvmsgconv) + nvinfer.link(nvmsgconv) nvmsgconv.link(nvmsgbroker) - # Add probes + frame_change_detector = FrameChangeDetector() + streammux_src_pad = streammux.get_static_pad("src") - streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe.handle_probe) + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe, frame_change_detector) - streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, background_removal_probe.handle_probe) + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, remove_background_probe) + + tensor_extractor = TensorExtractor() + classifier = SoftmaxTopKClassifier(CLASS_LABELS) + plant_event_builder = NvdsPlantEventBuilder() + db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder) nvinfer_src_pad = nvinfer.get_static_pad("src") - nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.handle_probe) + nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe) return pipeline - -# Run pipeline -def run_pipeline(pipeline: Gst.Pipeline) -> None: - """Run the pipeline and handle messages.""" - loop = GLib.MainLoop() - bus = pipeline.get_bus() - bus.add_signal_watch() - - def on_message(bus: Any, msg: Any) -> None: - if msg.type == Gst.MessageType.ERROR: - err, _ = msg.parse_error() - print("ERROR:", err) - loop.quit() - elif msg.type == Gst.MessageType.EOS: - print("End of stream.") - loop.quit() - - bus.connect("message", on_message) - pipeline.set_state(Gst.State.PLAYING) - print(f"🚀 Pipeline started (publishing to {MQTT_TOPIC} via {MQTT_CONN_STR})") - - try: - loop.run() - except KeyboardInterrupt: - pass - finally: - pipeline.set_state(Gst.State.NULL) - print("Pipeline stopped.") - - # Main if __name__ == "__main__": Gst.init(None) + build_engine_if_missing(CONFIG_FILE) pipeline = build_pipeline() run_pipeline(pipeline) diff --git a/src/deepstream/probes/background_removal/cpu/background_removal_probe.py b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py index ce46f63..867e905 100644 --- a/src/deepstream/probes/background_removal/cpu/background_removal_probe.py +++ b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py @@ -4,45 +4,23 @@ import pyds from gi.repository import Gst +from src.deepstream.helpers.remove_background import remove_background, MORPH_KERNEL -class BackgroundRemovalProbe: - def __init__(self, morph_kernel: int = 34): - self.morph_kernel = morph_kernel - - def remove_background(self, frame_bgr: np.ndarray) -> np.ndarray: - rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) - R, G, B = rgb[..., 0], rgb[..., 1], rgb[..., 2] - - exg = 2.0 * G - R - B - exg_norm = cv2.normalize(exg, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) - - _, mask = cv2.threshold(exg_norm, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) - - kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, - (self.morph_kernel, self.morph_kernel) - ) - mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) - - out = np.zeros_like(frame_bgr) - out[mask > 0] = frame_bgr[mask > 0] - return out - - def handle_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: - gst_buffer = info.get_buffer() - if not gst_buffer: - return Gst.PadProbeReturn.OK +def remove_background_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: + gst_buffer = info.get_buffer() + if not gst_buffer: + return Gst.PadProbeReturn.OK - try: - surface = pyds.get_nvds_buf_surface(hash(gst_buffer), 0) - frame_rgba = np.array(surface, copy=False, order="C") - frame_bgr = cv2.cvtColor(frame_rgba, cv2.COLOR_RGBA2BGR) + try: + surface = pyds.get_nvds_buf_surface(hash(gst_buffer), 0) + frame_rgba = np.array(surface, copy=False, order="C") + frame_bgr = cv2.cvtColor(frame_rgba, cv2.COLOR_RGBA2BGR) - masked_bgr = self.remove_background(frame_bgr) - frame_rgba_new = cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2RGBA) + masked_bgr = remove_background(frame_bgr) + frame_rgba_new = cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2RGBA) - np.copyto(frame_rgba, frame_rgba_new) - except Exception as e: - raise RuntimeError("BackgroundRemovalProbe: failed to fetch/map NvBufSurface") from e + np.copyto(frame_rgba, frame_rgba_new) + except Exception as e: + raise RuntimeError("BackgroundRemovalProbe: failed to fetch/map NvBufSurface") from e - return Gst.PadProbeReturn.OK + return Gst.PadProbeReturn.OK \ No newline at end of file diff --git a/src/deepstream/probes/db_message_meta_probe.py b/src/deepstream/probes/db_message_meta_probe.py index 4b98582..0edf235 100644 --- a/src/deepstream/probes/db_message_meta_probe.py +++ b/src/deepstream/probes/db_message_meta_probe.py @@ -2,9 +2,9 @@ import pyds from gi.repository import Gst -from deepstream.helpers.meta_tensor_extractor import TensorExtractor -from deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier -from deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent class DbMessageMetaProbe: @@ -30,7 +30,7 @@ def attach_msgmeta(self, batch_meta, frame_meta, frame_number: int, predictions: ) self.plant_event_builder.build(batch_meta, frame_meta, event) - def handle_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: + def pad_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: buffer = info.get_buffer() if not buffer: return Gst.PadProbeReturn.OK @@ -44,6 +44,12 @@ def handle_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeRetu predictions = self.extract_tensor_output(frame_meta) if predictions: + top1 = predictions[0] + print( + f"[frame {frame_number}] " + f"plant='{top1.class_name}' " + f"(id={top1.class_id}, conf={top1.confidence:.3f})" + ) self.attach_msgmeta(batch_meta, frame_meta, frame_number, predictions) l_frame = l_frame.next diff --git a/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py b/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py index f13c689..8f769b7 100644 --- a/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py +++ b/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py @@ -1,7 +1,3 @@ -import sys -import os -sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..')) - from typing import Any import gi import numpy as np From d81ab30d2ec59b36f4f03794b6cd1df503b2cb6b Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Thu, 20 Nov 2025 17:58:30 +0200 Subject: [PATCH 3/6] update output resolution to 256x256 to match model training size --- src/deepstream/pipelines/deepstream_pipeline_cpu.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py index bdfe696..d0e4285 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -89,10 +89,8 @@ def build_pipeline() -> Gst.Pipeline: rtspsrc.set_property("location", RTSP_URL) rtspsrc.set_property("latency", 200) streammux.set_property("batch-size", 1) - streammux.set_property("width", 640) - streammux.set_property("height", 480) - # streammux.set_property("width", 1280) - # streammux.set_property("height", 720) + streammux.set_property("width", 256) + streammux.set_property("height", 256) nvinfer.set_property("config-file-path", CONFIG_FILE) caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") capsfilter.set_property("caps", caps) From a764ae5e0d385af91780abaec66ff487a95045b8 Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Sat, 22 Nov 2025 19:00:53 +0200 Subject: [PATCH 4/6] remove unused probe --- src/deepstream/pipelines/deepstream_pipeline_cpu.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py index d0e4285..9a16ac4 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -40,16 +40,6 @@ def load_class_labels() -> List[str]: return [f"class_{i}" for i in range(83)] CLASS_LABELS = load_class_labels() -def on_new_pad(decodebin, pad, streammux): - caps = pad.get_current_caps() - s = caps.get_structure(0) - width = s.get_int("width").value - height = s.get_int("height").value - - streammux.set_property("width", width) - streammux.set_property("height", height) - - pad.link(streammux.get_request_pad("sink_0")) # Pipeline construction def build_pipeline() -> Gst.Pipeline: @@ -108,7 +98,6 @@ def on_pad_added_rtspsrc(src: Any, pad: Any) -> None: pad.link(sinkpad) rtspsrc.connect("pad-added", on_pad_added_rtspsrc) - # decode.connect("pad-added", on_new_pad, streammux) def on_pad_added_decode(src: Any, pad: Any) -> None: sinkpad = convert.get_static_pad("sink") From 86ff68ae9009ce6374f2b0b05d18ed5047ead0d0 Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Sun, 23 Nov 2025 20:33:28 +0200 Subject: [PATCH 5/6] extract shared helper, remove comments, and apply linting --- .../pipelines/deepstream_pipeline_cpu.py | 16 +++------------- .../cpu/background_removal_probe.py | 8 ++++---- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py index 9a16ac4..9f4b15b 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -10,6 +10,9 @@ from gi.repository import GLib, Gst from src.frame_comparison.frame_change_detector import FrameChangeDetector +from src.model_conversion.onnx_to_trt import build_engine_if_missing + +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent @@ -19,8 +22,6 @@ from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe -from src.model_conversion.onnx_to_trt import build_engine_if_missing - # Configuration RTSP_PORT = os.environ.get("RTSP_PORT", "8554") RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test" @@ -29,19 +30,9 @@ MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt" MQTT_CONN_STR = "172.17.0.1;1883;agstream-client" MQTT_TOPIC = "deepstream/predictions" -PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt" - -# Load class labels -def load_class_labels() -> List[str]: - try: - with open(PLANT_LABELS, "r") as f: - return [line.strip() for line in f.readlines()] - except: - return [f"class_{i}" for i in range(83)] CLASS_LABELS = load_class_labels() -# Pipeline construction def build_pipeline() -> Gst.Pipeline: """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" pipeline = Gst.Pipeline.new("final-cpu-pipeline") @@ -136,7 +127,6 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: return pipeline -# Main if __name__ == "__main__": Gst.init(None) build_engine_if_missing(CONFIG_FILE) diff --git a/src/deepstream/probes/background_removal/cpu/background_removal_probe.py b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py index 867e905..4ba4391 100644 --- a/src/deepstream/probes/background_removal/cpu/background_removal_probe.py +++ b/src/deepstream/probes/background_removal/cpu/background_removal_probe.py @@ -1,10 +1,10 @@ -from typing import Any -import numpy as np import cv2 +import numpy as np import pyds from gi.repository import Gst -from src.deepstream.helpers.remove_background import remove_background, MORPH_KERNEL +from src.deepstream.helpers.remove_background import remove_background + def remove_background_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: gst_buffer = info.get_buffer() @@ -23,4 +23,4 @@ def remove_background_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProb except Exception as e: raise RuntimeError("BackgroundRemovalProbe: failed to fetch/map NvBufSurface") from e - return Gst.PadProbeReturn.OK \ No newline at end of file + return Gst.PadProbeReturn.OK From 11e082feb7526e980b8c3e8ff86989bc864a19a6 Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Sun, 23 Nov 2025 20:36:00 +0200 Subject: [PATCH 6/6] move load_class_labels to shared helper and update all files using it --- src/deepstream/helpers/load_class_labels.py | 11 +++++++++++ src/deepstream/pipelines/nvmsgbroker_pipeline.py | 12 +----------- .../pipelines/pipeline_onnx_real_input.py | 11 +---------- src/deepstream/pipelines/pipeline_onnx_test.py | 13 +------------ 4 files changed, 14 insertions(+), 33 deletions(-) create mode 100644 src/deepstream/helpers/load_class_labels.py diff --git a/src/deepstream/helpers/load_class_labels.py b/src/deepstream/helpers/load_class_labels.py new file mode 100644 index 0000000..1b34721 --- /dev/null +++ b/src/deepstream/helpers/load_class_labels.py @@ -0,0 +1,11 @@ +from typing import List + +PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt" + +# Load class labels +def load_class_labels() -> List[str]: + try: + with open(PLANT_LABELS, "r") as f: + return [line.strip() for line in f.readlines()] + except: + return [f"class_{i}" for i in range(83)] diff --git a/src/deepstream/pipelines/nvmsgbroker_pipeline.py b/src/deepstream/pipelines/nvmsgbroker_pipeline.py index 45d7e7a..606e9b0 100644 --- a/src/deepstream/pipelines/nvmsgbroker_pipeline.py +++ b/src/deepstream/pipelines/nvmsgbroker_pipeline.py @@ -10,6 +10,7 @@ gi.require_version("Gst", "1.0") +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier from src.model_conversion.onnx_to_trt import build_engine_if_missing @@ -19,17 +20,6 @@ CONFIG_FILE = "/workspace/configs/mobilenet.txt" MSGCONV_CONFIG = "/workspace/configs/nvmsgbroker_msgconv_config.txt" -CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt" - - -def load_class_labels() -> List[str]: - try: - with open(CLASS_LABELS_FILE) as f: - return [line.strip() for line in f] - except Exception: - # Fallback to dummy labels if file is missing - return [f"class_{i}" for i in range(83)] - CLASS_LABELS = load_class_labels() diff --git a/src/deepstream/pipelines/pipeline_onnx_real_input.py b/src/deepstream/pipelines/pipeline_onnx_real_input.py index 2e09ae5..c0ea1bb 100644 --- a/src/deepstream/pipelines/pipeline_onnx_real_input.py +++ b/src/deepstream/pipelines/pipeline_onnx_real_input.py @@ -8,21 +8,12 @@ gi.require_version("Gst", "1.0") from gi.repository import GLib, Gst +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ( SoftmaxTopKClassifier, ) -CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt" - - -def load_class_labels() -> List[str]: - try: - with open(CLASS_LABELS_FILE) as f: - return [line.strip() for line in f] - except Exception: - return [f"class_{i}" for i in range(83)] - def list_image_files(folder: str) -> List[str]: exts = (".jpg", ".jpeg", ".png") diff --git a/src/deepstream/pipelines/pipeline_onnx_test.py b/src/deepstream/pipelines/pipeline_onnx_test.py index 787902c..9407b17 100755 --- a/src/deepstream/pipelines/pipeline_onnx_test.py +++ b/src/deepstream/pipelines/pipeline_onnx_test.py @@ -8,6 +8,7 @@ import pyds from gi.repository import Gst # noqa: E402 +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ( ClassificationPrediction, @@ -15,18 +16,6 @@ ) from src.model_conversion.onnx_to_trt import build_engine_if_missing -CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt" - - -def load_class_labels() -> List[str]: - try: - with open(CLASS_LABELS_FILE) as f: - return [line.strip() for line in f] - except Exception: - # Fallback to dummy labels if file is missing - return [f"class_{i}" for i in range(83)] - - CLASS_LABELS = load_class_labels() # Reuse the tensor extractor + softmax top-k classifier from your RTSP pipeline