9. Parallelisierung#

9.1. Vorbereitung#

Aktuelle Entwicklungen in der Computertechnik gehen dahin, dass immer mehr Prozessoren in einem Rechner verbaut werden. Im aktuell laut top500.org besten Hochleistungsrechner, dem Frontier (Oak Ridge, USA), sind beispielsweise 8 Millionen Rechenkerne verbaut. Eine solche Hardware wollen wir bei der Entwicklung von effizienten Algorithmen ausnutzen. Bisher liefen unsere Algorithmen mehr oder weniger sequentiell, das bedeutet, unser Programm wird Zeile für Zeile auf einem Rechenkern ausgeführt. Das Gegenstück zu sequentieller Programmierung ist die parallele Programmierung. Die Grundidee ist es das zu lösende Problem in Teilprobleme zu zerlegen, die weitestgehend unabhängig voneinander und gleichzeitig von mehreren Recheneinheiten gelöst werden. Die Teilergebnisse jedes einzelnen Prozesses müssen anschließend zu einem Gesamtergebnis zusammengefügt werden. Der entscheidende Punkt ist hierbei die Kommunikation der einzelnen Prozesse.

Wir wollen zunächst die benötigten Module in unsere Conda-Umgebung installieren:

conda install -c conda-forge mpi4py

MPI steht für Message Passing Interface, eine Schnittstelle, die Funktionen bereitstellt, welche die Kommunikation parallel laufender Prozesse erlaubt. Testen wir zunächst ob die Installation erfolgreich war. Wir schreiben ein simples Programm

print("Hello world")
Hide code cell output
Hello world

und speichern dieses in einer Datei namens hello_world.py ab. Um das Programm nun 4 mal aufzurufen schreiben wir in die Konsole

mpirun -np 4 python hello_world.py

und bekommen folgende Ausgabe:

Hello world
Hello world
Hello world
Hello world

Dies ist natürlich noch kein richtiges paralleles Programm. Unser Skript wird einfach nur 4 mal parallel ausgeführt.

Die Idee ist nun, dass wir mit mehreren Prozessen einen bestimmten Algorithmus effizienter gestalten, indem wir das zu lösende Problem in möglichst unabhängige Teilprobleme zerlegen, die sich gleichzeitig und möglichst ohne Kommunikation untereinander lösen lassen. Wir erläutern die einfachsten Konzepte an einem sehr einfachen Beispiel:

Beispiel

Berechne die Summe der Zahlen 1 bis \(n\), also

\[ s = \sum_{i=1}^n i. \]

Diese Aufgabe lässt sich nahezu perfekt parallelisieren. Für beispielsweise \(n=100\) könnte Prozess \(1\) die Summe von 1 bis 25, Prozess 2 die Summe von 26 bis 50, Prozess 3 die Summe von 51 bis 75 und Prozess 4 die Summe von 76 bis 100 berechnen. Die Zwischenergebnisse müssen anschließend an den Hauptprozess gesendet werden, welcher die finale Summation der Teilergebnisse vornimmt. Im Prinzip ändert sich dann die Summenformel für \(Q\) Prozesse wie folgt:

(9.1)#\[s = \sum_{q=1}^{Q} \left(\underbrace{\sum_{i=(q-1)\cdot(n/Q)+1}^{q\cdot(n/Q)} i}_{\text{Prozess q}}\right)\]

Zur Vereinfachung nehmen wir zunächst an, dass \(n\) durch \(Q\) teilbar ist. Das Programm soll so geschrieben sein, dass die innere Summe von Prozess \(q\) berechnet und die äußere Summe wird anschließend vom Hauptprozess.

Zur Kommunikation der einzelnen Prozesse wird ein sogenanntes Communicator-Objekt verwendet. In folgendem Beispiel wird das World-Communicator-Objekt in comm abgelegt. Mit diesem Objekt können wir den Rang eines Prozesses sowie die Gesamtanzahl der Prozesse erfragen:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print(f"Hello! I am process {rank+1} of {size}.")

Ausführen mit mpirun -np 4 python first_mpi_prog.py liefert

Hello! I am process 2 of 4.
Hello! I am process 1 of 4.
Hello! I am process 4 of 4.
Hello! I am process 3 of 4.

Auffällig ist, dass hier nicht zwingend die Reihenfolge eingehalten wird. Der Prozess, der als erstes die Zeile mit der print-Ausgabe erreicht hat, schreibt auch zuerst auf die Konsole. Häufig muss dann im Skript unterschieden werden, ob dieses gerade vom Hauptprozess oder einem Hilfsprozess ausgeführt wird. Dies kann man mit folgender Fallunterscheidung realisieren:

