HubDB のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for HubDBと組み合わせることで、Kafka はライブのHubDB のデータを扱うことができます。この記事では、HubDB データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したHubDB のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのHubDB のデータとのやり取りにおいて比類のないパフォーマンスを提供します。HubDB に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接HubDB にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してHubDB のデータを操作および分析できます。
前提条件
Apache Kafka トピックでHubDB のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
HubDB のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for HubDBをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- HubDB という名前の新しいディレクトリを作成します。
mkdir HubDB
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv HubDBJDBCDriver.zip HubDB/
- CData HubDBJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip HubDBJDBCDriver.zip
- HubDB という名前の新しいディレクトリを作成します。
- HubDB ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for HubDB のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.hubdb.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for HubDB/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData HubDB JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.hubdb.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for HubDB を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_hubdb_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:hubdb:AuthScheme=OAuth;OAuthClientID=MyOAuthClientID;OAuthClientSecret=MyOAuthClientSecret;CallbackURL=http://localhost:33333;", "topic.prefix": "hubdb-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: HubDB データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for HubDBに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.hubdb.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
HubDBデータソースへの接続には、パブリックHubSpotアプリケーションを使用したOAuth認証とプライベートアプリケーショントークンを使用した認証の2つの方法があります。
カスタムOAuthアプリを使用する
すべてのOAuthフローでAuthSchemeを"OAuth"に設定する必要があります。特定の認証ニーズ(デスクトップアプリケーション、Webアプリケーション、ヘッドレスマシン)に必要な接続プロパティについては、ヘルプドキュメントを確認してください。
アプリケーションを登録し、OAuthクライアント認証情報を取得するには、以下の手順を実行してください。
- HubSpotアプリ開発者アカウントにログインします。
- アプリ開発者アカウントである必要があります。標準のHubSpotアカウントではパブリックアプリを作成できません。
- 開発者アカウントのホームページで、アプリタブをクリックします。
- アプリを作成をクリックします。
- アプリ情報タブで、ユーザーが接続する際に表示される値を入力し、必要に応じて変更します。これらの値には、パブリックアプリケーション名、アプリケーションロゴ、アプリケーションの説明が含まれます。
- 認証タブで、「リダイレクトURL」ボックスにコールバックURLを入力します。
- デスクトップアプリケーションを作成する場合は、http://localhost:33333のようなローカルにアクセス可能なURLに設定します。
- Webアプリケーションを作成する場合は、ユーザーがアプリケーションを承認した際にリダイレクトされる信頼できるURLに設定します。
- アプリを作成をクリックします。HubSpotがアプリケーションとそれに関連する認証情報を生成します。
- 認証タブで、クライアントIDとクライアントシークレットを確認します。これらは後でドライバーを設定する際に使用します。
スコープの下で、アプリケーションの意図する機能に必要なスコープを選択します。
テーブルにアクセスするには、最低限以下のスコープが必要です:
- hubdb
- oauth
- crm.objects.owners.read
- 変更を保存をクリックします。
- 統合に必要な機能にアクセスできる本番ポータルにアプリケーションをインストールします。
- 「インストールURL(OAuth)」の下で、完全なURLをコピーをクリックして、アプリケーションのインストールURLをコピーします。
- コピーしたリンクをブラウザで開きます。アプリケーションをインストールする標準アカウントを選択します。
- アプリを接続をクリックします。結果のタブは閉じて構いません。
プライベートアプリを使用する
HubSpotプライベートアプリケーショントークンを使用して接続するには、AuthSchemeプロパティを"PrivateApp"に設定します。
以下の手順に従ってプライベートアプリケーショントークンを生成できます:
- HubDBアカウントで、メインナビゲーションバーの設定アイコン(歯車)をクリックします。
- 左サイドバーメニューで、統合 > プライベートアプリに移動します。
- プライベートアプリを作成をクリックします。
- 基本情報タブで、アプリケーションの詳細(名前、ロゴ、説明)を設定します。
- スコープタブで、プライベートアプリケーションがアクセスできるようにしたい各スコープに対して読み取りまたは書き込みを選択します。
- テーブルにアクセスするには、最低限hubdbとcrm.objects.owners.readが必要です。
- アプリケーションの設定が完了したら、右上のアプリを作成をクリックします。
- アプリケーションのアクセストークンに関する情報を確認し、作成を続行をクリックし、その後トークンを表示をクリックします。
- コピーをクリックして、プライベートアプリケーショントークンをコピーします。
接続するには、PrivateAppTokenを取得したプライベートアプリケーショントークンに設定します。
- HubSpotアプリ開発者アカウントにログインします。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「hubdb-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、HubDB のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for HubDBの30日間無償トライアルをダウンロードして、HubDB データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。