After trying to reconnect many times, client gets stuck
Hello,
I run the test program below, and after it connects to ZooKeeper, I block access to port 2181 with iptables. After a while (~90 sec) and a number of retries, the test program gets stuck: it blocks while writing to a socketpair that appears to be full (no thread seems to read from it).
Test program:
#!/usr/bin/env python3

import kazoo.client
import logging
import queue
import time

logging.basicConfig(level=logging.INFO)

# very low timeouts to reproduce the issue faster
CONNECT_TIMEOUT = 0.2
RETRY_TIMEOUT = 0.1
SESSION_TIMEOUT = 0.2

ZK_HOSTS = 'localhost:2181'


class MyClient:
    def __init__(self):
        self.zk = kazoo.client.KazooClient(hosts=ZK_HOSTS,
                                           timeout=SESSION_TIMEOUT)
        self.mqueue = queue.Queue()
        self.zk.add_listener(self.session_listener)
        self.zk_state = kazoo.client.KazooState.LOST
        self.running = False

    def session_listener(self, state):
        self.mqueue.put((self.update_state, (state,)))

    def update_state(self, state):
        logging.info('state {} -> {}'.format(self.zk_state, state))
        self.zk_state = state

    def reconnect(self):
        if self.zk_state == kazoo.client.KazooState.CONNECTED:
            return
        try:
            logging.info('connecting')
            if self.running:
                self.zk.stop()
                self.running = False
            self.zk.start(timeout=CONNECT_TIMEOUT)
            self.zk_state = kazoo.client.KazooState.CONNECTED
            self.running = True
            logging.info('connected')
        except Exception as e:
            logging.error(repr(e))


def main():
    cl = MyClient()
    while True:
        while not cl.mqueue.empty():
            f, args = cl.mqueue.get()
            f(*args)
        cl.reconnect()
        time.sleep(RETRY_TIMEOUT)

if __name__ == '__main__':
    main()
iptables script:
#!/bin/bash
set -x
case $1 in
    block)
        echo blocking
        for port in 218{1..3}; do
            iptables -t filter -A INPUT -p tcp --sport $port -j DROP
            iptables -t filter -A OUTPUT -p tcp --dport $port -j DROP
        done
        ;;
    unblock)
        echo unblocking
        for port in 218{1..3}; do
            iptables -t filter -D INPUT -p tcp --sport $port -j DROP
            iptables -t filter -D OUTPUT -p tcp --dport $port -j DROP
        done
        ;;
    *)
        echo "unknown command: $1"
        exit 1
        ;;
esac
I also run the script under python3-dbg so I can attach gdb to it, and obtain thread backtraces:
$ gdb python3-dbg $(pgrep python3-dbg )
...
(gdb) i threads
  Id   Target Id         Frame
* 1    Thread 0x7fd5da634740 (LWP 5426) "python3-dbg" 0x00007fd5da23995b in __libc_send (fd=8, buf=0x7fd5da477858, n=1, flags=-1) at ../sysdeps/unix/sysv/linux/x86_64/send.c:31
  2    Thread 0x7fd5d7238700 (LWP 6898) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
  3    Thread 0x7fd5d6236700 (LWP 6899) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
(gdb) thread apply all py-bt

Thread 3 (Thread 0x7fd5d6236700 (LWP 6899)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bcd50>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 2 (Thread 0x7fd5d7238700 (LWP 6898)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bc8d8>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 1 (Thread 0x7fd5da634740 (LWP 5426)):
Traceback (most recent call first):
  <built-in method send of socket object at remote 0x7fd5d724a178>
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 682, in stop
    self._connection._write_sock.send(b'\0')
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 630, in start
    self.stop()
  File "./test_31889.py", line 42, in reconnect
  File "./test_31889.py", line 56, in main
  File "./test_31889.py", line 60, in <module>
While the program runs, I monitor the state of its socketpairs with ss:
watch -n1 "ss -xp|awk 'NR==1 || /dbg/'"
When the program blocks, ss output always ends up in this state:
Netid  State  Recv-Q  Send-Q  Local Address:Port  Peer Address:Port
u_str  ESTAB  0       213504  * 68192600          * 68192599         users:(("python3-dbg",5426,8))
u_str  ESTAB  278     0       * 68192599          * 68192600         users:(("python3-dbg",5426,7))
Anything wrong in my program? Or is it a bug in Kazoo?
If I change my code to always call zk.close() before zk.start() then the problem disappears: the socketpairs don't get clogged with unread bytes, as they are closed and re-created on every connection attempt.
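For reference, here is a minimal sketch of that workaround. The helper name `restart_with_close` and its parameters are hypothetical (they are not part of Kazoo); it only assumes a client object exposing `stop()`, `close()` and `start(timeout=...)`, as `KazooClient` does, and that `close()` is called on a client that is already stopped.

```python
def restart_with_close(zk, running, connect_timeout):
    """Restart a KazooClient-like object, closing it before every start.

    Hypothetical helper for illustration only, not part of Kazoo.
    `running` tells whether the previous start() succeeded.
    """
    if running:
        # Same as the test program: only stop a client we know is running.
        zk.stop()
    # Release the resources held by the stopped client. According to the
    # observation above, this is what lets the socketpair be closed and
    # re-created on each attempt instead of filling up.
    zk.close()
    zk.start(timeout=connect_timeout)
```

In the test program this would stand in for the `stop()` / `start()` sequence inside `reconnect()`, with `self.running` still updated by the caller.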
I think the bug is that, on a failure, KazooClient.start() itself calls self.stop() but not self.close(). As we have multiple failed connection attempts, bytes accumulate in the ConnectionHandler socketpair, which eventually fills up.
Adding this call to close() solves the problem for me, I'll do a PR.
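The accumulation itself can be shown without Kazoo or ZooKeeper. The sketch below is only an illustration of the mechanism, not Kazoo's code: it writes single `b'\0'` bytes (the same payload as in the `stop()` frame of the backtrace) to one end of a socketpair that nobody reads, and counts how many fit before a send would block. `fill_socketpair` and `max_sends` are names made up for this example. The socket is set non-blocking so the demo returns instead of hanging; with a blocking socket, as in the backtrace, the last send would simply never return. The exact count depends on the OS and its buffer settings.

```python
import socket


def fill_socketpair(max_sends=1 << 20):
    """Send single bytes into an unread socketpair until it is full.

    Returns (bytes_sent, would_block). Illustration only.
    """
    read_sock, write_sock = socket.socketpair()
    # Non-blocking: a full buffer raises BlockingIOError instead of hanging.
    write_sock.setblocking(False)
    sent = 0
    try:
        for _ in range(max_sends):
            try:
                sent += write_sock.send(b'\0')
            except BlockingIOError:
                # Nobody drained read_sock, so the buffer is now full.
                return sent, True
        return sent, False
    finally:
        read_sock.close()
        write_sock.close()


if __name__ == '__main__':
    sent, would_block = fill_socketpair()
    print('sent {} byte(s), next send would block: {}'.format(sent, would_block))
```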
BTW the problem is even easier to reproduce by trying to connect to a non-existent ZK server (e.g. use localhost:12345).
PR opened in #579