node-redis icon indicating copy to clipboard operation
node-redis copied to clipboard

Pubsub : Non null assertion resulting error in `commands-queue.ts`

Open rchoffar opened this issue 5 months ago • 7 comments

Description

Hello Here is my implementation of a pubsub

class RedisGraphqlPubSub implements PubSub {
  constructor(
    private readonly redisClient: RedisClientType, 
    private readonly logger: ILoggerService
  ) {}

  async publish<K extends keyof Events>(
    routingKey: K, 
    payload: Events[K]
  ): Promise<void> {
    await this.redisClient.publish(routingKey, 'toto');
  }

  subscribe<K extends keyof Events>(routingKey: K): AsyncIterable<unknown> {
    return this.subscribeWithPolling(routingKey);
  }

  async *subscribeWithPolling<K extends keyof Events>(
    routingKey: K
  ): AsyncIterable<string> {
    
    const subscriber = this.redisClient.duplicate({disableOfflineQueue: true});
    
    try {
      await subscriber.connect();
      
      const messages: string[] = [];
      await subscriber.subscribe(routingKey, (message: string) => {
          try {
            messages.push(message);
          } catch (err) {
            console.error("❌ POLLING MESSAGE ERROR", err);
          }
      });
      
      while (true) {
        if (messages.length > 0) {
          yield messages.shift()!;
        } else {
          await new Promise(resolve => setTimeout(resolve, 50));
        }
      }
      
    } finally {
      try {
        await subscriber.unsubscribe(routingKey);
        await subscriber.destroy();
      } catch (err) {
        console.error("❌ POLLING CLEANUP ERROR", err);
      }
    }
  }
}

Publish works fine and polling too but when receiving a message i have an error:

{"message":"Cannot read properties of undefined (reading 'value')","stack":"TypeError: Cannot read properties of undefined (reading 'value')\n    at RedisCommandQueue.#getTypeMapping (/Users/apps/api/node_modules/@redis/client/lib/client/commands-queue.ts:113:40)

It look like this line is the error in commands-queue.ts

#getTypeMapping() {
    return this.#waitingForReply.head!.value.typeMapping ?? {};
  }

First of all head non null assertion is not safe enought, i could have received {}

Second, i don't know what's wrong in my implementation

Thanks in advance

Node.js Version

v20.11.0

Redis Server Version

redis-stack:7.4.0-v6 or alpine redis image

Node Redis Version

[email protected]

Platform

macOS

Logs


rchoffar avatar Aug 11 '25 08:08 rchoffar

When adding a condition:

         if (!this.#waitingForReply.head) {
            return {};
        }

It crash else where, i dont understand why this.#waitingForReply.head or .shift is undefined

rchoffar avatar Aug 11 '25 09:08 rchoffar

@rchoffar thanks, I will check this out

nkaradzhov avatar Aug 11 '25 15:08 nkaradzhov

Hi @rchoffar, i tried your example and couldn't reproduce the issue.

Here is the example i tried ( i have stripped out some non-essential parts of you example, like the PubSub interface, the logger service, generics, etc. ):

src/index.ts

import { GenericContainer } from "testcontainers";
import { createClient } from "redis";
import RedisGraphqlPubSub from "./pubsub";

const container = await new GenericContainer("redis")
  .withExposedPorts(6379)
  .start();

const client = createClient({
  url: `redis://localhost:${container.getMappedPort(6379)}`,
});

await client.connect();

const pubsub = new RedisGraphqlPubSub(client);

const routingKey = "foo";

setTimeout(async () => {
  for (let i = 0; i < 1000; i++) {
    await pubsub.publish(routingKey, "hi");
  }
}, 100);

for await (const value of pubsub.subscribe(routingKey)) {
  console.log(value);
}

await client.close();

src/pubsub.ts

import { RedisClientType } from "redis";

export default class RedisGraphqlPubSub {
  constructor(
    private readonly redisClient: RedisClientType<any,any,any,any,any>,
  ) {}

  async publish(
    routingKey: string,
    payload: string
  ): Promise<void> {
    await this.redisClient.publish(routingKey, payload);
  }

  subscribe(routingKey: string): AsyncIterable<unknown> {
    return this.subscribeWithPolling(routingKey);
  }

  async *subscribeWithPolling(
    routingKey: string
  ): AsyncIterable<string> {

    const subscriber = this.redisClient.duplicate({disableOfflineQueue: true});

    try {
      await subscriber.connect();

      const messages: string[] = [];
      await subscriber.subscribe(routingKey, (message: string) => {
          try {
            messages.push(message);
          } catch (err) {
            console.error("❌ POLLING MESSAGE ERROR", err);
          }
      });

      while (true) {
        if (messages.length > 0) {
          yield messages.shift()!;
        } else {
          await new Promise(resolve => setTimeout(resolve, 50));
        }
      }

    } finally {
      try {
        await subscriber.unsubscribe(routingKey);
        await subscriber.destroy();
      } catch (err) {
        console.error("❌ POLLING CLEANUP ERROR", err);
      }
    }
  }
}

Can you provide exact example i can run and reproduce the issue?

nkaradzhov avatar Aug 14 '25 09:08 nkaradzhov

I am gonna try to reproduce, where you using redis-stack:7.4.0-v6 or alpine redis image ?

rchoffar avatar Aug 14 '25 14:08 rchoffar

Just use new GenericContainer("redis") which is the latest official image

nkaradzhov avatar Aug 15 '25 10:08 nkaradzhov

I investigated, When using

createClient({
    url: `redis://localhost:6379`,
    RESP: 2,
  })

It is not working and i have the error i showed you.

When using

createClient({
    url: `redis://localhost:6379`,
    RESP: 3,
  })

It's working

When creating the client without resp params it is not working

Here is the typescript error i had:

Two different types with this name exist, but they are unrelated.
          Type '3' is not assignable to type '2'

rchoffar avatar Aug 18 '25 15:08 rchoffar

@rchoffar can you provide a complete example that i can run and reproduce the issue?

nkaradzhov avatar Aug 18 '25 18:08 nkaradzhov