CData Connect AI を使用して Dataiku と Presto のデータ を統合
Dataiku は、チームがガバナンスされた環境内で機械学習や生成 AI プロジェクトを設計、デプロイ、管理できるコラボレーティブなデータサイエンス・AI プラットフォームです。エージェントと GenAI フレームワークにより、カスタムワークフローとモデルオーケストレーションを通じてデータを分析、生成、操作できるインテリジェントエージェントを構築できます。
Dataiku を CData Connect AI の組み込み MCP(Model Context Protocol)サーバーと統合することで、これらのエージェントはリアルタイムのPresto のデータにセキュアにアクセスできるようになります。この統合により、Dataiku のエージェント実行環境と CData のガバナンスされたエンタープライズ接続レイヤーが橋渡しされ、すべてのクエリや指示が手動エクスポートやステージングなしで、承認されたデータソースに対して安全に実行されます。
この記事では、Connect AI での Presto 接続の設定、MCP サポートを含む Dataiku の Python コード環境の準備、そして Dataiku 内から直接リアルタイムのPresto のデータにクエリ・操作できるエージェントの作成方法を説明します。
Presto データ連携について
CData を使用すれば、Trino および Presto SQL エンジンのライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:
- Trino v345 以降(旧 PrestoSQL)および Presto v0.242 以降(旧 PrestoDB)のデータにアクセスできます。
- Trino または Presto インスタンスの基盤となるすべてのデータに対して読み取り・書き込みアクセスができます。
- 最大スループットのための最適化されたクエリ生成。
Presto と Trino により、ユーザーは単一のエンドポイントを通じてさまざまな基盤データソースにアクセスできます。CData の接続と組み合わせることで、ユーザーはインスタンスへの純粋な SQL-92 アクセスを取得し、ビジネスデータをデータウェアハウスに統合したり、Power BI や Tableau などのお気に入りのツールからライブデータに直接簡単にアクセスしたりできます。
多くの場合、CData のライブ接続は、ツールで利用可能なネイティブのインポート機能を上回ります。あるお客様は、レポートに必要なデータセットのサイズが大きいため、Power BI を効果的に使用できませんでした。同社が CData Power BI Connector for Presto を導入したところ、DirectQuery 接続モードを使用してリアルタイムでレポートを生成できるようになりました。
はじめに
ステップ 1:Dataiku 用の Presto 接続を設定
Dataiku から Presto への接続は、CData Connect AI のリモート MCP サーバーによって実現されます。Dataiku からPresto のデータを操作するには、まず CData Connect AI で Presto 接続を作成・設定します。
- Connect AI にログインし、Sources をクリック、次に Add Connection をクリック
- Add Connection パネルから「Presto」を選択
-
Presto に接続するために必要な認証プロパティを入力します。
Presto への接続には、まずはServer およびPort を接続プロパティとして設定します。それ以外の追加項目は接続方式によって異なります。
TLS/SSL を有効化するには、UseSSL をTRUE に設定します。
LDAP で認証
LDAP で認証するには、次の接続プロパティを設定します:
- AuthScheme: LDAP に設定。
- User: LDAP で接続するユーザー名。
- Password: LDAP で接続するユーザーのパスワード。
Kerberos 認証
KERBEROS 認証を使う場合には、以下を設定します:
- AuthScheme: KERBEROS に設定。
- KerberosKDC: 接続するユーザーのKerberos Key Distribution Center (KDC) サービス。
- KerberosRealm: 接続するユーザーのKerberos Realm 。
- KerberosSPN: Kerberos Domain Controller のService Principal Name。
- KerberosKeytabFile: Kerberos principals とencrypted keys を含むKeytab file。
- User: Kerberos のユーザー。
- Password: Kerberos で認証するユーザーのパスワード。
- Save & Test をクリック
- Permissions タブを開き、ユーザーベースの権限を設定
Personal Access Token を追加
Personal Access Token(PAT)は、Dataiku から Connect AI への接続を認証するために使用されます。きめ細かいアクセス制御を維持するため、統合ごとに個別の PAT を作成することをお勧めします
- Connect AI アプリの右上にある歯車アイコン()をクリックして Settings を開く
- Settings ページで Access Tokens セクションに移動し、 Create PAT をクリック
- PAT にわかりやすい名前を付けて Create をクリック
- トークンが表示されたらコピーして安全な場所に保存してください。再度表示されることはありません
Presto 接続の設定と PAT の生成が完了したら、Dataiku から CData MCP Server 経由でPresto のデータに接続できます。
ステップ 2:Dataiku とコード環境を準備
Dataiku の専用 Python コード環境が、MCP ベースの通信に必要なランタイムサポートを提供します。Dataiku エージェントを CData Connect AI に接続できるようにするには、Python 環境を作成し、エージェントとサーバー間の対話に必要な MCP クライアント依存関係をインストールします。
- Dataiku Cloud で Code Envs を開く
- Add a code env をクリックして DSS 設定ウィンドウを開く
- DSS で New Python env をクリック。名前を付け(例:MCP_Package)、Python 3.10 を選択(3.10 から 3.13 がサポートされています)
- Packages to install を開き、以下の pip パッケージを追加:
- httpx
- anyio
- langchain-mcp-adapters
- Containerized execution を開き、Container runtime additions の下で Agent tool MCP servers support を選択
- Rebuild env をチェックし、Save and update をクリックしてパッケージをインストール
- Dataiku Cloud に戻り、Overview を開いて Open instance をクリック
- + New project をクリックして Blank project を選択。プロジェクト名を入力
ステップ 3:Dataiku エージェントを作成して MCP サーバーに接続
Dataiku エージェントは、Dataiku ワークスペースと CData MCP Server 間のブリッジとして機能します。この接続を有効にするには、カスタムコードベースエージェントを作成し、設定した Python 環境を割り当て、Connect AI の認証情報を埋め込んで、エージェントがリアルタイムのPresto のデータにクエリ・操作できるようにします。
- Agents & GenAI Models に移動し、Create your first agent をクリック
- Code agent を選択し、名前を付け、Agent version で Asynchronous agent without streaming を選択
- 上部のタブから Settings を選択。Code env selection で Default Python code env を作成した環境(例:MCP_Package)に設定
- Agent の Design タブに戻り、以下のコードを貼り付けます。EMAIL と PAT を自分の値に置き換えてください
- 右側のパネルで Quick Test を開く
- JSON コードを貼り付けて Run test をクリック
import os
import base64
from typing import Dict, Any, List
from dataiku.llm.python import BaseLLM
from langchain_mcp_adapters.client import MultiServerMCPClient
# ---------- Persistent MCP client (cached between calls) ----------
_MCP_CLIENT = None
def _get_mcp_client() -> MultiServerMCPClient:
"""Create (or reuse) a MultiServerMCPClient to CData Cloud MCP."""
global _MCP_CLIENT
if _MCP_CLIENT is not None:
return _MCP_CLIENT
# Set creds via env/project variables ideally
EMAIL = os.getenv("CDATA_EMAIL", "YOUR_EMAIL")
PAT = os.getenv("CDATA_PAT", "YOUR_PAT")
BASE_URL = "https://mcp.cloud.cdata.com/mcp"
if not EMAIL or PAT == "YOUR_PAT":
raise ValueError("Set CDATA_EMAIL and CDATA_PAT as env variables or inline in the code.")
token = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
_MCP_CLIENT = MultiServerMCPClient(
connections={
"cdata": {
"transport": "streamable_http",
"url": BASE_URL,
"headers": headers,
}
}
)
return _MCP_CLIENT
def _pick_tool(tools, names: List[str]):
L = [n.lower() for n in names]
return next((t for t in tools if t.name.lower() in L), None)
async def _route(prompt: str) -> str:
"""
Simple intent router:
- 'list connections' / 'list catalogs' -> getCatalogs
- 'sql: ...' or 'query: ...' -> queryData
- otherwise -> help text
"""
client = _get_mcp_client()
tools = await client.get_tools()
p = prompt.strip()
low = p.lower()
# 1) List connections (catalogs)
if "list connections" in low or "list catalogs" in low:
t = _pick_tool(tools, ["getCatalogs", "listCatalogs"])
if not t:
return "No 'getCatalogs' tool found on the MCP server."
res = await t.ainvoke({})
return str(res)[:4000]
# 2) Run SQL
if low.startswith("sql:") or low.startswith("query:"):
sql = p.split(":", 1)[1].strip()
t = _pick_tool(tools, ["queryData", "sqlQuery", "runQuery", "query"])
if not t:
return "No query-capable tool (queryData/sqlQuery) found on the MCP server."
try:
res = await t.ainvoke({"query": sql})
return str(res)[:4000]
except Exception as e:
return f"Query failed: {e}"
# 3) Help
return (
"Connected to CData MCP
"
"Say **'list connections'** to view available sources, or run a SQL like:
"
" sql: SELECT * FROM [Salesforce1].[SYS].[Connections] LIMIT 5
"
"Remember to use bracket quoting for catalog/schema/table names."
)
class MyLLM(BaseLLM):
async def aprocess(self, query: Dict[str, Any], settings: Dict[str, Any], trace: Any):
# Extract last user message from the Quick Test payload
prompt = ""
try:
prompt = (query.get("messages") or [])[-1].get("content", "")
except Exception:
prompt = ""
try:
reply = await _route(prompt)
except Exception as e:
reply = f"Error: {e}"
# The template expects a dict with a 'text' key
return {"text": reply}
クイックテストを実行
{
"messages": [
{
"role": "user",
"content": "list connections"
}
],
"context": {}
}
エージェントとチャット
Chat タブに切り替えて、「List all connections」 のようなプロンプトを試してみてください。チャット出力に接続カタログの一覧が表示されます。
CData Connect AI を入手
AI エージェントから 300 以上の SaaS、ビッグデータ、NoSQL ソースにアクセスするには、CData Connect AI をお試しください。