はじめに
Javaプログラマが最初にマルチスレッドプログラミングを学ぶときに犯す間違いの1つに、ロックについての誤解があります。つまりオブジェクトをロックすれば、該当オブジェクトのフィールドやメソッドへのアクセスを防止できると考えがちなのですが、実際に行われるのは、他のスレッドが同一のロックを得るのを禁止するだけです。こうした誤解もわからなくはないのですが、これを勘違いしたままだと厄介な並行処理のバグを引き起こす可能性があります。
実際、多くの並行処理バグは、特定のデータが間違ったタイミングでよそからアクセスされた場合に起こるものであり、通常は何らかの変更を加えようとした場合がこれに相当します。一般の並行処理モデルでは、複数のソースファイルが関与する同期ゾーンをセットにして利用しています。これは微妙な相互依存関係の下で成り立つデザインであり、コードに変更を加えるような場合、異なる要素間の微妙な関係を管理するのが困難になり、「並行処理の崩壊」という現象に遭遇しやすくなります。ということは逆に、オブジェクトを真の意味でロックすることができ、自分だけが操作できる状況にすることができれば、非常に有用なのではないでしょうか?
本稿では、高負荷の並列サーバーで並行処理の崩壊を起こさないようにするための方法を解説します。これは、すべてのデータアクセスをコールバック機構を通じて行うように制限することで、サーバーの並行処理を一箇所にまとめるという方式です。これにより、並行処理制約に対する違反を簡単に特定できるようになります。
コールバックによるシングルスレッドアクセス
このチュートリアルでは、コールバックを用いてシングルスレッドアクセスを実現するサーバーの設計・構築方法を解説します。ここでは、機密データを含んでいるオブジェクトに対して「リニアなアクセス」が行われるようにします。つまり、同時にアクセスできるスレッドを1本だけに制限します。
本稿では、通常用いられているものとはまったく逆の手法でこれを実現します。通常のケースでは、複数のスレッドが同一のリソースにアクセスしようとする場合は、ロックを争うことになります。つまりロックを獲得したスレッドがそのアクセス権を独占し、必要な処理が終わった時点でロックを解放し、別のスレッドがアクセス権を得るという流れを取ります。
本稿のデザインではコールバックを使用します。まず、コールバックオブジェクトをaccess()
というメソッドを用いて「ゲートキーパー」に渡します。ゲートキーパーは、このコールバックオブジェクトのメソッドに機密データを渡します。コールバックオブジェクトがデータを使い終わると、ゲートキーパーが次のコールバックオブジェクトにデータを渡します。
こうして見ると、このデザインは、通常の方式と根本的に異なっているわけではありません。各々のオブジェクトやスレッドは順番待ちをして、自分の番が来れば排他的なアクセスを行います。ただし、コントロール権を求めてスレッド同士を競わせるのではなく、コントロール権をゲートキーパーオブジェクトにゆだねて、誰がアクセスできるかをこのゲートキーパーで判断させる点が異なります。
コールバックを用いるメリット
コールバックベースのシステムの実装には余分な作業を必要としますが、労力に見合うだけのメリットがあります。
通常の手法では、特定のデータにアクセスするには、必要なロックを事前に取得しなければなりません。同期メソッドを採用すればこうした処理を義務付けることはできますが、それでもやはりアクセス側のコードは同期プロトコルに従わなければならないので、この処置だけでは不十分なケースが生じてきます。たとえばシステムが巨大化するにつれて、行うべきではないタイミングでのデータアクセスを行いやすくなります。また、同期ブロックを設置すると、その分だけデッドロックが発生する危険性が高くなります。
これに対してコールバックを用いた方式では、データがメソッドに渡されて初めて該当データにアクセスできるようになり、このメソッドが終了すると、該当データへのアクセスはできなくなります。このように、データを利用できる期間が明確に特定されるのがこの方式のメリットです。
またこの方式では、ゲートキーパーによる制御も加わるので、これを利用して任意のオーダリングメカニズムを実装することもできます。この点、従来の待機/通知方式では、どのスレッドがどのタイミングでアクセス権を得るかを確認するための手段が存在しません。
Sumクラス
まず、今回のスレッド制御システムのアクセス対象となる機密データオブジェクトを作成します。ここではSum
という名前の単純なデータオブジェクトを作成することにします。
public class Sum { public int a, b, c; // ... }
Sum
には、c = a + b
という関係にあるa
、b
、c
という3つの数値を格納します。これらはパブリック変数なので、誰でもその値を変更することができ、c = a + b
という関係が崩れる場合も生じてきます。ここでは、こうした状況の防止策を施すようにします。
もちろん同期アクセス方式でも、変数c
を隠して保護することはできます。しかし本稿の目的は、通常の同期方式の利用が困難または適していない状況において利用できる、データ保護の代替手法を確認することです。Sum
は極めて単純なオブジェクトですが、ここでの関心はデータ構造の複雑さにあるのではなく、脆弱なデータをどう扱うかに注目しています。
GateKeeperクラス
Sum
オブジェクトへのアクセスは、すべてGateKeeper
クラスを経由します。GateKeeper
は、そのコンストラクタに渡されるオブジェクトのアクセスをコントロールします。
GateKeeper gk = new GateKeeper( new Sum() );
GateKeeper
中に隠されているオブジェクトを使用するには、GateKeeper
のuse()
メソッドにコールバックを渡す必要があります。
gk.use( user );
User、Accessor、Mutator インターフェース
Javaにはファーストクラスの関数が存在しないので、本当の意味でのコールバックを使うことはできません。ただし、インターフェースを用いることで、コールバックに類似した機能を実現することができます。
Accessor
インターフェースでは、データを読み取るオブジェクトを指定します。
public interface Accessor extends User { public void access( Object o ); }
同様にMutator
インターフェースでは、データを書き出すオブジェクトを指定します。
public interface Mutator extends User { public void mutate( Object o ); }
その他に、User
という空のインターフェースも定義する必要があります。User
を実装するオブジェクトは、Accessors
またはMutators
のいずれかになります。
public interface User { }
また、Accessor
とMutator
の両方を実装するMutatingAccessor
というインターフェースも用意しておきます。これは必須ではありませんが、これによってオブジェクト宣言を簡潔化することができます。
public interface MutatingAccessor extends Accessor, Mutator { }
use()の実装
処理の中心となるGateKeeper
クラスのuse()
メソッドを詳しく見ておきましょう。use()
のパラメータはUser
型なので、Accessor
、Mutator
またはその両方のオブジェクトを渡すことができます。
public void use( User user ) {
user
はMutator
とAccessor
のどちらにもなり得るので、どちらの側面を先に扱うのかを決める必要があります。このチュートリアルでは、Mutator
の方を先にします。
if (user instanceof Mutator) { Mutator mutator = (Mutator)user;
これで、user
がMutator
であることが分かります。しかしMutator
に変更を行わせるには、事前にロックを取得しておく必要があります。そのためには、変更処理をロック/アンロックのペアで囲みます。
try { rwlock.getWriteLock(); // LOCK mutator.mutate( o ); } finally { rwlock.releaseWriteLock(); // UNLOCK }
こうして見ると、コールバック方式は従来型のロック/アクセス/アンロックのパターンと基本的に変わらないことが理解できるでしょう。ただしここでのロック処理は、ゲートキーパーがすべてを掌握しています。こうした方式には、いくつかの大きなメリットがあります。
- ロックとアンロックの構造が非常に明確になる。
- ゲートキーパーがロックのポリシーを決定できる。
- クライアントコードが大幅に簡単化される。
- ロックとアンロックの処理が1箇所だけで行われる。
高可用性サーバーにとって最も重要な意味を持つのは、4番目のメリットでしょう。通常、マルチスレッドのデータ構造を扱う場合、すべてのクライアントがロックおよびアンロック処理に気配りをしなければなりませんが、これはまた、リソースをロックしたままアンロックをし忘れるという失敗を、誰もが犯し得ることを意味します。プログラマであれば誰でもこうしたミスを避けようとするでしょうが、実際にこうした事態はときどき発生してしまいます。この「ときどき」という発生率が、高可用性サーバーにとって容認しがたい頻度であれば、より強力な防止策を講じる必要があります。
先のコード部では、finally
ブロックを使って、状況の如何にかかわらず、リソースを確実にアンロックしています。
Mutator
による変更は実施できるようにしたので、今度はAccessor
によるアクセスも実行できるようにしなければなりません。Mutator
はデータの変更を行えますが、Accessor
に許可されているのはデータの読み込みだけです。
if (user instanceof Accessor) { Accessor accessor = (Accessor)user;
このコードは先に見たMutator
のものと非常によく似ていますが、読み込みロックのみが必要で、書き込みロックは不要である点が異なります。
try { rwlock.getReadLock(); accessor.access( o ); } finally { rwlock.releaseReadLock(); }
読み込みロックは排他的ではないので、この構成は、該当データを同時に複数のリーダーが読み込むことを許可します。同時に、データへの書き込みを行うスレッドについては、完全に排他的なアクセス持つことになります。
Poundクラス(GateKeeperのテスト)
今回の同期手法の中心はGateKeeper
であり、GateKeeper
の根幹を成しているのがuse()
メソッドです。use()
メソッドは、非常に短く構造も単純なので、一見して、行うべき処理を忠実に実行するはずのように思われます。たとえそうであっても、一通りのテストは施しておく必要があります。
こうしたテストを行ってくれるのがPound
クラスです。その名前から連想されるようにPound
は、多数のスレッドを用いて、可能な限りの高速でGateKeeper
を「連打」します。当然ながら、このGateKeeper
内部にはSum
オブジェクトを入れておく必要があります。
Pound
を実行するには次のようにします。
% java Pound 20
この場合は、20個のPound
オブジェクトが作成され、20本のスレッドで動作します。各々のPound
オブジェクトは、自分の順番が来るたびに、Sum
オブジェクトに対する変更と検証の処理を行います。
各Pound
は、Sum
オブジェクトにアクセスする際にGateKeeper
を通過する必要があります。つまり、これらはAccessor
およびMutator
になります。メインループを一巡するごとにPound
オブジェクトは、アクセス(読み込み)または変更(書き込み)を行います。mutate()
メソッドのコードを次に示します。
public void mutate( Object o ) { Sum sum = (Sum)o;
最初に、a
またはb
を変更します
// Change a or b. int delta = rand.nextInt( 2000 ) - 1000; if (rand.nextInt( 2 )==0) { sum.a += delta; } else { sum.b += delta; }
この処理の実行直後は、c = a + b
という関係が崩れています。ここで誰かがこのデータにアクセスするのは、できれば避けたい事態です。
この点を確認するために、yield()
とsleep()
を実行し、別のスレッドを実行できるようにします。仮にコードが正しく機能したとしても、それは別のスレッドがアクセスしなかったからだという場合もあり得るので、そうした可能性を排除して、コードそのものに問題がないことを確認すべきだからです。
// The better to stress the thread-safety of the system. Thread.yield(); try { Thread.sleep( 20 ); } catch( InterruptedException ie ) {}
一時停止した後に、データの不整合を修正して処理を続けます。
// Make sum correct again. sum.c = sum.a + sum.b; // Report. checkAndReport( sum, "mutate" ); pause(); }
mutate()
呼び出しの終了後、他のスレッドが実行できるようになります。access()
メソッドはもっと単純です。
Sum sum = (Sum)o; // Just check the sum. checkAndReport( sum, "access" ); pause();
唯一行うべきことは、値を出力して、c = a + b
という関係が実際に成立しているかを確認することです。すべてが正しく動作しているならば、そうした結果が得られるはずです。
コードの簡単なテストとしては、ある程度の数のスレッド(10から20本くらい)を使って、Pound
をしばらく実行し続けてください。それで、エラーが報告されなければOKです。
オブジェクトを使用するスレッド数の追跡
筆者がこのテクニックを開発したのは、以前にJavaのガベージコレクタの扱いに苦労したことがあったからです。あるときメモリ残量がなくなりかけたのですが、メモリはネイティブコードで割り当てられていたので、ガベージコレクタが働いてくれませんでした。処分してほしい巨大なオブジェクトがいくつかあったのですが、Javaはそれが巨大だとは認識しないので、そのままになっていました。
ここで本当に必要だったのは、特定のオブジェクトがいつ不要になったのかを知る方法なのですが、Javaではこれが隠されています。そもそもガベージコレクタの存在意義は、こうしたオブジェクトを自動的に処分してくれることです。
リニア変数やモナディックステート(monadic state)という考え方にヒントを得てたどり着いたのが、シングルスレッドあるいはリニアオブジェクトという、同時にアクセスできるのが単一のスレッドだけとなるようにすればいいという考え方でした(ここの解説では、複数のリーダーを許すよう拡張してありますが)。こうした構造を用いると、同一のオブジェクトを使用するスレッドの数をより細かく制御・追跡できるようになりますし、該当オブジェクトが不要になった段階で知りたいのはこうした情報なのです。