diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 38575806..4d0fd8dd 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -52,6 +52,7 @@ type templateValue struct { // maxMsgSize is not set correctly, the message may be fragmented. type ExportingProcess struct { connToCollector net.Conn + connMut sync.Mutex obsDomainID uint32 seqNumber uint32 templateID uint16 @@ -296,6 +297,17 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { case <-expProc.stopCh: return case <-ticker.C: + // Dial again (e.g. host name resolving to a different IP) + klog.V(2).Info("Refreshing connection") + conn, err = net.Dial(input.CollectorProtocol, input.CollectorAddress) + if err != nil { + klog.Errorf("Cannot connect to the collector %s: %v", input.CollectorAddress, err) + } else { + expProc.connMut.Lock() + expProc.connToCollector = conn + expProc.connMut.Unlock() + } + klog.V(2).Info("Sending refreshed templates to the collector") err := expProc.sendRefreshedTemplates() if err != nil { @@ -440,6 +452,8 @@ func (ep *ExportingProcess) closeConnToCollector() { } klog.Info("Closing connection to the collector") close(ep.stopCh) + ep.connMut.Lock() + defer ep.connMut.Unlock() if err := ep.connToCollector.Close(); err != nil { // Just log the error that happened when closing the connection. Not returning error // as we do not expect library consumers to exit their programs with this error. @@ -450,6 +464,8 @@ func (ep *ExportingProcess) closeConnToCollector() { // checkConnToCollector checks whether the connection from exporter is still open // by trying to read from connection. Closed connection will return EOF from read. func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool { + ep.connMut.Lock() + defer ep.connMut.Unlock() ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond)) if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF { return false @@ -475,6 +491,8 @@ func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set, buf *bytes.B } // Send the message on the exporter connection. + ep.connMut.Lock() + defer ep.connMut.Unlock() bytesSent, err := ep.connToCollector.Write(buf.Bytes()) if err != nil { @@ -499,6 +517,8 @@ func (ep *ExportingProcess) createAndSendJSONRecords(records []entities.Record, bytesSent := 0 elements := make(map[string]interface{}) message := make(map[string]interface{}, 2) + ep.connMut.Lock() + defer ep.connMut.Unlock() for _, record := range records { clear(elements) orderedElements := record.GetOrderedElementList()