
CData Sync V26.2 では、イベントスクリプトの記述言語として Python が新たに追加されました。データエコシステムの主流である Python をそのまま使ってレプリケーションジョブの前後処理を書けるようになったことで、イベントの自動化がより幅広いエンジニアに開かれます。本記事では、Python イベントを使うために必要な前提条件・セットアップ手順を整理したうえで、ユースケースとして 2 つのサンプル(Pre-Job でのデータ品質チェック、Post-Job での Webhook 連携)を紹介します。なお、検証には CData Sync 26.2.9629.0 を使用しています。
CData Sync のイベントとは
イベントは、CData Sync の処理フローの要所にカスタムロジックを差し込み、通知・検証・下流処理のトリガーなどを行える仕組みです。ジョブでは Pre-Job / Post-Job の 2 つの実行タイミングが、パイプラインでは変換や別のイベントの前後にイベントステップとして挟み込むタイミングが、それぞれ用意されています。
V26.2 での対応内容
CData Sync 26.2 から、イベントスクリプトの記述言語として Python が公式にサポートされました。データエコシステムの主流である Python をそのまま使えるため、イベントの自動化がより幅広いエンジニアに開かれます。スクリプト構文や利用できる組み込み変数・オペレーションといった詳細仕様は Python スクリプトのヘルプ を参照してください。
前提条件
CData Sync で Python イベントを使うには、Sync が動作しているサーバー上に以下のソフトウェアが必要です。
Jep とは
Jep(Java Embedded Python)は、Java から Python インタープリタを呼び出すためのブリッジライブラリです。CData Sync は Java ベースのアプリケーションのため、Python コードを実行するには JVM と Python の橋渡し役として Jep が必要になります。
セットアップ方法
Sync が動作しているサーバー上で、Python と Jep のセットアップを 1 度だけ行います。
Python のインストール
Python 公式サイト から OS に合わせて Python 3.10 以上をインストールします。
Jep のインストール
Jep は Python の pip 経由でインストールします。
python3 -m pip install "jep==4.2.2"
なお、Jep のビルドには Java のヘッダーが必要となるため、別途 JDK をインストールしたうえで、環境変数 JAVA_HOME を JDK のインストールディレクトリに設定してから pip install を実行してください。また、Linux ディストリビューションによっては、Jep のビルドに C コンパイラ(gcc)や Python 開発用ヘッダー(python3-devel / python3-dev パッケージ)の導入が追加で必要になる場合があります。
環境変数の設定
CData Sync が Python と Jep を正しく見つけられるよう、以下の環境変数を設定します。
環境変数 | 設定内容 |
|---|
PythonHome
| Python インストールのルートディレクトリ |
LibraryPath
| (オプション)libjep.so のパス。CData Sync が自動検出できない場合のみ指定 |
PythonSitePackages
| (オプション)Python の site-packages ディレクトリのパス。CData Sync が自動検出できない場合のみ指定 |
Linux ディストリビューションによっては、pip install jep で Jep が想定されている /lib/site-packages/jep/ ではなく、/usr/local/lib64/python3.6/site-packages/jep/ のような別のパスにインストールされることがあります。その場合は LibraryPath に libjep.so の実際のパスを指定してください。
OS の環境変数を変更することで他システムに影響が出るのを避けたい場合は、CData Sync インストールディレクトリ配下の sync.properties に cdata.initParameters として設定する方法もあります。sync.properties が存在しない場合は、sync.properties ファイルの生成 を参考に生成してください。
Windows の例
cdata.initParameters=PythonHome:C:/Libs/Python312/Python312_64
Linux の例
cdata.initParameters=PythonHome:/opt/syncvenv,LibraryPath:/opt/syncvenv/lib/python3.12/site-packages/jep/libjep.so,PythonSitePackages:/opt/syncvenv/lib/python3.12/site-packages
セットアップが完了すれば、以降はジョブの Pre-Job / Post-Job イベントや、パイプラインのイベントステップで Python が使えるようになります。
Python イベントスクリプトのユースケース
ここからは、Python イベントスクリプトの具体的な活用例を 2 つ紹介します。1 つ目はレプリケーション前にソースのデータ品質を検証する Pre-Job のサンプル、2 つ目はレプリケーション後にジョブメトリクスを Webhook で外部システムに連携する Post-Job のサンプルです。どちらも Python 標準ライブラリだけで動かせる最小構成にしているので、そのまま試したり自分の運用に合わせて拡張する出発点として活用できます。
ユースケース 1:Pre-Job イベントでのデータ品質チェック
レプリケーション開始前にソースのデータ品質を検証し、想定外の状態であればジョブを停止できます。誤ったデータが同期先へ流れ込むのを防ぐうえで、もっとも手軽な防衛ラインになります。
以下のサンプルは、ソース側のレコード件数を内部 API で取得し、閾値を下回っていれば例外を投げてジョブを中断する例です。
# レプリケーション開始前に、ソース側のデータ品質をチェックする
import urllib.request
import json
_ctx.log("INFO", "Pre-Job データ品質チェックを開始")
# 例:別途用意した内部 API から件数を取得(社内のメタデータ API などを想定)
endpoint = "https://internal.example.com/api/orders/row-count"
with urllib.request.urlopen(endpoint, timeout=10) as resp:
payload = json.loads(resp.read())
row_count = payload["row_count"]
threshold = 1000
if row_count < threshold:
msg = f"orders の件数が想定下限を下回りました({row_count} 件)"
_ctx.log("ERROR", msg)
raise Exception(msg)
_ctx.log("INFO", f"データ品質チェック OK({row_count} 件)")
パイプラインのイベントステップで「Stop flow if error」を有効にすることで、スクリプト内で raise Exception(...) を呼び出すと後続のジョブ実行が停止します。_ctx.log で書いたメッセージは CData Sync のアプリケーションログから追えるため、スクリプトのデバッグにも役立ちます。

