Integration PubSub Extension
Track Google Cloud Pub/Sub operations in your test diagrams using the Kronikol.Extensions.PubSub NuGet package. This extension wraps PublisherClient and SubscriberClient to capture publish/subscribe operations.
Using a shared library or abstraction layer? If your code doesn't use the Pub/Sub SDK directly (for example, it goes through a shared messaging library, wrapper, or custom abstraction), this extension can't intercept the underlying calls. See Tracking Custom Dependencies for alternative approaches, including MessageTracker and TrackingProxy<T>.
dotnet add package Kronikol.Extensions.PubSub

using Kronikol.Extensions.PubSub;
var publisher = await PublisherClient.CreateAsync(topicName);
var trackingPublisher = new TrackingPublisherClient(publisher, new PubSubTrackingOptions
{
ServiceName = "PubSub",
CallerName = "OrdersApi",
CurrentTestInfoFetcher = () => (TestContext.CurrentTestName, TestContext.CurrentTestId)
});
await trackingPublisher.PublishAsync("Hello, Pub/Sub!");

var subscriber = await SubscriberClient.CreateAsync(subscriptionName);
var trackingSubscriber = new TrackingSubscriberClient(subscriber, new PubSubTrackingOptions
{
ServiceName = "PubSub",
CallerName = "WorkerService",
CurrentTestInfoFetcher = () => (TestContext.CurrentTestName, TestContext.CurrentTestId)
});
await trackingSubscriber.StartAsync(async (msg, ct) =>
{
// Process message
return SubscriberClient.Reply.Ack;
});

Google Cloud Pub/Sub uses gRPC internally, so DelegatingHandler interception is not viable. Instead, this extension uses a wrapper/decorator pattern:
- TrackingPublisherClient wraps PublisherClient and logs publish operations
- TrackingSubscriberClient wraps SubscriberClient and intercepts the message handler callback
Both wrappers use PubSubTracker as a central logging helper. It implements ITrackingComponent and handles request/response logging with RequestResponseMetaType.Event for messaging semantics.
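To illustrate the wrapper/decorator idea in isolation, here is a minimal sketch. IMessagePublisher, FakePublisher and RecordingPublisher are hypothetical names invented for this example; they are not part of Kronikol or the Google Cloud SDK, and the real wrappers log through PubSubTracker rather than an in-memory list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a publisher; not the Google Cloud SDK type.
public interface IMessagePublisher
{
    Task<string> PublishAsync(string text);
}

// Hypothetical inner publisher that returns a fake, sequential message ID.
public sealed class FakePublisher : IMessagePublisher
{
    private int _next;

    public Task<string> PublishAsync(string text) => Task.FromResult($"msg-{++_next}");
}

// Decorator: exposes the same interface, records each call, then delegates to the inner publisher.
public sealed class RecordingPublisher : IMessagePublisher
{
    private readonly IMessagePublisher _inner;

    public List<string> Log { get; } = new List<string>();

    public RecordingPublisher(IMessagePublisher inner) => _inner = inner;

    public async Task<string> PublishAsync(string text)
    {
        Log.Add($"Publish request: {text}");
        var id = await _inner.PublishAsync(text);
        Log.Add($"Publish response: {id}");
        return id;
    }
}
```

Because the decorator exposes the same surface as the object it wraps, calling code does not need to know that tracking is in place.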
| Operation | Enum Value | Source |
|---|---|---|
| Publish (single) | PubSubOperation.Publish | TrackingPublisherClient |
| Publish (batch) | PubSubOperation.PublishBatch | TrackingPublisherClient |
| Receive | PubSubOperation.Receive | TrackingSubscriberClient callback |
| Pull | PubSubOperation.Pull | Low-level API |
| Acknowledge | PubSubOperation.Acknowledge | Low-level API |
| ModifyAckDeadline | PubSubOperation.ModifyAckDeadline | Low-level API |
| StartSubscriber | PubSubOperation.StartSubscriber | Lifecycle |
| StopSubscriber | PubSubOperation.StopSubscriber | Lifecycle |
GCP Pub/Sub uses full resource paths like projects/my-project/topics/my-topic. At Detailed verbosity, short names are extracted:
- projects/my-project/topics/my-topic → my-topic
- projects/my-project/subscriptions/my-sub → my-sub
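The short-name mapping can be sketched as taking the last path segment. ResourceNames.ShortName is a hypothetical helper written for this example and is not the extension's actual implementation, which may handle edge cases differently.

```csharp
using System;

// Hypothetical helper; assumes the short name is the segment after the final '/'.
public static class ResourceNames
{
    public static string ShortName(string resourcePath)
    {
        if (resourcePath is null) throw new ArgumentNullException(nameof(resourcePath));

        // Ignore any trailing slash, then keep everything after the last remaining '/'.
        var trimmed = resourcePath.TrimEnd('/');
        var index = trimmed.LastIndexOf('/');
        return index < 0 ? trimmed : trimmed.Substring(index + 1);
    }
}
```

Under that assumption, ResourceNames.ShortName("projects/my-project/topics/my-topic") yields my-topic, and a value with no slashes is returned unchanged.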
Summarised:
- Label: Operation name (e.g. Publish)
- Content: Omitted
- URI: pubsub:///my-topic

Detailed:
- Label: Operation with target (e.g. Publish → my-topic, Receive ← my-sub)
- Content: Message data (UTF-8 text)
- URI: pubsub:///my-topic

Raw:
- Label: Full info including resource paths
- Content: Message data + response details
- URI: pubsub:///projects/p/topics/my-topic
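As a rough sketch of how the three verbosity levels (Raw, Detailed, Summarised) could map one operation to a label and URI, consider the following. The Verbosity enum and DiagramEntry.Describe are hypothetical and only mirror the examples on this page. The exact Raw label format is not documented here, so the one used below is an assumption.

```csharp
using System;

// Hypothetical mirror of the verbosity levels named on this page.
public enum Verbosity { Summarised, Detailed, Raw }

public static class DiagramEntry
{
    // outbound = true for publish-style operations (→), false for receive-style operations (←).
    public static (string Label, string Uri) Describe(
        string operation, string resourcePath, bool outbound, Verbosity verbosity)
    {
        // Last path segment; the whole string when there is no '/'.
        var shortName = resourcePath.Substring(resourcePath.LastIndexOf('/') + 1);
        var arrow = outbound ? "→" : "←";

        return verbosity switch
        {
            Verbosity.Summarised => (operation, $"pubsub:///{shortName}"),
            Verbosity.Detailed => ($"{operation} {arrow} {shortName}", $"pubsub:///{shortName}"),
            // Assumption: the Raw label shows the full resource path in place of the short name.
            Verbosity.Raw => ($"{operation} {arrow} {resourcePath}", $"pubsub:///{resourcePath}"),
            _ => throw new ArgumentOutOfRangeException(nameof(verbosity))
        };
    }
}
```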
Recommended. Decorates all existing PublisherClient and SubscriberClient registrations with tracking wrappers, and registers a singleton PubSubTracker. IHttpContextAccessor is automatically resolved from DI.
// In your test's ConfigureTestServices:
services.AddPubSubTestTracking(options =>
{
options.ServiceName = "PubSub";
options.CallerName = "OrdersApi";
options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

This uses DecorateAll<PublisherClient> and DecorateAll<SubscriberClient> internally, so production code that resolves PublisherClient or SubscriberClient from DI receives the tracking subclass transparently. No changes to production code are required.
v2.27.14 Breaking Change: TrackingPublisherClient now inherits from PublisherClient (previously a composition wrapper). TrackingSubscriberClient now inherits from SubscriberClient. The Inner property and existing API remain available.
new PubSubTrackingOptions
{
ServiceName = "PubSub",
CallerName = "OrdersApi",
Verbosity = PubSubTrackingVerbosity.Detailed,
CurrentTestInfoFetcher = () => (testName, testId)
}

| Property | Type | Default | Description |
|---|---|---|---|
| ServiceName | string | "PubSub" | Display name in diagrams for the Pub/Sub service |
| CallerName | string | "Caller" | Calling service name in diagrams |
| Verbosity | PubSubTrackingVerbosity | Detailed | Verbosity level (Raw, Detailed, Summarised) |
| CurrentTestInfoFetcher | Func<(string Name, string Id)>? | null | Required: provides test context for log correlation |
| CurrentStepTypeFetcher | Func<string?>? | null | Optional: returns the current BDD step type (Given/When/Then) |
| HttpContextAccessor | IHttpContextAccessor? | null | Optional: enables dual-resolution of test identity from HTTP headers |
| SetupVerbosity | PubSubTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking |
| ActionVerbosity | PubSubTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking |
| TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking |
| TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |
| PropagateTestIdentity | bool | true | When true, publishers inject kronikol-test-name / kronikol-test-id into PubsubMessage.Attributes and subscribers extract them to establish TestIdentityScope. See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) |
| AutoCorrelateOnConsume | bool | true | When true, consumed messages auto-populate TestCorrelationStore for parallel-safe background thread correlation |
v2.23.0+ Dual-Resolution: PubSubTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter for resolving test identity from HTTP request headers when running inside the SUT's request pipeline. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for details. The AddPubSubTestTracking() DI helper (v2.23.11+) resolves this automatically.
PubSubTracker implements ITrackingComponent and auto-registers:
- ComponentName: "PubSubTracker ({ServiceName})"
- WasInvoked: true after first operation
- InvocationCount: Total operations logged
When your application processes Pub/Sub messages on background threads (e.g. inside a pull subscriber or push endpoint), the test framework's TestContext is unavailable. The extension solves this automatically via message attribute propagation.
- Publisher side: TrackingPublisherClient injects kronikol-test-name and kronikol-test-id into each message's Attributes dictionary before publishing.
- Subscriber side: TrackingSubscriberClient reads these attributes from each received message and calls TestIdentityScope.SetFromMessage().
- All subsequent tracking within the message handler resolves to the originating test.
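The round trip described above can be sketched with a plain dictionary standing in for the message attributes. TestIdentityAttributes is a hypothetical helper for illustration only. The attribute keys are the ones named on this page, but the real subscriber wrapper establishes the identity via TestIdentityScope.SetFromMessage(), which this sketch does not call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper showing the inject/extract round trip on message attributes.
public static class TestIdentityAttributes
{
    public const string NameKey = "kronikol-test-name";
    public const string IdKey = "kronikol-test-id";

    // Publisher side: stamp the current test identity onto the outgoing attributes.
    public static void Inject(IDictionary<string, string> attributes, string testName, string testId)
    {
        attributes[NameKey] = testName;
        attributes[IdKey] = testId;
    }

    // Subscriber side: recover the identity, or null when either attribute is missing.
    public static (string Name, string Id)? TryExtract(IReadOnlyDictionary<string, string> attributes)
    {
        if (attributes.TryGetValue(NameKey, out var name) &&
            attributes.TryGetValue(IdKey, out var id))
        {
            return (name, id);
        }

        return null;
    }
}
```

Returning null when the attributes are absent lets a handler fall back to other resolution paths for messages that were not published by a tracked test.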
To turn propagation off, set PropagateTestIdentity to false:

var options = new PubSubTrackingOptions
{
PropagateTestIdentity = false,
// ...
};

See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) for the full architecture overview.