CodeZine(コードジン)

特集ページ一覧

Spring AMQPの実装と、AMQPを利用したアプリケーションの稼働

Spring AMQP×RabbitMQで始めるメッセージキューイング 第2回

  • ブックマーク
  • LINEで送る
  • このエントリーをはてなブックマークに追加
2013/01/18 14:00

 スケールしやすいシステムを作るためのキーワード『非同期処理』。今回は、オープンソースのメッセージキューイングミドルウェアであるRabbitMQをSpring frameworkを使って実装する入門編です。Spring AMQPを利用することでJavaで簡単に非同期アプリケーションの開発が行えます。

目次

はじめに

 前回の記事では、Spring AMQPとRabbitMQを利用した簡単なメッセージ送受信を行いました。今回は一歩進んで主に受信側の実装として必要となるメッセージリスナーを、前回に引き続きSpring AMQPを利用して実装を行います。メッセージリスナーを実装することで、RabbitMQを利用する場合のメリットでもある非同期なシステムの実装が可能です。また、実際にRabbitMQとSpring AMQPを利用したアプリケーションを本番環境で稼働させる場合に必要となる、冗長構成におけるポイントも紹介します。

対象読者

 RabbitMQを利用したJavaアプリケーションの開発を行いたいと考えている人。

非同期なメッセージリスナを実装する

 SpringAMQPには非同期にメッセージを処理するため、SimpleMessageListnerContainerというコンポーネントが用意されています。このコンポーネントを利用することで、メッセージを並行かつ非同期に取り扱うためのコールバック処理を簡単に実装できます。メッセージドリブンに処理を行うために、まずはメッセージを受信した場合の振る舞いを定義するハンドラクラスを用意しましょう。今回はハンドラクラスとしてHelloWorldHandlerというクラスを用意します。

HelloWorldHandler.java
public class HelloWorldHandler {
    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }
}

 ハンドラクラスでは、handleMessageメソッドを定義しています。handleMessageメソッドでは、メッセージが届いた場合の振る舞い(処理)を定義します。今回はサンプルなので単に届いたメッセージの内容を出力しています。なお、handleMessageというメソッド名はデフォルトで使用されるもので変更も可能です。

 このクラスを見てお気づきだと思いますが、ハンドラクラス自体はシンプルなクラスであり、何かのクラスを継承しているわけでもインターフェースを実装しているわけでもありません。これをMessageListenerインターフェースへと適用するのが、SpringAMQPのMessageListenerAdapterです。このアダプタ自身はSimpleMessageListenerContainerの設定時に指定します。SimpleMessageListenerContainerの設定は、前回同様コンフィグレーション用のクラスを用意して行います。今回は受信側のコンフィグレーションクラスとしてSampleListenerConfigというクラスを用意し、ここにSimpleMessageListenerContainerなどの設定を行なっていきます。

SampleListenerConfiguration.java
@Configuration
public class SampleListenerConfig {
    String queueName = "sample_queue";
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
    @Bean
    public SimpleMessageListenerContainer listenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(queueName);
        container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
        return container;
    }
}

 まず、Springのコンフィグレーションクラスとして利用するため、@Configurationというアノテーションを付与したSampleListenerConfigクラスを用意します。次に前回同様、ConnectionFactoryをBeanとして登録するために、connectionFactoryメソッドを@Beanというアノテーションをつけて用意します。connectionFactoryメソッドの内容自体は前回と変わりません。続いて、今回のポイントであるSimpleMessageListenerContainerを定義していきます。

