from airflow.operators.python import get_current_context
context = get_current_context()
ti = context['ti']
ti.xcom_push(key = "file", value = doc )
У меня есть приведенный выше код в задаче, а doc — это данные, которые я хочу передать в xcom. Выдает следующую трассировку стека ошибок:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/decorators/base.py", line 217, in execute
return_value = super().execute(context)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/bitnami/airflow/dags/rover_ocr_pipeline.py", line 65, in retrieve
ti.xcom_push(key = "file", value = doc )
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2294, in xcom_push
XCom.set(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
value = cls.serialize_value(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
File "/opt/bitnami/python/lib/python3.9/json/__init__.py", line 234, in dumps
return cls(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
return super().encode(o)
File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'bytes' object has no attribute '__module__'
Это работало до сих пор, я предполагаю, что это проблема с версией воздушного потока. Раньше я использовал 2.3.4, теперь использую 2.5.0.
Airflow работает в кластере kubernetes и использует образ airflow:2.5.0-debian-11-r11.
🤔 А знаете ли вы, что...
Python популярен в анализе данных и машинном обучении с помощью библиотеки scikit-learn.
Переходя от комментариев к фактическому ответу, полный разговор см. В комментариях выше.
XCOM пытается преобразовать все в строку перед сохранением в таблицах XCOM. В этом случае, поскольку байты - это класс, он пытался сериализовать его, что невозможно. Преобразование байтов в обычную строку с помощью base64, кодирующего байты, позволило сохранить их в xcom.
Хотя, вероятно, не стоит затрачивать усилий только в этом случае, это можно было бы решить автоматически, создав собственный серверный модуль xcom, который точно определяет работу со строками байтов и выполняет преобразование за кулисами.