Довольно просто: по сути, я читаю некоторые файлы паркета с диска, используя поляры, которые являются источником данных. Выполнение умеренно тяжелой обработки (несколько миллионов строк) для создания промежуточного кадра данных, а затем создание двух результатов, которые необходимо записать обратно в некоторую базу данных.
Polars рекомендует, насколько это возможно, использовать ленивые вычисления для оптимизации выполнения. Теперь окончательные результаты (result_1
и result_2
), очевидно, необходимо материализовать.
Но если я позвоню этим двум последовательно
#! /usr/bin/env python3
# encoding: utf-8
import polars as pl
...
result_1.collect() # Materialise result 1
result_2.collect() # Materialise result 2
Повторяется ли преобразование исходного кадра в промежуточный (общий предок)? Если да, то это явно нежелательно. В этом случае мне придется материализовать промежуточный кадр, а затем выполнить остальную обработку в режиме ожидания.
Есть ли документация от полярников об ожидаемом поведении и рекомендуемых методах в отношении этого сценария?
🤔 А знаете ли вы, что...
Python - это универсальный язык программирования.
Попробуйте pl.collect_all
собрать несколько кадров данных.
pl.collect_all([result1, result2])
Ссылка: https://docs.pola.rs/api/python/stable/reference/api/polars.collect_all.html
Честно говоря, я думаю, что для производственного кода лучше всего collect()
промежуточные результаты, а затем повторно использовать их в result_1
и result_2
. Было бы неплохо, чтобы он collect_all()
мог найти некоторые общие подграфы вычислений и кэшировать их, но я не думаю, что это происходит (хотя я особо не проверял Rust-код).
Возможно, вы могли бы попробовать обходной путь с помощью Polars.concat():
# let's say you have some intermediate LazyFrame with some calculations
lf_intermediate = lf.group_by("a").agg()
# and here you want to create 2 different results out of this DataFrame
# You can add a 'partitioning' column so you can separate your results after
# collection
lf1 = lf_intermediate.with_columns(pl.col.a * 2, partition=pl.lit(1))
lf2 = lf_intermediate.with_columns(pl.col.a / 3, partition=pl.lit(2))
# create combined result
df_result = pl.concat([lf1, lf2], how='diagonal').collect()
# and now separate results into different dataframes
df1 = lf_result.filter(pl.col.partition == 1)
df2 = lf_result.filter(pl.col.partition == 2)
Вы можете видеть, что промежуточная часть кэшируется во время расчета:
pl.concat([lf1, lf2], how='diagonal').explain(optimized=True)
UNION
PLAN 0:
WITH_COLUMNS:
[[(col("a")) * (2)], dyn int: 1.alias("partition"), null.cast(Float64).alias("c")]
CACHE[id: 0, cache_hits: 1]
AGGREGATE
[] BY [col("a")] FROM
DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
PLAN 1:
simple π 3/3 ["a", "partition", "c"]
WITH_COLUMNS:
[[(col("a")) / (3)].alias("c"), dyn int: 2.alias("partition")]
CACHE[id: 0, cache_hits: 1]
AGGREGATE
[] BY [col("a")] FROM
DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
END UNION