Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
126403: sql: ignore outer buckets when getting partial statistic extreme bounds r=Uzair5162 a=Uzair5162

Full statistic collections sometimes invoke `addOuterBuckets()`, which adds buckets with column-type max and min upper bounds to the histogram. Previously, we used the first (non-null) and last buckets as the "less than" and "greater than" bounds for partial statistics collections using extremes. This resulted in an incorrect predicate when outer buckets existed in the most recent full statistic. This commit fixes that by ignoring outer buckets when determining the bounds.

Note that this fix doesn't apply to partial stats created on enum or bool type columns (see #126401), as outer buckets on those histograms don't have `NumEq == 0`. Reducing the check to just `UpperBound.IsMax() || UpperBound.IsMin()` for these types is problematic: for bools, every bucket's upper bound is a max or min, and enums typically have few values and behave similarly.

Fixes: #93094

See also: #125950

Release note (bug fix): Fixed a bug in creating partial statistics using extremes (which is disabled by default) where it would occasionally use incorrect extreme values and collect no stats. This occurred when outer buckets had been added to the previous histogram to account for extra distinct count.

126405: crosscluster/logical: add a few guardrails r=dt a=dt

This checks that the user has the replication priv and that the table and cluster are configured for replication.

Co-authored-by: Uzair Ahmad <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
3 people committed Jun 28, 2024
3 parents 27f6e07 + bd63938 + b2a5e98 commit 047a7ed
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/physicalplan",
"//pkg/sql/privilege",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
32 changes: 32 additions & 0 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -77,6 +79,14 @@ func createLogicalReplicationStreamPlanHook(
return err
}

// TODO(dt): the global priv is a big hammer; should we be checking just on
// table(s) or database being replicated from and into?
if err := p.CheckPrivilege(
ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATION,
); err != nil {
return err
}

if !stmt.Options.IsDefault() {
return errors.UnimplementedErrorf(issuelink.IssueLink{}, "logical replication stream options are not yet supported")
}
Expand Down Expand Up @@ -105,6 +115,28 @@ func createLogicalReplicationStreamPlanHook(

repPairs[i].DstDescriptorID = int32(td.GetID())

// TODO(dt): remove when we support this via KV metadata.
var foundTSCol bool
for _, col := range td.GetColumns() {
if col.Name == originTimestampColumnName {
foundTSCol = true
if col.Type.Family() != types.DecimalFamily {
return errors.Newf(
"%s column must be type DECIMAL for use by logical replication", originTimestampColumnName,
)
}
break
}
}
if !foundTSCol {
return errors.WithHintf(errors.Newf(
"tables written to by logical replication currently require a %q DECIMAL column",
originTimestampColumnName,
), "try 'ALTER %s ADD COLUMN %s DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL",
dstObjName.String(), originTimestampColumnName,
)
}

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(prefix.Database.GetName()),
tree.Name(prefix.Schema.GetName()),
Expand Down
42 changes: 42 additions & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,48 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
require.Equal(t, int64(expCPuts), numCPuts.Load())
}

// TestLogicalStreamIngestionErrors verifies the errors returned by CREATE
// LOGICAL REPLICATION STREAM when the destination table or the cluster is not
// set up for replication, and that the statement succeeds once each problem
// has been corrected. The steps are order-dependent: each one fixes the
// condition that the previous step asserted on.
func TestLogicalStreamIngestionErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// A single-node cluster hosts both databases; database "a" is used as the
// replication source and database "b" as the destination.
server := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer server.Stopper().Stop(ctx)
s := server.Server(0).ApplicationLayer()
// The connection URL for database "a" is passed to the statement as the
// source placeholder ($1) below.
url, cleanup := s.PGUrl(t, serverutils.DBName("a"))
defer cleanup()
urlA := url.String()

_, err := server.Conns[0].Exec("CREATE DATABASE a")
require.NoError(t, err)
// NOTE(review): the unquoted identifier B is presumably folded to lowercase,
// matching the DBName("b") connection opened below — confirm.
_, err = server.Conns[0].Exec("CREATE DATABASE B")
require.NoError(t, err)

