CData Connect AI を使用して Dataiku と Salesforce のデータ を統合
Dataiku は、チームがガバナンスされた環境内で機械学習や生成 AI プロジェクトを設計、デプロイ、管理できるコラボレーティブなデータサイエンス・AI プラットフォームです。エージェントと GenAI フレームワークにより、カスタムワークフローとモデルオーケストレーションを通じてデータを分析、生成、操作できるインテリジェントエージェントを構築できます。
Dataiku を CData Connect AI の組み込み MCP(Model Context Protocol)サーバーと統合することで、これらのエージェントはリアルタイムのSalesforce のデータにセキュアにアクセスできるようになります。この統合により、Dataiku のエージェント実行環境と CData のガバナンスされたエンタープライズ接続レイヤーが橋渡しされ、すべてのクエリや指示が手動エクスポートやステージングなしで、承認されたデータソースに対して安全に実行されます。
この記事では、Connect AI での Salesforce 接続の設定、MCP サポートを含む Dataiku の Python コード環境の準備、そして Dataiku 内から直接リアルタイムのSalesforce のデータにクエリ・操作できるエージェントの作成方法を説明します。
Salesforce データ連携について
CData を使用すれば、Salesforce のライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:
- カスタムエンティティやフィールドにアクセスでき、Salesforce ユーザーは Salesforce のすべてにアクセスできます。
- アトミックおよびバッチ更新操作を作成できます。
- Salesforce データの読み取り、書き込み、更新、削除ができます。
- SOAP API バージョン 30.0 のサポートにより、最新の Salesforce 機能を活用できます。
- SOQL サポートによる複雑なクエリの Salesforce サーバーへのプッシュダウンにより、パフォーマンスの向上を実現できます。
- SQL ストアドプロシージャを使用して、ジョブの作成・取得・中止・削除、添付ファイルやドキュメントのアップロード・ダウンロードなどのアクションを実行できます。
ユーザーは、Salesforce データを以下と頻繁に統合しています:
- 他の ERP、マーケティングオートメーション、HCM など。
- Power BI、Tableau、Looker などのお気に入りのデータツール。
- データベースやデータウェアハウス。
CData ソリューションが Salesforce とどのように連携するかについての詳細は、Salesforce 統合ページをご覧ください。
はじめに
ステップ 1:Dataiku 用の Salesforce 接続を設定
Dataiku から Salesforce への接続は、CData Connect AI のリモート MCP サーバーによって実現されます。Dataiku からSalesforce のデータを操作するには、まず CData Connect AI で Salesforce 接続を作成・設定します。
- Connect AI にログインし、Sources をクリック、次に Add Connection をクリック
- Add Connection パネルから「Salesforce」を選択
-
Salesforce に接続するために必要な認証プロパティを入力します。
Salesforce 接続プロパティの設定方法
埋め込みOAuth(UI でのログイン)による接続設定
それでは、Salesforce への接続について説明していきましょう。最も簡単な方法として、Salesforce にログインする際と同様にUI 上からログインするだけで接続設定が完了します(埋め込みOAuth)。この方法をご利用になる場合は、「Salesforce への接続」をクリックしてください。
標準認証の設定
埋め込みOAuth 以外の方法を利用する場合、以下の3つの認証方式をご利用いただけます。標準的な認証方式では、以下の情報が必要となります。
- ユーザー名
- パスワード
- セキュリティトークン
セキュリティトークンの取得方法については、セキュリティトークン取得手順をご確認ください。
OAuth 認証の設定
ユーザー名とパスワードによる認証がご利用いただけない(避けたい)場合は、OAuth 認証をお使いいただけます。
SSO(シングルサインオン)の設定
最後に、IDプロバイダー経由でのシングルサインオンをご利用になる場合は、以下のプロパティを設定してください。
- SSOProperties
- SSOLoginUrl
- TokenUrl
より詳細な設定手順については、ヘルプドキュメントの「はじめに」セクションをご確認ください。
- Save & Test をクリック
- Permissions タブを開き、ユーザーベースの権限を設定
Personal Access Token を追加
Personal Access Token(PAT)は、Dataiku から Connect AI への接続を認証するために使用されます。きめ細かいアクセス制御を維持するため、統合ごとに個別の PAT を作成することをお勧めします
- Connect AI アプリの右上にある歯車アイコン()をクリックして Settings を開く
- Settings ページで Access Tokens セクションに移動し、 Create PAT をクリック
- PAT にわかりやすい名前を付けて Create をクリック
- トークンが表示されたらコピーして安全な場所に保存してください。再度表示されることはありません
Salesforce 接続の設定と PAT の生成が完了したら、Dataiku から CData MCP Server 経由でSalesforce のデータに接続できます。
ステップ 2:Dataiku とコード環境を準備
Dataiku の専用 Python コード環境が、MCP ベースの通信に必要なランタイムサポートを提供します。Dataiku エージェントを CData Connect AI に接続できるようにするには、Python 環境を作成し、エージェントとサーバー間の対話に必要な MCP クライアント依存関係をインストールします。
- Dataiku Cloud で Code Envs を開く
- Add a code env をクリックして DSS 設定ウィンドウを開く
- DSS で New Python env をクリック。名前を付け(例:MCP_Package)、Python 3.10 を選択(3.10 から 3.13 がサポートされています)
- Packages to install を開き、以下の pip パッケージを追加:
- httpx
- anyio
- langchain-mcp-adapters
- Containerized execution を開き、Container runtime additions の下で Agent tool MCP servers support を選択
- Rebuild env をチェックし、Save and update をクリックしてパッケージをインストール
- Dataiku Cloud に戻り、Overview を開いて Open instance をクリック
- + New project をクリックして Blank project を選択。プロジェクト名を入力
ステップ 3:Dataiku エージェントを作成して MCP サーバーに接続
Dataiku エージェントは、Dataiku ワークスペースと CData MCP Server 間のブリッジとして機能します。この接続を有効にするには、カスタムコードベースエージェントを作成し、設定した Python 環境を割り当て、Connect AI の認証情報を埋め込んで、エージェントがリアルタイムのSalesforce のデータにクエリ・操作できるようにします。
- Agents & GenAI Models に移動し、Create your first agent をクリック
- Code agent を選択し、名前を付け、Agent version で Asynchronous agent without streaming を選択
- 上部のタブから Settings を選択。Code env selection で Default Python code env を作成した環境(例:MCP_Package)に設定
- Agent の Design タブに戻り、以下のコードを貼り付けます。EMAIL と PAT を自分の値に置き換えてください
- 右側のパネルで Quick Test を開く
- JSON コードを貼り付けて Run test をクリック
import os
import base64
from typing import Dict, Any, List
from dataiku.llm.python import BaseLLM
from langchain_mcp_adapters.client import MultiServerMCPClient
# ---------- Persistent MCP client (cached between calls) ----------
_MCP_CLIENT = None
def _get_mcp_client() -> MultiServerMCPClient:
"""Create (or reuse) a MultiServerMCPClient to CData Cloud MCP."""
global _MCP_CLIENT
if _MCP_CLIENT is not None:
return _MCP_CLIENT
# Set creds via env/project variables ideally
EMAIL = os.getenv("CDATA_EMAIL", "YOUR_EMAIL")
PAT = os.getenv("CDATA_PAT", "YOUR_PAT")
BASE_URL = "https://mcp.cloud.cdata.com/mcp"
if not EMAIL or PAT == "YOUR_PAT":
raise ValueError("Set CDATA_EMAIL and CDATA_PAT as env variables or inline in the code.")
token = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
_MCP_CLIENT = MultiServerMCPClient(
connections={
"cdata": {
"transport": "streamable_http",
"url": BASE_URL,
"headers": headers,
}
}
)
return _MCP_CLIENT
def _pick_tool(tools, names: List[str]):
L = [n.lower() for n in names]
return next((t for t in tools if t.name.lower() in L), None)
async def _route(prompt: str) -> str:
"""
Simple intent router:
- 'list connections' / 'list catalogs' -> getCatalogs
- 'sql: ...' or 'query: ...' -> queryData
- otherwise -> help text
"""
client = _get_mcp_client()
tools = await client.get_tools()
p = prompt.strip()
low = p.lower()
# 1) List connections (catalogs)
if "list connections" in low or "list catalogs" in low:
t = _pick_tool(tools, ["getCatalogs", "listCatalogs"])
if not t:
return "No 'getCatalogs' tool found on the MCP server."
res = await t.ainvoke({})
return str(res)[:4000]
# 2) Run SQL
if low.startswith("sql:") or low.startswith("query:"):
sql = p.split(":", 1)[1].strip()
t = _pick_tool(tools, ["queryData", "sqlQuery", "runQuery", "query"])
if not t:
return "No query-capable tool (queryData/sqlQuery) found on the MCP server."
try:
res = await t.ainvoke({"query": sql})
return str(res)[:4000]
except Exception as e:
return f"Query failed: {e}"
# 3) Help
return (
"Connected to CData MCP
"
"Say **'list connections'** to view available sources, or run a SQL like:
"
" sql: SELECT * FROM [Salesforce1].[SYS].[Connections] LIMIT 5
"
"Remember to use bracket quoting for catalog/schema/table names."
)
class MyLLM(BaseLLM):
async def aprocess(self, query: Dict[str, Any], settings: Dict[str, Any], trace: Any):
# Extract last user message from the Quick Test payload
prompt = ""
try:
prompt = (query.get("messages") or [])[-1].get("content", "")
except Exception:
prompt = ""
try:
reply = await _route(prompt)
except Exception as e:
reply = f"Error: {e}"
# The template expects a dict with a 'text' key
return {"text": reply}
クイックテストを実行
{
"messages": [
{
"role": "user",
"content": "list connections"
}
],
"context": {}
}
エージェントとチャット
Chat タブに切り替えて、「List all connections」 のようなプロンプトを試してみてください。チャット出力に接続カタログの一覧が表示されます。
CData Connect AI を入手
AI エージェントから 300 以上の SaaS、ビッグデータ、NoSQL ソースにアクセスするには、CData Connect AI をお試しください。