Skip to content

Commit 8ccfec9

Browse files
authored
Merge pull request #2 from m-mizutani/feature/fdr
implement Falcon Data Replicator
2 parents 9c23392 + d158769 commit 8ccfec9

File tree

10 files changed

+342
-59
lines changed

10 files changed

+342
-59
lines changed

pkg/actions/fdr/action.go

Lines changed: 98 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,45 @@ package fdr
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
7+
"io"
58

69
"github.com/aws/aws-sdk-go/aws"
710
"github.com/aws/aws-sdk-go/aws/credentials"
811
"github.com/aws/aws-sdk-go/aws/session"
12+
"github.com/aws/aws-sdk-go/service/s3"
913
"github.com/aws/aws-sdk-go/service/sqs"
1014
"github.com/m-mizutani/goerr"
1115
"github.com/m-mizutani/hatchery/pkg/domain/config"
16+
"github.com/m-mizutani/hatchery/pkg/domain/interfaces"
17+
"github.com/m-mizutani/hatchery/pkg/domain/model"
1218
"github.com/m-mizutani/hatchery/pkg/infra"
1319
"github.com/m-mizutani/hatchery/pkg/utils"
1420
)
1521

22+
type fdrMessage struct {
23+
Bucket string `json:"bucket"`
24+
Cid string `json:"cid"`
25+
FileCount int64 `json:"fileCount"`
26+
Files []file `json:"files"`
27+
PathPrefix string `json:"pathPrefix"`
28+
Timestamp int64 `json:"timestamp"`
29+
TotalSize int64 `json:"totalSize"`
30+
}
31+
32+
// file is one entry of fdrMessage.Files and describes a single S3 object.
type file struct {
	// Checksum is decoded from the "checksum" key; it is not verified here.
	Checksum string `json:"checksum"`
	// Path is the S3 object key and also the suffix of the destination name.
	Path string `json:"path"`
	// Size is decoded from the "size" key.
	Size int64 `json:"size"`
}
37+
38+
type fdrClients struct {
39+
infra *infra.Clients
40+
sqs interfaces.SQS
41+
s3 interfaces.S3
42+
}
43+
1644
func Exec(ctx context.Context, clients *infra.Clients, req *config.FalconDataReplicatorImpl) error {
1745
// Create an AWS session
1846
awsSession, err := session.NewSession(&aws.Config{
@@ -28,71 +56,91 @@ func Exec(ctx context.Context, clients *infra.Clients, req *config.FalconDataRep
2856
return goerr.Wrap(err, "failed to create AWS session").With("req", req)
2957
}
3058

31-
// Create an SQS client
32-
sqsClient := sqs.New(awsSession)
59+
// Create AWS service clients
60+
sqsClient := clients.NewSQS(awsSession)
61+
s3Client := clients.NewS3(awsSession)
62+
prefix := config.LogObjNamePrefix(req, utils.CtxNow(ctx))
3363

3464
// Receive messages from SQS queue
35-
result, err := sqsClient.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
36-
QueueUrl: aws.String(req.SqsUrl),
37-
MaxNumberOfMessages: aws.Int64(10),
38-
})
39-
if err != nil {
40-
return goerr.Wrap(err, "failed to receive messages from SQS").With("req", req)
65+
input := &sqs.ReceiveMessageInput{
66+
QueueUrl: aws.String(req.SqsUrl),
67+
}
68+
if req.MaxMessages != nil {
69+
input.MaxNumberOfMessages = aws.Int64(int64(*req.MaxMessages))
4170
}
4271

43-
utils.CtxLogger(ctx).Info("FDR: received messages from SQS", "count", len(result.Messages))
44-
/*
45-
prefix := config.ToObjNamePrefix(req, utils.CtxNow(ctx))
72+
for i := 0; ; i++ {
73+
if req.MaxPulls != nil && i >= *req.MaxPulls {
74+
break
75+
}
76+
77+
c := &fdrClients{infra: clients, sqs: sqsClient, s3: s3Client}
78+
if err := copy(ctx, c, input, model.CSBucket(req.Bucket), prefix); err != nil {
79+
if err == errNoMoreMessage {
80+
break
81+
}
82+
return err
83+
}
84+
}
85+
86+
return nil
87+
}
88+
89+
// errNoMoreMessage is the sentinel copy returns when a receive call yields
// no messages; the caller treats it as the signal to stop polling.
var errNoMoreMessage = errors.New("no more message")
4692

47-
// Iterate over received messages
48-
for _, message := range result.Messages {
49-
// Get the S3 object key from the message
50-
s3ObjectKey := *message.Body
93+
func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInput, bucket model.CSBucket, prefix model.CSObjectName) error {
94+
result, err := clients.sqs.ReceiveMessageWithContext(ctx, input)
95+
if err != nil {
96+
return goerr.Wrap(err, "failed to receive messages from SQS").With("input", input)
97+
}
98+
if len(result.Messages) == 0 {
99+
return errNoMoreMessage
100+
}
51101

102+
// Iterate over received messages
103+
for _, message := range result.Messages {
104+
// Get the S3 object key from the message
105+
var msg fdrMessage
106+
if err := json.Unmarshal([]byte(*message.Body), &msg); err != nil {
107+
return goerr.Wrap(err, "failed to unmarshal message").With("message", *message.Body)
108+
}
109+
110+
for _, file := range msg.Files {
52111
// Download the object from S3
53-
s3Client := s3.New(awsSession)
54-
s3ObjectOutput, err := s3Client.GetObjectWithContext(ctx, &s3.GetObjectInput{
55-
Bucket: aws.String(req.S3Bucket),
56-
Key: aws.String(s3ObjectKey),
57-
})
58-
if err != nil {
59-
log.Printf("failed to download object from S3: %v", err)
60-
continue
112+
s3Input := &s3.GetObjectInput{
113+
Bucket: aws.String(msg.Bucket),
114+
Key: aws.String(file.Path),
61115
}
62-
63-
w := clients.CloudStorage().NewObjectWriter(ctx, model.CSBucket(req.GetBucket()), objName)
64-
// Upload the object to Google Cloud Storage
65-
gcsClient, err := storage.NewService(ctx, option.WithCredentialsFile(req.GCSCredentialsFile))
116+
s3Obj, err := clients.s3.GetObjectWithContext(ctx, s3Input)
66117
if err != nil {
67-
log.Printf("failed to create GCS client: %v", err)
68-
continue
118+
return goerr.Wrap(err, "failed to download object from S3").With("msg", msg)
69119
}
120+
defer utils.SafeClose(s3Obj.Body)
70121

71-
objectName := uuid.New().String() // Generate a unique object name
72-
gcsObject := &storage.Object{
73-
Name: objectName,
74-
Bucket: req.Bucket,
75-
Metadata: make(map[string]string),
76-
}
122+
csObj := prefix + model.CSObjectName(file.Path)
123+
w := clients.infra.CloudStorage().NewObjectWriter(ctx, bucket, csObj)
77124

78-
// Copy the object data to GCS
79-
_, err = gcsClient.Objects.Insert(req.Bucket, gcsObject).Media(s3ObjectOutput.Body).Do()
80-
if err != nil {
81-
log.Printf("failed to upload object to GCS: %v", err)
82-
continue
125+
if _, err := io.Copy(w, s3Obj.Body); err != nil {
126+
return goerr.Wrap(err, "failed to write object to GCS").With("msg", msg)
83127
}
84-
85-
// Delete the message from SQS
86-
_, err = sqsClient.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
87-
QueueUrl: aws.String(req.SqsUrl),
88-
ReceiptHandle: message.ReceiptHandle,
89-
})
90-
if err != nil {
91-
return goerr.Wrap(err, "failed to delete message from SQS").With("req", req)
128+
if err := w.Close(); err != nil {
129+
return goerr.Wrap(err, "failed to close object writer").With("msg", msg)
92130
}
93131

94-
utils.CtxLogger(ctx).Info("FDR: object forwarded from S3 to GCS", "s3ObjectKey", s3ObjectKey, "gcsObjectName", objectName)
132+
utils.CtxLogger(ctx).Info("FDR: object forwarded from S3 to GCS", "s3", s3Input, "gcsObj", csObj)
95133
}
96-
*/
134+
135+
// Delete the message from SQS
136+
_, err = clients.sqs.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
137+
QueueUrl: input.QueueUrl,
138+
ReceiptHandle: message.ReceiptHandle,
139+
})
140+
if err != nil {
141+
return goerr.Wrap(err, "failed to delete message from SQS")
142+
}
143+
}
144+
97145
return nil
98146
}