dbA := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("a")))
dbB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("b")))

// Create the same table in both databases, initially without the origin
// timestamp column.
createStmt := "CREATE TABLE tab (pk int primary key, payload string)"
dbA.Exec(t, createStmt)
dbB.Exec(t, createStmt)

createQ := "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab"

// The destination table has no origin timestamp column yet, so creation must
// fail with a hint that mentions ADD COLUMN.
dbB.ExpectErrWithHint(t, "currently require a .* DECIMAL column", "ADD COLUMN", createQ, urlA)

// A column with the expected name but the wrong type (STRING) is rejected.
// NOTE(review): the literal column name here is presumably the value of
// originTimestampColumnName, which is used for the rename below — confirm.
dbB.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp STRING")
dbB.ExpectErr(t, ".*column must be type DECIMAL for use by logical replication", createQ, urlA)

// Move the STRING column out of the way and add a DECIMAL column under the
// expected name.
dbB.Exec(t, fmt.Sprintf("ALTER TABLE tab RENAME COLUMN %[1]s TO str_col, ADD COLUMN %[1]s DECIMAL", originTimestampColumnName))

// When running as the system tenant, the rangefeed setting is also checked:
// expect that error first, then enable the setting.
if s.Codec().IsSystem() {
dbB.ExpectErr(t, "kv.rangefeed.enabled must be enabled on the source cluster for logical replication", createQ, urlA)
kvserver.RangefeedEnabled.Override(ctx, &server.Server(0).ClusterSettings().SV, true)
}

// With the preconditions above satisfied, creating the stream succeeds.
dbB.Exec(t, createQ, urlA)
}

