influxdb3-python

How do I write to the database using multiple threads?

Open allran opened this issue 5 months ago • 7 comments

    import logging
    from concurrent.futures import ThreadPoolExecutor, as_completed


    def bulk_save_item_data(obj: SyncInfluxDbObject):
        """Bulk-sync data."""
        try:
            conn = _create_connection()
            # Insert the pandas DataFrame
            conn.write(record=obj.bulk_data_pd,
                       data_frame_measurement_name=obj.coll,
                       data_frame_tag_columns=[obj.code_field],
                       data_frame_timestamp_column='time')
            conn.close()
        except Exception as err:
            logging.error(f"Failed to insert to influxdb, ErrMessage:{err}")
            raise


    # Submit one write task per batch to a pool of 10 worker threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(bulk_save_item_data, ops) for ops in batch_group]
        for future in as_completed(futures):
            pbar_save_db.update(1)
            future.result()  # re-raises any exception from the worker

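With a single thread there is no error. With 10 threads, the inserts start without errors, but after running for a while the following error is raised: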

ErrMessage:HTTPConnectionPool(host='localhost', port=8181): Read timed out. (read timeout=0)

How can I insert using multiple threads? Any help would be greatly appreciated, thanks.
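For reference, here is a minimal sketch of one way to structure concurrent writes so that a single timeout does not abort the whole run: a smaller worker pool, a retry with exponential backoff per batch, and a list of failures collected at the end. The helper names `write_with_retry` and `bulk_write` are hypothetical and not part of influxdb3-python, and nothing in this thread confirms that lower concurrency or retries resolve the timeout.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def write_with_retry(write_fn, batch, retries=3, backoff_s=0.5):
    """Call write_fn(batch), retrying with exponential backoff on failure."""
    for attempt in range(1, retries + 1):
        try:
            return write_fn(batch)
        except Exception as err:  # assumption: narrow this to timeout errors in real code
            if attempt == retries:
                raise
            logging.warning("write attempt %d failed: %s", attempt, err)
            time.sleep(backoff_s * 2 ** (attempt - 1))


def bulk_write(write_fn, batches, max_workers=4, retries=3, backoff_s=0.5):
    """Write batches concurrently.

    Returns (succeeded_count, failed), where failed is a list of
    (batch, exception) pairs for batches that exhausted their retries.
    """
    succeeded, failed = 0, []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its batch so failures can be reported.
        futures = {
            executor.submit(write_with_retry, write_fn, batch, retries, backoff_s): batch
            for batch in batches
        }
        for future in as_completed(futures):
            try:
                future.result()
                succeeded += 1
            except Exception as err:
                failed.append((futures[future], err))
    return succeeded, failed
```

With the code from the report, this would be called as `bulk_write(bulk_save_item_data, batch_group, max_workers=4)`. Starting with a small `max_workers` and raising it gradually makes it easier to see at what level of concurrency the timeouts begin.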

allran avatar Aug 08 '25 09:08 allran

Hi @allran, we will take a look.

NguyenHoangSon96 avatar Aug 12 '25 04:08 NguyenHoangSon96

Hi @allran, can you capture the full stack trace? That would help a lot.
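The snippet in the report logs only the exception message, which drops the chained cause. A minimal sketch of capturing the whole trace with the standard library follows; `call_and_capture` is a hypothetical helper name, not something from the client.

```python
import logging
import traceback


def call_and_capture(fn, *args, **kwargs):
    """Run fn; on failure, log and return the full traceback text.

    traceback.format_exc() includes chained exceptions, so the underlying
    socket timeout is shown together with the higher-level error.
    """
    try:
        fn(*args, **kwargs)
        return None
    except Exception:
        trace = traceback.format_exc()
        logging.error("write failed:\n%s", trace)
        return trace
```

Inside an existing `except` block, `logging.exception("...")` records the same information without the extra helper.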

NguyenHoangSon96 avatar Aug 12 '25 09:08 NguyenHoangSon96

I'm running into the same issue. I can't tell whether the problem is in the client or in influxdb. Oddly enough, Grafana does work. A quick look with tcpdump shows that influxdb itself isn't responding. Grafana uses HTTP/2, but the Python client uses HTTP/1.1.

My test code:

#!/usr/bin/env python3

from influxdb_client_3 import InfluxDBClient3, Point

influx = InfluxDBClient3(token="......",
                            host="http://127.0.0.1:8181",
                            database="sensors")
                            
print(influx.get_server_version())

points = [Point("TEMP")
            .tag("name", "test")
            .field("temperature", 12.3)
        ]
influx.write(record=points)

Full result:

3.3.0

Traceback (most recent call last):
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connection.py", line 565, in getresponse
    httplib_response = super().getresponse()
  File "/usr/lib/python3.13/http/client.py", line 1430, in getresponse
    response.begin()
    ~~~~~~~~~~~~~~^^
  File "/usr/lib/python3.13/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ~~~~~~~~~~~~~~~~~^^
  File "/usr/lib/python3.13/http/client.py", line 292, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
               ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^
  File "/usr/lib/python3.13/socket.py", line 719, in readinto
    return self._sock.recv_into(b)
           ~~~~~~~~~~~~~~~~~~~~^^^
TimeoutError: timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/./test.py", line 15, in <module>
    influx.write(record=points)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/__init__.py", line 351, in write
    self._write_api.write(bucket=database, record=record, **kwargs)
    ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 396, in write
    results = list(map(write_payload, payloads.items()))
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 394, in write_payload
    return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
           ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 544, in _post_write
    return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                                          no_sync=no_sync,
                                          ^^^^^^^^^^^^^^^^
                                          async_req=_async_req,
                                          ^^^^^^^^^^^^^^^^^^^^^
                                          content_type="text/plain; charset=utf-8",
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                                          **kwargs)
                                          ^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/service/write_service.py", line 48, in post_write
    (data) = self.post_write_with_http_info(org, bucket, body, **kwargs)  # noqa: E501
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/service/write_service.py", line 80, in post_write_with_http_info
    return self.api_client.call_api(
           ~~~~~~~~~~~~~~~~~~~~~~~~^
        path, 'POST',
        ^^^^^^^^^^^^^
    ...<12 lines>...
        collection_formats={},
        ^^^^^^^^^^^^^^^^^^^^^^
        urlopen_kw=kwargs.get('urlopen_kw', None))
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 361, in call_api
    return self.__call_api(resource_path, method,
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
                           path_params, query_params, header_params,
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<2 lines>...
                           _return_http_data_only, collection_formats,
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                           _preload_content, _request_timeout, urlopen_kw)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 191, in __call_api
    response_data = self.request(
        method, url, query_params=query_params, headers=header_params,
        post_params=post_params, body=body,
        _preload_content=_preload_content,
        _request_timeout=_request_timeout, **urlopen_kw)
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 406, in request
    return self.rest_client.POST(url,
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
                                 query_params=query_params,
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<4 lines>...
                                 body=body,
                                 ^^^^^^^^^^
                                 **urlopen_kw)
                                 ^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/rest.py", line 291, in POST
    return self.request("POST", url,
           ~~~~~~~~~~~~^^^^^^^^^^^^^
                        headers=headers,
                        ^^^^^^^^^^^^^^^^
    ...<4 lines>...
                        body=body,
                        ^^^^^^^^^^
                        **urlopen_kw)
                        ^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/rest.py", line 200, in request
    r = self.pool_manager.request(
        method, url,
    ...<3 lines>...
        headers=headers,
        **urlopen_kw)
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/_request_methods.py", line 143, in request
    return self.request_encode_body(
           ~~~~~~~~~~~~~~~~~~~~~~~~^
        method, url, fields=fields, headers=headers, **urlopen_kw
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/_request_methods.py", line 278, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
           ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/poolmanager.py", line 459, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 841, in urlopen
    retries = retries.increment(
        method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
    )
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/util/retry.py", line 449, in increment
    raise reraise(type(error), error, _stacktrace)
          ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/util/util.py", line 39, in reraise
    raise value
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
        conn,
    ...<10 lines>...
        **response_kw,
    )
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 536, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 367, in _raise_timeout
    raise ReadTimeoutError(
        self, url, f"Read timed out. (read timeout={timeout_value})"
    ) from err
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='127.0.0.1', port=8181): Read timed out. (read timeout=9.999816542011104)

brambo123 avatar Aug 14 '25 10:08 brambo123

Hi @brambo123, thank you for the very detailed report. I still have a question: did you install influxdb3 from a Docker container or some other way, and which edition of influxdb3 are you running (e.g. Enterprise, Cloud Serverless)?

NguyenHoangSon96 avatar Aug 14 '25 10:08 NguyenHoangSon96

Hi @brambo123, thank you for the very detailed report. I still have a question: did you install influxdb3 from a Docker container or some other way, and which edition of influxdb3 are you running (e.g. Enterprise, Cloud Serverless)?

I'm running core version 3.3.0 in a Docker container.

Here is part of an strace of influxdb. It seems like influxdb isn't responding to the write request at all.

6083  setsockopt(19, SOL_TCP, TCP_NODELAY, [1], 4) = 0
6083  getsockname(19, {sa_family=AF_INET, sin_port=htons(8181), sin_addr=inet_addr("172.18.0.2")}, [128 => 16]) = 0
6083  futex(0x7ff3879fd518, FUTEX_WAKE_PRIVATE, 1) = 1
6144  <... futex resumed>)              = 0
6083  accept4(17,  <unfinished ...>
6144  futex(0x7ff3873ff518, FUTEX_WAKE_PRIVATE, 1) = 1
6145  <... futex resumed>)              = 0
6083  <... accept4 resumed>0x7ffff1d74900, [128], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
6145  futex(0x7ff3873ff518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6144  recvfrom(19,  <unfinished ...>
6083  futex(0x7ff3886fc398, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6144  <... recvfrom resumed>"GET /ping HTTP/1.1\r\nHost: 127.0.0.1:8181\r\nAccept-Encoding: identity\r\nAuthorization: Token xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\nUser-Agent: influxdb3-python/0.15.0\r\nContent-Type: application/json\r\n\r\n", 8192, 0, NULL, NULL) = 255
6144  getrandom("\x35\x53\xd7\x3f\xbf\x4d\xa3\x8a", 8, 0) = 8
6144  getrandom("\xc0\xd2\xec\xfb\x70\xad\x17\x05\x92\x6b\x3f\x57\xd1\xb8\xc5\xbd", 16, 0) = 16
6144  writev(19, [{iov_base="HTTP/1.1 200 OK\r\ncontent-type: application/json\r\nrequest-id: 0198a819-6072-7fc3-bd75-335f6b920517\r\nx-influxdb-build: Core\r\nx-influxdb-version: 3.3.0\r\nx-request-id: 0198a819-6072-7fc3-bd75-335f6b920517\r\naccess-control-allow-origin: *\r\ntransfer-encoding: chunked\r\ndate: Thu, 14 Aug 2025 10:21:32 GMT\r\n\r\n", iov_len=301}, {iov_base="5F\r\n", iov_len=4}, {iov_base="{\"version\":\"3.3.0\",\"revision\":\"02d7ee1e6f\",\"process_id\":\"9175a822-9c8c-4b74-999f-9af3577ce925\"}", iov_len=95}, {iov_base="\r\n0\r\n\r\n", iov_len=7}], 4) = 407
6144  futex(0x7ff3879fd518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6142  <... epoll_wait resumed>[{events=EPOLLIN|EPOLLOUT, data=0x7ff3883c2380}], 1024, 511) = 1
6142  recvfrom(19, "POST /api/v2/write?org=default&bucket=sensors&precision=ns HTTP/1.1\r\nHost: 127.0.0.1:8181\r\nAccept-Encoding: identity\r\nContent-Length: 31\r\nContent-Type: text/plain\r\nAccept: application/json\r\nAuthorization: Token xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\nUser-Agent: influxdb3-python/0.15.0\r\n\r\nTEMP,name=test temperature=12.3", 8192, 0, NULL, NULL) = 375
6142  futex(0x7ff3873ff518, FUTEX_WAKE_PRIVATE, 1) = 1
6145  <... futex resumed>)              = 0
6145  futex(0x7ff3873ff518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6142  epoll_wait(3,  <unfinished ...>
6157  <... clock_nanosleep resumed>0x7ff3823fcce0) = 0
6151  <... clock_nanosleep resumed>0x7ff384b7bce0) = 0
6157  clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=100000000},  <unfinished ...>
6151  clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=100000000},  <unfinished ...>
6157  <... clock_nanosleep resumed>0x7ff3823fcce0) = 0
6151  <... clock_nanosleep resumed>0x7ff384b7bce0) = 0

Edit: Even weirder: It seems like the data is being saved, but only becomes visible after some time.
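To help separate a client problem from a server problem, the same HTTP/1.1 request seen in the strace output can be sent with only the standard library. This is a sketch: the path and query string are copied from the captured request, while the function name `post_line_protocol` and its parameters are made up for illustration. If this request also times out, the client library is less likely to be the cause.

```python
import http.client
from urllib.parse import urlencode


def post_line_protocol(host, port, database, line, token=None, timeout=10.0):
    """POST one line-protocol record over HTTP/1.1; return (status, body)."""
    # Path and query parameters mirror the request captured by strace.
    query = urlencode({"org": "default", "bucket": database, "precision": "ns"})
    headers = {"Content-Type": "text/plain; charset=utf-8"}
    if token:
        headers["Authorization"] = f"Token {token}"
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("POST", f"/api/v2/write?{query}",
                     body=line.encode("utf-8"), headers=headers)
        # Raises a timeout error if the server sends nothing within `timeout`.
        response = conn.getresponse()
        return response.status, response.read().decode("utf-8", errors="replace")
    finally:
        conn.close()
```

For the test case above, the call would be `post_line_protocol("127.0.0.1", 8181, "sensors", "TEMP,name=test temperature=12.3", token="......")`.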

brambo123 avatar Aug 14 '25 10:08 brambo123

@brambo123 Thank you. I will check

NguyenHoangSon96 avatar Aug 14 '25 11:08 NguyenHoangSon96

Hi @brambo123, I ran the same code as you, but I can't reproduce the error. I'm using:

  • Docker image: influxdb:3.3.0-core
  • OS: macOS on Apple Silicon
  • Client: influxdb3-python v0.15.0

Can you provide more details about your environment?
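One way to gather those details in a form that can be pasted into the issue is a short script like the sketch below. The function name `environment_report` and the default package list are assumptions. It reports only the Python side, so the Docker image tag and host OS of the server still need to be added by hand.

```python
import platform
import sys
from importlib import metadata


def environment_report(packages=("influxdb3-python", "urllib3")):
    """Return a plain-text summary of the interpreter, platform, and package versions."""
    lines = [
        f"python: {sys.version.split()[0]}",
        f"platform: {platform.platform()}",
        f"machine: {platform.machine()}",
    ]
    for name in packages:
        try:
            version = metadata.version(name)
        except metadata.PackageNotFoundError:
            version = "not installed"
        lines.append(f"{name}: {version}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(environment_report())
```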

NguyenHoangSon96 avatar Aug 15 '25 04:08 NguyenHoangSon96