Add Application Signals logs pipeline translator and allow metric routing to otlp endpoint#2111
Conversation
83b5680 to
9027c02
Compare
logs.logs_collected.application_signals | // TODO: Update default log group prefix before PR is merged. | ||
| defaultLogGroupPrefix = "/aws/telemetry/" |
There was a problem hiding this comment.
Still pending and not final.
There was a problem hiding this comment.
Will now be /aws/service-events/{service.name}.
| defaultLogStreamName = "default" | ||
|
|
||
| metadataKeyLogGroup = "aws.cloudwatch.log_group.destination" | ||
| metadataKeyLogStream = "aws.cloudwatch.log_stream.destination" |
There was a problem hiding this comment.
These key values are not following any otel spec, they're just arbitrary attributes created by Transform Processor which are used by AttributesToContext Processor, so we don't expect customers to know about these keys or even set them in their exported logs.
However, if customer does know and set these these attributes in their logs, we'll respect it and won't override them.
51f3c5d to
d454f8f
Compare
| } | ||
| // Auto-enable logs.logs_collected.application_signals when metrics is configured | ||
| // Must happen before creating confmap since confmap may copy the map | ||
| applicationsignalslogs.AutoEnableIfNeeded(m) |
There was a problem hiding this comment.
You dont need to mutate the json for this.. just rely on the existing AppSignals enabled checks in the pipeline translator.
| translators.Merge(containerInsightsTranslators) | ||
| translators.Set(applicationsignals.NewTranslator(pipeline.SignalTraces)) | ||
| translators.Set(applicationsignals.NewTranslator(pipeline.SignalMetrics)) | ||
| translators.Set(applicationsignalslogs.NewTranslator()) |
There was a problem hiding this comment.
Does this really need to be a new translator? We've been trying to keep one translator per "solution" that can internally create multiple pipelines.
There was a problem hiding this comment.
Not needed, moved into the existing applicationsignals translator.
| logKey = common.ConfigKey(common.LogsKey, common.LogsCollectedKey) | ||
| metricKey = common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey) | ||
| skipInputSet = collections.NewSet[string](files.SectionKey, windowsevents.SectionKey) | ||
| skipInputSet = collections.NewSet[string](files.SectionKey, windowsevents.SectionKey, common.AppSignals, common.AppSignalsFallback) |
There was a problem hiding this comment.
What in your changes forced you to add this?
i.e. what complained when you didnt have this?
There was a problem hiding this comment.
Before my change, application_signals only existed under logs.metrics_collected (for metrics pipeline) and traces.traces_collected (for traces pipeline). The adapter never saw it because it only scans logs_collected.
Now that we added logs.logs_collected.application_signals, the adapter scans it and tries to create a telegraf_application_signals receiver which doesn't exist and is why I try to skip it here.
|
|
||
| // NewTranslatorWithLogStatements creates a transform processor translator that | ||
| // executes the given OTTL statements in the "resource" context for logs. | ||
| func NewTranslatorWithLogStatements(name string, statements []string) common.ComponentTranslator { |
There was a problem hiding this comment.
nit: Can we follow the same pattern you did with WithMetadataKeys in the batch processor translator instead of creating this?
| } | ||
|
|
||
| func (t *translator) ID() component.ID { | ||
| return component.MustNewID("attributestocontext") |
There was a problem hiding this comment.
nit: Can we just do return component.NewIDWithName(t.factory.Type(), t.name)?
| } | ||
|
|
||
| func (t *translator) ID() component.ID { | ||
| return component.NewIDWithName(component.MustNewType("headers_setter"), t.name) |
There was a problem hiding this comment.
nit: Same here too and for the other components - can we just do return component.NewIDWithName(t.factory.Type(), t.name).
|
|
||
| var _ common.ComponentTranslator = (*translator)(nil) | ||
|
|
||
| func NewTranslatorWithName(name string, additionalAuth component.ID, headers []HeaderMapping) common.ComponentTranslator { |
There was a problem hiding this comment.
nit: The With might be better suited unless we are saying these are all mandatory.
960cebe to
46cd360
Compare
…etrics, selectively export metrics to otlp endpoint
5e9f489 to
d5a4647
Compare
| func NewTranslatorWithName(name string) common.ComponentTranslator { | ||
| return &translator{name, sigv4authextension.NewFactory()} | ||
| func NewTranslatorWithService(service string) common.ComponentTranslator { | ||
| return &translator{name: service, service: service, factory: sigv4authextension.NewFactory()} |
There was a problem hiding this comment.
nit: Don't need a separate field for name since it's always the same as service.
| AppSignalsLogs = ConfigKey(LogsKey, LogsCollectedKey, AppSignals) | ||
| AppSignalsTracesFallback = ConfigKey(TracesKey, TracesCollectedKey, AppSignalsFallback) | ||
| AppSignalsMetricsFallback = ConfigKey(LogsKey, MetricsCollectedKey, AppSignalsFallback) | ||
| AppSignalsLogsFallback = ConfigKey(LogsKey, LogsCollectedKey, AppSignalsFallback) |
| translators.Set(NewTranslator(signal, SetVariant(metricsVariantLogDest))) | ||
| translators.Set(NewTranslator(signal, SetVariant(metricsVariantOtlpDest))) | ||
| case pipeline.SignalLogs: | ||
| if isLogsDisabled(conf) { |
There was a problem hiding this comment.
nit: Should have a nil check for the conf.
| "log_group_name": { | ||
| "type": "string", | ||
| "minLength": 1 | ||
| }, | ||
| "log_stream_name": { | ||
| "type": "string", | ||
| "minLength": 1 |
There was a problem hiding this comment.
nit: maxLength could be set to 512 to match https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html although I'm sure the placeholders will allow it to go past that.
| - condition: attributes["Telemetry.Source"] == "ServiceEvents" | ||
| context: datapoint |
There was a problem hiding this comment.
Is there a reason this is on the datapoint attributes and not the resource attributes?
There was a problem hiding this comment.
In our SDK, it will set the attribute in on counter.add() attributes, so it's a datapoint attribute. SDK can send datapoints with different kinds of Telemetry.Source so we can't have it on resource.attributes.
| // EndpointConfig specifies the base endpoint and signal-specific endpoint for | ||
| // the otlphttp exporter. | ||
| type EndpointConfig struct { | ||
| BaseEndpoint string // e.g. "https://logs.us-east-1.amazonaws.com" | ||
| LogsEndpoint string // e.g. "https://logs.us-east-1.amazonaws.com/v1/logs" | ||
| MetricsEndpoint string | ||
| TracesEndpoint string | ||
| } |
There was a problem hiding this comment.
Yes, but I'm fine with what you have as well.
| error_mode: propagate | ||
| statements: | ||
| - set(resource.attributes["aws.cloudwatch.log_group.destination"], Concat(["/aws/service-events/", resource.attributes["service.name"]], "")) where resource.attributes["aws.cloudwatch.log_group.destination"] == nil | ||
| - replace_pattern(resource.attributes["aws.cloudwatch.log_group.destination"], "<nil>", "undefined") |
There was a problem hiding this comment.
Should we remove this aws.cloudwatch.log_group.destination resource attribute after attributestocontext sets it in the context? If we don't it's going to be included in every single log we send. Could add a transformprocessor for to clean it up before routing or in the export pipeline.
There was a problem hiding this comment.
As discussed, added a transform processor to cleanup: 5cec422
| metadataKeyLogGroup = "aws.cloudwatch.log_group.destination" | ||
| metadataKeyLogStream = "aws.cloudwatch.log_stream.destination" |
There was a problem hiding this comment.
nit: Consider making the metadata key consistent with the header key.
There was a problem hiding this comment.
Do you mean to use the naming aws.log.group.name / aws.log.stream.name instead of aws.cloudwatch.log_...?
Done in cb34b0a
There was a problem hiding this comment.
Meant x-aws-log-group/x-aws-log-stream like with https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_HTTP_Endpoints_OTLP.html so it's clear what they're for, but I think it's fine either way.
…butes, handle missing attrs better
…plify its translation logic, update go.mod
562e8a3 to
a02ae36
Compare
Description of the issue
Application Signals currently collects metrics and traces. This PR updates and refactors the
applicationsignalstranslator to also generate OTel logs and metrics pipelines that route OTLP data to CloudWatch via the CW OTLP endpoints, with automatic per-service log group creation and selective metrics routing.Customers configure
logs.logs_collected.application_signalswith optionallog_group_name(supporting placeholders like{service.name}) andlog_stream_name. The provisioner extension creates log groups/streams on first encounter.Description of changes
Update the existing
translator/translate/otel/pipeline/applicationsignals/translator with routing connectors for both logs and metrics signals. Each signal uses a 3-pipeline topology (receive → route → export) connected via a routing connector.Metrics (3 pipelines via routing connector):
metrics/application_signals_metrics_route: OTLP receiver → routing connectormetrics/application_signals_metrics_logs_destination(default): routing → existing AppSignals processors → EMF exportermetrics/application_signals_metrics_otlp_destination: routing → batch → otlphttp to CW OTLP monitoring endpointattributes["Telemetry.Source"] == "ServiceEvents"(datapoint context)Logs (3 pipelines via routing connector):
logs/application_signals_logs_route: OTLP receiver → transform → attributestocontext → transform → routing connectorattributes["event.name"] == "aws.service_events.aggregate_profile"(log context)transformandattributestocontextare not usedtemporary_key.) are used to help build theaws.log.group.name/aws.log.stream.nameattributes in the first transform, and will be discarded in the second transform.logs/application_signals_logs_batch(default): routing → batch → otlphttp to CW OTLP logs endpointlogs/application_signals_logs_nobatch: routing → otlphttp (no batch, for large payloads)otlphttp → headers_setter → awscloudwatchlogsprovisioner → sigv4authTraces (unchanged):
Notable behaviors:
metrics_collected.application_signalsis configured. Explicitly disable via:logs.logs_collected.application_signals.disabled: true/{deployment.environment}/prefix/{service.name}) (Default log group:/aws/service-events/{service.name})aws.log.group.name/aws.log.stream.nameare ignoredNew/Updated shared translator packages:
extension/awscloudwatchlogsprovisioner— newextension/headerssetter— newprocessor/attributestocontext— newconnector/routing— newexporter/otlphttp— new (generic, signal-awareEndpointConfig)extension/sigv4auth— Updated withWithServiceoptionprocessor/batchprocessor— Updated withWithMetadataKeysoptionprocessor/transformprocessor— Updated withNewTranslatorWithLogStatementsLicense
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Tests
amazon-cloudwatch-agent-test: <TODO: Make PR>Requirements
Before commiting your code, please do the following steps.
make fmtandmake fmt-shmake lintIntegration Tests
To run integration tests against this PR, add the
ready for testinglabel.