CData SSIS Components を使用して RabbitMQ のデータを Databricks にマイグレーション
Databricks は、大量のデータを簡単に処理、分析、可視化できる統合データ分析プラットフォームです。データエンジニアリング、データサイエンス、機械学習の機能を単一のプラットフォームに統合し、チームがコラボレーションしてデータからインサイトを得ることを容易にします。
CData SSIS Components は、SQL Server Integration Services を拡張し、さまざまなソースやデスティネーションからデータを簡単にインポート・エクスポートできるようにします。
この記事では、Databricks へのエクスポート時のデータ型マッピングの考慮事項を確認し、CData SSIS Components for RabbitMQ と Databricks を使用してRabbitMQ のデータを Databricks にマイグレーションする方法を説明します。
データ型マッピング
| Databricks スキーマ | CData スキーマ |
|---|---|
|
int, integer, int32 |
int |
|
smallint, short, int16 |
smallint |
|
double, float, real |
float |
|
date |
date |
|
datetime, timestamp |
datetime |
|
time, timespan |
time |
|
string, varchar |
長さ > 4000 の場合:nvarchar(max)、それ以外:nvarchar(length) |
|
long, int64, bigint |
bigint |
|
boolean, bool |
tinyint |
|
decimal, numeric |
decimal |
|
uuid |
nvarchar(length) |
|
binary, varbinary, longvarbinary |
binary(1000) または SQL Server 2000 以降は varbinary(max) |
特別な考慮事項
- String/VARCHAR: Databricks の String カラムは、カラムの長さによって異なるデータ型にマッピングされます。カラムの長さが 4000 を超える場合、カラムは nvarchar(max) にマッピングされます。それ以外の場合は、nvarchar(length) にマッピングされます。
- DECIMAL: Databricks は最大 38 桁の精度の DECIMAL 型をサポートしていますが、それを超えるソースカラムはロードエラーを引き起こす可能性があります。
前提条件
- Visual Studio 2022
- Visual Studio 2022 用 SQL Server Integration Services Projects 拡張機能
- CData SSIS Components for Databricks
- CData SSIS Components for RabbitMQ
プロジェクトの作成とコンポーネントの追加
-
Visual Studio を開き、新しい Integration Services プロジェクトを作成します。
- Control Flow 画面に新しい Data Flow Task を追加し、Data Flow Task を開きます。
-
Data Flow Task に CData RabbitMQ Source コントロールと CData Databricks Destination コントロールを追加します。
RabbitMQ ソースの設定
以下の手順に従って、RabbitMQ への接続に必要なプロパティを指定します。
-
CData RabbitMQ Source をダブルクリックしてソースコンポーネントエディタを開き、新しい接続を追加します。
-
CData RabbitMQ Connection Manager で接続プロパティを設定し、接続をテストして保存します。
RabbitMQ Management HTTP API について
RabbitMQ は、複数のメッセージングプロトコルをサポートするオープンソースのメッセージブローカーです。RabbitMQ Management HTTP API は、RabbitMQ サーバーの管理データと監視データに HTTP 経由でアクセスする手段を提供します。この API では、仮想ホスト、エクスチェンジ、キュー、バインディング、コネクション、チャネル、コンシューマー、ユーザー、権限、ポリシー、クラスター全体の統計情報を取得できます。
HTTP API を利用するには、RabbitMQ サーバーで Management プラグインを有効化する必要があります。デフォルトでは、管理インターフェースはポート 15672 でリッスンします。
Basic 認証の設定
RabbitMQ Management HTTP API は HTTP Basic 認証を使用します。RabbitMQ 管理ユーザーのユーザー名とパスワードを指定する必要があります。
管理 API へのアクセスを有効にするには、以下のステップで進めます:
- サーバーで RabbitMQ Management プラグインが有効になっていることを確認します(rabbitmq-plugins enable rabbitmq_management)。
- 既存の管理ユーザーを使用するか、適切な管理タグ(management、policymaker、monitoring、または administrator)を持つユーザーを作成します。
- RabbitMQ Management HTTP API の完全なベース URL を控えておきます(例:http://localhost:15672)。
RabbitMQ サーバーを設定したら、以下の接続プロパティを設定して接続します:
- AuthScheme:Basic に設定します。
- URL:RabbitMQ Management HTTP API のベース URL に設定します(例:http://localhost:15672)。
- User:RabbitMQ の管理ユーザー名に設定します(例:guest)。
- Password:RabbitMQ の管理パスワードに設定します。
接続文字列の例:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
利用可能なテーブル
RabbitMQ プロファイルでは、以下のテーブルにアクセスできます:
- Overview - クラスター全体の統計情報と RabbitMQ ノードに関する情報
- Nodes - RabbitMQ クラスター内の個々のノードに関する情報
- NodeMemory - 特定のクラスターノードの詳細なメモリ使用状況の内訳
- Connections - ブローカーへのすべてのオープンな AMQP コネクションの一覧
- Channels - すべてのコネクションにわたるオープンな AMQP チャネルの一覧
- Consumers - すべてのキューに登録されたコンシューマーの一覧
- Exchanges - すべての仮想ホストで宣言されたエクスチェンジの一覧
- Queues - すべての仮想ホストで宣言されたキューの一覧
- Bindings - エクスチェンジとキュー間のすべてのバインディングの一覧
- VirtualHosts - ブローカーに設定された仮想ホストの一覧
- VhostPermissions - 特定の仮想ホスト内のユーザー権限
- Users - すべての RabbitMQ ユーザーの一覧
- Permissions - すべての仮想ホストにわたる全ユーザーの権限レコード
- TopicPermissions - 全ユーザーのトピックレベルの権限レコード
- Policies - 仮想ホスト内のキューおよびエクスチェンジに適用されたポリシーの一覧
- OperatorPolicies - 仮想ホスト内のキューに適用されたオペレーターポリシーの一覧
- Parameters - 仮想ホストごとのコンポーネントパラメータ(例:federation、shovel)の一覧
- GlobalParameters - すべての仮想ホストに適用されるグローバルパラメータの一覧
- VhostLimits - 特定の仮想ホストに設定されたリソース制限
- UserLimits - 特定のユーザーに設定されたリソース制限
- FeatureFlags - フィーチャーフラグの一覧と、ノード上での有効/無効の状態
- DeprecatedFeatures - 非推奨機能の一覧と、その使用状態
- AuthAttempts - ノードの認証試行統計
- ClusterName - RabbitMQ クラスターの名前
- WhoAmI - 現在認証されている管理ユーザーに関する情報
- ExchangeBindingsSource - 特定のエクスチェンジがソースとなっているバインディング
- ExchangeBindingsDestination - 特定のエクスチェンジが宛先となっているバインディング
- QueueBindings - 仮想ホスト内の特定のキューのバインディング
-
接続を保存後、「Table or view」を選択し、Databricks にエクスポートするテーブルまたはビューを選択して、CData RabbitMQ Source Editor を閉じます。
Databricks デスティネーションの設定
RabbitMQ Source を設定したら、Databricks 接続を設定してカラムをマッピングします。
-
CData Databricks Destination をダブルクリックしてデスティネーションコンポーネントエディタを開き、新しい接続を追加します。
-
CData Databricks Connection Manager で接続プロパティを設定し、接続をテストして保存します。Databricks クラスターに接続するには、以下のようにプロパティを設定します。
注意:必要な値は、Databricks インスタンスで Clusters に移動し、目的のクラスターを選択して、Advanced Options の下にある JDBC/ODBC タブを選択することで確認できます。
- Server:Databricks クラスターの Server Hostname を設定します。
- HTTPPath:Databricks クラスターの HTTP Path を設定します。
- Token:個人用アクセストークンを設定します(この値は、Databricks インスタンスの User Settings ページに移動し、Access Tokens タブを選択することで取得できます)。
その他の便利な接続プロパティ
- QueryPassthrough: True に設定すると、クエリは Databricks に直接渡されます。
- ConvertDateTimetoGMT: True に設定すると、コンポーネントはローカルマシンの時刻ではなく、日時値を GMT に変換します。
- UseUploadApi: このプロパティを true に設定すると、Bulk INSERT 操作で大量のデータがある場合にパフォーマンスが向上します。
- UseCloudFetch: このオプションは、テーブルに 100 万件を超えるエントリがある場合にクエリ効率を向上させるために CloudFetch を使用するかどうかを指定します。
-
接続を保存後、Use a Table メニューでテーブルを選択し、Action メニューで Insert を選択します。
-
Column Mappings タブで、入力カラムからデスティネーションカラムへのマッピングを設定します。
プロジェクトの実行
これでプロジェクトを実行できます。SSIS Task の実行が完了すると、SQL テーブルのデータが選択したテーブルにエクスポートされます。