Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ object SiriusConfiguration {
*/
final val CATCHUP_MAX_WINDOW_SIZE = "sirius.catchup.max-window-size"

/**
* Use log limit for congestion avoidance for catchup. Default is false.
*/
final val CATCHUP_USE_LIMIT = "sirius.catchup.use-limit"

/**
* Maximum catchup limit size, in number of events. Default is 1000.
*/
final val CATCHUP_MAX_LIMIT_SIZE = "sirius.catchup.max-limit-size"

/**
* Starting ssthresh, which is the point where catchup transitions from Slow Start to
* Congestion Avoidance. Default is 500.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
*/
package com.comcast.xfinity.sirius.api.impl.bridge

import akka.actor.{ActorRef, Props, Actor}
import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.ask

import scala.concurrent.duration._
import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper
import com.comcast.xfinity.sirius.api.SiriusConfiguration
import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{GetLogSubrange, LogSubrange, CompleteSubrange}
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrange, GetLogSubrangeWithLimit, LogSubrange}
import com.comcast.xfinity.sirius.admin.MonitoringHooks

import scala.util.Success

case class InitiateCatchup(fromSeq: Long)
Expand All @@ -36,7 +38,7 @@ object CatchupSupervisor {

trait CatchupSupervisorInfoMBean {
def getSSThresh: Int
def getWindow: Int
def getLimit: Int
}

/**
Expand All @@ -50,9 +52,14 @@ object CatchupSupervisor {
val timeoutCoeff = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_INCREASE_PER_EVENT, .01)
val timeoutConst = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_BASE, 1.0)
val maxWindowSize = config.getInt(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, 1000)
val maxLimitSize = if (config.getProp(SiriusConfiguration.CATCHUP_USE_LIMIT, false)) {
config.getProp[Int](SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE)
} else {
None
}
// must ensure ssthresh <= maxWindowSize
val startingSSThresh = Math.min(maxWindowSize, config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, startingSSThresh, config))
val startingSSThresh = Math.min(maxLimitSize.getOrElse(maxWindowSize), config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, maxLimitSize, startingSSThresh, config))
}
}

Expand All @@ -76,51 +83,56 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
timeoutCoeff: Double,
timeoutConst: Double,
maxWindowSize: Int,
maxLimitSize: Option[Int],
var ssthresh: Int,
config: SiriusConfiguration) extends Actor with MonitoringHooks {

var window = 1
def timeout() = (timeoutConst + (window * timeoutCoeff)).seconds
var limit = 1
def timeout() = (timeoutConst + (limit * timeoutCoeff)).seconds

implicit val executionContext = context.dispatcher

def receive = {
case InitiateCatchup(fromSeq) =>
membershipHelper.getRandomMember.map(remote => {
requestSubrange(fromSeq, window, remote)
requestSubrange(fromSeq, limit, remote)
context.become(catchup(remote))
})
}

def catchup(source: ActorRef): Receive = {
case CatchupRequestSucceeded(logSubrange: CompleteSubrange) =>
if (window >= ssthresh) { // we're in Congestion Avoidance phase
window = Math.min(window + 2, maxWindowSize)
if (limit >= ssthresh) { // we're in Congestion Avoidance phase
limit = Math.min(limit + 2, maxLimitSize.getOrElse(maxWindowSize))
} else { // we're in Slow Start phase
window = Math.min(window * 2, ssthresh)
limit = Math.min(limit * 2, ssthresh)
}
context.parent ! logSubrange

case CatchupRequestSucceeded(logSubrange) =>
context.parent ! logSubrange

case CatchupRequestFailed =>
if (window != 1) {
if (limit != 1) {
// adjust ssthresh, revert to Slow Start phase
ssthresh = Math.max(window / 2, 1)
window = 1
ssthresh = Math.max(limit / 2, 1)
limit = 1
}
context.unbecome()

case ContinueCatchup(fromSeq: Long) =>
requestSubrange(fromSeq, window, source)
requestSubrange(fromSeq, limit, source)

case StopCatchup =>
context.unbecome()
}

def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = {
source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete {
def requestSubrange(fromSeq: Long, limit: Int, source: ActorRef): Unit = {
val message = maxLimitSize match {
case Some(_) => GetLogSubrangeWithLimit(fromSeq, fromSeq + maxWindowSize, limit)
case None => GetLogSubrange(fromSeq, fromSeq + limit)
}
source.ask(message)(timeout()).onComplete {
case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange)
case _ => self ! CatchupRequestFailed
}
Expand All @@ -135,6 +147,6 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,

class CatchupSupervisorInfo extends CatchupSupervisorInfoMBean {
def getSSThresh = ssthresh
def getWindow = window
def getLimit = limit
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@
*/
package com.comcast.xfinity.sirius.api.impl.state

import akka.actor.{Props, Actor, ActorRef}
import akka.actor.{Actor, ActorRef, Props}
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult}
import com.comcast.xfinity.sirius.api.impl._
import com.comcast.xfinity.sirius.admin.MonitoringHooks

