Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions backend/pkg/server/services/flow_files.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package services

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -547,43 +546,29 @@ func (s *FlowFileService) DownloadFlowFile(c *gin.Context) {
}

// Single directory → ZIP with paths relative to that directory (backward-compat).
// The archive is buffered so an explicit Content-Length can be set; this allows
// clients (including Swagger UI) to recognise and download the file correctly.
// The archive is streamed directly to the response writer so the whole ZIP is
// never held in memory; the response uses chunked transfer encoding.
if len(entries) == 1 && entries[0].info.IsDir() {
name := filepath.Base(entries[0].localPath)
var buf bytes.Buffer
if err := flowfiles.ZipDirectory(&buf, entries[0].localPath); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, name+".zip", func(w io.Writer) error {
return flowfiles.ZipDirectory(w, entries[0].localPath)
}); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": name + ".zip",
}),
})
return
}

// Multiple paths (any mix of files and directories) → ZIP with cache-relative paths.
// Buffered for the same reason as above.
// Streamed directly to the response writer for the same reason as above.
relPaths := make([]string, 0, len(entries))
for _, e := range entries {
relPaths = append(relPaths, filepath.ToSlash(e.reqPath))
}
var buf bytes.Buffer
if err := flowfiles.ZipRelativePaths(&buf, s.flowDataDir(flowID), relPaths); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, "download.zip", func(w io.Writer) error {
return flowfiles.ZipRelativePaths(w, s.flowDataDir(flowID), relPaths)
}); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": "download.zip",
}),
})
}

// PullFlowFiles is a function to sync one or more paths from the container into the local cache
Expand Down
3 changes: 3 additions & 0 deletions backend/pkg/server/services/flow_files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,6 +2501,9 @@ func TestFlowFileService_DownloadFlowFileScenarios(t *testing.T) {
assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains)
}
if tt.wantZipEntries != nil {
// A streamed ZIP download must not buffer the whole archive, so it
// must not carry a Content-Length computed from a full buffer.
assert.Empty(t, w.Header().Get("Content-Length"))
zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
got := map[string]string{}
Expand Down
76 changes: 53 additions & 23 deletions backend/pkg/server/services/resources.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package services

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
Expand Down Expand Up @@ -1930,18 +1930,11 @@ func (s *ResourceService) DownloadResource(c *gin.Context) {
}

dirName := path.Base(e.rec.Path)
var buf bytes.Buffer
if err := resources.ZipResources(&buf, zipEntries); err != nil {
logger.FromContext(c).WithError(err).Error("error creating zip archive for download")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, dirName+".zip", func(w io.Writer) error {
return resources.ZipResources(w, zipEntries)
}); err != nil {
logger.FromContext(c).WithError(err).Error("error streaming zip archive for download")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": dirName + ".zip",
}),
})
return
}

Expand Down Expand Up @@ -1983,18 +1976,55 @@ func (s *ResourceService) DownloadResource(c *gin.Context) {
}
}

var buf bytes.Buffer
if err := resources.ZipResources(&buf, zipEntries); err != nil {
logger.FromContext(c).WithError(err).Error("error creating zip archive for download")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, "download.zip", func(w io.Writer) error {
return resources.ZipResources(w, zipEntries)
}); err != nil {
logger.FromContext(c).WithError(err).Error("error streaming zip archive for download")
}
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": "download.zip",
}),
})
}

// zipStreamWriter defers committing the ZIP response status and headers until the
// first byte is written. That lets streamZipArchive turn a build failure that
// happens before any output into a normal structured error response, while a
// mid-stream failure (after the 200 status is already on the wire) is aborted.
type zipStreamWriter struct {
c *gin.Context
filename string
started bool
}

func (zw *zipStreamWriter) Write(p []byte) (int, error) {
if !zw.started {
zw.started = true
zw.c.Header("Content-Type", "application/zip")
zw.c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{
"filename": zw.filename,
}))
zw.c.Status(http.StatusOK)
}
return zw.c.Writer.Write(p)
}

