Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/deepstream/helpers/load_class_labels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import List

PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt"


# Load class labels
def load_class_labels(path: str = PLANT_LABELS) -> List[str]:
    """Load class label names, one per line.

    Args:
        path: Labels file to read; defaults to the 83-class crop/weed list.

    Returns:
        The stripped lines of the file, or dummy labels
        ``class_0`` .. ``class_82`` if the file cannot be read.
    """
    try:
        with open(path, "r") as f:
            return [line.strip() for line in f]
    except OSError:
        # Fallback to dummy labels if the file is missing/unreadable.
        # Narrowed from a bare `except:` so programming errors and
        # KeyboardInterrupt are no longer silently swallowed.
        return [f"class_{i}" for i in range(83)]
68 changes: 68 additions & 0 deletions src/deepstream/helpers/plant_msg_meta_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import sys
from pydantic import BaseModel

import gi
gi.require_version("Gst", "1.0")

from gi.repository import Gst
import pyds

from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction

class PlantEvent(BaseModel):
    """One classified plant, ready to be converted into NvDs event msg meta."""

    frame_id: int  # index of the frame the prediction came from
    plant_id: str  # identifier of the plant object within the frame
    prediction: ClassificationPrediction  # classifier output (class id/name + confidence)


class NvdsPlantEventBuilder:
    """
    Builds an ``NvDsEventMsgMeta`` for a plant classification and attaches it
    to the frame so that nvmsgconv/nvmsgbroker downstream serialize and
    publish it.

    For the reference, please check
    https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/blob/9b27f02ffea46a3ded2ad26b3eea27ef3e2dfded/apps/deepstream-test4/deepstream_test_4.py
    """

    def build(self, batch_meta: "pyds.NvDsBatchMeta", frame_meta: "pyds.NvDsFrameMeta", event: PlantEvent) -> None:
        """Allocate event msg meta for ``event`` and attach it to ``frame_meta``.

        The meta is pool-allocated from ``batch_meta``; DeepStream frees it
        after the broker consumes it.
        """
        user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
        msg_meta = pyds.NvDsEventMsgMeta.cast(pyds.alloc_nvds_event_msg_meta(user_event_meta))

        # Generic IDs / bookkeeping
        msg_meta.frameId = event.frame_id
        msg_meta.objectId = event.plant_id
        # No tracker in the pipeline, so the frame id doubles as tracking id.
        msg_meta.trackingId = event.frame_id
        msg_meta.objClassId = event.prediction.class_id
        msg_meta.confidence = float(event.prediction.confidence)

        # Link to sensor0 / place0 / analytics0 from msgconv config
        msg_meta.sensorId = 0
        msg_meta.placeId = 0
        msg_meta.moduleId = 0

        # The stock msgconv only serializes its known object types, so the
        # PERSON schema is repurposed below to carry plant attributes.
        msg_meta.type = pyds.NvDsEventType.NVDS_EVENT_CUSTOM
        msg_meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON

        # Timestamp (RFC 3339 string written into a pyds-owned buffer)
        msg_meta.ts = pyds.alloc_buffer(32)
        pyds.generate_ts_rfc3339(msg_meta.ts, 32)

        # Placeholder bbox (sentinel -1/0 values; plug in real ROIs later)
        msg_meta.bbox.top = -1
        msg_meta.bbox.left = -1
        msg_meta.bbox.width = 0
        msg_meta.bbox.height = 0

        # Attach person object repurposed to carry plant info
        obj = pyds.alloc_nvds_person_object()
        person = pyds.NvDsPersonObject.cast(obj)

        person.age = 0
        person.gender = "plant"
        person.hair = "none"
        person.cap = "none"
        # The predicted class name rides in the "apparel" field.
        person.apparel = event.prediction.class_name

        msg_meta.extMsg = obj
        # NOTE(review): getsizeof of the *class* (not an instance), exactly as
        # in the NVIDIA reference sample — msgconv only needs a non-zero size
        # here; confirm against the pyds version in use.
        msg_meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)

        user_event_meta.user_meta_data = msg_meta
        user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
        pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
23 changes: 23 additions & 0 deletions src/deepstream/helpers/remove_background.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import numpy as np
import cv2

MORPH_KERNEL = 34


def remove_background(frame_bgr: np.ndarray) -> np.ndarray:
    """Black out non-vegetation pixels using an Excess Green (ExG) mask.

    Takes a BGR frame, thresholds ``2G - R - B`` with Otsu to find plant
    pixels, closes small holes in the mask, and returns a same-shape frame
    where background pixels are zeroed and vegetation is untouched.
    """
    # Work in RGB float space so ExG = 2G - R - B can go negative.
    as_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32)
    red, green, blue = (as_rgb[..., channel] for channel in range(3))

    excess_green = 2.0 * green - red - blue
    # Stretch ExG into [0, 255] so Otsu can threshold it as an 8-bit image.
    scaled = cv2.normalize(excess_green, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

    # Otsu chooses the threshold; the explicit 0 is ignored in this mode.
    _, veg_mask = cv2.threshold(scaled, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Morphological closing fills small gaps inside plant regions.
    ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_KERNEL, MORPH_KERNEL))
    veg_mask = cv2.morphologyEx(veg_mask, cv2.MORPH_CLOSE, ellipse)

    foreground = veg_mask > 0
    result = np.zeros_like(frame_bgr)
    result[foreground] = frame_bgr[foreground]
    return result
