Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ endif::[]
* Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520]
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
* Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562]
* Add specific instrumentation for SQS delete/batch-delete {pull}1567[#1567]

[float]
===== Bug fixes
Expand Down
23 changes: 16 additions & 7 deletions elasticapm/instrumentation/packages/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,26 +164,33 @@ def handle_sns(operation_name, service, instance, args, kwargs, context):
return HandlerInfo(signature, span_type, span_subtype, span_action, context)


SQS_OPERATIONS = {
"SendMessage": {"span_action": "send", "signature": "SEND to"},
"SendMessageBatch": {"span_action": "send_batch", "signature": "SEND_BATCH to"},
"ReceiveMessage": {"span_action": "receive", "signature": "RECEIVE from"},
"DeleteMessage": {"span_action": "delete", "signature": "DELETE from"},
"DeleteMessageBatch": {"span_action": "delete_batch", "signature": "DELETE_BATCH from"},
}


def handle_sqs(operation_name, service, instance, args, kwargs, context):
if operation_name not in ("SendMessage", "SendMessageBatch", "ReceiveMessage"):
op = SQS_OPERATIONS.get(operation_name, None)
if not op:
# only "publish" is handled specifically, other endpoints get the default treatment
return False
span_type = "messaging"
span_subtype = "sqs"
span_action = "send" if operation_name in ("SendMessage", "SendMessageBatch") else "receive"
topic_name = ""
batch = "_BATCH" if operation_name == "SendMessageBatch" else ""
signature_type = "RECEIVE from" if span_action == "receive" else f"SEND{batch} to"

if len(args) > 1:
topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1]
signature = f"SQS {signature_type} {topic_name}".rstrip() if topic_name else f"SQS {signature_type}"
signature = f"SQS {op['signature']} {topic_name}".rstrip() if topic_name else f"SQS {op['signature']}"
context["destination"]["service"] = {
"name": span_subtype,
"resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype,
"type": span_type,
}
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context)


def modify_span_sqs(span, args, kwargs):
Expand All @@ -200,7 +207,9 @@ def modify_span_sqs(span, args, kwargs):
attributes_count = len(attributes)
if "MessageAttributes" in args[1]:
messages = [args[1]]
elif "Entries" in args[1]:
# both send_batch and delete_batch use the same "Entries" list. We only want to add the
# traceparent to send_batch. We use the existence of ReceiptHandle to differentiate between the two
elif "Entries" in args[1] and args[1]["Entries"] and "ReceiptHandle" not in args[1]["Entries"][0]:
messages = args[1]["Entries"]
else:
messages = []
Expand Down
2 changes: 1 addition & 1 deletion tests/instrumentation/asyncio_tests/aiobotocore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queu
assert span["name"] == "SQS SEND_BATCH to myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "send"
assert span["action"] == "send_batch"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
Expand Down
68 changes: 57 additions & 11 deletions tests/instrumentation/botocore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
assert span["name"] == "SQS SEND_BATCH to myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "send"
assert span["action"] == "send_batch"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
Expand Down Expand Up @@ -324,7 +324,7 @@ def test_sqs_send_disttracing_dropped_span(instrument, elasticapm_client, sqs_cl
assert transaction.id in traceparent # due to DroppedSpan, transaction.id is used instead of span.id


def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
def test_sqs_receive_and_delete(instrument, elasticapm_client, sqs_client_and_queue):
sqs, queue_url = sqs_client_and_queue
sqs.send_message(
QueueUrl=queue_url,
Expand All @@ -341,13 +341,59 @@ def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
"All",
],
)
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"])
elasticapm_client.end_transaction("test", "test")
span = elasticapm_client.events[constants.SPAN][0]
assert span["name"] == "SQS RECEIVE from myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "receive"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert span["context"]["destination"]["service"]["type"] == "messaging"

receive_span = elasticapm_client.events[constants.SPAN][0]
assert receive_span["name"] == "SQS RECEIVE from myqueue"
assert receive_span["type"] == "messaging"
assert receive_span["subtype"] == "sqs"
assert receive_span["action"] == "receive"
assert receive_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert receive_span["context"]["destination"]["service"]["name"] == "sqs"
assert receive_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert receive_span["context"]["destination"]["service"]["type"] == "messaging"

delete_span = elasticapm_client.events[constants.SPAN][1]
assert delete_span["name"] == "SQS DELETE from myqueue"
assert delete_span["type"] == "messaging"
assert delete_span["subtype"] == "sqs"
assert delete_span["action"] == "delete"
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"


def test_sqs_delete_batch(instrument, elasticapm_client, sqs_client_and_queue):
sqs, queue_url = sqs_client_and_queue
sqs.send_message(
QueueUrl=queue_url,
MessageAttributes={
"Title": {"DataType": "String", "StringValue": "foo"},
},
MessageBody=("bar"),
)
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=["All"],
MessageAttributeNames=[
"All",
],
)
elasticapm_client.begin_transaction("test")
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=[{"Id": "foo", "ReceiptHandle": response["Messages"][0]["ReceiptHandle"]}],
)
elasticapm_client.end_transaction("test", "test")

delete_span = elasticapm_client.events[constants.SPAN][0]
assert delete_span["name"] == "SQS DELETE_BATCH from myqueue"
assert delete_span["type"] == "messaging"
assert delete_span["subtype"] == "sqs"
assert delete_span["action"] == "delete_batch"
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"