CData Python Connector でCockroachDB 用の Singer.io Tap を構築

Mohsin Turki
Mohsin Turki
Technical Marketing Engineer
CData Python Connector を使用して、リアルタイムのCockroachDB データ用の Singer.io Tap を簡単に構築する方法を解説。

Singer.io は、標準化された JSON ベースのエクストラクター(Tap)とローダー(Target)を使用してデータソースと宛先を接続する、無料のオープンソース ETL フレームワークです。Tap を通じてソース(CRM やデータベースなど)からデータを抽出し、Target システム(Snowflake など)に読み込みます。すべてデータの整合性と構造を維持しながら行います。

CData Python Connector は、350 以上のデータソースに対してカスタム Tap と Target を構築するための、リアルタイムでノーコードのツールを提供することで、このプロセスを強化します。

このガイドでは、CData Python Connector を使用して CockroachDB 用の Tap を構築し、抽出したデータを CSV Targettarget-csv)に読み込みます。 CData Python Connector for CockroachDB の 無料 Community ライセンスVSCode を使用し、sys_tablecolumns システムテーブルを活用してスキーマとメタデータを効率的に取得します。

それでは始めましょう!

前提条件

  1. CData Python Connector for CockroachDB:無料の Community Edition ライセンスはこちらからリクエストしてダウンロードできます。 すでにライセンスをお持ちの場合やトライアルをご利用の場合は、 こちらから CData Python Connector for CockroachDB をダウンロードまたはアップデートできます。
  2. Singer.io:pip でインストール:pip install singer-python
  3. アクティブな CockroachDB アカウント。
  4. マシンにインストールされた Python ディストリビューションと、VSCode または任意のテキストエディタまたは IDE。

はじめに

概要

以下は、実行するステップの概要です:

  1. インストール:CData Connector for CockroachDB、Singer.io、Singer CSV Target(target-csv)をセットアップします。
  2. 構築:Tap を構築し、CockroachDB への接続を確立し、メタデータとスキーマ情報を取得します。
  3. 移動:CockroachDB から CSV Target にデータを移動またはレプリケートします。

ヒント:多くのファイルとフォルダを操作することになります。このプロジェクト専用のフォルダを作成することをお勧めします。 例: Singer-Tap。すべてのプロジェクトファイルをそこに配置してください。


ステップ 1:CData Connector for CockroachDB と Singer.io のインストールと設定

1.1 CData Python Connector for CockroachDB のインストール

依存関係に関する注意:Python Connector は Python バージョン 3.8、3.9、3.10、3.11、3.12 をサポートしています。 この範囲外のバージョンを使用している場合は、virtualenv で仮想環境を作成する必要があるかもしれません。

  1. Community Edition Connector の ZIP をダウンロードして展開し、任意の場所に配置します。
  2. ターミナルまたはコマンドプロンプトを開き、展開したインストールディレクトリに移動するか、 .whl ファイルがある場所でターミナルを直接開きます。 例: C:\Users\Public\Downloads\CockroachDBPythonConnector\ CData.Python.CockroachDB\win\Python312\64
  3. Windows の場合:pip を使用して .whl ファイルをインストールします。Python のバージョンとアーキテクチャに一致していることを確認してください。例: pip install cdata_cockroachdb_connector-24.0.9111-cp312-cp312-win_amd64.whl
  4. Linux または macOS の場合:pip を使用して .tar.gz ファイルをインストールします。例: pip install cdata_cockroachdb_connector-24.0.####-python3.tar.gz
  5. pip list を実行してインストールを確認します。 cdata-cockroachdb-connector が表示されていれば、インストール成功です。

1.2 CData Connector のライセンスをインストール

これはオプションのステップです。マシン用の無料 Community Edition ライセンスは、 .whl ファイルを使用して Connector をインストールする際にすでにインストールされているはずです。

ただし、ライセンスが表示されない場合やトライアルバージョンを使用している場合は、 メールで送信されたキーを使用して無料 Community Edition ライセンスをインストールしてください。

受け取っていない場合は、 こちらから Connector 用にリクエストしてください。

Windows の場合

  1. ライセンスを含む ZIP ファイルをダウンロードして展開します。
  2. ターミナルまたはコマンドプロンプトを開きます。
  3. 以下に移動します: C:\Users\Username\AppData\Local\Programs\Python\Python312\Lib\site-packages\cdata\installlic_cockroachdb
  4. または、別途ダウンロードした場合は、展開したディレクトリに移動します。例: C:\Downloads\cdata\installlic_cockroachdb
  5. 実行します:
    .\license-installer.exe [YOUR LICENSE KEY HERE]
  6. 画面に表示される名前とメール登録のプロンプトに入力して、インストールを完了します。

