diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel
index c2fac884a9a8..84e3fa4ccf55 100644
--- a/pkg/ccl/crosscluster/logical/BUILD.bazel
+++ b/pkg/ccl/crosscluster/logical/BUILD.bazel
@@ -49,12 +49,14 @@ go_library(
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
         "//pkg/sql/physicalplan",
+        "//pkg/sql/privilege",
         "//pkg/sql/rowenc",
         "//pkg/sql/rowexec",
         "//pkg/sql/sem/catid",
         "//pkg/sql/sem/eval",
         "//pkg/sql/sem/tree",
         "//pkg/sql/sessiondata",
+        "//pkg/sql/syntheticprivilege",
         "//pkg/sql/types",
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
index 5dba0ca309e8..8de1bb74fd07 100644
--- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
+++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
@@ -26,7 +26,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+	"github.com/cockroachdb/cockroach/pkg/sql/privilege"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
@@ -77,6 +79,14 @@ func createLogicalReplicationStreamPlanHook(
 		return err
 	}
 
+	// TODO(dt): the global priv is a big hammer; should we be checking just on
+	// table(s) or database being replicated from and into?
+	if err := p.CheckPrivilege(
+		ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATION,
+	); err != nil {
+		return err
+	}
+
 	if !stmt.Options.IsDefault() {
 		return errors.UnimplementedErrorf(issuelink.IssueLink{}, "logical replication stream options are not yet supported")
 	}
@@ -105,6 +115,28 @@ func createLogicalReplicationStreamPlanHook(
 
 		repPairs[i].DstDescriptorID = int32(td.GetID())
 
+		// TODO(dt): remove when we support this via KV metadata.
+		var foundTSCol bool
+		for _, col := range td.GetColumns() {
+			if col.Name == originTimestampColumnName {
+				foundTSCol = true
+				if col.Type.Family() != types.DecimalFamily {
+					return errors.Newf(
+						"%s column must be type DECIMAL for use by logical replication", originTimestampColumnName,
+					)
+				}
+				break
+			}
+		}
+		if !foundTSCol {
+			return errors.WithHintf(errors.Newf(
+				"tables written to by logical replication currently require a %q DECIMAL column",
+				originTimestampColumnName,
+			), "try 'ALTER TABLE %s ADD COLUMN %s DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL'",
+				dstObjName.String(), originTimestampColumnName,
+			)
+		}
+
 		tbNameWithSchema := tree.MakeTableNameWithSchema(
 			tree.Name(prefix.Database.GetName()),
 			tree.Name(prefix.Schema.GetName()),
diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go
index ce8a477bbdaa..9904edb846eb 100644
--- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go
+++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go
@@ -194,6 +194,48 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
 	require.Equal(t, int64(expCPuts), numCPuts.Load())
 }
 
+func TestLogicalStreamIngestionErrors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	server := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
+	defer server.Stopper().Stop(ctx)
+	s := server.Server(0).ApplicationLayer()
+	url, cleanup := s.PGUrl(t, serverutils.DBName("a"))
+	defer cleanup()
+	urlA := url.String()
+
+	_, err := server.Conns[0].Exec("CREATE DATABASE a")
+	require.NoError(t, err)
+	_, err = server.Conns[0].Exec("CREATE DATABASE b")
+	require.NoError(t, err)
+
+	dbA := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("a")))
+	dbB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("b")))
+
+	createStmt := "CREATE TABLE tab (pk int primary key, payload string)"
+	dbA.Exec(t, createStmt)
+	dbB.Exec(t, createStmt)
+
+	createQ := "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab"
+
+	dbB.ExpectErrWithHint(t, "currently require a .* DECIMAL column", "ADD COLUMN", createQ, urlA)
+
+	dbB.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp STRING")
+	dbB.ExpectErr(t, ".*column must be type DECIMAL for use by logical replication", createQ, urlA)
+
+	dbB.Exec(t, fmt.Sprintf("ALTER TABLE tab RENAME COLUMN %[1]s TO str_col, ADD COLUMN %[1]s DECIMAL", originTimestampColumnName))
+
+	if s.Codec().IsSystem() {
+		dbB.ExpectErr(t, "kv.rangefeed.enabled must be enabled on the source cluster for logical replication", createQ, urlA)
+		kvserver.RangefeedEnabled.Override(ctx, &server.Server(0).ClusterSettings().SV, true)
+	}
+
+	dbB.Exec(t, createQ, urlA)
+}
+
 func TestLogicalStreamIngestionJobWithColumnFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/crosscluster/producer/replication_manager.go b/pkg/ccl/crosscluster/producer/replication_manager.go
