Skip to content

Commit

Permalink
postgres: add ability to terminate SSL at Envoy
Browse files Browse the repository at this point in the history
Adds ability to use _starttls_ transport socket to terminate SSL at Envoy and pass unencrypted traffic upstream to Postgres server.

Signed-off-by: Fabrízio de Royes Mello <[email protected]>
  • Loading branch information
fabriziomello committed Feb 18, 2021
1 parent 31bdbef commit 1174009
Show file tree
Hide file tree
Showing 15 changed files with 369 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ message PostgresProxy {
// are parsed. Parsing is required to produce Postgres proxy filter
// metadata. Defaults to true.
google.protobuf.BoolValue enable_sql_parsing = 2;

// Controls whether to terminate SSL session initiated by a client.
// If the value is false, the Postgres proxy filter will not try to
// terminate SSL session, but will pass all the packets to the upstream server.
// If the value is true, the Postgres proxy filter will try to terminate SSL
// session. In order to do that, the filter chain must use :ref:`starttls transport socket
// <envoy_api_msg_extensions.transport_sockets.starttls.v3.StartTlsConfig>`.
// If the filter does not manage to terminate the SSL session, it will close the connection from the client.
// Refer to official documentation for details
// `SSL Session Encryption Message Flow <https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11>`_.
bool terminate_ssl = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ Every configured Postgres proxy filter has statistics rooted at postgres.<stat_p
messages_frontend, Counter, Number of frontend messages detected by the filter
messages_unknown, Counter, Number of times the filter successfully decoded a message but did not know what to do with it
sessions, Counter, Total number of successful logins
sessions_encrypted, Counter, Number of times the filter detected encrypted sessions
sessions_encrypted, Counter, Number of times the filter detected and passed upstream encrypted sessions
sessions_terminated_ssl, Counter, Number of times the filter terminated SSL sessions
sessions_unencrypted, Counter, Number of messages indicating unencrypted successful login
statements, Counter, Total number of SQL statements
statements_delete, Counter, Number of DELETE statements
Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Removed Config or Runtime

New Features
------------
* postgres: added ability to :ref:`terminate SSL<envoy_v3_api_field_extensions.filters.network.postgres_proxy.v3alpha.PostgresProxy.terminate_ssl>`.

Deprecated
----------

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions source/extensions/filters/network/postgres_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ NetworkFilters::PostgresProxy::PostgresConfigFactory::createFilterFactoryFromPro
Server::Configuration::FactoryContext& context) {
ASSERT(!proto_config.stat_prefix().empty());

const std::string stat_prefix = fmt::format("postgres.{}", proto_config.stat_prefix());
const bool enable_sql = PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, enable_sql_parsing, true);
PostgresFilterConfig::PostgresFilterConfigOptions config_options;
config_options.stats_prefix_ = fmt::format("postgres.{}", proto_config.stat_prefix());
config_options.enable_sql_parsing_ =
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, enable_sql_parsing, true);
config_options.terminate_ssl_ = proto_config.terminate_ssl();

PostgresFilterConfigSharedPtr filter_config(
std::make_shared<PostgresFilterConfig>(stat_prefix, enable_sql, context.scope()));
std::make_shared<PostgresFilterConfig>(config_options, context.scope()));
return [filter_config](Network::FilterManager& filter_manager) -> void {
filter_manager.addFilter(std::make_shared<PostgresFilter>(filter_config));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ void DecoderImpl::initialize() {
};
}

