
feat: add bounded ZSTD encoder/decoder pool with memory management#327

Merged
EdSchouten merged 18 commits into buildbarn:main from
aron-muon:zstd-memory-management
Mar 12, 2026
Conversation

@aron-muon (Contributor) commented Mar 2, 2026

Summary

The existing CAS gRPC client creates a new zstd.Encoder and zstd.Decoder for every ByteStream request. Under high concurrency (e.g., large Bazel builds with many parallel actions), this leads to unbounded memory growth, GC pressure, and potential OOM — each encoder allocates ~4MB and each decoder ~8MB, and the klauspost/compress/zstd library spawns internal goroutines that leak without explicit Close() calls (klauspost/compress#264).

This PR introduces a pkg/zstd package with Pool, Encoder, and Decoder interfaces, a bounded pool implementation, and integrates it across the CAS client, ByteStream server, and reference-expanding blob access.

Approach

pkg/zstd/ package — clean interfaces for ZSTD encoder/decoder lifecycle:

  • Pool interface with NewEncoder(ctx, w) / NewDecoder(ctx, r) returning Encoder/Decoder
  • Encoder.Close() and Decoder.Close() handle flushing and release back to the pool
  • BoundedPool: reuses encoders/decoders via sync.Pool, limits concurrency via semaphores, providing backpressure under load
  • UnboundedPool: creates fresh encoders/decoders per call, for use when bounding is not required
  • MetricsPool: decorator exposing Prometheus counters (pool_acquisitions_total, pool_releases_total, pool_rejections_total) and histogram (pool_wait_duration_seconds) with object_type="Encoder"/"Decoder" label
  • NewPoolFromConfiguration(): creates a pool from protobuf config, automatically wrapped with metrics
  • NewReadCloser(): decompressing io.ReadCloser backed by the pool

Configuration: PoolConfiguration proto in pkg/proto/configuration/zstd/zstd.proto, referenced from ApplicationConfiguration.zstd_pool. When set, a process-wide bounded pool is created; when absent, an unbounded pool is used. Compression is enabled for CAS clients by providing a non-nil pool (no separate bool flag).
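
A configuration fragment might look like the following jsonnet. The field names are inferred from commit messages in this thread (zstd_pool, maximum_encoders/maximum_decoders) and have not been checked against the merged schema:

```jsonnet
{
  // Presence of zstdPool enables the bounded, process-wide pool;
  // omitting it entirely falls back to an unbounded pool.
  zstdPool: {
    maximumEncoders: 32,
    maximumDecoders: 32,
  },
}
```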

Integration:

  • CAS client (cas_blob_access.go): Get() acquires a decoder eagerly; Put() acquires a pooled encoder with defer encoder.Close()
  • ByteStream server (byte_stream_server.go): uses pool for both compressed reads (encoding) and writes (decoding via NewReadCloser)
  • Reference-expanding blob access: uses NewReadCloser with the shared pool
  • All zstd usage consolidated — pkg/util/zstd_reader.go removed

Testing

  • Pool unit tests: acquire/release, concurrency limits, context cancellation, concurrent stress (50 goroutines), double-close safety
  • CAS integration tests: pool exhaustion returning ResourceExhausted, encoder release (sequential puts with pool size 1), decoder release (sequential gets with pool size 1)
  • Benchmarks: pooled vs non-pooled encoder performance

- Use shared ZSTD encoder/decoder pool instead of creating new instances per request
- Add NewCASBlobAccessWithPool for custom pool configurations
- Decoders and encoders are acquired from pool with backpressure (blocks when at capacity)
- Proper cleanup with defer to ensure encoders/decoders return to pool
- Memory usage now bounded by pool configuration (default ~320MB peak)
@aspect-workflows bot commented Mar 2, 2026

Test

All tests were cache hits

30 tests (100.0%) were fully cached, saving 6s.

@aron-muon aron-muon force-pushed the zstd-memory-management branch from 42c50e4 to 92b8cd9 Compare March 2, 2026 12:07
- Fix SetDefaultZstdPool bug where alreadyInit was never true, so the
  panic for double-initialization never fired
- Replace max64 helper with built-in max (Go 1.21+)
- Fix nil error sent to channel in concurrent test when data mismatches
- Fix BenchmarkZstdNoPool to use zstd.NewWriter directly for fair comparison
- Add TestSetDefaultZstdPoolAfterInitPanics to verify the bug fix
- Document context.Background() limitation in chunk reader Read()
@EdSchouten (Member) left a comment

I know you're trying to make the best out of the entire situation. But let me use this opportunity to go on record and say this:

This is a clear demonstration of why I so absolutely hate the way the Remote API working group added support for compression to REv2. Having all of this bloat in bb_storage is just awful.

- Remove finalizers (we own the code, no need for safety nets)
- Delete zstd_config.go (config struct without protobuf integration is pointless)
- Remove global state (defaultZstdPool, sync.Once, SetDefaultZstdPool)
- Remove null-defaulting pattern (callers must pass pool explicitly)
- Consolidate to single NewCASBlobAccess constructor with required pool param
- Remove unused exports (TryAcquire*, AcquireDecoderAsReadCloser, PooledReadCloser)
- Create pool at configuration site when compression is enabled
@aron-muon aron-muon requested a review from EdSchouten March 2, 2026 12:46
@aron-muon (Contributor, Author) commented

> I know you're trying to make the best out of the entire situation. But let me use this opportunity to go on record and say this:
>
> This is a clear demonstration of why I so absolutely hate the way the Remote API working group added support for compression to REv2. Having all of this bloat in bb_storage is just awful.

I can easily understand that. The compression support in REv2 adds a lot of plumbing. Happy to keep the implementation as lean as possible.

Replace the hardcoded encoder/decoder pool limits with a
ZstdCompressionConfiguration protobuf message on
GrpcBlobAccessConfiguration. The presence of the message
enables compression; its absence disables it entirely
with no pool allocated.

This addresses maintainer feedback that there should be no
default values, as deployments range from Raspberry Pi to
128-core EC2 instances.
@aron-muon (Contributor, Author) commented

Hey @EdSchouten, mind letting the workflows run?
We are running the changes here in production within our Bazel cache, and are really pleased with the memory management.

Move ZstdCompressionConfiguration from per-GrpcBlobAccessConfiguration
to the top-level ApplicationConfiguration, creating a single process-wide
pool shared by all gRPC CAS clients and the ByteStream server.

This addresses the concern that per-backend pools defeat the memory
bound when using sharding/mirroring (N backends = N pools), and enables
the ByteStream server to use pooled encoders/decoders instead of
creating them ad-hoc per request.

Also renames max_encoders/max_decoders to maximum_encoders/maximum_decoders
for consistency with other configuration options.
@aron-muon aron-muon requested a review from EdSchouten March 10, 2026 13:36
@EdSchouten (Member) commented

Hey @aron-muon,

What if we add a pkg/zstd/pool.go with inside of that:

```go
type Decoder interface {
	Close() error
	// All of the other stuff that we use from klauspost.
	....
}

type Encoder interface {
	Close() error
	// All of the other stuff that we use from klauspost.
	....
}

type Pool interface {
	NewDecoder(ctx context.Context, r io.Reader) (Decoder, error)
	NewEncoder(ctx context.Context, w io.Writer) (Encoder, error)
}
```

Then we take that BoundedZstdPool implementation of yours, but place it in pkg/zstd/bounded_pool.go and rename it to NewBoundedPool() + boundedPool. Instead of ReleaseDecoder()/ReleaseEncoder(), we override {Decoder,Encoder}.Close() to do the right thing.

I saw that you patched up the bytestream server to conditionally call into the pool or not. With a change like this, this would no longer be necessary. If we wanted to continue to support non-pooled usage of zstd, we could just provide a different implementation of Pool.

I also saw that reference_expanding_blob_access.go also talks to klauspost directly. We should probably patch that up as well. There is also pkg/util/zstd_reader.go. Probably worth moving that to pkg/zstd/read_closer.go, and removing Zstd from the function/type names.

One question I do have about this approach though: by having a maximum decoder/encoder count in place, isn't there a risk that this change leads to deadlocks/lack of forward progress? One user inside a Starbucks with poor wifi could issue a bunch of Read()/Write() calls and cause encoders/decoders to get exhausted.

Introduce clean interfaces for ZSTD compression pooling as requested
by @EdSchouten. Encoder.Close() and Decoder.Close() now handle release
back to the pool, eliminating separate Release methods and the risk of
forgetting them.

- Pool interface with NewEncoder(ctx, w) / NewDecoder(ctx, r)
- BoundedPool: semaphore-limited, reuses via sync.Pool (was BoundedZstdPool)
- UnboundedPool: fresh encoder/decoder per call, replaces ad-hoc usage
- Eliminates conditional pool/non-pool code paths in ByteStream server
- Consolidates all zstd usage: moves pkg/util/zstd_reader.go to
  pkg/zstd/read_closer.go, patches reference_expanding_blob_access.go
@aron-muon (Contributor, Author) commented

> Hey @aron-muon,
>
> What if we add a pkg/zstd/pool.go with inside of that:
>
> ```go
> type Decoder interface {
> 	Close() error
> 	// All of the other stuff that we use from klauspost.
> 	....
> }
>
> type Encoder interface {
> 	Close() error
> 	// All of the other stuff that we use from klauspost.
> 	....
> }
>
> type Pool interface {
> 	NewDecoder(ctx context.Context, r io.Reader) (Decoder, error)
> 	NewEncoder(ctx context.Context, w io.Writer) (Encoder, error)
> }
> ```
>
> Then we take that BoundedZstdPool implementation of yours, but place it in pkg/zstd/bounded_pool.go and rename it to NewBoundedPool() + boundedPool. Instead of ReleaseDecoder()/ReleaseEncoder(), we override {Decoder,Encoder}.Close() to do the right thing.
>
> I saw that you patched up the bytestream server to conditionally call into the pool or not. With a change like this, this would no longer be necessary. If we wanted to continue to support non-pooled usage of zstd, we could just provide a different implementation of Pool.
>
> I also saw that reference_expanding_blob_access.go also talks to klauspost directly. We should probably patch that up as well. There is also pkg/util/zstd_reader.go. Probably worth moving that to pkg/zstd/read_closer.go, and removing Zstd from the function/type names.
>
> One question I do have about this approach though: by having a maximum decoder/encoder count in place, isn't there a risk that this change leads to deadlocks/lack of forward progress? One user inside a Starbucks with poor wifi could issue a bunch of Read()/Write() calls and cause encoders/decoders to get exhausted.

Yep, that's a really clean choice, thank you for the comment!

  • Created pkg/zstd/ with Pool, Encoder, Decoder interfaces exactly as you described
  • BoundedPool where Close() flushes and releases back to pool, no more separate Release methods
  • UnboundedPool that creates fresh encoders/decoders per call, eliminating the conditional pool/non-pool paths in the ByteStream server
  • Moved pkg/util/zstd_reader.go to pkg/zstd/read_closer.go (renamed to NewReadCloser)
  • Patched reference_expanding_blob_access.go to use bb_zstd.NewReadCloser()

On the deadlock/starvation risk: you're right that a slow client holds a decoder for the full upload duration on the write path (since the decoder reads directly from the gRPC stream). Context deadlines prevent indefinite blocking on acquire, but don't help with slots already held by slow clients. The alternative (unbounded allocation) is what causes OOM under load, so bounded-with-backpressure is strictly better, but it needs to be observable and tunable. I'm thinking we expose Prometheus metrics on the pool:

  • buildbarn_zstd_pool_encoders_in_use / decoders_in_use — are we near capacity?
  • acquisitions_total / rejections_total — are requests getting dropped?
  • wait_duration_seconds — how long are requests blocking for a slot?

This lets operators see the Starbucks scenario happening (in-use pinned at max, wait duration spiking) and tune pool size accordingly. Since the buildbarn tools are not opinionated by default, I think putting the onus on operators to tune the setup is appropriate. What do you think?

@EdSchouten (Member) commented

> I'm thinking we expose Prometheus metrics on the pool:
>
>   • buildbarn_zstd_pool_encoders_in_use / decoders_in_use — are we near capacity?
>   • acquisitions_total / rejections_total — are requests getting dropped?
>   • wait_duration_seconds — how long are requests blocking for a slot?
>
> This lets operators see the Starbucks scenario happening (in-use pinned at max, wait duration spiking) and tune pool size accordingly. Since the buildbarn tools are not opinionated by default, I think putting the onus on operators to tune the setup is appropriate. What do you think?

SGTM! Would it be possible to implement that in the form of a MetricsPool or something? So as a wrapper around Pool. Just like we already have for BlobAccess, pkg/eviction's Set, etc.

In terms of metrics: be sure to use counters over gauges. So encoders_in_use is something I'd avoid. Just add counters for the number of acquisitions and releases. You can always subtract those two to get the number in use.
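
The counters-over-gauges advice works because the in-use count is just the difference of the two counters. As a sketch, using the metric names this PR eventually settles on (pool_acquisitions_total / pool_releases_total with an object_type label, per a later commit; the buildbarn_zstd_ prefix is assumed):

```promql
# Encoders currently in use, derived from the two counters:
sum(buildbarn_zstd_pool_acquisitions_total{object_type="Encoder"})
  - sum(buildbarn_zstd_pool_releases_total{object_type="Encoder"})
```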

- Decoder.Close() returns no error (removes unboundedDecoder wrapper)
- NewReadCloser takes Pool; ReferenceExpandingBlobAccess takes zstd.Pool
- Use util.AcquireSemaphore() for consistent cancellation behavior
- Drop errors.Join in favor of silently discarding zstd close errors
- Rename decoder_max_window_size_bytes to decoder_window_size_bytes
- Pass unbounded pool to bb_copy/bb_replicator instead of nil
- Move proto to pkg/proto/configuration/zstd as PoolConfiguration
- Add NewPoolFromConfiguration() helper (pattern from pkg/jwt)

Implements MetricsPool as a decorator around Pool, following the same
pattern as MetricsBlobAccess and eviction's MetricsSet. Tracks encoder
and decoder acquisitions, releases, and rejections as counters, plus
wait duration as a histogram. Automatically applied by
NewPoolFromConfiguration so all pools get metrics automatically.
@aron-muon (Contributor, Author) commented

> SGTM! Would it be possible to implement that in the form of a MetricsPool or something? So as a wrapper around Pool. Just like we already have for BlobAccess, pkg/eviction's Set, etc.
>
> In terms of metrics: be sure to use counters over gauges. So encoders_in_use is something I'd avoid. Just add counters for the number of acquisitions and releases. You can always subtract those two to get the number in use.

I've implemented MetricsPool as a wrapper around Pool, same pattern as MetricsBlobAccess and eviction's MetricsSet. Uses counters as you suggested:

  • buildbarn_zstd_pool_operations_total with labels for EncoderAcquire, EncoderRelease, EncoderReject, DecoderAcquire, DecoderRelease, DecoderReject
  • buildbarn_zstd_pool_wait_duration_seconds histogram for acquire latency

All pools get metrics automatically!

@aron-muon aron-muon requested a review from EdSchouten March 11, 2026 12:33
@EdSchouten (Member) left a comment

This is starting to look pretty good, @aron-muon. Thanks for your perseverance!

- Revert blobstore.proto to upstream; remove enable_zstd_compression
  field entirely, use zstdPool != nil to control compression
- Rename zstd_compression to zstd_pool in ApplicationConfiguration
- Remove lazy init from zstdByteStreamChunkReader; call NewDecoder
  eagerly in casBlobAccess.Get()
- Extract ctx := out.Context() / stream.Context() to avoid repeated
  calls in ByteStream server
- Use zstd.NewReadCloser in writeZstd, eliminating decoderReadCloser
- NewReadCloser takes context.Context parameter
- NewCASAndACBlobAccessFromConfiguration takes zstdPool parameter;
  NewPoolFromConfiguration calls only live in cmd/*/main.go
- Restructure metrics: pool_acquisitions_total, pool_releases_total,
  pool_rejections_total with object_type=Encoder/Decoder label
- Revert whitespace-only BUILD.bazel change
@aron-muon aron-muon requested a review from EdSchouten March 11, 2026 13:35
Only pass zstdPool to NewCASBlobAccess when the per-backend
EnableCompression config is true. Without this, all gRPC CAS backends
would use compression whenever a pool is configured, ignoring the
per-backend opt-in.
@aron-muon aron-muon requested a review from EdSchouten March 11, 2026 14:07
@EdSchouten (Member) left a comment

I think this looks great, @aron-muon. Thanks for taking the time to make these refactoring changes. I've just kicked off CI. Assuming CI is happy, I will merge it tomorrow morning.

In the meantime, can you please roll out this change on your end and double check that it doesn't cause any obvious regressions for things not covered by tests? Thanks!

@aron-muon (Contributor, Author) commented

> I think this looks great, @aron-muon. Thanks for taking the time to make these refactoring changes. I've just kicked off CI. Assuming CI is happy, I will merge it tomorrow morning.
>
> In the meantime, can you please roll out this change on your end and double check that it doesn't cause any obvious regressions for things not covered by tests? Thanks!

CI was failing lint; I fixed that in the latest commit. I'll launch this on our side and will report back on whether we find any issues!

@EdSchouten EdSchouten merged commit f48c18e into buildbarn:main Mar 12, 2026
3 checks passed