Apache Camel を使用してRabbitMQ のデータと連携

加藤龍彦
加藤龍彦
デジタルマーケティング
Apache Camel のルーティングとCData JDBC Driver を使用してRabbitMQ のデータをディスク上のJSON ファイルにコピーするシンプルなJava アプリを作成。



Apache Camel は、データを消費または生成するさまざまなシステムを統合できる、オープンソースの統合フレームワークです。CData JDBC Driver for API と組み合わせることで、リアルタイムRabbitMQ のデータと連携するCamel ルートを使用するJava アプリを作成できます。この記事では、RabbitMQ のデータをJSON ファイルに接続、クエリ、及びルーティングするアプリをNetBeans で作成する方法について説明します。

ビルトインの最適化されたデータ処理により、CData JDBC Driver は、リアルタイムRabbitMQ のデータとやり取りする際に比類のないパフォーマンスを提供します。RabbitMQ に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をRabbitMQ に直接プッシュし、組み込まれたSQL エンジンを利用してサポートされていない操作(主にSQL 関数とJOIN 操作)をクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータソース型を使用してRabbitMQ のデータを操作および分析することができます。

新しいMaven/Java プロジェクトを作成する

以下の手順に従って、新しいJava プロジェクトを作成し、適切な依存関係を追加します。

  1. NetBeans を開き、新しいプロジェクトを作成します。
  2. カテゴリリストからMaven を選択し、プロジェクトリストからJava Application を選択して、「Next」をクエリします。
  3. プロジェクトに名前を付け、他のプロパティを調整して「Finish」をクリックします。
  4. ソースパッケージで新しいJava クラス(ここではApp.java を使用)を作成し、クラスにmain メソッドを追加します。

プロジェクトの依存関係を追加する

プロジェクトが作成されたら、アプリからリアルタイムRabbitMQ のデータを操作するために必要な依存関係を追加できるようになります。まだMaven を環境にインストールしていない場合、CData JDBC ドライバのJAR ファイルをプロジェクトに追加するのに必要なため、インストールしてください。

Maven を使用してCData JDBC Driver for API をインストールする

  1. RabbitMQ 用のCData JDBC Driver をダウンロードしてパッケージを解凍し、JAR を実行してドライバーをインストールします。
  2. Maven を使用し、コネクタとしてJDBC Driver をインストールします。
    mvn install:install-file
    	-Dfile="C:\Program Files\CData\CData JDBC Driver for API 2019\lib\cdata.jdbc.api.jar"
    	-DgroupId="org.cdata.connectors"
    	-DartifactId="cdata-api-connector"
    	-Dversion="19"
    	-Dpackaging=jar
    

JDBC Driver をインストールしたら、プロジェクトに依存関係を追加できます。依存関係を追加するには、pom.xml を編集するか、依存関係にあるフォルダを右クリックして「Add Dependency」をクリックします。各依存関係のプロパティは以下の通りですが、「Add Dependency」ウィザードの「Query」ボックスに依存関係の名前を入力することで使用可能なライブラリを検索できます。

Selecting a dependency

必要な依存関係

DependencyGroup IDArtifact IDVersion
camel-coreorg.apache.camelcamel-core3.0.0
camel-jacksonorg.apache.camelcamel-jackson3.0.0
camel-jdbcorg.apache.camelcamel-jdbc3.0.0
camel-jsonpathorg.apache.camelcamel-jsonpath3.0.0
cdata-api-connectororg.cdata.connectorscdata-salesforce-connector19
commons-dbcp2org.apache.commonscommons-dbcp22.7.0
slf4j-log4j12org.slf4jslf4j-log4j121.7.30
log4jorg.apache.logging.log4jlog4j2.12.1

Camel を使用してJava アプリでRabbitMQ のデータにアクセスする

必要な依存関係を追加したら、Java DSL(Domain Specific Language)を使用してリアルタイムRabbitMQ のデータにアクセスできるルートを作成できます。以下はコードの一部です。サンプルプロジェクト(zip ファイル)をダウンロードして以下を実行してください。(TODO コメントに注意してください。)

必要なクラスをメインクラスにインポートすることから始めます。

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.SimpleRegistry;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.log4j.BasicConfigurator;

次に、main メソッドでロギングを構成し、新しいBasicDataSource を作成してレジストリに追加し、新しいCamelContext を作成して、最後にコンテクストへのルートに追加します。この例では、RabbitMQ のデータをJSON ファイルにルーティングします。

ロギングを構成する

BasicConfigurator.configure();

BasicDataSource を作成する

BasicDataSource を作成し、ドライバークラス名(cdata.jdbc.salesforce.SalesforceDriver)とURL(必要な接続プロパティを使用)を設定します。

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング
BasicDataSource basic = new BasicDataSource();
basic.setDriverClassName("cdata.jdbc.api.APIDriver");
basic.setUrl("jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;");

CData JDBC ドライバには、接続URL の構成に役立つ組み込みの接続文字列デザイナーが含まれています。

組み込みの接続文字列デザイナ

JDBC URL の構築については、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

java -jar cdata.jdbc.api.jar

接続プロパティを入力し、接続文字列をクリップボードにコピーします。

Using the built-in connection string designer to generate a JDBC URL (Salesforce is shown.)

BasicDataSource をレジストリに追加し、CamelContext を作成する

SimpleRegistry reg = new SimpleRegistry();
reg.bind("myDataSource", basic);

CamelContext context = new DefaultCamelContext(reg);

CamelContext にルーティングを追加する

以下のルーティングでは、timer コンポーネントを使用して一度実行し、SQL クエリをJDBC Driver に渡します。結果はJSON として整理され、(きれいに印刷できるようにフォーマットされて)file コンポーネントに渡され、JSON ファイルとしてディスクに書き込まれます。

context.addRoutes(new RouteBuilder() {
	@Override
	public void configure() {
		from("timer://foo?repeatCount=1")
			.setBody(constant("SELECT * FROM Account LIMIT 10"))
			.to("jdbc:myDataSource")
			.marshal().json(true)
			.to("file:C:\\Users\\USER\\Documents?fileName=account.json");
	}
});

CamelContext ライフサイクルを管理する

ルートを定義したら、CamelContext を開始してライフサイクルを始めます。この例では、10 秒待機してからコンテクストをシャットダウンします。

context.start();
Thread.sleep(10000);
context.stop();

無償トライアル、サンプルプロジェクト、テクニカルサポート

これで、Camel を使用してRabbitMQ からJSON ファイルにデータをルーティングするJava アプリケーションを使用できるようになりました。CData JDBC Driver for API の30日の無償評価版と、サンプルプロジェクトをダウンロードして(TODO コメントに注意して)、Apache Camel でリアルタイムRabbitMQ のデータの操作を開始します。ご不明な点があれば、サポートチームにお問い合わせください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続