Is query_api() thread-safe?
Steps to reproduce:
- We use a single InfluxDBClient instance and its query_api().
- We run multiple threads that all reuse this same client.
- We get an error (traceback below).
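A small harness that calls one shared function from several threads and collects any exceptions can help turn the steps above into something runnable. This is a sketch, not the reporter's code: `run_concurrently` and `fake_query` are hypothetical names, and `fake_query` stands in for the real `query_api().query_data_frame(...)` call so the snippet runs without an InfluxDB server.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_concurrently(func, calls, workers=4):
    """Call func() `calls` times from `workers` threads and return the exceptions raised."""
    errors = []
    errors_lock = threading.Lock()

    def task():
        try:
            func()
        except Exception as exc:  # record the failure instead of losing it inside a worker
            with errors_lock:
                errors.append(exc)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for _ in range(calls):
            pool.submit(task)
    # Leaving the with-block waits for every submitted task to finish.
    return errors


# Hypothetical stand-in for query_api.query_data_frame(query); swap in the real call
# against a test server to reproduce the report.
def fake_query():
    return "ok"


print(run_concurrently(fake_query, calls=20))  # prints []
```

With the real client, a non-empty list containing `urllib3.exceptions.ProtocolError` would indicate the problem described here.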
Expected behavior: I would expect the client's query_api() to be thread-safe.
Actual behavior: the read failed with the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
    result = self.query_api.query_data_frame(query, data_frame_index=["_time"])
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 116, in query_data_frame
    _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 140, in query_data_frame_stream
    response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 260, in post_query
    (data) = self.post_query_with_http_info(**kwargs) # noqa: E501
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 340, in post_query_with_http>
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 340, in call_api
    return self._call_api(resource_path, method,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 170, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 385, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 300, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 179, in request
    r = self.pool_manager.request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 78, in request
    return self.request_encode_body(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 170, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 507, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError(9, 'Bad file descriptor'))
Specifications:
- Client Version: 1.16.0
- InfluxDB Version: InfluxDB 2.0
- Platform: Ubuntu 20.04.4 LTS
Aha, I thought urllib3 was not thread-safe, but it is; it is urllib that is not.
Hi @rojikada,
thanks for using our client.
urllib3 should be thread-safe for the way the client uses it. The known urllib3 thread-safety problem appears only when connections are opened to more hosts than the number of pools configured in the PoolManager (currently 4).
- https://github.com/urllib3/urllib3/issues/1252
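As we understand the linked urllib3 issue, the PoolManager keeps at most `num_pools` per-host pools and closes the least recently used one when a new host pushes it over the limit, so a thread still reading from that pool ends up on a closed socket. The sketch below is a simplified model of that behavior, not urllib3's implementation; `ToyPoolManager`, `Pool`, and the host names are hypothetical placeholders.

```python
from collections import OrderedDict


class Pool:
    """Toy stand-in for a per-host connection pool (hypothetical)."""

    def __init__(self, host):
        self.host = host
        self.closed = False

    def close(self):
        self.closed = True


class ToyPoolManager:
    """Keeps at most num_pools per-host pools and closes the least recently used on overflow.

    Simplified model for illustration only; this is not urllib3's implementation.
    """

    def __init__(self, num_pools=4):
        self.num_pools = num_pools
        self.pools = OrderedDict()  # host -> Pool, oldest first

    def pool_for(self, host):
        if host in self.pools:
            self.pools.move_to_end(host)  # mark as most recently used
            return self.pools[host]
        pool = Pool(host)
        self.pools[host] = pool
        if len(self.pools) > self.num_pools:
            _, evicted = self.pools.popitem(last=False)  # drop the least recently used pool
            evicted.close()  # any thread still reading from it now sees a closed connection
        return pool


# One host, as with a single InfluxDBClient: the pool is reused and never evicted.
single = ToyPoolManager(num_pools=4)
influx_pool = single.pool_for("influx.example:8086")
for _ in range(10):
    single.pool_for("influx.example:8086")
print(influx_pool.closed)  # False

# Five hosts with num_pools=4: the first pool is evicted and closed.
multi = ToyPoolManager(num_pools=4)
first = multi.pool_for("influx.example:8086")
for i in range(4):
    multi.pool_for(f"other-{i}.example")
print(first.closed)  # True
```

In this model, a client that talks to a single InfluxDB host never triggers an eviction, which matches the point above that the urllib3 issue should not apply to this use case.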
Regards
Hi @rojikada - were you able to resolve your issue? If not, could you perhaps share some code that reproduces the issue that we can look at?
Thanks!
Hello, I don't currently have any shareable code, but the gist was: APScheduler running multiple threads, all of them making requests.
Hi @rojikada,
I would be happy to dig further, but I will need a reproducer of some sort; otherwise I am just guessing at what you are doing :)
Let me know if you could share something. Thanks!
Hello, I no longer work on this, and I fixed it with locks.
But here is a basic example of what we were doing:
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from influxdb_client import InfluxDBClient


class Test:
    def __init__(self):
        # One client and one QueryApi shared by every scheduled job (connection arguments elided)
        self.client = InfluxDBClient(...)
        self.query_api = self.client.query_api()

    def test_read(self):
        query = 'some random data query'  # placeholder Flux query
        self.query_api.query_data_frame(query)


t = Test()
# Four worker threads run the jobs below concurrently against the shared client
scheduler = BackgroundScheduler(executors={'default': ThreadPoolExecutor(4)})
for _ in range(9):
    scheduler.add_job(t.test_read, 'interval', seconds=1)
scheduler.start()

# Keep the main thread alive while the background scheduler runs
while True:
    time.sleep(2)
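The reporter fixed the problem with locks. One way that might look, as a sketch: wrap the object returned by `query_api()` so every `query_data_frame` call takes the same `threading.Lock`. `LockedQueryApi` and `FakeQueryApi` are hypothetical names; the fake only counts overlapping calls so the example runs without a server.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class LockedQueryApi:
    """Serializes calls to a shared query API object so only one thread uses it at a time."""

    def __init__(self, query_api):
        self._query_api = query_api
        self._lock = threading.Lock()

    def query_data_frame(self, query, **kwargs):
        with self._lock:  # other threads block here until the current query returns
            return self._query_api.query_data_frame(query, **kwargs)


class FakeQueryApi:
    """Hypothetical stand-in for client.query_api(); not thread-safe on its own.

    It records the largest number of calls that were in progress at the same time.
    """

    def __init__(self):
        self.active = 0
        self.max_active = 0

    def query_data_frame(self, query, **kwargs):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # widen the window in which another thread could overlap
        self.active -= 1
        return query


fake = FakeQueryApi()
locked = LockedQueryApi(fake)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(locked.query_data_frame, ["q"] * 20))
print(fake.max_active)  # 1: no two queries ran at the same time
```

In the scheduler example, `self.query_api = LockedQueryApi(self.client.query_api())` would be the only change needed. The trade-off is that queries from different jobs then run one after another instead of in parallel.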