【ノーコード・リバースETL】RabbitMQのデータをPostgreSQLで集計・分析してSalesforceに連携してみよう

宮本航太
宮本航太
プロダクトスペシャリスト
リバースETL構成でお困りですか?RabbitMQ・SalesforceデータをPostgreSQLで統合・分析後、リードスコアを付加してSalesforceに書き戻し。自動データパイプライン構築を即実現。



CData Sync は、400種類以上のSaaS / DB のデータを各種DB・データウェアハウスにノーコードで統合可能なETL / ELT ツールです。CData Sync では、DB / DWH だけでなくSalesforce をはじめとする一部SaaS をデータの転送先としてサポートしているため、いわゆるリバースETL 構成のデータパイプラインを構築できます

本記事では、RabbitMQ とSalesforce のデータをPostgreSQL に統合、リードスコアを計算・付加した後にSalesforce 連携する、というリバースETL 構成のパイプラインを作っていきます。

CData Sync とは?

CData Sync の概要画像

CData Sync は、レポーティング・ダッシュボード、機械学習・AI などで使えるよう、社内のデータを一か所に統合して管理できるデータ基盤をノーコードで構築できるETL ツールで、以下の特徴を持っています。

  1. RabbitMQ をはじめとする400種類以上のSaaS / DB データに対応
  2. 主要なRDB、データレイク、データストア、データウェアハウスにデータを転送
  3. 業務データのデータ分析基盤へのETL / ELT 機能に特化し、極限まで設定操作をシンプルに
  4. 主要なSaaS データの差分更新やCDC(Change Data Capture、変更データキャプチャ)のサポート
  5. フレキシブルなSQL / dbt 連携での取得データの変換
  6. Salesforce を始めとする一部SaaS へのデータ転送(リバースETL)をサポート
他にもパワフルな機能を搭載しています。

リバースETL とは?

ETL の逆方向のデータ転送手法で、データウェアハウス(DWH)からSaaS へデータを転送することを指します。アプリ間連携のようなEAI とは異なり、ETL のようにバッチ処理での連携を行います。例えば、SalesforceとRabbitMQ のデータをデータウェアハウス内に統合、集計・予測してからSalesforceに書き戻したい場合、以下の2つの方法があります。

  1. Salesforce → データウェアハウスで連携
  2. データウェアハウスで変換されたデータをSalesforce に書き戻し
②の構成がリバースETL に当たります。 リバースETL の構成例

それでは、RabbitMQ とSalesforce のデータを統合して書き戻すための具体的な設定手順を説明していきます。

実現するシナリオ

RabbitMQ とSalesforce の情報を一度PostgreSQL に統合して、統合したデータを使ってリードをスコアリング、その結果をSalesforce に書き戻します。 リバースETL のデータソースとなるDB としてPostgreSQL を使い、全体のデータの流れは以下のようになります。

Salesforce (Lead) + RabbitMQ → PostgreSQL(スコアリング)→ Salesforce(Lead)

なお、Salesforce のLead オブジェクトにはスコアリング結果を格納するカスタム項目を事前に作成しておきます。

Salesforce とRabbitMQ への接続を設定

はじめに、Salesforce とRabbitMQ のデータをPostgreSQL に転送するための設定を行います。

CData Sync のブラウザ管理コンソールにログインします。CData Sync のインストールをまだ行っていない方は本記事の製品リンクからCData Sync をクリックして、30日の無償トライアルとしてCData Sync をインストールしてください。インストール後にCData Sync が起動して、ブラウザ設定画面が開きます。

それでは、データソースとしてRabbitMQ を設定していきましょう。左の[接続]タブをクリックします。

  1. [+接続の追加]ボタンをクリックします。 コネクションの追加。
  2. [データソース]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、RabbitMQ を見つけます。
  3. RabbitMQ の右側の[→]をクリックして、RabbitMQ アカウントへの接続画面を開きます。もし、RabbitMQ のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、[ダウンロード]をクリックすると、CData Sync にコネクタがインストールされます。 データソースの追加。
  4. 接続プロパティにRabbitMQ に接続するアカウント情報を入力をします。

    RabbitMQ Management HTTP API について

    RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

    HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

    Basic 認証の設定

    RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

    管理 API へのアクセスを有効にするには、以下のステップで進めます:

    1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
    2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
    3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

    RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

    • AuthScheme:Basic に設定します。
    • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
    • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
    • Password:RabbitMQ の管理パスワードに設定します。

    接続文字列の例:

    Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    

    利用可能なテーブル

    RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

    • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
    • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
    • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
    • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
    • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
    • Consumers - すべてのキューに登録されたコンシューマーの一覧
    • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
    • Queues - すべての仮想ホストで宣言されたキューの一覧
    • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
    • VirtualHosts - ブローカーに設定された仮想ホストの一覧
    • VhostPermissions - 特定の仮想ホスト内のユーザー権限
    • Users - すべての RabbitMQ ユーザーの一覧
    • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
    • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
    • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
    • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
    • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
    • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
    • VhostLimits - 特定の仮想ホストに設定されたリソース制限
    • UserLimits - 特定のユーザーに設定されたリソース制限
    • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
    • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
    • AuthAttempts - ノードの認証試行統計
    • ClusterName - RabbitMQ クラスターの名前
    • WhoAmI - 現在認証されている管理ユーザーに関する情報
    • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
    • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
    • QueueBindings - 仮想ホスト内の特定のキューのバインディング
    データソースの追加。
  5. [作成およびテスト]をクリックして、正しくRabbitMQ に接続できているかをテストして保存します。これでレプリケーションのデータソースとしてRabbitMQ への接続が設定されました。

