CData SSIS Components を使用して RabbitMQ のデータを Google BigQuery にマイグレーション

Cameron Leblanc
Cameron Leblanc
Senior Technology Evangelist
CData SSIS Tasks for RabbitMQ と Google BigQuery を使用して、RabbitMQ のデータを Google BigQuery に簡単にプッシュできます。



Google BigQuery は、サーバーレスで高いスケーラビリティとコスト効率を備えたデータウェアハウスであり、組織がビッグデータを実用的なインサイトに変換できるよう設計されています。

CData SSIS Components は、SQL Server Integration Services を拡張し、さまざまなソースやデスティネーションからデータを簡単にインポート・エクスポートできるようにします。

この記事では、BigQuery へのエクスポート時のデータ型マッピングの考慮事項を確認し、CData SSIS Components for RabbitMQ と BigQuery を使用してRabbitMQ のデータを Google BigQuery にマイグレーションする方法を説明します。

データ型マッピング

Google BigQuery スキーマ CData スキーマ

STRING, GEOGRAPHY, JSON, INTERVAL

string

BYTES

binary

INTEGER

long

FLOAT

double

NUMERIC, BIGNUMERIC

decimal

BOOLEAN

bool

DATE

date

TIME

time

DATETIME, TIMESTAMP

datetime

STRUCT

下記参照

ARRAY

下記参照


STRUCT 型と ARRAY 型

Google BigQuery は、1 つの行に複合値を格納するための STRUCT と ARRAY という 2 種類の型をサポートしています。Google BigQuery の一部では、これらは RECORD 型および REPEATED 型としても知られています。

STRUCT は、名前でアクセスでき、異なる型を持つことができる固定サイズの値のグループです。コンポーネントは struct をフラット化し、ドット表記の名前でフィールドにアクセスできるようにします。これらのドット表記の名前は引用符で囲む必要があることに注意してください。

ARRAY は、同じ型の値で任意のサイズを持つことができるグループです。コンポーネントは配列を単一の複合値として扱い、JSON 集約として報告します。これらの型は組み合わせることができ、STRUCT 型が ARRAY フィールドを含んだり、ARRAY フィールドが STRUCT 値のリストになったりする場合があります。

特別な考慮事項

  • Google BigQuery には、DATETIME(タイムゾーンなし)と TIMESTAMP(タイムゾーンあり)の両方のデータ型があり、CData SSIS Components はローカルマシンのタイムゾーンに基づいて datetime にマッピングします。
  • Google BigQuery では、NUMERIC 型は 38 桁の精度と小数点以下最大 9 桁をサポートし、BIGNUMERIC 型は 76 桁の精度と小数点以下最大 38 桁をサポートします。CData SSIS Components for Google BigQuery は精度/スケールを自動検出しますが、Destination コンポーネントでは高精度カラムを手動でマッピングできます。
  • INTERVAL データ型:
    • コンポーネントは INTERVAL 型を文字列として表現します。クエリで INTERVAL 型が必要な場合は、BigQuery SQL の INTERVAL フォーマットを使用して INTERVAL を指定する必要があります:
      YEAR-MONTH DAY HOUR:MINUTE:SECOND.FRACTION
    • 例えば、「5 年と 11 ヶ月、マイナス 10 日と 3 時間と 2.5 秒」という値は正しいフォーマットでは以下のようになります:
      5-11 -10 -3:0:0.2.5

前提条件

プロジェクトの作成とコンポーネントの追加

  1. Visual Studio を開き、新しい Integration Services プロジェクトを作成します。
  2. Control Flow 画面に新しい Data Flow Task を追加し、Data Flow Task を開きます。
  3. Data Flow Task に CData RabbitMQ Source コントロールと CData GoogleBigQuery Destination コントロールを追加します。

RabbitMQ ソースの設定

