Skip to content

Commit eb301fe

Browse files
committed
Use threadpool for async scs
Signed-off-by: mdouglas47 <[email protected]>
1 parent 509a4ca commit eb301fe

File tree

9 files changed

+90
-71
lines changed

9 files changed

+90
-71
lines changed

db/comdb2.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ void berk_memp_sync_alarm_ms(int);
128128
#include <cdb2_constants.h>
129129
#include <bb_oscompat.h>
130130
#include <schemachange.h>
131+
#include <sc_global.h>
131132
#include "comdb2_atomic.h"
132133
#include "cron.h"
133134
#include "metrics.h"
@@ -5849,6 +5850,10 @@ int main(int argc, char **argv)
58495850
*/
58505851
gbl_tunables->freeze = 1;
58515852

5853+
if (init_sc_globals()) {
5854+
logmsg(LOGMSG_FATAL, "Failed to init sc globals\n");
5855+
exit(1);
5856+
}
58525857
handle_resume_sc();
58535858

58545859
create_marker_file();

db/osqlblockproc.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,18 @@ int bplog_schemachange_wait(struct ireq *iq, int rc)
13051305
iq->sc_pending = NULL;
13061306

13071307
while (sc != NULL) {
1308+
if (!sc->nothrevent) {
1309+
// If sc is running asynchronously, we must first hold mtxStart and
1310+
// wait on condStart until sc->started is set to true—otherwise we risk
1311+
// locking sc->mtx before the sc has begun.
1312+
Pthread_mutex_lock(&sc->mtxStart);
1313+
while (!sc->started) {
1314+
Pthread_cond_wait(&sc->condStart, &sc->mtxStart);
1315+
}
1316+
Pthread_mutex_unlock(&sc->mtxStart);
1317+
}
1318+
// Now that sc is guaranteed to be running, we can safely lock its main mutex,
1319+
// which the sc will release once it completes
13081320
Pthread_mutex_lock(&sc->mtx);
13091321
sc->nothrevent = 1;
13101322
iq->sc = sc->sc_next;

db/osqlcomm.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6563,8 +6563,7 @@ static int _process_single_table_sc_partitioning(struct ireq *iq)
65636563
arg.start = 1; /* first shard is already there */
65646564
arg.pos = 0; /* reset this so we do not set publish on additional shards */
65656565
}
6566-
/* should we serialize ? */
6567-
arg.s->nothrevent = sc->partition.u.tpt.retention > gbl_dohsql_sc_max_threads;
6566+
arg.s->nothrevent = 0;
65686567
rc = timepart_foreach_shard_lockless(
65696568
sc->newpartition, start_schema_change_tran_wrapper, &arg);
65706569

schemachange/sc_global.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ struct schema_change_type *sc_resuming = NULL;
5555
/* async ddl sc */
5656
pthread_mutex_t sc_async_mtx = PTHREAD_MUTEX_INITIALIZER;
5757
pthread_cond_t sc_async_cond = PTHREAD_COND_INITIALIZER;
58-
volatile int sc_async_threads = 0;
5958

