wake-up-neo.net

Unterschied zwischen DataFrame, Dataset und RDD in Spark

Ich frage mich nur, was ist der Unterschied zwischen einer RDD und DataFrame (Spark 2.0.0 DataFrame ist ein reiner Typalias für Dataset[Row]) in Apache Spark? 

Kannst du eine in die andere umwandeln?

185
menorah84

Eine DataFrame ist gut mit einer Google-Suche nach "DataFrame-Definition" definiert:

Ein Datenrahmen ist eine Tabelle oder zweidimensionale Array-ähnliche Struktur in Jede Spalte enthält Messwerte für eine Variable und jede Zeile enthält einen Fall.

Daher hat eine Variable DataFrame aufgrund ihres Tabellenformats zusätzliche Metadaten, die es Spark ermöglichen, bestimmte Optimierungen für die abgeschlossene Abfrage auszuführen. 

Eine RDD ist dagegen lediglich einResilientDistributedDataset, das eher eine Blackbox von Daten ist, die nicht als Operationen optimiert werden können die dagegen ausgeführt werden können, sind nicht so eingeschränkt.

Sie können jedoch mit der RDD-Methode von einem DataFrame zu einer rdd und von der RDD zu einer DataFrame (wenn das RDD in Tabellenform vorliegt) über die toDF-Methode wechseln

Im Allgemeinen es wird empfohlen, eine DataFrame zu verwenden, da dies aufgrund der eingebauten Abfrageoptimierung möglich ist.

185
Justin Pihony

Als erstes wurde DataFrame aus SchemaRDD entwickelt.

depreated method toSchemaRDD

Ja. Eine Konvertierung zwischen Dataframe und RDD ist absolut möglich.

Unten finden Sie einige Codebeispiele.

  • df.rdd Ist RDD[Row]

Nachfolgend finden Sie einige Optionen zum Erstellen von Datenrahmen.

  • 1) yourrddOffrow.toDF Wird in DataFrame konvertiert.

  • 2) Verwenden von createDataFrame aus dem SQL-Kontext

    val df = spark.createDataFrame(rddOfRow, schema)

wobei das Schema aus einigen der folgenden Optionen stammen kann wie von Nice beschrieben SO post ..
Aus scala case class und scala reflection api

import org.Apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]

ODER mit Encoders

import org.Apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

wie in Schema beschrieben kann auch mit StructType und StructField erstellt werden

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true)) etc...

image description

In der Tat gibt es jetzt 3 Apache Spark APIs ..

enter image description here

  1. RDD API:

Die API RDD (Resilient Distributed Dataset) befindet sich seit der Version 1.0 in Spark).

Die RDD-API bietet viele Transformationsmethoden, z. B. map (), filter () und reduce (), um Berechnungen für die Daten durchzuführen. Jede dieser Methoden führt zu einem neuen RDD, das die transformierten Daten darstellt. Diese Methoden definieren jedoch nur die auszuführenden Operationen, und die Transformationen werden erst ausgeführt, wenn eine Aktionsmethode aufgerufen wird. Beispiele für Aktionsmethoden sind collect () und saveAsObjectFile ().

RDD Beispiel:

rdd.filter(_.age > 21) // transformation
   .map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action

Beispiel: Filtern nach Attributen mit RDD

rdd.filter(_.age > 21)
  1. DataFrame API

Mit Spark 1.3 wurde im Rahmen der Project Tungsten-Initiative eine neue DataFrame API eingeführt, mit der die Leistung und Skalierbarkeit von Spark verbessert werden soll. Die DataFrame -API führt das Konzept eines Schemas zur Beschreibung der Daten ein, mit dem Spark das Schema verwalten und nur Daten zwischen Knoten übertragen kann, und zwar auf viel effizientere Weise als mit = Java Serialisierung.

Die DataFrame-API unterscheidet sich grundlegend von der RDD-API, da es sich um eine API zum Erstellen eines relationalen Abfrageplans handelt, den der Catalyst-Optimierer von Spark dann ausführen kann. Die API ist natürlich für Entwickler, die mit dem Erstellen von Abfrageplänen vertraut sind

Beispiel eines SQL-Stils:

df.filter("age > 21");

Einschränkungen: Da der Code auf Datenattribute mit Namen verweist, kann der Compiler keine abfangen fehler. Wenn die Attributnamen falsch sind, wird der Fehler erst zur Laufzeit erkannt, wenn der Abfrageplan erstellt wird.

Ein weiterer Nachteil der DataFrame -API besteht darin, dass sie sehr skalazentrisch ist und Java zwar unterstützt, die Unterstützung jedoch begrenzt ist.

Wenn Sie beispielsweise ein DataFrame aus einem vorhandenen RDD von Java) Objekten erstellen, kann der Catalyst-Optimierer von Spark das Schema nicht herleiten und geht davon aus, dass Objekte im DataFrame implementiert sind Die scala.Product - Schnittstelle. Scala case class) funktioniert hervorragend, da sie diese Schnittstelle implementiert.

  1. Dataset API

