by Sally Bo Hatter
Share
by Sally Bo Hatter
Was ist Spark?
Apache Spark ist eine einheitliche Computing-Engine, die von einer Reihe von Bibliotheken begleitet wird, die für die parallele Datenverarbeitung in einer Cluster-Umgebung entwickelt wurden. Als die am aktivsten entwickelte Open-Source-Engine für parallele Verarbeitung entwickelt sich Spark schnell zu einem Standardwerkzeug für Entwickler und Datenwissenschaftler, die sich mit Big-Data-Aufgaben befassen. Sie ist mit gängigen Programmiersprachen wie Python, Java, Scala und R kompatibel und bietet ein umfangreiches Angebot an Bibliotheken für verschiedene Funktionen, von SQL bis hin zu Streaming und maschinellem Lernen. Spark ist vielseitig und kann in verschiedenen Umgebungen betrieben werden, von einem einzelnen Laptop bis hin zu ausgedehnten Clustern mit Tausenden von Servern. Mithilfe dieser Funktionen können Benutzer mühelos die Verarbeitung großer Datenmengen einleiten und Operationen nahtlos skalieren – von kleinen Setups bis hin zu umfangreichen Clustern.
Wie man Spark optimiert
01 – Shuffling so weit wie möglich vermeiden
Was ist Shuffling?
Shuffling ist eine wichtige Operation im Kontext der verteilten Datenverarbeitung und spielt in Apache Spark eine wichtige Rolle, wenn es um große Datensätze geht, die über mehrere Knoten in einem Cluster verteilt sind. Beim Shuffling werden die Daten im Cluster neu verteilt und reorganisiert, in der Regel als Ergebnis von Transformationen oder Operationen, die eine Gruppierung oder Aggregation von Daten auf eine Art und Weise erfordern, wie sie zuvor nicht der Fall war.
Das Mischen von Daten kann zeit- und bandbreitenintensiv sein, da es Datenbewegungen und Koordinierung zwischen den Knotenpunkten erfordert. Die Minimierung des Shufflings ist häufig ein Ziel der Leistungsoptimierung bei der Entwicklung von Spark-Anwendungen. Strategien wie die Auswahl geeigneter Partitionierungsmethoden, die Verwendung von Broadcast-Variablen und die Optimierung des Ausführungsplans können dazu beitragen, die Auswirkungen des Shufflings auf die Gesamtleistung von Spark-Aufträgen abzuschwächen.
In Situationen, in denen das Problem mit reduceByKey gelöst werden kann, sollten Sie immer reduceByKey verwenden. Bei der Verwendung von groupByKey kommt es unweigerlich zu der höchst unerwünschten (aber manchmal unvermeidbaren) Datenumverteilung über alle Knoten in Spark. Bei der Verwendung von reduceByKey werden die Daten zwar immer noch gemischt, der wesentliche Unterschied zwischen den beiden Funktionen liegt jedoch darin, dass reduceByKey die Reduktionsoperation vor der Mischung durchführt, wodurch die über das Netz übertragene Datenmenge drastisch reduziert wird. Daher ist es ratsam, Funktionen zu verwenden, die die Datengröße vor dem Mischen reduzieren, wie reduceByKey oder aggregateByKey, wann immer möglich. Selbst bei der gleichen breiten Transformationsfunktion kann es erhebliche Leistungsunterschiede geben.
Im folgenden Beispiel wird die Leistung von „groupByKey“ und „reduceByKey“ verglichen:
Verwendung von groupByKey:
Verwendung von reduceByKey:
DAG groupByKey vs. reduceByKey:
Vergleich der Gesamtbearbeitungszeit:
02 – Unterteilung
In parallelen Umgebungen wie einem Spark-Cluster ist es entscheidend, die Daten angemessen zu partitionieren. Dadurch wird sichergestellt, dass jeder Executor-Knoten aktiv und produktiv bleibt. Wenn schlecht partitionierte Daten für die Verarbeitung verwendet werden, kann dies zu einer Situation führen, in der bestimmte Knoten eine unverhältnismäßig hohe Arbeitslast tragen. Dies wird als Datenschieflage bezeichnet. In Situationen, in denen der Programmierer die Kontrolle hat, kann er die Anzahl der Partitionen mit Funktionen wie „coalesce“ oder „repartition“ anpassen. Es gibt jedoch Szenarien, die sich der Kontrolle des Programmierers entziehen, z. B. bei Operationen wie „join“, die ein Mischen der Daten erfordern. In solchen Fällen wird die Anzahl der Partitionen durch den Spark-Konfigurationsparameter „spark.sql.shuffle.partitions“ bestimmt. Bei Aufträgen, die häufige Join-Operationen erfordern, hilft die vorherige Anpassung dieses Konfigurationswerts daher, eine angemessene Anzahl von Partitionen beizubehalten.
Es ist wichtig, den Unterschied zwischen „coalesce“ und „repartition“ zu beachten. repartition‘ führt zu einer Umverteilung, da der gesamte Datenbestand gleichmäßig auf die Knoten verteilt wird. Dies ist unvermeidlich, da das Wesen der Repartitionierung darin besteht, eine gleichmäßige Verteilung auf die Knoten zu erreichen. Andererseits ermöglicht die Verwendung der Funktion „coalesce“ die Verteilung der Daten, ohne ein Shuffling auszulösen, allerdings mit der Einschränkung, dass die Anzahl der Partitionen nicht erhöht werden kann.
03 – Verwenden Sie die richtigen Datenstrukturen
Ab Spark 2.x wird empfohlen, die Dataset-API zu verwenden. Die zugrundeliegende Struktur von Datasets ist zwar immer noch RDD, aber sie beinhaltet verschiedene Optimierungen, wie die Spark Catalyst-Optimierung, und eine viel leistungsfähigere Schnittstelle. Bei der Durchführung zeitaufwändiger Join-Operationen mit der High-Level-API können die Optimierungen beispielsweise automatisch auf Techniken wie Broadcast Join umschalten, um das Shuffling so weit wie möglich zu minimieren. Daher ist es ratsam, wann immer möglich, Datasets oder DataFrames zu verwenden.
04 – Verwendung von Broadcast-Variablen
Broadcast-Variablen in Apache Spark werden verwendet, um schreibgeschützte Variablen effizient über die Knoten in einem Spark-Cluster zu verteilen. Anstatt eine Kopie der Variablen an jede Aufgabe zu senden, was ressourcenintensiv und ineffizient sein kann, ermöglichen Broadcast-Variablen die Zwischenspeicherung der Variablen auf jedem Rechner und die gemeinsame Nutzung durch die Aufgaben, die auf diesem Rechner laufen. Dies kann die Leistung bestimmter Spark-Operationen erheblich verbessern, da die über das Netzwerk zu übertragende Datenmenge reduziert wird.
Nachfolgend ein Beispiel für die Verbindung zweier Datenrahmen. Die erste Variante ist ohne Broadcast, die zweite mit Broadcast.
Verbinden von zwei Datenrahmen ohne Übertragung:
Verbinden von zwei Datenrahmen mit Broadcasting:
Wenn Sie beide Beispiele ausführen, werden Sie feststellen, dass der größte Unterschied darin besteht, dass bei dem Beispiel mit Broadcast kein Mischen erforderlich ist. Bei Spark ist das Mischen ein kostspieliger Vorgang. Durch die Verwendung von Broadcast wird unnötiges Shuffling vermieden, was die Netzauslastung reduziert. Bei Spark, wo die Verarbeitung großer Datenmengen üblich ist, kann die Minimierung dieser unnötigen Netzwerknutzung die Leistung erheblich verbessern.
05 – Zwischenzeitliche Daten beibehalten
In Apache Spark sind persist() und cache() Methoden, die verwendet werden, um ein DataFrame, RDD (Resilient Distributed Dataset) oder Dataset im Speicher oder auf der Festplatte zu speichern. Diese Methoden sind besonders nützlich, um die Leistung von iterativen oder interaktiven Spark-Workloads zu verbessern, da die Daten nicht bei jedem Zugriff neu berechnet werden müssen.
Insbesondere iterative Algorithmen für maschinelles Lernen, wie sie in der MLlib von Spark üblich sind, profitieren erheblich von Caching oder Persisting. Diese Algorithmen greifen während der Iterationen wiederholt auf dieselben Daten zu und aktualisieren sie, so dass die Zwischenspeicherung für die Effizienz unerlässlich ist.
Wenn Daten persistiert werden, speichert Spark sie außerdem auf eine fehlertolerante Weise. Dadurch wird sichergestellt, dass bei einem Ausfall eines Knotens während der Berechnung die verlorenen Daten aus der ursprünglichen Quelle oder aus Zwischenschritten neu berechnet werden können, so dass die Datenintegrität erhalten bleibt.
In diesem Beispiel wird die Methode persist() verwendet, um den DataFrame im Speicher zwischenzuspeichern. Nachfolgende Aktionen auf dem DataFrame können von den zwischengespeicherten Daten profitieren, was die Leistung verbessert. Denken Sie daran, dass Sie je nach Anwendungsfall und verfügbaren Ressourcen eine geeignete Speicherebene wählen sollten.