実行すると、_ctx.log で出力したメッセージはアプリケーションログ(管理画面右上のログアイコン → 「アプリケーション」タブ)から確認できます。raise Exception(...) で投げたエラーメッセージもそのまま記録されます。

ユースケース 2:Webhook へのジョブメトリクス送信
Post-Job イベントでジョブの結果(件数・実行時間・成否)を JSON で外部 Webhook に POST する、もっともシンプルな観測連携を紹介します。Slack、Datadog、Splunk、OpenTelemetry のようなプラットフォームに連携する際の最小サンプルとして活用できます。
以下のサンプルは Python 標準ライブラリだけで完結するため、追加のパッケージインストールは不要です。
# Post-Job イベントで、ジョブメトリクスを JSON で Webhook に送信する
import urllib.request
import urllib.error
import json
WEBHOOK_URL = "https://webhook.example.com/cdata-sync/jobs"
payload = {
"job_name": _input.get("jobname"),
"job_id": _input.get("jobid"),
"status": _input.get("jobstatus"),
"source": _input.get("source"),
"destination": _input.get("destination"),
"runtime": _input.get("runtime"),
"success_count": _input.get("successqueriescount"),
"failed_count": _input.get("failedqueriescount"),
}
req = urllib.request.Request(
WEBHOOK_URL,
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
_ctx.log("INFO", f"Webhook に送信しました(status={resp.status}): {payload}")
except urllib.error.HTTPError as e:
body = e.read().decode()
_ctx.log("ERROR", f"Webhook がエラーレスポンスを返しました(status={e.code}): {body}")
raise
except urllib.error.URLError as e:
_ctx.log("ERROR", f"Webhook への接続に失敗しました: {e.reason}")
raise
Slack や Datadog など、特定のサービスに連携する場合は、それぞれの SaaS が期待する JSON スキーマに合わせて payload を組み替え、認証ヘッダーなども付与する必要があります。本サンプルはあくまで「Python から HTTP で JSON を投げる」最小例として捉え、連携先ごとの仕様に合わせてカスタマイズしてください。

まとめ
Python サポートにより、CData Sync のイベントスクリプトをデータエンジニアに馴染みのある言語でそのまま書けるようになりました。本記事で紹介したパターン以外にも、後続パイプラインのキックや内部 API 呼び出しなど、Python のエコシステムを活かした拡張が可能です。
CData Sync の 30 日間無償トライアルはこちらからダウンロードいただけます。
また、使っていて気になった点やご不明な点があれば、お気軽にテクニカルサポートまでお問い合わせください。