diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 31235e4..d7cdf5c 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -92,7 +92,7 @@ func getBootstrap(args []string) (interop.Bootstrap, string) { return NewSimpleBootstrap(bootstrapLookupCmd, currentWorkingDir), handler } -func PrintEndReports(invokeId string, initDuration string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) { +func PrintEndReports(invokeId string, initDuration string, status string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) { // Calculate invoke duration invokeDuration := math.Min(float64(time.Now().Sub(invokeStart).Nanoseconds()), float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond) @@ -102,11 +102,12 @@ func PrintEndReports(invokeId string, initDuration string, memorySize string, in // not a clean way to get this information from rapidcore _, _ = fmt.Fprintf(w, "REPORT RequestId: %s\t"+ - initDuration+ "Duration: %.2f ms\t"+ "Billed Duration: %.f ms\t"+ "Memory Size: %s MB\t"+ - "Max Memory Used: %s MB\t\n", + "Max Memory Used: %s MB\t"+ + initDuration+ + status+"\n", invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize) } diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index 6b89d65..a5fa71c 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -27,6 +27,11 @@ type CustomInteropServer struct { localStackAdapter *LocalStackAdapter port string upstreamEndpoint string + // initStart is set once in Init() and warmStart is flipped on the first invoke. + // Both are accessed only from the single sequential init -> invoke flow (the RIE + // processes one invocation at a time), so they need no additional synchronization. 
+ initStart time.Time + warmStart bool } type LocalStackAdapter struct { @@ -43,10 +48,11 @@ const ( func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte) error { statusUrl := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status) - _, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload)) + resp, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload)) if err != nil { return err } + defer resp.Body.Close() return nil } @@ -56,8 +62,12 @@ func (l *LocalStackAdapter) SendLogs(invokeId string, logs LogResponse) error { if err != nil { return err } - _, err = http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized)) - return err + resp, err := http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized)) + if err != nil { + return err + } + defer resp.Body.Close() + return nil } // SendResult posts the invocation result body to LocalStack. 
@@ -77,8 +87,12 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo } else { log.Infoln("Sending to /response") } - _, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body)) - return err + resp, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body)) + if err != nil { + return err + } + defer resp.Body.Close() + return nil } // The InvokeRequest is sent by LocalStack to trigger an invocation @@ -87,25 +101,25 @@ type InvokeRequest struct { InvokedFunctionArn string `json:"invoked-function-arn"` Payload string `json:"payload"` TraceId string `json:"trace-id"` + IsInitRetry bool `json:"is-init-retry,omitempty"` } // The ErrorResponse is sent TO LocalStack when encountering an error type ErrorResponse struct { - ErrorMessage string `json:"errorMessage"` - ErrorType string `json:"errorType,omitempty"` - RequestId string `json:"requestId,omitempty"` - StackTrace []string `json:"stackTrace,omitempty"` + ErrorMessage string `json:"errorMessage"` + ErrorType string `json:"errorType,omitempty"` + // RequestId uses *string so that an empty string "" is serialized (not omitted), + // while nil is omitted — init errors always set this field, fault events leave it nil. 
+ RequestId *string `json:"requestId,omitempty"` + StackTrace []string `json:"stackTrace,omitempty"` } -func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) { +func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) { server = &CustomInteropServer{ - delegate: delegate.(*rapidcore.Server), - port: lsOpts.InteropPort, - upstreamEndpoint: lsOpts.RuntimeEndpoint, - localStackAdapter: &LocalStackAdapter{ - UpstreamEndpoint: lsOpts.RuntimeEndpoint, - RuntimeId: lsOpts.RuntimeId, - }, + delegate: delegate.(*rapidcore.Server), + port: lsOpts.InteropPort, + upstreamEndpoint: lsOpts.RuntimeEndpoint, + localStackAdapter: adapter, } // TODO: extract this @@ -128,6 +142,13 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST _, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion) + initDuration := "" + if !server.warmStart && !invokeR.IsInitRetry { + initTimeMS := float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond) + initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS) + } + server.warmStart = true + invokeStart := time.Now() err = server.Invoke(invokeResp, &interop.Invoke{ ID: invokeR.InvokeId, @@ -149,15 +170,17 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto }) timeout := int(server.delegate.GetInvokeTimeout().Seconds()) isErr := false + status := "" if err != nil { switch { case errors.Is(err, rapidcore.ErrInvokeTimeout): log.Debugf("Got invoke timeout") isErr = true + status = "Status: timeout" errorResponse := ErrorResponse{ + ErrorType: "Sandbox.Timedout", ErrorMessage: fmt.Sprintf( - "%s %s Task timed out after %d.00 seconds", - 
time.Now().Format("2006-01-02T15:04:05Z"), + "RequestId: %s Error: Task timed out after %d.00 seconds", invokeR.InvokeId, timeout, ), @@ -186,7 +209,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto } timeoutDuration := time.Duration(timeout) * time.Second memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") - PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector) + PrintEndReports(invokeR.InvokeId, initDuration, status, memorySize, invokeStart, timeoutDuration, logCollector) if err2 := server.localStackAdapter.SendLogs(invokeR.InvokeId, logCollector.getLogs()); err2 != nil { log.Error("failed to send logs to LocalStack: ", err2) @@ -219,12 +242,45 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E return c.delegate.SendErrorResponse(invokeID, resp) } -// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT. +// SendInitErrorResponse forwards the init error to LocalStack and then propagates it to the delegate. func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error { log.Traceln("SendInitErrorResponse called") - if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil { - log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.") + + // Deserialize the raw payload so we can include the requestId and structured fields. + var parsed struct { + ErrorMessage string `json:"errorMessage"` + ErrorType string `json:"errorType"` + StackTrace []string `json:"stackTrace,omitempty"` } + if err := json.Unmarshal(resp.Payload, &parsed); err != nil { + log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload") + if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil { + log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId). 
+ Error("Failed to send init error to LocalStack") + } + return c.delegate.SendInitErrorResponse(resp) + } + + requestId := c.delegate.GetCurrentInvokeID() + adaptedResp := ErrorResponse{ + ErrorMessage: parsed.ErrorMessage, + ErrorType: parsed.ErrorType, + RequestId: &requestId, + StackTrace: parsed.StackTrace, + } + body, err := json.Marshal(adaptedResp) + if err != nil { + log.WithError(err).Error("Failed to marshal adapted init error response") + body = resp.Payload + } + + go func() { + if err := c.localStackAdapter.SendStatus(Error, body); err != nil { + log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId). + Error("Failed to send init error to LocalStack") + } + }() + return c.delegate.SendInitErrorResponse(resp) } @@ -240,6 +296,7 @@ func (c *CustomInteropServer) SendRuntimeReady() error { func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error { log.Traceln("Init called") + c.initStart = time.Now() return c.delegate.Init(i, invokeTimeoutMs) } diff --git a/cmd/localstack/events.go b/cmd/localstack/events.go new file mode 100644 index 0000000..4718b93 --- /dev/null +++ b/cmd/localstack/events.go @@ -0,0 +1,62 @@ +package main + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry" + "github.com/google/uuid" +) + +// LocalStackEventsAPI intercepts fault events and forwards them to LocalStack as error status callbacks. 
+type LocalStackEventsAPI struct {
+	*telemetry.StandaloneEventsAPI
+	// adapter posts the error status callback to LocalStack.
+	adapter *LocalStackAdapter
+	// requestID is the ID most recently passed to SetCurrentRequestID. Guarded by mu.
+	requestID string
+	mu        sync.RWMutex
+}
+
+// NewLocalStackEventsAPI returns an events API that reports faults through adapter and
+// embeds a fresh telemetry.StandaloneEventsAPI.
+func NewLocalStackEventsAPI(adapter *LocalStackAdapter) *LocalStackEventsAPI {
+	return &LocalStackEventsAPI{
+		adapter:             adapter,
+		StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI),
+	}
+}
+
+// SendFault passes the fault to the embedded standalone events API and then posts it to
+// LocalStack as an Error status whose message has the form
+// "RequestId: <id> Error: <message>".
+//
+// The request ID is taken from data.RequestID, falling back to the ID last stored by
+// SetCurrentRequestID and finally to a freshly generated UUID. It returns an error if the
+// response cannot be marshalled or the status callback fails.
+func (ev *LocalStackEventsAPI) SendFault(data interop.FaultData) error {
+	// The embedded API's result is ignored; forwarding to LocalStack proceeds regardless.
+	_ = ev.StandaloneEventsAPI.SendFault(data)
+
+	requestID := string(data.RequestID)
+	if requestID == "" {
+		ev.mu.RLock()
+		requestID = ev.requestID
+		ev.mu.RUnlock()
+	}
+	if requestID == "" {
+		// No invocation is active during the init phase (LocalStack only dispatches an invoke
+		// after the runtime reports ready), so synthesize an ID to preserve AWS's
+		// "RequestId: Error: ..." message format.
+		requestID = uuid.NewString()
+	}
+
+	// RequestId is left unset on the response; the ID is carried inside ErrorMessage only.
+	resp := ErrorResponse{
+		ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, data.ErrorMessage),
+		ErrorType:    string(data.ErrorType),
+	}
+
+	payload, err := json.Marshal(resp)
+	if err != nil {
+		return fmt.Errorf("marshalling fault response: %w", err)
+	}
+
+	if err := ev.adapter.SendStatus(Error, payload); err != nil {
+		return fmt.Errorf("sending fault status to LocalStack: %w", err)
+	}
+	return nil
+}
+
+// SetCurrentRequestID remembers id for later SendFault calls that carry no request ID of
+// their own, and forwards it to the embedded standalone events API.
+func (ev *LocalStackEventsAPI) SetCurrentRequestID(id interop.RequestID) {
+	ev.mu.Lock()
+	defer ev.mu.Unlock()
+	ev.requestID = string(id)
+	ev.StandaloneEventsAPI.SetCurrentRequestID(id)
+}
diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go
index b03d877..23abadc 100644
--- a/cmd/localstack/main.go
+++ b/cmd/localstack/main.go
@@ -179,6 +179,20 @@ func main() {
 	localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
 	tracer := NewLocalStackTracer()
 
+	// Create LocalStack adapter upfront so it can be shared with the events API and interop server
+	lsAdapter := &LocalStackAdapter{
+		UpstreamEndpoint: lsOpts.RuntimeEndpoint,
+		RuntimeId:        lsOpts.RuntimeId,
+	}
+
+	// Events API forwards runtime fault events (unexpected exits) to LocalStack as error callbacks
+	lsEventsAPI := NewLocalStackEventsAPI(lsAdapter)
+
+	// Supervisor
intercepts runtime process terminations and emits fault events via the events API + supervisorCtx, cancelSupervisor := context.WithCancel(context.Background()) + + localStackSupv := NewLocalStackSupervisor(supervisorCtx, lsEventsAPI) + // build sandbox sandbox := rapidcore. NewSandboxBuilder(). @@ -186,11 +200,15 @@ func main() { AddShutdownFunc(func() { log.Debugln("Stopping file watcher") cancelFileWatcher() + log.Debugln("Stopping supervisor") + cancelSupervisor() }). SetExtensionsFlag(true). SetInitCachingFlag(true). SetLogsEgressAPI(localStackLogsEgressApi). - SetTracer(tracer) + SetTracer(tracer). + SetEventsAPI(lsEventsAPI). + SetSupervisor(localStackSupv) // Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable. // We need to ensure the runtime server is up before the INIT phase, @@ -211,7 +229,7 @@ func main() { runDaemon(d) // async defaultInterop := sandbox.DefaultInteropServer() - interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) + interopServer := NewCustomInteropServer(lsOpts, lsAdapter, defaultInterop, logCollector) sandbox.SetInteropServer(interopServer) if len(handler) > 0 { sandbox.SetHandler(handler) diff --git a/cmd/localstack/supervisor.go b/cmd/localstack/supervisor.go new file mode 100644 index 0000000..249dda3 --- /dev/null +++ b/cmd/localstack/supervisor.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "errors" + "fmt" + "strings" + "sync/atomic" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor/model" + log "github.com/sirupsen/logrus" +) + +// LocalStackSupervisor wraps a ProcessSupervisor and intercepts runtime process termination events. 
+// When a runtime process exits unexpectedly it sends a fault event via the EventsAPI so LocalStack
+// receives a proper error instead of timing out.
+type LocalStackSupervisor struct {
+	model.ProcessSupervisor
+	// eventsChan carries every event read from the wrapped supervisor; Events hands it out.
+	eventsChan chan model.Event
+	// eventsAPI receives the fault for an unexpected runtime termination.
+	eventsAPI interop.EventsAPI
+
+	// isShuttingDown is set when a Terminate or Kill of a runtime process is requested and
+	// cleared by the next runtime Exec. Terminations seen while it is set are not reported.
+	isShuttingDown *atomic.Bool
+}
+
+// NewLocalStackSupervisor wraps a new local supervisor and starts a goroutine that
+// forwards its events and reports unexpected runtime exits to evs. The goroutine stops
+// when ctx is cancelled or the wrapped supervisor's event channel is closed.
+func NewLocalStackSupervisor(ctx context.Context, evs interop.EventsAPI) *LocalStackSupervisor {
+	var isShuttingDown atomic.Bool
+	ls := &LocalStackSupervisor{
+		ProcessSupervisor: supervisor.NewLocalSupervisor(),
+		eventsAPI:         evs,
+		eventsChan:        make(chan model.Event),
+		isShuttingDown:    &isShuttingDown,
+	}
+
+	go ls.loop(ctx)
+
+	return ls
+}
+
+// loop forwards every event from the wrapped supervisor to eventsChan and, after each
+// forward, checks whether the event is an unexpected runtime exit that must be reported.
+// It panics if the wrapped supervisor cannot provide its event channel, and closes
+// eventsChan when it returns.
+func (ls *LocalStackSupervisor) loop(ctx context.Context) {
+	inCh, err := ls.ProcessSupervisor.Events(ctx, nil)
+	if err != nil {
+		panic(err)
+	}
+	defer close(ls.eventsChan)
+	for {
+		select {
+		case event, ok := <-inCh:
+			if !ok {
+				return
+			}
+
+			// eventsChan is unbuffered: this blocks until a consumer of Events receives
+			// the event or ctx is cancelled.
+			select {
+			case ls.eventsChan <- event:
+			case <-ctx.Done():
+				return
+			}
+
+			ls.reportRuntimeFault(event)
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// reportRuntimeFault sends a fault through the events API when event is the termination of
+// a runtime process and no Terminate/Kill of the runtime has been requested. Every other
+// event is ignored. A failure to send the fault is logged, not returned.
+func (ls *LocalStackSupervisor) reportRuntimeFault(event model.Event) {
+	if ls.isShuttingDown.Load() {
+		return
+	}
+
+	termination := event.Event.ProcessTerminated()
+	if termination == nil {
+		return
+	}
+
+	// Name is a pointer; guard it so a nameless event cannot panic this goroutine.
+	if termination.Name == nil {
+		log.Debug("Ignoring process termination without a process name")
+		return
+	}
+	if !strings.Contains(*termination.Name, "runtime-") {
+		log.Debugf("Ignoring non-runtime process termination: %s", *termination.Name)
+		return
+	}
+
+	if termination.Signaled() != nil && termination.Signo != nil {
+		log.Debugf("Runtime process signalled: %d", *termination.Signo)
+	}
+
+	// RequestID is left empty so the events API can resolve it: it uses the current
+	// invoke ID for a mid-invocation crash and synthesizes a placeholder for init-phase
+	// faults, where no invocation has been dispatched yet.
+	faultData := interop.FaultData{
+		ErrorMessage: errors.New("Runtime exited without providing a reason"),
+		ErrorType:    fatalerror.RuntimeExit,
+	}
+	if !termination.Success() {
+		faultData.ErrorMessage = fmt.Errorf("Runtime exited with error: %s", termination.String())
+	}
+
+	if err := ls.eventsAPI.SendFault(faultData); err != nil {
+		log.WithError(err).Error("Failed to send runtime fault event")
+	}
+}
+
+// Exec starts a process through the wrapped supervisor. Starting anything in the
+// "runtime" domain clears the shutting-down flag so later runtime exits are reported again.
+func (ls *LocalStackSupervisor) Exec(ctx context.Context, request *model.ExecRequest) error {
+	if request.Domain == "runtime" {
+		ls.isShuttingDown.Store(false)
+	}
+	return ls.ProcessSupervisor.Exec(ctx, request)
+}
+
+// Terminate forwards the request to the wrapped supervisor. For a runtime process the
+// shutting-down flag is raised first, so that the termination event this call triggers
+// is already recognised as expected when loop sees it.
+func (ls *LocalStackSupervisor) Terminate(ctx context.Context, request *model.TerminateRequest) error {
+	if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
+		ls.isShuttingDown.Store(true)
+	}
+	return ls.ProcessSupervisor.Terminate(ctx, request)
+}
+
+// Kill forwards the request to the wrapped supervisor. For a runtime process the
+// shutting-down flag is raised first, so that the termination event this call triggers
+// is already recognised as expected when loop sees it.
+func (ls *LocalStackSupervisor) Kill(ctx context.Context, request *model.KillRequest) error {
+	if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
+		ls.isShuttingDown.Store(true)
+	}
+	return ls.ProcessSupervisor.Kill(ctx, request)
+}
+
+// Events returns the channel that loop forwards events to. Both arguments are ignored and
+// every caller receives the same channel.
+func (ls *LocalStackSupervisor) Events(ctx context.Context, _ *model.EventsRequest) (<-chan model.Event, error) {
+	return ls.eventsChan, nil
+}