RabbitMQ のデータをApache Kafka トピックにストリーミング

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver とKafka Connect JDBC コネクタを使用して、Apache Kafka でRabbitMQ のデータにアクセスし、ストリーミングできます。

Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData API Driver for JDBCと組み合わせることで、Kafka はライブのRabbitMQ のデータを扱うことができます。この記事では、RabbitMQ データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したRabbitMQ のデータをユーザーが安全に管理および監視できるようにする方法について説明します。

CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのRabbitMQ のデータとのやり取りにおいて比類のないパフォーマンスを提供します。RabbitMQ に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接RabbitMQ にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してRabbitMQ のデータを操作および分析できます。

前提条件

Apache Kafka トピックでRabbitMQ のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI のインストール
  3. Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector

RabbitMQ のデータへの新しいJDBC 接続を定義

  1. Linux ベースのシステムにCData API Driver for JDBCをダウンロードします。
  2. 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
    1. RabbitMQ という名前の新しいディレクトリを作成します。
      		mkdir API
      		
    2. ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
      		mv APIJDBCDriver.zip API/
      		
    3. CData APIJDBCDriver の内容をこの新しいディレクトリに解凍します。
      		unzip APIJDBCDriver.zip
      		
  3. RabbitMQ ディレクトリを開き、lib フォルダに移動します。
    ls
    cd lib/
    
  4. CData API Driver for JDBClib フォルダの内容をKafka Connect JDBClib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.api.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
    cp -r /path/to/CData API Driver for JDBC/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    
  5. 以下のコマンドを使用して、CData RabbitMQ JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
    	java -jar cdata.jdbc.api.jar -l
    	
  6. プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
  7. 以下のコマンドを使用してConfluent ローカルサービスを起動します:
    	confluent local services start
    	

    これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for RabbitMQ を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。

    Confluent ローカルサービスを起動
  8. POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
     curl --location 'server_address:8083/connectors'
    	--header 'Content-Type: application/json'
    	--data '{ 
    		"name": "jdbc_source_cdata_api_01", 
    		"config": { 
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", 
    			"connection.url": "jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;",
    		"topic.prefix": "api-01-", 
    		"mode": "bulk" 
    		} 
    	}'
    

    HTTP POST 本文(上記)で使用されるフィールドについて説明します:

    • connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
    • connection.url: RabbitMQ データに接続するためのJDBC 接続URL です。

      組み込みの接続文字列デザイナー

      JDBC URL の作成については、CData API Driver for JDBCに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      		java -jar cdata.jdbc.api.jar
      		

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      RabbitMQ Management HTTP API について

      RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

      HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

      Basic 認証の設定

      RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

      管理 API へのアクセスを有効にするには、以下のステップで進めます:

      1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
      2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
      3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

      RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

      • AuthScheme:Basic に設定します。
      • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
      • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
      • Password:RabbitMQ の管理パスワードに設定します。

      接続文字列の例:

      Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
      

      利用可能なテーブル

      RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

      • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
      • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
      • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
      • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
      • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
      • Consumers - すべてのキューに登録されたコンシューマーの一覧
      • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
      • Queues - すべての仮想ホストで宣言されたキューの一覧
      • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
      • VirtualHosts - ブローカーに設定された仮想ホストの一覧
      • VhostPermissions - 特定の仮想ホスト内のユーザー権限
      • Users - すべての RabbitMQ ユーザーの一覧
      • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
      • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
      • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
      • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
      • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
      • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
      • VhostLimits - 特定の仮想ホストに設定されたリソース制限
      • UserLimits - 特定のユーザーに設定されたリソース制限
      • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
      • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
      • AuthAttempts - ノードの認証試行統計
      • ClusterName - RabbitMQ クラスターの名前
      • WhoAmI - 現在認証されている管理ユーザーに関する情報
      • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
      • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
      • QueueBindings - 仮想ホスト内の特定のキューのバインディング
      組み込みの接続文字列デザイナーを使用してJDBC URL を生成(Salesforce の例)
    • topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「api-01-」に設定されています。
    • mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。

    このリクエストにより、RabbitMQ のすべてのテーブル/コンテンツがKafka トピックとして追加されます。

    注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。

  9. ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
    ksql
    list topics;
    
    Kafka トピックを一覧表示(BigCommerce の例)
  10. トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
    PRINT topic FROM BEGINNING;
    

Confluent Control Center への接続

Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。

Confluent Control Center に接続

おわりに

CData API Driver for JDBCの30日間無償トライアルをダウンロードして、RabbitMQ データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続