from typing import List

# Path to the newline-delimited class-label file baked into the container image.
PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt"

# Number of classes the fallback labels must cover (crop-and-weed model output size).
_NUM_CLASSES = 83


def load_class_labels() -> List[str]:
    """Load class labels from PLANT_LABELS, one label per line.

    Returns:
        The stripped label lines, or generic ``class_<i>`` placeholders when
        the label file is missing or unreadable.
    """
    try:
        with open(PLANT_LABELS, "r") as f:
            # Iterate the file object directly; readlines() would materialize
            # the whole file for no benefit.
            return [line.strip() for line in f]
    except OSError:
        # File missing/unreadable inside the container: fall back to dummy
        # labels so the pipeline can still start. A bare `except:` here would
        # also swallow KeyboardInterrupt/SystemExit, so catch OSError only.
        return [f"class_{i}" for i in range(_NUM_CLASSES)]
import numpy as np
import cv2

# Closing-kernel diameter in pixels used to fill holes in the vegetation mask.
MORPH_KERNEL = 34


def remove_background(frame_bgr: np.ndarray) -> np.ndarray:
    """Black out non-vegetation pixels of a BGR frame.

    Segments vegetation with the Excess Green index (ExG = 2G - R - B),
    Otsu-thresholds the normalized index, closes small holes with an
    elliptical kernel, and returns a copy of the frame with background
    pixels set to zero. The input frame is not modified.
    """
    # Work in RGB float space so the ExG arithmetic cannot wrap uint8 values.
    rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32)
    red = rgb[..., 0]
    green = rgb[..., 1]
    blue = rgb[..., 2]

    # Excess Green vegetation index, rescaled to the full uint8 range.
    excess_green = 2.0 * green - red - blue
    index_u8 = cv2.normalize(excess_green, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

    # Otsu selects the threshold automatically; the 0 passed here is ignored.
    _, veg_mask = cv2.threshold(index_u8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Morphological closing fills gaps inside detected plant regions.
    ellipse = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE,
        (MORPH_KERNEL, MORPH_KERNEL),
    )
    veg_mask = cv2.morphologyEx(veg_mask, cv2.MORPH_CLOSE, ellipse)

    # Keep original pixels where the mask is set, black everywhere else.
    result = np.zeros_like(frame_bgr)
    foreground = veg_mask > 0
    result[foreground] = frame_bgr[foreground]
    return result
import os
from datetime import datetime
from typing import Any, Dict, List
import gi
import numpy as np
import cv2
import pyds

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

from src.frame_comparison.frame_change_detector import FrameChangeDetector
from src.model_conversion.onnx_to_trt import build_engine_if_missing

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier
from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent
from src.deepstream.helpers.pipeline_runner import run_pipeline

from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe
from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe
from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe

# Configuration
RTSP_PORT = os.environ.get("RTSP_PORT", "8554")
RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test"

CONFIG_FILE: str = "/workspace/configs/resnet18.txt"
MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt"
MQTT_CONN_STR = "172.17.0.1;1883;agstream-client"
MQTT_TOPIC = "deepstream/predictions"

CLASS_LABELS = load_class_labels()


def _make_elements(pipeline: Gst.Pipeline, factories: List[tuple]) -> Dict[str, Gst.Element]:
    """Create one element per (factory, name) pair and add it to the pipeline.

    Raises RuntimeError naming the missing plugin instead of the original
    ``assert e is not None, f"... {e}"`` pattern, which printed "None" when it
    fired and is stripped entirely under ``python -O``.
    """
    elements: Dict[str, Gst.Element] = {}
    for factory, name in factories:
        element = Gst.ElementFactory.make(factory, name)
        if element is None:
            raise RuntimeError(f"Failed to create element '{name}' (factory '{factory}')")
        pipeline.add(element)
        elements[name] = element
    return elements


def _link(upstream: Gst.Element, downstream: Gst.Element) -> None:
    """Link two elements, raising instead of silently ignoring the failure."""
    if not upstream.link(downstream):
        raise RuntimeError(f"Failed to link {upstream.get_name()} -> {downstream.get_name()}")


def build_pipeline() -> Gst.Pipeline:
    """Build the CPU DeepStream pipeline with background removal, frame
    skipping, and an MQTT message-broker sink.

    Topology:
        rtspsrc -> rtph264depay -> h264parse -> decodebin -> videoconvert
        -> nvvideoconvert -> capsfilter(RGBA/NVMM) -> nvstreammux -> nvinfer
        -> nvmsgconv -> nvmsgbroker
    """
    pipeline = Gst.Pipeline.new("final-cpu-pipeline")

    # Create all elements up front; element names match the original pipeline.
    e = _make_elements(pipeline, [
        ("rtspsrc", "source"),
        ("rtph264depay", "depay"),
        ("h264parse", "parse"),
        ("decodebin", "decode"),
        ("videoconvert", "convert"),
        ("nvvideoconvert", "nvvideoconvert"),
        ("capsfilter", "capsfilter"),
        ("nvstreammux", "streammux"),
        ("nvinfer", "nvinfer"),
        ("nvmsgconv", "nvmsgconv"),
        ("nvmsgbroker", "nvmsgbroker"),
    ])

    # Configure elements
    e["source"].set_property("location", RTSP_URL)
    e["source"].set_property("latency", 200)
    e["streammux"].set_property("batch-size", 1)
    e["streammux"].set_property("width", 256)
    e["streammux"].set_property("height", 256)
    e["nvinfer"].set_property("config-file-path", CONFIG_FILE)
    # The CPU probes and nvinfer expect RGBA frames in NVMM memory.
    e["capsfilter"].set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
    e["nvmsgconv"].set_property("config", MSGCONV_CONFIG)
    e["nvmsgconv"].set_property("payload-type", 0)
    e["nvmsgbroker"].set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so")
    e["nvmsgbroker"].set_property("conn-str", MQTT_CONN_STR)
    e["nvmsgbroker"].set_property("topic", MQTT_TOPIC)
    e["nvmsgbroker"].set_property("sync", False)

    # rtspsrc and decodebin only expose their source pads at runtime, so they
    # are linked from pad-added callbacks rather than statically.
    def on_pad_added_rtspsrc(src: Any, pad: Any) -> None:
        sinkpad = e["depay"].get_static_pad("sink")
        if not sinkpad.is_linked():
            pad.link(sinkpad)

    e["source"].connect("pad-added", on_pad_added_rtspsrc)

    def on_pad_added_decode(src: Any, pad: Any) -> None:
        sinkpad = e["convert"].get_static_pad("sink")
        if not sinkpad.is_linked():
            pad.link(sinkpad)

    e["decode"].connect("pad-added", on_pad_added_decode)

    # Static links; failures now raise instead of being silently dropped.
    _link(e["depay"], e["parse"])
    _link(e["parse"], e["decode"])
    _link(e["convert"], e["nvvideoconvert"])
    _link(e["nvvideoconvert"], e["capsfilter"])

    # nvstreammux sink pads are request pads; check the pad-link result too.
    srcpad = e["capsfilter"].get_static_pad("src")
    sinkpad = e["streammux"].get_request_pad("sink_0")
    if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
        raise RuntimeError("Failed to link capsfilter -> streammux")

    _link(e["streammux"], e["nvinfer"])
    _link(e["nvinfer"], e["nvmsgconv"])
    _link(e["nvmsgconv"], e["nvmsgbroker"])

    # Probes: skip near-duplicate frames and mask the background on the
    # batched stream; decode classifier output right after nvinfer.
    frame_change_detector = FrameChangeDetector()

    streammux_src_pad = e["streammux"].get_static_pad("src")
    streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe, frame_change_detector)
    streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, remove_background_probe)

    db_message_meta_probe = DbMessageMetaProbe(
        TensorExtractor(),
        SoftmaxTopKClassifier(CLASS_LABELS),
        NvdsPlantEventBuilder(),
    )
    e["nvinfer"].get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe)

    return pipeline


