Skip to content

Ext_proc filter#12792

Open
kannanjgithub wants to merge 382 commits into
grpc:masterfrom
kannanjgithub:ext-proc
Open

Ext_proc filter#12792
kannanjgithub wants to merge 382 commits into
grpc:masterfrom
kannanjgithub:ext-proc

Conversation

@kannanjgithub
Copy link
Copy Markdown
Contributor

@kannanjgithub kannanjgithub commented May 4, 2026

Implements ext_proc filter from A93 (internal design doc)
Metrics aren't implemented yet.
Includes commits from unmerged Filter API enhancements and channel caching PRs.
Only the ExternaProcessingFilter.java, ExternaProcessingFilterTest.java and the envoy xds proto import and generated code need to be reviewed.
Rebasing commit history caused all received and merged commits to show my name as the committer, ignore all commits for which I'm not shown as the author.

…estsAreForwardedImmediately to use in-process server
…roc response but only sent a mode override. As per envoy ext-processing ext-proc cannot omit request_headers (even if empty) in response to a request_headers processing request.
… track close already called for handling immediate response.
…ubHasCorrectDeadline to use real dataPlaneChannel
…xtProcStreamSendsMetadata to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel and ServerInterceptor
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…stsAreForwardedImmediately to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
Updated ExternalProcessorFilter to include the `protocol_config` field in
 the very first `ProcessingRequest` sent to the sidecar (RequestHeaders).
The configuration includes the `request_body_mode` and `response_body_mode`
derived from the filter's processing mode, as required by gRFC A93.
Added a unit test in Category 4 to verify that `protocol_config` is correctly
populated on the first message and omitted from all subsequent messages on
the stream.

Add tests for trailers HeaderSendMode default to SKIP for DEFAULT.

nit: Renumbered out of order test categories.
…ment rather than a granular merge of its fields.
# Conflicts:
#	xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutationsTest.java
@kannanjgithub kannanjgithub marked this pull request as draft May 4, 2026 11:39
# Conflicts:
#	xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
@kannanjgithub kannanjgithub marked this pull request as ready for review May 4, 2026 12:39
@kannanjgithub kannanjgithub requested a review from sauravzg May 4, 2026 12:39
Fix style and warning as errors.
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
@sauravzg
Copy link
Copy Markdown
Collaborator

sauravzg commented May 6, 2026

Reviewed approximately ~500 LOC of source(not tests), which currently mostly only covers the config part of things. Sending comments incrementally to kick start progress, since it seems that I'll need probably about 3 days to review all of source.

Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java Outdated
@sauravzg
Copy link
Copy Markdown
Collaborator

sauravzg commented May 7, 2026

Reviewed about another 300 LOC, which covers static utilities and the interceptors. Speed was slower than expected, but will try to catch up.

Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Comment thread xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
@kannanjgithub kannanjgithub force-pushed the ext-proc branch 3 times, most recently from c75ba11 to 64010fd Compare May 8, 2026 16:13
private long serverTrailersStartNanos;

private volatile Metadata requestHeaders;
final AtomicBoolean activated = new AtomicBoolean(false);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be using an atomic reference enum with a state machine driver here?

The current set of 10 atomic bools, results in potentially 2^10 representable states. I don't think we have that many and should try to compress the state machine.

this.backendService = checkNotNull(backendService, "backendService");
}

private void activateCall() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: Somewhat echoing go/java-practices/class-structure - Maybe we should try to put the public member that uses a private method before the private method to enhance readability.

}
}

private boolean checkCompressionSupport(BodyResponse bodyResponse) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function name feels slightly misleading. Given that it starts with check and returns a boolean, I'd expect it to not have any side effects, but it does.

My rec would be to either rename this method appropriately (optionally documenting the side effects) or split this method to reflect check and side effect behavior separately.

StatusRuntimeException ex = Status.UNAVAILABLE
.withDescription("gRPC message compression not supported in ext_proc")
.asRuntimeException();
if (!extProcStreamCompleted.get() && extProcClientCallRequestObserver != null) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This member seems to be synchronized in other places. Should we have it here as well or is this a conscious decision?

}
headersToModify.add(HeaderValueOption.create(
headerValue,
HeaderValueOption.HeaderAppendAction.valueOf(protoOption.getAppendAction().name()),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be slightly risky IMO.
I am not sure we want to couple our internal enum to xds proto enum, so I'd prefer an explicit switch case if possible.

@Override
public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) {
synchronized (streamLock) {
extProcClientCallRequestObserver = requestStream;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of things here.

  • It seems like we aren't respecting control flow before calling on next for the stream and processing pending buffer
  • It seems like this is dead code. BeforeStart is called synchronously so pendingRequests should in theory always be empty.

public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) {
synchronized (streamLock) {
extProcClientCallRequestObserver = requestStream;
while (!pendingProcessingRequests.isEmpty()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some questiosns on of why we need pendingProcessing.... The design doc seems to mention a couple of cases

  • observability mode ext proc slowness
  • request drain.

But the grfc for observability mode requests flow control over buffering: https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#flow-control . This should be somewhat easy to enforce, where we push problem to ext proc server, if our caller doesn't respect flow control.

Similarly, flow control is recommended solution for draining as well at https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#early-termination-of-ext_proc-stream . I don't really have a good solution for this if the caller doesn't respect flow control, we'll have to buffer to avoid out of order messages. Maybe let's discuss with Mark on the grfc about this.

.asRuntimeException());
return;
}
Metadata target = wrappedListener.trailersOnly.get()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

encapsulation instead of member access.

}
}
// 3. Client Trailers
else if (response.hasRequestTrailers()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be handling this? The grfc doesn't talk about this.

Would this also make proceedWithClose dead code?

if (extProcStreamCompleted.compareAndSet(false, true)) {
synchronized (streamLock) {
if (extProcClientCallRequestObserver != null) {
extProcClientCallRequestObserver.onError(t);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education is this valid behavior?

I'd assume, when the stream has error, it calls our onError , but it seems we are propagating it back to the request stream. Am I missing something here?

@sauravzg
Copy link
Copy Markdown
Collaborator

sauravzg commented May 11, 2026

Reviewed the extproc response handling. Will be reviewing rest of the client call private methods today.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.