ergoq
ergoq copied to clipboard
Simple queue (with publish/subscribe) abstraction.
README
Warning !!
This library is heavily developed and the api can be changed !! I hope that in couple of days the api will stabilize and I will continue to develop other drivers.
What is this repository for?
Ergoq package is small and lightweight message queue abstraction. Currently redis implementation and amqp is done. In the future more implementations will be done
Usage
All snippets of code assume import of library
import "github.com/phonkee/ergoq"
Ergoq supports drivers system as seen in sql package. Every driver uses it's own connection(for redis it's redis.Pool). To open ergoq message queue you can use Open function and provide DSN. Every driver can have slightly different implementation but usually you will see
<driverName>://<host>:<port>/<database>?params
Example:
dsn := "redis://localhost:6379/0?max_idle=100&max_active=100&idle_timeout=200"
dsnAmqp := "amqp://guest:guest@localhost:5672/test?auto_ack=true&prefix=queues"
Each driver can support it's params.
Drivers
Redis
connection: &redis.Pool
DSN params:
- max_idle - default is 10
- max_active - default is 10
- idle_timeout - default is 300
Local
local memory queue and publish/subscribe
DSN params:
- size - max size of queues
Open message queue
You can open message two ways.
a. You provide DSN string to ergoq.Open and let ergoq make connections for you
mq, err := ergoq.Open("redis://localhost:6379/0")
if err != nil {
panic(err)
}
b. You provide connection to OpenConnection
pool := redis.Pool{
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
},
}
mq, err := ergoq.OpenConnection("redis", &pool, "auto_ack=true")
if err != nil {
panic(err)
}
API
MessageQueuer interface says it all.
type MessageQueuer interface {
// Pushes message to queue
Push(topic string, messages ...[]byte) error
// Pops message from queue
Pop(topic string) (QueueMessage, error)
// Publishes message to queue (all subscribers)
// Fanout
Publish(topic string, message []byte) error
// Subscribes to queue(s)
Subscribe(quit <-chan struct{}, topics ...string) (chan SubscriberMessage, chan error)
}
Examples:
// Error checking is omitted, but please you make your checks!
mq, _ := ergoq.Open("redis://localhost:6379/0")
// If we want to push to queue (direct) only first who pops this value will
// process it
_ := mq.Push("queue", []byte("message"))
// pop data from queue
// second argument is blocking
// third optional parameter is timeout for blocking
data, _ := mq.Pop("queue")
// If we want to publish message to all subscribers of given queue
// we need to call Publish method
errPub := mq.Publish("user:1", "logged_in")
if errPub != nil {
panic(errPub)
}
// subscribe to channels can be donw following way.
// You need to provide "quit" channel when subscription will be stopped.
// Subscribe returns 2 channels, result and errors.
quit := make(chan struct{})
results, error := mq.Subscribe(quit, "user:1", "admins")
go func() {
for {
select {
r <- results:
fmt.Println("result %+v", r)
e <- errors:
panic(e)
}
}
}()
Contribute
Welcome!