Skip to content

Commit

Permalink
VReplication: Support for filter using IN operator (#17296)
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Dec 3, 2024
1 parent 1058621 commit f0c0a70
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 5 deletions.
1 change: 1 addition & 0 deletions go/vt/vtgate/evalengine/eval_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (er EvalResult) String() string {

// TupleValues allows for retrieval of the value we expose for public consumption
func (er EvalResult) TupleValues() []sqltypes.Value {
// TODO: Make this collation-aware
switch v := er.v.(type) {
case *evalTuple:
result := make([]sqltypes.Value, 0, len(v.t))
Expand Down
60 changes: 60 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
NotEqual
// IsNotNull is used to filter a column if it is NULL
IsNotNull
// In is used to filter a comparable column if equals any of the values from a specific tuple
In
)

// Filter contains opcodes for filtering.
Expand All @@ -97,6 +99,9 @@ type Filter struct {
ColNum int
Value sqltypes.Value

// Values will be used to store tuple/list values.
Values []sqltypes.Value

// Parameters for VindexMatch.
// Vindex, VindexColumns and KeyRange, if set, will be used
// to filter the row.
Expand Down Expand Up @@ -166,6 +171,8 @@ func getOpcode(comparison *sqlparser.ComparisonExpr) (Opcode, error) {
opcode = GreaterThanEqual
case sqlparser.NotEqualOp:
opcode = NotEqual
case sqlparser.InOp:
opcode = In
default:
return -1, fmt.Errorf("comparison operator %s not supported", comparison.Operator.ToString())
}
Expand Down Expand Up @@ -238,6 +245,24 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if values[filter.ColNum].IsNull() {
return false, nil
}
case In:
if filter.Values == nil {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator")
}
found := false
for _, filterValue := range filter.Values {
match, err := compare(Equal, values[filter.ColNum], filterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
}
if match {
found = true
break
}
}
if !found {
return false, nil
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -514,6 +539,27 @@ func (plan *Plan) getColumnFuncExpr(columnName string) *sqlparser.FuncExpr {
return nil
}

func (plan *Plan) appendTupleFilter(values sqlparser.ValTuple, opcode Opcode, colnum int) error {
pv, err := evalengine.Translate(values, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Environment: plan.env,
})
if err != nil {
return err
}
env := evalengine.EmptyExpressionEnv(plan.env)
resolved, err := env.Evaluate(pv)
if err != nil {
return err
}
plan.Filters = append(plan.Filters, Filter{
Opcode: opcode,
ColNum: colnum,
Values: resolved.TupleValues(),
})
return nil
}

func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
if where == nil {
return nil
Expand All @@ -537,6 +583,20 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err != nil {
return err
}
// The Right Expr is typically expected to be a Literal value,
// except for the IN operator, where a Tuple value is expected.
// Handle the IN operator case first.
if opcode == In {
values, ok := expr.Right.(sqlparser.ValTuple)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
err := plan.appendTupleFilter(values, opcode, colnum)
if err != nil {
return err
}
continue
}
val, ok := expr.Right.(*sqlparser.Literal)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,15 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
outFilters: []Filter{{Opcode: LessThan, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: LessThanEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
},
}, {
name: "in-operator",
inFilter: "select * from t1 where id in (1, 2)",
outFilters: []Filter{
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}},
},
}, {
name: "vindex-and-operators",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz'",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30)",
outFilters: []Filter{
{
Opcode: VindexMatch,
Expand All @@ -727,6 +733,7 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
},
{Opcode: Equal, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}},
},
}}

Expand Down
36 changes: 32 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,7 +1966,7 @@ func TestFilteredMultipleWhere(t *testing.T) {
filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'",
Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton' and id1 in (1, 2, 129)",
}},
},
customFieldEvents: true,
Expand All @@ -1988,9 +1988,7 @@ func TestFilteredMultipleWhere(t *testing.T) {
{spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "newton"}}}}},
}},
{"insert into t1 values (3, 100, 2000, 'kepler')", noEvents},
{"insert into t1 values (128, 200, 1000, 'newton')", []TestRowEvent{
{spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"128", "newton"}}}}},
}},
{"insert into t1 values (128, 200, 1000, 'newton')", noEvents},
{"insert into t1 values (5, 200, 2000, 'kepler')", noEvents},
{"insert into t1 values (129, 200, 1000, 'kepler')", noEvents},
{"commit", nil},
Expand Down Expand Up @@ -2080,3 +2078,33 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) {
}}
ts.Run()
}

func TestFilteredInOperator(t *testing.T) {
ts := &TestSpec{
t: t,
ddls: []string{
"create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))",
},
options: &TestSpecOptions{
filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select id1, val from t1 where val in ('eee', 'bbb', 'ddd') and id1 in (4, 5)",
}},
},
},
}
defer ts.Close()
ts.Init()
ts.fieldEvents["t1"].cols[1].skip = true
ts.tests = [][]*TestQuery{{
{"begin", nil},
{"insert into t1 values (1, 100, 'aaa')", noEvents},
{"insert into t1 values (2, 200, 'bbb')", noEvents},
{"insert into t1 values (3, 100, 'ccc')", noEvents},
{"insert into t1 values (4, 200, 'ddd')", nil},
{"insert into t1 values (5, 200, 'eee')", nil},
{"commit", nil},
}}
ts.Run()
}

0 comments on commit f0c0a70

Please sign in to comment.