diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala new file mode 100644 index 00000000..ef0514ed --- /dev/null +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.comcast.xfinity.sirius.api.impl + +import com.comcast.xfinity.sirius.api.Sirius + +/** + * These are additional methods supported by an expanded Sirius interface. + */ +trait Sirius1Dot2Extensions { + /** + * Register a callback to be invoked once the Sirius subsystem has been + * initialized (i.e. log replay has completed). This may be called + * multiple times to install multiple init hook callbacks; each will be + * called once upon initialization. If Sirius has already been + * initialized, the callback will be invoked right away. + * + * @param initHook callback to run + */ + def onInitialized(initHook: Runnable): Unit +} + +trait Sirius1Dot2 extends Sirius with Sirius1Dot2Extensions diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala index 55a6262e..195189e0 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala @@ -25,11 +25,12 @@ import akka.actor._ import java.util.concurrent.Future import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.SiriusConfiguration -import scala.concurrent.{Await, Future => AkkaFuture} +import scala.concurrent.{Future => AkkaFuture, ExecutionContext, Await} import akka.util.Timeout import scala.concurrent.duration._ import status.NodeStats.FullNodeStatus import status.StatusWorker._ +import scala.concurrent.ExecutionContext.Implicits.global object SiriusImpl { @@ -61,7 +62,7 @@ object SiriusImpl { * @param actorSystem the actorSystem to use to create the Actors for Sirius */ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val actorSystem: ActorSystem) - extends Sirius { + extends Sirius1Dot2 { val supName = config.getProp(SiriusConfiguration.SIRIUS_SUPERVISOR_NAME, "sirius") implicit val timeout: Timeout = @@ -131,6 +132,12 @@ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val acto onShutdownHook = Some(() => shutdownHook) } + def onInitialized(initHook: Runnable) { + (supervisor ? SiriusSupervisor.RegisterInitHook) onSuccess { + case _ => initHook.run() + } + } + /** * Terminate this instance. Shuts down all associated Actors. */ diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala index 5d1d027e..4f2a92bb 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala @@ -44,6 +44,8 @@ object SiriusSupervisor { case object CheckPaxosMembership extends SupervisorMessage case class IsInitializedResponse(initialized: Boolean) + case object RegisterInitHook + case object Initialized /** * Factory for creating the children actors of SiriusSupervisor. @@ -133,6 +135,8 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu val membershipCheckSchedule = context.system.scheduler. schedule(0 seconds, checkIntervalSecs seconds, self, CheckPaxosMembership) + var initHookClients : Set[ActorRef] = Set.empty + override def postStop() { membershipCheckSchedule.cancel() } @@ -149,9 +153,13 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu context.become(initialized) sender ! SiriusSupervisor.IsInitializedResponse(initialized = true) + + initHookClients foreach { _ ! SiriusSupervisor.Initialized } + initHookClients = Set.empty } else { sender ! SiriusSupervisor.IsInitializedResponse(initialized = false) } + case SiriusSupervisor.RegisterInitHook => initHookClients += sender // Ignore other messages until Initialized. case _ => @@ -162,6 +170,7 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu case logQuery: LogQuery => stateSup forward logQuery case membershipMessage: MembershipMessage => membershipActor forward membershipMessage case SiriusSupervisor.IsInitializedRequest => sender ! new SiriusSupervisor.IsInitializedResponse(true) + case SiriusSupervisor.RegisterInitHook => sender ! SiriusSupervisor.Initialized case statusQuery: StatusQuery => statusSubsystem forward statusQuery case compactionMessage: CompactionMessage => compactionManager match { case Some(actor) => actor forward compactionMessage diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala index b5dec599..d1da2fe4 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala @@ -30,6 +30,9 @@ import com.comcast.xfinity.sirius.{TimedTest, NiceTest} import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult} import status.NodeStats.FullNodeStatus import status.StatusWorker._ +import scala.concurrent.ExecutionContext.Implicits.global +import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{Initialized, RegisterInitHook} +import scala.concurrent.{Await, Promise} object SiriusImplTestCompanion { @@ -91,6 +94,9 @@ class SiriusImplTest extends NiceTest with TimedTest { case GetStatus => sender ! mockNodeStatus this + case RegisterInitHook => + sender ! Initialized + this } }) @@ -105,6 +111,17 @@ class SiriusImplTest extends NiceTest with TimedTest { } describe("a SiriusImpl") { + it("should send a RegisterInitHook message to the supervisor actor when onInitialized is called") { + underTest.onInitialized(new Runnable() { def run() { } }) + supervisorActorProbe.expectMsg(SiriusSupervisor.RegisterInitHook) + } + + it("should call the initHook once the supervisor actor responds with an Initialized message") { + val p : Promise[Boolean] = Promise() + underTest.onInitialized(new Runnable() { def run() { p.success(true) }}) + assert(Await.result(p.future, 50 millis)) + } + it("should send a Get message to the supervisor actor when enqueueGet is called") { val key = "hello" val getFuture = underTest.enqueueGet(key) diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala index 9caf90e3..271bcc63 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala @@ -116,6 +116,32 @@ class SiriusSupervisorTest extends NiceTest with BeforeAndAfterAll with TimedTes waitForTrue(stateAgent().supervisorInitialized, 5000, 250) } + it("should reply to a registered initHook once initialized") { + val probe = TestProbe() + probe.send(supervisor, SiriusSupervisor.RegisterInitHook) + initializeSupervisor(supervisor) + probe.expectMsg(SiriusSupervisor.Initialized) + } + + it("should reply to all registered initHooks once initialized") { + val probe1 = TestProbe() + val probe2 = TestProbe() + probe1.send(supervisor, SiriusSupervisor.RegisterInitHook) + probe2.send(supervisor, SiriusSupervisor.RegisterInitHook) + initializeSupervisor(supervisor) + probe1.expectMsg(SiriusSupervisor.Initialized) + probe2.expectMsg(SiriusSupervisor.Initialized) + } + + it("should reply immediately to an initHook registration once already initialized") { + val probe = TestProbe() + val stateAgent = supervisor.underlyingActor.siriusStateAgent + initializeSupervisor(supervisor) + waitForTrue(stateAgent().supervisorInitialized, 5000, 250) + probe.send(supervisor, SiriusSupervisor.RegisterInitHook) + probe.expectMsg(SiriusSupervisor.Initialized) + } + it("should forward MembershipMessages to the membershipActor") { initializeSupervisor(supervisor) initializeOrdering(supervisor, Some(paxosProbe.ref))