wake-up-neo.net

Der beste Weg, um den maximalen Wert in einer Spark-Dataframe-Spalte abzurufen

Ich versuche herauszufinden, wie Sie den besten Wert in einer Spark-Dataframe-Spalte ermitteln können.

Betrachten Sie das folgende Beispiel:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()

Was schafft:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

Mein Ziel ist es, den größten Wert in Spalte A zu finden (durch Inspektion ist dies 3,0). Mit PySpark kann ich mir vier Ansätze vorstellen:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').collect()[0].asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]

Jede der oben genannten Antworten gibt die richtige Antwort, aber bei Fehlen eines Spark-Profiling-Tools kann ich nicht sagen, welches die beste Lösung ist. 

Irgendwelche Ideen aus Intuition oder Empirie, welche der oben genannten Methoden hinsichtlich der Spark-Laufzeit oder der Ressourcennutzung am effizientesten ist oder ob es eine direktere Methode als die oben genannten gibt?

36
xenocyon
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613

Die Antwort ist fast dieselbe wie bei method3. aber scheinbar kann "asDict ()" in method3 entfernt werden

28
Burt

Der maximale Wert für eine bestimmte Spalte eines Datenrahmens kann mit - erreicht werden.

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

11

Anmerkung: Spark soll mit Big Data - Distributed Computing arbeiten. Die Größe des DataFrame-Beispiels ist sehr klein, sodass die Reihenfolge der realen Beispiele in Bezug auf das kleine Beispiel geändert werden kann.

Am langsamsten: Methode_1, weil .describe ("A") min, max, mittel, stddev und count (5 Berechnungen über die gesamte Spalte) berechnet. 

Mittel: Methode_4, weil .rdd (DF in RDD-Umwandlung) den Prozess verlangsamt.

Schneller: Method_3 ~ Method_2 ~ Method_5, da die Logik sehr ähnlich ist, so folgt der Katalysator-Optimierer von Spark mit einer minimalen Anzahl von Operationen einer sehr ähnlichen Logik. (.asDict () fügt etwas mehr Zeit hinzu und vergleicht 3,2 mit 5) 

import pandas as pd
import time

time_dict = {}

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#--  For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)

tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)

print time_dict

Ergebnis eines Edge-Knotens eines Clusters in Millisekunden (ms): 

klein DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

größer DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}

9

Eine andere Möglichkeit, dies zu tun:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX

Nach meinen Angaben habe ich folgende Benchmarks erhalten:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s

df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s

df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s

Alle geben die gleiche Antwort

7
luminousmen

Falls Sie sich wundern möchten, wie Sie Scala (mit Spark 2.0. +) Verwenden, gehen Sie hier:

scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
    collect()(0).getInt(0)
scala> print(myMax)
117
3
Boern

Ich glaube, die beste Lösung wird head() verwenden.

Betrachten Sie Ihr Beispiel:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

Mit der agg- und max-Methode von python können wir den Wert wie folgt erhalten:

from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]

Dies gibt Folgendes zurück: 3.0

Stellen Sie sicher, dass Sie den richtigen Import haben:
from pyspark.sql.functions import max Die hier verwendete max-Funktion ist die pySPark-SQL-Bibliotheksfunktion, nicht die standardmäßige max-Funktion von python.

1

Das folgende Beispiel zeigt, wie der maximale Wert in einer Spark dataframe-Spalte abgerufen wird.

from pyspark.sql.functions import max

df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
|   3.0|
+------+

print result.collect()[0]['max(A)']
3.0

Ähnlich können min, mean usw. wie folgt berechnet werden:

from pyspark.sql.functions import mean, min, max

result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
|   2.0|   1.0|   3.0|
+------+------+------+
1
Nandeesh

in Pyspark können Sie dies tun:

max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
0
Grant Shannon

Hier ist ein fauler Weg, dies zu tun, indem Sie einfach Statistiken berechnen:

df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)

df.describe('ColName')

oder

spark.sql("Select * from sampleStats").describe('ColName')

oder Sie können eine Hive Shell öffnen und 

describe formatted table sampleStats;

Sie sehen die Statistiken in den Eigenschaften - min, max, eindeutig, Nullen usw.

0
user 923227
import org.Apache.spark.sql.SparkSession
import org.Apache.spark.sql.functions._

val testDataFrame = Seq(
  (1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")

val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
  .as[(Double, Double)]
  .first()
println(maxA, maxB)

Das Ergebnis ist (3.0,6.0), das ist das gleiche wie bei testDataFrame.agg(max($"A"), max($"B")).collect()(0), jedoch gibt testDataFrame.agg(max($"A"), max($"B")).collect()(0) eine Liste zurück, [3.0,6.0].

0
hello-world