Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema validation for strict schema table in core select statements #504

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 69 additions & 1 deletion internal/query/statement/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"

"github.com/cockroachdb/errors"

"github.com/genjidb/genji/document"
"github.com/genjidb/genji/internal/database"
"github.com/genjidb/genji/internal/expr"
"github.com/genjidb/genji/internal/sql/scanner"
"github.com/genjidb/genji/internal/stream"
Expand All @@ -20,16 +22,82 @@ type SelectCoreStmt struct {
ProjectionExprs []expr.Expr
}

func (stmt *SelectCoreStmt) Prepare(*Context) (*StreamStmt, error) {
// TODO: for draft placed here
func checkSchemaPathConstraints(tableInfo *database.TableInfo, src expr.Expr) error {
var pathErr error
expr.Walk(src, func(e expr.Expr) bool {
epath, ok := e.(expr.Path)
if !ok {
return true
}
path := (document.Path)(epath)
f := tableInfo.FieldConstraints
for i := range path {
fc, ok := f.ByField[path[i].FieldName]
if !ok {
if !f.AllowExtraFields {
pathErr = fmt.Errorf("field name %q for path %q does not exists", path[i].FieldName, path)
return false
}
return true
}
if fc.AnonymousType != nil {
f = fc.AnonymousType.FieldConstraints
}
// if we encountered composite path, where some part before last
// resolved in scalar type
if fc.AnonymousType == nil && i != len(path)-1 {
pathErr = fmt.Errorf("field name %q for path %q resolved in scalar type", path[i].FieldName, path)
return false
}
}
return true
})
return pathErr
}

func (stmt *SelectCoreStmt) Prepare(c *Context) (*StreamStmt, error) {
isReadOnly := true

var s *stream.Stream
var tableInfo *database.TableInfo

if stmt.TableName != "" {
info, err := c.DB.Catalog.GetTableInfo(stmt.TableName)
if err != nil {
return nil, err
}
if !info.FieldConstraints.AllowExtraFields {
tableInfo = info
}
s = s.Pipe(table.Scan(stmt.TableName))
}

if tableInfo != nil {
for _, pe := range stmt.ProjectionExprs {
ne, ok := pe.(*expr.NamedExpr)
if !ok {
// TODO: I didn't find other cases except NamedExpr, probably this should be exception?
continue
}
if pathErr := checkSchemaPathConstraints(tableInfo, ne); pathErr != nil {
return nil, pathErr
}
// TODO: see commennts in issue
/*
if ne.Name() != "" {
// ...
}
*/
}
}

if stmt.WhereExpr != nil {
if tableInfo != nil {
if pathErr := checkSchemaPathConstraints(tableInfo, stmt.WhereExpr); pathErr != nil {
return nil, pathErr
}
}
s = s.Pipe(docs.Filter(stmt.WhereExpr))
}

Expand Down