はじめに
前回の記事では、Spring AMQPとRabbitMQを利用した簡単なメッセージ送受信を行いました。今回は一歩進んで主に受信側の実装として必要となるメッセージリスナーを、前回に引き続きSpring AMQPを利用して実装を行います。メッセージリスナーを実装することで、RabbitMQを利用する場合のメリットでもある非同期なシステムの実装が可能です。また、実際にRabbitMQとSpring AMQPを利用したアプリケーションを本番環境で稼働させる場合に必要となる、冗長構成におけるポイントも紹介します。
対象読者
RabbitMQを利用したJavaアプリケーションの開発を行いたいと考えている人。
非同期なメッセージリスナを実装する
SpringAMQPには非同期にメッセージを処理するため、SimpleMessageListnerContainerというコンポーネントが用意されています。このコンポーネントを利用することで、メッセージを並行かつ非同期に取り扱うためのコールバック処理を簡単に実装できます。メッセージドリブンに処理を行うために、まずはメッセージを受信した場合の振る舞いを定義するハンドラクラスを用意しましょう。今回はハンドラクラスとしてHelloWorldHandlerというクラスを用意します。
public class HelloWorldHandler { public void handleMessage(String text) { System.out.println("Received: " + text); } }
ハンドラクラスでは、handleMessageメソッドを定義しています。handleMessageメソッドでは、メッセージが届いた場合の振る舞い(処理)を定義します。今回はサンプルなので単に届いたメッセージの内容を出力しています。なお、handleMessageというメソッド名はデフォルトで使用されるもので変更も可能です。
このクラスを見てお気づきだと思いますが、ハンドラクラス自体はシンプルなクラスであり、何かのクラスを継承しているわけでもインターフェースを実装しているわけでもありません。これをMessageListenerインターフェースへと適用するのが、SpringAMQPのMessageListenerAdapterです。このアダプタ自身はSimpleMessageListenerContainerの設定時に指定します。SimpleMessageListenerContainerの設定は、前回同様コンフィグレーション用のクラスを用意して行います。今回は受信側のコンフィグレーションクラスとしてSampleListenerConfigというクラスを用意し、ここにSimpleMessageListenerContainerなどの設定を行なっていきます。
@Configuration public class SampleListenerConfig { String queueName = "sample_queue"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public SimpleMessageListenerContainer listenerContainer(){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueueNames(queueName); container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler())); return container; } }
まず、Springのコンフィグレーションクラスとして利用するため、@Configurationというアノテーションを付与したSampleListenerConfigクラスを用意します。次に前回同様、ConnectionFactoryをBeanとして登録するために、connectionFactoryメソッドを@Beanというアノテーションをつけて用意します。connectionFactoryメソッドの内容自体は前回と変わりません。続いて、今回のポイントであるSimpleMessageListenerContainerを定義していきます。
@Bean public SimpleMessageListenerContainer listenerContainer(){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueueNames(queueName); container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler())); return container; }
SimpleMessageListenerContainerをインスタンス化した上で、各種プロパティをセットしていきます。まず利用するConnectionFactoryを先に記述した、connectionFactoryメソッドを利用してセットします。続いて、このコンテナが接続するキュー名を設定し、最後に利用するMessageListenerとして先ほどのMessageListenerAdapterを設定します。この際、処理を委譲するクラスとして、先ほど作成したハンドラクラスであるHelloWorldHandlerをインスタンス化したオブジェクトを引数として指定しています。
最後に、SimpleMessageListenerContainerを起動するクラスを用意します。SimpleMessageListenerContainerは、標準ではSpringフレームワークのDIコンテナによってライフサイクルが管理されるコンポーネントであるので、コンテナを起動するだけで自動で起動されます。従って、起動クラスはSpringフレームワークで用意されているApplicationContextを生成するだけです。今回はアノテーションによる設定を行っているので、AnnotationConfigApplicationContextを使ってApplicationContextを生成します。
public class Consumer { public static void main(String[] args){ new AnnotationConfigApplicationContext(SampleListenerConfig.class); } }
実際に起動する前に送信側のクラスを用意します。基本的には前回利用したSenderと同様のものですが、今回はSpringの機能を利用して定期的にメッセージを送信するようにしてみましょう。Thread.sleepを利用するのではなく、Springで用意されているタスクスケジュール機能を利用した簡単な実装をご紹介します。
まず、スケジュール実行するタスクを用意します。
import java.util.Date; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; public class ScheduledProducerTask { @Autowired RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay=5000) public void sendMessage() { String message = "Message sent at " + new Date(); rabbitTemplate.convertAndSend(message); System.out.println(message); } }
今回は実行する内容を、5秒おきにメッセージを送信する処理としました。まず、メッセージ送信に必要となるテンプレートクラスであるRabbitTemplateを、@Autowiredというアノテーションを付与して宣言することでインジェクションします。@Autowiredというアノテーションを利用することで、タイプ(型)が一致する依存性の注入を行うことができます。
スケジューリングを行う処理自体は、@Scheduledというアノテーションを付与したメソッドを用意することで実現します。@Scheduledアノテーションの引数として、メソッドの実行タイミング(トリガー)を指定できます。今回は前回の処理終了後、指定ミリ秒経過後に実行を行うfixedDelayを指定していますが、以下のようなものが指定可能です。
- fixedDelay……前回の実行終了時点から指定したミリ秒経過後に実行
- fixedRate……前回の実行開始時点から指定したミリ秒経過後に実行
- cron……UNIX系OSで利用できるcronと同様の形式で実行タイミングを指定
スケジュール実行するタスクの用意が終わりましたので、次にこれを実行する設定を行います。Consumer側と同様にコンフィグレーションクラスを用意し、RabbitMQの接続設定とあわせて定義します。
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class ProducerConfig { String queueName = "sample_queue"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setRoutingKey(this.queueName); return rabbitTemplate; } @Bean public ScheduledProducerTask scheduledProducerTask(){ return new ScheduledProducerTask(); } }
今回はSpringのタスクスケジューラ機能を有効にするために、@Configurationに加えて@EnabledSchedulingというアノテーションをタイプレベルに指定しています。これにより、従来はXMLベースの設定ファイルで<task:*>として定義が必要であったタスクスケジュール機能を、Javaベースの設定だけで利用可能になります。
ProducerConfigで定義しているconnectionFactoryメソッドおよびrabbitTemplateメソッドは、前回と同様のものです。今回はこれに加えてスケジュール実行するタスクとして、先ほど用意したScheduledProducerTaskをBeanとして登録しています。
最後にこの設定クラスを読み込んで起動するクラスを用意します。Consumer.javaとほぼ同内容ですが、読み込む設定クラスに先ほど用意したProducerを指定しています。
import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class Pruducer { public static void main(String[] args){ new AnnotationConfigApplicationContext(ProducerConfig.class); } }