func TestLogicalStreamIngestionJobWithColumnFamilies(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/crosscluster/producer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

type replicationStreamManagerImpl struct {
Expand Down Expand Up @@ -60,6 +62,12 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables(
return streampb.ReplicationProducerSpec{}, err
}

execConfig := r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)

if execConfig.Codec.IsSystem() && !kvserver.RangefeedEnabled.Get(&execConfig.Settings.SV) {
return streampb.ReplicationProducerSpec{}, errors.Errorf("kv.rangefeed.enabled must be enabled on the source cluster for logical replication")
}

var replicationStartTime hlc.Timestamp
if !req.ReplicationStartTime.IsEmpty() {
replicationStartTime = req.ReplicationStartTime
Expand Down Expand Up @@ -87,7 +95,6 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables(
tableDescs[name] = td.TableDescriptor
}

execConfig := r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
registry := execConfig.JobRegistry
ptsID := uuid.MakeV4()
jr := makeProducerJobRecordForLogicalReplication(
Expand Down
174 changes: 173 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/distsql_stats
Original file line number Diff line number Diff line change
Expand Up @@ -2577,7 +2577,7 @@ SHOW STATISTICS USING JSON FOR TABLE only_null;
statement ok
ALTER TABLE only_null INJECT STATISTICS '$only_null_stat';

statement error pq: only NULL values exist in the index, so partial stats cannot be collected
statement error pq: only outer or NULL bounded buckets exist in the index, so partial stats cannot be collected
CREATE STATISTICS only_null_partial ON a FROM only_null USING EXTREMES;

statement ok
Expand Down Expand Up @@ -2908,6 +2908,178 @@ SHOW HISTOGRAM $hist_crdb_internal_idx_expr
upper_bound range_rows distinct_range_rows equal_rows
'{"bar": {"baz": 5}}' 0 0 1

# Verify that the correct partial predicate is used for partial stats using
# extremes when outer buckets exist (int column type).
statement ok
CREATE TABLE int_outer_buckets (a PRIMARY KEY) AS SELECT generate_series(0, 9999);

statement ok
CREATE STATISTICS int_outer_buckets_full ON a FROM int_outer_buckets;

let $hist_id_int_outer_buckets_full
SELECT histogram_id FROM [SHOW STATISTICS FOR TABLE int_outer_buckets] WHERE statistics_name = 'int_outer_buckets_full'

# The full stats collection should have added 2 outer buckets, with upper bounds
# of MaxInt64 and MinInt64, for a total of 202 buckets.
query I
SELECT count(*) FROM [SHOW HISTOGRAM $hist_id_int_outer_buckets_full]
----
202

statement ok
INSERT INTO int_outer_buckets SELECT generate_series(-10, -1) UNION ALL SELECT generate_series(10000, 10009);

statement ok
CREATE STATISTICS int_outer_buckets_partial ON a FROM int_outer_buckets USING EXTREMES;

# The partial stat predicate should not include MaxInt64 and MinInt64 from the
# outer buckets and should count 20 rows beyond the extremes.
query TTII colnames
SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
FROM [SHOW STATISTICS FOR TABLE int_outer_buckets]
WHERE statistics_name = 'int_outer_buckets_partial'
----
statistics_name partial_predicate row_count null_count
int_outer_buckets_partial (a IS NULL) OR ((a < 0:::INT8) OR (a > 9999:::INT8)) 20 0

# Verify that we don't ignore buckets with actual max and min values when
# creating partial stats using extremes.
statement ok
INSERT INTO int_outer_buckets VALUES (-9223372036854775808), (9223372036854775807);

statement ok
SET CLUSTER SETTING sql.stats.histogram_samples.count = 10050;

statement ok
CREATE STATISTICS int_outer_buckets_full ON a FROM int_outer_buckets;

statement ok
CREATE STATISTICS int_outer_buckets_partial ON a FROM int_outer_buckets USING EXTREMES;

# The partial stat predicate should include MaxInt64 and MinInt64 and should
# count no rows beyond the extremes.
query TTII colnames
SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
FROM [SHOW STATISTICS FOR TABLE int_outer_buckets]
WHERE statistics_name = 'int_outer_buckets_partial'
----
statistics_name partial_predicate row_count null_count
int_outer_buckets_partial (a IS NULL) OR ((a < (-9223372036854775808):::INT8) OR (a > 9223372036854775807:::INT8)) 0 0

# Verify that the correct partial predicate is used for partial stats using
# extremes when outer buckets exist (timestamp column type).
statement ok
CREATE TABLE timestamp_outer_buckets (a TIMESTAMP PRIMARY KEY);

statement ok
INSERT INTO timestamp_outer_buckets VALUES
('2024-06-26 01:00:00'),
('2024-06-26 02:00:00'),
('2024-06-27 01:30:00'),
('2024-06-27 02:30:00');

statement ok
CREATE STATISTICS timestamp_outer_buckets_full ON a FROM timestamp_outer_buckets;

let $hist_id_timestamp_outer_buckets_full
SELECT histogram_id FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets] WHERE statistics_name = 'timestamp_outer_buckets_full'

# The full stats collection should not have added outer buckets.
query I
SELECT count(*) FROM [SHOW HISTOGRAM $hist_id_timestamp_outer_buckets_full]
----
4

statement ok
INSERT INTO timestamp_outer_buckets VALUES
('2024-06-26 00:00:00'),
('2024-06-27 03:30:00');

statement ok
CREATE STATISTICS timestamp_outer_buckets_partial ON a FROM timestamp_outer_buckets USING EXTREMES;

# The partial stat should not ignore any buckets and have the correct predicate.
query TTII colnames
SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets]
WHERE statistics_name = 'timestamp_outer_buckets_partial'
----
statistics_name partial_predicate row_count null_count
timestamp_outer_buckets_partial (a IS NULL) OR ((a < '2024-06-26 01:00:00':::TIMESTAMP) OR (a > '2024-06-27 02:30:00':::TIMESTAMP)) 2 0

# Inject a full statistic with outer buckets, overriding the previous stats.
statement ok
ALTER TABLE timestamp_outer_buckets INJECT STATISTICS '[
{
"avg_size": 7,
"columns": [
"a"
],
"created_at": "2024-06-27 19:00:16.450303",
"distinct_count": 4,
"histo_buckets": [
{
"distinct_range": 0.000001,
"num_eq": 0,
"num_range": 0,
"upper_bound": "4714-11-24 00:00:00 BC"
},
{
"distinct_range": 0,
"num_eq": 1,
"num_range": 0,
"upper_bound": "2024-06-26 01:00:00"
},
{
"distinct_range": 0,
"num_eq": 1,
"num_range": 0,
"upper_bound": "2024-06-26 02:00:00"
},
{
"distinct_range": 0,
"num_eq": 1,
"num_range": 0,
"upper_bound": "2024-06-27 01:30:00"
},
{
"distinct_range": 0,
"num_eq": 1,
"num_range": 0,
"upper_bound": "2024-06-27 02:30:00"
},
{
"distinct_range": 0.000001,
"num_eq": 0,
"num_range": 0,
"upper_bound": "294276-12-31 23:59:59.999999"
}
],
"histo_col_type": "TIMESTAMP",
"histo_version": 3,
"name": "timestamp_outer_buckets_full",
"null_count": 0,
"row_count": 4
}
]'