134 changes: 134 additions & 0 deletions src/deepstream/pipelines/deepstream_pipeline_cpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import os
from datetime import datetime
from typing import Any, Dict, List
import gi
import numpy as np
import cv2
import pyds

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

from src.frame_comparison.frame_change_detector import FrameChangeDetector
from src.model_conversion.onnx_to_trt import build_engine_if_missing

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier
from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent
from src.deepstream.helpers.pipeline_runner import run_pipeline

from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe
from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe
from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe

# Configuration
RTSP_PORT = os.environ.get("RTSP_PORT", "8554")  # overridable via environment
RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test"

CONFIG_FILE: str = "/workspace/configs/resnet18.txt"  # nvinfer config
MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt"
MQTT_CONN_STR = "172.17.0.1;1883;agstream-client"  # host;port;client-id
MQTT_TOPIC = "deepstream/predictions"

# Loaded once at import time; falls back to dummy labels if the file is missing.
CLASS_LABELS = load_class_labels()

def build_pipeline() -> Gst.Pipeline:
    """Build the CPU DeepStream pipeline with background removal, frame
    skipping, and message broker output.

    Topology:
        rtspsrc -> rtph264depay -> h264parse -> decodebin -> videoconvert
        -> nvvideoconvert -> capsfilter(RGBA/NVMM) -> nvstreammux -> nvinfer
        -> nvmsgconv -> nvmsgbroker

    Probes:
        * frame_skip_probe then remove_background_probe on the streammux src pad
        * db_message_meta_probe on the nvinfer src pad

    Returns:
        The fully linked Gst.Pipeline (not yet set to PLAYING).

    Raises:
        RuntimeError: if a GStreamer element factory is unavailable.
    """
    pipeline = Gst.Pipeline.new("final-cpu-pipeline")

    def _make(factory: str, name: str) -> Gst.Element:
        # Explicit raise instead of `assert` so the check survives `python -O`
        # and the error names the factory that actually failed (the previous
        # assert interpolated the element, which is None on failure).
        element = Gst.ElementFactory.make(factory, name)
        if element is None:
            raise RuntimeError(f"Failed to create element '{name}' (factory '{factory}')")
        pipeline.add(element)
        return element

    rtspsrc = _make("rtspsrc", "source")
    depay = _make("rtph264depay", "depay")
    parse = _make("h264parse", "parse")
    decode = _make("decodebin", "decode")
    convert = _make("videoconvert", "convert")
    nvvideoconvert = _make("nvvideoconvert", "nvvideoconvert")
    capsfilter = _make("capsfilter", "capsfilter")
    streammux = _make("nvstreammux", "streammux")
    nvinfer = _make("nvinfer", "nvinfer")
    nvmsgconv = _make("nvmsgconv", "nvmsgconv")
    nvmsgbroker = _make("nvmsgbroker", "nvmsgbroker")

    # Configure elements
    rtspsrc.set_property("location", RTSP_URL)
    rtspsrc.set_property("latency", 200)  # jitter buffer, in ms
    streammux.set_property("batch-size", 1)
    streammux.set_property("width", 256)
    streammux.set_property("height", 256)
    nvinfer.set_property("config-file-path", CONFIG_FILE)
    caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    capsfilter.set_property("caps", caps)
    nvmsgconv.set_property("config", MSGCONV_CONFIG)
    nvmsgconv.set_property("payload-type", 0)  # 0 = DeepStream JSON schema
    nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so")
    nvmsgbroker.set_property("conn-str", MQTT_CONN_STR)
    nvmsgbroker.set_property("topic", MQTT_TOPIC)
    nvmsgbroker.set_property("sync", False)

    # rtspsrc and decodebin expose their src pads only after caps
    # negotiation, so they must be linked dynamically.
    def on_pad_added_rtspsrc(src: Any, pad: Any) -> None:
        sinkpad = depay.get_static_pad("sink")
        if not sinkpad.is_linked():
            pad.link(sinkpad)

    rtspsrc.connect("pad-added", on_pad_added_rtspsrc)

    def on_pad_added_decode(src: Any, pad: Any) -> None:
        sinkpad = convert.get_static_pad("sink")
        if not sinkpad.is_linked():
            pad.link(sinkpad)

    decode.connect("pad-added", on_pad_added_decode)

    # Static links; nvstreammux sink pads are request pads (sink_%u).
    depay.link(parse)
    parse.link(decode)
    convert.link(nvvideoconvert)
    nvvideoconvert.link(capsfilter)
    srcpad = capsfilter.get_static_pad("src")
    sinkpad = streammux.get_request_pad("sink_0")
    srcpad.link(sinkpad)

    streammux.link(nvinfer)
    nvinfer.link(nvmsgconv)
    nvmsgconv.link(nvmsgbroker)

    # Pre-inference probes: skip unchanged frames first, then mask background
    # (probes run in the order they were added on the same pad).
    frame_change_detector = FrameChangeDetector()
    streammux_src_pad = streammux.get_static_pad("src")
    streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe, frame_change_detector)
    streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, remove_background_probe)

    # Post-inference probe: extract tensor output, classify, attach msg meta.
    tensor_extractor = TensorExtractor()
    classifier = SoftmaxTopKClassifier(CLASS_LABELS)
    plant_event_builder = NvdsPlantEventBuilder()
    db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder)

    nvinfer_src_pad = nvinfer.get_static_pad("src")
    nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe)

    return pipeline

