Skip to content

Commit 02507d6

Browse files
authored
feat: allow runtime errors and validation errors in onSubscribe (reconbot#36)
Change the API of `onSubscribe` to allow returning validation errors (to reject a subscription) and throwing errors to close the connection (usually only happens if there's a big problem). This came from talking it over at graphql-ws enisdenjo/graphql-ws#226 bonus - add a timeout for test helpers
1 parent 8f64a5d commit 02507d6

File tree

9 files changed

+174
-117
lines changed

9 files changed

+174
-117
lines changed

README.md

+25-31
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,33 @@
11
# Graphql Lambda Subscriptions
22
[![Release](https://github.com/reconbot/graphql-lambda-subscriptions/actions/workflows/test.yml/badge.svg)](https://github.com/reconbot/graphql-lambda-subscriptions/actions/workflows/test.yml)
33

4-
This is a fork of [subscriptionless](https://github.com/andyrichardson/subscriptionless) that is built to work with [Architect](https://arc.codes) and tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox). There's no reason why it wont work with Serverless or other deploy tools but their support is not a goal.
4+
This is a fork of [`subscriptionless`](https://github.com/andyrichardson/subscriptionless) and is a Amazon Lambda Serverless equivalent to [graphQL-ws](https://github.com/enisdenjo/graphql-ws). It follows the [`graphql-ws prototcol`](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md). It is tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox) against `graphql-ws` directly and run in production today. For many applications `graphql-lambda-subscriptions` should do what `graphql-ws` does for you today without having to run a server.
5+
6+
As `subscriptionless`'s tagline goes;
7+
8+
> Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
9+
10+
## Why a fork?
11+
12+
I had different requirements and needed more features. This project wouldn't exist without `subscriptionless` and you should totally check it out.
13+
14+
## Features
15+
16+
- Only needs DynamoDB, API Gateway and Lambda (no app sync or other platform required, can use step functions for ping/pong support)
17+
- Provides a Pub/Sub system to broadcast events to subscriptions
18+
- Provides hooks for the full lifecycle of a subscription
19+
- Type compatible with GraphQL and [`nexus.js`](https://nexusjs.org)
20+
21+
## Quick Start
22+
23+
Since there are many ways to deploy to amazon lambda I'm going to have to get opinionated in the quickstart and pick [Architect](https://arc.codes). `graphql-lambda-subscriptions` should work on Lambda regardless of your deployment and packaging framework. Take a look at the [arc-basic-events](mocks/arc-basic-events) mock used for integration testing for an example of using it with Architect.
24+
25+
More to come...
526

627
## API
728

29+
This should be generated...
30+
831
### `subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable`
932

1033
Subscribe is the most important method in the library. It's the primary difference between `graphql-ws` and `graphql-lambda-subscriptions`. It returns a `SubscribePseudoIterable` that pretends to be an async iterator that you put on the `subscribe` resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions.
@@ -20,40 +43,12 @@ interface SubscribeOptions {
2043

2144
- `topic`: The you subscribe to the topic and can filter based upon the topics payload.
2245
- `filter`: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches the subscription will receive the event. If the payload is missing the field the subscription will receive the event.
23-
- `onSubscribe`: A function that gets the subscription information (like arguments) it can throw if you don't want the subscription to subscribe.
46+
- `onSubscribe`: A function that gets the subscription information (like arguments) it can return an array of errors if you don't want the subscription to subscribe.
2447
- `onAfterSubscribe`: A function that gets the subscription information (like arguments) and can fire initial events or record information.
2548
- `onComplete`: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of aws lambda, it's possible for a client to send a "complete" message and disconnect and those events executing on lambda out of order. Which why this function can be called up to twice.
2649

2750
## Old Readme
2851

29-
## About
30-
31-
GraphQL subscriptions for AWS Lambda and API Gateway WebSockets.
32-
33-
Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
34-
35-
> Note: This project uses the [graphql-ws protocol](https://github.com/enisdenjo/graphql-ws) under the hood.
36-
37-
## ⚠️ Limitations
38-
39-
Seriously, **read this first** before you even think about using this.
40-
41-
<details>
42-
43-
<summary>This is in alpha</summary>
44-
45-
This is Alpha software and should be treated as such.
46-
47-
</details>
48-
49-
<details>
50-
51-
<summary>AWS API Gateway Limitations</summary>
52-
53-
There are a few noteworthy limitations to the AWS API Gateway WebSocket implementation.
54-
55-
> Note: If you work on AWS and want to run through this, hit me up!
56-
5752
#### Ping/Pong
5853

5954
For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong.
@@ -74,7 +69,6 @@ API Gateway's current socket closing functionality doesn't support any kind of m
7469

7570
Because of this limitation, there is no clear way to communicate subprotocol errors to the client. In the case of a subprotocol error the socket will be closed by the server (with no meaningful disconnect payload).
7671

77-
</details>
7872

7973
## Setup
8074

lib/messages/subscribe-test.ts

+49-48
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ describe('messages/subscribe', () => {
8989
delete: [],
9090
})
9191
})
92+
9293
it('calls the global error callback server errors', async () => {
9394
const event: any = { requestContext: { connectedAt: 1628889982819, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'b6o5BPxb3', requestId: 'MaEe0DVon', requestTimeEpoch: 1628889983319, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"abcdefg","type":"subscribe","payload":{"query":"{ hello }"}}' }
9495
let error: any = null
@@ -108,59 +109,59 @@ describe('messages/subscribe', () => {
108109
await subscribe({ server, event, message: JSON.parse(event.body) })
109110
assert.match(error.message, /postToConnection Error/ )
110111
})
112+
111113
describe('callbacks', () => {
112-
// this test doesn't make sense anymore because these error are now sent as error messages
113-
// it('fires onSubscribe before subscribing', async () => {
114+
it('fires onSubscribe before subscribing', async () => {
115+
const onSubscribe: string[] = []
114116

115-
// const onSubscribe: string[] = []
117+
const typeDefs = `
118+
type Query {
119+
hello: String
120+
}
121+
type Subscription {
122+
greetings: String
123+
}
124+
`
125+
const resolvers = {
126+
Query: {
127+
hello: () => 'Hello World!',
128+
},
129+
Subscription: {
130+
greetings:{
131+
subscribe: pubsubSubscribe('greetings', {
132+
onSubscribe() {
133+
onSubscribe.push('We did it!')
134+
throw new Error('don\'t subscribe!')
135+
},
136+
}),
137+
resolve: ({ payload }) => {
138+
return payload
139+
},
140+
},
141+
},
142+
}
116143

117-
// const typeDefs = `
118-
// type Query {
119-
// hello: String
120-
// }
121-
// type Subscription {
122-
// greetings: String
123-
// }
124-
// `
125-
// const resolvers = {
126-
// Query: {
127-
// hello: () => 'Hello World!',
128-
// },
129-
// Subscription: {
130-
// greetings:{
131-
// subscribe: pubsubSubscribe('greetings', {
132-
// onSubscribe() {
133-
// onSubscribe.push('We did it!')
134-
// throw new Error('don\'t subscribe!')
135-
// },
136-
// }),
137-
// resolve: ({ payload }) => {
138-
// return payload
139-
// },
140-
// },
141-
// },
142-
// }
144+
const schema = makeExecutableSchema({
145+
typeDefs,
146+
resolvers,
147+
})
148+
const server = await mockServerContext({
149+
schema,
150+
})
151+
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
143152

144-
// const schema = makeExecutableSchema({
145-
// typeDefs,
146-
// resolvers,
147-
// })
148-
// const server = await mockServerContext({
149-
// schema,
150-
// })
151-
// const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
153+
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
154+
try {
155+
await subscribe({ server, event, message: JSON.parse(event.body) })
156+
throw new Error('should not have subscribed')
157+
} catch (error) {
158+
assert.equal(error.message, 'don\'t subscribe!')
159+
}
160+
assert.deepEqual(onSubscribe, ['We did it!'])
161+
const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
162+
assert.isEmpty(subscriptions)
163+
})
152164

153-
// await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
154-
// try {
155-
// await subscribe({ server, event, message: JSON.parse(event.body) })
156-
// throw new Error('should not have subscribed')
157-
// } catch (error) {
158-
// assert.equal(error.message, 'don\'t subscribe!')
159-
// }
160-
// assert.deepEqual(onSubscribe, ['We did it!'])
161-
// const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
162-
// assert.isEmpty(subscriptions)
163-
// })
164165
it('fires onAfterSubscribe after subscribing', async () => {
165166
const events: string[] = []
166167

lib/messages/subscribe.ts

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { SubscribeMessage, MessageType } from 'graphql-ws'
2-
import { validate, parse, GraphQLError } from 'graphql'
2+
import { validate, parse } from 'graphql'
33
import {
44
buildExecutionContext,
55
assertValidExecutionArguments,
@@ -83,17 +83,16 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
8383

8484
const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable<PubSubEvent>
8585

86-
try {
87-
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
88-
await onSubscribe?.(root, args, context, info)
89-
} catch (error) {
90-
server.log('onSubscribe', { error })
86+
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
87+
const onSubscribeErrors = await onSubscribe?.(root, args, context, info)
88+
if (onSubscribeErrors){
89+
server.log('onSubscribe', { onSubscribeErrors })
9190
return sendMessage(server)({
9291
...event.requestContext,
9392
message: {
9493
type: MessageType.Error,
9594
id: message.id,
96-
payload: [new GraphQLError(error.message)],
95+
payload: onSubscribeErrors,
9796
},
9897
})
9998
}

lib/pubsub/getFilteredSubs-test.ts

+5
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,8 @@ describe('collapseKeys', () => {
1818
})
1919
})
2020
})
21+
22+
describe('getFilteredSubs', () => {
23+
it('can match on payload')
24+
it('can match on connectionId')
25+
})

lib/test/execute-helper.ts

+13
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ const messageToString = (message) => {
1414
export const executeQuery = async function* (query: string, {
1515
url = URL,
1616
stayConnected = false,
17+
timeout = 20_000,
1718
}: {
1819
url?: string
1920
stayConnected?: boolean
21+
timeout?: number
2022
} = {}): AsyncGenerator<unknown, void, unknown> {
2123
let id = 1
2224
const ws = new WebSocket(url, 'graphql-transport-ws')
@@ -40,6 +42,14 @@ export const executeQuery = async function* (query: string, {
4042
incomingMessages.queueReturn()
4143
})
4244

45+
let timer: NodeJS.Timeout|null = null
46+
if (timeout) {
47+
timer = setTimeout(() => {
48+
incomingMessages.queueValue({ type: 'timeout', timeout })
49+
incomingMessages.queueReturn()
50+
}, timeout)
51+
}
52+
4353
const send = (data: any) => new Promise<void>(resolve => ws.send(JSON.stringify(data), () => resolve()))
4454

4555
await new Promise(resolve => ws.on('open', resolve))
@@ -65,6 +75,9 @@ export const executeQuery = async function* (query: string, {
6575
if (!stayConnected){
6676
ws.close()
6777
}
78+
if (timer) {
79+
clearTimeout(timer)
80+
}
6881
}
6982

7083

lib/test/graphql-ws-schema.ts

+22-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import ws from 'ws'
22
import { useServer } from 'graphql-ws/lib/use/ws'
33
import { makeExecutableSchema } from '@graphql-tools/schema'
4+
import { GraphQLError } from 'graphql'
45

56
const PORT = 4000
67

@@ -11,6 +12,7 @@ const typeDefs = `
1112
type Subscription {
1213
greetings: String
1314
onSubscribeError: String
15+
onResolveError: String
1416
}
1517
`
1618

@@ -32,6 +34,14 @@ const resolvers = {
3234
throw new Error('onSubscribeError')
3335
},
3436
},
37+
onResolveError: {
38+
subscribe: async function*(){
39+
yield { greetings: 'yoyo' }
40+
},
41+
resolve() {
42+
throw new Error('resolver error')
43+
},
44+
},
3545
},
3646
}
3747

@@ -62,17 +72,26 @@ export const startGqlWSServer = async () => {
6272
})
6373

6474
useServer(
65-
{ schema },
75+
{
76+
schema,
77+
async onSubscribe(ctx, message) {
78+
if (message?.payload?.query === 'subscription { onSubscribeError }') {
79+
return [
80+
new GraphQLError('onSubscribeError'),
81+
]
82+
}
83+
},
84+
},
6685
server,
6786
)
6887

6988
await new Promise(resolve => server.on('listening', resolve))
7089
// console.log('server started')
7190

72-
const close = () => new Promise<void>(resolve => server.close(() => resolve()))
91+
const stop = () => new Promise<void>(resolve => server.close(() => resolve()))
7392

7493
return {
7594
url: `ws://localhost:${PORT}`,
76-
close,
95+
stop,
7796
}
7897
}

0 commit comments

Comments
 (0)