IBM Cloud Object Storage のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for IBM Cloud Object Storageと組み合わせることで、Kafka はライブのIBM Cloud Object Storage のデータを扱うことができます。この記事では、IBM Cloud Object Storage データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したIBM Cloud Object Storage のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのIBM Cloud Object Storage のデータとのやり取りにおいて比類のないパフォーマンスを提供します。IBM Cloud Object Storage に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接IBM Cloud Object Storage にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してIBM Cloud Object Storage のデータを操作および分析できます。
前提条件
Apache Kafka トピックでIBM Cloud Object Storage のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
IBM Cloud Object Storage のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for IBM Cloud Object Storageをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- IBM Cloud Object Storage という名前の新しいディレクトリを作成します。
mkdir IBMCloudObjectStorage
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv IBMCloudObjectStorageJDBCDriver.zip IBMCloudObjectStorage/
- CData IBMCloudObjectStorageJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip IBMCloudObjectStorageJDBCDriver.zip
- IBM Cloud Object Storage という名前の新しいディレクトリを作成します。
- IBM Cloud Object Storage ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for IBM Cloud Object Storage のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.ibmcloudobjectstorage.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for IBM Cloud Object Storage/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData IBM Cloud Object Storage JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.ibmcloudobjectstorage.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for IBM Cloud Object Storage を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_ibmcloudobjectstorage_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:ibmcloudobjectstorage:ApiKey=myApiKey;CloudObjectStorageCRN=MyInstanceCRN;Region=myRegion;OAuthClientId=MyOAuthClientId;OAuthClientSecret=myOAuthClientSecret;", "topic.prefix": "ibmcloudobjectstorage-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: IBM Cloud Object Storage データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for IBM Cloud Object Storageに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.ibmcloudobjectstorage.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Cloud Object Storage 接続プロパティの取得・設定方法
Cloud Object Storage に接続する前に、Cloud Object Storage インスタンスを登録してCloud Object Storage API キーとCRN を取得していきます。
Cloud Object Storage の新規インスタンスの登録
IBM Cloud アカウントにCloud Object Storage がまだない場合は、以下の手順に従ってアカウントにSQL Query のインスタンスをインストールできます。
- IBM Cloud アカウントにログインします。
- Cloud Object Storage ページに移動して、インスタンス名を指定して「作成」をクリックします。Cloud Object Storage の新規インスタンスにリダイレクトされます。
API キー
API キーは以下の手順で取得できます。
- まずは、IBM Cloud アカウントにログインします。
- API キーページに移動します。
- 中央右隅のIBM Cloud APIキーの作成 をクリックして、新しいAPI キーを作成します。
- ポップアップウィンドウが表示されたら、API キーの名前を指定して作成をクリックします。ダッシュボードからはアクセスできなくなるため、API Key を控えておきましょう。
Cloud Object Storage CRN
デフォルトでは、CData 製品はCloud Object Storage CRN を自動で取得します。ただし、複数のアカウントがある場合は、CloudObjectStorageCRN を明示的に指定する必要があります。この値は、次の2つの方法で取得できます。
- Services ビューをクエリする。これにより、IBM Cloud Object Storage インスタンスとそれぞれのCRN がリストされます。
- IBM Cloud で直接CRN を見つける。これを行うには、IBM Cloud のダッシュボードに移動します。リソースリストで、ストレージからCloud Object Storage リソースを選択してCRN を取得します。
IBM Cloud Object Storage への接続
これで準備は完了です。以下の接続プロパティを設定してください。
- InitiateOAuth:GETANDREFRESH に設定。InitiateOAuth を使うと、OAuth 認証を繰り返す必要がなく、さらに自動でアクセストークンを設定できます。
- ApiKey:セットアップ中に控えたAPI キーを指定。
- CloudObjectStorageCRN(オプション):控えておいたCloud Object Storage のCRN に設定。Cloud Object Storage アカウントが複数ある場合のみ設定する必要があります。
プロパティを設定したら、これで接続設定は完了です。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「ibmcloudobjectstorage-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、IBM Cloud Object Storage のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for IBM Cloud Object Storageの30日間無償トライアルをダウンロードして、IBM Cloud Object Storage データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。