def _main() -> None:
    """Entry point: init GStreamer, ensure the TRT engine exists, run the pipeline."""
    Gst.init(None)
    # Build the TensorRT engine referenced by the nvinfer config if absent.
    build_engine_if_missing(CONFIG_FILE)
    run_pipeline(build_pipeline())


if __name__ == "__main__":
    _main()
12 changes: 1 addition & 11 deletions src/deepstream/pipelines/nvmsgbroker_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
gi.require_version("Gst", "1.0")


from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier
from src.model_conversion.onnx_to_trt import build_engine_if_missing
Expand All @@ -19,17 +20,6 @@

CONFIG_FILE = "/workspace/configs/mobilenet.txt"
MSGCONV_CONFIG = "/workspace/configs/nvmsgbroker_msgconv_config.txt"
CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt"


def load_class_labels() -> List[str]:
try:
with open(CLASS_LABELS_FILE) as f:
return [line.strip() for line in f]
except Exception:
# Fallback to dummy labels if file is missing
return [f"class_{i}" for i in range(83)]


CLASS_LABELS = load_class_labels()

Expand Down
11 changes: 1 addition & 10 deletions src/deepstream/pipelines/pipeline_onnx_real_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,12 @@
gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import (
SoftmaxTopKClassifier,
)

CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt"


def load_class_labels() -> List[str]:
try:
with open(CLASS_LABELS_FILE) as f:
return [line.strip() for line in f]
except Exception:
return [f"class_{i}" for i in range(83)]


def list_image_files(folder: str) -> List[str]:
exts = (".jpg", ".jpeg", ".png")
Expand Down
13 changes: 1 addition & 12 deletions src/deepstream/pipelines/pipeline_onnx_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,14 @@
import pyds
from gi.repository import Gst # noqa: E402

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import (
ClassificationPrediction,
SoftmaxTopKClassifier,
)
from src.model_conversion.onnx_to_trt import build_engine_if_missing

CLASS_LABELS_FILE = "/workspace/configs/crop_and_weed_83_classes.txt"


def load_class_labels() -> List[str]:
try:
with open(CLASS_LABELS_FILE) as f:
return [line.strip() for line in f]
except Exception:
# Fallback to dummy labels if file is missing
return [f"class_{i}" for i in range(83)]


CLASS_LABELS = load_class_labels()

# Reuse the tensor extractor + softmax top-k classifier from your RTSP pipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import cv2
import numpy as np
import pyds
from gi.repository import Gst

from src.deepstream.helpers.remove_background import remove_background


def remove_background_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
    """Pad probe that blacks out background pixels in-place on each buffer.

    Maps frame 0 of the batched NvBufSurface as RGBA, runs the ExG-based
    background removal, and writes the result back into the mapped surface
    so downstream elements see the masked frame.

    Raises:
        RuntimeError: if mapping the surface or masking the frame fails.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        # Nothing to process; let the (empty) probe call pass through.
        return Gst.PadProbeReturn.OK

    try:
        # Zero-copy view onto frame 0 of the batch (requires RGBA caps
        # upstream so the surface is CPU-mappable).
        surface = pyds.get_nvds_buf_surface(hash(gst_buffer), 0)
        frame_rgba = np.array(surface, copy=False, order="C")
        frame_bgr = cv2.cvtColor(frame_rgba, cv2.COLOR_RGBA2BGR)

        masked_bgr = remove_background(frame_bgr)
        frame_rgba_new = cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2RGBA)

        # Write back into the mapped surface (in-place edit of the buffer).
        np.copyto(frame_rgba, frame_rgba_new)
    except Exception as e:
        # The try block also covers color conversion and masking, so the
        # message no longer blames the surface fetch/map for every failure.
        raise RuntimeError("BackgroundRemovalProbe: background removal failed") from e

    return Gst.PadProbeReturn.OK
Loading
Loading