Azure Data Lake Storage のデータをApache Kafka トピックにストリーミング

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver とKafka Connect JDBC コネクタを使用して、Apache Kafka でAzure Data Lake Storage のデータにアクセスし、ストリーミングできます。

Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Azure Data Lake Storageと組み合わせることで、Kafka はライブのAzure Data Lake Storage のデータを扱うことができます。この記事では、Azure Data Lake Storage データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したAzure Data Lake Storage のデータをユーザーが安全に管理および監視できるようにする方法について説明します。

CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのAzure Data Lake Storage のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Azure Data Lake Storage に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Azure Data Lake Storage にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してAzure Data Lake Storage のデータを操作および分析できます。

前提条件

Apache Kafka トピックでAzure Data Lake Storage のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI のインストール
  3. Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector

Azure Data Lake Storage のデータへの新しいJDBC 接続を定義

  1. Linux ベースのシステムにCData JDBC Driver for Azure Data Lake Storageをダウンロードします。
  2. 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
    1. Azure Data Lake Storage という名前の新しいディレクトリを作成します。
      		mkdir ADLS
      		
    2. ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
      		mv ADLSJDBCDriver.zip ADLS/
      		
    3. CData ADLSJDBCDriver の内容をこの新しいディレクトリに解凍します。
      		unzip ADLSJDBCDriver.zip
      		
  3. Azure Data Lake Storage ディレクトリを開き、lib フォルダに移動します。
    ls
    cd lib/
    
  4. CData JDBC Driver for Azure Data Lake Storagelib フォルダの内容をKafka Connect JDBClib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.adls.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
    cp -r /path/to/CData JDBC Driver for Azure Data Lake Storage/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    
  5. 以下のコマンドを使用して、CData Azure Data Lake Storage JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
    	java -jar cdata.jdbc.adls.jar -l
    	
  6. プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
  7. 以下のコマンドを使用してConfluent ローカルサービスを起動します:
    	confluent local services start
    	

    これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Azure Data Lake Storage を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。

    Confluent ローカルサービスを起動
  8. POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
     curl --location 'server_address:8083/connectors'
    	--header 'Content-Type: application/json'
    	--data '{
    		"name": "jdbc_source_cdata_adls_01",
    		"config": {
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    			"connection.url": "jdbc:adls:Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;",
    		"topic.prefix": "adls-01-",
    		"mode": "bulk"
    		}
    	}'
    

    HTTP POST 本文(上記)で使用されるフィールドについて説明します:

    • connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
    • connection.url: Azure Data Lake Storage データに接続するためのJDBC 接続URL です。

      組み込みの接続文字列デザイナー

      JDBC URL の作成については、CData JDBC Driver for Azure Data Lake Storageに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      		java -jar cdata.jdbc.adls.jar
      		

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      Azure Data Lake Storage 接続プロパティの取得・設定方法

      Azure Data Lake Storage Gen2 への接続

      それでは、Gen2 Data Lake Storage アカウントに接続していきましょう。接続するには、以下のプロパティを設定します。

      • Account:ストレージアカウントの名前
      • FileSystem:このアカウントに使用されるファイルシステム名。例えば、Azure Blob コンテナの名前
      • Directory(オプション):レプリケートされたファイルが保存される場所へのパス。パスが指定されない場合、ファイルはルートディレクトリに保存されます

      Azure Data Lake Storage Gen2への認証

      続いて、認証方法を設定しましょう。CData 製品では、5つの認証方法をサポートしています:アクセスキー(AccessKey)の使用、共有アクセス署名(SAS)の使用、Azure Active Directory OAuth(AzureAD)経由、Azure サービスプリンシパル(AzureServicePrincipal またはAzureServicePrincipalCert)経由、およびManaged Service Identity(AzureMSI)経由です。

      アクセスキー

      アクセスキーを使用して接続するには、まずADLS Gen2ストレージアカウントで利用可能なアクセスキーを取得する必要があります。

      Azure ポータルでの手順は以下のとおりです:

      1. ADLS Gen2ストレージアカウントにアクセスします
      2. 設定でアクセスキーを選択します
      3. 利用可能なアクセスキーの1つの値をAccessKey 接続プロパティにコピーします

      接続の準備ができたら、以下のプロパティを設定してください。

      • AuthSchemeAccessKey
      • AccessKey:先ほどAzure ポータルで取得したアクセスキーの値

      共有アクセス署名(SAS)

      共有アクセス署名を使用して接続するには、まずAzure Storage Explorer ツールを使用して署名を生成する必要があります。

      接続の準備ができたら、以下のプロパティを設定してください。

      • AuthSchemeSAS
      • SharedAccessSignature:先ほど生成した共有アクセス署名の値

      その他の認証方法については、 href="/kb/help/" target="_blank">ヘルプドキュメントの「Azure Data Lake Storage Gen2への認証」セクションをご確認ください。

      組み込みの接続文字列デザイナーを使用してJDBC URL を生成(Salesforce の例)
    • topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「adls-01-」に設定されています。
    • mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。

    このリクエストにより、Azure Data Lake Storage のすべてのテーブル/コンテンツがKafka トピックとして追加されます。

    注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。

  9. ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
    ksql
    list topics;
    
    Kafka トピックを一覧表示(BigCommerce の例)
  10. トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
    PRINT topic FROM BEGINNING;
    

Confluent Control Center への接続

Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。

Confluent Control Center に接続

おわりに

CData JDBC Driver for Azure Data Lake Storageの30日間無償トライアルをダウンロードして、Azure Data Lake Storage データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

Azure Data Lake Storage Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Azure Data Lake Storage Icon Azure Data Lake Storage JDBC Driver お問い合わせ

Azure Data Lake Storage データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。