CData Python Connector を使って Snowflake 用の Singer.io Tap を構築する
Singer.io は、データソースとデータの同期先を接続する無料のオープンソース ETL フレームワークです。 標準化された JSON ベースの Tap(データ抽出)と Target(データロード)を使用します。 Tap を通じてソース(CRM やデータベースなど)からデータを抽出し、 分析用の Target システム(Snowflake など)にロードします。 この一連の処理で、データの整合性と構造が保持されます。
CData Python Connector を使用すると、数百種類のデータソースや同期先に対して信頼性の高い、 高パフォーマンスな接続を提供し、複雑なコーディングなしでスムーズかつ最適化されたデータ移動を実現できます。
この記事では、CData コネクタと Singer.io を使って Python で Snowflake 用の Tap を構築し、Snowflake から Singer.io の CSV ターゲット target-csv にデータを移動する方法を解説します。 無料のコミュニティライセンスで CData Python Connector for Snowflake を使用し、VSCode で作業を進めます。また、 CData の sys_tables システムテーブルを使用して、スキーマとメタデータを動的に取得し、 データ抽出を簡素化してパイプラインの効率を向上させる方法も紹介します。
それでは、始めましょう!
前提条件
- CData Python Connector for Snowflake。無料のコミュニティエディションライセンスは こちらからリクエストできます(ダウンロードリンク付き)。 すでにライセンスをお持ちの場合は、CData Python Connector for Snowflake を こちらからダウンロードできます。
- Python ディストリビューション。お使いのマシン用の最新バージョンを こちらからダウンロードしてください。 インストール時に "Add Python to PATH" オプションにチェックを入れてください。
- Visual Studio Code。 こちらからダウンロードしてください。
はじめに
概要
以下の手順で進めていきます:
- インストール:CData コネクタ、Singer.io、Singer CSV Target(target-csv)をインストールして設定します。
- 接続:Snowflake への接続を確立し、メタデータとスキーマ情報を取得します。
- 構築:データの移動とレプリケーションを行う Snowflake Tap を構築します。
このプロジェクトでは、多くのファイルやサブフォルダを扱います。 専用のフォルダを作成することをお勧めします。例えば: Singer-Snowflake を作成し、すべてのプロジェクトファイルをその中に配置してください。
ステップ 1:インストールのセットアップ
1.1 CData Python Connector for Snowflake のインストール
以下の手順で CData Python Connector for Snowflake をインストールして設定します:
依存関係に関する注意:この Python コネクタは Python バージョン 3.8、3.9、3.10、3.11、3.12 をサポートしています。 これら以外のバージョンの Python を使用している場合は、仮想環境を作成する必要があるかもしれません。
- ダウンロードしたコネクタの ZIP ファイルを任意の場所に展開します。
-
ターミナルまたはコマンドプロンプトを開き、対応するインストールディレクトリに移動するか、
.whl ファイルがあるディレクトリで直接ターミナルを開きます。
例:
C:\Users\Public\Downloads\CDataPythonConnectorforSnowflake\CData.Python.Snowflake\win\Python312\64
-
Windows の場合:pip を使用して .whl ファイルをインストールします。
お使いの Python バージョンとアーキテクチャに適したバージョンを使用してください。
コマンド例:
pip install cdata_snowflake_connector-24.0.9111-cp312-cp312-win_amd64.whl
-
Linux または macOS の場合:pip を使用して .tar.gz ファイルをインストールします。コマンド例:
pip install cdata_snowflake_connector-24.0.####-python3.tar.gz
- pip list を実行してインストールが成功したことを確認します。 cdata-snowflake-connector がリストに表示されていれば、インストールは成功です。
1.2 CData コネクタのライセンスをインストールする
これはオプションの手順です。.whl ファイルを使用してコネクタをインストールすると、 お使いのマシン用の無料のコミュニティエディションライセンスが自動的にインストールされます。
ただし、ライセンスが表示されない場合や、CData コネクタの試用版を使用している場合は、 メールで送信されたライセンスキーを使用して無料のコミュニティエディションライセンスを インストールできます。
ライセンスキーを受け取っていない場合は、CData Snowflake Connector のコミュニティライセンスを こちらからリクエストできます。
Windows の場合
- ライセンスが含まれた ZIP ファイルをダウンロードして展開します。
- ターミナルまたはコマンドプロンプトを開きます。
- ライセンスインストーラーの場所に移動します。通常は以下の場所にあります: C:\Users\Username\AppData\Local\Programs\Python\Python312\Lib\site-packages\cdata\installlic_snowflake
- または、別途ダウンロードした場合: C:\Downloads\cdata\installlic_snowflake
- 以下のコマンドでインストーラーを実行します:
.\license-installer.exe [ライセンスキーをここに入力]
- 画面に表示される指示に従って、名前とメールアドレスを入力してインストールを完了します。
macOS/Linux の場合
- ライセンスが含まれた ZIP ファイルをダウンロードして展開します。
- 展開したディレクトリ内でターミナルを開きます。例:
cd ~/Downloads/CData-Python-Snowflake
- ライセンスインストーラーの場所に移動します。通常は以下の場所にあります: /usr/local/lib/python3.12/site-packages/cdata/installlic_snowflake
- インストーラーを実行します:
./license-installer [ライセンスキーをここに入力]
- 画面に表示される指示に従って、名前とメールアドレスを入力してインストールを完了します。
1.3 Singer.io と target-csv のインストール
-
以下のコマンドで pip を使用して Singer.io をインストールします:
pip install singer-python
-
仮想環境を作成して target-csv をインストールします:
Windows の場合:
python -m venv %USERPROFILE%\target-csv %USERPROFILE%\target-csv\Scripts\activate pip install target-csv deactivate
macOS/Linux の場合:
python3 -m venv ~/.virtualenvs/target-csv source ~/.virtualenvs/target-csv/bin/activate pip install target-csv deactivate
-
競合について:Python 3.10 以降では、MutableMapping クラスが
collections から collections.abc に移動されたため、
target-csv 仮想環境で AttributeError が発生する可能性があります。
これを修正するには:
- ~\target-csv\Scripts から target_csv.py をテキストエディタで開きます。
- 以下の部分を置き換えます:
import collections
を以下に変更:from collections.abc import MutableMapping
- 以下の行を更新します:
isinstance(v, collections.MutableMapping)
を以下に変更:isinstance(v, MutableMapping)
ステップ 2:接続、設定、クエリ
必要なコンポーネントのインストールが完了したら、次は Snowflake への 接続を確立し、メタデータとスキーマ情報を取得します。 これにより、データの移動やレプリケーションを効率的に行うためのスキーマを構築できます。
2.1 接続の確立
- プロジェクトフォルダに meta_snowflake.py という名前の新しい Python ファイルを作成し、ファイルを開きます。
- ファイルの先頭に以下の import 文を追加します:
import singer import cdata.snowflake as mod import sys import os import time from datetime import date, datetime
- 接続関数を作成し、プレースホルダーの値を実際の Snowflake の情報に置き換えます:
def create_connection(): conn = mod.connect( "AuthScheme=Password;" "url=YourSnowflakeURL.snowflakecomputing.com;" "user=Your_Username;" "password=Your_Password;" "Database=Your_DB;" "Warehouse=Your_WH;" ) return conn - スクリプトを実行して、エラーなく接続できることを確認します。
- OAuth、SSO、プロキシなどの高度な認証方法を使用している場合は、 AuthScheme= を適宜更新してください。完全なドキュメントは、 ダウンロードしたコネクタフォルダ内の HTML ヘルプファイルにあります: ~\SnowflakePythonConnector\CData.Python.Snowflake\help\help.htm
2.2 クエリによるメタデータの取得
CData コネクタのシステムテーブルを使用して、Snowflake ウェアハウスから メタデータとスキーマ情報を取得できます。system_table を使用すると、 コネクタはカラム名、データ型、行数などのすべてのテーブル詳細を自動的に取得し、 Snowflake スキーマの完全な概要を提供します。 詳細については、コネクタのヘルプファイルをご確認ください。
以下のコードを接続プロパティの下に貼り付け、保存して実行します。 YOUR_TABLE_NAME を Snowflake データベーススキーマ内の 任意のテーブル名に更新してください:
conn = create_connection()
table_count = conn.execute("SELECT COUNT(*) FROM sys_tables").fetchone()[0]
account_columns = conn.execute(
"""
SELECT ColumnName, DataTypeName
FROM sys_tablecolumns
WHERE TableName = 'YOUR_TABLE_NAME'
"""
).fetchall()
print(f"Tables: {table_count}, Columns in 'YOUR_TABLE_NAME': {len(account_columns)}\n")
print("\n".join(f"{col[0]} ({col[1]})" for col in account_columns))
conn.close()
これにより、データベース内のテーブルの総数と YOUR_TABLE_NAME の カラムの総数が表示され、すべてのカラムとそれぞれのデータ型の詳細なリストも表示されます。 これはスキーマ構造の理解に役立ち、より効率的なデータ管理を可能にします。
このメタデータを使用して、スキーマの構築、テーブル構造のマッピング、 システム間のデータ移行を効率的に行えます。このアプローチにより、 ETL(Extract, Transform, Load)プロセス、データ検証、分析などの 操作を実行する前に必要なスキーマの洞察を得ることができます。
この例では、以下のカラムを使用してスキーマを作成します:
{
'Id': {'type': 'string'},
'Name': {'type': 'string'},
'BillingCity': {'type': 'string'},
'AnnualRevenue': {'type': 'number'}
}