index e74ae5a4e1e7..8162014f77f7 100644
--- a/pkg/ccl/crosscluster/producer/replication_manager.go
+++ b/pkg/ccl/crosscluster/producer/replication_manager.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) type replicationStreamManagerImpl struct { @@ -60,6 +62,12 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables( return streampb.ReplicationProducerSpec{}, err } + execConfig := r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) + + if execConfig.Codec.IsSystem() && !kvserver.RangefeedEnabled.Get(&execConfig.Settings.SV) { + return streampb.ReplicationProducerSpec{}, errors.Errorf("kv.rangefeed.enabled must be enabled on the source cluster for logical replication") + } + var replicationStartTime hlc.Timestamp if !req.ReplicationStartTime.IsEmpty() { replicationStartTime = req.ReplicationStartTime @@ -87,7 +95,6 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables( tableDescs[name] = td.TableDescriptor } - execConfig := r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) registry := execConfig.JobRegistry ptsID := uuid.MakeV4() jr := makeProducerJobRecordForLogicalReplication( diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_stats b/pkg/sql/logictest/testdata/logic_test/distsql_stats index b40034cbffec..49b1f34b5030 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_stats +++ b/pkg/sql/logictest/testdata/logic_test/distsql_stats @@ -2577,7 +2577,7 @@ SHOW STATISTICS USING JSON FOR TABLE only_null; statement ok ALTER TABLE only_null INJECT STATISTICS '$only_null_stat'; -statement error pq: only NULL values exist in the index, so partial stats cannot be collected +statement error pq: only outer or NULL bounded buckets exist in the index, so partial stats cannot be collected CREATE STATISTICS only_null_partial ON a FROM only_null USING EXTREMES; statement ok @@ -2908,6 +2908,178 @@ SHOW HISTOGRAM $hist_crdb_internal_idx_expr upper_bound range_rows distinct_range_rows equal_rows '{"bar": {"baz": 5}}' 0 0 1 +# Verify that the correct partial predicate is used for partial stats using +# extremes when outer buckets exist (int column type). +statement ok +CREATE TABLE int_outer_buckets (a PRIMARY KEY) AS SELECT generate_series(0, 9999); + +statement ok +CREATE STATISTICS int_outer_buckets_full ON a FROM int_outer_buckets; + +let $hist_id_int_outer_buckets_full +SELECT histogram_id FROM [SHOW STATISTICS FOR TABLE int_outer_buckets] WHERE statistics_name = 'int_outer_buckets_full' + +# The full stats collection should have added 2 outer buckets for a total of 202 +# with upper bounds of MaxInt64 and MinInt64. +query I +SELECT count(*) FROM [SHOW HISTOGRAM $hist_id_int_outer_buckets_full] +---- +202 + +statement ok +INSERT INTO int_outer_buckets SELECT generate_series(-10, -1) UNION ALL SELECT generate_series(10000, 10009); + +statement ok +CREATE STATISTICS int_outer_buckets_partial ON a FROM int_outer_buckets USING EXTREMES; + +# The partial stat predicate should not include MaxInt64 and MinInt64 from the +# outer buckets and should count 20 rows beyond the extremes. 
+query TTII colnames
+SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
+FROM [SHOW STATISTICS FOR TABLE int_outer_buckets]
+WHERE statistics_name = 'int_outer_buckets_partial'
+----
+statistics_name            partial_predicate                                     row_count  null_count
+int_outer_buckets_partial  (a IS NULL) OR ((a < 0:::INT8) OR (a > 9999:::INT8))  20         0
+
+# Verify that we don't ignore buckets with actual max and min values when
+# creating partial stats using extremes.
+statement ok
+INSERT INTO int_outer_buckets VALUES (-9223372036854775808), (9223372036854775807);
+
+statement ok
+SET CLUSTER SETTING sql.stats.histogram_samples.count = 10050;
+
+statement ok
+CREATE STATISTICS int_outer_buckets_full ON a FROM int_outer_buckets;
+
+statement ok
+CREATE STATISTICS int_outer_buckets_partial ON a FROM int_outer_buckets USING EXTREMES;
+
+# The partial stat predicate should include MaxInt64 and MinInt64 and should
+# count no rows beyond the extremes.
+query TTII colnames
+SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
+FROM [SHOW STATISTICS FOR TABLE int_outer_buckets]
+WHERE statistics_name = 'int_outer_buckets_partial'
+----
+statistics_name            partial_predicate                                                                         row_count  null_count
+int_outer_buckets_partial  (a IS NULL) OR ((a < (-9223372036854775808):::INT8) OR (a > 9223372036854775807:::INT8))  0          0
+
+# Verify that the correct partial predicate is used for partial stats using
+# extremes when outer buckets exist (timestamp column type).
+statement ok
+CREATE TABLE timestamp_outer_buckets (a TIMESTAMP PRIMARY KEY);
+
+statement ok
+INSERT INTO timestamp_outer_buckets VALUES
+  ('2024-06-26 01:00:00'),
+  ('2024-06-26 02:00:00'),
+  ('2024-06-27 01:30:00'),
+  ('2024-06-27 02:30:00');
+
+statement ok
+CREATE STATISTICS timestamp_outer_buckets_full ON a FROM timestamp_outer_buckets;
+
+let $hist_id_timestamp_outer_buckets_full
+SELECT histogram_id FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets] WHERE statistics_name = 'timestamp_outer_buckets_full'
+
+# The full stats collection should not have added outer buckets.
+query I
+SELECT count(*) FROM [SHOW HISTOGRAM $hist_id_timestamp_outer_buckets_full]
+----
+4
+
+statement ok
+INSERT INTO timestamp_outer_buckets VALUES
+  ('2024-06-26 00:00:00'),
+  ('2024-06-27 03:30:00');
+
+statement ok
+CREATE STATISTICS timestamp_outer_buckets_partial ON a FROM timestamp_outer_buckets USING EXTREMES;
+
+# The partial stat should not ignore any buckets and should have the correct
+# predicate.
+query TTII colnames
+SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
+FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets]
+WHERE statistics_name = 'timestamp_outer_buckets_partial'
+----
+statistics_name                  partial_predicate                                                                                    row_count  null_count
+timestamp_outer_buckets_partial  (a IS NULL) OR ((a < '2024-06-26 01:00:00':::TIMESTAMP) OR (a > '2024-06-27 02:30:00':::TIMESTAMP))  2          0
+
+# Inject a full statistic with outer buckets, overriding the previous stats.
+statement ok
+ALTER TABLE timestamp_outer_buckets INJECT STATISTICS '[
+  {
+    "avg_size": 7,
+    "columns": [
+      "a"
+    ],
+    "created_at": "2024-06-27 19:00:16.450303",
+    "distinct_count": 4,
+    "histo_buckets": [
+      {
+        "distinct_range": 0.000001,
+        "num_eq": 0,
+        "num_range": 0,
+        "upper_bound": "4714-11-24 00:00:00 BC"
+      },
+      {
+        "distinct_range": 0,
+        "num_eq": 1,
+        "num_range": 0,
+        "upper_bound": "2024-06-26 01:00:00"
+      },
+      {
+        "distinct_range": 0,
+        "num_eq": 1,
+        "num_range": 0,
+        "upper_bound": "2024-06-26 02:00:00"
+      },
+      {
+        "distinct_range": 0,
+        "num_eq": 1,
+        "num_range": 0,
+        "upper_bound": "2024-06-27 01:30:00"
+      },
+      {
+        "distinct_range": 0,
+        "num_eq": 1,
+        "num_range": 0,
+        "upper_bound": "2024-06-27 02:30:00"
+      },
+      {
+        "distinct_range": 0.000001,
+        "num_eq": 0,
+        "num_range": 0,
+        "upper_bound": "294276-12-31 23:59:59.999999"
+      }
+    ],
+    "histo_col_type": "TIMESTAMP",
+    "histo_version": 3,
+    "name": "timestamp_outer_buckets_full",
+    "null_count": 0,
+    "row_count": 4
+  }
+]'
+
+statement ok
+INSERT INTO timestamp_outer_buckets VALUES ('2024-06-28 01:00:00');
+
+statement ok
+CREATE STATISTICS timestamp_outer_buckets_partial ON a FROM timestamp_outer_buckets USING EXTREMES;
+
+# The partial stat predicate should not include MaxSupportedTime and
+# MinSupportedTime from the outer buckets and should count 3 rows beyond the
+# extremes.
+query TTII colnames
+SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
+FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets]
+WHERE statistics_name = 'timestamp_outer_buckets_partial'
+----
+statistics_name                  partial_predicate                                                                                    row_count  null_count
+timestamp_outer_buckets_partial  (a IS NULL) OR ((a < '2024-06-26 01:00:00':::TIMESTAMP) OR (a > '2024-06-27 02:30:00':::TIMESTAMP))  3          0
+
 statement ok
 RESET enable_create_stats_using_extremes
