From b028660ba938da40888af71d5455c5b1e1676aa3 Mon Sep 17 00:00:00 2001 From: orgalorg <65609811+zachary-rote@users.noreply.github.com> Date: Thu, 4 May 2023 15:24:47 -0400 Subject: [PATCH 1/3] close producer when kafka-producer-actor terminates --- .../kafka/KafkaProducerActorImpl.scala | 1 + .../kafka/KafkaProducerActorImplSpec.scala | 48 ++++++++++++------- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala b/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala index a02604bf0..2ee4bc313 100644 --- a/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala +++ b/modules/command-engine/core/src/main/scala/surge/internal/kafka/KafkaProducerActorImpl.scala @@ -156,6 +156,7 @@ class KafkaProducerActorImpl( flushMessagesScheduledTask.cancel() checkKTableLagScheduledTask.cancel() clearExpiredTrackersScheduledTask.cancel() + kafkaPublisher.close() super.postStop() } diff --git a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala index 17318c142..acbc97379 100644 --- a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala +++ b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala @@ -2,32 +2,32 @@ package surge.internal.kafka -import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.actor.{ActorRef, ActorSystem, Props} import akka.pattern.ask -import akka.testkit.{ TestKit, TestProbe } +import akka.testkit.{TestKit, TestProbe} import akka.util.Timeout -import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory } -import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata } +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ AuthorizationException, ProducerFencedException } -import org.apache.kafka.common.header.internals.{ RecordHeader, RecordHeaders } -import org.mockito.ArgumentMatchers.{ any, same } +import org.apache.kafka.common.errors.{AuthorizationException, ProducerFencedException} +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.mockito.ArgumentMatchers.{any, same} import org.mockito.Mockito._ -import org.mockito.{ ArgumentMatchers, Mockito } +import org.mockito.{ArgumentMatchers, Mockito} import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.{ Eventually, PatienceConfiguration, ScalaFutures } +import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{ Millis, Seconds, Span } +import org.scalatest.time.{Millis, Seconds, Span} import org.scalatest.wordspec.AnyWordSpecLike import org.scalatestplus.mockito.MockitoSugar -import surge.core.KafkaProducerActor.{ PublishFailure, PublishResult, PublishSuccess, PublishTracker, PublishTrackerWithExpiry } -import surge.core.{ KafkaProducerActor, TestBoundedContext } +import surge.core.KafkaProducerActor.{PublishFailure, PublishResult, PublishSuccess, PublishTracker, PublishTrackerWithExpiry} +import surge.core.{KafkaProducerActor, TestBoundedContext} import surge.health.HealthSignalBusTrait import surge.health.domain.EmittableHealthSignal import surge.internal.akka.cluster.ActorSystemHostAwareness -import surge.internal.akka.kafka.{ KafkaConsumerPartitionAssignmentTracker, KafkaConsumerStateTrackingActor } -import surge.internal.health.{ HealthCheck, HealthCheckStatus } -import surge.internal.kafka.KafkaProducerActorImpl.{ KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId } +import surge.internal.akka.kafka.{KafkaConsumerPartitionAssignmentTracker, KafkaConsumerStateTrackingActor} +import surge.internal.health.{HealthCheck, HealthCheckStatus} +import surge.internal.kafka.KafkaProducerActorImpl.{KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId, ShutdownProducer} import surge.kafka._ import surge.internal.health.HealthyActor.GetHealth import surge.kafka.streams.ExpectedTestException @@ -36,7 +36,7 @@ import surge.metrics.Metrics import java.time.Instant import java.util.UUID import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NoStackTrace class KafkaProducerActorImplSpec @@ -678,6 +678,22 @@ class KafkaProducerActorImplSpec probe.expectMsgClass(classOf[PublishSuccess]) verify(mockProducerFenceOnCommit).putRecords(records(testMessages2)) } + + "Close Kafka Producer on Shutdown" in { + val probe = TestProbe() + val mockPartitionTracker = mock[KafkaConsumerPartitionAssignmentTracker] + val mockProducer = mock[GenericKafkaProducer[String, Array[Byte]]] + + val mockLagChecker = mockKTableLagChecker(assignedPartition, 100L, 100L) + val producerActorToShutdown = + testProducerActor(assignedPartition, mockProducer, mockLagChecker, new PublishTrackerStateManager(), mockPartitionTracker) + + + // Send Shutdown Command + probe.send(producerActorToShutdown, ShutdownProducer) + + verify(mockProducer, Mockito.timeout(1000).times(1)).close() + } } "KafkaProducerActorState" should { From b32e6bd51dde5fa07bfa318fa9d1579f2ccdabd0 Mon Sep 17 00:00:00 2001 From: orgalorg <65609811+zachary-rote@users.noreply.github.com> Date: Thu, 4 May 2023 15:28:41 -0400 Subject: [PATCH 2/3] change formatting --- .../kafka/KafkaProducerActorImplSpec.scala | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala index acbc97379..05b8a4c83 100644 --- a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala +++ b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala @@ -2,32 +2,32 @@ package surge.internal.kafka -import akka.actor.{ActorRef, ActorSystem, Props} +import akka.actor.{ ActorRef, ActorSystem, Props } import akka.pattern.ask -import akka.testkit.{TestKit, TestProbe} +import akka.testkit.{ TestKit, TestProbe } import akka.util.Timeout -import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} -import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} +import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory } +import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata } import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{AuthorizationException, ProducerFencedException} -import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} -import org.mockito.ArgumentMatchers.{any, same} +import org.apache.kafka.common.errors.{ AuthorizationException, ProducerFencedException } +import org.apache.kafka.common.header.internals.{ RecordHeader, RecordHeaders } +import org.mockito.ArgumentMatchers.{ any, same } import org.mockito.Mockito._ -import org.mockito.{ArgumentMatchers, Mockito} +import org.mockito.{ ArgumentMatchers, Mockito } import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} +import org.scalatest.concurrent.{ Eventually, PatienceConfiguration, ScalaFutures } import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.wordspec.AnyWordSpecLike import org.scalatestplus.mockito.MockitoSugar -import surge.core.KafkaProducerActor.{PublishFailure, PublishResult, PublishSuccess, PublishTracker, PublishTrackerWithExpiry} -import surge.core.{KafkaProducerActor, TestBoundedContext} +import surge.core.KafkaProducerActor.{ PublishFailure, PublishResult, PublishSuccess, PublishTracker, PublishTrackerWithExpiry } +import surge.core.{ KafkaProducerActor, TestBoundedContext } import surge.health.HealthSignalBusTrait import surge.health.domain.EmittableHealthSignal import surge.internal.akka.cluster.ActorSystemHostAwareness -import surge.internal.akka.kafka.{KafkaConsumerPartitionAssignmentTracker, KafkaConsumerStateTrackingActor} -import surge.internal.health.{HealthCheck, HealthCheckStatus} -import surge.internal.kafka.KafkaProducerActorImpl.{KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId, ShutdownProducer} +import surge.internal.akka.kafka.{ KafkaConsumerPartitionAssignmentTracker, KafkaConsumerStateTrackingActor } +import surge.internal.health.{ HealthCheck, HealthCheckStatus } +import surge.internal.kafka.KafkaProducerActorImpl.{ KTableProgressUpdate, PublishTrackerStateManager, SenderWithTrackingId, ShutdownProducer } import surge.kafka._ import surge.internal.health.HealthyActor.GetHealth import surge.kafka.streams.ExpectedTestException @@ -36,7 +36,7 @@ import surge.metrics.Metrics import java.time.Instant import java.util.UUID import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NoStackTrace class KafkaProducerActorImplSpec @@ -688,7 +688,6 @@ class KafkaProducerActorImplSpec val producerActorToShutdown = testProducerActor(assignedPartition, mockProducer, mockLagChecker, new PublishTrackerStateManager(), mockPartitionTracker) - // Send Shutdown Command probe.send(producerActorToShutdown, ShutdownProducer) From 6cb75602d02cc12e9bb3d14b43d5d5ac3450e220 Mon Sep 17 00:00:00 2001 From: orgalorg <65609811+zachary-rote@users.noreply.github.com> Date: Thu, 4 May 2023 15:41:36 -0400 Subject: [PATCH 3/3] expect atleast one close --- .../scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala index 05b8a4c83..9d69de75f 100644 --- a/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala +++ b/modules/command-engine/core/src/test/scala/surge/internal/kafka/KafkaProducerActorImplSpec.scala @@ -691,7 +691,7 @@ class KafkaProducerActorImplSpec // Send Shutdown Command probe.send(producerActorToShutdown, ShutdownProducer) - verify(mockProducer, Mockito.timeout(1000).times(1)).close() + verify(mockProducer, Mockito.timeout(1000).atLeast(1)).close() } }