Die API Dataset, die als API-Vorschau in Spark 1.6) veröffentlicht wurde, soll das Beste aus beiden Welten bieten: den bekannten objektorientierten Programmierstil und die Sicherheit beim Kompilieren der RDD-API, jedoch mit den Leistungsvorteilen des Catalyst-Abfrageoptimierungsprogramms.Datasets verwenden denselben effizienten Off-Heap-Speichermechanismus wie die DataFrame-API.

Wenn es um die Serialisierung von Daten geht, hat die Dataset-API das Konzept von Codierern , die zwischen JVM-Darstellungen (Objekten) und der internen Binärdatei von Spark übersetzt werden Format. Spark hat eingebaute Encoder, die sehr fortschrittlich sind, da sie Byte-Code generieren, um mit Off-Heap-Daten zu interagieren und bei Bedarf auf einzelne Attribute zuzugreifen, ohne dass ein gesamtes Objekt de-serialisiert werden muss . Spark stellt noch keine API zum Implementieren benutzerdefinierter Encoder bereit, dies ist jedoch für eine zukünftige Version geplant.

Darüber hinaus ist die Dataset -API so konzipiert, dass sie mit Java und Scala) gleich gut funktioniert. Wenn Sie mit Java Objekten arbeiten, ist es wichtig, dass Sie sind vollständig bohnenkonform.

Beispiel Dataset API-SQL-Stil:

dataset.filter(_.age < 21);

Auswertungen versch. zwischen DataFrame & DataSet:enter image description here

Lesen Sie weiter ... databricks article - Eine Geschichte von drei Apache Spark APIs: RDDs vs DataFrames und Datasets

188
Ram Ghadiyaram

Apache Spark bietet drei Arten von APIs

  1. [~ # ~] rdd [~ # ~]
  2. DataFrame
  3. Datensatz

Comparing RDD, Dataframe and Dataset APIs

Hier ist der API-Vergleich zwischen RDD, Dataframe und Dataset.

RDD

Die Hauptabstraktion Spark ist ein resilientes verteiltes Dataset (RDD), bei dem es sich um eine Sammlung von Elementen handelt, die auf die Knoten des Clusters verteilt sind und die parallel bearbeitet werden können.

RDD Eigenschaften: -

  • Verteilte Sammlung:
    RDD verwendet MapReduce-Operationen, die häufig für die Verarbeitung und Generierung großer Datensätze mit einem parallelen, verteilten Algorithmus in einem Cluster verwendet werden. Benutzer können parallele Berechnungen mit einer Reihe von Operatoren auf hoher Ebene erstellen, ohne sich um die Arbeitsverteilung und die Fehlertoleranz kümmern zu müssen.

  • Unveränderlich: RDDs, die aus einer Sammlung von Datensätzen bestehen, die partitioniert sind. Eine Partition ist eine grundlegende Parallelitätseinheit in einer RDD, und jede Partition ist eine logische Datenaufteilung, die unveränderlich ist und durch einige Transformationen auf vorhandenen Partitionen erstellt wird.

  • Fehlertolerant: Wenn wir eine Partition von RDD verlieren, können wir die Transformation auf dieser Partition in der Linie wiederholen, um die gleiche Berechnung zu erzielen, anstatt Durchführen einer Datenreplikation über mehrere Knoten hinweg. Dieses Merkmal ist der größte Vorteil von RDD, da es viel Aufwand bei der Datenverwaltung und -replikation erspart und somit schnellere Berechnungen ermöglicht.

  • Faule Bewertungen: Alle Transformationen in Spark sind faul, da sie ihre Ergebnisse nicht sofort berechnen. Stattdessen erinnern sie sich nur an die Transformationen, die auf ein Basis-Dataset angewendet wurden. Die Transformationen werden nur berechnet, wenn für eine Aktion die Rückgabe eines Ergebnisses an das Treiberprogramm erforderlich ist.

  • Funktionale Transformationen: RDDs unterstützen zwei Arten von Operationen: Transformationen, die aus einer vorhandenen Datenmenge eine neue Datenmenge erstellen, und Aktionen, die dem Treiber einen Wert zurückgeben Programm, nachdem eine Berechnung für den Datensatz ausgeführt wurde.

  • Datenverarbeitungsformate:
    Es kann sowohl strukturierte als auch unstrukturierte Daten einfach und effizient verarbeiten.

  • Unterstützte Programmiersprachen:
    Die RDD-API ist in Java, Scala, Python und R verfügbar.

RDD-Einschränkungen: -

  • Keine eingebaute Optimierungs-Engine: Bei der Arbeit mit strukturierten Daten können RDDs die erweiterten Optimierungsfunktionen von Spark, einschließlich Katalysatoroptimierung und Wolfram-Ausführungs-Engine, nicht nutzen. Entwickler müssen jede RDD basierend auf ihren Attributen optimieren.

  • Umgang mit strukturierten Daten: Im Gegensatz zu Datenrahmen und Datasets leiten RDDs nicht auf das Schema der aufgenommenen Daten ab und erfordern, dass der Benutzer es angibt.

Datenrahmen

Spark hat Dataframes in Version 1.3 von Spark eingeführt. Dataframe überwindet die zentralen Herausforderungen, die RDDs hatten.

Ein DataFrame ist eine verteilte Sammlung von Daten, die in benannten Spalten organisiert sind. Es entspricht konzeptionell einer Tabelle in einer relationalen Datenbank oder einem R/Python-Datenrahmen. Zusammen mit Dataframe führte Spark auch den Katalysatoroptimierer ein, der erweiterte Programmierfunktionen nutzt, um einen erweiterbaren Abfrageoptimierer zu erstellen.

Dataframe-Funktionen: -

  • Verteilte Auflistung von Zeilenobjekten: Ein DataFrame ist eine verteilte Auflistung von Daten, die in benannten Spalten organisiert sind. Es entspricht konzeptionell einer Tabelle in einer relationalen Datenbank, bietet jedoch umfassendere Optimierungen.

  • Datenverarbeitung: Verarbeitung strukturierter und unstrukturierter Datenformate (Avro, CSV, Elastic Search und Cassandra) und Speichersysteme (HDFS, Hive Tables, MySQL usw.) ). Es kann aus all diesen verschiedenen Datenquellen lesen und schreiben.

  • Optimierung mit dem Katalysatoroptimierer: Es werden sowohl SQL-Abfragen als auch die DataFrame-API betrieben. Datenrahmen verwenden das Katalysatorbaum-Transformations-Framework in vier Phasen.

    1.Analyzing a logical plan to resolve references
    2.Logical plan optimization
    3.Physical planning
    4.Code generation to compile parts of the query to Java bytecode.
    
  • Hive-Kompatibilität: Mit Spark SQL können Sie unveränderte Hive-Abfragen in Ihren vorhandenen Hive-Warehouses ausführen. Es verwendet das Hive-Frontend und MetaStore erneut und bietet Ihnen vollständige Kompatibilität mit vorhandenen Hive-Daten, Abfragen und UDFs.

  • Tungsten: Tungsten bietet ein physisches Ausführungs-Backend, das den Speicher explizit verwaltet und dynamisch Bytecode für die Ausdrucksauswertung generiert.

  • Unterstützte Programmiersprachen:
    Dataframe-API ist in Java, Scala, Python und R verfügbar.

