Ich habe den folgenden Spark-Datenrahmen, der dynamisch erstellt wird:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Jetzt muss ich jede Zeile und Spalte in sqlDF
durchlaufen, um jede Spalte zu drucken. Dies ist mein Versuch:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
ist vom Typ Row
, kann aber nicht iteriert werden. Deshalb gibt dieser Code einen Kompilierungsfehler in row.foreach
aus. Wie iteriere ich jede Spalte in Row
?
Sie können Row
mit Seq
in toSeq
konvertieren. Sobald Sie Seq
ausgewählt haben, können Sie wie üblich mit foreach
, map
oder was immer Sie benötigen, durchlaufen
sqlDF.foreach { row =>
row.toSeq.foreach{col => println(col) }
}
Ausgabe:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
Stellen Sie sich vor, Sie haben eine Dataframe
wie unten
val df = Seq(
("Andy","aaa", 20),
("Berta","bbb", 30),
("Joe","ccc", 40)).toDF("name","sector","age")
Das Schleifen Ihres Dataframe und das Extrahieren der Elemente aus dem Dataframe hilft df.foreach
nicht direkt. Um dies zu implementieren, können Sie einen der folgenden Ansätze wählen.
Ansatz 1 - Schleife mit rdd
Verwenden Sie rdd.collect
über Ihrem Dataframe . Die Variable row
enthält jede Zeile des Zeilentyps Dataframe von rdd
. Um jedes Element aus einer Zeile zu erhalten, verwenden Sie row.mkString(",")
, das den Wert jeder Zeile in durch Kommas getrennten Werten enthält. Mit der split
-Funktion (integrierte Funktion) können Sie auf jeden Spaltenwert der rdd
-Zeile mit Index zugreifen.
for (row <- df.rdd.collect)
{
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
}
Ansatz 2 - Verwenden von wo und wählen Sie
Sie können where
und select
direkt verwenden, um die Daten intern zu finden. Da der Index keine gebundene Ausnahme auslösen soll, wird eine if-Bedingung verwendet
if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
Ansatz 3 - Temp-Tabellen verwenden
Sie können den Datenrahmen als verführerisch registrieren, der im Speicher von spark gespeichert wird. Dann können Sie eine Auswahlabfrage wie andere Datenbanken verwenden, um die Daten abzufragen und diese dann zu sammeln und in einer Variablen zu speichern
df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
Sie sollten mkString
für Ihre Row
verwenden:
sqlDF.foreach { row =>
println(row.mkString(","))
}
Beachten Sie jedoch, dass dies in den Executors-JVMs gedruckt wird. Normalerweise wird die Ausgabe nicht angezeigt (es sei denn, Sie arbeiten mit master = local)
einfach ergebnis sammeln und dann für jeden bewerben
df.collect().foreach(println)
sqlDF.foreach
funktioniert nicht für mich, aber Ansatz 1 von @Sarath Avanavu answer funktioniert, aber es spielte auch irgendwann mit der Reihenfolge der Platten.
Ich habe einen weiteren Weg gefunden, der funktioniert
df.collect().foreach { row =>
println(row.mkString(","))
}