API - PUBSUB CHANNELS
Feature request type
sample request
Is your feature request related to a problem? Please describe
Often used
Describe the solution you'd like
Often used
Describe alternatives you've considered
No response
Additional context
No response
Garnet supports pub/sub already. If you search online for examples of pub/sub in Redis, there should be a lot of hits. Let us know if you run into any bugs or issues.
@badrishc This request about command "PUBSUB CHANNELS"
$ keydb-cli PUBSUB CHANNELS | wc -l
1453015
and
$ keydb-cli INFO | grep -i channel
pubsub_channels:1454151
Running GarnetServer from the main branch (with the new --lua option set), the first example in the Celery documentation fails.
To reproduce the error you will need Python and Celery. Install it with:
pip install celery[redis]
Create the file tasks.py:
from celery import Celery
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
In the directory where the file is saved open a terminal and execute:
celery -A tasks worker --pool=solo
In the same directory start a new terminal, execute python.exe and enter the following:
> from tasks import add
> add.delay(4,4)
In the first terminal you will get an exception from the Python Redis client:
redis.exceptions.ResponseError: Command # 2 (PUBLISH celery-task-meta-11b19d75-2bbc-496b-8a2f-fe520bcbf8ad {"status": "SUCCESS", "result": 8, "traceback": null, "children": [], "date_done": "2024-08-18T10:39:27.316382+00:00", "task_id": "11b19d75-2bbc-496b-8a2f-fe520bcbf8ad"}) of pipeline caused error: unknown command
Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:
(ns my-app (:require [taoensso.carmine :as car]))
(defonce my-conn-pool (car/connection-pool {}))
(def my-conn-spec {:uri "redis://localhost/"})
(def my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})
(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))
(def my-listener
(car/with-new-pubsub-listener (:spec my-conn-spec)
{"channel*" (fn f2 [msg] (println "f2:" msg))}
(car/psubscribe "channel*")))
(car/with-open-listener my-listener
(car/unsubscribe))
This is the command line output of GarnetServer:
Process terminated. Assertion Failed
at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
celery -A tasks worker --pool=solo
Hi @mardukbp, can you try this PR to see if Celery works after this small fix? https://github.com/microsoft/garnet/pull/604
Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:
(ns my-app (:require [taoensso.carmine :as car])) (defonce my-conn-pool (car/connection-pool {})) (def my-conn-spec {:uri "redis://localhost/"}) (def my-conn-opts {:pool my-conn-pool, :spec my-conn-spec}) (defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body)) (def my-listener (car/with-new-pubsub-listener (:spec my-conn-spec) {"channel*" (fn f2 [msg] (println "f2:" msg))} (car/psubscribe "channel*"))) (car/with-open-listener my-listener (car/unsubscribe))This is the command line output of GarnetServer:
Process terminated. Assertion Failed at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262 at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570 at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419 at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298 at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481 at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337 at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290 at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120 at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped) at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e) at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute() at System.Threading.ThreadPoolWorkQueue.Dispatch() at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)
celery -A tasks worker --pool=soloHi @mardukbp, can you try this PR to see if Celery works after this small fix? #604
It works!
I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)
That works!
Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents
{:deps
{com.taoensso/carmine {:mvn/version "3.4.1"}}
}
And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL.
I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)
That works!
Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file
deps.ednwith the contents{:deps {com.taoensso/carmine {:mvn/version "3.4.1"}} }And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL
Got it, will try this later. Thanks!
如何和 CAP 进行集成, 应该是 SUP PUB 没实现导致
2024-10-07 21:56:17 EROR DotNetCore.CAP.RedisStreams.AsyncLazyRedisConnection Server replied with error, ERR unknown command, for endpoint:127.0.0.1:6379 2024-10-07 21:56:17 EROR DotNetCore.CAP.Processor.Dispatcher An exception occurred while publishing a message, reason:Failed : . message id:2415729483972149249 DotNetCore.CAP.Internal.PublisherSentFailedException: ERR unknown command ---> StackExchange.Redis.RedisServerException: ERR unknown command at DotNetCore.CAP.RedisStreams.RedisStreamManager.PublishAsync(String stream, NameValueEntry[] message) at DotNetCore.CAP.RedisStreams.RedisTransport.SendAsync(TransportMessage message)
@gogo1008 Do you the underneath redis command? Is it PUBSUB CHANNELS?
@gogo1008 Looks like it uses a Redis Stream XADD command, not PUB/SUB. Can you open a separate issue to track it?