Einschränkungen für Datenrahmen: -

  • Sicherheit beim Kompilieren: Wie bereits erläutert, unterstützt die Dataframe-API keine Sicherheit beim Kompilieren, sodass Sie keine Daten manipulieren können, wenn die Struktur nicht bekannt ist. Das folgende Beispiel funktioniert während der Kompilierung. Sie erhalten jedoch eine Laufzeitausnahme, wenn Sie diesen Code ausführen.

Beispiel:

case class Person(name : String , age : Int) 
val dataframe = sqlContext.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name

Dies ist besonders dann eine Herausforderung, wenn Sie mit mehreren Transformations- und Aggregationsschritten arbeiten.

  • Domänenobjekt kann nicht bearbeitet werden (verloren gegangenes Domänenobjekt): Nachdem Sie ein Domänenobjekt in einen Datenrahmen umgewandelt haben, können Sie es nicht mehr daraus generieren. Im folgenden Beispiel wird nach dem Erstellen von personDF aus personRDD nicht die ursprüngliche RDD der Personenklasse (RDD [Person]) wiederhergestellt.

Beispiel:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

Datasets API

Die Dataset-API ist eine Erweiterung von DataFrames, die eine typsichere, objektorientierte Programmierschnittstelle bietet. Es ist eine stark typisierte, unveränderliche Sammlung von Objekten, die einem relationalen Schema zugeordnet sind.

Das Herzstück des Datasets ist die API, ein neues Konzept namens Encoder, das für die Konvertierung zwischen JVM-Objekten und der tabellarischen Darstellung zuständig ist. Die tabellarische Darstellung wird im internen Tungsten-Binärformat Spark gespeichert, wodurch Vorgänge mit serialisierten Daten und eine verbesserte Speichernutzung ermöglicht werden. Spark 1.6 unterstützt das automatische Generieren von Encodern für eine Vielzahl von Typen, einschließlich primitiver Typen (z. B. String, Integer, Long), Scala - Fallklassen und Java - Beans.

