How to handle broker restart
Communication via fanout exchange works fine.
using AMQPClient
ENV["JULIA_DEBUG"] = "all"
const HOST = "localhost"
const PORT = 5672
const EXCHANGE = "exchange"
const AUTH = Dict{String,Any}(
"MECHANISM" => "AMQPLAIN",
"LOGIN" => "guest",
"PASSWORD" => "guest"
)
function produce()
conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
success = exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
() -> begin
timestamp = round(Int, time())
msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
basic_publish(chnl, msg; exchange=EXCHANGE, routing_key="")
@info "sent" data=timestamp
end
end
function consume()
conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
success, queue_name, message_count, consumer_count = queue_declare(chnl, "")
success = queue_bind(chnl, queue_name, EXCHANGE, "")
function consumer(msg)
@info "recieved" data=parse(Int, String(msg.data))
basic_ack(chnl, msg.delivery_tag)
end
success, consumer_tag = basic_consume(chnl, queue_name, consumer)
end
consumer = consume()
producer = produce()
producer()
┌ Info: sent
└ data = 1572692964
┌ Info: recieved
└ data = 1572692964
But when I restart RabbitMQ (docker restart ...) and call producer again julia process hangs and eats all available memory without any error reporting.
The RabbitMQ reliability guide says that in such cases it is necessary to create new connection and channel. How can I do that with AMQPClient?
On the producer side, it seems like a simple state check is enough.
create_connection() = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
create_channel(conn) = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
mutable struct Producer
conn
chnl
function Producer()
conn = create_connection()
chnl = create_channel(conn)
success = exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
return new(conn, chnl)
end
end
function (p::Producer)()
if p.conn.state != AMQPClient.CONN_STATE_OPEN
p.conn = create_connection()
p.chnl = create_channel(p.conn)
elseif p.chnl.state != AMQPClient.CONN_STATE_OPEN
p.chnl = create_channel(p.conn)
end
timestamp = round(Int, time())
msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
basic_publish(p.chnl, msg; exchange=EXCHANGE, routing_key="")
@info "sent" data=timestamp
end
But messages still don't make it to the consumer. So some connection handling is required there as well.
I've managed to implement consumer that could handle broker restarts. But I doubt that's the best way to do it.
function consume()
while true
try
conn = connection()
chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
success = exchange_declare(chnl, EXCHNG, EXCHANGE_TYPE_FANOUT, durable=true)
success, queue_name, _, _ = queue_declare(
chnl, ""; exclusive=true, durable=false, auto_delete=true
)
success = queue_bind(chnl, queue_name, EXCHNG, ROUTE)
callback = (msg) -> begin
@info "recieved" data=String(msg.data)
basic_ack(chnl, msg.delivery_tag)
end
success, consumer_tag = basic_consume(chnl, queue_name, callback)
fetch(chnl.consumers[consumer_tag].receiver)
catch e
@error exception=(e, stacktrace(catch_backtrace()))
sleep(5)
end
end
end
@pshashk This is excellent, thanks. My consumer service now restarts gracefully on connection error.