Ext_proc filter#12792
Conversation
…estsAreForwardedImmediately to use in-process server
…roc response but only sent a mode override. As per envoy ext-processing ext-proc cannot omit request_headers (even if empty) in response to a request_headers processing request.
… track close already called for handling immediate response.
…lizingExecutor to use real dataPlaneChannel
…ubHasCorrectDeadline to use real dataPlaneChannel
…xtProcStreamSendsMetadata to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel and ServerInterceptor
…entToExtProc to use real dataPlaneChannel
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…False to use real dataPlaneChannel
…sTrue to use real dataPlaneChannel
…OnReady to use real dataPlaneChannel
…Ready to use real dataPlaneChannel
…stsAreForwardedImmediately to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel
…entToExtProc to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…eCallFailsOpen to use real dataPlaneChannel
…lizingExecutor to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
Updated ExternalProcessorFilter to include the `protocol_config` field in the very first `ProcessingRequest` sent to the sidecar (RequestHeaders). The configuration includes the `request_body_mode` and `response_body_mode` derived from the filter's processing mode, as required by gRFC A93. Added a unit test in Category 4 to verify that `protocol_config` is correctly populated on the first message and omitted from all subsequent messages on the stream. Add tests for trailers HeaderSendMode default to SKIP for DEFAULT. nit: Renumbered out of order test categories.
…ment rather than a granular merge of its fields.
# Conflicts: # xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutationsTest.java
# Conflicts: # xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Fix style and warning as errors.
|
Reviewed approximately ~500 LOC of source(not tests), which currently mostly only covers the config part of things. Sending comments incrementally to kick start progress, since it seems that I'll need probably about 3 days to review all of source. |
|
Reviewed about another 300 LOC, which covers static utilities and the interceptors. Speed was slower than expected, but will try to catch up. |
c75ba11 to
64010fd
Compare
| private long serverTrailersStartNanos; | ||
|
|
||
| private volatile Metadata requestHeaders; | ||
| final AtomicBoolean activated = new AtomicBoolean(false); |
There was a problem hiding this comment.
Should we be using an atomic reference enum with a state machine driver here?
The current set of 10 atomic bools, results in potentially 2^10 representable states. I don't think we have that many and should try to compress the state machine.
| this.backendService = checkNotNull(backendService, "backendService"); | ||
| } | ||
|
|
||
| private void activateCall() { |
There was a problem hiding this comment.
Optional: Somewhat echoing go/java-practices/class-structure - Maybe we should try to put the public member that uses a private method before the private method to enhance readability.
| } | ||
| } | ||
|
|
||
| private boolean checkCompressionSupport(BodyResponse bodyResponse) { |
There was a problem hiding this comment.
This function name feels slightly misleading. Given that it starts with check and returns a boolean, I'd expect it to not have any side effects, but it does.
My rec would be to either rename this method appropriately (optionally documenting the side effects) or split this method to reflect check and side effect behavior separately.
| StatusRuntimeException ex = Status.UNAVAILABLE | ||
| .withDescription("gRPC message compression not supported in ext_proc") | ||
| .asRuntimeException(); | ||
| if (!extProcStreamCompleted.get() && extProcClientCallRequestObserver != null) { |
There was a problem hiding this comment.
This member seems to be synchronized in other places. Should we have it here as well or is this a conscious decision?
| } | ||
| headersToModify.add(HeaderValueOption.create( | ||
| headerValue, | ||
| HeaderValueOption.HeaderAppendAction.valueOf(protoOption.getAppendAction().name()), |
There was a problem hiding this comment.
This may be slightly risky IMO.
I am not sure we want to couple our internal enum to xds proto enum, so I'd prefer an explicit switch case if possible.
| @Override | ||
| public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) { | ||
| synchronized (streamLock) { | ||
| extProcClientCallRequestObserver = requestStream; |
There was a problem hiding this comment.
Couple of things here.
- It seems like we aren't respecting control flow before calling on next for the stream and processing pending buffer
- It seems like this is dead code.
BeforeStartis called synchronously so pendingRequests should in theory always be empty.
| public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) { | ||
| synchronized (streamLock) { | ||
| extProcClientCallRequestObserver = requestStream; | ||
| while (!pendingProcessingRequests.isEmpty()) { |
There was a problem hiding this comment.
I have some questiosns on of why we need pendingProcessing.... The design doc seems to mention a couple of cases
- observability mode ext proc slowness
- request drain.
But the grfc for observability mode requests flow control over buffering: https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#flow-control . This should be somewhat easy to enforce, where we push problem to ext proc server, if our caller doesn't respect flow control.
Similarly, flow control is recommended solution for draining as well at https://github.com/markdroth/proposal/blob/6ee533dac7d9d1b71ad46ba8826d2a3b3cdba313/A93-xds-ext-proc.md#early-termination-of-ext_proc-stream . I don't really have a good solution for this if the caller doesn't respect flow control, we'll have to buffer to avoid out of order messages. Maybe let's discuss with Mark on the grfc about this.
| .asRuntimeException()); | ||
| return; | ||
| } | ||
| Metadata target = wrappedListener.trailersOnly.get() |
There was a problem hiding this comment.
encapsulation instead of member access.
| } | ||
| } | ||
| // 3. Client Trailers | ||
| else if (response.hasRequestTrailers()) { |
There was a problem hiding this comment.
Should we be handling this? The grfc doesn't talk about this.
Would this also make proceedWithClose dead code?
| if (extProcStreamCompleted.compareAndSet(false, true)) { | ||
| synchronized (streamLock) { | ||
| if (extProcClientCallRequestObserver != null) { | ||
| extProcClientCallRequestObserver.onError(t); |
There was a problem hiding this comment.
For my education is this valid behavior?
I'd assume, when the stream has error, it calls our onError , but it seems we are propagating it back to the request stream. Am I missing something here?
|
Reviewed the extproc response handling. Will be reviewing rest of the client call private methods today. |
Implements ext_proc filter from A93 (internal design doc)
Metrics aren't implemented yet.
Includes commits from unmerged Filter API enhancements and channel caching PRs.
Only the ExternaProcessingFilter.java, ExternaProcessingFilterTest.java and the envoy xds proto import and generated code need to be reviewed.
Rebasing commit history caused all received and merged commits to show my name as the committer, ignore all commits for which I'm not shown as the author.