Skip to content

Commit e4b1b28

Browse files
committed
Improve metadata bwc test for Logical Replication
1 parent 4781e62 commit e4b1b28

File tree

1 file changed

+52
-13
lines changed

1 file changed

+52
-13
lines changed

tests/bwc/test_rolling_upgrade.py

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
import time
12
import unittest
23
from crate.client import connect
34
from crate.client.exceptions import ProgrammingError
45

5-
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath
6+
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy
67

78
ROLLING_UPGRADES_V4 = (
89
# 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
@@ -30,23 +31,20 @@
3031
)
3132

3233
ROLLING_UPGRADES_V5 = (
33-
UpgradePath('5.0.x', '5.1.x'),
34-
UpgradePath('5.1.x', '5.2.x'),
35-
UpgradePath('5.2.x', '5.3.x'),
36-
UpgradePath('5.3.x', '5.4.x'),
37-
UpgradePath('5.4.x', '5.5.x'),
38-
UpgradePath('5.5.x', '5.6.x'),
39-
UpgradePath('5.6.x', '5.7.x'),
40-
UpgradePath('5.7.x', '5.8.x'),
41-
UpgradePath('5.8.x', '5.9.x'),
42-
UpgradePath('5.9.x', '5.10.x'),
43-
UpgradePath('5.10.x', '5.10'),
44-
UpgradePath('5.10', 'latest-nightly'),
34+
UpgradePath('5.10', 'branch:master'),
4535
)
4636

4737

4838
class RollingUpgradeTest(NodeProvider, unittest.TestCase):
4939

40+
def _num_docs(self, cursor, schema, table):
41+
cursor.execute("select sum(num_docs) from sys.shards where schema_name = ? and table_name = ?", (schema, table))
42+
return cursor.fetchall()[0][0]
43+
44+
def _assert_num_docs(self, cursor, schema, table, expected_count):
45+
count = self._num_docs(cursor, schema, table)
46+
self.assertEqual(expected_count, count)
47+
5048
def test_rolling_upgrade_4_to_5(self):
5149
print("") # force newline for first print
5250
for path in ROLLING_UPGRADES_V4:
@@ -88,6 +86,10 @@ def _test_rolling_upgrade(self, path, nodes):
8886
}
8987
cluster = self._new_cluster(path.from_version, nodes, settings=settings)
9088
cluster.start()
89+
replica_cluster = None
90+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
91+
replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
92+
replica_cluster.start()
9193
with connect(cluster.node().http_url, error_trace=True) as conn:
9294
c = conn.cursor()
9395
c.execute("create user arthur with (password = 'secret')")
@@ -152,6 +154,24 @@ def _test_rolling_upgrade(self, path, nodes):
152154
# Add the shards of the new partition primaries
153155
expected_active_shards += shards
154156

157+
# Set up tables for logical replications
158+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
159+
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
160+
expected_active_shards += 1
161+
c.execute("create publication p for table doc.x")
162+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
163+
rc = replica_conn.cursor()
164+
transport_port = cluster.node().addresses.transport.port
165+
replica_transport_port = replica_cluster.node().addresses.transport.port
166+
assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310
167+
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
168+
rc.execute("create publication rp for table doc.rx")
169+
rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p")
170+
assert_busy(lambda: self._assert_num_docs(rc, "doc", "x", 0))
171+
c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp")
172+
assert_busy(lambda: self._assert_num_docs(c, "doc", "rx", 0))
173+
expected_active_shards += 1
174+
155175
for idx, node in enumerate(cluster):
156176
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
157177
# Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
@@ -282,6 +302,25 @@ def _test_rolling_upgrade(self, path, nodes):
282302
c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])
283303
self.assertEqual(c.fetchall(), [[partition_version]])
284304

305+
# Ensure logical replications works
306+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
307+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
308+
rc = replica_conn.cursor()
309+
310+
# Cannot drop replicated tables
311+
with self.assertRaises(ProgrammingError):
312+
rc.execute("drop table doc.x")
313+
c.execute("drop table doc.rx")
314+
315+
count = self._num_docs(rc, "doc", "x")
316+
count2 = self._num_docs(c, "doc", "rx")
317+
318+
c.execute("insert into doc.x values (1)")
319+
rc.execute("insert into doc.rx values (1)")
320+
321+
assert_busy(lambda: self._assert_num_docs(rc, "doc", "x", count + 1))
322+
assert_busy(lambda: self._assert_num_docs(c, "doc", "rx", count2 + 1))
323+
285324
# Finally validate that all shards (primaries and replicas) of all partitions are started
286325
# and writes into the partitioned table while upgrading were successful
287326
with connect(cluster.node().http_url, error_trace=True) as conn:

0 commit comments

Comments
 (0)