From d724ef51ea33acb5811e3609adb31d073172f6f7 Mon Sep 17 00:00:00 2001 From: Aakash Arayambeth Date: Wed, 29 May 2024 15:25:51 -0400 Subject: [PATCH] generic sharding : setup metadata and remote tables. Create/delete --- bbinc/cdb2_constants.h | 2 + bbinc/consistent_hash.h | 5 + bdb/bdb_schemachange.c | 6 + bdb/bdb_schemachange.h | 4 +- db/CMakeLists.txt | 1 + db/comdb2.c | 23 +- db/comdb2.h | 11 +- db/db_tunables.c | 2 +- db/db_tunables.h | 3 + db/dohast.h | 0 db/hash_partition.c | 866 ++++++++++++++++++++++++++++ db/hash_partition.h | 42 ++ db/osqlcomm.c | 71 +++ db/osqlsqlthr.c | 9 +- db/sqlinterfaces.c | 8 + db/views.c | 49 +- db/views.h | 11 +- db/views_persist.c | 62 +- db/views_serial.c | 259 +++++++++ db/views_sqlite.c | 164 +++++- schemachange/sc_add_table.c | 6 + schemachange/sc_callbacks.c | 25 +- schemachange/sc_drop_table.c | 9 + schemachange/sc_struct.c | 44 ++ schemachange/schemachange.h | 13 +- sqlite/CMakeLists.txt | 1 + sqlite/ext/comdb2/comdb2systblInt.h | 2 +- sqlite/ext/comdb2/hashpartitions.c | 44 ++ sqlite/ext/comdb2/tables.c | 2 + sqlite/src/build.c | 13 +- sqlite/src/comdb2build.c | 135 ++++- sqlite/src/comdb2build.h | 1 + sqlite/src/parse.y | 5 +- sqlite/tool/mkkeywordhash.c | 1 + tests/tools/test_consistent_hash.c | 5 - 35 files changed, 1862 insertions(+), 42 deletions(-) create mode 100644 db/dohast.h create mode 100644 db/hash_partition.c create mode 100644 db/hash_partition.h create mode 100644 sqlite/ext/comdb2/hashpartitions.c diff --git a/bbinc/cdb2_constants.h b/bbinc/cdb2_constants.h index a3581e24d9..19d3788c90 100644 --- a/bbinc/cdb2_constants.h +++ b/bbinc/cdb2_constants.h @@ -36,6 +36,8 @@ #define MAX_SPNAME MAXTABLELEN #define MAX_SPVERSION_LEN 80 #define MAXTABLELEN 32 +#define MAXPARTITIONS 32 +#define MAXPARTITIONLEN (1 + (MAX_DBNAME_LENGTH + MAXTABLELEN)) /* partitions of the form . */ #define MAXTAGLEN 64 #define REPMAX 32 /* Maximum buffer length for generated key name. 
*/ diff --git a/bbinc/consistent_hash.h b/bbinc/consistent_hash.h index 007212ce33..e7fd728ed4 100644 --- a/bbinc/consistent_hash.h +++ b/bbinc/consistent_hash.h @@ -15,6 +15,11 @@ enum ch_err { CH_ERR_DUP = 4 }; +enum ch_hash_func_type { + CH_HASH_SHA = 1, + CH_HASH_MD5 = 2, + CH_HASH_CRC = 3 +}; struct consistent_hash_node { uint8_t *data; size_t data_len; diff --git a/bdb/bdb_schemachange.c b/bdb/bdb_schemachange.c index 3f22f6a4c2..c128080d6b 100644 --- a/bdb/bdb_schemachange.c +++ b/bdb/bdb_schemachange.c @@ -353,6 +353,12 @@ int bdb_llog_partition(bdb_state_type *bdb_state, tran_type *tran, char *name, return bdb_llog_scdone_tran(bdb_state, views, tran, name, strlen(name) + 1, bdberr); } + +int bdb_llog_hash_partition(bdb_state_type *bdb_state, tran_type *tran, char *name, int *bdberr) +{ + return bdb_llog_scdone_tran(bdb_state, hash_views, tran, name, strlen(name) + 1, bdberr); +} + int bdb_llog_luareload(bdb_state_type *bdb_state, int wait, int *bdberr) { return bdb_llog_scdone(bdb_state, luareload, NULL, 0, wait, bdberr); diff --git a/bdb/bdb_schemachange.h b/bdb/bdb_schemachange.h index 31ae3e9bee..5ec3e401e4 100644 --- a/bdb/bdb_schemachange.h +++ b/bdb/bdb_schemachange.h @@ -51,7 +51,8 @@ typedef enum scdone { add_queue_file, // 22 del_queue_file, // 23 alias_table, // 24 - alias // 25 + alias, // 25 + hash_views // 25 } scdone_t; #define BDB_BUMP_DBOPEN_GEN(type, msg) \ @@ -73,6 +74,7 @@ int bdb_llog_views(bdb_state_type *bdb_state, char *name, int wait, int *bdberr); int bdb_llog_partition(bdb_state_type *bdb_state, tran_type *tran, char *name, int *bdberr); +int bdb_llog_hash_partition(bdb_state_type *bdb_state, tran_type *tran, char *name, int *bdberr); int bdb_llog_rowlocks(bdb_state_type *, scdone_t, int *bdberr); int bdb_llog_genid_format(bdb_state_type *, scdone_t, int *bdberr); int bdb_reload_rowlocks(bdb_state_type *, scdone_t, int *bdberr); diff --git a/db/CMakeLists.txt b/db/CMakeLists.txt index 23073efe5d..8494fd48ca 100644 --- 
a/db/CMakeLists.txt +++ b/db/CMakeLists.txt @@ -110,6 +110,7 @@ set(src machclass.c osqluprec.c macc_glue.c + hash_partition.c ${PROJECT_BINARY_DIR}/protobuf/bpfunc.pb-c.c ${PROJECT_SOURCE_DIR}/tools/cdb2_dump/cdb2_dump.c ${PROJECT_SOURCE_DIR}/tools/cdb2_load/cdb2_load.c diff --git a/db/comdb2.c b/db/comdb2.c index 0106e742e1..9a143307eb 100644 --- a/db/comdb2.c +++ b/db/comdb2.c @@ -492,7 +492,7 @@ double gbl_sql_cost_error_threshold = -1; int gbl_parallel_recovery_threads = 0; -int gbl_fdb_resolve_local = 0; +int gbl_fdb_resolve_local = 1; int gbl_fdb_allow_cross_classes = 0; uint64_t gbl_sc_headroom = 10; /*---COUNTS---*/ @@ -816,6 +816,7 @@ int gbl_hostname_refresh_time = 60; int gbl_pstack_self = 1; char *gbl_cdb2api_policy_override = NULL; +int gbl_create_remote_tables = 0; int close_all_dbs_tran(tran_type *tran); @@ -2410,6 +2411,15 @@ int llmeta_load_timepart(struct dbenv *dbenv) return thedb->timepart_views ? 0 : -1; } +int llmeta_load_hash_partitions(struct dbenv *dbenv) +{ + logmsg(LOGMSG_INFO, "Loading hash-based partitions\n"); + Pthread_rwlock_init(&hash_partition_lk, NULL); + dbenv->hash_partition_views = hash_create_all_views(); + + return dbenv->hash_partition_views ? 0 : -1; +} + /* replace the table names and dbnums saved in the low level meta table with the * ones in the dbenv. 
returns 0 on success and anything else otherwise */ int llmeta_set_tables(tran_type *tran, struct dbenv *dbenv) @@ -4143,6 +4153,12 @@ static int init(int argc, char **argv) unlock_schema_lk(); return -1; } + + if (llmeta_load_hash_partitions(thedb)) { + logmsg(LOGMSG_ERROR, "could not load hash partitions\n"); + unlock_schema_lk(); + return -1; + } if (llmeta_load_queues(thedb)) { logmsg(LOGMSG_FATAL, "could not load queues from the low level meta " @@ -6342,7 +6358,10 @@ int comdb2_reload_schemas(void *dbenv, void *inlsn) logmsg(LOGMSG_ERROR, "could not load time partitions\n"); abort(); } - + if (llmeta_load_hash_partitions(thedb)) { + logmsg(LOGMSG_FATAL, "could not load mod based shards\n"); + abort(); + } if ((rc = bdb_get_rowlocks_state(&rlstate, tran, &bdberr)) != 0) { logmsg(LOGMSG_ERROR, "Get rowlocks llmeta failed, rc=%d bdberr=%d\n", rc, bdberr); diff --git a/db/comdb2.h b/db/comdb2.h index d8e6daa1da..1854a3e1ab 100644 --- a/db/comdb2.h +++ b/db/comdb2.h @@ -969,11 +969,11 @@ struct dbenv { int incoh_notcoherent; uint32_t incoh_file, incoh_offset; timepart_views_t *timepart_views; - - struct time_metric *service_time; - struct time_metric *queue_depth; - struct time_metric *concurrent_queries; - struct time_metric *connections; + hash_t *hash_partition_views; + struct time_metric* service_time; + struct time_metric* queue_depth; + struct time_metric* concurrent_queries; + struct time_metric* connections; struct time_metric *sql_queue_time; struct time_metric *handle_buf_queue_time; struct time_metric *watchdog_time; @@ -3734,4 +3734,5 @@ void get_disable_skipscan_all(); void get_client_origin(char *out, size_t outlen, struct sqlclntstate *clnt); +extern pthread_rwlock_t hash_partition_lk; #endif /* !INCLUDED_COMDB2_H */ diff --git a/db/db_tunables.c b/db/db_tunables.c index 16200d39ce..c28004c2ae 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -523,7 +523,6 @@ int gbl_old_column_names = 1; int gbl_enable_sq_flattening_optimization = 1; int 
gbl_mask_internal_tunables = 1; int gbl_allow_readonly_runtime_mod = 0; - size_t gbl_cached_output_buffer_max_bytes = 8 * 1024 * 1024; /* 8 MiB */ int gbl_sqlite_sorterpenalty = 5; int gbl_file_permissions = 0660; @@ -557,6 +556,7 @@ extern int gbl_altersc_latency_inc; extern int gbl_sc_history_max_rows; extern int gbl_sc_status_max_rows; extern int gbl_rep_process_pstack_time; +extern int gbl_create_remote_tables; extern void set_snapshot_impl(snap_impl_enum impl); extern const char *snap_impl_str(snap_impl_enum impl); diff --git a/db/db_tunables.h b/db/db_tunables.h index 4a52c1a0b1..a3a2fda8de 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -2465,4 +2465,7 @@ REGISTER_TUNABLE("sc_status_max_rows", "Max number of rows returned in comdb2_sc TUNABLE_INTEGER, &gbl_sc_status_max_rows, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("rep_process_pstack_time", "pstack the server if rep_process runs longer than time specified in secs (Default: 30s)", TUNABLE_INTEGER, &gbl_rep_process_pstack_time, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("create_remote_tables", + "while creating a sharded table, create tables (shards) on the remote databases as well (default: off)", + TUNABLE_BOOLEAN, &gbl_create_remote_tables, 0, NULL, NULL, NULL, NULL); #endif /* _DB_TUNABLES_H */ diff --git a/db/dohast.h b/db/dohast.h new file mode 100644 index 0000000000..e69de29bb2 diff --git a/db/hash_partition.c b/db/hash_partition.c new file mode 100644 index 0000000000..5fcc6ac74f --- /dev/null +++ b/db/hash_partition.c @@ -0,0 +1,866 @@ +#include "hash_partition.h" +#include +#include "views.h" +#include "cson/cson.h" +#include "schemachange.h" +#include +#include "consistent_hash.h" +#include +struct hash_view { + char *viewname; + char *tblname; + int num_keys; + char **keynames; + int num_partitions; + char **partitions; + int version; + ch_hash_t *partition_hash; +}; + + +extern char gbl_dbname[MAX_DBNAME_LENGTH]; +pthread_rwlock_t hash_partition_lk; +int gbl_sharding_local = 1; 
+void dump_alias_info(); +const char *hash_view_get_viewname(struct hash_view *view) +{ + return view->viewname; +} + +const char *hash_view_get_tablename(struct hash_view *view) +{ + return view->tblname; +} +char **hash_view_get_keynames(struct hash_view *view) +{ + return view->keynames; +} +int hash_view_get_num_partitions(struct hash_view *view) +{ + return view->num_partitions; +} +int hash_view_get_num_keys(struct hash_view *view) +{ + return view->num_keys; +} + +int hash_view_get_sqlite_view_version(struct hash_view *view) +{ + return view->version; +} + +char** hash_view_get_partitions(struct hash_view *view) { + return view->partitions; +} + +static void free_hash_view(hash_view_t *mView) +{ + if (mView) { + if (mView->viewname) { + free(mView->viewname); + } + if (mView->tblname) { + free(mView->tblname); + } + if (mView->keynames) { + for (int i = 0; i < mView->num_keys; i++) { + free(mView->keynames[i]); + } + free(mView->keynames); + } + if (mView->partitions) { + for (int i=0; i< mView->num_partitions;i++) { + free(mView->partitions[i]); + } + free(mView->partitions); + } + if (mView->partition_hash) { + ch_hash_free(mView->partition_hash); + } + free(mView); + } +} + +hash_view_t *create_hash_view(const char *viewname, const char *tablename, uint32_t num_columns, + char columns[][MAXCOLNAME], uint32_t num_partitions, + char partitions[][MAXPARTITIONLEN], struct errstat *err) +{ + hash_view_t *mView; + + mView = (hash_view_t *)calloc(1, sizeof(hash_view_t)); + + if (!mView) { + logmsg(LOGMSG_ERROR, "%s: Failed to allocate view %s\n", __func__, viewname); + goto oom; + } + + mView->viewname = strdup(viewname); + if (!mView->viewname) { + logmsg(LOGMSG_ERROR, "%s: Failed to allocate view name string %s\n", __func__, viewname); + goto oom; + } + + mView->tblname = strdup(tablename); + if (!mView->tblname) { + logmsg(LOGMSG_ERROR, "%s: Failed to allocate table name string %s\n", __func__, tablename); + goto oom; + } + mView->num_keys = num_columns; + 
mView->keynames = (char **)malloc(sizeof(char *) * mView->num_keys); + if (!mView->keynames) { + logmsg(LOGMSG_ERROR, "%s: Failed to allocate keynames\n", __func__); + goto oom; + } + + for (int i = 0; i < mView->num_keys; i++) { + mView->keynames[i] = strdup(columns[i]); + if (!mView->keynames[i]) { + logmsg(LOGMSG_ERROR, "%s: Failed to allocate key name string %s\n", __func__, columns[i]); + goto oom; + } + } + mView->num_partitions = num_partitions; + + mView->partitions = (char **)calloc(1, sizeof(char *) * num_partitions); + for (int i = 0; i < num_partitions; i++) { + mView->partitions[i] = strdup(partitions[i]); + if (!mView->partitions[i]) { + goto oom; + } + } + + ch_hash_t *ch = ch_hash_create(mView->num_partitions, ch_hash_sha); + mView->partition_hash = ch; + if (!mView->partition_hash) { + logmsg(LOGMSG_ERROR, "Failed create consistent_hash\n"); + goto oom; + } + + for(int i=0;inum_partitions;i++){ + if (ch_hash_add_node(ch, (uint8_t *)mView->partitions[i], strlen(mView->partitions[i]), ch->key_hashes[i]->hash_val)) { + logmsg(LOGMSG_ERROR, "Failed to add node %s\n", mView->partitions[i]); + goto oom; + } + } + + return mView; +oom: + free_hash_view(mView); + errstat_set_rcstrf(err, VIEW_ERR_MALLOC, "calloc oom"); + return NULL; +} + +static int create_inmem_view(hash_t *hash_views, hash_view_t *view) +{ + Pthread_rwlock_wrlock(&hash_partition_lk); + hash_add(hash_views, view); + Pthread_rwlock_unlock(&hash_partition_lk); + return VIEW_NOERR; +} + +static int destroy_inmem_view(hash_t *hash_views, hash_view_t *view) +{ + if (!view) { + logmsg(LOGMSG_USER, "SOMETHING IS WRONG. 
VIEW CAN'T BE NULL\n"); + return VIEW_ERR_NOTFOUND; + } + Pthread_rwlock_wrlock(&hash_partition_lk); + struct hash_view *v = hash_find_readonly(hash_views, &view->viewname); + int rc = VIEW_NOERR; + if (!v) { + rc = VIEW_ERR_NOTFOUND; + goto done; + } + hash_del(hash_views, v); + free_hash_view(v); +done: + Pthread_rwlock_unlock(&hash_partition_lk); + return rc; +} + +static int find_inmem_view(hash_t *hash_views, const char *name, hash_view_t **oView) +{ + int rc = VIEW_ERR_EXIST; + hash_view_t *view = NULL; + Pthread_rwlock_wrlock(&hash_partition_lk); + view = hash_find_readonly(hash_views, &name); + if (!view) { + rc = VIEW_ERR_NOTFOUND; + goto done; + } + logmsg(LOGMSG_USER, "FOUND VIEW %s\n", name); + if (oView) { + logmsg(LOGMSG_USER, "SETTING OUT VIEW TO FOUND VIEW\n"); + *oView = view; + } +done: + Pthread_rwlock_unlock(&hash_partition_lk); + return rc; +} + +unsigned long long hash_partition_get_partition_version(const char *name) +{ + struct dbtable *db = get_dbtable_by_name(name); + if (!db) { + logmsg(LOGMSG_USER, "Could not find partition %s\n", name); + return VIEW_ERR_NOTFOUND; + } + logmsg(LOGMSG_USER, "RETURNING PARTITION VERSION %lld\n", db->tableversion); + return db->tableversion; +} + +int is_hash_partition(const char *name) +{ + struct hash_view *v = NULL; + hash_get_inmem_view(name, &v); + return v != NULL; +} + +/* check if table is part of a hash partition*/ +int is_hash_partition_table(const char *tablename, hash_view_t **oView) { + void *ent; + unsigned int bkt; + hash_view_t *view = NULL; + Pthread_rwlock_rdlock(&hash_partition_lk); + for (view = (hash_view_t *)hash_first(thedb->hash_partition_views, &ent, &bkt);view; + view = (hash_view_t *)hash_next(thedb->hash_partition_views, &ent, &bkt)) { + int n = hash_view_get_num_partitions(view); + for(int i=0;ipartitions[i])==0) goto found; + } + } +found: + *oView = view; + Pthread_rwlock_unlock(&hash_partition_lk); + return view!=NULL; +} + +unsigned long long hash_view_get_version(const 
char *name) +{ + struct hash_view *v = NULL; + if (find_inmem_view(thedb->hash_partition_views, name, &v)) { + logmsg(LOGMSG_USER, "Could not find partition %s\n", name); + return VIEW_ERR_NOTFOUND; + } + + char **partitions = hash_view_get_partitions(v); + return hash_partition_get_partition_version(partitions[0]); +} + +int hash_get_inmem_view(const char *name, hash_view_t **oView) +{ + int rc; + if (!name) { + logmsg(LOGMSG_ERROR, "%s: Trying to retrieve nameless view!\n", __func__); + return VIEW_ERR_NOTFOUND; + } + rc = find_inmem_view(thedb->hash_partition_views, name, oView); + if (rc != VIEW_ERR_EXIST) { + logmsg(LOGMSG_ERROR, "%s: failed to find in-memory view %s. rc: %d\n", __func__, name, rc); + } + return rc; +} + +int hash_create_inmem_view(hash_view_t *view) +{ + int rc; + if (!view) { + return VIEW_ERR_NOTFOUND; + } + rc = create_inmem_view(thedb->hash_partition_views, view); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s: failed to create in-memory view %s. rc: %d\n", __func__, view->viewname, rc); + } + return rc; +} + +int hash_destroy_inmem_view(hash_view_t *view) +{ + int rc; + if (!view) { + return VIEW_ERR_NOTFOUND; + } + rc = destroy_inmem_view(thedb->hash_partition_views, view); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s: failed to destroy in-memory view %s. rc: %d\n", __func__, view->viewname, rc); + } + return rc; +} +int hash_partition_llmeta_erase(void *tran, hash_view_t *view, struct errstat *err) +{ + int rc = 0; + const char *view_name = hash_view_get_viewname(view); + logmsg(LOGMSG_USER, "Erasing view %s\n", view_name); + rc = hash_views_write_view(tran, view_name, NULL, 0); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_USER, "Failed to erase llmeta entry for partition %s. 
rc: %d\n", view_name, rc); + } + ++gbl_views_gen; + view->version = gbl_views_gen; + return rc; +} +int hash_partition_llmeta_write(void *tran, hash_view_t *view, struct errstat *err) +{ + /* + * Get serialized view string + * write to llmeta + */ + char *view_str = NULL; + int view_str_len; + int rc; + + rc = hash_serialize_view(view, &view_str_len, &view_str); + if (rc != VIEW_NOERR) { + errstat_set_strf(err, "Failed to serialize view %s", view->viewname); + errstat_set_rc(err, rc = VIEW_ERR_BUG); + goto done; + } + + logmsg(LOGMSG_USER, "%s\n", view_str); + /* save the view */ + rc = hash_views_write_view(tran, view->viewname, view_str, 0); + if (rc != VIEW_NOERR) { + if (rc == VIEW_ERR_EXIST) + errstat_set_rcstrf(err, VIEW_ERR_EXIST, "View %s already exists", view->viewname); + else + errstat_set_rcstrf(err, VIEW_ERR_LLMETA, "Failed to llmeta save view %s", view->viewname); + goto done; + } + + ++gbl_views_gen; + view->version = gbl_views_gen; + +done: + if (view_str) + free(view_str); + return rc; +} + +char *hash_views_read_all_views(); +/* + * Create hash map of all hash based partitions + * + */ +hash_t *hash_create_all_views() +{ + hash_t *hash_views; + char *views_str; + int rc; + struct hash_view *view = NULL; + cson_value *rootVal = NULL; + cson_object *rootObj = NULL; + cson_object_iterator iter; + cson_kvp *kvp = NULL; + struct errstat err; + hash_views = hash_init_strcaseptr(offsetof(struct hash_view, viewname)); + views_str = hash_views_read_all_views(); + + if (!views_str) { + logmsg(LOGMSG_ERROR, "Failed to read hash views from llmeta\n"); + goto done; + } + + rc = cson_parse_string(&rootVal, views_str, strlen(views_str)); + if (rc) { + logmsg(LOGMSG_ERROR, "error parsing cson string. 
rc: %d err: %s\n", rc, cson_rc_string(rc)); + goto done; + } + if (!cson_value_is_object(rootVal)) { + logmsg(LOGMSG_ERROR, "error parsing cson: expected object type\n"); + goto done; + } + + rootObj = cson_value_get_object(rootVal); + + if (!rootObj) { + logmsg(LOGMSG_ERROR, "error parsing cson: couldn't retrieve object\n"); + goto done; + } + + cson_object_iter_init(rootObj, &iter); + kvp = cson_object_iter_next(&iter); + while (kvp) { + cson_value *val = cson_kvp_value(kvp); + /* + * Each cson_value above is a cson string representation + * of a hash based partition. Validate that it's a string value + * and deserialize into an in-mem representation. + */ + if (!cson_value_is_string(val)) { + logmsg(LOGMSG_ERROR, "error parsing cson: expected string type\n"); + goto done; + } + const char *view_str = cson_string_cstr(cson_value_get_string(val)); + view = hash_deserialize_view(view_str, &err); + + if (!view) { + logmsg(LOGMSG_ERROR, "%s: failed to deserialize hash view %d %s\n", __func__, err.errval, err.errstr); + goto done; + } + /* we've successfully deserialized a view. 
+ * now add it to the global hash of all hash-parititon based views */ + hash_add(hash_views, view); + kvp = cson_object_iter_next(&iter); + } +done: + if (rootVal) { + cson_value_free(rootVal); + } + + if (views_str) { + free(views_str); + } + return hash_views; +} + +static int hash_update_partition_version(hash_view_t *view, void *tran) +{ + int num_partitions = hash_view_get_num_partitions(view); + char **partitions = hash_view_get_partitions(view); + for (int i=0;itableversion = table_version_select(db, tran); + } else { + logmsg(LOGMSG_USER, "UNABLE TO LOCATE partition %s\n", partitions[i]); + return -1; + } + } + return 0; +} + +int hash_views_update_replicant(void *tran, const char *name) +{ + hash_t *hash_views = thedb->hash_partition_views; + hash_view_t *view = NULL, *v = NULL; + int rc = VIEW_NOERR; + logmsg(LOGMSG_USER, "++++++ Replicant updating views\n"); + char *view_str = NULL; + struct errstat xerr = {0}; + + /* read the view str from updated llmeta */ + rc = hash_views_read_view(tran, name, &view_str); + if (rc == VIEW_ERR_EXIST) { + view = NULL; + goto update_view_hash; + } else if (rc != VIEW_NOERR || !view_str) { + logmsg(LOGMSG_ERROR, "%s: Could not read metadata for view %s\n", __func__, name); + goto done; + } + + logmsg(LOGMSG_USER, "The views string is %s\n", view_str); + /* create an in-mem view object */ + view = hash_deserialize_view(view_str, &xerr); + if (!view) { + logmsg(LOGMSG_ERROR, "%s: failed to deserialize hash view %d %s\n", __func__, xerr.errval, xerr.errstr); + goto done; + } + hash_update_partition_version(view, tran); +update_view_hash: + /* update global hash views hash. + * - If a view with the given name exists, destroy it, create a new view and add to hash + * - If a view with the given name does not exist, add to hash + * - If view is NULL (not there in llmeta), destroy the view and remove from hash */ + + if (!view) { + logmsg(LOGMSG_USER, "The deserialized view is NULL. 
This is a delete case \n"); + /* It's okay to do this lockless. The subsequent destroy method + * grabs a lock and does a find again */ + v = hash_find_readonly(hash_views, &name); + if (!v) { + logmsg(LOGMSG_ERROR, "Couldn't find view in llmeta or in-mem hash\n"); + goto done; + } + rc = hash_destroy_inmem_view(v); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s:%d Failed to destroy inmem view\n", __func__, __LINE__); + } + } else { + logmsg(LOGMSG_USER, "The deserialized view is NOT NULL. this is a view create/update case \n"); + rc = hash_destroy_inmem_view(view); + if (rc != VIEW_NOERR && rc != VIEW_ERR_NOTFOUND) { + logmsg(LOGMSG_ERROR, "%s:%d Failed to destroy inmem view\n", __func__, __LINE__); + goto done; + } + rc = hash_create_inmem_view(view); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s:%d Failed to create inmem view\n", __func__, __LINE__); + goto done; + } + } + + ++gbl_views_gen; + if (view) { + view->version = gbl_views_gen; + } + return rc; +done: + if (view_str) { + free(view_str); + } + if (view) { + free_hash_view(view); + } + return rc; +} +static int getDbHndl(cdb2_hndl_tp **hndl, const char *dbname, const char *tier) { + int rc; + + if (!tier) { + rc = cdb2_open(hndl, dbname, "localhost", CDB2_DIRECT_CPU); + } else { + rc = cdb2_open(hndl, dbname, tier, 0); + } + if (rc != 0) { + logmsg(LOGMSG_ERROR, "%s:%d Failed to connect to %s@%s (rc: %d)\n", + __func__, __LINE__, gbl_dbname, "local", rc); + cdb2_close(*hndl); + } + return rc; +} + +/* extract everything after "create table " + * and before "partitioned by ("*/ +char *extractSchema(const char *insertQuery) { + const char *start = strchr(insertQuery, '('); // Find the first '(' + const char *end = strchr(insertQuery, ')'); // Find the first ')' + char *result = NULL; + if (start != NULL && end != NULL && end > start) { + size_t length = end - start - 1; // Calculate the length of the substring + result = (char *)malloc(length + 1); // Allocate memory for the result + + 
strncpy(result, start + 1, length); // Copy the substring between parentheses + result[length] = '\0'; // Null-terminate the result + + logmsg(LOGMSG_USER, "Extracted string: %s\n", result); + } else { + logmsg(LOGMSG_ERROR, "No match found\n"); + } + return result; +} + +/* Create an insert query against shards + * create table tableName(...) + * */ +static char *getCreateStatement(const char *insertQuery, const char *tableName) { + /* strip away CREATESHARDS and extract schema*/ + const char *schema = extractSchema(insertQuery); + size_t createStatementLen = 13 + strlen(tableName) + strlen(schema) + 2 + 1; /* 13 -> "CREATE TABLE ", 2-> (, ) */ + char *createStatement = (char *)malloc(createStatementLen); + strcpy(createStatement, "CREATE TABLE "); + strcat(createStatement, tableName); + strcat(createStatement, "("); + strcat(createStatement, schema); + strcat(createStatement, ")"); + logmsg(LOGMSG_USER, "The create statment is %s\n", createStatement); + return createStatement; +} + +static char *getDropStatement(const char *tableName) { + size_t dropStatementLen = 21 + strlen(tableName) + 1; /* 11-> "DROP TABLE IF EXISTS "*/ + char *dropStatement = (char *)malloc(dropStatementLen); + strcpy(dropStatement, "DROP TABLE IF EXISTS "); + strcat(dropStatement, tableName); + logmsg(LOGMSG_USER, "The drop statemetn is %s\n", dropStatement); + return dropStatement; +} + +/* + */ +void deleteRemoteTables(struct comdb2_partition *partition, int startIdx) { + cdb2_hndl_tp *hndl; + int rc; + char *savePtr = NULL, *remoteDbName = NULL, *remoteTableName = NULL; + char *tier = NULL; + int i; + for(i = startIdx; i >= 0; i--) { + char *p = partition->u.hash.partitions[i]; + remoteDbName = strtok_r(p,".", &savePtr); + remoteTableName = strtok_r(NULL, ".", &savePtr); + if (remoteTableName == NULL) { + remoteTableName = remoteDbName; + remoteDbName = gbl_dbname; + } + logmsg(LOGMSG_USER, "The db is %s, the table is %s\n", remoteDbName, remoteTableName); + + if (!strcmp(gbl_dbname, 
remoteDbName)) { + rc = getDbHndl(&hndl, gbl_dbname, NULL); + } else { + if (gbl_sharding_local) { + tier = "local"; + } else { + tier = (char *)mach_class_class2name(get_my_mach_class()); + } + if (!tier) { + logmsg(LOGMSG_ERROR, "Failed to get tier for remotedb %s\n", p); + abort(); + } + logmsg(LOGMSG_USER, "GOT THE TIER AS %s\n", tier); + rc = getDbHndl(&hndl, remoteDbName, tier); + } + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to get handle. rc: %d, err: %s\n", rc, cdb2_errstr(hndl)); + logmsg(LOGMSG_ERROR, "Failed to drop table %s on remote db %s\n", remoteDbName, remoteTableName); + goto cleanup; + } + char *dropStatement = getDropStatement(remoteTableName); + if (!dropStatement) { + logmsg(LOGMSG_ERROR, "Failed to generate drop Query\n"); + logmsg(LOGMSG_ERROR, "Failed to drop table %s on remote db %s\n", remoteDbName, remoteTableName); + goto close_handle; + } else { + logmsg(LOGMSG_USER, "The generated drop statement is %s\n", dropStatement); + } + + rc = cdb2_run_statement(hndl, dropStatement); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to drop table %s on database %s. 
rc: %d, err: %s\n", remoteTableName, remoteDbName, rc, cdb2_errstr(hndl)); + } +close_handle: + cdb2_close(hndl); +cleanup: + free(remoteDbName); + free(remoteTableName); + } +} + +int createRemoteTables(struct comdb2_partition *partition) { + cdb2_hndl_tp *hndl; + int rc; + int num_partitions = partition->u.hash.num_partitions; + int i; + char *tier = NULL; + char *savePtr = NULL, *remoteDbName = NULL, *remoteTableName = NULL, *p = NULL; + for (i = 0; i < num_partitions; i++) { + p = strdup(partition->u.hash.partitions[i]); + remoteDbName = strtok_r(p,".", &savePtr); + remoteTableName = strtok_r(NULL, ".", &savePtr); + if (remoteTableName == NULL) { + remoteTableName = remoteDbName; + remoteDbName = gbl_dbname; + } + + logmsg(LOGMSG_USER, "The db is %s, the table is %s\n", remoteDbName, remoteTableName); + + if (!strcmp(gbl_dbname, remoteDbName)) { + rc = getDbHndl(&hndl, gbl_dbname, NULL); + } else { + if (gbl_sharding_local) { + tier = "local"; + } else { + tier = (char *)mach_class_class2name(get_my_mach_class()); + } + if (!tier) { + logmsg(LOGMSG_ERROR, "Failed to get tier for remotedb %s\n", p); + abort(); + } + logmsg(LOGMSG_USER, "GOT THE TIER AS %s\n", tier); + rc = getDbHndl(&hndl, remoteDbName, tier); + /*const char *tier = mach_class_class2name(get_my_mach_class()); + if (!tier) { + logmsg(LOGMSG_ERROR, "Failed to get tier for remotedb %s\n", p); + abort(); + } + logmsg(LOGMSG_USER, "GOT THE TIER AS %s\n", tier);*/ + } + if (rc) { + goto cleanup_tables; + } + char *createStatement = getCreateStatement(partition->u.hash.createQuery, remoteTableName); + if (!createStatement) { + logmsg(LOGMSG_ERROR, "Failed to generate createQuery\n"); + } else { + logmsg(LOGMSG_USER, "The generated create statement is %s\n", createStatement); + } + + rc = cdb2_run_statement(hndl, createStatement); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to create table %s on database %s. 
rc: %d, err: %s\n", remoteTableName, remoteDbName, rc, cdb2_errstr(hndl)); + goto cleanup_tables; + } + + /* Now setup catalog information on remote databases*/ + rc = cdb2_run_statement(hndl, partition->u.hash.createQuery); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to setup partition metatdata on database %s. rc: %d, err: %s\n", remoteDbName, rc, cdb2_errstr(hndl)); + goto cleanup_tables; + } + cdb2_close(hndl); + free(p); + } + return 0; +cleanup_tables: + /* close most recent handle*/ + cdb2_close(hndl); + free(p); + deleteRemoteTables(partition, i); + /*TODO AAR: Also delete partition info from remote tables*/ + return -1; +} +int remove_alias(const char *); +void deleteLocalAliases(struct comdb2_partition *partition, int startIdx) { + int i, rc; + cdb2_hndl_tp *hndl; + char *savePtr = NULL, *remoteDbName = NULL, *remoteTableName = NULL, *p = NULL; + /* get handle to local db*/ + rc = getDbHndl(&hndl, gbl_dbname, NULL); + if (rc) { + goto fatal; + } + for(i = startIdx; i >= 0; i--) { + p = strdup(partition->u.hash.partitions[i]); + remoteDbName = strtok_r(p,".", &savePtr); + remoteTableName = strtok_r(NULL, ".", &savePtr); + if (remoteTableName == NULL) { + remoteTableName = remoteDbName; + remoteDbName = gbl_dbname; + } + + logmsg(LOGMSG_USER, "The db is %s, the table is %s\n", remoteDbName, remoteTableName); + + char localAlias[MAXPARTITIONLEN+1]; /* remoteDbName_remoteTableName */ + char deleteAliasStatement[MAXPARTITIONLEN*2+3+10+1]; /* 3->spaces 8->'PUT ALIAS' 1 */ + rc = snprintf(localAlias, sizeof(localAlias), "%s_%s", remoteDbName, remoteTableName); + if (!rc) { + logmsg(LOGMSG_ERROR, "Failed to generate local alias name. 
rc: %d %s\n", rc, localAlias); + goto fatal; + } else { + logmsg(LOGMSG_ERROR, "The generated alias is %s\n", localAlias); + } + rc = snprintf(deleteAliasStatement,sizeof(deleteAliasStatement), "PUT ALIAS %s ''", localAlias); + // drop the alias + if (!rc) { + logmsg(LOGMSG_ERROR, "Failed to generate delete alias query\n"); + goto fatal; + } else { + logmsg(LOGMSG_USER, "The generated delete statement for Alias is %s\n", deleteAliasStatement); + } + + rc = cdb2_run_statement(hndl, deleteAliasStatement); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to delete alias %s on database %s. rc: %d, err: %s\n", remoteTableName, remoteDbName, rc, cdb2_errstr(hndl)); + goto fatal; + } + free(p); + } + return; +fatal: + cdb2_close(hndl); + logmsg(LOGMSG_FATAL, "Failed to drop alias while undoing create partition failure!! Giving up..\n"); +} + +int createLocalAliases(struct comdb2_partition *partition) { + cdb2_hndl_tp *hndl; + int rc; + int num_partitions = partition->u.hash.num_partitions; + int i=0; + char *savePtr = NULL, *remoteDbName = NULL, *remoteTableName = NULL, *p = NULL; + /* get handle to local db*/ + rc = getDbHndl(&hndl, gbl_dbname, NULL); + if (rc) { + goto cleanup_aliases; + } + for (i = 0; i < num_partitions; i++) { + p = strdup(partition->u.hash.partitions[i]); + remoteDbName = strtok_r(p,".", &savePtr); + remoteTableName = strtok_r(NULL, ".", &savePtr); + if (remoteTableName == NULL) { + remoteTableName = remoteDbName; + remoteDbName = gbl_dbname; + } + + logmsg(LOGMSG_USER, "The db is %s, the table is %s\n", remoteDbName, remoteTableName); + + char localAlias[MAXPARTITIONLEN+1]; /* remoteDbName_remoteTableName */ + char createAliasStatement[MAXPARTITIONLEN*2+3+10+1]; /* 3->spaces 8->'PUT ALIAS' */ + rc = snprintf(localAlias, sizeof(localAlias), "%s_%s", remoteDbName, remoteTableName); + if (!rc) { + logmsg(LOGMSG_ERROR, "Failed to generate local alias name. 
rc: %d %s\n", rc, localAlias); + goto cleanup_aliases; + } else { + logmsg(LOGMSG_ERROR, "The generated alias is %s\n", localAlias); + } + rc = snprintf(createAliasStatement,sizeof(createAliasStatement), "PUT ALIAS %s '%s.%s'", localAlias, remoteDbName, remoteTableName); + if (!rc) { + logmsg(LOGMSG_ERROR, "Failed to generate create alias query\n"); + goto cleanup_aliases; + } else { + logmsg(LOGMSG_USER, "The generated create statement for Alias is %s\n", createAliasStatement); + } + + rc = cdb2_run_statement(hndl, createAliasStatement); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to create alias %s on database %s. rc: %d, err: %s\n", remoteTableName, remoteDbName, rc, cdb2_errstr(hndl)); + goto cleanup_aliases; + } + free(p); + } + // dump_alias_info(); + cdb2_close(hndl); + return 0; +cleanup_aliases: + /* close most recent handle*/ + cdb2_close(hndl); + free(p); + deleteLocalAliases(partition, i); + return -1; +} + +static int hash_views_collect(void *obj, void *arg) { + struct systable_hashpartitions *recs = (struct systable_hashpartitions *)arg; + hash_view_t *view = (hash_view_t *)obj; + if (!view) { + logmsg(LOGMSG_ERROR, "VIEW IS NULL\n"); + return 0; + } + struct systable_hashpartitions *elem = NULL; + ch_hash_t *ch = view->partition_hash; + int64_t prevMax = 0; + char *dta = NULL; + for(int i=0;inum_keyhashes;i++){ + elem = &recs[recs->recno++]; + elem->name = strdup((char *)view->viewname); + dta = (char *)ch->key_hashes[i]->node->data; + elem->shardname = strdup(dta); + elem->minKey = prevMax+1; + elem->maxKey = ch->key_hashes[i]->hash_val; + prevMax = elem->maxKey; + } + return 0; +} +static int hash_views_calculate_size(void *obj, void *arg) { + int *nrecs = (int *)arg; + hash_view_t *view = (hash_view_t *)obj; + *nrecs = *nrecs + view->num_partitions; + return 0; +} +int hash_systable_collect(void **data, int *nrecords) { + hash_t *views = NULL; + struct systable_hashpartitions *recs = NULL; + int nrecs = 0, rc = 0; + 
Pthread_rwlock_rdlock(&hash_partition_lk); + if (thedb->hash_partition_views == NULL) { + goto done; + } + views = thedb->hash_partition_views; + hash_for(views, hash_views_calculate_size, &nrecs); + recs = calloc(nrecs , sizeof(struct systable_hashpartitions)); + if (!recs) { + rc = -1; + goto done; + } + hash_for(views, hash_views_collect, recs); +done: + Pthread_rwlock_unlock(&hash_partition_lk); + *nrecords = nrecs; + *data = recs; + return rc; +} +void hash_systable_free(void *data, int nrecords) { + struct systable_hashpartitions *recs = (struct systable_hashpartitions *)data; + for (int i=0; i +typedef struct hash_partition hash_partition_t; +typedef struct hash_view hash_view_t; +typedef struct hash_views hash_views_t; + +typedef struct systable_hashpartitions { + char *name; + char *shardname; + int64_t minKey; + int64_t maxKey; + int recno; +} systable_hashpartition_t; + +hash_view_t *create_hash_view(const char *viewname, const char *tablename, uint32_t num_columns, + char columns[][MAXCOLNAME], uint32_t num_partitions, + char partitions[][MAXPARTITIONLEN], struct errstat *err); +int hash_create_inmem_view(hash_view_t *); +int hash_destroy_inmem_view(hash_view_t *); +const char *hash_view_get_viewname(struct hash_view *view); +const char *hash_view_get_tablename(struct hash_view *view); +char **hash_view_get_keynames(struct hash_view *view); +int hash_view_get_num_partitions(struct hash_view *view); +int hash_view_get_num_keys(struct hash_view *view); +char** hash_view_get_partitions(struct hash_view *view); +int hash_view_get_sqlite_view_version(struct hash_view *view); +int hash_partition_get_hash_val(struct hash_partition *partition); +const char *hash_partition_get_dbname(struct hash_partition *partition); +int hash_partition_llmeta_write(void *tran, hash_view_t *view, struct errstat *err); +int hash_partition_llmeta_erase(void *tran, hash_view_t *view, struct errstat *err); +hash_t *hash_create_all_views(); +int hash_views_update_replicant(void 
*tran, const char *name); +int hash_get_inmem_view(const char *name, hash_view_t **oView); +int is_hash_partition(const char *name); +int is_hash_partition_table(const char *tablename, hash_view_t **oView); +unsigned long long hash_view_get_version(const char *name); +int hash_systable_collect(void **data, int *nrecords); +void hash_systable_free(void *data, int nrecords); +#endif + diff --git a/db/osqlcomm.c b/db/osqlcomm.c index fda9d17d32..78c22f33b9 100644 --- a/db/osqlcomm.c +++ b/db/osqlcomm.c @@ -6302,6 +6302,7 @@ static int _process_single_table_sc(struct ireq *iq) int rc; /* schema change for a regular table */ + logmsg(LOGMSG_USER, "RUNNING SCHEMACHANGE for TABLE %s\n", sc->tablename ); rc = start_schema_change_tran(iq, NULL); if (rc || sc->preempted == SC_ACTION_RESUME || sc->kind == SC_ALTERTABLE_PENDING) { @@ -6504,6 +6505,74 @@ static int _process_partitioned_table_merge(struct ireq *iq) static struct schema_change_type* _create_logical_cron_systable(const char *tblname); +static int hash_sc_partition(const char *partition, struct schema_change_type *sc) +{ + struct ireq *iq = sc->iq; + struct schema_change_type *new_sc = clone_schemachange_type(sc); + int rc = 0; + new_sc->iq = sc->iq; + new_sc->tran = sc->tran; + iq->sc = new_sc; /* We won't lose iq->sc by doing this. + we've kept track of it in iq->sc_pending + which is set to sc_next below*/ + /* copy partition name */ + strncpy0(iq->sc->tablename, partition, sizeof(iq->sc->tablename)); + + logmsg(LOGMSG_USER, "RUNNING SC TRAN FOR TABLE %s\n", sc->tablename); + rc = start_schema_change_tran(iq, NULL); + if ((rc != SC_ASYNC && rc != SC_COMMIT_PENDING) || iq->sc->preempted == SC_ACTION_RESUME || + iq->sc->kind == SC_ALTERTABLE_PENDING) { + iq->sc = NULL; + } else { + iq->sc->sc_next = iq->sc_pending; + iq->sc_pending = iq->sc; + } + return (rc == SC_ASYNC || rc == SC_COMMIT_PENDING) ? 
0 : rc; +} + +static int _process_single_table_sc_hash_partitioning(struct ireq *iq) +{ + struct schema_change_type *sc = iq->sc; + int rc; + struct errstat err = {0}; + hash_view_t *view = NULL; + assert(sc->partition.type == PARTITION_ADD_COL_HASH || sc->partition.type == PARTITION_REMOVE_COL_HASH); + + if (sc->partition.type == PARTITION_ADD_COL_HASH) { + /* create a hash based partition */ + sc->newhashpartition = create_hash_view(sc->partition.u.hash.viewname, sc->tablename, sc->partition.u.hash.num_columns, + sc->partition.u.hash.columns, sc->partition.u.hash.num_partitions, sc->partition.u.hash.partitions, &err); + if (!sc->newhashpartition) { + logmsg(LOGMSG_ERROR, "Failed to create new Mod partition rc %d \"%s\"\n", err.errval, err.errstr); + sc_errf(sc, "Failed to create new Mod partition rc %d \"%s\"", err.errval, err.errstr); + rc = ERR_SC; + return rc; + } + view = sc->newhashpartition; + } else { + /* If it's a DROP then copy base table name */ + /* Also, grab a pointer to the in-mem view structure */ + hash_get_inmem_view(sc->tablename, &view); + sc->newhashpartition = view; + strncpy(sc->tablename, hash_view_get_tablename(view), sizeof(sc->tablename)); + } + /* set publish and unpublish callbacks to create/destroy inmem views */ + sc->publish = partition_publish; + sc->unpublish = partition_unpublish; + + /*rc = start_schema_change_tran(iq, NULL); + if ((rc != SC_ASYNC && rc != SC_COMMIT_PENDING) || sc->preempted == SC_ACTION_RESUME || + sc->kind == SC_ALTERTABLE_PENDING) { + iq->sc = NULL; + } else { + iq->sc->sc_next = iq->sc_pending; + iq->sc_pending = iq->sc; + } + iq->osql_flags |= OSQL_FLAGS_SCDONE; + return rc;*/ + return _process_single_table_sc(iq); +} + static int _process_single_table_sc_partitioning(struct ireq *iq) { struct schema_change_type *sc = iq->sc; @@ -6766,6 +6835,8 @@ int osql_process_schemachange(struct schema_change_type *sc, uuid_t uuid) rc = _process_single_table_sc(iq); } else if (sc->partition.type == PARTITION_MERGE) { 
rc = _process_single_table_sc_merge(iq); + } else if (sc->partition.type == PARTITION_ADD_COL_HASH || sc->partition.type == PARTITION_REMOVE_COL_HASH) { + rc = _process_single_table_sc_hash_partitioning(iq); } else { rc = _process_single_table_sc_partitioning(iq); } diff --git a/db/osqlsqlthr.c b/db/osqlsqlthr.c index 148b29dab6..c6e798074e 100644 --- a/db/osqlsqlthr.c +++ b/db/osqlsqlthr.c @@ -1811,13 +1811,14 @@ int osql_schemachange_logic(struct schema_change_type *sc, osql->running_ddl = 1; + logmsg(LOGMSG_USER, "%s:%d REACHED HERE \n", __func__, __LINE__); if (gbl_reject_mixed_ddl_dml && clnt->dml_tables) { hash_info(clnt->dml_tables, NULL, NULL, NULL, NULL, &count, NULL, NULL); if (count > 0) { return SQLITE_DDL_MISUSE; } } - + logmsg(LOGMSG_USER, "%s:%d REACHED HERE \n", __func__, __LINE__); if (clnt->dml_tables && hash_find_readonly(clnt->dml_tables, sc->tablename)) { return SQLITE_DDL_MISUSE; @@ -1834,11 +1835,16 @@ int osql_schemachange_logic(struct schema_change_type *sc, if (thd->clnt->dbtran.mode == TRANLEVEL_SOSQL) { if (usedb && !get_dbtable_by_name(sc->tablename)) { unsigned long long version; + hash_view_t *view = NULL; + hash_get_inmem_view(sc->tablename, &view); char *first_shardname = timepart_shard_name(sc->tablename, 0, 1, &version); if (first_shardname) { sc->usedbtablevers = version; free(first_shardname); + } else if (view) { + /* use underlying placeholder table to get version */ + sc->usedbtablevers = comdb2_table_version(hash_view_get_tablename(view)); } else /* user view */ usedb = 0; } @@ -1849,6 +1855,7 @@ int osql_schemachange_logic(struct schema_change_type *sc, comdb2uuidcpy(sc->uuid, osql->uuid); do { + logmsg(LOGMSG_USER, "SENDING sc->tablename : %s, usedbtablevers: %d\n", sc->tablename, sc->usedbtablevers); rc = osql_send_schemachange(&osql->target, osql->rqid, thd->clnt->osql.uuid, sc, NET_OSQL_SOCK_RPL); diff --git a/db/sqlinterfaces.c b/db/sqlinterfaces.c index d1d7a2b0ea..ceb90df07f 100644 --- a/db/sqlinterfaces.c +++ 
b/db/sqlinterfaces.c @@ -4433,6 +4433,14 @@ static int prepare_engine(struct sqlthdstate *thd, struct sqlclntstate *clnt, /* save the views generation number */ thd->views_gen = gbl_views_gen; + if (thedb->hash_partition_views) { + rc = hash_views_sqlite_update(thedb->hash_partition_views, thd->sqldb, &xerr); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_FATAL, "failed to create views rc=%d errstr=\"%s\"\n", xerr.errval, xerr.errstr); + /* TODO: AAR -> review if abort() is needed here */ + abort(); + } + } } } done: /* reached via goto for error handling case. */ diff --git a/db/views.c b/db/views.c index 46847a6071..879158b8b3 100644 --- a/db/views.c +++ b/db/views.c @@ -3371,17 +3371,38 @@ int partition_publish(tran_type *tran, struct schema_change_type *sc) abort(); /* restart will fix this*/ break; } + case PARTITION_ADD_COL_HASH: { + partition_name = strdup((char *)hash_view_get_viewname(sc->newhashpartition)); + rc = hash_create_inmem_view(sc->newhashpartition); + break; + } + case PARTITION_REMOVE_COL_HASH: { + partition_name = strdup((char *)hash_view_get_viewname(sc->newhashpartition)); + rc = hash_destroy_inmem_view(sc->newhashpartition); + + break; + } } /*switch */ int bdberr = 0; - rc = bdb_llog_partition(thedb->bdb_env, tran, - partition_name ? partition_name - : (char *)sc->timepartition_name, - &bdberr); - if (rc || bdberr != BDBERR_NOERROR) { - logmsg(LOGMSG_ERROR, "%s: Failed to log scdone for partition %s\n", - __func__, partition_name); + if (sc->partition.type == PARTITION_ADD_COL_HASH || sc->partition.type == PARTITION_REMOVE_COL_HASH) { + rc = bdb_llog_hash_partition(thedb->bdb_env, tran, partition_name, &bdberr); + if (rc || bdberr != BDBERR_NOERROR) { + logmsg(LOGMSG_ERROR, "%s: Failed to log scdone for hash partition %s\n", __func__, + sc->newhashpartition ? hash_view_get_viewname(sc->newhashpartition) : sc->tablename); + } + } else { + rc = bdb_llog_partition(thedb->bdb_env, tran, + partition_name ? 
partition_name + : (char *)sc->timepartition_name, + &bdberr); + if (rc || bdberr != BDBERR_NOERROR) { + logmsg(LOGMSG_ERROR, "%s: Failed to log scdone for partition %s\n", + __func__, partition_name); + } + } + if (partition_name) { + free(partition_name); } - free(partition_name); } return rc; } @@ -3404,6 +3425,18 @@ void partition_unpublish(struct schema_change_type *sc) abort(); /* restart will fix this*/ break; } + case PARTITION_ADD_COL_HASH: { + hash_destroy_inmem_view(sc->newhashpartition); + break; + } + case PARTITION_REMOVE_COL_HASH: { + int rc = hash_create_inmem_view(sc->newhashpartition); + if (rc) { + /* Really no way forward except restart */ + abort(); + } + break; + } } } } diff --git a/db/views.h b/db/views.h index 434fbe5409..7f7cb45b6c 100644 --- a/db/views.h +++ b/db/views.h @@ -12,6 +12,7 @@ #include #include "errstat.h" #include "cron.h" +#include "hash_partition.h" enum view_type { VIEW_TIME_PARTITION }; @@ -61,6 +62,10 @@ enum view_partition_errors { VIEW_ERR_CREATE = -11 /* error with pthread create */ , VIEW_ERR_ALL_SHARDS = -12 /* last shard was processed */ + , + VIEW_ERR_NOTFOUND = -13 /* Could not find a view */ + , + VIEW_ERR_VALUE = -14 /* Error while setting a value */ }; typedef struct timepart_view timepart_view_t; @@ -507,5 +512,9 @@ int logical_partition_next_rollout(const char *name); * return the view matching the name, if any; */ timepart_view_t *timepart_reaquire_view(const char *partname); - +int hash_serialize_view(hash_view_t *view, int *len, char **out); +hash_view_t *hash_deserialize_view(const char *view_str, struct errstat *err); +int hash_views_write_view(void *tran, const char *viewname, const char *str, int override); +int hash_views_read_view(void *tran, const char *name, char **pstr); +int hash_views_sqlite_update(hash_t *views, sqlite3 *db, struct errstat *err); #endif diff --git a/db/views_persist.c b/db/views_persist.c index 2a18c7a17a..c9843fa315 100644 --- a/db/views_persist.c +++ b/db/views_persist.c @@ 
-8,7 +8,7 @@ #define LLMETA_PARAM_NAME "timepart_views" #define LLMETA_TABLE_NAME "sys_views" - +#define LLMETA_HASH_VIEWS_TABLE "hash_views" /** * Write a CSON representation overriding the current llmeta * @@ -135,3 +135,63 @@ char *views_read_all_views(void) return blob; } + +/** + * Read a view CSON representation from llmeta + * + */ +int hash_views_read_view(void *tran, const char *name, char **pstr) +{ + int rc; + + *pstr = NULL; + + rc = bdb_get_table_parameter_tran(LLMETA_HASH_VIEWS_TABLE, name, pstr, tran); + if (rc == 1) { + return VIEW_ERR_EXIST; + } else if (rc) { + return VIEW_ERR_LLMETA; + } + + return VIEW_NOERR; +} +/** + * Write a CSON representation of a view of hash based shard partitions + * The view is internally saved as a parameter "viewname" for the table + * "hash_views". + */ +int hash_views_write_view(void *tran, const char *viewname, const char *str, int override) +{ + int rc; + + if (str) { + char *oldstr = NULL; + rc = hash_views_read_view(tran, viewname, &oldstr); + if (rc == VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "View \"%s\" already exists, old string \"%s\"\n", viewname, oldstr); + free(oldstr); + return VIEW_ERR_EXIST; + } + rc = bdb_set_table_parameter(tran, LLMETA_HASH_VIEWS_TABLE, viewname, str); + } else { + /* this is a delete */ + rc = bdb_clear_table_parameter(tran, LLMETA_HASH_VIEWS_TABLE, viewname); + } + rc = (rc) ? 
VIEW_ERR_LLMETA : VIEW_NOERR; + + return rc; +} + +char *hash_views_read_all_views(void) +{ + char *blob = NULL; + int blob_len = 0; + int rc; + + rc = bdb_get_table_csonparameters(NULL, LLMETA_HASH_VIEWS_TABLE, &blob, &blob_len); + if (rc) { + return NULL; + } + + return blob; +} diff --git a/db/views_serial.c b/db/views_serial.c index f6128b91bc..484ec7b453 100644 --- a/db/views_serial.c +++ b/db/views_serial.c @@ -56,6 +56,7 @@ #include "views.h" #include "comdb2uuid.h" #include "bdb_access.h" +#include "hash_partition.h" static char *_concat(char *str, int *len, const char *fmt, ...); @@ -1349,7 +1350,63 @@ static int _cson_extract_start_string(cson_object *cson_obj, const char *param, return ret_int; } +static int _cson_set_int(cson_object *obj, const char *keyname, uint64_t value, struct errstat *err) +{ + int rc; + cson_value *v = cson_value_new_integer(value); + if (!v) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't create cson value for integer %ld\n", value); + return -1; + } + + rc = cson_object_set(obj, keyname, v); + if (rc) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't set integer field %s with value %ld\n", keyname, value); + return -1; + } + + return rc; +} + +static int _cson_set_or_create_array(cson_object *obj, const char *keyname, cson_value *arr_val, struct errstat *err) +{ + int rc; + if (!arr_val) { + arr_val = cson_value_new_array(); + if (!arr_val) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't create empty cson array\n"); + return -1; + } + } + rc = cson_object_set(obj, keyname, arr_val); + if (rc) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't set array field %s\n", keyname); + } + return rc; +} + +static int _cson_set_string(cson_object *obj, const char *keyname, const char *value, struct errstat *err) +{ + int rc; + cson_value *v = cson_value_new_string(value, 
strlen(value)); + if (!v) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't create cson value for string %s\n", value); + return -1; + } + rc = cson_object_set(obj, keyname, v); + if (rc) { + err->errval = VIEW_ERR_VALUE; + snprintf(err->errstr, sizeof(err->errstr), "couldn't set string field %s with value %s\n", keyname, value); + return -1; + } + return rc; +} /* this will extract all the fields and populate a provided view */ static timepart_view_t *partition_deserialize_cson_value(cson_value *cson_view, struct errstat *err) @@ -1822,7 +1879,209 @@ int timepart_apply_file(const char *filename) return rc; } +/* + * Serialize a partition into a cson object + */ +int hash_serialize_partition(char **partitions, int num_partitions, cson_array *arr) +{ + int rc = 0; + for (int i=0;ierrstr, sizeof(err->errstr), "Parsing JSON error rc=%d err:%s\n", rc, cson_rc_string(rc)); + err->errval = VIEW_ERR_PARAM; + goto done; + } + + rc = cson_value_is_object(rootVal); + rootObj = cson_value_get_object(rootVal); + + /* VIEWNAME */ + viewname = _cson_extract_str(rootObj, "VIEWNAME", err); + if (!viewname) { + err_str = "INVALID CSON. Couldn't find 'VIEWNAME' key"; + goto error; + } + /* TABLENAME */ + tablename = _cson_extract_str(rootObj, "TABLENAME", err); + if (!tablename) { + err_str = "INVALID CSON. Couldn't find 'TABLENAME' key"; + goto error; + } + + /* NUMKEYS */ + num_keys = _cson_extract_int(rootObj, "NUMKEYS", err); + if (num_keys < 0) { + err_str = "INVALID CSON. couldn't find 'NUMKEYS' key"; + goto error; + } + + /* KEYNAMES */ + keys_arr = _cson_extract_array(rootObj, "KEYNAMES", err); + if (!keys_arr) { + err_str = "INVALID CSON. couldn't find 'KEYNAMES' key"; + goto error; + } + + char keynames[MAXCOLUMNS][MAXCOLNAME]; + for (int i = 0; i < num_keys; i++) { + arrVal = cson_array_get(keys_arr, i); + if (!cson_value_is_string(arrVal)) { + err_str = "INVALID CSON. 
Array element is not a string"; + goto error; + } + strcpy(keynames[i], cson_value_get_cstr(arrVal)); + } + + /* NUMPARTITIONS */ + num_partitions = _cson_extract_int(rootObj, "NUMPARTITIONS", err); + if (num_partitions < 0) { + err_str = "INVALID CSON. couldn't find 'NUMPARTITIONS' key"; + goto error; + } + + /* SHARDS */ + partitions = _cson_extract_array(rootObj, "PARTITIONS", err); + if (!partitions) { + err_str = "INVALID CSON. couldn't find 'PARTITIONS' key"; + goto error; + } + + char dbnames[MAXPARTITIONS][MAXPARTITIONLEN]; + + for (int i = 0; i < num_partitions; i++) { + arrVal = cson_array_get(partitions, i); + if (!cson_value_is_string(arrVal)) { + err_str = "INVALID CSON. Array element is not a string"; + goto error; + } + strcpy(dbnames[i], cson_value_get_cstr(arrVal)); + } + + view = create_hash_view(viewname, tablename, num_keys, keynames, num_partitions, dbnames, err); + if (rootVal) { + cson_value_free(rootVal); + } + + return view; +error: + if (err_str) { + errstat_set_rcstrf(err, VIEW_ERR_PARAM, err_str); + } + if (rootVal) { + cson_value_free(rootVal); + } + return NULL; + +done: + if (rootVal) { + cson_value_free(rootVal); + } + return view; +} #ifdef VIEWS_PERSIST_TEST const char *tests[2] = {"[\n" diff --git a/db/views_sqlite.c b/db/views_sqlite.c index 5c8b62a4fc..77db979966 100644 --- a/db/views_sqlite.c +++ b/db/views_sqlite.c @@ -185,7 +185,7 @@ static char *_views_create_view_query(timepart_view_t *view, sqlite3 *db, if (view->nshards == 0) { err->errval = VIEW_ERR_BUG; - snprintf(err->errstr, sizeof(err->errstr), "View %s has no shards???\n", + snprintf(err->errstr, sizeof(err->errstr), "View %s has no partitions???\n", view->name); return NULL; } @@ -204,8 +204,8 @@ static char *_views_create_view_query(timepart_view_t *view, sqlite3 *db, goto malloc; } - /* generate the select union for shards */ - /* TODO: put conditions for shards */ + /* generate the select union for partitions */ + /* TODO: put conditions for partitions */ 
select_str = sqlite3_mprintf(""); for (i = 0; i < view->nshards; i++) { tmp_str = sqlite3_mprintf("%s%sSELECT %s FROM \"%w\"", select_str, @@ -324,6 +324,164 @@ static int _view_delete_if_missing(const char *name, sqlite3 *db, void *arg) return 0; } +char *hash_views_create_view_query(hash_view_t *view, sqlite3 *db, struct errstat *err) +{ + char *select_str = NULL; + char *cols_str = NULL; + char *tmp_str = NULL; + char *ret_str = NULL; + int num_partitions = hash_view_get_num_partitions(view); + const char *view_name = hash_view_get_viewname(view); + const char *table_name = hash_view_get_tablename(view); + char **partitions = hash_view_get_partitions(view); + int i; + if (num_partitions == 0) { + err->errval = VIEW_ERR_BUG; + snprintf(err->errstr, sizeof(err->errstr), "View %s has no partitions???\n", view_name); + return NULL; + } + + cols_str = sqlite3_mprintf("rowid as __hidden__rowid, "); + if (!cols_str) { + goto malloc; + } + + cols_str = _describe_row(table_name, cols_str, VIEWS_TRIGGER_QUERY, err); + if (!cols_str) { + /* preserve error, if any */ + if (err->errval != VIEW_NOERR) + return NULL; + goto malloc; + } else { + logmsg(LOGMSG_USER, "GOT cols_str as %s\n", cols_str); + } + + select_str = sqlite3_mprintf(""); + i = 0; + logmsg(LOGMSG_USER, "num partitions is : %d\n", num_partitions); + for(;i 0) ? 
" UNION ALL " : "", cols_str, + partitions[i]); + sqlite3_free(select_str); + if (!tmp_str) { + sqlite3_free(cols_str); + goto malloc; + } + select_str = tmp_str; + } + + ret_str = sqlite3_mprintf("CREATE VIEW %w AS %s", view_name, select_str); + if (!ret_str) { + sqlite3_free(select_str); + sqlite3_free(cols_str); + goto malloc; + } + + sqlite3_free(select_str); + sqlite3_free(cols_str); + + dbg_verbose_sqlite("Generated:\n\"%s\"\n", ret_str); + + return ret_str; + +malloc: + err->errval = VIEW_ERR_MALLOC; + snprintf(err->errstr, sizeof(err->errstr), "View %s out of memory\n", view_name); + return NULL; +} + +int hash_views_run_sql(sqlite3 *db, char *stmt, struct errstat *err) +{ + char *errstr = NULL; + int rc; + + /* create the view */ + db->isTimepartView = 1; + rc = sqlite3_exec(db, stmt, NULL, NULL, &errstr); + db->isTimepartView = 0; + if (rc != SQLITE_OK) { + err->errval = VIEW_ERR_BUG; + snprintf(err->errstr, sizeof(err->errstr), "Sqlite error \"%s\"", errstr); + /* can't control sqlite errors */ + err->errstr[sizeof(err->errstr) - 1] = '\0'; + + logmsg(LOGMSG_USER, "%s: sqlite error \"%s\" sql \"%s\"\n", __func__, errstr, stmt); + + if (errstr) + sqlite3_free(errstr); + return err->errval; + } + + /* use sqlite to add the view */ + return VIEW_NOERR; +} + +int hash_views_sqlite_add_view(hash_view_t *view, sqlite3 *db, struct errstat *err) +{ + char *stmt_str; + int rc; + + /* create the statement */ + stmt_str = hash_views_create_view_query(view, db, err); + if (!stmt_str) { + return err->errval; + } + + rc = hash_views_run_sql(db, stmt_str, err); + + logmsg(LOGMSG_USER, "+++++++++++sql: %s, rc: %d\n", stmt_str, rc); + /* free the statement */ + sqlite3_free(stmt_str); + + if (rc != VIEW_NOERR) { + return err->errval; + } + + return rc; +} + +int hash_views_sqlite_delete_view(hash_view_t *view, sqlite3 *db, struct errstat *err) +{ + return _views_sqlite_del_view(hash_view_get_viewname(view), db, err); +} + +int hash_views_sqlite_update(hash_t *views, 
sqlite3 *db, struct errstat *err) +{ + Table *tab; + int rc; + void *ent; + unsigned int bkt; + hash_view_t *view = NULL; + + Pthread_rwlock_rdlock(&hash_partition_lk); + for (view = (hash_view_t *)hash_first(views, &ent, &bkt); view != NULL; + view = (hash_view_t *)hash_next(views, &ent, &bkt)) { + /* check if this exists?*/ + tab = sqlite3FindTableCheckOnly(db, hash_view_get_viewname(view), NULL); + if (tab) { + /* found view, is it the same version ? */ + if (hash_view_get_sqlite_view_version(view) != tab->version) { + /* older version, destroy current view */ + rc = hash_views_sqlite_delete_view(view, db, err); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s: failed to remove old view\n", __func__); + goto done; + } + } else { + /* up to date, nothing to do */ + continue; + } + } + rc = hash_views_sqlite_add_view(view, db, err); + if (rc != VIEW_NOERR) { + goto done; + } + } + rc = VIEW_NOERR; +done: + Pthread_rwlock_unlock(&hash_partition_lk); + return rc; +} #ifdef COMDB2_UPDATEABLE_VIEWS diff --git a/schemachange/sc_add_table.c b/schemachange/sc_add_table.c index ce98759cfb..94191370ad 100644 --- a/schemachange/sc_add_table.c +++ b/schemachange/sc_add_table.c @@ -367,6 +367,12 @@ int finalize_add_table(struct ireq *iq, struct schema_change_type *s, sc_errf(s, "Failed to remove partition llmeta %d\n", err.errval); return SC_INTERNAL_ERROR; } + } else if (s->partition.type == PARTITION_ADD_COL_HASH && s->publish) { + struct errstat err = {0}; + if (hash_partition_llmeta_write(tran, s->newhashpartition, &err)) { + sc_errf(s, "failed to create partition. 
rc: %d - %s\n", rc, err.errstr); + return -1; + } } sc_printf(s, "Schema change ok\n"); diff --git a/schemachange/sc_callbacks.c b/schemachange/sc_callbacks.c index 6cb421a399..2f019aba70 100644 --- a/schemachange/sc_callbacks.c +++ b/schemachange/sc_callbacks.c @@ -1109,6 +1109,27 @@ static int scdone_views(const char tablename[], void *arg, scdone_t type) return rc; } +static int scdone_hash_views(const char tablename[], void *arg, scdone_t type) +{ + tran_type *tran = NULL; + uint32_t lid = 0; + int rc = 0; + int bdberr = 0; + + tran = _tran(&lid, &bdberr, __func__, __LINE__); + if (!tran) + return bdberr; + + logmsg(LOGMSG_USER, "++++++ %s : calling hash_views_update_replicant\n", __func__); + rc = hash_views_update_replicant(tran, tablename); + if (rc != 0) { + logmsg(LOGMSG_ERROR, "failed to update mod views from llmeta!\n"); + } + + _untran(tran, lid); + return rc; +} + static int scdone_llmeta_queue(const char table[], void *arg, scdone_t type) { tran_type *tran; @@ -1264,8 +1285,8 @@ int (*SCDONE_CALLBACKS[])(const char *, void *, scdone_t) = { &scdone_llmeta_queue, &scdone_genid48, &scdone_genid48, &scdone_lua_sfunc, &scdone_lua_afunc, &scdone_rename_table, &scdone_change_stripe, &scdone_user_view, &scdone_queue_file, - &scdone_queue_file, &scdone_rename_table, &scdone_alias}; - + &scdone_queue_file, &scdone_rename_table, &scdone_alias, + &scdone_hash_views}; /* TODO fail gracefully now that inline? 
*/ /* called by bdb layer through a callback as a detached thread, * we take ownership of table string diff --git a/schemachange/sc_drop_table.c b/schemachange/sc_drop_table.c index 8c1016462e..e7a18f442f 100644 --- a/schemachange/sc_drop_table.c +++ b/schemachange/sc_drop_table.c @@ -139,6 +139,15 @@ int finalize_drop_table(struct ireq *iq, struct schema_change_type *s, sc_errf(s, "Failed to remove partition llmeta %d\n", err.errval); return SC_INTERNAL_ERROR; } + } else if (s->partition.type == PARTITION_REMOVE_COL_HASH && s->publish) { + struct errstat err = {0}; + assert(s->newshard != NULL); + rc = hash_partition_llmeta_erase(tran, s->newhashpartition, &err); + if (rc) { + sc_errf(s, "Failed to remove partition llmeta %d\n", err.errval); + logmsg(LOGMSG_USER, "Failed to remove partition llmeta. rc: %d. err: %s\n", err.errval, err.errstr); + return SC_INTERNAL_ERROR; + } } live_sc_off(db); diff --git a/schemachange/sc_struct.c b/schemachange/sc_struct.c index 51af9ca502..c2961b81d9 100644 --- a/schemachange/sc_struct.c +++ b/schemachange/sc_struct.c @@ -118,6 +118,7 @@ static size_t _partition_packed_size(struct comdb2_partition *p) switch (p->type) { case PARTITION_NONE: case PARTITION_REMOVE: + case PARTITION_REMOVE_COL_HASH: return sizeof(p->type); case PARTITION_ADD_TIMED: case PARTITION_ADD_MANUAL: @@ -126,6 +127,18 @@ static size_t _partition_packed_size(struct comdb2_partition *p) case PARTITION_MERGE: return sizeof(p->type) + sizeof(p->u.mergetable.tablename) + sizeof(p->u.mergetable.version); + case PARTITION_ADD_COL_HASH: + size_t shardNamesSize = 0; + size_t columnNamesSize = 0; + for (int i = 0; i < p->u.hash.num_partitions; i++) { + shardNamesSize += sizeof(p->u.hash.partitions[i]); + } + + for (int i = 0; i < p->u.hash.num_columns; i++) { + columnNamesSize += sizeof(p->u.hash.columns[i]); + } + return sizeof(p->type) + sizeof(p->u.hash.viewname) + sizeof(p->u.hash.num_partitions) + + sizeof(p->u.hash.num_columns) + shardNamesSize + 
columnNamesSize; default: logmsg(LOGMSG_ERROR, "Unimplemented partition type %d\n", p->type); abort(); @@ -311,6 +324,20 @@ void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_bu p_buf = buf_put(&s->partition.u.mergetable.version, sizeof(s->partition.u.mergetable.version), p_buf, p_buf_end); break; + } + case PARTITION_ADD_COL_HASH: { + p_buf = buf_no_net_put(s->partition.u.hash.viewname, sizeof(s->partition.u.hash.viewname), p_buf, p_buf_end); + p_buf = buf_put(&s->partition.u.hash.num_columns, sizeof(s->partition.u.hash.num_columns), p_buf, p_buf_end); + for (int i = 0; i < s->partition.u.hash.num_columns; i++) { + p_buf = + buf_no_net_put(s->partition.u.hash.columns[i], sizeof(s->partition.u.hash.columns[i]), p_buf, p_buf_end); + } + p_buf = buf_put(&s->partition.u.hash.num_partitions, sizeof(s->partition.u.hash.num_partitions), p_buf, p_buf_end); + for (int i = 0; i < s->partition.u.hash.num_partitions; i++) { + p_buf = + buf_no_net_put(s->partition.u.hash.partitions[i], sizeof(s->partition.u.hash.partitions[i]), p_buf, p_buf_end); + } + break; } } @@ -735,6 +762,23 @@ void *buf_get_schemachange_v2(struct schema_change_type *s, p_buf_end); break; } + case PARTITION_ADD_COL_HASH: { + p_buf = (uint8_t *)buf_no_net_get(s->partition.u.hash.viewname, sizeof(s->partition.u.hash.viewname), p_buf, + p_buf_end); + p_buf = (uint8_t *)buf_get(&s->partition.u.hash.num_columns, sizeof(s->partition.u.hash.num_columns), p_buf, + p_buf_end); + for (int i = 0; i < s->partition.u.hash.num_columns; i++) { + p_buf = (uint8_t *)buf_no_net_get(s->partition.u.hash.columns[i], sizeof(s->partition.u.hash.columns[i]), + p_buf, p_buf_end); + } + p_buf = + (uint8_t *)buf_get(&s->partition.u.hash.num_partitions, sizeof(s->partition.u.hash.num_partitions), p_buf, p_buf_end); + for (int i = 0; i < s->partition.u.hash.num_partitions; i++) { + p_buf = (uint8_t *)buf_no_net_get(s->partition.u.hash.partitions[i], sizeof(s->partition.u.hash.partitions[i]), p_buf, + 
p_buf_end); + } + break; + } } return p_buf; diff --git a/schemachange/schemachange.h b/schemachange/schemachange.h index 54cc2d3ee2..ac2ab868f3 100644 --- a/schemachange/schemachange.h +++ b/schemachange/schemachange.h @@ -23,7 +23,7 @@ #include #include #include - +#include "hash_partition.h" /* To be forward declared one accessors methods are added */ /* A schema change plan. */ @@ -68,6 +68,7 @@ enum comdb2_partition_type { PARTITION_ADD_MANUAL = 21, PARTITION_ADD_COL_RANGE = 40, PARTITION_ADD_COL_HASH = 60, + PARTITION_REMOVE_COL_HASH = 80, }; struct comdb2_partition { @@ -83,6 +84,15 @@ struct comdb2_partition { char tablename[MAXTABLELEN]; int version; } mergetable; + struct hash { + char viewname[MAXTABLELEN]; + uint32_t num_partitions; + uint32_t num_columns; // in partitoning key + char columns[MAXCOLUMNS][MAXCOLNAME]; + uint32_t keys[MAXPARTITIONS]; + char partitions[MAXPARTITIONS][MAXPARTITIONLEN]; + char *createQuery; + } hash; } u; }; @@ -252,6 +262,7 @@ struct schema_change_type { struct dbtable *db; struct dbtable *newdb; struct timepart_view *newpartition; + hash_view_t *newhashpartition; struct scplan plan; /**** TODO This is an abomination, i know. Yet still much better than on the stack where I found it. 
At least this datastructure lives as much as the diff --git a/sqlite/CMakeLists.txt b/sqlite/CMakeLists.txt index dd9623a467..9939d5c76a 100644 --- a/sqlite/CMakeLists.txt +++ b/sqlite/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(sqlite ext/comdb2/files_util.c ext/comdb2/fingerprints.c ext/comdb2/functions.c + ext/comdb2/hashpartitions.c ext/comdb2/indexuse.c ext/comdb2/keycomponents.c ext/comdb2/keys.c diff --git a/sqlite/ext/comdb2/comdb2systblInt.h b/sqlite/ext/comdb2/comdb2systblInt.h index 346abeb4cb..ed9cbe7790 100644 --- a/sqlite/ext/comdb2/comdb2systblInt.h +++ b/sqlite/ext/comdb2/comdb2systblInt.h @@ -106,7 +106,7 @@ int systblSchemaVersionsInit(sqlite3 *db); int systblTableMetricsInit(sqlite3 *db); int systblApiHistoryInit(sqlite3 *db); int systblDbInfoInit(sqlite3 *db); - +int systblHashPartitionsInit(sqlite3*db); /* Simple yes/no answer for booleans */ #define YESNO(x) ((x) ? "Y" : "N") diff --git a/sqlite/ext/comdb2/hashpartitions.c b/sqlite/ext/comdb2/hashpartitions.c new file mode 100644 index 0000000000..73cbf19be9 --- /dev/null +++ b/sqlite/ext/comdb2/hashpartitions.c @@ -0,0 +1,44 @@ +/* + Copyright 2019-2020 Bloomberg Finance L.P. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +#if (!defined(SQLITE_CORE) || defined(SQLITE_BUILDING_FOR_COMDB2)) \ + && !defined(SQLITE_OMIT_VIRTUALTABLE) + +#if defined(SQLITE_BUILDING_FOR_COMDB2) && !defined(SQLITE_CORE) +# define SQLITE_CORE 1 +#endif + +#include "comdb2systbl.h" +#include "ezsystables.h" +#include "hash_partition.h" + +sqlite3_module systblHashPartitionsModule = { + .access_flag = CDB2_ALLOW_USER, + .systable_lock = "comdb2_hashpartitions", +}; + +int systblHashPartitionsInit(sqlite3 *db) +{ + return create_system_table( + db, "comdb2_hashpartitions", &systblHashPartitionsModule, + hash_systable_collect, + hash_systable_free, sizeof(systable_hashpartition_t), + CDB2_CSTRING, "name", -1, offsetof(systable_hashpartition_t, name), + CDB2_CSTRING, "shardname", -1, offsetof(systable_hashpartition_t, shardname), + CDB2_INTEGER, "minKey", -1, offsetof(systable_hashpartition_t, minKey), + CDB2_INTEGER, "maxKey", -1, offsetof(systable_hashpartition_t, maxKey), + SYSTABLE_END_OF_FIELDS); +} +#endif diff --git a/sqlite/ext/comdb2/tables.c b/sqlite/ext/comdb2/tables.c index 63ab8d4572..f063fb97ce 100644 --- a/sqlite/ext/comdb2/tables.c +++ b/sqlite/ext/comdb2/tables.c @@ -248,6 +248,8 @@ int comdb2SystblInit( rc = systblApiHistoryInit(db); if (rc == SQLITE_OK) rc = systblDbInfoInit(db); + if (rc == SQLITE_OK) + rc = systblHashPartitionsInit(db); #endif return rc; } diff --git a/sqlite/src/build.c b/sqlite/src/build.c index c27dfe2a92..e44216d296 100644 --- a/sqlite/src/build.c +++ b/sqlite/src/build.c @@ -30,7 +30,7 @@ #include "comdb2Int.h" #include "pragma.h" #include "logmsg.h" - +#include "hash_partition.h" int has_comdb2_index_for_sqlite(Table *pTab); int is_comdb2_index_unique(const char *dbname, char *idx); const char* fdb_parse_comdb2_remote_dbname(const char *zDatabase, const char **fqDbname); @@ -693,6 +693,13 @@ Table *sqlite3LocateTable( } p = sqlite3FindTable(db, zName, zDbase); + /* I have to do a second lookup here for generic sharding : + * zDbase is being set to "main" for 
inserts/deletes/updates from triggers" + * which results in the alias for the remote table not being found + * AAR: TODO -> figure out the actual cause of the problem*/ + if ( p==0 ) { + p = sqlite3FindTable(db, zName, NULL); + } if( p==0 ){ #ifndef SQLITE_OMIT_VIRTUALTABLE /* If zName is the not the name of a table in the schema created using @@ -2792,7 +2799,9 @@ void sqlite3CreateView( sqlite3TwoPartName(pParse, pName1, pName2, &pName); iDb = sqlite3SchemaToIndex(db, p->pSchema); sqlite3FixInit(&sFix, pParse, iDb, "view", pName); +#ifndef SQLITE_BUILDING_FOR_COMDB2 if( sqlite3FixSelect(&sFix, pSelect) ) goto create_view_fail; +#endif /* Make a copy of the entire SELECT statement that defines the view. ** This will force all the Expr.token.z values to be dynamically @@ -3313,7 +3322,7 @@ void sqlite3DropTable(Parse *pParse, SrcList *pName, int isView, int noErr){ } if( !isView && pTab->pSelect ){ #if defined(SQLITE_BUILDING_FOR_COMDB2) - if (timepart_allow_drop(pTab->zName)) { + if (timepart_allow_drop(pTab->zName) && !is_hash_partition(pTab->zName)) { #endif /* defined(SQLITE_BUILDING_FOR_COMDB2) */ sqlite3ErrorMsg(pParse, "use DROP VIEW to delete view %s", pTab->zName); goto exit_drop_table; diff --git a/sqlite/src/comdb2build.c b/sqlite/src/comdb2build.c index 93498012a4..528fbbe090 100644 --- a/sqlite/src/comdb2build.c +++ b/sqlite/src/comdb2build.c @@ -28,6 +28,7 @@ #include "db_access.h" /* gbl_check_access_controls */ #include "comdb2_atomic.h" #include "alias.h" +#include "dohsql.h" #define COMDB2_INVALID_AUTOINCREMENT "invalid datatype for autoincrement" @@ -39,13 +40,16 @@ extern int gbl_ddl_cascade_drop; extern int gbl_legacy_schema; extern int gbl_permit_small_sequences; extern int gbl_lightweight_rename; - +extern int gbl_create_remote_tables; +extern char gbl_dbname[MAX_DBNAME_LENGTH]; int gbl_view_feature = 1; extern int sqlite3GetToken(const unsigned char *z, int *tokenType); extern int sqlite3ParserFallback(int iToken); extern int 
comdb2_save_ddl_context(char *name, void *ctx, comdb2ma mem); extern void *comdb2_get_ddl_context(char *name); +int createRemoteTables(struct comdb2_partition *partition); +int createLocalAliases(struct comdb2_partition *partition); /******************* Utility ****************************/ static inline int setError(Parse *pParse, int rc, const char *msg) @@ -186,21 +190,25 @@ static inline int chkAndCopyTable(Parse *pParse, char *dst, const char *name, if (authenticateSC(dst, pParse)) goto cleanup; + logmsg(LOGMSG_USER, "DST IS %s\n", dst); char *firstshard = timepart_shard_name(dst, 0, 0, NULL); if(!firstshard) { struct dbtable *db = get_dbtable_by_name(dst); - + hash_view_t *hash_view = NULL; + hash_get_inmem_view(dst, &hash_view); if (table_exists) { *table_exists = (db) ? 1 : 0; } - if (db == NULL && (error_flag == ERROR_ON_TBL_NOT_FOUND)) { + if ((db == NULL && hash_view == NULL) && (error_flag == ERROR_ON_TBL_NOT_FOUND)) { + logmsg(LOGMSG_ERROR, "COULDN'T FIND VIEW\n"); rc = setError(pParse, SQLITE_ERROR, "Table not found"); goto cleanup; - } + } + struct dbview *view = get_view_by_name(dst); - if ((db != NULL || view != NULL) && + if ((db != NULL || view != NULL || hash_view != NULL) && (error_flag == ERROR_ON_TBL_FOUND)) { rc = setError(pParse, SQLITE_ERROR, "Table already exists"); goto cleanup; @@ -837,7 +845,8 @@ void comdb2AlterTableCSC2( void comdb2DropTable(Parse *pParse, SrcList *pName) { char *partition_first_shard = NULL; - + hash_view_t *hashView = NULL; + char *viewName = NULL; if (comdb2IsPrepareOnly(pParse)) return; @@ -867,6 +876,7 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) sc->same_schema = 1; sc->kind = SC_DROPTABLE; sc->nothrevent = 1; + hash_get_inmem_view(sc->tablename, &hashView); if(comdb2IsDryrun(pParse)){ if(comdb2SCIsDryRunnable(sc)){ sc->dryrun = 1; @@ -879,6 +889,11 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) if (partition_first_shard) sc->partition.type = PARTITION_REMOVE; + if (hashView) { + 
sc->partition.type = PARTITION_REMOVE_COL_HASH; + strncpy0(sc->tablename, hash_view_get_tablename(hashView), MAXTABLELEN); + logmsg(LOGMSG_USER, "SC->TABLENAME is %s\n", sc->tablename); + } tran_type *tran = curtran_gettran(); int rc = get_csc2_file_tran(partition_first_shard ? partition_first_shard : sc->tablename, -1 , &sc->newcsc2, NULL, tran); @@ -890,6 +905,12 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) setError(pParse, SQLITE_ERROR, "Table schema cannot be found"); goto out; } + + /* We've validated schema file */ + if (hashView) { + strncpy0(sc->tablename, hash_view_get_viewname(hashView), MAXTABLELEN); + } + logmsg(LOGMSG_USER, "%s DROPPING TABLE %s. partition type : %d\n", __func__, sc->tablename, sc->partition.type); if(sc->dryrun) comdb2prepareSString(v, pParse, 0, sc, &comdb2SqlDryrunSchemaChange, (vdbeFuncArgFree) &free_schema_change_type); @@ -897,6 +918,7 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) comdb2PrepareSC(v, pParse, 0, sc, &comdb2SqlSchemaChange_usedb, (vdbeFuncArgFree)&free_schema_change_type); free(partition_first_shard); + free(viewName); return; out: @@ -5018,7 +5040,15 @@ void comdb2CreateTableEnd( if (sc == 0) goto oom; - memcpy(sc->tablename, ctx->tablename, MAXTABLELEN); + if (ctx->partition && ctx->partition->type == PARTITION_ADD_COL_HASH) { + char tmp_str[MAXTABLELEN] = {0}; + strcpy(ctx->partition->u.hash.viewname, ctx->tablename); + strcpy(tmp_str , "hash_"); + strcpy(tmp_str + 5, ctx->tablename); + strcpy(sc->tablename, tmp_str); + } else { + memcpy(sc->tablename, ctx->tablename, MAXTABLELEN); + } sc->kind = SC_ADDTABLE; sc->nothrevent = 1; @@ -7690,3 +7720,94 @@ void create_default_consumer_sp(Parse *p, char *spname) comdb2prepareNoRows(v, p, 0, sc, &comdb2SqlSchemaChange, (vdbeFuncArgFree)&free_schema_change_type); } +static int comdb2GetHashPartitionParams(Parse* pParse, IdList *pColumn, IdList *pPartitions, + char cols[][MAXCOLNAME], uint32_t *oNumPartitions, uint32_t *oNumColumns, + char 
partitions[][MAXPARTITIONLEN]) +{ + int column_exists = 0; + struct comdb2_column *cur_col; + struct comdb2_ddl_context *ctx = pParse->comdb2_ddl_ctx; + if (ctx == 0) { + /* An error must have been set. */ + assert(pParse->rc != 0); + return -1; + } + + assert(pColumn!=0 && pColumn->nId>0); + + /* Copy the sharding key names after asserting their existence in the table*/ + *oNumColumns = pColumn->nId; + int i; + for (i=0; i < pColumn->nId; i++) { + column_exists = 0; + LISTC_FOR_EACH(&ctx->schema->column_list, cur_col, lnk) + { + if ((strcasecmp(pColumn->a[i].zName, cur_col->name) == 0)) { + column_exists = 1; + break; + } + } + if (column_exists==0) { + setError(pParse, SQLITE_MISUSE, comdb2_asprintf("Column %s does not exist", + pColumn->a[i].zName)); + return -1; + } else { + // column exists, copy it + strncpy0(cols[i], pColumn->a[i].zName, strlen(pColumn->a[i].zName) + 1); + } + } + + /*Copy the table partition names*/ + *oNumPartitions = pPartitions->nId; + for(i=0;i<pPartitions->nId;i++) { + strncpy0(partitions[i], pPartitions->a[i].zName, strlen(pPartitions->a[i].zName)+1); + } + return 0; +} + +void comdb2CreateHashPartition(Parse *pParse, IdList *pColumn, IdList *pPartitions) +{ + struct comdb2_partition *partition; + if (!gbl_partitioned_table_enabled) { + setError(pParse, SQLITE_ABORT, "Create partitioned table not enabled"); + return; + } + + partition = _get_partition(pParse, 0); + if (!partition) + return; + + partition->type = PARTITION_ADD_COL_HASH; + if (comdb2GetHashPartitionParams(pParse, pColumn, pPartitions, + partition->u.hash.columns, + (uint32_t*)&partition->u.hash.num_partitions, + (uint32_t*)&partition->u.hash.num_columns, + partition->u.hash.partitions)) { + free_ddl_context(pParse); + return; + } + + GET_CLNT; + if (clnt && clnt->sql) { + logmsg(LOGMSG_USER, "The sql query is %s\n", clnt->sql); + partition->u.hash.createQuery = clnt->sql; + } else { + if (!clnt) { + logmsg(LOGMSG_USER, "The client object is not available\n"); + abort(); + } + } + /* TODO : 
ENABLE THIS THROUGH SQL + * We seem to have hit a limit in sqlite on the number of tokens + * The result is - adding more tokens crashes the planner in random places. + * In the interest of not being blocked for now, I've decided to proceed with a tunable + * I will revisit and fix the broader issue later + */ + if (gbl_create_remote_tables){ + logmsg(LOGMSG_USER, "CREATING REMOTE TABLES +++++++++++++++\n"); + if (createRemoteTables(partition)) { + free_ddl_context(pParse); + setError(pParse, SQLITE_ABORT, "Failed to create remote tables"); + } + } +} diff --git a/sqlite/src/comdb2build.h b/sqlite/src/comdb2build.h index 3464fe6e8a..a319a6bce1 100644 --- a/sqlite/src/comdb2build.h +++ b/sqlite/src/comdb2build.h @@ -126,6 +126,7 @@ void comdb2DropPartition(Parse* p, Token* name); void comdb2CreateTimePartition(Parse* p, Token* period, Token* retention, Token* start); void comdb2CreateManualPartition(Parse* p, Token* retention, Token* start); +void comdb2CreateHashPartition(Parse* p, IdList *, IdList *); void comdb2SaveMergeTable(Parse* p, Token* name, Token* database, int alter); void comdb2analyze(Parse*, int opt, Token*, Token*, int, int); diff --git a/sqlite/src/parse.y b/sqlite/src/parse.y index e89b8274e4..8af103dd25 100644 --- a/sqlite/src/parse.y +++ b/sqlite/src/parse.y @@ -250,6 +250,9 @@ partition_options ::= MANUAL RETENTION INTEGER(R) START INTEGER(S). { partition_options ::= MANUAL RETENTION INTEGER(R). { comdb2CreateManualPartition(pParse, &R, 0); } +partition_options ::= LP idlist(A) RP PARTITIONS LP idlist(B) RP. { + comdb2CreateHashPartition(pParse, A, B); +} merge ::= . merge ::= merge_with. merge_with ::= MERGE nm(Y) dbnm(Z). { @@ -348,7 +351,7 @@ columnname(A) ::= nm(A) typetoken(Y). 
{sqlite3AddColumn(pParse,&A,&Y);} DISTRIBUTION DRYRUN ENABLE EXCLUSIVE_ANALYZE EXEC EXECUTE FORCE FUNCTION GENID48 GET GRANT INCLUDE INCREMENT IPU ISC KW LUA LZ4 MANUAL MERGE NONE ODH OFF OP OPTION OPTIONS - PAGEORDER PARTITIONED PASSWORD PAUSE PERIOD PENDING PROCEDURE PUT + PAGEORDER PARTITIONED PARTITIONS PASSWORD PAUSE PERIOD PENDING PROCEDURE PUT REBUILD READ READONLY REC RESERVED RESUME RETENTION REVOKE RLE ROWLOCKS SCALAR SCHEMACHANGE SKIPSCAN START SUMMARIZE TESTDEFAULT THREADS THRESHOLD TIME TRUNCATE TUNABLE TYPE diff --git a/sqlite/tool/mkkeywordhash.c b/sqlite/tool/mkkeywordhash.c index d0f6c2cc0a..174a1c2b9a 100644 --- a/sqlite/tool/mkkeywordhash.c +++ b/sqlite/tool/mkkeywordhash.c @@ -377,6 +377,7 @@ static Keyword aKeywordTable[] = { { "OFF", "TK_OFF", ALWAYS }, { "PAGEORDER", "TK_PAGEORDER", ALWAYS }, { "PARTITIONED", "TK_PARTITIONED", ALWAYS }, + { "PARTITIONS", "TK_PARTITIONS", ALWAYS }, { "PASSWORD", "TK_PASSWORD", ALWAYS }, { "PAUSE", "TK_PAUSE", ALWAYS }, { "PERIOD", "TK_PERIOD", ALWAYS }, diff --git a/tests/tools/test_consistent_hash.c b/tests/tools/test_consistent_hash.c index 01a5a95c88..843f6efa17 100644 --- a/tests/tools/test_consistent_hash.c +++ b/tests/tools/test_consistent_hash.c @@ -4,11 +4,6 @@ #include #include "consistent_hash.h" #include -enum ch_hash_func_type { - CH_HASH_SHA = 1, - CH_HASH_MD5 = 2, - CH_HASH_CRC = 3 -}; void test_add_and_find_one_node(hash_func func, int func_type) { ch_hash_t *ch = ch_hash_create(1, func);