Datensatzmerkmale: -

  • Bietet das Beste aus RDD und Dataframe: RDD (funktionale Programmierung, typensicher), DataFrame (relationales Modell, Abfrageoptimierung, Wolfram-Ausführung, Sortieren und Mischen)

  • Encoder: Mit der Verwendung von Encodern ist es einfach, jedes JVM-Objekt in einen Datensatz zu konvertieren, sodass Benutzer im Gegensatz zu Dataframe sowohl mit strukturierten als auch mit unstrukturierten Daten arbeiten können .

  • Unterstützte Programmiersprachen: Datasets API ist derzeit nur in Scala und Java verfügbar. Python und R werden derzeit in Version 1.6 nicht unterstützt. Die Unterstützung von Python ist für Version 2.0 vorgesehen.

  • Typensicherheit: Die Datasets-API bietet Kompilierungszeitsicherheit, die in Dataframes nicht verfügbar war. Im folgenden Beispiel sehen wir, wie Dataset auf Domänenobjekte mit kompilierten Lambda-Funktionen angewendet werden kann.

Beispiel:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • Interoperabel: Mit Datasets können Sie Ihre vorhandenen RDDs und Dataframes auf einfache Weise in Datasets ohne Boilerplate-Code konvertieren.

Datasets API-Einschränkung: -

  • Erfordert Typumwandlung in String: Um Daten aus Datasets abzufragen, müssen wir die Felder in der Klasse derzeit als String angeben. Sobald wir die Daten abgefragt haben, müssen wir die Spalte in den erforderlichen Datentyp umwandeln. Wenn Sie dagegen die Kartenoperation für Datasets verwenden, wird der Catalyst-Optimierer nicht verwendet.

Beispiel:

ds.select(col("name").as[String], $"age".as[Int]).collect()

Keine Unterstützung für Python und R: Ab Release 1.6 unterstützen Datasets nur Scala und Java. Die Unterstützung von Python wird in Spark 2.0 eingeführt.

Die Datasets-API bietet mehrere Vorteile gegenüber der vorhandenen RDD- und Dataframe-API mit verbesserter Typensicherheit und funktionaler Programmierung. Angesichts der Anforderungen an die Typumwandlung in der API würden Sie immer noch nicht die erforderliche Typensicherheit erreichen und Ihren Code spröde machen.

123
Amit Dubey

RDD

Die Hauptabstraktion Spark stellt ein resilient Distributed Dataset (RDD) bereit, eine Sammlung von Elementen, die über die Knoten des Clusters verteilt sind und parallel bearbeitet werden können.

RDD Eigenschaften: -

  • Verteilte Sammlung:
    RDD verwendet MapReduce-Operationen, die häufig für die Verarbeitung und Generierung großer Datensätze mit einem parallelen, verteilten Algorithmus in einem Cluster verwendet werden. Benutzer können parallele Berechnungen mit einer Reihe von Operatoren auf hoher Ebene erstellen, ohne sich um die Arbeitsverteilung und die Fehlertoleranz kümmern zu müssen.

  • nveränderlich: RDDs, die aus einer Sammlung von Datensätzen bestehen, die partitioniert sind. Eine Partition ist eine grundlegende Parallelitätseinheit in einer RDD, und jede Partition ist eine logische Datenaufteilung, die unveränderlich ist und durch einige Transformationen auf vorhandenen Partitionen erstellt wird.

  • Fehlertolerant: Wenn wir eine Partition von RDD verlieren, können wir die Umwandlung auf dieser Partition in der Linie wiederholen, um dieselbe Berechnung zu erzielen, anstatt die Datenreplikation über mehrere Knoten hinweg durchzuführen Der größte Vorteil von RDD besteht darin, dass es viel Aufwand bei der Datenverwaltung und -replikation spart und somit schnellere Berechnungen ermöglicht.

  • Lazy evaluations: Alle Transformationen in Spark) sind insofern faul, als sie ihre Ergebnisse nicht sofort berechnen, sondern sich nur an die Transformationen erinnern, die auf einen Basisdatensatz angewendet wurden. Die Transformationen werden nur berechnet, wenn für eine Aktion die Rückgabe eines Ergebnisses an das Treiberprogramm erforderlich ist.

  • Funktionale Transformationen: RDDs unterstützen zwei Arten von Operationen: Transformationen, die aus einer vorhandenen einen neuen Datensatz erstellen, und Aktionen, die nach Ausführung einer Berechnung für den Datensatz einen Wert an das Treiberprogramm zurückgeben.

  • Datenverarbeitungsformate:

Es kann sowohl strukturierte als auch unstrukturierte Daten einfach und effizient verarbeiten.

  • nterstützte Programmiersprachen:
    Die RDD-API ist in Java, Scala, Python und R.

RDD-Einschränkungen: -

  • Keine eingebaute Optimierungs-Engine: Bei der Arbeit mit strukturierten Daten können RDDs die erweiterten Optimierungsfunktionen von Spark, einschließlich Katalysatoroptimierung und Tungsten-Ausführungs-Engine, nicht nutzen. Entwickler müssen jede RDD basierend auf ihren Attributen optimieren.

  • mgang mit strukturierten Daten: Im Gegensatz zu Datenrahmen und Datasets leiten RDDs nicht auf das Schema der aufgenommenen Daten ab und erfordern, dass der Benutzer es angibt.

