From a3d7460191699f98c74809c0ad6ebd110b169979 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Fri, 4 Jul 2025 14:45:07 +0200 Subject: [PATCH 1/2] UDP: refresh connection at the same time as the templates refresh --- pkg/exporter/process.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 38575806..6bf246f7 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -296,6 +296,15 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { case <-expProc.stopCh: return case <-ticker.C: + // Dial again (e.g. host name resolving to a different IP) + klog.V(2).Info("Refreshing connection") + conn, err = net.Dial(input.CollectorProtocol, input.CollectorAddress) + if err != nil { + klog.Errorf("Cannot connect to the collector %s: %v", input.CollectorAddress, err) + } else { + expProc.connToCollector = conn + } + klog.V(2).Info("Sending refreshed templates to the collector") err := expProc.sendRefreshedTemplates() if err != nil { From 407c539ea1016677f3c07d6b8e3443bd218f287f Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 8 Jul 2025 13:51:23 +0200 Subject: [PATCH 2/2] concurrent-safe use of connection --- pkg/exporter/process.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 6bf246f7..4d0fd8dd 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -52,6 +52,7 @@ type templateValue struct { // maxMsgSize is not set correctly, the message may be fragmented. type ExportingProcess struct { connToCollector net.Conn + connMut sync.Mutex obsDomainID uint32 seqNumber uint32 templateID uint16 @@ -302,7 +303,9 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { if err != nil { klog.Errorf("Cannot connect to the collector %s: %v", input.CollectorAddress, err) } else { + expProc.connMut.Lock() expProc.connToCollector = conn + expProc.connMut.Unlock() } klog.V(2).Info("Sending refreshed templates to the collector") @@ -449,6 +452,8 @@ func (ep *ExportingProcess) closeConnToCollector() { } klog.Info("Closing connection to the collector") close(ep.stopCh) + ep.connMut.Lock() + defer ep.connMut.Unlock() if err := ep.connToCollector.Close(); err != nil { // Just log the error that happened when closing the connection. Not returning error // as we do not expect library consumers to exit their programs with this error. @@ -459,6 +464,8 @@ func (ep *ExportingProcess) closeConnToCollector() { // checkConnToCollector checks whether the connection from exporter is still open // by trying to read from connection. Closed connection will return EOF from read. func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool { + ep.connMut.Lock() + defer ep.connMut.Unlock() ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond)) if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF { return false @@ -484,6 +491,8 @@ func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set, buf *bytes.B } // Send the message on the exporter connection. + ep.connMut.Lock() + defer ep.connMut.Unlock() bytesSent, err := ep.connToCollector.Write(buf.Bytes()) if err != nil { @@ -508,6 +517,8 @@ func (ep *ExportingProcess) createAndSendJSONRecords(records []entities.Record, bytesSent := 0 elements := make(map[string]interface{}) message := make(map[string]interface{}, 2) + ep.connMut.Lock() + defer ep.connMut.Unlock() for _, record := range records { clear(elements) orderedElements := record.GetOrderedElementList()