RabbitMQ のデータ を Microsoft Fabric の OneLake に自動で継続的にレプリケーションする方法

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData Sync を使って、RabbitMQ のデータ を Microsoft Fabric の OneLake に自動・継続的・カスタマイズ可能なレプリケーションを実現する方法を解説します。

常時稼働のアプリケーションには、自動フェイルオーバー機能とリアルタイムのデータアクセスが欠かせません。CData Sync を使えば、Microsoft Fabric の OneLake インスタンスにRabbitMQ のデータをリアルタイムで統合できます。すべてのデータを1カ所に集約し、アーカイブ、レポーティング、分析、機械学習、AI などさまざまな用途に活用できます。

OneLake を同期先として設定する

CData Sync を使って、RabbitMQ のデータ を OneLake にレプリケーションできます。同期先を追加するには、接続タブに移動します。

  1. 接続の追加をクリックします。
  2. 同期先タブをクリックし、Azure OneLake コネクタを探します。
  3. 該当行の末尾にある接続の設定アイコンをクリックして、新しい接続ページを開きます。接続の設定アイコンが表示されていない場合は、コネクタのダウンロードアイコンをクリックして OneLake コネクタをインストールします。新しいコネクタのインストールについて詳しくは、ヘルプドキュメントの「接続」セクションをご覧ください。
  4. コネクタが追加されたら、設定の基本タブで以下の接続プロパティを入力して OneLake に接続します:
    • 接続名:任意の接続名を入力します。
    • File Format:使用するファイル形式を選択します。Sync は CSV、PARQUET、AVRO ファイル形式をサポートしています。
    • URI:ファイルを含むファイルシステムとフォルダのパスを入力します(例:onelake://Workspace/Test.LakeHouse/Files/CustomFolder)。
    • Auth SchemeMicrosoft Entra ID ユーザーアカウントで接続する場合は、Auth Scheme にAzure ADを選択します。CData Sync には組み込みの OAuth アプリケーションが用意されているため、追加のプロパティ設定は不要です。
    • Data Model:選択したファイル形式のドキュメントを解析し、データベースメタデータを生成する際に使用するデータ形式を指定します。
    • CData Sync をホスティングしている場合(ローカルまたは独自のクラウド):
      • Use CData CallbackURL:トグルを無効にします。
      • Callback URL:コールバック URL を入力します。
    • CData Sync Cloud を使用している場合は、Use CData CallbackURLトグルを有効のままにしてください。
  5. 詳細タブに移動し、その他セクションまでスクロールします。
  6. Include Filesに、最初に選択したファイル形式を入力します。
  7. Insert ModeドロップダウンからCreateを選択します。その他の Insert Mode オプションにはOverwriteBatchがあります。
  8. 基本設定に戻り、Azure OneLake に接続をクリックします。
  9. 接続が確立されたら、作成およびテストをクリックして接続を保存します。

これで OneLake に接続され、データソースとしても同期先としても使用できるようになりました。

NOTEラベル機能を使って、データソースや同期先にラベルを追加できます。

この記事では、RabbitMQ のデータ を OneLake にロードし、同期先として活用する方法をご紹介します。

RabbitMQ への接続を設定する

RabbitMQ への接続は、接続タブから設定できます。RabbitMQ アカウントへの接続を追加するには、接続タブに移動します。

  1. 接続の追加をクリックします。
  2. データソース(RabbitMQ)を選択します。
  3. 接続プロパティを設定します。

    RabbitMQ Management HTTP API について

    RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

    HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

    Basic 認証の設定

    RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

    管理 API へのアクセスを有効にするには、以下のステップで進めます:

    1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
    2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
    3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

    RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

    • AuthScheme:Basic に設定します。
    • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
    • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
    • Password:RabbitMQ の管理パスワードに設定します。

    接続文字列の例:

    Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    

    利用可能なテーブル

    RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

    • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
    • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
    • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
    • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
    • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
    • Consumers - すべてのキューに登録されたコンシューマーの一覧
    • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
    • Queues - すべての仮想ホストで宣言されたキューの一覧
    • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
    • VirtualHosts - ブローカーに設定された仮想ホストの一覧
    • VhostPermissions - 特定の仮想ホスト内のユーザー権限
    • Users - すべての RabbitMQ ユーザーの一覧
    • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
    • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
    • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
    • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
    • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
    • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
    • VhostLimits - 特定の仮想ホストに設定されたリソース制限
    • UserLimits - 特定のユーザーに設定されたリソース制限
    • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
    • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
    • AuthAttempts - ノードの認証試行統計
    • ClusterName - RabbitMQ クラスターの名前
    • WhoAmI - 現在認証されている管理ユーザーに関する情報
    • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
    • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
    • QueueBindings - 仮想ホスト内の特定のキューのバインディング
  4. RabbitMQ に接続をクリックして、接続が正しく設定されていることを確認します。
  5. 作成およびテストをクリックして変更を保存します。

レプリケーションクエリの設定

CData Sync では、ポイント&クリック操作と SQL クエリの両方でレプリケーションを制御できます。レプリケーションを設定するには、ジョブタブに移動し、ジョブを追加をクリックします。レプリケーションのデータソースと同期先を選択します。

ジョブの編集

  1. ジョブの詳細タブで、レプリケーションオプションの編集をクリックし、Insert ModeSingle Fileに設定します(OneLake コネクタで Insert Mode を「Create」に設定した場合)。
  2. 「Batch」モードの場合は、ジョブの Insert Mode をFile Per Batchに設定する必要があります。
  3. 「Overwrite」モードの場合は、Single FileFile Per Batchのどちらも使用できます。

テーブル全体をレプリケーションする

テーブル全体をレプリケーションするには、ジョブのタスクタブでタスクを追加をクリックし、OneLake にレプリケーションしたい RabbitMQ テーブルをリストから選択して、再度タスクを追加をクリックします。

レプリケーションのカスタマイズ

タスクのカラムタブとクエリタブを使って、レプリケーションをカスタマイズできます。カラムタブでは、レプリケーションするカラムの指定、同期先でのカラム名の変更、レプリケーション前のデータ操作などが可能です。クエリタブでは、SQL クエリを使ってフィルタ、グループ化、ソートを追加できます。

レプリケーションのスケジュール

ジョブの概要タブを選択し、スケジュールの下にある設定をクリックします。10分ごとから月1回まで、指定した間隔でジョブを自動実行するようにスケジュールできます。

レプリケーションジョブを設定したら、変更を保存をクリックします。RabbitMQ のデータ から OneLake へのレプリケーションを管理するジョブをいくつでも設定できます。

レプリケーションジョブの実行

ジョブに必要なすべての設定が完了したら、レプリケーションしたい RabbitMQ テーブルを選択し、実行をクリックします。レプリケーションが正常に完了すると、ジョブの実行時間とレプリケーションされた行数を示す通知が表示されます。

無料トライアル & 詳細情報

RabbitMQ のデータ を OneLake にレプリケーションする方法をご覧いただきました。CData Sync ページで詳細をご確認いただき、30日間の無料トライアルをダウンロードして、エンタープライズデータの統合を始めましょう。

ご不明な点がございましたら、サポートチームがいつでもお手伝いいたします。

はじめる準備はできましたか?

詳細はこちら、または無料トライアルにお申し込みください:

CData Sync