if __name__ == "__main__":
    Gst.init(None)
    build_engine_if_missing(CONFIG_FILE)
    run_pipeline(build_pipeline())
[f"class_{i}" for i in range(83)] - CLASS_LABELS = load_class_labels() diff --git a/src/deepstream/pipelines/pipeline_onnx_real_input.py b/src/deepstream/pipelines/pipeline_onnx_real_input.py index 2e09ae5..c0ea1bb 100644 --- a/src/deepstream/pipelines/pipeline_onnx_real_input.py +++ b/src/deepstream/pipelines/pipeline_onnx_real_input.py @@ -8,21 +8,12 @@ gi.require_version("Gst", "1.0") from gi.repository import GLib, Gst +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ( SoftmaxTopKClassifier, ) -CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt" - - -def load_class_labels() -> List[str]: - try: - with open(CLASS_LABELS_FILE) as f: - return [line.strip() for line in f] - except Exception: - return [f"class_{i}" for i in range(83)] - def list_image_files(folder: str) -> List[str]: exts = (".jpg", ".jpeg", ".png") diff --git a/src/deepstream/pipelines/pipeline_onnx_test.py b/src/deepstream/pipelines/pipeline_onnx_test.py index 787902c..9407b17 100755 --- a/src/deepstream/pipelines/pipeline_onnx_test.py +++ b/src/deepstream/pipelines/pipeline_onnx_test.py @@ -8,6 +8,7 @@ import pyds from gi.repository import Gst # noqa: E402 +from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor from src.deepstream.helpers.softmax_topk_classifier import ( ClassificationPrediction, @@ -15,18 +16,6 @@ ) from src.model_conversion.onnx_to_trt import build_engine_if_missing -CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt" - - -def load_class_labels() -> List[str]: - try: - with open(CLASS_LABELS_FILE) as f: - return [line.strip() for line in f] - except Exception: - # Fallback to dummy labels if file is missing - return [f"class_{i}" for i in range(83)] - - CLASS_LABELS = load_class_labels() # Reuse the 
import cv2
import numpy as np
import pyds
from gi.repository import Gst

