Как отфильтровать элементы в каждой строке столбца List[StringType] в искровом кадре данных?

Я хочу сохранить элементы, присутствующие в строке столбца ArrayType. Например, если строка моего искрового фрейма данных:

val df = sc.parallelize(Seq(Seq("A", "B"),Seq("C","X"),Seq("A", "B", "C", "D", "Z"))).toDF("column")

+---------------+
|         column|
+---------------+
|         [A, B]|
|         [C, X]|
|[A, B, C, D, Z]|
+---------------+

и у меня есть dictionary_list = ["A", "Z", "X", "Y"], я хочу, чтобы строка моего выходного фрейма данных была:

val outp_df

+---------------+
|         column|
+---------------+
|            [A]|
|            [X]|
|         [A, Z]|
+---------------+

Я пробовал array_contains, array_overlap и т. д., но результат, который я получаю, такой:

val result = df.where(array_contains(col("column"), "A"))
+---------------+
|         column|
+---------------+
|         [A, B]|
|[A, B, C, D, Z]|
+---------------+

Строки фильтруются, но я хочу фильтровать внутри самого списка/строки. Как я могу это сделать?


1
48
2

Ответы:

Используйте функцию filter:

val df1 = df.withColumn(
  "column",
  expr("filter(column, x -> x in ('A', 'Z', 'X', 'Y'))")
)

Решено

Результат, который вы получаете, имеет смысл, вы ТОЛЬКО выбираете строки, значение массива столбцов которых содержит «A», что является первой строкой и последней строкой. Вам нужна функция (НЕ функция фильтра SQL), которая получает входную последовательность строк и возвращает последовательность, содержащую только значения, которые существуют в вашем словаре. Вы можете использовать udfs следующим образом:

// rename this as you wish
val myCustomFilter: Seq[String] => Seq[String] =
  input => input.filter(dictionaryList.contains)
// register to your spark context
// rename this as you wish
spark.udf.register("custom_filter", myCustomFilter)

И тогда вам нужен оператор выбора, а не оператор фильтра! оператор фильтра выбирает только те row, которые могут удовлетворить предикату, это не то, что вам нужно.

Результат искровой оболочки:

scala> df.select(expr("custom_filter(column)")).show
+--------------+
|cusfil(column)|
+--------------+
|           [A]|
|           [X]|
|        [A, Z]|
+--------------+