diff --git a/devel/cpanfile b/devel/cpanfile index a6878d92..94e233c4 100644 --- a/devel/cpanfile +++ b/devel/cpanfile @@ -9,6 +9,9 @@ requires "Net::EmptyPort"; requires "Path::Tiny"; requires "Proc::Guard" => 0.07; requires "Sys::Hostname"; +requires 'Test::DNS'; +requires 'Test::Instance::DNS'; +requires 'Test::Warnings'; requires "Try::Tiny::Retry" => 0.004; requires "Type::Utils"; requires "Types::Path::Tiny"; diff --git a/devel/t-dynamic/dns-srv-switchover.t b/devel/t-dynamic/dns-srv-switchover.t new file mode 100644 index 00000000..3ca48820 --- /dev/null +++ b/devel/t-dynamic/dns-srv-switchover.t @@ -0,0 +1,255 @@ +use strict; +use Test::More; +use Test::Warnings ':all'; + +use lib "devel/lib"; + +BEGIN { + # lowering minimum TTL value so we don't have to wait a minute + $ENV{TEST_MONGO_MIN_RESCAN_FREQUENCY_MS} = 4; +} + +use MongoDB::_URI; +use Test::Instance::DNS; +use File::Temp; +use Net::EmptyPort qw/ empty_port /; +use MongoDBTest::Orchestrator; + +# +# DNS mock config setup +# + +my $port = empty_port; + +# Net::DNS +$ENV{RES_NAMESERVERS} = '127.0.0.1'; +$ENV{RES_OPTIONS} = 'port:'.$port; + +# local mock override for Socket connection +$ENV{TEST_MONGO_SOCKET_HOST} = 'localhost'; + +# +# ensuring basic URI switchover +# + +my $uri; + +with_srv('testdb1', 1234, sub { + $uri = MongoDB::_URI->new(uri => 'mongodb+srv://test.example.com'); +}); + +my $updated; + +with_srv('testdb2', 1234, sub { + $updated = $uri->check_for_changes({ fallback_ttl_sec => 4 }); +}); + +ok $updated, 'uri was updated'; + +# +# URI specification conformity +# + +with_srv(['testdb1', 'testdb2'], 1234, sub { + $uri = MongoDB::_URI->new(uri => 'mongodb+srv://test.example.com'); + is_deeply $uri->hostids, + ['testdb1.example.com:1234', 'testdb2.example.com:1234'], + 'correct initial hosts'; +}); + +with_srv(['testdb1', 'testdb2', 'testdb3'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + [ 'testdb1.example.com:1234', + 'testdb2.example.com:1234', + 'testdb3.example.com:1234', + ], + 'new host was added'; +}); + +with_srv(['testdb2', 'testdb3'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + ['testdb2.example.com:1234', 'testdb3.example.com:1234'], + 'first host was removed'; +}); + +with_srv(['testdb2', 'testdb4'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + ['testdb2.example.com:1234', 'testdb4.example.com:1234'], + 'host was replaced'; +}); + +with_srv(['testdb5'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + ['testdb5.example.com:1234'], + 'hosts were both replaced (single)'; +}); + +with_srv(['testdb2', 'testdb4'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + ['testdb2.example.com:1234', 'testdb4.example.com:1234'], + 'back to two hosts'; +}); + +with_srv(['testdb5', 'testdb6'], 1234, sub { + ok $uri->check_for_changes, 'update detected'; + is_deeply $uri->hostids, + ['testdb5.example.com:1234', 'testdb6.example.com:1234'], + 'hosts were both replaced (multiple)'; +}); + + +# +# ensuring topology switchover +# + +my $orc = MongoDBTest::Orchestrator->new( + config_file => "devel/config/sharded-any.yml", +); +$orc->start; + +my ($client, $coll, $inserted); +my $host = 'router1'; + +my @events; + +with_srv('testdb1', $orc->get_server($host)->port, sub { + + use Test::DNS; + my $dns = Test::DNS->new(nameservers => ['127.0.0.1']); + $dns->object->port($port); + $dns->is_a('testdb1.example.com', '127.0.0.1'); + + $client = MongoDB->connect('mongodb+srv://test.example.com', { + ssl => 0, + monitoring_callback => sub { + push @events, shift; + }, + }); + $coll = $client->ns('test.db1'); + $inserted = $coll->insert_one({ foo => 23 }); +}); + +@events = (); + +with_srv('testdb2', $orc->get_server($host)->port, sub { + $inserted = $coll->insert_one({ foo => 42 }); + is_connected('testdb2.example.com'); +}); + +@events = (); + +with_srv('testdb1', $orc->get_server($host)->port, sub { + my $data = $coll->find_one({ _id => $inserted->inserted_id }); + is_connected('testdb1.example.com'); + is $data->{foo}, 42, 'correct value'; +}); + +@events = (); + +with_srv('testdb2', $orc->get_server($host)->port, sub { + my $data = $coll->find_one({ _id => $inserted->inserted_id }); + is_connected('testdb2.example.com'); + is $data->{foo}, 42, 'correct value'; +}); + +@events = (); + +with_srv('testdb1', $orc->get_server($host)->port, sub { + my $data = $coll->find_one({ _id => $inserted->inserted_id }); + is_not_connected('testdb1.example.com'); + is_connected('testdb2.example.com'); + is $data->{foo}, 42, 'correct value'; +}, 0); + +@events = (); + +with_srv(undef, $orc->get_server($host)->port, sub { + my $data; + my $warning = warning { + $data = $coll->find_one({ _id => $inserted->inserted_id }); + }; + is_connected('testdb2.example.com'); + is $data->{foo}, 42, 'correct value'; + like $warning, qr{test\.example\.com}, 'caught error as warning'; +}); + + + + + +done_testing; + +sub is_not_connected { + my ($domain) = @_; + my $count = grep { + exists $_->{connectionId} + && + $_->{connectionId} =~ qr{\A\Q$domain\E:\d+\z} + } @events; + ok !$count, "not connected to $domain"; +} + +sub is_connected { + my ($domain) = @_; + my $count = grep { + exists $_->{connectionId} + && + $_->{connectionId} =~ qr{\A\Q$domain\E:\d+\z} + } @events; + ok $count, "connected to $domain"; +} + +sub with_srv { + my ($domain, $dbport, $callback, $wait) = @_; + + $wait = 5 unless defined $wait; + do { + my $domain = $domain || ['']; + $domain = [$domain] + unless ref $domain; + note("set SRV records to [@$domain], waiting for ${wait}s"); + }; + sleep $wait; + + do { + my $zonefile = File::Temp->new; + + my @domains = ref($domain) ? @$domain : ($domain); + + print $zonefile "$_\n" for ( + q{$ORIGIN example.com.}, + q{$TTL 1s}, + q{testdb1 IN A 127.0.0.1}, + q{testdb2 IN A 127.0.0.1}, + q{testdb3 IN A 127.0.0.1}, + q{testdb4 IN A 127.0.0.1}, + q{testdb5 IN A 127.0.0.1}, + q{testdb6 IN A 127.0.0.1}, + q{ns IN A 127.0.0.1}, + q{example.com. IN NS ns}, + (map { + sprintf( + q{_mongodb._tcp.test.example.com. 1 IN SRV 0 5 %s %s}, + $dbport, + $_, + ) + } grep defined, @domains), + ); + $zonefile->close; + + my $t_i_dns = Test::Instance::DNS->new( + listen_addr => '127.0.0.1', + listen_port => $port, + zone_file => $zonefile->filename, + ); + $t_i_dns->run; + + $callback->(); + }; +} + diff --git a/lib/MongoDB/_Constants.pm b/lib/MongoDB/_Constants.pm index 9bb20975..a9b2c4e2 100644 --- a/lib/MongoDB/_Constants.pm +++ b/lib/MongoDB/_Constants.pm @@ -44,6 +44,7 @@ BEGIN { MIN_KEYED_DOC_LENGTH => 8, MIN_SERVER_VERSION => "2.4.0", MIN_WIRE_VERSION => 0, + RESCAN_SRV_FREQUENCY_SEC => $ENV{TEST_MONGO_RESCAN_SRV_FREQUENCY_SEC} || 60, NO_JOURNAL_RE => qr/^journaling not enabled/, NO_REPLICATION_RE => qr/^no replication has been enabled/, P_INT32 => $] lt '5.010' ? 'l' : 'l<', diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 9012dab7..e337d909 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -278,7 +278,7 @@ sub connect { # workaround, we always force 'localhost' to use IPv4. my $fh = $SOCKET_CLASS->new( - PeerHost => $host, + PeerHost => $ENV{TEST_MONGO_SOCKET_HOST} || $host, PeerPort => $port, ( lc($host) eq 'localhost' ? ( Family => AF_INET ) : () ), Proto => 'tcp', diff --git a/lib/MongoDB/_Topology.pm b/lib/MongoDB/_Topology.pm index e27d9c3f..43b9dbb1 100644 --- a/lib/MongoDB/_Topology.pm +++ b/lib/MongoDB/_Topology.pm @@ -401,6 +401,36 @@ sub DEMOLISH { return; } +sub _check_for_uri_changes { + my ($self) = @_; + + my $type = $self->type; + return unless + $type eq 'Sharded' + or $type eq 'Unknown'; + + my @existing = @{ $self->uri->hostids }; + + my $options = { + fallback_ttl_sec => $self->{heartbeat_frequency_sec}, + }; + + if ($self->uri->check_for_changes($options)) { + my %new = map { ($_, 1) } @{ $self->uri->hostids }; + for my $address (@existing) { + if (!$new{$address}) { + $self->_remove_address($address); + } + else { + delete $new{$address}; + } + } + for my $address (keys %new) { + $self->_add_address_as_unknown($address); + } + } +} + #--------------------------------------------------------------------------# # public methods #--------------------------------------------------------------------------# @@ -432,6 +462,7 @@ sub close_all_links { sub get_readable_link { my ( $self, $read_pref ) = @_; + $self->_check_for_uri_changes; my $mode = $read_pref ? lc $read_pref->mode : 'primary'; my $method = @@ -464,6 +495,7 @@ sub get_readable_link { sub get_specific_link { my ( $self, $address ) = @_; + $self->_check_for_uri_changes; my $server = $self->servers->{$address}; if ( $server && ( my $link = $self->_get_server_link($server) ) ) { @@ -476,6 +508,7 @@ sub get_specific_link { sub get_writable_link { my ($self) = @_; + $self->_check_for_uri_changes; my $method = ( $self->type eq "Single" || $self->type eq "Sharded" ) diff --git a/lib/MongoDB/_URI.pm b/lib/MongoDB/_URI.pm index d8d4d8a0..2f568f22 100644 --- a/lib/MongoDB/_URI.pm +++ b/lib/MongoDB/_URI.pm @@ -22,6 +22,8 @@ our $VERSION = 'v2.1.1'; use Moo; use MongoDB::Error; use Encode (); +use Time::HiRes qw(time); +use MongoDB::_Constants qw( RESCAN_SRV_FREQUENCY_SEC ); use Types::Standard qw( Any ArrayRef @@ -91,6 +93,12 @@ has valid_options => ( isa => HashRef, ); +has expires => ( + is => 'ro', + isa => Int, + writer => '_set_expires', +); + sub _build_valid_options { my $self = shift; return { @@ -300,7 +308,7 @@ sub _parse_options { } sub _fetch_dns_seedlist { - my ( $self, $host_name ) = @_; + my ( $self, $host_name, $phase ) = @_; my @split_name = split( '\.', $host_name ); MongoDB::Error->throw("URI '$self' must contain domain name and hostname") @@ -314,21 +322,29 @@ sub _fetch_dns_seedlist { my @hosts; my $options = {}; my $domain_name = join( '.', @split_name[1..$#split_name] ); + my $minimum_ttl; if ( $srv_data ) { - foreach my $rr ( $srv_data->answer ) { + SRV_RECORD: foreach my $rr ( $srv_data->answer ) { next unless $rr->type eq 'SRV'; my $target = $rr->target; # search for dot before domain name for a valid hostname - can have sub-subdomain unless ( $target =~ /\.\Q$domain_name\E$/ ) { - MongoDB::Error->throw( - "URI '$self' SRV record returns FQDN '$target'" - . " which does not match domain name '${$domain_name}'" - ); + my $err_msg = "URI '$self' SRV record returns FQDN '$target'" + . " which does not match domain name '${$domain_name}'"; + if ($phase && $phase eq 'init') { + MongoDB::Error->throw($err_msg); + } + else { + warn $err_msg; + } + next SRV_RECORD; } push @hosts, { target => $target, port => $rr->port, }; + $minimum_ttl = $rr->ttl + if not defined $minimum_ttl or $rr->ttl < $minimum_ttl; } my $txt_data = $res->query( $host_name, 'TXT' ); if ( defined $txt_data ) { @@ -348,11 +364,25 @@ sub _fetch_dns_seedlist { MongoDB::Error->throw("URI '$self' does not return any SRV results"); } - return ( \@hosts, $options ); + unless (@hosts) { + my $err_msg = "URI '$self' does not return any valid SRV results"; + if ($phase && $phase eq 'init') { + MongoDB::Error->throw($err_msg); + } + else { + warn $err_msg; + } + } + + $minimum_ttl = RESCAN_SRV_FREQUENCY_SEC + if $minimum_ttl < RESCAN_SRV_FREQUENCY_SEC + && $phase && $phase ne 'init'; + + return ( \@hosts, $options, time + $minimum_ttl ); } sub _parse_srv_uri { - my ( $self, $uri ) = @_; + my ( $self, $uri, $phase ) = @_; my %result; @@ -381,7 +411,7 @@ sub _parse_srv_uri { $result{options} = $self->_parse_options( $self->valid_options, \%result ); } - my ( $hosts, $options ) = $self->_fetch_dns_seedlist( $result{hostids} ); + my ( $hosts, $options, $expires ) = $self->_fetch_dns_seedlist( $result{hostids}, $phase ); # Default to SSL on unless specified in conn string options $options = { @@ -405,17 +435,104 @@ sub _parse_srv_uri { join( '&', map { sprintf( '%s=%s', $_, __uri_escape( $options->{$_} ) ) } keys %$options ), ); - return $new_uri; + return( $new_uri, $expires ); } sub BUILD { my ($self) = @_; + $self->_initialize_from_uri; +} + +# Options: +# - fallback_ttl_sec: Fallback TTL in seconds in case of an error +sub check_for_changes { + my ($self, $options) = @_; + + if (defined $self->{expires} && $self->{expires} <= time) { + my @current = sort @{ $self->{hostids} }; + local $@; + my $ok = eval { + + $self->_update_from_uri; + 1; + }; + if (!$ok) { + warn "Error while fetching SRV records: $@"; + $self->{expires} = $options->{fallback_ttl_sec}; + }; + return 0 + unless $ok; + my @new = sort @{ $self->{hostids} }; + return 1 + unless @current == @new; + for my $index (0 .. $#current) { + return 1 + unless $new[$index] eq $current[$index]; + } + return 0; + } + + return 0; +} + +sub _prepare_dns_hosts { + my ($self, $hostids) = @_; + + if ( !defined $hostids || !length $hostids ) { + MongoDB::Error->throw("URI '$self' could not be parsed (missing host list)"); + } + $hostids = [ map { lc _unescape_all($_) } split ',', $hostids ]; + for my $hostid (@$hostids) { + MongoDB::Error->throw( + "URI '$self' could not be parsed (Unix domain sockets are not supported)") + if $hostid =~ /\// && $hostid =~ /\.sock/; + MongoDB::Error->throw( + "URI '$self' could not be parsed (IP literals are not supported)") + if substr( $hostid, 0, 1 ) eq '['; + my ( $host, $port ) = split ":", $hostid, 2; + MongoDB::Error->throw("host list '@{ $hostids }' contains empty host") + unless length $host; + if ( defined $port ) { + MongoDB::Error->throw("URI '$self' could not be parsed (invalid port '$port')") + unless $port =~ /^\d+$/; + MongoDB::Error->throw( + "URI '$self' could not be parsed (invalid port '$port' (must be in range [1,65535])") + unless $port >= 1 && $port <= 65535; + } + } + $hostids = [ map { /:/ ? $_ : $_.":27017" } @$hostids ]; + return $hostids; +} + +sub _update_from_uri { + my ($self) = @_; + + my $uri = $self->uri; + my %result; + + ($uri, my $expires) = $self->_parse_srv_uri( $uri ); + $self->{expires} = $expires; + + if ( $uri !~ m{^$uri_re$} ) { + MongoDB::Error->throw("URI '$self' could not be parsed"); + } + + my $hostids = $3; + $hostids = $self->_prepare_dns_hosts($hostids); + + $self->{hostids} = $hostids; +} + +sub _initialize_from_uri { + my ($self) = @_; + my $uri = $self->uri; my %result; if ( $uri =~ m{^mongodb\+srv://} ) { - $uri = $self->_parse_srv_uri( $uri ); + ($uri, my $expires) = $self->_parse_srv_uri( $uri, 'init' ); + $result{expires} = $expires; } # we throw Error instead of UsageError for errors, to avoid stacktrace revealing credentials @@ -442,29 +559,7 @@ sub BUILD { $result{password} = _unescape_all( $result{password} ); } - if ( !defined $result{hostids} || !length $result{hostids} ) { - MongoDB::Error->throw("URI '$self' could not be parsed (missing host list)"); - } - $result{hostids} = [ map { lc _unescape_all($_) } split ',', $result{hostids} ]; - for my $hostid ( @{ $result{hostids} } ) { - MongoDB::Error->throw( - "URI '$self' could not be parsed (Unix domain sockets are not supported)") - if $hostid =~ /\// && $hostid =~ /\.sock/; - MongoDB::Error->throw( - "URI '$self' could not be parsed (IP literals are not supported)") - if substr( $hostid, 0, 1 ) eq '['; - my ( $host, $port ) = split ":", $hostid, 2; - MongoDB::Error->throw("host list '@{ $result{hostids} }' contains empty host") - unless length $host; - if ( defined $port ) { - MongoDB::Error->throw("URI '$self' could not be parsed (invalid port '$port')") - unless $port =~ /^\d+$/; - MongoDB::Error->throw( - "URI '$self' could not be parsed (invalid port '$port' (must be in range [1,65535])") - unless $port >= 1 && $port <= 65535; - } - } - $result{hostids} = [ map { /:/ ? $_ : $_.":27017" } @{ $result{hostids} } ]; + $result{hostids} = $self->_prepare_dns_hosts($result{hostids}); if ( defined $result{db_name} ) { MongoDB::Error->throw( @@ -477,7 +572,7 @@ sub BUILD { $result{options} = $self->_parse_options( $self->valid_options, \%result ); } - for my $attr (qw/username password db_name options hostids/) { + for my $attr (qw/username password db_name options hostids expires/) { my $setter = "_set_$attr"; $self->$setter( $result{$attr} ) if defined $result{$attr}; }