はじめに
CやC++、あるいは以前のバージョンのJavaでスレッドプログラミングをした経験がある人は、コード内でスレッドを管理するのがいかに難しいことか知っているでしょう。シングルスレッドプログラムの場合は、コード内にバグがあるとその時点でアプリケーションに障害が発生します。しかしマルチスレッドプログラムの場合は、ある条件を満たしたときのみ障害が発生します。アプリケーション障害を引き起こすすべての条件を予測することは難しいので、スレッドプログラミングは手ごわい難題と言えます。この難題を最初から避けて通るプログラマもいますが、ベテランプログラマはコンピュータにかじりついて何とか成し遂げようとします。
J2SE 5.0プラットフォームには、新しい並列処理ユーティリティのパッケージが含まれています。このパッケージ内のクラスは、並列処理システムで使用される並列処理クラスやアプリケーションの基本要素となります。この並列処理ユーティリティには次のものが含まれています。
- 高パフォーマンスの柔軟なスレッドプール
- タスクを非同期実行するためのフレームワーク
- 並列アクセスのために最適化されたコレクションクラス群
本稿では、J2SE 5.0フレームワークの各種クラスとその重要な機能について紹介します。本稿のダウンロードサンプルには、すべての新しいスレッドフレームワーククラスの使い方を示す簡単なサンプルが含まれています。本稿を読みながらサンプルを実行してみると、各クラスの機能がよりよくわかるでしょう。
Executorフレームワーク
Executor
フレームワークは、手動で実装するのは難しかったり手間がかかったりする機能を簡単に実現してくれる、単純で標準化された拡張可能なクラスを提供します。このフレームワークは、呼び出し、スケジューリング、実行を標準化します。これを利用すると、一連の実行ポリシーに従って非同期タスクを制御することができます。
Executor
インターフェイスは、発行済みの実行可能タスクを実行します。このインターフェイスを使うと、タスク発行をタスク実行のメカニズムから切り離すことができます。プログラマは、通常は明示的にスレッドを作成する代わりにExecutor
を使用します。Executor
インターフェイスでは、タスクの同期実行と非同期実行の両方を行うことができます。
同期実行の場合は、次のコマンドを使用します。
Class MySynExecutor implements Executor{ public void execute(Runnable r) { r.run(); } }
非同期実行の場合は、次のコマンドを使用します。
Class MyASynExecutor implements Executor{ public void execute(Runnable r) { new Thread(r).start(); } }
ExecutorServiceクラス
ExecutorService
クラスには、1つまたは複数の非同期タスクの終了管理や進捗追跡を行うためのメソッドが用意されています。本稿のダウンロードサンプルの「MyExecutorService.java」ファイルは、終了管理のプロセスを示しています。このサンプルでは、サイズ3のThreadPool
を開始し、スレッドを順番に追加しています。スレッド数がプールの上限に達したときは、shutdown()
メソッドを呼び出します。shutdown()
メソッドの呼び出し後は、スレッドプールは実行用の新しいタスクを受け付けません。10秒待機した後に、スレッドプールはshutDownNow()
メソッドを呼び出します。このメソッドは、実行中のすべてのタスクを終了する最も効率的な手段です。このサンプルでは、アプリケーションは実行中のスレッドを終了できません。
ScheduledExecutorServiceクラス
私のお気に入りはScheduledExecutorService
クラスです。このクラスは、定期的に実行するタスクのスケジューリングをするときに非常に便利です。特に、クリーンアップジョブではこのクラスが役に立ちます(アプリケーションで作成した一時ファイルをすべて削除するときなど)。サンプルの「MyScheduledExecutorService.java」ファイルは、5秒ごとに「beep」を出力するというスケジューリングプロセスの例を示しています。
final Runnable beeper = new Runnable() { public void run() { System.out.println("beep"); } }; final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 5, SECONDS);
FutureインターフェイスとFutureTaskクラス
初期のバージョンのJavaでは、実行中のスレッドの状態を調べたり、実行後にスレッドから値を返させたりすることが非常に困難でした。run
メソッドはvoidを返すので、スレッドからの戻り値を受け取るには何行ものコードを書かなければなりませんでした。このアプローチに挑戦したことのあるプログラマは、その難しさを知っているでしょう。
Future
インターフェイスまたはFutureTask
クラスを使用すると、非同期実行されているスレッドから戻り値を取得することができます。Future
インターフェイスには、処理が完了しているかどうかの調査や、処理結果の取得、処理の取り消しを行うメソッドが定義されています。FutureTask
クラスには、Future
インターフェイスの各メソッドの基本実装が含まれています。結果を取得できるのは、処理が完了したときのみです。処理が完了していない場合は、get
メソッドはブロックされます。
サンプルの「MyStringReverser.java」ファイルは、FutureTask
クラスの簡単な使用例を示しています。このサンプルでは、発行された文字列を1秒に1文字というゆっくりした速度で返します。その間、メインスレッドはタスクが完了したかどうかを継続的にポーリングします。
while(!future.isDone()){ System.out.println("Task not yet completed."); try{ Thread.currentThread().sleep(500); }catch(InterruptedException ie){ System.out.println("Will check after 1/2 sec."); } }
タスクが完了したら、get
メソッドを使用してFuture
オブジェクトから結果を取得します。
System.out.println("Here is result..."+future.get());
ThreadPoolExecutorクラス
ThreadPoolExecutor
クラスを使用すると、独自のサーバーを作成することができます。このクラスには、大規模なエンタープライズEJBサーバーと同様に、設定やチューニングを行うための機能が数多く用意されています。ここで、このクラスの設定可能なパラメータをいくつか紹介しておきます。
- コアプールサイズと最大プールサイズ --
corePoolSize
とmaximumPoolSize
を同じ値に設定すると、固定サイズのスレッドプールを作成できます。maximumPoolSize
を事実上の無限値(Integer.MAX_VALUE
など)に設定すると、そのプールで任意の数の並列処理タスクを収容できるようになります。 - オンデマンド作成 -- 既定では、
ThreadPoolExecutor
は、新しいタスクがコアスレッドを必要とするときにのみコアスレッドを作成して開始します。しかし、prestartCoreThread
またはprestartAllCoreThreads
を使用すれば、この動作を動的にオーバーライドできます。 - キープアライブ時間 --
corePoolSize
で指定した数よりも多くのスレッドがプール内にある場合は、keepAliveTime
で指定した期間だけアイドル状態になったスレッドを終了させ、超過分をなくします。 - キュー管理 -- キュー管理は次のような規則で行われます。
- 実行中のスレッドの数が
corePoolSize
よりも少ない場合は、Executor
は要求をキューを入れずに、新しいスレッドを追加します。 - 実行中のスレッドの数が
corePoolSize
以上の場合は、Executor
は新しいスレッドを追加せずに、要求をキューに入れます。 - 要求をキューに入れることができない場合は、スレッド数が
maximumPoolSize
に達するまで、新しいスレッドを作成します。スレッド数がmaximumPoolSize
を超えてしまう場合は、タスクが拒否されます。 - フックメソッド -- このクラスには、各タスクの実行前と実行後に呼び出される
beforeExecute()
とafterExecute()
というフックメソッドがあります。これらのメソッドを使用するには、このクラスをサブクラス化する必要があります(これらのメソッドはprotected
であるため)。
サンプルの「MyThreadPoolExecutor.java」ファイルは、さまざまな設定パラメータを監視する方法の例を示しています。コード内のコメントを解除すると、各タスクの追加と終了によってプールサイズとキューサイズがどのように変化するのかを確認できます。また、コード内の設定値をいろいろ変更してみてください。
並列処理コレクション
JDK 1.5には、マルチスレッドのコンテキストで使用するための次のようなコレクション実装が含まれています。
ConcurrentHashMap
クラスでは、取得操作に関しては完全にスレッドセーフな並列処理サポートを使用でき、更新操作に関しては調整可能な予測並列処理を使用できます。CopyOnWriteArraySet
はSet
のスレッドセーフ版であり、CopyOnWriteArrayList
はArrayList
のスレッドセーフ版です。これらのクラスは、基になるセットまたは配列を修正するときに、事前にコピーを行います。したがって、読み取りは高速ですが更新は遅くなります。
これらの並列処理コレクションクラスは、Iterator
に対してスナップショット形式のデータを提供します(基になるデータが変更されても、その変更内容はIterator
に反映されません)。
同期支援クラス
JDK 1.5には、特殊な同期処理で使用するSemaphore
、CountDownLatch
、CyclicBarrier
、Exchanger
という高度なクラスがあります。これらのクラスの機能と用途について詳しく説明するためには少々高度な知識が必要になるため、別の機会に回します。
これらの新しいクラスがあれば、スレッド恐怖症の上司に対しても、マルチスレッドアプリケーションの開発を進言することができるでしょう。