ステップ 3:Tap の構築
Snowflake ウェアハウスに接続してスキーマのメタデータを取得できたので、 次はデータレプリケーション用のスキーマを定義して Tap を構築します。 以下の手順で Tap を作成し、設定して、データを CSV ファイルにエクスポートします:
-
プロジェクトディレクトリに tap_snowflake.py という新しいファイルを作成し、 前述の import 文と接続プロパティを追加します。次に、接続設定の後に以下のコードを貼り付けます。 テーブル構造に応じてスキーマを定義およびマッピングし、 with open("YOUR_PATH") の実行ログファイルのパスを更新してください。
# Account テーブルのスキーマを定義 schema = { 'properties': { 'Id': {'type': 'string'}, 'Name': {'type': 'string'}, 'BillingCity': {'type': 'string'}, 'AnnualRevenue': {'type': 'number'} } } # Snowflake から Account データを取得 def fetch_sf_data(): conn = create_connection() try: query = 'SELECT "Id", "Name", "BillingCity", "AnnualRevenue" FROM "DEMO_DB"."CRM"."Account"' return conn.execute(query).fetchall() except Exception as e: sys.stderr.write(f"Error fetching data: {e}\n") return [] finally: conn.close() # スキーマ、レコードを書き込み、パフォーマンスをログに記録 def write_records(): start_time = time.perf_counter_ns() singer.write_schema('account', schema, ['Id']) records = fetch_sf_data() for record in records: singer.write_record('account', { 'Id': record[0] or 'N/A', 'Name': record[1] or 'Unknown', 'BillingCity': record[2] or '', 'AnnualRevenue': record[3] or 0 }) duration_ms = (time.perf_counter_ns() - start_time) / 1_000_000 with open("YOUR_PATH/execution_log_snow.txt", "w") as log_file: log_file.write(f"Execution Time: {duration_ms:.3f} ms\nTotal Records: {len(records)}\n") # メインガード if __name__ == "__main__": write_records() -
target-csv ツールは、カスタマイズ用のオプションの JSON 設定ファイルを受け付けます。 my-config.json というファイルを作成し、必要に応じて設定します。以下は基本的な例です:
{ "delimiter": "\t", "quotechar": "'", "destination_path": "YOUR/PATH/FOR/CSV/OUTPUTS", "disable_collection": true } -
ターミナルを開き、プロジェクトディレクトリに移動して、以下のコマンドで target-csv 仮想環境をアクティベートします:
~\target-csv\Scripts\activate -
次に、以下のコマンドを実行して Tap を実行し、出力を CSV にパイプします:
python tap_snowflake.py | .\target-csv\Scripts\target-csv -c my-config.json

