CData Connect AI を使用して Dataiku と SingleStore のデータ を統合
Dataiku は、チームがガバナンスされた環境内で機械学習や生成 AI プロジェクトを設計、デプロイ、管理できるコラボレーティブなデータサイエンス・AI プラットフォームです。エージェントと GenAI フレームワークにより、カスタムワークフローとモデルオーケストレーションを通じてデータを分析、生成、操作できるインテリジェントエージェントを構築できます。
Dataiku を CData Connect AI の組み込み MCP(Model Context Protocol)サーバーと統合することで、これらのエージェントはリアルタイムのSingleStore のデータにセキュアにアクセスできるようになります。この統合により、Dataiku のエージェント実行環境と CData のガバナンスされたエンタープライズ接続レイヤーが橋渡しされ、すべてのクエリや指示が手動エクスポートやステージングなしで、承認されたデータソースに対して安全に実行されます。
この記事では、Connect AI での SingleStore 接続の設定、MCP サポートを含む Dataiku の Python コード環境の準備、そして Dataiku 内から直接リアルタイムのSingleStore のデータにクエリ・操作できるエージェントの作成方法を説明します。
ステップ 1:Dataiku 用の SingleStore 接続を設定
Dataiku から SingleStore への接続は、CData Connect AI のリモート MCP サーバーによって実現されます。Dataiku からSingleStore のデータを操作するには、まず CData Connect AI で SingleStore 接続を作成・設定します。
- Connect AI にログインし、Sources をクリック、次に Add Connection をクリック
- Add Connection パネルから「SingleStore」を選択
-
SingleStore に接続するために必要な認証プロパティを入力します。
データに接続するには、次の接続プロパティが必要です。
- Server:SingleStore データベースをホスティングしているサーバーのホスト名またはIP アドレス。
- Port:SingleStore データベースをホスティングしているサーバーのポート。
また、オプションで以下を設定することもできます。
- SingleStore:SingleStore Server に接続する場合のデフォルトデータベース。設定されていない場合、すべてのデータベースのテーブルが返されます。
標準認証
標準認証で認証するには、次を設定します。
- User:SingleStore サーバーに認証する際に使われるユーザー。
- Password:SingleStore サーバーに認証する際に使われるパスワード。
統合セキュリティを使用した接続
標準のユーザー名とパスワードを提供する代わりに、Windows 認証を介して信頼されたされたユーザーをサーバーに認証できます。
SSL 認証
SSL 認証を活用してセキュアなセッションを介してSingleStore データに接続できます。次の接続プロパティを設定し、データに接続します。
- SSLClientCert:クライアント証明書のための証明書ストア名に設定。クライアントとサーバーの両方のマシンでトラストストアとキーストアが保持される2-way SSL の場合に使用されます。
- SSLClientCertPassword:クライアント証明書ストアがパスワードで保護されている場合、この値をストアのパスワードに設定します。
- SSLClientCertSubject:TLS/SSL クライアント証明書のサブジェクト。ストア内の証明書を検索するために使用されます。
- SSLClientCertType:クライアントストアの証明書タイプ。
- SSLServerCert:サーバーが受け入れ可能な証明書。
SSH 認証
SSH を使用して、セキュアにリモートマシンにログインできます。SingleStore データにSSH 経由でアクセスするには、次の接続プロパティを設定します。
- SSHClientCert:クライアント証明書のための証明書ストア名に設定。
- SSHClientCertPassword:クライアント証明書ストアがパスワードで保護されている場合、この値をストアのパスワードに設定します。
- SSHClientCertSubject:TLS/SSL クライアント証明書のサブジェクト。ストア内の証明書を検索するために使用されます。
- SSHClientCertType:クライアントストアの証明書タイプ。
- SSHPassword:SSH サーバーに認証するためのパスワード。
- SSHPort:SSH 操作に使用するポート。
- SSHServer:認証しようとしているSSH 認証サーバー。
- SSHServerFingerPrint:接続先のホストの検証に使用するSSH サーバーのフィンガープリント。
- SSHUser:SSH サーバーに認証するためのユーザー名。
- Save & Test をクリック
- Permissions タブを開き、ユーザーベースの権限を設定
Personal Access Token を追加
Personal Access Token(PAT)は、Dataiku から Connect AI への接続を認証するために使用されます。きめ細かいアクセス制御を維持するため、統合ごとに個別の PAT を作成することをお勧めします
- Connect AI アプリの右上にある歯車アイコン()をクリックして Settings を開く
- Settings ページで Access Tokens セクションに移動し、 Create PAT をクリック
- PAT にわかりやすい名前を付けて Create をクリック
- トークンが表示されたらコピーして安全な場所に保存してください。再度表示されることはありません
SingleStore 接続の設定と PAT の生成が完了したら、Dataiku から CData MCP Server 経由でSingleStore のデータに接続できます。
ステップ 2:Dataiku とコード環境を準備
Dataiku の専用 Python コード環境が、MCP ベースの通信に必要なランタイムサポートを提供します。Dataiku エージェントを CData Connect AI に接続できるようにするには、Python 環境を作成し、エージェントとサーバー間の対話に必要な MCP クライアント依存関係をインストールします。
- Dataiku Cloud で Code Envs を開く
- Add a code env をクリックして DSS 設定ウィンドウを開く
- DSS で New Python env をクリック。名前を付け(例:MCP_Package)、Python 3.10 を選択(3.10 から 3.13 がサポートされています)
- Packages to install を開き、以下の pip パッケージを追加:
- httpx
- anyio
- langchain-mcp-adapters
- Containerized execution を開き、Container runtime additions の下で Agent tool MCP servers support を選択
- Rebuild env をチェックし、Save and update をクリックしてパッケージをインストール
- Dataiku Cloud に戻り、Overview を開いて Open instance をクリック
- + New project をクリックして Blank project を選択。プロジェクト名を入力
ステップ 3:Dataiku エージェントを作成して MCP サーバーに接続
Dataiku エージェントは、Dataiku ワークスペースと CData MCP Server 間のブリッジとして機能します。この接続を有効にするには、カスタムコードベースエージェントを作成し、設定した Python 環境を割り当て、Connect AI の認証情報を埋め込んで、エージェントがリアルタイムのSingleStore のデータにクエリ・操作できるようにします。
- Agents & GenAI Models に移動し、Create your first agent をクリック
- Code agent を選択し、名前を付け、Agent version で Asynchronous agent without streaming を選択
- 上部のタブから Settings を選択。Code env selection で Default Python code env を作成した環境(例:MCP_Package)に設定
- Agent の Design タブに戻り、以下のコードを貼り付けます。EMAIL と PAT を自分の値に置き換えてください
- 右側のパネルで Quick Test を開く
- JSON コードを貼り付けて Run test をクリック
import os
import base64
from typing import Dict, Any, List
from dataiku.llm.python import BaseLLM
from langchain_mcp_adapters.client import MultiServerMCPClient
# ---------- Persistent MCP client (cached between calls) ----------
_MCP_CLIENT = None
def _get_mcp_client() -> MultiServerMCPClient:
"""Create (or reuse) a MultiServerMCPClient to CData Cloud MCP."""
global _MCP_CLIENT
if _MCP_CLIENT is not None:
return _MCP_CLIENT
# Set creds via env/project variables ideally
EMAIL = os.getenv("CDATA_EMAIL", "YOUR_EMAIL")
PAT = os.getenv("CDATA_PAT", "YOUR_PAT")
BASE_URL = "https://mcp.cloud.cdata.com/mcp"
if not EMAIL or PAT == "YOUR_PAT":
raise ValueError("Set CDATA_EMAIL and CDATA_PAT as env variables or inline in the code.")
token = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
_MCP_CLIENT = MultiServerMCPClient(
connections={
"cdata": {
"transport": "streamable_http",
"url": BASE_URL,
"headers": headers,
}
}
)
return _MCP_CLIENT
def _pick_tool(tools, names: List[str]):
L = [n.lower() for n in names]
return next((t for t in tools if t.name.lower() in L), None)
async def _route(prompt: str) -> str:
"""
Simple intent router:
- 'list connections' / 'list catalogs' -> getCatalogs
- 'sql: ...' or 'query: ...' -> queryData
- otherwise -> help text
"""
client = _get_mcp_client()
tools = await client.get_tools()
p = prompt.strip()
low = p.lower()
# 1) List connections (catalogs)
if "list connections" in low or "list catalogs" in low:
t = _pick_tool(tools, ["getCatalogs", "listCatalogs"])
if not t:
return "No 'getCatalogs' tool found on the MCP server."
res = await t.ainvoke({})
return str(res)[:4000]
# 2) Run SQL
if low.startswith("sql:") or low.startswith("query:"):
sql = p.split(":", 1)[1].strip()
t = _pick_tool(tools, ["queryData", "sqlQuery", "runQuery", "query"])
if not t:
return "No query-capable tool (queryData/sqlQuery) found on the MCP server."
try:
res = await t.ainvoke({"query": sql})
return str(res)[:4000]
except Exception as e:
return f"Query failed: {e}"
# 3) Help
return (
"Connected to CData MCP
"
"Say **'list connections'** to view available sources, or run a SQL like:
"
" sql: SELECT * FROM [Salesforce1].[SYS].[Connections] LIMIT 5
"
"Remember to use bracket quoting for catalog/schema/table names."
)
class MyLLM(BaseLLM):
async def aprocess(self, query: Dict[str, Any], settings: Dict[str, Any], trace: Any):
# Extract last user message from the Quick Test payload
prompt = ""
try:
prompt = (query.get("messages") or [])[-1].get("content", "")
except Exception:
prompt = ""
try:
reply = await _route(prompt)
except Exception as e:
reply = f"Error: {e}"
# The template expects a dict with a 'text' key
return {"text": reply}
クイックテストを実行
{
"messages": [
{
"role": "user",
"content": "list connections"
}
],
"context": {}
}
エージェントとチャット
Chat タブに切り替えて、「List all connections」 のようなプロンプトを試してみてください。チャット出力に接続カタログの一覧が表示されます。
CData Connect AI を入手
AI エージェントから 300 以上の SaaS、ビッグデータ、NoSQL ソースにアクセスするには、CData Connect AI をお試しください。