Skip to content

Commit f261ee3

Browse files
authored
Implement source ingest filter (#439)
* Implement source ingest filter - this allows us to reject rows from the source based on a WHERE clause * Fixed some issues with error propagation and a race in schema deletion
1 parent d6b713c commit f261ee3

26 files changed

+396
-120
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ jobs:
3232
- name: Proto lint
3333
run: make -C protos lint
3434
- name: Test
35-
run: go test -v -race -count=1 -timeout 1h -failfast -p 1 -tags integration ./...
35+
run: go test -race -count=1 -timeout 1h -failfast -p 1 -tags integration ./...

api/server.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/squareup/pranadb/pull/exec"
88
"net"
99
"sync"
10-
"sync/atomic"
1110
"time"
1211

1312
"github.com/squareup/pranadb/meta"
@@ -157,13 +156,7 @@ func (s *Server) doExecuteStatement(executor exec.PullExecutor, batchSize int, e
157156
if errors.As(err, &perr) {
158157
return perr
159158
}
160-
// For internal errors we don't return internal error messages to the CLI as this would leak
161-
// server implementation details. Instead, we generate a sequence number and add that to the message
162-
// and log the internal error in the server logs with the sequence number so it can be looked up
163-
seq := atomic.AddInt64(&s.errorSequence, 1)
164-
perr = errors.NewInternalError(seq)
165-
log.Errorf("internal error occurred with sequence number %d\n%v", seq, err)
166-
return perr
159+
return common.LogInternalError(err)
167160
}
168161

169162
// First send column definitions.

cfg/example.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,6 @@ remoting-heartbeat-interval = "10s" // Amount of time between remoting hea
5858
remoting-heartbeat-timeout = "5s" // Timeout for a remoting heartbeat
5959
enable-api-server = true // Set to true to enable the API server - needed for CLI access
6060
global-ingest-limit-rows-per-sec = 1000 // The maximum number of rows per second that can be ingested in the broker - ingest will be throttled to this rate. -1 represents no throttling
61-
raft-rtt-ms = 100 // The size of a Raft RTT unit in ms
61+
raft-rtt-ms = 50 // The size of a Raft RTT unit in ms
6262
raft-heartbeat-rtt = 30 // The Raft heartbeat period in units of raft-rtt-ms
6363
raft-election-rtt = 300 // The Raft election period in units of raft-rtt-ms

command/create_source_command.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
222222

223223
var (
224224
headerEncoding, keyEncoding, valueEncoding common.KafkaEncoding
225+
ingestFilter string
225226
propsMap map[string]string
226227
colSelectors []selector.ColumnSelector
227228
brokerName, topicName string
@@ -243,6 +244,8 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
243244
if valueEncoding.Encoding == common.EncodingUnknown {
244245
return nil, errors.NewPranaErrorf(errors.UnknownTopicEncoding, "Unknown topic encoding %s", opt.ValueEncoding)
245246
}
247+
case opt.IngestFilter != "":
248+
ingestFilter = opt.IngestFilter
246249
case opt.Properties != nil:
247250
propsMap = make(map[string]string, len(opt.Properties))
248251
for _, prop := range opt.Properties {
@@ -287,6 +290,7 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
287290
HeaderEncoding: headerEncoding,
288291
KeyEncoding: keyEncoding,
289292
ValueEncoding: valueEncoding,
293+
IngestFilter: ingestFilter,
290294
ColSelectors: colSelectors,
291295
Properties: propsMap,
292296
}

command/ddl_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (d *DDLCommandRunner) HandleNotification(notification remoting.ClusterMessa
117117
return errors.Errorf("cannot find command with id %d:%d", ddlInfo.GetOriginatingNodeId(), ddlInfo.GetCommandId())
118118
}
119119
err := com.OnPhase(phase)
120-
if phase == int32(com.NumPhases()-1) {
120+
if phase == int32(com.NumPhases()-1) || err != nil {
121121
// Final phase so delete the command
122122
delete(d.commands, skey)
123123
}

command/parser/ast.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type TopicInformation struct {
105105
HeaderEncoding string `|"HeaderEncoding" "=" @String`
106106
KeyEncoding string `|"KeyEncoding" "=" @String`
107107
ValueEncoding string `|"ValueEncoding" "=" @String`
108+
IngestFilter string `|"IngestFilter" "=" @String`
108109
ColSelectors []*selector.ColumnSelectorAST `|"ColumnSelectors" "=" "(" (@@ ("," @@)*)? ")"`
109110
Properties []*TopicInfoProperty `|"Properties" "=" "(" (@@ ("," @@)*)? ")"`
110111
}

command/parser/ast_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func TestParse(t *testing.T) {
5050
headerencoding = "json",
5151
keyencoding = "json",
5252
valueencoding = "json",
53+
ingestfilter = "where sensor_id=1",
5354
columnselectors = (
5455
meta("key").k0,
5556
v1,
@@ -75,6 +76,7 @@ func TestParse(t *testing.T) {
7576
{HeaderEncoding: "json"},
7677
{KeyEncoding: "json"},
7778
{ValueEncoding: "json"},
79+
{IngestFilter: "where sensor_id=1"},
7880
{ColSelectors: []*selector.ColumnSelectorAST{
7981
{MetaKey: stringRef("key"), Next: &selector.SelectorAST{Field: "k0"}},
8082
{Field: stringRef("v1")},

common/internal_error.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package common
2+
3+
import (
4+
"github.com/google/uuid"
5+
log "github.com/sirupsen/logrus"
6+
"github.com/squareup/pranadb/errors"
7+
)
8+
9+
func LogInternalError(err error) errors.PranaError {
10+
id, err := uuid.NewRandom()
11+
var errRef string
12+
if err != nil {
13+
log.Errorf("failed to generate uuid %v", err)
14+
errRef = ""
15+
} else {
16+
errRef = id.String()
17+
}
18+
// For internal errors we don't return internal error messages to the CLI as this would leak
19+
// server implementation details. Instead, we generate a random UUID and add that to the message
20+
// and log the internal error in the server logs with the UUID so it can be looked up
21+
perr := errors.NewInternalError(errRef)
22+
log.Errorf("internal error occurred with reference %s\n%v", errRef, err)
23+
return perr
24+
}

common/meta.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,10 @@ func (i *IndexInfo) ContainsColIndex(colIndex int) bool {
186186

187187
type Schema struct {
188188
// Schema can be mutated from different goroutines so we need to lock to protect access to it's maps
189-
lock sync.RWMutex
190-
Name string
191-
tables map[string]Table
192-
sinks map[string]*SinkInfo
193-
deleted bool
189+
lock sync.RWMutex
190+
Name string
191+
tables map[string]Table
192+
sinks map[string]*SinkInfo
194193
}
195194

196195
func NewSchema(name string) *Schema {
@@ -280,20 +279,6 @@ func (s *Schema) Equal(other *Schema) bool {
280279
return reflect.DeepEqual(s, other)
281280
}
282281

283-
func (s *Schema) SetDeleted() {
284-
s.lock.RLock()
285-
defer s.lock.RUnlock()
286-
s.deleted = true
287-
}
288-
289-
func (s *Schema) IsDeleted() bool {
290-
// Schema cna become deleted if all tables are removed, but sessions might still have it cached
291-
// checking isDeleted() allows sessions to refresh the schema if necessary
292-
s.lock.RLock()
293-
defer s.lock.RUnlock()
294-
return s.deleted
295-
}
296-
297282
type SourceInfo struct {
298283
*TableInfo
299284
TopicInfo *TopicInfo
@@ -324,6 +309,7 @@ type TopicInfo struct {
324309
HeaderEncoding KafkaEncoding
325310
ColSelectors []selector.ColumnSelector
326311
Properties map[string]string
312+
IngestFilter string
327313
}
328314

329315
type KafkaEncoding struct {

conf/conf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const (
1717
DefaultRemotingHeartbeatInterval = 10 * time.Second
1818
DefaultRemotingHeartbeatTimeout = 5 * time.Second
1919
DefaultGlobalIngestLimitRowsPerSec = 1000
20-
DefaultRaftRTTMs = 100
20+
DefaultRaftRTTMs = 50
2121
DefaultRaftHeartbeatRTT = 30
2222
DefaultRaftElectionRTT = 300
2323
)

0 commit comments

Comments
 (0)