Pentaho Data Integration でRabbitMQ のデータを連携

Jerod Johnson
Jerod Johnson
Director, Technology Evangelism
Pentaho Data Integration で RabbitMQ のデータ をベースにした ETL パイプラインを構築します。



CData API Driver for JDBC を使用すると、データパイプラインからリアルタイムデータにアクセスできます。Pentaho Data Integration は、ETL(Extraction, Transformation, and Loading)エンジンであり、データをクレンジングし、アクセス可能な統一フォーマットでデータを格納します。この記事では、RabbitMQ のデータ に JDBC データソースとして接続し、Pentaho Data Integration で RabbitMQ のデータ をベースにしたジョブやトランスフォーメーションを構築する方法を説明します。

RabbitMQ への接続を設定

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング

組み込みの接続文字列デザイナー

JDBC URL の構築を支援するには、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行します。

java -jar cdata.jdbc.api.jar

接続プロパティを設定し、接続文字列をクリップボードにコピーします。

JDBC URL を設定する際には、Max Rows 接続プロパティの設定も検討してください。これにより返される行数が制限され、レポートやビジュアライゼーションの設計時にパフォーマンスを向上させることができます。

一般的な JDBC URL は次のようになります:

jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

接続文字列を保存して、Pentaho Data Integration で使用します。

Pentaho DI から RabbitMQ に接続

Pentaho Data Integration を開き、「Database Connection」を選択して CData API Driver for JDBC への接続を設定します。

  1. 「General」をクリックします。
  2. Connection name を設定します(例:RabbitMQ Connection)。
  3. Connection type を「Generic database」に設定します。
  4. Access を「Native (JDBC)」に設定します。
  5. Custom connection URL に RabbitMQ の接続文字列を設定します(例:
    jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    )。
  6. Custom driver class name を「cdata.jdbc.api.APIDriver」に設定します。
  7. 接続をテストし、「OK」をクリックして保存します。

RabbitMQ のデータパイプラインを作成

CData JDBC Driver を使用して RabbitMQ への接続が設定されたら、新しいトランスフォーメーションまたはジョブを作成する準備が整いました。

  1. 「File」>>「New」>>「Transformation/job」をクリックします。
  2. 「Table input」オブジェクトをワークフローパネルにドラッグし、RabbitMQ 接続を選択します。
  3. 「Get SQL select statement」をクリックし、Database Explorer を使用して利用可能なテーブルとビューを表示します。
  4. テーブルを選択し、必要に応じてデータをプレビューして確認します。

ここから、適切な同期先を選択し、レプリケーション中にデータを変更、フィルタリング、その他の処理を行うトランスフォーメーションを追加することで、トランスフォーメーションまたはジョブを続行できます。

無料トライアルと詳細情報

CData API Driver for JDBC の 30日間無料トライアルをダウンロードして、Pentaho Data Integration で RabbitMQ のデータ のリアルタイムデータを今すぐ活用しましょう。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続