From 8d5b904cb5ab55723efec2cddcf834d27b1a0186 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Wed, 19 Nov 2025 18:54:46 +0000 Subject: [PATCH 1/3] Adds long-running websocket stream consumer example --- 14-websocket-stream-consumer/README.md | 25 +++ 14-websocket-stream-consumer/package.json | 13 ++ 14-websocket-stream-consumer/pyproject.toml | 15 ++ 14-websocket-stream-consumer/src/entry.py | 155 ++++++++++++++++++ 14-websocket-stream-consumer/uv.lock | 168 ++++++++++++++++++++ 14-websocket-stream-consumer/wrangler.jsonc | 29 ++++ 6 files changed, 405 insertions(+) create mode 100644 14-websocket-stream-consumer/README.md create mode 100644 14-websocket-stream-consumer/package.json create mode 100644 14-websocket-stream-consumer/pyproject.toml create mode 100644 14-websocket-stream-consumer/src/entry.py create mode 100644 14-websocket-stream-consumer/uv.lock create mode 100644 14-websocket-stream-consumer/wrangler.jsonc diff --git a/14-websocket-stream-consumer/README.md b/14-websocket-stream-consumer/README.md new file mode 100644 index 0000000..b98920c --- /dev/null +++ b/14-websocket-stream-consumer/README.md @@ -0,0 +1,25 @@ +# WebSocket Stream Consumer - Bluesky Firehose + +This example demonstrates a long-running Durable Object that connects to the Bluesky firehose (via Jetstream) and filters for post events, with rate limiting to print at most 1 per second. + +## How to Run + +First ensure that `uv` is installed: +https://docs.astral.sh/uv/getting-started/installation/#standalone-installer + +Now, if you run `uv run pywrangler dev` within this directory, it should use the config +in `wrangler.jsonc` to run the example. + +You can also run `uv run pywrangler deploy` to deploy the example. + +## Testing the Firehose Consumer + +1. Start the worker: `uv run pywrangler dev` +2. Make any request to initialize the DO: `curl "http://localhost:8787/status"` +3. Watch the logs to see filtered Bluesky post events in real-time (rate limited to 1/sec)! 
+ +The Durable Object automatically connects to Jetstream when first accessed. It will maintain a persistent WebSocket connection and print the raw record of each post event to the console. Posts are rate limited to display at most 1 per second to avoid overwhelming the logs. + +**Available endpoints:** +- `/status` - Check connection status +- Any other path - Returns the list of available endpoints (there is no manual reconnect endpoint; the Durable Object alarm reconnects automatically) diff --git a/14-websocket-stream-consumer/package.json b/14-websocket-stream-consumer/package.json new file mode 100644 index 0000000..603d1f1 --- /dev/null +++ b/14-websocket-stream-consumer/package.json @@ -0,0 +1,13 @@ +{ + "name": "python-websocket-stream-consumer", + "version": "0.0.0", + "private": true, + "scripts": { + "deploy": "uv run pywrangler deploy", + "dev": "uv run pywrangler dev", + "start": "uv run pywrangler dev" + }, + "devDependencies": { + "wrangler": "^4.46.0" + } +} diff --git a/14-websocket-stream-consumer/pyproject.toml b/14-websocket-stream-consumer/pyproject.toml new file mode 100644 index 0000000..cee560e --- /dev/null +++ b/14-websocket-stream-consumer/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "python-websocket-stream-consumer" +version = "0.1.0" +description = "Python WebSocket stream consumer example" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "webtypy>=0.1.7", +] + +[dependency-groups] +dev = [ + "workers-py", + "workers-runtime-sdk" +] diff --git a/14-websocket-stream-consumer/src/entry.py b/14-websocket-stream-consumer/src/entry.py new file mode 100644 index 0000000..3f3ff71 --- /dev/null +++ b/14-websocket-stream-consumer/src/entry.py @@ -0,0 +1,155 @@ +from workers import WorkerEntrypoint, Response, DurableObject +import js +import json +import time +from pyodide.ffi import create_proxy +from urllib.parse import urlparse + + +class BlueskyFirehoseConsumer(DurableObject): + """Durable Object that maintains a persistent WebSocket 
connection to Bluesky Jetstream.""" + + def __init__(self, state, env): + super().__init__(state, env) + self.websocket = None + self.connected = False + self.last_print_time = 0 # Track last time we printed a post + + async def fetch(self, request): + """Handle incoming requests to the Durable Object.""" + # If we're not connected then make sure we start a connection. + if not self.connected: + await self._schedule_next_alarm() + await self._connect_to_jetstream() + + url = urlparse(request.url) + path = url.path + + if path == "/status": + status = "connected" if self.connected else "disconnected" + return Response(f"Firehose status: {status}") + else: + return Response("Available endpoints: /status") + + async def alarm(self): + """Handle alarm events - used to ensure that the DO stays alive and connected""" + print("Alarm triggered - making sure we are connected to jetstream...") + if not self.connected: + await self._connect_to_jetstream() + else: + print("Already connected, skipping reconnection") + + # Schedule the next alarm to keep the DO alive + await self._schedule_next_alarm() + + async def _schedule_next_alarm(self): + """Schedule the next alarm to run in 1 minute to keep the DO alive.""" + # Schedule alarm for 1 minute from now, overwriting any existing alarms + next_alarm_time = int(time.time() * 1000) + 60000 + return await self.ctx.storage.setAlarm(next_alarm_time) + + async def _connect_to_jetstream(self): + """Connect to the Bluesky Jetstream WebSocket and start consuming events.""" + # Get the last event timestamp from storage to resume from the right position + last_timestamp = self.ctx.storage.kv.get("last_event_timestamp") + + # Jetstream endpoint - we'll filter for posts + # Using wantedCollections parameter to only get post events + jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post" + + # If we have a last timestamp, add it to resume from that point + if last_timestamp: + jetstream_url += 
f"&cursor={last_timestamp}" + print( + f"Connecting to Bluesky Jetstream at {jetstream_url} (resuming from timestamp: {last_timestamp})" + ) + else: + print( + f"Connecting to Bluesky Jetstream at {jetstream_url} (starting fresh)" + ) + + # Create WebSocket using JS FFI + ws = js.WebSocket.new(jetstream_url) + self.websocket = ws + + # Set up event handlers using JS FFI + async def on_open(event): + self.connected = True + print("Connected to Bluesky Jetstream firehose!") + print( + "Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)" + ) + # Ensure alarm is set when we connect + await self._schedule_next_alarm() + + def on_message(event): + try: + # Parse the JSON message + data = json.loads(event.data) + + # Store the timestamp for resumption on reconnect + time_us = data.get("time_us") + if time_us: + # Store the timestamp asynchronously + self.ctx.storage.kv.put("last_event_timestamp", time_us) + + # Jetstream sends different event types + # We're interested in 'commit' events which contain posts + if data.get("kind") == "commit": + commit = data.get("commit", {}) + collection = commit.get("collection") + + # Filter for post events + if collection == "app.bsky.feed.post": + # Rate limiting: only print at most 1 per second + current_time = time.time() + if current_time - self.last_print_time >= 1.0: + record = commit.get("record", {}) + print("Post record", record) + + # Update last print time + self.last_print_time = current_time + + except Exception as e: + print(f"Error processing message: {e}") + + def on_error(event): + print(f"WebSocket error: {event}") + self.connected = False + self.ctx.abort("WebSocket error occurred") + + async def on_close(event): + print(f"WebSocket closed: code={event.code}, reason={event.reason}") + self.connected = False + self.ctx.abort("WebSocket closed unexpectedly") + + # Attach event handlers + # + # Note that ordinarily proxies need to be destroyed once they are no longer used. 
+ # However, in this Durable Object context, the WebSocket and its event listeners + # persist for the lifetime of the Durable Object, so we don't explicitly destroy + # the proxies here. When the websocket connection closes, the Durable Object + # is restarted which destroys these proxies. + # + # In the future, we plan to provide support for native Python websocket APIs which + # should eliminate the need for proxy wrappers. + ws.addEventListener("open", create_proxy(on_open)) + ws.addEventListener("message", create_proxy(on_message)) + ws.addEventListener("error", create_proxy(on_error)) + ws.addEventListener("close", create_proxy(on_close)) + + +class Default(WorkerEntrypoint): + """Main worker entry point that routes requests to the Durable Object.""" + + async def fetch(self, request): + # Get the Durable Object namespace from the environment + namespace = self.env.BLUESKY_FIREHOSE + + # Use a fixed ID so we always connect to the same Durable Object instance + # This ensures we maintain a single persistent connection + id = namespace.idFromName("bluesky-consumer") + stub = namespace.get(id) + + # Forward the request to the Durable Object + return await stub.fetch(request) diff --git a/14-websocket-stream-consumer/uv.lock b/14-websocket-stream-consumer/uv.lock new file mode 100644 index 0000000..6922b65 --- /dev/null +++ b/14-websocket-stream-consumer/uv.lock @@ -0,0 +1,168 @@ +version = 1 +revision = 3 +requires-python = ">=3.12" + +[[package]] +name = "click" +version = "8.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/60/6c/8ca2efa64cf75a977a0d7fac081354553ebe483345c734fb6b6515d96bbc/click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202", size = 286342, upload-time = "2025-05-20T23:19:49.832Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215, upload-time = "2025-05-20T23:19:47.796Z" }, +] + +[[package]] +name = "colorama" +version = "0.4.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, +] + +[[package]] +name = "markdown-it-py" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, +] + +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + +[[package]] +name = "pygments" +version = "2.19.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, +] + +[[package]] +name = "pyodide-cli" +version = "0.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "rich" }, + { name = "typer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/50/80/0dd7b828031d08efcdc2be6d69b3b0aa502fb1dcf05eb0397369966329f8/pyodide_cli-0.3.0.tar.gz", hash = "sha256:247a7408f358326dd586477b5fe6eeb146edbed5f058923d258730e743457dd0", size = 11698, upload-time = "2025-04-05T12:18:34.27Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b4/93/b9815f6f4ef30dd8490bea2a82aa664e5aa9162fa38986b35a0898522d22/pyodide_cli-0.3.0-py3-none-any.whl", hash = 
"sha256:9d2736e04ddb380fd7eac664e5e4ba23d2c1dd29ed38a98b6246ec529ffc834a", size = 11630, upload-time = "2025-04-05T12:18:33.177Z" }, +] + +[[package]] +name = "python-websocket-stream-consumer" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "webtypy" }, +] + +[package.dev-dependencies] +dev = [ + { name = "workers-py" }, + { name = "workers-runtime-sdk" }, +] + +[package.metadata] +requires-dist = [{ name = "webtypy", specifier = ">=0.1.7" }] + +[package.metadata.requires-dev] +dev = [ + { name = "workers-py" }, + { name = "workers-runtime-sdk" }, +] + +[[package]] +name = "rich" +version = "14.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fe/75/af448d8e52bf1d8fa6a9d089ca6c07ff4453d86c65c145d0a300bb073b9b/rich-14.1.0.tar.gz", hash = "sha256:e497a48b844b0320d45007cdebfeaeed8db2a4f4bcf49f15e455cfc4af11eaa8", size = 224441, upload-time = "2025-07-25T07:32:58.125Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e3/30/3c4d035596d3cf444529e0b2953ad0466f6049528a879d27534700580395/rich-14.1.0-py3-none-any.whl", hash = "sha256:536f5f1785986d6dbdea3c75205c473f970777b4a0d6c6dd1b696aa05a3fa04f", size = 243368, upload-time = "2025-07-25T07:32:56.73Z" }, +] + +[[package]] +name = "shellingham" +version = "1.5.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/58/15/8b3609fd3830ef7b27b655beb4b4e9c62313a4e8da8c676e142cc210d58e/shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de", size = 10310, upload-time = "2023-10-24T04:13:40.426Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e0/f9/0595336914c5619e5f28a1fb793285925a8cd4b432c9da0a987836c7f822/shellingham-1.5.4-py2.py3-none-any.whl", hash = 
"sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686", size = 9755, upload-time = "2023-10-24T04:13:38.866Z" }, +] + +[[package]] +name = "typer" +version = "0.16.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "rich" }, + { name = "shellingham" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/43/78/d90f616bf5f88f8710ad067c1f8705bf7618059836ca084e5bb2a0855d75/typer-0.16.1.tar.gz", hash = "sha256:d358c65a464a7a90f338e3bb7ff0c74ac081449e53884b12ba658cbd72990614", size = 102836, upload-time = "2025-08-18T19:18:22.898Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2d/76/06dbe78f39b2203d2a47d5facc5df5102d0561e2807396471b5f7c5a30a1/typer-0.16.1-py3-none-any.whl", hash = "sha256:90ee01cb02d9b8395ae21ee3368421faf21fa138cb2a541ed369c08cec5237c9", size = 46397, upload-time = "2025-08-18T19:18:21.663Z" }, +] + +[[package]] +name = "typing-extensions" +version = "4.15.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, +] + +[[package]] +name = "webtypy" +version = "0.1.7" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/5e/89/c7a0311fdc73809fc2415be97767f085ff3e00c86546430034dc8465fee7/webtypy-0.1.7.tar.gz", hash = "sha256:1b7212719a949c802f3d60fac5f0d952eb503a92121409cf1ad9847d7c76a336", size = 104505, upload-time = "2023-11-21T19:23:26.342Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/61/91/c731bdaa605279e00b28bfd2bf0ae67f48061d16890fb1c026924bfbd242/webtypy-0.1.7-py3-none-any.whl", hash = "sha256:f35e6d73a4e08783e23adfac271a11cda3a2bd1105499db70e4819244efed0ae", size = 103519, upload-time = "2023-11-21T19:23:23.946Z" }, +] + +[[package]] +name = "workers-py" +version = "1.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "pyodide-cli" }, + { name = "rich" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fd/7c/9a620ed692f19010bf97e704ffe26f0dc0b70c562202bb5bb3a2f2c5b302/workers_py-1.1.5.tar.gz", hash = "sha256:5eee1534322b82eaff40fb85176fe3956aff3c4b1363e151bfbb2ba51ca24cf8", size = 31781, upload-time = "2025-08-26T11:37:40.766Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9c/48/6080eeb7101a2adfa655a7fc6f7cc3f6da62b6a25e9fa9963365911a0a51/workers_py-1.1.5-py3-none-any.whl", hash = "sha256:28465ee8b1f2050c906074653f089e0cae5c30ae99a6c683cfb300bdb3496929", size = 8095, upload-time = "2025-08-26T11:37:38.099Z" }, +] + +[[package]] +name = "workers-runtime-sdk" +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7d/90/4954e37776f0e1064002d3d338bfc33900257da6607c3799987c9cdb6776/workers_runtime_sdk-0.1.0.tar.gz", hash = "sha256:137b974130576a8631f0af8f2e6a824006f2dc845de501a7ecd2d787ed2012a6", size = 25174, upload-time = "2025-10-13T17:04:58.393Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/cc/68/338ff8e8cdb232be9870cf5d761e464bb3637f0e63103533c83fe5a7030a/workers_runtime_sdk-0.1.0-py3-none-any.whl", hash = "sha256:d9503025d0cee84f04a85537330a4a6dfa68a7856233c4afbcfb64d0c9d089ae", size = 14481, upload-time = "2025-10-13T17:04:57.225Z" }, +] diff --git a/14-websocket-stream-consumer/wrangler.jsonc b/14-websocket-stream-consumer/wrangler.jsonc new file mode 100644 index 0000000..f9c2bda --- /dev/null +++ b/14-websocket-stream-consumer/wrangler.jsonc @@ -0,0 +1,29 @@ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "python-websocket-stream-consumer", + "main": "src/entry.py", + "compatibility_date": "2025-11-02", + "compatibility_flags": [ + "python_workers" + ], + "observability": { + "enabled": true + }, + "durable_objects": { + "bindings": [ + { + "name": "BLUESKY_FIREHOSE", + "class_name": "BlueskyFirehoseConsumer", + "script_name": "python-websocket-stream-consumer" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "BlueskyFirehoseConsumer" + ] + } + ] +} From 857b278f8274bda23a388255cbce3bd1a6925701 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Thu, 20 Nov 2025 16:27:17 +0000 Subject: [PATCH 2/3] Use wasmsockets package --- 14-websocket-stream-consumer/pyproject.toml | 1 + 14-websocket-stream-consumer/src/entry.py | 62 +++++++++------------ 14-websocket-stream-consumer/uv.lock | 20 ++++++- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/14-websocket-stream-consumer/pyproject.toml b/14-websocket-stream-consumer/pyproject.toml index cee560e..9412319 100644 --- a/14-websocket-stream-consumer/pyproject.toml +++ b/14-websocket-stream-consumer/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "webtypy>=0.1.7", + "wasmsockets @ git+https://github.com/dom96/wasmsockets.git@main", ] [dependency-groups] diff --git a/14-websocket-stream-consumer/src/entry.py 
b/14-websocket-stream-consumer/src/entry.py index 3f3ff71..77cc61f 100644 --- a/14-websocket-stream-consumer/src/entry.py +++ b/14-websocket-stream-consumer/src/entry.py @@ -1,11 +1,10 @@ from workers import WorkerEntrypoint, Response, DurableObject -import js +from wasmsockets.client import connect as ws_connect import json import time -from pyodide.ffi import create_proxy +import asyncio from urllib.parse import urlparse - class BlueskyFirehoseConsumer(DurableObject): """Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream.""" @@ -14,6 +13,7 @@ def __init__(self, state, env): self.websocket = None self.connected = False self.last_print_time = 0 # Track last time we printed a post + self.consumer_task = None # Track the message consumer task async def fetch(self, request): """Handle incoming requests to the Durable Object.""" @@ -68,24 +68,36 @@ async def _connect_to_jetstream(self): f"Connecting to Bluesky Jetstream at {jetstream_url} (starting fresh)" ) - # Create WebSocket using JS FFI - ws = js.WebSocket.new(jetstream_url) - self.websocket = ws - - # Set up event handlers using JS FFI - async def on_open(event): + try: + # Connect using wasmsockets - provides a websockets-like interface + self.websocket = await ws_connect(jetstream_url) self.connected = True + print("Connected to Bluesky Jetstream firehose!") print( "Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)" ) + # Ensure alarm is set when we connect await self._schedule_next_alarm() - def on_message(event): + # Start consuming messages in the background + self.consumer_task = asyncio.create_task(self._consume_messages()) + + except Exception as e: + print(f"Failed to connect to Jetstream: {e}") + self.connected = False + self.ctx.abort(f"WebSocket connection failed: {e}") + + async def _consume_messages(self): + """Consume messages from the WebSocket connection.""" + while self.connected and self.websocket: + # Receive message from WebSocket try: + 
message = await self.websocket.recv() + # Parse the JSON message - data = json.loads(event.data) + data = json.loads(message) # Store the timestamp for resumption on reconnect time_us = data.get("time_us") @@ -110,33 +122,13 @@ def on_message(event): # Update last print time self.last_print_time = current_time + except json.JSONDecodeError as e: + print(f"Error parsing message JSON: {e}") except Exception as e: print(f"Error processing message: {e}") + self.connected = False + self.ctx.abort(f"WebSocket message processing failed: {e}") - def on_error(event): - print(f"WebSocket error: {event}") - self.connected = False - self.ctx.abort("WebSocket error occurred") - - async def on_close(event): - print(f"WebSocket closed: code={event.code}, reason={event.reason}") - self.connected = False - self.ctx.abort("WebSocket closed unexpectedly") - - # Attach event handlers - # - # Note that ordinarily proxies need to be destroyed once they are no longer used. - # However, in this Durable Object context, the WebSocket and its event listeners - # persist for the lifetime of the Durable Object, so we don't explicitly destroy - # the proxies here. When the websocket connection closes, the Durable Object - # is restarted which destroys these proxies. - # - # In the future, we plan to provide support for native Python websocket APIs which - # should eliminate the need for proxy wrappers. - ws.addEventListener("open", create_proxy(on_open)) - ws.addEventListener("message", create_proxy(on_message)) - ws.addEventListener("error", create_proxy(on_error)) - ws.addEventListener("close", create_proxy(on_close)) class Default(WorkerEntrypoint): diff --git a/14-websocket-stream-consumer/uv.lock b/14-websocket-stream-consumer/uv.lock index 6922b65..0bcbfc9 100644 --- a/14-websocket-stream-consumer/uv.lock +++ b/14-websocket-stream-consumer/uv.lock @@ -71,6 +71,7 @@ name = "python-websocket-stream-consumer" version = "0.1.0" source = { virtual = "." 
} dependencies = [ + { name = "wasmsockets" }, { name = "webtypy" }, ] @@ -81,7 +82,10 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "webtypy", specifier = ">=0.1.7" }] +requires-dist = [ + { name = "wasmsockets", git = "https://github.com/dom96/wasmsockets.git?rev=main" }, + { name = "webtypy", specifier = ">=0.1.7" }, +] [package.metadata.requires-dev] dev = [ @@ -135,6 +139,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, ] +[[package]] +name = "wasmsockets" +version = "0.1.4" +source = { git = "https://github.com/dom96/wasmsockets.git?rev=main#ce04386fe11c5e6a13c02a1a62a58848b87cf0a6" } +dependencies = [ + { name = "websockets", marker = "sys_platform != 'emscripten'" }, +] + +[[package]] +name = "websockets" +version = "10.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/85/dc/549a807a53c13fd4a8dac286f117a7a71260defea9ec0c05d6027f2ae273/websockets-10.4.tar.gz", hash = "sha256:eef610b23933c54d5d921c92578ae5f89813438fded840c2e9809d378dc765d3", size = 84877, upload-time = "2022-10-25T20:12:37.712Z" } + [[package]] name = "webtypy" version = "0.1.7" From 13b46c8418b83f56b33d6c770bc3de8dc15e115f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 20 Nov 2025 16:35:34 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- 14-websocket-stream-consumer/src/entry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/14-websocket-stream-consumer/src/entry.py b/14-websocket-stream-consumer/src/entry.py index 77cc61f..37a5028 100644 --- 
a/14-websocket-stream-consumer/src/entry.py +++ b/14-websocket-stream-consumer/src/entry.py @@ -5,6 +5,7 @@ import asyncio from urllib.parse import urlparse + class BlueskyFirehoseConsumer(DurableObject): """Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream.""" @@ -130,7 +131,6 @@ async def _consume_messages(self): self.ctx.abort(f"WebSocket message processing failed: {e}") - class Default(WorkerEntrypoint): """Main worker entry point that routes requests to the Durable Object."""