Skip to content

Commit 204c4bc

Browse files
authored
fix: issues with disconnecting and completing (reconbot#59)
Part of the DDB refactor messed up which keys we needed to remove a record. Upon investigation we had a "topic" as a range key because we used to allow multiple topics per subscription. I'm pulling that out for the time being. - add more logging and fixup the logging function's input - Better DDB logging - The logger function now always gets the object, the type change is additive so it's not a breaking change (I kind of want to move pino for logging and add a concept of log level) - DDB can now support range keys even though we don't use them BREAKING CHANGE: The subscriptions Table has has its range key removed. This will require a migration.
1 parent dbecec2 commit 204c4bc

23 files changed

+187
-201
lines changed

README.md

-4
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ Connection
131131
ttl TTL
132132
Subscription
133133
id *String
134-
topic **String
135134
ttl TTL
136135
137136
@indexes
@@ -209,8 +208,6 @@ resources:
209208
KeySchema:
210209
- AttributeName: id
211210
KeyType: HASH
212-
- AttributeName: topic
213-
KeyType: RANGE
214211
GlobalSecondaryIndexes:
215212
- IndexName: ConnectionIndex
216213
KeySchema:
@@ -259,7 +256,6 @@ resource "aws_dynamodb_table" "subscriptions-table" {
259256
read_capacity = 1
260257
write_capacity = 1
261258
hash_key = "id"
262-
range_key = "topic"
263259

264260
attribute {
265261
name = "id"

docs/README.md

+3-24
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ graphql-lambda-subscriptions
2121
- [LoggerFunction](README.md#loggerfunction)
2222
- [MaybePromise](README.md#maybepromise)
2323
- [SubscribeArgs](README.md#subscribeargs)
24-
- [SubscriptionDefinition](README.md#subscriptiondefinition)
2524
- [SubscriptionFilter](README.md#subscriptionfilter)
2625
- [WebSocketResponse](README.md#websocketresponse)
2726

@@ -34,11 +33,11 @@ graphql-lambda-subscriptions
3433

3534
### LoggerFunction
3635

37-
Ƭ **LoggerFunction**: (`message`: `string`, `obj?`: `any`) => `void`
36+
Ƭ **LoggerFunction**: (`message`: `string`, `obj`: `Record`<`string`, `any`\>) => `void`
3837

3938
#### Type declaration
4039

41-
▸ (`message`, `obj?`): `void`
40+
▸ (`message`, `obj`): `void`
4241

4342
Log operational events with a logger of your choice. It will get a message and usually object with relevant data
4443

@@ -47,7 +46,7 @@ Log operational events with a logger of your choice. It will get a message and u
4746
| Name | Type |
4847
| :------ | :------ |
4948
| `message` | `string` |
50-
| `obj?` | `any` |
49+
| `obj` | `Record`<`string`, `any`\> |
5150

5251
##### Returns
5352

@@ -81,26 +80,6 @@ ___
8180

8281
___
8382

84-
### SubscriptionDefinition
85-
86-
Ƭ **SubscriptionDefinition**<`T`, `TSubscribeArgs`\>: `Object`
87-
88-
#### Type parameters
89-
90-
| Name | Type |
91-
| :------ | :------ |
92-
| `T` | extends [`PubSubEvent`](interfaces/PubSubEvent.md) |
93-
| `TSubscribeArgs` | extends [`SubscribeArgs`](README.md#subscribeargs)[`SubscribeArgs`](README.md#subscribeargs) |
94-
95-
#### Type declaration
96-
97-
| Name | Type |
98-
| :------ | :------ |
99-
| `filter?` | [`SubscriptionFilter`](README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\> |
100-
| `topic` | `string` |
101-
102-
___
103-
10483
### SubscriptionFilter
10584

10685
Ƭ **SubscriptionFilter**<`TSubscribeArgs`, `TReturn`\>: `Partial`<`TReturn`\> \| `void` \| (...`args`: `TSubscribeArgs`) => [`MaybePromise`](README.md#maybepromise)<`Partial`<`TReturn`\> \| `void`\>

docs/interfaces/ServerArgs.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ ___
108108

109109
### onConnectionInit
110110

111-
`Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`object`\>
111+
`Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\>
112112

113113
#### Parameters
114114

@@ -120,7 +120,7 @@ ___
120120

121121
#### Returns
122122

123-
[`MaybePromise`](../README.md#maybepromise)<`object`\>
123+
[`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\>
124124

125125
___
126126

docs/interfaces/SubscribePseudoIterable.md

+10-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929

3030
### Properties
3131

32-
- [topicDefinitions](SubscribePseudoIterable.md#topicdefinitions)
32+
- [filter](SubscribePseudoIterable.md#filter)
33+
- [topic](SubscribePseudoIterable.md#topic)
3334

3435
### Methods
3536

@@ -39,9 +40,15 @@
3940

4041
## Properties
4142

42-
### topicDefinitions
43+
### filter
4344

44-
**topicDefinitions**: [`SubscriptionDefinition`](../README.md#subscriptiondefinition)<`T`, `TSubscribeArgs`\>[]
45+
`Optional` **filter**: [`SubscriptionFilter`](../README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\>
46+
47+
___
48+
49+
### topic
50+
51+
**topic**: `string`
4552

4653
## Methods
4754

lib/makeServerClosure.ts renamed to lib/buildServerClosure.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { ServerArgs, ServerClosure, Connection, Subscription } from './types'
1+
import { ServerArgs, ServerClosure } from './types'
22
import { DDB } from './ddb/DDB'
33
import { log as debugLogger } from './utils/logger'
44

5-
export const makeServerClosure = async (opts: ServerArgs): Promise<ServerClosure> => {
5+
export const buildServerClosure = async (opts: ServerArgs): Promise<ServerClosure> => {
66
const {
77
tableNames,
88
log = debugLogger,
@@ -19,8 +19,8 @@ export const makeServerClosure = async (opts: ServerArgs): Promise<ServerClosure
1919
dynamodb: dynamodb,
2020
log,
2121
models: {
22-
subscription: DDB<Subscription>({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }),
23-
connection: DDB<Connection>({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }),
22+
subscription: DDB({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }),
23+
connection: DDB({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }),
2424
},
2525
}
2626
}

lib/ddb/DDB.ts

+89-57
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,125 @@
11
import { DynamoDB } from 'aws-sdk'
22
import { LoggerFunction, DDBType } from '../types'
33

4-
export interface DDBClient<T extends DDBType> {
5-
get: (id: string) => Promise<T|null>
6-
put: (Item: T) => Promise<T>
7-
update: (id: string, obj: Partial<T>) => Promise<T>
8-
delete: (id: string) => Promise<T>
4+
export interface DDBClient<T extends DDBType, TKey> {
5+
get: (Key: TKey) => Promise<T|null>
6+
put: (obj: T) => Promise<T>
7+
update: (Key: TKey, obj: Partial<T>) => Promise<T>
8+
delete: (Key: TKey) => Promise<T>
99
query: (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => AsyncGenerator<T, void, undefined>
1010
}
1111

12-
export const DDB = <T extends DDBType>({
12+
export const DDB = <T extends DDBType, TKey>({
1313
dynamodb,
1414
tableName,
1515
log,
1616
}: {
1717
dynamodb: DynamoDB
1818
tableName: string
1919
log: LoggerFunction
20-
}): DDBClient<T> => {
20+
}): DDBClient<T, TKey> => {
2121
const documentClient = new DynamoDB.DocumentClient({ service: dynamodb })
2222

23-
const get = async (id: string): Promise<null | T> => {
24-
log('get', { tableName: tableName, id })
25-
const { Item } = await documentClient.get({
26-
TableName: tableName,
27-
Key: { id },
28-
}).promise()
29-
return (Item as T) ?? null
23+
const get = async (Key: TKey): Promise<null | T> => {
24+
log('get', { tableName: tableName, Key })
25+
try {
26+
const { Item } = await documentClient.get({
27+
TableName: tableName,
28+
Key,
29+
}).promise()
30+
log('get:result', { Item })
31+
return (Item as T) ?? null
32+
} catch (e) {
33+
log('get:error', e)
34+
throw e
35+
}
3036
}
3137

3238
const put = async (Item: T): Promise<T> => {
3339
log('put', { tableName: tableName, Item })
34-
const { Attributes } = await documentClient.put({
35-
TableName: tableName,
36-
Item,
37-
ReturnValues: 'ALL_OLD',
38-
}).promise()
39-
return Attributes as T
40+
try {
41+
const { Attributes } = await documentClient.put({
42+
TableName: tableName,
43+
Item,
44+
ReturnValues: 'ALL_OLD',
45+
}).promise()
46+
return Attributes as T
47+
} catch (e) {
48+
log('put:error', e)
49+
throw e
50+
}
4051
}
4152

42-
const update = async (id: string, obj: Partial<T>) => {
43-
const AttributeUpdates = Object.entries(obj)
44-
.map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } }))
45-
.reduce((memo, val) => ({ ...memo, ...val }))
53+
const update = async (Key: TKey, obj: Partial<T>) => {
54+
log('update', { tableName: tableName, Key, obj })
55+
try {
56+
const AttributeUpdates = Object.entries(obj)
57+
.map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } }))
58+
.reduce((memo, val) => ({ ...memo, ...val }))
4659

47-
const { Attributes } = await documentClient.update({
48-
TableName: tableName,
49-
Key: { id },
50-
AttributeUpdates,
51-
ReturnValues: 'ALL_NEW',
52-
}).promise()
53-
return Attributes as T
60+
const { Attributes } = await documentClient.update({
61+
TableName: tableName,
62+
Key,
63+
AttributeUpdates,
64+
ReturnValues: 'ALL_NEW',
65+
}).promise()
66+
return Attributes as T
67+
} catch (e) {
68+
log('update:error', e)
69+
throw e
70+
}
5471
}
5572

56-
const deleteFunction = async (id: string): Promise<T> => {
57-
const { Attributes } = await documentClient.delete({
58-
TableName: tableName,
59-
Key: { id },
60-
ReturnValues: 'ALL_OLD',
61-
}).promise()
62-
return Attributes as T
73+
const deleteFunction = async (Key: TKey): Promise<T> => {
74+
log('delete', { tableName: tableName, Key })
75+
try {
76+
const { Attributes } = await documentClient.delete({
77+
TableName: tableName,
78+
Key,
79+
ReturnValues: 'ALL_OLD',
80+
}).promise()
81+
return Attributes as T
82+
} catch (e) {
83+
log('delete:error', e)
84+
throw e
85+
}
6386
}
6487

6588
const queryOnce = async (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => {
66-
log('queryOnce', options)
67-
68-
const response = await documentClient.query({
69-
TableName: tableName,
70-
Select: 'ALL_ATTRIBUTES',
71-
...options,
72-
}).promise()
89+
log('queryOnce', { tableName: tableName, options })
90+
try {
91+
const response = await documentClient.query({
92+
TableName: tableName,
93+
Select: 'ALL_ATTRIBUTES',
94+
...options,
95+
}).promise()
7396

74-
const { Items, LastEvaluatedKey, Count } = response
75-
return {
76-
items: (Items ?? []) as T[],
77-
lastEvaluatedKey: LastEvaluatedKey,
78-
count: Count ?? 0,
97+
const { Items, LastEvaluatedKey, Count } = response
98+
return {
99+
items: (Items ?? []) as T[],
100+
lastEvaluatedKey: LastEvaluatedKey,
101+
count: Count ?? 0,
102+
}
103+
} catch (e) {
104+
log('queryOnce:error', e)
105+
throw e
79106
}
80107
}
81108

82109
async function* query(options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) {
83-
log('query', options)
84-
const results = await queryOnce(options)
85-
yield* results.items
86-
let lastEvaluatedKey = results.lastEvaluatedKey
87-
while (lastEvaluatedKey) {
88-
const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey })
110+
log('query', { tableName: tableName, options })
111+
try {
112+
const results = await queryOnce(options)
89113
yield* results.items
90-
lastEvaluatedKey = results.lastEvaluatedKey
114+
let lastEvaluatedKey = results.lastEvaluatedKey
115+
while (lastEvaluatedKey) {
116+
const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey })
117+
yield* results.items
118+
lastEvaluatedKey = results.lastEvaluatedKey
119+
}
120+
} catch (e) {
121+
log('query:error', e)
122+
throw e
91123
}
92124
}
93125

lib/handleStepFunctionEvent.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise<ServerClosure>):
1212
// Initial state - send ping message
1313
if (input.state === 'PING') {
1414
await postToConnection(server)({ ...input, message: { type: MessageType.Ping } })
15-
await server.models.connection.update(input.connectionId, { hasPonged: false })
15+
await server.models.connection.update({ id: input.connectionId }, { hasPonged: false })
1616
return {
1717
...input,
1818
state: 'REVIEW',
@@ -21,7 +21,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise<ServerClosure>):
2121
}
2222

2323
// Follow up state - check if pong was returned
24-
const conn = await server.models.connection.get(input.connectionId)
24+
const conn = await server.models.connection.get({ id: input.connectionId })
2525
if (conn?.hasPonged) {
2626
return {
2727
...input,

lib/handleWebSocketEvent.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { pong } from './messages/pong'
1010
export const handleWebSocketEvent = (serverPromise: Promise<ServerClosure>): SubscriptionServer['webSocketHandler'] => async (event) => {
1111
const server = await serverPromise
1212
if (!event.requestContext) {
13-
server.log('handleWebSocketEvent unknown')
13+
server.log('handleWebSocketEvent unknown', { event })
1414
return {
1515
statusCode: 200,
1616
body: '',

lib/index.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import { publish } from './pubsub/publish'
33
import { complete } from './pubsub/complete'
44
import { handleWebSocketEvent } from './handleWebSocketEvent'
55
import { handleStepFunctionEvent } from './handleStepFunctionEvent'
6-
import { makeServerClosure } from './makeServerClosure'
6+
import { buildServerClosure } from './buildServerClosure'
77

88
export const makeServer = (opts: ServerArgs): SubscriptionServer => {
9-
const closure: Promise<ServerClosure> = makeServerClosure(opts)
9+
const closure: Promise<ServerClosure> = buildServerClosure(opts)
1010

1111
return {
1212
webSocketHandler: handleWebSocketEvent(closure),
@@ -32,7 +32,6 @@ export {
3232
WebSocketResponse,
3333
StateFunctionInput,
3434
PubSubEvent,
35-
SubscriptionDefinition,
3635
SubscriptionFilter,
3736
Connection,
3837
Subscription,

lib/messages/complete.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export const complete: MessageHandler<CompleteMessage> =
1313
async ({ server, event, message }) => {
1414
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
1515
try {
16-
const subscription = await server.models.subscription.get(`${event.requestContext.connectionId}|${message.id}`)
16+
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
1717
if (!subscription) {
1818
return
1919
}
@@ -37,7 +37,7 @@ export const complete: MessageHandler<CompleteMessage> =
3737
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
3838
await onComplete?.(root, args, context, info)
3939

40-
await server.models.subscription.delete(subscription.id)
40+
await server.models.subscription.delete({ id: subscription.id })
4141
} catch (err) {
4242
server.log('messages:complete:onError', { err, event })
4343
await server.onError?.(err, { event, message })

0 commit comments

Comments
 (0)