Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/csp/interfaces/csp_if_zmqhub.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr
int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t addr, uint16_t netmask, int promisc, csp_iface_t ** return_interface, char * sec_key, uint16_t subport, uint16_t pubport);


void csp_zmqhub_remove_filters(csp_iface_t * zmq_iface);
void csp_zmqhub_add_filters(csp_iface_t * zmq_iface);


#ifdef __cplusplus
}
#endif
80 changes: 58 additions & 22 deletions src/interfaces/csp_if_zmqhub.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ typedef struct {
void * context;
void * publisher;
void * subscriber;
/* We must allocate filters per interface, as ZMQ does not copy the filter value to the
outgoing packet for each setsockopt call. */
uint16_t filt[4][3];
char name[CSP_IFLIST_NAME_MAX + 1];
csp_iface_t iface;
} zmq_driver_t;
Expand Down Expand Up @@ -218,6 +221,55 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr
return CSP_ERR_NONE;
}

void csp_zmqhub_remove_filters(csp_iface_t * zmq_iface) {

int ret = 0;
zmq_driver_t * drv = zmq_iface->driver_data;
const uint16_t addr = zmq_iface->addr;
const uint16_t hostmask = (1 << (csp_id_get_host_bits() - zmq_iface->netmask)) - 1;

/* Unsubscribe from any current filters */
for (int i = 0; i < 4; i++) {
//int i = CSP_PRIO_NORM;
drv->filt[i][0] = __builtin_bswap16((i << 14) | addr);
drv->filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask);
drv->filt[i][2] = __builtin_bswap16((i << 14) | 16383);
ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][0], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][1], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, &drv->filt[i][2], 2);
}

/* subscribe to all packets - no filter */
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0);
assert(ret == 0);
(void)ret;
}

void csp_zmqhub_add_filters(csp_iface_t * zmq_iface) {

int ret = 0;
zmq_driver_t * drv = zmq_iface->driver_data;
const uint16_t addr = zmq_iface->addr;
const uint16_t hostmask = (1 << (csp_id_get_host_bits() - zmq_iface->netmask)) - 1;

/* Subscribe to all packets - no filter */
ret = zmq_setsockopt(drv->subscriber, ZMQ_UNSUBSCRIBE, NULL, 0);
assert(ret == 0);

/* Subscribe to unpromiscuous filters */
for (int i = 0; i < 4; i++) {
//int i = CSP_PRIO_NORM;
drv->filt[i][0] = __builtin_bswap16((i << 14) | addr);
drv->filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask);
drv->filt[i][2] = __builtin_bswap16((i << 14) | 16383);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][0], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][1], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &drv->filt[i][2], 2);
}
assert(ret == 0);
(void)ret;
}

int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t addr, uint16_t netmask, int promisc, csp_iface_t ** return_interface, char * sec_key, uint16_t subport, uint16_t pubport) {

/* ZMQ will cause valgrind errors if `sec_key` isn't exactly 40 characters long.
Expand Down Expand Up @@ -249,6 +301,9 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add
drv->iface.driver_data = drv;
drv->iface.nexthop = csp_zmqhub_tx;

drv->iface.addr = addr;
drv->iface.netmask = netmask;
Copy link

@jeanbaptistelab jeanbaptistelab Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That netmask assignment is new, isn't it ? Actually, so is the addr assignment.
I don't think it's an issue, I just noticed this wasn't there before. I've looked at the other csp_zmqhub_init_* functions and they do set the addr but not the netmask.
Whether it's something we should streamline or not, I don't know...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness I'll answer here as well.
Previously we made this assignment in CSH after the csp_zmqhub_init_filter2 call, which seems pretty dumb since every possible caller will have to make the same assignment (from arguments which are given to csp_zmqhub_init_filter2 no less).
For this PR to work, I need the fields to be assigned before calling the promisc functions.
So it was a good excuse to improve some wonky call semantics.


drv->context = zmq_ctx_new();
assert(drv->context != NULL);

Expand Down Expand Up @@ -293,9 +348,6 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add
zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_IDLE, &idle, sizeof(idle));
zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_CNT, &cnt, sizeof(cnt));
zmq_setsockopt(drv->subscriber, ZMQ_TCP_KEEPALIVE_INTVL, &intvl, sizeof(intvl));

/* Generate filters */
uint16_t hostmask = (1 << (csp_id_get_host_bits() - netmask)) - 1;

/* Connect to server */
ret = zmq_connect(drv->publisher, pub);
Expand All @@ -305,28 +357,12 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add
(void)ret;

if (promisc) {

// subscribe to all packets - no filter
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0);
assert(ret == 0);
csp_zmqhub_remove_filters(&drv->iface);

} else {

/* This needs to be static, because ZMQ does not copy the filter value to the
* outgoing packet for each setsockopt call */
static uint16_t filt[4][3];

for (int i = 0; i < 4; i++) {
//int i = CSP_PRIO_NORM;
filt[i][0] = __builtin_bswap16((i << 14) | addr);
filt[i][1] = __builtin_bswap16((i << 14) | addr | hostmask);
filt[i][2] = __builtin_bswap16((i << 14) | 16383);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][0], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][1], 2);
ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, &filt[i][2], 2);
}

}
csp_zmqhub_add_filters(&drv->iface);
}


/* Start RX thread */
Expand Down
Loading