
Integration EventHubs Extension

aryehcitron@gmail.com edited this page May 24, 2026 · 11 revisions

Integration: Azure Event Hubs Extension

Track Azure Event Hubs operations in your test diagrams using the Kronikol.Extensions.EventHubs NuGet package. This extension wraps EventHubProducerClient and EventHubConsumerClient to intercept send/receive operations.

Using a shared library or abstraction layer? If your code doesn't use the Event Hubs SDK directly — e.g. it goes through MassTransit, a shared messaging library, or a custom abstraction — see the MassTransit Extension or Tracking Custom Dependencies for alternative approaches including MessageTracker and TrackingProxy<T>.

Installation

dotnet add package Kronikol.Extensions.EventHubs

Quick Start

DI Decoration (Recommended — Zero Prod Changes)

v2.27.14+: Use the built-in DI extension methods, which automatically resolve IHttpContextAccessor from DI.

// In your test's ConfigureTestServices:
services.AddEventHubsTestTracking(options =>
{
    options.ServiceName = "EventHubs";
    options.CallerName = "OrdersApi";
    options.Verbosity = EventHubsTrackingVerbosity.Detailed;
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

This decorates both EventHubProducerClient and EventHubConsumerClient registrations. You can also decorate them individually:

services.AddEventHubsProducerTestTracking(options => { /* ... */ });
services.AddEventHubsConsumerTestTracking(options => { /* ... */ });

These methods use DecorateAll<EventHubProducerClient> / DecorateAll<EventHubConsumerClient> internally and preserve the original service lifetime. No changes to production code are required.
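As a rough illustration of where the decoration call could live, the sketch below places it inside a WebApplicationFactory test fixture. Everything outside the AddEventHubsTestTracking call is an assumption: OrdersApiFactory is a hypothetical name, Program stands in for your application's entry-point type, and the namespaces that contain the extension method and CurrentTestInfo may differ in your version of the package.

```csharp
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.AspNetCore.TestHost;
using Kronikol.Extensions.EventHubs; // assumed namespace for the DI extension methods

// Hypothetical test fixture; `Program` is a placeholder for the app under test.
public class OrdersApiFactory : WebApplicationFactory<Program>
{
    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        // ConfigureTestServices runs after the app's own registrations, so the
        // production EventHubProducerClient / EventHubConsumerClient registrations
        // already exist when they are decorated here.
        builder.ConfigureTestServices(services =>
        {
            services.AddEventHubsTestTracking(options =>
            {
                options.ServiceName = "EventHubs";
                options.CallerName = "OrdersApi";
                options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
            });
        });
    }
}
```

Because the decoration happens only in the test host, the application's own registration code stays untouched.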

Note: TrackingEventHubProducerClient now inherits from EventHubProducerClient. The base-class properties EventHubName, FullyQualifiedNamespace, and IsClosed are not virtual in the Azure SDK, so the tracking type shadows them with the new modifier. They return the expected values when accessed through a variable typed as TrackingEventHubProducerClient, but may throw when accessed through a reference typed as EventHubProducerClient, because that path reaches the base implementation, which has no underlying connection. The virtual methods (SendAsync, CreateBatchAsync, CloseAsync, etc.) work correctly in all scenarios.
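The difference between shadowed properties and overridden methods described in the note can be seen in a small, self-contained C# sketch. BaseClient and TrackingClient are hypothetical stand-ins, not types from the Azure SDK or this package; they only mirror the shape of the situation.

```csharp
using System;

// Hypothetical stand-in for an SDK client.
class BaseClient
{
    public string Name => "base";                 // non-virtual, like EventHubName
    public virtual string Send() => "base send";  // virtual, like SendAsync
}

// Hypothetical stand-in for a tracking subclass.
class TrackingClient : BaseClient
{
    public new string Name => "tracking";              // shadows the base property
    public override string Send() => "tracking send";  // overrides the base method
}

static class ShadowingDemo
{
    static void Main()
    {
        var tracking = new TrackingClient();
        BaseClient asBase = tracking; // same object, base-typed reference

        Console.WriteLine(tracking.Name); // tracking
        Console.WriteLine(asBase.Name);   // base (the shadowing member is bypassed)
        Console.WriteLine(asBase.Send()); // tracking send (virtual dispatch still applies)
    }
}
```

In practice this suggests declaring variables as TrackingEventHubProducerClient, or using the Inner property, when you need the property values rather than the send methods.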

Manual Wrapping

using Azure.Messaging.EventHubs;          // EventData
using Azure.Messaging.EventHubs.Producer; // EventHubProducerClient
using Kronikol.Extensions.EventHubs;

var options = new EventHubsTrackingOptions
{
    ServiceName = "EventHubs",
    CallerName = "OrdersApi",
    Verbosity = EventHubsTrackingVerbosity.Detailed,
    CurrentTestInfoFetcher = () => (TestContext.CurrentTestName, TestContext.CurrentTestId)
};

var innerClient = new EventHubProducerClient(connectionString, eventHubName);
var producer = new TrackingEventHubProducerClient(innerClient, options);

await producer.SendAsync(new[] { new EventData("order-created") });

How It Works

Azure Event Hubs uses AMQP internally and does not expose an HTTP DelegatingHandler pipeline. Like the Service Bus and Pub/Sub extensions, this extension therefore uses the wrapper/decorator pattern: tracking classes wrap the SDK client and intercept its method calls.

EventHubsTracker, the central logging helper, handles all request/response logging. It records each entry with RequestResponseMetaType.Event so that PlantUML diagrams render the interaction using asynchronous-messaging notation.
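The wrapper/decorator idea can be sketched without any Azure or Kronikol types. FakeProducer and TrackingFakeProducer below are hypothetical; the real wrappers log through EventHubsTracker rather than an in-memory list, so treat this only as an outline of the intercept-then-delegate flow.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for an SDK client with a virtual send method.
class FakeProducer
{
    public virtual Task SendAsync(IEnumerable<string> events) => Task.CompletedTask;
}

// Hypothetical tracking wrapper: logs the request, delegates, then logs the response.
class TrackingFakeProducer : FakeProducer
{
    private readonly FakeProducer _inner;

    public List<string> Log { get; } = new();

    public TrackingFakeProducer(FakeProducer inner) => _inner = inner;

    public override async Task SendAsync(IEnumerable<string> events)
    {
        var list = new List<string>(events);
        Log.Add($"request: Send ({list.Count} event(s))");
        await _inner.SendAsync(list); // the real work stays in the wrapped client
        Log.Add("response: Send completed");
    }
}

static class DecoratorDemo
{
    static async Task Main()
    {
        var producer = new TrackingFakeProducer(new FakeProducer());
        await producer.SendAsync(new[] { "order-created" });
        producer.Log.ForEach(Console.WriteLine);
    }
}
```

Running the sketch prints one request line followed by one response line, which is the pairing a tracker needs in order to draw an arrow in a sequence diagram.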

Supported Operations

| Operation | Enum Value | Source |
|---|---|---|
| Send | EventHubsOperation.Send | Single event via SendAsync |
| SendBatch | EventHubsOperation.SendBatch | Multiple events via SendAsync |
| CreateBatch | EventHubsOperation.CreateBatch | CreateBatchAsync |
| ReadEvents | EventHubsOperation.ReadEvents | ReadEventsAsync |
| ReadEventsFromPartition | EventHubsOperation.ReadEventsFromPartition | ReadEventsFromPartitionAsync |
| GetPartitionIds | EventHubsOperation.GetPartitionIds | GetPartitionIdsAsync |
| GetEventHubProperties | EventHubsOperation.GetEventHubProperties | GetEventHubPropertiesAsync |
| GetPartitionProperties | EventHubsOperation.GetPartitionProperties | GetPartitionPropertiesAsync |
| StartProcessing | EventHubsOperation.StartProcessing | StartProcessingAsync |
| StopProcessing | EventHubsOperation.StopProcessing | StopProcessingAsync |
| ProcessEvent | EventHubsOperation.ProcessEvent | Event handler callback |

Wrapper Classes

TrackingEventHubProducerClient

Wraps EventHubProducerClient:

  • SendAsync(IEnumerable<EventData>) — tracks single/batch sends
  • SendAsync(IEnumerable<EventData>, SendEventOptions) — tracks with partition key
  • SendAsync(EventDataBatch) — tracks batch sends
  • CreateBatchAsync() — delegates without tracking
  • CloseAsync() — delegates without tracking
  • Inner property for direct access to the underlying client
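A short sketch of the three SendAsync shapes listed above, assuming a TrackingEventHubProducerClient built as in Manual Wrapping. ProducerSketch, the partition key, and the event bodies are illustrative; SendEventOptions, EventDataBatch, and TryAdd come from the Azure SDK.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Kronikol.Extensions.EventHubs;

static class ProducerSketch
{
    public static async Task SendExamplesAsync(TrackingEventHubProducerClient producer)
    {
        // Plain enumerable overload.
        await producer.SendAsync(new[] { new EventData("order-created") });

        // Overload with SendEventOptions: events with the same key share a partition.
        var sendOptions = new SendEventOptions { PartitionKey = "customer-42" };
        await producer.SendAsync(new[] { new EventData("order-paid") }, sendOptions);

        // Batch overload: CreateBatchAsync is delegated, SendAsync(batch) is tracked.
        using EventDataBatch batch = await producer.CreateBatchAsync();
        if (!batch.TryAdd(new EventData("order-shipped")))
        {
            throw new InvalidOperationException("Event is too large for the batch.");
        }
        await producer.SendAsync(batch);
    }
}
```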

TrackingEventHubConsumerClient

Wraps EventHubConsumerClient:

  • ReadEventsAsync() — logs start, yields events, logs completion
  • ReadEventsFromPartitionAsync() — logs with partition ID
  • GetPartitionIdsAsync() — delegates directly
  • CloseAsync() — delegates without tracking
  • Inner property for direct access
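The following sketch reads events through the consumer wrapper for a few seconds. It assumes the wrapper's constructor takes (innerClient, options) like the producer wrapper shown in Manual Wrapping, and that ReadEventsAsync accepts a CancellationToken as the SDK method does; neither detail is confirmed on this page. ConsumerSketch and the ten-second timeout are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Kronikol.Extensions.EventHubs;

static class ConsumerSketch
{
    public static async Task ReadForAWhileAsync(
        string connectionString, string eventHubName, EventHubsTrackingOptions options)
    {
        var inner = new EventHubConsumerClient(
            EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);

        // Assumption: same (innerClient, options) constructor shape as the producer wrapper.
        var consumer = new TrackingEventHubConsumerClient(inner, options);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        try
        {
            await foreach (PartitionEvent evt in consumer.ReadEventsAsync(cts.Token))
            {
                Console.WriteLine($"{evt.Partition.PartitionId}: {evt.Data?.EventBody}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected once the timeout elapses; reading has no natural end.
        }
        finally
        {
            await consumer.CloseAsync();
        }
    }
}
```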

Verbosity Levels

EventHubsTrackingVerbosity.Summarised

  • Label: Operation name (e.g. Send)
  • Content: Omitted
  • URI: eventhubs:///hub-name

EventHubsTrackingVerbosity.Detailed

  • Label: Contextual (e.g. Send → telemetry, Read ← telemetry[2])
  • Content: Event body included
  • URI: eventhubs:///hub-name/partition-id when applicable

EventHubsTrackingVerbosity.Raw

  • Label: Full details (e.g. Send hub=telemetry partition=1 count=3)
  • Content: Full event data
  • URI: eventhubs:///hub-name/partition-id

Configuration Options

// testName and testId are placeholders for values supplied by your test framework.
var options = new EventHubsTrackingOptions
{
    ServiceName = "EventHubs",
    CallerName = "OrdersApi",
    Verbosity = EventHubsTrackingVerbosity.Detailed,
    CurrentTestInfoFetcher = () => (testName, testId),
};

EventHubsTrackingOptions

| Property | Type | Default | Description |
|---|---|---|---|
| ServiceName | string | "EventHubs" | Display name in diagrams for the Event Hubs service |
| CallerName | string | "Caller" | Calling service name in diagrams |
| Verbosity | EventHubsTrackingVerbosity | Detailed | Verbosity level (Raw, Detailed, Summarised) |
| CurrentTestInfoFetcher | Func<(string Name, string Id)>? | null | Required: provides test context for log correlation |
| CurrentStepTypeFetcher | Func<string?>? | null | Optional: returns the current BDD step type (Given/When/Then) |
| HttpContextAccessor | IHttpContextAccessor? | null | Optional: enables dual-resolution of test identity from HTTP headers. Auto-resolved by DI extensions (v2.26.3+). See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) |
| SetupVerbosity | EventHubsTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking |
| ActionVerbosity | EventHubsTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking |
| TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking |
| TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |
| PropagateTestIdentity | bool | true | When true, producers inject kronikol-test-name / kronikol-test-id into EventData.Properties and consumers extract them to establish TestIdentityScope. See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) |
| AutoCorrelateOnConsume | bool | true | When true, consumed events auto-populate TestCorrelationStore for parallel-safe background thread correlation |
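As an illustration of how the phase-related properties might be combined, the fragment below keeps Setup traffic terse and records full detail during the Action phase. It uses only property names from the table; the chosen values are an example, not a recommendation from the package.

```csharp
var options = new EventHubsTrackingOptions
{
    ServiceName = "EventHubs",
    CallerName = "OrdersApi",
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,

    // Baseline verbosity, used when no phase override applies.
    Verbosity = EventHubsTrackingVerbosity.Detailed,

    // Per-phase overrides: terse during Setup, full detail during Action.
    SetupVerbosity = EventHubsTrackingVerbosity.Summarised,
    ActionVerbosity = EventHubsTrackingVerbosity.Raw,

    // Both default to true; set either to false to suppress tracking in that phase.
    TrackDuringSetup = true,
    TrackDuringAction = true,
};
```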

v2.23.0+ Dual-Resolution: EventHubsTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter for resolving test identity from HTTP request headers when running inside the SUT's request pipeline. v2.26.3+: Set HttpContextAccessor on EventHubsTrackingOptions instead — the tracker reads it automatically. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for details.
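When wrapping clients manually rather than through the DI extensions, the accessor has to be supplied by hand. A minimal sketch, assuming an IServiceProvider from the system under test is available and that AddHttpContextAccessor() was called during its startup; TrackingOptionsFactory is a hypothetical helper name.

```csharp
using System;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Kronikol.Extensions.EventHubs;

static class TrackingOptionsFactory
{
    // Hypothetical helper: builds options for manual wrapping.
    public static EventHubsTrackingOptions Create(IServiceProvider provider) => new()
    {
        ServiceName = "EventHubs",
        CallerName = "OrdersApi",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,

        // GetService returns null when no accessor is registered, which leaves
        // dual-resolution switched off instead of failing.
        HttpContextAccessor = provider.GetService<IHttpContextAccessor>(),
    };
}
```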

ITrackingComponent

EventHubsTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry. This provides:

  • ComponentName: "EventHubsTracker ({ServiceName})"
  • WasInvoked: true after first operation
  • InvocationCount: Total operations tracked

URI Scheme

eventhubs:///telemetry          (hub-level operations)
eventhubs:///telemetry/2        (partition-specific operations)
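The two URI shapes can be produced by a helper along these lines. EventHubsUriSketch is hypothetical and only mirrors the documented format; it is not the extension's own code.

```csharp
using System;

static class EventHubsUriSketch
{
    // Hub-level URI when partitionId is null, partition-specific URI otherwise.
    public static string Build(string hubName, string? partitionId = null) =>
        partitionId is null
            ? $"eventhubs:///{hubName}"
            : $"eventhubs:///{hubName}/{partitionId}";

    static void Main()
    {
        Console.WriteLine(Build("telemetry"));      // eventhubs:///telemetry
        Console.WriteLine(Build("telemetry", "2")); // eventhubs:///telemetry/2
    }
}
```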

Test Identity Propagation (v2.34.0+)

When your application processes Event Hubs events on background threads (e.g. inside an EventProcessorClient), the test framework's TestContext is unavailable. The extension solves this automatically via EventData.Properties propagation.

How It Works

  1. Producer side: TrackingEventHubProducerClient injects kronikol-test-name and kronikol-test-id into each event's Properties dictionary before sending.
  2. Consumer side: TrackingEventHubConsumerClient reads these properties from each received event and calls TestIdentityScope.SetFromMessage().
  3. All subsequent tracking within the event handler resolves to the originating test.
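The first two steps above can be sketched with the Azure SDK's EventData.Properties dictionary alone. IdentityPropagationSketch and its methods are hypothetical; the real wrappers do this internally, and the consumer side passes the values to TestIdentityScope.SetFromMessage() instead of returning them.

```csharp
using Azure.Messaging.EventHubs;

static class IdentityPropagationSketch
{
    const string TestNameKey = "kronikol-test-name";
    const string TestIdKey = "kronikol-test-id";

    // Producer side: stamp the originating test onto the outgoing event.
    public static void Stamp(EventData evt, string testName, string testId)
    {
        evt.Properties[TestNameKey] = testName;
        evt.Properties[TestIdKey] = testId;
    }

    // Consumer side: recover the identity, or null if the event was not stamped.
    public static (string Name, string Id)? TryRead(EventData evt)
    {
        if (evt.Properties.TryGetValue(TestNameKey, out var name) &&
            evt.Properties.TryGetValue(TestIdKey, out var id) &&
            name is string testName && id is string testId)
        {
            return (testName, testId);
        }
        return null;
    }
}
```

Because the identity travels with the event itself, it survives the hop to a background thread where the test framework's ambient context is not available.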

Disabling Propagation

var options = new EventHubsTrackingOptions
{
    PropagateTestIdentity = false,
    // ...
};

See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) for the full architecture overview.
