Elasticsearch へLogstash 経由でRabbitMQ のデータをロードする方法
Elasticsearch は、人気の分散型全文検索エンジンです。データを一元的に格納することで、超高速検索や、関連性の細かな調整、パワフルな分析が大規模に、手軽に実行可能になります。Elasticsearch にはデータのローディングを行うパイプラインツール「Logstash」があります。CData Drivers を利用することができるので、30日の無償評価版をダウンロードしてあらゆるデータソースを簡単にElasticsearch に取り込んで検索・分析を行うことができます。
この記事では、CData Driver for API を使って、RabbitMQ のデータをLogstash 経由でElasticsearch にロードする手順を説明します。
Elasticsearch Logstash でCData JDBC Driver for API を使用
- CData JDBC Driver for API をLogstash が稼働するマシンにインストールします。
-
以下のパスにJDBC Driver がインストールされます(2022J の部分はご利用される製品バージョンによって異なります)。後ほどこのパスを使います。この.jar ファイル(製品版の場合は.lic ファイルも)をLogstash に配置します。
C:\Program Files\CData\CData JDBC Driver for API 2022J\lib\cdata.jdbc.api.jar
- 次に、Logstash とCData JDBC ドライバをつなぐ、JDBC Input Plugin をインストールします。JDBC Plugin は最新のLogstash だとデフォルトでついてきますが、バージョンによっては追加する必要があります。
https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html - CData JDBC ドライバの.jar ファイルと.lic ファイルを、Logstashの「/logstash-core/lib/jars/」に移動します。
Logstash でElasticsearch にRabbitMQ のデータを送る
それでは、Logstash でElasticsearch にRabbitMQ のデータの転送を行うための設定ファイルを作成していきます。
- Logstash のデータ処理定義であるlogstash.conf ファイルにRabbitMQ のデータを取得する処理を書きます。Input はJDBC、Output はElasticsearch にします。データローディングジョブの起動間隔は30秒に設定しています。
- CData JDBC ドライバの.jar をjdbc driver ライブラリにして、クラス名を設定、RabbitMQ への接続プロパティをJDBC URL の形でせっていします。JDBC URL ではほかにも詳細な設定を行うことができるので、細かくは製品ドキュメントをご覧ください。
- サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
- 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
- RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。
- AuthScheme:Basic に設定します。
- URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
- User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
- Password:RabbitMQ の管理パスワードに設定します。
RabbitMQ Management HTTP API について
RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。
HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。
Basic 認証の設定
RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。
管理 API へのアクセスを有効にするには、以下のステップで進めます:
RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:
接続文字列の例:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
利用可能なテーブル
RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:
- Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
- Nodes - RabbitMQ クラスター内の個々のノードに関する情報
- NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
- Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
- Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
- Consumers - すべてのキューに登録されたコンシューマーの一覧
- Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
- Queues - すべての仮想ホストで宣言されたキューの一覧
- Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
- VirtualHosts - ブローカーに設定された仮想ホストの一覧
- VhostPermissions - 特定の仮想ホスト内のユーザー権限
- Users - すべての RabbitMQ ユーザーの一覧
- Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
- TopicPermissions - 全ユーザーのトピックレベルの権限レコード
- Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
- OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
- Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
- GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
- VhostLimits - 特定の仮想ホストに設定されたリソース制限
- UserLimits - 特定のユーザーに設定されたリソース制限
- FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
- DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
- AuthAttempts - ノードの認証試行統計
- ClusterName - RabbitMQ クラスターの名前
- WhoAmI - 現在認証されている管理ユーザーに関する情報
- ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
- ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
- QueueBindings - 仮想ホスト内の特定のキューのバインディング
input {
jdbc {
jdbc_driver_library => "../logstash-core/lib/jars/cdata.jdbc.api.jar"
jdbc_driver_class => "Java::cdata.jdbc.api.APIDriver"
jdbc_connection_string => "jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;"
jdbc_user => ""
jdbc_password => ""
schedule => "*/30 * * * * *"
statement => "SELECT , FROM AuthAttempts WHERE NodeName = 'rabbit@hostname'"
}
}
output {
Elasticsearch {
index => "api_AuthAttempts"
document_id => "xxxx"
}
}
Logstash でRabbitMQ のローディングを実行
それでは作成した「logstash.conf」ファイルを元にLogstash を実行してみます。
> logstash-7.8.0\bin\logstash -f logstash.conf
成功した旨のログが出ます。これでRabbitMQ のデータがElasticsearch にロードされました。
例えばKibana で実際にElasticsearch に転送されたデータを見てみます。
GET api_AuthAttempts/_search
{
"query": {
"match_all": {}
}
}
データがElasticsearch に格納されていることが確認できました。
CData JDBC Driver for API をLogstash で使うことで、RabbitMQ コネクタとして機能し、簡単にデータをElasticsearch にロードすることができました。ぜひ、30日の無償評価版をお試しください。