Apache NiFi で RabbitMQ に接続

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver を使用して Apache NiFi で RabbitMQ のデータ にアクセスして処理できます。

Apache NiFi は、強力かつスケーラブルなデータルーティング、変換、システム間連携ロジックの有向グラフをサポートしています。CData API Driver for JDBC と組み合わせることで、NiFi からリアルタイムのRabbitMQ のデータ を操作できます。この記事では、Apache NiFi Flow からRabbitMQ のデータ に接続してクエリを実行する方法を説明します。

CData JDBC Driver は、最適化されたデータ処理機能が組み込まれており、リアルタイムのRabbitMQ のデータ とのやり取りにおいて比類のないパフォーマンスを提供します。複雑なSQL クエリをRabbitMQ に発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接RabbitMQ にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)は組み込みのSQL エンジンでクライアント側に処理します。また、組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してRabbitMQ のデータ を操作・分析できます。

Apache NiFi でRabbitMQ のデータ に接続

  1. CData API Driver for JDBC のインストーラーをダウンロードし、パッケージを解凍して、.exe ファイルを実行してドライバーをインストールします。
  2. CData JDBC Driver のJAR ファイル(およびライセンスファイルがある場合はそれも)、cdata.jdbc.api.jar(および cdata.jdbc.api.lic)を Apache NiFi の lib サブフォルダにコピーします(例:C:\nifi-1.3.0-bin\nifi-1.3.0\lib)。

    Windows では、CData JDBC Driver のデフォルトのインストール場所は C:\Program Files\CData\CData API Driver for JDBC です。

  3. bin サブフォルダにある run-nifi.bat ファイルを実行して Apache NiFi を起動します(例:C:\nifi-1.3.0-bin\nifi-1.3.0\bin)。

    (または)

    コマンドプロンプトで対象のディレクトリに移動し、run-nifi.bat ファイルを実行します:

    cd C:\nifi-1.3.0-bin\nifi-1.3.0\bin
    .\run-nifi.bat
    
  4. Web ブラウザで Apache NiFi の UI に移動します:https://localhost:8443/nifi でアクセスできます。

    注意:古いバージョンの Apache NiFi を使用している場合は、http://localhost:8080/nifi からアクセスする必要があります。以前のバージョンでは HTTP プロトコルが使用されていましたが、最新バージョンでは HTTPS が標準になっています。デフォルトでは、HTTP はポート 8080 で動作し、HTTPS はポート 8443 を使用します。

  5. URL から Apache NiFi にアクセスすると、ログイン用のユーザー名とパスワードの入力を求められます。

    ログイン資格情報を取得するには、NiFi のインストールディレクトリ内の log ディレクトリにある「App.log」ファイルを確認してください。このファイルには通常、NiFi インターフェースにアクセスするために必要な情報が含まれています。

  6. NiFi Flow のワークスペースを右クリックし、「Controller Services」をクリックします。
  7. ボタンをクリックして、新しいコントローラーサービスを作成します。
  8. Controller Services セクションで、新しく作成した「DBCPConnection Pool」を見つけ、メニュー()から「Edit」をクリックして新しい接続を設定します。
  9. 以下のプロパティを入力します:

    • Database Connection URL:jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    • Database Driver Class Name:cdata.jdbc.api.APIDriver
    • Database Driver Location(s):Apache NiFi の lib フォルダへのパス(JAR ファイルが配置されている場所)。

    組み込みの接続文字列デザイナー

    JDBC URL の構築には、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。

    java -jar cdata.jdbc.api.jar
    

    接続プロパティを入力し、接続文字列をクリップボードにコピーします。

    RabbitMQ Management HTTP API について

    RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

    HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

    Basic 認証の設定

    RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

    管理 API へのアクセスを有効にするには、以下のステップで進めます:

    1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
    2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
    3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

    RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

    • AuthScheme:Basic に設定します。
    • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
    • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
    • Password:RabbitMQ の管理パスワードに設定します。

    接続文字列の例:

    Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    

    利用可能なテーブル

    RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

    • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
    • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
    • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
    • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
    • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
    • Consumers - すべてのキューに登録されたコンシューマーの一覧
    • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
    • Queues - すべての仮想ホストで宣言されたキューの一覧
    • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
    • VirtualHosts - ブローカーに設定された仮想ホストの一覧
    • VhostPermissions - 特定の仮想ホスト内のユーザー権限
    • Users - すべての RabbitMQ ユーザーの一覧
    • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
    • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
    • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
    • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
    • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
    • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
    • VhostLimits - 特定の仮想ホストに設定されたリソース制限
    • UserLimits - 特定のユーザーに設定されたリソース制限
    • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
    • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
    • AuthAttempts - ノードの認証試行統計
    • ClusterName - RabbitMQ クラスターの名前
    • WhoAmI - 現在認証されている管理ユーザーに関する情報
    • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
    • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
    • QueueBindings - 仮想ホスト内の特定のキューのバインディング
  10. Controller Services セクションで、新しく作成した DBCPConnection Pool を見つけ、メニュー()から「Enable」をクリックして新しい接続を有効にします。
  11. 「Enable Controller Service」ウィンドウで、Scope を「Service and referencing components」に設定します。
  12. 接続を確立し、SELECT クエリを実行するには、プロセッサー(黄色でハイライトされている部分)をワークスペースにドラッグ&ドロップします。
  13. 「ExecuteSQL」プロセッサーを選択し、「Add」ボタンをクリックしてワークスペースに表示させます。
  14. 追加したプロセッサー(ExecuteSQL)をダブルクリックして、接続ページを開きます。
  15. Properties セクションで、必要な情報を入力します。Database Connection Pooling Service を作成した DBCPConnectionPool に一致させ、SQL select query セクションに実行したい SQL クエリを設定してください。
  16. Relationships に移動し、実行プロセスの成功時と失敗時にコンポーネントがどのように処理を進めるかのオプションを選択してください。
  17. ExecuteSQL コンポーネントを有効にするには、それを選択して Operation セクションの「Enable」をクリックするか、右クリックして「Enable」を選択します。

これでRabbitMQ のデータ が Apache NiFi で使用できるようになりました。たとえば、DBCPConnection Pool を QueryDatabaseTable プロセッサーのソースとして使用できます(以下に表示)。

30日間の無料トライアルをダウンロードして、Apache NiFi でリアルタイムのRabbitMQ のデータ を操作してみてください。ご不明な点は、サポートチームまでお気軽にお問い合わせください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続