diff --git a/backend/pkg/server/services/flow_files.go b/backend/pkg/server/services/flow_files.go index 10cb5d5a..ffb3c144 100644 --- a/backend/pkg/server/services/flow_files.go +++ b/backend/pkg/server/services/flow_files.go @@ -1,7 +1,6 @@ package services import ( - "bytes" "context" "errors" "fmt" @@ -547,43 +546,29 @@ func (s *FlowFileService) DownloadFlowFile(c *gin.Context) { } // Single directory → ZIP with paths relative to that directory (backward-compat). - // The archive is buffered so an explicit Content-Length can be set; this allows - // clients (including Swagger UI) to recognise and download the file correctly. + // The archive is streamed directly to the response writer so the whole ZIP is + // never held in memory; the response uses chunked transfer encoding. if len(entries) == 1 && entries[0].info.IsDir() { name := filepath.Base(entries[0].localPath) - var buf bytes.Buffer - if err := flowfiles.ZipDirectory(&buf, entries[0].localPath); err != nil { - logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, name+".zip", func(w io.Writer) error { + return flowfiles.ZipDirectory(w, entries[0].localPath) + }); err != nil { + logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": name + ".zip", - }), - }) return } // Multiple paths (any mix of files and directories) → ZIP with cache-relative paths. - // Buffered for the same reason as above. + // Streamed directly to the response writer for the same reason as above. relPaths := make([]string, 0, len(entries)) for _, e := range entries { relPaths = append(relPaths, filepath.ToSlash(e.reqPath)) } - var buf bytes.Buffer - if err := flowfiles.ZipRelativePaths(&buf, s.flowDataDir(flowID), relPaths); err != nil { - logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, "download.zip", func(w io.Writer) error { + return flowfiles.ZipRelativePaths(w, s.flowDataDir(flowID), relPaths) + }); err != nil { + logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": "download.zip", - }), - }) } // PullFlowFiles is a function to sync one or more paths from the container into the local cache diff --git a/backend/pkg/server/services/flow_files_test.go b/backend/pkg/server/services/flow_files_test.go index 512762cc..a8531331 100644 --- a/backend/pkg/server/services/flow_files_test.go +++ b/backend/pkg/server/services/flow_files_test.go @@ -2501,6 +2501,9 @@ func TestFlowFileService_DownloadFlowFileScenarios(t *testing.T) { assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains) } if tt.wantZipEntries != nil { + // A streamed ZIP download must not buffer the whole archive, so it + // must not carry a Content-Length computed from a full buffer. + assert.Empty(t, w.Header().Get("Content-Length")) zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) require.NoError(t, err) got := map[string]string{} diff --git a/backend/pkg/server/services/resources.go b/backend/pkg/server/services/resources.go index 8fae9956..f14f54ff 100644 --- a/backend/pkg/server/services/resources.go +++ b/backend/pkg/server/services/resources.go @@ -1,10 +1,10 @@ package services import ( - "bytes" "context" "errors" "fmt" + "io" "mime" "net/http" "os" @@ -1930,18 +1930,11 @@ func (s *ResourceService) DownloadResource(c *gin.Context) { } dirName := path.Base(e.rec.Path) - var buf bytes.Buffer - if err := resources.ZipResources(&buf, zipEntries); err != nil { - logger.FromContext(c).WithError(err).Error("error creating zip archive for download") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, dirName+".zip", func(w io.Writer) error { + return resources.ZipResources(w, zipEntries) + }); err != nil { + logger.FromContext(c).WithError(err).Error("error streaming zip archive for download") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": dirName + ".zip", - }), - }) return } @@ -1983,18 +1976,55 @@ func (s *ResourceService) DownloadResource(c *gin.Context) { } } - var buf bytes.Buffer - if err := resources.ZipResources(&buf, zipEntries); err != nil { - logger.FromContext(c).WithError(err).Error("error creating zip archive for download") - response.Error(c, response.ErrInternal, err) - return + if err := streamZipArchive(c, "download.zip", func(w io.Writer) error { + return resources.ZipResources(w, zipEntries) + }); err != nil { + logger.FromContext(c).WithError(err).Error("error streaming zip archive for download") } - c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf, - map[string]string{ - "Content-Disposition": mime.FormatMediaType("attachment", map[string]string{ - "filename": "download.zip", - }), - }) +} + +// zipStreamWriter defers committing the ZIP response status and headers until the +// first byte is written. That lets streamZipArchive turn a build failure that +// happens before any output into a normal structured error response, while a +// mid-stream failure (after the 200 status is already on the wire) is aborted. +type zipStreamWriter struct { + c *gin.Context + filename string + started bool +} + +func (zw *zipStreamWriter) Write(p []byte) (int, error) { + if !zw.started { + zw.started = true + zw.c.Header("Content-Type", "application/zip") + zw.c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{ + "filename": zw.filename, + })) + zw.c.Status(http.StatusOK) + } + return zw.c.Writer.Write(p) +} + +// streamZipArchive streams a ZIP archive to the client as build writes it, instead +// of buffering the entire archive in memory before sending it. Memory stays +// proportional to a single file copy buffer rather than the full archive size. +// +// Response headers and the 200 status are committed lazily, on the first byte +// build writes (see zipStreamWriter). If build fails before writing anything +// (e.g. a blob that cannot be opened), nothing has been committed and a normal +// structured error response is returned. If it fails mid-stream the 200 status is +// already on the wire, so the partial response is aborted; either way the error is +// returned for the caller to log with its own context. +func streamZipArchive(c *gin.Context, filename string, build func(w io.Writer) error) error { + if err := build(&zipStreamWriter{c: c, filename: filename}); err != nil { + if c.Writer.Written() { + c.Abort() + } else { + response.Error(c, response.ErrInternal, err) + } + return err + } + return nil } // ---- helper methods -------------------------------------------------------- diff --git a/backend/pkg/server/services/resources_test.go b/backend/pkg/server/services/resources_test.go index f5e4abc4..a2cc52eb 100644 --- a/backend/pkg/server/services/resources_test.go +++ b/backend/pkg/server/services/resources_test.go @@ -7,6 +7,7 @@ import ( "crypto/md5" "encoding/hex" "encoding/json" + "errors" "io" "mime/multipart" "net/http" @@ -1351,6 +1352,9 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) { assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains) } if tt.wantZipEntries != nil { + // A streamed ZIP download must not buffer the whole archive, so it + // must not carry a Content-Length computed from a full buffer. + assert.Empty(t, w.Header().Get("Content-Length")) zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) require.NoError(t, err) got := map[string]string{} @@ -1370,6 +1374,91 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) { } } +// TestStreamZipArchive verifies the shared streaming helper writes a valid ZIP +// straight to the response writer without buffering the whole archive (no +// Content-Length is emitted) and that a build error mid-stream propagates to the +// caller and aborts the request. +func TestStreamZipArchive(t *testing.T) { + gin.SetMode(gin.TestMode) + + t.Run("streams archive without content-length", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + z := zip.NewWriter(zw) + f, createErr := z.Create("hello.txt") + require.NoError(t, createErr) + if _, writeErr := f.Write([]byte("hello world")); writeErr != nil { + return writeErr + } + return z.Close() + }) + + require.NoError(t, err) + assert.False(t, c.IsAborted()) + assert.Equal(t, "application/zip", w.Header().Get("Content-Type")) + assert.Contains(t, w.Header().Get("Content-Disposition"), "out.zip") + // The whole archive was never buffered, so no buffer-derived length exists. + assert.Empty(t, w.Header().Get("Content-Length")) + + zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len())) + require.NoError(t, err) + require.Len(t, zr.File, 1) + assert.Equal(t, "hello.txt", zr.File[0].Name) + rc, err := zr.File[0].Open() + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + require.NoError(t, rc.Close()) + assert.Equal(t, "hello world", string(data)) + }) + + t.Run("propagates build error after partial stream and aborts", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + sentinel := errors.New("source reader failed mid-stream") + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + z := zip.NewWriter(zw) + f, createErr := z.Create("partial.txt") + require.NoError(t, createErr) + if _, writeErr := f.Write([]byte("partial data")); writeErr != nil { + return writeErr + } + // Flush bytes to the response, then fail as a slow/erroring source would. + if flushErr := z.Flush(); flushErr != nil { + return flushErr + } + return sentinel + }) + + require.ErrorIs(t, err, sentinel) + assert.True(t, c.IsAborted()) + }) + + t.Run("returns structured error when build fails before writing", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/download", nil) + + sentinel := errors.New("blob open failed before any write") + err := streamZipArchive(c, "out.zip", func(zw io.Writer) error { + // Fail immediately, before writing any archive bytes. + return sentinel + }) + + require.ErrorIs(t, err, sentinel) + // Nothing was streamed, so a normal (non-200, non-zip) error response is + // sent instead of a truncated 200. + assert.Equal(t, http.StatusInternalServerError, w.Code) + assert.NotEqual(t, "application/zip", w.Header().Get("Content-Type")) + assert.True(t, c.IsAborted()) + }) +} + func TestResourceService_DeleteResourceScenarios(t *testing.T) { type seed struct { userID uint64