Datenrahmen

In Spark 1.3) hat Spark Dataframes eingeführt. Dataframe überwindet die wichtigsten Herausforderungen, die RDDs hatten.

Ein DataFrame ist eine verteilte Sammlung von Daten, die in benannten Spalten organisiert sind. Es entspricht konzeptionell einer Tabelle in einer relationalen Datenbank oder einem R/Python-Datenrahmen. Zusammen mit Dataframe führte Spark) auch den Katalysatoroptimierer ein, der erweiterte Programmierfunktionen nutzt, um einen erweiterbaren Abfrageoptimierer zu erstellen.

Dataframe-Funktionen: -

  • Verteilte Auflistung von Zeilenobjekten: Ein DataFrame ist eine verteilte Auflistung von Daten, die in benannten Spalten organisiert sind. Es entspricht konzeptionell einer Tabelle in einer relationalen Datenbank, bietet jedoch umfassendere Optimierungen.

  • Data Processing: Verarbeiten von strukturierten und unstrukturierten Datenformaten (Avro, CSV, Elastic Search und Cassandra) und Speichersystemen (HDFS, Hive Tables, MySQL usw.). Es kann aus all diesen verschiedenen Datenquellen lesen und schreiben.

  • Optimierung mit dem Katalysatoroptimierer: Es werden sowohl SQL-Abfragen als auch die DataFrame-API betrieben. Datenrahmen verwenden das Katalysatorbaum-Transformations-Framework in vier Phasen.

    1.Analyzing a logical plan to resolve references
    2.Logical plan optimization
    3.Physical planning
    4.Code generation to compile parts of the query to Java bytecode.
    
  • Hive-Kompatibilität: Mit Spark SQL können Sie unveränderte Hive-Abfragen in Ihren vorhandenen Hive-Warehouses ausführen. Hive-Frontend und MetaStore werden wiederverwendet, und Sie erhalten vollständige Kompatibilität mit vorhandenen Hive-Daten , Abfragen und UDFs.

  • Tungsten: Tungsten bietet ein physisches Ausführungs-Backend, das den Speicher explizit verwaltet und dynamisch Bytecode für die Ausdrucksauswertung generiert.

  • nterstützte Programmiersprachen:
    Dataframe-API ist in Java, Scala, Python und R verfügbar.

Einschränkungen für Datenrahmen: -

  • Sicherheit beim Kompilieren: Wie bereits erwähnt, unterstützt die Dataframe-API keine Sicherheit beim Kompilieren, wodurch Sie daran gehindert werden, Daten zu manipulieren, wenn die Struktur nicht bekannt ist. Das folgende Beispiel funktioniert während der Kompilierung. Sie erhalten jedoch eine Laufzeitausnahme, wenn Sie diesen Code ausführen.

Beispiel:

case class Person(name : String , age : Int) 
val dataframe = sqlContect.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name

Dies ist besonders dann eine Herausforderung, wenn Sie mit mehreren Transformations- und Aggregationsschritten arbeiten.

  • Kann nicht mit Domänenobjekten (verlorenen Domänenobjekten) arbeiten: Sobald Sie ein Domänenobjekt in einen Datenrahmen umgewandelt haben, können Sie es nicht mehr daraus generieren. Im folgenden Beispiel wird nach dem Erstellen von personDF aus personRDD nicht die ursprüngliche RDD der Personenklasse (RDD [Person]) wiederhergestellt.

Beispiel:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

Datasets API

Die Dataset-API ist eine Erweiterung von DataFrames, die eine typsichere, objektorientierte Programmierschnittstelle bietet. Es ist eine stark typisierte, unveränderliche Sammlung von Objekten, die einem relationalen Schema zugeordnet sind.

Das Herzstück des Datasets ist die API, ein neues Konzept namens Encoder, das für die Konvertierung zwischen JVM-Objekten und der tabellarischen Darstellung zuständig ist. Die tabellarische Darstellung wird im internen Wolfram-Binärformat Spark=) gespeichert, um Operationen mit serialisierten Daten und eine verbesserte Speichernutzung zu ermöglichen. Spark= 1.6 unterstützt das automatische Generieren von Encodern für eine Vielzahl von Typen, einschließlich primitiver Typen (z. B. String, Integer, Long), Scala case classes und Java Beans).

Datensatzmerkmale: -

  • Bietet das Beste aus RDD und Dataframe: RDD (funktionale Programmierung, typensicher), DataFrame (relationales Modell, Abfrageoptimierung, Wolfram-Ausführung, Sortieren und Mischen)

  • Encoder: Mit der Verwendung von Encodern ist es einfach, jedes JVM-Objekt in ein Dataset zu konvertieren, sodass Benutzer im Gegensatz zu Dataframe sowohl mit strukturierten als auch mit unstrukturierten Daten arbeiten können.

  • nterstützte Programmiersprachen: Die Datasets-API ist derzeit nur in Scala und Java. Python und R werden derzeit in Version 1.6 nicht unterstützt . Python Unterstützung ist für Version 2.0 vorgesehen.

  • Type Safety: Datasets API bietet Kompilierungszeitsicherheit, die in Dataframes nicht verfügbar war. Im folgenden Beispiel sehen wir, wie Dataset auf Domänenobjekte mit kompilierten Lambda-Funktionen angewendet werden kann.

