From f777814a7a265f2bc0f4b3d743ad9e758c144d51 Mon Sep 17 00:00:00 2001 From: t-bast Date: Tue, 3 Feb 2026 11:41:01 +0100 Subject: [PATCH 1/2] Improve channel and payment events We improve the following events: - `TransactionPublished` includes more details about mining fees and an optional liquidity purchase - all channel events include the latest `channel_type` - `PaymentRelayed` exposes the `relayFee` earned --- .../acinq/eclair/channel/ChannelEvents.scala | 18 +++++++-- .../fr/acinq/eclair/channel/Helpers.scala | 7 ++-- .../channel/fsm/DualFundingHandlers.scala | 2 +- .../eclair/channel/fsm/ErrorHandlers.scala | 15 +++++--- .../channel/fsm/SingleFundingHandlers.scala | 2 +- .../channel/publish/MempoolTxMonitor.scala | 4 +- .../fr/acinq/eclair/db/DbEventHandler.scala | 26 ++++++------- .../fr/acinq/eclair/db/pg/PgAuditDb.scala | 19 +++------- .../eclair/db/sqlite/SqliteAuditDb.scala | 19 +++------- .../acinq/eclair/payment/PaymentEvents.scala | 8 ++-- .../eclair/payment/relay/ChannelRelay.scala | 2 +- .../relay/PostRestartHtlcCleaner.scala | 2 +- .../eclair/wire/protocol/LiquidityAds.scala | 4 +- .../publish/MempoolTxMonitorSpec.scala | 7 +++- .../fr/acinq/eclair/db/AuditDbSpec.scala | 37 +++++++++---------- .../fr/acinq/eclair/db/PgUtilsSpec.scala | 2 +- .../eclair/json/JsonSerializersSpec.scala | 2 +- .../payment/relay/ChannelRelayerSpec.scala | 8 ++-- .../fr/acinq/eclair/api/ApiServiceSpec.scala | 4 +- 19 files changed, 95 insertions(+), 93 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index 890b93e322..cee5e99f21 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -21,8 +21,9 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType +import fr.acinq.eclair.transactions.Transactions import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, MilliSatoshi, RealShortChannelId, ShortChannelId} +import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, MilliSatoshi, RealShortChannelId, ShortChannelId, TimestampMilli} /** * Created by PM on 17/08/2016. @@ -92,10 +93,19 @@ case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent -// NB: the fee should be set to 0 when we're not paying it. -case class TransactionPublished(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, miningFee: Satoshi, desc: String) extends ChannelEvent +/** + * We published a transaction related to the given [[channelId]]. + * + * @param localMiningFee mining fee paid by us in the given [[tx]]. + * @param remoteMiningFee mining fee paid by our channel peer in the given [[tx]]. + * @param liquidityPurchase_opt optional liquidity purchase included in this transaction. + */ +case class TransactionPublished(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, localMiningFee: Satoshi, remoteMiningFee: Satoshi, desc: String, liquidityPurchase_opt: Option[LiquidityAds.PurchaseBasicInfo], timestamp: TimestampMilli = TimestampMilli.now()) extends ChannelEvent { + val miningFee: Satoshi = localMiningFee + remoteMiningFee + val feerate: FeeratePerKw = Transactions.fee2rate(miningFee, tx.weight()) +} -case class TransactionConfirmed(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction) extends ChannelEvent +case class TransactionConfirmed(channelId: ByteVector32, remoteNodeId: PublicKey, tx: Transaction, timestamp: TimestampMilli = TimestampMilli.now()) extends ChannelEvent // NB: this event is only sent when the channel is available. case class AvailableBalanceChanged(channel: ActorRef, channelId: ByteVector32, aliases: ShortIdAliases, commitments: Commitments, lastAnnouncement_opt: Option[ChannelAnnouncement]) extends ChannelEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index b50b4a15f6..e09f4221a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -971,11 +971,12 @@ object Helpers { } } - /** Compute the fee paid by a commitment transaction. */ - def commitTxFee(commitInput: InputInfo, commitTx: Transaction, localPaysCommitTxFees: Boolean): Satoshi = { + /** Compute the fee paid by a commitment transaction. The first result is the fee paid by us, the second one is the fee paid by our peer. */ + def commitTxFee(commitInput: InputInfo, commitTx: Transaction, localPaysCommitTxFees: Boolean): (Satoshi, Satoshi) = { require(commitTx.txIn.size == 1, "transaction must have only one input") require(commitTx.txIn.exists(txIn => txIn.outPoint == commitInput.outPoint), "transaction must spend the funding output") - if (localPaysCommitTxFees) commitInput.txOut.amount - commitTx.txOut.map(_.amount).sum else 0 sat + val commitFee = commitInput.txOut.amount - commitTx.txOut.map(_.amount).sum + if (localPaysCommitTxFees) (commitFee, 0 sat) else (0 sat, commitFee) } /** Return the confirmation target that should be used for our local commitment. */ diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala index 0f891bea1a..75c188f413 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala @@ -51,7 +51,7 @@ trait DualFundingHandlers extends CommonFundingHandlers { // to publish and we may be able to RBF. wallet.publishTransaction(fundingTx.signedTx).onComplete { case Success(_) => - context.system.eventStream.publish(TransactionPublished(dualFundedTx.fundingParams.channelId, remoteNodeId, fundingTx.signedTx, fundingTx.tx.localFees.truncateToSatoshi, "funding")) + context.system.eventStream.publish(TransactionPublished(dualFundedTx.fundingParams.channelId, remoteNodeId, fundingTx.signedTx, localMiningFee = fundingTx.tx.localFees.truncateToSatoshi, remoteMiningFee = fundingTx.tx.remoteFees.truncateToSatoshi, "funding", dualFundedTx.liquidityPurchase_opt)) // We rely on Bitcoin Core ZMQ notifications to learn about transactions that appear in our mempool, but // it doesn't provide strong guarantees that we'll always receive an event. This can be an issue for 0-conf // funding transactions, where we end up delaying our channel_ready or splice_locked. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 6807f366ff..96a1f2a516 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -232,7 +232,8 @@ trait ErrorHandlers extends CommonHandlers { /** Publish 2nd-stage transactions for our local commitment. */ def doPublish(lcp: LocalCommitPublished, txs: Closing.LocalClose.SecondStageTransactions, commitment: FullCommitment): Unit = { - val publishCommitTx = PublishFinalTx(lcp.commitTx, commitment.fundingInput, "commit-tx", Closing.commitTxFee(commitment.commitInput(channelKeys), lcp.commitTx, commitment.localChannelParams.paysCommitTxFees), None) + val (localCommitFee, _) = Closing.commitTxFee(commitment.commitInput(channelKeys), lcp.commitTx, commitment.localChannelParams.paysCommitTxFees) + val publishCommitTx = PublishFinalTx(lcp.commitTx, commitment.fundingInput, "commit-tx", localCommitFee, None) val publishAnchorTx_opt = txs.anchorTx_opt match { case Some(anchorTx) if !lcp.isConfirmed => val confirmationTarget = Closing.confirmationTarget(commitment.localCommit, commitment.localCommitParams.dustLimit, commitment.commitmentFormat, nodeParams.onChainFeeConf) @@ -274,7 +275,8 @@ trait ErrorHandlers extends CommonHandlers { case closing: DATA_CLOSING => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, closing.maxClosingFeerate_opt) case _ => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) } - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(commitments.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees), "remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitments.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, localCommitFee, remoteCommitFee, "remote-commit", None)) val (remoteCommitPublished, closingTxs) = Closing.RemoteClose.claimCommitTxOutputs(channelKeys, commitments, commitments.remoteCommit, commitTx, closingFeerate, finalScriptPubKey, nodeParams.onChainFeeConf.spendAnchorWithoutHtlcs) val nextData = d match { case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished)) @@ -296,7 +298,8 @@ trait ErrorHandlers extends CommonHandlers { case closing: DATA_CLOSING => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, closing.maxClosingFeerate_opt) case _ => nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) } - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(commitment.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees), "next-remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitment.commitInput(channelKeys), commitTx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, localCommitFee, remoteCommitFee, "next-remote-commit", None)) val (remoteCommitPublished, closingTxs) = Closing.RemoteClose.claimCommitTxOutputs(channelKeys, commitment, remoteCommit, commitTx, closingFeerate, finalScriptPubKey, nodeParams.onChainFeeConf.spendAnchorWithoutHtlcs) val nextData = d match { case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished)) @@ -350,7 +353,8 @@ trait ErrorHandlers extends CommonHandlers { val dustLimit = commitment.localCommitParams.dustLimit val (revokedCommitPublished, closingTxs) = Closing.RevokedClose.claimCommitTxOutputs(d.commitments.channelParams, channelKeys, tx, commitmentNumber, remotePerCommitmentSecret, toSelfDelay, commitmentFormat, nodeParams.db.channels, dustLimit, nodeParams.currentBitcoinCoreFeerates, nodeParams.onChainFeeConf, finalScriptPubKey) log.warning("txid={} was a revoked commitment, publishing the penalty tx", tx.txid) - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(commitment.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees), "revoked-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(commitment.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, localCommitFee, remoteCommitFee, "revoked-commit", None)) val exc = FundingTxSpent(d.channelId, tx.txid) val error = Error(d.channelId, exc.getMessage) val nextData = d match { @@ -364,7 +368,8 @@ trait ErrorHandlers extends CommonHandlers { case None => d match { case d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT => log.warning("they published a future commit (because we asked them to) in txid={}", tx.txid) - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.latest.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees), "future-remote-commit")) + val (localCommitFee, remoteCommitFee) = Closing.commitTxFee(d.commitments.latest.commitInput(channelKeys), tx, d.commitments.localChannelParams.paysCommitTxFees) + context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, localCommitFee, remoteCommitFee, "future-remote-commit", None)) val remotePerCommitmentPoint = d.remoteChannelReestablish.myCurrentPerCommitmentPoint val commitKeys = d.commitments.latest.remoteKeys(channelKeys, remotePerCommitmentPoint) val closingFeerate = nodeParams.onChainFeeConf.getClosingFeerate(nodeParams.currentBitcoinCoreFeerates, maxClosingFeerateOverride_opt = None) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala index 95a1192493..27994b16f4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala @@ -43,7 +43,7 @@ trait SingleFundingHandlers extends CommonFundingHandlers { def publishFundingTx(channelId: ByteVector32, fundingTx: Transaction, fundingTxFee: Satoshi, replyTo: akka.actor.typed.ActorRef[OpenChannelResponse]): Unit = { wallet.commit(fundingTx).onComplete { case Success(true) => - context.system.eventStream.publish(TransactionPublished(channelId, remoteNodeId, fundingTx, fundingTxFee, "funding")) + context.system.eventStream.publish(TransactionPublished(channelId, remoteNodeId, fundingTx, localMiningFee = fundingTxFee, remoteMiningFee = 0 sat, "funding", None)) replyTo ! OpenChannelResponse.Created(channelId, fundingTxId = fundingTx.txid, fundingTxFee) case Success(false) => replyTo ! OpenChannelResponse.Rejected("couldn't publish funding tx") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala index 0470229637..df7c334063 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, Transaction, TxId} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, SatoshiLong, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishContext, TxRejectedReason} @@ -136,7 +136,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams, private def waitForConfirmation(): Behavior[Command] = { context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))) - context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, cmd.fee, cmd.desc)) + context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, localMiningFee = cmd.fee, remoteMiningFee = 0 sat, cmd.desc, None)) Behaviors.receiveMessagePartial { case WrappedCurrentBlockHeight(currentBlockHeight) => timers.startSingleTimer(CheckTxConfirmationsKey, CheckTxConfirmations(currentBlockHeight), (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index ba249a831d..9275ade6f4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -29,7 +29,7 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags} import fr.acinq.eclair.payment._ -import fr.acinq.eclair.{Logs, NodeParams} +import fr.acinq.eclair.{Logs, NodeParams, TimestampMilli} /** * This actor sits at the interface between our event stream and the database. @@ -90,8 +90,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived)) outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent)) case ChannelPaymentRelayed(_, incoming, outgoing) => - channelsDb.updateChannelMeta(incoming.channelId, ChannelEvent.EventType.PaymentReceived) - channelsDb.updateChannelMeta(outgoing.channelId, ChannelEvent.EventType.PaymentSent) + incoming.foreach(i => channelsDb.updateChannelMeta(i.channelId, ChannelEvent.EventType.PaymentReceived)) + outgoing.foreach(o => channelsDb.updateChannelMeta(o.channelId, ChannelEvent.EventType.PaymentSent)) case OnTheFlyFundingPaymentRelayed(_, incoming, outgoing) => incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived)) outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent)) @@ -124,7 +124,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_CHANNEL_READY | WAIT_FOR_DUAL_FUNDING_READY, NORMAL, Some(commitments)) => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment() val event = ChannelEvent.EventType.Created - auditDb.add(ChannelEvent(channelId, remoteNodeId, commitments.latest.fundingTxId, commitments.latest.capacity, commitments.localChannelParams.isChannelOpener, !commitments.announceChannel, event)) + auditDb.add(ChannelEvent(channelId, remoteNodeId, commitments.latest.fundingTxId, commitments.latest.commitmentFormat.toString, commitments.latest.capacity, commitments.localChannelParams.isChannelOpener, !commitments.announceChannel, event.label)) channelsDb.updateChannelMeta(channelId, event) case ChannelStateChanged(_, channelId, _, _, OFFLINE, SYNCING, _) => channelsDb.updateChannelMeta(channelId, ChannelEvent.EventType.Connected) @@ -141,7 +141,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL case 0 => ChannelEvent.EventType.Confirmed case _ => ChannelEvent.EventType.Spliced } - auditDb.add(ChannelEvent(e.channelId, e.remoteNodeId, e.fundingTxId, e.commitments.latest.capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event)) + auditDb.add(ChannelEvent(e.channelId, e.remoteNodeId, e.fundingTxId, e.commitments.latest.commitmentFormat.toString, e.commitments.latest.capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event.label)) case e: ChannelClosed => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment() @@ -150,7 +150,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL // spent by the closing transaction. val capacity = e.commitments.latest.capacity val fundingTxId = e.commitments.latest.fundingTxId - auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteNodeId, fundingTxId, capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event)) + auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteNodeId, fundingTxId, e.commitments.latest.commitmentFormat.toString, capacity, e.commitments.localChannelParams.isChannelOpener, !e.commitments.announceChannel, event.label)) channelsDb.updateChannelMeta(e.channelId, event) case u: ChannelUpdateParametersChanged => @@ -178,7 +178,7 @@ object DbEventHandler { def props(nodeParams: NodeParams): Props = Props(new DbEventHandler(nodeParams)) // @formatter:off - case class ChannelEvent(channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, capacity: Satoshi, isChannelOpener: Boolean, isPrivate: Boolean, event: ChannelEvent.EventType) + case class ChannelEvent(channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, channelType: String, capacity: Satoshi, isChannelOpener: Boolean, isPrivate: Boolean, event: String, timestamp: TimestampMilli = TimestampMilli.now()) object ChannelEvent { sealed trait EventType { def label: String } object EventType { @@ -190,12 +190,12 @@ object DbEventHandler { object PaymentReceived extends EventType { override def label: String = "received" } case class Closed(closingType: ClosingType) extends EventType { override def label: String = closingType match { - case _: MutualClose => "mutual" - case _: LocalClose => "local" - case _: CurrentRemoteClose => "remote" - case _: NextRemoteClose => "remote" - case _: RecoveryClose => "recovery" - case _: RevokedClose => "revoked" + case _: MutualClose => "mutual-close" + case _: LocalClose => "local-close" + case _: CurrentRemoteClose => "remote-close" + case _: NextRemoteClose => "remote-close" + case _: RecoveryClose => "recovery-close" + case _: RevokedClose => "revoked-close" } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 4caa243a72..e3e7a83e7f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -195,7 +195,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.setLong(3, e.capacity.toLong) statement.setBoolean(4, e.isChannelOpener) statement.setBoolean(5, e.isPrivate) - statement.setString(6, e.event.label) + statement.setString(6, e.event) statement.setTimestamp(7, Timestamp.from(Instant.now())) statement.executeUpdate() } @@ -243,8 +243,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val payments = e match { case e: ChannelPaymentRelayed => // non-trampoline relayed payments have one input and one output - val in = Seq(RelayedPart(e.paymentIn.channelId, e.paymentIn.amount, "IN", "channel", e.startedAt)) - val out = Seq(RelayedPart(e.paymentOut.channelId, e.paymentOut.amount, "OUT", "channel", e.settledAt)) + val in = e.incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "channel", i.receivedAt)) + val out = e.outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "channel", o.settledAt)) in ++ out case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) => using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => @@ -450,7 +450,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentEvent.OutgoingPayment(p.channelId, PrivateKey(ByteVector32.One).publicKey, p.amount, p.timestamp)).sortBy(_.amount) parts.headOption match { case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map { - case (in, out) => ChannelPaymentRelayed(paymentHash, in, out) + case (in, out) => ChannelPaymentRelayed(paymentHash, Seq(in), Seq(out)) } case Some(RelayedPart(_, _, _, "trampoline", _)) => trampolineByHash.get(paymentHash) match { case Some((nextTrampolineAmount, nextTrampolineNodeId)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil @@ -502,16 +502,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val relayed = listRelayed(from, to).foldLeft(Map.empty[ByteVector32, Seq[Relayed]]) { (previous, e) => // NB: we must avoid counting the fee twice: we associate it to the outgoing channels rather than the incoming ones. - val current = e match { - case c: ChannelPaymentRelayed => Map( - c.paymentIn.channelId -> (Relayed(c.amountIn, 0 msat, "IN") +: previous.getOrElse(c.paymentIn.channelId, Nil)), - c.paymentOut.channelId -> (Relayed(c.amountOut, c.amountIn - c.amountOut, "OUT") +: previous.getOrElse(c.paymentOut.channelId, Nil)), - ) - case t: TrampolinePaymentRelayed => - aggregateRelayStats(previous, t.incoming, t.outgoing) - case f: OnTheFlyFundingPaymentRelayed => - aggregateRelayStats(previous, f.incoming, f.outgoing) - } + val current = aggregateRelayStats(previous, e.incoming, e.outgoing) previous ++ current } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 464d9daed4..d753208fcb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -189,7 +189,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.setLong(3, e.capacity.toLong) statement.setBoolean(4, e.isChannelOpener) statement.setBoolean(5, e.isPrivate) - statement.setString(6, e.event.label) + statement.setString(6, e.event) statement.setLong(7, TimestampMilli.now().toLong) statement.executeUpdate() } @@ -231,8 +231,8 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val payments = e match { case e: ChannelPaymentRelayed => // non-trampoline relayed payments have one input and one output - val in = Seq(RelayedPart(e.paymentIn.channelId, e.paymentIn.amount, "IN", "channel", e.startedAt)) - val out = Seq(RelayedPart(e.paymentOut.channelId, e.paymentOut.amount, "OUT", "channel", e.settledAt)) + val in = e.incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "channel", i.receivedAt)) + val out = e.outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "channel", o.settledAt)) in ++ out case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) => using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => @@ -421,7 +421,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentEvent.OutgoingPayment(p.channelId, PrivateKey(ByteVector32.One).publicKey, p.amount, p.timestamp)).sortBy(_.amount) parts.headOption match { case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map { - case (in, out) => ChannelPaymentRelayed(paymentHash, in, out) + case (in, out) => ChannelPaymentRelayed(paymentHash, Seq(in), Seq(out)) } case Some(RelayedPart(_, _, _, "trampoline", _)) => trampolineByHash.get(paymentHash) match { case Some((nextTrampolineAmount, nextTrampolineNodeId)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil @@ -472,16 +472,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { val relayed = listRelayed(from, to).foldLeft(Map.empty[ByteVector32, Seq[Relayed]]) { (previous, e) => // NB: we must avoid counting the fee twice: we associate it to the outgoing channels rather than the incoming ones. - val current = e match { - case c: ChannelPaymentRelayed => Map( - c.paymentIn.channelId -> (Relayed(c.amountIn, 0 msat, "IN") +: previous.getOrElse(c.paymentIn.channelId, Nil)), - c.paymentOut.channelId -> (Relayed(c.amountOut, c.amountIn - c.amountOut, "OUT") +: previous.getOrElse(c.paymentOut.channelId, Nil)), - ) - case t: TrampolinePaymentRelayed => - aggregateRelayStats(previous, t.incoming, t.outgoing) - case f: OnTheFlyFundingPaymentRelayed => - aggregateRelayStats(previous, f.incoming, f.outgoing) - } + val current = aggregateRelayStats(previous, e.incoming, e.outgoing) previous ++ current } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala index 7874753217..f8553a811a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala @@ -120,16 +120,14 @@ sealed trait PaymentRelayed extends PaymentEvent { def outgoing: Seq[PaymentEvent.OutgoingPayment] def amountIn: MilliSatoshi = incoming.map(_.amount).sum def amountOut: MilliSatoshi = outgoing.map(_.amount).sum + def relayFee: MilliSatoshi = amountIn - amountOut override def startedAt: TimestampMilli = incoming.map(_.receivedAt).minOption.getOrElse(TimestampMilli.now()) override def settledAt: TimestampMilli = outgoing.map(_.settledAt).maxOption.getOrElse(TimestampMilli.now()) // @formatter:on } -/** A payment was successfully relayed from a single incoming channel to a single outgoing channel. */ -case class ChannelPaymentRelayed(paymentHash: ByteVector32, paymentIn: PaymentEvent.IncomingPayment, paymentOut: PaymentEvent.OutgoingPayment) extends PaymentRelayed { - override val incoming: Seq[PaymentEvent.IncomingPayment] = Seq(paymentIn) - override val outgoing: Seq[PaymentEvent.OutgoingPayment] = Seq(paymentOut) -} +/** A payment was successfully relayed from incoming channels to outgoing channels. */ +case class ChannelPaymentRelayed(paymentHash: ByteVector32, incoming: Seq[PaymentEvent.IncomingPayment], outgoing: Seq[PaymentEvent.OutgoingPayment]) extends PaymentRelayed /** A trampoline payment was successfully relayed, using potentially multiple incoming and outgoing channels. */ case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: Seq[PaymentEvent.IncomingPayment], outgoing: Seq[PaymentEvent.OutgoingPayment], nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi) extends PaymentRelayed diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 97f987787b..2911c4dd33 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -258,7 +258,7 @@ class ChannelRelay private(nodeParams: NodeParams, val cmd = CMD_FULFILL_HTLC(upstream.add.id, fulfill.paymentPreimage, Some(attribution), commit = true) val incoming = PaymentEvent.IncomingPayment(upstream.add.channelId, upstream.receivedFrom, upstream.amountIn, upstream.receivedAt) val outgoing = PaymentEvent.OutgoingPayment(htlc.channelId, remoteNodeId, htlc.amountMsat, now) - context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(htlc.paymentHash, incoming, outgoing)) + context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(htlc.paymentHash, Seq(incoming), Seq(outgoing))) recordRelayDuration(isSuccess = true) safeSendAndStop(upstream.add.channelId, cmd) case WrappedAddResponse(RES_ADD_SETTLED(_, _, htlc, fail: HtlcResult.Fail)) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala index 7952335af7..9236f0430b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala @@ -211,7 +211,7 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial } PendingCommandsDb.safeSend(register, nodeParams.db.pendingCommands, u.originChannelId, CMD_FULFILL_HTLC(u.originHtlcId, paymentPreimage, None, commit = true)) // We don't know when we received this HTLC so we just pretend that we received it just now. - context.system.eventStream.publish(ChannelPaymentRelayed(fulfilledHtlc.paymentHash, PaymentEvent.IncomingPayment(u.originChannelId, u.originNodeId, u.amountIn, TimestampMilli.now()), PaymentEvent.OutgoingPayment(fulfilledHtlc.channelId, downstreamNodeId, fulfilledHtlc.amountMsat, TimestampMilli.now()))) + context.system.eventStream.publish(ChannelPaymentRelayed(fulfilledHtlc.paymentHash, Seq(PaymentEvent.IncomingPayment(u.originChannelId, u.originNodeId, u.amountIn, TimestampMilli.now())), Seq(PaymentEvent.OutgoingPayment(fulfilledHtlc.channelId, downstreamNodeId, fulfilledHtlc.amountMsat, TimestampMilli.now())))) Metrics.PendingRelayedOut.decrement() context become main(brokenHtlcs.copy(relayedOut = brokenHtlcs.relayedOut - origin)) case u: Upstream.Cold.Trampoline => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala index fbf983bad9..79c5fd660b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala @@ -249,7 +249,9 @@ object LiquidityAds { case class WillFundPurchase(willFund: WillFund, purchase: Purchase) /** Minimal information about a liquidity purchase, useful for example when RBF-ing transactions. */ - case class PurchaseBasicInfo(isBuyer: Boolean, amount: Satoshi, fees: Fees) + case class PurchaseBasicInfo(isBuyer: Boolean, amount: Satoshi, fees: Fees) { + val isSeller: Boolean = !isBuyer + } object Codecs { val fundingRate: Codec[FundingRate] = ( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala index 2f02d3239d..331de836cc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala @@ -32,6 +32,7 @@ import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason._ import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished} import fr.acinq.eclair.{TestConstants, TestKitBaseClass, randomKey} import org.scalatest.BeforeAndAfterAll +import org.scalatest.Inside.inside import org.scalatest.funsuite.AnyFunSuiteLike import java.util.UUID @@ -279,7 +280,11 @@ class MempoolTxMonitorSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi generateBlocks(2) monitor ! WrappedCurrentBlockHeight(currentBlockHeight()) - eventListener.expectMsg(TransactionConfirmed(txPublished.channelId, txPublished.remoteNodeId, tx)) + inside(eventListener.expectMsgType[TransactionConfirmed]) { e => + assert(e.channelId == txPublished.channelId) + assert(e.remoteNodeId == txPublished.remoteNodeId) + assert(e.tx == tx) + } } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index 8f2b4ae9b9..9a07b6f51d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -21,7 +21,6 @@ import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, SatoshiLong, Script, T import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} import fr.acinq.eclair.TestUtils.randomTxId import fr.acinq.eclair._ -import fr.acinq.eclair.channel.Helpers.Closing.MutualClose import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.AuditDb.Stats import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -70,8 +69,8 @@ class AuditDbSpec extends AnyFunSuite { val pp2a = PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now) val pp2b = PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42100 msat, now) val e2 = PaymentReceived(randomBytes32(), pp2a :: pp2b :: Nil) - val e3 = ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now - 3.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 1000 msat, now)) - val e4a = TransactionPublished(randomBytes32(), randomKey().publicKey, Transaction(0, Seq.empty, Seq.empty, 0), 42 sat, "mutual") + val e3 = ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, now - 3.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 1000 msat, now))) + val e4a = TransactionPublished(randomBytes32(), randomKey().publicKey, Transaction(0, Seq.empty, Seq.empty, 0), 42 sat, 0 sat, "mutual", None) val e4b = TransactionConfirmed(e4a.channelId, e4a.remoteNodeId, e4a.tx) val e4c = TransactionConfirmed(randomBytes32(), randomKey().publicKey, Transaction(2, Nil, TxOut(500 sat, hex"1234") :: Nil, 0)) val pp5a = PaymentSent.PaymentPart(UUID.randomUUID(), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, 0 unixms), 1000 msat, None, startedAt = 0 unixms) @@ -79,7 +78,7 @@ class AuditDbSpec extends AnyFunSuite { val e5 = PaymentSent(UUID.randomUUID(), randomBytes32(), 84100 msat, randomKey().publicKey, pp5a :: pp5b :: Nil, None, startedAt = 0 unixms) val pp6 = PaymentSent.PaymentPart(UUID.randomUUID(), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 42000 msat, settledAt = now + 10.minutes), 1000 msat, None, startedAt = now + 10.minutes) val e6 = PaymentSent(UUID.randomUUID(), randomBytes32(), 42000 msat, randomKey().publicKey, pp6 :: Nil, None, startedAt = now + 10.minutes) - val e7 = ChannelEvent(randomBytes32(), randomKey().publicKey, randomTxId(), 456123000 sat, isChannelOpener = true, isPrivate = false, ChannelEvent.EventType.Closed(MutualClose(null))) + val e7 = ChannelEvent(randomBytes32(), randomKey().publicKey, randomTxId(), "anchor_outputs", 456123000 sat, isChannelOpener = true, isPrivate = false, "mutual-close") val e10 = TrampolinePaymentRelayed(randomBytes32(), Seq( PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 20000 msat, now - 7.seconds), @@ -92,8 +91,8 @@ class AuditDbSpec extends AnyFunSuite { ), randomKey().publicKey, 30000 msat) val multiPartPaymentHash = randomBytes32() - val e11 = ChannelPaymentRelayed(multiPartPaymentHash, PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 13000 msat, now - 5.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 11000 msat, now + 4.milli)) - val e12 = ChannelPaymentRelayed(multiPartPaymentHash, PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 15000 msat, now - 4.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 12500 msat, now + 5.milli)) + val e11 = ChannelPaymentRelayed(multiPartPaymentHash, Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 13000 msat, now - 5.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 11000 msat, now + 4.milli))) + val e12 = ChannelPaymentRelayed(multiPartPaymentHash, Seq(PaymentEvent.IncomingPayment(randomBytes32(), dummyRemoteNodeId, 15000 msat, now - 4.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), dummyRemoteNodeId, 12500 msat, now + 5.milli))) db.add(e1) db.add(e2) @@ -144,26 +143,26 @@ class AuditDbSpec extends AnyFunSuite { val c5 = c1.copy(bytes = 0x05b +: c1.tail) val c6 = c1.copy(bytes = 0x06b +: c1.tail) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1000 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 44000 msat, 1001 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 41000 msat, 1002 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 40000 msat, 1003 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 43000 msat, 1004 unixms), PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 42000 msat, 1005 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 42000 msat, 1006 unixms), PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 40000 msat, 1007 unixms))) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 45000 msat, 1008 unixms), PaymentEvent.OutgoingPayment(c6, randomKey().publicKey, 40000 msat, 1009 unixms))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1000 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 44000 msat, 1001 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 41000 msat, 1002 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 40000 msat, 1003 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 43000 msat, 1004 unixms)), Seq(PaymentEvent.OutgoingPayment(c1, randomKey().publicKey, 42000 msat, 1005 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 42000 msat, 1006 unixms)), Seq(PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 40000 msat, 1007 unixms)))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c5, randomKey().publicKey, 45000 msat, 1008 unixms)), Seq(PaymentEvent.OutgoingPayment(c6, randomKey().publicKey, 40000 msat, 1009 unixms)))) db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 25000 msat, 1010 unixms)), Seq(PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 20000 msat, 1011 unixms)), randomKey().publicKey, 15000 msat)) db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(c6, randomKey().publicKey, 46000 msat, 1012 unixms)), Seq(PaymentEvent.OutgoingPayment(c2, randomKey().publicKey, 16000 msat, 1013 unixms), PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 10000 msat, 1014 unixms), PaymentEvent.OutgoingPayment(c4, randomKey().publicKey, 14000 msat, 1015 unixms)), randomKey().publicKey, 37000 msat)) // The following confirmed txs will be taken into account. - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 200 sat, "funding")) + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 200 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0))) - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0), 300 sat, "mutual")) + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0), 300 sat, 0 sat, "mutual", None)) db.add(TransactionConfirmed(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(4000 sat, hex"00112233")), 0))) - db.add(TransactionPublished(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0), 400 sat, "funding")) + db.add(TransactionPublished(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0), 400 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c3, n3, Transaction(0, Seq.empty, Seq(TxOut(8000 sat, hex"deadbeef")), 0))) - db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0), 500 sat, "funding")) + db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0), 500 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(6000 sat, hex"0000000000")), 0))) // The following txs will not be taken into account. - db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 1000 sat, "funding")) // duplicate - db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(4500 sat, hex"1111222233")), 0), 500 sat, "funding")) // unconfirmed + db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 1000 sat, 0 sat, "funding", None)) // duplicate + db.add(TransactionPublished(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(4500 sat, hex"1111222233")), 0), 500 sat, 0 sat, "funding", None)) // unconfirmed db.add(TransactionConfirmed(c4, n4, Transaction(0, Seq.empty, Seq(TxOut(2500 sat, hex"ffffff")), 0))) // doesn't match a published tx assert(db.listPublished(randomBytes32()).isEmpty) @@ -204,7 +203,7 @@ class AuditDbSpec extends AnyFunSuite { channelIds.foreach(channelId => { val nodeId = nodeIds(Random.nextInt(nodeCount)) val fundingTx = Transaction(0, Seq.empty, Seq(TxOut(5000 sat, Script.pay2wpkh(nodeId))), 0) - db.add(TransactionPublished(channelId, nodeId, fundingTx, 100 sat, "funding")) + db.add(TransactionPublished(channelId, nodeId, fundingTx, 100 sat, 0 sat, "funding", None)) db.add(TransactionConfirmed(channelId, nodeId, fundingTx)) }) // Add relay events. @@ -217,7 +216,7 @@ class AuditDbSpec extends AnyFunSuite { db.add(TrampolinePaymentRelayed(randomBytes32(), incoming, outgoing, randomKey().publicKey, 5000 msat)) } else { val toChannelId = channelIds(Random.nextInt(channelCount)) - db.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 10000 msat, TimestampMilli.now() - 2.seconds), PaymentEvent.OutgoingPayment(toChannelId, randomKey().publicKey, Random.nextInt(10000).msat, TimestampMilli.now()))) + db.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 10000 msat, TimestampMilli.now() - 2.seconds)), Seq(PaymentEvent.OutgoingPayment(toChannelId, randomKey().publicKey, Random.nextInt(10000).msat, TimestampMilli.now())))) } }) // Test starts here. diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala index 89bfd76c5b..7e97f96d1a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala @@ -173,7 +173,7 @@ class PgUtilsSpec extends TestKitBaseClass with AnyFunSuiteLike with Eventually db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-A", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 45.days)) db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-B", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 3.days)) db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-C", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 7.minutes)) - db.audit.add(ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 421 msat, TimestampMilli.now() - 5.seconds), PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 400 msat, TimestampMilli.now() - 3.seconds))) + db.audit.add(ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 421 msat, TimestampMilli.now() - 5.seconds)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 400 msat, TimestampMilli.now() - 3.seconds)))) db.dataSource.close() } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala index ffdd099ba7..241bad4b98 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala @@ -404,7 +404,7 @@ class JsonSerializersSpec extends TestKitBaseClass with AnyFunSuiteLike with Mat } test("type hints") { - val e1 = ChannelPaymentRelayed(randomBytes32(), PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 110 msat, 100 unixms), PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 100 msat, 150 unixms)) + val e1 = ChannelPaymentRelayed(randomBytes32(), Seq(PaymentEvent.IncomingPayment(randomBytes32(), randomKey().publicKey, 110 msat, 100 unixms)), Seq(PaymentEvent.OutgoingPayment(randomBytes32(), randomKey().publicKey, 100 msat, 150 unixms))) assert(JsonSerializers.serialization.writePretty(e1)(JsonSerializers.formats).contains("\"type\" : \"payment-relayed\"")) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index 8f93d78191..7e9037af2a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -767,11 +767,11 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed] assert(paymentRelayed.paymentHash == r.add.paymentHash) assert(paymentRelayed.amountIn == r.add.amountMsat) - assert(paymentRelayed.paymentIn.channelId == r.add.channelId) - assert(paymentRelayed.paymentIn.remoteNodeId == TestConstants.Alice.nodeParams.nodeId) + assert(paymentRelayed.incoming.map(_.channelId) == Seq(r.add.channelId)) + assert(paymentRelayed.incoming.map(_.remoteNodeId) == Seq(TestConstants.Alice.nodeParams.nodeId)) assert(paymentRelayed.amountOut == r.amountToForward) - assert(paymentRelayed.paymentOut.channelId == channelId1) - assert(paymentRelayed.paymentOut.remoteNodeId == remoteNodeId2) + assert(paymentRelayed.outgoing.map(_.channelId) == Seq(channelId1)) + assert(paymentRelayed.outgoing.map(_.remoteNodeId) == Seq(remoteNodeId2)) assert(paymentRelayed.startedAt == r.receivedAt) assert(paymentRelayed.settledAt >= now) } diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala index 71529e795c..fb1a12cb2d 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala @@ -1136,8 +1136,8 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM system.eventStream.publish(ps) wsClient.expectMessage(expectedSerializedPs) - val prel = ChannelPaymentRelayed(ByteVector32.Zeroes, PaymentEvent.IncomingPayment(ByteVector32.Zeroes, previousNodeId, 21 msat, TimestampMilli(1553784961048L)), PaymentEvent.OutgoingPayment(ByteVector32.One, nextNodeId, 20 msat, TimestampMilli(1553784963659L))) - val expectedSerializedPrel = """{"type":"payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","paymentIn":{"channelId":"0000000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"02e899d99662f2e64ea0eeaecb53c4628fa40a22d7185076e42e8a3d67fcb7b8e6","amount":21,"receivedAt":{"iso":"2019-03-28T14:56:01.048Z","unix":1553784961}},"paymentOut":{"channelId":"0100000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"030bb6a5e0c6b203c7e2180fb78c7ba4bdce46126761d8201b91ddac089cdecc87","amount":20,"settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}}""" + val prel = ChannelPaymentRelayed(ByteVector32.Zeroes, Seq(PaymentEvent.IncomingPayment(ByteVector32.Zeroes, previousNodeId, 21 msat, TimestampMilli(1553784961048L))), Seq(PaymentEvent.OutgoingPayment(ByteVector32.One, nextNodeId, 20 msat, TimestampMilli(1553784963659L)))) + val expectedSerializedPrel = """{"type":"payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"channelId":"0000000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"02e899d99662f2e64ea0eeaecb53c4628fa40a22d7185076e42e8a3d67fcb7b8e6","amount":21,"receivedAt":{"iso":"2019-03-28T14:56:01.048Z","unix":1553784961}}],"outgoing":[{"channelId":"0100000000000000000000000000000000000000000000000000000000000000","remoteNodeId":"030bb6a5e0c6b203c7e2180fb78c7ba4bdce46126761d8201b91ddac089cdecc87","amount":20,"settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}]}""" assert(serialization.write(prel) == expectedSerializedPrel) system.eventStream.publish(prel) wsClient.expectMessage(expectedSerializedPrel) From 401b7f8ce39399fb7cf1dce0f87331b130d8ba39 Mon Sep 17 00:00:00 2001 From: t-bast Date: Thu, 5 Feb 2026 14:58:02 +0100 Subject: [PATCH 2/2] Add per-peer profit scoring --- .../main/scala/fr/acinq/eclair/Setup.scala | 3 +- .../fr/acinq/eclair/profit/PeerScorer.scala | 530 ++++++++++++++++++ .../acinq/eclair/profit/PeerScorerSpec.scala | 224 ++++++++ 3 files changed, 756 insertions(+), 1 deletion(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 6d9bc2de5e..5310b9cda5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager} import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator} +import fr.acinq.eclair.profit.PeerScorer import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router._ import fr.acinq.eclair.tor.{Controller, TorProtocolHandler} @@ -400,8 +401,8 @@ class Setup(val datadir: File, _ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart)) balanceActor = system.spawn(BalanceActor(bitcoinClient, nodeParams.channelConf.minDepth, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor") - postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman") + peerScorer = system.spawn(Behaviors.supervise(PeerScorer(nodeParams.db.audit, channels)).onFailure(typed.SupervisorStrategy.restart), name = "peer-scorer") kit = Kit( nodeParams = nodeParams, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala new file mode 100644 index 0000000000..af2871d8e8 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala @@ -0,0 +1,530 @@ +/* + * Copyright 2026 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.profit + +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{BtcDouble, ByteVector32, Satoshi, SatoshiLong} +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.db.AuditDb +import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent} +import fr.acinq.eclair.wire.protocol.ChannelUpdate +import fr.acinq.eclair.{Features, MilliSatoshi, MilliSatoshiLong, TimestampMilli, ToMilliSatoshiConversion} + +import java.time.{Instant, ZoneId, ZonedDateTime} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +/** + * Created by t-bast on 30/01/2026. + */ + +object PeerScorer { + // @formatter:off + sealed trait Command + case class ScorePeers(replyTo_opt: Option[ActorRef[ScoreBoard]]) extends Command + private[profit] case object RemoveOldBuckets extends Command + private[profit] case class WrappedPaymentSent(e: PaymentSent) extends Command + private[profit] case class WrappedPaymentRelayed(e: PaymentRelayed) extends Command + private[profit] case class WrappedPaymentReceived(e: PaymentReceived) extends Command + private[profit] case class ChannelCreationInProgress(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class ChannelCreationAborted(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class WrappedLocalChannelUpdate(e: LocalChannelUpdate) extends Command + private[profit] case class WrappedLocalChannelDown(e: LocalChannelDown) extends Command + private[profit] case class WrappedAvailableBalanceChanged(e: AvailableBalanceChanged) extends Command + // @formatter:on + + // TODO: document configuration parameters + case class Config(scoringFrequency: FiniteDuration, topPeersCount: Int, minLiquidityAllocation: Satoshi, maxLiquidityAllocation: Satoshi, liquidityAllocationDays: Int, routingFeeUpdateMinVolume: Satoshi) + + object Config { + def default: Config = Config( + scoringFrequency = 30 minutes, + topPeersCount = 50, + minLiquidityAllocation = 0.5.btc.toSatoshi, + maxLiquidityAllocation = 5.btc.toSatoshi, + liquidityAllocationDays = 3, + routingFeeUpdateMinVolume = 0.1.btc.toSatoshi, + ) + } + + def apply(db: AuditDb, channels: Seq[PersistentChannelData], config: Config = Config.default): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.withTimers { timers => + new PeerScorer(config, db, timers, context).start(channels) + } + } + } + + case class ScoreBoard(bestPeers: Seq[PeerInfo], liquiditySuggestions: Seq[LiquiditySuggestion], routingFeeSuggestions: Seq[RoutingFeeSuggestion], channelsToClose: Seq[ChannelInfo]) + + /** NB: stats are ordered, with the most recent events first. */ + case class PeerInfo(remoteNodeId: PublicKey, stats: Seq[PeerStats], channels: Seq[ChannelInfo], latestUpdate_opt: Option[ChannelUpdate]) { + val capacity: Satoshi = channels.map(_.capacity).sum + val canSend: MilliSatoshi = channels.map(_.canSend).sum + val canReceive: MilliSatoshi = channels.map(_.canReceive).sum + + def score(bucketCount: Int, totalProfit: MilliSatoshi): Double = { + // We compute a profit score based on: + // - fees earned relative to channel capacity: this indicator allows smaller peers that perform well to attract + // more liquidity and become larger peers (avoid always allocating to the already large peers). We scale this + // by a factor 10 because it is otherwise negligible. + // - fees earned relative to other peers: this indicator favors larger peers, but is useful because smaller peers + // that perform well won't necessarily perform as well for larger volumes, so the previous indicator alone is + // insufficient. + val profit = stats.take(bucketCount).map(_.profit).sum + if (capacity > 0.sat && totalProfit > 0.msat) { + (10 * profit.toLong.toDouble / capacity.toMilliSatoshi.toLong) + (profit.toLong.toDouble / totalProfit.toLong) + } else { + 0.0 + } + } + } + + /** We recommend adding [[fundingAmount]] to a channel with [[remoteNodeId]]. */ + case class LiquiditySuggestion(remoteNodeId: PublicKey, fundingAmount: Satoshi) + + /** We recommend increasing or decreasing our routing fees with [[remoteNodeId]] because we detected a variation in payment volume. */ + case class RoutingFeeSuggestion(remoteNodeId: PublicKey, volumeVariation: Double, increase: Boolean, current: ChannelUpdate) { + val decrease: Boolean = !increase + } + + /** We compute peer statistics per buckets spanning a few hours. */ + case class Bucket(private val day: Int, private val month: Int, private val year: Int, private val slot: Int) extends Ordered[Bucket] { + override def compare(that: Bucket): Int = that match { + case Bucket(_, _, y, _) if y != year => year - y + case Bucket(_, m, _, _) if m != month => month - m + case Bucket(d, _, _, _) if d != day => day - d + case Bucket(_, _, _, s) => slot - s + } + + override def toString: String = f"$year-$month%02d-$day%02d-$slot" + } + + object Bucket { + val duration: FiniteDuration = 3 hours + val bucketsPerDay: Int = 8 + + def from(ts: TimestampMilli): Bucket = { + val date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts.toLong), ZoneId.of("UTC")) + Bucket(date.getDayOfMonth, date.getMonthValue, date.getYear, date.getHour * bucketsPerDay / 24) + } + } + + case class PeerStats(totalAmountIn: MilliSatoshi, totalAmountOut: MilliSatoshi, relayFeeEarned: MilliSatoshi, onChainFeePaid: Satoshi, liquidityFeeEarned: Satoshi, liquidityFeePaid: Satoshi) { + val outgoingFlow: MilliSatoshi = totalAmountOut - totalAmountIn + val profit: MilliSatoshi = relayFeeEarned + liquidityFeeEarned.toMilliSatoshi - onChainFeePaid.toMilliSatoshi - liquidityFeePaid.toMilliSatoshi + } + + object PeerStats { + def empty: PeerStats = PeerStats(0 msat, 0 msat, 0 msat, 0 sat, 0 sat, 0 sat) + } + + /** + * We aggregate events into buckets to avoid storing too much data per peer, while providing enough granularity to + * detect variations in volume and flows. Note that we store an entry for every peer with whom we have a channel, + * even for peers that don't have any activity (which lets us detect those peers and potentially reclaim liquidity). + */ + case class BucketedPeerStats(private val stats: Map[PublicKey, Map[Bucket, PeerStats]]) { + def peers: Iterable[PublicKey] = stats.keys + + /** Returns stats for the given peer in all buckets (most recent bucket first). */ + def getPeerStats(remoteNodeId: PublicKey, now: TimestampMilli): Seq[PeerStats] = { + stats.get(remoteNodeId) match { + case Some(_) => (0 until BucketedPeerStats.bucketsCount).map(b => getPeerStatsForBucket(remoteNodeId, Bucket.from(now - b * Bucket.duration))) + case None => Seq.fill(BucketedPeerStats.bucketsCount)(PeerStats.empty) + } + } + + /** Returns stats for the given bucket. */ + private def getPeerStatsForBucket(remoteNodeId: PublicKey, bucket: Bucket): PeerStats = { + stats.get(remoteNodeId).flatMap(_.get(bucket)).getOrElse(PeerStats.empty) + } + + private def addOrUpdate(remoteNodeId: PublicKey, bucket: Bucket, peerStats: PeerStats): BucketedPeerStats = { + val buckets = stats.getOrElse(remoteNodeId, Map.empty[Bucket, PeerStats]) + copy(stats = stats + (remoteNodeId -> (buckets + (bucket -> peerStats)))) + } + + def addPaymentSent(e: PaymentSent): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.settledAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + p.amountWithFees) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentReceived(e: PaymentReceived): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.receivedAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + p.amount) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentRelayed(e: PaymentRelayed): BucketedPeerStats = { + val withIncoming = e.incoming.foldLeft(this) { + case (current, i) => + val bucket = Bucket.from(i.receivedAt) + val peerStats = current.getPeerStatsForBucket(i.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + i.amount) + current.addOrUpdate(i.remoteNodeId, bucket, peerStats1) + } + e.outgoing.foldLeft(withIncoming) { + case (current, o) => + val bucket = Bucket.from(o.settledAt) + val peerStats = current.getPeerStatsForBucket(o.remoteNodeId, bucket) + // When using MPP and trampoline, payments can be relayed through multiple nodes at once. + // We split the fee according to the proportional amount relayed through the requested node. + val relayFee = e.relayFee * (e.outgoing.filter(_.remoteNodeId == o.remoteNodeId).map(_.amount).sum.toLong.toDouble / e.amountOut.toLong) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + o.amount, relayFeeEarned = peerStats.relayFeeEarned + relayFee) + current.addOrUpdate(o.remoteNodeId, bucket, peerStats1) + } + } + + /** Remove old buckets that exceed our retention window. This should be called frequently to avoid memory leaks. */ + def removeOldBuckets(now: TimestampMilli): BucketedPeerStats = { + val oldestBucket = Bucket.from(now - Bucket.duration * BucketedPeerStats.bucketsCount) + copy(stats = stats.map { + case (remoteNodeId, peerStats) => remoteNodeId -> peerStats.filter { case (bucket, _) => bucket >= oldestBucket } + }) + } + + /** Remove a peer from our list: this should only happen when we don't have channels with that peer anymore. */ + def removePeer(remoteNodeId: PublicKey): BucketedPeerStats = copy(stats = stats - remoteNodeId) + } + + object BucketedPeerStats { + // We keep 7 days of past history. + val bucketsCount: Int = 7 * Bucket.bucketsPerDay + + def empty(peers: Set[PublicKey]): BucketedPeerStats = BucketedPeerStats(peers.map(remoteNodeId => remoteNodeId -> Map.empty[Bucket, PeerStats]).toMap) + } + + /** We keep minimal information about open channels to allow running our heuristics. */ + case class ChannelInfo(channelId: ByteVector32, capacity: Satoshi, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean) + + private object ChannelInfo { + def apply(commitments: Commitments): ChannelInfo = ChannelInfo(commitments.channelId, commitments.latest.capacity, commitments.availableBalanceForSend, commitments.availableBalanceForReceive, commitments.announceChannel) + } + + /** Note that we keep channel updates separately: we're only interested in the relay fees, which are the same for every channel. */ + case class PeerChannels(private val channels: Map[PublicKey, Seq[ChannelInfo]], private val updates: Map[PublicKey, ChannelUpdate]) { + def peers: Set[PublicKey] = channels.keySet + + def getChannels(remoteNodeId: PublicKey): Seq[ChannelInfo] = channels.getOrElse(remoteNodeId, Nil) + + def getUpdate(remoteNodeId: PublicKey): Option[ChannelUpdate] = updates.get(remoteNodeId) + + def updateChannel(e: LocalChannelUpdate): PeerChannels = { + updates.get(e.remoteNodeId) match { + case Some(u) if u.timestamp > e.channelUpdate.timestamp => updateChannel(e.commitments) + case _ => updateChannel(e.commitments).copy(updates = updates + (e.remoteNodeId -> e.channelUpdate)) + } + } + + def updateChannel(e: AvailableBalanceChanged): PeerChannels = { + updateChannel(e.commitments) + } + + private def updateChannel(commitments: Commitments): PeerChannels = { + if (!commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve)) { + val peerChannels1 = channels.getOrElse(commitments.remoteNodeId, Nil).filter(_.channelId != commitments.channelId) :+ ChannelInfo(commitments) + copy(channels = channels + (commitments.remoteNodeId -> peerChannels1)) + } else { + // We filter out channels with mobile wallets. + this + } + } + + def removeChannel(e: LocalChannelDown): PeerChannels = { + channels.get(e.remoteNodeId) match { + case Some(peerChannels) => + val peerChannels1 = peerChannels.filter(_.channelId != e.channelId) + if (peerChannels1.isEmpty) { + copy(channels = channels - e.remoteNodeId, updates = updates - e.remoteNodeId) + } else { + copy(channels = channels + (e.remoteNodeId -> peerChannels1)) + } + case None => this + } + } + } + + private object PeerChannels { + def apply(channels: Seq[PersistentChannelData]): PeerChannels = { + channels.foldLeft(PeerChannels(Map.empty[PublicKey, Seq[ChannelInfo]], Map.empty[PublicKey, ChannelUpdate])) { + case (current, channel) => channel match { + // We filter out channels with mobile wallets. + case d: DATA_NORMAL if !d.commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) => + val peerChannels1 = current.channels.getOrElse(d.remoteNodeId, Nil) :+ ChannelInfo(d.commitments) + val update1 = current.updates.get(d.remoteNodeId) match { + case Some(update) if update.timestamp > d.channelUpdate.timestamp => update + case _ => d.channelUpdate + } + current.copy( + channels = current.channels + (d.remoteNodeId -> peerChannels1), + updates = current.updates + (d.remoteNodeId -> update1) + ) + case _ => current + } + } + } + } + + private def removePendingChannel(remoteNodeId: PublicKey, channelId: ByteVector32, pending: Map[PublicKey, Set[ByteVector32]]): Map[PublicKey, Set[ByteVector32]] = { + pending.get(remoteNodeId) match { + case Some(channels) => + val channels1 = channels - channelId + if (channels1.isEmpty) { + pending - remoteNodeId + } else { + pending + (remoteNodeId -> channels1) + } + case None => pending + } + } + +} + +private class PeerScorer(config: PeerScorer.Config, db: AuditDb, timers: TimerScheduler[PeerScorer.Command], context: ActorContext[PeerScorer.Command]) { + + import PeerScorer._ + + private val log = context.log + private val startedAt = TimestampMilli.now() + + private def start(channels: Seq[PersistentChannelData]): Behavior[Command] = { + // We subscribe to channel events to update channel balances. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelIdAssigned](e => ChannelCreationInProgress(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelAborted](e => ChannelCreationAborted(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelDown)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelUpdate)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedAvailableBalanceChanged)) + val peerChannels = PeerChannels(channels) + val pendingChannels = channels.collect { + case d: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED => d.remoteNodeId -> d.channelId + case d: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED => d.remoteNodeId -> d.channelId + case d: DATA_WAIT_FOR_DUAL_FUNDING_READY => d.remoteNodeId -> d.channelId + case d: DATA_WAIT_FOR_FUNDING_CONFIRMED => d.remoteNodeId -> d.channelId + case d: DATA_WAIT_FOR_CHANNEL_READY => d.remoteNodeId -> d.channelId + }.groupBy(_._1).map { case (remoteNodeId, channelIds) => remoteNodeId -> channelIds.map(_._2).toSet } + // We subscribe to payment events to update statistics. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentSent)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentRelayed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentReceived)) + // TODO: read events that happened before startedAt from the DB to initialize statistics from past data. + val stats = BucketedPeerStats.empty(peerChannels.peers) + // TODO: trigger this event reactively when large peers seem to run out of liquidity? + timers.startTimerWithFixedDelay(ScorePeers(None), config.scoringFrequency) + timers.startTimerWithFixedDelay(RemoveOldBuckets, Bucket.duration) + listening(stats, peerChannels, pendingChannels) + } + + private def listening(stats: BucketedPeerStats, channels: PeerChannels, pendingChannels: Map[PublicKey, Set[ByteVector32]]): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case WrappedPaymentSent(e) => + listening(stats.addPaymentSent(e), channels, pendingChannels) + case WrappedPaymentReceived(e) => + listening(stats.addPaymentReceived(e), channels, pendingChannels) + case WrappedPaymentRelayed(e) => + listening(stats.addPaymentRelayed(e), channels, pendingChannels) + case e: ChannelCreationInProgress => + val pendingChannels1 = pendingChannels + (e.remoteNodeId -> (pendingChannels.getOrElse(e.remoteNodeId, Set.empty) + e.channelId)) + listening(stats, channels, pendingChannels1) + case e: ChannelCreationAborted => + listening(stats, channels, removePendingChannel(e.remoteNodeId, e.channelId, pendingChannels)) + case WrappedLocalChannelUpdate(e) => + listening(stats, channels.updateChannel(e), removePendingChannel(e.remoteNodeId, e.channelId, pendingChannels)) + case WrappedAvailableBalanceChanged(e) => + listening(stats, channels.updateChannel(e), pendingChannels) + case WrappedLocalChannelDown(e) => + val channels1 = channels.removeChannel(e) + val pendingChannels1 = removePendingChannel(e.remoteNodeId, e.channelId, pendingChannels) + val stats1 = if (channels1.getChannels(e.remoteNodeId).isEmpty && !pendingChannels1.contains(e.remoteNodeId)) { + stats.removePeer(e.remoteNodeId) + } else { + stats + } + listening(stats1, channels1, pendingChannels1) + case RemoveOldBuckets => + listening(stats.removeOldBuckets(TimestampMilli.now()), channels, pendingChannels) + case cmd: ScorePeers => + // TODO: do a db.listConfirmed() to update on-chain stats (we cannot rely on events only because data comes from + // the TransactionPublished event, but should only be applied after TransactionConfirmed so we need permanent + // storage). We'll need the listConfirmed() function added in https://github.com/ACINQ/eclair/pull/3245. + scoring(cmd, stats, channels, pendingChannels) + } + } + + private def scoring(cmd: ScorePeers, stats: BucketedPeerStats, channels: PeerChannels, pendingChannels: Map[PublicKey, Set[ByteVector32]]): Behavior[Command] = { + val now = TimestampMilli.now() + log.info("scoring {} peers", stats.peers.size) + // We compute the overall daily and weekly profit. + val (totalDailyProfit, totalWeeklyProfit) = stats.peers.map(p => { + val peerStats = stats.getPeerStats(p, now) + (peerStats.take(Bucket.bucketsPerDay).map(_.profit).sum, peerStats.map(_.profit).sum) + }).reduceOption { + (current, next) => (current._1 + next._1, current._2 + next._2) + }.getOrElse(0 msat, 0 msat) + log.info("rolling daily profit = {} and weekly profit = {}", totalDailyProfit, totalWeeklyProfit) + // We start by selecting the top 50 most profitable peers of the past day and week. + // TODO: if a good peer ran out of liquidity more than a week ago, they won't get a good score: + // - should we look at older data to detect such peers? + // - which threshold should we use to start digging in the past instead of focusing on recent good peers? + // - maybe a threshold on the profit earned (if recent peers didn't earn us enough fees, start looking at past data)? + val mostProfitableWeeklyPeers = stats.peers + .map(remoteNodeId => (remoteNodeId, stats.getPeerStats(remoteNodeId, now).map(_.profit).sum)) + .toSeq + .filter { case (_, profit) => profit > 0.msat } + .sortBy { case (_, profit) => profit }(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .toMap + val mostProfitableDailyPeers = stats.peers + .map(remoteNodeId => (remoteNodeId, stats.getPeerStats(remoteNodeId, now).take(Bucket.bucketsPerDay).map(_.profit).sum)) + .toSeq + .filter { case (_, profit) => profit > 0.msat } + .sortBy { case (_, profit) => profit }(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .toMap + // We assign a score to each of those nodes which lets us prioritize liquidity allocation. + // We use the best of their daily and weekly scores. + val bestScoringPeers = (mostProfitableDailyPeers.keySet ++ mostProfitableWeeklyPeers.keySet).map(remoteNodeId => { + PeerInfo(remoteNodeId, stats.getPeerStats(remoteNodeId, now), channels.getChannels(remoteNodeId), channels.getUpdate(remoteNodeId)) + }).toSeq.sortBy(p => Seq(p.score(Bucket.bucketsPerDay, totalDailyProfit), p.score(BucketedPeerStats.bucketsCount, totalWeeklyProfit)).max)(Ordering[Double].reverse) + if (bestScoringPeers.nonEmpty) { + log.info("top {} peers:", bestScoringPeers.size) + log.info("| rank | node_id | daily_score | weekly_score | current_profit_msat | previous_profit_msat | daily_profit_msat | weekly_profit_msat | current_amount_in_msat | current_amount_out_msat | previous_amount_in_msat | previous_amount_out_msat | daily_amount_in_msat | daily_amount_out_msat | weekly_amount_in_msat | weekly_amount_out_msat | capacity_sat | can_send_msat | can_receive_msat |") + log.info("|------|--------------------------------------------------------------------|-------------|--------------|---------------------|----------------------|-------------------|--------------------|------------------------|-------------------------|-------------------------|--------------------------|----------------------|-----------------------|-----------------------|------------------------|--------------|-----------------|------------------|") + bestScoringPeers.zipWithIndex.foreach { case (s, i) => + val currentProfit = s.stats.headOption.map(_.profit).getOrElse(0 msat).toLong + val previousProfit = s.stats.drop(1).headOption.map(_.profit).getOrElse(0 msat).toLong + val dailyProfit = s.stats.take(Bucket.bucketsPerDay).map(_.profit).sum.toLong + val dailyScore = s.score(Bucket.bucketsPerDay, totalDailyProfit) + val weeklyProfit = s.stats.map(_.profit).sum.toLong + val weeklyScore = s.score(BucketedPeerStats.bucketsCount, totalWeeklyProfit) + val currentAmountOut = s.stats.headOption.map(_.totalAmountOut).getOrElse(0 msat).toLong + val previousAmountOut = s.stats.drop(1).headOption.map(_.totalAmountOut).getOrElse(0 msat).toLong + val dailyAmountOut = s.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum.toLong + val weeklyAmountOut = s.stats.map(_.totalAmountOut).sum.toLong + val currentAmountIn = s.stats.headOption.map(_.totalAmountIn).getOrElse(0 msat).toLong + val previousAmountIn = s.stats.drop(1).headOption.map(_.totalAmountIn).getOrElse(0 msat).toLong + val dailyAmountIn = s.stats.take(Bucket.bucketsPerDay).map(_.totalAmountIn).sum.toLong + val weeklyAmountIn = s.stats.map(_.totalAmountIn).sum.toLong + log.info(f"| ${i + 1}%4d | ${s.remoteNodeId} | $dailyScore%.2f | $weeklyScore%.2f | $currentProfit%19d | $previousProfit%20d | $dailyProfit%17d | $weeklyProfit%18d | $currentAmountIn%22d | $currentAmountOut%23d | $previousAmountIn%23d | $previousAmountOut%24d | $dailyAmountIn%20d | $dailyAmountOut%21d | $weeklyAmountIn%21d | $weeklyAmountOut%22d | ${s.capacity.toLong}%12d | ${s.canSend.toLong}%15d | ${s.canReceive.toLong}%16d |") + } + log.info("|------|--------------------------------------------------------------------|-------------|--------------|---------------------|----------------------|-------------------|--------------------|------------------------|-------------------------|-------------------------|--------------------------|----------------------|-----------------------|-----------------------|------------------------|--------------|-----------------|------------------|") + } + // We compute suggestions for liquidity allocation based on estimated future outgoing flow. + val liquidity = bestScoringPeers + // We only consider allocating liquidity if there isn't already a channel being created. + .filter(p => !pendingChannels.contains(p.remoteNodeId)) + // We only consider allocating liquidity to nodes that have a positive outgoing flow. + .filter(p => p.stats.map(_.outgoingFlow).sum > 0.msat || p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum > 0.msat) + .map(s => { + // We want to allocate liquidity to ensure that we can keep relaying payments for a given duration if the flow stays the same. + val lastFlows = s.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum + val missing = (lastFlows * config.liquidityAllocationDays - s.canSend).max(0 msat).min(config.maxLiquidityAllocation) + LiquiditySuggestion(s.remoteNodeId, missing.truncateToSatoshi) + }) + .filter { l => l.fundingAmount > config.minLiquidityAllocation } + if (liquidity.nonEmpty) { + log.info("we recommend the following {} liquidity allocations:", liquidity.size) + log.info("| rank | node_id | funding_amount_sat |") + log.info("|------|--------------------------------------------------------------------|--------------------|") + liquidity.zipWithIndex.foreach { case (l, i) => + log.info(f"| ${i + 1}%4d | ${l.remoteNodeId} | ${l.fundingAmount.toLong}%18d |") + } + log.info("|------|--------------------------------------------------------------------|--------------------|") + } + val routing = bestScoringPeers.flatMap(s => { + val liquidityAvailable = s.canSend > config.routingFeeUpdateMinVolume + val current = s.stats.headOption.map(_.totalAmountOut).getOrElse(0 msat) + val previous = s.stats.drop(1).headOption.map(_.totalAmountOut).getOrElse(0 msat) + val older = s.stats.drop(2).headOption.map(_.totalAmountOut).getOrElse(0 msat) + s.latestUpdate_opt match { + case Some(u) if current > Seq(previous, config.routingFeeUpdateMinVolume.toMilliSatoshi).max && previous > 0.msat && liquidityAvailable && u.timestamp.toTimestampMilli < now - Bucket.duration => + // If the current flow (which doesn't have a whole bucket of data yet) is greater than the previous flow + // (which has a full bucket of data), and greater than a given threshold, we can try to increase our routing + // fees if we haven't done so recently already. + val variation = (current - previous).toLong.toDouble / previous.toLong + Some(RoutingFeeSuggestion(s.remoteNodeId, variation, increase = true, u)) + case Some(u) if older > 0.msat && older > current + previous && liquidityAvailable && u.timestamp.toTimestampMilli < now - Bucket.duration => + // If the flow of the current and previous buckets are smaller than the flow of the bucket before that, and + // we haven't already updated our fees recently, we may want to decrease our routing fees to attract traffic. + val variation = (older - current - previous).toLong.toDouble / older.toLong + Some(RoutingFeeSuggestion(s.remoteNodeId, variation, increase = false, u)) + case _ => + None + } + }) + if (routing.nonEmpty) { + log.info("we recommend the following {} routing fee changes", routing.size) + log.info("| node_id | decision | volume_variation | current_fee_base_msat | current_fee_proportional |") + log.info("|--------------------------------------------------------------------|----------|------------------|-----------------------|--------------------------|") + routing.foreach(r => { + log.info(f"| ${r.remoteNodeId} | ${if (r.increase) "increase" else "decrease"} | ${r.volumeVariation}%.2f | ${r.current.feeBaseMsat.toLong}%21d | ${r.current.feeProportionalMillionths}%24d |") + }) + log.info("|--------------------------------------------------------------------|----------|------------------|-----------------------|--------------------------|") + } + // We compute suggestions for channels that can be closed to reclaim liquidity. + val channelsToClose = if (now > startedAt + 1.day) { + // Since we're not yet reading past events from the DB, we need to wait until we have collected enough data. + stats.peers + .filter(remoteNodeId => channels.getChannels(remoteNodeId).size > 1) + .flatMap(remoteNodeId => { + // Peers for which most of the liquidity is idle on our side are good candidates for reclaiming liquidity. + val peerChannels = channels.getChannels(remoteNodeId) + val unbalanced = peerChannels.map(_.canSend).sum >= peerChannels.map(_.canReceive).sum * 4 + val volumeOut = stats.getPeerStats(remoteNodeId, now).map(_.totalAmountOut).sum + val lowVolume = volumeOut <= peerChannels.map(_.canSend).sum * 0.1 + if (unbalanced && lowVolume) { + // We always want to keep at least one channel with every peer, and want to keep the one with the best score + // for routing algorithms (mostly influenced by capacity). We want to avoid closing the largest channel if + // it isn't unbalanced, so we also take into account current balances in that channel. + val maxCapacity = peerChannels.map(_.capacity).max + val channelToKeep = peerChannels.filter(_.isPublic).map(b => { + val score = 4 * (b.capacity.toLong.toDouble / maxCapacity.toLong) + (b.canSend.toLong.toDouble / b.capacity.toMilliSatoshi.toLong) + (score, b) + }).sortBy(_._1).lastOption.map(_._2.channelId) + peerChannels.filter(b => !channelToKeep.contains(b.channelId)) + } else { + Nil + } + }).toSeq + } else { + Nil + } + if (channelsToClose.nonEmpty) { + log.info("we recommend closing the following {} channels to reclaim {}", channelsToClose.size, channelsToClose.map(_.canSend).sum) + log.info("| channel_id | local_balance_sat | capacity_sat |") + log.info("|------------------------------------------------------------------|-------------------|--------------|") + channelsToClose.foreach(c => { + log.info(f"| ${c.channelId} | ${c.canSend.truncateToSatoshi.toLong}%17d | ${c.capacity.toLong}%12d |") + }) + log.info("|------------------------------------------------------------------|-------------------|--------------|") + } + cmd.replyTo_opt.foreach(_ ! ScoreBoard(bestScoringPeers, liquidity, routing, channelsToClose)) + listening(stats, channels, pendingChannels) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala new file mode 100644 index 0000000000..475ecd4b18 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala @@ -0,0 +1,224 @@ +package fr.acinq.eclair.profit + +import akka.actor.ActorRef +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, OutPoint, Satoshi, SatoshiLong, Script, Transaction, TxOut} +import fr.acinq.eclair.TestUtils.randomTxId +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.payment.ChannelPaymentRelayed +import fr.acinq.eclair.payment.PaymentEvent.{IncomingPayment, OutgoingPayment} +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.profit.PeerScorer._ +import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo} +import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} +import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiryDelta, Features, MilliSatoshiLong, RealShortChannelId, TestDatabases, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32} +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.{ByteVector, HexStringSyntax} + +import scala.concurrent.duration.DurationInt + +class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + private val dummyPubKey = PrivateKey(ByteVector32.One).publicKey + private val dummyAliases = ShortIdAliases(Alias(42), None) + private val dummyChannelAnn = ChannelAnnouncement(ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, Features.empty, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), dummyPubKey, dummyPubKey, dummyPubKey, dummyPubKey) + + private val localNodeId = PublicKey(hex"03bd04635f1465d9347f3d69edc51f17cdf9548847533e084cd9d153a4abb065cd") + private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e") + private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28") + private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96") + + private def commitments(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, announceChannel: Boolean = true): Commitments = { + CommitmentsSpec.makeCommitments(toLocal.toMilliSatoshi, toRemote.toMilliSatoshi, localNodeId, remoteNodeId, if (announceChannel) Some(dummyChannelAnn) else None) + } + + private def updateChannelBalance(c: Commitments, toLocal: BtcAmount, toRemote: BtcAmount): Commitments = { + val c1 = commitments(c.remoteNodeId, toLocal, toRemote, c.announceChannel) + c1.copy(channelParams = c1.channelParams.copy(channelId = c.channelId)) + } + + private def channelUpdate(capacity: Satoshi, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = { + val messageFlags = MessageFlags(dontForward = !announceChannel) + val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true) + ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi) + } + + private def channel(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), announceChannel: Boolean = true): DATA_NORMAL = { + val c = commitments(remoteNodeId, toLocal, toRemote, announceChannel) + val ann_opt = if (announceChannel) Some(dummyChannelAnn) else None + val update = channelUpdate(c.capacity, fees, TimestampSecond.now(), announceChannel) + DATA_NORMAL(c, dummyAliases, ann_opt, update, SpliceStatus.NoSplice, None, None, None) + } + + test("create buckets") { + // February 5th 2026 at 12h00 UTC. + val timestamp = TimestampMilli(1770292800000L) + assert(Bucket.from(timestamp) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2)) + assert(Bucket.from(timestamp - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 2)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2) == Bucket(day = 5, month = 2, year = 2026, slot = 0)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2 - 1.millis) == Bucket(day = 4, month = 2, year = 2026, slot = Bucket.bucketsPerDay - 1)) + } + + test("sort buckets") { + val b1 = Bucket(day = 30, month = 11, year = 2025, slot = 0) + val b2 = Bucket(day = 30, month = 11, year = 2025, slot = 7) + val b3 = Bucket(day = 30, month = 11, year = 2025, slot = 9) + val b4 = Bucket(day = 1, month = 12, year = 2025, slot = 2) + val b5 = Bucket(day = 1, month = 12, year = 2025, slot = 3) + val b6 = Bucket(day = 15, month = 12, year = 2025, slot = 5) + val b7 = Bucket(day = 1, month = 1, year = 2026, slot = 1) + assert(b1 < b2 && b2 < b3 && b3 < b4 && b4 < b5 && b5 < b6 && b6 < b7) + assert(Seq(b3, b6, b5, b1, b4, b2, b7).sorted == Seq(b1, b2, b3, b4, b5, b6, b7)) + } + + // TODO: + // -> verify that peer without stats is kept and we reclaim their liquidity eventually + // -> verify that remove-old-buckets removes past data without removing peer + // -> peer statistics: + // -> basic unit tests to verify that correctly tracked + // -> then simulations of interesting flows to see recommendations? + // -> liquidity decisions: + // -> none if pending channel + // -> pending channel can be aborted and then liquidity decision + + test("keep track of channel balances and state") { + val now = TimestampMilli.now() + val probe = TestProbe[ScoreBoard]() + + // We have 4 channels with our first peer: 2 of them are active. + val c1a = DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), dummyAliases) + val c1b = channel(remoteNodeId1, toLocal = 0.15 btc, toRemote = 0.25 btc) + val c1c = channel(remoteNodeId1, toLocal = 0.07 btc, toRemote = 0.03 btc) + val c1d = DATA_SHUTDOWN(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), null, null, CloseStatus.Initiator(None)) + // We have 2 channels with our second peer: none of them are active. + val c2a = DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(remoteNodeId2, toLocal = 0.13 btc, toRemote = 0.24 btc), BlockHeight(750_000), None, null) + val c2b = DATA_NEGOTIATING_SIMPLE(commitments(remoteNodeId2, toLocal = 0.5 btc, toRemote = 0.1 btc), FeeratePerKw(2000 sat), ByteVector.empty, ByteVector.empty, Nil, Nil) + // We have 2 channels with our third peer: none of them are active. + val c3a = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.2 btc), 0 msat, 0 msat, BlockHeight(750_000), BlockHeight(750_000), DualFundingStatus.WaitingForConfirmations, None) + val c3b = DATA_CLOSING(commitments(remoteNodeId3, toLocal = 0.2 btc, toRemote = 0.2 btc), BlockHeight(750_000), ByteVector.empty, Nil, ClosingTx(InputInfo(OutPoint(randomTxId(), 2), TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey))), Transaction(2, Nil, TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey)) :: Nil, 0), None) :: Nil) + + // We initialize our scorer actor with these existing channels. + val scorer = testKit.spawn(PeerScorer(TestDatabases.inMemoryDb().audit, Seq(c1a, c1b, c1c, c1d, c2a, c2b, c3a, c3b))) + // We have relayed payments through channels that have been closed since then. + scorer.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 58_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c3b.channelId, remoteNodeId3, 50_000_000 msat, now)) + )) + // We have relayed payments through active channels as well. + scorer.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c2b.channelId, remoteNodeId2, 15_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now)) + )) + // We filter out channels that are closing or not ready yet. + scorer.ref ! ScorePeers(Some(probe.ref)) + inside(probe.expectMessageType[ScoreBoard]) { s => + assert(s.bestPeers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId3)) + // We only take into account the two active channels with our first peer. + val peer1 = s.bestPeers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.latestUpdate_opt.exists(u => Set(c1b.channelUpdate, c1c.channelUpdate).contains(u))) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(0.21.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.22.btc.toMilliSatoshi) + assert(0.27.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.28.btc.toMilliSatoshi) + // We don't have any active channel with our third peer. + assert(s.bestPeers.find(_.remoteNodeId == remoteNodeId3).get.channels.isEmpty) + } + + // Our pending channel with our second peer becomes ready. + val update2a = channelUpdate(c2a.commitments.capacity, RelayFees(500 msat, 500)) + scorer.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c2a.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2, + announcement_opt = None, + channelUpdate = update2a, + commitments = c2a.commitments, + )) + + // Our pending channel with our third peer is aborted. + scorer.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c3a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId3 + )) + + // We relay a payment to our second peer. + scorer.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 30_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c2a.channelId, remoteNodeId2, 15_000_000 msat, now)) + )) + scorer.ref ! ScorePeers(Some(probe.ref)) + inside(probe.expectMessageType[ScoreBoard]) { s => + assert(s.bestPeers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + val peer2 = s.bestPeers.find(_.remoteNodeId == remoteNodeId2).get + assert(peer2.channels.map(_.channelId).toSet == Set(c2a.channelId)) + assert(peer2.latestUpdate_opt.contains(update2a)) + } + + // We update our routing fees with our first peer. + val update1b = channelUpdate(c1b.commitments.capacity, RelayFees(100 msat, 600), TimestampSecond.now() + 10.seconds) + scorer.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1b.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1b, + commitments = updateChannelBalance(c1b.commitments, toLocal = 0.3 btc, toRemote = 0.1 btc), + )) + + // We ignore previous channel updates from another channel. + val update1c = channelUpdate(c1c.commitments.capacity, RelayFees(150 msat, 400), update1b.timestamp - 10.seconds) + scorer.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1c.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1c, + commitments = updateChannelBalance(c1c.commitments, toLocal = 0.05 btc, toRemote = 0.05 btc), + )) + + // Channels with our second peer are closed. + scorer.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + scorer.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2b.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + + // The only remaining channels are with our first peer, with updated balances and the latest channel update. + scorer.ref ! ScorePeers(Some(probe.ref)) + inside(probe.expectMessageType[ScoreBoard]) { s => + assert(s.bestPeers.map(_.remoteNodeId).toSet == Set(remoteNodeId1)) + val peer1 = s.bestPeers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(0.34.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.35.btc.toMilliSatoshi) + assert(0.14.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.15.btc.toMilliSatoshi) + assert(peer1.latestUpdate_opt.contains(update1b)) + } + } + +}