Node.js からKafka のデータをクエリ
CData API Server とADO.NET Provider for ApacheKafka(もしくは250+ の他のADO.NET Providers)を使って、Kafka をOData エンドポイントして公開し、Node.js からシンプルなHTTP リクエストでクエリを実現します。本記事ではAPI Server を使ってJSON でフォーマットされたKafka のデータをNode.js でリクエストする方法を説明します。
API Server の設定
以下のリンクからAPI Server の無償トライアルをスタートしたら、セキュアなKafka OData サービスを作成していきましょう。
Kafka への接続
NodeJS からKafka のデータを操作するには、まずKafka への接続を作成・設定します。
- API Server にログインして、「Connections」をクリック、さらに「接続を追加」をクリックします。
- 「接続を追加」をクリックして、データソースがAPI Server に事前にインストールされている場合は、一覧から「Kafka」を選択します。
- 事前にインストールされていない場合は、コネクタを追加していきます。コネクタ追加の手順は以下の記事にまとめてありますので、ご確認ください。
CData コネクタの追加方法はこちら >> - それでは、Kafka への接続設定を行っていきましょう!
-
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
- 接続情報の入力が完了したら、「保存およびテスト」をクリックします。
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
API Server のユーザー設定
次に、API Server 経由でKafka にアクセスするユーザーを作成します。「Users」ページでユーザーを追加・設定できます。やってみましょう。
- 「Users」ページで ユーザーを追加をクリックすると、「ユーザーを追加」ポップアップが開きます。
-
次に、「ロール」、「ユーザー名」、「権限」プロパティを設定し、「ユーザーを追加」をクリックします。
-
その後、ユーザーの認証トークンが生成されます。各ユーザーの認証トークンとその他の情報は「Users」ページで確認できます。
Kafka 用のAPI エンドポイントの作成
ユーザーを作成したら、Kafka のデータ用のAPI エンドポイントを作成していきます。
-
まず、「API」ページに移動し、
「 テーブルを追加」をクリックします。
-
アクセスしたい接続を選択し、次へをクリックします。
-
接続を選択した状態で、各テーブルを選択して確認をクリックすることでエンドポイントを作成します。
OData のエンドポイントを取得
以上でKafka への接続を設定してユーザーを作成し、API Server でKafka データのAPI を追加しました。これで、OData 形式のKafka データをREST API で利用できます。API Server の「API」ページから、API のエンドポイントを表示およびコピーできます。
Node.js からKafka OData フィードを利用
OData フィードはNode.js で簡単に使用できます。Node.js のHTTP クライアントを使用して、API Server のOData エンドポイントからJSON 形式のデータをリクエストしていきましょう。リクエストを行った後、レスポンスの本文を作成し、JSON.parse() 関数を呼び出してレコードに解析できます。
以下のコードはSampleTable_1 データに対して認証されたリクエストを行います。 以下のURL の例では、Column2 カラムの値が100 のレコードを検索する単純なフィルターを適用しています。
var http = require('http');
http.get({
protocol: "http:",
hostname:"MyServer.com",
port:MyPort,
path: "/api.rsc/SampleTable_1?$filter=" + encodeURIComponent("Column2 eq '100'"),
auth:'MyUser:MyAuthtoken'
},
function(res) {
var body = '';
res.on('data', function(chunk) {
body += chunk;
});
res.on('end', function() {
console.log(body);
var jsonData = JSON.parse(body);
});
}).on('error', function(e) {
console.log("Error: ", e);
});