Salesforce への接続を設定

データソースとしてSalesforce を設定します。接続プロパティまでの設定方法は基本的にRabbitMQ と同じです。

Salesforce への接続には通常のログインの他、OAuth やSSO を利用できます。ログイン方式では、ユーザー名、パスワード、セキュリティトークンを使って接続します。Salesforce セキュリティトークンの取得についてはこちらの記事をご確認ください。

ユーザー名、パスワードを使用しない、またはできない場合、OAuth 認証を利用できます。

SSO (シングルサインオン) は、SSOProperties、SSOLoginUrl、TokenUrl プロパティを設定することでID プロバイダー経由で利用できます。詳細はヘルプドキュメントの「はじめに」を参照してください。

PostgreSQL への接続を設定

次に、PostgreSQL への接続を設定します。同じく[接続]タブを開きます。

  1. [+接続の追加]ボタンをクリックします。
  2. [同期先]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、PostgreSQL を見つけます。
  3. PostgreSQL の右側の[→]をクリックして、PostgreSQL データベースへの接続画面を開きます。 PostgreSQL をDestination に選択
  4. 必要な接続プロパティを入力します。PostgreSQL との接続には、通常のUser / Password での認証の他にSSH を利用してよりセキュアに接続することもできますので、併せてご紹介します。

    SSH なしの接続には、Server、Port(デフォルトは5432)、Database、およびUser、Password のプロパティを設定します。Database プロパティが設定されない場合には、User のデフォルトデータベースに接続します。

    パスワード方式によるSSH 接続

    パスワード方式によるSSH接続時に必要なプロパティ一覧を以下に示します。

    • User: PostgreSQL のユーザ
    • Password: PostgreSQL のパスワード
    • Database: PostgreSQL の接続先データベース
    • Server: PostgreSQL のサーバー
    • Port: PostgreSQL のポート
    • UserSSH: "true"
    • SSHAuthMode: "Password"
    • SSHPort: SSH のポート
    • SSHServer: SSH サーバー
    • SSHUser: SSH ユーザー
    • SSHPassword: SSH パスワード

    接続文字列形式では以下のようになります。

    User=admin;Password=adminpassword;Database=test;Server=postgresql-server;Port=5432;UseSSH=true;SSHPort=22;SSHServer=ssh-server;SSHUser=root;SSHPassword=sshpasswd;

    公開鍵認証方式によるSSH 接続

    公開鍵認証によるSSH接続時に必要なプロパティ一覧を以下に示します。

    • User: PostgreSQL のユーザ
    • Password: PostgreSQL のパスワード
    • Database: PostgreSQL の接続先データベース
    • Server: PostgreSQL のサーバー
    • Port: PostgreSQL のポート
    • UserSSH: "true"
    • SSHAuthMode: "Public_Key"
    • SSHClientCertType: キーストアの種類
    • SSHPort: SSH のポート
    • SSHServer: SSH サーバー
    • SSHUser: SSH ユーザー
    • SSHClientCert: 秘密鍵ファイルのパス

    接続文字列形式では以下のようになります。

    User=admin;Password=adminpassword;Database=test;Server=PostgreSQL-server;Port=5432;UseSSH=true;SSHClientCertType=PEMKEY_FILE;SSHPort=22;SSHServer=ssh-server;SSHUser=root;SSHClientCert=C:\Keys\key.pem;
  5. 接続設定が完了したら、[作成およびテスト]をクリックして、正しく接続できているかをテストします。 同期先接続のテスト
  6. これで転送先としてPostgreSQL を設定できました。CData Sync では、PostgreSQL のデータベース名を指定するだけで、転送するPostgreSQL に合わせたテーブルスキーマを自動的にCREATE TABLE してくれます。同期データに合わせたテーブルを事前に作成するなどの面倒な手順は必要ありません。もちろん、既存テーブルにマッピングを行いデータ同期を行うことも可能です。

Salesforce とRabbitMQ のデータをPostgreSQL に統合

CData Sync では、データ転送をジョブ単位で設定します。ジョブは、例えばSalesforce → PostgreSQL といった1データソース対1転送先の単位で設定し、データソースが持つ複数のテーブルを転送できます。データ転送ジョブを設定するには、[ジョブ]タブに進み、[+ジョブを追加]ボタンをクリックします。 ジョブの追加

