diff --git a/packages/api/internal/cfg/model.go b/packages/api/internal/cfg/model.go index d92de26c55..f7d540dd2e 100644 --- a/packages/api/internal/cfg/model.go +++ b/packages/api/internal/cfg/model.go @@ -73,6 +73,8 @@ type Config struct { SandboxStorageBackend string `env:"SANDBOX_STORAGE_BACKEND" envDefault:"memory"` DomainName string `env:"DOMAIN_NAME" envDefault:""` + + LogsCollectorAddress string `env:"LOGS_COLLECTOR_ADDRESS" envDefault:""` } type JWTSigningKey any diff --git a/packages/api/main.go b/packages/api/main.go index 424eee22a4..6695db06a3 100644 --- a/packages/api/main.go +++ b/packages/api/main.go @@ -251,6 +251,11 @@ func run() int { serviceInstanceID := uuid.New().String() nodeID := env.GetNodeID() + config, err := cfg.Parse() + if err != nil { + logger.L().Fatal(ctx, "Error parsing config", zap.Error(err)) + } + tel, err := telemetry.New(ctx, nodeID, serviceName, commitSHA, serviceVersion, serviceInstanceID) if err != nil { logger.L().Fatal(ctx, "failed to create metrics exporter", zap.Error(err)) @@ -279,7 +284,7 @@ func run() int { sbxlogger.SandboxLoggerConfig{ ServiceName: serviceName, IsInternal: false, - CollectorAddress: env.LogsCollectorAddress(), + CollectorAddress: config.LogsCollectorAddress, }, ) defer sbxLoggerExternal.Sync() @@ -291,7 +296,7 @@ func run() int { sbxlogger.SandboxLoggerConfig{ ServiceName: serviceName, IsInternal: true, - CollectorAddress: env.LogsCollectorAddress(), + CollectorAddress: config.LogsCollectorAddress, }, ) defer sbxLoggerInternal.Sync() @@ -310,11 +315,6 @@ func run() int { expectedMigration = 0 } - config, err := cfg.Parse() - if err != nil { - logger.L().Fatal(ctx, "Error parsing config", zap.Error(err)) - } - err = sqlcdb.CheckMigrationVersion(ctx, config.PostgresConnectionString, expectedMigration) if err != nil { l.Fatal(ctx, "failed to check migration version", zap.Error(err)) diff --git a/packages/envd/internal/api/compose_test.go b/packages/envd/internal/api/compose_test.go index db1c8ca1f3..239625deab 100644 --- a/packages/envd/internal/api/compose_test.go +++ b/packages/envd/internal/api/compose_test.go @@ -32,7 +32,7 @@ func newComposeTestAPI(t *testing.T) (*API, *user.User) { User: currentUser.Username, } - return New(&logger, defaults, nil, false), currentUser + return New(logger, defaults, nil, false), currentUser } func writeSourceFile(t *testing.T, dir string, name string, data []byte) string { diff --git a/packages/envd/internal/api/download_test.go b/packages/envd/internal/api/download_test.go index fd2b9447d8..ee3752e816 100644 --- a/packages/envd/internal/api/download_test.go +++ b/packages/envd/internal/api/download_test.go @@ -95,7 +95,7 @@ func TestGetFilesContentDisposition(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -144,7 +144,7 @@ func TestGetFilesContentDispositionWithNestedPath(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -187,7 +187,7 @@ func TestGetFiles_GzipEncoding_ExplicitIdentityOffWithRange(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -228,7 +228,7 @@ func TestGetFiles_GzipDownload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) req.Header.Set("Accept-Encoding", "gzip") @@ -293,7 +293,7 @@ func TestPostFiles_GzipUpload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) req.Header.Set("Content-Type", mpWriter.FormDataContentType()) @@ -333,7 +333,7 @@ func TestPostFiles_RawBodyUpload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), bytes.NewReader(originalContent)) req.Header.Set("Content-Type", "application/octet-stream") @@ -371,7 +371,7 @@ func TestPostFiles_RawBodyUploadCreatesDirectories(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), bytes.NewReader(originalContent)) req.Header.Set("Content-Type", "application/octet-stream") @@ -404,7 +404,7 @@ func TestPostFiles_RawBodyUploadRequiresPath(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files", bytes.NewReader([]byte("some content"))) req.Header.Set("Content-Type", "application/octet-stream") @@ -439,7 +439,7 @@ func TestPostFiles_RawBodyUploadOverwritesExisting(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), bytes.NewReader(newContent)) req.Header.Set("Content-Type", "application/octet-stream") @@ -485,7 +485,7 @@ func TestPostFiles_RawBodyGzipUpload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) req.Header.Set("Content-Type", "application/octet-stream") @@ -519,7 +519,7 @@ func TestPostFiles_UnsupportedContentType(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) tempDir := t.TempDir() destPath := filepath.Join(tempDir, "test.txt") @@ -565,7 +565,7 @@ func TestPostFiles_MultipartStillWorksWithoutContentType(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &multipartBuf) req.Header.Set("Content-Type", mpWriter.FormDataContentType()) @@ -623,7 +623,7 @@ func TestGzipUploadThenGzipDownload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) uploadReq := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) uploadReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) diff --git a/packages/envd/internal/api/init.go b/packages/envd/internal/api/init.go index 8722bb5449..4fbb7348a5 100644 --- a/packages/envd/internal/api/init.go +++ b/packages/envd/internal/api/init.go @@ -153,7 +153,7 @@ func (a *API) PostInit(w http.ResponseWriter, r *http.Request) { go func() { //nolint:contextcheck // TODO: fix this later ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - host.PollForMMDSOpts(ctx, a.mmdsChan, a.defaults.EnvVars) + host.PollForMMDSOpts(ctx, a.logger, a.mmdsChan, a.defaults.EnvVars) }() w.Header().Set("Cache-Control", "no-store") diff --git a/packages/envd/internal/api/init_test.go b/packages/envd/internal/api/init_test.go index 9877104e09..e5b72e8c34 100644 --- a/packages/envd/internal/api/init_test.go +++ b/packages/envd/internal/api/init_test.go @@ -142,7 +142,7 @@ func newTestAPI(accessToken *SecureToken, mmdsClient MMDSClient) *API { defaults := &execcontext.Defaults{ EnvVars: utils.NewMap[string, string](), } - api := New(&logger, defaults, nil, false) + api := New(logger, defaults, nil, false) if accessToken != nil { api.accessToken.TakeFrom(accessToken) } diff --git a/packages/envd/internal/api/store.go b/packages/envd/internal/api/store.go index 2524206226..45a065bde4 100644 --- a/packages/envd/internal/api/store.go +++ b/packages/envd/internal/api/store.go @@ -27,7 +27,7 @@ func (c *DefaultMMDSClient) GetAccessTokenHash(ctx context.Context) (string, err type API struct { isNotFC bool - logger *zerolog.Logger + logger zerolog.Logger accessToken *SecureToken defaults *execcontext.Defaults @@ -39,7 +39,7 @@ type API struct { initLock sync.Mutex } -func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool) *API { +func New(l zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool) *API { return &API{ logger: l, defaults: defaults, diff --git a/packages/envd/internal/host/mmds.go b/packages/envd/internal/host/mmds.go index db1690a749..8b4db1c083 100644 --- a/packages/envd/internal/host/mmds.go +++ b/packages/envd/internal/host/mmds.go @@ -11,6 +11,8 @@ import ( "path/filepath" "time" + "github.com/rs/zerolog" + "github.com/e2b-dev/infra/packages/envd/internal/utils" ) @@ -134,7 +136,7 @@ func GetAccessTokenHashFromMMDS(ctx context.Context) (string, error) { return opts.AccessTokenHash, nil } -func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *utils.Map[string, string]) { +func PollForMMDSOpts(ctx context.Context, logger zerolog.Logger, mmdsChan chan<- *MMDSOpts, envVars *utils.Map[string, string]) { httpClient := &http.Client{} defer httpClient.CloseIdleConnections() @@ -144,20 +146,20 @@ func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *ut for { select { case <-ctx.Done(): - fmt.Fprintf(os.Stderr, "context cancelled while waiting for mmds opts") + logger.Error().Msg("context cancelled while waiting for mmds opts") return case <-ticker.C: token, err := getMMDSToken(ctx, httpClient) if err != nil { - fmt.Fprintf(os.Stderr, "error getting mmds token: %v\n", err) + logger.Error().Err(err).Msg("error getting mmds token") continue } mmdsOpts, err := getMMDSOpts(ctx, httpClient, token) if err != nil { - fmt.Fprintf(os.Stderr, "error getting mmds opts: %v\n", err) + logger.Error().Err(err).Msg("error getting mmds opts") continue } @@ -166,10 +168,10 @@ func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *ut envVars.Store("E2B_TEMPLATE_ID", mmdsOpts.TemplateID) if err := os.WriteFile(filepath.Join(E2BRunDir, ".E2B_SANDBOX_ID"), []byte(mmdsOpts.SandboxID), 0o666); err != nil { - fmt.Fprintf(os.Stderr, "error writing sandbox ID file: %v\n", err) + logger.Error().Err(err).Msg("error writing sandbox ID file") } if err := os.WriteFile(filepath.Join(E2BRunDir, ".E2B_TEMPLATE_ID"), []byte(mmdsOpts.TemplateID), 0o666); err != nil { - fmt.Fprintf(os.Stderr, "error writing template ID file: %v\n", err) + logger.Error().Err(err).Msg("error writing template ID file") } if mmdsOpts.LogsCollectorAddress != "" { diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 4d60b2b9e0..0a7957d149 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -221,7 +221,7 @@ func New( } if readErr != nil { - fmt.Fprintf(os.Stderr, "error reading from pty: %s\n", readErr) + logger.Error().Err(readErr).Msg("error reading from pty") break } @@ -265,7 +265,7 @@ func New( } if readErr != nil { - fmt.Fprintf(os.Stderr, "error reading from stdout: %s\n", readErr) + logger.Error().Err(readErr).Msg("error reading from stdout") break } @@ -307,7 +307,7 @@ func New( } if readErr != nil { - fmt.Fprintf(os.Stderr, "error reading from stderr: %s\n", readErr) + logger.Error().Err(readErr).Msg("error reading from stderr") break } diff --git a/packages/envd/main.go b/packages/envd/main.go index 67a7cdb2a4..88409ada53 100644 --- a/packages/envd/main.go +++ b/packages/envd/main.go @@ -15,6 +15,7 @@ import ( connectcors "connectrpc.com/cors" "github.com/go-chi/chi/v5" "github.com/rs/cors" + "github.com/rs/zerolog" "github.com/e2b-dev/infra/packages/envd/internal/api" "github.com/e2b-dev/infra/packages/envd/internal/execcontext" @@ -147,46 +148,47 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if err := os.MkdirAll(host.E2BRunDir, 0o755); err != nil { - fmt.Fprintf(os.Stderr, "error creating E2B run directory: %v\n", err) - } - defaults := &execcontext.Defaults{ User: defaultUser, EnvVars: utils.NewMap[string, string](), } + + mmdsChan := make(chan *host.MMDSOpts, 1) + defer close(mmdsChan) + l := logs.NewLogger(ctx, isNotFC, mmdsChan) + + if err := os.MkdirAll(host.E2BRunDir, 0o755); err != nil { + l.Error().Err(err).Msg("error creating E2B run directory") + } + isFCBoolStr := strconv.FormatBool(!isNotFC) defaults.EnvVars.Store("E2B_SANDBOX", isFCBoolStr) if err := os.WriteFile(filepath.Join(host.E2BRunDir, ".E2B_SANDBOX"), []byte(isFCBoolStr), 0o444); err != nil { - fmt.Fprintf(os.Stderr, "error writing sandbox file: %v\n", err) + l.Error().Err(err).Msg("error writing sandbox file") } - mmdsChan := make(chan *host.MMDSOpts, 1) - defer close(mmdsChan) if !isNotFC { - go host.PollForMMDSOpts(ctx, mmdsChan, defaults.EnvVars) + go host.PollForMMDSOpts(ctx, l.With().Str("logger", "mmds-poller").Logger(), mmdsChan, defaults.EnvVars) } - l := logs.NewLogger(ctx, isNotFC, mmdsChan) - m := chi.NewRouter() envLogger := l.With().Str("logger", "envd").Logger() fsLogger := l.With().Str("logger", "filesystem").Logger() filesystemRpc.Handle(m, &fsLogger, defaults) - cgroupManager := createCgroupManager() + cgroupManager := createCgroupManager(l) defer func() { err := cgroupManager.Close() if err != nil { - fmt.Fprintf(os.Stderr, "failed to close cgroup manager: %v\n", err) + l.Error().Err(err).Msg("failed to close cgroup manager") } }() processLogger := l.With().Str("logger", "process").Logger() processService := processRpc.Handle(m, &processLogger, defaults, cgroupManager) - service := api.New(&envLogger, defaults, mmdsChan, isNotFC) + service := api.New(envLogger, defaults, mmdsChan, isNotFC) handler := api.HandlerFromMux(service, m) middleware := authn.NewMiddleware(permissions.AuthenticateUsername) @@ -241,17 +243,17 @@ func main() { } } -func createCgroupManager() (m cgroups.Manager) { +func createCgroupManager(logger *zerolog.Logger) (m cgroups.Manager) { defer func() { if m == nil { - fmt.Fprintf(os.Stderr, "falling back to no-op cgroup manager\n") + logger.Warn().Msg("falling back to no-op cgroup manager") m = cgroups.NewNoopManager() } }() metrics, err := host.GetMetrics() if err != nil { - fmt.Fprintf(os.Stderr, "failed to calculate host metrics: %v\n", err) + logger.Error().Err(err).Msg("failed to calculate host metrics") return nil } @@ -284,7 +286,7 @@ func createCgroupManager() (m cgroups.Manager) { mgr, err := cgroups.NewCgroup2Manager(opts...) if err != nil { - fmt.Fprintf(os.Stderr, "failed to create cgroup2 manager: %v\n", err) + logger.Error().Err(err).Msg("failed to create cgroup2 manager") return nil } diff --git a/packages/orchestrator/pkg/cfg/model.go b/packages/orchestrator/pkg/cfg/model.go index 9484515a81..ccdc863a70 100644 --- a/packages/orchestrator/pkg/cfg/model.go +++ b/packages/orchestrator/pkg/cfg/model.go @@ -27,6 +27,9 @@ type BuilderConfig struct { SharedChunkCacheDir string `env:"SHARED_CHUNK_CACHE_PATH"` TemplatesDir string `env:"TEMPLATES_DIR,expand" envDefault:"${ORCHESTRATOR_BASE_PATH}/build-templates"` + WriteEnvdMessagesToLog bool `env:"WRITE_ENVD_MESSAGES_TO_LOG" envDefault:"false"` + LogsCollectorAddress string `env:"LOGS_COLLECTOR_ADDRESS" envDefault:""` + DefaultCacheDir string `env:"DEFAULT_CACHE_DIR,expand" envDefault:"${ORCHESTRATOR_BASE_PATH}/build"` StorageConfig storage.Config diff --git a/packages/orchestrator/pkg/factories/run.go b/packages/orchestrator/pkg/factories/run.go index 0a07b39e7d..9a4cc5ed12 100644 --- a/packages/orchestrator/pkg/factories/run.go +++ b/packages/orchestrator/pkg/factories/run.go @@ -286,7 +286,7 @@ func run(config cfg.Config, opts Options) (success bool) { sbxlogger.SandboxLoggerConfig{ ServiceName: serviceName, IsInternal: false, - CollectorAddress: env.LogsCollectorAddress(), + CollectorAddress: config.LogsCollectorAddress, }, ) defer func(l logger.Logger) { @@ -304,7 +304,7 @@ func run(config cfg.Config, opts Options) (success bool) { sbxlogger.SandboxLoggerConfig{ ServiceName: serviceName, IsInternal: true, - CollectorAddress: env.LogsCollectorAddress(), + CollectorAddress: config.LogsCollectorAddress, }, ) defer func(l logger.Logger) { @@ -572,7 +572,7 @@ func run(config cfg.Config, opts Options) (success bool) { sbxlogger.SandboxLoggerConfig{ ServiceName: constants.ServiceNameTemplate, IsInternal: false, - CollectorAddress: env.LogsCollectorAddress(), + CollectorAddress: config.LogsCollectorAddress, }, ) closers = append(closers, closer{ @@ -596,7 +596,7 @@ func run(config cfg.Config, opts Options) (success bool) { } // hyperloop server - hyperloopSrv, err := hyperloopserver.NewHyperloopServer(ctx, config.NetworkConfig.HyperloopProxyPort, globalLogger, sandboxes) + hyperloopSrv, err := hyperloopserver.NewHyperloopServer(ctx, config, globalLogger, sandboxes) if err != nil { logger.L().Fatal(ctx, "failed to create hyperloop server", zap.Error(err)) } diff --git a/packages/orchestrator/pkg/hyperloopserver/handlers/logs.go b/packages/orchestrator/pkg/hyperloopserver/handlers/logs.go index 5323636b08..ce830e3849 100644 --- a/packages/orchestrator/pkg/hyperloopserver/handlers/logs.go +++ b/packages/orchestrator/pkg/hyperloopserver/handlers/logs.go @@ -2,13 +2,14 @@ package handlers import ( "bytes" + "context" "encoding/json" - "fmt" "net" "net/http" "github.com/gin-gonic/gin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/e2b-dev/infra/packages/shared/pkg/logger" ) @@ -25,70 +26,80 @@ func (h *APIStore) Logs(c *gin.Context) { } sbxID := sbx.Runtime.SandboxID + teamID := sbx.Runtime.TeamID payload := make(map[string]any) if err := c.ShouldBindJSON(&payload); err != nil { h.sendAPIStoreError(c, http.StatusBadRequest, "Invalid body for logs") - h.logger.Error(ctx, "error when parsing sandbox logs request", zap.Error(err), logger.WithSandboxID(sbxID)) + h.logger.Error(ctx, "error when parsing sandbox logs request", zap.Error(err), logger.WithSandboxID(sbxID), logger.WithTeamID(teamID)) return } - err = h.validatePayloadSandboxID(payload, sbxID) - if err != nil { - h.sendAPIStoreError(c, http.StatusBadRequest, "Invalid sandboxID in logs payload") - h.logger.Error(ctx, "error when parsing sandbox logs request", zap.Error(err), logger.WithSandboxID(sbxID)) - - return + if rawInstanceID, ok := payload["instanceID"]; ok { + if existing, ok := rawInstanceID.(string); ok && existing != "" && existing != sbxID { + payload["invalid-instance-id"] = existing + } } - // Overwrite instanceID and teamID to avoid spoofing + // Overwrite sandbox-owned fields to avoid spoofing. payload["instanceID"] = sbxID - payload["teamID"] = sbx.Runtime.TeamID - - logs, err := json.Marshal(payload) - if err != nil { - h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when parsing logs payload") - h.logger.Error(ctx, "error when parsing logs payload", zap.Error(err), logger.WithSandboxID(sbxID)) + payload["teamID"] = teamID + payload["source"] = "envd" - return + if h.writeEnvdMessagesToLog { + h.logEnvdPayload(ctx, payload, sbxID, teamID) } - request, err := http.NewRequestWithContext(c, http.MethodPost, h.collectorAddr, bytes.NewBuffer(logs)) - if err != nil { - h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when creating request to forwarding sandbox logs") - h.logger.Error(ctx, "error when creating request to forwarding sandbox logs", zap.Error(err), logger.WithSandboxID(sbxID)) + if h.collectorAddr != "" { + logs, err := json.Marshal(payload) + if err != nil { + h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when parsing logs payload") + h.logger.Error(ctx, "error when parsing logs payload", zap.Error(err), logger.WithSandboxID(sbxID), logger.WithTeamID(teamID)) - return - } + return + } - request.Header.Set("Content-Type", "application/json") - response, err := h.collectorClient.Do(request) - if err != nil { - h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when forwarding sandbox logs") - h.logger.Error(ctx, "error when forwarding sandbox logs", zap.Error(err), logger.WithSandboxID(sbxID)) + request, err := http.NewRequestWithContext(c, http.MethodPost, h.collectorAddr, bytes.NewBuffer(logs)) + if err != nil { + h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when creating request to forwarding sandbox logs") + h.logger.Error(ctx, "error when creating request to forwarding sandbox logs", zap.Error(err), logger.WithSandboxID(sbxID), logger.WithTeamID(teamID)) - return + return + } + + request.Header.Set("Content-Type", "application/json") + response, err := h.collectorClient.Do(request) + if err != nil { + h.sendAPIStoreError(c, http.StatusInternalServerError, "Error when forwarding sandbox logs") + h.logger.Error(ctx, "error when forwarding sandbox logs", zap.Error(err), logger.WithSandboxID(sbxID), logger.WithTeamID(teamID)) + + return + } + defer response.Body.Close() } - defer response.Body.Close() c.Status(http.StatusOK) } -// validatePayloadSandboxID checks if the payload contains correct instanceID to prevent slow requests to contaminating the logs of other sandboxes. -func (h *APIStore) validatePayloadSandboxID(payload map[string]any, sbxID string) error { - if payload["instanceID"] == nil { - return fmt.Errorf("missing sandboxID in logs payload") - } - - payloadSandboxID, ok := payload["instanceID"].(string) - if !ok { - return fmt.Errorf("instanceID in logs payload is not a string: %v", payload["instanceID"]) +func (h *APIStore) logEnvdPayload(ctx context.Context, payload map[string]any, sbxID, teamID string) { + message := "envd log" + if msg, ok := payload["message"].(string); ok && msg != "" { + message = msg + } else if msg, ok := payload["msg"].(string); ok && msg != "" { + message = msg } - if payloadSandboxID != sbxID { - return fmt.Errorf("sandboxID in logs payload does not match the sandboxID of the source sandbox (%s != %s)", payloadSandboxID, sbxID) + fields := []zap.Field{ + logger.WithSandboxID(sbxID), + logger.WithTeamID(teamID), + zap.String("log.source", "envd"), + zap.Any("envd_log", payload), } - return nil + level, _ := payload["level"].(string) + zapLevel, _ := zapcore.ParseLevel(level) // defaults to info if invalid + h.logger. + WithOptions(zap.AddStacktrace(zapcore.FatalLevel)). // stack trace only points to hyperloop, useless + Log(ctx, zapLevel, message, fields...) } diff --git a/packages/orchestrator/pkg/hyperloopserver/handlers/store.go b/packages/orchestrator/pkg/hyperloopserver/handlers/store.go index eab5e8b5a5..eb3882d5db 100644 --- a/packages/orchestrator/pkg/hyperloopserver/handlers/store.go +++ b/packages/orchestrator/pkg/hyperloopserver/handlers/store.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" + "github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/apierrors" "github.com/e2b-dev/infra/packages/shared/pkg/logger" @@ -17,16 +18,18 @@ type APIStore struct { logger logger.Logger sandboxes *sandbox.Map - collectorClient http.Client - collectorAddr string + collectorClient http.Client + collectorAddr string + writeEnvdMessagesToLog bool } -func NewHyperloopStore(logger logger.Logger, sandboxes *sandbox.Map, sandboxCollectorAddr string) *APIStore { +func NewHyperloopStore(logger logger.Logger, sandboxes *sandbox.Map, config cfg.Config) *APIStore { return &APIStore{ logger: logger, sandboxes: sandboxes, - collectorAddr: sandboxCollectorAddr, + writeEnvdMessagesToLog: config.WriteEnvdMessagesToLog, + collectorAddr: config.LogsCollectorAddress, collectorClient: http.Client{ Timeout: CollectorExporterTimeout, }, diff --git a/packages/orchestrator/pkg/hyperloopserver/server.go b/packages/orchestrator/pkg/hyperloopserver/server.go index fb16ab8853..9e9872e9ed 100644 --- a/packages/orchestrator/pkg/hyperloopserver/server.go +++ b/packages/orchestrator/pkg/hyperloopserver/server.go @@ -10,18 +10,17 @@ import ( "github.com/gin-gonic/gin" middleware "github.com/oapi-codegen/gin-middleware" + "github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg" "github.com/e2b-dev/infra/packages/orchestrator/pkg/hyperloopserver/contracts" "github.com/e2b-dev/infra/packages/orchestrator/pkg/hyperloopserver/handlers" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox" - "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/logger" ) const maxUploadLimit = 1 << 28 // 256 MiB -func NewHyperloopServer(ctx context.Context, port uint16, logger logger.Logger, sandboxes *sandbox.Map) (*http.Server, error) { - sandboxCollectorAddr := env.LogsCollectorAddress() - store := handlers.NewHyperloopStore(logger, sandboxes, sandboxCollectorAddr) +func NewHyperloopServer(ctx context.Context, config cfg.Config, logger logger.Logger, sandboxes *sandbox.Map) (*http.Server, error) { + store := handlers.NewHyperloopStore(logger, sandboxes, config) swagger, err := contracts.GetSwagger() if err != nil { return nil, fmt.Errorf("error getting swagger spec: %w", err) @@ -36,7 +35,7 @@ func NewHyperloopServer(ctx context.Context, port uint16, logger logger.Logger, server := &http.Server{ Handler: engine, - Addr: fmt.Sprintf("0.0.0.0:%d", port), + Addr: fmt.Sprintf("0.0.0.0:%d", config.NetworkConfig.HyperloopProxyPort), BaseContext: func(net.Listener) context.Context { return ctx }, } diff --git a/packages/shared/pkg/env/env.go b/packages/shared/pkg/env/env.go index 71462a238e..92ac7f74f7 100644 --- a/packages/shared/pkg/env/env.go +++ b/packages/shared/pkg/env/env.go @@ -46,7 +46,3 @@ func GetEnvAsInt(key string, defaultValue int) (int, error) { func GetNodeID() string { return utils.RequiredEnv("NODE_ID", "Node ID of the instance node is required") } - -func LogsCollectorAddress() string { - return os.Getenv("LOGS_COLLECTOR_ADDRESS") -}