Skip to content

Commit

Permalink
Merge branch 'master' into test-integration-skew
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Mar 28, 2024
2 parents 0600ccb + 892acfb commit 036ec45
Show file tree
Hide file tree
Showing 23 changed files with 723 additions and 162 deletions.
6 changes: 6 additions & 0 deletions pkg/messaging/v1/invoke_method_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (imr *InvokeMethodRequest) WithHTTPExtension(verb string, querystring strin
// WithCustomHTTPMetadata applies a metadata map to a InvokeMethodRequest.
func (imr *InvokeMethodRequest) WithCustomHTTPMetadata(md map[string]string) *InvokeMethodRequest {
for k, v := range md {
if strings.EqualFold(k, ContentLengthHeader) {
// There is no use of the original payload's content-length because
// the entire data is already in the cloud event.
continue
}

if imr.r.GetMetadata() == nil {
imr.r.Metadata = make(map[string]*internalv1pb.ListStringValue)
}
Expand Down
7 changes: 7 additions & 0 deletions tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/logline"
"github.com/dapr/dapr/tests/integration/framework/socket"
)

// Option is a function that configures the dapr process.
Expand Down Expand Up @@ -280,3 +281,9 @@ func WithControlPlaneTrustDomain(trustDomain string) Option {
o.controlPlaneTrustDomain = &trustDomain
}
}

func WithSocket(t *testing.T, socket *socket.Socket) Option {
return WithExecOptions(exec.WithEnvVars(t,
"DAPR_COMPONENTS_SOCKETS_FOLDER", socket.Directory(),
))
}
14 changes: 8 additions & 6 deletions tests/integration/framework/process/grpc/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ func New(t *testing.T, fopts ...Option) *App {
return &App{
GRPC: procgrpc.New(t, append(opts.grpcopts, procgrpc.WithRegister(func(s *grpc.Server) {
srv := &server{
onInvokeFn: opts.onInvokeFn,
onTopicEventFn: opts.onTopicEventFn,
listTopicSubFn: opts.listTopicSubFn,
listInputBindFn: opts.listInputBindFn,
onBindingEventFn: opts.onBindingEventFn,
healthCheckFn: opts.healthCheckFn,
onInvokeFn: opts.onInvokeFn,
onTopicEventFn: opts.onTopicEventFn,
onBulkTopicEventFn: opts.onBulkTopicEventFn,
listTopicSubFn: opts.listTopicSubFn,
listInputBindFn: opts.listInputBindFn,
onBindingEventFn: opts.onBindingEventFn,
healthCheckFn: opts.healthCheckFn,
}
rtv1.RegisterAppCallbackServer(s, srv)
rtv1.RegisterAppCallbackAlphaServer(s, srv)
rtv1.RegisterAppCallbackHealthCheckServer(s, srv)
if opts.withRegister != nil {
opts.withRegister(s)
Expand Down
23 changes: 15 additions & 8 deletions tests/integration/framework/process/grpc/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (

// options contains the options for running a GRPC server in integration tests.
type options struct {
grpcopts []procgrpc.Option
withRegister func(s *grpc.Server)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
grpcopts []procgrpc.Option
withRegister func(s *grpc.Server)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
}

func WithGRPCOptions(opts ...procgrpc.Option) func(*options) {
Expand All @@ -48,6 +49,12 @@ func WithOnTopicEventFn(fn func(context.Context, *rtv1.TopicEventRequest) (*rtv1
}
}

func WithOnBulkTopicEventFn(fn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)) func(*options) {
return func(opts *options) {
opts.onBulkTopicEventFn = fn
}
}

func WithOnInvokeFn(fn func(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)) func(*options) {
return func(opts *options) {
opts.onInvokeFn = fn
Expand Down
20 changes: 14 additions & 6 deletions tests/integration/framework/process/grpc/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
)

type server struct {
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
}

func (s *server) OnInvoke(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
Expand Down Expand Up @@ -66,6 +67,13 @@ func (s *server) OnTopicEvent(ctx context.Context, in *rtv1.TopicEventRequest) (
return s.onTopicEventFn(ctx, in)
}

func (s *server) OnBulkTopicEventAlpha1(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
if s.onBulkTopicEventFn == nil {
return new(rtv1.TopicEventBulkResponse), nil
}
return s.onBulkTopicEventFn(ctx, in)
}

func (s *server) HealthCheck(ctx context.Context, e *emptypb.Empty) (*rtv1.HealthCheckResponse, error) {
if s.healthCheckFn == nil {
return new(rtv1.HealthCheckResponse), nil
Expand Down
18 changes: 17 additions & 1 deletion tests/integration/framework/process/grpc/subscriber/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,21 @@ limitations under the License.

package subscriber

import (
"context"

"google.golang.org/protobuf/types/known/emptypb"

rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

// options contains the options for running a pubsub subscriber gRPC server app.
type options struct{}
type options struct {
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
}

func WithListTopicSubscriptions(fn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)) func(*options) {
return func(opts *options) {
opts.listTopicSubFn = fn
}
}
49 changes: 44 additions & 5 deletions tests/integration/framework/process/grpc/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
type Option func(*options)

type Subscriber struct {
app *app.App
inCh chan *rtv1.TopicEventRequest
closeCh chan struct{}
app *app.App
inCh chan *rtv1.TopicEventRequest
inBulkCh chan *rtv1.TopicEventBulkRequest
closeCh chan struct{}
}

func New(t *testing.T, fopts ...Option) *Subscriber {
Expand All @@ -43,12 +44,15 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}

inCh := make(chan *rtv1.TopicEventRequest, 100)
inBulkCh := make(chan *rtv1.TopicEventBulkRequest, 100)
closeCh := make(chan struct{})

return &Subscriber{
inCh: inCh,
closeCh: closeCh,
inCh: inCh,
inBulkCh: inBulkCh,
closeCh: closeCh,
app: app.New(t,
app.WithListTopicSubscriptions(opts.listTopicSubFn),
app.WithOnTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error) {
select {
case inCh <- in:
Expand All @@ -57,6 +61,21 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}
return new(rtv1.TopicEventResponse), nil
}),
app.WithOnBulkTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
select {
case inBulkCh <- in:
case <-ctx.Done():
case <-closeCh:
}
stats := make([]*rtv1.TopicEventBulkResponseEntry, len(in.GetEntries()))
for i, e := range in.GetEntries() {
stats[i] = &rtv1.TopicEventBulkResponseEntry{
EntryId: e.GetEntryId(),
Status: rtv1.TopicEventResponse_SUCCESS,
}
}
return &rtv1.TopicEventBulkResponse{Statuses: stats}, nil
}),
),
}
}
Expand Down Expand Up @@ -92,11 +111,31 @@ func (s *Subscriber) Receive(t *testing.T, ctx context.Context) *rtv1.TopicEvent
}
}

func (s *Subscriber) ReceiveBulk(t *testing.T, ctx context.Context) *rtv1.TopicEventBulkRequest {
t.Helper()

ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

select {
case <-ctx.Done():
require.Fail(t, "timed out waiting for event response")
return nil
case in := <-s.inBulkCh:
return in
}
}

func (s *Subscriber) AssertEventChanLen(t *testing.T, l int) {
t.Helper()
assert.Len(t, s.inCh, l)
}

func (s *Subscriber) AssertBulkEventChanLen(t *testing.T, l int) {
t.Helper()
assert.Len(t, s.inBulkCh, l)
}

func (s *Subscriber) ExpectPublishReceive(t *testing.T, ctx context.Context, daprd *daprd.Daprd, req *rtv1.PublishEventRequest) {
t.Helper()
_, err := daprd.GRPCClient(t, ctx).PublishEvent(ctx, req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package subscriber

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/dapr/pkg/runtime/pubsub"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/http/app"
"github.com/dapr/dapr/tests/integration/framework/util"
Expand All @@ -46,10 +48,24 @@ type PublishRequest struct {
DataContentType *string
}

type PublishBulkRequestEntry struct {
EntryID string `json:"entryId"`
Event string `json:"event"`
ContentType string `json:"contentType,omitempty"`
}

type PublishBulkRequest struct {
Daprd *daprd.Daprd
PubSubName string
Topic string
Entries []PublishBulkRequestEntry
}

type Subscriber struct {
app *app.App
client *http.Client
inCh chan *RouteEvent
inBulk chan *pubsub.BulkSubscribeEnvelope
closeCh chan struct{}
}

Expand All @@ -62,9 +78,10 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}

inCh := make(chan *RouteEvent, 100)
inBulk := make(chan *pubsub.BulkSubscribeEnvelope, 100)
closeCh := make(chan struct{})

appOpts := make([]app.Option, 0, len(opts.routes)+len(opts.handlerFuncs))
appOpts := make([]app.Option, 0, len(opts.routes)+len(opts.bulkRoutes)+len(opts.handlerFuncs))
for _, route := range opts.routes {
appOpts = append(appOpts, app.WithHandlerFunc(route, func(w http.ResponseWriter, r *http.Request) {
var ce event.Event
Expand All @@ -76,13 +93,39 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}
}))
}
for _, route := range opts.bulkRoutes {
appOpts = append(appOpts, app.WithHandlerFunc(route, func(w http.ResponseWriter, r *http.Request) {
var ce pubsub.BulkSubscribeEnvelope
require.NoError(t, json.NewDecoder(r.Body).Decode(&ce))
select {
case inBulk <- &ce:
case <-closeCh:
case <-r.Context().Done():
}

type statusT struct {
EntryID string `json:"entryId"`
Status string `json:"status"`
}
type respT struct {
Statuses []statusT `json:"statuses"`
}

var resp respT
for _, entry := range ce.Entries {
resp.Statuses = append(resp.Statuses, statusT{EntryID: entry.EntryId, Status: "SUCCESS"})
}
json.NewEncoder(w).Encode(resp)
}))
}

appOpts = append(appOpts, opts.handlerFuncs...)

return &Subscriber{
app: app.New(t, appOpts...),
client: util.HTTPClient(t),
inCh: inCh,
inBulk: inBulk,
closeCh: closeCh,
}
}
Expand Down Expand Up @@ -117,6 +160,21 @@ func (s *Subscriber) Receive(t *testing.T, ctx context.Context) *RouteEvent {
}
}

func (s *Subscriber) ReceiveBulk(t *testing.T, ctx context.Context) *pubsub.BulkSubscribeEnvelope {
t.Helper()

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

select {
case <-ctx.Done():
require.Fail(t, "timed out waiting for event response")
return nil
case in := <-s.inBulk:
return in
}
}

func (s *Subscriber) AssertEventChanLen(t *testing.T, l int) {
t.Helper()
assert.Len(t, s.inCh, l)
Expand Down Expand Up @@ -151,6 +209,13 @@ func (s *Subscriber) Publish(t *testing.T, ctx context.Context, req PublishReque
require.Equal(t, http.StatusNoContent, resp.StatusCode)
}

func (s *Subscriber) PublishBulk(t *testing.T, ctx context.Context, req PublishBulkRequest) {
t.Helper()
//nolint:bodyclose
resp := s.publishBulk(t, ctx, req)
require.Equal(t, http.StatusNoContent, resp.StatusCode)
}

func (s *Subscriber) publish(t *testing.T, ctx context.Context, req PublishRequest) *http.Response {
t.Helper()
reqURL := fmt.Sprintf("http://%s/v1.0/publish/%s/%s", req.Daprd.HTTPAddress(), req.PubSubName, req.Topic)
Expand All @@ -164,3 +229,18 @@ func (s *Subscriber) publish(t *testing.T, ctx context.Context, req PublishReque
require.NoError(t, resp.Body.Close())
return resp
}

func (s *Subscriber) publishBulk(t *testing.T, ctx context.Context, req PublishBulkRequest) *http.Response {
t.Helper()

payload, err := json.Marshal(req.Entries)
require.NoError(t, err)
reqURL := fmt.Sprintf("http://%s/v1.0-alpha1/publish/bulk/%s/%s", req.Daprd.HTTPAddress(), req.PubSubName, req.Topic)
hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(payload))
require.NoError(t, err)
hreq.Header.Add("Content-Type", "application/json")
resp, err := s.client.Do(hreq)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
return resp
}

0 comments on commit 036ec45

Please sign in to comment.