Typeform のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData API Driver for JDBCと組み合わせることで、Kafka はライブのTypeform のデータを扱うことができます。この記事では、Typeform データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したTypeform のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのTypeform のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Typeform に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Typeform にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してTypeform のデータを操作および分析できます。
前提条件
Apache Kafka トピックでTypeform のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
Typeform のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData API Driver for JDBCをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- Typeform という名前の新しいディレクトリを作成します。
mkdir API
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv APIJDBCDriver.zip API/
- CData APIJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip APIJDBCDriver.zip
- Typeform という名前の新しいディレクトリを作成します。
- Typeform ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData API Driver for JDBC のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.api.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData API Driver for JDBC/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData Typeform JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.api.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Typeform を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_api_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:api:Profile=C:\profiles\TypeForm.apip;Authscheme=OAuth;OAuthClientId=your_client_id;OAuthClientSecret=your_client_secret;CallbackUrl=your_callback_url;", "topic.prefix": "api-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: Typeform データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData API Driver for JDBCに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.api.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
まず、Profile 接続プロパティにTypeForm プロファイルのディスク上の場所を設定します(例:C:\profiles\TypeForm.apip)。次に、ProfileSettings 接続プロパティにTypeForm の接続文字列を設定します(以下を参照)。
TypeForm API プロファイル設定
TypeForm への認証にはOAuth 標準を使用します。
TypeForm に認証するには、まずTypeForm でOAuth アプリケーションを登録および設定する必要があります:https://admin.typeform.com/account#/section/tokens。アプリにはclient ID とclient secret が割り当てられ、接続文字列で設定できます。OAuth アプリケーションの設定の詳細については、https://developer.typeform.com/get-started/ をご参照ください。
使用シナリオによって異なるリダイレクトURI が必要です:
- CData デスクトップアプリケーション:CData デスクトップアプリケーション(Sync、API Server、ArcESB)は/src/oauthCallback.rst でOAuth トークンを受け入れます。ホストとポートはアプリケーションが使用するデフォルトポートと同じです。例えば、http://localhost:8019/ でCData Sync にアクセスする場合、リダイレクトURI はhttp://localhost:8019/src/oauthCallback.rst になります。
- CData クラウドアプリケーション:CData クラウドアプリケーションはデスクトップ版と同様です。https://1.2.3.4/ でConnect AI にアクセスする場合、リダイレクトはhttps://1.2.3.4/src/oauthCallback.rst を使用します。
- デスクトップアプリケーション:デスクトップアプリケーションを使用する場合、URI はhttps://localhost:33333 を推奨します。
- Web アプリケーション:ドライバーを使用してWeb アプリケーションを開発する場合、https://my-website.com/oauth のような独自のURI を使用します。
以下の接続プロパティを設定すると、接続の準備が整います:
- AuthScheme:OAuth に設定します。
- InitiateOAuth:GETANDREFRESH に設定します。InitiateOAuth を使用してOAuthAccessToken を取得するプロセスを管理できます。
- OAuthClientId:アプリ設定で指定されているClient Id を設定します。
- OAuthClientSecret:アプリ設定で指定されているClient Secret を設定します。
- CallbackURL:アプリ設定で指定したRedirect URI を設定します。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「api-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、Typeform のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData API Driver for JDBCの30日間無償トライアルをダウンロードして、Typeform データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。