diff --git a/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala b/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala index a02604bf0..2ee4bc313 100644 --- a/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala +++ b/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala @@ -156,6 +156,7 @@ class KafkaProducerActorImpl( flushMessagesScheduledTask.cancel() checkKTableLagScheduledTask.cancel() clearExpiredTrackersScheduledTask.cancel() + kafkaPublisher.close() super.postStop() } diff --git a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala index 17318c142..9d69de75f 100644 --- a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala +++ b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala @@ -27,7 +27,7 @@ import surge.health.domain.EmittableHealthSignal import surge.internal.akka.cluster.ActorSystemHostAwareness import surge.internal.akka.kafka.{ KafkaConsumerPartitionAssignmentTracker, KafkaConsumerStateTrackingActor } import surge.internal.health.{ HealthCheck, HealthCheckStatus } -import surge.internal.kafka.KafkaProducerActorImpl.{ KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId } +import surge.internal.kafka.KafkaProducerActorImpl.{ KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId, ShutdownProducer } import surge.kafka._ import surge.internal.health.HealthyActor.GetHealth import surge.kafka.streams.ExpectedTestException @@ -678,6 +678,21 @@ class KafkaProducerActorImplSpec probe.expectMsgClass(classOf[PublishSuccess]) verify(mockProducerFenceOnCommit).putRecords(records(testMessages2)) } + + "Close Kafka Producer on Shutdown" in { + val probe = TestProbe() + val mockPartitionTracker = mock[KafkaConsumerPartitionAssignmentTracker] + val mockProducer = mock[GenericKafkaProducer[String, Array[Byte]]] + + val mockLagChecker = mockKTableLagChecker(assignedPartition, 100L, 100L) + val producerActorToShutdown = + testProducerActor(assignedPartition, mockProducer, mockLagChecker, new PublishTrackerStateManager(), mockPartitionTracker) + + // Send Shutdown Command + probe.send(producerActorToShutdown, ShutdownProducer) + + verify(mockProducer, Mockito.timeout(1000).atLeast(1)).close() + } } "KafkaProducerActorState" should {