if rank == 0:
    [Anweisungen für Hauptprozess]
else:
    [Anweisungen für Hilfsprozesse]

Der Hauptprozess sorgt meist dafür, dass die Hilfsprozesse mit den Eingabedaten gefüttert werden, welche zur Lösung der Teilaufgabe benötigt werden und für das Zusammenführen und Weiterverarbeiten der Zwischenergebnisse.

9.2. Einfache Kommunikation mit Send und Recv#

Die simpelste Form des Datenaustauschs zweier Prozesse ist eine Point-to-Point-Kommunikation mit den Methoden comm.recv(...) und comm.send(...) des Communicator-Objekts. Aus der Code-Dokumentation entnehmen wir, dass die Funktionen mit folgenden Parametern aufgerufen werden müssen:

comm.send(obj, dest, tag=0)
obj = comm.recv(buf=None, source=ANY_SOURCE, tag=ANY_TAG, status=None)

Der sendende Prozess ruft also die send-Methode auf und die zu sendenden Daten sind der erste Parameter obj dieser Funktion. Das zweite Argument dest ist der Rang des Empfänger-Prozesses. Der empfangende Prozess ruft zur gleichen Zeit die recv-Methode auf und gibt das empfangene Objekt als Return-Wert zurück. Häufig macht es noch Sinn den tag-Parameter zu setzen, um nach bestimmten Nachrichten zu filtern. Alle anderen Parameter werden für unsere Zwecke nicht benötigt. Es ist außerdem zu beachten, dass send und recv das Skript an dieser Stelle blockieren, und erst zur nächsten Zeile gesprungen wird, wenn das zugehörige Gegenstück die Daten empfängt bzw. sendet. Beide Funktionen gibt es auch noch in einer nicht-blockierenden Variante namens isend und irecv auf die wir hier aber nicht näher eingehen wollen.

Testen wir diese Methoden an unserem Beispiel aus. Zunächst erfragt jeder Prozess seinen Rang, setzt in Abhängigkeit des Rangs den Start- und Endindex für die Summation, wie in der inneren Summe in (9.1), und löst die entsprechende Teilaufgabe:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

N = 1000

# Parameter für jeden Prozess setzen
start_idx = rank*N//size+1
end_idx = (rank+1)*N//size

# Kurze Info
print("I am process", rank, "and I will sum up the numbers", start_idx, "to", end_idx)

# Lösung des Teilproblems
val = sum(range(start_idx, end_idx+1))

Nun muss der Hauptprozess alle Zwischenergebnisse einsammeln und diese aufsummieren. Der Hauptprozess muss also \((\textup{size}-1)\)-mal recv mit unterschiedlichen Quellen (source=1, 2, …, size-1) aufrufen, und jeder Hilfsprozess die send-Methode mit dem Hauptprozess als Ziel (dest):

if rank == 0:
    # Hauptprozess
    for i in range(1, size):
        tmp = comm.recv(tag=1, source=i)
        val = val + tmp
else:
    # Hilfsprozesse
    comm.send(val, 0, tag=1)

if rank == 0:
    print("The sum of 1 to", N, "is", val)

Ausgabe:

I am process 3 and I will sum up the numbers 751 to 1000
I am process 1 and I will sum up the numbers 251 to 500
I am process 2 and I will sum up the numbers 501 to 750
I am process 0 and I will sum up the numbers 1 to 250
The sum of 1 to 1000 is 500500

Der tag-Parameter ist in send- und recv-Funktion optional, ist aber bei Programmen mit mehreren Kommunikationsphasen hilfreich um sicherzustellen, dass sich auch das richtige send-recv-Paar austauschen.

Die Verarbeitung der weiteren Daten, in diesem Fall die Ausgabe des Ergebnisses auf der Konsole erfolgt dann nur vom Hauptprozess. Beachte, dass das Endergebniss nur auf dem Hauptprozess zur Verfügung steht. Wird das Gesamtergebnis auch auf den Hilfsprozessen benötigt, so kann man diese ebenfalls mit send (auf Hauptprozess) und recv (auf Hilfsprozessen) verteilen. Später lernen wir noch die Funktion bcast kennen, welche dieses Vorhaben sogar noch einfacher macht.

9.3. Aufgabenverteilung mit bcast und scatter#

Wir wollen hier 2 weitere Funktionen kennenlernen:

obj = comm.bcast(obj, root=0)
obj = comm.scatter(sendobj, root=0)

Die Broadcsat-Methode bcast sendet die gleichen Daten an alle Prozesse.

Broadcast

Fig. 9.1 Broadcast#

Wir könnten unser Programm beispielsweise so anpassen, dass das berechnete Endergebnis, was zunächst nur dem Hauptprozess vorliegt, an alle anderen Prozesse gesendet wird. Eventuell wird dies benötigt um die nächste Teilaufgabe in unserem Algorithmus anzugehen. In den meisten Fällen sendet der Hauptprozess mit Rang 0, also ist der Standardwert für den Parameter root ausreichend. Im Rückgabewert obj stehen die gesendeten Daten. Wir ergänzen das obere Beispiel durch folgende Zeilen:

val = comm.bcast(val)
print("I am process", rank, "and I also know the answer:", val)

Ausgabe:

I am process 0 and I also know the answer: 500500
I am process 1 and I also know the answer: 500500
I am process 2 and I also know the answer: 500500
I am process 3 and I also know the answer: 500500

Beachte, dass die Variable val bereits in jedem Prozess definiert ist und in den Hilfsprozessen noch das Ergebnis der ihnen zugewiesenen Teilaufgabe enthält. Dass der Wert in val vom Hauptprozess versendet wird ist durch den Funktionsparameter root=0 vorgegeben. Ändert man die Zeile auf val = comm.bcast(val, root=1), dann würden alle Prozesse das Zwischenergebnis vom Prozess mit Rang 1 empfangen.

Schauen wir uns nun die Methode scatter an.

Scatter

Fig. 9.2 Scatter#

Diese Methode kann verwendet werden um vom Hauptprozess generierte Daten auf alle Prozesse aufzuteilen. Diskutieren wir kurz die Ein- und Ausgabeparameter von scatter. Der erste Parameter sendobject muss im Hauptprozess eine Liste sein in der das \(i\)-te Element das Objekt ist, welches man an den \(i\)-ten Prozess senden möchte. In den Hilfsprozessen muss sendobj ebenfalls definiert werden und wir setzen einfach sendobj = None. Der zweite Parameter root mit Default-Wert 0 ist nur dann relevant, wenn die zu sendenden Daten von einem anderen als den Hauptprozess kommen. Dann muss root den Rang dieses Prozesses enthalten. Wir testen dies an folgendem Beispiel:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank==0:
    sendobj = [i for i in range(size)]
    print("sendobj:", sendobj)
else:
    sendobj = None

obj = comm.scatter(sendobj)

print("I am process", rank, "and i received the data", obj)

Ausgabe:

sendobj: [0, 1, 2, 3]
I am process 0 and i received the data 0
I am process 1 and i received the data 1
I am process 2 and i received the data 2
I am process 3 and i received the data 3

Wir wollen die scatter-Funktion in unserem Beispiel nutzen um die Zeilen

start_idx = rank*N//size+1
end_idx = (rank+1)*N//size

etwas generischer zu gestalten. Zwar kennt jeder Teilprozess die für ihn relevanten Parameter start_idx und end_idx bereits vor Ausführen des Programms, aber man findet leicht Beispiele, in denen die für die Hilfsprozesse notwendigen Parameter erst zur Laufzeit bekannt werden. Beispielsweise könnten diese Parameter von einer Nutzereingabe abhängen, oder von dem Ergbnis eines vorherigen Schrittes in unserem Algorithmus. Wir ersetzen die 2 Zeilen durch folgenden Code:

if rank==0:
    N = 10000

    # Vorbereitung der Parameter für alle Prozesse
    start_indices = [p*N//size+1 for p in range(size)]
    end_indices = [(p+1)*N//size for p in range(size)]

    # Liste aus Tupeln der Form (start_idx, end_idx) erstellen
    sendobj = [(start_indices[p], end_indices[p]) for p in range(size)]
else:
    sendobj = None

data = comm.scatter(sendobj, root=0)

# Parameter aus empfangenen Objekt extrahieren
start_idx = data[0]
end_idx = data[1]

# Kurze Info
print("I am process", rank, "and I will sum up the numbers", start_idx, "to", end_idx)

Der Hauptprozess erstellt hier zunächst eine Liste mit den Parametern start_idx und end_idx für alle Prozesse. Das sendobject, was auf die Teilprozesse aufgeteilt wird, ist hier eine Liste, wobei die Listenelemente selbst Tupel sind, die die beiden Parameter enthalten. Die Hilfsprozesse extrahieren anschließend beide Parameter aus den ihn zugewiesenen Tupeln und können mit der Lösung der Teilaufgabe beginnen.

9.4. Prozesse zusammenführen mit reduce und gather#

Eher suboptimal in unserem Beispiel ist die Art und Weise wie die Zwischenergebnisse wieder an den Hauptprozess zur Weiterverarbeitung gesendet werden:

if rank == 0:
    # Hauptprozess
    for i in range(1, size):
        tmp = comm.recv(tag=1, source=i)
        val = val + tmp
else:
    # Hilfsprozesse
    comm.send(val, 0, tag=1)

Die Ergebnisse werden nacheinander gesendet. Besser geeignet sind in solchen Fällen die Methoden reduce und gather. Aus der Code-Dokumentation entnehmen wir folgenden Aufruf:

val  = reduce(sendobj, op=MPI.SUM, root=0)
List = gather(sendobj, root=0)

In beiden Fällen ist der erste Parameter sendobj, das Objekt, was das Zwischenergebnis jedes Prozesses beinhaltet. Der optionale Parameter root gibt an, welcher Prozess der Hauptprozess ist. Standardmäßig ist das der Prozess mit Rang 0. Die reduce-Methode benötigt weiterhin einen Parameter op. Das ist eine Rechenoperation, welche auf die Zwischenergebnisse angewendet wird und damit das Gesamtergebnis berechnet. Die wichtigsten Operationen sind

Operation

Beschreibung

MPI.SUM

Summiert alle Werte auf

MPI.PROD

Multipliziere alle Werte auf

MPI.MAX

Ermittle das Maximum aus allen Werten

MPI.MAXLOC

Ermittle das Maximum und den Rang des zug. Prozesses

Reduce

Fig. 9.3 Reduce#

Probieren wir die Methode für unser Beispiel aus. Wir ersetzen die oben stehenden Code-Zeilen mit

val = comm.reduce(val, op=MPI.SUM, root=0)

Das war schon alles. Die Variable val von allen Prozessen wird aufsummiert und überschrieben. Das richtige Ergebnis ist wieder nur im Prozess mit Rang 0 zu finden.

Die Methode gather arbeitet etwas anders. Prinzipiell ist sie das Gegenstück zur scatter-Methode. Anstatt Daten aus einer Liste auf alle Prozesse aufzuteilen, sammelt gather die Daten aller Prozesse und legt sie in eine Liste, die vom root-Prozess weiterverarbeitet werden kann.

Gather

Fig. 9.4 Gather#

Auf unser Beispiel angewendet sieht das wie folgt aus:

all_vals = comm.gather(val)
if rank==0:
    print(f"Gather gives me the following list: {all_vals}")

    # Diese Liste aufsummieren
    val = sum(all_vals)
    print("The sum of 1 to", N, "is", val)

Ausgabe:

Gather gives me the following list: [3126250, 9376250, 15626250, 21876250]
The sum of 1 to 10000 is 50005000.

Der vollständige Code zur Lösung unseres Beispiels ist:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank==0:
    N = 1000

    start_indices = [int(p*N/size+1) for p in range(size)]
    end_indices = [int((p+1)*N/size) for p in range(size)]
    
    sendobj = [(start_indices[p], end_indices[p]) for p in range(size)]
else:
    sendobj = None

data = comm.scatter(sendobj, root=0)

start_idx = data[0]
end_idx = data[1]

print(f"Hello! I am process {rank+1} of {size}. I sum up {start_idx} to {end_idx}")

# Lösung der Teilprobleme
val = sum(range(start_idx, end_idx+1))

# Option 1: Via send/recv
#if rank == 0:
#    for i in range(1,size):
#        tmp = comm.recv(source=i, tag=7)
#        val = val + tmp
#else:
#    comm.send(val, dest=0, tag=7)

# Option 2: Via gather
#all_vals = comm.gather(val, root=0)
#if rank==0:
#    print(f"Gather gives me the following list: {all_vals}")
#    val = sum(all_vals)

# Option 3: Via reduce
val = comm.reduce(val, op=MPI.SUM, root=0)

# Print result
if rank==0:
    print(f"The sum of 1 to {N} is {val}.")
Hello! I am process 1 of 1. I sum up 1 to 1000
The sum of 1 to 1000 is 500500.