macOS/Linux の場合

  1. ライセンスを含む ZIP ファイルをダウンロードして展開します。
  2. 展開したディレクトリ内でターミナルを開きます。例:
    cd ~/Downloads/CData-Python-CockroachDB
  3. 以下に移動します: /usr/local/lib/python3.12/site-packages/cdata/installlic_cockroachdb
  4. 実行します:
    ./license-installer [YOUR LICENSE KEY HERE]
  5. 画面に表示される名前とメール登録のプロンプトに入力して、インストールを完了します。

1.3 Singer.io と target-csv のインストール

  1. pip で Singer.io をインストールします:
    pip install singer-python
  2. プロジェクトフォルダに仮想環境を作成して target-csv をインストールします:

    Windows の場合:

                python -m venv %USERPROFILE%\target-csv
                %USERPROFILE%\target-csv\Scripts\activate
                pip install target-csv
                deactivate
            

    macOS/Linux の場合:

                python3 -m venv ~/.virtualenvs/target-csv
                source ~/.virtualenvs/target-csv/bin/activate
                pip install target-csv
                deactivate
            
  3. 競合:Python 3.10 以降では、MutableMapping クラスが collections から collections.abc に移動されたため、 target-csv 仮想環境で AttributeError が発生する可能性があります。修正方法:
    1. ~\target-csv\Scripts から target_csv.py をテキストエディタで開きます。
    2. 以下を置換します:
      import collections
      を:
      from collections.abc import MutableMapping
    3. この行を更新します:
      isinstance(v, collections.MutableMapping)
      を:
      isinstance(v, MutableMapping)

ステップ 2:CockroachDB 用の Tap を構築

すべてのセットアップが完了したので、CockroachDB 用の Singer Tap を構築しましょう。 この Tap は CockroachDB インスタンスに接続し、CData のシステムテーブルを使用してメタデータを取得し、 転送用のデータを準備します。

この記事の最後に完全な動作コードをコピーできますが、以下でステップバイステップで説明します。

2.1 接続の確立

  1. プロジェクトディレクトリに新しいファイルを作成し、tap-cockroachdb.py という名前を付けます。 テキストエディタでファイルを開き、以下のコードを追加します:
  2. ファイルの先頭で、必要なライブラリをインポートします:
                import singer
                import cdata.cockroachdb as mod
                from datetime import date, datetime
          
  3. CockroachDB の資格情報を使用して接続を作成する関数を定義します。 実際の接続詳細を自分のものに置き換えることを忘れないでください:
        
        def create_connection():
            return mod.connect(
                "User=root;"
                "Password=root;"
                "Database=system;"
                "Server=localhost;"
                "Port=26257;"
        )
        
  4. ファイルを実行して、接続が成功し、構文エラーや認証エラーがないことを確認します。
  5. 注意:利用可能なすべての接続パラメータを確認したい場合や、 高度な認証(OAuth、SSO、プロキシ)を使用している場合は、 ダウンロードした Connector ディレクトリ内の help.htm ファイルを参照して、 AuthScheme とパラメータを適切に調整してください。
    パス:~\CockroachDBPythonConnector\CData.Python.CockroachDB\help\help.htm

2.2 クエリロジック、スキーママッピング、データエクスポートの定義

接続が動作するようになったので、メタデータのクエリ、データ型の Singer 互換フォーマットへのマッピング、 標準出力 stdout へのレコード書き込みのロジックを定義しましょう。

  1. システムテーブルまたはターゲットテーブルからデータを取得する再利用可能なクエリ関数を作成します:
            def query(conn, sql):
                cur = conn.cursor()
                cur.execute(sql)
                return cur.description, cur.fetchall()
            
  2. クエリするテーブルを定義します。テスト用に、返されるレコード数を制限することもできます:
                TABLE_NAME = 'Opportunity'  # 目的のオブジェクト/テーブルに変更
                RECORD_LIMIT = 50           # すべての行を取得するには None または 0 を使用
            
  3. ソースデータ型を Singer の JSON Schema 互換型にマッピングするヘルパー関数を作成します:
                def mapping_types(sf_type):
                    t = sf_type.lower()
                    if 'int' in t:
                        return ['integer', 'null']
                    if 'float' in t or 'decimal' in t or 'double' in t:
                        return ['number', 'null']
                    if 'bool' in t:
                        return ['boolean', 'null']
                    if 'date' in t or 'time' in t:
                        return ['string', 'null']
                    return ['string', 'null']
            
  4. メタデータを取得し、スキーマを生成し、データレコードを書き込む関数を記述します:
                def write_records():
                    conn = create_connection()
    
                    # スキーマ用のカラムと型を取得
                    _, cols = query(conn, f"SELECT ColumnName, DataTypeName FROM sys_tablecolumns WHERE TableName = '{TABLE_NAME}'")
                    schema = {
                        'properties': {col: {'type': mapping_types(dtype)} for col, dtype in cols}
                    }
    
                    # RECORD_LIMIT に基づいてクエリを動的に構築
                    limit_clause = f"LIMIT {RECORD_LIMIT}" if RECORD_LIMIT and RECORD_LIMIT > 0 else ""
                    query_cmd = f"SELECT * FROM {TABLE_NAME} {limit_clause}"
                    columns, records = query(conn, query_cmd)
    
                    stream = TABLE_NAME.lower()
                    singer.write_schema(stream, schema, [columns[0][0]] if columns else [])
    
                    for row in records:
                        record = {
                            columns[i][0]: (val.isoformat() if isinstance(val, (date, datetime)) else val)
                            for i, val in enumerate(row)
                        }
                        singer.write_record(stream, record)
    
                    conn.close()
            
  5. 最後に、main ブロックで関数を呼び出します:
                if __name__ == '__main__':
                    write_records()
            
  6. Tap の準備ができました。プレースホルダーの資格情報を更新し、TABLE_NAME をクエリするオブジェクトに設定してください。
  7. すべての行を読み込むには、RECORD_LIMIT = 0 に設定するか、クエリから LIMIT 句を削除してください。

