Wie verketten wir zwei Spalten in einem Apache Spark DataFrame? Gibt es eine Funktion in Spark SQL, die wir verwenden können?
Mit Raw SQL können Sie CONCAT
verwenden:
In Python
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
In Scala
import sqlContext.implicits._
val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
Seit Spark 1.5.0 können Sie die concat
-Funktion mit der DataFrame-API verwenden:
In Python:
from pyspark.sql.functions import concat, col, lit
df.select(concat(col("k"), lit(" "), col("v")))
In Scala:
import org.Apache.spark.sql.functions.{concat, lit}
df.select(concat($"k", lit(" "), $"v"))
Es gibt auch eine concat_ws
-Funktion, die ein String-Trennzeichen als erstes Argument verwendet.
So können Sie benutzerdefinierte Benennungen vornehmen
import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
gibt,
+--------+--------+
|colname1|colname2|
+--------+--------+
| row11| row12|
| row21| row22|
+--------+--------+
erstellen Sie eine neue Spalte durch Verketten:
df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()
+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
| row11| row12| row11_row12|
| row21| row22| row21_row22|
+--------+--------+-------------+
Wenn Sie DF verwenden möchten, können Sie mithilfe einer udf eine neue Spalte hinzufügen, die auf vorhandenen Spalten basiert.
val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)
//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))
//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )
//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Eine Möglichkeit, String-Spalten in Spark Scala zu verketten, ist concat
.
auf Nullwerte prüfen . Wenn eine der Spalten null ist, ist das Ergebnis auch dann null, wenn eine der anderen Spalten Informationen enthält.
Verwendung von concat
und withColumn
:
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
Verwendung von concat
und select
:
val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")
Bei beiden Ansätzen haben Sie einen NEW_COLUMN, dessen Wert eine Verkettung der Spalten ist: COL1 und COL2 aus Ihrer ursprünglichen Datenbank.
Hier ist eine andere Möglichkeit, dies für Pyspark zu tun:
#import concat and lit functions from pyspark.sql.functions
from pyspark.sql.functions import concat, lit
#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])
#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))
#Show the new data frame
personDF.show()
----------RESULT-------------------------
84
+------------+
|East African|
+------------+
| Ethiopian|
| Kenyan|
| Ugandan|
| Rwandan|
+------------+
Hier ein Vorschlag, wenn Sie die Nummer oder den Namen der Spalten im Dataframe nicht kennen.
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Ab Spark 2.3 ( SPARK-22771 ) unterstützt Spark SQL den Verkettungsoperator ||
.
Zum Beispiel;
val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
In Spark 2.3.0 können Sie Folgendes tun:
spark.sql( """ select '1' || column_a from table_a """)
In Java können Sie dazu mehrere Spalten verketten. Der Beispielcode soll Ihnen ein Szenario und die Verwendung zum besseren Verständnis vermitteln.
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
.withColumn("concatenatedCol",
concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession.builder().config(sparkConf)
.getOrCreate();
}
return instance;
}
}
Der obige Code verkettet col1, col2, col3 durch "_" getrennt, um eine Spalte mit dem Namen "concatenatedCol" zu erstellen.
In der Tat gibt es einige schöne integrierte Abstraktionen, mit denen Sie Ihre Verkettung durchführen können, ohne eine benutzerdefinierte Funktion implementieren zu müssen. Da Sie Spark SQL erwähnt haben, versuchen Sie vermutlich, es als deklarativen Befehl über spark.sql () zu übergeben. In diesem Fall können Sie auf einfache Weise den folgenden SQL-Befehl eingeben: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;
Außerdem können Sie ab Spark 2.3.0 Befehle in folgenden Zeilen verwenden: SELECT col1 || col2 AS concat_column_name FROM <table_name>;
Hierbei handelt es sich um das bevorzugte Trennzeichen (kann auch ein leerer Bereich sein) und um die temporäre oder permanente Tabelle, aus der Sie lesen möchten.
Eine andere Möglichkeit, dies in pySpark mit sqlContext zu tun ...
#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])
# Now we can concatenate columns and assign the new column a name
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))