wake-up-neo.net

Spark-Dataframe-Zeichenfolgespalte in mehrere Spalten aufteilen

Ich habe verschiedene Leute gesehen, die behaupten, dass Dataframe.explode eine nützliche Methode ist, aber dies führt zu mehr Zeilen als der ursprüngliche Datenrahmen, was ich gar nicht will. Ich möchte einfach das Dataframe-Äquivalent des sehr einfachen machen:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

was etwas aussieht wie:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

und konvertiert es zu diesem:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Mir ist pyspark.sql.functions.split() bekannt, aber es führt zu einer verschachtelten Array-Spalte anstelle von zwei Top-Level-Spalten, wie ich möchte. 

Im Idealfall möchte ich, dass auch diese neuen Spalten benannt werden.

32
Peter Gaultney

pyspark.sql.functions.split() ist hier der richtige Ansatz - Sie müssen lediglich die verschachtelte ArrayType-Spalte in mehrere Spalten der obersten Ebene reduzieren. In diesem Fall, in dem jedes Array nur zwei Elemente enthält, ist dies sehr einfach. Sie verwenden einfach Column.getItem(), um jeden Teil des Arrays als Spalte selbst abzurufen:

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

Das Ergebnis wird sein:

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Ich bin nicht sicher, wie ich dies in einem allgemeinen Fall lösen würde, in dem die verschachtelten Arrays von Zeile zu Zeile nicht dieselbe Größe hatten.

54
Peter Gaultney

Im Folgenden finden Sie eine Lösung für den allgemeinen Fall, bei der Sie nicht die Länge des Arrays vorab mit der Variablen collect oder udfs kennen müssen. Leider funktioniert dies nur für spark Version 2.1 und höher, da hierfür die Funktion posexplode benötigt wird.

Angenommen, Sie hatten den folgenden DataFrame:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

Teilen Sie die letters-Spalte auf und verwenden Sie posexplode, um das resultierende Array zusammen mit der Position im Array aufzulösen. Verwenden Sie als Nächstes pyspark.sql.functions.expr, um das Element am Index pos in diesem Array zu packen.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

Jetzt erstellen wir zwei neue Spalten aus diesem Ergebnis. Der erste ist der Name unserer neuen Spalte, die eine Verkettung von letter und dem Index im Array ist. Die zweite Spalte ist der Wert am entsprechenden Index im Array. Letzteres erhalten wir, indem wir die Funktionalität von pyspark.sql.functions.expr nutzen, wodurch wir Spaltenwerte als Parameter verwenden können.

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

Jetzt können wir nur groupBy die num und pivot den DataFrame. Wenn wir das alles zusammenstellen, bekommen wir:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+
12
pault

Ich habe eine Lösung für den allgemeinen ungleichmäßigen Fall gefunden (oder wenn Sie die verschachtelten Spalten erhalten, die mit der Funktion .split () abgerufen werden):

import pyspark.sql.functions as f

@f.udf(StructType([StructField(col_3, StringType(), True),
                   StructField(col_4, StringType(), True)]))

 def splitCols(array):
    return array[0],  ''.join(array[1:len(array)])

 df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
        .select(df.columns+['name.*'])

Im Grunde müssen Sie nur alle vorhergehenden Spalten auswählen und die Spaltennamen. * Der verschachtelten Spalten. In diesem Fall erhalten Sie sie als zwei Spalten der obersten Ebene.

0
Jasminyas