Created new method to add a node to a cluster from a backup #271

260 changes: 257 additions & 3 deletions cli/scripts/cluster.py
@@ -658,6 +658,258 @@ def add_db(cluster_name, database_name, username, password):
util.message(f"## Updating cluster '{cluster_name}' json definition file")
update_json(cluster_name, db_json)

def add_node_from_backup(cluster_name, source_node, target_node, repo1_path=None, backup_id=None, script=" ", stanza=" ", install=True):
"""
Adds a new node to a cluster, restoring it from a full pgBackRest backup
of the source node.

Args:
cluster_name (str): The name of the cluster to which the node is being
added.
source_node (str): The node from which configurations are copied and
whose backup is restored.
target_node (str): The name of the new node.
repo1_path (str): The pgBackRest repo1 path to use; must be supplied
together with backup_id.
backup_id (str): The ID of the backup to restore; must be supplied
together with repo1_path.
script (str): Optional path to a bash script to run after the restore.
stanza (str): The pgBackRest stanza name.
install (bool): Whether to install pgEdge software on the new node.
"""
if (repo1_path and not backup_id) or (backup_id and not repo1_path):
util.exit_message("Both repo1_path and backup_id must be supplied together.")

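# A minimal usage sketch, assuming cluster 'demo' and a node definition
# file 'n3.json' already exist (all names here are illustrative):
#
#   add_node_from_backup("demo", "n1", "n3",
#                        repo1_path="/var/lib/pgbackrest",
#                        backup_id="20240101-120000F")
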
json_validate(cluster_name)
db, db_settings, nodes = load_json(cluster_name)

cluster_data = get_cluster_json(cluster_name)
if cluster_data is None:
util.exit_message("Cluster data is missing.")
pg = db_settings["pg_version"]
pgV = f"pg{pg}"
verbose = cluster_data.get("log_level", "info")

# Load and validate the target node JSON
target_node_file = f"{target_node}.json"
if not os.path.isfile(target_node_file):
util.exit_message(f"New node json file '{target_node_file}' not found")

try:
with open(target_node_file, "r") as f:
target_node_data = json.load(f)
json_validate_add_node(target_node_data)
except Exception as e:
util.exit_message(
f"Unable to load new node json def file '{target_node_file}'\n{e}")

# Retrieve source node data
source_node_data = next((node for node in nodes if node["name"] == source_node), None)
if source_node_data is None:
util.exit_message(f"Source node '{source_node}' not found in cluster data.")

for group in target_node_data.get("node_groups", []):
ssh_info = group.get("ssh")
os_user = ssh_info.get("os_user", "")
ssh_key = ssh_info.get("private_key", "")

new_node_data = {
"ssh": ssh_info,
"name": group.get("name", ""),
"is_active": group.get("is_active", ""),
"public_ip": group.get("public_ip", ""),
"private_ip": group.get("private_ip", ""),
"port": group.get("port", ""),
"path": group.get("path", ""),
"os_user": os_user,
"ssh_key": ssh_key
}

# new_node_data always contains both keys (possibly as empty strings), so
# check the values rather than key membership.
if not new_node_data.get("public_ip") and not new_node_data.get("private_ip"):
util.exit_message("Both public_ip and private_ip are missing in target node data.")

# Prefer the public IP when both are present, else fall back to whichever is set.
source_node_data["ip_address"] = (source_node_data.get("public_ip")
or source_node_data.get("private_ip"))
new_node_data["ip_address"] = (new_node_data.get("public_ip")
or new_node_data.get("private_ip"))

# Fetch backrest settings from cluster JSON
backrest_settings = cluster_data.get("backrest", {})
stanza = backrest_settings.get("stanza", f"pg{pg}")
repo1_retention_full = backrest_settings.get("repo1-retention-full", "7")
log_level_console = backrest_settings.get("log-level-console", "info")
repo1_cipher_type = backrest_settings.get("repo1-cipher-type", "aes-256-cbc")
pg1_path = backrest_settings.get("pg1-path", f"{source_node_data['path']}/pgedge/data/pg{pg}")
repo1_type = backrest_settings.get("repo1-type", "posix")

repo1_path = backrest_settings.get("repo1-path", False)
if not repo1_path:
repo1_path = f"/var/lib/pgbackrest/{source_node_data['name']}"
else:
repo1_path = os.path.join(repo1_path, source_node_data['name'])

rc = ssh_install_pgedge(cluster_name, db[0]["db_name"], db_settings, db[0]["db_user"],
db[0]["db_password"], [new_node_data], install, True, source_node, verbose)

os_user = new_node_data["os_user"]
port = source_node_data["port"]

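# For S3-backed repositories, the pgBackRest credentials must already be
# present in the caller's environment; they are exported on both the source
# and target nodes before the restore.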
if repo1_type == "s3":
for env_var in ["PGBACKREST_REPO1_S3_KEY", "PGBACKREST_REPO1_S3_BUCKET",
"PGBACKREST_REPO1_S3_KEY_SECRET", "PGBACKREST_REPO1_CIPHER_PASS"]:
if env_var not in os.environ:
util.exit_message(f"Environment variable {env_var} not set.")
s3_export_cmds = [f'export {env_var}={os.environ[env_var]}' for env_var in [
"PGBACKREST_REPO1_S3_KEY", "PGBACKREST_REPO1_S3_BUCKET",
"PGBACKREST_REPO1_S3_KEY_SECRET", "PGBACKREST_REPO1_CIPHER_PASS"]]
run_cmd(" && ".join(s3_export_cmds), source_node_data, message="Setting S3 environment variables on source node", verbose=verbose)
run_cmd(" && ".join(s3_export_cmds), new_node_data, message="Setting S3 environment variables on target node", verbose=verbose)

cmd = f"{new_node_data['path']}/pgedge/pgedge install backrest"
message = f"Installing backrest"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

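# Stop the freshly installed node, clear its data directory, and restore
# the source node's backup into it with pgBackRest.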
manage_node(new_node_data, "stop", f"pg{pg}", verbose)
cmd = f'rm -rf {new_node_data["path"]}/pgedge/data/pg{pg}'
message = f"Removing old data directory"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

args = (f'--repo1-path {repo1_path} --repo1-cipher-type {repo1_cipher_type} ')

if backup_id:
args += f'--set={backup_id} '

cmd = (f'{new_node_data["path"]}/pgedge/pgedge backrest command restore '
f'--repo1-type={repo1_type} --stanza={stanza} '
f'--pg1-path={new_node_data["path"]}/pgedge/data/pg{pg} {args}')

message = f"Restoring backup"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

pgd = f'{new_node_data["path"]}/pgedge/data/pg{pg}'
pgc = f'{pgd}/postgresql.conf'

cmd = f'echo "ssl_cert_file=\'{pgd}/server.crt\'" >> {pgc}'
message = f"Setting ssl_cert_file"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

cmd = f'echo "ssl_key_file=\'{pgd}/server.key\'" >> {pgc}'
message = f"Setting ssl_key_file"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

cmd = f'echo "log_directory=\'{pgd}/log\'" >> {pgc}'
message = f"Setting log_directory"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

cmd = (f'echo "shared_preload_libraries = '
f'\'pg_stat_statements, snowflake, spock\'" >> {pgc}')
message = f"Setting shared_preload_libraries"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)


cmd = (f'{new_node_data["path"]}/pgedge/pgedge backrest configure_replica {stanza} '
f'{new_node_data["path"]}/pgedge/data/pg{pg} {source_node_data["ip_address"]} '
f'{source_node_data["port"]} {source_node_data["os_user"]}')
message = f"Configuring PITR on replica"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

if script.strip() and os.path.isfile(script):
util.echo_cmd(f'{script}')

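# Quiesce the cluster while the restored node is wired in: terminate open
# transactions, set the cluster read-only, start and promote the standby,
# then re-initialize spock on it.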
terminate_cluster_transactions(nodes, db[0]['db_name'], f"pg{pg}", verbose)

spock = db_settings["spock_version"]
v4 = False
spock_maj = 3
if spock:
ver = [int(x) for x in spock.split('.')]
spock_maj = ver[0]
spock_min = ver[1]
if spock_maj >= 4:
v4 = True

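# v4 is forwarded to set_cluster_readonly and decides below whether the
# cluster leaves read-only mode before or after subscriptions are created.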
set_cluster_readonly(nodes, True, db[0]['db_name'], f"pg{pg}", v4, verbose)


manage_node(new_node_data, "start", f"pg{pg}", verbose)
time.sleep(5)

check_cluster_lag(new_node_data, db[0]['db_name'], f"pg{pg}", verbose)

sql_cmd = "SELECT pg_promote()"
cmd = f"{new_node_data['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"Promoting standby to primary"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

sql_cmd = "DROP extension spock cascade"
cmd = f"{new_node_data['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"DROP extension spock cascade"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

parms = f"spock{spock_maj}{spock_min}" if spock else "spock"

cmd = (f'cd {new_node_data["path"]}/pgedge/; ./pgedge install {parms}')
message = f"Re-installing spock"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

sql_cmd = "CREATE EXTENSION spock"
cmd = f"{new_node_data['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"Create extension spock"
run_cmd(cmd, new_node_data, message=message, verbose=verbose)

create_node(new_node_data, db[0]['db_name'], verbose)

if not v4:
set_cluster_readonly(nodes, False, db[0]['db_name'], f"pg{pg}", v4, verbose)

create_sub(nodes, new_node_data, db[0]['db_name'], verbose)
create_sub_new(nodes, new_node_data, db[0]['db_name'], verbose)

if v4:
set_cluster_readonly(nodes, False, db[0]['db_name'], f"pg{pg}", v4, verbose)

cmd = (f'cd {new_node_data["path"]}/pgedge/; ./pgedge spock node-list {db[0]["db_name"]}')
message = f"Listing spock nodes"
result = run_cmd(cmd, node=new_node_data, message=message, verbose=verbose,
capture_output=True)
print(f"\n{result.stdout}")

sql_cmd = "select node_id,node_name from spock.node"
cmd = f"{source_node_data['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"List nodes"
result = run_cmd(cmd, node=source_node_data, message=message, verbose=verbose,
capture_output=True)
print(f"\n{result.stdout}")

for node in nodes:
sql_cmd = ("select sub_id,sub_name,sub_enabled,sub_slot_name,"
"sub_replication_sets from spock.subscription")
cmd = f"{node['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"List subscriptions"
result = run_cmd(cmd, node=node, message=message, verbose=verbose,
capture_output=True)
print(f"\n{result.stdout}")

sql_cmd = ("select sub_id,sub_name,sub_enabled,sub_slot_name,"
"sub_replication_sets from spock.subscription")
cmd = f"{new_node_data['path']}/pgedge/pgedge psql '{sql_cmd}' {db[0]['db_name']}"
message = f"List subscriptions"
result = run_cmd(cmd, node=new_node_data, message=message, verbose=verbose,
capture_output=True)
print(f"\n{result.stdout}")

# Remove unnecessary keys before appending new node to the cluster data
new_node_data.pop('repo1_type', None)
new_node_data.pop('os_user', None)
new_node_data.pop('ssh_key', None)

# Append new node data to the cluster JSON
cluster_data["node_groups"].append(new_node_data)
cluster_data["update_date"] = datetime.datetime.now().astimezone().isoformat()

write_cluster_json(cluster_name, cluster_data)

def add_node(cluster_name, source_node, target_node, repo1_path=None, backup_id=None, script=" ", stanza=" ", install=True):
"""
@@ -698,7 +950,7 @@ def add_node(cluster_name, source_node, target_node, repo1_path=None, backup_id=
json_validate_add_node(target_node_data)
except Exception as e:
util.exit_message(
f"Unable to load new node jsaon def file '{target_file_name}\n{e}")
f"Unable to load new node json def file '{target_file_name}\n{e}")

# Retrieve source node data
source_node_data = next((node for node in nodes if node["name"] == source_node), None)
@@ -737,7 +989,8 @@ def add_node(cluster_name, source_node, target_node, repo1_path=None, backup_id=

# Fetch backrest settings from cluster JSON
backrest_settings = cluster_data.get("backrest", {})
stanza = backrest_settings.get("stanza", f"pg{pg}")
# stanza = backrest_settings.get("stanza", f"pg{pg}")
stanza = backrest_settings.get("stanza", f"testdb_stanza")
repo1_retention_full = backrest_settings.get("repo1-retention-full", "7")
log_level_console = backrest_settings.get("log-level-console", "info")
repo1_cipher_type = backrest_settings.get("repo1-cipher-type", "aes-256-cbc")
@@ -1535,7 +1788,8 @@ def print_install_hdr(cluster_name, db, db_user, count, name, path, ip, port, re
"command": command,
"ssh": ssh,
"app-install": app_install,
"app-remove": app_remove
"app-remove": app_remove,
"add-node-from-backup": add_node_from_backup
}
)
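
# Assuming this mapping feeds the pgedge CLI dispatcher, the new command
# would be invoked along these lines (illustrative, not verified):
#
#   ./pgedge cluster add-node-from-backup demo n1 n3 \
#       --repo1_path=/var/lib/pgbackrest --backup_id=20240101-120000F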