以下の手順に従って、RabbitMQ への接続に必要なプロパティを指定します。

  1. CData RabbitMQ Source をダブルクリックしてソースコンポーネントエディタを開き、新しい接続を追加します。
  2. CData RabbitMQ Connection Manager で接続プロパティを設定し、接続をテストして保存します。

    RabbitMQ Management HTTP API について

    RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

    HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

    Basic 認証の設定

    RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

    管理 API へのアクセスを有効にするには、以下のステップで進めます:

    1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
    2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
    3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

    RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

    • AuthScheme:Basic に設定します。
    • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
    • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
    • Password:RabbitMQ の管理パスワードに設定します。

    接続文字列の例:

    Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    

    利用可能なテーブル

    RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

    • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
    • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
    • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
    • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
    • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
    • Consumers - すべてのキューに登録されたコンシューマーの一覧
    • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
    • Queues - すべての仮想ホストで宣言されたキューの一覧
    • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
    • VirtualHosts - ブローカーに設定された仮想ホストの一覧
    • VhostPermissions - 特定の仮想ホスト内のユーザー権限
    • Users - すべての RabbitMQ ユーザーの一覧
    • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
    • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
    • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
    • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
    • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
    • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
    • VhostLimits - 特定の仮想ホストに設定されたリソース制限
    • UserLimits - 特定のユーザーに設定されたリソース制限
    • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
    • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
    • AuthAttempts - ノードの認証試行統計
    • ClusterName - RabbitMQ クラスターの名前
    • WhoAmI - 現在認証されている管理ユーザーに関する情報
    • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
    • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
    • QueueBindings - 仮想ホスト内の特定のキューのバインディング
  3. 接続を保存後、「Table or view」を選択し、Google BigQuery にエクスポートするテーブルまたはビューを選択して、CData RabbitMQ Source Editor を閉じます。

Google BigQuery デスティネーションの設定

RabbitMQ Source を設定したら、Google BigQuery 接続を設定してカラムをマッピングします。

  1. CData Google BigQuery Destination をダブルクリックしてデスティネーションコンポーネントエディタを開き、新しい接続を追加します。
  2. CData GoogleBigQuery Connection Manager で接続プロパティを設定し、接続をテストして保存します。
    • Google は OAuth 認証標準を使用しています。個々のユーザーに代わって Google API にアクセスするには、埋め込み資格情報を使用するか、独自の OAuth アプリを登録できます。 OAuth を使用すると、サービスアカウントを使用して Google Apps ドメイン内のユーザーに代わって接続することもできます。サービスアカウントで認証するには、アプリケーションを登録して OAuth JWT 値を取得します。 OAuth 値に加えて、DatasetId と ProjectId を指定します。OAuth の使用ガイドについては、ヘルプドキュメントの「Getting Started」章を参照してください。

    便利な接続プロパティ

    • QueryPassthrough: True に設定すると、クエリは Google BigQuery に直接渡されます。
    • ConvertDateTimetoGMT: True に設定すると、コンポーネントはローカルマシンの時刻ではなく、日時値を GMT に変換します。
    • FlattenObjects: デフォルトでは、コンポーネントは STRUCT カラムの各フィールドを独自のカラムとして報告し、STRUCT カラム自体は非表示にします。False に設定すると、トップレベルの STRUCT は展開されず、独自のカラムとして残ります。このカラムの値は JSON 集約として報告されます。
    • SupportCaseSensitiveTables: このプロパティを true に設定すると、同じ名前で大文字小文字が異なるテーブルは、すべてメタデータで報告されるように名前が変更されます。デフォルトでは、プロバイダーはテーブル名を大文字小文字を区別しないものとして扱うため、複数のテーブルが同じ名前で大文字小文字が異なる場合、メタデータでは 1 つだけが報告されます。
  3. 接続を保存後、Use a Table メニューでテーブルを選択し、Action メニューで Insert を選択します。
  4. Column Mappings タブで、入力カラムからデスティネーションカラムへのマッピングを設定します。

プロジェクトの実行

これでプロジェクトを実行できます。SSIS Task の実行が完了すると、SQL テーブルのデータが選択したテーブルにエクスポートされます。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続