Skip to content

Commit 9d00462

Browse files
committed
Improve metadata bwc test for Logical Replication
1 parent 4781e62 commit 9d00462

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

tests/bwc/test_rolling_upgrade.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import time
12
import unittest
23
from crate.client import connect
4+
from crate.client.cursor import Cursor
35
from crate.client.exceptions import ProgrammingError
46

57
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath
@@ -88,6 +90,10 @@ def _test_rolling_upgrade(self, path, nodes):
8890
}
8991
cluster = self._new_cluster(path.from_version, nodes, settings=settings)
9092
cluster.start()
93+
replica_cluster = None
94+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
95+
replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
96+
replica_cluster.start()
9197
with connect(cluster.node().http_url, error_trace=True) as conn:
9298
c = conn.cursor()
9399
c.execute("create user arthur with (password = 'secret')")
@@ -152,6 +158,21 @@ def _test_rolling_upgrade(self, path, nodes):
152158
# Add the shards of the new partition primaries
153159
expected_active_shards += shards
154160

161+
# Set up tables for logical replications
162+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
163+
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
164+
expected_active_shards += 1
165+
c.execute("create publication p for table doc.x")
166+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
167+
rc = replica_conn.cursor()
168+
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
169+
rc.execute("create publication rp for table doc.rx")
170+
rc.execute(f"create subscription rs connection 'crate://localhost:{cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication p")
171+
wait_for_active_shards(rc)
172+
c.execute(f"create subscription s connection 'crate://localhost:{replica_cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication rp")
173+
wait_for_active_shards(c)
174+
expected_active_shards += 1
175+
155176
for idx, node in enumerate(cluster):
156177
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
157178
# Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
@@ -282,6 +303,26 @@ def _test_rolling_upgrade(self, path, nodes):
282303
c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])
283304
self.assertEqual(c.fetchall(), [[partition_version]])
284305

306+
# Ensure logical replications works
307+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
308+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
309+
rc = replica_conn.cursor()
310+
rc.execute("select count(*) from doc.x")
311+
count = rc.fetchall()[0][0]
312+
c.execute("select count(*) from doc.rx")
313+
count2 = c.fetchall()[0][0]
314+
315+
c.execute("insert into doc.x values (1)")
316+
rc.execute("insert into doc.rx values (1)")
317+
318+
# account for replication delay, wait_for_active_shards nor REFRESH help here
319+
time.sleep(5)
320+
321+
rc.execute("select count(*) from doc.x")
322+
self.assertEqual(rc.fetchall()[0][0], count + 1)
323+
c.execute("select count(*) from doc.rx")
324+
self.assertEqual(c.fetchall()[0][0], count2 + 1)
325+
285326
# Finally validate that all shards (primaries and replicas) of all partitions are started
286327
# and writes into the partitioned table while upgrading were successful
287328
with connect(cluster.node().http_url, error_trace=True) as conn:

0 commit comments

Comments
 (0)