import scala.collection.mutable.ListBuffer

object SiriusPersistenceActor {

/**
* Trait encapsulating _queries_ into Sirius's log's state,
* that is, the state of persisted data
*/
sealed trait LogQuery
sealed trait LogQuerySubrange extends LogQuery {
def begin: Long
def end: Long
}

case object GetLogSize extends LogQuery
/**
Expand All @@ -40,7 +46,19 @@ object SiriusPersistenceActor {
* @param begin first sequence number of the range
* @param end last sequence number of the range, inclusive
*/
case class GetLogSubrange(begin: Long, end: Long) extends LogQuery
case class GetLogSubrange(begin: Long, end: Long) extends LogQuerySubrange
/**
* Message for directly requesting a chunk of the log from a node.
*
* SiriusPersistenceActor is expected to reply with a LogSubrange
* when receiving this message. This range should be as complete
* as possible.
*
* @param begin first sequence number of the range
* @param end last sequence number of the range, inclusive
* @param limit the maximum number of events
*/
case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuerySubrange

trait LogSubrange
trait PopulatedSubrange extends LogSubrange {
Expand Down Expand Up @@ -129,21 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef,

lastWriteTime = thisWriteTime

case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully
val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(List[OrderedEvent]())(
(acc, event) => event :: acc
).reverse
sender ! CompleteSubrange(rangeStart, rangeEnd, events)
case GetLogSubrange(start, end) =>
sender ! querySubrange(start, end, Long.MaxValue)

case GetLogSubrange(rangeStart, rangeEnd) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back
sender ! EmptySubrange

case GetLogSubrange(rangeStart, rangeEnd) => // we can respond partially
val lastSeq = siriusLog.getNextSeq - 1
val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(List[OrderedEvent]())(
(acc, event) => event :: acc
).reverse
sender ! PartialSubrange(rangeStart, lastSeq, events)
case GetLogSubrangeWithLimit(start, end, limit) =>
sender ! querySubrange(start, end, limit)

case GetNextLogSeq =>
sender ! siriusLog.getNextSeq
Expand All @@ -156,6 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef,
case _: SiriusResult =>
}

private def querySubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = {
val nextSeq = siriusLog.getNextSeq
val lastSeq = nextSeq - 1
if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) {
// parameters are out of range, can't return anything useful
EmptySubrange
} else {
val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd
if (limit > (endSeq - rangeStart)) {
// the limit is larger than the subrange window, so do not enforce
val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
(acc, evt) => acc += evt
).toList
if (endSeq < rangeEnd) {
// the end of the range extends beyond the end of the log, so can only partially answer
PartialSubrange(rangeStart, endSeq, events)
} else {
// the range is entirely within the log, so can fully answer
CompleteSubrange(rangeStart, endSeq, events)
}
} else {
// the limit is smaller than the subrange window
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
buffer => buffer.size < limit
)(
(acc, evt) => acc += evt
)
if (buffer.size < limit && endSeq < rangeEnd) {
// the end of the subrange extended part the end of the log
// and the buffer was not filled to the limit, so we can only partially respond
PartialSubrange(rangeStart, endSeq, buffer.toList)
} else {
// the buffer was filled to the limit, so completely respond using the sequence of the
// last event as the end of the range
CompleteSubrange(rangeStart, buffer.last.sequence, buffer.toList)
}
}
}
}

