CData Connect AI を使用して Dataiku と NetSuite のデータ を統合

Yazhini G
Yazhini G
Technical Marketing Engineer
CData Connect AI のリモート MCP サーバーを活用して、Dataiku エージェントからリアルタイムのNetSuite のデータにセキュアにクエリ・操作できるようにします。

Dataiku は、チームがガバナンスされた環境内で機械学習や生成 AI プロジェクトを設計、デプロイ、管理できるコラボレーティブなデータサイエンス・AI プラットフォームです。エージェントと GenAI フレームワークにより、カスタムワークフローとモデルオーケストレーションを通じてデータを分析、生成、操作できるインテリジェントエージェントを構築できます。

Dataiku を CData Connect AI の組み込み MCP(Model Context Protocol)サーバーと統合することで、これらのエージェントはリアルタイムのNetSuite のデータにセキュアにアクセスできるようになります。この統合により、Dataiku のエージェント実行環境と CData のガバナンスされたエンタープライズ接続レイヤーが橋渡しされ、すべてのクエリや指示が手動エクスポートやステージングなしで、承認されたデータソースに対して安全に実行されます。

この記事では、Connect AI での NetSuite 接続の設定、MCP サポートを含む Dataiku の Python コード環境の準備、そして Dataiku 内から直接リアルタイムのNetSuite のデータにクエリ・操作できるエージェントの作成方法を説明します。

NetSuite データ連携について

CData は、Oracle NetSuite のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:

  • Standard、CRM、OneWorld を含む、すべてのエディションの NetSuite にアクセスできます。
  • SuiteTalk API(SOAP ベース)のすべてのバージョンと、SQL のように機能し、より簡単なデータクエリと操作を可能にする SuiteQL に接続できます。
  • Saved Searches のサポートにより、事前定義されたレポートとカスタムレポートにアクセスできます。
  • トークンベースおよび OAuth 2.0 で安全に認証でき、あらゆるユースケースで互換性とセキュリティを確保します。
  • SQL ストアドプロシージャを使用して、ファイルのアップロード・ダウンロード、レコードや関連付けのアタッチ・デタッチ、ロールの取得、追加のテーブルやカラム情報の取得、ジョブ結果の取得などの機能的なアクションを実行できます。

お客様は、Power BI や Excel などのお気に入りの分析ツールからライブ NetSuite データにアクセスするために CData ソリューションを使用しています。また、CData Sync を直接使用するか、Azure Data Factory などの他のアプリケーションとの CData の互換性を活用して、NetSuite データを包括的なデータベースやデータウェアハウスに統合しています。CData は、Oracle NetSuite のお客様が NetSuite からデータを取得し、NetSuite にデータをプッシュするアプリを簡単に作成できるよう支援し、他のソースからのデータを NetSuite と統合することを可能にしています。

当社の Oracle NetSuite ソリューションの詳細については、ブログをご覧ください:Drivers in Focus Part 2: Replicating and Consolidating ... NetSuite Accounting Data


はじめに


ステップ 1:Dataiku 用の NetSuite 接続を設定

