diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2baecedbd..e9595cdf0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -39,6 +39,7 @@ endif::[] * Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520] * Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555] * Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562] +* Add specific instrumentation for SQS delete/batch-delete {pull}1567[#1567] [float] ===== Bug fixes diff --git a/elasticapm/instrumentation/packages/botocore.py b/elasticapm/instrumentation/packages/botocore.py index 86bbe5394..71d9eed26 100644 --- a/elasticapm/instrumentation/packages/botocore.py +++ b/elasticapm/instrumentation/packages/botocore.py @@ -164,26 +164,33 @@ def handle_sns(operation_name, service, instance, args, kwargs, context): return HandlerInfo(signature, span_type, span_subtype, span_action, context) +SQS_OPERATIONS = { + "SendMessage": {"span_action": "send", "signature": "SEND to"}, + "SendMessageBatch": {"span_action": "send_batch", "signature": "SEND_BATCH to"}, + "ReceiveMessage": {"span_action": "receive", "signature": "RECEIVE from"}, + "DeleteMessage": {"span_action": "delete", "signature": "DELETE from"}, + "DeleteMessageBatch": {"span_action": "delete_batch", "signature": "DELETE_BATCH from"}, +} + + def handle_sqs(operation_name, service, instance, args, kwargs, context): - if operation_name not in ("SendMessage", "SendMessageBatch", "ReceiveMessage"): + op = SQS_OPERATIONS.get(operation_name, None) + if not op: # only "publish" is handled specifically, other endpoints get the default treatment return False span_type = "messaging" span_subtype = "sqs" - span_action = "send" if operation_name in ("SendMessage", "SendMessageBatch") else "receive" topic_name = "" - batch = "_BATCH" if operation_name == "SendMessageBatch" else "" - signature_type = "RECEIVE from" if span_action == "receive" else f"SEND{batch} to" if len(args) > 1: topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1] - signature = f"SQS {signature_type} {topic_name}".rstrip() if topic_name else f"SQS {signature_type}" + signature = f"SQS {op['signature']} {topic_name}".rstrip() if topic_name else f"SQS {op['signature']}" context["destination"]["service"] = { "name": span_subtype, "resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype, "type": span_type, } - return HandlerInfo(signature, span_type, span_subtype, span_action, context) + return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context) def modify_span_sqs(span, args, kwargs): @@ -200,7 +207,9 @@ def modify_span_sqs(span, args, kwargs): attributes_count = len(attributes) if "MessageAttributes" in args[1]: messages = [args[1]] - elif "Entries" in args[1]: + # both send_batch and delete_batch use the same "Entries" list. We only want to add the + # traceparent to send_batch. We use the existence of ReceiptHandle to differentiate between the two + elif "Entries" in args[1] and args[1]["Entries"] and "ReceiptHandle" not in args[1]["Entries"][0]: messages = args[1]["Entries"] else: messages = [] diff --git a/tests/instrumentation/asyncio_tests/aiobotocore_tests.py b/tests/instrumentation/asyncio_tests/aiobotocore_tests.py index c1d6a5152..b08b6499e 100644 --- a/tests/instrumentation/asyncio_tests/aiobotocore_tests.py +++ b/tests/instrumentation/asyncio_tests/aiobotocore_tests.py @@ -259,7 +259,7 @@ async def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queu assert span["name"] == "SQS SEND_BATCH to myqueue" assert span["type"] == "messaging" assert span["subtype"] == "sqs" - assert span["action"] == "send" + assert span["action"] == "send_batch" assert span["context"]["destination"]["cloud"]["region"] == "us-east-1" assert span["context"]["destination"]["service"]["name"] == "sqs" assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" diff --git a/tests/instrumentation/botocore_tests.py b/tests/instrumentation/botocore_tests.py index 7d20b629c..7db38d834 100644 --- a/tests/instrumentation/botocore_tests.py +++ b/tests/instrumentation/botocore_tests.py @@ -255,7 +255,7 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue): assert span["name"] == "SQS SEND_BATCH to myqueue" assert span["type"] == "messaging" assert span["subtype"] == "sqs" - assert span["action"] == "send" + assert span["action"] == "send_batch" assert span["context"]["destination"]["cloud"]["region"] == "us-east-1" assert span["context"]["destination"]["service"]["name"] == "sqs" assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" @@ -324,7 +324,7 @@ def test_sqs_send_disttracing_dropped_span(instrument, elasticapm_client, sqs_cl assert transaction.id in traceparent # due to DroppedSpan, transaction.id is used instead of span.id -def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue): +def test_sqs_receive_and_delete(instrument, elasticapm_client, sqs_client_and_queue): sqs, queue_url = sqs_client_and_queue sqs.send_message( QueueUrl=queue_url, @@ -341,13 +341,59 @@ def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue): "All", ], ) + sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]) elasticapm_client.end_transaction("test", "test") - span = elasticapm_client.events[constants.SPAN][0] - assert span["name"] == "SQS RECEIVE from myqueue" - assert span["type"] == "messaging" - assert span["subtype"] == "sqs" - assert span["action"] == "receive" - assert span["context"]["destination"]["cloud"]["region"] == "us-east-1" - assert span["context"]["destination"]["service"]["name"] == "sqs" - assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" - assert span["context"]["destination"]["service"]["type"] == "messaging" + + receive_span = elasticapm_client.events[constants.SPAN][0] + assert receive_span["name"] == "SQS RECEIVE from myqueue" + assert receive_span["type"] == "messaging" + assert receive_span["subtype"] == "sqs" + assert receive_span["action"] == "receive" + assert receive_span["context"]["destination"]["cloud"]["region"] == "us-east-1" + assert receive_span["context"]["destination"]["service"]["name"] == "sqs" + assert receive_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" + assert receive_span["context"]["destination"]["service"]["type"] == "messaging" + + delete_span = elasticapm_client.events[constants.SPAN][1] + assert delete_span["name"] == "SQS DELETE from myqueue" + assert delete_span["type"] == "messaging" + assert delete_span["subtype"] == "sqs" + assert delete_span["action"] == "delete" + assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1" + assert delete_span["context"]["destination"]["service"]["name"] == "sqs" + assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" + assert delete_span["context"]["destination"]["service"]["type"] == "messaging" + + +def test_sqs_delete_batch(instrument, elasticapm_client, sqs_client_and_queue): + sqs, queue_url = sqs_client_and_queue + sqs.send_message( + QueueUrl=queue_url, + MessageAttributes={ + "Title": {"DataType": "String", "StringValue": "foo"}, + }, + MessageBody=("bar"), + ) + response = sqs.receive_message( + QueueUrl=queue_url, + AttributeNames=["All"], + MessageAttributeNames=[ + "All", + ], + ) + elasticapm_client.begin_transaction("test") + sqs.delete_message_batch( + QueueUrl=queue_url, + Entries=[{"Id": "foo", "ReceiptHandle": response["Messages"][0]["ReceiptHandle"]}], + ) + elasticapm_client.end_transaction("test", "test") + + delete_span = elasticapm_client.events[constants.SPAN][0] + assert delete_span["name"] == "SQS DELETE_BATCH from myqueue" + assert delete_span["type"] == "messaging" + assert delete_span["subtype"] == "sqs" + assert delete_span["action"] == "delete_batch" + assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1" + assert delete_span["context"]["destination"]["service"]["name"] == "sqs" + assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue" + assert delete_span["context"]["destination"]["service"]["type"] == "messaging"