diff --git a/pkg/sql/stats/bounds/extremes.go b/pkg/sql/stats/bounds/extremes.go
index 2b5cd12021de..4d3421b477a3 100644
--- a/pkg/sql/stats/bounds/extremes.go
+++ b/pkg/sql/stats/bounds/extremes.go
@@ -88,15 +88,25 @@ func ConstructUsingExtremesSpans(
 func GetUsingExtremesBounds(
 	ctx context.Context, evalCtx *eval.Context, histogram []cat.HistogramBucket,
 ) (lowerBound tree.Datum, upperBound tree.Datum, _ error) {
+	// Full stats collections sometimes add buckets above and below the observed
+	// extremes, with upper bounds at the column type's min/max, to account for
+	// extra distinct count (see addOuterBuckets()); ignore these outer buckets.
+	isOuterBucket := func(bucket *cat.HistogramBucket) bool {
+		return (bucket.UpperBound.IsMin(ctx, evalCtx) || bucket.UpperBound.IsMax(ctx, evalCtx)) && bucket.NumEq == 0
+	}
 	upperBound = histogram[len(histogram)-1].UpperBound
-	// Pick the earliest lowerBound that is not null,
+	if len(histogram) > 1 && isOuterBucket(&histogram[len(histogram)-1]) {
+		upperBound = histogram[len(histogram)-2].UpperBound
+	}
+
+	// Pick the earliest lowerBound that is not null and isn't an outer bucket,
 	// but if none exist, return error
 	for i := range histogram {
 		hist := &histogram[i]
 		if cmp, err := hist.UpperBound.Compare(ctx, evalCtx, tree.DNull); err != nil {
 			return lowerBound, nil, err
-		} else if cmp != 0 {
+		} else if cmp != 0 && !isOuterBucket(hist) {
 			lowerBound = hist.UpperBound
 			break
 		}
@@ -105,7 +115,7 @@ func GetUsingExtremesBounds(
 		return lowerBound, nil, pgerror.Newf(
 			pgcode.ObjectNotInPrerequisiteState,
-			"only NULL values exist in the index, so partial stats cannot be collected")
+			"only outer or NULL-bounded buckets exist in the index, so partial stats cannot be collected")
 	}
 	return lowerBound, upperBound, nil
 }