Dataiku から NetSuite への接続は、CData Connect AI のリモート MCP サーバーによって実現されます。Dataiku からNetSuite のデータを操作するには、まず CData Connect AI で NetSuite 接続を作成・設定します。

  1. Connect AI にログインし、Sources をクリック、次に Add Connection をクリック
  2. Add Connection パネルから「NetSuite」を選択
  3. NetSuite に接続するために必要な認証プロパティを入力します。

    NetSuiteへの接続

    NetSuite では、2種類のAPI でデータにアクセスできます。どちらのAPI を使用するかは、Schema 接続プロパティで以下のいずれかを選択して指定してください。

    • SuiteTalk は、NetSuite との通信に使用されるSOAP ベースの従来から提供されているサービスです。幅広いエンティティをサポートし、INSERT / UPDATE / DELETE の操作も対応しています。ただし、SuiteQL API と比べるとデータの取得速度が劣ります。また、サーバーサイドでのJOIN に対応していないため、これらの処理はCData 製品がクライアントサイドで実行します。
    • SuiteQL は、より新しいAPI です。JOIN、GROUP BY、集計、カラムフィルタリングをサーバーサイドで処理できるため、SuiteTalk よりもはるかに高速にデータを取得できます。ただし、NetSuite データへのアクセスは読み取り専用となります。

    データの取得のみが目的でしたらSuiteQL をお勧めします。データの取得と変更の両方が必要な場合は、SuiteTalk をお選びください。

    NetSuite への認証

    CData 製品では、以下の認証方式がご利用いただけます。

    • トークンベース認証(TBA)はOAuth1.0に似た仕組みです。2020.2以降のSuiteTalk とSuiteQL の両方で利用できます。
    • OAuth 2.0 認証(OAuth 2.0 認可コードグラントフロー)は、SuiteQL でのみご利用いただけます。
    • OAuth JWT 認証は、OAuth2.0 クライアント認証フローの一つで、クライアント認証情報を含むJWT を使用してNetSuite データへのアクセスを要求します。

    トークンベース認証(OAuth1.0)

    トークンベース認証(TBA)は、基本的にOAuth 1.0 の仕組みです。この認証方式はSuiteTalk とSuiteQL の両方でサポートされています。管理者権限をお持ちの方がNetSuite UI 内でOAuthClientId、OAuthClientSecret、OAuthAccessToken、OAuthAccessTokenSecret を直接作成することで設定できます。 NetSuite UI でのトークン作成手順については、ヘルプドキュメントの「はじめに」セクションをご参照ください。

    アクセストークンを作成したら、以下の接続プロパティを設定して接続してみましょう。

    • AuthScheme = Token
    • AccountId = 接続先のアカウント
    • OAuthClientId = アプリケーション作成時に表示されるコンシューマーキー
    • OAuthClientSecret = アプリケーション作成時に表示されるコンシューマーシークレット
    • OAuthAccessToken = アクセストークン作成時のトークンID
    • OAuthAccessTokenSecret = アクセストークン作成時のトークンシークレット

    その他の認証方法については、ヘルプドキュメントの「はじめに」をご確認ください。

  4. Save & Test をクリック
  5. Permissions タブを開き、ユーザーベースの権限を設定

Personal Access Token を追加

Personal Access Token(PAT)は、Dataiku から Connect AI への接続を認証するために使用されます。きめ細かいアクセス制御を維持するため、統合ごとに個別の PAT を作成することをお勧めします

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして Settings を開く
  2. Settings ページで Access Tokens セクションに移動し、 Create PAT をクリック
  3. PAT にわかりやすい名前を付けて Create をクリック
  4. トークンが表示されたらコピーして安全な場所に保存してください。再度表示されることはありません

NetSuite 接続の設定と PAT の生成が完了したら、Dataiku から CData MCP Server 経由でNetSuite のデータに接続できます。

ステップ 2:Dataiku とコード環境を準備

Dataiku の専用 Python コード環境が、MCP ベースの通信に必要なランタイムサポートを提供します。Dataiku エージェントを CData Connect AI に接続できるようにするには、Python 環境を作成し、エージェントとサーバー間の対話に必要な MCP クライアント依存関係をインストールします。

  1. Dataiku CloudCode Envs を開く
  2. Add a code env をクリックして DSS 設定ウィンドウを開く
  3. DSS で New Python env をクリック。名前を付け(例:MCP_Package)、Python 3.10 を選択(3.10 から 3.13 がサポートされています)
  4. Packages to install を開き、以下の pip パッケージを追加:
    • httpx
    • anyio
    • langchain-mcp-adapters
  5. Containerized execution を開き、Container runtime additions の下で Agent tool MCP servers support を選択
  6. Rebuild env をチェックし、Save and update をクリックしてパッケージをインストール
  7. Dataiku Cloud に戻り、Overview を開いて Open instance をクリック
  8. + New project をクリックして Blank project を選択。プロジェクト名を入力

ステップ 3:Dataiku エージェントを作成して MCP サーバーに接続

