SQLAlchemy ORM を使用して Python で RabbitMQ のデータ にアクセスする方法
Python の豊富なモジュールエコシステムを活用することで、迅速に作業を開始し、システムを効果的に統合できます。CData API Driver for Python と SQLAlchemy ツールキットを使用して、RabbitMQ に接続された Python アプリケーションやスクリプトを構築できます。この記事では、SQLAlchemy を使用して RabbitMQ のデータ に接続し、クエリを実行する方法を説明します。
CData Python Connector は最適化されたデータ処理機能を内蔵しており、Python からリアルタイムの RabbitMQ のデータ を操作する際に比類のないパフォーマンスを提供します。RabbitMQ に対して複雑な SQL クエリを発行すると、CData Connector はフィルタや集計などのサポートされている SQL 操作を直接 RabbitMQ にプッシュし、サポートされていない操作(多くの場合 SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。
RabbitMQ のデータ への接続
RabbitMQ のデータ への接続は、他のリレーショナルデータソースへの接続と同様です。必要な接続プロパティを使用して接続文字列を作成します。この記事では、接続文字列を create_engine 関数のパラメータとして渡します。
RabbitMQ Management HTTP API について
RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。
HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。
Basic 認証の設定
RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。
管理 API へのアクセスを有効にするには、以下のステップで進めます:
- サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
- 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
- RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。
RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:
- AuthScheme:Basic に設定します。
- URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
- User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
- Password:RabbitMQ の管理パスワードに設定します。
接続文字列の例:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
利用可能なテーブル
RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:
- Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
- Nodes - RabbitMQ クラスター内の個々のノードに関する情報
- NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
- Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
- Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
- Consumers - すべてのキューに登録されたコンシューマーの一覧
- Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
- Queues - すべての仮想ホストで宣言されたキューの一覧
- Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
- VirtualHosts - ブローカーに設定された仮想ホストの一覧
- VhostPermissions - 特定の仮想ホスト内のユーザー権限
- Users - すべての RabbitMQ ユーザーの一覧
- Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
- TopicPermissions - 全ユーザーのトピックレベルの権限レコード
- Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
- OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
- Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
- GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
- VhostLimits - 特定の仮想ホストに設定されたリソース制限
- UserLimits - 特定のユーザーに設定されたリソース制限
- FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
- DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
- AuthAttempts - ノードの認証試行統計
- ClusterName - RabbitMQ クラスターの名前
- WhoAmI - 現在認証されている管理ユーザーに関する情報
- ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
- ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
- QueueBindings - 仮想ホスト内の特定のキューのバインディング
以下の手順に従って SQLAlchemy をインストールし、Python オブジェクトを通じて RabbitMQ にアクセスしてみましょう。
必要なモジュールのインストール
pip ユーティリティを使用して、SQLAlchemy ツールキットと SQLAlchemy ORM パッケージをインストールします。
pip install sqlalchemy pip install sqlalchemy.orm
適切なモジュールをインポートします。
from sqlalchemy import create_engine, String, Column from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker
Python での RabbitMQ のデータ のモデリング
これで接続文字列を使用して接続できます。create_engine 関数を使用して、RabbitMQ のデータ を操作するための Engine を作成します。
注意: 接続文字列のプロパティに特殊文字が含まれている場合は、URL エンコードする必要があります。詳細については、SQL Alchemy ドキュメントを参照してください。
engine = create_engine("api:///?Profile=C:\profiles\\RabbitMQ.apip&AuthScheme=Basic&URL=http://localhost:15672&User=guest&Password=guest")
RabbitMQ のデータ のマッピングクラスの宣言
接続を確立したら、ORM でモデル化するテーブルのマッピングクラスを宣言します(この記事では、AuthAttempts テーブルをモデル化します)。sqlalchemy.ext.declarative.declarative_base 関数を使用して、一部またはすべてのフィールド(カラム)を定義した新しいクラスを作成します。
base = declarative_base() class AuthAttempts(base): __tablename__ = "AuthAttempts" = Column(String,primary_key=True) = Column(String) ...
RabbitMQ のデータ のクエリ
マッピングクラスを準備したら、セッションオブジェクトを使用してデータソースにクエリを実行できます。Engine をセッションにバインドした後、セッションの query メソッドにマッピングクラスを渡します。
query メソッドの使用
engine = create_engine("api:///?Profile=C:\profiles\\RabbitMQ.apip&AuthScheme=Basic&URL=http://localhost:15672&User=guest&Password=guest")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(AuthAttempts).filter_by(NodeName="rabbit@hostname"):
print(": ", instance.)
print(": ", instance.)
print("---------")
別の方法として、適切なテーブルオブジェクトと execute メソッドを使用することもできます。以下のコードはアクティブな session で動作します。
execute メソッドの使用
AuthAttempts_table = AuthAttempts.metadata.tables["AuthAttempts"]
for instance in session.execute(AuthAttempts_table.select().where(AuthAttempts_table.c.NodeName == "rabbit@hostname")):
print(": ", instance.)
print(": ", instance.)
print("---------")
JOIN、集計、制限などのより複雑なクエリの例については、拡張機能のヘルプドキュメントを参照してください。
無料トライアルと詳細情報
CData API Driver for Python の30日間の無料トライアルをダウンロードして、RabbitMQ のデータ に接続する Python アプリとスクリプトの構築を始めましょう。ご質問がありましたら、サポートチームまでお問い合わせください。