wake-up-neo.net

Zeilen und Spalten in Spark-Datenrahmen iterieren

Ich habe den folgenden Spark-Datenrahmen, der dynamisch erstellt wird:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

Jetzt muss ich jede Zeile und Spalte in sqlDF durchlaufen, um jede Spalte zu drucken. Dies ist mein Versuch:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row ist vom Typ Row, kann aber nicht iteriert werden. Deshalb gibt dieser Code einen Kompilierungsfehler in row.foreach aus. Wie iteriere ich jede Spalte in Row?

12
ps0604

Sie können Row mit Seq in toSeq konvertieren. Sobald Sie Seq ausgewählt haben, können Sie wie üblich mit foreach, map oder was immer Sie benötigen, durchlaufen

    sqlDF.foreach { row => 
           row.toSeq.foreach{col => println(col) }
    }

Ausgabe:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
6
SCouto

Stellen Sie sich vor, Sie haben eine Dataframe wie unten

val df = Seq(
          ("Andy","aaa", 20),     
          ("Berta","bbb", 30),
          ("Joe","ccc", 40)).toDF("name","sector","age")

Das Schleifen Ihres Dataframe und das Extrahieren der Elemente aus dem Dataframe hilft df.foreach nicht direkt. Um dies zu implementieren, können Sie einen der folgenden Ansätze wählen.

Ansatz 1 - Schleife mit rdd

Verwenden Sie rdd.collect über Ihrem Dataframe . Die Variable row enthält jede Zeile des Zeilentyps Dataframe von rdd. Um jedes Element aus einer Zeile zu erhalten, verwenden Sie row.mkString(","), das den Wert jeder Zeile in durch Kommas getrennten Werten enthält. Mit der split-Funktion (integrierte Funktion) können Sie auf jeden Spaltenwert der rdd-Zeile mit Index zugreifen.

for (row <- df.rdd.collect)
{   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   
}

Ansatz 2 - Verwenden von wo und wählen Sie

Sie können where und select direkt verwenden, um die Daten intern zu finden. Da der Index keine gebundene Ausnahme auslösen soll, wird eine if-Bedingung verwendet

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

Ansatz 3 - Temp-Tabellen verwenden

Sie können den Datenrahmen als verführerisch registrieren, der im Speicher von spark gespeichert wird. Dann können Sie eine Auswahlabfrage wie andere Datenbanken verwenden, um die Daten abzufragen und diese dann zu sammeln und in einer Variablen zu speichern

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
6
Sarath Avanavu

Sie sollten mkString für Ihre Row verwenden:

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

Beachten Sie jedoch, dass dies in den Executors-JVMs gedruckt wird. Normalerweise wird die Ausgabe nicht angezeigt (es sei denn, Sie arbeiten mit master = local)

2
Raphael Roth

einfach ergebnis sammeln und dann für jeden bewerben

df.collect().foreach(println)

1
GANESH CHOKHARE

sqlDF.foreach funktioniert nicht für mich, aber Ansatz 1 von @Sarath Avanavu answer funktioniert, aber es spielte auch irgendwann mit der Reihenfolge der Platten. 

Ich habe einen weiteren Weg gefunden, der funktioniert

df.collect().foreach { row =>
   println(row.mkString(","))
}
0
Naresh Joshi