SimpleMessageListenerContainerの設定
@Bean
public SimpleMessageListenerContainer listenerContainer(){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(queueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

 SimpleMessageListenerContainerをインスタンス化した上で、各種プロパティをセットしていきます。まず利用するConnectionFactoryを先に記述した、connectionFactoryメソッドを利用してセットします。続いて、このコンテナが接続するキュー名を設定し、最後に利用するMessageListenerとして先ほどのMessageListenerAdapterを設定します。この際、処理を委譲するクラスとして、先ほど作成したハンドラクラスであるHelloWorldHandlerをインスタンス化したオブジェクトを引数として指定しています。

 最後に、SimpleMessageListenerContainerを起動するクラスを用意します。SimpleMessageListenerContainerは、標準ではSpringフレームワークのDIコンテナによってライフサイクルが管理されるコンポーネントであるので、コンテナを起動するだけで自動で起動されます。従って、起動クラスはSpringフレームワークで用意されているApplicationContextを生成するだけです。今回はアノテーションによる設定を行っているので、AnnotationConfigApplicationContextを使ってApplicationContextを生成します。

Consumer.java
public class Consumer {
    public static void main(String[] args){
        new AnnotationConfigApplicationContext(SampleListenerConfig.class);
    }
}

 実際に起動する前に送信側のクラスを用意します。基本的には前回利用したSenderと同様のものですが、今回はSpringの機能を利用して定期的にメッセージを送信するようにしてみましょう。Thread.sleepを利用するのではなく、Springで用意されているタスクスケジュール機能を利用した簡単な実装をご紹介します。

 まず、スケジュール実行するタスクを用意します。

ScheduledProducerTask.java
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class ScheduledProducerTask {
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Scheduled(fixedDelay=5000)
    public void sendMessage() {
        String message = "Message sent at " + new Date();
        rabbitTemplate.convertAndSend(message);
        System.out.println(message);
    }
}

 今回は実行する内容を、5秒おきにメッセージを送信する処理としました。まず、メッセージ送信に必要となるテンプレートクラスであるRabbitTemplateを、@Autowiredというアノテーションを付与して宣言することでインジェクションします。@Autowiredというアノテーションを利用することで、タイプ(型)が一致する依存性の注入を行うことができます。

 スケジューリングを行う処理自体は、@Scheduledというアノテーションを付与したメソッドを用意することで実現します。@Scheduledアノテーションの引数として、メソッドの実行タイミング(トリガー)を指定できます。今回は前回の処理終了後、指定ミリ秒経過後に実行を行うfixedDelayを指定していますが、以下のようなものが指定可能です。

  • fixedDelay……前回の実行終了時点から指定したミリ秒経過後に実行
  • fixedRate……前回の実行開始時点から指定したミリ秒経過後に実行
  • cron……UNIX系OSで利用できるcronと同様の形式で実行タイミングを指定

 スケジュール実行するタスクの用意が終わりましたので、次にこれを実行する設定を行います。Consumer側と同様にコンフィグレーションクラスを用意し、RabbitMQの接続設定とあわせて定義します。

ProducerConfig.java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class ProducerConfig {
    String queueName = "sample_queue";
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setRoutingKey(this.queueName);
        return rabbitTemplate;
    }
    
    @Bean
    public ScheduledProducerTask scheduledProducerTask(){
        return new ScheduledProducerTask();
    }
}

 今回はSpringのタスクスケジューラ機能を有効にするために、@Configurationに加えて@EnabledSchedulingというアノテーションをタイプレベルに指定しています。これにより、従来はXMLベースの設定ファイルで<task:*>として定義が必要であったタスクスケジュール機能を、Javaベースの設定だけで利用可能になります。

 ProducerConfigで定義しているconnectionFactoryメソッドおよびrabbitTemplateメソッドは、前回と同様のものです。今回はこれに加えてスケジュール実行するタスクとして、先ほど用意したScheduledProducerTaskをBeanとして登録しています。

 最後にこの設定クラスを読み込んで起動するクラスを用意します。Consumer.javaとほぼ同内容ですが、読み込む設定クラスに先ほど用意したProducerを指定しています。

Producer.java
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Pruducer {
    public static void main(String[] args){
        new AnnotationConfigApplicationContext(ProducerConfig.class);
    }
}

  • ブックマーク
  • LINEで送る
  • このエントリーをはてなブックマークに追加

あなたにオススメ

著者プロフィール

  • 西谷 圭介(ニシタニ ケイスケ)

    TIS株式会社所属。金融系基幹システムの開発等に従事したのち、サービス企画・開発を担当。IaaS開発を経て、現在はアプリ開発者のためのPaaS「eXcale」の開発責任者兼プログラマとして活動中。 Twitter:@Keisuke69 eXcale:http://www.excale.net...

バックナンバー

連載:Spring AMQP×RabbitMQで始めるメッセージキューイング
All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5