Google Cloud Data Fusion でRabbitMQ のデータを扱う方法:CData JDBC Driver

宮本航太
宮本航太
プロダクトスペシャリスト
CData JDBC ドライバを使って、Google Cloud Data fusion で RabbitMQ のデータ をBigQuery にETL。



Google Cloud Data Fusion は、ノーコードでデータ連携の設定が可能な言わば GCP の ETL ツール(サービス)です。たくさんのコネクタや変換・分析機能がデフォルトで用意されているため、さまざまなデータソースを色々な組み合わせで扱うことが可能なようです。 また JDBC を扱うこともできるため、この記事では、CData JDBC Driver for RabbitMQ のデータ を使って、RabbitMQ のデータ データをCloud Data Fusion でGoogle BigQuery にノーコードでパイプラインします。

Cloud Data Fusion の準備

まずはCloud Data Fusion のインスタンスを作成します。

  1. Data Fusion のトップ画面にある「CREATE INSTANCE」からインスタンスを作成します。
  2. 作成されたインスタンス名を先ほどの画面でクリックすると以下の画面に遷移しますので、画面下部にある Service Account をコピーします。
  3. Cloud Data Fusion のインスタンス作成
  4. 画面上部にある追加からメンバーを追加します。メンバー名は先ほどコピーした「Service Account」に合わせてください。 役割は BiqQuery へもアクセスしますので、「BigQuery 管理者」、「Cloud Data Fusion 管理者」、「Cloud Data Fusion API サービス エージェント」を付与します。

CData JDBC Driver for API のアップロード

ここからは実際に、Data Fusion の設定をしていきます。 まずは JDBC Driver をアップロードを行います。

  1. 「View Instance」をクリックして、Data Fusion の Control Center を開きます。
  2. Cloud Data Fusion のControl Center を開く
  3. Control Center が表示されたら、「+」ボタンをクリックして JDBC Driver をアップロードしていきます。
    • Name:アップロードしたドライバーに設定する名前
    • Class name:cdata.jdbc.api.APIDriver
    JDBC Driver をCloud Data Fusion にアップロード
  4. アップロードする際の注意点として、Driver のファイル名を name-version の形式に変更してアップロードする必要があります。 なお、jarファイルをダブルクリックした際に表示されているバージョンをもとに「api-connector-java-19.0.7115.0.jar」に変更しました。
  5. JDBC Driver をCloud Data Fusion にアップロード
  6. アップロードが成功するとこのような画面が表示されるので、「Create a Pipeline」をクリックします。
  7. JDBC Driver のアップロード終了

RabbitMQ からGoogle BigQuery へのパイプラインの作成

Data Fusion のパイプライン作成

インプット元はサイドメニューの「Source」から選択します。今回は先ほどアップロードした RabbitMQ のデータ の JDBC Driver を使用するため、「DataBase」を選択します。 アウトプット先は同じくサイドメニューより「Sink」→「BigQuery」を選択します。

Source およびSink 先の選択

「DataBase」の設定

「DataBase」のアイコンにカーソルを持ってくるとプロパティというボタンが表示されるのでクリックし、下記内容を設定します。

  • Label:API
  • Reference Name:API
  • Plugin Name:API Driver(Driver をアップロードした際の名前)
  • Plugin Type:jdbc
  • Connection String:API へ接続する際の JDBC URL
  • Import Query:インプットしたいデータを抽出するクエリ

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング

Connection String は以下の形式です。

jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

Database プロパティ設定

上のキャプチャの赤枠は、Salesforce から BigQuery へアウトプットするデータの定義となります。 こちらは「Import Query」のすぐ右上にある「Get Schema」をクリックすると下の画面が表示されますので、「Import Query」で入力したクエリを実行し、カラムを定義します。

Output schema 設定

「BigQuery」の設定

こちらも同様に BigQuery のプロパティから下記内容を設定します。

  • Label:BigQuery
  • Reference Name:BigQuery
  • Project ID:使用するProject ID
  • DataSet:使用するDataSet
  • Table:使用するテーブル名、例:Account_DataFusion
BigQuery のプロパティ設定

作成したRabbitMQ のデータ からBigQuery のパイプラインの実行

まずは作成したパイプラインをデプロイします。赤枠の「Deploy」ボタンをクリックしてデプロイを行います。

Deploy Cloud Data Fusion Pipeline

デプロイ完了後、Runボタンが表示されますので、クリックします。

デプロイしたパイプラインを実行

このようにCData JDBC ドライバをアップロードすることで、簡単にGoogle Cloud Data Fusion でRabbitMQ のデータ データをノーコードで連携し、BigQuery などへのパイプラインを作成することができます。

是非、CData JDBC Driver for API 30日の無償評価版 をダウンロードして、お試しください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続