6059
/* Throttle settings, which you can change with message traps. Note that if
6160
* you have gbl_sc_usleep=0, the important live writer threads never get to
@@ -688,3 +687,18 @@ void sc_alter_latency(int counter)
688687
}
689688
}
690689
}
690+
691+
struct thdpool *gbl_sc_thdpool = NULL;
692+
int init_sc_globals()
693+
{
694+
gbl_sc_thdpool = thdpool_create("sc_thdpool", 0);
695+
if (!gbl_sc_thdpool) {
696+
logmsg(LOGMSG_ERROR, "%s: Failed to allocate threadpool\n", __func__);
697+
return 1;
698+
}
699+
700+
thdpool_set_maxthds(gbl_sc_thdpool, bdb_attr_get(thedb->bdb_attr, BDB_ATTR_SC_ASYNC_MAXTHREADS));
701+
thdpool_set_maxqueue(gbl_sc_thdpool, 1000);
702+
703+
return 0;
704+
}

schemachange/sc_global.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ extern int gbl_default_sc_scanmode;
3030

3131
extern pthread_mutex_t sc_async_mtx;
3232
extern pthread_cond_t sc_async_cond;
33-
extern volatile int sc_async_threads;
3433

3534
/* Throttle settings, which you can change with message traps. Note that if
3635
* you have gbl_sc_usleep=0, the important live writer threads never get to
@@ -86,5 +85,6 @@ struct schema_change_type *preempt_ongoing_alter(char *table, int action);
8685
void clear_ongoing_alter();
8786
int get_stopsc(const char *func, int line);
8887
void sc_alter_latency(int counter);
88+
int init_sc_globals();
8989

9090
#endif

schemachange/sc_logic.c

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,9 @@ static enum thrtype prepare_sc_thread(struct schema_change_type *s)
6666
thrman_change_type(thr_self, THRTYPE_SCHEMACHANGE);
6767
} else
6868
thr_self = thrman_register(THRTYPE_SCHEMACHANGE);
69-
if (!s->nothrevent) {
70-
backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
71-
logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
72-
}
69+
70+
backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
71+
logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
7372
}
7473
return oldtype;
7574
}
@@ -728,12 +727,6 @@ static int do_schema_change_tran_int(sc_arg_t *arg)
728727
else
729728
s->sc_rc = rc;
730729

731-
if (!s->nothrevent) {
732-
Pthread_mutex_lock(&sc_async_mtx);
733-
sc_async_threads--;
734-
Pthread_cond_broadcast(&sc_async_cond);
735-
Pthread_mutex_unlock(&sc_async_mtx);
736-
}
737730
if (rc == SC_COMMIT_PENDING && (s->preempted == SC_ACTION_RESUME ||
738731
s->kind == SC_ALTERTABLE_PENDING)) {
739732
int bdberr = 0;
@@ -795,6 +788,13 @@ int do_schema_change_tran_thd(sc_arg_t *arg)
795788
return rc;
796789
}
797790

791+
void do_schema_change_tran_thd_thdpool_wrapper(struct thdpool *pool, void *work, void *thddata, int op)
792+
{
793+
bdb_thread_event(thedb->bdb_env, 1);
794+
do_schema_change_tran_int(work);
795+
bdb_thread_event(thedb->bdb_env, 0);
796+
}
797+
798798
int do_schema_change_locked(struct schema_change_type *s, void *tran)
799799
{
800800
comdb2_name_thread(__func__);
@@ -825,6 +825,11 @@ int do_schema_change_locked(struct schema_change_type *s, void *tran)
825825
return rc;
826826
}
827827

828+
void do_schema_change_locked_thdpool_wrapper(struct thdpool *pool, void *work, void *thddata, int op)
829+
{
830+
do_schema_change_locked(work, NULL);
831+
}
832+
828833
int finalize_schema_change(struct ireq *iq, tran_type *trans)
829834
{
830835
if (iq == NULL || iq->sc == NULL) abort();

schemachange/sc_logic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ int verify_constraints_exist(struct dbtable *from_db, struct dbtable *to_db,
3535

3636
int do_schema_change_tran(sc_arg_t *);
3737
int do_schema_change_tran_thd(sc_arg_t *);
38+
void do_schema_change_tran_thd_thdpool_wrapper(struct thdpool *pool, void *work, void *thddata, int op);
3839
int do_schema_change_locked(struct schema_change_type *, void *tran);
40+
void do_schema_change_locked_thdpool_wrapper(struct thdpool *pool, void *work, void *thddata, int op);
3941
#endif

schemachange/schemachange.c

Lines changed: 38 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,34 @@ const char *get_hostname_with_crc32(bdb_state_type *bdb_state,
4141
unsigned int hash);
4242

4343
extern int gbl_test_sc_resume_race;
44+
extern struct thdpool *gbl_sc_thdpool;
45+
46+
/*
 * Queue a schema change on gbl_sc_thdpool instead of running it inline.
 *
 * iq  - request on whose behalf the schema change runs; its sc_locked flag
 *       is cleared before queueing.
 * arg - heap-allocated argument bundle.  It is freed here when `s` itself
 *       is queued (pending alter / resumed preempted change) or when the
 *       enqueue fails; otherwise it becomes the queued work item
 *       (presumably released by the worker -- confirm).
 * s   - the schema change to run.
 *
 * Returns 0 when the work was queued.  If thdpool_enqueue() fails, the
 * failure is logged, and for non-osql schema changes sc_set_running() is
 * called with a 0 flag and `s` is freed; SC_ASYNC_FAILED is returned.
 */
static int do_schema_change_async(struct ireq * iq, sc_arg_t *arg, struct schema_change_type *s)
{
    thdpool_work_fn work_fn;
    void *work_item;
    int enqueue_rc;

    iq->sc_locked = 0;

    /* Decide which entry point the pool should run and what it operates on. */
    if (s->kind == SC_ALTERTABLE_PENDING || s->preempted == SC_ACTION_RESUME) {
        /* This path works directly on `s`; the argument bundle is not needed. */
        free(arg);
        arg = NULL;
        work_fn = (thdpool_work_fn) do_schema_change_locked_thdpool_wrapper;
        work_item = s;
    } else {
        work_fn = (thdpool_work_fn) do_schema_change_tran_thd_thdpool_wrapper;
        work_item = arg;
    }

    enqueue_rc = thdpool_enqueue(gbl_sc_thdpool, work_fn, work_item, 0, NULL, THDPOOL_FORCE_QUEUE);
    if (enqueue_rc == 0)
        return 0;

    /* Nothing was queued, so this function still owns arg (and possibly s). */
    logmsg(LOGMSG_ERROR, "%s:thdpool_enqueue rc %d\n", __func__, enqueue_rc);
    free(arg); /* free(NULL) is a no-op, so no guard is required */
    if (!s->is_osql) {
        sc_set_running(iq, s, s->tablename, 0, gbl_myhostname,
                       time(NULL), __func__, __LINE__);
        free_schema_change_type(s);
    }

    return SC_ASYNC_FAILED;
}
4472

