
Integration PubSub Extension

aryehcitron@gmail.com edited this page May 24, 2026 · 13 revisions

Integration: Google Cloud Pub/Sub Extension

Track Google Cloud Pub/Sub operations in your test diagrams using the Kronikol.Extensions.PubSub NuGet package. This extension wraps PublisherClient and SubscriberClient to capture publish/subscribe operations.

Using a shared library or abstraction layer? If your code doesn't use the Pub/Sub SDK directly — e.g. it goes through a shared messaging library, wrapper, or custom abstraction — this extension won't be able to intercept the underlying calls. See Tracking Custom Dependencies for alternative approaches including MessageTracker and TrackingProxy<T>.

Installation

dotnet add package Kronikol.Extensions.PubSub

Quick Start

Publisher

using Google.Cloud.PubSub.V1;
using Kronikol.Extensions.PubSub;

// topicName is the TopicName of an existing topic.
var publisher = await PublisherClient.CreateAsync(topicName);
var trackingPublisher = new TrackingPublisherClient(publisher, new PubSubTrackingOptions
{
    ServiceName = "PubSub",
    CallerName = "OrdersApi",
    CurrentTestInfoFetcher = () => (TestContext.CurrentTestName, TestContext.CurrentTestId)
});

await trackingPublisher.PublishAsync("Hello, Pub/Sub!");

Subscriber

// subscriptionName is the SubscriptionName of an existing subscription.
var subscriber = await SubscriberClient.CreateAsync(subscriptionName);
var trackingSubscriber = new TrackingSubscriberClient(subscriber, new PubSubTrackingOptions
{
    ServiceName = "PubSub",
    CallerName = "WorkerService",
    CurrentTestInfoFetcher = () => (TestContext.CurrentTestName, TestContext.CurrentTestId)
});

await trackingSubscriber.StartAsync(async (msg, ct) =>
{
    // Process message
    return SubscriberClient.Reply.Ack;
});

How It Works

Google Cloud Pub/Sub uses gRPC internally, so DelegatingHandler interception is not viable. Instead, this extension uses a wrapper/decorator pattern:

  • TrackingPublisherClient wraps PublisherClient and logs publish operations
  • TrackingSubscriberClient wraps SubscriberClient and intercepts the message handler callback
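
To illustrate the callback interception described above, here is a minimal, self-contained sketch of the decorator idea. It is not the extension's actual code: `WithTracking`, the `trackedEvents` list, the plain string message, and the `bool` result (standing in for Ack/Nack) are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the tracker's log (assumption: the real extension logs via PubSubTracker).
var trackedEvents = new List<string>();

// Hypothetical decorator: wraps a message handler so every delivery is recorded
// before the original handler runs. The bool result stands in for Ack/Nack.
Func<string, CancellationToken, Task<bool>> WithTracking(
    Func<string, CancellationToken, Task<bool>> inner, string subscription)
{
    return async (message, ct) =>
    {
        trackedEvents.Add($"Receive from {subscription}: {message}");
        return await inner(message, ct);
    };
}

// The caller passes its handler as usual; only the wrapper knows about tracking.
var handler = WithTracking((message, ct) => Task.FromResult(true), "my-sub");
bool acked = await handler("Hello, Pub/Sub!", CancellationToken.None);

Console.WriteLine(trackedEvents[0]); // Receive from my-sub: Hello, Pub/Sub!
```

Because the wrapper has the same shape as the handler it wraps, the code that supplies the handler does not need to change.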

PubSubTracker

Both wrappers use PubSubTracker as a central logging helper. It implements ITrackingComponent and handles request/response logging with RequestResponseMetaType.Event for messaging semantics.

Supported Operations

| Operation | Enum Value | Source |
| --- | --- | --- |
| Publish (single) | PubSubOperation.Publish | TrackingPublisherClient |
| Publish (batch) | PubSubOperation.PublishBatch | TrackingPublisherClient |
| Receive | PubSubOperation.Receive | TrackingSubscriberClient callback |
| Pull | PubSubOperation.Pull | Low-level API |
| Acknowledge | PubSubOperation.Acknowledge | Low-level API |
| ModifyAckDeadline | PubSubOperation.ModifyAckDeadline | Low-level API |
| StartSubscriber | PubSubOperation.StartSubscriber | Lifecycle |
| StopSubscriber | PubSubOperation.StopSubscriber | Lifecycle |

Resource Name Extraction

GCP Pub/Sub uses full resource paths such as projects/my-project/topics/my-topic. At Summarised and Detailed verbosity the short name (the final path segment) is extracted, while Raw keeps the full path:

  • projects/my-project/topics/my-topic → my-topic
  • projects/my-project/subscriptions/my-sub → my-sub
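
The mapping above amounts to taking the final segment of the resource path. The sketch below shows one way to do that. `ShortName` is a hypothetical helper written for this example, not a Kronikol API, and the extension's real logic may differ.

```csharp
using System;

// Hypothetical helper (not part of Kronikol): returns the last segment of a
// Pub/Sub resource path. Input without a '/' is returned unchanged.
static string ShortName(string resourcePath)
{
    var trimmed = resourcePath.TrimEnd('/');
    var lastSlash = trimmed.LastIndexOf('/');
    return lastSlash < 0 ? trimmed : trimmed.Substring(lastSlash + 1);
}

Console.WriteLine(ShortName("projects/my-project/topics/my-topic"));      // my-topic
Console.WriteLine(ShortName("projects/my-project/subscriptions/my-sub")); // my-sub
```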

Verbosity Levels

PubSubTrackingVerbosity.Summarised

  • Label: Operation name (e.g. Publish)
  • Content: Omitted
  • URI: pubsub:///my-topic

PubSubTrackingVerbosity.Detailed

  • Label: Operation with target (e.g. Publish → my-topic, Receive ← my-sub)
  • Content: Message data (UTF-8 text)
  • URI: pubsub:///my-topic

PubSubTrackingVerbosity.Raw

  • Label: Full info including resource paths
  • Content: Message data + response details
  • URI: pubsub:///projects/p/topics/my-topic

Configuration

DI Decoration (v2.27.14+ — Zero Prod Changes)

Recommended. Decorates all existing PublisherClient and SubscriberClient registrations with tracking wrappers, and registers a singleton PubSubTracker. IHttpContextAccessor is automatically resolved from DI.

// In your test's ConfigureTestServices:
services.AddPubSubTestTracking(options =>
{
    options.ServiceName = "PubSub";
    options.CallerName = "OrdersApi";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

This uses DecorateAll<PublisherClient> and DecorateAll<SubscriberClient> internally — production code that resolves PublisherClient or SubscriberClient from DI will receive the tracking subclass transparently. No changes to production code required.

v2.27.14 Breaking Change: TrackingPublisherClient now inherits from PublisherClient (previously a composition wrapper). TrackingSubscriberClient now inherits from SubscriberClient. The Inner property and existing API remain available.

Manual Configuration

var options = new PubSubTrackingOptions
{
    ServiceName = "PubSub",
    CallerName = "OrdersApi",
    Verbosity = PubSubTrackingVerbosity.Detailed,
    // testName and testId come from your test framework's current-test context
    CurrentTestInfoFetcher = () => (testName, testId)
};

PubSubTrackingOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ServiceName | string | "PubSub" | Display name in diagrams for the Pub/Sub service |
| CallerName | string | "Caller" | Calling service name in diagrams |
| Verbosity | PubSubTrackingVerbosity | Detailed | Verbosity level (Raw, Detailed, Summarised) |
| CurrentTestInfoFetcher | Func<(string Name, string Id)>? | null | Required: provides test context for log correlation |
| CurrentStepTypeFetcher | Func<string?>? | null | Optional: returns the current BDD step type (Given/When/Then) |
| HttpContextAccessor | IHttpContextAccessor? | null | Optional: enables dual-resolution of test identity from HTTP headers |
| SetupVerbosity | PubSubTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking |
| ActionVerbosity | PubSubTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking |
| TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking |
| TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |
| PropagateTestIdentity | bool | true | When true, publishers inject kronikol-test-name / kronikol-test-id into PubsubMessage.Attributes and subscribers extract them to establish TestIdentityScope. See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) |
| AutoCorrelateOnConsume | bool | true | When true, consumed messages auto-populate TestCorrelationStore for parallel-safe background thread correlation |

v2.23.0+ Dual-Resolution: PubSubTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter for resolving test identity from HTTP request headers when running inside the SUT's request pipeline. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for details. The AddPubSubTestTracking() DI helper (v2.23.11+) resolves this automatically.

ITrackingComponent

PubSubTracker implements ITrackingComponent and auto-registers:

  • ComponentName: "PubSubTracker ({ServiceName})"
  • WasInvoked: true after first operation
  • InvocationCount: Total operations logged

Test Identity Propagation (v2.34.0+)

When your application processes Pub/Sub messages on background threads (e.g. inside a pull subscriber or push endpoint), the test framework's TestContext is unavailable. The extension solves this automatically via message attribute propagation.

How It Works

  1. Publisher side: TrackingPublisherClient injects kronikol-test-name and kronikol-test-id into each message's Attributes dictionary before publishing.
  2. Subscriber side: TrackingSubscriberClient reads these attributes from each received message and calls TestIdentityScope.SetFromMessage().
  3. All subsequent tracking within the message handler resolves to the originating test.
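
Steps 1 and 2 can be pictured with a plain attribute dictionary. This is a sketch under assumptions, not the extension's implementation: `InjectTestIdentity` and `TryReadTestIdentity` are hypothetical helpers, and only the two attribute keys come from this page. The real subscriber wrapper goes on to call TestIdentityScope.SetFromMessage(), which is not modelled here.

```csharp
using System;
using System.Collections.Generic;

// Attribute keys named on this page.
const string TestNameKey = "kronikol-test-name";
const string TestIdKey = "kronikol-test-id";

// Step 1 (publisher side): stamp the current test identity onto the outgoing attributes.
void InjectTestIdentity(IDictionary<string, string> attributes, string testName, string testId)
{
    attributes[TestNameKey] = testName;
    attributes[TestIdKey] = testId;
}

// Step 2 (subscriber side): recover the identity; false means the message carries none.
bool TryReadTestIdentity(IDictionary<string, string> attributes, out string testName, out string testId)
{
    testId = string.Empty;
    return attributes.TryGetValue(TestNameKey, out testName)
        && attributes.TryGetValue(TestIdKey, out testId);
}

// Existing attributes are left untouched; the identity travels alongside them.
var outgoing = new Dictionary<string, string> { ["order-id"] = "1234" };
InjectTestIdentity(outgoing, "Placing an order publishes an event", "test-42");

if (TryReadTestIdentity(outgoing, out var name, out var id))
{
    Console.WriteLine($"{name} [{id}]"); // Placing an order publishes an event [test-42]
}
```

Carrying the identity in the message itself is what lets a handler on a background thread resolve the originating test without access to TestContext.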

Disabling Propagation

var options = new PubSubTrackingOptions
{
    PropagateTestIdentity = false,
    // ...
};

See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) for the full architecture overview.