pkg/actions/fdr/action_test.go

Lines changed: 140 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,140 @@
1+
package fdr_test
2+
3+
import (
4+
"bytes"
5+
"context"
6+
_ "embed"
7+
"errors"
8+
"io"
9+
"testing"
10+
"time"
11+
12+
"github.com/aws/aws-sdk-go/aws"
13+
"github.com/aws/aws-sdk-go/aws/request"
14+
"github.com/aws/aws-sdk-go/aws/session"
15+
"github.com/aws/aws-sdk-go/service/s3"
16+
"github.com/aws/aws-sdk-go/service/sqs"
17+
"github.com/m-mizutani/gt"
18+
"github.com/m-mizutani/hatchery/pkg/actions/fdr"
19+
"github.com/m-mizutani/hatchery/pkg/domain/config"
20+
"github.com/m-mizutani/hatchery/pkg/domain/interfaces"
21+
"github.com/m-mizutani/hatchery/pkg/infra"
22+
"github.com/m-mizutani/hatchery/pkg/infra/cs"
23+
"github.com/m-mizutani/hatchery/pkg/utils"
24+
)
25+
26+
type mockSQS struct {
27+
FnDeleteMessage func(ctx context.Context, input *sqs.DeleteMessageInput, opts ...request.Option) (*sqs.DeleteMessageOutput, error)
28+
FnReceiveMessage func(ctx context.Context, input *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error)
29+
messages []*sqs.ReceiveMessageOutput
30+
}
31+
32+
// DeleteMessageWithContext implements interfaces.SQS.
33+
func (m *mockSQS) DeleteMessageWithContext(ctx context.Context, input *sqs.DeleteMessageInput, opts ...request.Option) (*sqs.DeleteMessageOutput, error) {
34+
return m.FnDeleteMessage(ctx, input, opts...)
35+
}
36+
37+
// ReceiveMessageWithContext implements interfaces.SQS.
38+
func (m *mockSQS) ReceiveMessageWithContext(ctx context.Context, input *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error) {
39+
if m.FnReceiveMessage != nil {
40+
return m.FnReceiveMessage(ctx, input, opts...)
41+
}
42+
43+
if len(m.messages) == 0 {
44+
return &sqs.ReceiveMessageOutput{
45+
Messages: []*sqs.Message{},
46+
}, nil
47+
}
48+
49+
msg := m.messages[0]
50+
m.messages = m.messages[1:]
51+
return msg, nil
52+
}
53+
54+
// Compile-time check that *mockSQS satisfies interfaces.SQS.
var _ interfaces.SQS = (*mockSQS)(nil)
55+
56+
// mockS3 is a test double for interfaces.S3.
type mockS3 struct {
	// DataSet holds the object bodies to serve, consumed front to back with
	// one entry per GetObjectWithContext call.
	DataSet [][]byte
}
59+
60+
// GetObjectWithContext implements interfaces.S3.
61+
func (m *mockS3) GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) {
62+
if len(m.DataSet) == 0 {
63+
return nil, errors.New("no data")
64+
}
65+
66+
data := m.DataSet[0]
67+
m.DataSet = m.DataSet[1:]
68+
return &s3.GetObjectOutput{
69+
Body: io.NopCloser(bytes.NewReader(data)),
70+
}, nil
71+
}
72+
73+
// Compile-time check that *mockS3 satisfies interfaces.S3.
var _ interfaces.S3 = (*mockS3)(nil)
74+
75+
//go:embed testdata/body.json
76+
var bodyJSON string
77+
78+
func TestFalconDataReplicator(t *testing.T) {
79+
var calledRecv, calledDelete int
80+
81+
mockCS := cs.NewMock()
82+
mockSQS := &mockSQS{
83+
FnDeleteMessage: func(ctx context.Context, input *sqs.DeleteMessageInput, opts ...request.Option) (*sqs.DeleteMessageOutput, error) {
84+
calledDelete++
85+
gt.Equal(t, "test-sqs-url", *input.QueueUrl)
86+
gt.Equal(t, "test-receipt-handle", *input.ReceiptHandle)
87+
return nil, nil
88+
},
89+
FnReceiveMessage: func(ctx context.Context, input *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error) {
90+
calledRecv++
91+
gt.Equal(t, "test-sqs-url", *input.QueueUrl)
92+
93+
if calledRecv > 1 {
94+
return &sqs.ReceiveMessageOutput{
95+
Messages: []*sqs.Message{},
96+
}, nil
97+
}
98+
return &sqs.ReceiveMessageOutput{
99+
Messages: []*sqs.Message{
100+
{
101+
Body: &bodyJSON,
102+
ReceiptHandle: aws.String("test-receipt-handle"),
103+
},
104+
},
105+
}, nil
106+
},
107+
}
108+
mockS3 := &mockS3{
109+
DataSet: [][]byte{
110+
[]byte("test-data-1"),
111+
[]byte("test-data-2"),
112+
},
113+
}
114+
clients := infra.New(
115+
infra.WithCloudStorage(mockCS),
116+
infra.WithNewSQS(func(s *session.Session) interfaces.SQS { return mockSQS }),
117+
infra.WithNewS3(func(s *session.Session) interfaces.S3 { return mockS3 }),
118+
)
119+
120+
now := time.Date(2021, 9, 1, 2, 3, 0, 0, time.UTC)
121+
ctx := utils.CtxWithNow(context.Background(), func() time.Time { return now })
122+
gt.NoError(t, fdr.Exec(ctx, clients, &config.FalconDataReplicatorImpl{
123+
AwsRegion: "us-west-2",
124+
Bucket: "test-bucket",
125+
AwsAccessKeyId: "test-access-key",
126+
AwsSecretAccessKey: "test-secret",
127+
SqsUrl: "test-sqs-url",
128+
}))
129+
gt.V(t, calledDelete).Equal(1)
130+
gt.V(t, calledRecv).Equal(2)
131+
gt.A(t, mockCS.Results).Length(2).
132+
At(0, func(t testing.TB, v *cs.MockResult) {
133+
gt.Equal(t, v.Bucket, "test-bucket")
134+
gt.Equal(t, v.Object, "logs/2021/09/01/02/03/dAnpZeYcYD1J1B00-9f25c8f9/data/C246521D-D19E-43DD-9EB9-4EEE07F53D5A/part-00000.gz")
135+
}).
136+
At(1, func(t testing.TB, v *cs.MockResult) {
137+
gt.Equal(t, v.Bucket, "test-bucket")
138+
gt.Equal(t, v.Object, "logs/2021/09/01/02/03/dAnpZeYcYD1J1B00-9f25c8f9/data/C246521D-D19E-43DD-9EB9-4EEE07F53D5A/part-00001.gz")
139+
})
140+
}

