PingOne のデータをApache Kafka トピックにストリーミング

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver とKafka Connect JDBC コネクタを使用して、Apache Kafka でPingOne のデータにアクセスし、ストリーミングできます。

Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for PingOneと組み合わせることで、Kafka はライブのPingOne のデータを扱うことができます。この記事では、PingOne データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したPingOne のデータをユーザーが安全に管理および監視できるようにする方法について説明します。

CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのPingOne のデータとのやり取りにおいて比類のないパフォーマンスを提供します。PingOne に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接PingOne にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してPingOne のデータを操作および分析できます。

前提条件

Apache Kafka トピックでPingOne のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI のインストール
  3. Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector

PingOne のデータへの新しいJDBC 接続を定義

  1. Linux ベースのシステムにCData JDBC Driver for PingOneをダウンロードします。
  2. 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
    1. PingOne という名前の新しいディレクトリを作成します。
      		mkdir PingOne
      		
    2. ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
      		mv PingOneJDBCDriver.zip PingOne/
      		
    3. CData PingOneJDBCDriver の内容をこの新しいディレクトリに解凍します。
      		unzip PingOneJDBCDriver.zip
      		
  3. PingOne ディレクトリを開き、lib フォルダに移動します。
    ls
    cd lib/
    
  4. CData JDBC Driver for PingOnelib フォルダの内容をKafka Connect JDBClib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.pingone.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
    cp -r /path/to/CData JDBC Driver for PingOne/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    
  5. 以下のコマンドを使用して、CData PingOne JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
    	java -jar cdata.jdbc.pingone.jar -l
    	
  6. プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
  7. 以下のコマンドを使用してConfluent ローカルサービスを起動します:
    	confluent local services start
    	

    これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for PingOne を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。

    Confluent ローカルサービスを起動
  8. POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
     curl --location 'server_address:8083/connectors'
    	--header 'Content-Type: application/json'
    	--data '{
    		"name": "jdbc_source_cdata_pingone_01",
    		"config": {
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    			"connection.url": "jdbc:pingone:AuthScheme=OAuth;WorkerAppEnvironmentId=eebc33a8-xxxx-4f3a-yyyy-d3e5262fd49e;Region=NA;OAuthClientId=client_id;OAuthClientSecret=client_secret;",
    		"topic.prefix": "pingone-01-",
    		"mode": "bulk"
    		}
    	}'
    

    HTTP POST 本文(上記)で使用されるフィールドについて説明します:

    • connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
    • connection.url: PingOne データに接続するためのJDBC 接続URL です。

      組み込みの接続文字列デザイナー

      JDBC URL の作成については、CData JDBC Driver for PingOneに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      		java -jar cdata.jdbc.pingone.jar
      		

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      PingOne に接続するには以下のプロパティを設定します。

      • Region:自身のPingOne 組織のデータがホスティングされている地域。
      • AuthScheme:PingOne に接続する際に使用する認証の種類。
      • WorkerAppEnvironmentId (デフォルトのPingOne ドメインを使用する場合に必要)、またはAuthorizationServerURL のいずれかで、下で説明するように設定します。

      WorkerAppEnvironmentId の設定

      WorkerAppEnvironmentId は、Worker アプリケーションが存在するPingOne 環境のID です。 このパラメータは、環境がデフォルトのPingOne ドメイン(auth.pingone)を利用している場合のみ使用されます。 これは、ヘルプドキュメントカスタムOAuth アプリケーションの作成で説明するように、PingOne への認証に使用するカスタムOAuth アプリケーションを作成した後に設定します。

      はじめに、このプロパティの値を見つけます。

      1. 自身のPingOne 組織のホームページからナビゲーションサイドバーに移動し、Environments をクリックします。
      2. OAuth / Worker のカスタムアプリケーションを作成した環境(通常はAdministrators)を見つけ、Manage Environment をクリックします。 環境のホームページが表示されます。
      3. 環境のホームページのナビゲーションサイドバーで、Applications をクリックします。
      4. リストから、OAuth またはWorker アプリケーションの詳細を見つけます。
      5. Environment ID フィールドの値をコピーします。 以下の例に似たものになるはずです:
        WorkerAppEnvironmentId='11e96fc7-aa4d-4a60-8196-9acf91424eca'

      次に、WorkerAppEnvironmentIdEnvironment ID フィールドの値に設定します。

      AuthorizationServerURL の設定

      AuthorizationServerURL は、お使いのアプリケーションが配置されている環境のPingOne 認可サーバーのベースURL です。 このプロパティは、PingOne プラットフォームAPI ドキュメントで説明されているように、環境にカスタムドメインを設定した場合にのみ使用されます。 Custom Domains を参照してください。

      OAuth でのPingOne への認証

      PingOne はOAuth とOAuthClient 認証の両方をサポートしています。 上述の設定手順に加え、OAuth またはOAuthCliet 認証をサポートするために、さらに2つの手順を完了する必要があります。

      • ヘルプドキュメントカスタムOAuth アプリケーションの作成で説明するように、カスタムOAuth アプリケーションを作成して設定します。
      • ドライバーがデータモデル内のエンティティにアクセスできるようにするには、ヘルプドキュメントのAdministrator Roles での説明のとおり、使用するアドミンユーザー / ワーカーアプリケーションに対して正しいロールを設定していることを確認してください。
      • 以下のサブセクションで説明されているように、選択した認証スキームと認証フローに適切なプロパティを設定します。

      OAuth(認可コードグラント)

      AuthSchemeOAuth に設定します。

      デスクトップアプリケーション

      OAuth アクセストークンの取得およびリフレッシュ

      以下を設定して、接続してください。

      • InitiateOAuthGETANDREFRESH。繰り返しOAuth の交換を行ったり、手動でOAuthAccessToken を設定する必要をなくすには、InitiateOAuth を使用します。
      • OAuthClientId:カスタムOAuth アプリケーションを作成した際に取得したClient ID。
      • OAuthClientSecret:カスタムOAuth アプリケーションを作成した際に取得したClient Secret。
      • CallbackURL:カスタムOAuth アプリケーションの登録時に定義したリダイレクトURI。例:https://localhost:3333

      接続すると、CData 製品 はデフォルトブラウザでPingOne のOAuth エンドポイントを開きます。ログインして、アプリケーションにアクセス許可を与えます。 ドライバーはこれでOAuth プロセスを完了します。

      1. ドライバーはPingOne からアクセストークンを取得し、それを使ってデータをリクエストします。
      2. OAuth 値はOAuthSettingsLocation で指定された場所に保存され、接続間で永続化されるようにします。

      ドライバーはアクセストークンの期限が切れると自動的にリフレッシュします。

      Web アプリケーションやヘッドレスマシン、クライアントクレデンシャルグラントを含むその他のOAuth メソッドについては、ヘルプドキュメントを参照してください。

      組み込みの接続文字列デザイナーを使用してJDBC URL を生成(Salesforce の例)
    • topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「pingone-01-」に設定されています。
    • mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。

    このリクエストにより、PingOne のすべてのテーブル/コンテンツがKafka トピックとして追加されます。

    注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。

  9. ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
    ksql
    list topics;
    
    Kafka トピックを一覧表示(BigCommerce の例)
  10. トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
    PRINT topic FROM BEGINNING;
    

Confluent Control Center への接続

Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。

Confluent Control Center に接続

おわりに

CData JDBC Driver for PingOneの30日間無償トライアルをダウンロードして、PingOne データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

PingOne Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

PingOne Icon PingOne JDBC Driver お問い合わせ

PingOne データと連携するパワフルなJava アプリケーションを短時間・低コストで作成して配布できます。