From 0aa0d303020bebe4bf7b49575bf9df1628e4a48d Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sat, 15 Feb 2025 18:12:09 -0500 Subject: [PATCH 1/8] Add GetLogRangeLimit message to read log ranges by limit of events --- .../impl/state/SiriusPersistenceActor.scala | 38 +++++++--- .../xfinity/sirius/uberstore/UberPair.scala | 5 ++ .../xfinity/sirius/uberstore/UberStore.scala | 7 ++ .../sirius/uberstore/data/UberDataFile.scala | 29 ++++++- .../sirius/uberstore/segmented/Segment.scala | 16 +++- .../segmented/SegmentedUberStore.scala | 9 +++ .../uberstore/seqindex/DiskOnlySeqIndex.scala | 20 ++++- .../sirius/uberstore/seqindex/SeqIndex.scala | 7 ++ .../writeaheadlog/CachedSiriusLog.scala | 3 + .../sirius/writeaheadlog/SiriusLog.scala | 9 +++ .../state/SiriusPersistenceActorTest.scala | 75 ++++++++++++++++--- .../sirius/itest/DoNothingSiriusLog.scala | 2 + .../sirius/uberstore/UberToolTest.scala | 28 ++++++- 13 files changed, 221 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index 686ac709..f7d59dda 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -15,12 +15,14 @@ */ package com.comcast.xfinity.sirius.api.impl.state -import akka.actor.{Props, Actor, ActorRef} +import akka.actor.{Actor, ActorRef, Props} import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult} import com.comcast.xfinity.sirius.api.impl._ import com.comcast.xfinity.sirius.admin.MonitoringHooks +import scala.collection.mutable.ListBuffer + object SiriusPersistenceActor { /** @@ -41,6 +43,7 @@ object SiriusPersistenceActor { * @param end last sequence number of the range, inclusive */ case class GetLogSubrange(begin: Long, end: Long) extends LogQuery + case class GetLogRangeLimit(begin: Long, events: Long) extends LogQuery trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -129,21 +132,34 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime + case GetLogRangeLimit(start, limit) => + val buffer = siriusLog.foldLeftWhile(start)(ListBuffer.empty[OrderedEvent])(buffer => buffer.length < limit)( + (acc, event) => acc += event + ) + if (buffer.isEmpty) { + sender ! EmptySubrange + } + if (buffer.length < limit) { + sender ! PartialSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList) + } else { + sender ! CompleteSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList) + } + case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully - val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(List[OrderedEvent]())( - (acc, event) => event :: acc - ).reverse - sender ! CompleteSubrange(rangeStart, rangeEnd, events) + val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( + (acc, event) => acc += event + ) + sender ! CompleteSubrange(rangeStart, rangeEnd, events.toList) - case GetLogSubrange(rangeStart, rangeEnd) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back + case GetLogSubrange(rangeStart, _) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back sender ! EmptySubrange - case GetLogSubrange(rangeStart, rangeEnd) => // we can respond partially + case GetLogSubrange(rangeStart, _) => // we can respond partially val lastSeq = siriusLog.getNextSeq - 1 - val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(List[OrderedEvent]())( - (acc, event) => event :: acc - ).reverse - sender ! PartialSubrange(rangeStart, lastSeq, events) + val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( + (acc, event) => acc += event + ) + sender ! PartialSubrange(rangeStart, lastSeq, events.toList) case GetNextLogSeq => sender ! siriusLog.getNextSeq diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala index 439549d1..ac433e53 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala @@ -107,6 +107,11 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) { ) } + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue) + dataFile.foldLeftWhile(startOffset)(acc0)(pred)(foldFun) + } + /** * Close underlying file handles or connections. This UberStoreFilePair should not be used after * close is called. diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala index 10e026cf..1de8eb9b 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala @@ -82,6 +82,13 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = uberpair.foldLeftRange(startSeq, endSeq)(acc0)(foldFun) + /** + * @inheritdoc + */ + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + uberpair.foldLeftWhile(startSeq)(acc0)(pred)(foldFun) + } + /** * Close underlying file handles or connections. This UberStore should not be used after * close is called. diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala index f77bfaa6..64e576ba 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala @@ -101,7 +101,16 @@ private[uberstore] class UberDataFile(dataFileName: String, def foldLeftRange[T](baseOff: Long, endOff: Long)(acc0: T)(foldFun: (T, Long, OrderedEvent) => T): T = { val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff) try { - foldLeftUntil(readHandle, endOff, acc0, foldFun) + foldLeftUntilOffset(readHandle, endOff, acc0, foldFun) + } finally { + readHandle.close() + } + } + + def foldLeftWhile[T](baseOff:Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff) + try { + foldLeftWhile(readHandle, acc0, pred, foldFun) } finally { readHandle.close() } @@ -109,7 +118,7 @@ private[uberstore] class UberDataFile(dataFileName: String, // private low low low level fold left @tailrec - private def foldLeftUntil[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = { + private def foldLeftUntilOffset[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = { val offset = readHandle.offset() if (offset > maxOffset) { acc @@ -118,7 +127,21 @@ private[uberstore] class UberDataFile(dataFileName: String, case None => acc case Some(bytes) => val accNew = foldFun(acc, offset, codec.deserialize(bytes)) - foldLeftUntil(readHandle, maxOffset, accNew, foldFun) + foldLeftUntilOffset(readHandle, maxOffset, accNew, foldFun) + } + } + } + + @tailrec + private def foldLeftWhile[T](readHandle: UberDataFileReadHandle, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + if (!pred(acc)) { + acc + } else { + fileOps.readNext(readHandle) match { + case None => acc + case Some(bytes) => + val accNew = foldFun(acc, codec.deserialize(bytes)) + foldLeftWhile(readHandle, accNew, pred, foldFun) } } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala index d7497e33..7776274c 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala @@ -107,7 +107,7 @@ class Segment private[uberstore](val location: File, * Get the number of entries written to the Segment * @return number of entries in the Segment */ - def size = index.size + def size: Long = index.size /** * Write OrderedEvent event into this dir. Will fail if closed or sequence @@ -136,10 +136,15 @@ class Segment private[uberstore](val location: File, (acc, event) => acc + event.request.key ) + /** + * Get the first sequence number in this dir. + */ + def getFirstSeq: Option[Long] = index.getMinSeq + /** * Get the next possible sequence number in this dir. */ - def getNextSeq = index.getMaxSeq match { + def getNextSeq: Long = index.getMaxSeq match { case None => 1L case Some(seq) => seq + 1 } @@ -167,6 +172,13 @@ class Segment private[uberstore](val location: File, ) } + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue) + dataFile.foldLeftWhile(startOffset)(acc0)(pred)( + (acc, evt) => foldFun(acc, evt) + ) + } + /** * Close underlying file handles or connections. This Segment should not be used after * close is called. diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala index 58316054..e7fb6c74 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala @@ -151,6 +151,15 @@ class SegmentedUberStore private[segmented] (base: JFile, liveDir.foldLeftRange(startSeq, endSeq)(res0)(foldFun) } + /** + * @inheritdoc + */ + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val res0 = readOnlyDirs.foldLeft(acc0)( + (acc, dir) => dir.foldLeftWhile(startSeq)(acc)(pred)(foldFun) + ) + liveDir.foldLeftWhile(startSeq)(res0)(pred)(foldFun) + } /** * Close underlying file handles or connections. This SegmentedUberStore should not be used after diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala index f0927f07..2b7ce875 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala @@ -51,7 +51,17 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, var isClosed = false var size: Long = handle.length() / 24 - var maxSeq = { + private var minSeq = { + if (handle.length == 0) + None + else { + handle.seek(0) + val (seq, _) = fileOps.readEntry(handle) + Some(seq) + } + } + + private var maxSeq = { if (handle.length == 0) None else { @@ -87,6 +97,11 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, getOffsetForAux(0, handle.length) } + /** + * {@inheritdoc} + */ + def getMinSeq: Option[Long] = minSeq + /** * {@inheritdoc} */ @@ -98,6 +113,9 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, def put(seq: Long, offset: Long): Unit = synchronized { handle.seek(handle.length) fileOps.put(handle, seq, offset) + if (minSeq.isEmpty) { + minSeq = Some(seq) + } maxSeq = Some(seq) size += 1 } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala index ef91d16f..2649b3d1 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala @@ -25,6 +25,13 @@ trait SeqIndex { */ def getOffsetFor(seq: Long): Option[Long] + /** + * Get the minimum sequence number stored, if such exists + * + * @return Some(sequence) or None if none such exists + */ + def getMinSeq: Option[Long] + /** * Get the maximum sequence number stored, if such exists * diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala index fe265477..922e0f23 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala @@ -108,6 +108,9 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog { .values.asScala.foldLeft(acc0)(foldFun) } + override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + log.foldLeftWhile(startSeq)(acc0)(pred)(foldFun) + def getNextSeq = log.getNextSeq def compact(): Unit = { diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala index 53254349..00a4a038 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala @@ -59,6 +59,15 @@ trait SiriusLog { */ def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T + /** + * Fold left across log entries while a condition is met + * @param startSeq sequence number to start with, inclusive + * @param acc0 initial accumulator value + * @param pred condition to continue accumulating log entries + * @param foldFun function to apply to the log entry, the result being the new accumulator + */ + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T + /** * retrieves the next sequence number to be written */ diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 55f2d194..ec49a665 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -19,15 +19,15 @@ import akka.actor.{ActorRef, ActorSystem} import akka.testkit.TestActorRef import akka.testkit.TestProbe import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog - - import org.mockito.Mockito._ import com.comcast.xfinity.sirius.NiceTest -import com.comcast.xfinity.sirius.api.impl.{OrderedEvent, Put, Delete} +import com.comcast.xfinity.sirius.api.impl.{Delete, OrderedEvent, Put} import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor._ -import org.mockito.ArgumentMatchers.{any, eq => meq, anyLong} +import org.mockito.ArgumentMatchers.{any, anyLong, eq => meq} import com.comcast.xfinity.sirius.api.SiriusConfiguration +import scala.collection.mutable.ListBuffer + class SiriusPersistenceActorTest extends NiceTest { implicit var actorSystem: ActorSystem = _ @@ -54,9 +54,10 @@ class SiriusPersistenceActorTest extends NiceTest { TestActorRef(new SiriusPersistenceActor(stateActor, siriusLog, config)) } - def makeMockLog(events: List[OrderedEvent], nextSeq: Long = 1L): SiriusLog = { + def makeMockLog(events: ListBuffer[OrderedEvent], nextSeq: Long = 1L): SiriusLog = { val mockLog = mock[SiriusLog] doReturn(events).when(mockLog).foldLeftRange(anyLong, anyLong)(any[Symbol])(anyFoldFun) + doReturn(events).when(mockLog).foldLeftWhile(anyLong)(any[Symbol])(anyPred)(anyFoldFun) doReturn(nextSeq).when(mockLog).getNextSeq mockLog @@ -64,8 +65,14 @@ class SiriusPersistenceActorTest extends NiceTest { def anyFoldFun = any[(Symbol, OrderedEvent) => Symbol]() + def anyPred = any[Symbol => Boolean] + def verifyFoldLeftRanged(siriusLog: SiriusLog, start: Long, end: Long): Unit = { - verify(siriusLog).foldLeftRange(meq(start), meq(end))(meq(List[OrderedEvent]()))(any[(List[OrderedEvent], OrderedEvent) => List[OrderedEvent]]()) + verify(siriusLog).foldLeftRange(meq(start), meq(end))(meq(ListBuffer[OrderedEvent]()))(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) + } + + def verifyFoldLeftWhile(siriusLog: SiriusLog, start: Long): Unit = { + verify(siriusLog).foldLeftWhile(meq(start))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean])(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]) } describe("a SiriusPersistenceActor") { @@ -135,7 +142,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 10L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(1, 2)) @@ -150,7 +157,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 10L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(8, 11)) @@ -165,7 +172,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 5L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(8, 11)) @@ -174,5 +181,55 @@ class SiriusPersistenceActorTest extends NiceTest { } } } + + describe("upon receiving a GetLogRangeLimit message") { + describe("when we can fully reply") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + doReturn(1L).when(event1).sequence + val event2 = mock[OrderedEvent] + doReturn(2L).when(event2).sequence + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogRangeLimit(1, 2)) + + verifyFoldLeftWhile(mockLog, 1) + senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) + } + } + describe("when we can partially reply") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + doReturn(8L).when(event1).sequence + val event2 = mock[OrderedEvent] + doReturn(9L).when(event2).sequence + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogRangeLimit(8, 3)) + + verifyFoldLeftWhile(mockLog, 8) + senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) + } + } + describe("when we can't send anything useful at all") { + it("should send back an EmptySubrange message") { + val senderProbe = TestProbe() + + val mockLog = makeMockLog(ListBuffer(), 5L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogRangeLimit(8, 11)) + + verifyFoldLeftWhile(mockLog, 8) + senderProbe.expectMsg(EmptySubrange) + } + } + } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala index c5cd6beb..7dceb0f7 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala @@ -32,6 +32,8 @@ class DoNothingSiriusLog extends SiriusLog { acc0 } + override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = acc0 + override def getNextSeq = 1L override def compact(): Unit = {} diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala index 635835ea..d20c2de6 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala @@ -19,11 +19,13 @@ package com.comcast.xfinity.sirius.uberstore import com.comcast.xfinity.sirius.NiceTest import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.impl.{Delete, OrderedEvent, Put} -import java.io.{File => JFile} +import java.io.{File => JFile} import better.files.File import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore +import scala.annotation.tailrec + object UberToolTest { class DummySiriusLog(var events: List[OrderedEvent]) extends SiriusLog { def writeEntry(event: OrderedEvent): Unit = { @@ -34,6 +36,30 @@ object UberToolTest { def foldLeftRange[T](start: Long, end: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = events.filter(e => start <= e.sequence && e.sequence <= end).foldLeft(acc0)(foldFun) + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + var acc: T = acc0 + for (evt <- events) { + if (!pred(acc)) { + return acc + } + acc = foldFun(acc, evt) + } + acc + } + + @tailrec + private def foldLeftWhile[T](events: List[OrderedEvent], startSeq: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + events match { + case Nil => acc + case evt :: rest if evt.sequence < startSeq => + foldLeftWhile(rest, startSeq, acc, pred, foldFun) + case _ if !pred(acc) => acc + case evt :: rest => + val accNew = foldFun(acc, evt) + foldLeftWhile(rest, startSeq, accNew, pred, foldFun) + } + } + def getNextSeq: Long = throw new IllegalStateException("not implemented") From 121166e9b6a98b844726a698573064894a6d40ca Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sat, 15 Feb 2025 19:40:45 -0500 Subject: [PATCH 2/8] fix test --- .../sirius/api/impl/state/SiriusPersistenceActorTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index ec49a665..9691ec1b 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -72,7 +72,7 @@ class SiriusPersistenceActorTest extends NiceTest { } def verifyFoldLeftWhile(siriusLog: SiriusLog, start: Long): Unit = { - verify(siriusLog).foldLeftWhile(meq(start))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean])(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]) + verify(siriusLog).foldLeftWhile(meq(start))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean]())(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) } describe("a SiriusPersistenceActor") { From db853ca31c59f79a97faf8ee6abb46bd48a0ef90 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sun, 16 Feb 2025 10:39:51 -0500 Subject: [PATCH 3/8] add tests, fix segment empty range issue --- .../sirius/uberstore/segmented/Segment.scala | 15 ++++---- .../segmented/SegmentedUberStoreTest.scala | 34 ++++++++++++++++++- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala index 7776274c..7bd47009 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala @@ -172,12 +172,15 @@ class Segment private[uberstore](val location: File, ) } - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { - val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue) - dataFile.foldLeftWhile(startOffset)(acc0)(pred)( - (acc, evt) => foldFun(acc, evt) - ) - } + def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + index.getOffsetRange(startSeq, Long.MaxValue) match { + case (_, endOffset) if endOffset == -1 => // indicates an empty range + acc0 + case (startOffset, _) => + dataFile.foldLeftWhile(startOffset)(acc0)(pred)( + (acc, evt) => foldFun(acc, evt) + ) + } /** * Close underlying file handles or connections. This Segment should not be used after diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala index 94b19f1c..8ccc2f70 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala @@ -182,7 +182,7 @@ class SegmentedUberStoreTest extends NiceTest { } describe("foldLeftRange") { - it("should reflect livedir's nextSeq") { + it("should include livedir's events") { createSegment(dir, "1") createSegment(dir, "5") createSegment(dir, "10") @@ -197,6 +197,38 @@ class SegmentedUberStoreTest extends NiceTest { } } + describe("foldLeftWhile") { + it("should limit events based on predicate on accumulator") { + createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList) + createPopulatedSegment(dir, "5", Range.inclusive(4, 6).toList) + createPopulatedSegment(dir, "10", Range.inclusive(7, 9).toList) + uberstore = SegmentedUberStore(dir.getAbsolutePath, new SiriusConfiguration) + uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) + uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) + + val result = uberstore.foldLeftWhile(startSeq = 1)(List[SiriusRequest]())(list => list.size < 5)( + (acc, event) => event.request +: acc + ).reverse + + assert(result === List(Delete("1"), Delete("2"), Delete("3"), Delete("4"), Delete("5"))) + } + + it("should include livedir's events") { + createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList) + createPopulatedSegment(dir, "5", Range.inclusive(4, 6).toList) + createPopulatedSegment(dir, "10", Range.inclusive(7, 9).toList) + uberstore = SegmentedUberStore(dir.getAbsolutePath, new SiriusConfiguration) + uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) + uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) + + val result = uberstore.foldLeftWhile(startSeq = 9)(List[SiriusRequest]())(list => list.size < 2)( + (acc, event) => event.request +: acc + ).reverse + + assert(result === List(Delete("9"), Delete("10"))) + } + } + describe("parallelForeach") { it("should bootstrap the uberstore in parallel") { createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList, isApplied = true) From 529eb78706e6cefc3c953713cbcf3cf24cbdbbcb Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sun, 16 Feb 2025 11:09:33 -0500 Subject: [PATCH 4/8] Update CatchupSupervisor --- .../sirius/api/impl/bridge/CatchupSupervisor.scala | 8 +++++--- .../sirius/api/impl/state/SiriusPersistenceActor.scala | 4 ++-- .../sirius/api/impl/bridge/CatchupSupervisorTest.scala | 6 +++--- .../api/impl/state/SiriusPersistenceActorTest.scala | 6 +++--- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala index ea0c148f..1bdb701c 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala @@ -15,14 +15,16 @@ */ package com.comcast.xfinity.sirius.api.impl.bridge -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{Actor, ActorRef, Props} import akka.pattern.ask + import scala.concurrent.duration._ import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean -import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{GetLogSubrange, LogSubrange, CompleteSubrange} +import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeToLimit, LogSubrange} import com.comcast.xfinity.sirius.admin.MonitoringHooks + import scala.util.Success case class InitiateCatchup(fromSeq: Long) @@ -120,7 +122,7 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, } def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = { - source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete { + source.ask(GetLogSubrangeToLimit(fromSeq, window))(timeout()).onComplete { case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange) case _ => self ! CatchupRequestFailed } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index f7d59dda..e213e6c0 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -43,7 +43,7 @@ object SiriusPersistenceActor { * @param end last sequence number of the range, inclusive */ case class GetLogSubrange(begin: Long, end: Long) extends LogQuery - case class GetLogRangeLimit(begin: Long, events: Long) extends LogQuery + case class GetLogSubrangeToLimit(begin: Long, events: Long) extends LogQuery trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -132,7 +132,7 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime - case GetLogRangeLimit(start, limit) => + case GetLogSubrangeToLimit(start, limit) => val buffer = siriusLog.foldLeftWhile(start)(ListBuffer.empty[OrderedEvent])(buffer => buffer.length < limit)( (acc, event) => acc += event ) diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala index 7a0c7938..40fa8b44 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala @@ -53,7 +53,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! InitiateCatchup(1L) - remote.expectMsgClass(classOf[GetLogSubrange]) + remote.expectMsgClass(classOf[GetLogSubrangeToLimit]) } describe("for successful requests with a complete subrange") { it("should forward the message on to the parent") { @@ -176,7 +176,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! ContinueCatchup(1L) - remote.expectMsg(GetLogSubrange(1L, 2L)) + remote.expectMsg(GetLogSubrangeToLimit(1L, 1L)) } it("should ignore InitiateCatchup requests if it's currently in catchup mode") { val remote = TestProbe() @@ -197,7 +197,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! StopCatchup underTest ! InitiateCatchup(1L) - remote.expectMsg(GetLogSubrange(1L, 2L)) + remote.expectMsg(GetLogSubrangeToLimit(1L, 1L)) } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 9691ec1b..27f67e64 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -194,7 +194,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogRangeLimit(1, 2)) + senderProbe.send(underTest, GetLogSubrangeToLimit(1, 2)) verifyFoldLeftWhile(mockLog, 1) senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) @@ -211,7 +211,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogRangeLimit(8, 3)) + senderProbe.send(underTest, GetLogSubrangeToLimit(8, 3)) verifyFoldLeftWhile(mockLog, 8) senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) @@ -224,7 +224,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogRangeLimit(8, 11)) + senderProbe.send(underTest, GetLogSubrangeToLimit(8, 11)) verifyFoldLeftWhile(mockLog, 8) senderProbe.expectMsg(EmptySubrange) From 4c436de0e7d2340b5349e22111ca8eae7cca3907 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sun, 16 Feb 2025 15:49:07 -0500 Subject: [PATCH 5/8] Refactor to make limited subrange include end sequence --- .../api/impl/bridge/CatchupSupervisor.scala | 4 +- .../impl/state/SiriusPersistenceActor.scala | 110 +++++++++++++----- .../xfinity/sirius/uberstore/UberPair.scala | 6 +- .../xfinity/sirius/uberstore/UberStore.scala | 4 +- .../sirius/uberstore/data/UberDataFile.scala | 11 +- .../sirius/uberstore/segmented/Segment.scala | 8 +- .../segmented/SegmentedUberStore.scala | 6 +- .../writeaheadlog/CachedSiriusLog.scala | 36 +++++- .../sirius/writeaheadlog/SiriusLog.scala | 2 +- .../impl/bridge/CatchupSupervisorTest.scala | 6 +- .../state/SiriusPersistenceActorTest.scala | 38 ++++-- .../sirius/itest/DoNothingSiriusLog.scala | 4 +- .../sirius/uberstore/UberToolTest.scala | 19 +-- .../segmented/SegmentedUberStoreTest.scala | 6 +- 14 files changed, 175 insertions(+), 85 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala index 1bdb701c..63e6c707 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration._ import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean -import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeToLimit, LogSubrange} +import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeWithLimit, LogSubrange} import com.comcast.xfinity.sirius.admin.MonitoringHooks import scala.util.Success @@ -122,7 +122,7 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, } def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = { - source.ask(GetLogSubrangeToLimit(fromSeq, window))(timeout()).onComplete { + source.ask(GetLogSubrangeWithLimit(fromSeq, Long.MaxValue, window))(timeout()).onComplete { case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange) case _ => self ! CatchupRequestFailed } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index e213e6c0..f1d341ca 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -43,7 +43,18 @@ object SiriusPersistenceActor { * @param end last sequence number of the range, inclusive */ case class GetLogSubrange(begin: Long, end: Long) extends LogQuery - case class GetLogSubrangeToLimit(begin: Long, events: Long) extends LogQuery + /** + * Message for directly requesting a chunk of the log from a node. + * + * SiriusPersistenceActor is expected to reply with a LogSubrange + * when receiving this message. This range should be as complete + * as possible. + * + * @param begin first sequence number of the range + * @param end last sequence number of the range, inclusive + * @param limit the maximum number of events + */ + case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuery trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -132,34 +143,11 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime - case GetLogSubrangeToLimit(start, limit) => - val buffer = siriusLog.foldLeftWhile(start)(ListBuffer.empty[OrderedEvent])(buffer => buffer.length < limit)( - (acc, event) => acc += event - ) - if (buffer.isEmpty) { - sender ! EmptySubrange - } - if (buffer.length < limit) { - sender ! PartialSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList) - } else { - sender ! CompleteSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList) - } - - case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully - val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( - (acc, event) => acc += event - ) - sender ! CompleteSubrange(rangeStart, rangeEnd, events.toList) - - case GetLogSubrange(rangeStart, _) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back - sender ! EmptySubrange + case GetLogSubrangeWithLimit(start, end, limit) => + sender ! queryLimitedSubrange(start, end, limit) - case GetLogSubrange(rangeStart, _) => // we can respond partially - val lastSeq = siriusLog.getNextSeq - 1 - val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( - (acc, event) => acc += event - ) - sender ! PartialSubrange(rangeStart, lastSeq, events.toList) + case GetLogSubrange(start, end) => + sender ! querySubrange(start, end) case GetNextLogSeq => sender ! siriusLog.getNextSeq @@ -172,6 +160,72 @@ class SiriusPersistenceActor(stateActor: ActorRef, case _: SiriusResult => } + private def queryLimitedSubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = + if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0) { + // invalid query subrange or limit, we can't send anything useful back + EmptySubrange + } else if (limit > (rangeEnd - rangeStart)) { + // limit is larger than the subrange window, no need to enforce limit + querySubrange(rangeStart, rangeEnd) + } else { + val nextSeq = siriusLog.getNextSeq + if (rangeStart >= nextSeq) { + // query is out of range, we can't send anything useful back + EmptySubrange + } else if (rangeEnd >= nextSeq) { + // we can only answer partially + val lastSeq = nextSeq - 1 + val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( + // continue folding events as long as the buffer is smaller than the limit + buffer => buffer.size < limit + )( + (acc, event) => acc += event + ) + if (buffer.size < limit) { + PartialSubrange(rangeStart, lastSeq, buffer.toList) + } else { + PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) + } + } else { + val buffer = siriusLog.foldLeftRangeWhile(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( + // continue folding events as long as the buffer is smaller than the limit + buffer => buffer.size < limit + )( + (acc, event) => acc += event + ) + if (buffer.size < limit) { + CompleteSubrange(rangeStart, rangeEnd, buffer.toList) + } else { + PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) + } + } + } + + private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange = + if (rangeEnd < rangeStart || rangeEnd <= 0) { + // invalid query subrange or limit, we can't send anything useful back + EmptySubrange + } else { + val nextSeq = siriusLog.getNextSeq + if (rangeStart >= nextSeq) { + // query is out of range, we can't send anything useful back + EmptySubrange + } else if (rangeEnd >= nextSeq) { + // we can answer partially + val lastSeq = nextSeq - 1 + val buffer = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( + (acc, event) => acc += event + ) + PartialSubrange(rangeStart, lastSeq, buffer.toList) + } else { + // we can answer fully + val buffer = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( + (acc, event) => acc += event + ) + CompleteSubrange(rangeStart, rangeEnd, buffer.toList) + } + } + /** * Monitoring hooks */ diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala index ac433e53..a1a8ac54 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala @@ -107,9 +107,9 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) { ) } - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { - val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue) - dataFile.foldLeftWhile(startOffset)(acc0)(pred)(foldFun) + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val (startOffset, endOffset) = index.getOffsetRange(startSeq, endSeq) + dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(foldFun) } /** diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala index 1de8eb9b..294debb6 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala @@ -85,8 +85,8 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends /** * @inheritdoc */ - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { - uberpair.foldLeftWhile(startSeq)(acc0)(pred)(foldFun) + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + uberpair.foldLeftRangeWhile(startSeq, endSeq)(acc0)(pred)(foldFun) } /** diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala index 64e576ba..4bd0d3e7 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala @@ -107,10 +107,10 @@ private[uberstore] class UberDataFile(dataFileName: String, } } - def foldLeftWhile[T](baseOff:Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + def foldLeftRangeWhile[T](baseOff:Long, endOff: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff) try { - foldLeftWhile(readHandle, acc0, pred, foldFun) + foldLeftRangeWhile(readHandle, endOff, acc0, pred, foldFun) } finally { readHandle.close() } @@ -133,15 +133,16 @@ private[uberstore] class UberDataFile(dataFileName: String, } @tailrec - private def foldLeftWhile[T](readHandle: UberDataFileReadHandle, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { - if (!pred(acc)) { + private def foldLeftRangeWhile[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + val offset = readHandle.offset() + if (offset > maxOffset || !pred(acc)) { acc } else { fileOps.readNext(readHandle) match { case None => acc case Some(bytes) => val accNew = foldFun(acc, codec.deserialize(bytes)) - foldLeftWhile(readHandle, accNew, pred, foldFun) + foldLeftRangeWhile(readHandle, maxOffset, accNew, pred, foldFun) } } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala index 7bd47009..cd5dbdb5 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala @@ -172,12 +172,12 @@ class Segment private[uberstore](val location: File, ) } - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = - index.getOffsetRange(startSeq, Long.MaxValue) match { + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + index.getOffsetRange(startSeq, endSeq) match { case (_, endOffset) if endOffset == -1 => // indicates an empty range acc0 - case (startOffset, _) => - dataFile.foldLeftWhile(startOffset)(acc0)(pred)( + case (startOffset, endOffset) => + dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)( (acc, evt) => foldFun(acc, evt) ) } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala index e7fb6c74..c6128e1b 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala @@ -154,11 +154,11 @@ class SegmentedUberStore private[segmented] (base: JFile, /** * @inheritdoc */ - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { val res0 = readOnlyDirs.foldLeft(acc0)( - (acc, dir) => dir.foldLeftWhile(startSeq)(acc)(pred)(foldFun) + (acc, dir) => dir.foldLeftRangeWhile(startSeq, endSeq)(acc)(pred)(foldFun) ) - liveDir.foldLeftWhile(startSeq)(res0)(pred)(foldFun) + liveDir.foldLeftRangeWhile(startSeq, endSeq)(res0)(pred)(foldFun) } /** diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala index 922e0f23..5c31ad01 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala @@ -16,8 +16,10 @@ package com.comcast.xfinity.sirius.writeaheadlog import com.comcast.xfinity.sirius.api.impl.OrderedEvent + import scala.collection.JavaConverters._ import java.util.{TreeMap => JTreeMap} +import scala.annotation.tailrec object CachedSiriusLog { /** @@ -89,27 +91,51 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog { * @param foldFun function that should take (accumulator, event) and return a new accumulator * @return the final value of the accumulator */ - override def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = { + override def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = (startSeq, endSeq) match { case (start, end) if (firstSeq <= start && end <= lastSeq) => foldLeftRangeCached(start, end)(acc0)(foldFun) case (start, end) => log.foldLeftRange(start, end)(acc0)(foldFun) } - } + + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + (startSeq, endSeq) match { + case (start, end) if (firstSeq <= start && end <= lastSeq) => + foldLeftRangeWhileCached(start, end)(acc0)(pred)(foldFun) + case (start, end) => + log.foldLeftRangeWhile(start, end)(acc0)(pred)(foldFun) + } /** * Private inner version of fold left. This one hits the cache, and assumes that start/endSeqs are * contained in the cache. Synchronizes on writeCache so we can subMap with no fear. */ private def foldLeftRangeCached[T](startSeq: Long, endSeq: Long) - (acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { + (acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { writeCache.subMap(startSeq, true, endSeq, true) .values.asScala.foldLeft(acc0)(foldFun) } - override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = - log.foldLeftWhile(startSeq)(acc0)(pred)(foldFun) + private def foldLeftRangeWhileCached[T](startSeq: Long, endSeq: Long) + (acc0: T) + (pred: T => Boolean) + (foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { + val iterable = writeCache.subMap(startSeq, true, endSeq, true) + .values.asScala + foldLeftRangeWhileCached(iterable.iterator, acc0, pred, foldFun) + } + + @tailrec + private def foldLeftRangeWhileCached[T](iter: Iterator[OrderedEvent], acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + if (!(pred(acc) && iter.hasNext)) { + acc + } else { + val evt = iter.next() + val newAcc = foldFun(acc, evt) + foldLeftRangeWhileCached(iter, newAcc, pred, foldFun) + } + } def getNextSeq = log.getNextSeq diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala index 00a4a038..20a8a905 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala @@ -66,7 +66,7 @@ trait SiriusLog { * @param pred condition to continue accumulating log entries * @param foldFun function to apply to the log entry, the result being the new accumulator */ - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T /** * retrieves the next sequence number to be written diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala index 40fa8b44..9d6369c2 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala @@ -53,7 +53,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! InitiateCatchup(1L) - remote.expectMsgClass(classOf[GetLogSubrangeToLimit]) + remote.expectMsgClass(classOf[GetLogSubrangeWithLimit]) } describe("for successful requests with a complete subrange") { it("should forward the message on to the parent") { @@ -176,7 +176,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! ContinueCatchup(1L) - remote.expectMsg(GetLogSubrangeToLimit(1L, 1L)) + remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L)) } it("should ignore InitiateCatchup requests if it's currently in catchup mode") { val remote = TestProbe() @@ -197,7 +197,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! StopCatchup underTest ! InitiateCatchup(1L) - remote.expectMsg(GetLogSubrangeToLimit(1L, 1L)) + remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L)) } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 27f67e64..d8aadf67 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -57,7 +57,7 @@ class SiriusPersistenceActorTest extends NiceTest { def makeMockLog(events: ListBuffer[OrderedEvent], nextSeq: Long = 1L): SiriusLog = { val mockLog = mock[SiriusLog] doReturn(events).when(mockLog).foldLeftRange(anyLong, anyLong)(any[Symbol])(anyFoldFun) - doReturn(events).when(mockLog).foldLeftWhile(anyLong)(any[Symbol])(anyPred)(anyFoldFun) + doReturn(events).when(mockLog).foldLeftRangeWhile(anyLong, anyLong)(any[Symbol])(anyPred)(anyFoldFun) doReturn(nextSeq).when(mockLog).getNextSeq mockLog @@ -71,8 +71,8 @@ class SiriusPersistenceActorTest extends NiceTest { verify(siriusLog).foldLeftRange(meq(start), meq(end))(meq(ListBuffer[OrderedEvent]()))(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) } - def verifyFoldLeftWhile(siriusLog: SiriusLog, start: Long): Unit = { - verify(siriusLog).foldLeftWhile(meq(start))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean]())(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) + def verifyFoldLeftWhile(siriusLog: SiriusLog, start: Long, end: Long): Unit = { + verify(siriusLog).foldLeftRangeWhile(meq(start), meq(end))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean]())(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) } describe("a SiriusPersistenceActor") { @@ -182,7 +182,7 @@ class SiriusPersistenceActorTest extends NiceTest { } } - describe("upon receiving a GetLogRangeLimit message") { + describe("upon receiving a GetLogSubrangeWithLimit message") { describe("when we can fully reply") { it("should build the list of events and reply with it") { val senderProbe = TestProbe() @@ -194,13 +194,13 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeToLimit(1, 2)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, 2, 2)) - verifyFoldLeftWhile(mockLog, 1) + verifyFoldLeftRanged(mockLog, 1, 2) senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) } } - describe("when we can partially reply") { + describe("when we can partially reply due to range") { it("should build the list of events and reply with it") { val senderProbe = TestProbe() @@ -211,9 +211,26 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeToLimit(8, 3)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 3)) + + verifyFoldLeftWhile(mockLog, 8, 9) + senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) + } + } + describe("when we can partially reply due to limit") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + doReturn(8L).when(event1).sequence + val event2 = mock[OrderedEvent] + doReturn(9L).when(event2).sequence + val mockLog = makeMockLog(ListBuffer(event1, event2), 11L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 10, 2)) - verifyFoldLeftWhile(mockLog, 8) + verifyFoldLeftWhile(mockLog, 8, 10) senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) } } @@ -224,9 +241,8 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeToLimit(8, 11)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 11)) - verifyFoldLeftWhile(mockLog, 8) senderProbe.expectMsg(EmptySubrange) } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala index 7dceb0f7..731455a3 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala @@ -32,7 +32,9 @@ class DoNothingSiriusLog extends SiriusLog { acc0 } - override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = acc0 + override def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + acc0 + } override def getNextSeq = 1L diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala index d20c2de6..f01d725f 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala @@ -36,29 +36,20 @@ object UberToolTest { def foldLeftRange[T](start: Long, end: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = events.filter(e => start <= e.sequence && e.sequence <= end).foldLeft(acc0)(foldFun) - def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { - var acc: T = acc0 - for (evt <- events) { - if (!pred(acc)) { - return acc - } - acc = foldFun(acc, evt) - } - acc + def foldLeftRangeWhile[T](start: Long, end: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val filtered = events.filter(e => start <= e.sequence && e.sequence <= end) + foldLeftRangeWhile(filtered, acc0, pred, foldFun) } @tailrec - private def foldLeftWhile[T](events: List[OrderedEvent], startSeq: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + private def foldLeftRangeWhile[T](events: List[OrderedEvent], acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = events match { case Nil => acc - case evt :: rest if evt.sequence < startSeq => - foldLeftWhile(rest, startSeq, acc, pred, foldFun) case _ if !pred(acc) => acc case evt :: rest => val accNew = foldFun(acc, evt) - foldLeftWhile(rest, startSeq, accNew, pred, foldFun) + foldLeftRangeWhile(rest, accNew, pred, foldFun) } - } def getNextSeq: Long = throw new IllegalStateException("not implemented") diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala index 8ccc2f70..2223f1d4 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala @@ -197,7 +197,7 @@ class SegmentedUberStoreTest extends NiceTest { } } - describe("foldLeftWhile") { + describe("foldLeftRangeWhile") { it("should limit events based on predicate on accumulator") { createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList) createPopulatedSegment(dir, "5", Range.inclusive(4, 6).toList) @@ -206,7 +206,7 @@ class SegmentedUberStoreTest extends NiceTest { uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) - val result = uberstore.foldLeftWhile(startSeq = 1)(List[SiriusRequest]())(list => list.size < 5)( + val result = uberstore.foldLeftRangeWhile(startSeq = 1, endSeq = Long.MaxValue)(List[SiriusRequest]())(list => list.size < 5)( (acc, event) => event.request +: acc ).reverse @@ -221,7 +221,7 @@ class SegmentedUberStoreTest extends NiceTest { uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) - val result = uberstore.foldLeftWhile(startSeq = 9)(List[SiriusRequest]())(list => list.size < 2)( + val result = uberstore.foldLeftRangeWhile(startSeq = 9, endSeq = Long.MaxValue)(List[SiriusRequest]())(list => list.size < 2)( (acc, event) => event.request +: acc ).reverse From e6dfb3bee94adffe7e5938db1613bbe23518389a Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sun, 16 Feb 2025 18:02:25 -0500 Subject: [PATCH 6/8] nix refactor of CatchupSupervisor, for now --- .../xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala | 4 ++-- .../sirius/api/impl/bridge/CatchupSupervisorTest.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala index 63e6c707..13259fe3 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration._ import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean -import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeWithLimit, LogSubrange} +import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrange, GetLogSubrangeWithLimit, LogSubrange} import com.comcast.xfinity.sirius.admin.MonitoringHooks import scala.util.Success @@ -122,7 +122,7 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, } def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = { - source.ask(GetLogSubrangeWithLimit(fromSeq, Long.MaxValue, window))(timeout()).onComplete { + source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete { case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange) case _ => self ! CatchupRequestFailed } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala index 9d6369c2..7a0c7938 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala @@ -53,7 +53,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! InitiateCatchup(1L) - remote.expectMsgClass(classOf[GetLogSubrangeWithLimit]) + remote.expectMsgClass(classOf[GetLogSubrange]) } describe("for successful requests with a complete subrange") { it("should forward the message on to the parent") { @@ -176,7 +176,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! ContinueCatchup(1L) - remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L)) + remote.expectMsg(GetLogSubrange(1L, 2L)) } it("should ignore InitiateCatchup requests if it's currently in catchup mode") { val remote = TestProbe() @@ -197,7 +197,7 @@ class CatchupSupervisorTest extends NiceTest { underTest ! StopCatchup underTest ! InitiateCatchup(1L) - remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L)) + remote.expectMsg(GetLogSubrange(1L, 2L)) } } } From 23381d48865da74ea4975909f81bb56bf4321129 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Sun, 16 Feb 2025 21:11:22 -0500 Subject: [PATCH 7/8] Make end sequence optional --- .../impl/state/SiriusPersistenceActor.scala | 34 +++++++++++++++++-- .../state/SiriusPersistenceActorTest.scala | 24 ++++++------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index f1d341ca..8d97b00f 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -54,7 +54,7 @@ object SiriusPersistenceActor { * @param end last sequence number of the range, inclusive * @param limit the maximum number of events */ - case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuery + case class GetLogSubrangeWithLimit(begin: Long, end: Option[Long], limit: Long) extends LogQuery trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -143,9 +143,12 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime - case GetLogSubrangeWithLimit(start, end, limit) => + case GetLogSubrangeWithLimit(start, Some(end), limit) => sender ! queryLimitedSubrange(start, end, limit) + case GetLogSubrangeWithLimit(start, None, limit) => + sender ! queryLimited(start, limit) + case GetLogSubrange(start, end) => sender ! querySubrange(start, end) @@ -201,6 +204,33 @@ class SiriusPersistenceActor(stateActor: ActorRef, } } + private def queryLimited(rangeStart: Long, limit: Long): LogSubrange = { + if (limit <= 0) { + // invalid query subrange or limit, we can't send anything useful back + EmptySubrange + } else { + val nextSeq = siriusLog.getNextSeq + if (rangeStart >= nextSeq) { + // query is out of range, we can't send anything useful back + EmptySubrange + } else { + val nextSeq = siriusLog.getNextSeq + val lastSeq = nextSeq - 1 + val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( + // continue folding events as long as the buffer is smaller than the limit + buffer => buffer.size < limit + )( + (acc, event) => acc += event + ) + if (buffer.size < limit) { + CompleteSubrange(rangeStart, lastSeq, buffer.toList) + } else { + PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) + } + } + } + } + private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange = if (rangeEnd < rangeStart || rangeEnd <= 0) { // invalid query subrange or limit, we can't send anything useful back diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index d8aadf67..1ac37157 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -194,7 +194,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(1, 2, 2)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(2), 2)) verifyFoldLeftRanged(mockLog, 1, 2) senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) @@ -205,16 +205,16 @@ class SiriusPersistenceActorTest extends NiceTest { val senderProbe = TestProbe() val event1 = mock[OrderedEvent] - doReturn(8L).when(event1).sequence + doReturn(1L).when(event1).sequence val event2 = mock[OrderedEvent] - doReturn(9L).when(event2).sequence + doReturn(2L).when(event2).sequence val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 3)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, None, 3)) - verifyFoldLeftWhile(mockLog, 8, 9) - senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) + verifyFoldLeftWhile(mockLog, 1, 9) + senderProbe.expectMsg(CompleteSubrange(1, 9, List(event1, event2))) } } describe("when we can partially reply due to limit") { @@ -222,16 +222,16 @@ class SiriusPersistenceActorTest extends NiceTest { val senderProbe = TestProbe() val event1 = mock[OrderedEvent] - doReturn(8L).when(event1).sequence + doReturn(1L).when(event1).sequence val event2 = mock[OrderedEvent] - doReturn(9L).when(event2).sequence + doReturn(2L).when(event2).sequence val mockLog = makeMockLog(ListBuffer(event1, event2), 11L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 10, 2)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(10), 2)) - verifyFoldLeftWhile(mockLog, 8, 10) - senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) + verifyFoldLeftWhile(mockLog, 1, 10) + senderProbe.expectMsg(PartialSubrange(1, 2, List(event1, event2))) } } describe("when we can't send anything useful at all") { @@ -241,7 +241,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 11)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, None, 11)) senderProbe.expectMsg(EmptySubrange) } From cc34439953665ae488418d66d66c7a7fc3351e52 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 18 Feb 2025 13:36:50 -0500 Subject: [PATCH 8/8] Fix catchup with limit, optionally support via config --- .../sirius/api/SiriusConfiguration.scala | 10 ++ .../api/impl/bridge/CatchupSupervisor.scala | 42 +++--- .../impl/state/SiriusPersistenceActor.scala | 124 +++++------------- .../impl/bridge/CatchupSupervisorTest.scala | 87 ++++++++---- .../state/SiriusPersistenceActorTest.scala | 22 ++-- .../sirius/itest/FullSystemITest.scala | 57 ++++++-- 6 files changed, 186 insertions(+), 156 deletions(-) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala index 24ff4aa1..e3cda3c1 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala @@ -298,6 +298,16 @@ object SiriusConfiguration { */ final val CATCHUP_MAX_WINDOW_SIZE = "sirius.catchup.max-window-size" + /** + * Use log limit for congestion avoidance for catchup. Default is false. + */ + final val CATCHUP_USE_LIMIT = "sirius.catchup.use-limit" + + /** + * Maximum catchup limit size, in number of events. Default is 1000. + */ + final val CATCHUP_MAX_LIMIT_SIZE = "sirius.catchup.max-limit-size" + /** * Starting ssthresh, which is the point where catchup transitions from Slow Start to * Congestion Avoidance. Default is 500. diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala index 13259fe3..ee756f08 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala @@ -38,7 +38,7 @@ object CatchupSupervisor { trait CatchupSupervisorInfoMBean { def getSSThresh: Int - def getWindow: Int + def getLimit: Int } /** @@ -52,9 +52,14 @@ object CatchupSupervisor { val timeoutCoeff = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_INCREASE_PER_EVENT, .01) val timeoutConst = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_BASE, 1.0) val maxWindowSize = config.getInt(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, 1000) + val maxLimitSize = if (config.getProp(SiriusConfiguration.CATCHUP_USE_LIMIT, false)) { + config.getProp[Int](SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE) + } else { + None + } // must ensure ssthresh <= maxWindowSize - val startingSSThresh = Math.min(maxWindowSize, config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500)) - Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, startingSSThresh, config)) + val startingSSThresh = Math.min(maxLimitSize.getOrElse(maxWindowSize), config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500)) + Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, maxLimitSize, startingSSThresh, config)) } } @@ -78,28 +83,29 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, timeoutCoeff: Double, timeoutConst: Double, maxWindowSize: Int, + maxLimitSize: Option[Int], var ssthresh: Int, config: SiriusConfiguration) extends Actor with MonitoringHooks { - var window = 1 - def timeout() = (timeoutConst + (window * timeoutCoeff)).seconds + var limit = 1 + def timeout() = (timeoutConst + (limit * timeoutCoeff)).seconds implicit val executionContext = context.dispatcher def receive = { case InitiateCatchup(fromSeq) => membershipHelper.getRandomMember.map(remote => { - requestSubrange(fromSeq, window, remote) + requestSubrange(fromSeq, limit, remote) context.become(catchup(remote)) }) } def catchup(source: ActorRef): Receive = { case CatchupRequestSucceeded(logSubrange: CompleteSubrange) => - if (window >= ssthresh) { // we're in Congestion Avoidance phase - window = Math.min(window + 2, maxWindowSize) + if (limit >= ssthresh) { // we're in Congestion Avoidance phase + limit = Math.min(limit + 2, maxLimitSize.getOrElse(maxWindowSize)) } else { // we're in Slow Start phase - window = Math.min(window * 2, ssthresh) + limit = Math.min(limit * 2, ssthresh) } context.parent ! logSubrange @@ -107,22 +113,26 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, context.parent ! logSubrange case CatchupRequestFailed => - if (window != 1) { + if (limit != 1) { // adjust ssthresh, revert to Slow Start phase - ssthresh = Math.max(window / 2, 1) - window = 1 + ssthresh = Math.max(limit / 2, 1) + limit = 1 } context.unbecome() case ContinueCatchup(fromSeq: Long) => - requestSubrange(fromSeq, window, source) + requestSubrange(fromSeq, limit, source) case StopCatchup => context.unbecome() } - def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = { - source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete { + def requestSubrange(fromSeq: Long, limit: Int, source: ActorRef): Unit = { + val message = maxLimitSize match { + case Some(_) => GetLogSubrangeWithLimit(fromSeq, fromSeq + maxWindowSize, limit) + case None => GetLogSubrange(fromSeq, fromSeq + limit) + } + source.ask(message)(timeout()).onComplete { case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange) case _ => self ! CatchupRequestFailed } @@ -137,6 +147,6 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, class CatchupSupervisorInfo extends CatchupSupervisorInfoMBean { def getSSThresh = ssthresh - def getWindow = window + def getLimit = limit } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index 8d97b00f..725aba31 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -30,6 +30,10 @@ object SiriusPersistenceActor { * that is, the state of persisted data */ sealed trait LogQuery + sealed trait LogQuerySubrange extends LogQuery { + def begin: Long + def end: Long + } case object GetLogSize extends LogQuery /** @@ -42,7 +46,7 @@ object SiriusPersistenceActor { * @param begin first sequence number of the range * @param end last sequence number of the range, inclusive */ - case class GetLogSubrange(begin: Long, end: Long) extends LogQuery + case class GetLogSubrange(begin: Long, end: Long) extends LogQuerySubrange /** * Message for directly requesting a chunk of the log from a node. * @@ -54,7 +58,7 @@ object SiriusPersistenceActor { * @param end last sequence number of the range, inclusive * @param limit the maximum number of events */ - case class GetLogSubrangeWithLimit(begin: Long, end: Option[Long], limit: Long) extends LogQuery + case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuerySubrange trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -143,14 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime - case GetLogSubrangeWithLimit(start, Some(end), limit) => - sender ! queryLimitedSubrange(start, end, limit) - - case GetLogSubrangeWithLimit(start, None, limit) => - sender ! queryLimited(start, limit) - case GetLogSubrange(start, end) => - sender ! querySubrange(start, end) + sender ! querySubrange(start, end, Long.MaxValue) + + case GetLogSubrangeWithLimit(start, end, limit) => + sender ! querySubrange(start, end, limit) case GetNextLogSeq => sender ! siriusLog.getNextSeq @@ -163,99 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef, case _: SiriusResult => } - private def queryLimitedSubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = - if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0) { - // invalid query subrange or limit, we can't send anything useful back + private def querySubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = { + val nextSeq = siriusLog.getNextSeq + val lastSeq = nextSeq - 1 + if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) { + // parameters are out of range, can't return anything useful EmptySubrange - } else if (limit > (rangeEnd - rangeStart)) { - // limit is larger than the subrange window, no need to enforce limit - querySubrange(rangeStart, rangeEnd) } else { - val nextSeq = siriusLog.getNextSeq - if (rangeStart >= nextSeq) { - // query is out of range, we can't send anything useful back - EmptySubrange - } else if (rangeEnd >= nextSeq) { - // we can only answer partially - val lastSeq = nextSeq - 1 - val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( - // continue folding events as long as the buffer is smaller than the limit - buffer => buffer.size < limit - )( - (acc, event) => acc += event - ) - if (buffer.size < limit) { - PartialSubrange(rangeStart, lastSeq, buffer.toList) - } else { - PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) - } - } else { - val buffer = siriusLog.foldLeftRangeWhile(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( - // continue folding events as long as the buffer is smaller than the limit - buffer => buffer.size < limit - )( - (acc, event) => acc += event - ) - if (buffer.size < limit) { - CompleteSubrange(rangeStart, rangeEnd, buffer.toList) + val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd + if (limit > (endSeq - rangeStart)) { + // the limit is larger than the subrange window, so do not enforce + val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])( + (acc, evt) => acc += evt + ).toList + if (endSeq < rangeEnd) { + // the end of the range extends beyond the end of the log, so can only partially answer + PartialSubrange(rangeStart, endSeq, events) } else { - PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) + // the range is entirely within the log, so can fully answer + CompleteSubrange(rangeStart, endSeq, events) } - } - } - - private def queryLimited(rangeStart: Long, limit: Long): LogSubrange = { - if (limit <= 0) { - // invalid query subrange or limit, we can't send anything useful back - EmptySubrange - } else { - val nextSeq = siriusLog.getNextSeq - if (rangeStart >= nextSeq) { - // query is out of range, we can't send anything useful back - EmptySubrange } else { - val nextSeq = siriusLog.getNextSeq - val lastSeq = nextSeq - 1 - val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( - // continue folding events as long as the buffer is smaller than the limit + // the limit is smaller than the subrange window + val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])( buffer => buffer.size < limit )( - (acc, event) => acc += event + (acc, evt) => acc += evt ) - if (buffer.size < limit) { - CompleteSubrange(rangeStart, lastSeq, buffer.toList) + if (buffer.size < limit && endSeq < rangeEnd) { + // the end of the subrange extended part the end of the log + // and the buffer was not filled to the limit, so we can only partially respond + PartialSubrange(rangeStart, endSeq, buffer.toList) } else { - PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList) + // the buffer was filled to the limit, so completely respond using the sequence of the + // last event as the end of the range + CompleteSubrange(rangeStart, buffer.last.sequence, buffer.toList) } } } } - private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange = - if (rangeEnd < rangeStart || rangeEnd <= 0) { - // invalid query subrange or limit, we can't send anything useful back - EmptySubrange - } else { - val nextSeq = siriusLog.getNextSeq - if (rangeStart >= nextSeq) { - // query is out of range, we can't send anything useful back - EmptySubrange - } else if (rangeEnd >= nextSeq) { - // we can answer partially - val lastSeq = nextSeq - 1 - val buffer = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])( - (acc, event) => acc += event - ) - PartialSubrange(rangeStart, lastSeq, buffer.toList) - } else { - // we can answer fully - val buffer = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])( - (acc, event) => acc += event - ) - CompleteSubrange(rangeStart, rangeEnd, buffer.toList) - } - } - /** * Monitoring hooks */ diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala index 7a0c7938..380858db 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala @@ -31,6 +31,7 @@ class CatchupSupervisorTest extends NiceTest { val atomicLong = new AtomicLong def makeMockCatchupSupervisor(remoteActorTry: Try[ActorRef] = Success(TestProbe().ref), maxWindowSize: Int = 1000, + maxLimitSize: Option[Int] = None, startingSSThresh: Int = 500, parent: ActorRef = TestProbe().ref): TestActorRef[CatchupSupervisor] = { val membershipHelper = mock[MembershipHelper] @@ -42,7 +43,7 @@ class CatchupSupervisorTest extends NiceTest { // in order to specify the parent, we also have to specify a name for this actor. using akka's approach. val name = "$" + base64(atomicLong.getAndIncrement) val props = Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, - maxWindowSize, startingSSThresh, new SiriusConfiguration())) + maxWindowSize, maxLimitSize, startingSSThresh, new SiriusConfiguration())) TestActorRef(props, parent, name) } @@ -68,36 +69,36 @@ class CatchupSupervisorTest extends NiceTest { } it("should update window for Slow Start phase") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(40 === underTest.underlyingActor.window) + assert(40 === underTest.underlyingActor.limit) } it("should update window for Congestion Avoidance phase") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 120 + underTest.underlyingActor.limit = 120 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(122 === underTest.underlyingActor.window) + assert(122 === underTest.underlyingActor.limit) } it("should not increase the window beyond maxWindowSize") { val underTest = makeMockCatchupSupervisor() - underTest.underlyingActor.window = 1000 + underTest.underlyingActor.limit = 1000 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(1000 === underTest.underlyingActor.window) + assert(1000 === underTest.underlyingActor.limit) } } describe("for successful requests with an empty subrange") { @@ -112,12 +113,12 @@ class CatchupSupervisorTest extends NiceTest { } it("should leave window and ssthresh unchanged") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(EmptySubrange) - assert(20 === underTest.underlyingActor.window) + assert(20 === underTest.underlyingActor.limit) assert(100 === underTest.underlyingActor.ssthresh) } } @@ -134,14 +135,14 @@ class CatchupSupervisorTest extends NiceTest { } it("should leave window and ssthresh unchanged") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 val mockSubrange = mock[PartialSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(20 === underTest.underlyingActor.window) + assert(20 === underTest.underlyingActor.limit) assert(100 === underTest.underlyingActor.ssthresh) } } @@ -157,27 +158,18 @@ class CatchupSupervisorTest extends NiceTest { } it("should reduce window and ssthresh") { val underTest = makeMockCatchupSupervisor() - underTest.underlyingActor.window = 50 + underTest.underlyingActor.limit = 50 underTest.underlyingActor.ssthresh = 100 underTest ! InitiateCatchup(1L) underTest ! CatchupRequestFailed - assert(1 === underTest.underlyingActor.window) + assert(1 === underTest.underlyingActor.limit) assert(25 === underTest.underlyingActor.ssthresh) } } - it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { - val remote = TestProbe() - val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - - underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) - underTest ! ContinueCatchup(1L) - - remote.expectMsg(GetLogSubrange(1L, 2L)) - } it("should ignore InitiateCatchup requests if it's currently in catchup mode") { val remote = TestProbe() val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) @@ -188,16 +180,53 @@ class CatchupSupervisorTest extends NiceTest { remote.expectNoMessage() } - it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { - val remote = TestProbe() - val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + describe("without a configured maximum limit") { + it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - underTest ! StopCatchup - underTest ! InitiateCatchup(1L) + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! ContinueCatchup(1L) + + remote.expectMsg(GetLogSubrange(1L, 2L)) + } + it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) - remote.expectMsg(GetLogSubrange(1L, 2L)) + underTest ! StopCatchup + underTest ! InitiateCatchup(1L) + + remote.expectMsg(GetLogSubrange(1L, 2L)) + } + } + + describe("with a configured maximum limit") { + it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref), maxLimitSize = Some(1000)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! ContinueCatchup(1L) + + remote.expectMsg(GetLogSubrangeWithLimit(1L, 1001L, 1L)) + } + it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref), maxLimitSize = Some(1000)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! StopCatchup + underTest ! InitiateCatchup(1L) + + remote.expectMsg(GetLogSubrangeWithLimit(1L, 1001L, 1L)) + } } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 1ac37157..a5d4f019 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -194,7 +194,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(2), 2)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, 2, 2)) verifyFoldLeftRanged(mockLog, 1, 2) senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) @@ -205,16 +205,14 @@ class SiriusPersistenceActorTest extends NiceTest { val senderProbe = TestProbe() val event1 = mock[OrderedEvent] - doReturn(1L).when(event1).sequence val event2 = mock[OrderedEvent] - doReturn(2L).when(event2).sequence val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(1, None, 3)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 11, 3)) - verifyFoldLeftWhile(mockLog, 1, 9) - senderProbe.expectMsg(CompleteSubrange(1, 9, List(event1, event2))) + verifyFoldLeftRanged(mockLog, 8, 9) + senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) } } describe("when we can partially reply due to limit") { @@ -222,16 +220,16 @@ class SiriusPersistenceActorTest extends NiceTest { val senderProbe = TestProbe() val event1 = mock[OrderedEvent] - doReturn(1L).when(event1).sequence + doReturn(8L).when(event1).sequence val event2 = mock[OrderedEvent] - doReturn(2L).when(event2).sequence + doReturn(9L).when(event2).sequence val mockLog = makeMockLog(ListBuffer(event1, event2), 11L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(10), 2)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 10, 2)) - verifyFoldLeftWhile(mockLog, 1, 10) - senderProbe.expectMsg(PartialSubrange(1, 2, List(event1, event2))) + verifyFoldLeftWhile(mockLog, 8, 10) + senderProbe.expectMsg(CompleteSubrange(8, 9, List(event1, event2))) } } describe("when we can't send anything useful at all") { @@ -241,7 +239,7 @@ class SiriusPersistenceActorTest extends NiceTest { val mockLog = makeMockLog(ListBuffer(), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) - senderProbe.send(underTest, GetLogSubrangeWithLimit(8, None, 11)) + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 11)) senderProbe.expectMsg(EmptySubrange) } diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index b6e3d9ab..bacdd7fd 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -95,6 +95,7 @@ class FullSystemITest extends NiceTest with TimedTest { replicaReproposalWindow: Int = 10, sslEnabled: Boolean = false, maxWindowSize: Int = 1000, + maxLimitSize: Option[Int] = None, membershipPath: String = new JFile(tempDir, "membership").getAbsolutePath): (SiriusImpl, RequestHandler, SiriusLog) = { @@ -123,6 +124,11 @@ class FullSystemITest extends NiceTest with TimedTest { siriusConfig.setProp(SiriusConfiguration.REPROPOSAL_WINDOW, replicaReproposalWindow) siriusConfig.setProp(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, maxWindowSize) + if (maxLimitSize.isDefined) { + siriusConfig.setProp(SiriusConfiguration.CATCHUP_USE_LIMIT, true) + siriusConfig.setProp(SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE, maxLimitSize.get) + } + if (sslEnabled) { siriusConfig.setProp(SiriusConfiguration.AKKA_EXTERN_CONFIG, "src/test/resources/sirius-akka-base.conf") siriusConfig.setProp(SiriusConfiguration.KEY_STORE_LOCATION, "src/test/resources/keystore") @@ -224,12 +230,13 @@ class FullSystemITest extends NiceTest with TimedTest { } describe("a full sirius implementation") { - it("must reach a decision for lots of slots") { + + def mustReachDecisionForLotsOfSlots(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289) - val (sirius2, _, log2) = makeSirius(42290) - val (sirius3, _, log3) = makeSirius(42291) + val (sirius1, _, log1) = makeSirius(42289, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, maxLimitSize = maxLimitSize) + val (sirius3, _, log3) = makeSirius(42291, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2, sirius3) waitForMembership(sirii, 3) @@ -248,11 +255,11 @@ class FullSystemITest extends NiceTest with TimedTest { "Wals were not equivalent") } - it("must be able to make progress with a node being down and then catch up") { + def mustMakeProgressWithNodeDown(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289) - val (sirius2, _, log2) = makeSirius(42290) + val (sirius1, _, log1) = makeSirius(42289, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2) waitForMembership(sirii, 2) @@ -277,12 +284,12 @@ class FullSystemITest extends NiceTest with TimedTest { "Original and caught-up wals were not equivalent") } - it("must work for master/slave mode") { + def mustWorkMasterSlaveNone(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289, replicaReproposalWindow = 4) - val (sirius2, _, log2) = makeSirius(42290, replicaReproposalWindow = 4) - val (sirius3, _, log3) = makeSirius(42291, replicaReproposalWindow = 4) + val (sirius1, _, log1) = makeSirius(42289, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) + val (sirius3, _, log3) = makeSirius(42291, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2, sirius3) waitForMembership(sirii, 3) @@ -337,6 +344,34 @@ class FullSystemITest extends NiceTest with TimedTest { assert(waitForTrue(verifyWalsAreEquivalent(List(log1, log2, log3)), 500, 100), "Master and slave wals not equivalent") } + + describe("using window-based catchup") { + it("must reach a decision for lots of slots") { + mustReachDecisionForLotsOfSlots(maxLimitSize = None) + } + + it("must be able to make progress with a node being down and then catch up") { + mustMakeProgressWithNodeDown(maxLimitSize = None) + } + + it("must work for master/slave mode") { + mustWorkMasterSlaveNone(maxLimitSize = None) + } + } + + describe("using limit-based catchup") { + it("must reach a decision for lots of slots") { + mustReachDecisionForLotsOfSlots(maxLimitSize = Some(1000)) + } + + it("must be able to make progress with a node being down and then catch up") { + mustMakeProgressWithNodeDown(maxLimitSize = Some(1000)) + } + + it("must work for master/slave mode") { + mustWorkMasterSlaveNone(maxLimitSize = Some(1000)) + } + } } it("should fail to start if a segmented WAL is specified but the LOG_VERSION_ID param does not match") {