Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# confluent-kafka-javascript 1.6.0

v1.6.0 is a feature release. It is supported for all usage.

### Enhancements

1. References librdkafka v2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information.
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changelog references librdkafka v2.12.0 but the package.json uses "dev_oauthbearer_metadata_based". These versions should be consistent.

Suggested change
1. References librdkafka v2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information.
1. References librdkafka "dev_oauthbearer_metadata_based". Refer to the [librdkafka dev_oauthbearer_metadata_based branch](https://github.com/confluentinc/librdkafka/tree/dev_oauthbearer_metadata_based) for more information.

Copilot uses AI. Check for mistakes.
2. OAuth OIDC method for Schema Registry metadata based authentication with
an Azure IMDS endpoint using an attached managed identity as principal (#).


# confluent-kafka-javascript 1.5.0

v1.5.0 is a feature release. It is supported for all usage.
Expand Down
2 changes: 1 addition & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ LibrdKafkaError.wrap = errorWrap;
* @constant
* @memberof RdKafka
*/
// ====== Generated from librdkafka 2.11.1 file src-cpp/rdkafkacpp.h ======
// ====== Generated from librdkafka dev_oauthbearer_metadata_based file src-cpp/rdkafkacpp.h ======
LibrdKafkaError.codes = {

/* Internal errors to rdkafka: */
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"name": "@confluentinc/kafka-javascript",
"version": "1.5.0",
"description": "Node.js bindings for librdkafka",
"librdkafka": "2.11.1",
"librdkafka_win": "2.11.1",
"librdkafka": "dev_oauthbearer_metadata_based",
"librdkafka_win": "dev_oauthbearer_metadata_based",
"main": "lib/index.js",
"types": "types/index.d.ts",
"scripts": {
Expand Down
4 changes: 3 additions & 1 deletion schemaregistry-examples/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BasicAuthCredentials } from '@confluentinc/schemaregistry';

const issuerEndpointUrl = '<your-issuer-endpoint-url>'; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token';
const azureIMDSIssuerEndpointQuery = 'api-version=&resource=&client_id='; // e.g. 'api-version=<latest_version>&resource=api://<your-api-id>&client_id=<your-client-id>';
const oauthClientId = '<your-client-id>';
const oauthClientSecret = '<your-client-secret>';
const scope = '<your-scope>'; // e.g. 'schemaregistry';
Expand All @@ -23,6 +24,7 @@ const basicAuthCredentials: BasicAuthCredentials = {
};

export {
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster,
issuerEndpointUrl,
azureIMDSIssuerEndpointQuery, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster,
baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials
};
93 changes: 91 additions & 2 deletions schemaregistry-examples/src/kafka-oauth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { KafkaJS } from '@confluentinc/kafka-javascript';
import {
clusterBootstrapUrl,
baseUrl,
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope,
issuerEndpointUrl,
azureIMDSIssuerEndpointQuery,
oauthClientId, oauthClientSecret, scope,
identityPoolId, schemaRegistryLogicalCluster, kafkaLogicalCluster
} from "./constants";
import axios from 'axios';
Expand Down Expand Up @@ -134,4 +136,91 @@ async function kafkaProducerAvro() {
await producer.disconnect();
}

kafkaProducerAvro();
async function kafkaProducerAvroAzureIMDS() {

const createAxiosDefaults: CreateAxiosDefaults = {
timeout: 10000
};

const bearerAuthCredentials: BearerAuthCredentials = {
credentialsSource: 'OAUTHBEARER_AZURE_IMDS',
issuerEndpointQuery: azureIMDSIssuerEndpointQuery,
logicalCluster: schemaRegistryLogicalCluster,
identityPoolId: identityPoolId,
}

const clientConfig: ClientConfig = {
baseURLs: [baseUrl],
createAxiosDefaults: createAxiosDefaults,
cacheCapacity: 512,
cacheLatestTtlSecs: 60,
bearerAuthCredentials
};

const schemaRegistryClient = new SchemaRegistryClient(clientConfig);

const kafka: KafkaJS.Kafka = new KafkaJS.Kafka({
'bootstrap.servers': clusterBootstrapUrl,
'security.protocol': 'sasl_ssl',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.metadata.authentication.type': 'azure_imds',
'sasl.oauthbearer.config': 'query=' + azureIMDSIssuerEndpointQuery,
'sasl.oauthbearer.extensions': `logicalCluster=${kafkaLogicalCluster},identityPoolId=${identityPoolId}`
});

const producer: KafkaJS.Producer = kafka.producer({
kafkaJS: {
allowAutoTopicCreation: true,
acks: 1,
compression: KafkaJS.CompressionTypes.GZIP,
}
});

console.log("Producer created");

const schemaString: string = JSON.stringify({
type: 'record',
name: 'User',
fields: [
{ name: 'name', type: 'string' },
{ name: 'age', type: 'int' },
],
});

const schemaInfo: SchemaInfo = {
schemaType: 'AVRO',
schema: schemaString,
};

const userTopic = 'example-user-topic';
await schemaRegistryClient.register(userTopic + "-value", schemaInfo);

const userInfo = { name: 'Alice N Bob', age: 30 };

const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };

const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);

const outgoingMessage = {
key: "1",
value: await serializer.serialize(userTopic, userInfo)
};

console.log("Outgoing message: ", outgoingMessage);

await producer.connect();

await producer.send({
topic: userTopic,
messages: [outgoingMessage]
});

await producer.disconnect();
}

async function main() {
await kafkaProducerAvro();
await kafkaProducerAvroAzureIMDS();
}
main();
102 changes: 102 additions & 0 deletions schemaregistry/oauth/abstract-oauth-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { sleep, fullJitter, isRetriable } from '../retry-helper';
import { isBoom } from '@hapi/boom';
import {
_BearerTokenProvider as BearerTokenProvider,
_BearerTokenProviderBuilder as BearerTokenProviderBuilder
} from './bearer-token-provider';
import { BearerAuthCredentials } from '../rest-service';

abstract class AbstractBearerTokenProviderBuilder implements BearerTokenProviderBuilder {

protected bearerAuthCredentials : BearerAuthCredentials;

constructor(
bearerAuthCredentials: BearerAuthCredentials) {
this.bearerAuthCredentials = bearerAuthCredentials;
}

protected validate() {
const headers = ['logicalCluster', 'identityPoolId'];
const missingHeader = headers.find(header => !(header in this.bearerAuthCredentials));

if (missingHeader) {
throw new Error(`Bearer auth header '${missingHeader}' not provided`);
}
}

abstract build(maxRetries: number, retriesWaitMs: number, retriesMaxWaitMs: number): BearerTokenProvider;
}

abstract class AbstractOauthTokenProvider implements BearerTokenProvider {

private additionalHeaders: Record<string, string>;

constructor(bearerAuthCredentials: BearerAuthCredentials) {
this.additionalHeaders = {
'target-sr-cluster': bearerAuthCredentials.logicalCluster!,
'Confluent-Identity-Pool-Id': bearerAuthCredentials.identityPoolId!,
};
}

abstract getAccessToken(): Promise<string>

abstract tokenExpired(): boolean;

getAdditionalHeaders(): Record<string, string> {
return this.additionalHeaders;
}
}

abstract class AbstractOAuthClient extends AbstractOauthTokenProvider {
private token: string | null = null;
private maxRetries: number;
private retriesWaitMs: number;
private retriesMaxWaitMs: number;

constructor(bearerAuthCredentials: BearerAuthCredentials,
maxRetries: number, retriesWaitMs: number, retriesMaxWaitMs: number
) {
super(bearerAuthCredentials);
this.maxRetries = maxRetries;
this.retriesWaitMs = retriesWaitMs;
this.retriesMaxWaitMs = retriesMaxWaitMs;
}

abstract fetchToken(): Promise<string>;

override async getAccessToken(): Promise<string> {
if (this.token === null || this.tokenExpired()) {
await this.generateAccessToken();
if (this.token === null)
throw new Error(`token must be available here`);
}

return this.token;
}

async generateAccessToken(): Promise<void> {
for (let i = 0; i < this.maxRetries + 1; i++) {
try {
this.token = await this.fetchToken();
return;
} catch (error: any) {
if (isBoom(error) && i < this.maxRetries) {
const statusCode = error.output.statusCode;
if (isRetriable(statusCode)) {
const waitTime = fullJitter(this.retriesWaitMs, this.retriesMaxWaitMs, i);
await sleep(waitTime);
continue;
}
}
throw new Error(`Failed to get token from server: ${error}`);
}
}
}
}

// internal/testing usage only
export {
AbstractBearerTokenProviderBuilder as _AbstractBearerTokenProviderBuilder,
AbstractOauthTokenProvider as _AbstractOauthTokenProvider,
AbstractOAuthClient as _AbstractOAuthClient,
}
16 changes: 16 additions & 0 deletions schemaregistry/oauth/bearer-token-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
interface BearerTokenProvider {
getAccessToken(): Promise<string>;
getAdditionalHeaders(): Record<string, string>;
tokenExpired(): boolean;
}

interface BearerTokenProviderBuilder {
build(maxRetries: number, retriesWaitMs: number, retriesMaxWaitMs: number): BearerTokenProvider
}

// internal/testing usage only
export {
BearerTokenProvider as _BearerTokenProvider,
BearerTokenProviderBuilder as _BearerTokenProviderBuilder
}

102 changes: 102 additions & 0 deletions schemaregistry/oauth/oauth-client-azure-imds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import {
_AbstractOAuthClient as AbstractOAuthClient,
_AbstractBearerTokenProviderBuilder as AbstractBearerTokenProviderBuilder
} from './abstract-oauth-client';
import Wreck from '@hapi/wreck';
import { BearerAuthCredentials } from '../rest-service';
import {
_BearerTokenProvider as BearerTokenProvider
} from './bearer-token-provider';

const TOKEN_EXPIRATION_THRESHOLD_PERCENTAGE = 0.8;

class AzureIMDSBearerToken {
access_token?: string = undefined;
expires_in?: string = undefined;
expires_on?: string = undefined;
}

class AzureIMDSOAuthClientBuilder extends AbstractBearerTokenProviderBuilder {
constructor(
bearerAuthCredentials: BearerAuthCredentials) {
super(bearerAuthCredentials);
}

protected override validate() {
super.validate();
if (!this.bearerAuthCredentials.issuerEndpointUrl &&
!this.bearerAuthCredentials.issuerEndpointQuery)
throw new Error(`Missing required configuration property: issuerEndpointQuery`);
}

override build(maxRetries: number, retriesWaitMs: number, retriesMaxWaitMs: number): BearerTokenProvider {
this.validate();
return new AzureIMDSOAuthClient(this.bearerAuthCredentials, maxRetries, retriesWaitMs, retriesMaxWaitMs);
}
}

class AzureIMDSOAuthClient extends AbstractOAuthClient {

private tokenEndpoint: string;
private tokenObject?: AzureIMDSBearerToken;
private static readonly DEFAULT_AZURE_IMDS_TOKEN_ENDPOINT : string = 'http://169.254.169.254/metadata/identity/oauth2/token';

constructor(bearerAuthCredentials: BearerAuthCredentials,
maxRetries: number, retriesWaitMs: number,
retriesMaxWaitMs: number
) {
super(bearerAuthCredentials, maxRetries, retriesWaitMs, retriesMaxWaitMs);
this.tokenEndpoint = bearerAuthCredentials.issuerEndpointUrl ||
AzureIMDSOAuthClient.DEFAULT_AZURE_IMDS_TOKEN_ENDPOINT;
if (bearerAuthCredentials.issuerEndpointQuery) {
const url = new URL(this.tokenEndpoint);
url.search = bearerAuthCredentials.issuerEndpointQuery;
url.hash = '';
this.tokenEndpoint = url.toString();
}
}

override async fetchToken(): Promise<string> {
const { payload } = await Wreck.get<AzureIMDSBearerToken>(
this.tokenEndpoint, {
headers: {
Metadata: 'true'
},
json: 'force',
timeout: 30000 // 30 seconds limit for each request
});
this.tokenObject = payload;
return this.getAccessTokenString();
}

override tokenExpired(): boolean {
if (!this.tokenObject?.expires_in || !this.tokenObject?.expires_on)
return true;

const expiresIn = +this.tokenObject.expires_in;
let expiresOn = +this.tokenObject.expires_on;
if (isNaN(expiresIn) || isNaN(expiresOn))
return true;

const expiryWindow = expiresIn * 1000 * TOKEN_EXPIRATION_THRESHOLD_PERCENTAGE;
expiresOn = expiresOn * 1000;
return expiresOn < Date.now() + expiryWindow;
}

private getAccessTokenString(): string {
const accessToken = this.tokenObject?.access_token;

if (typeof accessToken !== 'string') {
throw new Error('Access token is not available');
}

return accessToken;
}
}

// internal/testing usage only
export {
AzureIMDSOAuthClientBuilder as _AzureIMDSOAuthClientBuilder,
AzureIMDSOAuthClient as _AzureIMDSOAuthClient,
AzureIMDSBearerToken as _AzureIMDSBearerToken
}
Loading