Beispiel:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • Interoperable: Mit Datasets können Sie Ihre vorhandenen RDDs und Dataframes auf einfache Weise in Datasets ohne Boilerplate-Code konvertieren.

Datasets API-Einschränkung: -

  • Typumwandlung in String erforderlich: Um Daten aus Datasets abzufragen, müssen wir derzeit die Felder in der Klasse als String angeben. Sobald wir die Daten abgefragt haben, müssen wir die Spalte in den erforderlichen Datentyp umwandeln. Wenn Sie dagegen die Kartenoperation für Datasets verwenden, wird der Catalyst-Optimierer nicht verwendet.

Beispiel:

ds.select(col("name").as[String], $"age".as[Int]).collect()

Keine Unterstützung für Python und R: Ab Version 1.6 unterstützen Datasets nur Scala und Java. Python) eingeführt in Spark 2.0.

Die Datasets-API bietet mehrere Vorteile gegenüber der vorhandenen RDD- und Dataframe-API mit verbesserter Typensicherheit und funktionaler Programmierung. Angesichts der Anforderungen an die Typumwandlung in der API würden Sie immer noch nicht die erforderliche Typensicherheit erreichen und Ihren Code spröde machen.

59
Amit Dubey

Alle (RDD, DataFrame und DataSet) in einem Bild

RDD vs DataFrame vs DataSet

Bildnachweis

RDD

RDD ist eine fehlertolerante Sammlung von Elementen, die parallel bearbeitet werden können.

DataFrame

DataFrame ist ein Dataset, das in benannten Spalten organisiert ist. Es ist konzeptionell äquivalent zu einer Tabelle in einer relationalen Datenbank oder Daten Frame in R/Python, aber mit reichhaltigeren Optimierungen unter der Haube.

Datensatz

Dataset ist eine verteilte Sammlung von Daten. Datensatz ist eine neue Schnittstelle, die in Spark 1.6 hinzugefügt wurde und die -Vorteile von RDDs .__ bereitstellt. (starkes Tippen, Fähigkeit, mächtige Lambda-Funktionen zu verwenden) mit der Vorteile der optimierten Ausführungs-Engine von Spark SQL.


Hinweis: 

Datensatz der Zeilen (Dataset[Row]) in Scala/Java wird häufig als DataFrames bezeichnen.


Netter Vergleich aller mit einem Code-Snippet

RDD vs DataFrame vs DataSet with code

Quelle


F: Können Sie RDD in DataFrame oder umgekehrt in eine andere konvertieren?

Ja, beides ist möglich

1. RDD bis DataFrame mit .toDF()

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

weitere Möglichkeiten: Konvertieren eines RDD-Objekts in Spark in Dataframe

2. DataFrame/DataSet zu RDD mit .rdd() Methode

val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
28
mrsrinivas

Einfach RDD ist die Kernkomponente, aber DataFrame ist eine API, die in spark 1.30 eingeführt wurde.

RDD

Sammlung von Datenpartitionen mit dem Namen RDD. Diese RDD muss folgenden Eigenschaften folgen: 

  • Unveränderlich,
  • Fehlertoleranz,
  • Verteilt, 
  • Mehr.

Hier ist RDD entweder strukturiert oder unstrukturiert.

DataFrame

DataFrame ist eine API, die in Scala, Java, Python und R verfügbar ist. Sie ermöglicht die Verarbeitung jeder Art von strukturierten und halbstrukturierten Daten. Um DataFrame zu definieren, eine Sammlung verteilter Daten, die in benannten Spalten mit dem Namen DataFrame angeordnet sind. Sie können die RDDs in der DataFrame..__ leicht optimieren. Sie können JSON-Daten, Parkettdaten und HiveQL-Daten gleichzeitig mit DataFrame bearbeiten.

val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")

val sample_DF = sampleRDD.toDF()

Hier gilt Sample_DF als DataFrame. sampleRDD heißt (Rohdaten) RDD.

23
Venu A Positive

Weil DataFrame schwach typisiert ist und die Entwickler die Vorteile des Typsystems nicht nutzen können. Nehmen wir beispielsweise an, Sie möchten etwas aus SQL lesen und eine Aggregation darauf ausführen:

val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

Wenn Sie people("deptId") sagen, erhalten Sie keine Int oder Long zurück, sondern ein Column-Objekt, das Sie bearbeiten müssen. In Sprachen mit umfangreichen Typsystemen wie Scala verlieren Sie am Ende alle Typensicherheit, was die Anzahl der Laufzeitfehler für Dinge erhöht, die zur Kompilierzeit entdeckt werden könnten.

