From d49321b1069dc2e33bad67285dccc4001c13b25a Mon Sep 17 00:00:00 2001 From: kivkiv12345 Date: Sun, 10 Aug 2025 11:23:51 +0200 Subject: [PATCH 1/2] Added `csp_zmqhub__filters()` To change promisc settings after init --- include/csp/interfaces/csp_if_zmqhub.h | 4 ++ src/interfaces/csp_if_zmqhub.c | 78 ++++++++++++++++++-------- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/include/csp/interfaces/csp_if_zmqhub.h b/include/csp/interfaces/csp_if_zmqhub.h index 17d972442..893d39a4a 100644 --- a/include/csp/interfaces/csp_if_zmqhub.h +++ b/include/csp/interfaces/csp_if_zmqhub.h @@ -123,6 +123,10 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t addr, uint16_t netmask, int promisc, csp_iface_t ** return_interface, char * sec_key, uint16_t subport, uint16_t pubport); +void csp_zmqhub_remove_filters(csp_iface_t * zmq_iface); +void csp_zmqhub_add_filters(csp_iface_t * zmq_iface); + + #ifdef __cplusplus } #endif diff --git a/src/interfaces/csp_if_zmqhub.c b/src/interfaces/csp_if_zmqhub.c index 815faba28..c2e774d06 100644 --- a/src/interfaces/csp_if_zmqhub.c +++ b/src/interfaces/csp_if_zmqhub.c @@ -18,6 +18,9 @@ typedef struct { void * context; void * publisher; void * subscriber; + /* We must allocate filters per interface, as ZMQ does not copy the filter value to the + outgoing packet for each setsockopt call. */ + uint16_t filt[4][3]; char name[CSP_IFLIST_NAME_MAX + 1]; csp_iface_t iface; } zmq_driver_t; @@ -218,6 +221,53 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr return CSP_ERR_NONE; } +void csp_zmqhub_remove_filters(csp_iface_t * zmq_iface) { + + int ret = 0; + zmq_driver_t * drv = zmq_iface->driver_data; + const uint16_t addr = zmq_iface->addr; + const uint16_t hostmask = (1 << (csp_id_get_host_bits() - zmq_iface->netmask)) - 1; + + /* Unsubscribe from any current filters */ + for (int i = 0; i < 4; i++) { + //int i = CSP_PRIO_NORM; + drv->filt[i][0] = __builtin_bswap16((i << 14) | addr); + drv->filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask); + drv->filt[i][2] = __builtin_bswap16((i << 14) | 16383); + ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][0], 2); + ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][1], 2); + ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][2], 2); + } + + /* subscribe to all packets - no filter */ + ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0); + assert(ret == 0); +} + +void csp_zmqhub_add_filters(csp_iface_t * zmq_iface) { + + int ret = 0; + zmq_driver_t * drv = zmq_iface->driver_data; + const uint16_t addr = zmq_iface->addr; + const uint16_t hostmask = (1 << (csp_id_get_host_bits() - zmq_iface->netmask)) - 1; + + /* Subscribe to all packets - no filter */ + ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, NULL, 0); + assert(ret == 0); + + /* Subscribe to unpromiscuous filters */ + for (int i = 0; i < 4; i++) { + //int i = CSP_PRIO_NORM; + drv->filt[i][0] = __builtin_bswap16((i << 14) | addr); + drv->filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask); + drv->filt[i][2] = __builtin_bswap16((i << 14) | 16383); + ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][0], 2); + ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][1], 2); + ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][2], 2); + } + assert(ret == 0); +} + int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t addr, uint16_t netmask, int promisc, csp_iface_t ** return_interface, char * sec_key, uint16_t subport, uint16_t pubport) { /* ZMQ will cause valgrind errors if `sec_key` isn't exactly 40 characters long. @@ -249,6 +299,9 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add drv->iface.driver_data = drv; drv->iface.nexthop = csp_zmqhub_tx; + drv->iface.addr = addr; + drv->iface.netmask = netmask; + drv->context = zmq_ctx_new(); assert(drv->context != NULL); @@ -293,9 +346,6 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_IDLE, &idle, sizeof(idle)); zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_CNT, &cnt, sizeof(cnt)); zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_INTVL, &intvl, sizeof(intvl)); - - /* Generate filters */ - uint16_t hostmask = (1 << (csp_id_get_host_bits() - netmask)) - 1; /* Connect to server */ ret = zmq_connect(drv->publisher, pub); @@ -305,28 +355,12 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add (void)ret; if (promisc) { - // subscribe to all packets - no filter - ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0); - assert(ret == 0); + csp_zmqhub_remove_filters(&drv->iface); } else { - - /* This needs to be static, because ZMQ does not copy the filter value to the - * outgoing packet for each setsockopt call */ - static uint16_t filt[4][3]; - - for (int i = 0; i < 4; i++) { - //int i = CSP_PRIO_NORM; - filt[i][0] = __builtin_bswap16((i << 14) | addr); - filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask); - filt[i][2] = __builtin_bswap16((i << 14) | 16383); - ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][0], 2); - ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][1], 2); - ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][2], 2); - } - - } + csp_zmqhub_add_filters(&drv->iface); + } /* Start RX thread */ From dad2db680790f7eb818d73ce0021a9c5d468ef7c Mon Sep 17 00:00:00 2001 From: kivkiv12345 Date: Sun, 10 Aug 2025 11:51:48 +0200 Subject: [PATCH 2/2] Fix build in release mode --- src/interfaces/csp_if_zmqhub.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/interfaces/csp_if_zmqhub.c b/src/interfaces/csp_if_zmqhub.c index c2e774d06..1c4c9fad6 100644 --- a/src/interfaces/csp_if_zmqhub.c +++ b/src/interfaces/csp_if_zmqhub.c @@ -242,6 +242,7 @@ void csp_zmqhub_remove_filters(csp_iface_t * zmq_iface) { /* subscribe to all packets - no filter */ ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0); assert(ret == 0); + (void)ret; } void csp_zmqhub_add_filters(csp_iface_t * zmq_iface) { @@ -266,6 +267,7 @@ void csp_zmqhub_add_filters(csp_iface_t * zmq_iface) { ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][2], 2); } assert(ret == 0); + (void)ret; } int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t addr, uint16_t netmask, int promisc, csp_iface_t ** return_interface, char * sec_key, uint16_t subport, uint16_t pubport) {