Kafka のデータで Updategram を使用する

Jerod Johnson
Jerod Johnson
Director, Technology Evangelism
Updategram を使用してKafka のデータの挿入、更新、削除を行うことができます。このガイドでは、CData BizTalk Adapter for Kafka を使用して Updategram スキーマとインスタンスを生成する方法を説明します。

この記事では、CData BizTalk Adapter for Kafka で実行できる Updategram を作成します。スキーマを設計し、それをテンプレートとして挿入、更新、削除の Updategram を生成します。

Kafka 用アダプターをプロジェクトに追加する

Add Adapter ウィザードを使用して、Visual Studio の BizTalk Server プロジェクトにアダプターを追加します。アダプターを使用して、変更したいテーブルに関するメタデータを Kafka に問い合わせます。

  1. Solution Explorer でプロジェクトを右クリックし、Add -> Add Generated Items をクリックします。
  2. 表示されるダイアログボックスで Add Adapter Metadata を選択します。
  3. 表示される Add Adapter Wizard で、リストビューから CData BizTalk Adapter for Kafka を選択します。
  4. Port メニューでは、選択を空白のままにします。または、アダプターを使用するよう構成された Receive Location か Send Port を選択します。
  5. Next をクリックすると、Schema Wizard が表示されます。

Updategram 用のスキーマを生成する

以下の手順で、Visual Studio の BizTalk Server プロジェクトでスキーマを作成します。

  1. まだ行っていない場合は、Kafka アダプターをプロジェクトに追加します。
  2. Add Adapter ウィザードの Connection String ページで、Send Port または Receive Location で構成されていない場合は、認証情報やその他の接続プロパティを入力します。一般的な接続文字列は以下のとおりです:
    User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
    

    Apache Kafka 接続プロパティの取得・設定方法

    それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

    1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

    Apache Kafka への認証

    続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous 認証

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、以下のプロパティを設定してください。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントをご確認ください。

    必要な接続プロパティの詳細については、ヘルプドキュメントの "BizTalk Configuration" の章を参照してください。

  3. Schema Information ページで、General Options セクションの Send Port をクリックします。CommandType メニューから Updategram を選択します。Solicit-Response Send Port でアダプターを使用している場合は、One-Way オプションを無効にします。
  4. 次のページ(Statement Information)で、Updategram のタイプ(Insert、Update、Delete)を選択します。テーブル名とスキーマに含めるカラムを選択します。Update または Delete を行う場合は、Id カラムが必要です。

    :Updategram を作成する際、スキーマに含めたカラムのみを変更できます。

  5. Next をクリックしてスキーマのサマリーを表示し、ウィザードを終了してスキーマを作成します。生成された .xsd ファイルがプロジェクトに追加されます。

Insert、Update、Delete のインスタンスメッセージを生成する

Updategram スキーマを作成したら、.xsd ファイルを使用して Updategram を生成できます。Solution Explorer で .xsd ファイルを右クリックし、Generate Instance を選択します。このファイルをテンプレートとして、手動で Updategram を作成することもできます。以下は、Insert、Update、Delete 用に生成された Updategram インスタンスの例です:

Insert

INSERT の例を以下に示します。このインスタンスでは、データがどのように変更されるかを指定する after ブロックのみがあります。


<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
  <ns0:sync>
    <ns0:before></ns0:before>
    <ns0:after>
      <ns0:SampleTable_1 Id="Id_0" Column1="Column1_1" />
    </ns0:after>
  </ns0:sync>
</ns0:parameters>

Update

UPDATE の例を以下に示します。このインスタンスでは、before ブロック(テーブルの現在のデータ)と after ブロック(データがどのように変更されるか)の両方があります。


<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
  <ns0:sync>
    <ns0:before>
      <ns0:SampleTable_1 Id=001d000000YBRseAAH></ns0:SampleTable_1>
    </ns0:before>
    <ns0:after>
      <ns0:SampleTable_1 Id="Id_0" Column1="Column1_1" ></ns0:SampleTable_1>
    </ns0:after>
  </ns0:sync>
</ns0:parameters>

Delete

DELETE の例を以下に示します。after ブロックは空になり、アイテムが削除されることを示します。


<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
  <ns0:sync>
    <ns0:before>
      <ns0:SampleTable_1 Id=001d000000YBRseAAH></ns0:SampleTable_1>
    </ns0:before>
    <ns0:after></ns0:after>
  </ns0:sync>
</ns0:parameters>

スキーマの処理

Updategram を使用して Kafka レコードの挿入、更新、削除を行う方法については、チュートリアルを参照してください。

はじめる準備はできましたか?

詳細:

Kafka 連携ソリューション