Im Gegenteil, DataSet[T] wird eingegeben. wenn Sie das tun:

val people: People = val people = sqlContext.read.parquet("...").as[People]

Sie erhalten tatsächlich ein People-Objekt zurück, wobei deptId ein tatsächlicher integraler Typ und kein Spaltentyp ist, wodurch das Typsystem genutzt wird.

Ab Spark 2.0 werden die DataFrame- und DataSet-APIs vereinheitlicht, wobei DataFrame ein Typalias für DataSet[Row] ist.

21
Yuval Itzchakov

Ein DataFrame entspricht einer Tabelle in RDBMS und kann auch auf ähnliche Weise wie die "nativen" verteilten Sammlungen in RDDs bearbeitet werden. Im Gegensatz zu RDDs verfolgen Dataframes das Schema und unterstützen verschiedene relationale Operationen, die zu einer optimierten Ausführung führen .. Jedes DataFrame-Objekt stellt einen logischen Plan dar, aber aufgrund seiner "trägen" Natur findet keine Ausführung statt, bis der Benutzer eine bestimmte Ausgabe aufruft Operation".

8
Silver Blaze

Die meisten Antworten sind richtig, ich möchte hier nur einen Punkt hinzufügen 

In Spark 2.0 werden die beiden APIs (DataFrame + DataSet) zu einer einzigen API zusammengefasst.

"Vereinheitlichen von DataFrame und Dataset: In Scala und Java wurden DataFrame und Dataset vereinheitlicht, d. H. DataFrame ist lediglich ein Typalias für Dataset of Row. In Python und R ist DataFrame aufgrund der fehlenden Typsicherheit die Hauptprogrammierschnittstelle."

Datensätze ähneln RDDs, verwenden jedoch anstelle von Java-Serialisierung oder Kryo einen speziellen Encoder, um die Objekte für die Verarbeitung oder Übertragung über das Netzwerk zu serialisieren. 

Spark SQL unterstützt zwei verschiedene Methoden zum Konvertieren vorhandener RDDs in Datensätze. Die erste Methode verwendet Reflektion, um auf das Schema einer RDD zu schließen, die bestimmte Objekttypen enthält. Dieser reflexionsbasierte Ansatz führt zu prägnanterem Code und funktioniert gut, wenn Sie das Schema bereits beim Schreiben Ihrer Spark-Anwendung kennen.

Die zweite Methode zum Erstellen von Datensätzen besteht in einer programmatischen Schnittstelle, mit der Sie ein Schema erstellen und auf eine vorhandene RDD anwenden können. Obwohl diese Methode ausführlicher ist, können Sie Datensätze erstellen, wenn die Spalten und ihre Typen erst zur Laufzeit bekannt sind.

Hier finden Sie die Antwort auf RDD-Datenrahmenkonvertierung 

So konvertieren Sie ein Rdd-Objekt in Spark in einen Datenrahmen

8
vaquar khan

Ein Datenrahmen ist eine RDD von Row-Objekten, die jeweils einen Datensatz darstellen. Ein Datenframe kennt auch das Schema (d. H. Datenfelder) seiner Zeilen. Während Dataframes Wie normale RDDs aussehen, speichern sie intern Daten effizienter und nutzen deren Schema. Außerdem bieten sie neue Vorgänge, die für RDDs nicht verfügbar sind, z. B. die Möglichkeit, SQL-Abfragen auszuführen. Datenrahmen können aus externen Datenquellen, aus den Ergebnissen von Abfragen oder aus regulären RDDs erstellt werden.

