CData SSIS Components を使用して RabbitMQ のデータを Snowflake にマイグレーション

Cameron Leblanc
Cameron Leblanc
Senior Technology Evangelist
CData SSIS Tasks for RabbitMQ と Snowflake を使用して、RabbitMQ のデータを Snowflake に簡単にプッシュできます。



Snowflake は、エンタープライズ BI、分析、データ管理、ガバナンスの取り組みで広く利用されている先進的なクラウドデータウェアハウスです。Snowflake は、データ共有、リアルタイムデータ処理、安全なデータストレージなどの機能を提供しており、クラウドデータ統合の一般的な選択肢となっています。

CData SSIS Components は、SQL Server Integration Services を拡張し、さまざまなソースやデスティネーションからデータを簡単にインポート・エクスポートできるようにします。

この記事では、Snowflake へのエクスポート時のデータ型マッピングの考慮事項を確認し、CData SSIS Components for RabbitMQ と Snowflake を使用してRabbitMQ のデータを Snowflake にマイグレーションする方法を説明します。

データ型マッピング

Snowflake スキーマ CData スキーマ

NUMBER, DECIMAL, NUMERIC, INT, INTEGER, BIGINT, SMALLINT, TINYINT, BYTEINT

decimal

DOUBLE, FLOAT, FLOAT4, FLOAT8, DOUBLEPRECISION, REAL

real

VARCHAR, CHAR, STRING, TEXT, VARIANT, OBJECT, ARRAY, GEOGRAPHY

varchar

BINARY, VARBINARY

binary

BOOLEAN

bool

DATE

date

DATETIME, TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ

datetime

TIME

time

特別な考慮事項

  • 大文字小文字の区別: Snowflake はデフォルトで識別子の大文字小文字を厳密に一致させるため、大文字小文字の不一致に起因する問題が発生することがよくあります。これらの問題を解決するには、CData SSIS Components for Snowflake 接続で IgnoreCase プロパティを True に設定します。このプロパティは、Snowflake の QUOTED_IDENTIFIERS_IGNORE_CASE プロパティに直接マッピングされ、Snowflake が識別子を大文字小文字を区別するかどうかを指定します。
  • タイムスタンプ: Snowflake は 3 つのタイムスタンプ型をサポートしています:

    • TIMESTAMP_NTZ: このタイムスタンプは、指定された精度で UTC 時刻を格納します。ただし、すべての操作は、TIMEZONE セッションパラメータで制御される現在のセッションのタイムゾーンで実行されます。
    • TIMESTAMP_LTZ: このタイムスタンプは、指定された精度で「壁時計」時刻を格納します。すべての操作は、タイムゾーンを考慮せずに実行されます。
    • TIMESTAMP_TZ: このタイムスタンプは、関連するタイムゾーンオフセットとともに UTC 時刻を格納します。タイムゾーンが指定されていない場合、セッションのタイムゾーンオフセットが使用されます。

    デフォルトでは、CData SSIS Components は手動で設定しない限り、タイムスタンプを TIMESTAMP_NTZ として Snowflake に書き込みます。

前提条件

プロジェクトの作成とコンポーネントの追加

  1. Visual Studio を開き、新しい Integration Services プロジェクトを作成します。
  2. Control Flow 画面に新しい Data Flow Task を追加し、Data Flow Task を開きます。
  3. Data Flow Task に CData RabbitMQ Source コントロールと CData Snowflake Destination コントロールを追加します。

RabbitMQ ソースの設定

以下の手順に従って、RabbitMQ への接続に必要なプロパティを指定します。

  1. CData RabbitMQ Source をダブルクリックしてソースコンポーネントエディタを開き、新しい接続を追加します。
  2. CData RabbitMQ Connection Manager で接続プロパティを設定し、接続をテストして保存します。

    RabbitMQ Management HTTP API について

    RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。

    HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。

    Basic 認証の設定

    RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。

    管理 API へのアクセスを有効にするには、以下のステップで進めます:

    1. サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
    2. 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
    3. RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。

    RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:

    • AuthScheme:Basic に設定します。
    • URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
    • User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
    • Password:RabbitMQ の管理パスワードに設定します。

    接続文字列の例:

    Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
    

    利用可能なテーブル

    RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:

    • Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
    • Nodes - RabbitMQ クラスター内の個々のノードに関する情報
    • NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
    • Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
    • Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
    • Consumers - すべてのキューに登録されたコンシューマーの一覧
    • Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
    • Queues - すべての仮想ホストで宣言されたキューの一覧
    • Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
    • VirtualHosts - ブローカーに設定された仮想ホストの一覧
    • VhostPermissions - 特定の仮想ホスト内のユーザー権限
    • Users - すべての RabbitMQ ユーザーの一覧
    • Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
    • TopicPermissions - 全ユーザーのトピックレベルの権限レコード
    • Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
    • OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
    • Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
    • GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
    • VhostLimits - 特定の仮想ホストに設定されたリソース制限
    • UserLimits - 特定のユーザーに設定されたリソース制限
    • FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
    • DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
    • AuthAttempts - ノードの認証試行統計
    • ClusterName - RabbitMQ クラスターの名前
    • WhoAmI - 現在認証されている管理ユーザーに関する情報
    • ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
    • ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
    • QueueBindings - 仮想ホスト内の特定のキューのバインディング
  3. 接続を保存後、「Table or view」を選択し、Snowflake にエクスポートするテーブルまたはビューを選択して、CData RabbitMQ Source Editor を閉じます。

Snowflake デスティネーションの設定

RabbitMQ Source を設定したら、Snowflake 接続を設定してカラムをマッピングします。

  1. CData Snowflake Destination をダブルクリックしてデスティネーションコンポーネントエディタを開き、新しい接続を追加します。
  2. CData Snowflake Connection Manager で接続プロパティを設定し、接続をテストして保存します。
    • コンポーネントは、Snowflake ユーザー認証、フェデレーション認証、SSL クライアント認証をサポートしています。認証するには、User と Password を設定し、AuthScheme プロパティで認証方法を選択します。Snowflake の bundle 2024_08(2024 年 10 月)を使用して作成されたアカウントからは、セキュリティ上の懸念からパスワードベースの認証がサポートされなくなりました。代わりに、OAuth や秘密鍵認証などの代替認証方法を使用してください。

    その他の便利な接続プロパティ

    • QueryPassthrough: True に設定すると、クエリは Snowflake に直接渡されます。
    • ConvertDateTimetoGMT: True に設定すると、コンポーネントはローカルマシンの時刻ではなく、日時値を GMT に変換します。
    • IgnoreCase: Snowflake が識別子を大文字小文字を区別するかどうかを指定するセッションパラメータです。デフォルト:false(大文字小文字を区別する)。
    • BindingType: DEFAULT と TEXT の 2 種類のバインディングタイプがあります。DEFAULT は、Date 型に DATE、Time 型に TIME、Timestamp_* 型に TIMESTAMP_* のバインディングタイプを使用します。TEXT は、Date、Time、Timestamp_* 型に TEXT のバインディングタイプを使用します。
  3. 接続を保存後、Use a Table メニューでテーブルを選択し、Action メニューで Insert を選択します。
  4. Column Mappings タブで、入力カラムからデスティネーションカラムへのマッピングを設定します。

プロジェクトの実行

これでプロジェクトを実行できます。SSIS Task の実行が完了すると、SQL テーブルのデータが選択したテーブルにエクスポートされます。

はじめる準備はできましたか?

API Driver で RabbitMQ のライブデータに接続

RabbitMQ に接続