AWS Glue ジョブからRabbitMQ のデータにJDBC 経由で接続

加藤龍彦
加藤龍彦
デジタルマーケティング
Amazon S3 でホストされているCData JDBC ドライバーを使用してAWS Glue ジョブからRabbitMQ にデータ連携。

AWS Glue はAmazon のETL サービスであり、簡単にデータプレパレーションを実行してストレージおよび分析用に読み込むことができます。AWS Glue と一緒にPySpark モジュールを使用すると、JDBC 接続経由でデータを処理するジョブを作成し、そのデータをAWS データストアに直接読み込むことができます。ここでは、CData JDBC Driver for API をAmazon S3 バケットにアップロードし、RabbitMQ からデータを抽出してCSV ファイルとしてS3 に保存するためのAWS Glue ジョブを作成・実行する方法について説明します。

CData JDBC Driver for API をAmazon S3 バケットにアップロード

CData JDBC Driver for API をAWS Glue から使用するには、ドライバーの.jar ファイル(および必要なライセンスファイル)をAmazon S3 のバケットに配置する必要があります。

  1. Amazon S3 コンソールを開きます。
  2. バケットを選択、もしくは作成します。
  3. [アップロード]をクリックします。
  4. JDBC Driver の.jar ファイル(cdata.jdbc.api.jar) をインストールディレクトリのlib フォルダから選択してアップロードします。

Amazon Glue Job を設定

  1. [分析]->[AWS Glue]をクリックします。
  2. AWS Glue コンソールで、[ETL]->[ジョブ]をクリックします。
  3. [ジョブの追加]をクリックして新しいGlue ジョブを作成します。
  4. ジョブのプロパティを設定します:
    • 名前: APIGlueJob など任意のジョブ名
    • IAM ロール: AWSGlueServiceRole もしくは AmazonS3FullAccessSelect の権限があるIAM ロールを設定(JDBC Driver がAmazon S3 バケットにあるため)。
    • Type: [Spark]を選択。
    • Glue version: ドロップダウンからバージョンを選択。
    • このジョブ実行: [ユーザーが作成する新しいスクリプト]を選択。
      スクリプトプロパティの設定:
      • スクリプトファイル名: GlueAPIJDBC などのスクリプトファイル名。
      • スクリプトが保存されているS3 パス: S3 バケットを入力もしくは選択。
      • 一時ディレクトリ: S3 バケットを入力もしくは選択
    • ETL 言語: [Python]を選択
    • セキュリティ設定、スクリプトライブラリおよびジョブパラメータを展開。依存JARS パスは、JDBC の.jar ファイルをアップロードしたS3 バケットに設定。.jar ファイル名 s3://mybucket/cdata.jdbc.api.jar も含めます。
  5. [次へ]をクリックすると、ほかのAWS エンドポイントへの接続オプション追加ができます。Redshift、MySQL などに接続する際にはここで接続を作成できます。
  6. [ジョブの保存とスクリプトの編集]をクリックします。
  7. 開いたエディタで、Python スクリプトを記述します。サンプルは以下です。

サンプルGlue スクリプト

CData JDBC driver でRabbitMQ に接続するには、JDBC URL を作成します。さらにライセンスとしてJDBC URL にRTK プロパティを設定する必要があります。RTK は通常のライセンスと異なりますので、CData まで直接ご連絡をください。

RabbitMQ Management HTTP API について

RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

Basic 認証の設定

RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

管理 API へのアクセスを有効にするには、以下のステップで進めます:

  1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
  2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
  3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

  • AuthScheme:Basic に設定します。
  • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
  • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
  • Password:RabbitMQ の管理パスワードに設定します。

接続文字列の例:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;

利用可能なテーブル

RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

  • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
  • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
  • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
  • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
  • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
  • Consumers - すべてのキューに登録されたコンシューマーの一覧
  • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
  • Queues - すべての仮想ホストで宣言されたキューの一覧
  • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
  • VirtualHosts - ブローカーに設定された仮想ホストの一覧
  • VhostPermissions - 特定の仮想ホスト内のユーザー権限
  • Users - すべての RabbitMQ ユーザーの一覧
  • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
  • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
  • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
  • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
  • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
  • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
  • VhostLimits - 特定の仮想ホストに設定されたリソース制限
  • UserLimits - 特定のユーザーに設定されたリソース制限
  • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
  • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
  • AuthAttempts - ノードの認証試行統計
  • ClusterName - RabbitMQ クラスターの名前
  • WhoAmI - 現在認証されている管理ユーザーに関する情報
  • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
  • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
  • QueueBindings - 仮想ホスト内の特定のキューのバインディング

ビルトイン接続文字列デザイナー

JDBC URL の作成をサポートするビルトインの接続文字列デザイナーがあります。ドライバーの.jar ファイルをダブルクリックするか、コマンドラインで.jar ファイルを実行するとデザイナーが開きます。

java -jar cdata.jdbc.api.jar

必要項目を入力すると、デザインs-下部に接続文字列が生成されますのでクリップボードにコピーして使います。

Using the built-in connection string designer to generate a JDBC URL (Salesforce is shown.)

CData JDBC driver をPySpark で使用して、AWS Glue モジュールでRabbitMQ のデータを取得して、S3 にCSV 形式で保存するシンプルなスクリプト例は以下です。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session

##Use the CData JDBC driver to readRabbitMQ のデータfrom the AuthAttempts table into a DataFrame
##Note the populated JDBC URL and driver class name
source_df = sparkSession.read.format("jdbc").option("url","jdbc:api:RTK=5246...;Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;").option("dbtable","AuthAttempts").option("driver","cdata.jdbc.api.APIDriver").load()

glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)

##Convert DataFrames to AWS Glue's DynamicFrames Object
dynamic_dframe = DynamicFrame.fromDF(source_df, glueContext, "dynamic_df")

##Write the DynamicFrame as a file in CSV format to a folder in an S3 bucket.
##It is possible to write to any Amazon data store (SQL Server, Redshift, etc) by using any previously defined connections.
retDatasink4 = glueContext.write_dynamic_frame.from_options(frame = dynamic_dframe, connection_type = "s3", connection_options = {"path": "s3://mybucket/outfiles"}, format = "csv", transformation_ctx = "datasink4")

glueJob.commit()

Glueジョブを実行する

スクリプト記述後、Glue ジョブを実行します。実行した取得/ロードのジョブが完了するとAWS Glue コンソールのジョブページでステータスが確認できます。成功するとS3 バケットにRabbitMQ のデータのCSV ファイルが生成されています。

このようにCData JDBC Driver for API をAWS Glue で使用することで、RabbitMQ のデータをAWS Glue で自在に扱うことができます。Glue の外部データへの接続性を拡張するJDBC Driver を是非お試しください。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続