Referenz: Zaharia M., et al. Learning Spark (O'Reilly, 2015)

5
Neethu

Einige Erkenntnisse aus der Perspektive der Nutzung, RDD vs. DataFrame:

  1. RDDs sind unglaublich! da sie uns alle Flexibilität bieten, um mit fast jeder Art von Daten umzugehen; unstrukturierte, halbstrukturierte und strukturierte Daten. Da Daten oftmals nicht für die Anpassung an einen DataFrame (auch für JSON) bereit sind, können RDDs zur Vorverarbeitung der Daten verwendet werden, so dass sie in einen Datenrahmen passen. RDDs sind eine zentrale Datenabstraktion in Spark. 
  2. Nicht alle Umwandlungen, die auf RDD möglich sind, sind auf DataFrames möglich. Beispiel: subtract () ist für RDD und Ausnahmen () für DataFrame.
  3. Da DataFrames einer relationalen Tabelle ähneln, folgen sie bei der Verwendung von set/relational-theoretischen Transformationen strengen Regeln. Wenn Sie beispielsweise zwei Dataframes zusammenführen möchten, müssen beide DFS die gleiche Anzahl von Spalten und zugehörigen Datentypen haben. Spaltennamen können unterschiedlich sein. Diese Regeln gelten nicht für RDDs. Hier ist ein gutes Tutorial diese Fakten zu erklären.
  4. Bei der Verwendung von DataFrames gibt es Leistungsverbesserungen, wie andere bereits ausführlich erläutert haben.
  5. Bei Verwendung von DataFrames müssen Sie die beliebige Funktion nicht wie bei der Programmierung mit RDDs übergeben.
  6. Sie benötigen SQLContext/HiveContext, um Dataframes so zu programmieren, wie sie im SparkSQL-Bereich des Spark-Ökosystems liegen. Für RDD benötigen Sie jedoch nur SparkContext/JavaSparkContext, die in Spark Core-Bibliotheken liegen.
  7. Sie können eine DF aus einer RDD erstellen, wenn Sie dafür ein Schema definieren können.
  8. Sie können auch ein df in rdd und ein rdd in df konvertieren.

Ich hoffe, es hilft! 

4
skvyas

Sie können RDDs mit Structured und Unstructured verwenden, wobei Dataframe/Dataset nur Structured und Semi Structured Data verarbeiten kann (es verfügt über ein geeignetes Schema). 

1
Raj

Spark RDD (resilient distributed dataset):

RDD ist die zentrale Datenabstraktions-API und seit der ersten Veröffentlichung von Spark (Spark 1.0) verfügbar. Es ist eine untergeordnete API zum Manipulieren der verteilten Datenerfassung. Die RDD-APIs bieten einige äußerst nützliche Methoden, mit denen die zugrunde liegende physische Datenstruktur sehr genau gesteuert werden kann. Es handelt sich um eine unveränderliche (schreibgeschützte) Sammlung partitionierter Daten, die auf verschiedenen Computern verteilt sind. RDD ermöglicht die In-Memory-Berechnung in großen Clustern, um die Verarbeitung großer Datenmengen fehlertolerant zu beschleunigen. Um Fehlertoleranz zu aktivieren, verwendet RDD DAG (Directed Acyclic Graph), das aus einer Reihe von Eckpunkten und Kanten besteht. Die Eckpunkte und Kanten in DAG stellen die RDD bzw. die auf diese RDD anzuwendende Operation dar. Die in RDD definierten Transformationen sind verzögert und werden nur ausgeführt, wenn eine Aktion aufgerufen wird

Spark DataFrame:

Mit Spark 1.3 wurden zwei neue Datenabstraktions-APIs eingeführt - DataFrame und DataSet. Die DataFrame-APIs organisieren die Daten in benannten Spalten wie eine Tabelle in einer relationalen Datenbank. Es ermöglicht Programmierern, ein Schema für eine verteilte Datensammlung zu definieren. Jede Zeile in einem DataFrame ist vom Objekttyp row. Wie eine SQL-Tabelle muss jede Spalte dieselbe Anzahl von Zeilen in einem DataFrame haben. Kurz gesagt, DataFrame ist ein faul evaluierter Plan, der die Vorgänge angibt, die für die verteilte Erfassung der Daten ausgeführt werden müssen. DataFrame ist auch eine unveränderliche Sammlung.

Spark DataSet:

Als Erweiterung der DataFrame-APIs wurden mit Spark 1.3 auch DataSet-APIs eingeführt, die in Spark eine streng typisierte und objektorientierte Programmierschnittstelle bieten. Es ist eine unveränderliche, typsichere Sammlung verteilter Daten. Wie DataFrame verwenden auch DataSet-APIs die Catalyst-Engine, um die Ausführungsoptimierung zu ermöglichen. DataSet ist eine Erweiterung der DataFrame-APIs.

Other Differences -

enter image description here

0
NoName

Jede großartige Antwort und die Verwendung der einzelnen APIs hat einige Nachteile. _ Dataset ist als Super-API zur Lösung vieler Probleme konzipiert. RDD funktioniert jedoch häufig am besten, wenn Sie Ihre Daten verstehen und der Verarbeitungsalgorithmus für viele Aufgaben optimiert ist In Single Pass zu großen Daten scheint RDD die beste Option zu sein.

Die Aggregation mithilfe der Dataset-API verbraucht weiterhin Speicher und wird mit der Zeit besser.

0
Ashkrit Sharma

Ein DataFrame ist eine RDD mit einem Schema. Sie können es sich als relationale Datenbanktabelle vorstellen, indem jede Spalte einen Namen und einen bekannten Typ hat. Die Leistungsfähigkeit von DataFrames ergibt sich aus der Tatsache, dass Spark beim Erstellen eines DataFrames aus einem strukturierten Dataset (Json, Parquet ..) auf ein Schema schließen kann, indem er das gesamte (Json, Parkett) durchläuft. .) Datensatz, der geladen wird. Spark kann dann bei der Berechnung des Ausführungsplans das Schema verwenden und wesentlich bessere Berechnungsoptimierungen durchführen. Beachten Sie, dass DataFrame vor Spark v1.3.0 SchemaRDD genannt wurde

0
user2989087