nitro
nitro copied to clipboard
WebSockets Cloudflare Pub/Sub & Durable Objects Support
Describe the feature
I have extended the cloudflare adapter with pub/sub but can't make it work when deployed. Any pointers on how to fix this or are there other issues with cf websockets?
In wrangler I added the durable object and ran it with the cloudflare-module preset:
# wrangler.toml
[durable_objects]
bindings = [
{ name = "TOPIC_DO", class_name = "default" }
]
// adapters/cloudflare.ts
import type * as _cf from "@cloudflare/workers-types";
import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types.js";
import { Message } from "../message";
import { WSError } from "../error";
import { createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";
type Env = Record<string, any>;
declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;
export interface CloudflareAdapter {
handleUpgrade(
req: _cf.Request,
env: Env,
context: _cf.ExecutionContext,
): Promise<_cf.Response>;
}
export interface CloudflareOptions extends AdapterOptions {}
const topics = new Map<string, Set<_cf.WebSocket>>();
export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>((options = {}) => {
const crossws = createCrossWS(options);
const handleUpgrade = async (req: _cf.Request, env: Env, context: _cf.ExecutionContext) => {
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const peer = new CloudflarePeer({
cloudflare: { client, server, req, env, context },
});
const { headers } = await crossws.upgrade(peer);
server.accept();
crossws.$callHook("cloudflare:accept", peer);
crossws.callHook("open", peer);
server.addEventListener("message", (event) => {
const data = JSON.parse(<string>event.data);
if (data.action === 'publish') {
publish(data.topic, data.message);
} else if (data.action === 'subscribe') {
subscribe(server, data.topic);
} else if (data.action === 'unsubscribe') {
unsubscribe(server, data.topic);
}
crossws.$callHook("cloudflare:message", peer, event);
crossws.callHook("message", peer, new Message(event.data));
});
server.addEventListener("error", (event) => {
crossws.$callHook("cloudflare:error", peer, event);
crossws.callHook("error", peer, new WSError(event.error));
});
server.addEventListener("close", (event) => {
topics.forEach((subscribers, topic) => unsubscribe(server, topic));
crossws.$callHook("cloudflare:close", peer, event);
crossws.callHook("close", peer, {
code: event.code,
reason: event.reason,
});
});
return new Response(null, {
status: 101,
webSocket: client,
headers,
});
};
return { handleUpgrade };
});
class CloudflarePeer extends Peer<{
cloudflare: {
client: _cf.WebSocket;
server: _cf.WebSocket;
req: _cf.Request;
env: Env;
context: _cf.ExecutionContext;
};
}> {
get addr() {
return undefined;
}
get url() {
return this.ctx.cloudflare.req.url;
}
get headers() {
return this.ctx.cloudflare.req.headers as Headers;
}
get readyState() {
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}
send(message: any) {
this.ctx.cloudflare.server.send(toBufferLike(message));
return 0;
}
subscribe(topic: string): void {
const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
topicDO.fetch('/subscribe', { method: 'POST', body: this.ctx.cloudflare.req.cf?.requestId });
}
unsubscribe(topic: string): void {
const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
topicDO.fetch('/unsubscribe', { method: 'POST', body: this.ctx.cloudflare.req.cf?.requestId });
}
publish(topic: string, message: any, options?: { compress?: boolean }): void {
const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
topicDO.fetch('/publish', { method: 'POST', body: JSON.stringify({ topic, message }) });
}
}
function publish(topic: string, message: string) {
const subscribers = topics.get(topic);
if (subscribers) {
subscribers.forEach(subscriber => {
subscriber.send(JSON.stringify({ topic, message }));
});
}
}
function subscribe(ws: _cf.WebSocket, topic: string) {
let subscribers = topics.get(topic);
if (!subscribers) {
subscribers = new Set();
topics.set(topic, subscribers);
}
subscribers.add(ws);
}
function unsubscribe(ws: _cf.WebSocket, topic: string) {
const subscribers = topics.get(topic);
if (subscribers) {
subscribers.delete(ws);
if (subscribers.size === 0) {
topics.delete(topic);
}
}
}
Additional information
- [ ] Would you be willing to help implement this feature?