statement ok
INSERT INTO timestamp_outer_buckets VALUES ('2024-06-28 01:00:00');

statement ok
CREATE STATISTICS timestamp_outer_buckets_partial ON a FROM timestamp_outer_buckets USING EXTREMES;

# The partial stat predicate should not include MaxSupportedTime and
# MinSupportedTime from the outer buckets and should count 3 rows beyond the
# extremes.
query TTII colnames
SELECT "statistics_name", "partial_predicate", "row_count", "null_count"
FROM [SHOW STATISTICS FOR TABLE timestamp_outer_buckets]
WHERE statistics_name = 'timestamp_outer_buckets_partial'
----
statistics_name partial_predicate row_count null_count
timestamp_outer_buckets_partial (a IS NULL) OR ((a < '2024-06-26 01:00:00':::TIMESTAMP) OR (a > '2024-06-27 02:30:00':::TIMESTAMP)) 3 0

statement ok
RESET enable_create_stats_using_extremes

Expand Down
16 changes: 13 additions & 3 deletions pkg/sql/stats/bounds/extremes.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,25 @@ func ConstructUsingExtremesSpans(
func GetUsingExtremesBounds(
ctx context.Context, evalCtx *eval.Context, histogram []cat.HistogramBucket,
) (lowerBound tree.Datum, upperBound tree.Datum, _ error) {
// Full stats collections sometimes add buckets with column type max/min upper
// bounds above and below the observed max and min values to account for extra
// distinct count (see addOuterBuckets()). These outer buckets should be
// ignored.
isOuterBucket := func(bucket *cat.HistogramBucket) bool {
return (bucket.UpperBound.IsMin(ctx, evalCtx) || bucket.UpperBound.IsMax(ctx, evalCtx)) && bucket.NumEq == 0
}

upperBound = histogram[len(histogram)-1].UpperBound
// Pick the earliest lowerBound that is not null,
if len(histogram) > 1 && isOuterBucket(&histogram[len(histogram)-1]) {
upperBound = histogram[len(histogram)-2].UpperBound
}

// Pick the earliest lowerBound that is not null and isn't an outer bucket;
// if none exists, return an error.
for i := range histogram {
hist := &histogram[i]
if cmp, err := hist.UpperBound.Compare(ctx, evalCtx, tree.DNull); err != nil {
return lowerBound, nil, err
} else if cmp != 0 {
} else if cmp != 0 && !isOuterBucket(hist) {
lowerBound = hist.UpperBound
break
}
Expand All @@ -105,7 +115,7 @@ func GetUsingExtremesBounds(
return lowerBound, nil,
pgerror.Newf(
pgcode.ObjectNotInPrerequisiteState,
"only NULL values exist in the index, so partial stats cannot be collected")
"only outer or NULL bounded buckets exist in the index, so partial stats cannot be collected")
}
return lowerBound, upperBound, nil
}

0 comments on commit 047a7ed

Please sign in to comment.