Add bulk import tests #5268

Open: morgando wants to merge 2 commits into main
1 change: 1 addition & 0 deletions db/db_tunables.c
@@ -151,6 +151,7 @@ extern int gbl_recovery_ckp;
extern int gbl_reproduce_ckp_bug;
extern int gbl_sample_queries;
extern int gbl_sample_queries_max_queries;
extern int gbl_comdb2_files_sleep_secs_after_processing_llmeta;
extern int gbl_slow_rep_process_txn_freq;
extern int gbl_slow_rep_process_txn_minms;
extern int gbl_slow_rep_process_txn_maxms;
3 changes: 3 additions & 0 deletions db/db_tunables.h
@@ -2556,6 +2556,9 @@ REGISTER_TUNABLE("iam_dbname",
"override dbname for IAM",
TUNABLE_STRING, &gbl_iam_dbname, READEARLY | READONLY, NULL,
NULL, NULL, NULL);
REGISTER_TUNABLE("comdb2_files_sleep_secs_after_processing_llmeta", "Number of seconds to sleep after processing "
"llmeta in comdb2 files query processor\n", TUNABLE_INTEGER,
&gbl_comdb2_files_sleep_secs_after_processing_llmeta, INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("comdb2_oplog_preserve_seqno", "Preserve max value of the seqno in llmeta", TUNABLE_BOOLEAN, &gbl_comdb2_oplog_preserve_seqno, INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("queue_nonodh_scan_limit", "For comdb2_queues, stop queue scan at this depth (Default: 10000)", TUNABLE_INTEGER, &gbl_nonodh_queue_scan_limit, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("always_request_log_req", "Always request the next log record on replicant if there is a gap (default: off)", TUNABLE_BOOLEAN, &gbl_always_request_log_req, 0, NULL, NULL, NULL, NULL);
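
For context, the new tunable is registered INTERNAL, so the tests below flip it at runtime rather than in an lrl file. A minimal sketch of doing the same by hand, assuming the set_src_tunable helper forwards its single 'name value' string to the sys.cmd.send message trap (the same mechanism the test suite uses for 'downgrade'):

    # Hypothetical manual invocation; the tests wrap this in set_src_tunable.
    cdb2sql ${SRC_CDB2_OPTIONS} $SRC_DBNAME default \
        "exec procedure sys.cmd.send('comdb2_files_sleep_secs_after_processing_llmeta 30')"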
17 changes: 13 additions & 4 deletions sqlite/ext/comdb2/files.c
@@ -22,6 +22,7 @@

typedef unsigned char u_int8_t;

int gbl_comdb2_files_sleep_secs_after_processing_llmeta = 0;
int endianness_mismatch(struct sqlclntstate *clnt);
void berk_fix_checkpoint_endianness(u_int8_t *buffer);

@@ -131,19 +132,19 @@ static int check_and_append_new_log_files(systbl_files_cursor *pCur)
static int read_next_chunk(systbl_files_cursor *pCur)
{
while (pCur->rowid < pCur->nfiles) {
logmsg(LOGMSG_DEBUG, "%s:%d processing %s\n", __func__, __LINE__,
pCur->files[pCur->rowid].name);
const char * const fname = pCur->files[pCur->rowid].name;
logmsg(LOGMSG_DEBUG, "%s:%d processing %s\n", __func__, __LINE__, fname);

if (pCur->files[pCur->rowid].type == FILES_TYPE_LOGFILE) {
if (check_and_append_new_log_files(pCur) != 0) {
logmsg(LOGMSG_ERROR, "%s:%d Failed to process file %s\n", __func__, __LINE__, pCur->files[pCur->rowid].name);
logmsg(LOGMSG_ERROR, "%s:%d Failed to process file %s\n", __func__, __LINE__, fname);
return SQLITE_ERROR;
}
}

int rc = read_write_file(pCur->files[pCur->rowid].info, pCur, memory_writer);
if (rc > 0) {
logmsg(LOGMSG_ERROR, "%s:%d Failed to process file %s\n", __func__, __LINE__, pCur->files[pCur->rowid].name);
logmsg(LOGMSG_ERROR, "%s:%d Failed to process file %s\n", __func__, __LINE__, fname);
return SQLITE_ERROR;
} else if (rc == 0) {
break;
@@ -160,6 +161,14 @@ static int read_next_chunk(systbl_files_cursor *pCur)
}

pCur->rowid++; // Read the next file

if (gbl_comdb2_files_sleep_secs_after_processing_llmeta
&& (strcmp(fname, "comdb2_llmeta.dta") == 0)) {
logmsg(LOGMSG_DEBUG, "%s:%d just processed llmeta. Sleeping for %d seconds\n",
__func__, __LINE__, gbl_comdb2_files_sleep_secs_after_processing_llmeta);
sleep(gbl_comdb2_files_sleep_secs_after_processing_llmeta);
logmsg(LOGMSG_DEBUG, "%s:%d done sleeping\n", __func__, __LINE__);
}
}

return SQLITE_OK;
2 changes: 0 additions & 2 deletions sqlite/ext/comdb2/files_util.c
@@ -121,8 +121,6 @@ static void set_chunk_size(db_file_t *f, size_t chunk_size)
dbfile_set_chunk_size(f->info, chunk_size);
}



static int read_dir(const char *dirname, db_file_t **files, int *count, char *file_pattern, size_t chunk_size)
{
struct dirent buf;
2 changes: 1 addition & 1 deletion tests/bulkimport.test/Makefile
@@ -6,5 +6,5 @@ else
include $(TESTSROOTDIR)/testcase.mk
endif
ifeq ($(TEST_TIMEOUT),)
export TEST_TIMEOUT=5m
export TEST_TIMEOUT=7m
endif
133 changes: 133 additions & 0 deletions tests/bulkimport.test/runit
@@ -353,6 +353,29 @@ function test_bad_inputs_with_werror() {
)
}

function test_source_table_does_not_exist() {
(
# Given
local dst_tbl=foo
query_dst_db "create table $dst_tbl(i int)"
trap "query_dst_db 'drop table $dst_tbl'" EXIT

# When
err=$(query_dst_db "replace table $dst_tbl with LOCAL_$SRC_DBNAME.donut_exist" 2>&1)

# Then
if (( $? == 0 )); then
echo "FAIL: Expected import to fail"
return 1
fi

if ! echo "$err" | grep "Source table does not exist"; then
echo "FAIL: Expected 'table does not exist' error"
return 1
fi
)
}

function test_resume_is_blocked() {
(
# Given
@@ -380,6 +403,116 @@
)
}

function test_src_db_table_dropped_during_import() {
Review comment from @morgando (Contributor Author), Jul 14, 2025:

  • table is dropped before the src db sends any files -> the dst db will not see the source table and the import will fail. This is tested in test_source_table_does_not_exist.
  • table is dropped after the src db sends all files -> not worth testing: from the perspective of the dst db this is equivalent to the drop never happening.
  • table is dropped while the src db is sending files -> tested in test_src_db_table_dropped_during_import. comdb2_files sends transaction log files after sending all btree files. So if the drop happens after the btree files have been sent, the import will fail if the txn log reflecting the drop has been flushed to disk by the time the logs are sent to the dst db, and will succeed if it hasn't (because the dst db brings up a copy of the src db from the files it receives and runs recovery on it). Either outcome is fine.
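
The sleep tunable added above is what makes this third case reproducible. Roughly, the window it opens looks like this (a sketch of the sequence distilled from test_src_db_table_dropped_during_import below, not additional test code):

    # 1. dst db starts the import; src db streams btree files, then comdb2_llmeta.dta
    # 2. src db sleeps for comdb2_files_sleep_secs_after_processing_llmeta seconds
    # 3. the test drops the source table inside this window; the drop reaches the flushed txn logs
    # 4. src db wakes and ships the txn logs, which now contain the drop
    # 5. dst db runs recovery over the copied files, the drop replays, and the import
    #    fails with "Source table does not exist"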

(
# Given
# create tables
local src_tbl=foo dst_tbl=bar start_timestamp=$(date '+%Y/%m/%d %H:%M:%S')
fixture_src_tbl_and_dst_tbl_have_same_schema $src_tbl $dst_tbl > /dev/null

# set tunables
# the 30 second sleep gives the db time to flush logs reflecting the table drop
set_src_tunable 'comdb2_files_sleep_secs_after_processing_llmeta 30'
set_src_tunable 'logmsg level debug'

# teardown routines
trap "set_src_tunable 'comdb2_files_sleep_secs_after_processing_llmeta 0';
set_src_tunable 'logmsg level warn';
query_dst_db 'drop table $dst_tbl'" EXIT

# start import and wait for the source db to sleep after
# sending llmeta to the dst db
query_dst_db "replace table $dst_tbl with LOCAL_$SRC_DBNAME.$src_tbl" 2> err.txt &
waitpid=$!
wait_for_src_trace "just processed llmeta" "$start_timestamp"

# When
query_src_db "drop table $src_tbl"

# Then
# We dropped the table between the point when the src db sent
# llmeta and the point when it sent the txn logs.
#
# Therefore, the drop will be reflected in the txn logs,
# which means that recovery will drop it from llmeta.
#
# So we expect the import to fail because
# the source table will not exist.

if check_for_src_trace "read_next_chunk.*done sleeping" "$start_timestamp"; then
echo "FAIL: Expected source database to still be sleeping. Test is buggy"
return 1
fi

if wait $waitpid; then
echo "FAIL: Expected import to fail."
return 1
fi

if ! grep "Source table does not exist" err.txt; then
echo "FAIL: Expected 'table does not exist' error"
return 1
fi

if verify_eq $src_tbl $dst_tbl; then
echo "FAIL: Expected tables to not match"
return 1
fi
)
}

function test_src_db_election_during_import() {
(
# Given
# create tables
local src_tbl=foo dst_tbl=bar start_timestamp=$(date '+%Y/%m/%d %H:%M:%S')
fixture_src_tbl_and_dst_tbl_have_same_schema $src_tbl $dst_tbl > /dev/null

# set tunables
set_src_tunable 'comdb2_files_sleep_secs_after_processing_llmeta 10'
set_src_tunable 'logmsg level debug'

# teardown routines
trap "set_src_tunable 'comdb2_files_sleep_secs_after_processing_llmeta 0';
set_src_tunable 'logmsg level warn';
query_src_db 'drop table $src_tbl';
query_dst_db 'drop table $dst_tbl'" EXIT


# start import and wait for the source db to sleep after
# sending llmeta to the dst db
query_dst_db "replace table $dst_tbl with LOCAL_$SRC_DBNAME.$src_tbl" &
waitpid=$!
wait_for_src_trace "just processed llmeta" "$start_timestamp"

# When
downgrade_src_db

# Then

downgrade_rc=$?
if (( downgrade_rc != 0 )); then
echo "FAIL: Expected downgrade to succeed"
return 1
fi

if check_for_src_trace "read_next_chunk.*done sleeping" "$start_timestamp"; then
echo "FAIL: Expected source database to still be sleeping. Test is buggy"
return 1
fi

if ! wait $waitpid; then
echo "FAIL: Expected import to succeed"
return 1
fi

if ! verify_eq $src_tbl $dst_tbl; then
echo "FAIL: Expected tables to match"
return 1
fi
)
}

function run_basic_test() {
(
# Given
44 changes: 28 additions & 16 deletions tests/bulkimport.test/util.sh
@@ -30,26 +30,14 @@ function failexit() {
fi
}

function query_src_db_opts() {
query=$1
opts=$2

cdb2sql ${SRC_CDB2_OPTIONS} ${opts} $SRC_DBNAME default "$query"
}

function query_dst_db_opts() {
query=$1
opts=$2

cdb2sql ${DST_CDB2_OPTIONS} ${opts} $DST_DBNAME default "$query"
}

function query_src_db() {
query_src_db_opts "$1"
query=$1
cdb2sql ${SRC_CDB2_OPTIONS} $SRC_DBNAME default "$query"
}

function query_dst_db() {
query_dst_db_opts "$1"
query=$1
cdb2sql ${DST_CDB2_OPTIONS} $DST_DBNAME default "$query"
}

function set_src_tunable() {
@@ -70,6 +58,30 @@ function set_dst_tunable() {
done
}

function check_for_src_trace() {
local -r trace="$1" timestamp="$2"
echo "Checking for src trace. Trace is $trace. Timestamp is $timestamp"
awk -v ts="${timestamp}" '$0 >= ts' ${TESTDIR}/logs/${SRC_DBNAME}* | grep "${trace}"
}

function wait_for_src_trace() {
local -r trace="$1" timestamp="$2"
while ! check_for_src_trace "$trace" "$timestamp"; do
sleep .1
done
}

function downgrade_src_db() {
local master
master=$(cdb2sql ${SRC_CDB2_OPTIONS} -tabs $SRC_DBNAME default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"') || return 1
cdb2sql --host ${master} ${SRC_CDB2_OPTIONS} $SRC_DBNAME "exec procedure sys.cmd.send('downgrade')" || return 1
local new_master=${master}
while [[ "${new_master}" == "${master}" ]]; do
sleep 1
new_master=$(cdb2sql ${SRC_CDB2_OPTIONS} -tabs $SRC_DBNAME default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"') || return 1
done
}

function query_in_loop() {
query_func=$1
query=$2
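
Taken together, the new helpers compose into the wait/act/verify pattern the tests rely on. For reference, a condensed sketch mirroring test_src_db_election_during_import (table creation, tunables, and teardown elided):

    start_timestamp=$(date '+%Y/%m/%d %H:%M:%S')
    query_dst_db "replace table $dst_tbl with LOCAL_$SRC_DBNAME.$src_tbl" &  # kick off the import in the background
    waitpid=$!
    wait_for_src_trace "just processed llmeta" "$start_timestamp"  # block until the src db enters its sleep window
    downgrade_src_db                                               # force a master swap mid-import
    wait $waitpid && verify_eq $src_tbl $dst_tbl                   # the import should survive the election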