From 3a42adfe2f9ef2cfffb74fdb972db672fae3d905 Mon Sep 17 00:00:00 2001 From: everoddandeven Date: Tue, 3 Mar 2026 00:10:56 +0100 Subject: [PATCH] Txs, transfers and outputs querying consolidation * Implement `MoneroWalletRpc::sweep_unlocked` * Implement `test_monero_wallet_model` unit tests * Fix `MoneroWalletRpc::sweep_output`, `MoneroWalletRpc::sweep_account`, `MoneroWalletRpc::get_outputs_aux`, `PyMoneroWalletRpc::verify_message` * Fix tx query reference in transfer and output query * Add wallet `get_tx(const std::string&)` and `get_txs(const std::vector&)` convenience methods * Fix `MoneroWallet::verify_message` to return bad result for consistency * Fix wallet data model deserialization --- src/cpp/py_monero.cpp | 120 +- src/cpp/wallet/py_monero_wallet_model.cpp | 15 +- src/cpp/wallet/py_monero_wallet_model.h | 1 + src/cpp/wallet/py_monero_wallet_rpc.cpp | 130 ++- src/cpp/wallet/py_monero_wallet_rpc.h | 1 + src/python/monero_output_query.pyi | 6 + src/python/monero_transfer.pyi | 3 +- src/python/monero_transfer_query.pyi | 4 + src/python/monero_wallet.pyi | 18 + tests/config/config.ini | 1 + tests/test_monero_daemon_interface.py | 2 +- tests/test_monero_daemon_rpc.py | 72 +- tests/test_monero_rpc_connection.py | 15 +- tests/test_monero_utils.py | 29 + tests/test_monero_wallet_common.py | 1217 ++++++++++++++++++++- tests/test_monero_wallet_interface.py | 23 +- tests/test_monero_wallet_keys.py | 121 +- tests/test_monero_wallet_model.py | 173 +++ tests/test_monero_wallet_rpc.py | 25 +- tests/utils/assert_utils.py | 9 - tests/utils/mining_utils.py | 16 +- tests/utils/single_tx_sender.py | 78 +- tests/utils/test_utils.py | 15 +- tests/utils/tx_spammer.py | 2 +- tests/utils/tx_utils.py | 107 +- tests/utils/wallet_equality_utils.py | 5 +- tests/utils/wallet_utils.py | 36 +- 27 files changed, 2094 insertions(+), 150 deletions(-) create mode 100644 tests/test_monero_wallet_model.py diff --git a/src/cpp/py_monero.cpp b/src/cpp/py_monero.cpp index 66102fb..6a979e9 100644 --- a/src/cpp/py_monero.cpp +++ b/src/cpp/py_monero.cpp @@ -960,13 +960,32 @@ PYBIND11_MODULE(monero, m) { MONERO_CATCH_AND_RETHROW(monero::monero_transfer_query::deserialize_from_block(transfer_query_json)); }, py::arg("transfer_query_json")) .def_readwrite("incoming", &monero::monero_transfer_query::m_is_incoming) + .def_property("outgoing", + [](const monero::monero_transfer_query& self) { return self.is_outgoing(); }, + [](monero::monero_transfer_query& self, const boost::optional& val) { + if (val == boost::none) self.m_is_incoming = boost::none; + else self.m_is_incoming = !val.get(); + }) .def_readwrite("address", &monero::monero_transfer_query::m_address) .def_readwrite("addresses", &monero::monero_transfer_query::m_addresses) - .def_readwrite("address", &monero::monero_transfer_query::m_subaddress_index) + .def_readwrite("subaddress_index", &monero::monero_transfer_query::m_subaddress_index) .def_readwrite("subaddress_indices", &monero::monero_transfer_query::m_subaddress_indices) .def_readwrite("destinations", &monero::monero_transfer_query::m_destinations) .def_readwrite("has_destinations", &monero::monero_transfer_query::m_has_destinations) - .def_readwrite("tx_query", &monero::monero_transfer_query::m_tx_query) + .def_property("tx_query", + [](const monero::monero_transfer_query& self) { return self.m_tx_query; }, + [](std::shared_ptr& self, const std::shared_ptr& val) { + const auto old_query = self->m_tx_query; + self->m_tx_query = val; + if (val != nullptr) { + val->m_transfer_query = self; + } + else self->m_tx_query = boost::none; + + if (old_query != boost::none) { + old_query.get()->m_transfer_query = boost::none; + } + }) .def("copy", [](const std::shared_ptr& self) { auto tgt = std::make_shared(); MONERO_CATCH_AND_RETHROW(self->copy(self, tgt)); @@ -1003,7 +1022,21 @@ PYBIND11_MODULE(monero, m) { .def_readwrite("subaddress_indices", &monero::monero_output_query::m_subaddress_indices) .def_readwrite("min_amount", &monero::monero_output_query::m_min_amount) .def_readwrite("max_amount", &monero::monero_output_query::m_max_amount) - .def_readwrite("tx_query", &monero::monero_output_query::m_tx_query) + .def_readonly("tx_query", &monero::monero_output_query::m_tx_query) + .def("set_tx_query", [](const std::shared_ptr& self, const std::shared_ptr& val, bool output_query) { + const auto old_query = self->m_tx_query; + if (val != nullptr) { + self->m_tx_query = val; + if (output_query) val->m_output_query = self; + else val->m_input_query = self; + } else { + self->m_tx_query = boost::none; + } + if (old_query != boost::none) { + if (output_query) old_query.get()->m_output_query = boost::none; + else old_query.get()->m_input_query = boost::none; + } + }, py::arg("tx_query"), py::arg("output_query")) .def("copy", [](const std::shared_ptr& self) { auto tgt = std::make_shared(); MONERO_CATCH_AND_RETHROW(self->copy(self, tgt)); @@ -1093,9 +1126,51 @@ PYBIND11_MODULE(monero, m) { .def_readwrite("min_height", &monero::monero_tx_query::m_min_height) .def_readwrite("max_height", &monero::monero_tx_query::m_max_height) .def_readwrite("include_outputs", &monero::monero_tx_query::m_include_outputs) - .def_readwrite("transfer_query", &monero::monero_tx_query::m_transfer_query) - .def_readwrite("input_query", &monero::monero_tx_query::m_input_query) - .def_readwrite("output_query", &monero::monero_tx_query::m_output_query) + .def_property("transfer_query", + [](const monero::monero_tx_query& self) { return self.m_transfer_query; }, + [](std::shared_ptr& self, const boost::optional>& val) { + const auto old_query = self->m_transfer_query; + self->m_transfer_query = val; + if (self->m_transfer_query != boost::none) { + self->m_transfer_query.get()->m_tx_query = self; + } + if (old_query != boost::none) { + if (val != boost::none && val.get() == old_query.get()) { + return; + } + old_query.get()->m_tx_query = boost::none; + } + }) + .def_property("input_query", + [](const monero::monero_tx_query& self) { return self.m_input_query; }, + [](std::shared_ptr& self, const boost::optional>& val) { + const auto old_query = self->m_input_query; + self->m_input_query = val; + if (self->m_input_query != boost::none) { + self->m_input_query.get()->m_tx_query = self; + } + if (old_query != boost::none) { + if (val != boost::none && val.get() == old_query.get()) { + return; + } + old_query.get()->m_tx_query = boost::none; + } + }) + .def_property("output_query", + [](const monero::monero_tx_query& self) { return self.m_output_query; }, + [](std::shared_ptr& self, const boost::optional>& val) { + const auto old_query = self->m_output_query; + self->m_output_query = val; + if (self->m_output_query != boost::none) { + self->m_output_query.get()->m_tx_query = self; + } + if (old_query != boost::none) { + if (val != boost::none && val.get() == old_query.get()) { + return; + } + old_query.get()->m_tx_query = boost::none; + } + }) .def("copy", [](const std::shared_ptr& self) { auto tgt = std::make_shared(); MONERO_CATCH_AND_RETHROW(self->copy(self, tgt)); @@ -1686,6 +1761,16 @@ PYBIND11_MODULE(monero, m) { .def("set_subaddress_label", [](PyMoneroWallet& self, uint32_t account_idx, uint32_t subaddress_idx, const std::string& label) { MONERO_CATCH_AND_RETHROW(self.set_subaddress_label(account_idx, subaddress_idx, label)); }, py::arg("account_idx"), py::arg("subaddress_idx"), py::arg("label") = "") + .def("get_tx", [](PyMoneroWallet& self, const std::string& tx_hash) { + std::shared_ptr result = nullptr; + monero_tx_query query; + query.m_hashes.push_back(tx_hash); + auto txs = self.get_txs(query); + if (txs.size() > 0) { + result = txs[0]; + } + return result; + }, py::arg("tx_hash")) .def("get_txs", [](PyMoneroWallet& self) { MONERO_CATCH_AND_RETHROW(self.get_txs()); }) @@ -1703,6 +1788,22 @@ PYBIND11_MODULE(monero, m) { throw PyMoneroError(e.what()); } }, py::arg("query")) + .def("get_txs", [](PyMoneroWallet& self, const std::vector& tx_hashes) { + try { + monero_tx_query query; + query.m_hashes = tx_hashes; + auto txs = self.get_txs(query); + PyMoneroUtils::sort_txs_wallet(txs, query.m_hashes); + return txs; + } catch (const PyMoneroRpcError& e) { + throw; + } catch (const PyMoneroError& e) { + throw; + } + catch (const std::exception& e) { + throw PyMoneroError(e.what()); + } + }, py::arg("tx_hashes")) .def("get_transfers", [](PyMoneroWallet& self, const monero::monero_transfer_query& query) { MONERO_CATCH_AND_RETHROW(self.get_transfers(query)); }, py::arg("query")) @@ -1776,7 +1877,12 @@ PYBIND11_MODULE(monero, m) { MONERO_CATCH_AND_RETHROW(self.sign_message(msg, signature_type, account_idx, subaddress_idx)); }, py::arg("msg"), py::arg("signature_type"), py::arg("account_idx") = 0, py::arg("subaddress_idx") = 0) .def("verify_message", [](PyMoneroWallet& self, const std::string& msg, const std::string& address, const std::string& signature) { - MONERO_CATCH_AND_RETHROW(self.verify_message(msg, address, signature)); + try { + return self.verify_message(msg, address, signature); + } catch (...) { + // TODO wallet full can differentiate incorrect from invalid address, but rpc returns -2 for both, so returning bad result for consistency + return monero::monero_message_signature_result(); + } }, py::arg("msg"), py::arg("address"), py::arg("signature")) .def("get_tx_key", [](PyMoneroWallet& self, const std::string& tx_hash) { MONERO_CATCH_AND_RETHROW(self.get_tx_key(tx_hash)); diff --git a/src/cpp/wallet/py_monero_wallet_model.cpp b/src/cpp/wallet/py_monero_wallet_model.cpp index b630c37..ead351f 100644 --- a/src/cpp/wallet/py_monero_wallet_model.cpp +++ b/src/cpp/wallet/py_monero_wallet_model.cpp @@ -526,7 +526,6 @@ void PyMoneroTxWallet::from_property_tree_with_output(const boost::property_tree for(auto it = node.begin(); it != node.end(); ++it) { std::string key = it->first; - if (key == std::string("amount")) output->m_amount = it->second.get_value(); else if (key == std::string("spent")) output->m_is_spent = it->second.get_value(); else if (key == std::string("key_image")) { @@ -539,7 +538,11 @@ void PyMoneroTxWallet::from_property_tree_with_output(const boost::property_tree else if (key == std::string("frozen")) output->m_is_frozen = it->second.get_value(); else if (key == std::string("pubkey")) output->m_stealth_public_key = it->second.data(); else if (key == std::string("subaddr_index")) { - + for(auto indices_it = it->second.begin(); indices_it != it->second.end(); ++indices_it) { + std::string indices_key = indices_it->first; + if (indices_key == std::string("major")) output->m_account_index = indices_it->second.get_value(); + if (indices_key == std::string("minor")) output->m_subaddress_index = indices_it->second.get_value(); + } } else if (key == std::string("block_height")) { auto block = std::make_shared(); @@ -825,7 +828,7 @@ void PyMoneroTxSet::from_sent_txs(const boost::property_tree::ptree& node, const } else { auto dest = std::make_shared(); - dest->m_address = config.m_destinations[destination_idx]->m_address; + dest->m_address = config.get_normalized_destinations()[destination_idx]->m_address; dest->m_amount = amount; tx->m_outgoing_transfer.get()->m_destinations.push_back(dest); destination_idx++; @@ -1495,9 +1498,13 @@ rapidjson::Value PyMoneroSweepParams::to_rapidjson_val(rapidjson::Document::Allo if (m_priority != boost::none) monero_utils::add_json_member("priority", m_priority.get(), allocator, root, val_num); if (m_payment_id != boost::none) monero_utils::add_json_member("payment_id", m_payment_id.get(), allocator, root, val_str); if (m_get_tx_key != boost::none) monero_utils::add_json_member("get_tx_key", m_get_tx_key.get(), allocator, root); + if (m_get_tx_keys != boost::none) monero_utils::add_json_member("get_tx_keys", m_get_tx_keys.get(), allocator, root); if (m_get_tx_hex != boost::none) monero_utils::add_json_member("get_tx_hex", m_get_tx_hex.get(), allocator, root); if (m_get_tx_metadata != boost::none) monero_utils::add_json_member("get_tx_metadata", m_get_tx_metadata.get(), allocator, root); - if (m_relay != boost::none) monero_utils::add_json_member("do_not_relay", !m_relay.get(), allocator, root); + if (m_below_amount != boost::none) monero_utils::add_json_member("below_amount", m_below_amount.get(), allocator, root, val_num); + + bool relay = bool_equals_2(true, m_relay); + monero_utils::add_json_member("do_not_relay", !relay, allocator, root); return root; } diff --git a/src/cpp/wallet/py_monero_wallet_model.h b/src/cpp/wallet/py_monero_wallet_model.h index b944304..6e05006 100644 --- a/src/cpp/wallet/py_monero_wallet_model.h +++ b/src/cpp/wallet/py_monero_wallet_model.h @@ -786,6 +786,7 @@ class PyMoneroSweepParams : public PyMoneroJsonRequestParams { boost::optional m_payment_id; boost::optional m_below_amount; boost::optional m_get_tx_key; + boost::optional m_get_tx_keys; boost::optional m_get_tx_hex; boost::optional m_get_tx_metadata; diff --git a/src/cpp/wallet/py_monero_wallet_rpc.cpp b/src/cpp/wallet/py_monero_wallet_rpc.cpp index 118c77e..2ad6fab 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.cpp +++ b/src/cpp/wallet/py_monero_wallet_rpc.cpp @@ -1039,13 +1039,90 @@ std::vector> PyMoneroWalletRpc::create_txs(con return tx_set->m_txs; } +std::vector> PyMoneroWalletRpc::sweep_unlocked(const monero_tx_config& config) { + // validate config + std::vector> destinations = config.get_normalized_destinations(); + if (destinations.size() != 1) throw std::runtime_error("Must specify exactly one destination to sweep to"); + if (destinations[0]->m_address == boost::none) throw std::runtime_error("Must specify destination address to sweep to"); + if (destinations[0]->m_amount != boost::none) throw std::runtime_error("Cannot specify amount to sweep"); + if (config.m_account_index == boost::none && config.m_subaddress_indices.size() != 0) throw std::runtime_error("Must specify account index if subaddress indices are specified"); + + // determine account and subaddress indices to sweep; default to all with unlocked balance if not specified + std::map> indices; + if (config.m_account_index != boost::none) { + if (config.m_subaddress_indices.size() != 0) { + indices[config.m_account_index.get()] = config.m_subaddress_indices; + } else { + std::vector subaddress_indices; + for (const monero_subaddress& subaddress : monero_wallet::get_subaddresses(config.m_account_index.get())) { + if (subaddress.m_unlocked_balance.get() > 0) subaddress_indices.push_back(subaddress.m_index.get()); + } + indices[config.m_account_index.get()] = subaddress_indices; + } + } else { + std::vector accounts = monero_wallet::get_accounts(true); + for (const monero_account& account : accounts) { + if (account.m_unlocked_balance.get() > 0) { + std::vector subaddress_indices; + for (const monero_subaddress& subaddress : account.m_subaddresses) { + if (subaddress.m_unlocked_balance.get() > 0) subaddress_indices.push_back(subaddress.m_index.get()); + } + indices[account.m_index.get()] = subaddress_indices; + } + } + } + + // sweep from each account and collect resulting txs + std::vector> txs; + for (std::pair> subaddress_indices_pair : indices) { + + // copy and modify the original config + monero_tx_config copy = config.copy(); + copy.m_account_index = subaddress_indices_pair.first; + copy.m_sweep_each_subaddress = false; + + // sweep all subaddresses together // TODO monero-project: can this reveal outputs belong to the same wallet? + if (copy.m_sweep_each_subaddress == boost::none || copy.m_sweep_each_subaddress.get() != true) { + copy.m_subaddress_indices = subaddress_indices_pair.second; + std::vector> account_txs = sweep_account(copy); + txs.insert(std::end(txs), std::begin(account_txs), std::end(account_txs)); + } + + // otherwise sweep each subaddress individually + else { + for (uint32_t subaddress_index : subaddress_indices_pair.second) { + std::vector subaddress_indices; + subaddress_indices.push_back(subaddress_index); + copy.m_subaddress_indices = subaddress_indices; + std::vector> account_txs = sweep_account(copy); + txs.insert(std::end(txs), std::begin(account_txs), std::end(account_txs)); + } + } + } + + // notify listeners of spent funds + if (config.m_relay != boost::none && config.m_relay.get()) poll(); + return txs; +} + + std::shared_ptr PyMoneroWalletRpc::sweep_output(const monero_tx_config& config) { + // validate request + std::vector> destinations = config.get_normalized_destinations(); + if (config.m_sweep_each_subaddress != boost::none) throw std::runtime_error("Cannot sweep each subaddress when sweeping single output"); + if (config.m_below_amount != boost::none) throw std::runtime_error("Cannot specifiy below_amount when sweeping single output"); + if (config.m_can_split != boost::none) throw std::runtime_error("Splitting is not applicable when sweeping output"); + // TODO check first destination address is not boost::none/empty + if (destinations.size() != 1) throw std::runtime_error("Must provide exactly one destination address to sweep output to"); + if (destinations[0]->m_address == boost::none) throw std::runtime_error("Must specify destination address to sweep to"); + if (destinations[0]->m_amount != boost::none) throw std::runtime_error("Cannot specify amount to sweep"); + auto params = std::make_shared(config); PyMoneroJsonRequest request("sweep_single", params); auto response = m_rpc->send_json_request(request); if (response->m_result == boost::none) throw std::runtime_error("Invalid Monero JSONRPC response"); auto node = response->m_result.get(); - if (config.m_relay) poll(); + if (bool_equals_2(true, config.m_relay)) poll(); auto set = std::make_shared(); auto tx = std::make_shared(); PyMoneroTxWallet::init_sent(config, tx, true); @@ -1057,6 +1134,7 @@ std::vector> PyMoneroWalletRpc::sweep_dust(boo auto params = std::make_shared(relay); PyMoneroJsonRequest request("sweep_dust", params); auto response = m_rpc->send_json_request(request); + if (relay) poll(); if (response->m_result == boost::none) throw std::runtime_error("Invalid Monero JSONRPC response"); auto node = response->m_result.get(); auto set = std::make_shared(); @@ -1145,11 +1223,17 @@ std::string PyMoneroWalletRpc::sign_message(const std::string& msg, monero_messa monero_message_signature_result PyMoneroWalletRpc::verify_message(const std::string& msg, const std::string& address, const std::string& signature) const { auto params = std::make_shared(msg, address, signature); PyMoneroJsonRequest request("verify", params); - auto response = m_rpc->send_json_request(request); - if (response->m_result == boost::none) throw std::runtime_error("Invalid Monero JSONRPC response"); - auto node = response->m_result.get(); - auto sig_result = std::make_shared(); - PyMoneroMessageSignatureResult::from_property_tree(node, sig_result); + auto sig_result = std::make_shared(); + sig_result->m_is_good = false; + try { + auto response = m_rpc->send_json_request(request); + if (response->m_result == boost::none) throw std::runtime_error("Invalid Monero JSONRPC response"); + auto node = response->m_result.get(); + PyMoneroMessageSignatureResult::from_property_tree(node, sig_result); + } catch (const PyMoneroRpcError& ex) { + if (ex.code != -2) throw; + } + return *sig_result; } @@ -1668,12 +1752,13 @@ std::string PyMoneroWalletRpc::query_key(const std::string& key_type) const { std::vector> PyMoneroWalletRpc::sweep_account(const monero_tx_config &conf) { auto config = conf.copy(); if (config.m_account_index == boost::none) throw std::runtime_error("Must specify an account index to sweep from"); - if (config.m_destinations.size() != 1) throw std::runtime_error("Must specify exactly one destination to sweep to"); - if (config.m_destinations[0]->m_address == boost::none) throw std::runtime_error("Must specify destination address to sweep to"); - if (config.m_destinations[0]->m_amount != boost::none) throw std::runtime_error("Cannot specify amount in sweep request"); + std::vector> destinations = config.get_normalized_destinations(); + if (destinations.size() != 1) throw std::runtime_error("Must specify exactly one destination to sweep to"); + if (destinations[0]->m_address == boost::none) throw std::runtime_error("Must specify destination address to sweep to"); + if (destinations[0]->m_amount != boost::none) throw std::runtime_error("Cannot specify amount in sweep request"); if (config.m_key_image != boost::none) throw std::runtime_error("Key image defined; use sweepOutput() to sweep an output by its key image"); //if (config.m_subaddress_indices.size() == 0) throw std::runtime_error("Empty list given for subaddresses indices to sweep"); - if (config.m_sweep_each_subaddress) throw std::runtime_error("Cannot sweep each subaddress with RPC `sweep_all`"); + if (bool_equals_2(true, config.m_sweep_each_subaddress)) throw std::runtime_error("Cannot sweep each subaddress with RPC `sweep_all`"); if (config.m_subtract_fee_from.size() > 0) throw std::runtime_error("Sweeping output does not support subtracting fees from destinations"); // sweep from all subaddresses if not otherwise defined @@ -1687,11 +1772,13 @@ std::vector> PyMoneroWalletRpc::sweep_account( if (config.m_subaddress_indices.size() == 0) throw std::runtime_error("No subaddresses to sweep from"); bool relay = config.m_relay == true; auto params = std::make_shared(config); + params->m_get_tx_key = boost::none; + params->m_get_tx_keys = true; PyMoneroJsonRequest request("sweep_all", params); auto response = m_rpc->send_json_request(request); if (response->m_result == boost::none) throw std::runtime_error("Invalid Monero JSONRPC response"); auto node = response->m_result.get(); - if (config.m_relay) poll(); + if (bool_equals_2(true, config.m_relay)) poll(); std::vector> txs; auto set = std::make_shared(); PyMoneroTxSet::from_sent_txs(node, set, txs, config); @@ -1706,24 +1793,23 @@ std::vector> PyMoneroWalletRpc::sweep_account( tx->m_is_miner_tx = false; tx->m_is_failed = false; tx->m_ring_size = monero_utils::RING_SIZE; + if (tx->m_outgoing_transfer == boost::none) throw std::runtime_error("Tx outgoing transfer is none"); auto transfer = tx->m_outgoing_transfer.get(); transfer->m_account_index = config.m_account_index; - if (config.m_subaddress_indices.size() == 1) - { + if (config.m_subaddress_indices.size() == 1) { transfer->m_subaddress_indices = config.m_subaddress_indices; } auto destination = std::make_shared(); - destination->m_address = config.m_destinations[0]->m_address; - destination->m_amount = config.m_destinations[0]->m_amount; - std::vector> destinations; - destinations.push_back(destination); - transfer->m_destinations = destinations; + destination->m_address = destinations[0]->m_address; + destination->m_amount = transfer->m_amount; + transfer->m_destinations.push_back(destination); tx->m_payment_id = config.m_payment_id; if (tx->m_unlock_time == boost::none) tx->m_unlock_time = 0; - if (tx->m_relay) { + if (relay) { if (tx->m_last_relayed_timestamp == boost::none) { - //tx.setLastRelayedTimestamp(System.currentTimeMillis()); // TODO (monero-wallet-rpc): provide timestamp on response; unconfirmed timestamps vary - } + // TODO (monero-wallet-rpc): provide timestamp on response; unconfirmed timestamps vary + tx->m_last_relayed_timestamp = static_cast(time(NULL)); + } if (tx->m_is_double_spend_seen == boost::none) tx->m_is_double_spend_seen = false; } } @@ -2093,7 +2179,7 @@ std::vector> PyMoneroWalletRpc::get_output } else { if (_query->m_subaddress_index != boost::none) throw std::runtime_error("Request specifies a subaddress index but not an account index"); - if (_query->m_subaddress_indices.empty()) throw std::runtime_error("Request specifies subaddress indices but not an account index"); + if (!_query->m_subaddress_indices.empty()) throw std::runtime_error("Request specifies subaddress indices but not an account index"); // fetch all account indices without subaddresses indices = get_account_indices(false); } diff --git a/src/cpp/wallet/py_monero_wallet_rpc.h b/src/cpp/wallet/py_monero_wallet_rpc.h index 94951dc..9789b45 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.h +++ b/src/cpp/wallet/py_monero_wallet_rpc.h @@ -127,6 +127,7 @@ class PyMoneroWalletRpc : public PyMoneroWallet { std::vector> create_txs(const monero_tx_config& conf) override; std::shared_ptr sweep_output(const monero_tx_config& config) override; std::vector> sweep_dust(bool relay = false) override; + std::vector> sweep_unlocked(const monero_tx_config& config) override; std::vector relay_txs(const std::vector& tx_metadatas) override; monero_tx_set describe_tx_set(const monero_tx_set& tx_set) override; monero_tx_set sign_txs(const std::string& unsigned_tx_hex) override; diff --git a/src/python/monero_output_query.pyi b/src/python/monero_output_query.pyi index e710503..1c047a6 100644 --- a/src/python/monero_output_query.pyi +++ b/src/python/monero_output_query.pyi @@ -28,7 +28,13 @@ class MoneroOutputQuery(MoneroOutputWallet): @typing.override def copy(self) -> MoneroOutputQuery: ... + def set_tx_query(self, tx_query: MoneroTxQuery | None, output_query: bool) -> None: + """ + Set tx query. + :param MoneroTxQuery | None tx_query: Tx query to set. + :param bool output_query: If `True` sets outputs query in tx query, otherwise inputs query. + """ def meets_criteria(self, output: MoneroOutputWallet, query_parent: bool = True) -> bool: """ Indicates if the output meets all the criteria defined within this query. diff --git a/src/python/monero_transfer.pyi b/src/python/monero_transfer.pyi index 0d7c4a8..d0f00f9 100644 --- a/src/python/monero_transfer.pyi +++ b/src/python/monero_transfer.pyi @@ -1,7 +1,8 @@ +from .serializable_struct import SerializableStruct from .monero_tx_wallet import MoneroTxWallet -class MoneroTransfer: +class MoneroTransfer(SerializableStruct): """ Models a base transfer of funds to or from the wallet. """ diff --git a/src/python/monero_transfer_query.pyi b/src/python/monero_transfer_query.pyi index 60d5347..60b4eb0 100644 --- a/src/python/monero_transfer_query.pyi +++ b/src/python/monero_transfer_query.pyi @@ -21,6 +21,10 @@ class MoneroTransferQuery(MoneroTransfer): """Filter transfers with or without destinations. `None` for all.""" incoming: bool | None """Filter incoming or outgoing transfers. `None` for all.""" + outgoing: bool | None + """Filter incoming or outgoing transfers. `None` for all.""" + subaddress_index: int | None + """Filter by subaddress index. `None` for all.""" subaddress_indices: list[int] """Select transfers involving particular subaddresses. Empty for all.""" tx_query: MoneroTxQuery | None diff --git a/src/python/monero_wallet.pyi b/src/python/monero_wallet.pyi index 08ff205..0adea69 100644 --- a/src/python/monero_wallet.pyi +++ b/src/python/monero_wallet.pyi @@ -635,6 +635,14 @@ class MoneroWallet: :return str: the transaction signature """ ... + def get_tx(self, tx_hash: str) -> MoneroTxWallet | None: + """ + Get single wallet transaction by hash. + + :param str tx_hash: Transaction hash + :return MoneroTxWallet | None: wallet transaction + """ + ... @typing.overload def get_txs(self) -> list[MoneroTxWallet]: """ @@ -645,6 +653,16 @@ class MoneroWallet: """ ... @typing.overload + def get_txs(self, tx_hashes: list[str]) -> list[MoneroTxWallet]: + """ + Get all wallet transactions. Wallet transactions contain one or more + transfers that are either incoming or outgoing to the wallet. + + :param list[str] tx_hashes: Tx hashes used to filter results + :return list[MoneroTxWallet]: all wallet transactions (free memory using MoneroUtils.free()) + """ + ... + @typing.overload def get_txs(self, query: MoneroTxQuery) -> list[MoneroTxWallet]: """ Get wallet transactions. Wallet transactions contain one or more diff --git a/tests/config/config.ini b/tests/config/config.ini index 83e4464..7e56a06 100644 --- a/tests/config/config.ini +++ b/tests/config/config.ini @@ -1,4 +1,5 @@ [general] +test_relays=True test_non_relays=True lite_mode=False test_notifications=True diff --git a/tests/test_monero_daemon_interface.py b/tests/test_monero_daemon_interface.py index 2b355c0..cc05f91 100644 --- a/tests/test_monero_daemon_interface.py +++ b/tests/test_monero_daemon_interface.py @@ -19,7 +19,7 @@ def setup_and_teardown(self, request: pytest.FixtureRequest): @pytest.fixture(scope="class") def daemon(self) -> MoneroDaemon: - """Test rpc daemon instance""" + """Test daemon interface instance""" return MoneroDaemon() #region Tests diff --git a/tests/test_monero_daemon_rpc.py b/tests/test_monero_daemon_rpc.py index 32c7427..b37ffd3 100644 --- a/tests/test_monero_daemon_rpc.py +++ b/tests/test_monero_daemon_rpc.py @@ -60,7 +60,7 @@ def wallet(self) -> MoneroWalletRpc: # Can get the daemon's version @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_version(self, daemon: MoneroDaemonRpc): + def test_get_version(self, daemon: MoneroDaemonRpc) -> None: version: MoneroVersion = daemon.get_version() assert version.number is not None AssertUtils.assert_true(version.number > 0) @@ -68,18 +68,18 @@ def test_get_version(self, daemon: MoneroDaemonRpc): # Can indicate if it's trusted @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_is_trusted(self, daemon: MoneroDaemonRpc): + def test_is_trusted(self, daemon: MoneroDaemonRpc) -> None: daemon.is_trusted() # Can get the blockchain height @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_height(self, daemon: MoneroDaemonRpc): + def test_get_height(self, daemon: MoneroDaemonRpc) -> None: height = daemon.get_height() AssertUtils.assert_true(height > 0, "Height must be greater than 0") # Can get a block hash by height @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_id_by_height(self, daemon: MoneroDaemonRpc): + def test_get_block_id_by_height(self, daemon: MoneroDaemonRpc) -> None: last_header: MoneroBlockHeader = daemon.get_last_block_header() assert last_header.height is not None hash_str: str = daemon.get_block_hash(last_header.height) @@ -88,19 +88,19 @@ def test_get_block_id_by_height(self, daemon: MoneroDaemonRpc): # Can get a block template @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_template(self, daemon: MoneroDaemonRpc): + def test_get_block_template(self, daemon: MoneroDaemonRpc) -> None: template: MoneroBlockTemplate = daemon.get_block_template(Utils.ADDRESS, 2) DaemonUtils.test_block_template(template) # Can get the last block's header @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_last_block_header(self, daemon: MoneroDaemonRpc): + def test_get_last_block_header(self, daemon: MoneroDaemonRpc) -> None: last_header: MoneroBlockHeader = daemon.get_last_block_header() BlockUtils.test_block_header(last_header, True) # Can get a block header by hash @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_header_by_hash(self, daemon: MoneroDaemonRpc): + def test_get_block_header_by_hash(self, daemon: MoneroDaemonRpc) -> None: # retrieve by hash of last block last_header: MoneroBlockHeader = daemon.get_last_block_header() assert last_header.height is not None @@ -117,7 +117,7 @@ def test_get_block_header_by_hash(self, daemon: MoneroDaemonRpc): # Can get a block header by height @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_header_by_height(self, daemon: MoneroDaemonRpc): + def test_get_block_header_by_height(self, daemon: MoneroDaemonRpc) -> None: # retrieve by height of last block last_header: MoneroBlockHeader = daemon.get_last_block_header() assert last_header.height is not None @@ -133,7 +133,7 @@ def test_get_block_header_by_height(self, daemon: MoneroDaemonRpc): # Can get block headers by range # TODO: test start with no end, vice versa, inclusivity @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_headers_by_range(self, daemon: MoneroDaemonRpc): + def test_get_block_headers_by_range(self, daemon: MoneroDaemonRpc) -> None: # determine start and end height based on number of blocks and how many blocks ago num_blocks = 100 num_blocks_ago = 100 @@ -155,7 +155,7 @@ def test_get_block_headers_by_range(self, daemon: MoneroDaemonRpc): # Can get a block by hash @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_by_hash(self, daemon: MoneroDaemonRpc): + def test_get_block_by_hash(self, daemon: MoneroDaemonRpc) -> None: # test config ctx = TestContext() ctx.has_hex = True @@ -187,7 +187,7 @@ def test_get_blocks_by_hash_binary(self) -> None: # Can get a block by height @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_block_by_height(self, daemon: MoneroDaemonRpc): + def test_get_block_by_height(self, daemon: MoneroDaemonRpc) -> None: # config for testing blocks ctx = TestContext() ctx.has_hex = True @@ -209,7 +209,7 @@ def test_get_block_by_height(self, daemon: MoneroDaemonRpc): # Can get blocks by height which includes transactions (binary) @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_blocks_by_height_binary(self, daemon: MoneroDaemonRpc): + def test_get_blocks_by_height_binary(self, daemon: MoneroDaemonRpc) -> None: # set number of blocks to test num_blocks = 100 @@ -248,7 +248,7 @@ def test_get_blocks_by_height_binary(self, daemon: MoneroDaemonRpc): # Can get blocks by range in a single request @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_blocks_by_range(self, daemon: MoneroDaemonRpc): + def test_get_blocks_by_range(self, daemon: MoneroDaemonRpc) -> None: # get height range num_blocks = 100 num_blocks_ago = 102 @@ -270,7 +270,7 @@ def test_get_blocks_by_range(self, daemon: MoneroDaemonRpc): # Can get blocks by range using chunked requests @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_blocks_by_range_chunked(self, daemon: MoneroDaemonRpc): + def test_get_blocks_by_range_chunked(self, daemon: MoneroDaemonRpc) -> None: # get long height range num_blocks = min(daemon.get_height() - 2, 1440) # test up to ~2 days of blocks AssertUtils.assert_true(num_blocks > 0) @@ -376,7 +376,7 @@ def test_get_txs_by_hashes(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRp # Can get transaction pool statistics @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_tx_pool_statistics(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): + def test_get_tx_pool_statistics(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc) -> None: wallet = wallet Utils.WALLET_TX_TRACKER.wait_for_txs_to_clear_pool([wallet]) tx_ids: list[str] = [] @@ -420,32 +420,32 @@ def test_get_fee_estimate(self, daemon: MoneroDaemonRpc) -> None: # Can get general information @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_general_information(self, daemon: MoneroDaemonRpc): + def test_get_general_information(self, daemon: MoneroDaemonRpc) -> None: info: MoneroDaemonInfo = daemon.get_info() DaemonUtils.test_info(info) # Can get sync information @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_sync_information(self, daemon: MoneroDaemonRpc): + def test_get_sync_information(self, daemon: MoneroDaemonRpc) -> None: sync_info: MoneroDaemonSyncInfo = daemon.get_sync_info() DaemonUtils.test_sync_info(sync_info) # Can get hard fork information @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_hard_fork_information(self, daemon: MoneroDaemonRpc): + def test_get_hard_fork_information(self, daemon: MoneroDaemonRpc) -> None: hard_fork_info: MoneroHardForkInfo = daemon.get_hard_fork_info() DaemonUtils.test_hard_fork_info(hard_fork_info) # Can get alternative chains @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_alternative_chains(self, daemon: MoneroDaemonRpc): + def test_get_alternative_chains(self, daemon: MoneroDaemonRpc) -> None: alt_chains: list[MoneroAltChain] = daemon.get_alt_chains() for altChain in alt_chains: DaemonUtils.test_alt_chain(altChain) # Can get alternative block hashes @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_alternative_block_ids(self, daemon: MoneroDaemonRpc): + def test_get_alternative_block_ids(self, daemon: MoneroDaemonRpc) -> None: alt_block_ids: list[str] = daemon.get_alt_block_hashes() for altBlockId in alt_block_ids: AssertUtils.assert_not_none(altBlockId) @@ -453,7 +453,7 @@ def test_get_alternative_block_ids(self, daemon: MoneroDaemonRpc): # Can get, set, and reset a download bandwidth limit @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_set_download_bandwidth(self, daemon: MoneroDaemonRpc): + def test_set_download_bandwidth(self, daemon: MoneroDaemonRpc) -> None: init_val: int = daemon.get_download_limit() AssertUtils.assert_true(init_val > 0) set_val: int = init_val * 2 @@ -473,7 +473,7 @@ def test_set_download_bandwidth(self, daemon: MoneroDaemonRpc): # Can get, set, and reset an upload bandwidth limit @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_set_upload_bandwidth(self, daemon: MoneroDaemonRpc): + def test_set_upload_bandwidth(self, daemon: MoneroDaemonRpc) -> None: init_val: int = daemon.get_upload_limit() AssertUtils.assert_true(init_val > 0) set_val: int = init_val * 2 @@ -493,7 +493,7 @@ def test_set_upload_bandwidth(self, daemon: MoneroDaemonRpc): # Can get peers with active incoming or outgoing connections @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_peers(self, daemon: MoneroDaemonRpc): + def test_get_peers(self, daemon: MoneroDaemonRpc) -> None: peers: list[MoneroPeer] = daemon.get_peers() AssertUtils.assert_false(len(peers) == 0, "Daemon has no incoming or outgoing peers to test") for peer in peers: @@ -501,7 +501,7 @@ def test_get_peers(self, daemon: MoneroDaemonRpc): # Can get all known peers which may be online or offline @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_known_peers(self, daemon: MoneroDaemonRpc): + def test_get_known_peers(self, daemon: MoneroDaemonRpc) -> None: peers: list[MoneroPeer] = daemon.get_known_peers() if Utils.REGTEST: AssertUtils.assert_true(len(peers) == 0, "Regtest daemon should not have known peers to test") @@ -513,21 +513,21 @@ def test_get_known_peers(self, daemon: MoneroDaemonRpc): # Can limit the number of outgoing peers @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_set_outgoing_peer_limit(self, daemon: MoneroDaemonRpc): + def test_set_outgoing_peer_limit(self, daemon: MoneroDaemonRpc) -> None: daemon.set_outgoing_peer_limit(0) daemon.set_outgoing_peer_limit(8) daemon.set_outgoing_peer_limit(10) # Can limit the number of incoming peers @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_set_incoming_peer_limit(self, daemon: MoneroDaemonRpc): + def test_set_incoming_peer_limit(self, daemon: MoneroDaemonRpc) -> None: daemon.set_incoming_peer_limit(0) daemon.set_incoming_peer_limit(8) daemon.set_incoming_peer_limit(10) # Can notify listeners when a new block is added to the chain @pytest.mark.skipif(Utils.LITE_MODE is True or Utils.TEST_NOTIFICATIONS is False, reason="TEST_NOTIFICATIONS disabled") - def test_block_listener(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): + def test_block_listener(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc) -> None: try: # start mining if possible to help push the network along address: str = wallet.get_primary_address() @@ -556,7 +556,7 @@ def test_block_listener(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): # Can ban a peer @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_ban_peer(self, daemon: MoneroDaemonRpc): + def test_ban_peer(self, daemon: MoneroDaemonRpc) -> None: # set ban host = "192.168.1.51" ban = MoneroBan() @@ -577,7 +577,7 @@ def test_ban_peer(self, daemon: MoneroDaemonRpc): # Can ban peers @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_ban_peers(self, daemon: MoneroDaemonRpc): + def test_ban_peers(self, daemon: MoneroDaemonRpc) -> None: # set bans addr1 = "192.168.1.52" addr2 = "192.168.1.53" @@ -610,7 +610,7 @@ def test_ban_peers(self, daemon: MoneroDaemonRpc): # Can start and stop mining @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_mining(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): + def test_mining(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc) -> None: # stop mining at beginning of test try: daemon.stop_mining() @@ -628,7 +628,7 @@ def test_mining(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): # Can get mining status @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_get_mining_status(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc): + def test_get_mining_status(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRpc) -> None: try: # stop mining at beginning of test try: @@ -666,7 +666,7 @@ def test_get_mining_status(self, daemon: MoneroDaemonRpc, wallet: MoneroWalletRp # Can submit a mined block to the network @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") @pytest.mark.flaky(reruns=5, reruns_delay=5) - def test_submit_mined_block(self, daemon: MoneroDaemonRpc): + def test_submit_mined_block(self, daemon: MoneroDaemonRpc) -> None: # get template to mine on template: MoneroBlockTemplate = daemon.get_block_template(Utils.ADDRESS) assert template.block_template_blob is not None @@ -683,7 +683,7 @@ def test_submit_mined_block(self, daemon: MoneroDaemonRpc): # Can prune the blockchain @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_prune_blockchain(self, daemon: MoneroDaemonRpc): + def test_prune_blockchain(self, daemon: MoneroDaemonRpc) -> None: result: MoneroPruneResult = daemon.prune_blockchain(True) if result.is_pruned: @@ -695,14 +695,14 @@ def test_prune_blockchain(self, daemon: MoneroDaemonRpc): # Can check for an update @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") @pytest.mark.flaky(reruns=5, reruns_delay=5) - def test_check_for_update(self, daemon: MoneroDaemonRpc): + def test_check_for_update(self, daemon: MoneroDaemonRpc) -> None: result: MoneroDaemonUpdateCheckResult = daemon.check_for_update() DaemonUtils.test_update_check_result(result) # Can download an update @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") @pytest.mark.flaky(reruns=5, reruns_delay=5) - def test_download_update(self, daemon: MoneroDaemonRpc): + def test_download_update(self, daemon: MoneroDaemonRpc) -> None: try: # download to default path result: MoneroDaemonUpdateDownloadResult = daemon.download_update() @@ -730,7 +730,7 @@ def test_download_update(self, daemon: MoneroDaemonRpc): # Can be stopped #@pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") @pytest.mark.skip(reason="test is disabled to not interfere with other tests") - def test_stop(self, daemon: MoneroDaemonRpc): + def test_stop(self, daemon: MoneroDaemonRpc) -> None: # stop the daemon daemon.stop() diff --git a/tests/test_monero_rpc_connection.py b/tests/test_monero_rpc_connection.py index bb36458..cf04dee 100644 --- a/tests/test_monero_rpc_connection.py +++ b/tests/test_monero_rpc_connection.py @@ -11,6 +11,15 @@ class TestMoneroRpcConnection: """Rpc connection integration tests""" + # Setup and teardown of test class + @pytest.fixture(scope="class", autouse=True) + def global_setup_and_teardown(self): + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + yield + logger.info(f"Teardown test class {type(self).__name__}") + + # Setup and teardown of each tests @pytest.fixture(autouse=True) def setup_and_teardown(self, request: pytest.FixtureRequest): logger.info(f"Before {request.node.name}") # type: ignore @@ -18,18 +27,18 @@ def setup_and_teardown(self, request: pytest.FixtureRequest): logger.info(f"After {request.node.name}") # type: ignore # Test monerod rpc connection - def test_node_rpc_connection(self): + def test_node_rpc_connection(self) -> None: connection = MoneroRpcConnection(Utils.DAEMON_RPC_URI, Utils.DAEMON_RPC_USERNAME, Utils.DAEMON_RPC_PASSWORD) DaemonUtils.test_rpc_connection(connection, Utils.DAEMON_RPC_URI) # Test wallet rpc connection @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_wallet_rpc_connection(self): + def test_wallet_rpc_connection(self) -> None: connection = MoneroRpcConnection(Utils.WALLET_RPC_URI, Utils.WALLET_RPC_USERNAME, Utils.WALLET_RPC_PASSWORD) DaemonUtils.test_rpc_connection(connection, Utils.WALLET_RPC_URI) # Test invalid connection @pytest.mark.skipif(Utils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") - def test_invalid_connection(self): + def test_invalid_connection(self) -> None: connection = MoneroRpcConnection(Utils.OFFLINE_SERVER_URI) DaemonUtils.test_rpc_connection(connection, Utils.OFFLINE_SERVER_URI, False) diff --git a/tests/test_monero_utils.py b/tests/test_monero_utils.py index 99fa1f1..1c89e0b 100644 --- a/tests/test_monero_utils.py +++ b/tests/test_monero_utils.py @@ -18,36 +18,65 @@ class TestMoneroUtils: """Monero utilities unit tests""" class Config: + """Utils tests configuration""" mainnet: AddressBook = AddressBook() + """Mainnet address book.""" testnet: AddressBook = AddressBook() + """Testnet address book.""" stagenet: AddressBook = AddressBook() + """Stagenet address book.""" keys: KeysBook = KeysBook() + """Wallet keys book.""" serialization_msg: str = '' + """Message to serialize.""" @classmethod def parse(cls, parser: ConfigParser) -> TestMoneroUtils.Config: + """ + Parse utils tests configuration + + :param ConfigParser parser: configuration parser. + :returns TestMoneroUtils.Config: parsed test utils configuration. + """ config = cls() + # check section if not parser.has_section("serialization"): raise Exception("Cannot find section [serialization] in test_monero_utils.ini") + # load address books config.mainnet = AddressBook.parse(parser, "mainnet") config.testnet = AddressBook.parse(parser, "testnet") config.stagenet = AddressBook.parse(parser, "stagenet") + # load keys book config.keys = KeysBook.parse(parser) config.serialization_msg = parser.get("serialization", "msg") return config + #region Fixtures + @pytest.fixture(scope="class") def config(self) -> TestMoneroUtils.Config: parser = ConfigParser() parser.read('tests/config/test_monero_utils.ini') return TestMoneroUtils.Config.parse(parser) + # Setup and teardown of test class + @pytest.fixture(scope="class", autouse=True) + def global_setup_and_teardown(self): + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + yield + logger.info(f"Teardown test class {type(self).__name__}") + + # Setup and teardown of each tests @pytest.fixture(autouse=True) def setup_and_teardown(self, request: pytest.FixtureRequest): + """Executed once before each test""" logger.info(f"Before {request.node.name}") # type: ignore yield logger.info(f"After {request.node.name}") # type: ignore + #endregion + #region Tests # Can get integrated addresses diff --git a/tests/test_monero_wallet_common.py b/tests/test_monero_wallet_common.py index 89493fc..09326f0 100644 --- a/tests/test_monero_wallet_common.py +++ b/tests/test_monero_wallet_common.py @@ -3,6 +3,7 @@ import pytest import logging +from random import shuffle from configparser import ConfigParser from abc import ABC, abstractmethod from typing import Optional @@ -11,7 +12,10 @@ from monero import ( MoneroWallet, MoneroWalletRpc, MoneroDaemonRpc, MoneroWalletConfig, MoneroTxConfig, MoneroDestination, MoneroRpcConnection, MoneroError, - MoneroKeyImage, MoneroTxQuery, MoneroUtils, MoneroBlock + MoneroKeyImage, MoneroTxQuery, MoneroUtils, MoneroBlock, MoneroTransferQuery, + MoneroOutputQuery, MoneroTransfer, MoneroIncomingTransfer, MoneroOutgoingTransfer, + MoneroTxWallet, MoneroOutputWallet, MoneroTx, MoneroAccount, MoneroSubaddress, + MoneroMessageSignatureType, MoneroTxPriority ) from utils import ( TestUtils, WalletEqualityUtils, MiningUtils, @@ -166,11 +170,13 @@ def wallet(self) -> MoneroWallet: """Test wallet instance""" pytest.skip("No wallet test instance setup") - # Before all tests + # Setup and teardown of test class @pytest.fixture(scope="class", autouse=True) - def before_all(self) -> None: + def global_setup_and_teardown(self): """Executed once before all tests""" - self._setup_blockchain() + self.before_all() + yield + self.after_all() # Setup and teardown of each test @pytest.fixture(autouse=True) @@ -179,6 +185,26 @@ def setup_and_teardown(self, request: pytest.FixtureRequest): yield self.after_each(request) + # Before all tests + def before_all(self) -> None: + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + self._setup_blockchain() + + # After all tests + def after_all(self) -> None: + """Executed once after all tests""" + logger.info(f"Teardown test class {type(self).__name__}") + daemon: MoneroDaemonRpc | None = self._get_test_daemon() + try: + daemon.stop_mining() + except Exception as e: + logger.debug(str(e)) + + # close wallet + wallet = self.get_test_wallet() + wallet.close(True) + # Before each test def before_each(self, request: pytest.FixtureRequest) -> None: """ @@ -188,6 +214,13 @@ def before_each(self, request: pytest.FixtureRequest) -> None: """ logger.info(f"Before {request.node.name}") # type: ignore + daemon = self._get_test_daemon() + wallet = self.get_test_wallet() + status = daemon.get_mining_status() + + if status.is_active is True: + wallet.stop_mining() + # After each test def after_each(self, request: pytest.FixtureRequest) -> None: """ @@ -197,6 +230,13 @@ def after_each(self, request: pytest.FixtureRequest) -> None: """ logger.info(f"After {request.node.name}") # type: ignore + daemon = self._get_test_daemon() + status = daemon.get_mining_status() + + if status.is_active is True: + logger.warning(f"Mining is active after test {request.node.name}") # type: ignore + + #endregion #region Tests @@ -922,8 +962,6 @@ def test_set_subaddress_label(self, wallet: MoneroWallet) -> None: assert label == wallet.get_subaddress(0, subaddress_idx).label subaddress_idx += 1 - #region Txs Tests - def _test_send_to_single(self, wallet: MoneroWallet, can_split: bool, relay: Optional[bool] = None, payment_id: Optional[str] = None) -> None: config = MoneroTxConfig() config.can_split = can_split @@ -1025,6 +1063,751 @@ def test_get_txs_wallet(self, wallet: MoneroWallet) -> None: # TODO enable this after setting send-to-multiple order #assert non_default_incoming, "No incoming transfers found to non-default account and subaddress; run send-to-multiple tests first" + # Can get transactions by hash + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_get_txs_by_hash(self, wallet: MoneroWallet) -> None: + # max number of txs to test + max_num_txs: int = 10 + + # fetch all txs for testing + txs = wallet.get_txs() + num_txs = len(txs) + assert num_txs > 1, f"Test requires at least 2 txs to fetch by hash, got {num_txs}" + + # randomly pick a few for fetching by hash + shuffle(txs) + txs = txs[0:min(max_num_txs, num_txs)] + + # test fetching by hash + tx_hash = txs[0].hash + assert tx_hash is not None + fetched_tx = wallet.get_tx(tx_hash) + assert fetched_tx is not None + assert tx_hash == fetched_tx.hash + TxUtils.test_tx_wallet(fetched_tx) + + # test fetching by hashes + tx_id1 = txs[0].hash + tx_id2 = txs[1].hash + assert tx_id1 is not None + assert tx_id2 is not None + fetched_txs = wallet.get_txs([tx_id1, tx_id2]) + num_fetched_txs = len(fetched_txs) + assert num_fetched_txs == 2, f"Expected 2 txs, got {num_fetched_txs}" + + # test fetching by hashes as collection + tx_hashes: list[str] = [] + for tx in txs: + assert tx.hash is not None + tx_hashes.append(tx.hash) + + fetched_txs = wallet.get_txs(tx_hashes) + assert len(txs) == len(fetched_txs) + for i, tx in enumerate(txs): + fetched_tx = fetched_txs[i] + assert tx.hash == fetched_tx.hash + TxUtils.test_tx_wallet(fetched_tx) + + # test fetching with missing tx hashes + missing_hash: str = "d01ede9cde813b2a693069b640c4b99c5adbdb49fbbd8da2c16c8087d0c3e320" + tx_hashes.append(missing_hash) + fetched_txs = wallet.get_txs(tx_hashes) + assert len(txs) == len(fetched_txs) + for i, tx in enumerate(txs): + fetched_tx = fetched_txs[i] + assert tx.hash == fetched_tx.hash + TxUtils.test_tx_wallet(fetched_tx) + + # Can get transactions with additional configuration + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_get_txs_with_query(self, wallet: MoneroWallet) -> None: + # get random transactions for testing + random_txs = TxUtils.get_random_transactions(wallet, None, 3, 5) + for random_tx in random_txs: + TxUtils.test_tx_wallet(random_tx, None) + + # get transactions by hash + tx_hashes: list[str] = [] + for random_tx in random_txs: + assert random_tx.hash is not None + tx_hashes.append(random_tx.hash) + query = MoneroTxQuery() + query.hash = random_tx.hash + txs = TxUtils.get_and_test_txs(wallet, query, None, True, TestUtils.REGTEST) + assert len(txs) == 1 + # txs change with chain so check mergeability + merged = txs[0] + merged.merge(random_tx.copy()) + TxUtils.test_tx_wallet(merged) + + # get transactions by hashes + query = MoneroTxQuery() + query.hashes = tx_hashes + txs = TxUtils.get_and_test_txs(wallet, query, None, None, TestUtils.REGTEST) + assert len(txs) == len(random_txs) + for tx in txs: + assert tx.hash in tx_hashes + + # get transactions with an outgoing transfer + ctx: TxContext = TxContext() + ctx.has_outgoing_transfer = True + query = MoneroTxQuery() + query.is_outgoing = True + txs = TxUtils.get_and_test_txs(wallet, query, ctx, True, TestUtils.REGTEST) + for tx in txs: + assert tx.is_outgoing is True + assert tx.outgoing_transfer is not None + TxUtils.test_transfer(tx.outgoing_transfer, None) + + # get transactions without an outgoing transfer + ctx.has_outgoing_transfer = False + query = MoneroTxQuery() + query.is_outgoing = False + txs = TxUtils.get_and_test_txs(wallet, query, ctx, True, TestUtils.REGTEST) + for tx in txs: + assert tx.outgoing_transfer is None + + # get transactions with incoming transfers + ctx = TxContext() + ctx.has_incoming_transfers = True + query = MoneroTxQuery() + query.is_incoming = True + txs = TxUtils.get_and_test_txs(wallet, query, ctx, True, TestUtils.REGTEST) + for tx in txs: + assert tx.is_incoming is True + assert len(tx.incoming_transfers) > 0 + for transfer in tx.incoming_transfers: + TxUtils.test_transfer(transfer, None) + + # get transactions associated with an account + account_idx: int = 1 + query = MoneroTxQuery() + query.transfer_query = MoneroTransferQuery() + query.transfer_query.account_index = account_idx + txs = wallet.get_txs(query) + + for tx in txs: + found: bool = False + if tx.is_outgoing: + assert tx.outgoing_transfer is not None + if tx.outgoing_transfer.account_index == account_idx: + found = True + elif len(tx.incoming_transfers) > 0: + for transfer in tx.incoming_transfers: + if transfer.account_index == account_idx: + found = True + break + + assert found, f"Transaction is not associated with account {account_idx}: \n{tx.serialize()}" + + # get txs with manually built query that are confirmed and have an outgoing transfer from account 0 + ctx = TxContext() + ctx.has_outgoing_transfer = True + tx_query = MoneroTxQuery() + tx_query.is_confirmed = True + tx_query.transfer_query = MoneroTransferQuery() + tx_query.transfer_query.account_index = 0 + tx_query.transfer_query.outgoing = True + txs = TxUtils.get_and_test_txs(wallet, tx_query, ctx, True, TestUtils.REGTEST) + for tx in txs: + if tx.is_confirmed is not True: + logger.warning(f"{tx.serialize()}") + assert tx.is_confirmed is True + assert tx.is_outgoing is True + assert tx.outgoing_transfer is not None + assert tx.outgoing_transfer.account_index == 0 + + # get txs with outgoing transfers that have destinations to account 1 + tx_query = MoneroTxQuery() + tx_query.is_confirmed = True + tx_query.transfer_query = MoneroTransferQuery() + tx_query.transfer_query.account_index = 0 + tx_query.transfer_query.has_destinations = True + + txs = TxUtils.get_and_test_txs(wallet, tx_query, None, None, TestUtils.REGTEST) + for tx in txs: + assert tx.is_outgoing is True + assert tx.outgoing_transfer is not None + assert len(tx.outgoing_transfer.destinations) > 0 + + # include outputs with transactions + ctx = TxContext() + ctx.include_outputs = True + tx_query = MoneroTxQuery() + tx_query.include_outputs = True + txs = TxUtils.get_and_test_txs(wallet, tx_query, ctx, True, TestUtils.REGTEST) + found: bool = False + for tx in txs: + if len(tx.outputs) > 0: + found = True + else: + # TODO: monero-wallet-rpc: return outputs for unconfirmed txs + assert tx.is_outgoing or (tx.is_incoming and tx.is_confirmed is False) + + assert found, "No outputs found in txs" + + # get txs with input query + # TODO no inputs returned to filter + + # get txs with output query + tx_query = MoneroTxQuery() + tx_query.output_query = MoneroOutputQuery() + tx_query.output_query.is_spent = False + tx_query.output_query.account_index = 1 + tx_query.output_query.subaddress_index = 2 + txs = wallet.get_txs(tx_query) + assert len(txs) > 0 + for tx in txs: + assert len(tx.outputs) > 0 + found = False + for output in tx.get_outputs_wallet(): + if output.is_spent is False and output.account_index == 1 and output.subaddress_index == 2: + found = True + break + + if not found: + raise Exception(f"Tx does not contain specified output") + + # get unlocked txs + tx_query = MoneroTxQuery() + tx_query.is_locked = False + txs = wallet.get_txs(tx_query) + assert len(txs) > 0 + for tx in txs: + assert tx.is_locked is False + + # get confirmed transactions sent from/to same wallet with a transfer with destinations + # TODO implement send from/to multiple tests + #tx_query = MoneroTxQuery() + #tx_query.is_incoming = True + #tx_query.is_outgoing = True + #tx_query.include_outputs = True + #tx_query.is_confirmed = True + #tx_query.transfer_query = MoneroTransferQuery() + #tx_query.transfer_query.has_destinations = True + + #txs = wallet.get_txs(tx_query) + #assert len(txs) > 0 + #for tx in txs: + #assert tx.is_incoming is True + #assert tx.is_outgoing is True + #assert tx.is_confirmed is True + #assert len(tx.get_outputs_wallet()) > 0 + #assert tx.outgoing_transfer is not None + #assert len(tx.outgoing_transfer.destinations) > 0 + + # Validates inputs when getting transactions + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False or TestUtils.LITE_MODE, reason="TEST_NON_RELAYS disabled") + def test_validate_inputs_get_txs(self, wallet: MoneroWallet) -> None: + # fetch random txs for testing + random_txs: list[MoneroTxWallet] = TxUtils.get_random_transactions(wallet, None, 3, 5) + + # valid, invalid, and unknown tx hashes for tests + tx_hash = random_txs[0].hash + invalid_hash = "invalid_id" + unknown_hash1 = "6c4982f2499ece80e10b627083c4f9b992a00155e98bcba72a9588ccb91d0a61" + unknown_hash2 = "ff397104dd875882f5e7c66e4f852ee134f8cf45e21f0c40777c9188bc92e943" + assert tx_hash is not None and len(tx_hash) > 0 + + # fetch unknown tx hash + fetched_tx = wallet.get_tx(unknown_hash1) + assert fetched_tx is None + + # fetch unknown tx hash using query + tx_query: MoneroTxQuery = MoneroTxQuery() + tx_query.hash = unknown_hash1 + fetched_txs = wallet.get_txs(tx_query) + assert len(fetched_txs) == 0 + + # fetch unknwon tx hash in list + txs = wallet.get_txs([tx_hash, unknown_hash1]) + assert len(txs) == 1 + assert txs[0].hash == tx_hash + + # fetch unknwon tx hashes in list + txs = wallet.get_txs([tx_hash, unknown_hash1, unknown_hash2]) + assert len(txs) == 1 + assert txs[0].hash == tx_hash + + # fetch invalid hash + fetched_tx = wallet.get_tx(invalid_hash) + assert fetched_tx is None + + # fetch invalid hash list + txs = wallet.get_txs([tx_hash, invalid_hash]) + assert len(txs) == 1 + assert txs[0].hash == tx_hash + + # fetch invalid hashes in list + txs = wallet.get_txs([tx_hash, invalid_hash, "invalid_hash_2"]) + assert len(txs) == 1 + assert txs[0].hash == tx_hash + + # test collection of invalid hashes + tx_query = MoneroTxQuery() + tx_query.hashes = [tx_hash, invalid_hash, "invalid_hash_2"] + txs = wallet.get_txs(tx_query) + assert len(txs) == 1 + + # test txs + for tx in txs: + TxUtils.test_tx_wallet(tx) + + # Can get transfers in the wallet, accounts, and subaddresses + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_get_transfers(self, wallet: MoneroWallet) -> None: + # get all transfers + TxUtils.get_and_test_transfers(wallet, None, None, True) + + # get transfers by account index + non_default_incoming: bool = False + for account in wallet.get_accounts(True): + transfer_query = MoneroTransferQuery() + transfer_query.account_index = account.index + account_transfers = TxUtils.get_and_test_transfers(wallet, transfer_query, None, None) + for transfer in account_transfers: + assert transfer.account_index == account.index + + # get transfers by subaddress index + subaddress_transfers: list[MoneroTransfer] = [] + for subaddress in account.subaddresses: + subaddress_query = MoneroTransferQuery() + subaddress_query.account_index = subaddress.account_index + subaddress_query.subaddress_index = subaddress.index + transfers = TxUtils.get_and_test_transfers(wallet, subaddress_query, None, None) + + for transfer in transfers: + # test account and subaddress indices + assert subaddress.account_index == transfer.account_index + if transfer.is_incoming() is True: + assert isinstance(transfer, MoneroIncomingTransfer) + assert subaddress.index == transfer.subaddress_index + if transfer.account_index != 0 and transfer.subaddress_index != 0: + non_default_incoming = True + else: + assert isinstance(transfer, MoneroOutgoingTransfer) + assert subaddress.index in transfer.subaddress_indices + if transfer.account_index != 0: + for subaddr_idx in transfer.subaddress_indices: + if subaddr_idx > 0: + non_default_incoming = True + break + + # don't add duplicates + # TODO monero-wallet-rpc: duplicate outgoing transfers returned for different + # subaddress indices, way to return outgoing subaddress indices? + + found: bool = False + for subaddress_transfer in subaddress_transfers: + eq_hash: bool = transfer.tx.hash == subaddress_transfer.tx.hash + if transfer.serialize() == subaddress_transfer.serialize() and eq_hash: + found = True + break + + if not found: + subaddress_transfers.append(transfer) + + assert len(account_transfers) == len(subaddress_transfers) + + # collect unique subaddress indices + subaddress_indices: set[int] = set() + for transfer in subaddress_transfers: + if transfer.is_incoming(): + assert isinstance(transfer, MoneroIncomingTransfer) + assert transfer.subaddress_index is not None + subaddress_indices.add(transfer.subaddress_index) + else: + assert isinstance(transfer, MoneroOutgoingTransfer) + for idx in transfer.subaddress_indices: + subaddress_indices.add(idx) + + # get and test transfers by subaddress indices + transfer_query = MoneroTransferQuery() + transfer_query.account_index = account.index + for idx in subaddress_indices: + transfer_query.subaddress_indices.append(idx) + + transfers = TxUtils.get_and_test_transfers(wallet, transfer_query, None, None) + # TODO monero-wallet-rpc: these may not be equal because outgoing transfers are always from subaddress 0 (#5171) + # and/or incoming transfers from/to same account are occluded (#4500) + assert len(subaddress_transfers) == len(transfers) + for transfer in transfers: + assert transfer.account_index == account.index + if transfer.is_incoming(): + assert isinstance(transfer, MoneroIncomingTransfer) + assert transfer.subaddress_index in subaddress_indices + else: + assert isinstance(transfer, MoneroOutgoingTransfer) + intersections: set[int] = set(subaddress_indices) + overlap = intersections.intersection(transfer.subaddress_indices) + assert overlap is not None and len(overlap) > 0, "Subaddresses must overlap" + + # ensure transfer found with non-zero account and subaddress indices + assert non_default_incoming, "No transfers found in non-default account and subaddress; run send-to-multiple tests" + + # Validates inputs when getting transfers + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False or TestUtils.LITE_MODE, reason="TEST_NON_RELAYS disabled") + def test_validate_inputs_get_transfers(self, wallet: MoneroWallet) -> None: + # test with invalid hash + transfer_query: MoneroTransferQuery = MoneroTransferQuery() + transfer_query.tx_query = MoneroTxQuery() + transfer_query.tx_query.hash = "invalid_id" + + transfers: list[MoneroTransfer] = wallet.get_transfers(transfer_query) + assert len(transfers) == 0 + + # test invalid hash in list + random_txs = TxUtils.get_random_transactions(wallet, None, 3, 5) + transfer_query.tx_query = MoneroTxQuery() + random_hash = random_txs[0].hash + assert random_hash is not None + transfer_query.tx_query.hashes.append(random_hash) + transfer_query.tx_query.hashes.append("invalid_id") + + transfers = wallet.get_transfers(transfer_query) + assert len(transfers) > 0 + + tx: MoneroTxWallet = transfers[0].tx + for transfer in transfers: + assert transfer.tx == tx + + # test unused subaddress indices + transfer_query = MoneroTransferQuery() + transfer_query.account_index = 0 + transfer_query.subaddress_indices.append(1234907) + transfers = wallet.get_transfers(transfer_query) + + # test unused subaddress index + try: + transfer_query = MoneroTransferQuery() + transfer_query.account_index = 0 + transfer_query.subaddress_index = -1 + transfers = wallet.get_transfers(transfer_query) + raise Exception("Should have failed") + except Exception as e: + assert "Should have failed" != str(e) + + # Can get outputs in the wallet, accounts, and subaddresses + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_get_outputs(self, wallet: MoneroWallet) -> None: + # get all outputs + TxUtils.get_and_test_outputs(wallet, None, True) + + # get outputs for each account + non_default_incoming: bool = False + accounts: list[MoneroAccount] = wallet.get_accounts(True) + for account in accounts: + # determine if account is used + is_used: bool = False + for subaddress in account.subaddresses: + if subaddress.is_used is True: + is_used = True + break + + # get outputs by account index + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.account_index = account.index + account_outputs = TxUtils.get_and_test_outputs(wallet, output_query, is_used) + for ouput in account_outputs: + assert ouput.account_index == account.index + + # get outputs by subaddress index + subaddress_outputs: list[MoneroOutputWallet] = [] + for subaddress in account.subaddresses: + subaddr_query: MoneroOutputQuery = MoneroOutputQuery() + subaddr_query.account_index = account.index + subaddr_query.subaddress_index = subaddress.index + outputs = TxUtils.get_and_test_outputs(wallet, subaddr_query, subaddress.is_used) + for output in outputs: + assert subaddress.account_index == output.account_index + assert subaddress.index == output.subaddress_index + if output.account_index != 0 and output.subaddress_index != 0: + non_default_incoming = True + subaddress_outputs.append(output) + + assert len(subaddress_outputs) == len(account_outputs) + + # get outputs by subaddress indices + subaddress_indices: set[int] = set() + for output in subaddress_outputs: + assert output.subaddress_index is not None + subaddress_indices.add(output.subaddress_index) + + output_query = MoneroOutputQuery() + output_query.account_index = account.index + for sub_idx in subaddress_indices: + output_query.subaddress_indices.append(sub_idx) + + outputs = TxUtils.get_and_test_outputs(wallet, output_query, is_used) + assert len(outputs) == len(subaddress_outputs) + + for output in outputs: + assert account.index == output.account_index + assert output.subaddress_index is not None + assert output.subaddress_index in subaddress_indices + + # ensure output found with non-zero account and subaddress indices + assert non_default_incoming, "No outputs found in non-default account and subaddress; run send-to-multiple tests" + + # Can get outputs with additional configuration + #@pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + @pytest.mark.skip(reason="TODO implement multiple send tests") + def test_get_outputs_with_query(self, wallet: MoneroWallet) -> None: + # get unspent outputs to account 0 + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.account_index = 0 + output_query.is_spent = False + outputs: list[MoneroOutputWallet] = TxUtils.get_and_test_outputs(wallet, output_query, None) + + for output in outputs: + assert output.account_index == 0 + assert output.is_spent is False + + # get spent outputs to account 1 + output_query = MoneroOutputQuery() + output_query.account_index = 1 + output_query.is_spent = True + outputs = TxUtils.get_and_test_outputs(wallet, output_query, True) + + for output in outputs: + assert output.account_index == 1 + assert output.is_spent is True + + # get random transactions + tx_query: MoneroTxQuery = MoneroTxQuery() + tx_query.is_confirmed = True + txs: list[MoneroTxWallet] = TxUtils.get_random_transactions(wallet, tx_query, 3, 5) + + # get outputs with a tx hash + tx_hashes: list[str] = [] + for tx in txs: + assert tx.hash is not None + tx_hashes.append(tx.hash) + output_query = MoneroOutputQuery() + output_query.set_tx_query(MoneroTxQuery(), True) + assert output_query.tx_query is not None + output_query.tx_query.hash = tx.hash + outputs = TxUtils.get_and_test_outputs(wallet, output_query, True) + + for output in outputs: + assert output.tx is not None + assert output.tx.hash is not None + assert output.tx.hash in tx_hashes + + # get outputs with tx hashes + tx_query = MoneroTxQuery() + tx_query.hashes = tx_hashes + output_query = MoneroOutputQuery() + output_query.set_tx_query(tx_query, True) + outputs = TxUtils.get_and_test_outputs(wallet, output_query, True) + + for output in outputs: + assert output.tx is not None + assert output.tx.hash is not None + assert output.tx.hash in tx_hashes + + # get confirmed outputs to specifi subaddress with pre-built query + account_idx: int = 0 + subaddress_idx: int = 1 + output_query = MoneroOutputQuery() + output_query.account_index = account_idx + output_query.subaddress_index = subaddress_idx + tx_query = MoneroTxQuery() + tx_query.is_confirmed = True + output_query.set_tx_query(tx_query, True) + output_query.min_amount = TxUtils.MAX_FEE + outputs = TxUtils.get_and_test_outputs(wallet, output_query, True) + + for output in outputs: + assert output.account_index == account_idx + assert output.subaddress_index == subaddress_idx + assert output.tx is not None + assert output.tx.is_confirmed is True + assert output.amount is not None + assert output.amount >= TxUtils.MAX_FEE + + # get output by key image + output: MoneroOutputWallet = outputs[0] + assert output.key_image is not None + assert output.key_image.hex is not None + output_query = MoneroOutputQuery() + output_query.key_image = MoneroKeyImage() + output_query.key_image.hex = output.key_image.hex + outputs = wallet.get_outputs(output_query) + assert len(outputs) == 1 + output_result: MoneroOutputWallet = outputs[0] + assert output_result.key_image is not None + assert output.key_image.hex == output_result.key_image.hex + + # get outputs whose transaction is confirmed and has incoming and outgoing transfers + output_query = MoneroOutputQuery() + tx_query = MoneroTxQuery() + tx_query.is_confirmed = True + tx_query.is_incoming = True + tx_query.is_outgoing = True + tx_query.include_outputs = True + output_query.set_tx_query(tx_query, True) + outputs = wallet.get_outputs(output_query) + assert len(outputs) > 0 + + for output in outputs: + assert output.tx is not None + assert isinstance(output.tx, MoneroTxWallet) + assert output.tx.is_incoming is True + assert output.tx.is_outgoing is True + assert output.tx.is_confirmed is True + outputs_wallet: list[MoneroOutputWallet] = output.tx.get_outputs_wallet() + assert len(outputs_wallet) > 0 + assert output in outputs_wallet + + # Validates inputs when getting wallet outputs + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False or TestUtils.LITE_MODE, reason="TEST_NON_RELAYS disabled") + def test_validate_inputs_get_outputs(self, wallet: MoneroWallet) -> None: + # test with invalid hash + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.set_tx_query(MoneroTxQuery(), True) + assert output_query.tx_query is not None + output_query.tx_query.hash = "invalid_id" + + outputs: list[MoneroOutputWallet] = wallet.get_outputs(output_query) + assert len(outputs) == 0 + + # test invalid hash in list + tx_query: MoneroTxQuery = MoneroTxQuery() + tx_query.is_confirmed = True + tx_query.include_outputs = True + random_txs: list[MoneroTxWallet] = TxUtils.get_random_transactions(wallet, tx_query, 3, 5) + + for random_tx in random_txs: + assert len(random_tx.outputs) > 0 + + output_query = MoneroOutputQuery() + output_query.set_tx_query(MoneroTxQuery(), False) + assert output_query.tx_query is not None + random_hash = random_txs[0].hash + assert random_hash is not None and len(random_hash) > 0 + output_query.tx_query.hashes = [random_hash, "invalid_id"] + + outputs = wallet.get_outputs(output_query) + assert len(outputs) > 0 + assert len(outputs) == len(random_txs[0].outputs) + + tx: MoneroTx = outputs[0].tx + assert isinstance(tx, MoneroTxWallet) + + for output in outputs: + assert output.tx == tx + + # Can export outputs in hex format + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False or TestUtils.LITE_MODE, reason="TEST_NON_RELAYS disabled") + def test_export_outputs(self, wallet: MoneroWallet) -> None: + outputs_hex: str = wallet.export_outputs() + logger.debug(f"Exported outputs hex: {outputs_hex}") + # TODO: this will fail if wallet has no outputs; run these tests on new wallet + assert outputs_hex is not None and len(outputs_hex) > 0 + + # wallet exports outputs since last export by default + outputs_hex = wallet.export_outputs() + outputs_hex_all = wallet.export_outputs(True) + assert len(outputs_hex_all) > len(outputs_hex) + + # Can import outputs in hex format + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False or TestUtils.LITE_MODE, reason="TEST_NON_RELAYS disabled") + def test_import_outputs(self, wallet: MoneroWallet) -> None: + # export outputs hex + outputs_hex: str = wallet.export_outputs() + logger.debug(f"Exported outputs hex {outputs_hex}") + # import outputs hex + if len(outputs_hex) > 0: + num_imported: int = wallet.import_outputs(outputs_hex) + assert num_imported >= 0 + + # Has correct accounting across accounts, subaddresses, txs, transfers and outputs + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_accounting(self, wallet: MoneroWallet) -> None: + # pre-fetch wallet balances, accounts, subaddresses and txs + wallet_balance = wallet.get_balance() + wallet_unlocked_balance = wallet.get_unlocked_balance() + # includes subaddresses + accounts = wallet.get_accounts(True) + + # test wallet balance + GenUtils.test_unsigned_big_integer(wallet_balance) + GenUtils.test_unsigned_big_integer(wallet_unlocked_balance) + assert wallet_balance >= wallet_unlocked_balance + + # test that wallet balance equals sum of account balances + accounts_balance: int = 0 + accounts_unlocked_balance: int = 0 + for account in accounts: + # test that account balance equals sum of subaddress balances + WalletUtils.test_account(account, TestUtils.NETWORK_TYPE) + assert account.balance is not None + assert account.unlocked_balance is not None + accounts_balance += account.balance + accounts_unlocked_balance += account.unlocked_balance + + assert wallet_balance == accounts_balance + assert wallet_unlocked_balance == accounts_unlocked_balance + + # TODO test that wallet balance equals net of wallet's incoming and outgoing tx amounts + + # balance may not equal sum of unspent outputs if unconfirmed txs + # TODO monero-wallet-rpc: reason not to return unspent outputs on unconfirmed txs? then this isn't necessary + txs = wallet.get_txs() + has_unconfirmed_tx: bool = False + for tx in txs: + if tx.in_tx_pool: + has_unconfirmed_tx = True + + # wallet balance is sum of all unspent outputs + wallet_sum: int = 0 + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.is_spent = False + for output in wallet.get_outputs(output_query): + assert output.amount is not None + wallet_sum += output.amount + + if wallet_balance != wallet_sum: + # txs may have changed in between calls to retry test + wallet_sum = 0 + for output in wallet.get_outputs(output_query): + assert output.amount is not None + wallet_sum += output.amount + + if wallet_balance != wallet_sum: + assert has_unconfirmed_tx, "Wallet balance must equal sum of unspent outputs if no unconfirmed txs" + + # account balances are sum of their unspent outputs + for account in accounts: + account_sum: int = 0 + output_query = MoneroOutputQuery() + output_query.account_index = account.index + output_query.is_spent = False + account_outputs = wallet.get_outputs(output_query) + for output in account_outputs: + assert output.amount is not None + account_sum += output.amount + + assert account.balance is not None + if account.balance != account_sum: + assert has_unconfirmed_tx, "Account balance must equal sum of its unspent outputs if no unconfirmed txs" + + # subaddress balances are sum of their unspent outputs + for subaddress in account.subaddresses: + subaddress_sum: int = 0 + output_query = MoneroOutputQuery() + output_query.account_index = account.index + output_query.subaddress_index = subaddress.index + output_query.is_spent = False + subaddress_outputs = wallet.get_outputs(output_query) + for output in subaddress_outputs: + assert output.amount is not None + subaddress_sum += output.amount + + if subaddress_sum != subaddress.balance: + assert has_unconfirmed_tx, "Subaddress balance must equal sum of its unspent outputs if no unconfirmed txs" + # Can get and set a transaction note @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") def test_set_tx_note(self, wallet: MoneroWallet) -> None: @@ -1075,8 +1858,6 @@ def test_set_tx_notes(self, wallet: MoneroWallet) -> None: # TODO: test that get transaction has note - #endregion - # Can export signed key images @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") def test_export_key_images(self, wallet: MoneroWallet) -> None: @@ -1135,6 +1916,84 @@ def test_import_key_images(self, wallet: MoneroWallet) -> None: GenUtils.test_unsigned_big_integer(result.spent_amount, has_spent) GenUtils.test_unsigned_big_integer(result.unspent_amount, has_unspent) + # Can sign and verify messages + # TODO test with view-only wallet + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_sign_and_verify_messages(self, wallet: MoneroWallet) -> None: + msg: str = "This is a super important message which needs to be signed and verified." + subaddress1: MoneroSubaddress = MoneroSubaddress() + subaddress1.account_index = 0 + subaddress1.index = 0 + subaddress2: MoneroSubaddress = MoneroSubaddress() + subaddress2.account_index = 0 + subaddress2.index = 1 + subaddress3: MoneroSubaddress = MoneroSubaddress() + subaddress3.account_index = 1 + subaddress3.index = 0 + subaddresses: list[MoneroSubaddress] = [subaddress1, subaddress2, subaddress3] + + # test signing message with subaddresses + for subaddress in subaddresses: + assert subaddress.account_index is not None + assert subaddress.index is not None + account_idx = subaddress.account_index + idx = subaddress.index + + # sign and verify message with spend key + signature: str = wallet.sign_message(msg, MoneroMessageSignatureType.SIGN_WITH_SPEND_KEY, account_idx, idx) + result = wallet.verify_message(msg, wallet.get_address(account_idx, idx), signature) + WalletUtils.test_message_signature_result(result, True) + assert result.signature_type == MoneroMessageSignatureType.SIGN_WITH_SPEND_KEY + + # verify message with incorrect address + result = wallet.verify_message(msg, wallet.get_address(0, 2), signature) + WalletUtils.test_message_signature_result(result, False) + + # verify message with invalid address + result = wallet.verify_message(msg, "invalid address", signature) + WalletUtils.test_message_signature_result(result, False) + + # verify message with external address + result = wallet.verify_message(msg, TestUtils.get_external_wallet_address(), signature) + WalletUtils.test_message_signature_result(result, False) + + # sign and verify message with view key + signature = wallet.sign_message(msg, MoneroMessageSignatureType.SIGN_WITH_VIEW_KEY, account_idx, idx) + result = wallet.verify_message(msg, wallet.get_address(account_idx, idx), signature) + WalletUtils.test_message_signature_result(result, True) + assert result.signature_type == MoneroMessageSignatureType.SIGN_WITH_VIEW_KEY + + # verify message with incorrect address + result = wallet.verify_message(msg, wallet.get_address(0, 2), signature) + WalletUtils.test_message_signature_result(result, False) + + # verify message with invalid address + result = wallet.verify_message(msg, "invalid address", signature) + WalletUtils.test_message_signature_result(result, False) + + # verify message with external address + result = wallet.verify_message(msg, TestUtils.get_external_wallet_address(), signature) + WalletUtils.test_message_signature_result(result, False) + + # Can get and set arbitrary key/value attributes + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_set_attributes(self, wallet: MoneroWallet) -> None: + # set attributes + attrs: dict[str, str] = {} + for i in range(5): + key: str = f"attr{i}" + val: str = StringUtils.get_random_string() + attrs[key] = val + wallet.set_attribute(key, val) + + # test attributes + for key in attrs: + val = attrs[key] + assert val == wallet.get_attribute(key) + + # get an undefined attribute + assert wallet.get_attribute("unset_key") == "" + # Can convert between a tx config and payment URI @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") def test_get_payment_uri(self, wallet: MoneroWallet) -> None: @@ -1273,4 +2132,346 @@ def test_save_and_close(self) -> None: assert uuid == wallet.get_attribute("id") self._close_wallet(wallet) + # Can freeze and thaw outputs + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_freeze_outputs(self, wallet: MoneroWallet) -> None: + # get an available output + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.is_spent = False + output_query.is_frozen = False + output_query.set_tx_query(MoneroTxQuery(), True) + assert output_query.tx_query is not None + output_query.tx_query.is_locked = False + outputs: list[MoneroOutputWallet] = wallet.get_outputs(output_query) + assert len(outputs) > 0 + for output in outputs: + assert output.is_frozen is False + + output: MoneroOutputWallet = outputs[0] + assert output.tx is not None + assert isinstance(output.tx, MoneroTxWallet) + assert output.tx.is_locked is False + assert output.is_spent is False + assert output.is_frozen is False + assert output.key_image is not None + assert output.key_image.hex is not None + assert wallet.is_output_frozen(output.key_image.hex) is False + + # freeze output by key image + output_query = MoneroOutputQuery() + output_query.is_frozen = True + num_frozen_before: int = len(wallet.get_outputs(output_query)) + wallet.freeze_output(output.key_image.hex) + is_frozen: bool = wallet.is_output_frozen(output.key_image.hex) + assert is_frozen + + # test querying + frozen_outputs: list[MoneroOutputWallet] = wallet.get_outputs(output_query) + num_frozen: int = len(frozen_outputs) + assert num_frozen == num_frozen_before + 1 + output_query = MoneroOutputQuery() + output_query.key_image = MoneroKeyImage() + output_query.key_image.hex = output.key_image.hex + output_query.is_frozen = True + outputs = wallet.get_outputs(output_query) + assert len(outputs) == 1 + output_frozen: MoneroOutputWallet = outputs[0] + assert output_frozen.is_frozen is True + assert output_frozen.key_image is not None + assert output.key_image.hex == output_frozen.key_image.hex + + # try to sweep frozen output + try: + tx_config: MoneroTxConfig = MoneroTxConfig() + tx_config.address = wallet.get_primary_address() + tx_config.key_image = output.key_image.hex + wallet.sweep_output(tx_config) + raise Exception("Should have thrown error") + except Exception as e: + if "No outputs found" != str(e): + raise + + # try to freeze empty key image + try: + wallet.freeze_output("") + raise Exception("Should have thrown error") + except Exception as e: + if "Must specify key image to freeze" != str(e): + raise + + # try to freeze bad key image + try: + wallet.freeze_output("123") + raise Exception("Should have thrown error") + except Exception as e: + logger.debug(e) + #if "Bad key image" != str(e): + # raise + + # thaw output by key image + wallet.thaw_output(output.key_image.hex) + is_frozen = wallet.is_output_frozen(output.key_image.hex) + assert is_frozen is False + + # test querying + output_query = MoneroOutputQuery() + output_query.is_frozen = True + assert num_frozen_before == len(wallet.get_outputs(output_query)) + + output_query = MoneroOutputQuery() + output_query.key_image = MoneroKeyImage() + output_query.key_image.hex = output.key_image.hex + output_query.is_frozen = True + outputs = wallet.get_outputs(output_query) + assert len(outputs) == 0 + + output_query.is_frozen = False + outputs = wallet.get_outputs(output_query) + assert len(outputs) == 1 + + output_thawed: MoneroOutputWallet = outputs[0] + assert output_thawed.is_frozen is False + assert output_thawed.key_image is not None + assert output_thawed.key_image.hex == output.key_image.hex + + # Provides key images of spent outputs + def test_input_key_images(self, wallet: MoneroWallet) -> None: + # get subaddress to test input key images + subaddress: Optional[MoneroSubaddress] = WalletUtils.select_subaddress_with_min_balance(wallet, TxUtils.MAX_FEE) + assert subaddress is not None, "No subaddress with outputs found for test input key images; fund wallet" + assert subaddress.account_index is not None + assert subaddress.index is not None + account_index: int = subaddress.account_index + subaddress_index: int = subaddress.index + + # test unrelayed single transaction + tx_config: MoneroTxConfig = MoneroTxConfig() + tx_config.account_index = account_index + tx_config.destinations.append(MoneroDestination(wallet.get_primary_address(), TxUtils.MAX_FEE)) + spend_tx: MoneroTxWallet = wallet.create_tx(tx_config) + TxUtils.test_spend_tx(spend_tx) + + # test unrelayed split transactions + txs: list[MoneroTxWallet] = wallet.create_txs(tx_config) + for tx in txs: + TxUtils.test_spend_tx(tx) + + # test unrelayed sweep dust + dust_key_images: list[str] = [] + txs = wallet.sweep_dust(False) + for tx in txs: + TxUtils.test_spend_tx(tx) + for tx_input in tx.inputs: + assert tx_input.key_image is not None + assert tx_input.key_image.hex is not None + dust_key_images.append(tx_input.key_image.hex) + + # get available outputs above min amount + output_query: MoneroOutputQuery = MoneroOutputQuery() + output_query.account_index = account_index + output_query.subaddress_index = subaddress_index + output_query.is_spent = False + output_query.is_frozen = False + output_query.min_amount = TxUtils.MAX_FEE + output_query.set_tx_query(MoneroTxQuery(), True) + assert output_query.tx_query is not None + output_query.tx_query.is_locked = False + outputs: list[MoneroOutputWallet] = wallet.get_outputs(output_query) + + assert len(outputs) > 0, "No outputs found" + logger.debug(f"Found {len(outputs)} outputs") + + # filter dust outputs + dust_outputs: list[MoneroOutputWallet] = [] + for output in outputs: + assert output.key_image is not None + assert output.key_image.hex is not None + if output.key_image.hex in dust_key_images: + dust_outputs.append(output) + + logger.debug(f"Found {len(dust_outputs)} dust outputs") + + # remove dust outputs from outputs + for dust_output in dust_outputs: + if dust_output in outputs: + outputs.remove(dust_output) + + assert len(outputs) > 0, "No available outputs found" + logger.debug(f"Using {len(outputs)} available outputs") + + # test unrelayed sweep output + tx_config = MoneroTxConfig() + tx_config.address = wallet.get_primary_address() + output_key_image = outputs[0].key_image + assert output_key_image is not None + tx_config.key_image = output_key_image.hex + spend_tx = wallet.sweep_output(tx_config) + TxUtils.test_spend_tx(spend_tx) + + # test unrelayed sweep wallet ensuring all non-dust outputs are spent + available_key_images: set[str] = set() + for output in outputs: + assert output.key_image is not None + assert output.key_image.hex is not None + available_key_images.add(output.key_image.hex) + swept_key_images: set[str] = set() + tx_config = MoneroTxConfig() + tx_config.account_index = account_index + tx_config.subaddress_indices.append(subaddress_index) + tx_config.address = wallet.get_primary_address() + txs = wallet.sweep_unlocked(tx_config) + + for tx in txs: + TxUtils.test_spend_tx(tx) + for input_wallet in tx.inputs: + assert input_wallet.key_image is not None + assert input_wallet.key_image.hex is not None + swept_key_images.add(input_wallet.key_image.hex) + + assert len(swept_key_images) > 0 + + # max skipped output is less than max fee amount + max_skipped_output: Optional[MoneroOutputWallet] = None + for output in outputs: + assert output.key_image is not None + assert output.key_image.hex is not None + assert output.amount is not None + if output.key_image.hex not in swept_key_images: + if max_skipped_output is None or max_skipped_output.amount < output.amount: # type: ignore + max_skipped_output = output + + if max_skipped_output is not None: + assert max_skipped_output.amount is not None + assert max_skipped_output.amount < TxUtils.MAX_FEE + + #region Test Relays + + # Validates inputs when sending funds + @pytest.mark.skipif(TestUtils.TEST_NON_RELAYS is False, reason="TEST_NON_RELAYS disabled") + def test_validate_inputs_sending_funds(self, wallet: MoneroWallet) -> None: + # try sending with invalid address + try: + tx_config = MoneroTxConfig() + tx_config.address = "my invalid address" + tx_config.account_index = 0 + tx_config.amount = TxUtils.MAX_FEE + wallet.create_tx(tx_config) + raise Exception("Should have thrown") + except Exception as e: + if str(e) != "Invalid destination address": + raise + + # Can send to self + @pytest.mark.skipif(TestUtils.TEST_RELAYS is False, reason="TEST_RELAYS disabled") + def test_send_to_self(self, wallet: MoneroWallet) -> None: + # wait for txs to confirm and for sufficient unlocked balance + TestUtils.WALLET_TX_TRACKER.wait_for_txs_to_clear_pool(wallet) + amount: int = TxUtils.MAX_FEE * 3 + TestUtils.WALLET_TX_TRACKER.wait_for_unlocked_balance(wallet, 0, None, amount) + + # collect sender balances before + balance1 = wallet.get_balance() + unlocked_balance1 = wallet.get_unlocked_balance() + + # test error sending funds to self with integrated subaddress + # TODO (monero-project): sending funds to self + # with integrated subaddress throws error: https://github.com/monero-project/monero/issues/8380 + + try: + tx_config = MoneroTxConfig() + tx_config.account_index = 0 + subaddress = wallet.get_subaddress(0, 1) + assert subaddress.address is not None + address = subaddress.address + tx_config.address = MoneroUtils.get_integrated_address(TestUtils.NETWORK_TYPE, address, '').integrated_address + tx_config.amount = amount + tx_config.relay = True + wallet.create_tx(tx_config) + raise Exception("Should have failed sending to self with integrated subaddress") + except Exception as e: + if "Total received by" not in str(e): + raise + + # send funds to self + tx_config = MoneroTxConfig() + tx_config.account_index = 0 + tx_config.address = wallet.get_integrated_address().integrated_address + tx_config.amount = amount + tx_config.relay = True + + tx = wallet.create_tx(tx_config) + + # test balances after + balance2: int = wallet.get_balance() + unlocked_balance2: int = wallet.get_unlocked_balance() + + # unlocked balance should decrease + assert unlocked_balance2 < unlocked_balance1 + assert tx.fee is not None + expected_balance = balance1 - tx.fee + assert expected_balance == balance2, "Balance after send was not balance before - fee" + + # Can send to external address + @pytest.mark.skipif(TestUtils.TEST_RELAYS is False, reason="TEST_RELAYS is disabled") + def test_send_to_external(self, wallet: MoneroWallet) -> None: + recipient: Optional[MoneroWallet] = None + try: + # wait for txs to confirm and for sufficient unlocked balance + TestUtils.WALLET_TX_TRACKER.wait_for_txs_to_clear_pool(wallet) + amount: int = TxUtils.MAX_FEE * 3 + TestUtils.WALLET_TX_TRACKER.wait_for_unlocked_balance(wallet, 0, None, amount) + + # create recipient wallet + recipient = self._create_wallet(MoneroWalletConfig()) + + balance1: int = wallet.get_balance() + unlocked_balance1: int = wallet.get_unlocked_balance() + + # send funds to recipient + tx_config: MoneroTxConfig = MoneroTxConfig() + tx_config.account_index = 0 + tx_config.address = wallet.get_integrated_address(recipient.get_primary_address(), "54491f3bb3572a37").integrated_address + tx_config.amount = amount + tx_config.relay = True + tx: MoneroTxWallet = wallet.create_tx(tx_config) + + # test sender balances after + balance2: int = wallet.get_balance() + unlocked_balance2: int = wallet.get_unlocked_balance() + + # unlocked balance should decrease + assert unlocked_balance2 < unlocked_balance1 + assert tx.fee is not None + expected_balance = balance1 - tx.get_outgoing_amount() - tx.fee + assert expected_balance == balance2, "Balance after send was not balance before - net tx amount - fee (5 - 1 != 4 test)" + + # test recipient balance after + recipient.sync() + tx_query: MoneroTxQuery = MoneroTxQuery() + tx_query.is_confirmed = False + txs = wallet.get_txs(tx_query) + + assert len(txs) > 0 + assert amount == recipient.get_balance() + + finally: + if recipient is not None: + self._close_wallet(recipient) + + # Can scan transactions by id + def test_scan_txs(self, wallet: MoneroWallet) -> None: + config: MoneroWalletConfig = MoneroWalletConfig() + config.seed = wallet.get_seed() + config.restore_height = 0 + scan_wallet: MoneroWallet = self._create_wallet(config) + logger.debug(f"Created scan wallet") + TxUtils.test_scan_txs(wallet, scan_wallet) + + # Can get the default fee priority + def test_get_default_fee_priority(self, wallet: MoneroWallet) -> None: + default_priority: MoneroTxPriority = wallet.get_default_fee_priority() + assert int(default_priority) > 0 + + #endregion + #endregion diff --git a/tests/test_monero_wallet_interface.py b/tests/test_monero_wallet_interface.py index 3ce14df..62b05cf 100644 --- a/tests/test_monero_wallet_interface.py +++ b/tests/test_monero_wallet_interface.py @@ -8,19 +8,30 @@ MoneroTxWallet ) +from utils import WalletUtils + logger: logging.Logger = logging.getLogger("TestMoneroWalletInterface") -# Test calls to MoneroWallet interface +# Test binding calls to MoneroWallet interface @pytest.mark.unit class TestMoneroWalletInterface: - """Wallet interface bindings unit tests""" + """Wallet interface binding calls unit tests""" @pytest.fixture(scope="class") def wallet(self) -> MoneroWallet: """Test wallet instance""" return MoneroWallet() + # Setup and teardown of test class + @pytest.fixture(scope="class", autouse=True) + def global_setup_and_teardown(self): + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + yield + logger.info(f"Teardown test class {type(self).__name__}") + + # Setup and teardown of each tests @pytest.fixture(autouse=True) def setup_and_teardown(self, request: pytest.FixtureRequest): logger.info(f"Before {request.node.name}") # type: ignore @@ -34,18 +45,22 @@ def test_default_language(self) -> None: assert MoneroWallet.DEFAULT_LANGUAGE is not None, "MoneroWallet.DEFAULT_LANGUAGE is None" assert MoneroWallet.DEFAULT_LANGUAGE == "English", f'Expected "English", got {MoneroWallet.DEFAULT_LANGUAGE}' + # Can call is_view_only() @pytest.mark.not_supported def test_is_view_only(self, wallet: MoneroWallet) -> None: wallet.is_view_only() + # Can call get_version() @pytest.mark.not_supported def test_get_version(self, wallet: MoneroWallet) -> None: wallet.get_version() + # Can call get_path() @pytest.mark.not_supported def test_get_path(self, wallet: MoneroWallet) -> None: wallet.get_path() + # Can call get_network_type() @pytest.mark.not_supported def test_get_network_type(self, wallet: MoneroWallet) -> None: wallet.get_network_type() @@ -502,9 +517,9 @@ def test_submit_multisig_tx_hex(self, wallet: MoneroWallet) -> None: def test_sign_message(self, wallet: MoneroWallet) -> None: wallet.sign_message("", MoneroMessageSignatureType.SIGN_WITH_VIEW_KEY) - @pytest.mark.not_supported def test_verify_message(self, wallet: MoneroWallet) -> None: - wallet.verify_message("", "", "") + result = wallet.verify_message("", "", "") + WalletUtils.test_message_signature_result(result, False) @pytest.mark.not_supported def test_get_payment_uri(self, wallet: MoneroWallet) -> None: diff --git a/tests/test_monero_wallet_keys.py b/tests/test_monero_wallet_keys.py index bf3e90c..4ead0e6 100644 --- a/tests/test_monero_wallet_keys.py +++ b/tests/test_monero_wallet_keys.py @@ -22,11 +22,6 @@ class TestMoneroWalletKeys(BaseTestMoneroWallet): _account_indices: list[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] _subaddress_indices: list[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - @pytest.fixture(scope="class", autouse=True) - @override - def before_all(self): - pass - @pytest.fixture(scope="class") @override def wallet(self) -> MoneroWalletKeys: @@ -38,6 +33,22 @@ def wallet(self) -> MoneroWalletKeys: def daemon(self) -> MoneroDaemonRpc: return MoneroDaemon() # type: ignore + @override + def before_all(self) -> None: + logger.info(f"Setup test class {type(self).__name__}") + + @override + def after_all(self) -> None: + logger.info(f"Teardown test class {type(self).__name__}") + + @override + def before_each(self, request: pytest.FixtureRequest) -> None: + logger.info(f"Before {request.node.name}") # type: ignore + + @override + def after_each(self, request: pytest.FixtureRequest) -> None: + logger.info(f"After {request.node.name}") # type: ignore + #region Overrides @classmethod @@ -121,6 +132,46 @@ def test_create_then_relay_split(self, wallet: MoneroWallet) -> None: def test_get_txs_wallet(self, wallet: MoneroWallet) -> None: return super().test_get_txs_wallet(wallet) + @pytest.mark.not_supported + @override + def test_get_txs_by_hash(self, wallet: MoneroWallet) -> None: + return super().test_get_txs_by_hash(wallet) + + @pytest.mark.not_supported + @override + def test_get_txs_with_query(self, wallet: MoneroWallet) -> None: + return super().test_get_txs_with_query(wallet) + + @pytest.mark.not_supported + @override + def test_validate_inputs_get_txs(self, wallet: MoneroWallet) -> None: + return super().test_validate_inputs_get_txs(wallet) + + @pytest.mark.not_supported + @override + def test_get_transfers(self, wallet: MoneroWallet) -> None: + return super().test_get_transfers(wallet) + + @pytest.mark.not_supported + @override + def test_validate_inputs_get_transfers(self, wallet: MoneroWallet) -> None: + return super().test_validate_inputs_get_transfers(wallet) + + @pytest.mark.not_supported + @override + def test_get_outputs(self, wallet: MoneroWallet) -> None: + return super().test_get_outputs(wallet) + + @pytest.mark.not_supported + @override + def test_validate_inputs_get_outputs(self, wallet: MoneroWallet) -> None: + return super().test_validate_inputs_get_outputs(wallet) + + @pytest.mark.not_supported + @override + def test_accounting(self, wallet: MoneroWallet) -> None: + return super().test_accounting(wallet) + @pytest.mark.not_supported @override def test_daemon(self, wallet: MoneroWallet) -> None: @@ -236,6 +287,11 @@ def test_get_new_key_images_from_last_import(self, wallet: MoneroWallet) -> None def test_import_key_images(self, wallet: MoneroWallet) -> None: return super().test_import_key_images(wallet) + @pytest.mark.not_supported + @override + def test_set_attributes(self, wallet: MoneroWallet) -> None: + return super().test_set_attributes(wallet) + @pytest.mark.not_supported @override def test_get_payment_uri(self, wallet: MoneroWallet) -> None: @@ -256,6 +312,61 @@ def test_change_password(self) -> None: def test_save_and_close(self) -> None: return super().test_save_and_close() + @pytest.mark.not_supported + @override + def test_validate_inputs_sending_funds(self, wallet: MoneroWallet) -> None: + return super().test_validate_inputs_sending_funds(wallet) + + @pytest.mark.not_supported + @override + def test_export_outputs(self, wallet: MoneroWallet) -> None: + return super().test_export_outputs(wallet) + + @pytest.mark.not_supported + @override + def test_import_outputs(self, wallet: MoneroWallet) -> None: + return super().test_import_outputs(wallet) + + @pytest.mark.not_supported + @override + def test_send_to_self(self, wallet: MoneroWallet) -> None: + return super().test_send_to_self(wallet) + + @pytest.mark.not_supported + @override + def test_send_to_external(self, wallet: MoneroWallet) -> None: + return super().test_send_to_external(wallet) + + @pytest.mark.not_supported + @override + def test_scan_txs(self, wallet: MoneroWallet) -> None: + return super().test_scan_txs(wallet) + + @pytest.mark.not_supported + @override + def test_get_default_fee_priority(self, wallet: MoneroWallet) -> None: + return super().test_get_default_fee_priority(wallet) + + @pytest.mark.not_implemented + @override + def test_sign_and_verify_messages(self, wallet: MoneroWallet) -> None: + return super().test_sign_and_verify_messages(wallet) + + @pytest.mark.not_supported + @override + def test_freeze_outputs(self, wallet: MoneroWallet) -> None: + return super().test_freeze_outputs(wallet) + + @pytest.mark.not_supported + @override + def test_get_outputs_with_query(self, wallet: MoneroWallet) -> None: + return super().test_get_outputs_with_query(wallet) + + @pytest.mark.xfail(reason="Keys-only wallet does not have enumerable set of subaddresses") + @override + def test_input_key_images(self, wallet: MoneroWallet) -> None: + return super().test_input_key_images(wallet) + #endregion #region Tests diff --git a/tests/test_monero_wallet_model.py b/tests/test_monero_wallet_model.py new file mode 100644 index 0000000..44f4a25 --- /dev/null +++ b/tests/test_monero_wallet_model.py @@ -0,0 +1,173 @@ +import pytest +import logging + +from monero import MoneroTxQuery, MoneroTransferQuery, MoneroOutputQuery + +logger: logging.Logger = logging.getLogger("TestMoneroWalletModel") + + +@pytest.mark.unit +class TestMoneroWalletModel: + """Test monero wallet data model""" + + #region Fixtures + + # Setup and teardown of test class + @pytest.fixture(scope="class", autouse=True) + def global_setup_and_teardown(self): + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + yield + logger.info(f"Teardown test class {type(self).__name__}") + + # setup and teardown of each tests + @pytest.fixture(autouse=True) + def setup_and_teardown(self, request: pytest.FixtureRequest): + logger.info(f"Before {request.node.name}") # type: ignore + yield + logger.info(f"After {request.node.name}") # type: ignore + + #endregion + + #region Tests + + # Test output query expected behaviour + def test_output_query(self) -> None: + output_query = MoneroOutputQuery() + tx_query: MoneroTxQuery = MoneroTxQuery() + + # assign tx query to output query + output_query.set_tx_query(tx_query, True) + + assert output_query.tx_query is not None + assert output_query.tx_query == tx_query + assert tx_query.input_query is None + assert tx_query.output_query is not None + assert tx_query.output_query == output_query + + # reassign output query to tx query + output_query.tx_query.output_query = output_query + assert output_query.tx_query is not None + assert output_query == output_query.tx_query.output_query + + # remove tx query from output query + output_query.set_tx_query(None, True) + + assert output_query.tx_query is None + assert tx_query.output_query is None + + # Test input query expected behaviour + def test_input_query(self) -> None: + input_query = MoneroOutputQuery() + tx_query: MoneroTxQuery = MoneroTxQuery() + + # assign tx query to input query + input_query.set_tx_query(tx_query, False) + + assert input_query.tx_query is not None + assert input_query.tx_query == tx_query + assert tx_query.output_query is None + assert tx_query.input_query is not None + assert tx_query.input_query == input_query + + # reassign input query to tx query + input_query.tx_query.input_query = input_query + assert input_query.tx_query is not None + assert input_query == input_query.tx_query.input_query + + # remove tx query from input query + input_query.set_tx_query(None, False) + + assert input_query.tx_query is None + assert tx_query.input_query is None + + # Test transfer query expected behaviour + def test_transfer_query(self) -> None: + transfer_query: MoneroTransferQuery = MoneroTransferQuery() + tx_query: MoneroTxQuery = MoneroTxQuery() + + # assign tx query to transfer query + transfer_query.tx_query = tx_query + + assert tx_query.transfer_query is not None + assert tx_query.transfer_query == transfer_query + + # reassign transfer query to tx query + transfer_query.tx_query.transfer_query = transfer_query + assert transfer_query.tx_query is not None + assert transfer_query == transfer_query.tx_query.transfer_query + + # remove tx query from transfer query + transfer_query.tx_query = None + + assert tx_query.transfer_query is None + + transfer_query = MoneroTransferQuery() + transfer_query.tx_query = MoneroTxQuery() + + # check incoming/outgoing + assert transfer_query.incoming is None + assert transfer_query.outgoing is None + assert transfer_query.is_incoming() is None + assert transfer_query.is_outgoing() is None + + # set incoming + transfer_query.incoming = True + assert transfer_query.is_incoming() is True + assert transfer_query.outgoing is False + assert transfer_query.is_outgoing() is False + transfer_query.incoming = None + + # set outgoing + transfer_query.outgoing = True + assert transfer_query.is_outgoing() is True + assert transfer_query.incoming is False + assert transfer_query.is_incoming() is False + + # Test tx query expected behaviour + def test_tx_query(self) -> None: + tx_query: MoneroTxQuery = MoneroTxQuery() + transfer_query: MoneroTransferQuery = MoneroTransferQuery() + output_query: MoneroOutputQuery = MoneroOutputQuery() + input_query: MoneroOutputQuery = MoneroOutputQuery() + + # assign transfer query to tx query + tx_query.transfer_query = transfer_query + + assert tx_query.transfer_query == transfer_query + assert transfer_query.tx_query is not None + assert transfer_query.tx_query == tx_query + + # remove transfer query from tx query + tx_query.transfer_query = None + + assert tx_query.transfer_query != transfer_query + assert transfer_query.tx_query is None + + # assign output query to tx query + tx_query.output_query = output_query + + assert tx_query.output_query == output_query + assert output_query.tx_query is not None + assert output_query.tx_query == tx_query + + # remove output query from tx query + tx_query.output_query = None + + assert tx_query.output_query != output_query + assert output_query.tx_query is None + + # assign input query to tx query + tx_query.input_query = input_query + + assert tx_query.input_query == input_query + assert input_query.tx_query is not None + assert input_query.tx_query == tx_query + + # remove output query from tx query + tx_query.input_query = None + + assert tx_query.input_query != input_query + assert input_query.tx_query is None + + #endregion diff --git a/tests/test_monero_wallet_rpc.py b/tests/test_monero_wallet_rpc.py index 080c962..326a071 100644 --- a/tests/test_monero_wallet_rpc.py +++ b/tests/test_monero_wallet_rpc.py @@ -52,17 +52,21 @@ def _get_seed_languages(self) -> list[str]: return self.get_test_wallet().get_seed_languages() @override - @pytest.fixture(scope="class", autouse=True) def before_all(self) -> None: - self._setup_blockchain() + super().before_all() # if full tests ran, wait for full wallet's pool txs to confirm if Utils.WALLET_FULL_TESTS_RUN: Utils.clear_wallet_full_txs_pool() @override - def after_each(self, request: pytest.FixtureRequest) -> None: + def after_all(self) -> None: + super().after_all() Utils.free_wallet_rpc_resources() + + @override + def after_each(self, request: pytest.FixtureRequest) -> None: super().after_each(request) + Utils.free_wallet_rpc_resources() @override def get_daemon_rpc_uri(self) -> str: @@ -108,24 +112,19 @@ def test_get_seed_language(self, wallet: MoneroWallet) -> None: def test_get_height_by_date(self, wallet: MoneroWallet) -> None: return super().test_get_height_by_date(wallet) - #endregion - - #region Disabled Tests - - @pytest.mark.skip(reason="TODO monero-project") + @pytest.mark.xfail(reason="TODO monero-project") @override def test_get_public_view_key(self, wallet: MoneroWallet) -> None: return super().test_get_public_view_key(wallet) - @pytest.mark.skip(reason="TODO monero-project") + @pytest.mark.xfail(reason="TODO monero-project") @override def test_get_public_spend_key(self, wallet: MoneroWallet) -> None: return super().test_get_public_spend_key(wallet) - @pytest.mark.skip(reason="TODO") - @override - def test_wallet_equality_ground_truth(self, wallet: MoneroWallet) -> None: - return super().test_wallet_equality_ground_truth(wallet) + #endregion + + #region Disabled Tests @pytest.mark.skip(reason="TODO (monero-project): https://github.com/monero-project/monero/issues/5812") @override diff --git a/tests/utils/assert_utils.py b/tests/utils/assert_utils.py index f44fe7c..c96a22e 100644 --- a/tests/utils/assert_utils.py +++ b/tests/utils/assert_utils.py @@ -42,15 +42,6 @@ def assert_equals(cls, expr1: Any, expr2: Any, message: str = "assertion failed" else: assert expr1 == expr2, f"{message}: {expr1} == {expr2}" - @classmethod - def equals(cls, expr1: Any, expr2: Any) -> bool: - try: - cls.assert_equals(expr1, expr2) - return True - except Exception as e: - logger.debug(str(e)) - return False - @classmethod def assert_not_equals(cls, expr1: Any, expr2: Any, message: str = "assertion failed"): assert expr1 != expr2, f"{message}: {expr1} != {expr2}" diff --git a/tests/utils/mining_utils.py b/tests/utils/mining_utils.py index 903be46..2795c54 100644 --- a/tests/utils/mining_utils.py +++ b/tests/utils/mining_utils.py @@ -104,13 +104,16 @@ def is_wallet_funded(cls, wallet: MoneroWallet, xmr_amount_per_address: float, n else: return False - if wallet.get_balance() < amount_required: + wallet_balance = wallet.get_balance() + + if wallet_balance < amount_required: return False accounts = wallet.get_accounts(True) subaddresses_found: int = 0 + num_wallet_accounts = len(accounts) - if len(accounts) < num_accounts: + if num_wallet_accounts < num_accounts: return False for account in accounts: @@ -120,13 +123,14 @@ def is_wallet_funded(cls, wallet: MoneroWallet, xmr_amount_per_address: float, n if balance >= amount_per_address: subaddresses_found += 1 - return subaddresses_found >= num_accounts * (num_subaddresses + 1) + required_subaddresses: int = num_accounts * (num_subaddresses + 1) + return subaddresses_found >= required_subaddresses @classmethod def fund_wallet(cls, wallet: MoneroWallet, xmr_amount_per_address: float, num_accounts: int = 3, num_subaddresses: int = 10) -> Optional[list[MoneroTxWallet]]: """Fund a wallet with mined coins""" primary_addr = wallet.get_primary_address() - if cls.is_wallet_funded(wallet, xmr_amount_per_address, num_subaddresses): + if cls.is_wallet_funded(wallet, xmr_amount_per_address, num_accounts, num_subaddresses): logger.debug(f"Already funded wallet {primary_addr}") return None @@ -143,7 +147,6 @@ def fund_wallet(cls, wallet: MoneroWallet, xmr_amount_per_address: float, num_ac tx_config.can_split = True supports_get_accounts = isinstance(wallet, MoneroWalletRpc) or isinstance(wallet, MoneroWalletFull) - while supports_get_accounts and len(wallet.get_accounts()) < num_accounts: wallet.create_account() @@ -169,6 +172,9 @@ def fund_wallet(cls, wallet: MoneroWallet, xmr_amount_per_address: float, num_ac for tx in txs: assert tx.is_failed is False, "Cannot fund wallet: tx failed" + if supports_get_accounts: + wallet.save() + logger.debug(f"Funded test wallet {primary_addr} with {amount_required_str}") return txs diff --git a/tests/utils/single_tx_sender.py b/tests/utils/single_tx_sender.py index 04f8d3a..79ff8cb 100644 --- a/tests/utils/single_tx_sender.py +++ b/tests/utils/single_tx_sender.py @@ -20,38 +20,55 @@ class SingleTxSender: """Sends funds from the first unlocked account to primary account address.""" SEND_DIVISOR: int = 10 + """Transaction amount send divisor.""" _config: MoneroTxConfig + """Transaction configuration.""" _wallet: MoneroWallet + """Wallet reference""" _daemon: MoneroDaemonRpc + """Daemon reference""" _from_account: Optional[MoneroAccount] = None + """Account to use to send funds.""" _from_subaddress: Optional[MoneroSubaddress] = None + """Subaddress to use to send funds.""" @property def tracker(self) -> TxTracker: + """Wallet transaction tracker.""" return TestUtils.WALLET_TX_TRACKER @property def balance_before(self) -> int: + """Wallet balance before sending.""" balance = self._from_subaddress.balance if self._from_subaddress is not None else 0 return balance if balance is not None else 0 @property def unlocked_balance_before(self) -> int: + """Wallet unlocked balance before sending.""" balance = self._from_subaddress.unlocked_balance if self._from_subaddress is not None else 0 return balance if balance is not None else 0 @property def send_amount(self) -> int: + """Amount to send.""" b = self.unlocked_balance_before return int((b - TxUtils.MAX_FEE) / self.SEND_DIVISOR) @property def address(self) -> str: + """Primary wallet address""" return self._wallet.get_primary_address() def __init__(self, wallet: MoneroWallet, config: Optional[MoneroTxConfig]) -> None: + """ + Initialize a new single transaction sender. + + :param MoneroWallet wallet: wallet reference. + :param MoneroTxConfig | None config: transaction configuration. + """ self._wallet = wallet self._daemon = TestUtils.get_daemon_rpc() self._config = config if config is not None else MoneroTxConfig() @@ -59,6 +76,11 @@ def __init__(self, wallet: MoneroWallet, config: Optional[MoneroTxConfig]) -> No #region Private Methods def _build_tx_config(self) -> MoneroTxConfig: + """ + Build transaction configuration. + + :returns MoneroTxConfig: transaction configuration. + """ assert self._from_account is not None assert self._from_account.index is not None assert self._from_subaddress is not None @@ -125,21 +147,35 @@ def _check_balance_decreased(self) -> None: def _send_to_invalid(self, config: MoneroTxConfig) -> None: """Send to invalid address""" # save original address - try: - # set invalid destination address - config.set_address("my invalid address") - # create tx - if config.can_split is not False: - self._wallet.create_txs(config) - else: - self._wallet.create_tx(config) - # raise error - raise Exception("Should have thrown error creating tx with invalid address") - except Exception as e: - assert str(e) == "Invalid destination address", str(e) - finally: - # restore original address - config.set_address(self.address) + max_retries: int = 3 + num_retries: int = 0 + + while True: + logger.debug(f"Trying sending to invalid address ({num_retries + 1}/{max_retries})...") + try: + # set invalid destination address + config.set_address("my invalid address") + # create tx + if config.can_split is not False: + self._wallet.create_txs(config) + else: + self._wallet.create_tx(config) + # raise error + raise Exception("Should have thrown error creating tx with invalid address") + except Exception as e: + # retry on network error + msg: str = str(e) + if msg == "Network error": + if num_retries == max_retries: + raise + num_retries += 1 + continue + + assert msg == "Invalid destination address", msg + break + finally: + # restore original address + config.set_address(self.address) def _send_to_self(self, config: MoneroTxConfig) -> list[MoneroTxWallet]: """Test sending to self""" @@ -152,6 +188,13 @@ def _send_to_self(self, config: MoneroTxConfig) -> list[MoneroTxWallet]: return txs def _handle_non_relayed_tx(self, txs: list[MoneroTxWallet], config: MoneroTxConfig) -> list[MoneroTxWallet]: + """ + Handle non relayed wallet txs + + :param list[MoneroTxWallet] txs: Transactions to handle. + :param MoneroTxConfig config: Context transaction configuration. + :returns list[MoneroTxWallet]: Handled transactions. + """ if config.relay is True: return txs @@ -195,6 +238,7 @@ def _handle_non_relayed_tx(self, txs: list[MoneroTxWallet], config: MoneroTxConf #endregion def send(self) -> None: + """Send single transaction from wallet with configuration.""" # check wallet balance self._check_balance() @@ -211,7 +255,7 @@ def send(self) -> None: # test send to self txs = self._send_to_self(config) - logger.debug(f"Created {len(txs)}") + logger.debug(f"Created {len(txs)} txs") # test that config is unchaged assert config_copy != config @@ -223,7 +267,7 @@ def send(self) -> None: # handle non-relayed transaction txs = self._handle_non_relayed_tx(txs, config) - logger.debug(f"Handled {len(txs)} txs") + logger.debug(f"Handled {len(txs)} non relayed txs") # test that balance and unlocked balance decreased self._check_balance_decreased() diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index bf208f6..1ff70c0 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -57,6 +57,8 @@ class TestUtils(ABC): DAEMON_RPC_PASSWORD: str = "" """Monero daemon rpc password""" TEST_NON_RELAYS: bool = True + """Indicates if non-relays tests are enabled""" + TEST_RELAYS: bool = True """Indicates if relays tests are enabled""" LITE_MODE: bool = False """Indicates if running tests in light mode""" @@ -167,6 +169,7 @@ def load_config(cls) -> None: # parse general config nettype_str = parser.get('general', 'network_type') cls.TEST_NON_RELAYS = parser.getboolean('general', 'test_non_relays') + cls.TEST_RELAYS = parser.getboolean('general', 'test_relays') cls.TEST_NOTIFICATIONS = parser.getboolean('general', 'test_notifications') cls.LITE_MODE = parser.getboolean('general', 'lite_mode') cls.AUTO_CONNECT_TIMEOUT_MS = parser.getint('general', 'auto_connect_timeout_ms') @@ -174,7 +177,7 @@ def load_config(cls) -> None: cls.REGTEST = DaemonUtils.is_regtest(nettype_str) if cls.REGTEST: - cls.MIN_BLOCK_HEIGHT = 250 # minimum block height for regtest environment + cls.MIN_BLOCK_HEIGHT = 100 # minimum block height for regtest environment # parse daemon config cls.DAEMON_RPC_URI = parser.get('daemon', 'rpc_uri') @@ -318,7 +321,7 @@ def get_wallet_full(cls) -> MoneroWalletFull: # sync and save wallet if cls._WALLET_FULL.is_connected_to_daemon(): - listener = WalletSyncPrinter() + listener = WalletSyncPrinter(0.25) cls._WALLET_FULL.sync(listener) cls._WALLET_FULL.save() cls._WALLET_FULL.start_syncing(cls.SYNC_PERIOD_IN_MS) # start background synchronizing with sync period @@ -349,13 +352,17 @@ def get_mining_wallet(cls) -> MoneroWalletFull: if cls._WALLET_MINING is not None: return cls._WALLET_MINING if not MoneroWalletFull.wallet_exists(cls.MINING_WALLET_FULL_PATH): + logger.debug("Creating mining wallet...") wallet = MoneroWalletFull.create_wallet(cls.get_mining_wallet_config()) + logger.debug("Mining wallet created") else: + logger.debug("Opening mining wallet...") wallet = MoneroWalletFull.open_wallet(cls.MINING_WALLET_FULL_PATH, cls.MINING_WALLET_PASSWORD, cls.NETWORK_TYPE) + logger.debug("Loaded mining wallet") wallet.set_daemon_connection(cls.get_daemon_rpc_connection()) assert wallet.is_connected_to_daemon(), "Mining wallet is not connected to daemon" - listener = WalletSyncPrinter() + listener = WalletSyncPrinter(0.25) wallet.sync(listener) wallet.save() wallet.start_syncing(cls.SYNC_PERIOD_IN_MS) @@ -540,7 +547,7 @@ def create_wallet_ground_truth( gt_wallet = MoneroWalletFull.create_wallet(config) AssertUtils.assert_equals(restore_height, gt_wallet.get_restore_height()) - gt_wallet.sync(start_height, WalletSyncPrinter()) + gt_wallet.sync(start_height, WalletSyncPrinter(0.25)) gt_wallet.start_syncing(cls.SYNC_PERIOD_IN_MS) # close the full wallet when the runtime is shutting down to release resources diff --git a/tests/utils/tx_spammer.py b/tests/utils/tx_spammer.py index c2fa14d..c1903e1 100644 --- a/tests/utils/tx_spammer.py +++ b/tests/utils/tx_spammer.py @@ -34,7 +34,7 @@ def spam(self) -> list[MoneroTxWallet]: spam_txs = MiningUtils.fund_wallet(wallet, 1, 1, 0) wallet_addr = wallet.get_primary_address() assert spam_txs is not None and len(spam_txs) > 0, f"Could not spam tx for random wallet ({i}): {wallet_addr}" - for tx in txs: + for tx in spam_txs: logger.debug(f"Spammed tx {tx.hash} for random wallet ({i}): {wallet_addr}") # save tx txs.append(tx) diff --git a/tests/utils/tx_utils.py b/tests/utils/tx_utils.py index 2584119..43cd173 100644 --- a/tests/utils/tx_utils.py +++ b/tests/utils/tx_utils.py @@ -9,7 +9,8 @@ MoneroOutgoingTransfer, MoneroDestination, MoneroUtils, MoneroOutputWallet, MoneroTx, MoneroOutput, MoneroKeyImage, MoneroDaemon, - MoneroTxConfig, MoneroTxSet + MoneroTxConfig, MoneroTxSet, MoneroTransferQuery, + MoneroOutputQuery ) from .tx_context import TxContext @@ -111,7 +112,10 @@ def test_output_wallet(cls, output: Optional[MoneroOutputWallet]) -> None: assert copy != output AssertUtils.assert_equals(copy, output) # TODO: should output copy do deep copy of tx so models are graph instead of tree? Would need to work out circular references - assert copy.tx is None + # monero-cpp gives non-null output tx + # assert copy.tx is None + assert copy.tx is not None + assert copy.tx == output.tx @classmethod def test_destination(cls, dest: Optional[MoneroDestination]) -> None: @@ -178,12 +182,12 @@ def test_transfer(cls, transfer: Optional[MoneroTransfer], context: Optional[TxC # transfer and tx reference each other assert transfer.tx is not None - if not AssertUtils.equals(transfer, transfer.tx.outgoing_transfer): + if transfer != transfer.tx.outgoing_transfer: assert len(transfer.tx.incoming_transfers) != 0 assert transfer in transfer.tx.incoming_transfers, "Transaction does not reference given transfer" @classmethod - def test_tx_wallet(cls, tx: Optional[MoneroTxWallet], context: Optional[TxContext]) -> None: + def test_tx_wallet(cls, tx: Optional[MoneroTxWallet], context: Optional[TxContext] = None) -> None: """Test monero tx wallet""" # validate / sanitize inputs ctx = TxContext(context) @@ -698,7 +702,23 @@ def test_miner_tx(cls, miner_tx: Optional[MoneroTx]) -> None: # cls.test_tx(miner_tx, ctx) @classmethod - def get_and_test_txs(cls, wallet: MoneroWallet, query: Optional[MoneroTxQuery], ctx: Optional[TxContext], is_expected: bool, regtest: bool) -> list[MoneroTxWallet]: + def test_spend_tx(cls, spend_tx: Optional[MoneroTxWallet]) -> None: + """ + Test spend transaction. + + :param MoneroTxWallet | None spend_tx: Spend transaction to test. + """ + # validate spend tx + assert spend_tx is not None + assert len(spend_tx.inputs) > 0 + # validate tx inputs + for input_wallet in spend_tx.inputs: + assert input_wallet.key_image is not None + assert input_wallet.key_image.hex is not None + assert len(input_wallet.key_image.hex) > 0 + + @classmethod + def get_and_test_txs(cls, wallet: MoneroWallet, query: Optional[MoneroTxQuery], ctx: Optional[TxContext], is_expected: Optional[bool], regtest: bool) -> list[MoneroTxWallet]: """Get and test txs from wallet""" copy: Optional[MoneroTxQuery] = query.copy() if query is not None else None txs = wallet.get_txs(query) if query is not None else wallet.get_txs() @@ -718,6 +738,81 @@ def get_and_test_txs(cls, wallet: MoneroWallet, query: Optional[MoneroTxQuery], return txs + @classmethod + def get_and_test_transfers(cls, wallet: MoneroWallet, query: Optional[MoneroTransferQuery], ctx: Optional[TxContext], is_expected: Optional[bool]) -> list[MoneroTransfer]: + copy: Optional[MoneroTransferQuery] = query.copy() if query is not None else None + transfers = wallet.get_transfers(query) if query is not None else wallet.get_transfers(MoneroTransferQuery()) + + if is_expected is False: + assert len(transfers) == 0 + elif is_expected is True: + assert len(transfers) > 0, "Transfers were expected but not found; run send tests?" + + if ctx is None: + ctx = TxContext() + + ctx.wallet = wallet + for transfer in transfers: + TxUtils.test_tx_wallet(transfer.tx, ctx) + + if query is not None: + AssertUtils.assert_equals(copy, query) + + return transfers + + @classmethod + def get_and_test_outputs(cls, wallet: MoneroWallet, query: Optional[MoneroOutputQuery], is_expected: Optional[bool]) -> list[MoneroOutputWallet]: + """ + Fetches and tests wallet outputs (i.e. wallet tx outputs) according to the given query. + + :param MoneroWallet wallet: wallet to get outputs from. + :param MoneroOutputQuery | None query: output query. + :param bool | None is_expected: expected non-empty outputs. + """ + + copy = query.copy() if query is not None else None + outputs = wallet.get_outputs(query) if query is not None else wallet.get_outputs(MoneroOutputQuery()) + AssertUtils.assert_equals(copy, query) + + if is_expected is False: + assert len(outputs) == 0 + elif is_expected is True: + assert len(outputs) > 0, "Outputs were expected but not found; run send tests" + + for output in outputs: + TxUtils.test_output_wallet(output) + + return outputs + + @classmethod + def test_scan_txs(cls, wallet: Optional[MoneroWallet], scan_wallet: Optional[MoneroWallet]) -> None: + assert wallet is not None + assert scan_wallet is not None + # get a few tx hashes + tx_hashes: list[str] = [] + txs: list[MoneroTxWallet] = wallet.get_txs() + assert len(txs) > 2, "Not enough txs to scan" + i: int = 0 + while i < 3: + tx_hash = txs[i].hash + assert tx_hash is not None + tx_hashes.append(tx_hash) + i += 1 + + # start wallet without scanning + # TODO create wallet without daemon connection (offline does not reconnect, default connects to localhost, + # offline then online causes confirmed txs to disappear) + scan_wallet.stop_syncing() + assert scan_wallet.is_connected_to_daemon() + + # scan txs + scan_wallet.scan_txs(tx_hashes) + + # TODO scanning txs causes merge problems reconciling 0 fee, is_miner_tx with test txs + + # close wallet + scan_wallet.close(False) + @classmethod def is_tx_in_block(cls, tx: MoneroTxWallet, block: MoneroBlock) -> bool: """Check if transaction is included in block""" @@ -844,7 +939,7 @@ def test_get_txs_structure(cls, txs: list[MoneroTxWallet], q: Optional[MoneroTxQ if prev_account_idx < output.account_index: prev_subaddress_idx = None prev_account_idx = output.account_index - if prev_subaddress_idx: + if prev_subaddress_idx is None: prev_subaddress_idx = output.subaddress_index else: assert prev_subaddress_idx is not None diff --git a/tests/utils/wallet_equality_utils.py b/tests/utils/wallet_equality_utils.py index 3ea7580..d9277bc 100644 --- a/tests/utils/wallet_equality_utils.py +++ b/tests/utils/wallet_equality_utils.py @@ -57,12 +57,11 @@ def test_wallet_equality_on_chain(cls, w1: MoneroWallet, w2: MoneroWallet) -> No assert w1.get_unlocked_balance() == w2.get_unlocked_balance() transfer_query = MoneroTransferQuery() transfer_query.tx_query = MoneroTxQuery() - # TODO set transfer query is setter - transfer_query.tx_query.transfer_query = transfer_query transfer_query.tx_query.is_confirmed = True cls.test_transfers_equal_on_chain(w1.get_transfers(transfer_query), w2.get_transfers(transfer_query)) output_query = MoneroOutputQuery() - output_query.tx_query = MoneroTxQuery() + output_query.set_tx_query(MoneroTxQuery(), True) + assert output_query.tx_query is not None output_query.tx_query.is_confirmed = True cls.test_output_wallets_equal_on_chain(w1.get_outputs(output_query), w2.get_outputs(output_query)) diff --git a/tests/utils/wallet_utils.py b/tests/utils/wallet_utils.py index f3e9fed..c17a593 100644 --- a/tests/utils/wallet_utils.py +++ b/tests/utils/wallet_utils.py @@ -5,7 +5,8 @@ from monero import ( MoneroNetworkType, MoneroUtils, MoneroAccount, - MoneroSubaddress, MoneroWalletKeys, MoneroWalletConfig + MoneroSubaddress, MoneroWalletKeys, MoneroWalletConfig, + MoneroMessageSignatureResult, MoneroWallet ) from .gen_utils import GenUtils @@ -17,6 +18,25 @@ class WalletUtils(ABC): """Wallet test utilities""" + @classmethod + def select_subaddress_with_min_balance(cls, wallet: MoneroWallet, min_balance: int, skip_primary: bool = True) -> Optional[MoneroSubaddress]: + # get wallet accounts + accounts: list[MoneroAccount] = wallet.get_accounts(True) + for account in accounts: + assert account.index is not None + i: int = account.index + for subaddress in account.subaddresses: + assert subaddress.index is not None + j: int = subaddress.index + if i == 0 and j == 0 and skip_primary: + continue + + assert subaddress.unlocked_balance is not None + if subaddress.unlocked_balance > min_balance - 1: + return subaddress + + return None + @classmethod def test_invalid_address(cls, address: Optional[str], network_type: MoneroNetworkType) -> None: if address is None: @@ -161,3 +181,17 @@ def create_random_wallets(cls, network_type: MoneroNetworkType, n: int = 10) -> wallets.append(wallet) return wallets + + @classmethod + def test_message_signature_result(cls, result: Optional[MoneroMessageSignatureResult], is_good: bool) -> None: + assert result is not None + if is_good: + assert result.is_good is True + assert result.is_old is False + assert result.version == 2 + else: + # TODO set boost::optional in monero-cpp? + assert result.is_good is False + assert result.is_old is False + #assert result.signature_type is None + assert result.version == 0