Skip to content

Conversation

@michael-zhao459
Copy link
Contributor

@michael-zhao459 michael-zhao459 commented Jul 30, 2025

What does this PR do?

This PR

Sets a DSM checkpoint for every single record in a event. Does this by extracting out a helper that gets datadog context per record, and if DSM is enabled continues to loop starting with the second record and setting DSM checkpoints (always using the first record for APM).

Motivation

Screenshot 2025-07-30 at 2 19 19 PM

Please note the discrepancy between the msg/s from incoming produce and outgoing by downstream queue. When batch processing, if the consume service does not set a checkpoint for each message coming from upstream we lose track of them causing the noticeable drop in throughput.

Testing Guidelines

All of the tests in #622 this table are maintained. Ensured no regressions by making sure that all test_tracing.py continued to pass. Tested on sandbox AWS account for all queue types to see context propagation and ensure throughput matches. Added tests to show that DSM can now handle multiple records in a event.

Additional Notes

Types of Changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)

@michael-zhao459 michael-zhao459 changed the title set dsm checkpoint for all records in array fix(dsm): set dsm checkpoint for all records in array Jul 30, 2025
Copy link

@piochelepiotr piochelepiotr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The big question I guess is do we:

  1. Couple a bit more APM & DSM, but avoid de-serializing two times the datadog context for the first record
  2. Couple APM & DSM less, but at the cost of de-serializing two times the datadog context for the first record

not event_source.equals(EventTypes.KINESIS)
and not event_source.equals(EventTypes.SNS)
and not event_source.equals(EventTypes.SQS)
):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we expect this to happen? If not, let's add a debug log here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the double de-serialization introduce some performance cost. But since we only need 1 extra deserialization for tracing purpose, and one JSON.loads(50 bytes) is at the level of nanosecond cost. So I feel comfortable not worrying about it at all. I'd say decoupling and better code structure is worth more. Also for lambdas, cold start costs are more critical than this. With that being said,...

  1. If in the future we need to deserialize all the tracecontext for spanlinks for example, that would bring the cost to ms level for each invocation and we might want to refactor the code to do it only once then.
  2. Might be too late to mention, but I start to think maybe datadog-lambda-js is a better starting place for refactoring like this because there we are using a bunch of extractors and could be easier to refactor.

@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from 69bbcd8 to c0906a0 Compare August 4, 2025 20:05
@michael-zhao459 michael-zhao459 changed the title fix(dsm): set dsm checkpoint for all records in array fix(dsm): set dsm checkpoint for all records in event Aug 4, 2025
)
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here, it's accessing records[0] again. I would put this whole section in an else if idx == 0 (line 315)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
attrs = event.get("Records")[0].get("attributes")
attrs = record.get("attributes")

if idx == 0:
context = propagator.extract(dd_data)
dsm_data = dd_data
else:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this else, dsm_data is not set. Is that an issue?

Copy link
Contributor Author

@michael-zhao459 michael-zhao459 Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an issue. I checked in dd-trace-py and DSM never injects context into attributes.AWSTraceHeader, we can just set a checkpoint with None

)
context = None
for idx, record in enumerate(records):
dsm_data = None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not specific to dsm, it's dd_ctx

if idx == 0
else context
)
_dsm_set_checkpoint(None, "kinesis", source_arn)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are setting a checkpoint kinesis if not kinesis, I think the name kinesis is wrong, because this code is a bit confusing.

Copy link
Contributor Author

@michael-zhao459 michael-zhao459 Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. This is deep enough inside the extract function where we believe that the event source is from Kinesis from parsing beforehand. However, all AWS documentation says Kinesis lambda event should have this field. To my understanding, this check is for lambda synchronous invocations with records that match Kinesis, but doesn't actually come from a Kinesis stream. @DataDog/apm-serverless Can you help confirm why this check is here in the first place?

for idx, record in enumerate(records):
try:
source_arn = record.get("eventSourceARN", "")
dsm_data = None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as bellow, dsm_data is not specific to data streams here. I would name it same as bellow, dd_ctx, or something like that.

if dd_json_data_type == "Binary":
import base64
context = None
records = (

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's have records always be: event.get("Records", []), however, in the loop, we can break early if data streams is disabled.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if index == 0:
     do apm stuff
     if data streams is enabled:
         break

@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from f38673b to 60ca41b Compare August 6, 2025 15:06
@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from 60ca41b to 2f8dfaa Compare August 6, 2025 15:11
@DataDog DataDog deleted a comment from piochelepiotr Aug 6, 2025
)
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
attrs = event.get("Records")[0].get("attributes")
attrs = record.get("attributes")

"Failed to extract Step Functions context from SQS/SNS event."
)
context = propagator.extract(dd_data)
if not config.data_streams_enabled:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this break is too hidden. It is in if dd_payload block?

I suggest this high level approach:

apm_context = None
for record in records:
  context = extract_context(record)
  if apm_context is None:
      apm_context = context
  if data_streams_enabled:
    set_checkpoint()
  if !data_streams_enabled:
    # APM only looks at the first record.
    break

You can break down the code into helper function to make that structure be very clear.
Basically, can you avoid some of the nested conditions? Like if not config.data_streams_enabled here?

# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")
if attrs:
x_ray_header = attrs.get("AWSTraceHeader")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe put this logic in the extract_context I suggested above. The extract_context can take an argument: extract_from_xray?

(extract_context is probably not a great name, I let you find a better one)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I might be misinterpreting but I'm not sure we should have one function return both a Context() object and the return of a json.loads(). I ended up splitting the x-ray extractor into another helper, let me know what you think

except Exception:
logger.debug("Failed extracting context as EventBridge to SQS.")

try:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if context is extracted from event bridge, we don't set a checkpoint. Is that expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tracers never inject DSM context in the case of event bridge or step functions. I'm not sure this is the PR to be adding the functionality for these event types

)


def _extract_context_from_sqs_or_sns_record(record):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that function looks great to me

try:
dd_ctx = _extract_context_from_sqs_or_sns_record(record)
if apm_context is None:
if dd_ctx and is_step_function_event(dd_ctx):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the DSM context can't be in a step_funtion_event?

In any case, the logic to get the apm context from the dd_ctx, should be in it's own function I this.

That function can be the _extract_context_from_xray that you can rename to:
_extract_apm_context, and it can take the parameters: dd_ctx and record.

Then, code here can be:

if apm_context is None:
       apm_context = _extract_apm_context(dd_ctx, record)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make the code easier to read, but also less error prone.

Here, if the context is extracted from step function, we are not setting a checkpoint. I don't think this is what we want?

return None


def _extract_context_from_xray(record):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned above, I would change function to extract the apm context. Not just from xray.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants