Titel | Inhalt | Suchen | Index | DOC | Handbuch der Java-Programmierung, 7. Auflage |
<< | < | > | >> | API | Kapitel 23 - Multithreading |
Wenn man sich mit Nebenläufigkeit beschäftigt, muss man sich in aller Regel auch mit Fragen der Synchronisation nebenläufiger Prozesse befassen. In Java erfolgt die Kommunikation zweier Threads auf der Basis gemeinsamer Variablen, die von beiden Threads erreicht werden können. Führen beide Prozesse Änderungen auf den gemeinsamen Daten durch, so müssen sie synchronisiert werden, denn andernfalls können undefinierte Ergebnisse entstehen.
Wir wollen uns als einleitendes Beispiel ein kleines Programm ansehen, bei dem zwei Threads einen gemeinsamen Zähler hochzählen:
001 /* Listing2309.java */ 002 003 public class Listing2309 004 extends Thread 005 { 006 static int cnt = 0; 007 008 public static void main(String[] args) 009 { 010 Thread t1 = new Listing2309(); 011 Thread t2 = new Listing2309(); 012 t1.start(); 013 t2.start(); 014 } 015 016 public void run() 017 { 018 while (true) { 019 System.out.println(cnt++); 020 } 021 } 022 } |
Listing2309.java |
Läßt man das Programm eine Weile laufen, könnte es
beispielsweise zu folgender Ausgabe kommen:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
33 <-- Nanu? Wo ist die 32?
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
32 <-- Ach so, hier!
58
59
Beide Prozesse greifen unsynchronisiert auf die gemeinsame Klassenvariable cnt zu. Da die Operation System.out.println(cnt++); nicht atomar ist, kommt es dazu, dass die Operation mitten in der Ausführung unterbrochen wird und der Scheduler mit dem anderen Thread fortfährt. Erst später, wenn der unterbrochene Prozess wieder Rechenzeit erhält, kann er seinen vor der Unterbrechung errechneten Zählerwert von 32 ausgeben. Sein Pendant war in der Zwischenzeit allerdings bereits bis 56 fortgefahren. Um diese Art von Inkonsistenzen zu beseitigen, bedarf es der Synchronisation der beteiligten Prozesse.
Zur Synchronisation nebenläufiger Prozesse hat Java das Konzept des Monitors implementiert. Ein Monitor ist die Kapselung eines kritischen Bereichs (also eines Programmteils, der nur von jeweils einem Prozess zur Zeit durchlaufen werden darf) mit Hilfe einer automatisch verwalteten Sperre. Diese Sperre wird beim Betreten des Monitors gesetzt und beim Verlassen wieder zurückgenommen. Ist sie beim Eintritt in den Monitor bereits von einem anderen Prozess gesetzt, so muss der aktuelle Prozess warten, bis der Konkurrent die Sperre freigegeben und den Monitor verlassen hat.
Das Monitor-Konzept wird mit Hilfe des in die Sprache integrierten Schlüsselworts synchronized realisiert. Durch synchronized kann entweder eine komplette Methode oder ein Block innerhalb einer Methode geschützt werden. Der Eintritt in den so deklarierten Monitor wird durch das Setzen einer Sperre auf einer Objektvariablen erreicht. Bezieht sich synchronized auf eine komplette Methode, wird als Sperre der this-Pointer verwendet, andernfalls ist eine Objektvariable explizit anzugeben.
Wir wollen uns diese Art der Verwendung an einem Beispiel ansehen, welches das oben demonstrierte Synchronisationsproblem löst. Die naheliegende Lösung, die Anweisung System.out.println(cnt++); durch einen synchronized-Block auf der Variablen this zu synchronisieren, funktioniert leider nicht. Da der Zeiger this für jeden der beiden Threads, die ja unterschiedliche Instanzen repräsentieren, neu vergeben wird, wäre für jeden Thread der Eintritt in den Monitor grundsätzlich erlaubt.
Stattdessen verwenden wir die (in Abschnitt 45.2.2 erläuterte) Methode getClass, die uns ein Klassenobjekt beschafft (ein und dasselbe für alle Instanzen), mit dem wir die Klassenvariable cnt schützen können:
001 /* Listing2310.java */ 002 003 public class Listing2310 004 extends Thread 005 { 006 static int cnt = 0; 007 008 public static void main(String[] args) 009 { 010 Thread t1 = new Listing2310(); 011 Thread t2 = new Listing2310(); 012 t1.start(); 013 t2.start(); 014 } 015 016 public void run() 017 { 018 while (true) { 019 synchronized (getClass()) { 020 System.out.println(cnt++); 021 } 022 } 023 } 024 } |
Listing2310.java |
Nun werden alle Zählerwerte in aufsteigender Reihenfolge ausgegeben.
Ein anderer Fall ist der, bei dem der Zugriff auf ein Objekt selbst synchronisiert werden muss, weil damit zu rechnen ist, dass mehr als ein Thread zur gleichen Zeit das Objekt verwenden will.
Im Folgenden werden die potenziellen Probleme am Beispiel eines Zählerobjekts erläutert, dessen Aufgabe es ist, einen internen Zähler zu kapseln, auf Anforderung den aktuellen Zählerstand zu liefern und den internen Zähler zu inkrementieren. Hierbei handelt es sich um eine Aufgabe, die beispielsweise in der Datenbankprogrammierung sehr häufig vorkommt, um Schlüsselnummern zu generieren.
In der Praxis wird das Synchronisationsproblem dadurch verschärft, dass die Verwendung des Zählers einige vergleichsweise langsame Festplattenzugriffe erforderlich macht. In unserem Beispiel wird der Zähler von fünf Threads verwendet. Die Langsamkeit und damit die Wahrscheinlichkeit, dass der Scheduler die Zugriffsoperation unterbricht, wird in unserem Beispiel durch eine Sequenz eingestreuter Fließkommaoperationen erhöht:
001 /* Listing2311.java */ 002 003 class Counter2311 004 { 005 int cnt; 006 007 public Counter2311(int cnt) 008 { 009 this.cnt = cnt; 010 } 011 012 public int nextNumber() 013 { 014 int ret = cnt; 015 //Hier erfolgen ein paar zeitaufwändige Berechnungen, um 016 //so zu tun, als sei das Errechnen des Nachfolgezählers 017 //eine langwierige Operation, die leicht durch den 018 //Scheduler unterbrochen werden kann. 019 double x = 1.0, y, z; 020 for (int i= 0; i < 1000000; ++i) { 021 x = Math.sin((x*i%35)*1.13); 022 y = Math.log(x+10.0); 023 z = Math.sqrt(x+y); 024 } 025 //Jetzt ist der Wert gefunden 026 cnt++; 027 return ret; 028 } 029 } 030 031 public class Listing2311 032 extends Thread 033 { 034 private String name; 035 private Counter2311 counter; 036 037 public Listing2311(String name, Counter2311 counter) 038 { 039 this.name = name; 040 this.counter = counter; 041 } 042 043 public static void main(String[] args) 044 { 045 Thread[] t = new Thread[5]; 046 Counter2311 cnt = new Counter2311(10); 047 for (int i = 0; i < 5; ++i) { 048 t[i] = new Listing2311("Thread-"+i,cnt); 049 t[i].start(); 050 } 051 } 052 053 public void run() 054 { 055 while (true) { 056 System.out.println(counter.nextNumber()+" for "+name); 057 } 058 } 059 } |
Listing2311.java |
Das Ergebnis des Programms ist - wie nicht anders zu erwarten - schlecht,
denn es werden sehr viele doppelte Schlüssel produziert. Ein
Beispiellauf brachte bereits in den ersten 15 Aufrufen 6 doppelte
Zählerwerte:
10 for Thread-2
11 for Thread-4
10 for Thread-0
10 for Thread-1
11 for Thread-2
11 for Thread-3
12 for Thread-4
13 for Thread-0
14 for Thread-1
15 for Thread-2
16 for Thread-3
17 for Thread-4
18 for Thread-0
19 for Thread-1
20 for Thread-2
Auch hier gibt es eine einfache Lösung für das Synchronisationsproblem. Eine einfache Markierung der Methode nextNumber als synchronized macht diese zu einem Monitor und sorgt dafür, dass der komplette Code innerhalb der Methode als atomares Programmfragment behandelt wird. Eine Unterbrechung des kritischen Abschnitts durch einen anderen Thread ist dann nicht mehr möglich:
001 public synchronized int nextNumber() 002 { 003 int ret = cnt; 004 //Hier erfolgen ein paar zeitaufwändige Berechnungen, um so 005 //zu tun, als sei das Errechnen des Nachfolgezählerstandes 006 //eine langwierige Operation, die leicht durch den 007 //Scheduler unterbrochen werden kann. 008 double x = 1.0, y, z; 009 for (int i= 0; i < 1000; ++i) { 010 x = Math.sin((x*i%35)*1.13); 011 y = Math.log(x+10.0); 012 z = Math.sqrt(x+y); 013 } 014 //Jetzt ist der Wert gefunden 015 cnt++; 016 return ret; 017 } |
Durch das synchronized-Attribut wird beim Aufruf der Methode die Instanzvariable this gesperrt und damit der Zugriff für andere Threads unmöglich gemacht. Erst nach Verlassen der Methode und Entsperren von this kann nextNumber wieder von anderen Threads aufgerufen werden.
Diese Art des Zugriffschutzes wird in Java von vielen Klassen verwendet, um ihre Methoden thread-sicher zu machen. Nach Aussage der Sprachspezifikation kann davon ausgegangen werden, dass die gesamte Java-Klassenbibliothek in diesem Sinne thread-sicher ist. Dies gilt allerdings nicht für die Swing-Klassen (siehe Kapitel 36). |
|
Neben dem Monitorkonzept stehen mit den Methoden wait und notify der Klasse Object noch weitere Synchronisationsprimitive zur Verfügung. Zusätzlich zu der bereits erwähnten Sperre, die einem Objekt zugeordnet ist, besitzt ein Objekt nämlich auch noch eine Warteliste. Dabei handelt es sich um eine (möglicherweise leere) Menge von Threads, die vom Scheduler unterbrochen wurden und auf ein Ereignis warten, um fortgesetzt werden zu können.
Sowohl wait als auch notify dürfen nur aufgerufen werden, wenn das Objekt bereits gesperrt ist, also nur innerhalb eines synchronized-Blocks für dieses Objekt. Ein Aufruf von wait nimmt die bereits gewährten Sperren (temporär) zurück und stellt den Prozess, der den Aufruf von wait verursachte, in die Warteliste des Objekts. Dadurch wird er unterbrochen und im Scheduler als wartend markiert. Ein Aufruf von notify entfernt einen (beliebigen) Prozess aus der Warteliste des Objekts, stellt die (temporär) aufgehobenen Sperren wieder her und führt ihn dem normalen Scheduling zu. wait und notify sind damit für elementare Synchronisationsaufgaben geeignet, bei denen es weniger auf die Kommunikation als auf die Steuerung der zeitlichen Abläufe ankommt.
Das folgende Beispiel demonstriert den Einsatz von wait und notify an einem Producer/Consumer-Beispiel. Ein Prozess arbeitet dabei als Produzent, der Fließkommazahlen »herstellt«, und ein anderer als Konsument, der die produzierten Daten verbraucht. Die Kommunikation zwischen beiden erfolgt über ein gemeinsam verwendetes Vector-Objekt, das die produzierten Elemente zwischenspeichert und als Medium für die wait-/notify-Aufrufe dient:
001 /* Listing2313.java */ 002 003 import java.util.*; 004 005 class Producer2313 006 extends Thread 007 { 008 private Vector<String> v; 009 010 public Producer2313(Vector<String> v) 011 { 012 this.v = v; 013 } 014 015 public void run() 016 { 017 String s; 018 019 while (true) { 020 synchronized (v) { 021 s = "Wert "+Math.random(); 022 v.addElement(s); 023 System.out.println("Produzent erzeugte "+s); 024 v.notify(); 025 } 026 try { 027 Thread.sleep((int)(100*Math.random())); 028 } catch (InterruptedException e) { 029 //nichts 030 } 031 } 032 } 033 } 034 035 class Consumer2313 036 extends Thread 037 { 038 private Vector<String> v; 039 040 public Consumer2313(Vector<String> v) 041 { 042 this.v = v; 043 } 044 045 public void run() 046 { 047 while (true) { 048 synchronized (v) { 049 if (v.size() < 1) { 050 try { 051 v.wait(); 052 } catch (InterruptedException e) { 053 //nichts 054 } 055 } 056 System.out.print( 057 " Konsument fand "+ v.elementAt(0) 058 ); 059 v.removeElementAt(0); 060 System.out.println(" (verbleiben: "+v.size()+")"); 061 } 062 try { 063 Thread.sleep((int)(100*Math.random())); 064 } catch (InterruptedException e) { 065 //nichts 066 } 067 } 068 } 069 } 070 071 public class Listing2313 072 { 073 public static void main(String[] args) 074 { 075 Vector<String> v = new Vector<String>(); 076 077 Producer2313 p = new Producer2313(v); 078 Consumer2313 c = new Consumer2313(v); 079 p.start(); 080 c.start(); 081 } 082 } |
Listing2313.java |
Um die Arbeitsverteilung zwischen den Prozessen etwas interessanter
zu gestalten, werden beide gezwungen, nach jedem Schritt eine kleine
Pause einzulegen. Da die Wartezeit zufällig ausgewählt wird,
kann es durchaus dazu kommen, dass der Produzent eine größere
Anzahl an Elementen anhäuft, die der Konsument noch nicht abgeholt
hat. Der umgekehrte Fall ist natürlich nicht möglich, da
der Konsument warten muss, wenn keine Elemente verfügbar sind.
Eine Beispielsitzung könnte etwa so aussehen:
Produzent erzeugte Wert 0.09100924684649958
Konsument fand Wert 0.09100924684649958 (verbleiben: 0)
Produzent erzeugte Wert 0.5429652807455857
Konsument fand Wert 0.5429652807455857 (verbleiben: 0)
Produzent erzeugte Wert 0.6548096532111007
Konsument fand Wert 0.6548096532111007 (verbleiben: 0)
Produzent erzeugte Wert 0.02311095955845288
Konsument fand Wert 0.02311095955845288 (verbleiben: 0)
Produzent erzeugte Wert 0.6277057416210464
Konsument fand Wert 0.6277057416210464 (verbleiben: 0)
Produzent erzeugte Wert 0.6965546173953919
Produzent erzeugte Wert 0.6990053250441516
Produzent erzeugte Wert 0.9874467815778902
Produzent erzeugte Wert 0.12110075531692543
Produzent erzeugte Wert 0.5957795111549329
Konsument fand Wert 0.6965546173953919 (verbleiben: 4)
Produzent erzeugte Wert 0.019655027417308846
Konsument fand Wert 0.6990053250441516 (verbleiben: 4)
Konsument fand Wert 0.9874467815778902 (verbleiben: 3)
Produzent erzeugte Wert 0.14247583735074354
Konsument fand Wert 0.12110075531692543 (verbleiben: 3)
Durch eine konstante Pause nach jedem produzierten Element könnte der Produzent bewusst langsamer gemacht werden. Der schnellere Konsument würde dann einen Großteil seiner Zeit damit verbringen, festzustellen, dass keine Elemente verfügbar sind. Zwar würde das Beispiel (in leicht modifizierter Form) auch ohne den Einsatz von wait/notify funktionieren. Durch ihre Verwendung aber ist der Konsumentenprozess nicht gezwungen, aktiv zu warten, sondern wird vom Produzenten benachrichtigt, wenn ein neues Element verfügbar ist. Der Rechenzeitbedarf reduziert sich dadurch auf einen Bruchteil dessen, was andernfalls benötigt würde. |
|
Seit Version 5 des JDK existieren eine Reihe weiterer Klassen im Multithreading-Umfeld. Sie liegen im Paket java.util.concurrent und seinen Unterpaketen java.util.concurrent.atomic und java.util.concurrent.locks. Unter anderem befindet sich dort die Klasse LinkedBlockingQueue, mit der das obige Beispiel kompakter geschrieben werden kann. Man kann sich die LinkedBlockingQueue als eine Datenstruktur mit zwei Enden vorstellen - von der einen Seite werden Elemente eingefügt, auf der anderen Seite werden sie herausgeholt. Die LinkedBlockingQueue ist selbstverständlich threadsafe. Es gibt verschiedene Methoden zum Einfügen und Herausholen von Objekten, die sich leicht unterschiedlich verhalten:
public boolean add(E e) public void put(E e) public boolean offer(E e) public E poll() public E peek() public E take() |
java.util.concurrent.LinkedBlockingQueue |
Mit add fügen Sie ein Element in die Queue ein. Es wird eine IllegalStateException geworfen, wenn die Queue voll ist und keine Elemente mehr aufnehmen kann. offer funktioniert wie add, jedoch wird keine Exception geworfen, sondern false zurückgegeben, wenn die Queue keine Elemente mehr aufnehmen kann. poll liefert und löscht das Element am Ende der Queue oder gibt null zurück, wenn die Queue leer ist. peek liefert das Element am Ende der Queue, ohne es zu entfernen. Ist die Queue leer, liefert sie null zurück. take funktioniert wie poll, wartet aber, wenn die Queue leer ist, bis ein Element zur Verfügung steht.
001 /* Listing2314.java */ 002 003 import java.util.concurrent.LinkedBlockingQueue; 004 005 class Producer2314 extends Thread 006 { 007 private final LinkedBlockingQueue<String> v; 008 009 public Producer2314(LinkedBlockingQueue<String> v) 010 { 011 this.v = v; 012 } 013 014 public void run() 015 { 016 String s; 017 018 while (true) { 019 s = "Wert " + Math.random(); 020 v.add(s); 021 System.out.println("Produzent erzeugte " + s); 022 try { 023 Thread.sleep((int) (100 * Math.random())); 024 } 025 catch (InterruptedException x) { 026 // nichts 027 } 028 } 029 } 030 } 031 032 class Consumer2314 extends Thread 033 { 034 private final LinkedBlockingQueue<String> v; 035 036 public Consumer2314(LinkedBlockingQueue<String> v) 037 { 038 this.v = v; 039 } 040 041 public void run() 042 { 043 while (true) { 044 try { 045 String s = v.take(); 046 System.out.print(" Konsument fand " + s); 047 System.out.println(" (verbleiben: " + v.size() + ")"); 048 Thread.sleep((int) (100 * Math.random())); 049 } 050 catch (InterruptedException e) { 051 // nichts 052 } 053 } 054 } 055 } 056 057 public class Listing2314 058 { 059 public static void main(String[] args) 060 { 061 LinkedBlockingQueue<String> v = 062 new LinkedBlockingQueue<String>(); 063 064 Producer2314 p = new Producer2314(v); 065 Consumer2314 c = new Consumer2314(v); 066 p.start(); 067 c.start(); 068 } 069 } |
Listing2314.java |
Sollen wie im vorigen Beispiel zwei Threads so miteinander verbunden werden, dass einer von beiden Daten erzeugt, die der andere verarbeitet, gibt es die Möglichkeit, beide mit Hilfe einer Pipe zu synchronisieren. Dabei werden die beiden Threads über einen ByteStream miteinander verbunden, der von einem Thread geschrieben und von dem anderen gelesen wird.
Dieses Piping-Konzept wird in Java durch die Klassen PipedInputStream und PipedOutputStream realisiert. Beide Klassen werden immer paarweise und immer in getrennten Threads verwendet. Daten, die der eine Thread in den PipedOutputStream schreibt, kann der andere aus dem angebundenen PipedInputStream lesen.
Da die Kommunikation gepuffert erfolgt, kann der schreibende Thread in einem gewissen Rahmen mehr Daten produzieren, als der lesende verarbeiten kann. Ist der Puffer voll, wird der schreibende Thread angehalten, bis der lesende ausreichend Zeichen gelesen hat. Greift der lesende Thread auf eine Pipe zu, die nicht genügend Daten enthält, muss er warten, bis ein anderer Thread die erforderliche Anzahl an Bytes hineingeschrieben hat.
Wir können das Producer-/Consumer-Beispiel unter Verwendung von PipedInputStream und PipedOutputStream vereinfachen, denn die gesamte Synchronisationsarbeit wird automatisch beim Aufruf der read- und write-Methoden erledigt.
Das folgende Listing zeigt zwei Threads, die über eine Pipe einzelne Datenbytes miteinander austauschen. Die Pipe wird vom Hauptprogramm erzeugt, indem zunächst der PipedInputStream angelegt und an den danach erzeugten PipedOutputStream übergeben wird. Alternativ hätte die Verbindung auch hergestellt werden können, indem einer der beiden Streams an die connect-Methode des anderen übergeben worden wäre.
001 /* Listing2315.java */ 002 003 import java.io.*; 004 005 class Producer2315 006 extends Thread 007 { 008 private PipedOutputStream pipe; 009 010 public Producer2315(PipedOutputStream pipe) 011 { 012 this.pipe = pipe; 013 } 014 015 public void run() 016 { 017 while (true) { 018 byte b = (byte)(Math.random() * 128); 019 try { 020 pipe.write(b); 021 System.out.println("Produzent erzeugte " + b); 022 } catch (IOException e) { 023 System.err.println(e.toString()); 024 } 025 try { 026 Thread.sleep((int)(100*Math.random())); 027 } catch (InterruptedException e) { 028 //nichts 029 } 030 } 031 } 032 } 033 034 class Consumer2315 035 extends Thread 036 { 037 private PipedInputStream pipe; 038 039 public Consumer2315(PipedInputStream pipe) 040 { 041 this.pipe = pipe; 042 } 043 044 public void run() 045 { 046 while (true) { 047 try { 048 byte b = (byte)pipe.read(); 049 System.out.println(" Konsument fand " + b); 050 } catch (IOException e) { 051 System.err.println(e.toString()); 052 } 053 try { 054 Thread.sleep((int)(100*Math.random())); 055 } catch (InterruptedException e) { 056 //nichts 057 } 058 } 059 } 060 } 061 062 public class Listing2315 063 { 064 public static void main(String[] args) 065 throws Exception 066 { 067 PipedInputStream inPipe = new PipedInputStream(); 068 PipedOutputStream outPipe = new PipedOutputStream(inPipe); 069 Producer2315 p = new Producer2315(outPipe); 070 Consumer2315 c = new Consumer2315(inPipe); 071 p.start(); 072 c.start(); 073 } 074 } |
Listing2315.java |
Piping gibt es auch für zeichenbasierte Kommunikation. Dazu werden die Klassen PipedWriter und PipedReader verwendet. Bis auf den Unterschied, dass Zeichen anstelle von Bytes ausgetauscht werden, entspricht ihre Arbeitsweise genau den hier vorgestellten Byte-Pipes. |
|
Titel | Inhalt | Suchen | Index | DOC | Handbuch der Java-Programmierung, 7. Auflage, Addison Wesley, Version 7.0 |
<< | < | > | >> | API | © 1998, 2011 Guido Krüger & Heiko Hansen, http://www.javabuch.de |