Dataiku エージェントは、Dataiku ワークスペースと CData MCP Server 間のブリッジとして機能します。この接続を有効にするには、カスタムコードベースエージェントを作成し、設定した Python 環境を割り当て、Connect AI の認証情報を埋め込んで、エージェントがリアルタイムのNetSuite のデータにクエリ・操作できるようにします。

  1. Agents & GenAI Models に移動し、Create your first agent をクリック
  2. Code agent を選択し、名前を付け、Agent version で Asynchronous agent without streaming を選択
  3. 上部のタブから Settings を選択。Code env selectionDefault Python code env を作成した環境(例:MCP_Package)に設定
  4. Agent の Design タブに戻り、以下のコードを貼り付けます。EMAIL と PAT を自分の値に置き換えてください
  5. 
    
    import os
    import base64
    from typing import Dict, Any, List
    
    from dataiku.llm.python import BaseLLM
    from langchain_mcp_adapters.client import MultiServerMCPClient
    
    # ---------- Persistent MCP client (cached between calls) ----------
    _MCP_CLIENT = None
    
    def _get_mcp_client() -> MultiServerMCPClient:
        """Create (or reuse) a MultiServerMCPClient to CData Cloud MCP."""
        global _MCP_CLIENT
        if _MCP_CLIENT is not None:
            return _MCP_CLIENT
    
        # Set creds via env/project variables ideally
        EMAIL = os.getenv("CDATA_EMAIL", "YOUR_EMAIL")
        PAT   = os.getenv("CDATA_PAT",   "YOUR_PAT")
        BASE_URL = "https://mcp.cloud.cdata.com/mcp"
    
        if not EMAIL or PAT == "YOUR_PAT":
            raise ValueError("Set CDATA_EMAIL and CDATA_PAT as env variables or inline in the code.")
    
        token = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()
        headers = {"Authorization": f"Basic {token}"}
    
        _MCP_CLIENT = MultiServerMCPClient(
            connections={
                "cdata": {
                    "transport": "streamable_http",
                    "url": BASE_URL,
                    "headers": headers,
                }
            }
        )
        return _MCP_CLIENT
    
    
    def _pick_tool(tools, names: List[str]):
        L = [n.lower() for n in names]
        return next((t for t in tools if t.name.lower() in L), None)
    
    
    async def _route(prompt: str) -> str:
        """
        Simple intent router:
          - 'list connections' / 'list catalogs' -> getCatalogs
          - 'sql: ...' or 'query: ...' -> queryData
          - otherwise -> help text
        """
        client = _get_mcp_client()
        tools = await client.get_tools()
    
        p = prompt.strip()
        low = p.lower()
    
        # 1) List connections (catalogs)
        if "list connections" in low or "list catalogs" in low:
            t = _pick_tool(tools, ["getCatalogs", "listCatalogs"])
            if not t:
                return "No 'getCatalogs' tool found on the MCP server."
            res = await t.ainvoke({})
            return str(res)[:4000]
    
        # 2) Run SQL
        if low.startswith("sql:") or low.startswith("query:"):
            sql = p.split(":", 1)[1].strip()
            t = _pick_tool(tools, ["queryData", "sqlQuery", "runQuery", "query"])
            if not t:
                return "No query-capable tool (queryData/sqlQuery) found on the MCP server."
            try:
                res = await t.ainvoke({"query": sql})
                return str(res)[:4000]
            except Exception as e:
                return f"Query failed: {e}"
    
        # 3) Help
        return (
            "Connected to CData MCP
    
    "
            "Say **'list connections'** to view available sources, or run a SQL like:
    "
            "  sql: SELECT * FROM [Salesforce1].[SYS].[Connections] LIMIT 5
    
    "
            "Remember to use bracket quoting for catalog/schema/table names."
        )
    
    
    class MyLLM(BaseLLM):
        async def aprocess(self, query: Dict[str, Any], settings: Dict[str, Any], trace: Any):
            # Extract last user message from the Quick Test payload
            prompt = ""
            try:
                prompt = (query.get("messages") or [])[-1].get("content", "")
            except Exception:
                prompt = ""
    
            try:
                reply = await _route(prompt)
            except Exception as e:
                reply = f"Error: {e}"
    
            # The template expects a dict with a 'text' key
            return {"text": reply}
    
    

    クイックテストを実行

    1. 右側のパネルで Quick Test を開く
    2. JSON コードを貼り付けて Run test をクリック
    3. 
      {
         "messages": [
            {
               "role": "user",
               "content": "list connections"
            }
         ],
         "context": {}
      }
      
      

    エージェントとチャット

    Chat タブに切り替えて、「List all connections」 のようなプロンプトを試してみてください。チャット出力に接続カタログの一覧が表示されます。

    CData Connect AI を入手

    AI エージェントから 300 以上の SaaS、ビッグデータ、NoSQL ソースにアクセスするには、CData Connect AI をお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル お問い合わせ