すべてのオブジェクトをデータ転送する場合

Salesforce のすべてのオブジェクト / テーブルをデータ転送するには、[種類]で[すべて同期]を選択して、[タスクを追加]ボタンで確定します。

作成したジョブ画面で、右上の[▷実行]ボタンをクリックするだけで、全Salesforce テーブルをPostgreSQL に転送できます。

オブジェクトを選択してデータ転送する場合

Salesforce から特定のオブジェクト / テーブルを選択してデータ転送を行うことが可能です。[種類]では[標準(個別設定)]を選んでください。

次に[ジョブ]画面で、[タスク]タブをクリックし、[タスクを追加]ボタンをクリックします。 ジョブへのタスク追加。

するとCData Sync で利用可能なオブジェクト / テーブルのリストが表示されるので、データ転送を行うオブジェクトにチェックを付けます(複数選択可)。[タスクを追加]ボタンで確定します。

タスク選択。

作成したジョブ画面で、[▷実行]ボタンをクリックして(もしくは各タスク毎の実行ボタンを押して)、データ転送ジョブを実行します。 作成したジョブの実行(Salesforce の例)。

このようにとても簡単にSalesforce からPostgreSQL への同期を行うことができました。

PostgreSQL に転送されたテーブルを見てみると、Salesforce のデータが転送されていることが確認できます。スコアリング結果を格納するLeadScore_c(カスタム項目)にはまだ何もデータが入っていないので、ここにRabbitMQ のデータを統合したリードスコアリングの計算結果を追加します。

PostgreSQL への転送結果

同じ手順で、RabbitMQ のお好みのデータをPostgreSQL に転送できます。今回はAuthAttempts テーブルを使用しました。

リードスコアリング

それでは、Salesforce のリードをスコアリングしてPostgreSQL に反映しましょう。このときにRabbitMQ のAuthAttempts データを統合して使います。

CData Sync ではSalesforce とRabbitMQ 以外にも400種類以上のデータソースをサポートしているので、スコアリングに必要なデータ(Webサイト上のユーザーアクティビティやメール開封率、ダウンロード履歴など)が他にあれば追加してみてください。

それでは、PostgreSQL のLead_reverse テーブルのLeadScore_c を参照してみましょう。

本記事ではリードスコアリングの方法は省きますが、PostgreSQL 上でSalesforce とRabbitMQ のデータを使ってスコアリングした結果は以下のようにLeadScore__c カラムに追加しています。

スコアリングを算出してLeadScore_c カラムに追加

この更新されたリードデータを、元のリードデータを持つSalesforce に書き戻します。

Salesforce への書き戻し

書き戻しを行うには、PostgreSQL からSalesforce へのジョブを作成する必要があります。ただし、作成方法はデータソースと同期先に注意するだけでほとんど同じです。

では、ジョブを追加ボタンをクリックしてジョブを作成していきます。

  • データソース:PostgreSQL
  • 同期先:Salesforce
  • 転送モード:元あるリードデータにスコアリング結果を加えるだけなので、Update
ジョブ追加画面ではUpdateを選択

※連携方法は、 Insert、Upsert、Update の3パターンから選択可能です。Upsertの場合は、Salesforce で外部ID として登録している項目のみKey として使用可能

ここでテーブル同士を紐づけます。

PostgreSQL のLead_reverse テーブルをSalesforce のLead テーブルに同期する設定

次にどの項目をキーにするか、またどのカラム同士をマッピングするかを指定します。今回は LeadScore_c 同士でマッピングしました。

カラムのマッピングを設定

設定は以上で、あとは右上の実行ボタンをクリックするだけです。※運用時はスケジュール設定を行ってください。

右上の実行ボタンをクリック

実行が完了すると、ステータスや更新した行数が表示されます。

ステータスがSuccessfulになっていることを確認

では、最後に Salesforce のLeadオブジェクトを見てみましょう。LeadScore 列にPostgreSQL でスコアリングした結果が取り込まれました!

PostgreSQL とSalesforce のテーブルを紐づけ

Salesforce へのリバースETL 構成をCData Sync で実現

このように、Salesforce とRabbitMQ のデータを統合して書き戻すリバースETL のような複雑に思える構成でも、CData Sync ならノーコードで簡単に実現できます。

リバースETL にはリードスコアリングの他、マスタデータとの連携やWeb 解析ツールが持つユーザーアクティビティとの連携など、幅広いユースケースがあります。30日間の無償トライアルで、リバースETL パイプラインの構築を手軽にお試しください。

日本のユーザー向けにCData Sync は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

もっとユースケースが知りたい!という方は、CData Sync の 導入事例を併せてご覧ください。

はじめる準備はできましたか?

詳細はこちら、または無料トライアルにお申し込みください:

CData Sync