bool DecoderImpl::parseHeader(Buffer::Instance& data) {
Decoder::Result DecoderImpl::parseHeader(Buffer::Instance& data) {
ENVOY_LOG(trace, "postgres_proxy: parsing message, len {}", data.length());

// The minimum size of the message sufficient for parsing is 5 bytes.
if (data.length() < 5) {
// not enough data in the buffer.
return false;
return Decoder::NeedMoreData;
}

if (!startup_) {
Expand All @@ -198,44 +198,64 @@ bool DecoderImpl::parseHeader(Buffer::Instance& data) {
ENVOY_LOG(trace, "postgres_proxy: cannot parse message. Need {} bytes in buffer",
message_len_ + (startup_ ? 0 : 1));
// Not enough data in the buffer.
return false;
return Decoder::NeedMoreData;
}

if (startup_) {
uint32_t code = data.peekBEInt<uint32_t>(4);
// Startup message with 1234 in the most significant 16 bits
// indicate request to encrypt.
if (code >= 0x04d20000) {
ENVOY_LOG(trace, "postgres_proxy: detected encrypted traffic.");
encrypted_ = true;
startup_ = false;
incSessionsEncrypted();
// Handler for SSLRequest (Int32(80877103) = 0x04d2162f)
// See details in https://www.postgresql.org/docs/current/protocol-message-formats.html.
if (code == 0x04d2162f) {
// Notify the filter that `SSLRequest` message was decoded.
// If the filter returns true, it means to pass the message upstream
// to the server. If it returns false it means, that filter will try
// to terminate SSL session and SSLRequest should not be passed to the
// server.
encrypted_ = callbacks_->onSSLRequest();
}

// Count it as recognized frontend message.
callbacks_->incMessagesFrontend();
if (encrypted_) {
ENVOY_LOG(trace, "postgres_proxy: detected encrypted traffic.");
incSessionsEncrypted();
startup_ = false;
}
data.drain(data.length());
return false;
return encrypted_ ? Decoder::ReadyForNext : Decoder::Stopped;
} else {
ENVOY_LOG(debug, "Detected version {}.{} of Postgres", code >> 16, code & 0x0000FFFF);
// 4 bytes of length and 4 bytes of version code.
}
}

data.drain(startup_ ? 4 : 5); // Length plus optional 1st byte.

ENVOY_LOG(trace, "postgres_proxy: msg parsed");
return true;
return Decoder::ReadyForNext;
}

bool DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
Decoder::Result DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
// If encrypted, just drain the traffic.
if (encrypted_) {
ENVOY_LOG(trace, "postgres_proxy: ignoring {} bytes of encrypted data", data.length());
data.drain(data.length());
return true;
return Decoder::ReadyForNext;
}

if (!frontend && startup_) {
data.drain(data.length());
return Decoder::ReadyForNext;
}

ENVOY_LOG(trace, "postgres_proxy: decoding {} bytes", data.length());

if (!parseHeader(data)) {
return false;
const Decoder::Result result = parseHeader(data);
if (result != Decoder::ReadyForNext || encrypted_) {
return result;
}

MsgGroup& msg_processor = std::ref(frontend ? FE_messages_ : BE_messages_);
Expand Down Expand Up @@ -283,7 +303,7 @@ bool DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
data.drain(bytes_to_read);
ENVOY_LOG(trace, "postgres_proxy: {} bytes remaining in buffer", data.length());

return true;
return Decoder::ReadyForNext;
}

// Method is called when C (CommandComplete) message has been
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,25 @@ class DecoderCallbacks {
virtual void incErrors(ErrorType) PURE;

virtual void processQuery(const std::string&) PURE;

virtual bool onSSLRequest() PURE;
};

// Postgres message decoder.
class Decoder {
public:
virtual ~Decoder() = default;

virtual bool onData(Buffer::Instance& data, bool frontend) PURE;
// The following values are returned by the decoder, when filter
// passes bytes of data via onData method:
enum Result {
ReadyForNext, // Decoder processed previous message and is ready for the next message.
NeedMoreData, // Decoder needs more data to reconstruct the message.
Stopped // Received and processed message disrupts the current flow. Decoder stopped accepting
// data. This happens when decoder wants filter to perform some action, for example to
// call starttls transport socket to enable TLS.
};
virtual Result onData(Buffer::Instance& data, bool frontend) PURE;
virtual PostgresSession& getSession() PURE;

const Extensions::Common::SQLUtils::SQLUtils::DecoderAttributes& getAttributes() const {
Expand All @@ -69,7 +80,7 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
public:
DecoderImpl(DecoderCallbacks* callbacks) : callbacks_(callbacks) { initialize(); }

bool onData(Buffer::Instance& data, bool frontend) override;
Result onData(Buffer::Instance& data, bool frontend) override;
PostgresSession& getSession() override { return session_; }

std::string getMessage() { return message_; }
Expand Down Expand Up @@ -121,7 +132,7 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
MsgAction unknown_;
};

bool parseHeader(Buffer::Instance& data);
Result parseHeader(Buffer::Instance& data);
void decode(Buffer::Instance& data);
void decodeAuthentication();
void decodeBackendStatements();
Expand Down
72 changes: 60 additions & 12 deletions source/extensions/filters/network/postgres_proxy/postgres_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ namespace Extensions {
namespace NetworkFilters {
namespace PostgresProxy {

PostgresFilterConfig::PostgresFilterConfig(const std::string& stat_prefix, bool enable_sql_parsing,
PostgresFilterConfig::PostgresFilterConfig(const PostgresFilterConfigOptions& config_options,
Stats::Scope& scope)
: enable_sql_parsing_(enable_sql_parsing), scope_{scope}, stats_{generateStats(stat_prefix,
scope)} {}
: enable_sql_parsing_(config_options.enable_sql_parsing_),
terminate_ssl_(config_options.terminate_ssl_), scope_{scope},
stats_{generateStats(config_options.stats_prefix_, scope)} {}

PostgresFilter::PostgresFilter(PostgresFilterConfigSharedPtr config) : config_{config} {
if (!decoder_) {
Expand All @@ -29,9 +30,12 @@ Network::FilterStatus PostgresFilter::onData(Buffer::Instance& data, bool) {

// Frontend Buffer
frontend_buffer_.add(data);
doDecode(frontend_buffer_, true);

return Network::FilterStatus::Continue;
Network::FilterStatus result = doDecode(frontend_buffer_, true);
if (result == Network::FilterStatus::StopIteration) {
ASSERT(frontend_buffer_.length() == 0);
data.drain(data.length());
}
return result;
}

Network::FilterStatus PostgresFilter::onNewConnection() { return Network::FilterStatus::Continue; }
Expand All @@ -45,9 +49,7 @@ Network::FilterStatus PostgresFilter::onWrite(Buffer::Instance& data, bool) {

// Backend Buffer
backend_buffer_.add(data);
doDecode(backend_buffer_, false);

return Network::FilterStatus::Continue;
return doDecode(backend_buffer_, false);
}

DecoderPtr PostgresFilter::createDecoder(DecoderCallbacks* callbacks) {
Expand Down Expand Up @@ -186,12 +188,58 @@ void PostgresFilter::processQuery(const std::string& sql) {
}
}

void PostgresFilter::doDecode(Buffer::Instance& data, bool frontend) {
bool PostgresFilter::onSSLRequest() {
if (!config_->terminate_ssl_) {
// Signal to the decoder to continue.
return true;
}
// Send single bytes 'S' to indicate switch to TLS.
// Refer to official documentation for protocol details:
// https://www.postgresql.org/docs/current/protocol-flow.html
Buffer::OwnedImpl buf;
buf.add("S");
// Add callback to be notified when the reply message has been
// transmitted.
read_callbacks_->connection().addBytesSentCallback([=](uint64_t bytes) -> bool {
// Wait until 'S' has been transmitted.
if (bytes >= 1) {
if (!read_callbacks_->connection().startSecureTransport()) {
ENVOY_CONN_LOG(info, "postgres_proxy: cannot enable secure transport. Check configuration.",
read_callbacks_->connection());
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
} else {
// Unsubscribe the callback.
config_->stats_.sessions_terminated_ssl_.inc();
ENVOY_CONN_LOG(trace, "postgres_proxy: enabled SSL termination.",
read_callbacks_->connection());
// Switch to TLS has been completed.
// Signal to the decoder to stop processing the current message (SSLRequest).
// Because Envoy terminates SSL, the message was consumed and should not be
// passed to other filters in the chain.
return false;
}
}
return true;
});
read_callbacks_->connection().write(buf, false);

return false;
}

Network::FilterStatus PostgresFilter::doDecode(Buffer::Instance& data, bool frontend) {
// Keep processing data until buffer is empty or decoder says
// that it cannot process data in the buffer.
while ((0 < data.length()) && (decoder_->onData(data, frontend))) {
;
while (0 < data.length()) {
switch (decoder_->onData(data, frontend)) {
case Decoder::NeedMoreData:
return Network::FilterStatus::Continue;
case Decoder::ReadyForNext:
continue;
case Decoder::Stopped:
return Network::FilterStatus::StopIteration;
}
}
return Network::FilterStatus::Continue;
}

} // namespace PostgresProxy
Expand Down
13 changes: 10 additions & 3 deletions source/extensions/filters/network/postgres_proxy/postgres_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace PostgresProxy {
COUNTER(messages_unknown) \
COUNTER(sessions) \
COUNTER(sessions_encrypted) \
COUNTER(sessions_terminated_ssl) \
COUNTER(sessions_unencrypted) \
COUNTER(statements) \
COUNTER(statements_insert) \
Expand Down Expand Up @@ -62,10 +63,15 @@ struct PostgresProxyStats {
*/
class PostgresFilterConfig {
public:
PostgresFilterConfig(const std::string& stat_prefix, bool enable_sql_parsing,
Stats::Scope& scope);
struct PostgresFilterConfigOptions {
std::string stats_prefix_;
bool enable_sql_parsing_;
bool terminate_ssl_;
};
PostgresFilterConfig(const PostgresFilterConfigOptions& config_options, Stats::Scope& scope);

bool enable_sql_parsing_{true};
bool terminate_ssl_{false};
Stats::Scope& scope_;
PostgresProxyStats stats_;

Expand Down Expand Up @@ -105,8 +111,9 @@ class PostgresFilter : public Network::Filter,
void incTransactionsCommit() override;
void incTransactionsRollback() override;
void processQuery(const std::string&) override;
bool onSSLRequest() override;

void doDecode(Buffer::Instance& data, bool);
Network::FilterStatus doDecode(Buffer::Instance& data, bool);
DecoderPtr createDecoder(DecoderCallbacks* callbacks);
void setDecoder(std::unique_ptr<Decoder> decoder) { decoder_ = std::move(decoder); }
Decoder* getDecoder() const { return decoder_.get(); }
Expand Down
3 changes: 3 additions & 0 deletions test/extensions/filters/network/postgres_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ envoy_extension_cc_test(
],
data = [
"postgres_test_config.yaml",
"//test/config/integration/certs",
],
extension_name = "envoy.filters.network.postgres_proxy",
deps = [
"//source/common/tcp_proxy",
"//source/extensions/filters/network/postgres_proxy:config",
"//source/extensions/filters/network/postgres_proxy:filter",
"//source/extensions/filters/network/tcp_proxy:config",
"//source/extensions/transport_sockets/starttls:config",
"//test/integration:integration_lib",
"@envoy_api//envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg_cc_proto",
],
)
Loading

0 comments on commit 1174009

Please sign in to comment.