Skip to content

Commit 41ef5b2

Browse files
committed
refactor: Modularise packages and use custom rapidcore components
1 parent 41b9eb2 commit 41ef5b2

File tree

24 files changed

+1193
-437
lines changed

24 files changed

+1193
-437
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
name: aws-lambda-rie
3030
path: bin/*
3131
- name: Release binaries
32-
uses: softprops/action-gh-release@v2
32+
uses: softprops/action-gh-release@v2.2.2
3333
if: startsWith(github.ref, 'refs/tags/')
3434
with:
3535
files: bin/*

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ tags
77
.idea
88
.DS_Store
99
.venv
10+
.vscode

cmd/localstack/main.go

Lines changed: 149 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,41 @@ package main
44

55
import (
66
"context"
7+
"fmt"
8+
"net"
79
"os"
10+
"os/signal"
811
"runtime/debug"
912
"strconv"
1013
"strings"
14+
"syscall"
1115

16+
"github.com/aws/aws-sdk-go-v2/config"
17+
"github.com/localstack/lambda-runtime-init/internal/aws/lambda"
1218
"github.com/localstack/lambda-runtime-init/internal/aws/xray"
1319
"github.com/localstack/lambda-runtime-init/internal/bootstrap"
20+
"github.com/localstack/lambda-runtime-init/internal/events"
1421
"github.com/localstack/lambda-runtime-init/internal/hotreloading"
22+
"github.com/localstack/lambda-runtime-init/internal/localstack"
1523
"github.com/localstack/lambda-runtime-init/internal/logging"
24+
"github.com/localstack/lambda-runtime-init/internal/sandbox"
25+
1626
"github.com/localstack/lambda-runtime-init/internal/server"
27+
28+
"github.com/localstack/lambda-runtime-init/internal/supervisor"
1729
"github.com/localstack/lambda-runtime-init/internal/tracing"
1830
"github.com/localstack/lambda-runtime-init/internal/utils"
1931
log "github.com/sirupsen/logrus"
2032
"go.amzn.com/lambda/core/directinvoke"
33+
"go.amzn.com/lambda/interop"
2134
"go.amzn.com/lambda/rapidcore"
2235
)
2336

24-
func InitLsOpts() *server.LsOpts {
25-
return &server.LsOpts{
37+
func InitLsOpts() *localstack.Config {
38+
return &localstack.Config{
2639
// required
27-
RuntimeEndpoint: utils.GetEnvOrDie("LOCALSTACK_RUNTIME_ENDPOINT"),
28-
RuntimeId: utils.GetEnvOrDie("LOCALSTACK_RUNTIME_ID"),
40+
RuntimeEndpoint: utils.MustGetEnv("LOCALSTACK_RUNTIME_ENDPOINT"),
41+
RuntimeId: utils.MustGetEnv("LOCALSTACK_RUNTIME_ID"),
2942
AccountId: utils.GetEnvWithDefault("LOCALSTACK_FUNCTION_ACCOUNT_ID", "000000000000"),
3043
// optional with default
3144
InteropPort: utils.GetEnvWithDefault("LOCALSTACK_INTEROP_PORT", "9563"),
@@ -45,6 +58,19 @@ func InitLsOpts() *server.LsOpts {
4558
}
4659
}
4760

61+
func InitFunctionConfig() lambda.FunctionConfig {
62+
return lambda.FunctionConfig{
63+
FunctionName: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
64+
FunctionVersion: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST"),
65+
FunctionTimeoutSec: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "30"),
66+
InitializationType: utils.GetEnvWithDefault("AWS_LAMBDA_INITIALIZATION_TYPE", "on-demand"),
67+
LogGroupName: utils.GetEnvWithDefault("AWS_LAMBDA_LOG_GROUP_NAME", "/aws/lambda/Functions"),
68+
LogStreamName: utils.GetEnvWithDefault("AWS_LAMBDA_LOG_STREAM_NAME", "$LATEST"),
69+
FunctionMemorySizeMb: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128"),
70+
FunctionHandler: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
71+
}
72+
}
73+
4874
// UnsetLsEnvs unsets environment variables specific to LocalStack to achieve better runtime parity with AWS
4975
func UnsetLsEnvs() {
5076
unsetList := [...]string{
@@ -75,42 +101,54 @@ func UnsetLsEnvs() {
75101
}
76102
}
77103

104+
func setupUserPermissions(user string) {
105+
logger := utils.UserLogger()
106+
// Switch to non-root user and drop root privileges
107+
if utils.IsRootUser() && user != "" && user != "root" {
108+
uid := 993
109+
gid := 990
110+
utils.AddUser(user, uid, gid)
111+
if err := os.Chown("/tmp", uid, gid); err != nil {
112+
log.WithError(err).Warn("Could not change owner of directory /tmp.")
113+
}
114+
logger.Debug("Process running as root user.")
115+
err := utils.DropPrivileges(user)
116+
if err != nil {
117+
log.WithError(err).Warn("Could not drop root privileges.")
118+
} else {
119+
logger.Debug("Process running as non-root user.")
120+
}
121+
}
122+
}
123+
78124
func main() {
79125
// we're setting this to the same value as in the official RIE
80126
debug.SetGCPercent(33)
81127

82128
// configuration parsing
83129
lsOpts := InitLsOpts()
130+
functionConf := InitFunctionConfig()
131+
awsEnvConf, _ := config.NewEnvConfig()
132+
awsEnvConf.Credentials.AccountID = lsOpts.AccountId
133+
84134
UnsetLsEnvs()
85135

86136
// set up logging following the Logrus logging levels: https://github.com/sirupsen/logrus#level-logging
87137
log.SetReportCaller(true)
88138
// https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html
89-
xRayLogLevel := "info"
90-
switch lsOpts.InitLogLevel {
91-
case "trace":
139+
140+
logLevel, err := log.ParseLevel(lsOpts.InitLogLevel)
141+
if err != nil {
142+
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
143+
}
144+
log.SetLevel(logLevel)
145+
146+
xRayLogLevel := lsOpts.InitLogLevel
147+
switch logLevel {
148+
case log.TraceLevel:
92149
log.SetFormatter(&log.JSONFormatter{})
93-
log.SetLevel(log.TraceLevel)
94-
xRayLogLevel = "debug"
95-
case "debug":
96-
log.SetLevel(log.DebugLevel)
97-
xRayLogLevel = "debug"
98-
case "info":
99-
log.SetLevel(log.InfoLevel)
100-
case "warn":
101-
log.SetLevel(log.WarnLevel)
102-
xRayLogLevel = "warn"
103-
case "error":
104-
log.SetLevel(log.ErrorLevel)
105-
xRayLogLevel = "error"
106-
case "fatal":
107-
log.SetLevel(log.FatalLevel)
150+
case log.ErrorLevel, log.FatalLevel, log.PanicLevel:
108151
xRayLogLevel = "error"
109-
case "panic":
110-
log.SetLevel(log.PanicLevel)
111-
xRayLogLevel = "error"
112-
default:
113-
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
114152
}
115153

116154
// patch MaxPayloadSize
@@ -119,6 +157,10 @@ func main() {
119157
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
120158
}
121159
directinvoke.MaxDirectResponseSize = int64(payloadSize)
160+
if directinvoke.MaxDirectResponseSize > interop.MaxPayloadSize {
161+
log.Infof("Large response size detected (%d bytes), forcing streaming mode", directinvoke.MaxDirectResponseSize)
162+
directinvoke.InvokeResponseMode = interop.InvokeResponseModeStreaming
163+
}
122164

123165
// download code archive if env variable is set
124166
if err := utils.DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
@@ -133,97 +175,119 @@ func main() {
133175
bootstrap, handler := bootstrap.GetBootstrap(os.Args)
134176

135177
// Switch to non-root user and drop root privileges
136-
if utils.IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
137-
uid := 993
138-
gid := 990
139-
utils.AddUser(lsOpts.User, uid, gid)
140-
if err := os.Chown("/tmp", uid, gid); err != nil {
141-
log.Warnln("Could not change owner of directory /tmp:", err)
142-
}
143-
utils.UserLogger().Debugln("Process running as root user.")
144-
err := utils.DropPrivileges(lsOpts.User)
145-
if err != nil {
146-
log.Warnln("Could not drop root privileges.", err)
147-
} else {
148-
utils.UserLogger().Debugln("Process running as non-root user.")
149-
}
150-
}
178+
setupUserPermissions(lsOpts.User)
151179

152-
// file watcher for hot-reloading
153-
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
180+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
181+
defer stop()
182+
183+
// LocalStack client used for sending callbacks
184+
lsClient := localstack.NewLocalStackClient(lsOpts.RuntimeEndpoint, lsOpts.RuntimeId)
185+
186+
// Services required for Sandbox environment
187+
interopServer := server.NewInteropServer(lsClient)
154188

155189
logCollector := logging.NewLogCollector()
156190
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
157191
tracer := tracing.NewLocalStackTracer()
192+
lsEventsAPI := events.NewLocalStackEventsAPI(lsClient)
193+
localStackSupv := supervisor.NewLocalStackSupervisor(ctx, lsEventsAPI)
158194

159195
// build sandbox
160-
sandbox := rapidcore.
196+
builder := rapidcore.
161197
NewSandboxBuilder().
162-
//SetTracer(tracer).
163-
AddShutdownFunc(func() {
164-
log.Debugln("Stopping file watcher")
165-
cancelFileWatcher()
166-
}).
167-
SetExtensionsFlag(true).
168-
SetInitCachingFlag(true).
198+
SetTracer(tracer).
199+
SetEventsAPI(lsEventsAPI).
200+
SetHandler(handler).
201+
SetInteropServer(interopServer).
169202
SetLogsEgressAPI(localStackLogsEgressApi).
170-
SetTracer(tracer)
203+
SetRuntimeAPIAddress("127.0.0.1:9000").
204+
SetSupervisor(localStackSupv).
205+
SetRuntimeFsRootPath("/")
206+
207+
builder.AddShutdownFunc(func() {
208+
interopServer.Close()
209+
})
210+
211+
// Start daemons
212+
213+
// file watcher for hot-reloading
214+
fileWatcherContext, cancelFileWatcher := context.WithCancel(ctx)
215+
builder.AddShutdownFunc(func() {
216+
cancelFileWatcher()
217+
})
218+
219+
go hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
171220

172-
// xray daemon
173-
endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort
221+
// Start xray daemon
222+
endpoint := "http://" + net.JoinHostPort(lsOpts.LocalstackIP, lsOpts.EdgePort)
174223
xrayConfig := xray.NewConfig(endpoint, xRayLogLevel)
175224
d := xray.NewDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
176-
sandbox.AddShutdownFunc(func() {
225+
builder.AddShutdownFunc(func() {
177226
log.Debugln("Shutting down xray daemon")
178227
d.Stop()
179228
log.Debugln("Flushing segments in xray daemon")
180229
d.Close()
181230
})
182-
d.Run() // async
231+
d.Run() // served async
183232

184-
defaultInterop := sandbox.DefaultInteropServer()
185-
interopServer := server.NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
186-
sandbox.SetInteropServer(interopServer)
187-
if len(handler) > 0 {
188-
sandbox.SetHandler(handler)
189-
}
190-
exitChan := make(chan struct{})
191-
sandbox.AddShutdownFunc(func() {
192-
exitChan <- struct{}{}
193-
})
233+
// Populate our interop server
234+
sandboxCtx, internalStateFn := builder.Create()
194235

195-
// initialize all flows and start runtime API
196-
sandboxContext, internalStateFn := sandbox.Create()
197-
// Populate our custom interop server
198-
interopServer.SetSandboxContext(sandboxContext)
236+
interopServer.SetSandboxContext(sandboxCtx)
199237
interopServer.SetInternalStateGetter(internalStateFn)
200238

201-
// get timeout
202-
invokeTimeoutEnv := utils.GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
203-
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
204-
if err != nil {
205-
log.Fatalln(err)
206-
}
207-
go hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
239+
// TODO(gregfurman): The default interops server has a shutdown function added in the builder.
240+
// This calls sandbox.Reset(), where if sandbox is not set, this will panic.
241+
// So instead, just set the sandbox to a noop version.
242+
builder.DefaultInteropServer().SetSandboxContext(sandbox.NewNoopSandboxContext())
243+
244+
// Create the LocalStack service
245+
localStackService := server.NewLocalStackService(
246+
interopServer, logCollector, lsClient, localStackSupv, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf,
247+
)
248+
defer localStackService.Close()
208249

209250
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
210251
// notification channels and status fields are properly initialized before `AwaitInitialized`
211252
log.Debugln("Starting runtime init.")
212-
server.InitHandler(sandbox.LambdaInvokeAPI(), utils.GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init
253+
if err := localStackService.Initialize(bootstrap); err != nil {
254+
log.WithError(err).Warnf("Failed to initialize runtime. Initialization will be retried in the next invoke.")
255+
}
256+
257+
invokeServer := server.NewServer(lsOpts.InteropPort, localStackService)
258+
defer invokeServer.Close()
259+
260+
serverErr := make(chan error, 1)
261+
go func() {
262+
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", lsOpts.InteropPort))
263+
if err != nil {
264+
log.Fatalf("failed to start LocalStack Lambda Runtime Interface server: %s", err)
265+
}
266+
go func() { serverErr <- invokeServer.Serve(listener); close(serverErr) }()
267+
log.Debugf("LocalStack API gateway listening on %s", listener.Addr().String())
268+
}()
213269

214270
log.Debugln("Awaiting initialization of runtime init.")
215271
if err := interopServer.AwaitInitialized(); err != nil {
216272
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
217273
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
218274
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
219275
// callback SendInitErrorResponse because it contains the correct error response payload.
220-
return
276+
// return
277+
} else {
278+
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
279+
if err := localStackService.SendStatus(localstack.Ready, []byte{}); err != nil {
280+
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
281+
}
221282
}
222283

223-
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
224-
if err := interopServer.SendStatus(server.Ready, []byte{}); err != nil {
225-
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
284+
// Block until context is cancelled OR the server errors out
285+
select {
286+
case <-ctx.Done():
287+
log.Info("Shutdown signal received.")
288+
case <-serverErr:
289+
if err != nil {
290+
log.Errorf("Server error: %v", err)
291+
}
226292
}
227-
228-
<-exitChan
229293
}

cmd/ls-api/main.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"bytes"
66
"encoding/json"
77
"fmt"
8+
"io"
9+
"net/http"
10+
811
"github.com/go-chi/chi"
912
"github.com/go-chi/chi/middleware"
1013
log "github.com/sirupsen/logrus"
11-
"io"
12-
"net/http"
1314
)
1415

1516
const apiPort = 9563
@@ -33,7 +34,7 @@ func main() {
3334
invokeRequest, _ := json.Marshal(InvokeRequest{InvokeId: uid, Payload: "{\"counter\":0}"})
3435
_, err := http.Post(invokeUrl, "application/json", bytes.NewReader(invokeRequest))
3536
if err != nil {
36-
log.Fatal(err)
37+
log.Error(err)
3738
}
3839

3940
w.WriteHeader(200)
@@ -47,7 +48,7 @@ func main() {
4748
invokeRequest, _ := json.Marshal(InvokeRequest{InvokeId: uid, Payload: "{\"counter\":0, \"fail\": \"yes\"}"})
4849
_, err := http.Post(invokeUrl, "application/json", bytes.NewReader(invokeRequest))
4950
if err != nil {
50-
log.Fatal(err)
51+
log.Error(err)
5152
}
5253

5354
w.WriteHeader(200)
@@ -57,6 +58,7 @@ func main() {
5758
}
5859
})
5960

61+
log.Infof("Listening on port :%d", listenPort)
6062
err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), router)
6163
if err != nil {
6264
log.Fatal(err)
@@ -74,7 +76,7 @@ func invokeLogsHandler(w http.ResponseWriter, r *http.Request) {
7476
}
7577

7678
type InvokeRequest struct {
77-
InvokeId string `json:"invoke-id"`
79+
InvokeId string `json:"request-id"`
7880
Payload string `json:"payload"`
7981
}
8082

0 commit comments

Comments
 (0)