-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix(metrics): correct GFE metrics extraction and enable by default #17561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Interceptor for collecting Cloud Spanner metrics.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import re | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Dict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Dict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from grpc_interceptor import ClientInterceptor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -122,10 +122,115 @@ def intercept(self, invoked_method, request_or_iterator, call_details): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.set_method(method_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.record_attempt_start() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| response = invoked_method(request_or_iterator, call_details) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.record_attempt_completion() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Process and send GFE metrics if enabled | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if tracer.gfe_enabled: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata = response.initial_metadata() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.record_gfe_metrics(metadata) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return _wrap_response(response, tracer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _wrap_response(response: Any, tracer: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Wraps the response if it is streaming, or records metrics immediately if unary.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(response, "__anext__") or hasattr(response, "__aiter__"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return _AsyncStreamingResponseWrapper(response, tracer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif hasattr(response, "__next__") or hasattr(response, "__iter__"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return _StreamingResponseWrapper(response, tracer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Unary call: execute completion and record metrics immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.record_attempt_completion() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(response, "initial_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(response.initial_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(response, "trailing_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(response.trailing_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer.record_gfe_metrics(metadata) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return response | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+135
to
150
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The metrics recording block for unary calls is not wrapped in a try-except block. If
Suggested change
References
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class _StreamingResponseWrapper: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Wrapper for streaming RPC response iterators to defer metrics recording.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, response, tracer): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._response = response | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer = tracer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._metrics_recorded = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __iter__(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __next__(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return next(self._response) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except StopIteration: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._record_metrics() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._record_metrics() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _record_metrics(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._metrics_recorded: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._metrics_recorded = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer.record_attempt_completion() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self._response, "initial_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(self._response.initial_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self._response, "trailing_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(self._response.trailing_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer.record_gfe_metrics(metadata) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+174
to
+190
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the unary call metrics recording, if any exception is raised during
Suggested change
References
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __getattr__(self, name): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return getattr(self._response, name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class _AsyncStreamingResponseWrapper: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Wrapper for async streaming RPC response iterators to defer metrics recording.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, response, tracer): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._response = response | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer = tracer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._metrics_recorded = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __aiter__(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def __anext__(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return await self._response.__anext__() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except StopAsyncIteration: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._record_metrics() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._record_metrics() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _record_metrics(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._metrics_recorded: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._metrics_recorded = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer.record_attempt_completion() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self._response, "initial_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(self._response.initial_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self._response, "trailing_metadata"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadata.extend(self._response.trailing_metadata() or []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tracer.record_gfe_metrics(metadata) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+217
to
+233
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the asynchronous streaming wrapper, we should also wrap the metrics recording logic in a try-except block to prevent telemetry failures from crashing the async stream or masking
Suggested change
References
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __getattr__(self, name): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return getattr(self._response, name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,8 +19,9 @@ | |||||||||||||||||||||||||||||||||
| while the helper classes provide additional functionality and context for the metrics being traced. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import re | ||||||||||||||||||||||||||||||||||
| from datetime import datetime | ||||||||||||||||||||||||||||||||||
| from typing import Dict | ||||||||||||||||||||||||||||||||||
| from typing import Any, Dict, Optional | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| from grpc import StatusCode | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
@@ -198,6 +199,8 @@ def __init__( | |||||||||||||||||||||||||||||||||
| instrument_operation_counter: "Counter", | ||||||||||||||||||||||||||||||||||
| client_attributes: Dict[str, str], | ||||||||||||||||||||||||||||||||||
| gfe_enabled: bool = False, | ||||||||||||||||||||||||||||||||||
| instrument_gfe_latency: Optional["Histogram"] = None, | ||||||||||||||||||||||||||||||||||
| instrument_gfe_missing_header_count: Optional["Counter"] = None, | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Initialize a MetricsTracer instance with the given parameters. | ||||||||||||||||||||||||||||||||||
|
|
@@ -214,15 +217,19 @@ def __init__( | |||||||||||||||||||||||||||||||||
| instrument_operation_counter (Counter): Instrument for counting operations. | ||||||||||||||||||||||||||||||||||
| client_attributes (Dict[str, str]): Dictionary of client attributes used for metrics tracing. | ||||||||||||||||||||||||||||||||||
| gfe_enabled (bool, optional): Indicates if GFE metrics are enabled. Defaults to False. | ||||||||||||||||||||||||||||||||||
| instrument_gfe_latency (Histogram, optional): Instrument for measuring GFE latency. | ||||||||||||||||||||||||||||||||||
| instrument_gfe_missing_header_count (Counter, optional): Instrument for counting missing GFE headers. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| self.current_op = MetricOpTracer() | ||||||||||||||||||||||||||||||||||
| self._client_attributes = client_attributes | ||||||||||||||||||||||||||||||||||
| self._instrument_attempt_latency = instrument_attempt_latency | ||||||||||||||||||||||||||||||||||
| self._instrument_attempt_counter = instrument_attempt_counter | ||||||||||||||||||||||||||||||||||
| self._instrument_operation_latency = instrument_operation_latency | ||||||||||||||||||||||||||||||||||
| self._instrument_operation_counter = instrument_operation_counter | ||||||||||||||||||||||||||||||||||
| self._instrument_gfe_latency = instrument_gfe_latency | ||||||||||||||||||||||||||||||||||
| self._instrument_gfe_missing_header_count = instrument_gfe_missing_header_count | ||||||||||||||||||||||||||||||||||
| self.enabled = enabled | ||||||||||||||||||||||||||||||||||
| self.gfe_enabled = gfe_enabled | ||||||||||||||||||||||||||||||||||
| self.gfe_enabled = True | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @staticmethod | ||||||||||||||||||||||||||||||||||
| def _get_ms_time_diff(start: datetime, end: datetime) -> float: | ||||||||||||||||||||||||||||||||||
|
|
@@ -399,7 +406,11 @@ def record_gfe_latency(self, latency: int) -> None: | |||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| latency (int): The latency duration to be recorded. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED or not self.gfe_enabled: | ||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||
| not self.enabled | ||||||||||||||||||||||||||||||||||
| or not HAS_OPENTELEMETRY_INSTALLED | ||||||||||||||||||||||||||||||||||
| or not getattr(self, "_instrument_gfe_latency", None) | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| self._instrument_gfe_latency.record( | ||||||||||||||||||||||||||||||||||
| amount=latency, attributes=self.client_attributes | ||||||||||||||||||||||||||||||||||
|
|
@@ -409,12 +420,65 @@ def record_gfe_missing_header_count(self) -> None: | |||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Increments the counter for missing GFE headers. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED or not self.gfe_enabled: | ||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||
| not self.enabled | ||||||||||||||||||||||||||||||||||
| or not HAS_OPENTELEMETRY_INSTALLED | ||||||||||||||||||||||||||||||||||
| or not getattr(self, "_instrument_gfe_missing_header_count", None) | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| self._instrument_gfe_missing_header_count.add( | ||||||||||||||||||||||||||||||||||
| amount=1, attributes=self.client_attributes | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @staticmethod | ||||||||||||||||||||||||||||||||||
| def extract_gfe_latency(metadata: Any) -> Optional[int]: | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Extracts the GFE latency value (in milliseconds) from response metadata. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| if not metadata: | ||||||||||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| header_vals = [] | ||||||||||||||||||||||||||||||||||
| if isinstance(metadata, dict): | ||||||||||||||||||||||||||||||||||
| for key, val in metadata.items(): | ||||||||||||||||||||||||||||||||||
| if key and str(key).lower() in ("server-timing", "server_timing"): | ||||||||||||||||||||||||||||||||||
| if isinstance(val, (list, tuple)): | ||||||||||||||||||||||||||||||||||
| header_vals.extend(val) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| header_vals.append(val) | ||||||||||||||||||||||||||||||||||
| elif isinstance(metadata, (list, tuple)): | ||||||||||||||||||||||||||||||||||
| for key, val in metadata: | ||||||||||||||||||||||||||||||||||
| if key and str(key).lower() in ("server-timing", "server_timing"): | ||||||||||||||||||||||||||||||||||
| if isinstance(val, (list, tuple)): | ||||||||||||||||||||||||||||||||||
| header_vals.extend(val) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| header_vals.append(val) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+449
to
+455
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When iterating over
Suggested change
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| for header_val in header_vals: | ||||||||||||||||||||||||||||||||||
| if not header_val: | ||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| if not isinstance(header_val, str): | ||||||||||||||||||||||||||||||||||
| header_val = str(header_val) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+460
to
+461
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If if isinstance(header_val, bytes):
try:
header_val = header_val.decode("utf-8")
except Exception:
header_val = str(header_val)
elif not isinstance(header_val, str):
header_val = str(header_val) |
||||||||||||||||||||||||||||||||||
| match = re.search(r"gfet4t7;\s*dur=([0-9]+)", header_val) | ||||||||||||||||||||||||||||||||||
| if match: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| return int(match.group(1)) | ||||||||||||||||||||||||||||||||||
| except ValueError: | ||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def record_gfe_metrics(self, metadata: Any) -> None: | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Extracts and records GFE metrics from the RPC response metadata. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| latency = self.extract_gfe_latency(metadata) | ||||||||||||||||||||||||||||||||||
| if latency is not None: | ||||||||||||||||||||||||||||||||||
| self.record_gfe_latency(latency) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| self.record_gfe_missing_header_count() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _create_operation_otel_attributes(self) -> dict: | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Create additional attributes for operation metrics tracing. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking
hasattr(response, "__iter__")orhasattr(response, "__aiter__")is too broad because many standard unary response types (such as lists, dicts, tuples, or custom iterables) are iterable but are not streaming responses. If a unary RPC returns an iterable, it will be incorrectly wrapped in_StreamingResponseWrapper, which will break the caller's expectations.Additionally, in unit tests,
MagicMockobjects have__iter__by default, which causes them to be incorrectly wrapped.To correctly identify streaming responses, we should only check for the iterator protocol methods
__next__and__anext__. Any gRPC streaming response is an iterator and must implement these methods.