Как читать сжатые данные из концентратора событий Azure в Fabric с помощью Py-spark

Я отправляю сжатые данные в Центр событий, чтобы преодолеть жесткий лимит в 1 МБ в Центре событий Azure. Мне также нужно прочитать это в Py-spark и обновить таблицу дельты.

Сжатые данные, отправляемые в концентратор событий, в потоке Py-spark имеют нулевое значение. Как это прочитать?

Вот как я читаю из Центра событий

 df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))

Вот как я отправляю данные в Event Hub. event_data_batch = ожидайте Producer.create_batch()

    # Add events to the batch.
    body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'  
    
    # Compress the JSON string using the gzip algorithm.
    compressed_body = gzip.compress(body.encode('utf-8'))

    # Encode the compressed JSON string to base64.
    encoded_compressed_body = base64.b64encode(compressed_body)
    

    event_data_batch.add(EventData(encoded_compressed_body))

Я пытался читать с опцией gzip, но это дало мне null.

df_stream  = spark.readStream.format("eventhubs")\
  .options(**ehConf)\
  .option("compression", "gzip") \
  .load() 

И вот как я читаю столбец body

def foreach_batch_function(df_stream, epoch_id):
              # temp_view_name = "stream_temp"
              df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
              df_stream_body.createOrReplaceTempView("stream_temp")
              # df_stream_body.printSchema()

197
1

Ответ:

Решено

Вам необходимо выполнить распаковку столбца body, опция .option("compression", "gzip") предназначена для сжатых файлов, а не для сжатых данных в столбце.

Итак, необходимо создать пользовательскую функцию, которая распаковывает и декодирует данные. Используйте приведенный ниже код.

ОДФ

import  pyspark.sql.functions as F
from pyspark.sql.types import StringType
import gzip,base64,json


def unZip(binary_string):
    return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')

unzip = F.udf(unZip, StringType())

Затем создайте новый столбец с распакованными данными.

df.withColumn("bd", unzip(F.col("body").cast('string'))).display()

Выход: