请问如何使用多线程入库?
def bulk_save_item_data(obj: SyncInfluxDbObject):
"""批量同步数据"""
try:
conn = _create_connection()
# 插入pd.Dataframe数据
conn.write(record=obj.bulk_data_pd,
data_frame_measurement_name=obj.coll,
data_frame_tag_columns=[obj.code_field],
data_frame_timestamp_column='time')
conn.close()
except Exception as err:
logging.error(f"Failed to insert to influxdb, ErrMessage:{err}")
raise err
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(bulk_save_item_data, ops) for ops in batch_group]
for future in as_completed(futures):
pbar_save_db.update(1)
result = future.result()
如果是单线程不会出错,后面开的10线程,开始执行插入时没报错,执行一会儿后就会报如下错误:
ErrMessage:HTTPConnectionPool(host='localhost', port=8181): Read timed out. (read timeout=0)
请问如何使用多线程插入?跪求,谢谢
Hi @allran We will take a look.
Hi @allran Can you capture the full stack trace? That would help a lot
I'm running into the same issue. I have no idea if the problem is in the client or in influxdb. Oddly enough, Grafana does work. A quick look with tcpdump shows that influxdb itself isn't responding. Grafana uses HTTP2, but the Python client uses HTTP1.1.
My test code:
#!/usr/bin/env python3
from influxdb_client_3 import InfluxDBClient3, Point
influx = InfluxDBClient3(token="......",
host="http://127.0.0.1:8181",
database="sensors")
print(influx.get_server_version())
points = [Point("TEMP")
.tag("name", "test")
.field("temperature", 12.3)
]
influx.write(record=points)
Full result:
3.3.0
Traceback (most recent call last):
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 534, in _make_request
response = conn.getresponse()
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connection.py", line 565, in getresponse
httplib_response = super().getresponse()
File "/usr/lib/python3.13/http/client.py", line 1430, in getresponse
response.begin()
~~~~~~~~~~~~~~^^
File "/usr/lib/python3.13/http/client.py", line 331, in begin
version, status, reason = self._read_status()
~~~~~~~~~~~~~~~~~^^
File "/usr/lib/python3.13/http/client.py", line 292, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^
File "/usr/lib/python3.13/socket.py", line 719, in readinto
return self._sock.recv_into(b)
~~~~~~~~~~~~~~~~~~~~^^^
TimeoutError: timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/./test.py", line 15, in <module>
influx.write(record=points)
~~~~~~~~~~~~^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/__init__.py", line 351, in write
self._write_api.write(bucket=database, record=record, **kwargs)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 396, in write
results = list(map(write_payload, payloads.items()))
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 394, in write_payload
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/client/write_api.py", line 544, in _post_write
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
no_sync=no_sync,
^^^^^^^^^^^^^^^^
async_req=_async_req,
^^^^^^^^^^^^^^^^^^^^^
content_type="text/plain; charset=utf-8",
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
**kwargs)
^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/service/write_service.py", line 48, in post_write
(data) = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/service/write_service.py", line 80, in post_write_with_http_info
return self.api_client.call_api(
~~~~~~~~~~~~~~~~~~~~~~~~^
path, 'POST',
^^^^^^^^^^^^^
...<12 lines>...
collection_formats={},
^^^^^^^^^^^^^^^^^^^^^^
urlopen_kw=kwargs.get('urlopen_kw', None))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 361, in call_api
return self.__call_api(resource_path, method,
~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
path_params, query_params, header_params,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<2 lines>...
_return_http_data_only, collection_formats,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_preload_content, _request_timeout, urlopen_kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 191, in __call_api
response_data = self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_preload_content=_preload_content,
_request_timeout=_request_timeout, **urlopen_kw)
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/api_client.py", line 406, in request
return self.rest_client.POST(url,
~~~~~~~~~~~~~~~~~~~~~^^^^^
query_params=query_params,
^^^^^^^^^^^^^^^^^^^^^^^^^^
...<4 lines>...
body=body,
^^^^^^^^^^
**urlopen_kw)
^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/rest.py", line 291, in POST
return self.request("POST", url,
~~~~~~~~~~~~^^^^^^^^^^^^^
headers=headers,
^^^^^^^^^^^^^^^^
...<4 lines>...
body=body,
^^^^^^^^^^
**urlopen_kw)
^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/influxdb_client_3/write_client/_sync/rest.py", line 200, in request
r = self.pool_manager.request(
method, url,
...<3 lines>...
headers=headers,
**urlopen_kw)
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/_request_methods.py", line 143, in request
return self.request_encode_body(
~~~~~~~~~~~~~~~~~~~~~~~~^
method, url, fields=fields, headers=headers, **urlopen_kw
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/_request_methods.py", line 278, in request_encode_body
return self.urlopen(method, url, **extra_kw)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/poolmanager.py", line 459, in urlopen
response = conn.urlopen(method, u.request_uri, **kw)
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 841, in urlopen
retries = retries.increment(
method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
)
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/util/retry.py", line 449, in increment
raise reraise(type(error), error, _stacktrace)
~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/util/util.py", line 39, in reraise
raise value
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 787, in urlopen
response = self._make_request(
conn,
...<10 lines>...
**response_kw,
)
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 536, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3-venv/lib/python3.13/site-packages/urllib3/connectionpool.py", line 367, in _raise_timeout
raise ReadTimeoutError(
self, url, f"Read timed out. (read timeout={timeout_value})"
) from err
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='127.0.0.1', port=8181): Read timed out. (read timeout=9.999816542011104)
Hi @brambo123 Thank you for a very detailed report. I still have a question. Did you install infludb3 from a Docker container or something else..., and which version of influxdb3 .e.g: enterprise, cloud serverless,....
Hi @brambo123 Thank you for a very detailed report. I still have a question. Did you install infludb3 from a Docker container or something else..., and which version of influxdb3 .e.g: enterprise, cloud serverless,....
I'm running core version 3.3.0 in a Docker container.
Part of strace from influxdb, it seems like influxdb isn't responding at all.
6083 setsockopt(19, SOL_TCP, TCP_NODELAY, [1], 4) = 0
6083 getsockname(19, {sa_family=AF_INET, sin_port=htons(8181), sin_addr=inet_addr("172.18.0.2")}, [128 => 16]) = 0
6083 futex(0x7ff3879fd518, FUTEX_WAKE_PRIVATE, 1) = 1
6144 <... futex resumed>) = 0
6083 accept4(17, <unfinished ...>
6144 futex(0x7ff3873ff518, FUTEX_WAKE_PRIVATE, 1) = 1
6145 <... futex resumed>) = 0
6083 <... accept4 resumed>0x7ffff1d74900, [128], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
6145 futex(0x7ff3873ff518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6144 recvfrom(19, <unfinished ...>
6083 futex(0x7ff3886fc398, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6144 <... recvfrom resumed>"GET /ping HTTP/1.1\r\nHost: 127.0.0.1:8181\r\nAccept-Encoding: identity\r\nAuthorization: Token xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\nUser-Agent: influxdb3-python/0.15.0\r\nContent-Type: application/json\r\n\r\n", 8192, 0, NULL, NULL) = 255
6144 getrandom("\x35\x53\xd7\x3f\xbf\x4d\xa3\x8a", 8, 0) = 8
6144 getrandom("\xc0\xd2\xec\xfb\x70\xad\x17\x05\x92\x6b\x3f\x57\xd1\xb8\xc5\xbd", 16, 0) = 16
6144 writev(19, [{iov_base="HTTP/1.1 200 OK\r\ncontent-type: application/json\r\nrequest-id: 0198a819-6072-7fc3-bd75-335f6b920517\r\nx-influxdb-build: Core\r\nx-influxdb-version: 3.3.0\r\nx-request-id: 0198a819-6072-7fc3-bd75-335f6b920517\r\naccess-control-allow-origin: *\r\ntransfer-encoding: chunked\r\ndate: Thu, 14 Aug 2025 10:21:32 GMT\r\n\r\n", iov_len=301}, {iov_base="5F\r\n", iov_len=4}, {iov_base="{\"version\":\"3.3.0\",\"revision\":\"02d7ee1e6f\",\"process_id\":\"9175a822-9c8c-4b74-999f-9af3577ce925\"}", iov_len=95}, {iov_base="\r\n0\r\n\r\n", iov_len=7}], 4) = 407
6144 futex(0x7ff3879fd518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6142 <... epoll_wait resumed>[{events=EPOLLIN|EPOLLOUT, data=0x7ff3883c2380}], 1024, 511) = 1
6142 recvfrom(19, "POST /api/v2/write?org=default&bucket=sensors&precision=ns HTTP/1.1\r\nHost: 127.0.0.1:8181\r\nAccept-Encoding: identity\r\nContent-Length: 31\r\nContent-Type: text/plain\r\nAccept: application/json\r\nAuthorization: Token xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\nUser-Agent: influxdb3-python/0.15.0\r\n\r\nTEMP,name=test temperature=12.3", 8192, 0, NULL, NULL) = 375
6142 futex(0x7ff3873ff518, FUTEX_WAKE_PRIVATE, 1) = 1
6145 <... futex resumed>) = 0
6145 futex(0x7ff3873ff518, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
6142 epoll_wait(3, <unfinished ...>
6157 <... clock_nanosleep resumed>0x7ff3823fcce0) = 0
6151 <... clock_nanosleep resumed>0x7ff384b7bce0) = 0
6157 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=100000000}, <unfinished ...>
6151 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=100000000}, <unfinished ...>
6157 <... clock_nanosleep resumed>0x7ff3823fcce0) = 0
6151 <... clock_nanosleep resumed>0x7ff384b7bce0) = 0
Edit: Even weirder: It seems like the data is being saved, but only becomes visible after some time.
@brambo123 Thank you. I will check
Hi @brambo123 I have run with the same code as you, but I can't reproduce the error. I'm using
- Docker image
influxdb:3.3.0-core - My os: macOS Apple Silicon
- influxdb3-python v0.15.0 Can you provide more details about your environment?