diff --git a/api/envoy/service/ext_proc/v3/BUILD b/api/envoy/service/ext_proc/v3/BUILD index 0e337d5c3ed11..37704a3249557 100644 --- a/api/envoy/service/ext_proc/v3/BUILD +++ b/api/envoy/service/ext_proc/v3/BUILD @@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( has_services = True, deps = [ + "//envoy/annotations:pkg", "//envoy/config/core/v3:pkg", "//envoy/extensions/filters/http/ext_proc/v3:pkg", "//envoy/type/v3:pkg", diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index aa62ef74226da..21ce87a0cf509 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -9,6 +9,7 @@ import "envoy/type/v3/http_status.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; +import "envoy/annotations/deprecation.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -56,7 +57,7 @@ service ExternalProcessor { // This represents the different types of messages that Envoy can send // to an external processing server. -// [#next-free-field: 9] +// [#next-free-field: 10] message ProcessingRequest { // Specify whether the filter that sent this request is running in synchronous // or asynchronous mode. The choice of synchronous or asynchronous mode @@ -112,6 +113,12 @@ message ProcessingRequest { // Dynamic metadata associated with the request. config.core.v3.Metadata metadata_context = 8; + + // The values of properties selected by the ``request_attributes`` + // or ``response_attributes`` list in the configuration. Each entry + // in the list is populated from the standard + // :ref:`attributes ` supported across Envoy. + map attributes = 9; } // For every ProcessingRequest received by the server with the ``async_mode`` field @@ -204,12 +211,11 @@ message HttpHeaders { config.core.v3.HeaderMap headers = 1; // [#not-implemented-hide:] - // The values of properties selected by the ``request_attributes`` - // or ``response_attributes`` list in the configuration. Each entry - // in the list is populated - // from the standard :ref:`attributes ` - // supported across Envoy. - map attributes = 2; + // This field is deprecated and not implemented. Attributes will be sent in + // the top-level :ref:`attributes attributes = 2 + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; // If true, then there is no message body associated with this // request or response. diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 4b41dbdc57a24..3e68fcae75688 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -1,5 +1,7 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include + #include "envoy/config/common/mutation_rules/v3/mutation_rules.pb.h" #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h" @@ -276,8 +278,7 @@ void Filter::onDestroy() { } FilterHeadersStatus Filter::onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream, - ProtobufWkt::Struct* proto) { + Http::RequestOrResponseHeaderMap& headers, bool end_stream) { switch (openStream()) { case StreamOpenState::Error: return FilterHeadersStatus::StopIteration; @@ -291,14 +292,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.setHeaders(&headers); state.setHasNoBody(end_stream); ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* headers_req = state.mutableHeaders(req); MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); headers_req->set_end_of_stream(end_stream); - if (proto != nullptr) { - (*headers_req->mutable_attributes())[FilterName] = *proto; - } state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); @@ -315,19 +314,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st decoding_state_.setCompleteBodyAvailable(true); } + // Set the request headers on decoding and encoding state in case they are + // needed later. + decoding_state_.setRequestHeaders(&headers); + encoding_state_.setRequestHeaders(&headers); + FilterHeadersStatus status = FilterHeadersStatus::Continue; if (decoding_state_.sendHeaders()) { - ProtobufWkt::Struct proto; - - if (config_->expressionManager().hasRequestExpr()) { - auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(), - &headers, nullptr, nullptr); - proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); - } - - status = onHeaders(decoding_state_, headers, end_stream, - config_->expressionManager().hasRequestExpr() ? &proto : nullptr); + status = onHeaders(decoding_state_, headers, end_stream); ENVOY_LOG(trace, "onHeaders returning {}", static_cast(status)); } else { ENVOY_LOG(trace, "decodeHeaders: Skipped header processing"); @@ -590,7 +584,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) { ENVOY_LOG(trace, "decodeTrailers"); const auto status = onTrailers(decoding_state_, trailers); - ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast(status)); + ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast(status)); return status; } @@ -605,17 +599,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s FilterHeadersStatus status = FilterHeadersStatus::Continue; if (!processing_complete_ && encoding_state_.sendHeaders()) { - ProtobufWkt::Struct proto; - - if (config_->expressionManager().hasResponseExpr()) { - auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(), - nullptr, &headers, nullptr); - proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); - } - - status = onHeaders(encoding_state_, headers, end_stream, - config_->expressionManager().hasResponseExpr() ? &proto : nullptr); + status = onHeaders(encoding_state_, headers, end_stream); ENVOY_LOG(trace, "onHeaders returns {}", static_cast(status)); } else { ENVOY_LOG(trace, "encodeHeaders: Skipped header processing"); @@ -650,6 +634,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In bool end_stream) { ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream); ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* body_req = state.mutableBody(req); body_req->set_end_of_stream(end_stream); @@ -667,6 +652,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), @@ -771,6 +757,21 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest& *req.mutable_metadata_context() = forwarding_metadata; } +void Filter::addAttributes(ProcessorState& state, ProcessingRequest& req) { + if (!state.sendAttributes(config_->expressionManager())) { + return; + } + + auto activation_ptr = Filters::Common::Expr::createActivation( + &config_->expressionManager().localInfo(), state.callbacks()->streamInfo(), + state.requestHeaders(), dynamic_cast(state.responseHeaders()), + dynamic_cast(state.responseTrailers())); + auto attributes = state.evaluateAttributes(config_->expressionManager(), *activation_ptr); + + state.setSentAttributes(true); + (*req.mutable_attributes())[FilterName] = attributes; +} + void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, const ProcessingResponse& response) { if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 78d97cbe9bab0..1ef14a7c2e810 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -373,8 +373,7 @@ class Filter : public Logger::Loggable, void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response); Http::FilterHeadersStatus onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream, - ProtobufWkt::Struct* proto); + Http::RequestOrResponseHeaderMap& headers, bool end_stream); // Return a pair of whether to terminate returning the current result. std::pair sendStreamChunk(ProcessorState& state); @@ -386,6 +385,7 @@ class Filter : public Logger::Loggable, void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); void addDynamicMetadata(const ProcessorState& state, envoy::service::ext_proc::v3::ProcessingRequest& req); + void addAttributes(ProcessorState& state, envoy::service::ext_proc::v3::ProcessingRequest& req); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index a3ebe27617f0b..01e9bc57ae59f 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -15,6 +15,7 @@ #include "source/common/common/logger.h" #include "absl/status/status.h" +#include "matching_utils.h" namespace Envoy { namespace Extensions { @@ -134,8 +135,12 @@ class ProcessorState : public Logger::Loggable { return body_mode_; } + void setRequestHeaders(Http::RequestHeaderMap* headers) { request_headers_ = headers; } void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; } void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; } + const Http::RequestHeaderMap* requestHeaders() const { return request_headers_; }; + virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE; + const Http::HeaderMap* responseTrailers() const { return trailers_; } void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); @@ -202,6 +207,14 @@ class ProcessorState : public Logger::Loggable { virtual Http::StreamFilterCallbacks* callbacks() const PURE; + virtual bool sendAttributes(const ExpressionManager& mgr) const PURE; + + void setSentAttributes(bool sent) { attributes_sent_ = sent; } + + virtual ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const PURE; + protected: void setBodyMode( envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode); @@ -236,6 +249,10 @@ class ProcessorState : public Logger::Loggable { // The specific mode for body handling envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode_; + // The request_headers_ field is guaranteed to hold the pointer to the request + // headers as set in decodeHeaders. This allows both decoding and encoding states + // to have access to the request headers map. + Http::RequestHeaderMap* request_headers_ = nullptr; Http::RequestOrResponseHeaderMap* headers_ = nullptr; Http::HeaderMap* trailers_ = nullptr; Event::TimerPtr message_timer_; @@ -250,6 +267,9 @@ class ProcessorState : public Logger::Loggable { const std::vector* typed_forwarding_namespaces_{}; const std::vector* untyped_receiving_namespaces_{}; + // If true, the attributes for this processing state have already been sent. + bool attributes_sent_{}; + private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; @@ -324,6 +344,17 @@ class DecodingProcessorState : public ProcessorState { Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; } + bool sendAttributes(const ExpressionManager& mgr) const override { + return !attributes_sent_ && mgr.hasRequestExpr(); + } + + const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; } + ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const override { + return mgr.evaluateRequestAttributes(activation); + } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); @@ -404,6 +435,18 @@ class EncodingProcessorState : public ProcessorState { Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; } + bool sendAttributes(const ExpressionManager& mgr) const override { + return !attributes_sent_ && mgr.hasResponseExpr(); + } + + const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; } + + ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const override { + return mgr.evaluateResponseAttributes(activation); + } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 9d511fbd9d8a9..13971f8250e43 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -158,6 +158,7 @@ envoy_extension_cc_test( "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index d0becb1f2bab0..fb936314551db 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -1,6 +1,7 @@ #include #include +#include "envoy/config/core/v3/base.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h" #include "envoy/network/address.h" @@ -293,7 +294,6 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); } ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); - ASSERT_TRUE(request.has_request_headers()); if (first_message) { processor_stream_->startGrpcStream(); } @@ -3429,45 +3429,82 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) { } #if defined(USE_CEL_PARSER) -// Test the filter using the default configuration by connecting to -// an ext_proc server that responds to the request_headers message -// by requesting to modify the request headers. -TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) { +TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) { proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_trailer_mode(ProcessingMode::SEND); proto_config_.mutable_request_attributes()->Add("request.path"); proto_config_.mutable_request_attributes()->Add("request.method"); proto_config_.mutable_request_attributes()->Add("request.scheme"); proto_config_.mutable_request_attributes()->Add("connection.mtls"); + proto_config_.mutable_request_attributes()->Add("response.code"); proto_config_.mutable_response_attributes()->Add("response.code"); proto_config_.mutable_response_attributes()->Add("response.code_details"); initializeConfig(); HttpIntegrationTest::initialize(); auto response = sendDownstreamRequest(absl::nullopt); - processRequestHeadersMessage( - *grpc_upstreams_[0], true, [](const HttpHeaders& req, HeadersResponse&) { + + // Handle request headers message. + processGenericMessage( + *grpc_upstreams_[0], true, [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::HeadersResponse headers_resp; + *(resp.mutable_request_headers()) = headers_resp; + + EXPECT_TRUE(req.has_request_headers()); EXPECT_EQ(req.attributes().size(), 1); auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); EXPECT_EQ(proto_struct.fields().at("request.path").string_value(), "/"); EXPECT_EQ(proto_struct.fields().at("request.method").string_value(), "GET"); EXPECT_EQ(proto_struct.fields().at("request.scheme").string_value(), "http"); EXPECT_EQ(proto_struct.fields().at("connection.mtls").bool_value(), false); + // Make sure we did not include the attribute which was not yet available. + EXPECT_EQ(proto_struct.fields().size(), 4); + EXPECT_FALSE(proto_struct.fields().contains("response.code")); + + // Make sure we are not including any data in the deprecated HttpHeaders.attributes. + EXPECT_TRUE(req.request_headers().attributes().empty()); return true; }); - handleUpstreamRequest(); + handleUpstreamRequestWithTrailer(); - processResponseHeadersMessage( - *grpc_upstreams_[0], false, [](const HttpHeaders& req, HeadersResponse&) { + // Handle response headers message. + processGenericMessage( + *grpc_upstreams_[0], false, [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::HeadersResponse headers_resp; + *(resp.mutable_response_headers()) = headers_resp; + + EXPECT_TRUE(req.has_response_headers()); EXPECT_EQ(req.attributes().size(), 1); auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); EXPECT_EQ(proto_struct.fields().at("response.code").string_value(), "200"); EXPECT_EQ(proto_struct.fields().at("response.code_details").string_value(), StreamInfo::ResponseCodeDetails::get().ViaUpstream); + + // Make sure we didn't include request attributes in the response-path processing request. + EXPECT_FALSE(proto_struct.fields().contains("request.method")); + + // Make sure we are not including any data in the deprecated HttpHeaders.attributes. + EXPECT_TRUE(req.response_headers().attributes().empty()); return true; }); + // Handle response trailers message, making sure we did not send request or response attributes + // again. + processGenericMessage(*grpc_upstreams_[0], false, + [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::TrailersResponse trailer_resp; + *(resp.mutable_response_trailers()) = trailer_resp; + + EXPECT_TRUE(req.has_response_trailers()); + EXPECT_TRUE(req.attributes().empty()); + return true; + }); + verifyDownstreamResponse(*response, 200); } #endif