Angenommen, ich gebe einem Spark-Kontext drei Dateipfade zum Lesen, und jede Datei enthält ein Schema in der ersten Zeile. Wie können wir Schemazeilen aus Kopfzeilen überspringen?
val rdd=sc.textFile("file1,file2,file3")
Nun, wie können wir Headerzeilen von diesem Rdd überspringen?
Wenn der erste Datensatz nur eine Kopfzeile enthält, können Sie ihn am effizientesten herausfiltern:
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
Dies ist nicht hilfreich, wenn natürlich viele Dateien mit vielen Kopfzeilen darin sind. Sie können tatsächlich drei RDDs, die Sie auf diese Weise herstellen, zusammenführen.
Sie könnten auch einfach eine filter
schreiben, die nur einer Zeile entspricht, die ein Header sein könnte. Dies ist ziemlich einfach, aber weniger effizient.
Python-Äquivalent:
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
In Spark 2.0 ist ein CSV-Reader in Spark integriert, sodass Sie eine CSV-Datei wie folgt einfach laden können:
spark.read.option("header","true").csv("filePath")
Ab Spark 2.0 können Sie dies tun, indem Sie SparkSession verwenden, um dies als Einzeiler auszuführen:
val spark = SparkSession.builder.config(conf).getOrCreate()
und dann wie @SandeepPurohit sagte:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
Ich hoffe es hat deine Frage gelöst!
P.S: SparkSession ist der neue Einstiegspunkt in Spark 2.0 und befindet sich unter spark_sql package .
In PySpark können Sie einen Datenrahmen verwenden und den Header als "True" setzen:
df = spark.read.csv(dataPath, header=True)
Sie können jede Datei einzeln laden, sie mit file.zipWithIndex().filter(_._2 > 0)
filtern und dann alle RDDs der Datei zusammenführen.
Wenn die Anzahl der Dateien zu groß ist, könnte die Union eine StackOverflowExeption
auslösen.
Verwenden Sie die Methode filter()
in PySpark, indem Sie den Namen der ersten Spalte herausfiltern und die Kopfzeile entfernen:
# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)
# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
# Check your result
for i in filterDD.take(5) : print (i)
Es ist eine Option, die Sie an den Befehl read()
übergeben:
context = new org.Apache.spark.sql.SQLContext(sc)
var data = context.read.option("header","true").csv("<path>")
Arbeiten im Jahr 2018 (Spark 2.3)
Python
df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
Scala
val myDf = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
PD1: myManualSchema ist ein vordefiniertes Schema, das von mir geschrieben wurde. Sie können diesen Teil des Codes überspringen
Alternativ können Sie das spark-csv-Paket verwenden (oder in Spark 2.0 ist dies mehr oder weniger nativ als CSV verfügbar). Beachten Sie, dass dies den Header für jede Datei erwartet (je nach Wunsch):
schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)