From 556692aeb13067fa75f50cb093a858ef8eb85ba1 Mon Sep 17 00:00:00 2001 From: mason5052 Date: Fri, 5 Jun 2026 10:04:03 -0400 Subject: [PATCH 1/2] fix: stream zip downloads without buffering archive DownloadResource and DownloadFlowFile built the entire ZIP archive in a bytes.Buffer before sending it, so heap usage scaled with archive size and a few concurrent large-directory downloads could exhaust process memory. Stream the archive straight to the response writer via a shared streamZipArchive helper; the existing ZipResources/ZipDirectory/ZipRelativePaths helpers already accept an io.Writer. Responses are now chunked (no Content-Length) and memory stays proportional to one file's copy buffer. --- backend/pkg/server/services/flow_files.go | 37 +++------- .../pkg/server/services/flow_files_test.go | 3 + backend/pkg/server/services/resources.go | 56 +++++++++------ backend/pkg/server/services/resources_test.go | 70 +++++++++++++++++++ 4 files changed, 117 insertions(+), 49 deletions(-) diff --git a/backend/pkg/server/services/flow_files.go b/backend/pkg/server/services/flow_files.go index 10cb5d5a..ffb3c144 100644 --- a/backend/pkg/server/services/flow_files.go +++ b/backend/pkg/server/services/flow_files.go @@ -1,7 +1,6 @@ package services import ( - "bytes" "context" "errors" "fmt" @@ -547,43 +546,29 @@ func (s *FlowFileService) DownloadFlowFile(c *gin.Context) { } // Single directory → ZIP with paths relative to that directory (backward-compat). - // The archive is buffered so an explicit Content-Length can be set; this allows - // clients (including Swagger UI) to recognise and download the file correctly. + // The archive is streamed directly to the response writer so the whole ZIP is + // never held in memory; the response uses chunked transfer encoding. 
if len(entries) == 1 && entries[0].info.IsDir() { name := filepath.Base(entries[0].localPath) - var buf bytes.Buffer - if err := flowfiles.ZipDirectory(&buf, entries[0].localPath); err != nil { - logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, name+".zip", func(w io.Writer) error { + return flowfiles.ZipDirectory(w, entries[0].localPath) + }); err != nil { + logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": name + ".zip", - }), - }) return } // Multiple paths (any mix of files and directories) → ZIP with cache-relative paths. - // Buffered for the same reason as above. + // Streamed directly to the response writer for the same reason as above. 
relPaths := make([]string, 0, len(entries)) for _, e := range entries { relPaths = append(relPaths, filepath.ToSlash(e.reqPath)) } - var buf bytes.Buffer - if err := flowfiles.ZipRelativePaths(&buf, s.flowDataDir(flowID), relPaths); err != nil { - logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, "download.zip", func(w io.Writer) error { + return flowfiles.ZipRelativePaths(w, s.flowDataDir(flowID), relPaths) + }); err != nil { + logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": "download.zip", - }), - }) } // PullFlowFiles is a function to sync one or more paths from the container into the local cache diff --git a/backend/pkg/server/services/flow_files_test.go b/backend/pkg/server/services/flow_files_test.go index 512762cc..a8531331 100644 --- a/backend/pkg/server/services/flow_files_test.go +++ b/backend/pkg/server/services/flow_files_test.go @@ -2501,6 +2501,9 @@ func TestFlowFileService_DownloadFlowFileScenarios(t *testing.T) { assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains) } if tt.wantZipEntries != nil { + // A streamed ZIP download must not buffer the whole archive, so it + // must not carry a Content-Length computed from a full buffer. 
+ assert.Empty(t, w.Header().Get("Content-Length")) zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) require.NoError(t, err) got := map[string]string{} diff --git a/backend/pkg/server/services/resources.go b/backend/pkg/server/services/resources.go index 8fae9956..6eaf7b49 100644 --- a/backend/pkg/server/services/resources.go +++ b/backend/pkg/server/services/resources.go @@ -1,10 +1,10 @@ package services import ( - "bytes" "context" "errors" "fmt" + "io" "mime" "net/http" "os" @@ -1930,18 +1930,11 @@ func (s *ResourceService) DownloadResource(c *gin.Context) { } dirName := path.Base(e.rec.Path) - var buf bytes.Buffer - if err := resources.ZipResources(&buf, zipEntries); err != nil { - logger.FromContext(c).WithError(err).Error("error creating zip archive for download") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, dirName+".zip", func(w io.Writer) error { + return resources.ZipResources(w, zipEntries) + }); err != nil { + logger.FromContext(c).WithError(err).Error("error streaming zip archive for download") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": dirName + ".zip", - }), - }) return } @@ -1983,18 +1976,35 @@ func (s *ResourceService) DownloadResource(c *gin.Context) { } } - var buf bytes.Buffer - if err := resources.ZipResources(&buf, zipEntries); err != nil { - logger.FromContext(c).WithError(err).Error("error creating zip archive for download") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, "download.zip", func(w io.Writer) error { + return resources.ZipResources(w, zipEntries) + }); err != nil { + logger.FromContext(c).WithError(err).Error("error streaming zip archive for download") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - 
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": "download.zip", - }), - }) +} + +// streamZipArchive streams a ZIP archive to the client as build writes it, +// instead of buffering the entire archive in memory before sending it. Memory +// stays proportional to a single file copy buffer rather than the full archive. +// +// The status line and headers commit as soon as build writes the first byte, so +// an error returned by build after streaming has started can no longer change the +// HTTP status; the request is aborted and the error returned for the caller to +// log. The client then receives a truncated (invalid) archive, which is preferable +// to risking an out-of-memory crash. Errors detected before the archive is built +// (auth, path validation, resource lookup) are handled by the caller and still +// produce normal error responses. +func streamZipArchive(c *gin.Context, filename string, build func(w io.Writer) error) error { + c.Header("Content-Type", "application/zip") + c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{ + "filename": filename, + })) + c.Status(http.StatusOK) + if err := build(c.Writer); err != nil { + c.Abort() + return err + } + return nil } // ---- helper methods -------------------------------------------------------- diff --git a/backend/pkg/server/services/resources_test.go b/backend/pkg/server/services/resources_test.go index f5e4abc4..35b9ac3f 100644 --- a/backend/pkg/server/services/resources_test.go +++ b/backend/pkg/server/services/resources_test.go @@ -7,6 +7,7 @@ import ( "crypto/md5" "encoding/hex" "encoding/json" + "errors" "io" "mime/multipart" "net/http" @@ -1351,6 +1352,9 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) { assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains) } if tt.wantZipEntries != nil { + // A streamed ZIP download must not buffer the whole archive, so it + // must not carry a 
Content-Length computed from a full buffer. + assert.Empty(t, w.Header().Get("Content-Length")) zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) require.NoError(t, err) got := map[string]string{} @@ -1370,6 +1374,72 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) { } } +// TestStreamZipArchive verifies the shared streaming helper writes a valid ZIP +// straight to the response writer without buffering the whole archive (no +// Content-Length is emitted) and that a build error mid-stream propagates to the +// caller and aborts the request. +func TestStreamZipArchive(t *testing.T) { + gin.SetMode(gin.TestMode) + + t.Run("streams archive without content-length", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + z := zip.NewWriter(zw) + f, createErr := z.Create("hello.txt") + require.NoError(t, createErr) + if _, writeErr := f.Write([]byte("hello world")); writeErr != nil { + return writeErr + } + return z.Close() + }) + + require.NoError(t, err) + assert.False(t, c.IsAborted()) + assert.Equal(t, "application/zip", w.Header().Get("Content-Type")) + assert.Contains(t, w.Header().Get("Content-Disposition"), "out.zip") + // The whole archive was never buffered, so no buffer-derived length exists. 
+ assert.Empty(t, w.Header().Get("Content-Length")) + + zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) + require.NoError(t, err) + require.Len(t, zr.File, 1) + assert.Equal(t, "hello.txt", zr.File[0].Name) + rc, err := zr.File[0].Open() + require.NoError(t, err) + data, err := io.ReadAll(rc) + rc.Close() + require.NoError(t, err) + assert.Equal(t, "hello world", string(data)) + }) + + t.Run("propagates build error after partial stream and aborts", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + sentinel := errors.New("source reader failed mid-stream") + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + z := zip.NewWriter(zw) + f, createErr := z.Create("partial.txt") + require.NoError(t, createErr) + if _, writeErr := f.Write([]byte("partial data")); writeErr != nil { + return writeErr + } + // Flush bytes to the response, then fail as a slow/erroring source would. + if flushErr := z.Flush(); flushErr != nil { + return flushErr + } + return sentinel + }) + + require.ErrorIs(t, err, sentinel) + assert.True(t, c.IsAborted()) + }) +} + func TestResourceService_DeleteResourceScenarios(t *testing.T) { type seed struct { userID uint64 From 41524bd07ed833b5f7545259080b5a4fb7ea702f Mon Sep 17 00:00:00 2001 From: mason5052 Date: Fri, 5 Jun 2026 10:16:52 -0400 Subject: [PATCH 2/2] fix: return structured error when zip build fails before streaming Commit the ZIP response status and headers lazily, on the first byte written, via a small zipStreamWriter. A build failure before any output now returns the normal structured error response instead of a committed 200 with a truncated body; mid-stream failures still abort. Also check the reader Close error in the helper test and cover the pre-stream failure path. 
--- backend/pkg/server/services/resources.go | 54 +++++++++++++------ backend/pkg/server/services/resources_test.go | 21 +++++++- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/backend/pkg/server/services/resources.go b/backend/pkg/server/services/resources.go index 6eaf7b49..f14f54ff 100644 --- a/backend/pkg/server/services/resources.go +++ b/backend/pkg/server/services/resources.go @@ -1983,25 +1983,45 @@ func (s *ResourceService) DownloadResource(c *gin.Context) { } } -// streamZipArchive streams a ZIP archive to the client as build writes it, -// instead of buffering the entire archive in memory before sending it. Memory -// stays proportional to a single file copy buffer rather than the full archive. +// zipStreamWriter defers committing the ZIP response status and headers until the +// first byte is written. That lets streamZipArchive turn a build failure that +// happens before any output into a normal structured error response, while a +// mid-stream failure (after the 200 status is already on the wire) is aborted. +type zipStreamWriter struct { + c *gin.Context + filename string + started bool +} + +func (zw *zipStreamWriter) Write(p []byte) (int, error) { + if !zw.started { + zw.started = true + zw.c.Header("Content-Type", "application/zip") + zw.c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{ + "filename": zw.filename, + })) + zw.c.Status(http.StatusOK) + } + return zw.c.Writer.Write(p) +} + +// streamZipArchive streams a ZIP archive to the client as build writes it, instead +// of buffering the entire archive in memory before sending it. Memory stays +// proportional to a single file copy buffer rather than the full archive size. // -// The status line and headers commit as soon as build writes the first byte, so -// an error returned by build after streaming has started can no longer change the -// HTTP status; the request is aborted and the error returned for the caller to -// log. 
The client then receives a truncated (invalid) archive, which is preferable -// to risking an out-of-memory crash. Errors detected before the archive is built -// (auth, path validation, resource lookup) are handled by the caller and still -// produce normal error responses. +// Response headers and the 200 status are committed lazily, on the first byte +// build writes (see zipStreamWriter). If build fails before writing anything +// (e.g. a blob that cannot be opened), nothing has been committed and a normal +// structured error response is returned. If it fails mid-stream the 200 status is +// already on the wire, so c.Abort only stops any remaining handlers and the client is left with a truncated (invalid) archive; either way the error is +// returned for the caller to log with its own context. func streamZipArchive(c *gin.Context, filename string, build func(w io.Writer) error) error { - c.Header("Content-Type", "application/zip") - c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{ - "filename": filename, - })) - c.Status(http.StatusOK) - if err := build(c.Writer); err != nil { - c.Abort() + if err := build(&zipStreamWriter{c: c, filename: filename}); err != nil { + if c.Writer.Written() { + c.Abort() + } else { + response.Error(c, response.ErrInternal, err) + } return err } return nil diff --git a/backend/pkg/server/services/resources_test.go b/backend/pkg/server/services/resources_test.go index 35b9ac3f..a2cc52eb 100644 --- a/backend/pkg/server/services/resources_test.go +++ b/backend/pkg/server/services/resources_test.go @@ -1410,8 +1410,8 @@ func TestStreamZipArchive(t *testing.T) { rc, err := zr.File[0].Open() require.NoError(t, err) data, err := io.ReadAll(rc) - rc.Close() require.NoError(t, err) + require.NoError(t, rc.Close()) assert.Equal(t, "hello world", string(data)) }) @@ -1438,6 +1438,25 @@ func TestStreamZipArchive(t *testing.T) { require.ErrorIs(t, err, sentinel) assert.True(t, c.IsAborted()) }) + + t.Run("returns structured error when build fails before writing", func(t 
*testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + sentinel := errors.New("blob open failed before any write") + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + // Fail immediately, before writing any archive bytes. + return sentinel + }) + + require.ErrorIs(t, err, sentinel) + // Nothing was streamed, so a normal (non-200, non-zip) error response is + // sent instead of a truncated 200. + assert.Equal(t, http.StatusInternalServerError, w.Code) + assert.NotEqual(t, "application/zip", w.Header().Get("Content-Type")) + assert.True(t, c.IsAborted()) + }) } func TestResourceService_DeleteResourceScenarios(t *testing.T) {