Skip to content

Commit

Permalink
Improve and fix error messages (#443)
Browse files Browse the repository at this point in the history
Simplify, improve and make error messages consistent. Make sure we return all proper user errors and not internal error. Create errors SQL test to test many errors
  • Loading branch information
purplefox authored Jun 30, 2022
1 parent f261ee3 commit 5afec70
Show file tree
Hide file tree
Showing 45 changed files with 2,376 additions and 248 deletions.
6 changes: 5 additions & 1 deletion api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package api
import (
"context"
"fmt"
"github.com/alecthomas/participle/v2"
"github.com/squareup/pranadb/pull/exec"
"net"
"sync"
Expand Down Expand Up @@ -151,11 +152,14 @@ func (s *Server) ExecuteStatement(in *service.ExecuteStatementRequest,
func (s *Server) doExecuteStatement(executor exec.PullExecutor, batchSize int, err error,
stream service.PranaDBService_ExecuteStatementServer) error {
if err != nil {
log.Errorf("failed to execute statement %+v", err)
var perr errors.PranaError
if errors.As(err, &perr) {
return perr
}
var participleErr participle.Error
if errors.As(err, &participleErr) {
return errors.NewInvalidStatementError(participleErr.Error())
}
return common.LogInternalError(err)
}

Expand Down
3 changes: 2 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
minLineWidth = 10
minColWidth = 5
maxLineWidthPropName = "max_line_width"
maxLineWidth = 10000
)

// Client is a simple client used for executing statements against PranaDB, it used by the CLI and elsewhere
Expand Down Expand Up @@ -116,7 +117,7 @@ func (c *Client) handleSetCommand(statement string) error {
if propName := parts[1]; propName == maxLineWidthPropName {
propVal := parts[2]
width, err := strconv.Atoi(propVal)
if err != nil || width < minLineWidth {
if err != nil || width < minLineWidth || width > maxLineWidth {
return errors.Errorf("Invalid %s value: %s", maxLineWidthPropName, propVal)
}
c.lock.Lock()
Expand Down
2 changes: 1 addition & 1 deletion cluster/dragon/dragon.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (d *Dragon) ExecuteRemotePullQuery(queryInfo *cluster.QueryExecutionInfo, r
if bytes[0] == 0 {
if time.Now().Sub(start) > pullQueryRetryTimeout {
msg := string(bytes[1:])
return nil, errors.Errorf("failed to execute remote query %s %v", queryInfo.Query, msg)
return nil, errors.Errorf("failed to execute remote query %v", msg)
}
// Retry - the pull engine might not be fully started.... this can occur as the pull engine is not fully
// initialised until after the cluster is active
Expand Down
2 changes: 1 addition & 1 deletion command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (e *Executor) execDescribe(execCtx *execctx.ExecutionContext, tableName str
return nil, errors.WithStack(err)
}
if rows.RowCount() == 0 {
return nil, errors.NewUnknownSourceOrMaterializedViewError(execCtx.Schema.Name, tableName)
return nil, errors.NewUnknownTableError(execCtx.Schema.Name, tableName)
}
if rows.RowCount() != 1 {
panic(fmt.Sprintf("multiple matches for table: '%s.%s'", execCtx.Schema.Name, tableName))
Expand Down
10 changes: 8 additions & 2 deletions command/create_index_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (c *CreateIndexCommand) getIndexInfo(ast *parser.CreateIndex) (*common.Inde
if !ok {
tab, ok = c.e.metaController.GetMaterializedView(c.SchemaName(), ast.TableName)
if !ok {
return nil, errors.NewUnknownSourceOrMaterializedViewError(c.SchemaName(), ast.TableName)
return nil, errors.NewUnknownTableError(c.SchemaName(), ast.TableName)
}
}
tabInfo := tab.GetTableInfo()
Expand All @@ -164,12 +164,18 @@ func (c *CreateIndexCommand) getIndexInfo(ast *parser.CreateIndex) (*common.Inde
colMap[colName] = colIndex
}
indexCols := make([]int, len(ast.ColumnNames))
indexColMap := make(map[int]struct{}, len(ast.ColumnNames))
for i, colName := range ast.ColumnNames {
colIndex, ok := colMap[colName.Name]
if !ok {
return nil, errors.NewUnknownIndexColumn(c.SchemaName(), ast.TableName, colName.Name)
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Unknown column %s in %s.%s",
colName.Name, c.SchemaName(), ast.TableName)
}
indexCols[i] = colIndex
indexColMap[colIndex] = struct{}{}
}
if len(indexColMap) != len(ast.ColumnNames) {
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Index cannot contain same column multiple times")
}
info := &common.IndexInfo{
SchemaName: c.SchemaName(),
Expand Down
7 changes: 4 additions & 3 deletions command/create_mv_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ func (c *CreateMVCommand) Before() error {
return errors.WithStack(err)
}
c.mv = mv
_, ok := c.e.metaController.GetMaterializedView(mv.Info.SchemaName, mv.Info.Name)
if ok {
return errors.NewMaterializedViewAlreadyExistsError(mv.Info.SchemaName, mv.Info.Name)

if err := c.e.metaController.ExistsMvOrSource(c.schema, mv.Info.Name); err != nil {
return err
}

rows, err := c.e.pullEngine.ExecuteQuery("sys",
fmt.Sprintf("select id from tables where schema_name='%s' and name='%s' and kind='%s'", c.mv.Info.SchemaName, c.mv.Info.Name, meta.TableKindMaterializedView))
if err != nil {
Expand Down
48 changes: 29 additions & 19 deletions command/create_source_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (c *CreateSourceCommand) Before() error {
}

func (c *CreateSourceCommand) validate() error {
_, ok := c.e.metaController.GetSource(c.schemaName, c.sourceInfo.Name)
if ok {
return errors.NewSourceAlreadyExistsError(c.schemaName, c.sourceInfo.Name)
schema := c.e.metaController.GetOrCreateSchema(c.schemaName)
if err := c.e.metaController.ExistsMvOrSource(schema, c.sourceInfo.Name); err != nil {
return err
}
rows, err := c.e.pullEngine.ExecuteQuery("sys",
fmt.Sprintf("select id from tables where schema_name='%s' and name='%s' and kind='%s'", c.sourceInfo.SchemaName, c.sourceInfo.Name, meta.TableKindSource))
Expand All @@ -97,18 +97,18 @@ func (c *CreateSourceCommand) validate() error {
}
_, err := c.e.protoRegistry.FindDescriptorByName(protoreflect.FullName(enc.SchemaName))
if err != nil {
return errors.NewPranaErrorf(errors.UnknownTopicEncoding, "proto message %q not registered", enc.SchemaName)
return errors.NewPranaErrorf(errors.InvalidStatement, "Proto message %q not registered", enc.SchemaName)
}
}

for _, sel := range topicInfo.ColSelectors {
if sel.MetaKey == nil && len(sel.Selector) == 0 {
return errors.NewPranaErrorf(errors.InvalidSelector, "invalid column selector %q", sel)
return errors.NewPranaErrorf(errors.InvalidStatement, "Invalid column selector %q", sel)
}
if sel.MetaKey != nil {
f := *sel.MetaKey
if !(f == "header" || f == "key" || f == "timestamp") {
return errors.NewPranaErrorf(errors.InvalidSelector, `invalid metadata key in column selector %q. Valid values are "header", "key", "timestamp".`, sel)
return errors.NewPranaErrorf(errors.InvalidStatement, `Invalid metadata key in column selector %q. Valid values are "header", "key", "timestamp".`, sel)
}
}
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
for _, pk := range option.PrimaryKey {
index, ok := colIndex[pk]
if !ok {
return nil, errors.Errorf("invalid primary key column %q", option.PrimaryKey)
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Invalid primary key column %q", option.PrimaryKey)
}
pkCols = append(pkCols, index)
}
Expand All @@ -232,17 +232,17 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
case opt.HeaderEncoding != "":
headerEncoding = common.KafkaEncodingFromString(opt.HeaderEncoding)
if headerEncoding.Encoding == common.EncodingUnknown {
return nil, errors.NewPranaErrorf(errors.UnknownTopicEncoding, "Unknown topic encoding %s", opt.HeaderEncoding)
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Unknown topic encoding %s", opt.HeaderEncoding)
}
case opt.KeyEncoding != "":
keyEncoding = common.KafkaEncodingFromString(opt.KeyEncoding)
if keyEncoding.Encoding == common.EncodingUnknown {
return nil, errors.NewPranaErrorf(errors.UnknownTopicEncoding, "Unknown topic encoding %s", opt.KeyEncoding)
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Unknown topic encoding %s", opt.KeyEncoding)
}
case opt.ValueEncoding != "":
valueEncoding = common.KafkaEncodingFromString(opt.ValueEncoding)
if valueEncoding.Encoding == common.EncodingUnknown {
return nil, errors.NewPranaErrorf(errors.UnknownTopicEncoding, "Unknown topic encoding %s", opt.ValueEncoding)
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Unknown topic encoding %s", opt.ValueEncoding)
}
case opt.IngestFilter != "":
ingestFilter = opt.IngestFilter
Expand All @@ -264,24 +264,34 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
}
}
if headerEncoding == common.KafkaEncodingUnknown {
return nil, errors.NewPranaError(errors.InvalidStatement, "headerEncoding is required")
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "headerEncoding is required")
}
if keyEncoding == common.KafkaEncodingUnknown {
return nil, errors.NewPranaError(errors.InvalidStatement, "keyEncoding is required")
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "keyEncoding is required")
}
if valueEncoding == common.KafkaEncodingUnknown {
return nil, errors.NewPranaError(errors.InvalidStatement, "valueEncoding is required")
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "valueEncoding is required")
}
if brokerName == "" {
return nil, errors.NewPranaError(errors.InvalidStatement, "brokerName is required")
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "brokerName is required")
}
if topicName == "" {
return nil, errors.NewPranaError(errors.InvalidStatement, "topicName is required")
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "topicName is required")
}
lc := len(colSelectors)
if lc > 0 && lc != len(colTypes) {
return nil, errors.NewPranaErrorf(errors.WrongNumberColumnSelectors,
"Number of column selectors (%d) must match number of columns (%d)", lc, len(colTypes))
if len(colSelectors) != len(colTypes) {
return nil, errors.NewPranaErrorf(errors.InvalidStatement,
"Number of column selectors (%d) must match number of columns (%d)", len(colSelectors), len(colTypes))
}
if len(pkCols) == 0 {
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Primary key is required")
}

pkMap := make(map[int]struct{}, len(pkCols))
for _, pkCol := range pkCols {
pkMap[pkCol] = struct{}{}
}
if len(pkMap) != len(pkCols) {
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "Primary key cannot contain same column multiple times")
}

topicInfo := &common.TopicInfo{
Expand Down
3 changes: 3 additions & 0 deletions command/drop_index_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (c *DropIndexCommand) getIndexInfo() (*common.IndexInfo, error) {
c.indexName = ast.Drop.Name
c.tableName = ast.Drop.TableName
}
if c.tableName == "" {
return nil, errors.NewInvalidStatementError("Drop index requires a table")
}
indexInfo, ok := c.e.metaController.GetIndex(c.schemaName, c.tableName, c.indexName)
if !ok {
return nil, errors.NewUnknownIndexError(c.schemaName, c.tableName, c.indexName)
Expand Down
16 changes: 15 additions & 1 deletion command/parser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,27 @@ func (c *ColumnDef) ToColumnType() (common.ColumnType, error) {
switch c.Type {
case common.TypeDecimal:
if len(c.Parameters) != 2 {
return common.ColumnType{}, participle.Errorf(c.Pos, "expected DECIMAL(precision, scale)")
return common.ColumnType{}, participle.Errorf(c.Pos, "Expected DECIMAL(precision, scale)")
}
prec := c.Parameters[0]
scale := c.Parameters[1]
if prec > 65 || prec < 1 {
return common.ColumnType{}, participle.Errorf(c.Pos, "Decimal precision must be > 0 and <= 65")
}
if scale > 30 || scale < 0 {
return common.ColumnType{}, participle.Errorf(c.Pos, "decimal scale must be >= 0 and <= 30")
}
if scale > prec {
return common.ColumnType{}, participle.Errorf(c.Pos, "Decimal scale must be <= precision")
}
return common.NewDecimalColumnType(c.Parameters[0], c.Parameters[1]), nil
case common.TypeTimestamp:
var fsp int8 = DefaultFSP
if len(c.Parameters) == 1 {
fsp = int8(c.Parameters[0])
if fsp < 0 || fsp > 6 {
return common.ColumnType{}, participle.Errorf(c.Pos, "Timestamp fsp must be >= 0 and <= 6")
}
}
return common.NewTimestampColumnType(fsp), nil
default:
Expand Down
70 changes: 35 additions & 35 deletions conf/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,41 +259,41 @@ func invalidRaftElectionRTTTooSmall() Config {
}

var invalidConfigs = []configPair{
{"PDB0004 - Invalid configuration: NodeID must be >= 0", invalidNodeIDConf()},
{"PDB0004 - Invalid configuration: NumShards must be >= 1", invalidNumShardsConf()},
{"PDB0004 - Invalid configuration: DataDir must be specified", invalidDatadirConf()},
{"PDB0004 - Invalid configuration: KafkaBrokers must be specified", missingKafkaBrokersConf()},
{"PDB0004 - Invalid configuration: KafkaBroker testbroker, invalid ClientType, must be 1 or 2", invalidBrokerClientTypeConf()},
{"PDB0004 - Invalid configuration: RemotingHeartbeatInterval must be >= 1000000000", invalidRemotingHeartbeatInterval()},
{"PDB0004 - Invalid configuration: RemotingHeartbeatTimeout must be >= 1000000", invalidRemotingHeartbeatTimeout()},
{"PDB0004 - Invalid configuration: APIServerListenAddresses must be specified", invalidAPIServerListenAddress()},
{"PDB0004 - Invalid configuration: NodeID must be in the range 0 (inclusive) to len(RaftAddresses) (exclusive)", NodeIDOutOfRangeConf()},
{"PDB0004 - Invalid configuration: ReplicationFactor must be >= 3", invalidReplicationFactorConfig()},
{"PDB0004 - Invalid configuration: Number of RaftAddresses must be >= ReplicationFactor", invalidRaftAddressesConfig()},
{"PDB0004 - Invalid configuration: Number of RaftAddresses must be same as number of NotifListenerAddresses", raftAndNotifListenerAddressedDifferentLengthConfig()},
{"PDB0004 - Invalid configuration: Number of RaftAddresses must be same as number of APIServerListenAddresses", raftAndAPIServerListenerAddressedDifferentLengthConfig()},
{"PDB0004 - Invalid configuration: DataSnapshotEntries must be >= 10", invalidDataSnapshotEntries()},
{"PDB0004 - Invalid configuration: DataCompactionOverhead must be >= 5", invalidDataCompactionOverhead()},
{"PDB0004 - Invalid configuration: SequenceSnapshotEntries must be >= 10", invalidSequenceSnapshotEntries()},
{"PDB0004 - Invalid configuration: SequenceCompactionOverhead must be >= 5", invalidSequenceCompactionOverhead()},
{"PDB0004 - Invalid configuration: LocksSnapshotEntries must be >= 10", invalidLocksSnapshotEntries()},
{"PDB0004 - Invalid configuration: LocksCompactionOverhead must be >= 5", invalidLocksCompactionOverhead()},
{"PDB0004 - Invalid configuration: DataSnapshotEntries must be >= DataCompactionOverhead", dataCompactionGreaterThanDataSnapshotEntries()},
{"PDB0004 - Invalid configuration: SequenceSnapshotEntries must be >= SequenceCompactionOverhead", sequenceCompactionGreaterThanDataSnapshotEntries()},
{"PDB0004 - Invalid configuration: LocksSnapshotEntries must be >= LocksCompactionOverhead", locksCompactionGreaterThanDataSnapshotEntries()},
{"PDB0004 - Invalid configuration: LifeCycleListenAddress must be specified", invalidLifecycleListenAddress()},
{"PDB0004 - Invalid configuration: StartupEndpointPath must be specified", invalidStartupEndpointPath()},
{"PDB0004 - Invalid configuration: LiveEndpointPath must be specified", invalidLiveEndpointPath()},
{"PDB0004 - Invalid configuration: ReadyEndpointPath must be specified", invalidReadyEndpointPath()},
{"PDB0004 - Invalid configuration: GlobalIngestLimitRowsPerSec must be > 0 or -1", invalidGlobalIngestLimitRowsPerSecZero()},
{"PDB0004 - Invalid configuration: GlobalIngestLimitRowsPerSec must be > 0 or -1", invalidGlobalIngestLimitRowsPerNegative()},
{"PDB0004 - Invalid configuration: RaftRTTMs must be > 0", invalidRaftRTTMsZero()},
{"PDB0004 - Invalid configuration: RaftRTTMs must be > 0", invalidRaftRTTMsNegative()},
{"PDB0004 - Invalid configuration: RaftHeartbeatRTT must be > 0", invalidRaftHeartbeatRTTZero()},
{"PDB0004 - Invalid configuration: RaftHeartbeatRTT must be > 0", invalidRaftHeartbeatRTTNegative()},
{"PDB0004 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTZero()},
{"PDB0004 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTNegative()},
{"PDB0004 - Invalid configuration: RaftElectionRTT must be > 2 * RaftHeartbeatRTT", invalidRaftElectionRTTTooSmall()},
{"PDB3000 - Invalid configuration: NodeID must be >= 0", invalidNodeIDConf()},
{"PDB3000 - Invalid configuration: NumShards must be >= 1", invalidNumShardsConf()},
{"PDB3000 - Invalid configuration: DataDir must be specified", invalidDatadirConf()},
{"PDB3000 - Invalid configuration: KafkaBrokers must be specified", missingKafkaBrokersConf()},
{"PDB3000 - Invalid configuration: KafkaBroker testbroker, invalid ClientType, must be 1 or 2", invalidBrokerClientTypeConf()},
{"PDB3000 - Invalid configuration: RemotingHeartbeatInterval must be >= 1000000000", invalidRemotingHeartbeatInterval()},
{"PDB3000 - Invalid configuration: RemotingHeartbeatTimeout must be >= 1000000", invalidRemotingHeartbeatTimeout()},
{"PDB3000 - Invalid configuration: APIServerListenAddresses must be specified", invalidAPIServerListenAddress()},
{"PDB3000 - Invalid configuration: NodeID must be in the range 0 (inclusive) to len(RaftAddresses) (exclusive)", NodeIDOutOfRangeConf()},
{"PDB3000 - Invalid configuration: ReplicationFactor must be >= 3", invalidReplicationFactorConfig()},
{"PDB3000 - Invalid configuration: Number of RaftAddresses must be >= ReplicationFactor", invalidRaftAddressesConfig()},
{"PDB3000 - Invalid configuration: Number of RaftAddresses must be same as number of NotifListenerAddresses", raftAndNotifListenerAddressedDifferentLengthConfig()},
{"PDB3000 - Invalid configuration: Number of RaftAddresses must be same as number of APIServerListenAddresses", raftAndAPIServerListenerAddressedDifferentLengthConfig()},
{"PDB3000 - Invalid configuration: DataSnapshotEntries must be >= 10", invalidDataSnapshotEntries()},
{"PDB3000 - Invalid configuration: DataCompactionOverhead must be >= 5", invalidDataCompactionOverhead()},
{"PDB3000 - Invalid configuration: SequenceSnapshotEntries must be >= 10", invalidSequenceSnapshotEntries()},
{"PDB3000 - Invalid configuration: SequenceCompactionOverhead must be >= 5", invalidSequenceCompactionOverhead()},
{"PDB3000 - Invalid configuration: LocksSnapshotEntries must be >= 10", invalidLocksSnapshotEntries()},
{"PDB3000 - Invalid configuration: LocksCompactionOverhead must be >= 5", invalidLocksCompactionOverhead()},
{"PDB3000 - Invalid configuration: DataSnapshotEntries must be >= DataCompactionOverhead", dataCompactionGreaterThanDataSnapshotEntries()},
{"PDB3000 - Invalid configuration: SequenceSnapshotEntries must be >= SequenceCompactionOverhead", sequenceCompactionGreaterThanDataSnapshotEntries()},
{"PDB3000 - Invalid configuration: LocksSnapshotEntries must be >= LocksCompactionOverhead", locksCompactionGreaterThanDataSnapshotEntries()},
{"PDB3000 - Invalid configuration: LifeCycleListenAddress must be specified", invalidLifecycleListenAddress()},
{"PDB3000 - Invalid configuration: StartupEndpointPath must be specified", invalidStartupEndpointPath()},
{"PDB3000 - Invalid configuration: LiveEndpointPath must be specified", invalidLiveEndpointPath()},
{"PDB3000 - Invalid configuration: ReadyEndpointPath must be specified", invalidReadyEndpointPath()},
{"PDB3000 - Invalid configuration: GlobalIngestLimitRowsPerSec must be > 0 or -1", invalidGlobalIngestLimitRowsPerSecZero()},
{"PDB3000 - Invalid configuration: GlobalIngestLimitRowsPerSec must be > 0 or -1", invalidGlobalIngestLimitRowsPerNegative()},
{"PDB3000 - Invalid configuration: RaftRTTMs must be > 0", invalidRaftRTTMsZero()},
{"PDB3000 - Invalid configuration: RaftRTTMs must be > 0", invalidRaftRTTMsNegative()},
{"PDB3000 - Invalid configuration: RaftHeartbeatRTT must be > 0", invalidRaftHeartbeatRTTZero()},
{"PDB3000 - Invalid configuration: RaftHeartbeatRTT must be > 0", invalidRaftHeartbeatRTTNegative()},
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTZero()},
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTNegative()},
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 2 * RaftHeartbeatRTT", invalidRaftElectionRTTTooSmall()},
}

func TestValidate(t *testing.T) {
Expand Down
Loading

0 comments on commit 5afec70

Please sign in to comment.