From 320cf9286e133e07bf8e4ebdef14e648f7e25e63 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Thu, 14 Dec 2023 13:21:01 +0100 Subject: [PATCH 01/13] log: introduce a new category for private broadcast --- src/logging.cpp | 1 + src/logging.h | 1 + 2 files changed, 2 insertions(+) diff --git a/src/logging.cpp b/src/logging.cpp index f9d8f9804881..47b83d1a45bb 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -202,6 +202,7 @@ static const std::map> LOG_CATEGORIES_ {"scan", BCLog::SCAN}, {"txpackages", BCLog::TXPACKAGES}, {"kernel", BCLog::KERNEL}, + {"privatebroadcast", BCLog::PRIVATE_BROADCAST}, }; static const std::unordered_map LOG_CATEGORIES_BY_FLAG{ diff --git a/src/logging.h b/src/logging.h index defff61d3008..2061950ddf27 100644 --- a/src/logging.h +++ b/src/logging.h @@ -95,6 +95,7 @@ namespace BCLog { SCAN = (CategoryMask{1} << 27), TXPACKAGES = (CategoryMask{1} << 28), KERNEL = (CategoryMask{1} << 29), + PRIVATE_BROADCAST = (CategoryMask{1} << 30), ALL = ~NONE, }; enum class Level { From 210d499b49de1af50bcf580f20d2c5c98cd527d4 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Thu, 14 Dec 2023 14:11:05 +0100 Subject: [PATCH 02/13] init: introduce a new option to enable/disable private broadcast Co-authored-by: brunoerg --- src/init.cpp | 41 ++++++++++++++++++++++++-- src/net.h | 2 ++ test/functional/feature_config_args.py | 19 ++++++++++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 1a33226b7c80..a7781e08c21d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -666,6 +666,15 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) OptionsCategory::NODE_RELAY); argsman.AddArg("-minrelaytxfee=", strprintf("Fees (in %s/kvB) smaller than this are considered zero fee for relaying, mining and transaction creation (default: %s)", CURRENCY_UNIT, FormatMoney(DEFAULT_MIN_RELAY_TX_FEE)), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); + argsman.AddArg("-privatebroadcast", + strprintf( + "Broadcast transactions submitted via sendrawtransaction RPC using short-lived " + "connections through the Tor or I2P networks, without putting them in the mempool first. " + "Transactions submitted through the wallet are not affected by this option " + "(default: %u)", + DEFAULT_PRIVATE_BROADCAST), + ArgsManager::ALLOW_ANY, + OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistforcerelay", strprintf("Add 'forcerelay' permission to whitelisted peers with default permissions. This will relay transactions even if the transactions were already in the mempool. (default: %d)", DEFAULT_WHITELISTFORCERELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistrelay", strprintf("Add 'relay' permission to whitelisted peers with default permissions. This will accept relayed transactions even when not relaying transactions (default: %d)", DEFAULT_WHITELISTRELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); @@ -1718,13 +1727,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + const bool listenonion{args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; if (onion_proxy.IsValid()) { SetProxy(NET_ONION, onion_proxy); } else { // If -listenonion is set, then we will (try to) connect to the Tor control port // later from the torcontrol thread and may retrieve the onion proxy from there. - const bool listenonion_disabled{!args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; - if (onlynet_used_with_onion && listenonion_disabled) { + if (onlynet_used_with_onion && !listenonion) { return InitError( _("Outbound connections restricted to Tor (-onlynet=onion) but the proxy for " "reaching the Tor network is not provided: none of -proxy, -onion or " @@ -2100,7 +2109,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.onion_binds.push_back(onion_service_target); } - if (args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) { + if (listenonion) { if (connOptions.onion_binds.size() > 1) { InitWarning(strprintf(_("More than one onion bind address is provided. Using %s " "for the automatically created Tor onion service."), @@ -2173,6 +2182,32 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) conflict->ToStringAddrPort())); } + if (args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + // If -listenonion is set, then NET_ONION may not be reachable now + // but may become reachable later, thus only error here if it is not + // reachable and will not become reachable for sure. + const bool onion_may_become_reachable{listenonion && (!args.IsArgSet("-onlynet") || onlynet_used_with_onion)}; + if (!g_reachable_nets.Contains(NET_I2P) && + !g_reachable_nets.Contains(NET_ONION) && + !onion_may_become_reachable) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable")); + } + if (!connOptions.m_use_addrman_outgoing) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but -connect is also configured. They are incompatible because the " + "private broadcast needs to open new connections to randomly " + "chosen Tor or I2P peers. Consider using -maxconnections=0 -addnode=... " + "instead")); + } + if (!proxyRandomize && (g_reachable_nets.Contains(NET_ONION) || onion_may_become_reachable)) { + InitWarning(_("Private broadcast of own transactions requested (-privatebroadcast) and " + "-proxyrandomize is disabled. Tor circuits for private broadcast connections " + "may be correlated to other connections over Tor. For maximum privacy set " + "-proxyrandomize=1.")); + } + } + if (!node.connman->Start(scheduler, connOptions)) { return false; } diff --git a/src/net.h b/src/net.h index 25cb8236a3c2..cc7892a63fca 100644 --- a/src/net.h +++ b/src/net.h @@ -83,6 +83,8 @@ static const std::string DEFAULT_MAX_UPLOAD_TARGET{"0M"}; static const bool DEFAULT_BLOCKSONLY = false; /** -peertimeout default */ static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60; +/** Default for -privatebroadcast. */ +static constexpr bool DEFAULT_PRIVATE_BROADCAST{false}; /** Number of file descriptors required for message capture **/ static const int NUM_FDS_MESSAGE_CAPTURE = 1; /** Interval for ASMap Health Check **/ diff --git a/test/functional/feature_config_args.py b/test/functional/feature_config_args.py index 441c21f03a34..1ead02f9e1ea 100755 --- a/test/functional/feature_config_args.py +++ b/test/functional/feature_config_args.py @@ -411,6 +411,24 @@ def test_connect_with_seednode(self): self.restart_node(0, extra_args=[connect_arg, '-dnsseed', '-proxy=localhost:1080']) self.stop_node(0) + def test_privatebroadcast(self): + self.log.info("Test that an invalid usage of -privatebroadcast throws an init error") + self.stop_node(0) + args_errors = { + "Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable": + ["-privatebroadcast"], + + "Private broadcast of own transactions requested (-privatebroadcast), " + "but -connect is also configured. They are incompatible because the " + "private broadcast needs to open new connections to randomly chosen " + "Tor or I2P peers. Consider using -maxconnections=0 -addnode=... instead" : + # -onion= makes the Tor network reachable + ["-privatebroadcast", "-connect=127.0.0.1:8333", "-onion=127.0.0.1:9050"] + } + for msg, args in args_errors.items(): + self.nodes[0].assert_start_raises_init_error(extra_args=args, expected_msg=f"Error: {msg}") + def test_ignored_conf(self): self.log.info('Test error is triggered when the datadir in use contains a bitcoin.conf file that would be ignored ' 'because a conflicting -conf file argument is passed.') @@ -496,6 +514,7 @@ def run_test(self): self.test_seed_peers() self.test_networkactive() self.test_connect_with_seednode() + self.test_privatebroadcast() self.test_dir_config() self.test_negated_config() From bc60535ae557c558e2a8004d5eb6ec8cabd4882f Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Thu, 14 Dec 2023 14:34:08 +0100 Subject: [PATCH 03/13] net: introduce a new connection type for private broadcast We will open a short-lived connection to a random Tor or I2P peer, send our transaction to that peer and close the connection. --- src/bitcoin-cli.cpp | 2 ++ src/net.cpp | 2 ++ src/net.h | 7 +++++++ src/node/connection_types.cpp | 2 ++ src/node/connection_types.h | 7 +++++++ src/qt/guiutil.cpp | 2 ++ src/qt/rpcconsole.cpp | 5 ++++- src/rpc/net.cpp | 3 ++- src/test/util/net.h | 1 + 9 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 279aa89e88e3..5cfa2ec4becf 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -452,6 +452,7 @@ class NetinfoRequestHandler : public BaseRequestHandler if (conn_type == "block-relay-only") return "block"; if (conn_type == "manual" || conn_type == "feeler") return conn_type; if (conn_type == "addr-fetch") return "addr"; + if (conn_type == "private-broadcast") return "priv"; return ""; } std::string FormatServices(const UniValue& services) @@ -703,6 +704,7 @@ class NetinfoRequestHandler : public BaseRequestHandler " \"manual\" - peer we manually added using RPC addnode or the -addnode/-connect config options\n" " \"feeler\" - short-lived connection for testing addresses\n" " \"addr\" - address fetch; short-lived connection for requesting addresses\n" + " \"priv\" - private broadcast; short-lived connection for broadcasting our transactions\n" " net Network the peer connected through (\"ipv4\", \"ipv6\", \"onion\", \"i2p\", \"cjdns\", or \"npr\" (not publicly routable))\n" " serv Services offered by the peer\n" " \"n\" - NETWORK: peer can serve the full block chain\n" diff --git a/src/net.cpp b/src/net.cpp index d335f2dc526e..5cf415008197 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1876,6 +1876,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; @@ -2665,6 +2666,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std // peers from addrman. case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: break; case ConnectionType::MANUAL: case ConnectionType::OUTBOUND_FULL_RELAY: diff --git a/src/net.h b/src/net.h index cc7892a63fca..9ed66af7f052 100644 --- a/src/net.h +++ b/src/net.h @@ -775,6 +775,7 @@ class CNode case ConnectionType::MANUAL: case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: return false; } // no default case, so the compiler can warn about missing cases @@ -796,6 +797,7 @@ class CNode case ConnectionType::FEELER: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::MANUAL: @@ -817,6 +819,10 @@ class CNode return m_conn_type == ConnectionType::ADDR_FETCH; } + bool IsPrivateBroadcastConn() const { + return m_conn_type == ConnectionType::PRIVATE_BROADCAST; + } + bool IsInboundConn() const { return m_conn_type == ConnectionType::INBOUND; } @@ -830,6 +836,7 @@ class CNode case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return true; } // no default case, so the compiler can warn about missing cases diff --git a/src/node/connection_types.cpp b/src/node/connection_types.cpp index 5e4dc5bf2ef9..4cf98047cf19 100644 --- a/src/node/connection_types.cpp +++ b/src/node/connection_types.cpp @@ -20,6 +20,8 @@ std::string ConnectionTypeAsString(ConnectionType conn_type) return "block-relay-only"; case ConnectionType::ADDR_FETCH: return "addr-fetch"; + case ConnectionType::PRIVATE_BROADCAST: + return "private-broadcast"; } // no default case, so the compiler can warn about missing cases assert(false); diff --git a/src/node/connection_types.h b/src/node/connection_types.h index a00895e2a8a5..eeb106b616d7 100644 --- a/src/node/connection_types.h +++ b/src/node/connection_types.h @@ -75,6 +75,13 @@ enum class ConnectionType { * AddrMan is empty. */ ADDR_FETCH, + + /** + * Private broadcast connections are short-lived and only opened to + * privacy networks (Tor, I2P) for relaying privacy-sensitive data (like + * our own transactions) and closed afterwards. + */ + PRIVATE_BROADCAST, }; /** Convert ConnectionType enum to a string value */ diff --git a/src/qt/guiutil.cpp b/src/qt/guiutil.cpp index 28610db45127..f7b6c15c1486 100644 --- a/src/qt/guiutil.cpp +++ b/src/qt/guiutil.cpp @@ -729,6 +729,8 @@ QString ConnectionTypeToQString(ConnectionType conn_type, bool prepend_direction case ConnectionType::FEELER: return prefix + QObject::tr("Feeler"); //: Short-lived peer connection type that solicits known addresses from a peer. case ConnectionType::ADDR_FETCH: return prefix + QObject::tr("Address Fetch"); + //: Short-lived peer connection type that is used for broadcasting privacy-sensitive data. + case ConnectionType::PRIVATE_BROADCAST: return prefix + QObject::tr("Private Broadcast"); } // no default case, so the compiler can warn about missing cases assert(false); } diff --git a/src/qt/rpcconsole.cpp b/src/qt/rpcconsole.cpp index d6d2be7b3934..8723a52a0878 100644 --- a/src/qt/rpcconsole.cpp +++ b/src/qt/rpcconsole.cpp @@ -484,7 +484,10 @@ RPCConsole::RPCConsole(interfaces::Node& node, const PlatformStyle *_platformSty tr("Outbound Feeler: short-lived, for testing addresses"), /*: Explanatory text for a short-lived outbound peer connection that is used to request addresses from a peer. */ - tr("Outbound Address Fetch: short-lived, for soliciting addresses")}; + tr("Outbound Address Fetch: short-lived, for soliciting addresses"), + /*: Explanatory text for a short-lived outbound peer connection that is used + to broadcast privacy-sensitive data (like our transactions). */ + tr("Private broadcast: short-lived, for broadcasting privacy-sensitive transactions")}; const QString connection_types_list{"
  • " + Join(CONNECTION_TYPE_DOC, QString("
  • ")) + "
"}; ui->peerConnectionTypeLabel->setToolTip(ui->peerConnectionTypeLabel->toolTip().arg(connection_types_list)); const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index c97d4c75af0e..e48ca1a51389 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -48,7 +48,8 @@ const std::vector CONNECTION_TYPE_DOC{ "inbound (initiated by the peer)", "manual (added via addnode RPC or -addnode/-connect configuration options)", "addr-fetch (short-lived automatic connection for soliciting addresses)", - "feeler (short-lived automatic connection for testing addresses)" + "feeler (short-lived automatic connection for testing addresses)", + "private-broadcast (short-lived automatic connection for broadcasting privacy-sensitive transactions)" }; const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/test/util/net.h b/src/test/util/net.h index 77954d92a486..605b2fa81a07 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -143,6 +143,7 @@ constexpr ConnectionType ALL_CONNECTION_TYPES[]{ ConnectionType::FEELER, ConnectionType::BLOCK_RELAY, ConnectionType::ADDR_FETCH, + ConnectionType::PRIVATE_BROADCAST, }; constexpr auto ALL_NETWORKS = std::array{ From 15b7d1ceb3e54518c8752081e627d885f2e9bbde Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 2 Apr 2025 06:07:41 +0200 Subject: [PATCH 04/13] net: implement opening PRIVATE_BROADCAST connections Implement opening `ConnectionType::PRIVATE_BROADCAST` connections with the following properties: * Only to Tor or I2P (or IPv4/IPv6 through the Tor proxy, if provided) * Open such connections only when requested and don't maintain N opened connections of this type. * Since this is substantially different than what `OpenNetworkConnection()` does, open the private broadcast connections from a different thread instead of modifying `OpenNetworkConnection()` to also open those types of connections. Co-authored-by: Andrew Toth --- src/init.cpp | 7 ++- src/net.cpp | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/net.h | 68 +++++++++++++++++++++ 3 files changed, 242 insertions(+), 3 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index a7781e08c21d..48ca6dd2ba95 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -543,7 +543,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) argsman.AddArg("-forcednsseed", strprintf("Always query for peer addresses via DNS lookup (default: %u)", DEFAULT_FORCEDNSSEED), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listen", strprintf("Accept connections from outside (default: %u if no -proxy, -connect or -maxconnections=0)", DEFAULT_LISTEN), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); - argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u. It does not apply to short-lived private broadcast connections either, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS, MAX_PRIVATE_BROADCAST_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxreceivebuffer=", strprintf("Maximum per-connection receive buffer, *1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection memory usage for the send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxuploadtarget=", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -1008,11 +1008,14 @@ bool AppInitParameterInteraction(const ArgsManager& args) if (user_max_connection < 0) { return InitError(Untranslated("-maxconnections must be greater or equal than zero")); } + const size_t max_private{args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST) + ? MAX_PRIVATE_BROADCAST_CONNECTIONS + : 0}; // Reserve enough FDs to account for the bare minimum, plus any manual connections, plus the bound interfaces int min_required_fds = MIN_CORE_FDS + MAX_ADDNODE_CONNECTIONS + nBind; // Try raising the FD limit to what we need (available_fds may be smaller than the requested amount if this fails) - available_fds = RaiseFileDescriptorLimit(user_max_connection + min_required_fds); + available_fds = RaiseFileDescriptorLimit(user_max_connection + max_private + min_required_fds); // If we are using select instead of poll, our actual limit may be even smaller #ifndef USE_POLL available_fds = std::min(FD_SETSIZE, available_fds); diff --git a/src/net.cpp b/src/net.cpp index 5cf415008197..5c7d097ab306 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -458,7 +458,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, i2p::Connection conn; bool connected{false}; - if (m_i2p_sam_session) { + // If an I2P SAM session already exists, normally we would re-use it. But in the case of + // private broadcast we force a new transient session. A Connect() using m_i2p_sam_session + // would use our permanent I2P address as a source address. + if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) { connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed); } else { { @@ -3051,6 +3054,73 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, return true; } +std::optional CConnman::PrivateBroadcast::PickNetwork(std::optional& proxy) const +{ + prevector<4, Network> nets; + std::optional clearnet_proxy; + if (g_reachable_nets.Contains(NET_ONION)) { + nets.push_back(NET_ONION); + + clearnet_proxy = ProxyForIPv4or6(); + if (clearnet_proxy.has_value()) { + if (g_reachable_nets.Contains(NET_IPV4)) { + nets.push_back(NET_IPV4); + } + if (g_reachable_nets.Contains(NET_IPV6)) { + nets.push_back(NET_IPV6); + } + } + } + if (g_reachable_nets.Contains(NET_I2P)) { + nets.push_back(NET_I2P); + } + + if (nets.empty()) { + return std::nullopt; + } + + const Network net{nets[FastRandomContext{}.randrange(nets.size())]}; + if (net == NET_IPV4 || net == NET_IPV6) { + proxy = clearnet_proxy; + } + return net; +} + +size_t CConnman::PrivateBroadcast::NumToOpen() const +{ + return m_num_to_open; +} + +void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n) +{ + m_num_to_open += n; + m_num_to_open.notify_all(); +} + +size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n) +{ + size_t current_value{m_num_to_open.load()}; + size_t new_value; + do { + new_value = current_value > n ? current_value - n : 0; + } while (!m_num_to_open.compare_exchange_weak(current_value, new_value)); + return new_value; +} + +void CConnman::PrivateBroadcast::NumToOpenWait() const +{ + m_num_to_open.wait(0); +} + +std::optional CConnman::PrivateBroadcast::ProxyForIPv4or6() const +{ + Proxy tor_proxy; + if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) { + return tor_proxy; + } + return std::nullopt; +} + Mutex NetEventsInterface::g_msgproc_mutex; void CConnman::ThreadMessageHandler() @@ -3135,6 +3205,85 @@ void CConnman::ThreadI2PAcceptIncoming() } } +void CConnman::ThreadPrivateBroadcast() +{ + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + + size_t addrman_num_bad_addresses{0}; + while (!m_interrupt_net->interrupted()) { + + if (!fNetworkActive) { + m_interrupt_net->sleep_for(5s); + continue; + } + + CountingSemaphoreGrant<> conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened. + + m_private_broadcast.NumToOpenWait(); + + if (m_interrupt_net->interrupted()) { + break; + } + + std::optional proxy; + const std::optional net{m_private_broadcast.PickNetwork(proxy)}; + if (!net.has_value()) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Warning, + "Connections needed but none of the Tor or I2P networks is reachable"); + m_interrupt_net->sleep_for(5s); + continue; + } + + const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()}); + + if (!addr.IsValid() || IsLocal(addr)) { + ++addrman_num_bad_addresses; + if (addrman_num_bad_addresses > 100) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Connections needed but addrman keeps returning bad addresses, will retry"); + m_interrupt_net->sleep_for(500ms); + } + continue; + } + addrman_num_bad_addresses = 0; + + auto target_str{addr.ToStringAddrPort()}; + if (proxy.has_value()) { + target_str += " through the proxy at " + proxy->ToString(); + } + + const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2); + + if (OpenNetworkConnection(addr, + /*fCountFailure=*/true, + std::move(conn_max_grant), + /*pszDest=*/nullptr, + ConnectionType::PRIVATE_BROADCAST, + use_v2transport, + proxy)) { + const size_t remaining{m_private_broadcast.NumToOpenSub(1)}; + LogDebug(BCLog::PRIVATE_BROADCAST, + "Socket connected to %s; remaining connections to open: %d", + target_str, + remaining); + } else { + const size_t remaining{m_private_broadcast.NumToOpen()}; + if (remaining == 0) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Failed to connect to %s, will not retry, no more connections needed", + target_str); + } else { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Failed to connect to %s, will retry to a different address; remaining connections " + "to open: %d", + target_str, + remaining); + } + } + } +} + bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions) { int nOne = 1; @@ -3415,6 +3564,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); } + if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + threadPrivateBroadcast = + std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); }); + } + // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); @@ -3464,10 +3618,16 @@ void CConnman::Interrupt() semAddnode->release(); } } + + m_private_broadcast.m_sem_conn_max.release(); + m_private_broadcast.NumToOpenAdd(1); // Just unblock NumToOpenWait() to be able to continue with shutdown. } void CConnman::StopThreads() { + if (threadPrivateBroadcast.joinable()) { + threadPrivateBroadcast.join(); + } if (threadI2PAcceptIncoming.joinable()) { threadI2PAcceptIncoming.join(); } @@ -3897,6 +4057,14 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); + + if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() && + pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) { + // If we are sending the peer VERACK that means we successfully sent + // and received another message to/from that peer (VERSION). + m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true); + } + size_t nMessageSize = msg.data.size(); LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId()); if (gArgs.GetBoolArg("-capturemessages", false)) { diff --git a/src/net.h b/src/net.h index 9ed66af7f052..dbc01d8a031b 100644 --- a/src/net.h +++ b/src/net.h @@ -73,6 +73,8 @@ static const int MAX_ADDNODE_CONNECTIONS = 8; static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2; /** Maximum number of feeler connections */ static const int MAX_FEELER_CONNECTIONS = 1; +/** Maximum number of private broadcast connections */ +static constexpr size_t MAX_PRIVATE_BROADCAST_CONNECTIONS{64}; /** -listen default */ static const bool DEFAULT_LISTEN = true; /** The maximum number of peer connections to maintain. */ @@ -1180,6 +1182,70 @@ class CConnman const std::optional& proxy_override = std::nullopt) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + /// Group of private broadcast related members. + class PrivateBroadcast + { + public: + /** + * Remember if we ever established at least one outbound connection to a + * Tor peer, including sending and receiving P2P messages. If this is + * true then the Tor proxy indeed works and is a proxy to the Tor network, + * not a misconfigured ordinary SOCKS5 proxy as -proxy or -onion. If that + * is the case, then we assume that connecting to an IPv4 or IPv6 address + * via that proxy will be done through the Tor network and a Tor exit node. + */ + std::atomic_bool m_outbound_tor_ok_at_least_once{false}; + + /** + * Semaphore used to guard against opening too many connections. + * Opening private broadcast connections will be paused if this is equal to 0. + */ + std::counting_semaphore<> m_sem_conn_max{MAX_PRIVATE_BROADCAST_CONNECTIONS}; + + /** + * Choose a network to open a connection to. + * @param[out] proxy Optional proxy to override the normal proxy selection. + * Will be set if !std::nullopt is returned. Could be set to `std::nullopt` + * if there is no need to override the proxy that would be used for connecting + * to the returned network. + * @retval std::nullopt No network could be selected. + * @retval !std::nullopt The network was selected and `proxy` is set (maybe to `std::nullopt`). + */ + std::optional PickNetwork(std::optional& proxy) const; + + /// Get the pending number of connections to open. + size_t NumToOpen() const; + + /** + * Increment the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Increment by this number. + */ + void NumToOpenAdd(size_t n); + + /** + * Decrement the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Decrement by this number. + * @return The number of connections that remain to be opened after the operation. + */ + size_t NumToOpenSub(size_t n); + + /// Wait for the number of needed connections to become greater than 0. + void NumToOpenWait() const; + + private: + /** + * Check if private broadcast can be done to IPv4 or IPv6 peers and if so via which proxy. + * If private broadcast connections should not be opened to IPv4 or IPv6, then this will + * return an empty optional. + */ + std::optional ProxyForIPv4or6() const; + + /// Number of `ConnectionType::PRIVATE_BROADCAST` connections to open. + std::atomic_size_t m_num_to_open{0}; + } m_private_broadcast; + bool CheckIncomingNonce(uint64_t nonce); void ASMapHealthCheck(); @@ -1354,6 +1420,7 @@ class CConnman void ThreadOpenConnections(std::vector connect, std::span seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); + void ThreadPrivateBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AcceptConnection(const ListenSocket& hListenSocket); /** @@ -1644,6 +1711,7 @@ class CConnman std::thread threadOpenConnections; std::thread threadMessageHandler; std::thread threadI2PAcceptIncoming; + std::thread threadPrivateBroadcast; /** flag for deciding to connect to an extra outbound peer, * in excess of m_max_outbound_full_relay From cc877b6d57d6e6a4c08f2111054adbaeca41e42e Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 20 Dec 2023 15:32:17 +0100 Subject: [PATCH 05/13] net_processing: rename RelayTransaction() to better describe what it does Rename `PeerManager::RelayTransaction()` to `PeerManager::InitiateTxBroadcastToAll()`. The transaction is not relayed when the method returns. It is only enqueued for a possible broadcasting at a later time. Also, there will be another method which only does so to Tor or I2P peers. --- src/net_processing.cpp | 10 +++++----- src/net_processing.h | 9 +++++++-- src/node/transaction.cpp | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 2ef43b8dc9af..e32cdb53aef4 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -536,7 +536,7 @@ class PeerManagerImpl final : public PeerManager std::vector GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex); PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void RelayTransaction(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestBlock(int height, std::chrono::seconds time) override { m_best_height = height; @@ -1578,7 +1578,7 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) CTransactionRef tx = m_mempool.get(txid); if (tx != nullptr) { - RelayTransaction(txid, tx->GetWitnessHash()); + InitiateTxBroadcastToAll(txid, tx->GetWitnessHash()); } else { m_mempool.RemoveUnbroadcastTx(txid, true); } @@ -2123,7 +2123,7 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayTransaction(const Txid& txid, const Wtxid& wtxid) +void PeerManagerImpl::InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) { LOCK(m_peer_mutex); for(auto& it : m_peer_map) { @@ -3023,7 +3023,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c tx->GetWitnessHash().ToString(), m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); - RelayTransaction(tx->GetHash(), tx->GetWitnessHash()); + InitiateTxBroadcastToAll(tx->GetHash(), tx->GetWitnessHash()); for (const CTransactionRef& removedTx : replaced_transactions) { AddToCompactExtraTransactions(removedTx); @@ -4290,7 +4290,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } else { LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n", txid.ToString(), wtxid.ToString(), pfrom.GetId()); - RelayTransaction(txid, wtxid); + InitiateTxBroadcastToAll(txid, wtxid); } } diff --git a/src/net_processing.h b/src/net_processing.h index 6eb4a5e16a2c..afb096987c2c 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -116,8 +116,13 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Get peer manager info. */ virtual PeerManagerInfo GetInfo() const = 0; - /** Relay transaction to all peers. */ - virtual void RelayTransaction(const Txid& txid, const Wtxid& wtxid) = 0; + /** + * Initiate a transaction broadcast to eligible peers. + * Queue the witness transaction id to `Peer::TxRelay::m_tx_inventory_to_send` + * for each peer. Later, depending on `Peer::TxRelay::m_next_inv_send_time` and if + * the transaction is in the mempool, an `INV` about it may be sent to the peer. + */ + virtual void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) = 0; /** Send ping message to all peers */ virtual void SendPings() = 0; diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index f5bd0efe744b..4f0ee6f7060a 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -133,7 +133,7 @@ TransactionError BroadcastTransaction(NodeContext& node, case TxBroadcast::MEMPOOL_NO_BROADCAST: break; case TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL: - node.peerman->RelayTransaction(txid, wtxid); + node.peerman->InitiateTxBroadcastToAll(txid, wtxid); break; } From 36415318e5c15947126a62c57f56a71c8355ef66 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 7 Oct 2025 16:57:15 +0200 Subject: [PATCH 06/13] node: extend node::TxBroadcast with a 3rd option Extend `node::TxBroadcast` with a 3rd option to not add the transaction to the mempool and broadcast privately. This is a non-functional change - `BroadcastTransaction()` will not do anything if the 3rd options is passed and is not used by any of its callers. --- src/node/transaction.cpp | 9 +++++++-- src/node/types.h | 3 +++ src/wallet/wallet.cpp | 3 +++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 4f0ee6f7060a..a5a3a9d9bedd 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -74,13 +74,14 @@ TransactionError BroadcastTransaction(NodeContext& node, wtxid = mempool_tx->GetWitnessHash(); } else { // Transaction is not already in the mempool. - if (max_tx_fee > 0) { + const bool check_max_fee{max_tx_fee > 0}; + if (check_max_fee || broadcast_method == TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST) { // First, call ATMP with test_accept and check the fee. If ATMP // fails here, return error immediately. const MempoolAcceptResult result = node.chainman->ProcessTransaction(tx, /*test_accept=*/ true); if (result.m_result_type != MempoolAcceptResult::ResultType::VALID) { return HandleATMPError(result.m_state, err_string); - } else if (result.m_base_fees.value() > max_tx_fee) { + } else if (check_max_fee && result.m_base_fees.value() > max_tx_fee) { return TransactionError::MAX_FEE_EXCEEDED; } } @@ -104,6 +105,8 @@ TransactionError BroadcastTransaction(NodeContext& node, node.mempool->AddUnbroadcastTx(txid); } break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + break; } if (wait_callback && node.validation_signals) { @@ -135,6 +138,8 @@ TransactionError BroadcastTransaction(NodeContext& node, case TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL: node.peerman->InitiateTxBroadcastToAll(txid, wtxid); break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + break; } return TransactionError::OK; diff --git a/src/node/types.h b/src/node/types.h index 6c2687626c98..bf11c2cba554 100644 --- a/src/node/types.h +++ b/src/node/types.h @@ -108,6 +108,9 @@ enum class TxBroadcast : uint8_t { MEMPOOL_AND_BROADCAST_TO_ALL, /// Add the transaction to the mempool, but don't broadcast to anybody. MEMPOOL_NO_BROADCAST, + /// Omit the mempool and directly send the transaction via a few dedicated connections to + /// peers on privacy networks. + NO_MEMPOOL_PRIVATE_BROADCAST, }; } // namespace node diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 86474a456d7b..7f32c8368adf 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1983,6 +1983,9 @@ bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, case node::TxBroadcast::MEMPOOL_NO_BROADCAST: what = "to mempool without broadcast"; break; + case node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + what = "for private broadcast without adding to the mempool"; + break; } WalletLogPrintf("Submitting wtx %s %s\n", wtx.GetHash().ToString(), what); // We must set TxStateInMempool here. Even though it will also be set later by the From cb5376ab4c60923b045afb3b4c0426a3352b1cd9 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 30 Jan 2024 10:01:24 +0100 Subject: [PATCH 07/13] net_processing: store transactions for private broadcast in PeerManager Extend `PeerManager` with a transaction storage and a new method `ScheduleTxForPrivateBroadcast()` which: * adds a transaction to that storage and * calls `CConnman::PrivateBroadcastAdd()` to open dedicated privacy connections that will pick an entry from the transaction storage and broadcast it. --- src/CMakeLists.txt | 1 + src/net_processing.cpp | 26 +++++++++++++++++ src/net_processing.h | 6 ++++ src/node/transaction.cpp | 1 + src/private_broadcast.cpp | 16 +++++++++++ src/private_broadcast.h | 59 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 109 insertions(+) create mode 100644 src/private_broadcast.cpp create mode 100644 src/private_broadcast.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9df51eb9edc5..0a6797347f85 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -244,6 +244,7 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL policy/rbf.cpp policy/settings.cpp policy/truc_policy.cpp + private_broadcast.cpp rest.cpp rpc/blockchain.cpp rpc/external_signer.cpp diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e32cdb53aef4..5482f0f1d75d 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -195,6 +196,8 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1}; static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; /** The compactblocks version we support. See BIP 152. */ static constexpr uint64_t CMPCTBLOCKS_VERSION{2}; +/** For private broadcast, send a transaction to this many peers. */ +static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3}; // Internal stuff namespace { @@ -537,6 +540,7 @@ class PeerManagerImpl final : public PeerManager PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastPrivate(const CTransactionRef& tx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestBlock(int height, std::chrono::seconds time) override { m_best_height = height; @@ -1069,6 +1073,9 @@ class PeerManagerImpl final : public PeerManager void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); void LogBlockHeader(const CBlockIndex& index, const CNode& peer, bool via_compact_block); + + /// A list of transactions to be broadcast privately. + PrivateBroadcast m_tx_for_private_broadcast; }; const CNodeState* PeerManagerImpl::State(NodeId pnode) const @@ -2146,6 +2153,25 @@ void PeerManagerImpl::InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wt } } +void PeerManagerImpl::InitiateTxBroadcastPrivate(const CTransactionRef& tx) +{ + if (m_tx_for_private_broadcast.Add(tx)) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Requesting %d new connections due to txid=%s, wtxid=%s", + NUM_PRIVATE_BROADCAST_PER_TX, + tx->GetHash().ToString(), + tx->GetWitnessHash().ToString()); + + m_connman.m_private_broadcast.NumToOpenAdd(NUM_PRIVATE_BROADCAST_PER_TX); + } else { + LogDebug( + BCLog::PRIVATE_BROADCAST, + "Ignoring unnecessary request to schedule an already scheduled transaction: txid=%s, wtxid=%s", + tx->GetHash().ToString(), + tx->GetWitnessHash().ToString()); + } +} + void PeerManagerImpl::RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) diff --git a/src/net_processing.h b/src/net_processing.h index afb096987c2c..4e2784013cf7 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -124,6 +124,12 @@ class PeerManager : public CValidationInterface, public NetEventsInterface */ virtual void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) = 0; + /** + * Initiate a private transaction broadcast. This is done + * asynchronously via short-lived connections to peers on privacy networks. + */ + virtual void InitiateTxBroadcastPrivate(const CTransactionRef& tx) = 0; + /** Send ping message to all peers */ virtual void SendPings() = 0; diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index a5a3a9d9bedd..8b9eb80dbc08 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -139,6 +139,7 @@ TransactionError BroadcastTransaction(NodeContext& node, node.peerman->InitiateTxBroadcastToAll(txid, wtxid); break; case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + node.peerman->InitiateTxBroadcastPrivate(tx); break; } diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp new file mode 100644 index 000000000000..47e012b74580 --- /dev/null +++ b/src/private_broadcast.cpp @@ -0,0 +1,16 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#include + +bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + const Txid& txid = tx->GetHash(); + LOCK(m_mutex); + auto [pos, inserted] = m_by_txid.emplace(txid, TxWithPriority{.tx = tx, .priority = Priority{}}); + if (inserted) { + m_by_priority.emplace(Priority{}, txid); + } + return inserted; +} diff --git a/src/private_broadcast.h b/src/private_broadcast.h new file mode 100644 index 000000000000..1029648224a0 --- /dev/null +++ b/src/private_broadcast.h @@ -0,0 +1,59 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#ifndef BITCOIN_PRIVATE_BROADCAST_H +#define BITCOIN_PRIVATE_BROADCAST_H + +#include +#include +#include +#include +#include + +#include +#include + +/** + * Store a list of transactions to be broadcast privately. Supports the following operations: + * - Add a new transaction + */ +class PrivateBroadcast +{ +public: + /** + * Add a transaction to the storage. + * @param[in] tx The transaction to add. + * @retval true The transaction was added. + * @retval false The transaction was already present. + */ + bool Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + +private: + struct Priority { + // Note: operator<=>() depends on the declaration order. + size_t num_broadcasted{0}; + NodeClock::time_point last_broadcasted{}; + + auto operator<=>(const Priority&) const = default; + }; + + struct TxWithPriority { + CTransactionRef tx; + Priority priority; + }; + + using ByTxid = std::unordered_map; + using ByPriority = std::multimap; + + struct Iterators { + ByTxid::iterator by_txid; + ByPriority::iterator by_priority; + }; + + mutable Mutex m_mutex; + ByTxid m_by_txid GUARDED_BY(m_mutex); + ByPriority m_by_priority GUARDED_BY(m_mutex); +}; + +#endif // BITCOIN_PRIVATE_BROADCAST_H From d99b3ea5f07afe55dabd5cb698d31ccbb1e0a474 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Thu, 21 Dec 2023 13:24:30 +0100 Subject: [PATCH 08/13] net_processing: reorder the code that handles the VERSION message Change the order in which code snippets are executed as a result of receiving the `VERSION` message. Move the snippets that do `MakeAndPushMessage()` near the end. This makes it easier to interrupt the execution when no messages should be sent as a response to the `VERSION` messages, in private broadcast connections. This is a non-functional change. --- src/net_processing.cpp | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 5482f0f1d75d..405b6ec9e37e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -3541,19 +3541,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, pfrom.SetCommonVersion(greatest_common_version); pfrom.nVersion = nVersion; - if (greatest_common_version >= WTXID_RELAY_VERSION) { - MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); - } - - // Signal ADDRv2 support (BIP155). - if (greatest_common_version >= 70016) { - // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some - // implementations reject messages they don't know. As a courtesy, don't send - // it to nodes with a version before 70016, as no software is known to support - // BIP155 that doesn't announce at least that protocol version number. - MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); - } - pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices); peer->m_their_services = nServices; pfrom.SetAddrLocal(addrMe); @@ -3580,6 +3567,25 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (fRelay) pfrom.m_relays_txs = true; } + const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; + LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", + cleanSubVer, pfrom.nVersion, + peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), + pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); + + if (greatest_common_version >= WTXID_RELAY_VERSION) { + MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); + } + + // Signal ADDRv2 support (BIP155). + if (greatest_common_version >= 70016) { + // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some + // implementations reject messages they don't know. As a courtesy, don't send + // it to nodes with a version before 70016, as no software is known to support + // BIP155 that doesn't announce at least that protocol version number. + MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); + } + if (greatest_common_version >= WTXID_RELAY_VERSION && m_txreconciliation) { // Per BIP-330, we announce txreconciliation support if: // - protocol version per the peer's VERSION message supports WTXID_RELAY; @@ -3645,12 +3651,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_addrman.Good(pfrom.addr); } - const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; - LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", - cleanSubVer, pfrom.nVersion, - peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), - pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); - peer->m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now(); if (!pfrom.IsInboundConn()) { // Don't use timedata samples from inbound peers to make it From 676626bbf0b982b3ec2d87befdd9e8e6cb84e3c3 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 19 Apr 2023 15:40:30 +0200 Subject: [PATCH 09/13] net_processing: handle ConnectionType::PRIVATE_BROADCAST connections For connections of type `ConnectionType::PRIVATE_BROADCAST`: * After receiving VERACK, relay a transaction from the list of transactions for private broadcast and disconnect * Don't process any messages after VERACK * Don't send any messages other than the minimum required for the transaction relay --- src/net.cpp | 27 +++- src/net_processing.cpp | 254 ++++++++++++++++++++++++++++++++------ src/private_broadcast.cpp | 87 +++++++++++++ src/private_broadcast.h | 44 +++++++ 4 files changed, 375 insertions(+), 37 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 5c7d097ab306..c258dcf94e9b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -354,7 +354,16 @@ bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { - if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) + // Omit private broadcast connections from this check to prevent this privacy attack: + // - We connect to a peer in an attempt to privately broadcast a transaction. From our + // VERSION message the peer deducts that this is a short-lived connection for + // broadcasting a transaction, takes our nonce and delays their VERACK. + // - The peer starts connecting to (clearnet) nodes and sends them a VERSION message + // which contains our nonce. If the peer manages to connect to us we would disconnect. + // - Upon a disconnect, the peer knows our clearnet address. They go back to the short + // lived privacy broadcast connection and continue with VERACK. + if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && !pnode->IsPrivateBroadcastConn() && + pnode->GetLocalNonce() == nonce) return false; } return true; @@ -4058,6 +4067,22 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); + if (pnode->IsPrivateBroadcastConn() && + msg.m_type != NetMsgType::VERSION && + msg.m_type != NetMsgType::VERACK && + msg.m_type != NetMsgType::INV && + msg.m_type != NetMsgType::TX && + msg.m_type != NetMsgType::PING) { + // Ensure private broadcast connections only send the above message types. + // Others are not needed and may degrade privacy. + LogDebug(BCLog::PRIVATE_BROADCAST, + "Omitting send of message '%s', peer=%d%s", + msg.m_type, + pnode->GetId(), + pnode->LogIP(fLogIPs)); + return; + } + if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() && pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) { // If we are sending the peer VERACK that means we successfully sent diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 405b6ec9e37e..00337fc8636a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -198,6 +198,8 @@ static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; static constexpr uint64_t CMPCTBLOCKS_VERSION{2}; /** For private broadcast, send a transaction to this many peers. */ static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3}; +/** Private broadcast connections must complete within this time. Disconnect the peer if it takes longer. */ +static constexpr auto PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME{3min}; // Internal stuff namespace { @@ -722,8 +724,8 @@ class PeerManagerImpl final : public PeerManager /** Send a version message to a peer */ void PushNodeVersion(CNode& pnode, const Peer& peer); - /** Send a ping message every PING_INTERVAL or if requested via RPC. May - * mark the peer to be disconnected if a ping has timed out. + /** Send a ping message every PING_INTERVAL or if requested via RPC (peer.m_ping_queued is true). + * May mark the peer to be disconnected if a ping has timed out. * We use mockable time for ping timeouts, so setmocktime may cause pings * to time out. */ void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); @@ -961,6 +963,14 @@ class PeerManagerImpl final : public PeerManager void ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const BlockTransactions& block_transactions) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** + * Schedule an INV for a transaction to be sent to the given peer (via `PushMessage()`). + * The transaction is picked from the list of transactions for private broadcast. + * It is assumed that the connection to the peer is `ConnectionType::PRIVATE_BROADCAST`. + * Calling this for other peers will degrade privacy. Don't do that. + */ + void PushPrivateBroadcastTx(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** * When a peer sends us a valid block, instruct it to announce blocks to us * using CMPCTBLOCK if possible by adding its nodeid to the end of @@ -1527,26 +1537,64 @@ void PeerManagerImpl::FindNextBlocks(std::vector& vBlocks, c void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) { - uint64_t my_services{peer.m_our_services}; - const int64_t nTime{count_seconds(GetTime())}; - uint64_t nonce = pnode.GetLocalNonce(); - const int nNodeStartingHeight{m_best_height}; - NodeId nodeid = pnode.GetId(); - CAddress addr = pnode.addr; - - CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService(); - uint64_t your_services{addr.nServices}; - - const bool tx_relay{!RejectIncomingTxs(pnode)}; - MakeAndPushMessage(pnode, NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime, - your_services, CNetAddr::V1(addr_you), // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) - my_services, CNetAddr::V1(CService{}), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) - nonce, strSubVersion, nNodeStartingHeight, tx_relay); - + uint64_t my_services; + int64_t my_time; + uint64_t your_services; + CService your_addr; + std::string my_user_agent; + int my_height; + bool my_tx_relay; + if (pnode.IsPrivateBroadcastConn()) { + my_services = NODE_NONE; + my_time = 0; + your_services = NODE_NONE; + your_addr = CService{}; + my_user_agent = "/pynode:0.0.1/"; // Use a constant other than the default (or user-configured). See https://github.com/bitcoin/bitcoin/pull/27509#discussion_r1214671917 + my_height = 0; + my_tx_relay = false; + } else { + CAddress addr{pnode.addr}; + + my_services = peer.m_our_services; + my_time = count_seconds(GetTime()); + your_services = addr.nServices; + your_addr = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? CService{addr} : CService{}; + my_user_agent = strSubVersion; + my_height = m_best_height; + my_tx_relay = !RejectIncomingTxs(pnode); + } + + MakeAndPushMessage( + pnode, + NetMsgType::VERSION, + PROTOCOL_VERSION, + my_services, + my_time, + // your_services + CNetAddr::V1(your_addr) is the pre-version-31402 serialization of your_addr (without nTime) + your_services, CNetAddr::V1(your_addr), + // same, for a dummy address + my_services, CNetAddr::V1(CService{}), + pnode.GetLocalNonce(), + my_user_agent, + my_height, + my_tx_relay); + + const NodeId nodeid{pnode.GetId()}; if (fLogIPs) { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addr_you.ToStringAddrPort(), tx_relay, nodeid); + LogDebug(BCLog::NET, + "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d", + PROTOCOL_VERSION, + my_height, + your_addr.ToStringAddrPort(), + my_tx_relay, + nodeid); } else { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, tx_relay, nodeid); + LogDebug(BCLog::NET, + "send version message: version %d, blocks=%d, txrelay=%d, peer=%d", + PROTOCOL_VERSION, + my_height, + my_tx_relay, + nodeid); } } @@ -1654,16 +1702,25 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) } } // cs_main if (node.fSuccessfullyConnected && - !node.IsBlockOnlyConn() && !node.IsInboundConn()) { + !node.IsBlockOnlyConn() && !node.IsPrivateBroadcastConn() && !node.IsInboundConn()) { // Only change visible addrman state for full outbound peers. We don't // call Connected() for feeler connections since they don't have - // fSuccessfullyConnected set. + // fSuccessfullyConnected set. Also don't call Connected() for private broadcast + // connections since they could leak information in addrman. m_addrman.Connected(node.addr); } { LOCK(m_headers_presync_mutex); m_headers_presync_stats.erase(nodeid); } + if (node.IsPrivateBroadcastConn()) { + // FinishBroadcast() is called when we get a PONG from the peer which means that the send + // has concluded successfully. Call FinishBroadcast() here as well in case we did not call + // it before (unsuccessful, never concluded with the reception of a PONG). + if (m_tx_for_private_broadcast.FinishBroadcast(nodeid, /*confirmed_by_node=*/false)) { + m_connman.m_private_broadcast.NumToOpenAdd(1); + } + } LogDebug(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } @@ -3443,6 +3500,34 @@ void PeerManagerImpl::LogBlockHeader(const CBlockIndex& index, const CNode& peer } } +void PeerManagerImpl::PushPrivateBroadcastTx(CNode& node) +{ + Assume(node.IsPrivateBroadcastConn()); + + auto opt_tx = m_tx_for_private_broadcast.GetTxForBroadcast(); + if (!opt_tx) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Disconnecting: no more transactions for private broadcast (connected in vain), peer=%d%s", + node.GetId(), + node.LogIP(fLogIPs)); + node.fDisconnect = true; + return; + } + const CTransactionRef& tx{*opt_tx}; + + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "P2P handshake completed, sending INV for txid=%s%s, peer=%d%s", + tx->GetHash().ToString(), + tx->HasWitness() ? strprintf(", wtxid=%s", tx->GetWitnessHash().ToString()) : "", + node.GetId(), + node.LogIP(fLogIPs)); + + MakeAndPushMessage(node, NetMsgType::INV, std::vector{{CInv{MSG_TX, tx->GetHash().ToUint256()}}}); + + m_tx_for_private_broadcast.PushedToNode(node.GetId(), tx->GetHash()); +} + void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) @@ -3573,6 +3658,20 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); + if (pfrom.IsPrivateBroadcastConn()) { + if (fRelay) { + MakeAndPushMessage(pfrom, NetMsgType::VERACK); + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: does not support transactions relay (connected in vain), peer=%d%s", + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } + return; + } + if (greatest_common_version >= WTXID_RELAY_VERSION) { MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); } @@ -3697,6 +3796,31 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); } + if (auto tx_relay = peer->GetTxRelay()) { + // `TxRelay::m_tx_inventory_to_send` must be empty before the + // version handshake is completed as + // `TxRelay::m_next_inv_send_time` is first initialised in + // `SendMessages` after the verack is received. Any transactions + // received during the version handshake would otherwise + // immediately be advertised without random delay, potentially + // leaking the time of arrival to a spy. + Assume(WITH_LOCK( + tx_relay->m_tx_inventory_mutex, + return tx_relay->m_tx_inventory_to_send.empty() && + tx_relay->m_next_inv_send_time == 0s)); + } + + if (pfrom.IsPrivateBroadcastConn()) { + pfrom.fSuccessfullyConnected = true; + // The peer may intend to later send us NetMsgType::FEEFILTER limiting + // cheap transactions, but we don't wait for that and thus we may send + // them a transaction below their threshold. This is ok because this + // relay logic is designed to work even in cases when the peer drops + // the transaction (due to it being too cheap, or for other reasons). + PushPrivateBroadcastTx(pfrom); + return; + } + if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION) { // Tell our peer we are willing to provide version 2 cmpctblocks. // However, we do not request new block announcements using @@ -3715,20 +3839,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } - if (auto tx_relay = peer->GetTxRelay()) { - // `TxRelay::m_tx_inventory_to_send` must be empty before the - // version handshake is completed as - // `TxRelay::m_next_inv_send_time` is first initialised in - // `SendMessages` after the verack is received. Any transactions - // received during the version handshake would otherwise - // immediately be advertised without random delay, potentially - // leaking the time of arrival to a spy. - Assume(WITH_LOCK( - tx_relay->m_tx_inventory_mutex, - return tx_relay->m_tx_inventory_to_send.empty() && - tx_relay->m_next_inv_send_time == 0s)); - } - { LOCK2(::cs_main, m_tx_download_mutex); const CNodeState* state = State(pfrom.GetId()); @@ -3862,6 +3972,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } + if (pfrom.IsPrivateBroadcastConn()) { + if (msg_type != NetMsgType::PONG && msg_type != NetMsgType::GETDATA) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Ignoring incoming message '%s', peer=%d%s", + msg_type, + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + return; + } + } + if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) { const auto ser_params{ msg_type == NetMsgType::ADDRV2 ? @@ -4065,6 +4186,39 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); } + if (pfrom.IsPrivateBroadcastConn()) { + const auto pushed_tx_opt = m_tx_for_private_broadcast.GetTxPushedToNode(pfrom.GetId()); + if (!pushed_tx_opt) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: got GETDATA without sending an INV, peer=%d%s", + pfrom.GetId(), + fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + return; + } + + const CTransactionRef& pushed_tx{*pushed_tx_opt}; + + // The GETDATA request must contain exactly one inv and it must be for the transaction + // that we INVed to the peer earlier. + if (vInv.size() == 1 && vInv[0].IsMsgTx() && vInv[0].hash == pushed_tx->GetHash().ToUint256()) { + + MakeAndPushMessage(pfrom, NetMsgType::TX, TX_WITH_WITNESS(*pushed_tx)); + + peer->m_ping_queued = true; // Ensure a ping will be sent: mimic a request via RPC. + MaybeSendPing(pfrom, *peer, GetTime()); + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: got an unexpected GETDATA message, peer=%d%s", + pfrom.GetId(), + fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + } + return; + } + { LOCK(peer->m_getdata_requests_mutex); peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); @@ -4806,6 +4960,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (ping_time.count() >= 0) { // Let connman know about this successful ping-pong pfrom.PongReceived(ping_time); + if (pfrom.IsPrivateBroadcastConn()) { + m_tx_for_private_broadcast.FinishBroadcast(pfrom.GetId(), /*confirmed_by_node=*/true); + LogPrintLevel( + BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Got a PONG (the transaction will probably reach the network), " + "marking for disconnect, peer=%d%s", + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } } else { // This should never happen sProblem = "Timing mishap"; @@ -5513,6 +5678,23 @@ bool PeerManagerImpl::SendMessages(CNode* pto) const auto current_time{GetTime()}; + // The logic below does not apply to private broadcast peers, so skip it. + // Also in CConnman::PushMessage() we make sure that unwanted messages are + // not sent. This here is just an optimization. + if (pto->IsPrivateBroadcastConn()) { + if (pto->m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) { + LogPrintLevel( + BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: did not complete the transaction send within %d seconds, peer=%d%s", + std::chrono::duration_cast(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME).count(), + pto->GetId(), + pto->LogIP(fLogIPs)); + pto->fDisconnect = true; + } + return true; + } + if (pto->IsAddrFetchConn() && current_time - pto->m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) { LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", pto->DisconnectMsg(fLogIPs)); pto->fDisconnect = true; diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp index 47e012b74580..78dc147905ea 100644 --- a/src/private_broadcast.cpp +++ b/src/private_broadcast.cpp @@ -3,6 +3,7 @@ // file COPYING or https://opensource.org/license/mit/. #include +#include bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -14,3 +15,89 @@ bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(! } return inserted; } + +std::optional PrivateBroadcast::GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + if (m_by_priority.empty()) { + return std::nullopt; + } + const Txid& txid = m_by_priority.begin()->second; + auto it = m_by_txid.find(txid); + if (Assume(it != m_by_txid.end())) { + return it->second.tx; + } + m_by_priority.erase(m_by_priority.begin()); + return std::nullopt; +} + +void PrivateBroadcast::PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + m_by_nodeid.emplace(nodeid, txid); +} + +std::optional PrivateBroadcast::GetTxPushedToNode(const NodeId& nodeid) const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + + auto it_by_node = m_by_nodeid.find(nodeid); + if (it_by_node == m_by_nodeid.end()) { + return std::nullopt; + } + const Txid txid{it_by_node->second}; + + auto it_by_txid = m_by_txid.find(txid); + if (it_by_txid == m_by_txid.end()) { + return std::nullopt; + } + return it_by_txid->second.tx; +} + +bool PrivateBroadcast::FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto handle{m_by_nodeid.extract(nodeid)}; + if (!handle) { + return false; + } + + const Txid& txid{handle.mapped()}; + + auto iters{Find(txid)}; + + if (!iters.has_value()) { + return false; + } + + if (confirmed_by_node) { + // Update broadcast stats, since txid was found and its reception is confirmed by the node. + Priority& priority = iters->by_txid->second.priority; + + ++priority.num_broadcasted; + priority.last_broadcasted = NodeClock::now(); + + // Remove and re-add the entry in the m_by_priority map because we have changed the key. + m_by_priority.erase(iters->by_priority); + m_by_priority.emplace(priority, txid); + } + + return true; +} + +std::optional PrivateBroadcast::Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex) +{ + AssertLockHeld(m_mutex); + auto i = m_by_txid.find(txid); + if (i == m_by_txid.end()) { + return std::nullopt; + } + const Priority& priority = i->second.priority; + for (auto j = m_by_priority.lower_bound(priority); j != m_by_priority.end(); ++j) { + if (j->second == txid) { + return Iterators{.by_txid = i, .by_priority = j}; + } + } + return std::nullopt; +} diff --git a/src/private_broadcast.h b/src/private_broadcast.h index 1029648224a0..7c931b1eb1b4 100644 --- a/src/private_broadcast.h +++ b/src/private_broadcast.h @@ -5,18 +5,23 @@ #ifndef BITCOIN_PRIVATE_BROADCAST_H #define BITCOIN_PRIVATE_BROADCAST_H +#include #include +#include #include #include #include #include #include +#include #include /** * Store a list of transactions to be broadcast privately. Supports the following operations: * - Add a new transaction + * - Mark a broadcast of a transaction (remember when and how many times) + * - Get a transaction for broadcast, the one that has been broadcast fewer times and least recently */ class PrivateBroadcast { @@ -29,6 +34,33 @@ class PrivateBroadcast */ bool Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + /** + * Get the transaction that has been broadcast fewest times and least recently. + */ + std::optional GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Mark a transaction as pushed to a given node. This is an intermediate state before + * we get a PONG from the node which would confirm that the transaction has been received. + * At the time we get the PONG we need to know which transaction we sent to that node, + * so that we can account how many times we broadcast each transaction. + */ + void PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transaction that was pushed to a given node by PushedToNode(). + */ + std::optional GetTxPushedToNode(const NodeId& nodeid) const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Mark the end of a broadcast of a transaction. Either successful by receiving a PONG, + * or unsuccessful by closing the connection to the node without getting PONG. + * @return true if the reference by the given node id was removed and the transaction + * we tried to send to this node is still in the private broadcast pool. + */ + bool FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + private: struct Priority { // Note: operator<=>() depends on the declaration order. @@ -45,15 +77,27 @@ class PrivateBroadcast using ByTxid = std::unordered_map; using ByPriority = std::multimap; + using ByNodeId = std::unordered_map; struct Iterators { ByTxid::iterator by_txid; ByPriority::iterator by_priority; }; + /** + * Get iterators in `m_by_txid` and `m_by_priority` for a given transaction. + */ + std::optional Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + mutable Mutex m_mutex; ByTxid m_by_txid GUARDED_BY(m_mutex); ByPriority m_by_priority GUARDED_BY(m_mutex); + + /** + * Remember which transaction was sent to which node, so that when we get the PONG + * from that node we can mark the transaction as broadcast. + */ + ByNodeId m_by_nodeid GUARDED_BY(m_mutex); }; #endif // BITCOIN_PRIVATE_BROADCAST_H From b42c0d92790c79134e3b1f8257697e2fc7d639ca Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 30 Jan 2024 18:51:23 +0100 Subject: [PATCH 10/13] net_processing: stop private broadcast of a transaction after round-trip Remove the transaction from the list of transactions to broadcast after we receive it from the network. Only remove the transaction if it is the same as the one we sent: both txid and wtxid match. Don't remove transactions that have the same txid and different wtxid. Such transactions show that some of the private broadcast recipients malleated the witness and the transaction made it back to us. The witness could be either: * invalid, in which case the transaction will not be accepted in anybody's pool; or * valid, in which case either the original or the malleated transaction will make it to nodes' mempools and eventually be mined. Our response is to keep broadcasting the original. If the malleated transaction wins then we will eventually stop broadcasting the original when it gets stale and gets removed from the "to broadcast" storage cause it is not acceptable in our mempool. --- src/net_processing.cpp | 15 +++++++++++++++ src/private_broadcast.cpp | 13 +++++++++++++ src/private_broadcast.h | 8 ++++++++ 3 files changed, 36 insertions(+) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 00337fc8636a..ef76ca11080b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4456,6 +4456,21 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const uint256& hash = peer->m_wtxid_relay ? wtxid.ToUint256() : txid.ToUint256(); AddKnownTx(*peer, hash); + if (auto num_broadcasted = m_tx_for_private_broadcast.Remove(ptx)) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Received our privately broadcast transaction (txid=%s) from the " + "network from peer=%d%s; stopping private broadcast attempts", + txid.ToString(), + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + if (NUM_PRIVATE_BROADCAST_PER_TX > num_broadcasted.value()) { + // Not all of the initial NUM_PRIVATE_BROADCAST_PER_TX connections were needed. + // Tell CConnman it does not need to start the remaining ones. + m_connman.m_private_broadcast.NumToOpenSub(NUM_PRIVATE_BROADCAST_PER_TX - num_broadcasted.value()); + } + } + LOCK2(cs_main, m_tx_download_mutex); const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx); diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp index 78dc147905ea..db8ff19f5c0b 100644 --- a/src/private_broadcast.cpp +++ b/src/private_broadcast.cpp @@ -16,6 +16,19 @@ bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(! return inserted; } +std::optional PrivateBroadcast::Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + auto iters = Find(tx->GetHash()); + if (!iters || iters->by_txid->second.tx->GetWitnessHash() != tx->GetWitnessHash()) { + return std::nullopt; + } + const size_t num_broadcasted{iters->by_priority->first.num_broadcasted}; + m_by_priority.erase(iters->by_priority); + m_by_txid.erase(iters->by_txid); + return num_broadcasted; +} + std::optional PrivateBroadcast::GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { LOCK(m_mutex); diff --git a/src/private_broadcast.h b/src/private_broadcast.h index 7c931b1eb1b4..0e0ee4ad437e 100644 --- a/src/private_broadcast.h +++ b/src/private_broadcast.h @@ -20,6 +20,7 @@ /** * Store a list of transactions to be broadcast privately. Supports the following operations: * - Add a new transaction + * - Remove a transaction, after it has been seen by the network * - Mark a broadcast of a transaction (remember when and how many times) * - Get a transaction for broadcast, the one that has been broadcast fewer times and least recently */ @@ -34,6 +35,13 @@ class PrivateBroadcast */ bool Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + /** + * Forget a transaction. + * @return the number of times the transaction was broadcast if the transaction existed and was removed, + * otherwise empty optional (the transaction was not in the storage). + */ + std::optional Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + /** * Get the transaction that has been broadcast fewest times and least recently. */ From dad949f90473e94e9c7d109fe2000b060f255a8b Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 30 Jan 2024 18:25:23 +0100 Subject: [PATCH 11/13] net_processing: retry private broadcast Periodically check for stale transactions in peerman and if found, reschedule new connections to be opened by connman for broadcasting them. --- src/net_processing.cpp | 69 +++++++++++++++++++++++++++++++++++---- src/private_broadcast.cpp | 17 ++++++++++ src/private_broadcast.h | 6 ++++ 3 files changed, 85 insertions(+), 7 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ef76ca11080b..af5e1c7b39a5 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -565,6 +565,9 @@ class PeerManagerImpl final : public PeerManager /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + /** Rebroadcast stale private transactions (already broadcast but not received back from the network). */ + void ReattemptPrivateBroadcast(CScheduler& scheduler); + /** Get a shared pointer to the Peer object. * May return an empty shared_ptr if the Peer object can't be found. */ PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -1625,6 +1628,13 @@ void PeerManagerImpl::InitializeNode(const CNode& node, ServiceFlags our_service } } +/** Calculate the delta time after which to run the next transactions broadcast. */ +static std::chrono::milliseconds NextTxBroadcast() +{ + // We add randomness on every cycle to avoid the possibility of P2P fingerprinting. + return 10min + FastRandomContext().randrange(5min); +} + void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) { std::set unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); @@ -1639,10 +1649,55 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) } } - // Schedule next run for 10-15 minutes in the future. - // We add randomness on every cycle to avoid the possibility of P2P fingerprinting. - const auto delta = 10min + FastRandomContext().randrange(5min); - scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast()); +} + +void PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler) +{ + // The following heuristic is subject to races, but that is ok: if it overshoots, + // we will open some private connections in vain, if it undershoots, the stale + // transactions will be picked on the next run. + + size_t active_connections{0}; + m_connman.ForEachNode([&active_connections](const CNode* node) { + if (node->IsPrivateBroadcastConn()) { + ++active_connections; + } + }); + + const size_t to_open_connections{m_connman.m_private_broadcast.NumToOpen()}; + + // Remove stale transactions that are no longer relevant (e.g. already in + // the mempool or mined) and count the remaining ones. + size_t num_for_rebroadcast{0}; + const auto stale_txs = m_tx_for_private_broadcast.GetStale(); + { + LOCK(cs_main); + for (const auto& stale_tx : stale_txs) { + auto mempool_acceptable = m_chainman.ProcessTransaction(stale_tx, /*test_accept=*/true); + if (mempool_acceptable.m_result_type == MempoolAcceptResult::ResultType::VALID) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Reattempting broadcast of stale txid=%s wtxid=%s", + stale_tx->GetHash().ToString(), + stale_tx->GetWitnessHash().ToString()); + ++num_for_rebroadcast; + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Giving up broadcast attempts for txid=%s wtxid=%s: %s", + stale_tx->GetHash().ToString(), + stale_tx->GetWitnessHash().ToString(), + mempool_acceptable.m_state.ToString()); + m_tx_for_private_broadcast.Remove(stale_tx); + } + } + } + + if (num_for_rebroadcast > active_connections + to_open_connections) { + m_connman.m_private_broadcast.NumToOpenAdd(num_for_rebroadcast - active_connections - to_open_connections); + } + + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast()); } void PeerManagerImpl::FinalizeNode(const CNode& node) @@ -1982,9 +2037,9 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery([this] { this->CheckForStaleTipAndEvictPeers(); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); - // schedule next run for 10-15 minutes in the future - const auto delta = 10min + FastRandomContext().randrange(5min); - scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast()); + + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast()); } void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd) diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp index db8ff19f5c0b..2f4397d6d811 100644 --- a/src/private_broadcast.cpp +++ b/src/private_broadcast.cpp @@ -5,6 +5,10 @@ #include #include +/// If a transaction is not received back from the network for this duration +/// after it is broadcast, then we consider it stale / for rebroadcasting. +static constexpr auto STALE_DURATION{1min}; + bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { const Txid& txid = tx->GetHash(); @@ -99,6 +103,19 @@ bool PrivateBroadcast::FinishBroadcast(const NodeId& nodeid, bool confirmed_by_n return true; } +std::vector PrivateBroadcast::GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto stale_time = NodeClock::now() - STALE_DURATION; + std::vector stale; + for (const auto& [txid, tx_with_priority] : m_by_txid) { + if (tx_with_priority.priority.last_broadcasted < stale_time) { + stale.push_back(tx_with_priority.tx); + } + } + return stale; +} + std::optional PrivateBroadcast::Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { AssertLockHeld(m_mutex); diff --git a/src/private_broadcast.h b/src/private_broadcast.h index 0e0ee4ad437e..9702214421b5 100644 --- a/src/private_broadcast.h +++ b/src/private_broadcast.h @@ -16,6 +16,7 @@ #include #include #include +#include /** * Store a list of transactions to be broadcast privately. Supports the following operations: @@ -69,6 +70,11 @@ class PrivateBroadcast */ bool FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + /** + * Get the transactions that have not been broadcast recently. + */ + std::vector GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + private: struct Priority { // Note: operator<=>() depends on the declaration order. From 357f1f4f0379fb80a1187d56d2c19fa37b1082a9 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Thu, 1 Feb 2024 16:11:04 +0100 Subject: [PATCH 12/13] rpc: use private broadcast from sendrawtransaction RPC if -privatebroadcast is ON --- src/rpc/mempool.cpp | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/rpc/mempool.cpp b/src/rpc/mempool.cpp index 62a2e29dac90..50f8036f4f50 100644 --- a/src/rpc/mempool.cpp +++ b/src/rpc/mempool.cpp @@ -8,10 +8,12 @@ #include #include +#include #include #include #include #include +#include // for g_reachable_nets #include #include #include @@ -44,11 +46,21 @@ static RPCHelpMan sendrawtransaction() { return RPCHelpMan{ "sendrawtransaction", - "Submit a raw transaction (serialized, hex-encoded) to local node and network.\n" - "\nThe transaction will be sent unconditionally to all peers, so using sendrawtransaction\n" - "for manual rebroadcast may degrade privacy by leaking the transaction's origin, as\n" - "nodes will normally not rebroadcast non-wallet transactions already in their mempool.\n" + "Submit a raw transaction (serialized, hex-encoded) to the network.\n" + + "\nIf -privatebroadcast is disabled, then the transaction will be put into the\n" + "local mempool of the node and will be sent unconditionally to all currently\n" + "connected peers, so using sendrawtransaction for manual rebroadcast will degrade\n" + "privacy by leaking the transaction's origin, as nodes will normally not\n" + "rebroadcast non-wallet transactions already in their mempool.\n" + + "\nIf -privatebroadcast is enabled, then the transaction will be sent only via\n" + "dedicated, short-lived connections to Tor or I2P peers or IPv4/IPv6 peers\n" + "through the Tor network. This conceals the transaction origin. The transaction\n" + "will only enter the local mempool when it is received back from the network.\n" + "\nA specific exception, RPC_TRANSACTION_ALREADY_IN_UTXO_SET, may throw if the transaction cannot be added to the mempool.\n" + "\nRelated RPCs: createrawtransaction, signrawtransactionwithkey\n", { {"hexstring", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The hex string of the raw transaction"}, @@ -98,11 +110,23 @@ static RPCHelpMan sendrawtransaction() std::string err_string; AssertLockNotHeld(cs_main); NodeContext& node = EnsureAnyNodeContext(request.context); + const bool private_broadcast_enabled{gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)}; + if (private_broadcast_enabled && + !g_reachable_nets.Contains(NET_ONION) && + !g_reachable_nets.Contains(NET_I2P)) { + throw JSONRPCError(RPC_MISC_ERROR, + "-privatebroadcast is enabled, but none of the Tor or I2P networks is " + "reachable. Maybe the location of the Tor proxy couldn't be retrieved " + "from the Tor daemon at startup. Check whether the Tor daemon is running " + "and that -torcontrol, -torpassword and -i2psam are configured properly."); + } + const auto method = private_broadcast_enabled ? node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST + : node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL; const TransactionError err = BroadcastTransaction(node, tx, err_string, max_raw_tx_fee, - node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL, + method, /*wait_callback=*/true); if (TransactionError::OK != err) { throw JSONRPCTransactionError(err, err_string); From 7987000d3601d66e08784cb50939c852c1d14dfd Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 17 May 2023 17:19:49 +0200 Subject: [PATCH 13/13] test: add functional test for private broadcast --- test/functional/p2p_private_broadcast.py | 419 +++++++++++++++++++++++ test/functional/test_runner.py | 1 + 2 files changed, 420 insertions(+) create mode 100755 test/functional/p2p_private_broadcast.py diff --git a/test/functional/p2p_private_broadcast.py b/test/functional/p2p_private_broadcast.py new file mode 100755 index 000000000000..9ee8dce0faa8 --- /dev/null +++ b/test/functional/p2p_private_broadcast.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +# Copyright (c) 2017-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test how locally submitted transactions are sent to the network when private broadcast is used. +""" + +import time +import threading + +from test_framework.p2p import ( + P2PDataStore, + P2PInterface, + P2P_SERVICES, + P2P_VERSION, +) +from test_framework.messages import ( + CAddress, + CInv, + COIN, + MSG_WTX, + malleate_tx_to_invalid_witness, + msg_inv, + msg_tx, +) +from test_framework.netutil import ( + format_addr_port +) +from test_framework.script_util import ValidWitnessMalleatedTx +from test_framework.socks5 import ( + Socks5Configuration, + Socks5Server, +) +from test_framework.test_framework import ( + BitcoinTestFramework, +) +from test_framework.util import ( + MAX_NODES, + assert_equal, + assert_not_equal, + assert_raises_rpc_error, + p2p_port, + tor_port, +) +from test_framework.wallet import ( + MiniWallet, +) + +MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8 +MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2 +NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS +NUM_PRIVATE_BROADCAST_PER_TX = 3 + +# Fill addrman with these addresses. Must have enough Tor addresses, so that even +# if all 10 default connections are opened to a Tor address (!?) there must be more +# for private broadcast. +ADDRMAN_ADDRESSES = [ + "1.65.195.98", + "2.59.236.56", + "2.83.114.20", + "2.248.194.16", + "5.2.154.6", + "5.101.140.30", + "5.128.87.126", + "5.144.21.49", + "5.172.132.104", + "5.188.62.18", + "5.200.2.180", + "8.129.184.255", + "8.209.105.138", + "12.34.98.148", + "14.199.102.151", + "18.27.79.17", + "18.27.124.231", + "18.216.249.151", + "23.88.155.58", + "23.93.101.158", + "[2001:19f0:1000:1db3:5400:4ff:fe56:5a8d]", + "[2001:19f0:5:24da:3eec:efff:feb9:f36e]", + "[2001:19f0:5:24da::]", + "[2001:19f0:5:4535:3eec:efff:feb9:87e4]", + "[2001:19f0:5:4535::]", + "[2001:1bc0:c1::2000]", + "[2001:1c04:4008:6300:8a5f:2678:114b:a660]", + "[2001:41d0:203:3739::]", + "[2001:41d0:203:8f49::]", + "[2001:41d0:203:bb0a::]", + "[2001:41d0:2:bf8f::]", + "[2001:41d0:303:de8b::]", + "[2001:41d0:403:3d61::]", + "[2001:41d0:405:9600::]", + "[2001:41d0:8:ed7f::1]", + "[2001:41d0:a:69a2::1]", + "[2001:41f0::62:6974:636f:696e]", + "[2001:470:1b62::]", + "[2001:470:1f05:43b:2831:8530:7179:5864]", + "[2001:470:1f09:b14::11]", + "2bqghnldu6mcug4pikzprwhtjjnsyederctvci6klcwzepnjd46ikjyd.onion", + "4lr3w2iyyl5u5l6tosizclykf5v3smqroqdn2i4h3kq6pfbbjb2xytad.onion", + "5g72ppm3krkorsfopcm2bi7wlv4ohhs4u4mlseymasn7g7zhdcyjpfid.onion", + "5sbmcl4m5api5tqafi4gcckrn3y52sz5mskxf3t6iw4bp7erwiptrgqd.onion", + "776aegl7tfhg6oiqqy76jnwrwbvcytsx2qegcgh2mjqujll4376ohlid.onion", + "77mdte42srl42shdh2mhtjr7nf7dmedqrw6bkcdekhdvmnld6ojyyiad.onion", + "azbpsh4arqlm6442wfimy7qr65bmha2zhgjg7wbaji6vvaug53hur2qd.onion", + "b64xcbleqmwgq2u46bh4hegnlrzzvxntyzbmucn3zt7cssm7y4ubv3id.onion", + "bsqbtcparrfihlwolt4xgjbf4cgqckvrvsfyvy6vhiqrnh4w6ghixoid.onion", + "bsqbtctulf2g4jtjsdfgl2ed7qs6zz5wqx27qnyiik7laockryvszqqd.onion", + "cwi3ekrwhig47dhhzfenr5hbvckj7fzaojygvazi2lucsenwbzwoyiqd.onion", + "devinbtcmwkuitvxl3tfi5of4zau46ymeannkjv6fpnylkgf3q5fa3id.onion", + "devinbtcyk643iruzfpaxw3on2jket7rbjmwygm42dmdyub3ietrbmid.onion", + "dtql5vci4iaml4anmueftqr7bfgzqlauzfy4rc2tfgulldd3ekyijjyd.onion", + "emzybtc25oddoa2prol2znpz2axnrg6k77xwgirmhv7igoiucddsxiad.onion", + "emzybtc3ewh7zihpkdvuwlgxrhzcxy2p5fvjggp7ngjbxcytxvt4rjid.onion", + "emzybtc454ewbviqnmgtgx3rgublsgkk23r4onbhidcv36wremue4kqd.onion", + "emzybtc5bnpb2o6gh54oquiox54o4r7yn4a2wiiwzrjonlouaibm2zid.onion", + "fpz6r5ppsakkwypjcglz6gcnwt7ytfhxskkfhzu62tnylcknh3eq6pad.onion", + "255fhcp6ajvftnyo7bwz3an3t4a4brhopm3bamyh2iu5r3gnr2rq.b32.i2p", + "27yrtht5b5bzom2w5ajb27najuqvuydtzb7bavlak25wkufec5mq.b32.i2p", + "3gocb7wc4zvbmmebktet7gujccuux4ifk3kqilnxnj5wpdpqx2hq.b32.i2p", + "4fcc23wt3hyjk3csfzcdyjz5pcwg5dzhdqgma6bch2qyiakcbboa.b32.i2p", + "4osyqeknhx5qf3a73jeimexwclmt42cju6xdp7icja4ixxguu2hq.b32.i2p", + "4umsi4nlmgyp4rckosg4vegd2ysljvid47zu7pqsollkaszcbpqq.b32.i2p", + "6j2ezegd3e2e2x3o3pox335f5vxfthrrigkdrbgfbdjchm5h4awa.b32.i2p", + "6n36ljyr55szci5ygidmxqer64qr24f4qmnymnbvgehz7qinxnla.b32.i2p", + "72yjs6mvlby3ky6mgpvvlemmwq5pfcznrzd34jkhclgrishqdxva.b32.i2p", + "a5qsnv3maw77mlmmzlcglu6twje6ttctd3fhpbfwcbpmewx6fczq.b32.i2p", + "aovep2pco7v2k4rheofrgytbgk23eg22dczpsjqgqtxcqqvmxk6a.b32.i2p", + "bitcoi656nll5hu6u7ddzrmzysdtwtnzcnrjd4rfdqbeey7dmn5a.b32.i2p", + "brifkruhlkgrj65hffybrjrjqcgdgqs2r7siizb5b2232nruik3a.b32.i2p", + "c4gfnttsuwqomiygupdqqqyy5y5emnk5c73hrfvatri67prd7vyq.b32.i2p", + "day3hgxyrtwjslt54sikevbhxxs4qzo7d6vi72ipmscqtq3qmijq.b32.i2p", + "du5kydummi23bjfp6bd7owsvrijgt7zhvxmz5h5f5spcioeoetwq.b32.i2p", + "e55k6wu46rzp4pg5pk5npgbr3zz45bc3ihtzu2xcye5vwnzdy7pq.b32.i2p", + "eciohu5nq7vsvwjjc52epskuk75d24iccgzmhbzrwonw6lx4gdva.b32.i2p", + "ejlnngarmhqvune74ko7kk55xtgbz5i5ncs4vmnvjpy3l7y63xaa.b32.i2p", + "fhzlp3xroabohnmjonu5iqazwhlbbwh5cpujvw2azcu3srqdceja.b32.i2p", + "[fc32:17ea:e415:c3bf:9808:149d:b5a2:c9aa]", + "[fcc7:be49:ccd1:dc91:3125:f0da:457d:8ce]", + "[fcdc:73ae:b1a9:1bf8:d4c2:811:a4c7:c34e]", +] + + +class P2PPrivateBroadcast(BitcoinTestFramework): + def set_test_params(self): + self.disable_autoconnect = False + self.num_nodes = 2 + + def setup_nodes(self): + # Start a SOCKS5 proxy server. + socks5_server_config = Socks5Configuration() + # self.nodes[0] listens on p2p_port(0), + # self.nodes[1] listens on p2p_port(1), + # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2) + socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes)) + socks5_server_config.unauth = True + socks5_server_config.auth = True + + self.socks5_server = Socks5Server(socks5_server_config) + self.socks5_server.start() + + # Tor ports are the highest among p2p/rpc/tor, so this should be the first available port. + ports_base = tor_port(MAX_NODES) + 1 + + self.destinations = [] + + self.destinations_lock = threading.Lock() + + def destinations_factory(requested_to_addr, requested_to_port): + with self.destinations_lock: + i = len(self.destinations) + actual_to_addr = "" + actual_to_port = 0 + listener = None + if i == NUM_INITIAL_CONNECTIONS: + # Instruct the SOCKS5 server to redirect the first private + # broadcast connection from nodes[0] to nodes[1] + actual_to_addr = "127.0.0.1" # nodes[1] listen address + actual_to_port = tor_port(1) # nodes[1] listen port for Tor + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])") + else: + # Create a Python P2P listening node and instruct the SOCKS5 proxy to + # redirect the connection to it. The first outbound connection is used + # later to serve GETDATA, thus make it P2PDataStore(). + listener = P2PDataStore() if i == 0 else P2PInterface() + listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor) + listener.peer_connect_send_version(services=P2P_SERVICES) + + def on_listen_done(addr, port): + nonlocal actual_to_addr + nonlocal actual_to_port + actual_to_addr = addr + actual_to_port = port + + self.network_thread.listen( + addr="127.0.0.1", + port=ports_base + i, + p2p=listener, + callback=on_listen_done) + # Wait until the callback has been called. + self.wait_until(lambda: actual_to_port != 0) + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)") + + self.destinations.append({ + "requested_to": format_addr_port(requested_to_addr, requested_to_port), + "node": listener, + }) + assert_equal(len(self.destinations), i + 1) + + return { + "actual_to_addr": actual_to_addr, + "actual_to_port": actual_to_port, + } + + self.socks5_server.conf.destinations_factory = destinations_factory + + self.extra_args = [ + [ + # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable). + "-cjdnsreachable", + # Connecting, sending garbage, being disconnected messes up with this test's + # check_broadcasts() which waits for a particular Python node to receive a connection. + "-v2transport=0", + "-test=addrman", + "-privatebroadcast", + f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}", + ], + [ + "-connect=0", + f"-bind=127.0.0.1:{tor_port(1)}=onion", + ], + ] + super().setup_nodes() + + def setup_network(self): + self.setup_nodes() + + def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations): + broadcasts_done = 0 + i = skip_destinations - 1 + while broadcasts_done < broadcasts_to_expect: + i += 1 + self.log.debug(f"{label}: waiting for outbound connection i={i}") + # At this point the connection may not yet have been established (A), + # may be active (B), or may have already been closed (C). + self.wait_until(lambda: len(self.destinations) > i) + dest = self.destinations[i] + peer = dest["node"] + peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False) + # Now it is either (B) or (C). + if peer.last_message["version"].nServices != 0: + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)") + continue + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it") + peer.wait_for_disconnect() + # Now it is (C). + assert_equal(peer.message_count, { + "version": 1, + "verack": 1, + "inv": 1, + "tx": 1, + "ping": 1 + }) + dummy_address = CAddress() + dummy_address.nServices = 0 + assert_equal(peer.last_message["version"].nVersion, P2P_VERSION) + assert_equal(peer.last_message["version"].nServices, 0) + assert_equal(peer.last_message["version"].nTime, 0) + assert_equal(peer.last_message["version"].addrTo, dummy_address) + assert_equal(peer.last_message["version"].addrFrom, dummy_address) + assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/") + assert_equal(peer.last_message["version"].nStartingHeight, 0) + assert_equal(peer.last_message["version"].relay, 0) + assert_equal(peer.last_message["tx"].tx.txid_hex, tx["txid"]) + self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}") + broadcasts_done += 1 + + def run_test(self): + tx_originator = self.nodes[0] + tx_receiver = self.nodes[1] + far_observer = tx_receiver.add_p2p_connection(P2PInterface()) + + wallet = MiniWallet(tx_originator) + + # Fill tx_originator's addrman. + for addr in ADDRMAN_ADDRESSES: + res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False) + if not res["success"]: + self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)") + + self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS) + + # The next opened connection by tx_originator should be "private broadcast" + # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver. + + txs = wallet.create_self_transfer_chain(chain_length=3) + self.log.info(f"Created txid={txs[0]['txid']}: for basic test") + self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast") + self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool") + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1) + + self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, " + "must be the first private broadcast connection") + self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0) + far_observer.wait_for_tx(txs[0]["txid"]) + self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: " + "the private broadcast target received and further relayed the transaction") + + # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts. + self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1) + + self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + + self.log.info("Sending a malleated transaction with an invalid witness via RPC") + malleated_invalid = malleate_tx_to_invalid_witness(txs[0]) + assert_raises_rpc_error(-26, "mempool-script-verify-flag-failed", + tx_originator.sendrawtransaction, + hexstring=malleated_invalid.serialize_with_witness().hex(), + maxfeerate=0.1) + + self.log.info("Checking that the transaction is not in the originator node's mempool") + assert_equal(len(tx_originator.getrawmempool()), 0) + + wtxid_int = int(txs[0]["wtxid"], 16) + inv = CInv(MSG_WTX, wtxid_int) + + self.log.info("Sending INV and waiting for GETDATA from node") + tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator. + tx_returner.tx_store[wtxid_int] = txs[0]["tx"] + assert "getdata" not in tx_returner.last_message + received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network" + with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]): + tx_returner.send_without_ping(msg_inv([inv])) + tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0) + + self.log.info("Waiting for normal broadcast to another peer") + self.destinations[1]["node"].wait_for_inv([inv]) + + self.log.info("Sending a transaction that is already in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1) + self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)") + assert_equal(len(tx_originator.getrawmempool()), 1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0) + + # Since txs[1] has not been received back by tx_originator, + # it should be re-broadcast after a while. Advance tx_originator's clock + # to trigger a re-broadcast. Should be more than the maximum returned by + # NextTxBroadcast() in net_processing.cpp. + self.log.info("Checking that rebroadcast works") + delta = 20 * 60 # 20min + skip_destinations = len(self.destinations) + rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]): + tx_originator.setmocktime(int(time.time()) + delta) + tx_originator.mockscheduler(delta) + self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations) + tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this). + + self.log.info("Sending a pair of transactions with the same txid but different valid wtxids via RPC") + txgen = ValidWitnessMalleatedTx() + funding = wallet.get_utxo() + fee_sat = 1000 + siblings_parent = txgen.build_parent_tx(funding["txid"], amount=funding["value"] * COIN - fee_sat) + sibling1, sibling2 = txgen.build_malleated_children(siblings_parent.txid_hex, amount=siblings_parent.vout[0].nValue - fee_sat) + self.log.info(f" - sibling1: txid={sibling1.txid_hex}, wtxid={sibling1.wtxid_hex}") + self.log.info(f" - sibling2: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}") + assert_equal(sibling1.txid_hex, sibling2.txid_hex) + assert_not_equal(sibling1.wtxid_hex, sibling2.wtxid_hex) + wallet.sign_tx(siblings_parent) + assert_equal(len(tx_originator.getrawmempool()), 1) + tx_returner.send_without_ping(msg_tx(siblings_parent)) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 1) + self.log.info(" - siblings' parent added to the mempool") + tx_originator.sendrawtransaction(hexstring=sibling1.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling1: ok") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=sibling2.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sibling2 rejected because it has the same txid: ok") + + # Stop the SOCKS5 proxy server to avoid it being upset by the bitcoin + # node disconnecting in the middle of the SOCKS5 handshake when we + # restart below. + self.socks5_server.stop() + + self.log.info("Trying to send a transaction when none of Tor or I2P is reachable") + self.restart_node(0, extra_args=[ + "-privatebroadcast", + "-v2transport=0", + # A location where definitely a Tor control is not listening. This would allow + # Bitcoin Core to start, hoping/assuming that the location of the Tor proxy + # may be retrieved after startup from the Tor control, but it will not be, so + # the RPC should throw. + "-torcontrol=127.0.0.1:1", + "-listenonion", + ]) + assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable", + tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1) + + +if __name__ == "__main__": + P2PPrivateBroadcast(__file__).main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index a4c19dd11d5f..ad588096bb52 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -301,6 +301,7 @@ 'rpc_dumptxoutset.py', 'feature_minchainwork.py', 'rpc_estimatefee.py', + 'p2p_private_broadcast.py', 'rpc_getblockstats.py', 'feature_port.py', 'feature_bind_port_externalip.py',