これで Snowflake 用の Singer Tap を正常に構築できました。定義したスキーマが target-csv ターゲットを使用して CSV ファイルに出力されます。 エクスポートされた CSV ファイルは、プロジェクトフォルダまたは my-config.json で定義した場所で確認できます。
コードで定義した場所にある execution_log_snow.txt ファイルを確認して、 移動/レプリケーションされたレコード数と合計実行時間(ミリ秒単位)を確認してください。

次のステップ
Snowflake 用の Singer Tap を構築し、データを移動することに成功しました。しかし、これは CData Python Connector でできることのほんの一部に過ぎません。以下のような発展的な活用も可能です:
- 他のソース用の Tap を構築:PostgreSQL、MySQL などのデータベースや、Salesforce、HubSpot などの API 用の Tap を作成して、データパイプラインを統合できます。
- 増分データロードを組み込む:Tap を修正して新規または更新されたレコードのみを追跡・ロードし、大規模なデータセットでの効率を向上させます。
- データをストリーム中に変換:データが同期先に到達する前に、クリーニング、フォーマット、エンリッチメントなどの前処理ステップを追加できます。
- 他のターゲットと統合:CSV 以外にも、BigQuery、Redshift などのクラウドウェアハウスやライブダッシュボードにデータをプッシュできます。
詳細については、デベロッパーリソースセンターのドキュメントとチュートリアルガイドをご覧ください。
完全なコード
meta_snowflake.py
import singer
import cdata.snowflake as mod
import sys
import os
import time
from datetime import date, datetime
# Snowflake 接続を作成
def create_connection():
conn = mod.connect(
"AuthScheme=Password;"
"url=YourSnowflakeURL.snowflakecomputing.com;"
"user=Your_Username;"
"password=Your_Password;"
"Database=Your_DB;"
"Warehouse=Your_WH;"
)
return conn
# テーブル数とカラムメタデータを取得
conn = create_connection()
table_count = conn.execute("SELECT COUNT(*) FROM sys_tables").fetchone()[0]
account_columns = conn.execute(
"""
SELECT ColumnName, DataTypeName
FROM sys_tablecolumns
WHERE TableName = 'YOUR_TABLE_NAME'
"""
).fetchall()
# メタデータを出力
print(f"Tables: {table_count}, Columns in 'YOUR_TABLE_NAME': {len(account_columns)}\n")
print("\n".join(f"{col[0]} ({col[1]})" for col in account_columns))
conn.close()
tap_snowflake.py
import singer
import cdata.snowflake as mod
import sys
import os
import time
from datetime import date, datetime
# Snowflake 接続を作成
def create_connection():
conn = mod.connect(
"AuthScheme=Password;"
"url=YourSnowflakeURL.snowflakecomputing.com;"
"user=Your_Username;"
"password=Your_Password;"
"Database=Your_DB;"
"Warehouse=Your_WH;"
)
return conn
# Account テーブルのスキーマを定義
schema = {
'properties': {
'Id': {'type': 'string'},
'Name': {'type': 'string'},
'BillingCity': {'type': 'string'},
'AnnualRevenue': {'type': 'number'}
}
}
# Snowflake から Account データを取得
def fetch_sf_data():
conn = create_connection()
try:
query = 'SELECT "Id", "Name", "BillingCity", "AnnualRevenue" FROM "DEMO_DB"."CRM"."Account"'
return conn.execute(query).fetchall()
except Exception as e:
sys.stderr.write(f"Error fetching data: {e}\n")
return []
finally:
conn.close()
# スキーマ、レコードを書き込み、パフォーマンスをログに記録
def write_records():
start_time = time.perf_counter_ns()
singer.write_schema('account', schema, ['Id'])
records = fetch_sf_data()
for record in records:
singer.write_record('account', {
'Id': record[0] or 'N/A',
'Name': record[1] or 'Unknown',
'BillingCity': record[2] or '',
'AnnualRevenue': record[3] or 0
})
duration_ms = (time.perf_counter_ns() - start_time) / 1_000_000
with open("YOUR_PATH/execution_log_snow.txt", "w") as log_file:
log_file.write(f"Execution Time: {duration_ms:.3f} ms\nTotal Records: {len(records)}\n")
# メインガード
if __name__ == "__main__":
write_records()