Azure Databricks でRabbitMQ のデータに接続してデータ処理を行う方法
Databricks は、Apache Spark によるデータ処理機能を提供するクラウドベースのサービスです。CData JDBC ドライバと組み合わせることで、Databricks を使用してリアルタイムRabbitMQ のデータのデータエンジニアリングとデータサイエンスを実行できます。この記事では、Azure で CData JDBC ドライバをホストし、Databricks からリアルタイムRabbitMQ のデータに接続してデータを処理する方法を説明します。
最適化されたデータ処理機能を組み込んだ CData JDBC ドライバは、リアルタイムRabbitMQ のデータとのインタラクションにおいて卓越したパフォーマンスを発揮します。RabbitMQ に対して複雑な SQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされている SQL 操作を直接RabbitMQにプッシュし、サポートされていない操作(主に SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。動的メタデータクエリ機能により、ネイティブのデータ型を使用してRabbitMQ のデータの操作・分析が可能です。
CData JDBC ドライバを Azure にインストール
Databricks でリアルタイムRabbitMQ のデータを操作するには、Azure Data Lake Storage(ADLS)を通じてドライバーをインストールします。(以前のバージョンの記事で説明していた DBFS を介した接続方法は非推奨となっていますが、廃止日は公開されていません。)
- JDBC JAR ファイルを任意の Blob コンテナにアップロードします(例:「databrickslibraries」ストレージアカウントの「jdbcjars」コンテナ)。
- ストレージアカウントから「セキュリティとネットワーク」を展開し、「アクセスキー」をクリックしてアカウントキーを取得します。使用するキーを表示してコピーしてください。
- コンテナに移動し、JAR を保存している特定のコンテナを開き、JDBC JAR ファイルのエントリを選択して JAR ファイルの URL を取得します。ファイルの詳細が開き、URL をクリップボードにコピーするボタンがあります。この値は以下のようになります(「blob」の部分はストレージアカウントの種類によって異なる場合があります):
https://databrickslibraries.blob.core.windows.net/jdbcjars/cdata.jdbc.salesforce.jar
- Databricks クラスターの「Configuration」タブで「Edit」ボタンをクリックし、「Advanced options」を展開します。そこで、以下の Spark オプション(JAR URL のドメイン名から派生)に、コピーしたアカウントキーを値として追加し、「Confirm」をクリックします:
spark.hadoop.fs.azure.account.key.databrickslibraries.blob.core.windows.net
- Databricks クラスターの「Libraries」タブで「Install new」をクリックし、ADLS オプションを選択します。ドライバー JAR の ABFSS URL(これも JAR URL のドメイン名から派生)を指定し、「Install」をクリックします。ABFSS URL は以下のようになります:
abfss://[email protected]/cdata.jdbc.salesforce.jar
Databricks からRabbitMQに接続
JAR ファイルがインストールされたら、Databricks でリアルタイムRabbitMQ のデータを操作する準備が整いました。まず、ワークスペースで新しいノートブックを作成します。ワークブックに名前を付け、言語として Python が選択されていることを確認し(デフォルトで選択されているはずです)、「Connect」をクリックして「General Compute」から JDBC ドライバーをインストールしたクラスターを選択します(デフォルトで選択されているはずです)。
RabbitMQへの接続を設定
JDBC ドライバのクラスを参照し、JDBC URL で使用する接続文字列を構築してRabbitMQに接続します。また、JDBC URL に RTK プロパティを設定する必要があります(Beta ドライバーを使用している場合を除く)。このプロパティの設定方法については、インストールに含まれるライセンスファイルを参照してください。
driver = "cdata.jdbc.api.APIDriver" url = "jdbc:api:RTK=5246...;Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;"
組み込みの接続文字列デザイナー
JDBC URL の構築には、RabbitMQ JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。
java -jar cdata.jdbc.api.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
RabbitMQ Management HTTP API について
RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。
HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。
Basic 認証の設定
RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。
管理 API へのアクセスを有効にするには、以下のステップで進めます:
- サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
- 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
- RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。
RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:
- AuthScheme:Basic に設定します。
- URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
- User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
- Password:RabbitMQ の管理パスワードに設定します。
接続文字列の例:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
利用可能なテーブル
RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:
- Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
- Nodes - RabbitMQ クラスター内の個々のノードに関する情報
- NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
- Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
- Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
- Consumers - すべてのキューに登録されたコンシューマーの一覧
- Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
- Queues - すべての仮想ホストで宣言されたキューの一覧
- Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
- VirtualHosts - ブローカーに設定された仮想ホストの一覧
- VhostPermissions - 特定の仮想ホスト内のユーザー権限
- Users - すべての RabbitMQ ユーザーの一覧
- Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
- TopicPermissions - 全ユーザーのトピックレベルの権限レコード
- Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
- OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
- Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
- GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
- VhostLimits - 特定の仮想ホストに設定されたリソース制限
- UserLimits - 特定のユーザーに設定されたリソース制限
- FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
- DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
- AuthAttempts - ノードの認証試行統計
- ClusterName - RabbitMQ クラスターの名前
- WhoAmI - 現在認証されている管理ユーザーに関する情報
- ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
- ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
- QueueBindings - 仮想ホスト内の特定のキューのバインディング
RabbitMQ のデータの読み込み
接続を設定したら、CData JDBC ドライバと接続情報を使用してRabbitMQ のデータをデータフレームとして読み込むことができます。
remote_table = spark.read.format ( "jdbc" ) \ .option ( "driver" , driver) \ .option ( "url" , url) \ .option ( "dbtable" , "AuthAttempts") \ .load ()
RabbitMQ のデータの表示
読み込んだRabbitMQ のデータを display 関数で確認してみましょう。
display (remote_table.select (""))
Azure Databricks でRabbitMQ のデータを分析
Databricks SparkSQL でデータを処理したい場合は、読み込んだデータを一時ビューとして登録します。
remote_table.createOrReplaceTempView ( "SAMPLE_VIEW" )
以下の SparkSQL で分析用のRabbitMQ のデータを取得できます。
result = spark.sql("SELECT , FROM SAMPLE_VIEW WHERE NodeName = 'rabbit@hostname'")
RabbitMQ からのデータは、対象のノートブック内でのみ利用可能です。他のユーザーと共有したい場合は、テーブルとして保存してください。
remote_table.write.format ( "parquet" ) .saveAsTable ( "SAMPLE_TABLE" )
CData API Driver for JDBC の30日間の無償トライアルをダウンロードして、Azure Databricks でリアルタイムRabbitMQ のデータを活用してみてください。ご不明な点があれば、サポートチームまでお気軽にお問い合わせください。