Kafka のデータで Updategram を使用する
この記事では、CData BizTalk Adapter for Kafka で実行できる Updategram を作成します。スキーマを設計し、それをテンプレートとして挿入、更新、削除の Updategram を生成します。
Kafka 用アダプターをプロジェクトに追加する
Add Adapter ウィザードを使用して、Visual Studio の BizTalk Server プロジェクトにアダプターを追加します。アダプターを使用して、変更したいテーブルに関するメタデータを Kafka に問い合わせます。
- Solution Explorer でプロジェクトを右クリックし、Add -> Add Generated Items をクリックします。
- 表示されるダイアログボックスで Add Adapter Metadata を選択します。
- 表示される Add Adapter Wizard で、リストビューから CData BizTalk Adapter for Kafka を選択します。
- Port メニューでは、選択を空白のままにします。または、アダプターを使用するよう構成された Receive Location か Send Port を選択します。
- Next をクリックすると、Schema Wizard が表示されます。
Updategram 用のスキーマを生成する
以下の手順で、Visual Studio の BizTalk Server プロジェクトでスキーマを作成します。
- まだ行っていない場合は、Kafka アダプターをプロジェクトに追加します。
- Add Adapter ウィザードの Connection String ページで、Send Port または Receive Location で構成されていない場合は、認証情報やその他の接続プロパティを入力します。一般的な接続文字列は以下のとおりです:
User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
必要な接続プロパティの詳細については、ヘルプドキュメントの "BizTalk Configuration" の章を参照してください。
- Schema Information ページで、General Options セクションの Send Port をクリックします。CommandType メニューから Updategram を選択します。Solicit-Response Send Port でアダプターを使用している場合は、One-Way オプションを無効にします。
次のページ(Statement Information)で、Updategram のタイプ(Insert、Update、Delete)を選択します。テーブル名とスキーマに含めるカラムを選択します。Update または Delete を行う場合は、Id カラムが必要です。
注:Updategram を作成する際、スキーマに含めたカラムのみを変更できます。
- Next をクリックしてスキーマのサマリーを表示し、ウィザードを終了してスキーマを作成します。生成された .xsd ファイルがプロジェクトに追加されます。
Insert、Update、Delete のインスタンスメッセージを生成する
Updategram スキーマを作成したら、.xsd ファイルを使用して Updategram を生成できます。Solution Explorer で .xsd ファイルを右クリックし、Generate Instance を選択します。このファイルをテンプレートとして、手動で Updategram を作成することもできます。以下は、Insert、Update、Delete 用に生成された Updategram インスタンスの例です:
Insert
INSERT の例を以下に示します。このインスタンスでは、データがどのように変更されるかを指定する after ブロックのみがあります。
<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
<ns0:sync>
<ns0:before></ns0:before>
<ns0:after>
<ns0:SampleTable_1 Id="Id_0" Column1="Column1_1" />
</ns0:after>
</ns0:sync>
</ns0:parameters>
Update
UPDATE の例を以下に示します。このインスタンスでは、before ブロック(テーブルの現在のデータ)と after ブロック(データがどのように変更されるか)の両方があります。
<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
<ns0:sync>
<ns0:before>
<ns0:SampleTable_1 Id=001d000000YBRseAAH></ns0:SampleTable_1>
</ns0:before>
<ns0:after>
<ns0:SampleTable_1 Id="Id_0" Column1="Column1_1" ></ns0:SampleTable_1>
</ns0:after>
</ns0:sync>
</ns0:parameters>
Delete
DELETE の例を以下に示します。after ブロックは空になり、アイテムが削除されることを示します。
<ns0:parameters xmlns:ns0="http://www.cdata.com/jp/ApacheKafkaProvider">
<ns0:sync>
<ns0:before>
<ns0:SampleTable_1 Id=001d000000YBRseAAH></ns0:SampleTable_1>
</ns0:before>
<ns0:after></ns0:after>
</ns0:sync>
</ns0:parameters>
スキーマの処理
Updategram を使用して Kafka レコードの挿入、更新、削除を行う方法については、チュートリアルを参照してください。