diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index 890b93e322..cee5e99f21 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -21,8 +21,9 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType +import fr.acinq.eclair.transactions.Transactions import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, MilliSatoshi, RealShortChannelId, ShortChannelId} +import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, MilliSatoshi, RealShortChannelId, ShortChannelId, TimestampMilli} /** * Created by PM on 17/08/2016. @@ -92,10 +93,19 @@ case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent -// NB: the fee should be set to 0 when we're not paying it. -case class TransactionPublished(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, miningFee: Satoshi, desc: String) extends ChannelEvent +/** + * We published a transaction related to the given [[channelId]]. + * + * @param localMiningFee mining fee paid by us in the given [[tx]]. + * @param remoteMiningFee mining fee paid by our channel peer in the given [[tx]]. + * @param liquidityPurchase_opt optional liquidity purchase included in this transaction. + */ +case class TransactionPublished(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, localMiningFee: Satoshi, remoteMiningFee: Satoshi, desc: String, liquidityPurchase_opt: Option[LiquidityAds.PurchaseBasicInfo], timestamp: TimestampMilli = TimestampMilli.now()) extends ChannelEvent { + val miningFee: Satoshi = localMiningFee + remoteMiningFee + val feerate: FeeratePerKw = Transactions.fee2rate(miningFee, tx.weight()) +} -case class TransactionConfirmed(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction) extends ChannelEvent +case class TransactionConfirmed(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, timestamp: TimestampMilli = TimestampMilli.now()) extends ChannelEvent // NB: this event is only sent when the channel is available. case class AvailableBalanceChanged(channel: ActorRef, channelId: ByteVector32, aliases: ShortIdAliases, commitments: Commitments, lastAnnouncement_opt: Option[ChannelAnnouncement]) extends ChannelEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index b50b4a15f6..e09f4221a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -971,11 +971,12 @@ object Helpers { } } - /** Compute the fee paid by a commitment transaction. */ - def commitTxFee(commitInput: InputInfo, commitTx: Transaction, localPaysCommitTxFees: Boolean): Satoshi = { + /** Compute the fee paid by a commitment transaction. The first result is the fee paid by us, the second one is the fee paid by our peer. */ + def commitTxFee(commitInput: InputInfo, commitTx: Transaction, localPaysCommitTxFees: Boolean): (Satoshi, Satoshi) = { require(commitTx.txIn.size == 1, "transaction must have only one input") require(commitTx.txIn.exists(txIn => txIn.outPoint == commitInput.outPoint), "transaction must spend the funding output") - if (localPaysCommitTxFees) commitInput.txOut.amount - commitTx.txOut.map(_.amount).sum else 0 sat + val commitFee = commitInput.txOut.amount - commitTx.txOut.map(_.amount).sum + if (localPaysCommitTxFees) (commitFee, 0 sat) else (0 sat, commitFee) } /** Return the confirmation target that should be used for our local commitment. */ diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala index 0f891bea1a..75c188f413 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala @@ -51,7 +51,7 @@ trait DualFundingHandlers extends CommonFundingHandlers { // to publish and we may be able to RBF. wallet.publishTransaction(fundingTx.signedTx).onComplete { case Success(_) => - context.system.eventStream.publish(TransactionPublished(dualFundedTx.fundingParams.channelId, remoteNodeId, fundingTx.signedTx, fundingTx.tx.localFees.truncateToSatoshi, "funding")) + context.system.eventStream.publish(TransactionPublished(dualFundedTx.fundingParams.channelId, remoteNodeId, fundingTx.signedTx, localMiningFee = fundingTx.tx.localFees.truncateToSatoshi, remoteMiningFee = fundingTx.tx.remoteFees.truncateToSatoshi, "funding", dualFundedTx.liquidityPurchase_opt)) // We rely on Bitcoin Core ZMQ notifications to learn about transactions that appear in our mempool, but // it doesn't provide strong guarantees that we'll always receive an event. This can be an issue for 0-conf // funding transactions, where we end up delaying our channel_ready or splice_locked. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 6807f366ff..96a1f2a516 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -232,7 +232,8 @@ trait ErrorHandlers extends CommonHandlers { /** Publish 2nd-stage transactions for our local commitment. */ def doPublish(lcp: LocalCommitPublished, txs: Closing.LocalClose.SecondStageTransactions, commitment: FullCommitment): Unit = { - val publishCommitTx = PublishFinalTx(lcp.commitTx, commitment.fundingInput, "commit-tx", Closing.commitTxFee(commitment.commitInput(channelKeys), lcp.commitTx, commitment.localChannelParams.paysCommitTxFees), None) + val (localCommitFee, _) = Closing.commitTxFee(commitment.commitInput(channelKeys), lcp.commitTx, commitment.localChannelParams.paysCommitTxFees) + val publishCommitTx = PublishFinalTx(lcp.commitTx, commitment.fundingInput, "commit-tx", localCommitFee, None) val publishAnchorTx_opt = txs.anchorTx_opt match { case Some(anchorTx) if !lcp.isConfirmed => val confirmationTarget = Closing.confirmationTarget(commitment.localCommit, commitment.localCommitParams.dustLimit, commitment.commitmentFormat, nodeParams.onChainFeeConf) @@ -274,7 +275,8 @@ trait ErrorHandlers extends CommonHandlers { case closing: DATA_CLOSING => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, closing.maxClosingFeerate_opt) case _ => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) } - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(commitments.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees), "remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitments.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, localCommitFee, remoteCommitFee, "remote-commit", None)) val (remoteCommitPublished, closingTxs) = Closing.RemoteClose.claimCommitTxOutputs(channelKeys, commitments, commitments.remoteCommit, commitTx, closingFeerate, finalScriptPubKey, nodeParams.onChainFeeConf.spendAnchorWithoutHtlcs) val nextData = d match { case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished)) @@ -296,7 +298,8 @@ trait ErrorHandlers extends CommonHandlers { case closing: DATA_CLOSING => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, closing.maxClosingFeerate_opt) case _ => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) } - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(commitment.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees), "next-remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitment.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, localCommitFee, remoteCommitFee, "next-remote-commit", None)) val (remoteCommitPublished, closingTxs) = Closing.RemoteClose.claimCommitTxOutputs(channelKeys, commitment, remoteCommit, commitTx, closingFeerate, finalScriptPubKey, nodeParams.onChainFeeConf.spendAnchorWithoutHtlcs) val nextData = d match { case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished)) @@ -350,7 +353,8 @@ trait ErrorHandlers extends CommonHandlers { val dustLimit = commitment.localCommitParams.dustLimit val (revokedCommitPublished, closingTxs) = Closing.RevokedClose.claimCommitTxOutputs(d.commitments.channelParams, channelKeys, tx, commitmentNumber, remotePerCommitmentSecret, toSelfDelay, commitmentFormat, nodeParams.db.channels, dustLimit, nodeParams.currentBitcoinCoreFeerates, nodeParams.onChainFeeConf, finalScriptPubKey) log.warning("txid={} was a revoked commitment, publishing the penalty tx", tx.txid) - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(commitment.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees), "revoked-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitment.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, localCommitFee, remoteCommitFee, "revoked-commit", None)) val exc = FundingTxSpent(d.channelId, tx.txid) val error = Error(d.channelId, exc.getMessage) val nextData = d match { @@ -364,7 +368,8 @@ trait ErrorHandlers extends CommonHandlers { case None => d match { case d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT => log.warning("they published a future commit (because we asked them to) in txid={}", tx.txid) - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.latest.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees), "future-remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(d.commitments.latest.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, localCommitFee, remoteCommitFee, "future-remote-commit", None)) val remotePerCommitmentPoint = d.remoteChannelReestablish.myCurrentPerCommitmentPoint val commitKeys = d.commitments.latest.remoteKeys(channelKeys, remotePerCommitmentPoint) val closingFeerate = nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala index 95a1192493..27994b16f4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala @@ -43,7 +43,7 @@ trait SingleFundingHandlers extends CommonFundingHandlers { def publishFundingTx(channelId: ByteVector32, fundingTx: Transaction, fundingTxFee: Satoshi, replyTo: akka.actor.typed.ActorRef[OpenChannelResponse]): Unit = { wallet.commit(fundingTx).onComplete { case Success(true) => - context.system.eventStream.publish(TransactionPublished(channelId, remoteNodeId, fundingTx, fundingTxFee, "funding")) + context.system.eventStream.publish(TransactionPublished(channelId, remoteNodeId, fundingTx, localMiningFee = fundingTxFee, remoteMiningFee = 0 sat, "funding", None)) replyTo ! OpenChannelResponse.Created(channelId, fundingTxId = fundingTx.txid, fundingTxFee) case Success(false) => replyTo ! OpenChannelResponse.Rejected("couldn't publish funding tx") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala index 0470229637..df7c334063 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, Transaction, TxId} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, SatoshiLong, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishContext, TxRejectedReason} @@ -136,7 +136,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams, private def waitForConfirmation(): Behavior[Command] = { context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))) - context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, cmd.fee, cmd.desc)) + context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, localMiningFee = cmd.fee, remoteMiningFee = 0 sat, cmd.desc, None)) Behaviors.receiveMessagePartial { case WrappedCurrentBlockHeight(currentBlockHeight) => timers.startSingleTimer(CheckTxConfirmationsKey, CheckTxConfirmations(currentBlockHeight), (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index ba249a831d..9275ade6f4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -29,7 +29,7 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags} import fr.acinq.eclair.payment._ -import fr.acinq.eclair.{Logs, NodeParams} +import fr.acinq.eclair.{Logs, NodeParams, TimestampMilli} /** * This actor sits at the interface between our event stream and the database. @@ -90,8 +90,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived)) outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent)) case ChannelPaymentRelayed(_, incoming, outgoing) => - channelsDb.updateChannelMeta(incoming.channelId, ChannelEvent.EventType.PaymentReceived) - channelsDb.updateChannelMeta(outgoing.channelId, ChannelEvent.EventType.PaymentSent) + incoming.foreach(i => channelsDb.updateChannelMeta(i.channelId, ChannelEvent.EventType.PaymentReceived)) + outgoing.foreach(o => channelsDb.updateChannelMeta(o.channelId, ChannelEvent.EventType.PaymentSent)) case OnTheFlyFundingPaymentRelayed(_, incoming, outgoing) => incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived)) outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent)) @@ -124,7 +124,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_CHANNEL_READY | WAIT_FOR_DUAL_FUNDING_READY, NORMAL, Some(commitments)) => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment() val event = ChannelEvent.EventType.Created - auditDb.add(ChannelEvent(channelId, remoteNodeId, commitments.latest.fundingTxId, commitments.latest.capacity, commitments.localChannelParams.isChannelOpener, !commitments.announceChannel, event)) + auditDb.add(ChannelEvent(channelId, remoteNodeId, commitments.latest.fundingTxId, commitments.latest.commitmentFormat.toString, commitments.latest.capacity, commitments.localChannelParams.isChannelOpener, !commitments.announceChannel, event.label)) channelsDb.updateChannelMeta(channelId, event) case ChannelStateChanged(_, channelId, _, _, OFFLINE, SYNCING, _) => channelsDb.updateChannelMeta(channelId, ChannelEvent.EventType.Connected) @@ -141,7 +141,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL case 0 => ChannelEvent.EventType.Confirmed case _ => ChannelEvent.EventType.Spliced } - auditDb.add(ChannelEvent(e.channelId, e.remoteNodeId, e.fundingTxId, e.commitments.latest.capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event)) + auditDb.add(ChannelEvent(e.channelId, e.remoteNodeId, e.fundingTxId, e.commitments.latest.commitmentFormat.toString, e.commitments.latest.capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event.label)) case e: ChannelClosed => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment() @@ -150,7 +150,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL // spent by the closing transaction. val capacity = e.commitments.latest.capacity val fundingTxId = e.commitments.latest.fundingTxId - auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteNodeId, fundingTxId, capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event)) + auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteNodeId, fundingTxId, e.commitments.latest.commitmentFormat.toString, capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event.label)) channelsDb.updateChannelMeta(e.channelId, event) case u: ChannelUpdateParametersChanged => @@ -178,7 +178,7 @@ object DbEventHandler { def props(nodeParams: NodeParams): Props = Props(new DbEventHandler(nodeParams)) // @formatter:off - case class ChannelEvent(channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, capacity: Satoshi, isChannelOpener: Boolean, isPrivate: Boolean, event: ChannelEvent.EventType) + case class ChannelEvent(channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, channelType: String, capacity: Satoshi, isChannelOpener: Boolean, isPrivate: Boolean, event: String, timestamp: TimestampMilli = TimestampMilli.now()) object ChannelEvent { sealed trait EventType { def label: String } object EventType { @@ -190,12 +190,12 @@ object DbEventHandler { object PaymentReceived extends EventType { override def label: String = "received" } case class Closed(closingType: ClosingType) extends EventType { override def label: String = closingType match { - case _: MutualClose => "mutual" - case _: LocalClose => "local" - case _: CurrentRemoteClose => "remote" - case _: NextRemoteClose => "remote" - case _: RecoveryClose => "recovery" - case _: RevokedClose => "revoked" + case _: MutualClose => "mutual-close" + case _: LocalClose => "local-close" + case _: CurrentRemoteClose => "remote-close" + case _: NextRemoteClose => "remote-close" + case _: RecoveryClose => "recovery-close" + case _: RevokedClose => "revoked-close" } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 4caa243a72..e3e7a83e7f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -195,7 +195,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.setLong(3, e.capacity.toLong) statement.setBoolean(4, e.isChannelOpener) statement.setBoolean(5, e.isPrivate) - statement.setString(6, e.event.label) + statement.setString(6, e.event) statement.setTimestamp(7, Timestamp.from(Instant.now())) statement.executeUpdate() } @@ -243,8 +243,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val payments = e match { case e: ChannelPaymentRelayed => // non-trampoline relayed payments have one input and one output - val in = Seq(RelayedPart(e.paymentIn.channelId, e.paymentIn.amount, "IN", "channel", e.startedAt)) - val out = Seq(RelayedPart(e.paymentOut.channelId, e.paymentOut.amount, "OUT", "channel", e.settledAt)) + val in = e.incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "channel", i.receivedAt)) + val out = e.outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "channel", o.settledAt)) in ++ out case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) => using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => @@ -450,7 +450,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentEvent.OutgoingPayment(p.channelId, PrivateKey(ByteVector32.One).publicKey, p.amount, p.timestamp)).sortBy(_.amount) parts.headOption match { case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map { - case (in, out) => ChannelPaymentRelayed(paymentHash, in, out) + case (in, out) => ChannelPaymentRelayed(paymentHash, Seq(in), Seq(out)) } case Some(RelayedPart(_, _, _, "trampoline", _)) => trampolineByHash.get(paymentHash) match { case Some((nextTrampolineAmount, nextTrampolineNodeId)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil @@ -502,16 +502,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val relayed = listRelayed(from, to).foldLeft(Map.empty[ByteVector32, Seq[Relayed]]) { (previous, e) => // NB: we must avoid counting the fee twice: we associate it to the outgoing channels rather than the incoming ones. - val current = e match { - case c: ChannelPaymentRelayed => Map( - c.paymentIn.channelId -> (Relayed(c.amountIn, 0 msat, "IN") +: previous.getOrElse(c.paymentIn.channelId, Nil)), - c.paymentOut.channelId -> (Relayed(c.amountOut, c.amountIn - c.amountOut, "OUT") +: previous.getOrElse(c.paymentOut.channelId, Nil)), - ) - case t: TrampolinePaymentRelayed => - aggregateRelayStats(previous, t.incoming, t.outgoing) - case f: OnTheFlyFundingPaymentRelayed => - aggregateRelayStats(previous, f.incoming, f.outgoing) - } + val current = aggregateRelayStats(previous, e.incoming, e.outgoing) previous ++ current } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 464d9daed4..d753208fcb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -189,7 +189,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.setLong(3, e.capacity.toLong) statement.setBoolean(4, e.isChannelOpener) statement.setBoolean(5, e.isPrivate) - statement.setString(6, e.event.label) + statement.setString(6, e.event) statement.setLong(7, TimestampMilli.now().toLong) statement.executeUpdate() } @@ -231,8 +231,8 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val payments = e match { case e: ChannelPaymentRelayed => // non-trampoline relayed payments have one input and one output - val in = Seq(RelayedPart(e.paymentIn.channelId, e.paymentIn.amount, "IN", "channel", e.startedAt)) - val out = Seq(RelayedPart(e.paymentOut.channelId, e.paymentOut.amount, "OUT", "channel", e.settledAt)) + val in = e.incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "channel", i.receivedAt)) + val out = e.outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "channel", o.settledAt)) in ++ out case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) => using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => @@ -421,7 +421,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentEvent.OutgoingPayment(p.channelId, PrivateKey(ByteVector32.One).publicKey, p.amount, p.timestamp)).sortBy(_.amount) parts.headOption match { case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map { - case (in, out) => ChannelPaymentRelayed(paymentHash, in, out) + case (in, out) => ChannelPaymentRelayed(paymentHash, Seq(in), Seq(out)) } case Some(RelayedPart(_, _, _, "trampoline", _)) => trampolineByHash.get(paymentHash) match { case Some((nextTrampolineAmount, nextTrampolineNodeId)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil @@ -472,16 +472,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val relayed = listRelayed(from, to).foldLeft(Map.empty[ByteVector32, Seq[Relayed]]) { (previous, e) => // NB: we must avoid counting the fee twice: we associate it to the outgoing channels rather than the incoming ones. - val current = e match { - case c: ChannelPaymentRelayed => Map( - c.paymentIn.channelId -> (Relayed(c.amountIn, 0 msat, "IN") +: previous.getOrElse(c.paymentIn.channelId, Nil)), - c.paymentOut.channelId -> (Relayed(c.amountOut, c.amountIn - c.amountOut, "OUT") +: previous.getOrElse(c.paymentOut.channelId, Nil)), - ) - case t: TrampolinePaymentRelayed => - aggregateRelayStats(previous, t.incoming, t.outgoing) - case f: OnTheFlyFundingPaymentRelayed => - aggregateRelayStats(previous, f.incoming, f.outgoing) - } + val current = aggregateRelayStats(previous, e.incoming, e.outgoing) previous ++ current } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala index 7874753217..f8553a811a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala @@ -120,16 +120,14 @@ sealed trait PaymentRelayed extends PaymentEvent { def outgoing: Seq[PaymentEvent.OutgoingPayment] def amountIn: MilliSatoshi = incoming.map(_.amount).sum def amountOut: MilliSatoshi = outgoing.map(_.amount).sum + def relayFee: MilliSatoshi = amountIn - amountOut override def startedAt: TimestampMilli = incoming.map(_.receivedAt).minOption.getOrElse(TimestampMilli.now()) override def settledAt: TimestampMilli = outgoing.map(_.settledAt).maxOption.getOrElse(TimestampMilli.now()) // @formatter:on } -/** A payment was successfully relayed from a single incoming channel to a single outgoing channel. */ -case class ChannelPaymentRelayed(paymentHash: ByteVector32, paymentIn: PaymentEvent.IncomingPayment, paymentOut: PaymentEvent.OutgoingPayment) extends PaymentRelayed { - override val incoming: Seq[PaymentEvent.IncomingPayment] = Seq(paymentIn) - override val outgoing: Seq[PaymentEvent.OutgoingPayment] = Seq(paymentOut) -} +/** A payment was successfully relayed from incoming channels to outgoing channels. */ +case class ChannelPaymentRelayed(paymentHash: ByteVector32, incoming: Seq[PaymentEvent.IncomingPayment], outgoing: Seq[PaymentEvent.OutgoingPayment]) extends PaymentRelayed /** A trampoline payment was successfully relayed, using potentially multiple incoming and outgoing channels. */ case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: Seq[PaymentEvent.IncomingPayment], outgoing: Seq[PaymentEvent.OutgoingPayment], nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi) extends PaymentRelayed diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 97f987787b..2911c4dd33 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -258,7 +258,7 @@ class ChannelRelay private(nodeParams: NodeParams, val cmd = CMD_FULFILL_HTLC(upstream.add.id, fulfill.paymentPreimage, Some(attribution), commit = true) val incoming = PaymentEvent.IncomingPayment(upstream.add.channelId, upstream.receivedFrom, upstream.amountIn, upstream.receivedAt) val outgoing = PaymentEvent.OutgoingPayment(htlc.channelId, remoteNodeId, htlc.amountMsat, now) - context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(htlc.paymentHash, incoming, outgoing)) + context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(htlc.paymentHash, Seq(incoming), Seq(outgoing))) recordRelayDuration(isSuccess = true) safeSendAndStop(upstream.add.channelId, cmd) case WrappedAddResponse(RES_ADD_SETTLED(_, _, htlc, fail: HtlcResult.Fail)) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala index 7952335af7..9236f0430b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala @@ -211,7 +211,7 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial } PendingCommandsDb.safeSend(register, nodeParams.db.pendingCommands, u.originChannelId, CMD_FULFILL_HTLC(u.originHtlcId, paymentPreimage, None, commit = true)) // We don't know when we received this HTLC so we just pretend that we received it just now. - context.system.eventStream.publish(ChannelPaymentRelayed(fulfilledHtlc.paymentHash, PaymentEvent.IncomingPayment(u.originChannelId, u.originNodeId, u.amountIn, TimestampMilli.now()), PaymentEvent.OutgoingPayment(fulfilledHtlc.channelId, downstreamNodeId, fulfilledHtlc.amountMsat, TimestampMilli.now()))) + context.system.eventStream.publish(ChannelPaymentRelayed(fulfilledHtlc.paymentHash, Seq(PaymentEvent.IncomingPayment(u.originChannelId, u.originNodeId, u.amountIn, TimestampMilli.now())), Seq(PaymentEvent.OutgoingPayment(fulfilledHtlc.channelId, downstreamNodeId, fulfilledHtlc.amountMsat, TimestampMilli.now())))) Metrics.PendingRelayedOut.decrement() context become main(brokenHtlcs.copy(relayedOut = brokenHtlcs.relayedOut - origin)) case u: Upstream.Cold.Trampoline => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala index fbf983bad9..79c5fd660b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala @@ -249,7 +249,9 @@ object LiquidityAds { case class WillFundPurchase(willFund: WillFund, purchase: Purchase) /** Minimal information about a liquidity purchase, useful for example when RBF-ing transactions. */ - case class PurchaseBasicInfo(isBuyer: Boolean, amount: Satoshi, fees: Fees) + case class PurchaseBasicInfo(isBuyer: Boolean, amount: Satoshi, fees: Fees) { + val isSeller: Boolean = !isBuyer + } object Codecs { val fundingRate: Codec[FundingRate] = ( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala index 2f02d3239d..331de836cc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala @@ -32,6 +32,7 @@ import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason._ import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished} import fr.acinq.eclair.{TestConstants, TestKitBaseClass, randomKey} import org.scalatest.BeforeAndAfterAll +import org.scalatest.Inside.inside import org.scalatest.funsuite.AnyFunSuiteLike import java.util.UUID @@ -279,7 +280,11 @@ class MempoolTxMonitorSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi generateBlocks(2) monitor ! WrappedCurrentBlockHeight(currentBlockHeight()) - eventListener.expectMsg(TransactionConfirmed(txPublished.channelId, txPublished.remoteNodeId, tx)) + inside(eventListener.expectMsgType[TransactionConfirmed]) { e => + assert(e.channelId == txPublished.channelId) + assert(e.remoteNodeId == txPublished.remoteNodeId) + assert(e.tx == tx) + } } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index 8f2b4ae9b9..9a07b6f51d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -21,7 +21,6 @@ import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, SatoshiLong, Script, T import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} import fr.acinq.eclair.TestUtils.randomTxId import fr.acinq.eclair._ -import fr.acinq.eclair.channel.Helpers.Closing.MutualClose import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.AuditDb.Stats import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -70,8 +69,8 @@ class AuditDbSpec extends AnyFunSuite { val pp2a = PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now) val pp2b = PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42100 msat, now) val e2 = PaymentReceived(randomBytes32(), pp2a :: pp2b :: Nil) - val e3 = ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now - 3.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 1000 msat, now)) - val e4a = TransactionPublished(randomBytes32(), randomKey().publicKey, Transaction(0, Seq.empty, Seq.empty, 0), 42 sat, "mutual") + val e3 = ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now - 3.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 1000 msat, now))) + val e4a = TransactionPublished(randomBytes32(), randomKey().publicKey, Transaction(0, Seq.empty, Seq.empty, 0), 42 sat, 0 sat, "mutual", None) val e4b = TransactionConfirmed(e4a.channelId, e4a.remoteNodeId, e4a.tx) val e4c = TransactionConfirmed(randomBytes32(), randomKey().publicKey, Transaction(2, Nil, TxOut(500 sat, hex"1234") :: Nil, 0)) val pp5a = PaymentSent.PaymentPart(UUID.randomUUID(), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, 0 unixms), 1000 msat, None, startedAt = 0 unixms) @@ -79,7 +78,7 @@ class AuditDbSpec extends AnyFunSuite { val e5 = PaymentSent(UUID.randomUUID(), randomBytes32(), 84100 msat, randomKey().publicKey, pp5a :: pp5b :: Nil, None, startedAt = 0 unixms) val pp6 = PaymentSent.PaymentPart(UUID.randomUUID(), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, settledAt = now + 10.minutes), 1000 msat, None, startedAt = now + 10.minutes) val e6 = PaymentSent(UUID.randomUUID(), randomBytes32(), 42000 msat, randomKey().publicKey, pp6 :: Nil, None, startedAt = now + 10.minutes) - val e7 = ChannelEvent(randomBytes32(), randomKey().publicKey, randomTxId(), 456123000 sat, isChannelOpener = true, isPrivate = false, ChannelEvent.EventType.Closed(MutualClose(null))) + val e7 = ChannelEvent(randomBytes32(), randomKey().publicKey, randomTxId(), "anchor_outputs", 456123000 sat, isChannelOpener = true, isPrivate = false, "mutual-close") val e10 = TrampolinePaymentRelayed(randomBytes32(), Seq( PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 20000 msat, now - 7.seconds), @@ -92,8 +91,8 @@ class AuditDbSpec extends AnyFunSuite { ), randomKey().publicKey, 30000 msat) val multiPartPaymentHash = randomBytes32() - val e11 = ChannelPaymentRelayed(multiPartPaymentHash, PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 13000 msat, now - 5.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 11000 msat, now + 4.milli)) - val e12 = ChannelPaymentRelayed(multiPartPaymentHash, PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 15000 msat, now - 4.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 12500 msat, now + 5.milli)) + val e11 = ChannelPaymentRelayed(multiPartPaymentHash, Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 13000 msat, now - 5.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 11000 msat, now + 4.milli))) + val e12 = ChannelPaymentRelayed(multiPartPaymentHash, Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 15000 msat, now - 4.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 12500 msat, now + 5.milli))) db.add(e1) db.add(e2) @@ -144,26 +143,26 @@ class AuditDbSpec extends AnyFunSuite { val c5 = c1.copy(bytes = 0x05b +: c1.tail) val c6 = c1.copy(bytes = 0x06b +: c1.tail) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1000 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 44000 msat, 1001 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 41000 msat, 1002 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 40000 msat, 1003 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 43000 msat, 1004 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 42000 msat, 1005 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 42000 msat, 1006 unixms), PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 40000 msat, 1007 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 45000 msat, 1008 unixms), PaymentEvent.OutgoingPayment(c6, randomKey().publicKey, 40000 msat, 1009 unixms))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1000 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 44000 msat, 1001 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 41000 msat, 1002 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 40000 msat, 1003 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 43000 msat, 1004 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 42000 msat, 1005 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 42000 msat, 1006 unixms)), Seq(PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 40000 msat, 1007 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 45000 msat, 1008 unixms)), Seq(PaymentEvent.OutgoingPayment(c6, randomKey().publicKey, 40000 msat, 1009 unixms)))) db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 25000 msat, 1010 unixms)), Seq(PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 20000 msat, 1011 unixms)), randomKey().publicKey, 15000 msat)) db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1012 unixms)), Seq(PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 16000 msat, 1013 unixms), PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 10000 msat, 1014 unixms), PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 14000 msat, 1015 unixms)), randomKey().publicKey, 37000 msat)) // The following confirmed txs will be taken into account. - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 200 sat, "funding")) + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 200 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0))) - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0), 300 sat, "mutual")) + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0), 300 sat, 0 sat, "mutual", None)) db.add(TransactionConfirmed(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0))) - db.add(TransactionPublished(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0), 400 sat, "funding")) + db.add(TransactionPublished(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0), 400 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0))) - db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0), 500 sat, "funding")) + db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0), 500 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0))) // The following txs will not be taken into account. - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 1000 sat, "funding")) // duplicate - db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(4500 sat, hex"1111222233")), 0), 500 sat, "funding")) // unconfirmed + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 1000 sat, 0 sat, "funding", None)) // duplicate + db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(4500 sat, hex"1111222233")), 0), 500 sat, 0 sat, "funding", None)) // unconfirmed db.add(TransactionConfirmed(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(2500 sat, hex"ffffff")), 0))) // doesn't match a published tx assert(db.listPublished(randomBytes32()).isEmpty) @@ -204,7 +203,7 @@ class AuditDbSpec extends AnyFunSuite { channelIds.foreach(channelId => { val nodeId = nodeIds(Random.nextInt(nodeCount)) val fundingTx = Transaction(0, Seq.empty, Seq(TxOut(5000 sat, Script.pay2wpkh(nodeId))), 0) - db.add(TransactionPublished(channelId, nodeId, fundingTx, 100 sat, "funding")) + db.add(TransactionPublished(channelId, nodeId, fundingTx, 100 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(channelId, nodeId, fundingTx)) }) // Add relay events. @@ -217,7 +216,7 @@ class AuditDbSpec extends AnyFunSuite { db.add(TrampolinePaymentRelayed(randomBytes32(), incoming, outgoing, randomKey().publicKey, 5000 msat)) } else { val toChannelId = channelIds(Random.nextInt(channelCount)) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 10000 msat, TimestampMilli.now() - 2.seconds), PaymentEvent.OutgoingPayment(toChannelId, randomKey().publicKey, Random.nextInt(10000).msat, TimestampMilli.now()))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 10000 msat, TimestampMilli.now() - 2.seconds)), Seq(PaymentEvent.OutgoingPayment(toChannelId, randomKey().publicKey, Random.nextInt(10000).msat, TimestampMilli.now())))) } }) // Test starts here. diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala index 89bfd76c5b..7e97f96d1a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala @@ -173,7 +173,7 @@ class PgUtilsSpec extends TestKitBaseClass with AnyFunSuiteLike with Eventually db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-A", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 45.days)) db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-B", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 3.days)) db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-C", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 7.minutes)) - db.audit.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 421 msat, TimestampMilli.now() - 5.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 400 msat, TimestampMilli.now() - 3.seconds))) + db.audit.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 421 msat, TimestampMilli.now() - 5.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 400 msat, TimestampMilli.now() - 3.seconds)))) db.dataSource.close() } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala index ffdd099ba7..241bad4b98 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala @@ -404,7 +404,7 @@ class JsonSerializersSpec extends TestKitBaseClass with AnyFunSuiteLike with Mat } test("type hints") { - val e1 = ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 110 msat, 100 unixms), PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 100 msat, 150 unixms)) + val e1 = ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 110 msat, 100 unixms)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 100 msat, 150 unixms))) assert(JsonSerializers.serialization.writePretty(e1)(JsonSerializers.formats).contains("\"type\" : \"payment-relayed\"")) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index 8f93d78191..7e9037af2a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -767,11 +767,11 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed] assert(paymentRelayed.paymentHash == r.add.paymentHash) assert(paymentRelayed.amountIn == r.add.amountMsat) - assert(paymentRelayed.paymentIn.channelId == r.add.channelId) - assert(paymentRelayed.paymentIn.remoteNodeId == TestConstants.Alice.nodeParams.nodeId) + assert(paymentRelayed.incoming.map(_.channelId) == Seq(r.add.channelId)) + assert(paymentRelayed.incoming.map(_.remoteNodeId) == Seq(TestConstants.Alice.nodeParams.nodeId)) assert(paymentRelayed.amountOut == r.amountToForward) - assert(paymentRelayed.paymentOut.channelId == channelId1) - assert(paymentRelayed.paymentOut.remoteNodeId == remoteNodeId2) + assert(paymentRelayed.outgoing.map(_.channelId) == Seq(channelId1)) + assert(paymentRelayed.outgoing.map(_.remoteNodeId) == Seq(remoteNodeId2)) assert(paymentRelayed.startedAt == r.receivedAt) assert(paymentRelayed.settledAt >= now) } diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala index 71529e795c..fb1a12cb2d 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala @@ -1136,8 +1136,8 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM system.eventStream.publish(ps) wsClient.expectMessage(expectedSerializedPs) - val prel = ChannelPaymentRelayed(ByteVector32.Zeroes, PaymentEvent.IncomingPayment(ByteVector32.Zeroes, previousNodeId, 21 msat, TimestampMilli(1553784961048L)), PaymentEvent.OutgoingPayment(ByteVector32.One, nextNodeId, 20 msat, TimestampMilli(1553784963659L))) - val expectedSerializedPrel = """{"type":"payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","paymentIn":{"channelId":"0000000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"02e899d99662f2e64ea0eeaecb53c4628fa40a22d7185076e42e8a3d67fcb7b8e6","amount":21,"receivedAt":{"iso":"2019-03-28T14:56:01.048Z","unix":1553784961}},"paymentOut":{"channelId":"0100000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"030bb6a5e0c6b203c7e2180fb78c7ba4bdce46126761d8201b91ddac089cdecc87","amount":20,"settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}}""" + val prel = ChannelPaymentRelayed(ByteVector32.Zeroes, Seq(PaymentEvent.IncomingPayment(ByteVector32.Zeroes, previousNodeId, 21 msat, TimestampMilli(1553784961048L))), Seq(PaymentEvent.OutgoingPayment(ByteVector32.One, nextNodeId, 20 msat, TimestampMilli(1553784963659L)))) + val expectedSerializedPrel = """{"type":"payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"channelId":"0000000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"02e899d99662f2e64ea0eeaecb53c4628fa40a22d7185076e42e8a3d67fcb7b8e6","amount":21,"receivedAt":{"iso":"2019-03-28T14:56:01.048Z","unix":1553784961}}],"outgoing":[{"channelId":"0100000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"030bb6a5e0c6b203c7e2180fb78c7ba4bdce46126761d8201b91ddac089cdecc87","amount":20,"settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}]}""" assert(serialization.write(prel) == expectedSerializedPrel) system.eventStream.publish(prel) wsClient.expectMessage(expectedSerializedPrel)