AMQPClient.jl icon indicating copy to clipboard operation
AMQPClient.jl copied to clipboard

How to handle broker restart

Open pshashk opened this issue 6 years ago • 3 comments

Communication via fanout exchange works fine.

using AMQPClient

# Request debug-level log output from all modules.
ENV["JULIA_DEBUG"] = "all"

# Broker endpoint and the name of the exchange shared by producer and consumer.
const HOST = "localhost"
const PORT = 5672
const EXCHANGE = "exchange"

# Credentials handed to `connection` through its `auth_params` keyword.
const AUTH = Dict{String,Any}(
    "MECHANISM" => "AMQPLAIN", "LOGIN" => "guest", "PASSWORD" => "guest"
)

"""
    produce()

Open a connection and a channel to the broker at `HOST:PORT`, declare the fanout
exchange `EXCHANGE`, and return a zero-argument closure.

Each call of the returned closure publishes the current time (seconds since the
epoch, rounded to an integer and sent as text) to `EXCHANGE` with an empty
routing key, then logs the value that was sent.
"""
function produce()
    broker = connection(; virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    ch = channel(broker, AMQPClient.UNUSED_CHANNEL, true)
    exchange_declare(ch, EXCHANGE, EXCHANGE_TYPE_FANOUT)

    return function ()
        now_s = round(Int, time())
        payload = Vector{UInt8}(string(now_s))
        outgoing = Message(payload, content_type="text/plain")
        basic_publish(ch, outgoing; exchange=EXCHANGE, routing_key="")
        @info "sent" data=now_s
    end
end

"""
    consume()

Subscribe to messages published on the fanout exchange `EXCHANGE`.

Opens a connection and a channel to the broker at `HOST:PORT`, declares the
exchange, declares a queue with an empty name (the name returned by
`queue_declare` is used from then on), binds that queue to `EXCHANGE`, and
registers a callback that logs each payload parsed as an `Int` and acknowledges
the delivery.

Returns whatever `basic_consume` returns, i.e. the `(success, consumer_tag)`
pair.
"""
function consume()
    conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)

    # Declare the exchange here too (same arguments as the producer uses), so
    # the bind below does not depend on a producer having declared it already.
    exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)

    _, queue_name, _, _ = queue_declare(chnl, "")
    queue_bind(chnl, queue_name, EXCHANGE, "")

    function consumer(msg)
        @info "recieved" data=parse(Int, String(msg.data))
        basic_ack(chnl, msg.delivery_tag)
    end

    return basic_consume(chnl, queue_name, consumer)
end

# Start the consumer; `consume()` returns the `(success, consumer_tag)` result
# of `basic_consume`.
consumer = consume()
# `produce()` returns a closure that publishes the current timestamp when called.
producer = produce()

# Publish a single message.
producer()
┌ Info: sent
└   data = 1572692964
┌ Info: recieved
└   data = 1572692964

But when I restart RabbitMQ (docker restart ...) and call producer again, the Julia process hangs and consumes all available memory without reporting any error.

The RabbitMQ reliability guide says that in such cases it is necessary to create a new connection and channel. How can I do that with AMQPClient?

pshashk avatar Nov 02 '19 11:11 pshashk

On the producer side, it seems like a simple state check is enough.


"""
    create_connection()

Open a new connection to the broker at `HOST:PORT` on virtual host `"/"`, using
the credentials in `AUTH`.
"""
function create_connection()
    return connection(; virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
end
"""
    create_channel(conn)

Open a channel on `conn`, passing `AMQPClient.UNUSED_CHANNEL` as the channel id
and `true` as the third argument of `channel` (presumably "create the channel";
confirm against the AMQPClient documentation).
"""
function create_channel(conn)
    return channel(conn, AMQPClient.UNUSED_CHANNEL, true)
end

"""
    Producer()

Holder for the connection (`conn`) and channel (`chnl`) used for publishing.

The constructor opens both through `create_connection` and `create_channel` and
declares the fanout exchange `EXCHANGE` on the new channel. The struct is
mutable so that either field can be replaced later.
"""
mutable struct Producer
    conn
    chnl

    function Producer()
        link = create_connection()
        ch = create_channel(link)
        exchange_declare(ch, EXCHANGE, EXCHANGE_TYPE_FANOUT)
        return new(link, ch)
    end
end

# Publish the current time (seconds since the epoch, rounded to an integer and
# sent as text) to `EXCHANGE` with an empty routing key, then log what was sent.
#
# Before publishing, the connection and channel states are checked. If the
# connection is not open, both connection and channel are recreated; if only
# the channel is not open, just the channel is recreated. Whenever something
# was reopened the exchange is declared again, because the constructor declares
# it without `durable=true` and it therefore may not exist after a broker
# restart.
function (p::Producer)()
    reopened = true
    if p.conn.state != AMQPClient.CONN_STATE_OPEN
        p.conn = create_connection()
        p.chnl = create_channel(p.conn)
    elseif p.chnl.state != AMQPClient.CONN_STATE_OPEN
        p.chnl = create_channel(p.conn)
    else
        reopened = false
    end

    if reopened
        # Same arguments as in the constructor, so redeclaring an exchange
        # that still exists is harmless.
        exchange_declare(p.chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
    end

    timestamp = round(Int, time())
    msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
    basic_publish(p.chnl, msg; exchange=EXCHANGE, routing_key="")
    @info "sent" data=timestamp
end

But messages still don't make it to the consumer. So some connection handling is required there as well.

pshashk avatar Nov 02 '19 12:11 pshashk

I've managed to implement a consumer that can handle broker restarts. But I doubt that's the best way to do it.

"""
    consume()

Run a consumer that re-establishes itself after failures. Never returns.

Each iteration opens a connection and a channel, declares the durable fanout
exchange `EXCHNG`, declares an exclusive, auto-delete queue with an empty name,
binds it to `EXCHNG` with routing key `ROUTE`, registers a callback that logs
and acknowledges every message, and then blocks on the consumer's receiver
task. When that task finishes, or when any step throws, the connection opened
in this iteration is closed and the loop starts over; after an exception it
first logs the error and waits 5 seconds.

`EXCHNG` and `ROUTE` are not defined in this snippet and must be provided by
the surrounding script.
"""
function consume()
    while true
        conn = nothing
        try
            conn = connection()
            chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
            exchange_declare(chnl, EXCHNG, EXCHANGE_TYPE_FANOUT, durable=true)
            _, queue_name, _, _ = queue_declare(
                chnl, ""; exclusive=true, durable=false, auto_delete=true
            )
            queue_bind(chnl, queue_name, EXCHNG, ROUTE)
            callback = (msg) -> begin
                @info "recieved" data=String(msg.data)
                basic_ack(chnl, msg.delivery_tag)
            end
            _, consumer_tag = basic_consume(chnl, queue_name, callback)
            # Block here until the consumer's receiver task ends.
            fetch(chnl.consumers[consumer_tag].receiver)
        catch e
            @error "consumer failed; retrying in 5 seconds" exception=(e, catch_backtrace())
            sleep(5)
        finally
            # Release this iteration's connection so that repeated retries do
            # not accumulate open connections.
            if conn !== nothing && isopen(conn)
                try
                    close(conn)
                catch close_err
                    # A failed close must not break the retry loop.
                    @warn "could not close connection" exception=(close_err, catch_backtrace())
                end
            end
        end
    end
end

pshashk avatar Nov 06 '19 06:11 pshashk

@pshashk This is excellent, thanks. My consumer service now restarts gracefully on connection error.

nsslh avatar Jun 25 '21 12:06 nsslh