Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 193f4d8

Browse files
authored
Backward compatibility for PG version > 10 (#44)
1 parent 68e8cb5 commit 193f4d8

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
long_description = f.read()
77

88
setup(name='pipelinewise-tap-postgres',
9-
version='1.5.1',
9+
version='1.5.2',
1010
description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
1111
long_description=long_description,
1212
long_description_content_type='text/markdown',

tap_postgres/sync_strategies/logical_replication.py

+24-22
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
UPDATE_BOOKMARK_PERIOD = 10000
2121

22-
def get_pg_version(cur):
23-
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
24-
version = cur.fetchone()[0]
25-
LOGGER.debug("Detected PostgreSQL version: %s", version)
22+
def get_pg_version(conn_info):
23+
with post_db.open_connection(conn_info, False) as conn:
24+
with conn.cursor() as cur:
25+
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
26+
version = cur.fetchone()[0]
27+
LOGGER.debug("Detected PostgreSQL version: {}".format(version))
2628
return version
2729

2830

@@ -60,25 +62,24 @@ def int_to_lsn(lsni):
6062

6163

6264
def fetch_current_lsn(conn_config):
65+
version = get_pg_version(conn_config)
66+
# Make sure PostgreSQL version is 9.4 or higher
67+
# Do not allow minor versions with PostgreSQL BUG #15114
68+
if (version >= 110000) and (version < 110002):
69+
raise Exception('PostgreSQL upgrade required to minor version 11.2')
70+
elif (version >= 100000) and (version < 100007):
71+
raise Exception('PostgreSQL upgrade required to minor version 10.7')
72+
elif (version >= 90600) and (version < 90612):
73+
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
74+
elif (version >= 90500) and (version < 90516):
75+
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
76+
elif (version >= 90400) and (version < 90421):
77+
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
78+
elif (version < 90400):
79+
raise Exception('Logical replication not supported before PostgreSQL 9.4')
80+
6381
with post_db.open_connection(conn_config, False) as conn:
6482
with conn.cursor() as cur:
65-
# Make sure PostgreSQL version is 9.4 or higher
66-
version = get_pg_version(cur)
67-
68-
# Do not allow minor versions with PostgreSQL BUG #15114
69-
if (version >= 110000) and (version < 110002):
70-
raise Exception('PostgreSQL upgrade required to minor version 11.2')
71-
elif (version >= 100000) and (version < 100007):
72-
raise Exception('PostgreSQL upgrade required to minor version 10.7')
73-
elif (version >= 90600) and (version < 90612):
74-
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
75-
elif (version >= 90500) and (version < 90516):
76-
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
77-
elif (version >= 90400) and (version < 90421):
78-
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
79-
elif (version < 90400):
80-
raise Exception('Logical replication not supported before PostgreSQL 9.4')
81-
8283
# Use version specific lsn command
8384
if version >= 100000:
8485
cur.execute("SELECT pg_current_wal_lsn() AS current_lsn")
@@ -388,12 +389,13 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
388389
for s in logical_streams:
389390
sync_common.send_schema_message(s, ['lsn'])
390391

392+
version = get_pg_version(conn_info)
393+
391394
# Create replication connection and cursor
392395
conn = post_db.open_connection(conn_info, True)
393396
cur = conn.cursor()
394397

395398
# Set session wal_sender_timeout for PG12 and above
396-
version = get_pg_version(cur)
397399
if (version >= 120000):
398400
wal_sender_timeout = 10800000 #10800000ms = 3 hours
399401
LOGGER.info("Set session wal_sender_timeout = {} milliseconds".format(wal_sender_timeout))

0 commit comments

Comments
 (0)