Кадры данных с общим ленивым предком означают повторяющиеся вычисления?

Зависимость DAG

Описание

Довольно просто: по сути, я читаю некоторые файлы паркета с диска, используя поляры, которые являются источником данных. Выполнение умеренно тяжелой обработки (несколько миллионов строк) для создания промежуточного кадра данных, а затем создание двух результатов, которые необходимо записать обратно в некоторую базу данных.

Технологический стек
  • Убунту 22.04
  • Питон 3.10
  • Поляры 1.2.1
Вопрос

Polars рекомендует, насколько это возможно, использовать ленивые вычисления для оптимизации выполнения. Теперь окончательные результаты (result_1 и result_2), очевидно, необходимо материализовать.

Но если я позвоню этим двум последовательно

#! /usr/bin/env python3
# encoding: utf-8
import polars as pl
...
result_1.collect() # Materialise result 1
result_2.collect() # Materialise result 2

Повторяется ли преобразование исходного кадра в промежуточный (общий предок)? Если да, то это явно нежелательно. В этом случае мне придется материализовать промежуточный кадр, а затем выполнить остальную обработку в режиме ожидания.

Есть ли документация от полярников об ожидаемом поведении и рекомендуемых методах в отношении этого сценария?

🤔 А знаете ли вы, что...
Python - это универсальный язык программирования.


2
50
2

Ответы:

Попробуйте pl.collect_all собрать несколько кадров данных.

pl.collect_all([result1, result2])

Ссылка: https://docs.pola.rs/api/python/stable/reference/api/polars.collect_all.html


Решено

Честно говоря, я думаю, что для производственного кода лучше всего collect() промежуточные результаты, а затем повторно использовать их в result_1 и result_2. Было бы неплохо, чтобы он collect_all() мог найти некоторые общие подграфы вычислений и кэшировать их, но я не думаю, что это происходит (хотя я особо не проверял Rust-код).

Возможно, вы могли бы попробовать обходной путь с помощью Polars.concat():

# let's say you have some intermediate LazyFrame with some calculations
lf_intermediate = lf.group_by("a").agg()

# and here you want to create 2 different results out of this DataFrame
# You can add a 'partitioning' column so you can separate your results after
# collection
lf1 = lf_intermediate.with_columns(pl.col.a * 2, partition=pl.lit(1))
lf2 = lf_intermediate.with_columns(pl.col.a / 3, partition=pl.lit(2))

# create combined result
df_result = pl.concat([lf1, lf2], how='diagonal').collect()

# and now separate results into different dataframes
df1 = lf_result.filter(pl.col.partition == 1)
df2 = lf_result.filter(pl.col.partition == 2)

Вы можете видеть, что промежуточная часть кэшируется во время расчета:

pl.concat([lf1, lf2], how='diagonal').explain(optimized=True)

UNION
  PLAN 0:
     WITH_COLUMNS:
     [[(col("a")) * (2)], dyn int: 1.alias("partition"), null.cast(Float64).alias("c")] 
      CACHE[id: 0, cache_hits: 1]
        AGGREGATE
            [] BY [col("a")] FROM
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
  PLAN 1:
    simple π 3/3 ["a", "partition", "c"]
       WITH_COLUMNS:
       [[(col("a")) / (3)].alias("c"), dyn int: 2.alias("partition")] 
        CACHE[id: 0, cache_hits: 1]
          AGGREGATE
            [] BY [col("a")] FROM
            DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
END UNION