/**
* Monitoring hooks
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) {
)
}

def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
val (startOffset, endOffset) = index.getOffsetRange(startSeq, endSeq)
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(foldFun)
}

/**
* Close underlying file handles or connections. This UberStoreFilePair should not be used after
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends
def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T =
uberpair.foldLeftRange(startSeq, endSeq)(acc0)(foldFun)

/**
* @inheritdoc
*/
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
uberpair.foldLeftRangeWhile(startSeq, endSeq)(acc0)(pred)(foldFun)
}

/**
* Close underlying file handles or connections. This UberStore should not be used after
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,24 @@ private[uberstore] class UberDataFile(dataFileName: String,
def foldLeftRange[T](baseOff: Long, endOff: Long)(acc0: T)(foldFun: (T, Long, OrderedEvent) => T): T = {
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
try {
foldLeftUntil(readHandle, endOff, acc0, foldFun)
foldLeftUntilOffset(readHandle, endOff, acc0, foldFun)
} finally {
readHandle.close()
}
}

def foldLeftRangeWhile[T](baseOff:Long, endOff: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
try {
foldLeftRangeWhile(readHandle, endOff, acc0, pred, foldFun)
} finally {
readHandle.close()
}
}

// private low low low level fold left
@tailrec
private def foldLeftUntil[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
private def foldLeftUntilOffset[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
val offset = readHandle.offset()
if (offset > maxOffset) {
acc
Expand All @@ -118,7 +127,22 @@ private[uberstore] class UberDataFile(dataFileName: String,
case None => acc
case Some(bytes) =>
val accNew = foldFun(acc, offset, codec.deserialize(bytes))
foldLeftUntil(readHandle, maxOffset, accNew, foldFun)
foldLeftUntilOffset(readHandle, maxOffset, accNew, foldFun)
}
}
}

@tailrec
private def foldLeftRangeWhile[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
val offset = readHandle.offset()
if (offset > maxOffset || !pred(acc)) {
acc
} else {
fileOps.readNext(readHandle) match {
case None => acc
case Some(bytes) =>
val accNew = foldFun(acc, codec.deserialize(bytes))
foldLeftRangeWhile(readHandle, maxOffset, accNew, pred, foldFun)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class Segment private[uberstore](val location: File,
* Get the number of entries written to the Segment
* @return number of entries in the Segment
*/
def size = index.size
def size: Long = index.size

/**
* Write OrderedEvent event into this dir. Will fail if closed or sequence
Expand Down Expand Up @@ -136,10 +136,15 @@ class Segment private[uberstore](val location: File,
(acc, event) => acc + event.request.key
)

/**
* Get the first sequence number in this dir.
*/
def getFirstSeq: Option[Long] = index.getMinSeq

/**
* Get the next possible sequence number in this dir.
*/
def getNextSeq = index.getMaxSeq match {
def getNextSeq: Long = index.getMaxSeq match {
case None => 1L
case Some(seq) => seq + 1
}
Expand Down Expand Up @@ -167,6 +172,16 @@ class Segment private[uberstore](val location: File,
)
}

def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
index.getOffsetRange(startSeq, endSeq) match {
case (_, endOffset) if endOffset == -1 => // indicates an empty range
acc0
case (startOffset, endOffset) =>
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(
(acc, evt) => foldFun(acc, evt)
)
}

/**
* Close underlying file handles or connections. This Segment should not be used after
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ class SegmentedUberStore private[segmented] (base: JFile,
liveDir.foldLeftRange(startSeq, endSeq)(res0)(foldFun)
}

/**
* @inheritdoc
*/
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
val res0 = readOnlyDirs.foldLeft(acc0)(
(acc, dir) => dir.foldLeftRangeWhile(startSeq, endSeq)(acc)(pred)(foldFun)
)
liveDir.foldLeftRangeWhile(startSeq, endSeq)(res0)(pred)(foldFun)
}

/**
* Close underlying file handles or connections. This SegmentedUberStore should not be used after
Expand Down
Loading
Loading