Embulk を使用して RabbitMQ のデータをデータベースにロードする方法

Jerod Johnson
Jerod Johnson
Director, Technology Evangelism
CData JDBC Driver とオープンソースのETL/ELT ツールであるEmbulk を使ってRabbitMQ のデータをデータベースにロードする方法を解説します。



Embulk はオープンソースのバルクデータローダーです。CData API Driver for JDBC と組み合わせることで、RabbitMQ から任意の同期先にデータを簡単にロードできます。この記事では、CData API Driver for JDBC をEmbulk で使用してRabbitMQ のデータをMySQL データベースにロードする方法を解説します。

CData JDBC Driver は、最適化されたデータ処理機能を内蔵しており、リアルタイムのRabbitMQ のデータに対して比類のないパフォーマンスを発揮します。RabbitMQ に対して複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接RabbitMQ にプッシュし、サポートされていない操作(SQL 関数やJOIN 操作など)は組み込みのSQL エンジンを使用してクライアント側で処理します。

RabbitMQ への JDBC 接続を設定

Embulk でバルクロードジョブを作成する前に、JDBC Driver のJAR ファイルのインストール場所(通常はC:\Program Files\CData\CData API Driver for JDBC\lib)を確認しておきます。

Embulk はJDBC 接続をサポートしているため、RabbitMQ に簡単に接続してSQL クエリを実行できます。バルクロードジョブを作成する前に、RabbitMQ への認証用のJDBC URL を作成します。

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング

組み込みの接続文字列デザイナー

JDBC URL の作成には、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。

java -jar cdata.jdbc.api.jar

接続プロパティを入力し、接続文字列をクリップボードにコピーします。

以下は、RabbitMQ への一般的なJDBC 接続文字列です。

jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

Embulk で RabbitMQ ののデータをロード

CData JDBC Driver をインストールしてJDBC 接続文字列を作成したら、必要なEmbulk プラグインをインストールします。

Embulk の入力・出力プラグインをインストール

  1. Embulk にJDBC 入力プラグインをインストールします。
    https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc
  2. embulk gem install embulk-input-jdbc
    
  3. この記事では、同期先データベースとしてMySQL を使用します。出力プラグインを使用して、SQL Server、PostgreSQL、またはGoogle BigQuery を同期先として選択することもできます。
    https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql
    embulk gem install embulk-output-mysql
    

入力プラグインと出力プラグインをインストールしたら、Embulk を使用してRabbitMQ のデータをMySQL にロードする準備が整いました。

RabbitMQ ののデータをロードするジョブを作成

まず、Embulk で設定ファイルを作成します。ファイル名はapi-mysql.yml のようにします。

  1. 入力プラグインのオプションには、CData API Driver for JDBC、ドライバーJAR ファイルへのパス、ドライバークラス(例:cdata.jdbc.api.APIDriver)、および上記のJDBC URL を指定します。
  2. 出力プラグインのオプションには、MySQL データベースの値と認証情報を指定します。

設定ファイルのサンプル(api-mysql.yml)

in:
	type: jdbc
	driver_path: C:\Program Files\CData[product_name] 20xx\lib\cdata.jdbc.api.jar
	driver_class: cdata.jdbc.api.APIDriver
	url: jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
	table: "AuthAttempts"
out:
	type: mysql
	host: localhost
	database: DatabaseName
	user: UserId
	password: UserPassword
	table: "AuthAttempts"
	mode: insert

ファイルを作成したら、Embulk ジョブを実行します。

embulk run api-mysql.yml

Embulk ジョブを実行すると、MySQL テーブルにRabbitMQ のデータが格納されます。

フィルタリングした RabbitMQ ののデータをロード

テーブルから直接データをロードするだけでなく、カスタムSQL クエリを使用してロードするデータをより詳細に制御できます。また、クエリフィールドのSQL WHERE 句で最終更新カラムを設定することで、増分ロードを実行することもできます。

in:
	type: jdbc
	driver_path: C:\Program Files\CData[product_name] 20xx\lib\cdata.jdbc.api.jar
	driver_class: cdata.jdbc.api.APIDriver
	url: jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
	query: "SELECT ,  FROM AuthAttempts WHERE [RecordId] = 1"
out:
	type: mysql
	host: localhost
	database: DatabaseName
	user: UserId
	password: UserPassword
	table: "AuthAttempts"
	mode: insert

詳細情報と無料トライアル

CData API Driver for JDBC をコネクタとして使用することで、Embulk のデータロードジョブにRabbitMQ のデータを統合できます。また、200 を超えるエンタープライズデータソース向けドライバーにより、あらゆるエンタープライズSaaS、ビッグデータ、NoSQL ソースも統合できます。30日間の無料トライアルをダウンロードして、今すぐお試しください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続