pkg/actions/fdr/testdata/body.json

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,20 @@
1+
{
2+
"cid": "mi7ue3RAU0g0S82auotRRpL2qWjtG302",
3+
"timestamp": 1709880887084,
4+
"fileCount": 2,
5+
"totalSize": 18402486,
6+
"bucket": "cs-mav-asjkshdfkljahsdfkjhljhfa-s3alias",
7+
"pathPrefix": "dAnpZeYcYD1J1B00-9f25c8f9/data/C246521D-D19E-43DD-9EB9-4EEE07F53D5A",
8+
"files": [
9+
{
10+
"path": "dAnpZeYcYD1J1B00-9f25c8f9/data/C246521D-D19E-43DD-9EB9-4EEE07F53D5A/part-00000.gz",
11+
"size": 18402486,
12+
"checksum": "fb4a23224f90bd4065868e6de6f2e5a1"
13+
},
14+
{
15+
"path": "dAnpZeYcYD1J1B00-9f25c8f9/data/C246521D-D19E-43DD-9EB9-4EEE07F53D5A/part-00001.gz",
16+
"size": 18402486,
17+
"checksum": "fb4a23224f90bd4065868e6de6f2e5a1"
18+
}
19+
]
20+
}

pkg/actions/one_password/action.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ func Exec(ctx context.Context, clients *infra.Clients, req *config.OnePasswordIm
4848
func crawl(ctx context.Context, clients *infra.Clients, req *config.OnePasswordImpl, end time.Time, seq int, cursor string) (*string, error) {
4949
d := req.GetDuration().GoDuration()
5050

51-
objPrefix := config.ToObjNamePrefix(req, end)
51+
objPrefix := config.LogObjNamePrefix(req, end)
5252
objName := model.CSObjectName(
5353
fmt.Sprintf("%s_%d_%d.json.gz", objPrefix, d/time.Second, seq),
5454
)

pkg/controller/cli/exec.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77

88
"github.com/m-mizutani/goerr"
9+
"github.com/m-mizutani/hatchery/pkg/actions/fdr"
910
"github.com/m-mizutani/hatchery/pkg/actions/one_password"
1011
"github.com/m-mizutani/hatchery/pkg/domain/config"
1112
"github.com/m-mizutani/hatchery/pkg/domain/model"
@@ -116,11 +117,10 @@ func execute(ctx context.Context, clients *infra.Clients, action config.Action)
116117
case *config.OnePasswordImpl:
117118
return one_password.Exec(ctx, clients, v)
118119
case *config.FalconDataReplicatorImpl:
119-
return nil
120+
return fdr.Exec(ctx, clients, v)
120121
default:
121122
return goerr.Wrap(model.ErrAssertFailed, "unknown action type").With("action", action)
122123
}
123-
124124
}
125125

126126
func execDryRun(rt *runtime, actionIDs []string) error {

0 commit comments

Comments
 (0)