AWS Lambda でリアルタイムKafka のデータにアクセス
AWS Lambda は、新しい情報やイベントに素早く応答するアプリケーションを構築できるコンピューティングサービスです。CData JDBC Driver for Apache Kafka と組み合わせることで、AWS Lambda 関数からリアルタイムKafka のデータを操作できます。この記事では、Eclipse で構築した AWS Lambda 関数からKafka のデータに接続してクエリを実行する方法を説明します。
なお、この記事の執筆時点(2022年6月)では、AWS Toolkit for Eclipse がサポートする最新バージョンは Eclipse 2019-12 および Java 8 となっています。
最適化されたデータ処理機能を組み込んだ CData JDBC ドライバは、リアルタイムKafka のデータとのインタラクションにおいて卓越したパフォーマンスを発揮します。Kafka に対して複雑な SQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされている SQL 操作を直接Kafkaにプッシュし、サポートされていない操作(主に SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。さらに、動的メタデータクエリ機能により、ネイティブのデータ型を使用してKafka のデータの操作・分析が可能です。
接続プロパティの設定と接続文字列の構築
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
NOTE: AWS Lambda 関数で JDBC ドライバーを使用するには、ライセンス(製品版または試用版)とランタイムキー(RTK)が必要です。ライセンス(または試用版)の取得については、弊社営業チームまでお問い合わせください。
組み込みの接続文字列デザイナー
JDBC URL の構築には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティ(RTK を含む)を入力し、接続文字列をクリップボードにコピーします。
AWS Lambda 関数の作成
- CData JDBC Driver for Apache Kafka のインストーラーをダウンロードし、パッケージを解凍して JAR ファイルを実行してドライバーをインストールします。
AWS Toolkit for Eclipse を使用して、Eclipse で新しい AWS Lambda Java プロジェクトを作成します。詳細な手順は AWS のチュートリアル(amazon.com)を参照してください。
この記事では、テーブル名を入力として渡せるように、プロジェクトのInput Type を「Custom」に設定します。
- CData JDBC Driver for Apache Kafka の JAR ファイル(cdata.jdbc.apachekafka.jar)をビルドパスに追加します。このファイルは INSTALL_PATH\lib\ にあります。
- Java クラスに以下の import 文を追加します。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement;
handleRequest メソッドの本体を以下のコードに置き換えます。DriverManager.getConnection メソッド呼び出し内の接続文字列は、実際の値に置き換えてください。
String query = "SELECT * FROM " + input; try { Class.forName("cdata.jdbc.apachekafka.ApacheKafkaDriver"); } catch (ClassNotFoundException ex) { context.getLogger().log("Error: class not found"); } Connection connection = null; try { connection = DriverManager.getConnection("jdbc:cdata:apachekafka:RTK=52465...;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;"); } catch (SQLException ex) { context.getLogger().log("Error getting connection: " + ex.getMessage()); } catch (Exception ex) { context.getLogger().log("Error: " + ex.getMessage()); } if(connection != null) { context.getLogger().log("Connected Successfully!\n"); } ResultSet resultSet = null; try { //executing query Statement stmt = connection.createStatement(); resultSet = stmt.executeQuery(query); ResultSetMetaData metaData = resultSet.getMetaData(); int numCols = metaData.getColumnCount(); //printing the results while(resultSet.next()) { for(int i = 1; i <= numCols; i++) { System.out.printf("%-25s", (resultSet.getObject(i) != null) ? resultSet.getObject(i).toString().replaceAll("\n", "") : null ); } System.out.print("\n"); } } catch (SQLException ex) { System.out.println("SQL Exception: " + ex.getMessage()); } catch (Exception ex) { System.out.println("General exception: " + ex.getMessage()); } String output = "query: " + query + " complete"; return output;
Lambda 関数のデプロイと実行
Eclipse で関数をビルドしたら、アップロードして実行する準備が整います。この記事では出力を AWS ログに書き込んでいますが、これをテンプレートとして、AWS Lambda 関数でKafka のデータを操作する独自のカスタムビジネスロジックを実装できます。
- パッケージを右クリックして、Amazon Web Services -> Upload function to AWS Lambda を選択します。
- 関数に名前を付け、IAM ロールを選択し、タイムアウト値を関数が完了するのに十分な値に設定します(クエリの結果サイズによって異なります)。
- パッケージを右クリックして、Amazon Web Services -> Run function on AWS Lambda を選択し、クエリ対象のKafkaオブジェクト名(例:「SampleTable_1」)を入力します。
- ジョブの実行後、CloudWatch ログで出力を確認できます。
無償トライアル・詳細情報
CData JDBC Driver for Apache Kafka の30日間の無償トライアルをダウンロードして、AWS Lambda でリアルタイムKafka のデータを活用してみてください。ご不明な点があれば、サポートチームまでお気軽にお問い合わせください。