influxdb-client-python

Is query_api() thread-safe?

Open rojikada opened this issue 3 years ago • 6 comments

Steps to reproduce:

  1. We use one connection with InfluxDBClient -> query_api().
  2. Run multiple threads re-using this same connection.
  3. We got an error (see screenshots).

Expected behavior: I would expect influx to be thread-safe.

Actual behavior: An error occurred during a read.

  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
    result = self.query_api.query_data_frame(query, data_frame_index=["_time"])
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 116, in query_data_frame
    _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 140, in query_data_frame_stream
    response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 260, in post_query
    (data) = self.post_query_with_http_info(**kwargs)  # noqa: E501
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 340, in post_query_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 340, in call_api
    return self._call_api(resource_path, method,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 170, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 385, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 300, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 179, in request
    r = self.pool_manager.request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 78, in request
    return self.request_encode_body(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 170, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 507, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError(9, 'Bad file descriptor'))

Specifications:

  • Client Version: 1.16.0
  • InfluxDB Version: InfluxDB 2.0
  • Platform: Ubuntu 20.04.4 LTS

rojikada, Apr 08 '22 11:04

Aha, I thought urllib3 was not thread-safe, but it is. urllib is not.

rojikada, Apr 08 '22 14:04

Hi @rojikada,

thanks for using our client.

urllib3 should be thread-safe for the client's use case. The urllib3 thread-safety issue arises when connections are opened to more hosts than the number of pools configured in the PoolManager (currently 4).

  • https://github.com/urllib3/urllib3/issues/1252

Regards

bednar, Apr 12 '22 05:04
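
The pool limit described above can be seen with urllib3 alone. The following is only a sketch, not the client's actual configuration. It assumes urllib3 is installed, uses a limit of 2 pools instead of 4 to keep it short, and the host names are hypothetical placeholders. No network traffic is involved, because `connection_from_host()` only creates the pool object.

```python
import urllib3

# Assumption for illustration: a deliberately small limit of 2 pools
# (the comment above says the client currently configures 4).
manager = urllib3.PoolManager(num_pools=2)

# Hypothetical hosts. Each distinct host gets its own connection pool.
first = manager.connection_from_host("a.example", port=80, scheme="http")
manager.connection_from_host("b.example", port=80, scheme="http")
manager.connection_from_host("c.example", port=80, scheme="http")

# Only the two most recently used pools are kept.
pool_count = len(manager.pools)

# Asking for the first host again yields a new pool object, because the
# original pool was evicted when the third host was added.
first_again = manager.connection_from_host("a.example", port=80, scheme="http")
first_was_replaced = first_again is not first

print(pool_count, first_was_replaced)
```

If another thread were still using a pool at the moment it was evicted, its connections could be closed underneath it, which is the scenario the linked urllib3 issue discusses. A client that talks to a single InfluxDB host should stay within the limit.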

Hi @rojikada - were you able to resolve your issue? If not, could you perhaps share some code that reproduces the issue that we can look at?

Thanks!

powersj, Aug 05 '22 18:08

Hello, I currently don’t have any shareable code; however, the gist was APScheduler running multiple threads, all of them making requests.

rojikada, Aug 09 '22 05:08

Hi @rojikada,

I would be happy to continue digging in further, but a reproducer of some sort will be required; otherwise I am guessing at what you are doing :)

Let me know if you could share something. Thanks!

powersj, Aug 10 '22 14:08

Hello, I don't work on this anymore and got it fixed with locks.

But here is a basic example of what we were doing:

from time import sleep

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from influxdb_client import InfluxDBClient


class Test(object):
    def __init__(self):
        # One client and one query API instance shared by every job.
        self.client = InfluxDBClient(...)
        self.query_api = self.client.query_api()

    def test_read(self):
        query = 'some random data query'
        self.query_api.query_data_frame(query)


t = Test()
# Note: this configures a pool of 4 worker processes, not threads.
scheduler = BackgroundScheduler(executors={'default': ProcessPoolExecutor(4)})
# Nine identical jobs, each firing every second against the shared query API.
for _ in range(9):
    scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.start()

# Keep the main thread alive while the background scheduler runs.
while True:
    sleep(2)

rojikada, Dec 09 '22 16:12
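
The lock-based fix mentioned in the last comment was not shared in the thread. As a rough sketch of what such a workaround might look like, the wrapper below serializes calls to a query function with a `threading.Lock`. `SerializedQuery` and `FakeQueryApi` are hypothetical names introduced here; the fake stands in for a real `query_api()` object so the example runs without an InfluxDB server.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SerializedQuery:
    """Hypothetical wrapper: lets only one thread at a time call query_func."""

    def __init__(self, query_func):
        self._query_func = query_func
        self._lock = threading.Lock()

    def __call__(self, *args, **kwargs):
        # Every caller waits here until the previous query has finished.
        with self._lock:
            return self._query_func(*args, **kwargs)


class FakeQueryApi:
    """Stand-in for a real query API; records how many calls overlap."""

    def __init__(self):
        self.active = 0
        self.max_active = 0
        self._guard = threading.Lock()

    def query_data_frame(self, query):
        with self._guard:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # simulate a slow network round trip
        with self._guard:
            self.active -= 1
        return f"result for {query}"


fake_api = FakeQueryApi()
safe_query = SerializedQuery(fake_api.query_data_frame)

# Four worker threads share the same wrapped query function.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(safe_query, [f"q{i}" for i in range(8)]))

print(fake_api.max_active)  # number of queries that ever ran at the same time
```

A `threading.Lock` only coordinates threads inside one process. It would not help jobs running in separate worker processes, where giving each process its own client would be the more likely remedy.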