4573
/* If this is successful, it increments */
4674
int start_schema_change_tran(struct ireq *iq, tran_type *trans)
@@ -324,66 +352,19 @@ int start_schema_change_tran(struct ireq *iq, tran_type *trans)
324352
*forward
325353
** in order to produce minimal spew
326354
*/
327-
if (s->nothrevent) {
328-
if (s->kind != SC_PARTIALUPRECS)
329-
logmsg(LOGMSG_INFO, "Executing SYNCHRONOUSLY\n");
330-
rc = do_schema_change_tran(arg);
331-
} else {
332-
int max_threads =
333-
bdb_attr_get(thedb->bdb_attr, BDB_ATTR_SC_ASYNC_MAXTHREADS);
334-
Pthread_mutex_lock(&sc_async_mtx);
335-
while (!s->must_resume && !s->resume && max_threads > 0 &&
336-
sc_async_threads >= max_threads) {
337-
logmsg(LOGMSG_INFO, "Waiting for avaiable schema change threads\n");
338-
Pthread_cond_wait(&sc_async_cond, &sc_async_mtx);
339-
}
340-
sc_async_threads++;
341-
Pthread_mutex_unlock(&sc_async_mtx);
342-
343-
if (s->kind != SC_PARTIALUPRECS)
344-
logmsg(LOGMSG_INFO, "Executing ASYNCHRONOUSLY\n");
345-
pthread_t tid;
346-
347-
if (s->kind == SC_ALTERTABLE_PENDING ||
348-
s->preempted == SC_ACTION_RESUME) {
349-
free(arg);
350-
arg = NULL;
351-
rc = pthread_create(&tid, &gbl_pthread_attr_detached,
352-
(void *(*)(void *))do_schema_change_locked, s);
353-
} else {
354-
Pthread_mutex_lock(&s->mtxStart);
355-
rc = pthread_create(&tid, &gbl_pthread_attr_detached,
356-
(void *(*)(void *))do_schema_change_tran_thd,
357-
arg);
358-
if (rc == 0) {
359-
while (!s->started) {
360-
Pthread_cond_wait(&s->condStart, &s->mtxStart);
361-
}
362-
}
363-
Pthread_mutex_unlock(&s->mtxStart);
364-
}
365-
if (rc) {
366-
logmsg(LOGMSG_ERROR,
367-
"start_schema_change:pthread_create rc %d %s\n", rc,
368-
strerror(errno));
369-
370-
Pthread_mutex_lock(&sc_async_mtx);
371-
sc_async_threads--;
372-
Pthread_mutex_unlock(&sc_async_mtx);
373-
374-
if (arg)
375-
free(arg);
376-
if (!s->is_osql) {
377-
sc_set_running(iq, s, s->tablename, 0, gbl_myhostname,
378-
time(NULL), __func__, __LINE__);
379-
free_schema_change_type(s);
380-
}
381-
rc = SC_ASYNC_FAILED;
382-
}
383-
}
355+
if (s->kind != SC_PARTIALUPRECS)
356+
logmsg(LOGMSG_INFO, "Executing %s\n", s->nothrevent ?
357+
"SYNCHRONOUSLY" : "ASYNCHRONOUSLY");
358+
359+
rc = s->nothrevent
360+
? do_schema_change_tran(arg)
361+
: do_schema_change_async(iq, arg, s);
362+
384363
/* SC_COMMIT_PENDING is SC_OK for the upper layers */
385364
if (rc == SC_COMMIT_PENDING) {
386365
rc = s->sc_rc = SC_OK;
366+
} else if (rc) {
367+
logmsg(LOGMSG_ERROR, "%s:%d Schema change failed\n", __func__, __LINE__);
387368
}
388369

389370
return rc;

tests/comdb2sys.test/comdb2sys.expected

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@
349349
(name='pgcompactpool')
350350
(name='recovery_processors')
351351
(name='recovery_workers')
352+
(name='sc_thdpool')
352353
(name='sqlenginepool')
353354
(name='udppfaultpool')
354355
[SELECT name FROM comdb2_threadpools ORDER BY name] rc 0

0 commit comments

Comments
 (0)