はじめに
スレッドによる処理の並列化は、アプリケーションのスループットをアップしたいという場面で、しばしば用いられる常套テクニックです。スレッドを用いたアプリケーションでは、処理の並列化という性質を考慮して実装する必要があります。そのスレッドアプリケーションの実装の1つのテクニックとして、本稿ではThreadGroup
のuncaughtException
メソッドのオーバーライドという方法を紹介します。
対象読者
Javaプログラミングを行ったことがある方を対象とします。
必要な環境
サンプルは以下の環境で動作確認を行っています。
ThreadGroupのuncaughtExceptionメソッドの活用
ThreadGroup
のuncaughtException
メソッドのシグネチャは以下の通りです。
void uncaughtException(Thread thread,Throwable t)
第1引数には、例外が発生したスレッドのインスタンスが渡されます。第2引数には、発生した例外が渡されます。
このメソッド内に例外発生時のエラーハンドリングを実装します。詳細はJavaAPI Documentを参照して下さい。
今回のサンプルで示すThreadGroup
のuncaughtException
メソッドのオーバーライドという方法は、以下のような場面で活用できます。
サブスレッド内でランタイム例外が発生した場合に適切なエラーハンドリングを行いたい
メインスレッドから起動されたサブスレッド内で、ランタイム例外がキャッチされなかった場合、標準エラーに例外のスタックトレースが出力されます。通常、標準エラー以外にもエラーログファイルの出力、システム管理者へのメール送信など、エラー発生時のハンドリングを行いたい場合がほとんどです。
スレッドクラスのrun
メソッド全体をtry-catch(Exception e)
で囲み、エラーハンドリングを実装するのも1つの手ですが、その実装をするかしないかはプログラマにゆだねられ、それが守られなかった場合、適切にエラーハンドリングが行われない恐れがあります。このような問題を回避するために、サンプルアプリケーションの方法を用いることにより、確実にエラーハンドリングを行うことが可能になります。
サブスレッド内でのランタイム例外発生に気づかない問題を回避したい
マルチスレッドアプリケーションでは、メインスレッドの処理とサブスレッドの処理は非同期に進行します。そのため、サブスレッド側で例外が発生した場合にもメインスレッドの動作には影響を及ぼさず、サブスレッドが停止するのみです。メインスレッド側でランタイム例外が発生した場合には、全体の処理(メインスレッド)が停止するため、バグがあった場合に容易に気づくことができます。
一方サブスレッド側でNullPointerException
などのランタイム例外が発生した場合は、サブスレッドが終了するだけなので、あたかもメインスレッドは正常終了したかのように見えます。開発者が例外のスタックトレース出力に気づかなかった場合には、例外を発生させた原因のバグが見過ごされ、システム運用後に不具合が発覚するといったようなことが起こりえます。このような問題の解決方法として、サンプルアプリケーションの方法を用いることにより、メインスレッド側でサブスレッド側のエラー発生を知ることができ、バグの発生を未然に防ぐことができます。
ThreadGroupとThreadの関係
本稿で紹介するスレッドグループとスレッドの関係を図に示します。
あるスレッドは、必ずスレッドグループに属します。スレッドのコンストラクタに明示的にスレッドグループのインスタンスを指定しない場合は、デフォルトのスレッドグループに属することとなります。本稿では、スレッドのコンストラクタに明示的にスレッドグループのインスタンスを指定する方法を採用します。
サンプルアプリケーション仕様
サンプルは、複数行から構成されるCSVファイルを読み込み、マルチスレッドで並列に1行ごとのデータを新規登録・更新・削除するアプリケーションです。
入力CSVファイルデータ形式
入力となるCSVファイルはカンマ区切りで以下のデータ構成になっています。
カラム番号 | データ内容 |
1カラム目 | データ処理種別(n=新規登録 / u=更新 / d=削除) |
2カラム目 | データ |
※サンプルアプリケーションでは、処理を単純にするために2カラム目のデータには半角カンマが指定されないものとして実装されています。
全体フロー
- データ処理
- データ処理内でのエラー検知時フロー制御
データ処理エラー発生時ハンドリング
データ処理内でシステムエラーが発生した際には、以下のハンドリングを行います。
- エラー内容をログファイルに出力
- システム管理者あてにメールを送信
データ進行状況ログ出力
データ処理の開始(START)、終了(END)をログファイルに出力します。
データ処理件数ログ出力
全データの処理完了後、新規登録・更新・削除の件数をログファイルに出力します。
ソースの全体構成
以下の表に、サンプルアプリケーションを構成するクラスを示します。
ソースファイル名 | 説明 |
CSV2DataUpdateApp | CSVファイルを読み込み、データ処理スレッドクラスを起動するアプリケーションクラス。 |
DataUpdateThreadGroup | データ処理用スレッドクラスが所属するスレッドグループクラス。java.lang.ThreadGroup を継承している。 |
AbstDataUpdateThread | データ処理用抽象スレッドクラス。java.lang.Thread を継承している。 |
CreateThread | データ新規登録用のスレッド実装クラス。AbstDataUpdateThread を継承している。 |
UpdateThread | データ更新用のスレッド実装クラス。AbstDataUpdateThread を継承している。 |
DeleteThread | データ削除用のスレッド実装クラス。AbstDataUpdateThread を継承している。 |
データ処理スレッドクラス
public class DeleteThread extends AbstDataUpdateThread{ (中略) /** * データを削除します。 */ public void run(){ logProcess( true ); // 処理進行ログ // ここで削除処理を実行(サンプルのため、実装なし) int tmp = 2/0; //わざとランライム例外を発生させる。 // スレッドが所属するスレッドグループへの参照を取得 DataUpdateThreadGroup manThreadGrp = (DataUpdateThreadGroup)getThreadGroup(); manThreadGrp.incrementDelete(); // 削除件数を加算 logProcess( false ); // 処理進行ログ } }
データ削除処理用スレッドクラスです。スレッドの実装であるrun
メソッドを実装しています(サンプルのため実際のデータ処理は実装していない)。ここでは、わざとランタイム例外が発生するように実装されています。新規登録処理用のCreateThread
クラス、更新処理用のUpdateThread
クラスもほぼ同様の実装です。
public abstract class AbstDataUpdateThread extends Thread{ /** * コンストラクタ * @param threadGroup スレッドが所属するスレッドグループ * @param threadPrefix スレッド名につけるプレフィックス */ public AbstDataUpdateThread( ThreadGroup threadGroup,String threadPrefix){ super( threadGroup , threadPrefix + "_" + (new SimpleDateFormat("yyyymmddhhmmssSSS") ) .format(new Date() ) ); } /** 対象データ */ protected String data = null; /** * @param data The data to set. */ public void setData(String data) { this.data = data; } }
データ処理用スレッドクラスの共通処理を定義したスーパークラスです。コンストラクタで、スレッドが所属するスレッドグループと、スレッド名につけるプレフィックスを受け取っています。スレッド名は、「プレフィックス_タイムスタンプ」の形式としています。
スレッドグループクラス
/** * 所属するThreadの実行管理、 * 新規登録・更新・削除の更新件数の管理を行うクラスです。 */ public class DataUpdateThreadGroup extends ThreadGroup{ (中略) /** エラーが起こったかどうかの真偽値 */ boolean errorOccured = false; (中略) /** * スレッド内でcatchされていない例外をハンドリングします。 */ public void uncaughtException(Thread t, Throwable e) { errLogger.error( " Exception Occured at " + t.getName() , e ); this.errorOccured = true; }
DataUpdateThreadGroup
クラスはデータ処理用スレッドクラスが属するスレッドグループ実装クラスです。ここでは、uncaughtException
メソッドをオーバーライドしています。このメソッドはスレッドグループに所属するスレッド内でキャッチされなかった例外があった場合に呼び出されます。メソッド内では、例外内容のエラーロギングを実装しています(Log4Jの機能により、エラーログファイル出力とメール送信)。
エラーロギング後に、サブスレッド内で例外が発生したことを示すerrorOccured
プロパティをtrue
に設定しています。サンプルアプリケーションではこのフラグがtrue
になった場合に、処理フロー制御クラス(CSV2DataUpdateApp.java
)がCSVファイルの読み込みを中止し、全体処理を停止します。
処理フロー制御クラス
/** * 全体の処理フローを制御します。 */ public void exec(String fileNm) throws UnsupportedEncodingException,FileNotFoundException, IOException,InterruptedException{ BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader( new FileInputStream(new File( fileNm )),"Shift_JIS" ) ); // データ処理用スレッドが属するスレッドグループの生成 DataUpdateThreadGroup manageTG = new DataUpdateThreadGroup( "manageTG" ); String line = null; //---------------------------------------------- // ファイルを全て読み込むまで処理を続けます。 //---------------------------------------------- while( ( line = br.readLine() ) != null ){ // サブスレッド内でエラーがあった場合、 // 起動した全てのスレッドが終了するのを待って // 全体の処理を停止します。 if ( manageTG.isErrorOccured() ) { waitThreadEnd(manageTG); errLogger.error( "Process End because of Error" ); System.exit( 0 ); } String[] csvdata = line.split( "," ); // CSVデータ解析 // 1カラム目の値に対応したスレッドを起動します。 AbstDataUpdateThread commonThread = makeThread(manageTG,csvdata[0]); commonThread.setData(csvdata[1]); commonThread.start(); // 分かりやすいサンプルにするため、1秒停止 Thread.sleep( 1000 ); } //---------------------------------------------- // 起動した全てのスレッドが終了するのを待ちます。 //---------------------------------------------- waitThreadEnd(manageTG); cntLogger.info( "新規:" + manageTG.getCreateCnt() + "/更新:" + manageTG.getUpdateCnt() + "/削除:" + manageTG.getDeleteCnt() ); }finally{ if ( br != null) br.close(); } }
サンプルアプリケーション全体の処理フローを制御しているメソッドです。全体の処理フローは以下の順で処理が行われます。
- ファイルを開く。
- ファイルを全て読み込むまで以下の処理を繰り返す。
- データ処理用スレッドでエラーが発生した場合、起動したスレッドの終了を待ち、全体の処理を終了
- ファイル1行読み込み
- データ処理用スレッド起動
- 全スレッドが終了するのを待つ。
- 件数ログに出力。
/** * スレッドグループ内で起動中のスレッド数が0になるまで待機します。 */ private void waitThreadEnd(ThreadGroup threadGroup) throws InterruptedException{ while ( true ){ if ( threadGroup.activeCount() == 0 ) break; // 全スレッドが終了していない場合は1秒待つ Thread.sleep( 1000 ); } }
スレッドグループ内に属するスレッドが全て終了するまで待つためのメソッドです。ThreadGroup
クラスのactiveCount
メソッドの戻り値が0(ゼロ)になった場合、スレッドグループ内に属するスレッドが全て終了したとみなし、無限ループを抜けます。
実行
それでは、アプリケーションを実行してみましょう。
テストデータ
サンプルCSVファイルのデータは以下のようになっています。
n,新規データ1 n,新規データ2 u,更新データ1 u,更新データ2 d,削除データ3 u,更新データ3 u,更新データ4
削除データ(1カラム目が「d」)処理用クラスのDeleteThread
のrun
メソッド内で、わざとランタイム例外が発生するように実装されていますので、5行目のデータ「d,削除データ3」を処理後にメインスレッド側で異常を検出し、全体処理が停止されます。
準備
サンプルを展開したディレクトリの「conf/log4j.properties」ファイルの以下の場所を環境に合わせて修正します。
log4j.appender.errorMail.SMTPHost=xxx.xxx.xxx.xxx (メール送信に使用するSMTPサーバー) log4j.appender.errorMail.From=xxx@xxx.xxx (エラーメール送信元メールアドレス) log4j.appender.errorMail.To=xxx@xxx.xxx (エラーメール送信先メールアドレス)
実行
コマンドプロンプトを開き、サンプルを展開したディレクトリに移動してから「exec.bat」を実行します。
実行結果
処理の進行状況用ログファイルには以下のように出力されます。
実行後にlogディレクトリに出力されるログファイルを参照と、テストデータ5行目のログ(Excelの8行目)の「START」は記録されていますが、「END」のログは記録されていませんので異常終了したことが分かります。
またテストデータ6行目以降のデータのログも記録されていないことから、テストデータ5行目のデータ処理でサブスレッドの例外を検知し、全体の処理が停止されたことがわかります。なおテストデータ4行目の「END」のログが、いちばん最後の行に出力されているのは、メインスレッド側で、起動済みスレッドの終了を待って全体処理を停止したためです。
まとめ
サブスレッドで例外が発生した時のエラーハンドリングとしてThreadGroup
のuncaughtException
メソッドのオーバーライドを実際に使用したアプリケーションを紹介しました。この方法を活用することで、メインスレッド側でサブスレッドを管理することができ、より強固なマルチスレッドアプリケーションが作成できるようになるでしょう。