Я отправляю сжатые данные в Центр событий, чтобы преодолеть жесткий лимит в 1 МБ в Центре событий Azure. Мне также нужно прочитать это в Py-spark и обновить таблицу дельты.
Сжатые данные, отправляемые в концентратор событий, в потоке Py-spark имеют нулевое значение. Как это прочитать?
Вот как я читаю из Центра событий
df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
Вот как я отправляю данные в Event Hub. event_data_batch = ожидайте Producer.create_batch()
# Add events to the batch.
body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'
# Compress the JSON string using the gzip algorithm.
compressed_body = gzip.compress(body.encode('utf-8'))
# Encode the compressed JSON string to base64.
encoded_compressed_body = base64.b64encode(compressed_body)
event_data_batch.add(EventData(encoded_compressed_body))
Я пытался читать с опцией gzip, но это дало мне null.
df_stream = spark.readStream.format("eventhubs")\
.options(**ehConf)\
.option("compression", "gzip") \
.load()
И вот как я читаю столбец body
def foreach_batch_function(df_stream, epoch_id):
# temp_view_name = "stream_temp"
df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
df_stream_body.createOrReplaceTempView("stream_temp")
# df_stream_body.printSchema()
Вам необходимо выполнить распаковку столбца body
, опция .option("compression", "gzip")
предназначена для сжатых файлов, а не для сжатых данных в столбце.
Итак, необходимо создать пользовательскую функцию, которая распаковывает и декодирует данные. Используйте приведенный ниже код.
ОДФ
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import gzip,base64,json
def unZip(binary_string):
return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')
unzip = F.udf(unZip, StringType())
Затем создайте новый столбец с распакованными данными.
df.withColumn("bd", unzip(F.col("body").cast('string'))).display()
Выход: