AWS Lambda でリアルタイムRabbitMQ のデータにアクセス(IntelliJ IDEA を使用)

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
IntelliJ IDEA と CData JDBC Driver を使用して、AWS Lambda からRabbitMQのリアルタイムデータに接続。

AWS Lambda は、新しい情報やイベントに素早く応答するアプリケーションを構築できるコンピューティングサービスです。CData API Driver for JDBC と組み合わせることで、AWS Lambda 関数からリアルタイムRabbitMQ のデータを操作できます。この記事では、IntelliJ で Maven を使用して AWS Lambda 関数を構築し、RabbitMQ のデータに接続してクエリを実行する方法を説明します。

最適化されたデータ処理機能を組み込んだ CData JDBC ドライバは、リアルタイムRabbitMQ のデータとのインタラクションにおいて卓越したパフォーマンスを発揮します。RabbitMQ に対して複雑な SQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされている SQL 操作を直接RabbitMQにプッシュし、サポートされていない操作(主に SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。さらに、動的メタデータクエリ機能により、ネイティブのデータ型を使用してRabbitMQ のデータの操作・分析が可能です。

ステップ1:接続プロパティの設定と接続文字列の構築

CData API Driver for JDBC のインストーラーをダウンロードし、パッケージを解凍して JAR ファイルを実行してドライバーをインストールします。次に、必要な接続プロパティを収集します。

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング

NOTE: AWS Lambda 関数で JDBC ドライバーを使用するには、ライセンス(製品版または試用版)とランタイムキー(RTK)が必要です。ライセンス(または試用版)の取得については、弊社営業チームまでお問い合わせください

組み込みの接続文字列デザイナー

JDBC URL の構築には、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。

java -jar cdata.jdbc.api.jar

接続プロパティ(RTK を含む)を入力し、接続文字列をクリップボードにコピーします。

ステップ2:IntelliJ でプロジェクトを作成

  1. IntelliJ IDEA で「New Project」をクリックします。
  2. Generators から「Maven Archetype」を選択します。
  3. プロジェクトに名前を付け、Archetype として「maven.archetypes:maven-archetype-quickstart」を選択します。
  4. 「Create」をクリックします。

CData API Driver for JDBC JAR ファイルのインストール

プロジェクトのルートフォルダから以下の Maven コマンドを実行して、JAR ファイルをプロジェクトにインストールします。

mvn install:install-file -Dfile="PATH/TO/CData API Driver for JDBC 20XX/lib/cdata.jdbc.api.jar" -DgroupId="org.cdata.connectors" -DartifactId="cdata-api-connector" -Dversion="23" -Dpackaging=jar

依存関係の追加

Maven プロジェクトの pom.xml ファイル内で、AWS とCData API Driver for JDBCを依存関係として追加します(<dependencies> 要素内に以下の XML を追加)。

  • AWS
    <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-lambda-java-core</artifactId>
       <version>1.2.2</version> <!--Replace with the actual version-->
    </dependency>
  • CData API Driver for JDBC
    <dependency>
       <groupId>org.cdata.connectors</groupId>
       <artifactId>cdata-api-connector</artifactId>
       <version>25</version> <!--Replace with the actual version-->
    </dependency>
  • Fat JAR 作成用の Maven Shade Plugin
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.4.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <mainClass>com.example.CDataLambda</mainClass>
                      <!-- Change to your actual Lambda handler class -->
                  </transformer>
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

AWS Lambda 関数の作成

このサンプルプロジェクトでは、CDataLambda.java と CDataLambdaTest.java の2つのソースファイルを作成します。

Lambda 関数の定義

  1. CDataLambda クラスを AWS Lambda SDK の RequestHandler インターフェースを実装するように更新します。handleRequest メソッドを追加する必要があります。このメソッドは、Lambda 関数がトリガーされたときに以下のタスクを実行します:
    1. 入力を使用して SQL クエリを構築
    2. CData API Driver for JDBC を登録
    3. JDBC を使用してRabbitMQへの接続を確立
    4. RabbitMQ で SQL クエリを実行
    5. 結果をコンソールに出力
    6. 出力メッセージを返す
  2. 以下の完全な Lambda クラスを使用してください。インポート、クラス定義、handleRequest メソッドが含まれています。DriverManager.getConnection 呼び出し内の接続文字列値は、実際の値に置き換えてください。

    package com.example;
    
    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    public class CDataLambda implements RequestHandler < Object, String > {
    
      @Override
      public String handleRequest(Object input, Context context) {
        String query = "SELECT * FROM " + input;
    
        String bucketName = "MY_AWS_BUCKET";
        try {
          Class.forName("cdata.jdbc.api.APIDriver");
          cdata.jdbc.api.APIDriver driver = new cdata.jdbc.api.APIDriver();
          DriverManager.registerDriver(driver);
        } catch (SQLException ex) {
          // Registering the driver failed
          throw new RuntimeException("Failed to register JDBC driver", ex);
        } catch (ClassNotFoundException e) {
          // The driver class was not found in the classpath
          throw new RuntimeException("JDBC Driver class not found", e);
    
        }
        Connection connection = null;
        try {
          connection = DriverManager.getConnection("jdbc:cdata:api:RTK=52465...;Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;");
        } catch (SQLException ex) {
          context.getLogger().log("Error getting connection: " + ex.getMessage());
        } catch (Exception ex) {
          context.getLogger().log("Error: " + ex.getMessage());
        }
    
        if (connection != null) {
          context.getLogger().log("Connected Successfully!
    ");
        }
    
        ResultSet resultSet = null;
        try {
          //executing query
          Statement stmt = connection.createStatement();
          resultSet = stmt.executeQuery(query);
    
          ResultSetMetaData metaData = resultSet.getMetaData();
          int numCols = metaData.getColumnCount();
    
          //printing the results
          while (resultSet.next()) {
            for (int i = 1; i <= numCols; i++) {
              System.out.printf("%-25s", (resultSet.getObject(i) != null) ? resultSet.getObject(i).toString().replaceAll("
    ", "") : null);
            }
            System.out.print("
    ");
          }
        } catch (SQLException ex) {
          System.out.println("SQL Exception: " + ex.getMessage());
        } catch (Exception ex) {
          System.out.println("General exception: " + ex.getMessage());
        }
        return "v24 query: " + query + " complete";
      }
    }
    
    

ステップ3:Lambda 関数のデプロイと実行

IntelliJ で関数をビルドしたら、Maven プロジェクト全体を単一の JAR ファイルとしてデプロイする準備が整います。

  1. IntelliJ で mvn install コマンドを使用して SNAPSHOT JAR ファイルをビルドします。

    Note: Maven Shade Plugin は target フォルダに2つの JAR を生成します。AWS Lambda には常に、すべての必要な依存関係を含むサイズの大きい -shaded.jar ファイルをアップロードしてください。

  2. AWS Lambda で新しい関数を作成します(または既存の関数を開きます)。
  3. 関数に名前を付け、IAM ロールを選択し、タイムアウト値を関数が完了するのに十分な値に設定します(クエリの結果サイズによって異なります)。
  4. 「Upload from」->「.zip file」をクリックし、SNAPSHOT JAR ファイルを選択します。
  5. 「Runtime settings」セクションで「Edit」をクリックし、Handler を handleRequest メソッドに設定します(例:package.class::handleRequest)。
  6. これで関数をテストできます。「Event JSON」フィールドにテーブル名を設定し、「Test」をクリックします。

無償トライアル・詳細情報

CData API Driver for JDBC の30日間の無償トライアルをダウンロードして、AWS Lambda でリアルタイムRabbitMQ のデータを活用してみてください。ご不明な点があれば、サポートチームまでお気軽にお問い合わせください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続