Bitbucket のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Bitbucketと組み合わせることで、Kafka はライブのBitbucket のデータを扱うことができます。この記事では、Bitbucket データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したBitbucket のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのBitbucket のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Bitbucket に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Bitbucket にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してBitbucket のデータを操作および分析できます。
前提条件
Apache Kafka トピックでBitbucket のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
Bitbucket のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for Bitbucketをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- Bitbucket という名前の新しいディレクトリを作成します。
mkdir Bitbucket
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv BitbucketJDBCDriver.zip Bitbucket/
- CData BitbucketJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip BitbucketJDBCDriver.zip
- Bitbucket という名前の新しいディレクトリを作成します。
- Bitbucket ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for Bitbucket のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.bitbucket.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for Bitbucket/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData Bitbucket JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.bitbucket.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Bitbucket を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_bitbucket_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:bitbucket:Workspace=myworkspaceslug;Schema=Information", "topic.prefix": "bitbucket-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: Bitbucket データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for Bitbucketに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.bitbucket.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
ほとんどのクエリでは、ワークスペースを設定する必要があります。唯一の例外は、Workspacesテーブルです。このテーブルはこのプロパティの設定を必要とせず、クエリを実行すると、Workspaceの設定に使用できるワークスペーススラッグのリストが提供されます。このテーブルにクエリを実行するには、スキーマを'Information'に設定し、SELECT * FROM Workspacesクエリを実行する必要があります。
Schemaを'Information'に設定すると、一般的な情報が表示されます。Bitbucketに接続するには、以下のパラメータを設定してください。
- Schema: ワークスペースのユーザー、リポジトリ、プロジェクトなどの一般的な情報を表示するには、これを'Information'に設定します。それ以外の場合は、クエリを実行するリポジトリまたはプロジェクトのスキーマに設定します。利用可能なスキーマの完全なセットを取得するには、sys_schemasテーブルにクエリを実行してください。
- Workspace: Workspacesテーブルにクエリを実行する場合を除き、必須です。Workspacesテーブルへのクエリにはこのプロパティは必要ありません。そのクエリはWorkspaceの設定に使用できるワークスペーススラッグのリストのみを返すためです。
Bitbucketでの認証
BitbucketはOAuth認証のみをサポートしています。すべてのOAuthフローからこの認証を有効にするには、カスタムOAuthアプリケーションを作成し、AuthSchemeをOAuthに設定する必要があります。
特定の認証ニーズ(デスクトップアプリケーション、Webアプリケーション、ヘッドレスマシン)に必要な接続プロパティについては、ヘルプドキュメントを必ず確認してください。
カスタムOAuthアプリケーションの作成
Bitbucketアカウントから、以下のステップを実行します。
- 設定(歯車アイコン)に移動し、ワークスペース設定を選択します。
- アプリと機能セクションで、OAuthコンシューマーを選択します。
- コンシューマーを追加をクリックします。
- カスタムアプリケーションの名前と説明を入力します。
- コールバックURLを設定します。
- デスクトップアプリケーションとヘッドレスマシンの場合、http://localhost:33333または任意のポート番号を使用します。ここで設定するURIがCallbackURLプロパティになります。
- Webアプリケーションの場合、信頼できるリダイレクトURLにコールバックURLを設定します。このURLは、ユーザーがアプリケーションにアクセスが許可されたことを確認するトークンを持って戻るWebの場所です。
- クライアント認証情報を使用して認証する予定の場合、これはプライベートコンシューマーですを選択する必要があります。ドライバーでは、AuthSchemeをclientに設定する必要があります。
- OAuthアプリケーションに与える権限を選択します。これにより、読み取りおよび書き込みできるデータが決まります。
- 新しいカスタムアプリケーションを保存するには、保存をクリックします。
- アプリケーションが保存された後、それを選択して設定を表示できます。アプリケーションのKeyとSecretが表示されます。これらを将来の使用のために記録してください。Keyを使用してOAuthClientIdを設定し、Secretを使用してOAuthClientSecretを設定します。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「bitbucket-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、Bitbucket のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for Bitbucketの30日間無償トライアルをダウンロードして、Bitbucket データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。