Невозможно передать данные в xcom в воздушном потоке

from airflow.operators.python import get_current_context


context = get_current_context()
ti = context['ti']
ti.xcom_push(key = "file", value = doc )

У меня есть приведенный выше код в задаче, а doc — это данные, которые я хочу передать в xcom. Выдает следующую трассировку стека ошибок:

Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/bitnami/airflow/dags/rover_ocr_pipeline.py", line 65, in retrieve
    ti.xcom_push(key = "file", value = doc )
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2294, in xcom_push
    XCom.set(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
    value = cls.serialize_value(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/opt/bitnami/python/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'bytes' object has no attribute '__module__'

Это работало до сих пор, я предполагаю, что это проблема с версией воздушного потока. Раньше я использовал 2.3.4, теперь использую 2.5.0.

Airflow работает в кластере kubernetes и использует образ airflow:2.5.0-debian-11-r11.

🤔 А знаете ли вы, что...
Python популярен в анализе данных и машинном обучении с помощью библиотеки scikit-learn.


58
1

Ответ:

Решено

Переходя от комментариев к фактическому ответу, полный разговор см. В комментариях выше.

XCOM пытается преобразовать все в строку перед сохранением в таблицах XCOM. В этом случае, поскольку байты - это класс, он пытался сериализовать его, что невозможно. Преобразование байтов в обычную строку с помощью base64, кодирующего байты, позволило сохранить их в xcom.

Хотя, вероятно, не стоит затрачивать усилий только в этом случае, это можно было бы решить автоматически, создав собственный серверный модуль xcom, который точно определяет работу со строками байтов и выполняет преобразование за кулисами.