feat: add bounded ZSTD encoder/decoder pool with memory management #327

EdSchouten merged 18 commits into buildbarn:main
Conversation
- Use shared ZSTD encoder/decoder pool instead of creating new instances per request
- Add NewCASBlobAccessWithPool for custom pool configurations
- Decoders and encoders are acquired from the pool with backpressure (blocks when at capacity)
- Proper cleanup with defer to ensure encoders/decoders return to the pool
- Memory usage now bounded by pool configuration (default ~320MB peak)
Force-pushed from 42c50e4 to 92b8cd9
- Fix SetDefaultZstdPool bug where alreadyInit was never true, so the panic for double-initialization never fired
- Replace the max64 helper with the built-in max (Go 1.21+)
- Fix nil error sent to channel in the concurrent test when data mismatches
- Fix BenchmarkZstdNoPool to use zstd.NewWriter directly for a fair comparison
- Add TestSetDefaultZstdPoolAfterInitPanics to verify the bug fix
- Document the context.Background() limitation in the chunk reader's Read()
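The alreadyInit fix can be illustrated with a small sketch. Note that the global-pool API these names belong to was removed later in this PR; the names here are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch of the double-initialization guard after the fix. The bug was
// that the flag recording the first call was never assigned, so the
// panic below was dead code.
type pool struct{}

var (
	defaultPoolMu sync.Mutex
	defaultPool   *pool
	alreadyInit   bool
)

func setDefaultPool(p *pool) {
	defaultPoolMu.Lock()
	defer defaultPoolMu.Unlock()
	if alreadyInit {
		panic("default pool already initialized")
	}
	alreadyInit = true // the assignment the original code was missing
	defaultPool = p
}

func main() {
	setDefaultPool(&pool{})
	defer func() {
		fmt.Println("second call panicked:", recover() != nil)
	}()
	setDefaultPool(&pool{}) // now correctly panics
}
```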
EdSchouten
left a comment
I know you're trying to make the best out of the entire situation. But let me use this opportunity to go on record and say this:
This is a clear demonstration of why I so absolutely hate the way the Remote API working group added support for compression to REv2. Having all of this bloat in bb_storage is just awful.
- Remove finalizers (we own the code, no need for safety nets)
- Delete zstd_config.go (config struct without protobuf integration is pointless)
- Remove global state (defaultZstdPool, sync.Once, SetDefaultZstdPool)
- Remove null-defaulting pattern (callers must pass the pool explicitly)
- Consolidate to a single NewCASBlobAccess constructor with a required pool param
- Remove unused exports (TryAcquire*, AcquireDecoderAsReadCloser, PooledReadCloser)
- Create the pool at the configuration site when compression is enabled
I can easily understand that. The compression support in REv2 adds a lot of plumbing. Happy to keep the implementation as lean as possible.
Zstd memory management
Replace the hardcoded encoder/decoder pool limits with a ZstdCompressionConfiguration protobuf message on GrpcBlobAccessConfiguration. The presence of the message enables compression; its absence disables it entirely with no pool allocated. This addresses maintainer feedback that there should be no default values, as deployments range from Raspberry Pi to 128-core EC2 instances.
Hey @EdSchouten, mind letting the workflows run?
Move ZstdCompressionConfiguration from per-GrpcBlobAccessConfiguration to the top-level ApplicationConfiguration, creating a single process-wide pool shared by all gRPC CAS clients and the ByteStream server. This addresses the concern that per-backend pools defeat the memory bound when using sharding/mirroring (N backends = N pools), and enables the ByteStream server to use pooled encoders/decoders instead of creating them ad-hoc per request. Also renames max_encoders/max_decoders to maximum_encoders/maximum_decoders for consistency with other configuration options.
Hey @aron-muon, what if we add a pkg/zstd/pool.go with inside of that:

```go
type Decoder interface {
	Close() error
	// All of the other stuff that we use from klauspost.
	....
}

type Encoder interface {
	Close() error
	// All of the other stuff that we use from klauspost.
	....
}

type Pool interface {
	NewDecoder(ctx context.Context, r io.Reader) (Decoder, error)
	NewEncoder(ctx context.Context, w io.Writer) (Encoder, error)
}
```

Then we take that BoundedZstdPool implementation of yours, but place it in pkg/zstd/bounded_pool.go and rename it to NewBoundedPool() + boundedPool. Instead of ReleaseDecoder()/ReleaseEncoder(), we override {Decoder,Encoder}.Close() to do the right thing.

I saw that you patched up the bytestream server to conditionally call into the pool or not. With a change like this, this would no longer be necessary. If we wanted to continue to support non-pooled usage of zstd, we could just provide a different implementation of Pool.

I also saw that reference_expanding_blob_access.go also talks to klauspost directly. We should probably patch that up as well. There is also pkg/util/zstd_reader.go. Probably worth moving that to pkg/zstd/read_closer.go, and removing Zstd from the function/type names.

One question I do have about this approach though: by having a maximum decoder/encoder count in place, isn't there a risk that this change leads to deadlocks/lack of forward progress? One user inside a Starbucks with poor wifi could issue a bunch of Read()/Write() calls and cause encoders/decoders to get exhausted.
Introduce clean interfaces for ZSTD compression pooling as requested by @EdSchouten. Encoder.Close() and Decoder.Close() now handle release back to the pool, eliminating separate Release methods and the risk of forgetting them.
- Pool interface with NewEncoder(ctx, w) / NewDecoder(ctx, r)
- BoundedPool: semaphore-limited, reuses via sync.Pool (was BoundedZstdPool)
- UnboundedPool: fresh encoder/decoder per call, replaces ad-hoc usage
- Eliminates conditional pool/non-pool code paths in the ByteStream server
- Consolidates all zstd usage: moves pkg/util/zstd_reader.go to pkg/zstd/read_closer.go, patches reference_expanding_blob_access.go
Yep that's a really clean choice, thank you for the comment!
On the deadlock/starvation risk: you're right that a slow client holds a decoder for the full upload duration on the write path (since the decoder reads directly from the gRPC stream). Context deadlines prevent indefinite blocking on acquire, but don't help with slots already held by slow clients. The alternative (unbounded allocation) is what causes OOM under load, so bounded-with-backpressure is strictly better, but it needs to be observable and tunable. I'm thinking we expose Prometheus metrics on the pool:
This lets operators see the Starbucks scenario happening (in-use pinned at max, wait duration spiking) and tune pool size accordingly. Since the buildbarn tools are not opinionated by default, I think putting the onus on operators to tune the setup is appropriate. What do you think?
SGTM! Would it be possible to implement that in the form of a MetricsPool or something? So as a wrapper around Pool. Just like we already have for BlobAccess, pkg/eviction's Set, etc.

In terms of metrics: be sure to use counters over gauges. So encoders_in_use is something I'd avoid. Just add counters for the number of acquisitions and releases. You can always subtract those two to get the number in use.
- Decoder.Close() returns no error (removes unboundedDecoder wrapper)
- NewReadCloser takes Pool; ReferenceExpandingBlobAccess takes zstd.Pool
- Use util.AcquireSemaphore() for consistent cancellation behavior
- Drop errors.Join in favor of silently discarding zstd close errors
- Rename decoder_max_window_size_bytes to decoder_window_size_bytes
- Pass unbounded pool to bb_copy/bb_replicator instead of nil
- Move proto to pkg/proto/configuration/zstd as PoolConfiguration
- Add NewPoolFromConfiguration() helper (pattern from pkg/jwt)
Implements MetricsPool as a decorator around Pool, following the same pattern as MetricsBlobAccess and eviction's MetricsSet. Tracks encoder and decoder acquisitions, releases, and rejections as counters, plus wait duration as a histogram. Automatically applied by NewPoolFromConfiguration so all pools get metrics automatically.
I've implemented
All pools get metrics automatically!
EdSchouten
left a comment
This is starting to look pretty good, @aron-muon. Thanks for your perseverance!
- Revert blobstore.proto to upstream; remove enable_zstd_compression field entirely, use zstdPool != nil to control compression
- Rename zstd_compression to zstd_pool in ApplicationConfiguration
- Remove lazy init from zstdByteStreamChunkReader; call NewDecoder eagerly in casBlobAccess.Get()
- Extract ctx := out.Context() / stream.Context() to avoid repeated calls in the ByteStream server
- Use zstd.NewReadCloser in writeZstd, eliminating decoderReadCloser
- NewReadCloser takes a context.Context parameter
- NewCASAndACBlobAccessFromConfiguration takes a zstdPool parameter; NewPoolFromConfiguration calls only live in cmd/*/main.go
- Restructure metrics: pool_acquisitions_total, pool_releases_total, pool_rejections_total with object_type=Encoder/Decoder label
- Revert whitespace-only BUILD.bazel change
Only pass zstdPool to NewCASBlobAccess when the per-backend EnableCompression config is true. Without this, all gRPC CAS backends would use compression whenever a pool is configured, ignoring the per-backend opt-in.
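A tiny sketch of the opt-in wiring this commit describes; the helper and config names here are hypothetical, the point is that a configured pool no longer force-enables compression on every backend.

```go
package main

import "fmt"

// Pool stands in for the process-wide zstd.Pool.
type Pool struct{}

type backendConfig struct {
	Name              string
	EnableCompression bool
}

// poolFor forwards the shared pool only when the backend opts in; a
// nil pool keeps that backend uncompressed.
func poolFor(cfg backendConfig, processPool *Pool) *Pool {
	if !cfg.EnableCompression {
		return nil
	}
	return processPool
}

func main() {
	pool := &Pool{}
	for _, cfg := range []backendConfig{
		{Name: "shard-a", EnableCompression: true},
		{Name: "shard-b", EnableCompression: false},
	} {
		fmt.Println(cfg.Name, poolFor(cfg, pool) != nil)
	}
}
```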
EdSchouten
left a comment
I think this looks great, @aron-muon. Thanks for taking the time to make these refactoring changes. I've just kicked off CI. Assuming CI is happy, I will merge it tomorrow morning.
In the meantime, can you please roll out this change on your end and double check that it doesn't cause any obvious regressions for things not covered by tests? Thanks!
CI was failing lint — I fixed that in the latest commit. I'll launch this on our side and will report back on whether we find any issues!

Summary
The existing CAS gRPC client creates a new `zstd.Encoder` and `zstd.Decoder` for every ByteStream request. Under high concurrency (e.g., large Bazel builds with many parallel actions), this leads to unbounded memory growth, GC pressure, and potential OOM — each encoder allocates ~4MB and each decoder ~8MB, and the `klauspost/compress/zstd` library spawns internal goroutines that leak without explicit `Close()` calls (klauspost/compress#264).

This PR introduces a `pkg/zstd` package with `Pool`, `Encoder`, and `Decoder` interfaces, a bounded pool implementation, and integrates it across the CAS client, ByteStream server, and reference-expanding blob access.

Approach

- `pkg/zstd/` package — clean interfaces for ZSTD encoder/decoder lifecycle:
  - `Pool` interface with `NewEncoder(ctx, w)` / `NewDecoder(ctx, r)` returning `Encoder` / `Decoder`
  - `Encoder.Close()` and `Decoder.Close()` handle flushing and release back to the pool
  - `BoundedPool`: reuses encoders/decoders via `sync.Pool`, limits concurrency via semaphores, providing backpressure under load
  - `UnboundedPool`: creates fresh encoders/decoders per call, for use when bounding is not required
  - `MetricsPool`: decorator exposing Prometheus counters (`pool_acquisitions_total`, `pool_releases_total`, `pool_rejections_total`) and a histogram (`pool_wait_duration_seconds`) with an `object_type="Encoder"/"Decoder"` label
  - `NewPoolFromConfiguration()`: creates a pool from protobuf config, automatically wrapped with metrics
  - `NewReadCloser()`: decompressing `io.ReadCloser` backed by the pool
- Configuration — `PoolConfiguration` proto in `pkg/proto/configuration/zstd/zstd.proto`, referenced from `ApplicationConfiguration.zstd_pool`. When set, creates a process-wide bounded pool. When absent, an unbounded pool is used. Compression is enabled for CAS clients by providing a non-nil pool (no separate bool flag).
- Integration:
  - CAS client (`cas_blob_access.go`): `Get` acquires a decoder eagerly in `Get()`; `Put` acquires a pooled encoder with `defer encoder.Close()`
  - ByteStream server (`byte_stream_server.go`): uses the pool for both compressed reads (encoding) and writes (decoding via `NewReadCloser`)
  - Reference-expanding blob access: uses `NewReadCloser` with the shared pool; `pkg/util/zstd_reader.go` removed

Testing

- `ResourceExhausted`, encoder release (sequential puts with pool size 1), decoder release (sequential gets with pool size 1)