ステップ 3:データの移動

Tap の準備ができたので、CockroachDB インスタンスから使用可能なフォーマットにデータを移動しましょう。 target-csv を使用して、データをタブ区切りの CSV ファイルとして出力します。

target-csv に出力のフォーマット方法と保存先を指示するための my-config.json ファイルが必要です。 区切り文字、引用符文字、出力パス、ログレベル、メタデータ設定などの構成設定を定義します。

  1. プロジェクトフォルダに my-config.json を作成し、以下を使用して設定します:

    {
        "delimiter": "	",
        "quotechar": "'",
        "destination_path": "YOUR/PATH/FOR/CSV/OUTPUTS",
        "disable_collection": true,
        "verbosity": 5
    }
  2. target-csv の仮想環境をアクティベート
    ターミナルまたは PowerShell を開いて実行します:
    \Singer-Project\target-csv\Scripts\Activate.ps1
  3. パイプラインを実行
    tap の出力を CSV target にパイプします:
    python tap16.py | .\target-csv\Scripts\target-csv -c my-config.json
  4. 出力 CSV ファイルを確認
    destination_path から出力 CSV ファイルを開き、データを確認します。

まとめ:CData でさらに多くの Tap と Target を構築

CockroachDB(および 350 以上の他のデータソース)用のカスタム Singer Tap の構築は、 CData Python Connector を使用すると、より速く、より簡単になります。 組み込みのスキーマ検出、リアルタイムデータアクセス、プラグアンドプレイの接続性があり、すべて最小限のコードで実現できます。

今すぐ無料トライアルを開始して、ETL パイプラインを簡素化しましょう。


完全なコード



import singer
import cdata.cockroachdb as mod
from datetime import date, datetime

def create_connection():
    return mod.connect(
         "User=root;"
         "Password=root;"
         "Database=system;"
         "Server=localhost;"
         "Port=26257;"
    )


def query(conn, sql):
cur = conn.cursor()
cur.execute(sql)
return cur.description, cur.fetchall()

TABLE_NAME = 'Opportunity'  # 目的のオブジェクト/テーブルに変更
RECORD_LIMIT = 50           # 必要に応じて変更

def mapping_types(sf_type):
    t = sf_type.lower()
    if 'int' in t:
        return ['integer', 'null']
    if 'float' in t or 'decimal' in t or 'double' in t:
        return ['number', 'null']
    if 'bool' in t:
        return ['boolean', 'null']
    if 'date' in t or 'time' in t:
        return ['string', 'null']
    return ['string', 'null']

def write_records():
    conn = create_connection()

    # スキーマ用のカラムと型を取得
    _, cols = query(conn, f"SELECT ColumnName, DataTypeName FROM sys_tablecolumns WHERE TableName = '{TABLE_NAME}'")
    schema = {
        'properties': {col: {'type': mapping_types(dtype)} for col, dtype in cols}
    }

    # 実際のデータ行を取得
    columns, records = query(conn, f"SELECT * FROM {TABLE_NAME} LIMIT {RECORD_LIMIT}")
    stream = TABLE_NAME.lower()
    singer.write_schema(stream, schema, [columns[0][0]] if columns else [])

    for row in records:
        record = {
            columns[i][0]: (val.isoformat() if isinstance(val, (date, datetime)) else val)
            for i, val in enumerate(row)
        }
        singer.write_record(stream, record)

    conn.close()

if __name__ == '__main__':
    write_records()

はじめる準備はできましたか?

CockroachDB Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

CockroachDB Icon CockroachDB Python Connector お問い合わせ

CockroachDB データ連携用Python コネクタライブラリ。CockroachDB データをpandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。