// streamZipArchive streams a ZIP archive to the client as build writes it, instead
// of buffering the entire archive in memory before sending it. Memory stays
// proportional to a single file copy buffer rather than the full archive size.
//
// Response headers and the 200 status are committed lazily, on the first byte
// build writes (see zipStreamWriter). If build fails before writing anything
// (e.g. a blob that cannot be opened), nothing has been committed and a normal
// structured error response is returned. If it fails mid-stream the 200 status is
// already on the wire, so the partial response is aborted; either way the error is
// returned for the caller to log with its own context.
func streamZipArchive(c *gin.Context, filename string, build func(w io.Writer) error) error {
if err := build(&zipStreamWriter{c: c, filename: filename}); err != nil {
if c.Writer.Written() {
c.Abort()
} else {
response.Error(c, response.ErrInternal, err)
}
return err
}
return nil
}
Comment thread
mason5052 marked this conversation as resolved.

// ---- helper methods --------------------------------------------------------
Expand Down
89 changes: 89 additions & 0 deletions backend/pkg/server/services/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"io"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -1351,6 +1352,9 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) {
assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains)
}
if tt.wantZipEntries != nil {
// A streamed ZIP download must not buffer the whole archive, so it
// must not carry a Content-Length computed from a full buffer.
assert.Empty(t, w.Header().Get("Content-Length"))
zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
got := map[string]string{}
Expand All @@ -1370,6 +1374,91 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) {
}
}

// TestStreamZipArchive verifies the shared streaming helper writes a valid ZIP
// straight to the response writer without buffering the whole archive (no
// Content-Length is emitted) and that a build error mid-stream propagates to the
// caller and aborts the request.
func TestStreamZipArchive(t *testing.T) {
gin.SetMode(gin.TestMode)

t.Run("streams archive without content-length", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/download", nil)

err := streamZipArchive(c, "out.zip", func(zw io.Writer) error {
z := zip.NewWriter(zw)
f, createErr := z.Create("hello.txt")
require.NoError(t, createErr)
if _, writeErr := f.Write([]byte("hello world")); writeErr != nil {
return writeErr
}
return z.Close()
})

require.NoError(t, err)
assert.False(t, c.IsAborted())
assert.Equal(t, "application/zip", w.Header().Get("Content-Type"))
assert.Contains(t, w.Header().Get("Content-Disposition"), "out.zip")
// The whole archive was never buffered, so no buffer-derived length exists.
assert.Empty(t, w.Header().Get("Content-Length"))

zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
require.Len(t, zr.File, 1)
assert.Equal(t, "hello.txt", zr.File[0].Name)
rc, err := zr.File[0].Open()
require.NoError(t, err)
data, err := io.ReadAll(rc)
require.NoError(t, err)
Comment thread
mason5052 marked this conversation as resolved.
require.NoError(t, rc.Close())
assert.Equal(t, "hello world", string(data))
})

t.Run("propagates build error after partial stream and aborts", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/download", nil)

sentinel := errors.New("source reader failed mid-stream")
err := streamZipArchive(c, "out.zip", func(zw io.Writer) error {
z := zip.NewWriter(zw)
f, createErr := z.Create("partial.txt")
require.NoError(t, createErr)
if _, writeErr := f.Write([]byte("partial data")); writeErr != nil {
return writeErr
}
// Flush bytes to the response, then fail as a slow/erroring source would.
if flushErr := z.Flush(); flushErr != nil {
return flushErr
}
return sentinel
})

require.ErrorIs(t, err, sentinel)
assert.True(t, c.IsAborted())
})

t.Run("returns structured error when build fails before writing", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/download", nil)

sentinel := errors.New("blob open failed before any write")
err := streamZipArchive(c, "out.zip", func(zw io.Writer) error {
// Fail immediately, before writing any archive bytes.
return sentinel
})

require.ErrorIs(t, err, sentinel)
// Nothing was streamed, so a normal (non-200, non-zip) error response is
// sent instead of a truncated 200.
assert.Equal(t, http.StatusInternalServerError, w.Code)
assert.NotEqual(t, "application/zip", w.Header().Get("Content-Type"))
assert.True(t, c.IsAborted())
})
}

func TestResourceService_DeleteResourceScenarios(t *testing.T) {
type seed struct {
userID uint64
Expand Down