API - PUBSUB CHANNELS

Open rnz opened this issue 1 year ago • 12 comments

Feature request type

sample request

Is your feature request related to a problem? Please describe

Often used

Describe the solution you'd like

Often used

Describe alternatives you've considered

No response

Additional context

No response

rnz avatar May 27 '24 15:05 rnz

Garnet supports pub/sub already. If you search online for examples of pub/sub in Redis, there should be a lot of hits. Let us know if you run into any bugs or issues.

badrishc avatar May 28 '24 02:05 badrishc

@badrishc This request is about the command "PUBSUB CHANNELS":

$ keydb-cli PUBSUB CHANNELS | wc -l
1453015

and

$ keydb-cli INFO | grep -i channel
pubsub_channels:1454151
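
For anyone who wants to run the same comparison from a client library instead of the CLI, here is a minimal sketch in Python. It assumes the redis-py package, whose client exposes `pubsub_channels()` and `info()`; the helper name `count_channels` and the connection details in the comment are illustrative and not part of the request.

```python
def count_channels(client, pattern="*"):
    """Compare PUBSUB CHANNELS with the counter reported by INFO.

    Returns (listed, reported): the number of channels returned by
    PUBSUB CHANNELS for `pattern`, and the pubsub_channels field from
    INFO (None if the server does not report it).
    """
    listed = len(client.pubsub_channels(pattern))
    reported = client.info().get("pubsub_channels")
    return listed, reported


# Example use (assumes redis-py is installed and a server is reachable):
#   import redis
#   print(count_channels(redis.Redis(host="localhost", port=6379)))
```

On a server that does not implement PUBSUB CHANNELS, the first call would presumably fail with an "unknown command" error rather than return a list.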

rnz avatar May 28 '24 15:05 rnz

Running GarnetServer from the main branch (with the new --lua option set), the first example in the Celery documentation fails.

To reproduce the error you will need Python and Celery. Install Celery with:

pip install celery[redis]

Create the file tasks.py:

from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

In the directory where the file is saved open a terminal and execute:

celery -A tasks worker --pool=solo

In the same directory start a new terminal, execute python.exe and enter the following:

> from tasks import add
> add.delay(4,4)

In the first terminal you will get an exception from the Python Redis client:

redis.exceptions.ResponseError: Command # 2 (PUBLISH celery-task-meta-11b19d75-2bbc-496b-8a2f-fe520bcbf8ad {"status": "SUCCESS", "result": 8, "traceback": null, "children": [], "date_done": "2024-08-18T10:39:27.316382+00:00", "task_id": "11b19d75-2bbc-496b-8a2f-fe520bcbf8ad"}) of pipeline caused error: unknown command
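
The error message points at command # 2 of a pipeline, so the failure can probably be reproduced without Celery. The sketch below assumes that the result backend queues a write followed by a PUBLISH on the same key inside one redis-py pipeline (redis-py pipelines are transactional, i.e. wrapped in MULTI/EXEC, by default). The function name `store_and_publish` is hypothetical.

```python
def store_and_publish(client, key, payload):
    """Queue SET and PUBLISH in one pipeline and execute them together."""
    with client.pipeline() as pipe:
        pipe.set(key, payload)      # command # 1 in the pipeline
        pipe.publish(key, payload)  # command # 2, the one named in the error
        return pipe.execute()


# Example use (assumes redis-py is installed and GarnetServer is running):
#   import redis
#   store_and_publish(redis.Redis(), "celery-task-meta-test", '{"status": "SUCCESS"}')
```

If this raises the same ResponseError against GarnetServer but succeeds against Redis, the problem is limited to PUBLISH inside a transaction rather than PUBLISH in general.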

mardukbp avatar Aug 18 '24 11:08 mardukbp

Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:

(ns my-app (:require [taoensso.carmine :as car]))

(defonce my-conn-pool (car/connection-pool {}))
(def     my-conn-spec {:uri "redis://localhost/"})
(def     my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})

(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))

(def my-listener
  (car/with-new-pubsub-listener (:spec my-conn-spec)
    {"channel*" (fn f2 [msg] (println "f2:" msg))}
    (car/psubscribe "channel*")))

(car/with-open-listener my-listener
  (car/unsubscribe))

This is the command line output of GarnetServer:

Process terminated. Assertion Failed
   at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
   at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
   at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
   at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
   at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
   at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
   at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
   at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
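
To reproduce this without Clojure, it may be enough to send the same commands over a raw connection. The sketch below only builds the RESP requests; it assumes that the Carmine snippet ends up sending PSUBSCRIBE channel* followed by an UNSUBSCRIBE with no arguments, which is what the stack trace suggests but which I have not verified on the wire. The helper name `encode_resp` is illustrative.

```python
def encode_resp(*args):
    """Encode a command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


# The two requests assumed to trigger the assertion, in order:
PSUBSCRIBE_REQUEST = encode_resp("PSUBSCRIBE", "channel*")
UNSUBSCRIBE_REQUEST = encode_resp("UNSUBSCRIBE")

# Example use (assumes GarnetServer is listening on localhost:6379):
#   import socket
#   with socket.create_connection(("localhost", 6379)) as sock:
#       sock.sendall(PSUBSCRIBE_REQUEST + UNSUBSCRIBE_REQUEST)
#       print(sock.recv(4096))
```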

mardukbp avatar Aug 18 '24 15:08 mardukbp

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? https://github.com/microsoft/garnet/pull/604

badrishc avatar Aug 21 '24 09:08 badrishc

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

badrishc avatar Aug 21 '24 10:08 badrishc

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? #604

It works!

mardukbp avatar Aug 21 '24 14:08 mardukbp

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

That works!

Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents

{:deps
 {com.taoensso/carmine {:mvn/version "3.4.1"}}
}

And then I started the Clojure REPL (clj.exe) in the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL.

mardukbp avatar Aug 21 '24 14:08 mardukbp

Got it, will try this later. Thanks!

badrishc avatar Aug 21 '24 21:08 badrishc

How can Garnet be integrated with CAP? It looks like PUB/SUB not being implemented is what causes CAP to fail:

2024-10-07 21:56:17 EROR DotNetCore.CAP.RedisStreams.AsyncLazyRedisConnection Server replied with error, ERR unknown command, for endpoint:127.0.0.1:6379
2024-10-07 21:56:17 EROR DotNetCore.CAP.Processor.Dispatcher An exception occurred while publishing a message, reason:Failed : . message id:2415729483972149249
DotNetCore.CAP.Internal.PublisherSentFailedException: ERR unknown command
 ---> StackExchange.Redis.RedisServerException: ERR unknown command
   at DotNetCore.CAP.RedisStreams.RedisStreamManager.PublishAsync(String stream, NameValueEntry[] message)
   at DotNetCore.CAP.RedisStreams.RedisTransport.SendAsync(TransportMessage message)

gogo1008 avatar Oct 07 '24 14:10 gogo1008

@gogo1008 Do you know the underlying Redis command? Is it PUBSUB CHANNELS?

Vijay-Nirmal avatar Oct 07 '24 15:10 Vijay-Nirmal

@gogo1008 Looks like it uses a Redis Stream XADD command, not PUB/SUB. Can you open a separate issue to track it?

Vijay-Nirmal avatar Oct 07 '24 16:10 Vijay-Nirmal