diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala index 24ff4aa1..e3cda3c1 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala @@ -298,6 +298,16 @@ object SiriusConfiguration { */ final val CATCHUP_MAX_WINDOW_SIZE = "sirius.catchup.max-window-size" + /** + * Use log limit for congestion avoidance for catchup. Default is false. + */ + final val CATCHUP_USE_LIMIT = "sirius.catchup.use-limit" + + /** + * Maximum catchup limit size, in number of events. Default is 1000. + */ + final val CATCHUP_MAX_LIMIT_SIZE = "sirius.catchup.max-limit-size" + /** * Starting ssthresh, which is the point where catchup transitions from Slow Start to * Congestion Avoidance. Default is 500. diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala index ea0c148f..ee756f08 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala @@ -15,14 +15,16 @@ */ package com.comcast.xfinity.sirius.api.impl.bridge -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{Actor, ActorRef, Props} import akka.pattern.ask + import scala.concurrent.duration._ import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean -import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{GetLogSubrange, LogSubrange, CompleteSubrange} +import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrange, GetLogSubrangeWithLimit, LogSubrange} 
import com.comcast.xfinity.sirius.admin.MonitoringHooks + import scala.util.Success case class InitiateCatchup(fromSeq: Long) @@ -36,7 +38,7 @@ object CatchupSupervisor { trait CatchupSupervisorInfoMBean { def getSSThresh: Int - def getWindow: Int + def getLimit: Int } /** @@ -50,9 +52,14 @@ object CatchupSupervisor { val timeoutCoeff = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_INCREASE_PER_EVENT, .01) val timeoutConst = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_BASE, 1.0) val maxWindowSize = config.getInt(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, 1000) + val maxLimitSize = if (config.getProp(SiriusConfiguration.CATCHUP_USE_LIMIT, false)) { + config.getProp[Int](SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE) + } else { + None + } // must ensure ssthresh <= maxWindowSize - val startingSSThresh = Math.min(maxWindowSize, config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500)) - Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, startingSSThresh, config)) + val startingSSThresh = Math.min(maxLimitSize.getOrElse(maxWindowSize), config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500)) + Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, maxLimitSize, startingSSThresh, config)) } } @@ -76,28 +83,29 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, timeoutCoeff: Double, timeoutConst: Double, maxWindowSize: Int, + maxLimitSize: Option[Int], var ssthresh: Int, config: SiriusConfiguration) extends Actor with MonitoringHooks { - var window = 1 - def timeout() = (timeoutConst + (window * timeoutCoeff)).seconds + var limit = 1 + def timeout() = (timeoutConst + (limit * timeoutCoeff)).seconds implicit val executionContext = context.dispatcher def receive = { case InitiateCatchup(fromSeq) => membershipHelper.getRandomMember.map(remote => { - requestSubrange(fromSeq, window, remote) + requestSubrange(fromSeq, limit, remote) 
context.become(catchup(remote)) }) } def catchup(source: ActorRef): Receive = { case CatchupRequestSucceeded(logSubrange: CompleteSubrange) => - if (window >= ssthresh) { // we're in Congestion Avoidance phase - window = Math.min(window + 2, maxWindowSize) + if (limit >= ssthresh) { // we're in Congestion Avoidance phase + limit = Math.min(limit + 2, maxLimitSize.getOrElse(maxWindowSize)) } else { // we're in Slow Start phase - window = Math.min(window * 2, ssthresh) + limit = Math.min(limit * 2, ssthresh) } context.parent ! logSubrange @@ -105,22 +113,26 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, context.parent ! logSubrange case CatchupRequestFailed => - if (window != 1) { + if (limit != 1) { // adjust ssthresh, revert to Slow Start phase - ssthresh = Math.max(window / 2, 1) - window = 1 + ssthresh = Math.max(limit / 2, 1) + limit = 1 } context.unbecome() case ContinueCatchup(fromSeq: Long) => - requestSubrange(fromSeq, window, source) + requestSubrange(fromSeq, limit, source) case StopCatchup => context.unbecome() } - def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = { - source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete { + def requestSubrange(fromSeq: Long, limit: Int, source: ActorRef): Unit = { + val message = maxLimitSize match { + case Some(_) => GetLogSubrangeWithLimit(fromSeq, fromSeq + maxWindowSize, limit) + case None => GetLogSubrange(fromSeq, fromSeq + limit) + } + source.ask(message)(timeout()).onComplete { case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange) case _ => self ! 
CatchupRequestFailed } @@ -135,6 +147,6 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper, class CatchupSupervisorInfo extends CatchupSupervisorInfoMBean { def getSSThresh = ssthresh - def getWindow = window + def getLimit = limit } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index 686ac709..725aba31 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -15,12 +15,14 @@ */ package com.comcast.xfinity.sirius.api.impl.state -import akka.actor.{Props, Actor, ActorRef} +import akka.actor.{Actor, ActorRef, Props} import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult} import com.comcast.xfinity.sirius.api.impl._ import com.comcast.xfinity.sirius.admin.MonitoringHooks +import scala.collection.mutable.ListBuffer + object SiriusPersistenceActor { /** @@ -28,6 +30,10 @@ object SiriusPersistenceActor { * that is, the state of persisted data */ sealed trait LogQuery + sealed trait LogQuerySubrange extends LogQuery { + def begin: Long + def end: Long + } case object GetLogSize extends LogQuery /** @@ -40,7 +46,19 @@ object SiriusPersistenceActor { * @param begin first sequence number of the range * @param end last sequence number of the range, inclusive */ - case class GetLogSubrange(begin: Long, end: Long) extends LogQuery + case class GetLogSubrange(begin: Long, end: Long) extends LogQuerySubrange + /** + * Message for directly requesting a chunk of the log from a node. + * + * SiriusPersistenceActor is expected to reply with a LogSubrange + * when receiving this message. This range should be as complete + * as possible. 
+ * + * @param begin first sequence number of the range + * @param end last sequence number of the range, inclusive + * @param limit the maximum number of events + */ + case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuerySubrange trait LogSubrange trait PopulatedSubrange extends LogSubrange { @@ -129,21 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef, lastWriteTime = thisWriteTime - case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully - val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(List[OrderedEvent]())( - (acc, event) => event :: acc - ).reverse - sender ! CompleteSubrange(rangeStart, rangeEnd, events) + case GetLogSubrange(start, end) => + sender ! querySubrange(start, end, Long.MaxValue) - case GetLogSubrange(rangeStart, rangeEnd) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back - sender ! EmptySubrange - - case GetLogSubrange(rangeStart, rangeEnd) => // we can respond partially - val lastSeq = siriusLog.getNextSeq - 1 - val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(List[OrderedEvent]())( - (acc, event) => event :: acc - ).reverse - sender ! PartialSubrange(rangeStart, lastSeq, events) + case GetLogSubrangeWithLimit(start, end, limit) => + sender ! querySubrange(start, end, limit) case GetNextLogSeq => sender ! 
siriusLog.getNextSeq @@ -156,6 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef, case _: SiriusResult => } + private def querySubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = { + val nextSeq = siriusLog.getNextSeq + val lastSeq = nextSeq - 1 + if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) { + // parameters are out of range, can't return anything useful + EmptySubrange + } else { + val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd + if (limit > (endSeq - rangeStart)) { + // the limit is larger than the subrange window, so do not enforce + val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])( + (acc, evt) => acc += evt + ).toList + if (endSeq < rangeEnd) { + // the end of the range extends beyond the end of the log, so can only partially answer + PartialSubrange(rangeStart, endSeq, events) + } else { + // the range is entirely within the log, so can fully answer + CompleteSubrange(rangeStart, endSeq, events) + } + } else { + // the limit is smaller than the subrange window + val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])( + buffer => buffer.size < limit + )( + (acc, evt) => acc += evt + ) + if (buffer.size < limit && endSeq < rangeEnd) { + // the end of the subrange extended past the end of the log + // and the buffer was not filled to the limit, so we can only partially respond + PartialSubrange(rangeStart, endSeq, buffer.toList) + } else { + // the buffer was filled to the limit, so completely respond using the sequence of the + // last event as the end of the range + CompleteSubrange(rangeStart, buffer.last.sequence, buffer.toList) + } + } + } + } + /** * Monitoring hooks */ diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala index 439549d1..a1a8ac54 100644 ---
a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala @@ -107,6 +107,11 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) { ) } + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val (startOffset, endOffset) = index.getOffsetRange(startSeq, endSeq) + dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(foldFun) + } + /** * Close underlying file handles or connections. This UberStoreFilePair should not be used after * close is called. diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala index 10e026cf..294debb6 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala @@ -82,6 +82,13 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = uberpair.foldLeftRange(startSeq, endSeq)(acc0)(foldFun) + /** + * @inheritdoc + */ + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + uberpair.foldLeftRangeWhile(startSeq, endSeq)(acc0)(pred)(foldFun) + } + /** * Close underlying file handles or connections. This UberStore should not be used after * close is called. 
diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala index f77bfaa6..4bd0d3e7 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala @@ -101,7 +101,16 @@ private[uberstore] class UberDataFile(dataFileName: String, def foldLeftRange[T](baseOff: Long, endOff: Long)(acc0: T)(foldFun: (T, Long, OrderedEvent) => T): T = { val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff) try { - foldLeftUntil(readHandle, endOff, acc0, foldFun) + foldLeftUntilOffset(readHandle, endOff, acc0, foldFun) + } finally { + readHandle.close() + } + } + + def foldLeftRangeWhile[T](baseOff:Long, endOff: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff) + try { + foldLeftRangeWhile(readHandle, endOff, acc0, pred, foldFun) } finally { readHandle.close() } @@ -109,7 +118,7 @@ private[uberstore] class UberDataFile(dataFileName: String, // private low low low level fold left @tailrec - private def foldLeftUntil[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = { + private def foldLeftUntilOffset[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = { val offset = readHandle.offset() if (offset > maxOffset) { acc @@ -118,7 +127,22 @@ private[uberstore] class UberDataFile(dataFileName: String, case None => acc case Some(bytes) => val accNew = foldFun(acc, offset, codec.deserialize(bytes)) - foldLeftUntil(readHandle, maxOffset, accNew, foldFun) + foldLeftUntilOffset(readHandle, maxOffset, accNew, foldFun) + } + } + } + + @tailrec + private def foldLeftRangeWhile[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, pred: T => Boolean, 
foldFun: (T, OrderedEvent) => T): T = { + val offset = readHandle.offset() + if (offset > maxOffset || !pred(acc)) { + acc + } else { + fileOps.readNext(readHandle) match { + case None => acc + case Some(bytes) => + val accNew = foldFun(acc, codec.deserialize(bytes)) + foldLeftRangeWhile(readHandle, maxOffset, accNew, pred, foldFun) } } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala index d7497e33..cd5dbdb5 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala @@ -107,7 +107,7 @@ class Segment private[uberstore](val location: File, * Get the number of entries written to the Segment * @return number of entries in the Segment */ - def size = index.size + def size: Long = index.size /** * Write OrderedEvent event into this dir. Will fail if closed or sequence @@ -136,10 +136,15 @@ class Segment private[uberstore](val location: File, (acc, event) => acc + event.request.key ) + /** + * Get the first sequence number in this dir. + */ + def getFirstSeq: Option[Long] = index.getMinSeq + /** * Get the next possible sequence number in this dir. */ - def getNextSeq = index.getMaxSeq match { + def getNextSeq: Long = index.getMaxSeq match { case None => 1L case Some(seq) => seq + 1 } @@ -167,6 +172,16 @@ class Segment private[uberstore](val location: File, ) } + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + index.getOffsetRange(startSeq, endSeq) match { + case (_, endOffset) if endOffset == -1 => // indicates an empty range + acc0 + case (startOffset, endOffset) => + dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)( + (acc, evt) => foldFun(acc, evt) + ) + } + /** * Close underlying file handles or connections. 
This Segment should not be used after * close is called. diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala index 58316054..c6128e1b 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala @@ -151,6 +151,15 @@ class SegmentedUberStore private[segmented] (base: JFile, liveDir.foldLeftRange(startSeq, endSeq)(res0)(foldFun) } + /** + * @inheritdoc + */ + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val res0 = readOnlyDirs.foldLeft(acc0)( + (acc, dir) => dir.foldLeftRangeWhile(startSeq, endSeq)(acc)(pred)(foldFun) + ) + liveDir.foldLeftRangeWhile(startSeq, endSeq)(res0)(pred)(foldFun) + } /** * Close underlying file handles or connections. This SegmentedUberStore should not be used after diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala index f0927f07..2b7ce875 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala @@ -51,7 +51,17 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, var isClosed = false var size: Long = handle.length() / 24 - var maxSeq = { + private var minSeq = { + if (handle.length == 0) + None + else { + handle.seek(0) + val (seq, _) = fileOps.readEntry(handle) + Some(seq) + } + } + + private var maxSeq = { if (handle.length == 0) None else { @@ -87,6 +97,11 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, getOffsetForAux(0, handle.length) } + /** + * {@inheritdoc} + */ + def getMinSeq: Option[Long] = minSeq + /** * 
{@inheritdoc} */ @@ -98,6 +113,9 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile, def put(seq: Long, offset: Long): Unit = synchronized { handle.seek(handle.length) fileOps.put(handle, seq, offset) + if (minSeq.isEmpty) { + minSeq = Some(seq) + } maxSeq = Some(seq) size += 1 } diff --git a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala index ef91d16f..2649b3d1 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala @@ -25,6 +25,13 @@ trait SeqIndex { */ def getOffsetFor(seq: Long): Option[Long] + /** + * Get the minimum sequence number stored, if such exists + * + * @return Some(sequence) or None if none such exists + */ + def getMinSeq: Option[Long] + /** * Get the maximum sequence number stored, if such exists * diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala index fe265477..5c31ad01 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala @@ -16,8 +16,10 @@ package com.comcast.xfinity.sirius.writeaheadlog import com.comcast.xfinity.sirius.api.impl.OrderedEvent + import scala.collection.JavaConverters._ import java.util.{TreeMap => JTreeMap} +import scala.annotation.tailrec object CachedSiriusLog { /** @@ -89,25 +91,52 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog { * @param foldFun function that should take (accumulator, event) and return a new accumulator * @return the final value of the accumulator */ - override def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = { + override def foldLeftRange[T](startSeq: Long, 
endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = (startSeq, endSeq) match { case (start, end) if (firstSeq <= start && end <= lastSeq) => foldLeftRangeCached(start, end)(acc0)(foldFun) case (start, end) => log.foldLeftRange(start, end)(acc0)(foldFun) } - } + + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = + (startSeq, endSeq) match { + case (start, end) if (firstSeq <= start && end <= lastSeq) => + foldLeftRangeWhileCached(start, end)(acc0)(pred)(foldFun) + case (start, end) => + log.foldLeftRangeWhile(start, end)(acc0)(pred)(foldFun) + } /** * Private inner version of fold left. This one hits the cache, and assumes that start/endSeqs are * contained in the cache. Synchronizes on writeCache so we can subMap with no fear. */ private def foldLeftRangeCached[T](startSeq: Long, endSeq: Long) - (acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { + (acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { writeCache.subMap(startSeq, true, endSeq, true) .values.asScala.foldLeft(acc0)(foldFun) } + private def foldLeftRangeWhileCached[T](startSeq: Long, endSeq: Long) + (acc0: T) + (pred: T => Boolean) + (foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { + val iterable = writeCache.subMap(startSeq, true, endSeq, true) + .values.asScala + foldLeftRangeWhileCached(iterable.iterator, acc0, pred, foldFun) + } + + @tailrec + private def foldLeftRangeWhileCached[T](iter: Iterator[OrderedEvent], acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = { + if (!(pred(acc) && iter.hasNext)) { + acc + } else { + val evt = iter.next() + val newAcc = foldFun(acc, evt) + foldLeftRangeWhileCached(iter, newAcc, pred, foldFun) + } + } + def getNextSeq = log.getNextSeq def compact(): Unit = { diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala 
b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala index 53254349..20a8a905 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala @@ -59,6 +59,15 @@ trait SiriusLog { */ def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T + /** + * Fold left across log entries while a condition is met + * @param startSeq sequence number to start with, inclusive + * @param acc0 initial accumulator value + * @param pred condition to continue accumulating log entries + * @param foldFun function to apply to the log entry, the result being the new accumulator + */ + def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T + /** * retrieves the next sequence number to be written */ diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala index 7a0c7938..380858db 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala @@ -31,6 +31,7 @@ class CatchupSupervisorTest extends NiceTest { val atomicLong = new AtomicLong def makeMockCatchupSupervisor(remoteActorTry: Try[ActorRef] = Success(TestProbe().ref), maxWindowSize: Int = 1000, + maxLimitSize: Option[Int] = None, startingSSThresh: Int = 500, parent: ActorRef = TestProbe().ref): TestActorRef[CatchupSupervisor] = { val membershipHelper = mock[MembershipHelper] @@ -42,7 +43,7 @@ class CatchupSupervisorTest extends NiceTest { // in order to specify the parent, we also have to specify a name for this actor. using akka's approach. 
val name = "$" + base64(atomicLong.getAndIncrement) val props = Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, - maxWindowSize, startingSSThresh, new SiriusConfiguration())) + maxWindowSize, maxLimitSize, startingSSThresh, new SiriusConfiguration())) TestActorRef(props, parent, name) } @@ -68,36 +69,36 @@ class CatchupSupervisorTest extends NiceTest { } it("should update window for Slow Start phase") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(40 === underTest.underlyingActor.window) + assert(40 === underTest.underlyingActor.limit) } it("should update window for Congestion Avoidance phase") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 120 + underTest.underlyingActor.limit = 120 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(122 === underTest.underlyingActor.window) + assert(122 === underTest.underlyingActor.limit) } it("should not increase the window beyond maxWindowSize") { val underTest = makeMockCatchupSupervisor() - underTest.underlyingActor.window = 1000 + underTest.underlyingActor.limit = 1000 val mockSubrange = mock[CompleteSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(1000 === underTest.underlyingActor.window) + assert(1000 === underTest.underlyingActor.limit) } } describe("for successful requests with an empty subrange") { @@ -112,12 +113,12 @@ class CatchupSupervisorTest extends NiceTest { } it("should leave window and ssthresh unchanged") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 underTest ! 
InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(EmptySubrange) - assert(20 === underTest.underlyingActor.window) + assert(20 === underTest.underlyingActor.limit) assert(100 === underTest.underlyingActor.ssthresh) } } @@ -134,14 +135,14 @@ class CatchupSupervisorTest extends NiceTest { } it("should leave window and ssthresh unchanged") { val underTest = makeMockCatchupSupervisor(startingSSThresh = 100) - underTest.underlyingActor.window = 20 + underTest.underlyingActor.limit = 20 val mockSubrange = mock[PartialSubrange] underTest ! InitiateCatchup(1L) underTest ! CatchupRequestSucceeded(mockSubrange) - assert(20 === underTest.underlyingActor.window) + assert(20 === underTest.underlyingActor.limit) assert(100 === underTest.underlyingActor.ssthresh) } } @@ -157,27 +158,18 @@ class CatchupSupervisorTest extends NiceTest { } it("should reduce window and ssthresh") { val underTest = makeMockCatchupSupervisor() - underTest.underlyingActor.window = 50 + underTest.underlyingActor.limit = 50 underTest.underlyingActor.ssthresh = 100 underTest ! InitiateCatchup(1L) underTest ! CatchupRequestFailed - assert(1 === underTest.underlyingActor.window) + assert(1 === underTest.underlyingActor.limit) assert(25 === underTest.underlyingActor.ssthresh) } } - it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { - val remote = TestProbe() - val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - - underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) - underTest ! 
ContinueCatchup(1L) - - remote.expectMsg(GetLogSubrange(1L, 2L)) - } it("should ignore InitiateCatchup requests if it's currently in catchup mode") { val remote = TestProbe() val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) @@ -188,16 +180,53 @@ class CatchupSupervisorTest extends NiceTest { remote.expectNoMessage() } - it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { - val remote = TestProbe() - val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + describe("without a configured maximum limit") { + it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) - underTest ! StopCatchup - underTest ! InitiateCatchup(1L) + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! ContinueCatchup(1L) + + remote.expectMsg(GetLogSubrange(1L, 2L)) + } + it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) - remote.expectMsg(GetLogSubrange(1L, 2L)) + underTest ! StopCatchup + underTest ! 
InitiateCatchup(1L) + + remote.expectMsg(GetLogSubrange(1L, 2L)) + } + } + + describe("with a configured maximum limit") { + it("should request the next subrange upon receiving a ContinueCatchup request while in catchup mode") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref), maxLimitSize = Some(1000)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! ContinueCatchup(1L) + + remote.expectMsg(GetLogSubrangeWithLimit(1L, 1001L, 1L)) + } + it("should leave catchup mode and then be able to re-initiate catchup mode after receiving a StopCatchup") { + val remote = TestProbe() + val underTest = makeMockCatchupSupervisor(remoteActorTry = Success(remote.ref), maxLimitSize = Some(1000)) + + underTest.underlyingActor.context.become(underTest.underlyingActor.catchup(remote.ref)) + + underTest ! StopCatchup + underTest ! InitiateCatchup(1L) + + remote.expectMsg(GetLogSubrangeWithLimit(1L, 1001L, 1L)) + } } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 55f2d194..a5d4f019 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -19,15 +19,15 @@ import akka.actor.{ActorRef, ActorSystem} import akka.testkit.TestActorRef import akka.testkit.TestProbe import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog - - import org.mockito.Mockito._ import com.comcast.xfinity.sirius.NiceTest -import com.comcast.xfinity.sirius.api.impl.{OrderedEvent, Put, Delete} +import com.comcast.xfinity.sirius.api.impl.{Delete, OrderedEvent, Put} import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor._ -import org.mockito.ArgumentMatchers.{any, eq => meq, anyLong} +import 
org.mockito.ArgumentMatchers.{any, anyLong, eq => meq} import com.comcast.xfinity.sirius.api.SiriusConfiguration +import scala.collection.mutable.ListBuffer + class SiriusPersistenceActorTest extends NiceTest { implicit var actorSystem: ActorSystem = _ @@ -54,9 +54,10 @@ class SiriusPersistenceActorTest extends NiceTest { TestActorRef(new SiriusPersistenceActor(stateActor, siriusLog, config)) } - def makeMockLog(events: List[OrderedEvent], nextSeq: Long = 1L): SiriusLog = { + def makeMockLog(events: ListBuffer[OrderedEvent], nextSeq: Long = 1L): SiriusLog = { val mockLog = mock[SiriusLog] doReturn(events).when(mockLog).foldLeftRange(anyLong, anyLong)(any[Symbol])(anyFoldFun) + doReturn(events).when(mockLog).foldLeftRangeWhile(anyLong, anyLong)(any[Symbol])(anyPred)(anyFoldFun) doReturn(nextSeq).when(mockLog).getNextSeq mockLog @@ -64,8 +65,14 @@ class SiriusPersistenceActorTest extends NiceTest { def anyFoldFun = any[(Symbol, OrderedEvent) => Symbol]() + def anyPred = any[Symbol => Boolean] + def verifyFoldLeftRanged(siriusLog: SiriusLog, start: Long, end: Long): Unit = { - verify(siriusLog).foldLeftRange(meq(start), meq(end))(meq(List[OrderedEvent]()))(any[(List[OrderedEvent], OrderedEvent) => List[OrderedEvent]]()) + verify(siriusLog).foldLeftRange(meq(start), meq(end))(meq(ListBuffer[OrderedEvent]()))(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) + } + + def verifyFoldLeftWhile(siriusLog: SiriusLog, start: Long, end: Long): Unit = { + verify(siriusLog).foldLeftRangeWhile(meq(start), meq(end))(meq(ListBuffer[OrderedEvent]()))(any[ListBuffer[OrderedEvent] => Boolean]())(any[(ListBuffer[OrderedEvent], OrderedEvent) => ListBuffer[OrderedEvent]]()) } describe("a SiriusPersistenceActor") { @@ -135,7 +142,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 10L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) 
val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(1, 2)) @@ -150,7 +157,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 10L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(8, 11)) @@ -165,7 +172,7 @@ class SiriusPersistenceActorTest extends NiceTest { val event1 = mock[OrderedEvent] val event2 = mock[OrderedEvent] - val mockLog = makeMockLog(List(event2, event1), 5L) + val mockLog = makeMockLog(ListBuffer(event1, event2), 5L) val underTest = makePersistenceActor(siriusLog = mockLog) senderProbe.send(underTest, GetLogSubrange(8, 11)) @@ -174,5 +181,69 @@ class SiriusPersistenceActorTest extends NiceTest { } } } + + describe("upon receiving a GetLogSubrangeWithLimit message") { + describe("when we can fully reply") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + doReturn(1L).when(event1).sequence + val event2 = mock[OrderedEvent] + doReturn(2L).when(event2).sequence + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogSubrangeWithLimit(1, 2, 2)) + + verifyFoldLeftRanged(mockLog, 1, 2) + senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2))) + } + } + describe("when we can partially reply due to range") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + val event2 = mock[OrderedEvent] + val mockLog = makeMockLog(ListBuffer(event1, event2), 10L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 11, 3)) + + verifyFoldLeftRanged(mockLog, 8, 
9) + senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2))) + } + } + describe("when we can partially reply due to limit") { + it("should build the list of events and reply with it") { + val senderProbe = TestProbe() + + val event1 = mock[OrderedEvent] + doReturn(8L).when(event1).sequence + val event2 = mock[OrderedEvent] + doReturn(9L).when(event2).sequence + val mockLog = makeMockLog(ListBuffer(event1, event2), 11L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 10, 2)) + + verifyFoldLeftWhile(mockLog, 8, 10) + senderProbe.expectMsg(CompleteSubrange(8, 9, List(event1, event2))) + } + } + describe("when we can't send anything useful at all") { + it("should send back an EmptySubrange message") { + val senderProbe = TestProbe() + + val mockLog = makeMockLog(ListBuffer(), 5L) + val underTest = makePersistenceActor(siriusLog = mockLog) + + senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 11)) + + senderProbe.expectMsg(EmptySubrange) + } + } + } } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala index c5cd6beb..731455a3 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/DoNothingSiriusLog.scala @@ -32,6 +32,10 @@ class DoNothingSiriusLog extends SiriusLog { acc0 } + override def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + acc0 + } + override def getNextSeq = 1L override def compact(): Unit = {} diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index b6e3d9ab..bacdd7fd 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ 
b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -95,6 +95,7 @@ class FullSystemITest extends NiceTest with TimedTest { replicaReproposalWindow: Int = 10, sslEnabled: Boolean = false, maxWindowSize: Int = 1000, + maxLimitSize: Option[Int] = None, membershipPath: String = new JFile(tempDir, "membership").getAbsolutePath): (SiriusImpl, RequestHandler, SiriusLog) = { @@ -123,6 +124,11 @@ class FullSystemITest extends NiceTest with TimedTest { siriusConfig.setProp(SiriusConfiguration.REPROPOSAL_WINDOW, replicaReproposalWindow) siriusConfig.setProp(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, maxWindowSize) + if (maxLimitSize.isDefined) { + siriusConfig.setProp(SiriusConfiguration.CATCHUP_USE_LIMIT, true) + siriusConfig.setProp(SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE, maxLimitSize.get) + } + if (sslEnabled) { siriusConfig.setProp(SiriusConfiguration.AKKA_EXTERN_CONFIG, "src/test/resources/sirius-akka-base.conf") siriusConfig.setProp(SiriusConfiguration.KEY_STORE_LOCATION, "src/test/resources/keystore") @@ -224,12 +230,13 @@ class FullSystemITest extends NiceTest with TimedTest { } describe("a full sirius implementation") { - it("must reach a decision for lots of slots") { + + def mustReachDecisionForLotsOfSlots(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289) - val (sirius2, _, log2) = makeSirius(42290) - val (sirius3, _, log3) = makeSirius(42291) + val (sirius1, _, log1) = makeSirius(42289, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, maxLimitSize = maxLimitSize) + val (sirius3, _, log3) = makeSirius(42291, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2, sirius3) waitForMembership(sirii, 3) @@ -248,11 +255,11 @@ class FullSystemITest extends NiceTest with TimedTest { "Wals were not equivalent") } - it("must be able to make progress with a node being down and then catch up") { + def 
mustMakeProgressWithNodeDown(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289) - val (sirius2, _, log2) = makeSirius(42290) + val (sirius1, _, log1) = makeSirius(42289, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2) waitForMembership(sirii, 2) @@ -277,12 +284,12 @@ class FullSystemITest extends NiceTest with TimedTest { "Original and caught-up wals were not equivalent") } - it("must work for master/slave mode") { + def mustWorkMasterSlaveNone(maxLimitSize: Option[Int]): Unit = { writeClusterConfig(List(42289, 42290, 42291)) val numCommands = 50 - val (sirius1, _, log1) = makeSirius(42289, replicaReproposalWindow = 4) - val (sirius2, _, log2) = makeSirius(42290, replicaReproposalWindow = 4) - val (sirius3, _, log3) = makeSirius(42291, replicaReproposalWindow = 4) + val (sirius1, _, log1) = makeSirius(42289, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) + val (sirius2, _, log2) = makeSirius(42290, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) + val (sirius3, _, log3) = makeSirius(42291, replicaReproposalWindow = 4, maxLimitSize = maxLimitSize) sirii = List(sirius1, sirius2, sirius3) waitForMembership(sirii, 3) @@ -337,6 +344,34 @@ class FullSystemITest extends NiceTest with TimedTest { assert(waitForTrue(verifyWalsAreEquivalent(List(log1, log2, log3)), 500, 100), "Master and slave wals not equivalent") } + + describe("using window-based catchup") { + it("must reach a decision for lots of slots") { + mustReachDecisionForLotsOfSlots(maxLimitSize = None) + } + + it("must be able to make progress with a node being down and then catch up") { + mustMakeProgressWithNodeDown(maxLimitSize = None) + } + + it("must work for master/slave mode") { + mustWorkMasterSlaveNone(maxLimitSize = None) + } + } + + describe("using limit-based catchup") { + it("must reach a decision 
for lots of slots") { + mustReachDecisionForLotsOfSlots(maxLimitSize = Some(1000)) + } + + it("must be able to make progress with a node being down and then catch up") { + mustMakeProgressWithNodeDown(maxLimitSize = Some(1000)) + } + + it("must work for master/slave mode") { + mustWorkMasterSlaveNone(maxLimitSize = Some(1000)) + } + } } it("should fail to start if a segmented WAL is specified but the LOG_VERSION_ID param does not match") { diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala index 635835ea..f01d725f 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/UberToolTest.scala @@ -19,11 +19,13 @@ package com.comcast.xfinity.sirius.uberstore import com.comcast.xfinity.sirius.NiceTest import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.impl.{Delete, OrderedEvent, Put} -import java.io.{File => JFile} +import java.io.{File => JFile} import better.files.File import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore +import scala.annotation.tailrec + object UberToolTest { class DummySiriusLog(var events: List[OrderedEvent]) extends SiriusLog { def writeEntry(event: OrderedEvent): Unit = { @@ -34,6 +36,21 @@ object UberToolTest { def foldLeftRange[T](start: Long, end: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = events.filter(e => start <= e.sequence && e.sequence <= end).foldLeft(acc0)(foldFun) + def foldLeftRangeWhile[T](start: Long, end: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = { + val filtered = events.filter(e => start <= e.sequence && e.sequence <= end) + foldLeftRangeWhile(filtered, acc0, pred, foldFun) + } + + @tailrec + private def foldLeftRangeWhile[T](events: List[OrderedEvent], acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = + events match { + 
case Nil => acc + case _ if !pred(acc) => acc + case evt :: rest => + val accNew = foldFun(acc, evt) + foldLeftRangeWhile(rest, accNew, pred, foldFun) + } + def getNextSeq: Long = throw new IllegalStateException("not implemented") diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala index 94b19f1c..2223f1d4 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStoreTest.scala @@ -182,7 +182,7 @@ class SegmentedUberStoreTest extends NiceTest { } describe("foldLeftRange") { - it("should reflect livedir's nextSeq") { + it("should include livedir's events") { createSegment(dir, "1") createSegment(dir, "5") createSegment(dir, "10") @@ -197,6 +197,38 @@ class SegmentedUberStoreTest extends NiceTest { } } + describe("foldLeftRangeWhile") { + it("should limit events based on predicate on accumulator") { + createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList) + createPopulatedSegment(dir, "5", Range.inclusive(4, 6).toList) + createPopulatedSegment(dir, "10", Range.inclusive(7, 9).toList) + uberstore = SegmentedUberStore(dir.getAbsolutePath, new SiriusConfiguration) + uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) + uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) + + val result = uberstore.foldLeftRangeWhile(startSeq = 1, endSeq = Long.MaxValue)(List[SiriusRequest]())(list => list.size < 5)( + (acc, event) => event.request +: acc + ).reverse + + assert(result === List(Delete("1"), Delete("2"), Delete("3"), Delete("4"), Delete("5"))) + } + + it("should include livedir's events") { + createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList) + createPopulatedSegment(dir, "5", Range.inclusive(4, 6).toList) + createPopulatedSegment(dir, "10", Range.inclusive(7, 9).toList) + 
uberstore = SegmentedUberStore(dir.getAbsolutePath, new SiriusConfiguration) + uberstore.writeEntry(OrderedEvent(10L, 1L, Delete("10"))) + uberstore.writeEntry(OrderedEvent(11L, 1L, Delete("11"))) + + val result = uberstore.foldLeftRangeWhile(startSeq = 9, endSeq = Long.MaxValue)(List[SiriusRequest]())(list => list.size < 2)( + (acc, event) => event.request +: acc + ).reverse + + assert(result === List(Delete("9"), Delete("10"))) + } + } + describe("parallelForeach") { it("should bootstrap the uberstore in parallel") { createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList, isApplied = true)