【ノーコード・リバースETL】RabbitMQのデータをSQL Serverで集計・分析してSalesforceに連携してみよう
CData Sync は、400種類以上のSaaS / DB のデータを各種DB・データウェアハウスにノーコードで統合可能なETL / ELT ツールです。CData Sync では、DB / DWH だけでなくSalesforce をはじめとする一部SaaS をデータの転送先としてサポートしているため、いわゆるリバースETL 構成のデータパイプラインを構築できます。
本記事では、RabbitMQ とSalesforce のデータをSQL Server に統合、リードスコアを計算・付加した後にSalesforce に連携する、というリバースETL 構成のパイプラインを作っていきます。
CData Sync とは?
CData Sync は、レポーティング・ダッシュボード、機械学習・AI などで使えるよう、社内のデータを一か所に統合して管理できるデータ基盤をノーコードで構築できるETL ツールで、以下の特徴を持っています。
- RabbitMQ をはじめとする400種類以上のSaaS / DB データに対応
- 主要なRDB、データレイク、データストア、データウェアハウスにデータを転送
- 業務データのデータ分析基盤へのETL / ELT 機能に特化し、極限まで設定操作をシンプルに
- 主要なSaaS データの差分更新やCDC(Change Data Capture、変更データキャプチャ)のサポート
- フレキシブルなSQL / dbt 連携での取得データの変換
- Salesforce を始めとする一部SaaS へのデータ転送(リバースETL)をサポート
リバースETL とは?
ETL の逆方向のデータ転送手法で、データウェアハウス(DWH)からSaaS へデータを転送することを指します。アプリ間連携のようなEAI とは異なり、ETL のようにバッチ処理での連携を行います。例えば、SalesforceとRabbitMQ のデータをデータウェアハウス内に統合、集計・予測してからSalesforceに書き戻したい場合、以下の2つの方法があります。
- Salesforce → データウェアハウスで連携
- データウェアハウスで変換されたデータをSalesforce に書き戻し
それでは、RabbitMQ とSalesforce のデータを統合して連携するための具体的な設定手順を説明していきます。
実現するシナリオ
RabbitMQ とSalesforce の情報を一度SQL Server に統合、統合したデータを使ってリードをスコアリングし、その結果をSalesforce に書き戻します。 リバースETL のデータソースとなるDB としてSQL Server を使い、全体のデータの流れは、
Salesforce (Lead)+RabbitMQ → SQL Server(スコアリング)→ Salesforce(Lead)となります。なお、Salesforce のLead オブジェクトにはスコアリング結果を格納するカスタム項目を事前に作成しておきます。
Salesforce とRabbitMQ への接続を設定
はじめに、Salesforce とRabbitMQ のデータをSQL Server に転送するための設定を行います。
CData Sync のブラウザ管理コンソールにログインします。CData Sync のインストールをまだ行っていない方は本記事の製品リンクからCData Sync をクリックして、30日の無償トライアルとしてCData Sync をインストールしてください。インストール後にCData Sync が起動して、ブラウザ設定画面が開きます。
それでは、データソースとしてRabbitMQ を設定していきましょう。左の[接続]タブをクリックします。
- [+接続の追加]ボタンをクリックします。
- [データソース]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、RabbitMQ を見つけます。
- RabbitMQ の右側の[→]をクリックして、RabbitMQ アカウントへの接続画面を開きます。もし、RabbitMQ のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、[ダウンロード]をクリックすると、CData Sync にコネクタがインストールされます。
- 接続プロパティにRabbitMQ に接続するアカウント情報を入力をします。
RabbitMQ Management HTTP API について
RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。
HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。
Basic 認証の設定
RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。
管理 API へのアクセスを有効にするには、以下のステップで進めます:
- サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
- 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
- RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。
RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:
- AuthScheme:Basic に設定します。
- URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
- User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
- Password:RabbitMQ の管理パスワードに設定します。
接続文字列の例:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
利用可能なテーブル
RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:
- Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
- Nodes - RabbitMQ クラスター内の個々のノードに関する情報
- NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
- Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
- Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
- Consumers - すべてのキューに登録されたコンシューマーの一覧
- Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
- Queues - すべての仮想ホストで宣言されたキューの一覧
- Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
- VirtualHosts - ブローカーに設定された仮想ホストの一覧
- VhostPermissions - 特定の仮想ホスト内のユーザー権限
- Users - すべての RabbitMQ ユーザーの一覧
- Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
- TopicPermissions - 全ユーザーのトピックレベルの権限レコード
- Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
- OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
- Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
- GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
- VhostLimits - 特定の仮想ホストに設定されたリソース制限
- UserLimits - 特定のユーザーに設定されたリソース制限
- FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
- DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
- AuthAttempts - ノードの認証試行統計
- ClusterName - RabbitMQ クラスターの名前
- WhoAmI - 現在認証されている管理ユーザーに関する情報
- ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
- ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
- QueueBindings - 仮想ホスト内の特定のキューのバインディング
- [作成およびテスト]をクリックして、正しくRabbitMQ に接続できているかをテストして保存します。これでレプリケーションのデータソースとしてRabbitMQ への接続が設定されました。
Salesforce への接続を設定
データソースとしてSalesforce を設定します。接続プロパティまでの設定方法は基本的にRabbitMQ と同じです。
Salesforce への接続には通常のログインの他、OAuth やSSO を利用できます。ログイン方式では、ユーザー名、パスワード、セキュリティトークンを使って接続します。Salesforce セキュリティトークンの取得についてはこちらの記事をご確認ください。
ユーザー名、パスワードを使用しない、またはできない場合、OAuth 認証を利用できます。
SSO (シングルサインオン) は、SSOProperties、SSOLoginUrl、TokenUrl プロパティを設定することでID プロバイダー経由で利用できます。詳細はヘルプドキュメントの「はじめに」を参照してください。
SQL Server への接続を設定
次に、SQL Server への接続を設定します。同じく[接続]タブを開きます。
- [+接続の追加]ボタンをクリックします。
- [同期先]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、SQL Server を見つけます。
- SQL Server の右側の[→]をクリックして、SQL Server データベースへの接続画面を開きます。
- 必要な接続プロパティを入力します。SQL Server との接続には、以下のプロパティが必要です。
- User: SQL Server データベースへの認証用のusername
- Password: SQL Server ユーザーのpassword
- AuthScheme: 使用する認証スキーマ。入力可能な値はPASSWORD、もしくはOKTA
- Account: SQL Server でのアカウント
- URL: SQL Server インスタンスのURL 例: https://myaccount.snowflakecomputing.com
- Warehouse: SQL Server ウェアハウスの名前
- Database: SQL Server データベース名
- Schema: SQL Server データベースのスキーマ
- [作成およびテスト]をクリックして、正しく接続できているかをテストします。
- これで転送先としてSQL Server を設定できました。CData Sync では、SQL Server のデータベース名を指定するだけで、転送するSQL Server に併せたテーブルスキーマを自動的にCREATE TABLE してくれます。同期データに合わせたテーブルを事前に作成するなどの面倒な手順は必要ありません。もちろん、既存テーブルにマッピングを行いデータ同期を行うことも可能です。
Salesforce とRabbitMQ のデータをSQL Server に統合
CData Sync では、データ転送をジョブ単位で設定します。ジョブは、例えばSalesforce → SQL Server といった1データソース対1転送先の単位で設定し、データソースが持つ複数のテーブルを転送できます。データ転送ジョブを設定するには、[ジョブ]タブに進み、[+ジョブを追加]ボタンをクリックします。
すべてのオブジェクトをデータ転送する場合
Salesforce のすべてのオブジェクト / テーブルをデータ転送するには、[種類]で[すべて同期]を選択して、[タスクを追加]ボタンで確定します。
作成したジョブ画面で、右上の[▷実行]ボタンをクリックするだけで、全Salesforce テーブルをSQL Server に転送できます。
オブジェクトを選択してデータ転送する場合
Salesforce から特定のオブジェクト / テーブルを選択してデータ転送を行うには、[種類]で[標準(個別設定)]を選んでください。
次に[ジョブ]画面で、[タスク]タブをクリックし、[タスクを追加]ボタンをクリックします。 
するとCData Sync で利用可能なオブジェクト / テーブルのリストが表示されるので、データ転送を行うオブジェクトにチェックを付けます(複数選択可)。[タスクを追加]ボタンで確定します。
作成したジョブ画面で、[▷実行]ボタンをクリックして(もしくは各タスク毎の実行ボタンを押して)、データ転送ジョブを実行します。 
このようにとても簡単にSalesforce からSQL Server への同期を行うことができました。
SQL Server に転送されたテーブルを見てみると、無事にSalesforce のデータが転送されていることが確認できます。スコアリング結果を格納するLeadScore_c(カスタム項目)にはまだ何もデータが入っていないので、ここにRabbitMQ のデータを統合したリードスコアリングの計算結果を追加します。
同じ手順で、RabbitMQ のお好みのデータをSQL Server に転送できます。今回はAuthAttempts テーブルを使用しました。
リードスコアリング
それでは、Salesforce のリードをスコアリングしてSQL Server に反映しましょう。このときにRabbitMQ のAuthAttempts のデータを統合して使います。
CData Sync ではSalesforce とRabbitMQ 以外にも400種類以上のデータソースをサポートしているので、スコアリングに必要なデータ(Webサイト上のアクティビティやメール開封率、ダウンロード履歴など)が他にあれば追加してみてください。
それでは、SQL Server のLead_Reverse_ETL テーブルのLeadScore_c を参照してみましょう。
本記事ではリードスコアリングの方法は省きますが、SQL Server 上でSalesforce とRabbitMQ のデータを使ってスコアリングした結果を、以下のようにLeadScore_c カラムに追加しています。
この更新されたリードデータを、元のリードデータを持つSalesforce に書き戻します。
Salesforce への書き戻し
書き戻しを行うには、SQL Server からSalesforce へのジョブを作成する必要があります。ただし、作成方法はデータソースと同期先に注意するだけでほとんど同じです。
では、ジョブを追加ボタンをクリックしてジョブを作成していきます。
- データソース:SQL Server
- 同期先:Salesforce
- 転送モード:元あるリードデータにスコアリング結果を加えるだけなので、Update を使います
※連携方法は、 Insert、Upsert、Update の3パターンから選択可能です。Upsertの場合は、Salesforce で外部ID として登録している項目のみKey として使用可能
ここでテーブル同士を紐づけます。
次にどの項目をキーにするか、またどのカラム同士をマッピングするかを指定します。今回は LeadScore_c 同士でマッピングしました。
設定は以上で、あとは右上の実行ボタンをクリックするだけです。※運用時はスケジュール設定を行ってください。
実行が完了すると、ステータスや更新した行数が表示されます。
では、最後に Salesforce のLeadオブジェクトを見てみましょう。LeadScore 列にSQL Server でスコアリングした結果が取り込まれました!
Salesforce へのリバースETL 構成をCData Sync で実現
このように、Salesforce とRabbitMQ のデータを統合して連携するリバースETL のような複雑に思える構成でも、CData Sync ならノーコードで簡単に実現できます。
リバースETL にはリードスコアリングの他、マスタデータとの連携やWeb 解析ツールが持つユーザーアクティビティとの連携など、幅広いユースケースがあります。30日間の無償トライアルで、リバースETL パイプラインの構築を手軽にお試しください。
日本のユーザー向けにCData Sync は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。
もっとユースケースが知りたい!という方は、CData Sync の 導入事例を併せてご覧ください。