From 129152ee9d079fc1afddb04e5d32a5bf026756de Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Sun, 7 Aug 2022 16:08:34 -0400 Subject: [PATCH] xds: Add a KeyValueStore-backed xDS delegate extension to contrib An xDS delegate extension point was added in https://github.com/envoyproxy/envoy/pull/22473 to enable custom behavior upon receiving and loading xDS resources. This change creates an implementation of the XdsResourcesDelegate interface that is backed by a KeyValueStore. The intended use case is to enable persisting xDS resources and loading them on startup in Envoy Mobile, in the event that the xDS control plane is unreachable. Signed-off-by: Ali Beyad --- api/contrib/envoy/extensions/xds/BUILD | 11 + .../xds/kv_store_xds_delegate_config.proto | 23 ++ .../extensions/xds/persisted_resources.proto | 16 + api/versioning/BUILD | 1 + contrib/contrib_build_config.bzl | 6 + contrib/extensions_metadata.yaml | 5 + contrib/xds/source/BUILD | 23 ++ contrib/xds/source/kv_store_xds_delegate.cc | 111 ++++++ contrib/xds/source/kv_store_xds_delegate.h | 44 +++ contrib/xds/test/BUILD | 34 ++ .../kv_store_xds_delegate_integration_test.cc | 337 ++++++++++++++++++ envoy/config/xds_resources_delegate.h | 9 +- source/common/config/grpc_mux_impl.cc | 173 ++++++--- source/common/config/grpc_mux_impl.h | 6 + .../config/xds_mux/sotw_subscription_state.cc | 1 + .../common/upstream/cluster_manager_impl.cc | 6 +- test/integration/base_integration_test.cc | 12 +- test/integration/base_integration_test.h | 21 +- ...xds_delegate_extension_integration_test.cc | 8 +- 19 files changed, 777 insertions(+), 70 deletions(-) create mode 100644 api/contrib/envoy/extensions/xds/BUILD create mode 100644 api/contrib/envoy/extensions/xds/kv_store_xds_delegate_config.proto create mode 100644 api/contrib/envoy/extensions/xds/persisted_resources.proto create mode 100644 contrib/xds/source/BUILD create mode 100644 contrib/xds/source/kv_store_xds_delegate.cc create mode 100644 contrib/xds/source/kv_store_xds_delegate.h create mode 100644 contrib/xds/test/BUILD create mode 100644 contrib/xds/test/kv_store_xds_delegate_integration_test.cc diff --git a/api/contrib/envoy/extensions/xds/BUILD b/api/contrib/envoy/extensions/xds/BUILD new file mode 100644 index 0000000000000..290c08b0f2158 --- /dev/null +++ b/api/contrib/envoy/extensions/xds/BUILD @@ -0,0 +1,11 @@ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/common/key_value/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + "@envoy_api//envoy/service/discovery/v3:pkg", + ], +) diff --git a/api/contrib/envoy/extensions/xds/kv_store_xds_delegate_config.proto b/api/contrib/envoy/extensions/xds/kv_store_xds_delegate_config.proto new file mode 100644 index 0000000000000..d29e7c83a2d7b --- /dev/null +++ b/api/contrib/envoy/extensions/xds/kv_store_xds_delegate_config.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package envoy.extensions.xds; + +import "envoy/config/common/key_value/v3/config.proto"; +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.xds"; +option java_outer_classname = "KeyValueStoreXdsDelegateConfigProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/xds"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#extension: envoy.xds_delegates.kv_store] +// +// Configuration for a KeyValueStore-based XdsResourcesDelegate implementation. +// +// [#not-implemented-hide:] +message KeyValueStoreXdsDelegateConfig { + // Configuration for the KeyValueStore that holds the xDS resources. + config.common.key_value.v3.KeyValueStoreConfig key_value_store_config = 1; +}; diff --git a/api/contrib/envoy/extensions/xds/persisted_resources.proto b/api/contrib/envoy/extensions/xds/persisted_resources.proto new file mode 100644 index 0000000000000..9ac85742e85c2 --- /dev/null +++ b/api/contrib/envoy/extensions/xds/persisted_resources.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package envoy.extensions.xds; + +import "google/protobuf/timestamp.proto"; +import "envoy/service/discovery/v3/discovery.proto"; + +// [#not-implemented-hide:] +// Represents a list of xDS resources for an xDS authority and resource type URL tuple. Used to +// serialize xDS resources in the KeyValueStoreXdsDelegate. +message ResourceList { + repeated envoy.service.discovery.v3.Resource resources = 1; + + // The timestamp at which the xDS resources list was last updated in the KV store. + google.protobuf.Timestamp last_updated = 2; +}; diff --git a/api/versioning/BUILD b/api/versioning/BUILD index ca5ad9b7e1c9f..16823915782c9 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -28,6 +28,7 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", "//contrib/envoy/extensions/vcl/v3alpha:pkg", + "//contrib/envoy/extensions/xds:pkg", "//envoy/admin/v3:pkg", "//envoy/config/accesslog/v3:pkg", "//envoy/config/bootstrap/v3:pkg", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 189995c4b3505..4a556659e549b 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -57,4 +57,10 @@ CONTRIB_EXTENSIONS = { # "envoy.regex_engines.hyperscan": "//contrib/hyperscan/regex_engines/source:config", + + # + # xDS delegates + # + + "envoy.xds_delegates.kv_store": "//contrib/xds/source:kv_store_xds_delegate", } diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index 398f8bed5212d..65c2ab77aecb8 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -88,3 +88,8 @@ envoy.regex_engines.hyperscan: - envoy.regex_engines security_posture: requires_trusted_downstream_and_upstream status: alpha +envoy.xds_delegates.kv_store: + categories: + - envoy.xds_delegates + security_posture: requires_trusted_downstream_and_upstream + status: alpha diff --git a/contrib/xds/source/BUILD b/contrib/xds/source/BUILD new file mode 100644 index 0000000000000..bd87663359a1b --- /dev/null +++ b/contrib/xds/source/BUILD @@ -0,0 +1,23 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_contrib_extension( + name = "kv_store_xds_delegate", + srcs = ["kv_store_xds_delegate.cc"], + hdrs = ["kv_store_xds_delegate.h"], + deps = [ + "//envoy/common:key_value_store_interface", + "//envoy/common:time_interface", + "//envoy/config:xds_resources_delegate_interface", + "//source/common/config:utility_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/xds:pkg_cc_proto", + ], +) diff --git a/contrib/xds/source/kv_store_xds_delegate.cc b/contrib/xds/source/kv_store_xds_delegate.cc new file mode 100644 index 0000000000000..c63107ccffe0a --- /dev/null +++ b/contrib/xds/source/kv_store_xds_delegate.cc @@ -0,0 +1,111 @@ +#include "contrib/envoy/extensions/xds/kv_store_xds_delegate_config.pb.h" +#include "contrib/envoy/extensions/xds/kv_store_xds_delegate_config.pb.validate.h" +#include "contrib/envoy/extensions/xds/persisted_resources.pb.h" +#include "contrib/xds/source/kv_store_xds_delegate.h" + +#include "envoy/registry/registry.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/config/utility.h" +#include "source/common/protobuf/utility.h" + +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace Config { +namespace { + +using envoy::extensions::xds::KeyValueStoreXdsDelegateConfig; +using envoy::extensions::xds::ResourceList; + +// The supplied KeyValueStore may be shared with other parts of the application +// (e.g. SharedPreferences on Android). Therefore, we introduce a prefix to the key to create a +// distinct key namespace. +constexpr char KEY_PREFIX[] = "XDS_CONFIG"; +// The delimiter between parts of the key. +constexpr char DELIMITER[] = "*+"; + +// Constructs the key for the KeyValueStore from the xDS authority and resource type URL. +std::string constructKey(const std::string& authority_id, const std::string& resource_type_url) { + return absl::StrCat(KEY_PREFIX, DELIMITER, authority_id, DELIMITER, resource_type_url); +} + +} // namespace + +KeyValueStoreXdsDelegate::KeyValueStoreXdsDelegate(KeyValueStorePtr&& xds_config_store, + Api::Api& api) + : xds_config_store_(std::move(xds_config_store)), api_(api) {} + +std::vector +KeyValueStoreXdsDelegate::getResources(const std::string& authority_id, + const std::string& resource_type_url) const { + const std::string key = constructKey(authority_id, resource_type_url); + if (auto existing_resources = xds_config_store_->get(key)) { + ResourceList resource_list; + resource_list.ParseFromString(std::string(*existing_resources)); + return std::vector{resource_list.resources().begin(), + resource_list.resources().end()}; + } + return {}; +} + +// TODO(abeyad): Handle key eviction. +void KeyValueStoreXdsDelegate::onConfigUpdated( + const std::string& authority_id, const std::string& resource_type_url, + const std::vector& resources) { + ResourceList resource_list; + for (const auto& resource_ref : resources) { + const auto& decoded_resource = resource_ref.get(); + if (decoded_resource.hasResource()) { + envoy::service::discovery::v3::Resource r; + // TODO(abeyad): Support dynamic parameter constraints. + r.set_name(decoded_resource.name()); + r.set_version(decoded_resource.version()); + r.mutable_resource()->PackFrom(decoded_resource.resource()); + if (decoded_resource.ttl()) { + r.mutable_ttl()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration( + decoded_resource.ttl().value().count())); + } + *resource_list.add_resources() = std::move(r); + } + } + + const std::string key = constructKey(authority_id, resource_type_url); + + if (resource_list.resources_size() == 0) { + xds_config_store_->remove(key); + return; + } + + TimestampUtil::systemClockToTimestamp(api_.timeSource().systemTime(), + *resource_list.mutable_last_updated()); + xds_config_store_->addOrUpdate(key, resource_list.SerializeAsString()); +} + +Envoy::ProtobufTypes::MessagePtr KeyValueStoreXdsDelegateFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +std::string KeyValueStoreXdsDelegateFactory::name() const { + return "envoy.xds_delegates.KeyValueStoreXdsDelegate"; +}; + +Envoy::Config::XdsResourcesDelegatePtr KeyValueStoreXdsDelegateFactory::createXdsResourcesDelegate( + const ProtobufWkt::Any& config, ProtobufMessage::ValidationVisitor& validation_visitor, + Api::Api& api, Event::Dispatcher& dispatcher) { + const auto& validator_config = + Envoy::MessageUtil::anyConvertAndValidate(config, + validation_visitor); + auto& kv_store_factory = Envoy::Config::Utility::getAndCheckFactory( + validator_config.key_value_store_config().config()); + KeyValueStorePtr xds_config_store = kv_store_factory.createStore( + validator_config.key_value_store_config(), validation_visitor, dispatcher, api.fileSystem()); + return std::make_unique(std::move(xds_config_store), api); +} + +REGISTER_FACTORY(KeyValueStoreXdsDelegateFactory, Envoy::Config::XdsResourcesDelegateFactory); + +} // namespace Config +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/xds/source/kv_store_xds_delegate.h b/contrib/xds/source/kv_store_xds_delegate.h new file mode 100644 index 0000000000000..b431af813d7d6 --- /dev/null +++ b/contrib/xds/source/kv_store_xds_delegate.h @@ -0,0 +1,44 @@ +#pragma once + +#include "envoy/common/key_value_store.h" +#include "envoy/config/xds_resources_delegate.h" + +namespace Envoy { +namespace Extensions { +namespace Config { + +// TODO(abeyad): add comments +class KeyValueStoreXdsDelegate : public Envoy::Config::XdsResourcesDelegate { +public: + KeyValueStoreXdsDelegate(KeyValueStorePtr&& xds_config_store, Api::Api& api); + + std::vector + getResources(const std::string& authority_id, + const std::string& resource_type_url) const override; + + void onConfigUpdated(const std::string& authority_id, const std::string& resource_type_url, + const std::vector& resources) override; + +private: + KeyValueStorePtr xds_config_store_; + Api::Api& api_; +}; + +// TODO(abeyad): add comments +class KeyValueStoreXdsDelegateFactory : public Envoy::Config::XdsResourcesDelegateFactory { +public: + KeyValueStoreXdsDelegateFactory() = default; + + Envoy::ProtobufTypes::MessagePtr createEmptyConfigProto() override; + + std::string name() const override; + + Envoy::Config::XdsResourcesDelegatePtr + createXdsResourcesDelegate(const ProtobufWkt::Any& config, + ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, + Event::Dispatcher& dispatcher) override; +}; + +} // namespace Config +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/xds/test/BUILD b/contrib/xds/test/BUILD new file mode 100644 index 0000000000000..ce55647abb0d1 --- /dev/null +++ b/contrib/xds/test/BUILD @@ -0,0 +1,34 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "kv_store_xds_delegate_integration_test", + srcs = [ + "kv_store_xds_delegate_integration_test.cc", + ], + data = [ + "//test/config/integration/certs", + ], + deps = [ + "//contrib/xds/source:kv_store_xds_delegate", + "//source/extensions/key_value/file_based:config_lib", + "//test/common/grpc:grpc_client_integration_lib", + "//test/integration:http_integration_lib", + "//test/test_common:utility_lib", + "@envoy_api//contrib/envoy/extensions/xds:pkg_cc_proto", + "@envoy_api//envoy/admin/v3:pkg_cc_proto", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + "@envoy_api//envoy/service/runtime/v3:pkg_cc_proto", + "@envoy_api//envoy/service/secret/v3:pkg_cc_proto", + ], +) diff --git a/contrib/xds/test/kv_store_xds_delegate_integration_test.cc b/contrib/xds/test/kv_store_xds_delegate_integration_test.cc new file mode 100644 index 0000000000000..111f3a7fe9486 --- /dev/null +++ b/contrib/xds/test/kv_store_xds_delegate_integration_test.cc @@ -0,0 +1,337 @@ +#include "envoy/admin/v3/config_dump.pb.h" +#include "envoy/service/runtime/v3/rtds.pb.h" +#include "envoy/service/secret/v3/sds.pb.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/http_integration.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +static constexpr char SDS_CLUSTER_NAME[] = "sds_cluster.lyft.com"; +static constexpr char RTDS_CLUSTER_NAME[] = "rtds_cluster"; +static constexpr char CLIENT_CERT_NAME[] = "client_cert"; + +std::string bootstrapConfig() { + const std::string filename = TestEnvironment::temporaryPath("dns_cache.txt"); + ::unlink(filename.c_str()); + + return fmt::format(R"EOF( +static_resources: + clusters: + - name: dummy_cluster + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {{}} + load_assignment: + cluster_name: dummy_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +layered_runtime: + layers: + - name: some_static_layer + static_layer: + foo: whatevs + bar: yar + - name: some_rtds_layer + rtds_layer: + name: some_rtds_layer + rtds_config: + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: {} + set_node_on_first_message_only: true + - name: some_admin_layer + admin_layer: {{}} +xds_delegate_extension: + name: envoy.config.xds.KeyValueStoreXdsDelegate + typed_config: + "@type": type.googleapis.com/envoy.extensions.xds.KeyValueStoreXdsDelegateConfig + key_value_store_config: + config: + name: envoy.key_value.file_based + typed_config: + "@type": type.googleapis.com/envoy.extensions.key_value.file_based.v3.FileBasedKeyValueStoreConfig + filename: {} +admin: + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "{}" + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +)EOF", + RTDS_CLUSTER_NAME, filename, Platform::null_device_path); +} + +class KeyValueStoreXdsDelegateIntegrationTest : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +public: + KeyValueStoreXdsDelegateIntegrationTest() + : HttpIntegrationTest(Http::CodecType::HTTP2, ipVersion(), bootstrapConfig()) { + use_lds_ = false; + // TODO(abeyad): add UnifiedSotw tests too when implementation is ready. + sotw_or_delta_ = Grpc::SotwOrDelta::Sotw; + skip_tag_extraction_rule_check_ = true; + } + + void initialize() override { + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Add the SDS cluster. + addXdsCluster(bootstrap, std::string(SDS_CLUSTER_NAME)); + // Add the RTDS cluster. + addXdsCluster(bootstrap, std::string(RTDS_CLUSTER_NAME)); + + // Set up the initial static cluster with SSL using SDS. + auto* transport_socket = + bootstrap.mutable_static_resources()->mutable_clusters(0)->mutable_transport_socket(); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; + tls_context.set_sni("lyft.com"); + auto* secret_config = + tls_context.mutable_common_tls_context()->add_tls_certificate_sds_secret_configs(); + setUpSdsConfig(secret_config, CLIENT_CERT_NAME); + transport_socket->set_name("envoy.transport_sockets.tls"); + transport_socket->mutable_typed_config()->PackFrom(tls_context); + }); + + HttpIntegrationTest::initialize(); + // Register admin port. + registerTestServerPorts({}); + } + + void TearDown() override { + closeConnection(sds_connection_); + closeConnection(rtds_connection_); + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + void createUpstreams() override { + // Static cluster. + addFakeUpstream(Http::CodecType::HTTP2); + // SDS Cluster. + addFakeUpstream(Http::CodecType::HTTP2); + // RTDS Cluster. + addFakeUpstream(Http::CodecType::HTTP2); + } + +protected: + FakeUpstream& getSdsUpstream() { return *fake_upstreams_[1]; } + FakeUpstream& getRtdsUpstream() { return *fake_upstreams_[2]; } + + void addXdsCluster(envoy::config::bootstrap::v3::Bootstrap& bootstrap, + const std::string& cluster_name) { + auto* xds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + xds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + xds_cluster->set_name(cluster_name); + xds_cluster->mutable_load_assignment()->set_cluster_name(cluster_name); + ConfigHelper::setHttp2(*xds_cluster); + } + + void initXdsStream(FakeUpstream& upstream, FakeHttpConnectionPtr& connection, + FakeStreamPtr& stream) { + AssertionResult result = upstream.waitForHttpConnection(*dispatcher_, connection); + RELEASE_ASSERT(result, result.message()); + result = connection->waitForNewStream(*dispatcher_, stream); + RELEASE_ASSERT(result, result.message()); + stream->startGrpcStream(); + } + + void closeConnection(FakeHttpConnectionPtr& connection) { + AssertionResult result = connection->close(); + RELEASE_ASSERT(result, result.message()); + result = connection->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + connection.reset(); + } + + void setUpSdsConfig(envoy::extensions::transport_sockets::tls::v3::SdsSecretConfig* secret_config, + const std::string& secret_name) { + secret_config->set_name(secret_name); + auto* config_source = secret_config->mutable_sds_config(); + config_source->set_resource_api_version(envoy::config::core::v3::ApiVersion::V3); + auto* api_config_source = config_source->mutable_api_config_source(); + api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + api_config_source->set_transport_api_version(envoy::config::core::v3::V3); + auto* grpc_service = api_config_source->add_grpc_services(); + setGrpcService(*grpc_service, SDS_CLUSTER_NAME, getSdsUpstream().localAddress()); + } + + envoy::extensions::transport_sockets::tls::v3::Secret getClientSecret() { + envoy::extensions::transport_sockets::tls::v3::Secret secret; + secret.set_name(std::string(CLIENT_CERT_NAME)); + auto* tls_certificate = secret.mutable_tls_certificate(); + tls_certificate->mutable_certificate_chain()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/clientcert.pem")); + tls_certificate->mutable_private_key()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/clientkey.pem")); + return secret; + } + + std::string getRuntimeKey(const std::string& key) { + auto response = IntegrationUtil::makeSingleRequest( + lookupPort("admin"), "GET", "/runtime?format=json", "", downstreamProtocol(), version_); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + Json::ObjectSharedPtr loader = TestEnvironment::jsonLoadFromString(response->body()); + auto entries = loader->getObject("entries"); + if (entries->hasObject(key)) { + return entries->getObject(key)->getString("final_value"); + } + return ""; + } + + void checkSecretExists(const std::string& secret_name, const std::string& version_info) { + auto response = IntegrationUtil::makeSingleRequest( + lookupPort("admin"), "GET", "/config_dump?resource=dynamic_active_secrets", "", + downstreamProtocol(), version_); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + Json::ObjectSharedPtr loader = TestEnvironment::jsonLoadFromString(response->body()); + envoy::admin::v3::ConfigDump config_dump; + TestUtility::loadFromJson(loader->asJsonString(), config_dump); + // Expect at least the "client_cert" dynamic secret. + ASSERT_GE(config_dump.configs_size(), 1); + envoy::admin::v3::SecretsConfigDump::DynamicSecret dynamic_secret; + config_dump.configs(0).UnpackTo(&dynamic_secret); + EXPECT_EQ(secret_name, dynamic_secret.name()); + EXPECT_EQ(version_info, dynamic_secret.version_info()); + } + + void shutdownAndRestartTestServer() { + // Reset the test server. + on_server_init_function_ = nullptr; + test_server_.reset(); + + // Set up a new Envoy, using the previous Envoy's configuration, and create the test server. + ConfigHelper helper(version_, *api_, + MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap())); + std::vector ports; + std::vector zero; + for (auto& upstream : fake_upstreams_) { + if (upstream->localAddress()->ip()) { + ports.push_back(upstream->localAddress()->ip()->port()); + zero.push_back(0); + } + } + helper.setPorts(zero, true); // Zero out ports set by config_helper_'s finalize(); + const std::string bootstrap_path = finalizeConfigWithPorts(helper, ports, use_lds_); + + std::vector named_ports; + const auto& static_resources = config_helper_.bootstrap().static_resources(); + named_ports.reserve(static_resources.listeners_size()); + for (int i = 0; i < static_resources.listeners_size(); ++i) { + named_ports.push_back(static_resources.listeners(i).name()); + } + createGeneratedApiTestServer(bootstrap_path, named_ports, {false, true, false}, false, + test_server_); + registerTestServerPorts(named_ports, test_server_); + } + + FakeHttpConnectionPtr sds_connection_; + FakeStreamPtr sds_stream_; + FakeHttpConnectionPtr rtds_connection_; + FakeStreamPtr rtds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, KeyValueStoreXdsDelegateIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(KeyValueStoreXdsDelegateIntegrationTest, BasicSuccess) { + on_server_init_function_ = [this]() { + { + // SDS. + initXdsStream(getSdsUpstream(), sds_connection_, sds_stream_); + EXPECT_TRUE(compareSotwDiscoveryRequest( + sds_stream_, /*expected_type_url=*/Config::TypeUrl::get().Secret, /*expected_version=*/"", + /*expected_resource_names=*/{std::string(CLIENT_CERT_NAME)}, /*expect_node=*/true)); + auto sds_resource = getClientSecret(); + sendSotwDiscoveryResponse( + sds_stream_, Config::TypeUrl::get().Secret, {sds_resource}, "1"); + } + { + // RTDS. + initXdsStream(getRtdsUpstream(), rtds_connection_, rtds_stream_); + EXPECT_TRUE(compareSotwDiscoveryRequest( + rtds_stream_, /*expected_type_url=*/Config::TypeUrl::get().Runtime, + /*expected_version=*/"", + /*expected_resource_names=*/{"some_rtds_layer"}, /*expect_node=*/true)); + auto rtds_resource = TestUtility::parseYaml(R"EOF( + name: some_rtds_layer + layer: + foo: bar + baz: meh + )EOF"); + sendSotwDiscoveryResponse( + rtds_stream_, Config::TypeUrl::get().Runtime, {rtds_resource}, "1"); + } + }; + + initialize(); + + // Wait until the discovery responses have been processed. + test_server_->waitForCounterGe( + "cluster.dummy_cluster.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + test_server_->waitForCounterGe("runtime.load_success", 2); + + // Verify that the xDS resources are used by Envoy. + checkSecretExists(std::string(CLIENT_CERT_NAME), /*version_info=*/"1"); + EXPECT_EQ("bar", getRuntimeKey("foo")); + EXPECT_EQ("yar", getRuntimeKey("bar")); + EXPECT_EQ("meh", getRuntimeKey("baz")); + + // Send an update to the RTDS resource, from the RTDS cluster to the Envoy test server. + EXPECT_TRUE(compareSotwDiscoveryRequest( + rtds_stream_, /*expected_type_url=*/Config::TypeUrl::get().Runtime, /*expected_version=*/"1", + /*expected_resource_names=*/{"some_rtds_layer"}, /*expect_node=*/false)); + auto rtds_resource = TestUtility::parseYaml(R"EOF( + name: some_rtds_layer + layer: + baz: saz + )EOF"); + sendSotwDiscoveryResponse( + rtds_stream_, Config::TypeUrl::get().Runtime, {rtds_resource}, "2"); + test_server_->waitForCounterGe("runtime.load_success", 3); + + EXPECT_EQ("whatevs", getRuntimeKey("foo")); + EXPECT_EQ("yar", getRuntimeKey("bar")); + EXPECT_EQ("saz", getRuntimeKey("baz")); + + // Kill the current test server, and restart it using the same configuration. + shutdownAndRestartTestServer(); + + // Wait until SDS and RTDS have been loaded from disk and updated the Envoy instance. + test_server_->waitForCounterGe( + "cluster.dummy_cluster.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + test_server_->waitForCounterGe("runtime.load_success", 2); + + // Verify that the latest resource values are used by Envoy. + checkSecretExists(std::string(CLIENT_CERT_NAME), /*version_info=*/"1"); + EXPECT_EQ("whatevs", getRuntimeKey("foo")); + EXPECT_EQ("yar", getRuntimeKey("bar")); + EXPECT_EQ("saz", getRuntimeKey("baz")); +} + +// TODO(abeyad): add test for removed resources updates the persisted xDS. + +} // namespace +} // namespace Envoy diff --git a/envoy/config/xds_resources_delegate.h b/envoy/config/xds_resources_delegate.h index f1c1b29708ae2..a9313ef9c376e 100644 --- a/envoy/config/xds_resources_delegate.h +++ b/envoy/config/xds_resources_delegate.h @@ -33,7 +33,7 @@ class XdsResourcesDelegate { * @return A set of xDS resources for the given authority and type. */ virtual std::vector - getResources(const std::string& authority_id, const std::string& resource_type_url) PURE; + getResources(const std::string& authority_id, const std::string& resource_type_url) const PURE; /** * Invoked when SotW xDS configuration updates have been received from an xDS authority, have been @@ -64,14 +64,15 @@ class XdsResourcesDelegateFactory : public Config::TypedFactory { * @param config Configuration of the XdsResourcesDelegate to create. * @param validation_visitor Validates the configuration. * @param api The APIs that can be used by the delegate. + * @param dispatcher The dispatcher for the thread. * @return The created XdsResourcesDelegate instance */ virtual XdsResourcesDelegatePtr createXdsResourcesDelegate(const ProtobufWkt::Any& config, - ProtobufMessage::ValidationVisitor& validation_visitor, - Api::Api& api) PURE; + ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, + Event::Dispatcher& dispatcher) PURE; - std::string category() const override { return "envoy.config.xds"; } + std::string category() const override { return "envoy.xds_delegates"; } }; } // namespace Config diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index fbca03a5933ab..2840283d4cba5 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -74,6 +74,13 @@ void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) { return; } + if (first_stream_request_) { + // On the initialization of the gRPC mux, load the persisted config, if available. If the xDS + // server cannot be reached, the locally persisted config will be used until connectivity is + // established with the xDS server. + loadConfigFromDelegate(std::string(type_url)); + } + ApiState& api_state = apiStateFor(type_url); auto& request = api_state.request_; request.mutable_resource_names()->Clear(); @@ -106,6 +113,49 @@ void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) { } } +void GrpcMuxImpl::loadConfigFromDelegate(const std::string& type_url) { + if (!xds_resources_delegate_.has_value()) { + return; + } + ApiState& api_state = apiStateFor(type_url); + if (api_state.watches_.empty()) { + // No watches, so exit without loading config from storage. + return; + } + + TRY_ASSERT_MAIN_THREAD { + std::vector resources = + xds_resources_delegate_->getResources(target_xds_authority_, std::string(type_url)); + if (resources.empty()) { + // There are no persisted resources, so nothing to process. + return; + } + + std::vector decoded_resources; + OpaqueResourceDecoder& resource_decoder = api_state.watches_.front()->resource_decoder_; + std::string version_info; + for (const auto& resource : resources) { + if (version_info.empty()) { + version_info = resource.version(); + } else { + ASSERT(resource.version() == version_info); + } + + decoded_resources.emplace_back( + std::make_unique(resource_decoder, resource)); + } + + processDiscoveryResources(decoded_resources, api_state, type_url, version_info, + /*call_delegate=*/false); + } + END_TRY + catch (const EnvoyException& e) { + // TODO(abeyad): do something more than just logging the error? + ENVOY_LOG(warn, "Failed to load locally-persisted xDS configuration for {}, type url {}: {}", + target_xds_authority_, type_url, e.what()); + } +} + GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, @@ -211,18 +261,9 @@ void GrpcMuxImpl::onDiscoveryResponse( // see https://github.com/envoyproxy/envoy/issues/11477. same_type_resume = pause(type_url); TRY_ASSERT_MAIN_THREAD { - // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we - // build a map here from resource name to resource and then walk watches_. - // We have to walk all watches (and need an efficient map as a result) to - // ensure we deliver empty config updates when a resource is dropped. We make the map ordered - // for test determinism. std::vector resources; - absl::btree_map resource_ref_map; - std::vector all_resource_refs; OpaqueResourceDecoder& resource_decoder = api_state.watches_.front()->resource_decoder_; - const auto scoped_ttl_update = api_state.ttl_.scopedTtlUpdate(); - for (const auto& resource : message->resources()) { // TODO(snowp): Check the underlying type when the resource is a Resource. if (!resource.Is() && @@ -235,57 +276,13 @@ void GrpcMuxImpl::onDiscoveryResponse( auto decoded_resource = DecodedResourceImpl::fromResource(resource_decoder, resource, message->version_info()); - if (decoded_resource->ttl()) { - api_state.ttl_.add(*decoded_resource->ttl(), decoded_resource->name()); - } else { - api_state.ttl_.clear(decoded_resource->name()); - } - if (!isHeartbeatResource(type_url, *decoded_resource)) { resources.emplace_back(std::move(decoded_resource)); - all_resource_refs.emplace_back(*resources.back()); - resource_ref_map.emplace(resources.back()->name(), *resources.back()); } } - // Execute external config validators if there are any watches. - if (!api_state.watches_.empty()) { - config_validators_->executeValidators(type_url, resources); - } - - for (auto watch : api_state.watches_) { - // onConfigUpdate should be called in all cases for single watch xDS (Cluster and - // Listener) even if the message does not have resources so that update_empty stat - // is properly incremented and state-of-the-world semantics are maintained. - if (watch->resources_.empty()) { - watch->callbacks_.onConfigUpdate(all_resource_refs, message->version_info()); - continue; - } - std::vector found_resources; - for (const auto& watched_resource_name : watch->resources_) { - auto it = resource_ref_map.find(watched_resource_name); - if (it != resource_ref_map.end()) { - found_resources.emplace_back(it->second); - } - } - - // onConfigUpdate should be called only on watches(clusters/listeners) that have - // updates in the message for EDS/RDS. - if (!found_resources.empty()) { - watch->callbacks_.onConfigUpdate(found_resources, message->version_info()); - } - } - - // All config updates have been applied without throwing an exception, so we'll call the xDS - // resources delegate, if any. - if (xds_resources_delegate_.has_value()) { - xds_resources_delegate_->onConfigUpdated(target_xds_authority_, type_url, all_resource_refs); - } - - // TODO(mattklein123): In the future if we start tracking per-resource versions, we - // would do that tracking here. - api_state.request_.set_version_info(message->version_info()); - Memory::Utils::tryShrinkHeap(); + processDiscoveryResources(resources, api_state, type_url, message->version_info(), + /*call_delegate=*/true); } END_TRY catch (const EnvoyException& e) { @@ -302,6 +299,72 @@ void GrpcMuxImpl::onDiscoveryResponse( queueDiscoveryRequest(type_url); } +void GrpcMuxImpl::processDiscoveryResources(const std::vector& resources, + ApiState& api_state, const std::string& type_url, + const std::string& version_info, + const bool call_delegate) { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we + // build a map here from resource name to resource and then walk watches_. + // We have to walk all watches (and need an efficient map as a result) to + // ensure we deliver empty config updates when a resource is dropped. We make the map ordered + // for test determinism. + absl::btree_map resource_ref_map; + std::vector all_resource_refs; + + const auto scoped_ttl_update = api_state.ttl_.scopedTtlUpdate(); + + for (const auto& resource : resources) { + if (resource->ttl()) { + api_state.ttl_.add(*resource->ttl(), resource->name()); + } else { + api_state.ttl_.clear(resource->name()); + } + + all_resource_refs.emplace_back(*resource); + resource_ref_map.emplace(resource->name(), *resource); + } + + // Execute external config validators if there are any watches. + if (!api_state.watches_.empty()) { + config_validators_->executeValidators(type_url, resources); + } + + for (auto watch : api_state.watches_) { + // onConfigUpdate should be called in all cases for single watch xDS (Cluster and + // Listener) even if the message does not have resources so that update_empty stat + // is properly incremented and state-of-the-world semantics are maintained. + if (watch->resources_.empty()) { + watch->callbacks_.onConfigUpdate(all_resource_refs, version_info); + continue; + } + std::vector found_resources; + for (const auto& watched_resource_name : watch->resources_) { + auto it = resource_ref_map.find(watched_resource_name); + if (it != resource_ref_map.end()) { + found_resources.emplace_back(it->second); + } + } + + // onConfigUpdate should be called only on watches(clusters/listeners) that have + // updates in the message for EDS/RDS. + if (!found_resources.empty()) { + watch->callbacks_.onConfigUpdate(found_resources, version_info); + } + } + + // All config updates have been applied without throwing an exception, so we'll call the xDS + // resources delegate, if any. + if (call_delegate && xds_resources_delegate_.has_value()) { + xds_resources_delegate_->onConfigUpdated(target_xds_authority_, type_url, all_resource_refs); + } + + // TODO(mattklein123): In the future if we start tracking per-resource versions, we + // would do that tracking here. + api_state.request_.set_version_info(version_info); + Memory::Utils::tryShrinkHeap(); +} + void GrpcMuxImpl::onWriteable() { drainRequests(); } void GrpcMuxImpl::onStreamEstablished() { diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 5bb3e90a6c8da..b7af6101d5c82 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -167,6 +167,12 @@ class GrpcMuxImpl : public GrpcMux, void queueDiscoveryRequest(absl::string_view queue_item); // Invoked when dynamic context parameters change for a resource type. void onDynamicContextUpdate(absl::string_view resource_type_url); + // Must be invoked from the main or test thread. + void loadConfigFromDelegate(const std::string& type_url); + // Must be invoked from the main or test thread. + void processDiscoveryResources(const std::vector& resources, + ApiState& api_state, const std::string& type_url, + const std::string& version_info, bool call_delegate); GrpcStream diff --git a/source/common/config/xds_mux/sotw_subscription_state.cc b/source/common/config/xds_mux/sotw_subscription_state.cc index 7792ced341089..8888838da542d 100644 --- a/source/common/config/xds_mux/sotw_subscription_state.cc +++ b/source/common/config/xds_mux/sotw_subscription_state.cc @@ -71,6 +71,7 @@ void SotwSubscriptionState::handleGoodResponse( // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to // support passing of decoded resources. This would also avoid a resource copy above. callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info()); + // TODO(abeyad): Add support for calling the Config::UpdateListener registered listeners. // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good. last_good_version_info_ = message.version_info(); last_good_nonce_ = message.nonce(); diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index f0327a76d54d5..b9e1b58fac467 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -325,9 +325,9 @@ ClusterManagerImpl::ClusterManagerImpl( if (bootstrap.has_xds_delegate_extension()) { auto& factory = Config::Utility::getAndCheckFactory( bootstrap.xds_delegate_extension()); - xds_resources_delegate_ = - factory.createXdsResourcesDelegate(bootstrap.xds_delegate_extension().typed_config(), - validation_context.dynamicValidationVisitor(), api); + xds_resources_delegate_ = factory.createXdsResourcesDelegate( + bootstrap.xds_delegate_extension().typed_config(), + validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher); } subscription_factory_ = std::make_unique( diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 90e9e96746a47..9870203847163 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -579,8 +579,18 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( const std::string& expected_type_url, const std::string& expected_version, const std::vector& expected_resource_names, bool expect_node, const Protobuf::int32 expected_error_code, const std::string& expected_error_substring) { + return compareSotwDiscoveryRequest(xds_stream_, expected_type_url, expected_version, + expected_resource_names, expect_node, expected_error_code, + expected_error_substring); +} + +AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( + FakeStreamPtr& stream, const std::string& expected_type_url, + const std::string& expected_version, const std::vector& expected_resource_names, + bool expect_node, const Protobuf::int32 expected_error_code, + const std::string& expected_error_substring) { envoy::service::discovery::v3::DiscoveryRequest discovery_request; - VERIFY_ASSERTION(xds_stream_->waitForGrpcMessage(*dispatcher_, discovery_request)); + VERIFY_ASSERTION(stream->waitForGrpcMessage(*dispatcher_, discovery_request)); if (expect_node) { EXPECT_TRUE(discovery_request.has_node()); diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index e3954e7f0e614..6bb96d104da6d 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -173,6 +173,16 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::vector& expected_resource_names_removed, bool expect_node = false, const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, const std::string& expected_error_message = ""); + // For SotW tests, provides the same functionality as the function above, except it also takes in + // the FakeStream instead of using xds_stream_. Continue to use the above function for SotW tests + // unless the xDS test is not using xds_stream_. + AssertionResult compareSotwDiscoveryRequest( + FakeStreamPtr& stream, const std::string& expected_type_url, + const std::string& expected_version, const std::vector& expected_resource_names, + bool expect_node = false, + const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& expected_error_message = ""); + template void sendDiscoveryResponse(const std::string& type_url, const std::vector& state_of_the_world, const std::vector& added_or_updated, @@ -210,8 +220,8 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::string& expected_error_message = ""); template - void sendSotwDiscoveryResponse(const std::string& type_url, const std::vector& messages, - const std::string& version) { + void sendSotwDiscoveryResponse(FakeStreamPtr& stream, const std::string& type_url, + const std::vector& messages, const std::string& version) { envoy::service::discovery::v3::DiscoveryResponse discovery_response; discovery_response.set_version_info(version); discovery_response.set_type_url(type_url); @@ -220,7 +230,12 @@ class BaseIntegrationTest : protected Logger::Loggable { } static int next_nonce_counter = 0; discovery_response.set_nonce(absl::StrCat("nonce", next_nonce_counter++)); - xds_stream_->sendGrpcMessage(discovery_response); + stream->sendGrpcMessage(discovery_response); + } + template + void sendSotwDiscoveryResponse(const std::string& type_url, const std::vector& messages, + const std::string& version) { + sendSotwDiscoveryResponse(xds_stream_, type_url, messages, version); } template diff --git a/test/integration/xds_delegate_extension_integration_test.cc b/test/integration/xds_delegate_extension_integration_test.cc index 5e23a07fc271d..cdc07f3ff2a36 100644 --- a/test/integration/xds_delegate_extension_integration_test.cc +++ b/test/integration/xds_delegate_extension_integration_test.cc @@ -28,7 +28,7 @@ class TestXdsResourcesDelegate : public Config::XdsResourcesDelegate { std::vector getResources(const std::string& /*authority_id*/, - const std::string& /*resource_type_url*/) override { + const std::string& /*resource_type_url*/) const override { // TODO(abeyad): implement this and test for it when we add support for loading config from the // delegate in a subsequent PR. return {}; @@ -50,9 +50,9 @@ class TestXdsResourcesDelegateFactory : public Config::XdsResourcesDelegateFacto std::string name() const override { return "envoy.config.xds.test_delegate"; }; - Config::XdsResourcesDelegatePtr createXdsResourcesDelegate(const ProtobufWkt::Any&, - ProtobufMessage::ValidationVisitor&, - Api::Api&) override { + Config::XdsResourcesDelegatePtr + createXdsResourcesDelegate(const ProtobufWkt::Any&, ProtobufMessage::ValidationVisitor&, + Api::Api&, Event::Dispatcher& dispatcher) override { return std::make_unique(); } };