from src.deepstream.helpers.remove_background import remove_background


def remove_background_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
    """Buffer probe that blacks out non-vegetation pixels of each frame
    in place on the mapped NvBufSurface.

    Expects upstream caps of RGBA in NVMM memory (enforced by the pipeline's
    capsfilter). Always returns ``Gst.PadProbeReturn.OK`` so the buffer keeps
    flowing; surface-mapping failures raise with context.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    # Only the surface fetch/map is wrapped: the original wrapped the whole
    # processing chain but still reported every failure as a fetch/map error,
    # mislabelling e.g. cvtColor failures.
    try:
        surface = pyds.get_nvds_buf_surface(hash(gst_buffer), 0)
    except Exception as e:
        raise RuntimeError("BackgroundRemovalProbe: failed to fetch/map NvBufSurface") from e

    # View (not copy) of the mapped surface so the write-back below actually
    # modifies the frame the pipeline carries.
    frame_rgba = np.array(surface, copy=False, order="C")
    frame_bgr = cv2.cvtColor(frame_rgba, cv2.COLOR_RGBA2BGR)

    masked_bgr = remove_background(frame_bgr)

    # Write the masked pixels back into the mapped surface in place.
    np.copyto(frame_rgba, cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2RGBA))

    return Gst.PadProbeReturn.OK
class DbMessageMetaProbe:
    """nvinfer src-pad probe: decodes classifier logits from raw tensor meta,
    logs the top-1 prediction, and attaches an NvDsEventMsgMeta so
    nvmsgconv/nvmsgbroker can publish it downstream."""

    def __init__(
        self,
        tensor_extractor: TensorExtractor,
        classifier: SoftmaxTopKClassifier,
        plant_event_builder: NvdsPlantEventBuilder,
    ):
        # Collaborators are injected so the probe stays unit-testable.
        self.tensor_extractor = tensor_extractor
        self.classifier = classifier
        self.plant_event_builder = plant_event_builder

    def extract_tensor_output(self, frame_meta) -> List[ClassificationPrediction]:
        """Return the classifier's predictions for this frame, or [] when the
        frame carries no raw tensor meta."""
        tensor_meta = self.tensor_extractor.find_tensor_meta(frame_meta)
        if tensor_meta is None:
            return []

        logits = self.tensor_extractor.extract_logits(tensor_meta)
        return self.classifier.predict_from_logits(logits)

    def attach_msgmeta(self, batch_meta, frame_meta, frame_number: int, predictions: List[ClassificationPrediction]) -> None:
        """Attach an event-msg meta carrying the top-1 prediction to the frame."""
        if not predictions:
            # Guard so the method is safe even without the caller's check
            # (the original indexed predictions[0] unconditionally).
            return
        best = predictions[0]
        event = PlantEvent(
            frame_id=frame_number,
            plant_id=f"plant-{frame_number}",
            prediction=best,
        )
        self.plant_event_builder.build(batch_meta, frame_meta, event)

    def pad_probe(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
        """Buffer probe: walk every frame in the batch and publish predictions."""
        buffer = info.get_buffer()
        if not buffer:
            return Gst.PadProbeReturn.OK

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer))
        if batch_meta is None:
            # Buffer without DeepStream batch meta: nothing to do.
            return Gst.PadProbeReturn.OK

        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            # pyds GLib-list accessors (.data / .next) raise StopIteration at
            # the end of the list (see the DeepStream Python sample apps), so
            # both the cast and the advance are guarded — the original would
            # let StopIteration escape the probe.
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break

            frame_number = frame_meta.frame_num
            predictions = self.extract_tensor_output(frame_meta)
            if predictions:
                top1 = predictions[0]
                print(
                    f"[frame {frame_number}] "
                    f"plant='{top1.class_name}' "
                    f"(id={top1.class_id}, conf={top1.confidence:.3f})"
                )
                self.attach_msgmeta(batch_meta, frame_meta, frame_number, predictions)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

        return Gst.PadProbeReturn.OK