diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt index 95bc2df7c..4e5d5976c 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt @@ -762,7 +762,7 @@ data class Commitments( return failure?.let { Either.Left(it) } ?: Either.Right(copy(changes = changes1)) } - fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either>> { + fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either> { val remoteNextPerCommitmentPoint = remoteNextCommitInfo.right ?: return Either.Left(CannotSignBeforeRevocation(channelId)) if (!changes.localHasChanges()) return Either.Left(CannotSignWithoutChanges(channelId)) val (active1, sigs) = active.map { it.sendCommit(channelKeys, params, changes, remoteNextPerCommitmentPoint, active.size, log) }.unzip() @@ -774,18 +774,22 @@ data class Commitments( remoteChanges = changes.remoteChanges.copy(acked = emptyList(), signed = changes.remoteChanges.acked) ) ) - return Either.Right(Pair(commitments1, sigs)) + return Either.Right(Pair(commitments1, CommitSigs.fromSigs(sigs))) } - fun receiveCommit(commits: List, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either> { + fun receiveCommit(commits: CommitSigs, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either> { // We may receive more commit_sig than the number of active commitments, because there can be a race where we send splice_locked // while our peer is sending us a batch of commit_sig. When that happens, we simply need to discard the commit_sig that belong // to commitments we deactivated. - if (commits.size < active.size) { - return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.size)) + val sigs = when (commits) { + is CommitSigBatch -> commits.messages + is CommitSig -> listOf(commits) + } + if (sigs.size < active.size) { + return Either.Left(CommitSigCountMismatch(channelId, active.size, sigs.size)) } // Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments. - val active1 = active.zip(commits).map { + val active1 = active.zip(sigs).map { when (val commitment1 = it.first.receiveCommit(channelKeys, params, changes, it.second, log)) { is Either.Left -> return Either.Left(commitment1.value) is Either.Right -> commitment1.value diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt index cda05574e..ce0a8f982 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt @@ -674,22 +674,6 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() { } } } - - // in Normal and Shutdown we aggregate sigs for splices before processing - var sigStash = emptyList() - - /** For splices we will send one commit_sig per active commitments. */ - internal fun ChannelContext.aggregateSigs(commit: CommitSig): List? { - sigStash = sigStash + commit - logger.debug { "received sig for batch of size=${commit.batchSize}" } - return if (sigStash.size == commit.batchSize) { - val sigs = sigStash - sigStash = emptyList() - sigs - } else { - null - } - } } object Channel { diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt index 1aded1905..2ef5a76b7 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt @@ -80,7 +80,7 @@ data class Normal( val actions = buildList { add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos)) add(ChannelAction.Storage.StoreState(nextState)) - addAll(result.value.second.map { ChannelAction.Message.Send(it) }) + add(ChannelAction.Message.Send(result.value.second)) } Pair(nextState, actions) } @@ -173,12 +173,12 @@ data class Normal( is Either.Left -> handleLocalError(cmd, result.value) is Either.Right -> Pair(this@Normal.copy(commitments = result.value), listOf()) } - is CommitSig -> when { + is CommitSigs -> when { spliceStatus == SpliceStatus.Aborted -> { logger.warning { "received commit_sig after sending tx_abort, they probably sent it before receiving our tx_abort, ignoring..." } Pair(this@Normal, listOf()) } - spliceStatus is SpliceStatus.WaitingForSigs -> { + spliceStatus is SpliceStatus.WaitingForSigs && cmd.message is CommitSig -> { val (signingSession1, action) = spliceStatus.session.receiveCommitSig(channelKeys(), commitments.params, cmd.message, currentBlockHeight.toLong(), logger) when (action) { is InteractiveTxSigningSessionAction.AbortFundingAttempt -> { @@ -193,7 +193,7 @@ data class Normal( is InteractiveTxSigningSessionAction.SendTxSigs -> sendSpliceTxSigs(spliceStatus.origins, action, spliceStatus.liquidityPurchase) } } - ignoreRetransmittedCommitSig(cmd.message) -> { + cmd.message is CommitSig && ignoreRetransmittedCommitSig(cmd.message) -> { // We haven't received our peer's tx_signatures for the latest funding transaction and asked them to resend it on reconnection. // They also resend their corresponding commit_sig, but we have already received it so we should ignore it. // Note that the funding transaction may have confirmed while we were offline. @@ -202,34 +202,29 @@ data class Normal( } // NB: in all other cases we process the commit_sig normally. We could do a full pattern matching on all splice statuses, but it would force us to handle // corner cases like race condition between splice_init and a non-splice commit_sig - else -> { - when (val sigs = aggregateSigs(cmd.message)) { - is List -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) { - is Either.Left -> handleLocalError(cmd, result.value) - is Either.Right -> { - val commitments1 = result.value.first - val spliceStatus1 = when { - spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command) - spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent - else -> spliceStatus - } - val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1) - val actions = mutableListOf() - actions.add(ChannelAction.Storage.StoreState(nextState)) - actions.add(ChannelAction.Message.Send(result.value.second)) - if (commitments1.changes.localHasChanges()) { - actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) - } - // If we're now quiescent, we may send our stfu message. - when { - spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true))) - spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false))) - else -> {} - } - Pair(nextState, actions) - } + else -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) { + is Either.Left -> handleLocalError(cmd, result.value) + is Either.Right -> { + val commitments1 = result.value.first + val spliceStatus1 = when { + spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command) + spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent + else -> spliceStatus + } + val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1) + val actions = mutableListOf() + actions.add(ChannelAction.Storage.StoreState(nextState)) + actions.add(ChannelAction.Message.Send(result.value.second)) + if (commitments1.changes.localHasChanges()) { + actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) } - else -> Pair(this@Normal, listOf()) + // If we're now quiescent, we may send our stfu message. + when { + spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true))) + spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false))) + else -> {} + } + Pair(nextState, actions) } } } @@ -822,8 +817,6 @@ data class Normal( SpliceStatus.None } } - // reset the commit_sig batch - sigStash = emptyList() Pair(Offline(this@Normal.copy(spliceStatus = spliceStatus1)), failedHtlcs) } is ChannelCommand.Connected -> unhandled(cmd) diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt index 4afc2009a..0d10ab6fa 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt @@ -39,29 +39,26 @@ data class ShuttingDown( is Either.Left -> handleLocalError(cmd, result.value) is Either.Right -> Pair(this@ShuttingDown.copy(commitments = result.value), listOf()) } - is CommitSig -> when (val sigs = aggregateSigs(cmd.message)) { - is List -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) { - is Either.Left -> handleLocalError(cmd, result.value) - is Either.Right -> { - val (commitments1, revocation) = result.value - when { - commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation))) - else -> { - val nextState = this@ShuttingDown.copy(commitments = commitments1) - val actions = buildList { - add(ChannelAction.Storage.StoreState(nextState)) - add(ChannelAction.Message.Send(revocation)) - if (commitments1.changes.localHasChanges()) { - // if we have newly acknowledged changes let's sign them - add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) - } + is CommitSigs -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) { + is Either.Left -> handleLocalError(cmd, result.value) + is Either.Right -> { + val (commitments1, revocation) = result.value + when { + commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation))) + else -> { + val nextState = this@ShuttingDown.copy(commitments = commitments1) + val actions = buildList { + add(ChannelAction.Storage.StoreState(nextState)) + add(ChannelAction.Message.Send(revocation)) + if (commitments1.changes.localHasChanges()) { + // if we have newly acknowledged changes let's sign them + add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) } - Pair(nextState, actions) } + Pair(nextState, actions) } } } - else -> Pair(this@ShuttingDown, listOf()) } is RevokeAndAck -> when (val result = commitments.receiveRevocation(cmd.message)) { is Either.Left -> handleLocalError(cmd, result.value) @@ -128,7 +125,7 @@ data class ShuttingDown( val actions = buildList { add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos)) add(ChannelAction.Storage.StoreState(nextState)) - addAll(result.value.second.map { ChannelAction.Message.Send(it) }) + add(ChannelAction.Message.Send(result.value.second)) } Pair(nextState, actions) } @@ -164,11 +161,7 @@ data class ShuttingDown( is WatchSpentTriggered -> handlePotentialForceClose(watch) } is ChannelCommand.Commitment.CheckHtlcTimeout -> checkHtlcTimeout() - is ChannelCommand.Disconnected -> { - // reset the commit_sig batch - sigStash = emptyList() - Pair(Offline(this@ShuttingDown), listOf()) - } + is ChannelCommand.Disconnected -> Pair(Offline(this@ShuttingDown), listOf()) else -> unhandled(cmd) } } diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt index 8432837c8..8cf097078 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt @@ -413,29 +413,29 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent: val signedUpdates = commitments.changes.localChanges.signed val channelParams = commitments.params val batchSize = commitments.active.size - val commitSigs = commitments.active.mapNotNull { c -> + val commitSigs = CommitSigs.fromSigs(commitments.active.mapNotNull { c -> val commitInput = c.commitInput // Note that we ignore errors and simply skip failures to sign: we've already signed those updates before // the disconnection, so we don't expect any error here unless our peer sends an invalid nonce. In that // case, we simply won't send back our commit_sig until they fix their node. c.nextRemoteCommit?.commit?.sign(channelKeys, channelParams, c.fundingTxIndex, c.remoteFundingPubkey, commitInput, batchSize) - } + }) val retransmit = when (retransmitRevocation) { null -> buildList { addAll(signedUpdates) - addAll(commitSigs) + add(commitSigs) } else -> if (commitments.localCommitIndex > rnci.value.sentAfterLocalCommitIndex) { buildList { addAll(signedUpdates) - addAll(commitSigs) + add(commitSigs) add(retransmitRevocation) } } else { buildList { add(retransmitRevocation) addAll(signedUpdates) - addAll(commitSigs) + add(commitSigs) } } } diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index a7b8fba07..0056d25c1 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -102,7 +102,7 @@ data class AddLiquidityForIncomingPayment(val paymentAmount: MilliSatoshi, val r } data class PeerConnection(val id: Long, val output: Channel, val logger: MDCLogger) { - fun send(msg: LightningMessage) { + private fun sendInternal(msg: LightningMessage) { // We can safely use trySend because we use unlimited channel buffers. // If the connection was closed, the message will automatically be dropped. val result = output.trySend(msg) @@ -113,6 +113,13 @@ data class PeerConnection(val id: Long, val output: Channel, v } } } + + fun send(msg: LightningMessage) { + when (msg) { + is CommitSigBatch -> msg.messages.map { sendInternal(it) } + else -> sendInternal(msg) + } + } } data class Connected(val peerConnection: PeerConnection) : PeerCommand() @@ -190,8 +197,7 @@ class Peer( private val prologue = "lightning".encodeToByteArray() fun updatePeerStorage(nodeParams: NodeParams, channelStates: Map, peerConnection: PeerConnection?, remoteFeatures: Features?, logger: MDCLogger?) { - if (nodeParams.usePeerStorage && - remoteFeatures?.hasFeature(Feature.ProvideStorage) == true) { + if (nodeParams.usePeerStorage && remoteFeatures?.hasFeature(Feature.ProvideStorage) == true) { val persistedChannelStates = channelStates.values.filterIsInstance().filterNot { it is Closed } peerConnection?.send(PeerStorageStore(EncryptedPeerStorage.from(nodeParams.nodePrivateKey, persistedChannelStates, logger))) } @@ -472,15 +478,26 @@ class Peer( } suspend fun receiveLoop() { + suspend fun receiveMessage(): LightningMessage? { + val received = session.receive { size -> socket.receiveFully(size) } + return try { + LightningMessage.decode(received) + } catch (_: Throwable) { + logger.warning { "cannot deserialize message: ${received.byteVector().toHex()}" } + null + } + } + try { while (isActive) { - val received = session.receive { size -> socket.receiveFully(size) } - try { - val msg = LightningMessage.decode(received) - input.send(MessageReceived(peerConnection.id, msg)) - } catch (e: Throwable) { - logger.warning { "cannot deserialize message: ${received.byteVector().toHex()}" } + val msg = when (val msg = receiveMessage()) { + is CommitSig -> { + val others = (1 until msg.batchSize).mapNotNull { receiveMessage() as CommitSig } + CommitSigs.fromSigs(listOf(msg) + others) + } + else -> msg } + msg?.let { input.send(MessageReceived(peerConnection.id, it)) } } closeSocket(null) } catch (ex: Throwable) { diff --git a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/wire/LightningMessages.kt b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/wire/LightningMessages.kt index 434296834..66591fb73 100644 --- a/modules/core/src/commonMain/kotlin/fr/acinq/lightning/wire/LightningMessages.kt +++ b/modules/core/src/commonMain/kotlin/fr/acinq/lightning/wire/LightningMessages.kt @@ -1213,12 +1213,22 @@ data class UpdateFailMalformedHtlc( } } +/** + * [CommitSig] can either be sent individually or as part of a batch. When sent in a batch (which happens when there + * are pending splice transactions), we treat the whole batch as a single lightning message and group them on the wire. + */ +sealed class CommitSigs : HtlcMessage, HasChannelId, RequirePeerStorageStore { + companion object { + fun fromSigs(sigs: List): CommitSigs = if (sigs.size == 1) sigs.first() else CommitSigBatch(sigs) + } +} + data class CommitSig( override val channelId: ByteVector32, val signature: ByteVector64, val htlcSignatures: List, val tlvStream: TlvStream = TlvStream.empty() -) : HtlcMessage, HasChannelId, RequirePeerStorageStore { +) : CommitSigs() { override val type: Long get() = CommitSig.type val alternativeFeerateSigs: List = tlvStream.get()?.sigs ?: listOf() @@ -1254,6 +1264,21 @@ data class CommitSig( } } +data class CommitSigBatch(val messages: List) : CommitSigs() { + // The read/write functions and the type field are meant for individual lightning messages. + // While we treat a commit_sig batch as one logical message, we will actually encode each messages individually. + // That's why the read/write functions aren't implemented. + override val type: Long get() = 0 + override val channelId: ByteVector32 = messages.first().channelId + val batchSize: Int = messages.size + + override fun write(out: Output) = error("cannot write commit_sig batch: each message must be written individually") + + companion object : LightningMessageReader { + override fun read(input: Input): CommitSigBatch = error("cannot read commit_sig batch: each message must be read individually") + } +} + data class RevokeAndAck( override val channelId: ByteVector32, val perCommitmentSecret: PrivateKey, diff --git a/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt b/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt index 9929b7883..dd42542ca 100644 --- a/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt +++ b/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt @@ -526,13 +526,12 @@ object TestsHelper { return payer0 to payee0 } - private fun receiveCommitSigs(receiver: LNChannel, commitSigs: List): Pair, List> { - return commitSigs.fold(Pair(receiver, emptyList())) { pair, commitSig -> - val (statePrev, actionsPrev) = pair - assertTrue(actionsPrev.isEmpty()) - val (stateNext, actionsNext) = statePrev.process(ChannelCommand.MessageReceived(commitSig)) - assertIs>(stateNext) - Pair(stateNext, actionsNext) + private fun verifyCommitSigsCount(commitSigs: CommitSigs, commitmentsCount: Int) { + if (commitmentsCount == 1) { + assertIs(commitSigs) + } else { + assertIs(commitSigs) + assertEquals(commitmentsCount, commitSigs.batchSize) } } @@ -545,22 +544,20 @@ object TestsHelper { val rHasChanges = nodeB.state.commitments.changes.localHasChanges() val (sender0, sActions0) = nodeA.process(ChannelCommand.Commitment.Sign) - val commitSigs0 = sActions0.findOutgoingMessages() - assertEquals(commitmentsCount, commitSigs0.size) - commitSigs0.forEach { assertEquals(commitmentsCount, it.batchSize) } + val commitSigs0 = sActions0.findOutgoingMessage() + verifyCommitSigsCount(commitSigs0, commitmentsCount) - val (receiver0, rActions0) = receiveCommitSigs(nodeB, commitSigs0) + val (receiver0, rActions0) = nodeB.process(ChannelCommand.MessageReceived(commitSigs0)) val revokeAndAck0 = rActions0.findOutgoingMessage() val commandSign0 = rActions0.findCommand() val (sender1, _) = sender0.process(ChannelCommand.MessageReceived(revokeAndAck0)) assertIs>(sender1) val (receiver1, rActions1) = receiver0.process(commandSign0) - val commitSigs1 = rActions1.findOutgoingMessages() - assertEquals(commitmentsCount, commitSigs1.size) - commitSigs1.forEach { assertEquals(commitmentsCount, it.batchSize) } + val commitSigs1 = rActions1.findOutgoingMessage() + verifyCommitSigsCount(commitSigs1, commitmentsCount) - val (sender2, sActions2) = receiveCommitSigs(sender1, commitSigs1) + val (sender2, sActions2) = sender1.process(ChannelCommand.MessageReceived(commitSigs1)) val revokeAndAck1 = sActions2.findOutgoingMessage() val (receiver2, _) = receiver1.process(ChannelCommand.MessageReceived(revokeAndAck1)) assertIs>(receiver2) @@ -568,10 +565,10 @@ object TestsHelper { if (rHasChanges) { val commandSign1 = sActions2.findCommand() val (sender3, sActions3) = sender2.process(commandSign1) - val commitSigs2 = sActions3.findOutgoingMessages() - assertEquals(commitmentsCount, commitSigs2.size) + val commitSigs2 = sActions3.findOutgoingMessage() + verifyCommitSigsCount(commitSigs2, commitmentsCount) - val (receiver3, rActions3) = receiveCommitSigs(receiver2, commitSigs2) + val (receiver3, rActions3) = receiver2.process(ChannelCommand.MessageReceived(commitSigs2)) val revokeAndAck2 = rActions3.findOutgoingMessage() val (sender4, _) = sender3.process(ChannelCommand.MessageReceived(revokeAndAck2)) diff --git a/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt b/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt index ae94780fd..ddc4e0f9e 100644 --- a/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt +++ b/modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt @@ -694,27 +694,22 @@ class SpliceTestsCommon : LightningTestSuite() { val (nodes, preimage, htlc) = addHtlc(20_000_000.msat, alice2, bob3) val (alice3, bob4) = nodes val (alice4, actionsAlice4) = alice3.process(ChannelCommand.Commitment.Sign) - val commitSigsAlice = actionsAlice4.findOutgoingMessages() - assertEquals(commitSigsAlice.size, 3) - commitSigsAlice.forEach { assertEquals(it.batchSize, 3) } - val (bob5, actionsBob5) = bob4.process(ChannelCommand.MessageReceived(commitSigsAlice[0])) - assertTrue(actionsBob5.isEmpty()) + val commitSigsAlice = actionsAlice4.findOutgoingMessage() + assertEquals(commitSigsAlice.batchSize, 3) val (alice5, _) = alice4.process(ChannelCommand.MessageReceived(spliceLocked)) assertEquals(alice5.commitments.active.size, 1) assertEquals(alice5.commitments.inactive.size, 2) - val (bob6, actionsBob6) = bob5.process(ChannelCommand.MessageReceived(commitSigsAlice[1])) - assertTrue(actionsBob6.isEmpty()) - val (bob7, actionsBob7) = bob6.process(ChannelCommand.MessageReceived(commitSigsAlice[2])) - assertEquals(actionsBob7.size, 3) - val revokeAndAckBob = actionsBob7.findOutgoingMessage() - actionsBob7.contains(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) - actionsBob7.has() - val (bob8, actionsBob8) = bob7.process(ChannelCommand.Commitment.Sign) - assertEquals(actionsBob8.size, 3) - val commitSigBob = actionsBob8.findOutgoingMessage() + val (bob5, actionsBob5) = bob4.process(ChannelCommand.MessageReceived(commitSigsAlice)) + assertEquals(actionsBob5.size, 3) + val revokeAndAckBob = actionsBob5.findOutgoingMessage() + actionsBob5.contains(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) + actionsBob5.has() + val (bob6, actionsBob6) = bob5.process(ChannelCommand.Commitment.Sign) + assertEquals(actionsBob6.size, 3) + val commitSigBob = actionsBob6.findOutgoingMessage() assertEquals(commitSigBob.batchSize, 1) - actionsBob8.has() - actionsBob8.has() + actionsBob6.has() + actionsBob6.has() val (alice6, actionsAlice6) = alice5.process(ChannelCommand.MessageReceived(revokeAndAckBob)) assertEquals(actionsAlice6.size, 1) actionsAlice6.has() @@ -723,19 +718,19 @@ class SpliceTestsCommon : LightningTestSuite() { assertEquals(actionsAlice7.size, 2) val revokeAndAckAlice = actionsAlice7.findOutgoingMessage() actionsAlice7.has() - val (bob9, actionsBob9) = bob8.process(ChannelCommand.MessageReceived(revokeAndAckAlice)) - assertIs>(bob9) - assertEquals(actionsBob9.size, 2) - actionsBob9.has() - actionsBob9.has() + val (bob7, actionsBob7) = bob6.process(ChannelCommand.MessageReceived(revokeAndAckAlice)) + assertIs>(bob7) + assertEquals(actionsBob7.size, 2) + actionsBob7.has() + actionsBob7.has() // Bob fulfills the HTLC. - val (alice8, bob10) = fulfillHtlc(htlc.id, preimage, alice7, bob9) - val (bob11, alice9) = crossSign(bob10, alice8, commitmentsCount = 1) + val (alice8, bob8) = fulfillHtlc(htlc.id, preimage, alice7, bob7) + val (bob9, alice9) = crossSign(bob8, alice8, commitmentsCount = 1) assertEquals(alice9.commitments.active.size, 1) alice9.commitments.inactive.forEach { assertTrue(it.localCommit.index < alice9.commitments.localCommitIndex) } - assertEquals(bob11.commitments.active.size, 1) - bob11.commitments.inactive.forEach { assertTrue(it.localCommit.index < bob11.commitments.localCommitIndex) } + assertEquals(bob9.commitments.active.size, 1) + bob9.commitments.inactive.forEach { assertTrue(it.localCommit.index < bob9.commitments.localCommitIndex) } } @Test @@ -1039,20 +1034,19 @@ class SpliceTestsCommon : LightningTestSuite() { val (nodes2, _, htlc) = addHtlc(50_000_000.msat, alice1, bob1) val (alice3, actionsAlice3) = nodes2.first.process(ChannelCommand.Commitment.Sign) assertIs>(alice3) - assertEquals(2, actionsAlice3.findOutgoingMessages().size) - actionsAlice3.findOutgoingMessages().forEach { assertEquals(2, it.batchSize) } + val commitSigsAlice1 = actionsAlice3.findOutgoingMessage() + assertEquals(2, commitSigsAlice1.batchSize) // Bob disconnects before receiving Alice's commit_sig. val (alice4, bob3, channelReestablishAlice) = disconnect(alice3, nodes2.second) val (bob4, actionsBob4) = bob3.process(ChannelCommand.MessageReceived(channelReestablishAlice)) val channelReestablishBob = actionsBob4.findOutgoingMessage() val (_, actionsAlice5) = alice4.process(ChannelCommand.MessageReceived(channelReestablishBob)) actionsAlice5.hasOutgoingMessage().also { assertEquals(htlc, it) } - assertEquals(2, actionsAlice5.findOutgoingMessages().size) - actionsAlice5.findOutgoingMessages().forEach { assertEquals(2, it.batchSize) } + val commitSigsAlice2 = actionsAlice5.findOutgoingMessage() + assertEquals(2, commitSigsAlice2.batchSize) val (bob5, _) = bob4.process(ChannelCommand.MessageReceived(htlc)) - val (bob6, _) = bob5.process(ChannelCommand.MessageReceived(actionsAlice5.findOutgoingMessages().first())) - val (_, actionsBob7) = bob6.process(ChannelCommand.MessageReceived(actionsAlice5.findOutgoingMessages().last())) - actionsBob7.findOutgoingMessage() + val (_, actionsBob6) = bob5.process(ChannelCommand.MessageReceived(commitSigsAlice2)) + actionsBob6.findOutgoingMessage() } @Test @@ -1071,8 +1065,10 @@ class SpliceTestsCommon : LightningTestSuite() { // Alice sends an HTLC to Bob, but Bob doesn't receive the commit_sig messages. val (nodes3, _, htlc) = addHtlc(50_000_000.msat, alice2, bob2) val (alice3, actionsAlice3) = nodes3.first.process(ChannelCommand.Commitment.Sign) - assertEquals(2, actionsAlice3.findOutgoingMessages().size) - actionsAlice3.findOutgoingMessages().forEach { assertEquals(2, it.batchSize) } + actionsAlice3.hasOutgoingMessage().also { batch -> + assertEquals(2, batch.batchSize) + batch.messages.forEach { sig -> assertEquals(2, sig.batchSize) } + } // At the same time, the splice confirms on Bob's side, who now expects a single commit_sig message. val (bob3, actionsBob3) = nodes3.second.process(ChannelCommand.WatchReceived(WatchConfirmedTriggered(bob.channelId, WatchConfirmed.ChannelFundingDepthOk, 100, 0, spliceTx))) @@ -1410,7 +1406,7 @@ class SpliceTestsCommon : LightningTestSuite() { val (nodes6, _, htlcOut2) = addHtlc(15_000_000.msat, alice5, bob5) val (alice6, bob6) = nodes6 val (alice7, actionsAlice7) = alice6.process(ChannelCommand.Commitment.Sign) - actionsAlice7.hasOutgoingMessage() // Bob ignores Alice's message + actionsAlice7.hasOutgoingMessage() // Bob ignores Alice's message assertEquals(3, bob6.commitments.active.size) assertEquals(3, alice7.commitments.active.size)