Salesforce のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Salesforceと組み合わせることで、Kafka はライブのSalesforce のデータを扱うことができます。この記事では、Salesforce データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したSalesforce のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのSalesforce のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Salesforce に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Salesforce にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してSalesforce のデータを操作および分析できます。
Salesforce データ連携について
CData を使用すれば、Salesforce のライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:
- カスタムエンティティやフィールドにアクセスでき、Salesforce ユーザーは Salesforce のすべてにアクセスできます。
- アトミックおよびバッチ更新操作を作成できます。
- Salesforce データの読み取り、書き込み、更新、削除ができます。
- SOAP API バージョン 30.0 のサポートにより、最新の Salesforce 機能を活用できます。
- SOQL サポートによる複雑なクエリの Salesforce サーバーへのプッシュダウンにより、パフォーマンスの向上を実現できます。
- SQL ストアドプロシージャを使用して、ジョブの作成・取得・中止・削除、添付ファイルやドキュメントのアップロード・ダウンロードなどのアクションを実行できます。
ユーザーは、Salesforce データを以下と頻繁に統合しています:
- 他の ERP、マーケティングオートメーション、HCM など。
- Power BI、Tableau、Looker などのお気に入りのデータツール。
- データベースやデータウェアハウス。
CData ソリューションが Salesforce とどのように連携するかについての詳細は、Salesforce 統合ページをご覧ください。
はじめに
前提条件
Apache Kafka トピックでSalesforce のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
Salesforce のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for Salesforceをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- Salesforce という名前の新しいディレクトリを作成します。
mkdir Salesforce
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv SalesforceJDBCDriver.zip Salesforce/
- CData SalesforceJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip SalesforceJDBCDriver.zip
- Salesforce という名前の新しいディレクトリを作成します。
- Salesforce ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for Salesforce のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.salesforce.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for Salesforce/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData Salesforce JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.salesforce.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Salesforce を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_salesforce_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:salesforce:User=username;Password=password;SecurityToken=Your_Security_Token;", "topic.prefix": "salesforce-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: Salesforce データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for Salesforceに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.salesforce.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Salesforce 接続プロパティの設定方法
埋め込みOAuth(UI でのログイン)による接続設定
それでは、Salesforce への接続について説明していきましょう。最も簡単な方法として、Salesforce にログインする際と同様にUI 上からログインするだけで接続設定が完了します(埋め込みOAuth)。この方法をご利用になる場合は、「Salesforce への接続」をクリックしてください。
標準認証の設定
埋め込みOAuth 以外の方法を利用する場合、以下の3つの認証方式をご利用いただけます。標準的な認証方式では、以下の情報が必要となります。
- ユーザー名
- パスワード
- セキュリティトークン
セキュリティトークンの取得方法については、セキュリティトークン取得手順をご確認ください。
OAuth 認証の設定
ユーザー名とパスワードによる認証がご利用いただけない(避けたい)場合は、OAuth 認証をお使いいただけます。
SSO(シングルサインオン)の設定
最後に、IDプロバイダー経由でのシングルサインオンをご利用になる場合は、以下のプロパティを設定してください。
- SSOProperties
- SSOLoginUrl
- TokenUrl
より詳細な設定手順については、ヘルプドキュメントの「はじめに」セクションをご確認ください。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「salesforce-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、Salesforce のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for Salesforceの30日間無償トライアルをダウンロードして、Salesforce データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。