Pubsub : Non null assertion resulting error in `commands-queue.ts`
Description
Hello Here is my implementation of a pubsub
class RedisGraphqlPubSub implements PubSub {
constructor(
private readonly redisClient: RedisClientType,
private readonly logger: ILoggerService
) {}
async publish<K extends keyof Events>(
routingKey: K,
payload: Events[K]
): Promise<void> {
await this.redisClient.publish(routingKey, 'toto');
}
subscribe<K extends keyof Events>(routingKey: K): AsyncIterable<unknown> {
return this.subscribeWithPolling(routingKey);
}
async *subscribeWithPolling<K extends keyof Events>(
routingKey: K
): AsyncIterable<string> {
const subscriber = this.redisClient.duplicate({disableOfflineQueue: true});
try {
await subscriber.connect();
const messages: string[] = [];
await subscriber.subscribe(routingKey, (message: string) => {
try {
messages.push(message);
} catch (err) {
console.error("❌ POLLING MESSAGE ERROR", err);
}
});
while (true) {
if (messages.length > 0) {
yield messages.shift()!;
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
} finally {
try {
await subscriber.unsubscribe(routingKey);
await subscriber.destroy();
} catch (err) {
console.error("❌ POLLING CLEANUP ERROR", err);
}
}
}
}
Publish works fine and polling too but when receiving a message i have an error:
{"message":"Cannot read properties of undefined (reading 'value')","stack":"TypeError: Cannot read properties of undefined (reading 'value')\n at RedisCommandQueue.#getTypeMapping (/Users/apps/api/node_modules/@redis/client/lib/client/commands-queue.ts:113:40)
It look like this line is the error in commands-queue.ts
#getTypeMapping() {
return this.#waitingForReply.head!.value.typeMapping ?? {};
}
First of all head non null assertion is not safe enought, i could have received {}
Second, i don't know what's wrong in my implementation
Thanks in advance
Node.js Version
v20.11.0
Redis Server Version
redis-stack:7.4.0-v6 or alpine redis image
Node Redis Version
Platform
macOS
Logs
When adding a condition:
if (!this.#waitingForReply.head) {
return {};
}
It crash else where, i dont understand why this.#waitingForReply.head or .shift is undefined
@rchoffar thanks, I will check this out
Hi @rchoffar, i tried your example and couldn't reproduce the issue.
Here is the example i tried ( i have stripped out some non-essential parts of you example, like the PubSub interface, the logger service, generics, etc. ):
src/index.ts
import { GenericContainer } from "testcontainers";
import { createClient } from "redis";
import RedisGraphqlPubSub from "./pubsub";
const container = await new GenericContainer("redis")
.withExposedPorts(6379)
.start();
const client = createClient({
url: `redis://localhost:${container.getMappedPort(6379)}`,
});
await client.connect();
const pubsub = new RedisGraphqlPubSub(client);
const routingKey = "foo";
setTimeout(async () => {
for (let i = 0; i < 1000; i++) {
await pubsub.publish(routingKey, "hi");
}
}, 100);
for await (const value of pubsub.subscribe(routingKey)) {
console.log(value);
}
await client.close();
src/pubsub.ts
import { RedisClientType } from "redis";
export default class RedisGraphqlPubSub {
constructor(
private readonly redisClient: RedisClientType<any,any,any,any,any>,
) {}
async publish(
routingKey: string,
payload: string
): Promise<void> {
await this.redisClient.publish(routingKey, payload);
}
subscribe(routingKey: string): AsyncIterable<unknown> {
return this.subscribeWithPolling(routingKey);
}
async *subscribeWithPolling(
routingKey: string
): AsyncIterable<string> {
const subscriber = this.redisClient.duplicate({disableOfflineQueue: true});
try {
await subscriber.connect();
const messages: string[] = [];
await subscriber.subscribe(routingKey, (message: string) => {
try {
messages.push(message);
} catch (err) {
console.error("❌ POLLING MESSAGE ERROR", err);
}
});
while (true) {
if (messages.length > 0) {
yield messages.shift()!;
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
} finally {
try {
await subscriber.unsubscribe(routingKey);
await subscriber.destroy();
} catch (err) {
console.error("❌ POLLING CLEANUP ERROR", err);
}
}
}
}
Can you provide exact example i can run and reproduce the issue?
I am gonna try to reproduce, where you using redis-stack:7.4.0-v6 or alpine redis image ?
Just use new GenericContainer("redis") which is the latest official image
I investigated, When using
createClient({
url: `redis://localhost:6379`,
RESP: 2,
})
It is not working and i have the error i showed you.
When using
createClient({
url: `redis://localhost:6379`,
RESP: 3,
})
It's working
When creating the client without resp params it is not working
Here is the typescript error i had:
Two different types with this name exist, but they are unrelated.
Type '3' is not assignable to type '2'
@rchoffar can you provide a complete example that i can run and reproduce the issue?