[stream/ws_client] Reading long stdout is truncated at 32768 chars
We are facing this issue in Airflow and think it is caused by the Python kubernetes-client.
Earlier repo issue: https://github.com/kubernetes-client/python-base/issues/190
Airflow issue: https://github.com/apache/airflow/issues/39267
Error log:
[2024-04-26, 00:21:32 IST] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:36 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:40 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:44 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-26, 00:21:52 IST] {pod.py:909} INFO - Deleting pod: hps-mydata-generation-9lqygp1s
[2024-04-26, 00:21:52 IST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 200, in execute
    result = self.extract_xcom(pod=self.pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 557, in extract_xcom
    result = self.pod_manager.extract_xcom(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 730, in extract_xcom
    result = self.extract_xcom_json(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 765, in extract_xcom_json
    json.loads(result)
  File "/usr/local/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 16385 (char 16384)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 215, in execute
    self.cleanup(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException
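The decode fails at char 16384, which looks like a chunk boundary rather than a corrupt document. One possible mechanism, an assumption not confirmed in this thread, is a read loop that returns as soon as it has received any stdout instead of draining the stream until the remote command exits. The sketch below illustrates the difference with a hypothetical `FakeExecStream` that mimics the `is_open()`, `update()`, `peek_stdout()` and `read_stdout()` methods of `kubernetes.stream.ws_client.WSClient` and delivers output in 16384-character chunks (a size chosen to match the offset in the log, not a known constant of the client).

```python
import json
from collections import deque


class FakeExecStream:
    """Hypothetical stand-in for kubernetes.stream.ws_client.WSClient.

    Delivers `payload` on stdout in fixed-size chunks (one chunk per update()
    call) and reports itself closed once every chunk has been delivered.
    """

    def __init__(self, payload: str, chunk_size: int = 16384):
        self._pending = deque(
            payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)
        )
        self._stdout = ""
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def update(self, timeout=0) -> None:
        # Move the next chunk from the "network" into the stdout buffer,
        # or mark the stream closed when nothing is left to send.
        if self._pending:
            self._stdout += self._pending.popleft()
        else:
            self._open = False

    def peek_stdout(self, timeout=0) -> str:
        return self._stdout

    def read_stdout(self, timeout=None) -> str:
        data, self._stdout = self._stdout, ""
        return data


def read_until_first_chunk(resp) -> str:
    """Fragile pattern: returns as soon as any stdout has been read."""
    res = ""
    while resp.is_open():
        resp.update(timeout=1)
        while resp.peek_stdout():
            res += resp.read_stdout()
        if res:
            return res  # chunks that have not arrived yet are never read
    return res


def read_until_closed(resp) -> str:
    """Drains stdout until the remote command exits and the stream closes."""
    res = ""
    while resp.is_open():
        resp.update(timeout=1)
        while resp.peek_stdout():
            res += resp.read_stdout()
    # Pick up anything still buffered when the stream closed.
    while resp.peek_stdout():
        res += resp.read_stdout()
    return res


if __name__ == "__main__":
    payload = json.dumps({"data": "x" * 40000})  # 40012 characters
    print(len(read_until_first_chunk(FakeExecStream(payload))))  # 16384
    print(json.loads(read_until_closed(FakeExecStream(payload))) == {"data": "x" * 40000})  # True
```

The first reader hands back only the first chunk, so `json.loads` fails on it; the second keeps reading until the stream is closed and gets the whole document.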
/help
It is unclear where the limitation comes from: this client or some underlying library.
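Some background on where such a limit could hide: the pod exec endpoint multiplexes stdin, stdout, stderr and an error channel over one websocket, and the first byte of each binary message is the channel number (0 stdin, 1 stdout, 2 stderr, 3 error, 4 resize, the same numbering the Python client's `ws_client` module uses). A long `cat` therefore arrives as several stdout messages that have to be concatenated. The helper below is a hypothetical sketch of that reassembly, not part of the client's API.

```python
# Channel numbers of the Kubernetes exec websocket subprotocol: the first byte
# of every binary message names the stream the rest of the message belongs to.
STDIN, STDOUT, STDERR, ERROR, RESIZE = 0, 1, 2, 3, 4


def demux_frames(frames: list[bytes]) -> dict[int, bytes]:
    """Reassemble per-channel data from a sequence of websocket messages."""
    channels: dict[int, bytes] = {}
    for frame in frames:
        if not frame:
            continue  # ignore empty messages
        channel, data = frame[0], frame[1:]
        # A long stdout spans many messages on channel 1; losing any of them,
        # or stopping before the last one, yields a truncated document.
        channels[channel] = channels.get(channel, b"") + data
    return channels
```

For example, `demux_frames([b"\x01" + b'{"a": ', b"\x02warn", b"\x01" + b"1}"])` gives `{1: b'{"a": 1}', 2: b"warn"}`: the JSON is only complete once both stdout messages have been joined.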
@roycaihw: This request has been marked as needing help from a contributor.
I'm not sure where in between the data is lost, but I'm trying the following code, as suggested by @paul424 in this issue: https://github.com/kubernetes-client/python-base/issues/190. My understanding is that the data is lost while reading the output of the cat command:
# Imports this method relies on; module paths may differ between Airflow / provider versions.
import json
from json import JSONDecodeError

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from kubernetes.client.models import V1Pod
from kubernetes.stream import stream as kubernetes_stream


# Method of a custom pod-manager/operator class: self._client is a kubernetes
# CoreV1Api instance and self.log is a logger.
def _extract_xcom(self, pod: V1Pod):
    try:
        self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
        # Can't use _preload_content=True because that would return str(json.load(response))
        client = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,
            _request_timeout=10,
        )
        # Block until the connection closes (command exited) or 10 s pass,
        # buffering every message received in the meantime.
        client.run_forever(timeout=10)
        # read_all() returns everything buffered on stdout and stderr.
        resp = client.read_all()
        self.log.info("Received {} ({}) ({} ... {})".format(type(resp), len(resp), resp[:64], resp[-64:]))
        # Validate that it is valid JSON
        _ = json.loads(resp)
        # Terminate the sidecar
        _ = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                'kill -s SIGINT 1',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=True,
            _request_timeout=10,
        )
        return resp
    except JSONDecodeError as e:
        message = f'Failed to decode json document from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from e
    except Exception as e:
        message = f'Failed to extract xcom from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from e
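To tell truncation apart from a malformed file, the sidecar could print the file's byte count before its contents, so the reader can check that everything arrived before calling `json.loads`. The command and the `parse_sized_output` helper below are hypothetical sketches, not part of Airflow or the Kubernetes client; the path is the one from the log above.

```python
import json
from typing import Any

# Hypothetical sidecar command: first line is the byte count, then the file.
XCOM_PATH = "/airflow/xcom/return.json"
SIZED_CAT_COMMAND = f"wc -c < {XCOM_PATH}; cat {XCOM_PATH}"


def parse_sized_output(output: str) -> Any:
    """Split '<byte count>\\n<json>' and verify nothing was lost in transit."""
    header, sep, body = output.partition("\n")
    if not sep:
        raise ValueError("no size header in command output")
    expected = int(header.strip())  # some `wc` builds pad the number with spaces
    received = len(body.encode("utf-8"))  # wc counts bytes, len(str) counts characters
    if received != expected:
        raise ValueError(f"truncated output: got {received} of {expected} bytes")
    return json.loads(body)
```

In the method above this would mean passing `SIZED_CAT_COMMAND` as the `-c` argument and calling `parse_sized_output(client.read_all())`; setting `stderr=False` on that exec call keeps stderr text out of the buffer that `read_all()` returns.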
Hi @paramjeet01,
did you have any luck with this issue? I am an Airflow user too; the issue used to be intermittent, but now it has become more consistent.
@kand617, we use the code mentioned above to solve the issue.