Skip to content

Commit c51bb05

Browse files
authored
Fix heartbeat cached channels and groups (#447)
fix(shared-worker): fix heartbeat cached channels and groups Fix issue because of which channels and groups aggregated inside PubNub client state objects and didn't clean up properly on unsubscribe / invalidate. refactor(subscription-set): refactor subscriptions manipulation Refactor how subscription set updated with additional / removal other subscriptions and sets.
1 parent 71ba3a1 commit c51bb05

File tree

15 files changed

+277
-177
lines changed

15 files changed

+277
-177
lines changed

.pubnub.yml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
---
22
changelog:
3+
- date: 2025-03-25
4+
version: v9.3.1
5+
changes:
6+
- type: bug
7+
text: "Fix issue because of which channels and groups aggregated inside PubNub client state objects and didn't clean up properly on unsubscribe / invalidate."
38
- date: 2025-03-20
49
version: v9.3.0
510
changes:
@@ -1192,7 +1197,7 @@ supported-platforms:
11921197
- 'Ubuntu 14.04 and up'
11931198
- 'Windows 7 and up'
11941199
version: 'Pubnub Javascript for Node'
1195-
version: '9.3.0'
1200+
version: '9.3.1'
11961201
sdks:
11971202
- full-name: PubNub Javascript SDK
11981203
short-name: Javascript
@@ -1208,7 +1213,7 @@ sdks:
12081213
- distribution-type: source
12091214
distribution-repository: GitHub release
12101215
package-name: pubnub.js
1211-
location: https://github.com/pubnub/javascript/archive/refs/tags/v9.3.0.zip
1216+
location: https://github.com/pubnub/javascript/archive/refs/tags/v9.3.1.zip
12121217
requires:
12131218
- name: 'agentkeepalive'
12141219
min-version: '3.5.2'
@@ -1879,7 +1884,7 @@ sdks:
18791884
- distribution-type: library
18801885
distribution-repository: GitHub release
18811886
package-name: pubnub.js
1882-
location: https://github.com/pubnub/javascript/releases/download/v9.3.0/pubnub.9.3.0.js
1887+
location: https://github.com/pubnub/javascript/releases/download/v9.3.1/pubnub.9.3.1.js
18831888
requires:
18841889
- name: 'agentkeepalive'
18851890
min-version: '3.5.2'

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## v9.3.1
2+
March 25 2025
3+
4+
#### Fixed
5+
- Fix issue because of which channels and groups aggregated inside PubNub client state objects and didn't clean up properly on unsubscribe / invalidate.
6+
17
## v9.3.0
28
March 20 2025
39

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ Watch [Getting Started with PubNub JS SDK](https://app.dashcam.io/replay/64ee0d2
2727
npm install pubnub
2828
```
2929
* or download one of our builds from our CDN:
30-
* https://cdn.pubnub.com/sdk/javascript/pubnub.9.3.0.js
31-
* https://cdn.pubnub.com/sdk/javascript/pubnub.9.3.0.min.js
30+
* https://cdn.pubnub.com/sdk/javascript/pubnub.9.3.1.js
31+
* https://cdn.pubnub.com/sdk/javascript/pubnub.9.3.1.min.js
3232
3333
2. Configure your keys:
3434

dist/web/pubnub.js

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3939,7 +3939,7 @@
39393939
return base.PubNubFile;
39403940
},
39413941
get version() {
3942-
return '9.3.0';
3942+
return '9.3.1';
39433943
},
39443944
getVersion() {
39453945
return this.version;
@@ -10424,18 +10424,11 @@
1042410424
this.options = subscriptionOptions;
1042510425
this.eventEmitter = eventEmitter;
1042610426
this.pubnub = pubnub;
10427-
channels.forEach((c) => {
10428-
const subscription = this.pubnub.channel(c).subscription(this.options);
10429-
this.channelNames = [...this.channelNames, ...subscription.channels];
10430-
this.subscriptionList.push(subscription);
10431-
});
10432-
channelGroups.forEach((cg) => {
10433-
const subscription = this.pubnub.channelGroup(cg).subscription(this.options);
10434-
this.groupNames = [...this.groupNames, ...subscription.channelGroups];
10435-
this.subscriptionList.push(subscription);
10436-
});
10427+
channels.forEach((c) => this.subscriptionList.push(this.pubnub.channel(c).subscription(this.options)));
10428+
channelGroups.forEach((cg) => this.subscriptionList.push(this.pubnub.channelGroup(cg).subscription(this.options)));
1043710429
this.typeBasedListener = {};
1043810430
this.typeBasedListenerId = eventEmitter.addListener(this.typeBasedListener, this.channelNames, this.groupNames);
10431+
this.updateListeners();
1043910432
}
1044010433
/**
1044110434
* Add additional entity's subscription to the subscription set.
@@ -10446,10 +10439,8 @@
1044610439
* @param subscription - Other entity's subscription object, which should be added.
1044710440
*/
1044810441
addSubscription(subscription) {
10449-
this.subscriptionList.push(subscription);
10450-
this.channelNames = [...this.channelNames, ...subscription.channels];
10451-
this.groupNames = [...this.groupNames, ...subscription.channelGroups];
10452-
this.eventEmitter.addListener(this.typeBasedListener, subscription.channels, subscription.channelGroups, this.typeBasedListenerId);
10442+
if (!this.subscriptionList.includes(subscription))
10443+
this.subscriptionList.push(subscription);
1045310444
// Make sure to listen events on channels / groups added with `subscription`.
1045410445
this.updateListeners();
1045510446
// Subscribe subscription object if subscription set already subscribed.
@@ -10469,12 +10460,7 @@
1046910460
* @param subscription - Other entity's subscription object, which should be removed.
1047010461
*/
1047110462
removeSubscription(subscription) {
10472-
const channelsToRemove = subscription.channels;
10473-
const groupsToRemove = subscription.channelGroups;
10474-
this.channelNames = this.channelNames.filter((c) => !channelsToRemove.includes(c));
10475-
this.groupNames = this.groupNames.filter((cg) => !groupsToRemove.includes(cg));
10476-
this.subscriptionList = this.subscriptionList.filter((s) => s !== subscription);
10477-
this.eventEmitter.removeListener(this.typeBasedListener, this.typeBasedListenerId, channelsToRemove, groupsToRemove);
10463+
this.subscriptionList = this.subscriptionList.filter((sub) => sub !== subscription);
1047810464
// Make sure to stop listening for events from channels / groups removed with `subscription`.
1047910465
this.updateListeners();
1048010466
// @ts-expect-error: Required access of protected field.
@@ -10490,10 +10476,7 @@
1049010476
* @param subscriptionSet - Other entities' subscription set, which should be joined.
1049110477
*/
1049210478
addSubscriptionSet(subscriptionSet) {
10493-
this.subscriptionList = [...this.subscriptionList, ...subscriptionSet.subscriptions];
10494-
this.channelNames = [...this.channelNames, ...subscriptionSet.channels];
10495-
this.groupNames = [...this.groupNames, ...subscriptionSet.channelGroups];
10496-
this.eventEmitter.addListener(this.typeBasedListener, subscriptionSet.channels, subscriptionSet.channelGroups, this.typeBasedListenerId);
10479+
this.subscriptionList = Array.from(new Set([...this.subscriptionList, ...subscriptionSet.subscriptions]));
1049710480
// Make sure to listen events on channels / groups added with `subscription set`.
1049810481
this.updateListeners();
1049910482
// Subscribe subscription object if subscription set already subscribed.
@@ -10511,12 +10494,7 @@
1051110494
* @param subscriptionSet - Other entities' subscription set, which should be subtracted.
1051210495
*/
1051310496
removeSubscriptionSet(subscriptionSet) {
10514-
const channelsToRemove = subscriptionSet.channels;
10515-
const groupsToRemove = subscriptionSet.channelGroups;
10516-
this.channelNames = this.channelNames.filter((c) => !channelsToRemove.includes(c));
10517-
this.groupNames = this.groupNames.filter((cg) => !groupsToRemove.includes(cg));
10518-
this.subscriptionList = this.subscriptionList.filter((s) => !subscriptionSet.subscriptions.includes(s));
10519-
this.eventEmitter.removeListener(this.typeBasedListener, this.typeBasedListenerId, channelsToRemove, groupsToRemove);
10497+
this.subscriptionList = this.subscriptionList.filter((sub) => !subscriptionSet.subscriptions.includes(sub));
1052010498
// Make sure to stop listening for events from channels / groups removed with `subscription set`.
1052110499
this.updateListeners();
1052210500
if (subscriptionSet.subscribedAutomatically)
@@ -10540,11 +10518,36 @@
1054010518
* @internal
1054110519
*/
1054210520
updateListeners() {
10543-
if (!this.aggregatedListener)
10544-
return;
10521+
// Actual list of channels and groups.
10522+
const channelGroups = [];
10523+
const channels = [];
10524+
// Gather actual information about active channels / groups.
10525+
this.subscriptionList.forEach((subscription) => {
10526+
if (subscription.channelGroups.length)
10527+
channelGroups.push(...subscription.channelGroups);
10528+
if (subscription.channels.length)
10529+
channels.push(...subscription.channels);
10530+
});
10531+
// Identify channels / groups which should be added / removed.
10532+
const channelsToRemove = this.channelNames.filter((channel) => !channels.includes(channel));
10533+
const groupsToRemove = this.groupNames.filter((group) => !channelGroups.includes(group));
10534+
const channelsToAdd = channels.filter((channel) => !this.channelNames.includes(channel));
10535+
const groupsToAdd = channelGroups.filter((group) => !this.groupNames.includes(group));
10536+
// Removing type-based listener for unused channels / groups.
10537+
if (channelsToRemove.length || groupsToRemove.length) {
10538+
this.eventEmitter.removeListener(this.typeBasedListener, this.typeBasedListenerId, channelsToRemove, groupsToRemove);
10539+
}
10540+
// Adding type-based listener for unused channels / groups.
10541+
if (channelsToAdd.length || groupsToAdd.length)
10542+
this.eventEmitter.addListener(this.typeBasedListener, channelsToAdd, groupsToAdd, this.typeBasedListenerId);
1054510543
const aggregatedListener = this.aggregatedListener;
10546-
this.removeListener(this.aggregatedListener);
10547-
this.addListener(aggregatedListener);
10544+
if (aggregatedListener)
10545+
this.removeListener(aggregatedListener);
10546+
// Set actual list of channels and groups.
10547+
this.groupNames = channelGroups;
10548+
this.channelNames = channels;
10549+
if (aggregatedListener)
10550+
this.addListener(aggregatedListener);
1054810551
}
1054910552
}
1055010553

dist/web/pubnub.min.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/web/pubnub.worker.js

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@
404404
const heartbeatRequestKey = `${client.userId}_${(_a = clientAggregateAuthKey(client)) !== null && _a !== void 0 ? _a : ''}`;
405405
const hbRequestsBySubscriptionKey = serviceHeartbeatRequests[client.subscriptionKey];
406406
const hbRequests = (hbRequestsBySubscriptionKey !== null && hbRequestsBySubscriptionKey !== void 0 ? hbRequestsBySubscriptionKey : {})[heartbeatRequestKey];
407-
notifyRequestProcessing('start', [client], new Date().toISOString(), request);
407+
notifyRequestProcessing('start', [client], new Date().toISOString(), event.request);
408408
if (!request) {
409409
consoleLog(`Previous heartbeat request has been sent less than ${client.heartbeatInterval} seconds ago. Skipping...`, client);
410410
let response;
@@ -446,22 +446,31 @@
446446
* @param [client] - Specific client to handle leave request.
447447
*/
448448
const handleSendLeaveRequestEvent = (data, client) => {
449+
var _a, _b;
450+
var _c;
449451
client = client !== null && client !== void 0 ? client : pubNubClients[data.clientIdentifier];
450452
const request = leaveTransportRequestFromEvent(data);
451453
if (!client)
452454
return;
453455
// Clean up client subscription information if there is no more channels / groups to use.
454-
const { subscription } = client;
456+
const { subscription, heartbeat } = client;
455457
const serviceRequestId = subscription === null || subscription === void 0 ? void 0 : subscription.serviceRequestId;
456-
if (subscription) {
457-
if (subscription.channels.length === 0 && subscription.channelGroups.length === 0) {
458-
subscription.channelGroupQuery = '';
459-
subscription.path = '';
460-
subscription.previousTimetoken = '0';
461-
subscription.timetoken = '0';
462-
delete subscription.region;
463-
delete subscription.serviceRequestId;
464-
delete subscription.request;
458+
if (subscription && subscription.channels.length === 0 && subscription.channelGroups.length === 0) {
459+
subscription.channelGroupQuery = '';
460+
subscription.path = '';
461+
subscription.previousTimetoken = '0';
462+
subscription.timetoken = '0';
463+
delete subscription.region;
464+
delete subscription.serviceRequestId;
465+
delete subscription.request;
466+
}
467+
if (serviceHeartbeatRequests[client.subscriptionKey]) {
468+
if (heartbeat && heartbeat.channels.length === 0 && heartbeat.channelGroups.length === 0) {
469+
const hbRequestsBySubscriptionKey = ((_a = serviceHeartbeatRequests[_c = client.subscriptionKey]) !== null && _a !== void 0 ? _a : (serviceHeartbeatRequests[_c] = {}));
470+
const heartbeatRequestKey = `${client.userId}_${(_b = clientAggregateAuthKey(client)) !== null && _b !== void 0 ? _b : ''}`;
471+
if (hbRequestsBySubscriptionKey[heartbeatRequestKey] &&
472+
hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier === client.clientIdentifier)
473+
delete hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier;
465474
}
466475
}
467476
if (!request) {
@@ -837,15 +846,16 @@
837846
return undefined;
838847
const hbRequestsBySubscriptionKey = ((_a = serviceHeartbeatRequests[_e = client.subscriptionKey]) !== null && _a !== void 0 ? _a : (serviceHeartbeatRequests[_e] = {}));
839848
const heartbeatRequestKey = `${client.userId}_${(_b = clientAggregateAuthKey(client)) !== null && _b !== void 0 ? _b : ''}`;
840-
const channelGroupsForAnnouncement = client.heartbeat.channelGroups;
841-
const channelsForAnnouncement = client.heartbeat.channels;
849+
const channelGroupsForAnnouncement = [...client.heartbeat.channelGroups];
850+
const channelsForAnnouncement = [...client.heartbeat.channels];
842851
let aggregatedState;
843852
let failedPreviousRequest = false;
844853
let aggregated;
845854
if (!hbRequestsBySubscriptionKey[heartbeatRequestKey]) {
846855
hbRequestsBySubscriptionKey[heartbeatRequestKey] = {
847856
channels: channelsForAnnouncement,
848857
channelGroups: channelGroupsForAnnouncement,
858+
clientIdentifier: client.clientIdentifier,
849859
timestamp: Date.now(),
850860
};
851861
aggregatedState = (_c = client.heartbeat.presenceState) !== null && _c !== void 0 ? _c : {};
@@ -866,7 +876,10 @@
866876
if (client.heartbeatInterval)
867877
minimumHeartbeatInterval = Math.min(minimumHeartbeatInterval, client.heartbeatInterval);
868878
}
869-
if (aggregated) {
879+
// Check whether multiple instance aggregate heartbeat and there is previous sender known.
880+
// `clientIdentifier` maybe empty in case if client which triggered heartbeats before has been invalidated and new
881+
// should handle heartbeat unconditionally.
882+
if (aggregated && hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier) {
870883
const expectedTimestamp = hbRequestsBySubscriptionKey[heartbeatRequestKey].timestamp + minimumHeartbeatInterval * 1000;
871884
const currentTimestamp = Date.now();
872885
// Check whether it is too soon to send request or not (5 is leeway which let send request a bit earlier).
@@ -875,10 +888,11 @@
875888
return undefined;
876889
}
877890
delete hbRequestsBySubscriptionKey[heartbeatRequestKey].response;
891+
hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier = client.clientIdentifier;
878892
// Aggregate channels for similar clients which is pending for heartbeat.
879-
for (const client of clients) {
880-
const { heartbeat } = client;
881-
if (heartbeat === undefined || client.clientIdentifier === event.clientIdentifier)
893+
for (const _client of clients) {
894+
const { heartbeat } = _client;
895+
if (heartbeat === undefined || _client.clientIdentifier === event.clientIdentifier)
882896
continue;
883897
// Append presence state from the client (will override previously set value if already set).
884898
if (heartbeat.presenceState)
@@ -894,10 +908,13 @@
894908
if (!channelsForAnnouncement.includes(objectName) && !channelGroupsForAnnouncement.includes(objectName))
895909
delete aggregatedState[objectName];
896910
}
911+
// No need to try send request with empty list of channels and groups.
912+
if (channelsForAnnouncement.length === 0 && channelGroupsForAnnouncement.length === 0)
913+
return undefined;
897914
// Update request channels list (if required).
898-
if (channelsForAnnouncement.length) {
915+
if (channelsForAnnouncement.length || channelGroupsForAnnouncement.length) {
899916
const pathComponents = request.path.split('/');
900-
pathComponents[6] = channelsForAnnouncement.join(',');
917+
pathComponents[6] = channelsForAnnouncement.length ? channelsForAnnouncement.join(',') : ',';
901918
request.path = pathComponents.join('/');
902919
}
903920
// Update request channel groups list (if required).
@@ -927,6 +944,7 @@
927944
* done.
928945
*/
929946
const leaveTransportRequestFromEvent = (event) => {
947+
var _a;
930948
const client = pubNubClients[event.clientIdentifier];
931949
const clients = clientsForSendLeaveRequestEvent(event);
932950
let channelGroups = channelGroupsFromRequest(event.request);
@@ -977,6 +995,20 @@
977995
}
978996
return undefined;
979997
}
998+
// Update aggregated heartbeat state object.
999+
if (client && serviceHeartbeatRequests[client.subscriptionKey] && (channels.length || channelGroups.length)) {
1000+
const hbRequestsBySubscriptionKey = serviceHeartbeatRequests[client.subscriptionKey];
1001+
const heartbeatRequestKey = `${client.userId}_${(_a = clientAggregateAuthKey(client)) !== null && _a !== void 0 ? _a : ''}`;
1002+
if (hbRequestsBySubscriptionKey[heartbeatRequestKey]) {
1003+
let { channels: hbChannels, channelGroups: hbChannelGroups } = hbRequestsBySubscriptionKey[heartbeatRequestKey];
1004+
if (channelGroups.length)
1005+
hbChannelGroups = hbChannelGroups.filter((group) => !channels.includes(group));
1006+
if (channels.length)
1007+
hbChannels = hbChannels.filter((channel) => !channels.includes(channel));
1008+
hbRequestsBySubscriptionKey[heartbeatRequestKey].channelGroups = hbChannelGroups;
1009+
hbRequestsBySubscriptionKey[heartbeatRequestKey].channels = hbChannels;
1010+
}
1011+
}
9801012
// Update request channels list (if required).
9811013
if (channels.length) {
9821014
const pathComponents = request.path.split('/');
@@ -1419,7 +1451,7 @@
14191451
* @param clientId - Unique PubNub client identifier.
14201452
*/
14211453
const invalidateClient = (subscriptionKey, clientId) => {
1422-
var _a;
1454+
var _a, _b, _c;
14231455
const invalidatedClient = pubNubClients[clientId];
14241456
delete pubNubClients[clientId];
14251457
let clients = pubNubClientsBySubscriptionKey[subscriptionKey];
@@ -1432,6 +1464,13 @@
14321464
if (serviceRequestId)
14331465
cancelRequest(serviceRequestId);
14341466
}
1467+
if (serviceHeartbeatRequests[subscriptionKey]) {
1468+
const hbRequestsBySubscriptionKey = ((_a = serviceHeartbeatRequests[subscriptionKey]) !== null && _a !== void 0 ? _a : (serviceHeartbeatRequests[subscriptionKey] = {}));
1469+
const heartbeatRequestKey = `${invalidatedClient.userId}_${(_b = clientAggregateAuthKey(invalidatedClient)) !== null && _b !== void 0 ? _b : ''}`;
1470+
if (hbRequestsBySubscriptionKey[heartbeatRequestKey] &&
1471+
hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier === invalidatedClient.clientIdentifier)
1472+
delete hbRequestsBySubscriptionKey[heartbeatRequestKey].clientIdentifier;
1473+
}
14351474
// Leave subscribed channels / groups properly.
14361475
if (invalidatedClient.unsubscribeOfflineClients)
14371476
unsubscribeClient(invalidatedClient);
@@ -1460,7 +1499,7 @@
14601499
else
14611500
delete sharedWorkerClients[subscriptionKey];
14621501
}
1463-
const message = `Invalidate '${clientId}' client. '${((_a = pubNubClientsBySubscriptionKey[subscriptionKey]) !== null && _a !== void 0 ? _a : []).length}' clients currently active.`;
1502+
const message = `Invalidate '${clientId}' client. '${((_c = pubNubClientsBySubscriptionKey[subscriptionKey]) !== null && _c !== void 0 ? _c : []).length}' clients currently active.`;
14641503
if (!clients)
14651504
consoleLog(message);
14661505
else

0 commit comments

Comments
 (0)