CData Connect AI MCP Server でPipedream からNetSuite のデータと連携しよう!

Somya Sharma
Somya Sharma
Technical Marketing Engineer
CData Connect AI のリモート MCP Server で Pipedream から NetSuite へのリアルタイムアクセスを実現。ワークフロー自動化でデータの読み取りとアクション実行を簡単に。

Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って NetSuite のデータとリアルタイムでやり取りできるようになります。

CData Connect AI は、NetSuite に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と NetSuite の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから NetSuite に質問してデータを取得できます。

この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、NetSuite のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、NetSuite に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。

NetSuite データ連携について

CData は、Oracle NetSuite のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:

  • Standard、CRM、OneWorld を含む、すべてのエディションの NetSuite にアクセスできます。
  • SuiteTalk API(SOAP ベース)のすべてのバージョンと、SQL のように機能し、より簡単なデータクエリと操作を可能にする SuiteQL に接続できます。
  • Saved Searches のサポートにより、事前定義されたレポートとカスタムレポートにアクセスできます。
  • トークンベースおよび OAuth 2.0 で安全に認証でき、あらゆるユースケースで互換性とセキュリティを確保します。
  • SQL ストアドプロシージャを使用して、ファイルのアップロード・ダウンロード、レコードや関連付けのアタッチ・デタッチ、ロールの取得、追加のテーブルやカラム情報の取得、ジョブ結果の取得などの機能的なアクションを実行できます。

お客様は、Power BI や Excel などのお気に入りの分析ツールからライブ NetSuite データにアクセスするために CData ソリューションを使用しています。また、CData Sync を直接使用するか、Azure Data Factory などの他のアプリケーションとの CData の互換性を活用して、NetSuite データを包括的なデータベースやデータウェアハウスに統合しています。CData は、Oracle NetSuite のお客様が NetSuite からデータを取得し、NetSuite にデータをプッシュするアプリを簡単に作成できるよう支援し、他のソースからのデータを NetSuite と統合することを可能にしています。

当社の Oracle NetSuite ソリューションの詳細については、ブログをご覧ください:Drivers in Focus Part 2: Replicating and Consolidating ... NetSuite Accounting Data


はじめに


前提条件

  1. CData Connect AI アカウント(NetSuite など、少なくとも1つのアクティブな接続が必要)
  2. Pipedream アカウント
  3. OpenAI アカウント(API キー付き)
  4. CData Connect AI の認証情報:
    • メールアドレス(Basic 認証のユーザー名として使用)
    • パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)

ステップ 1:Pipedream 用の NetSuite 接続を設定する

Pipedream から NetSuite への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から NetSuite とやり取りするために、まず CData Connect AI で NetSuite 接続を作成・設定していきましょう。

  1. Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
  2. 「Add Connection」パネルから「NetSuite」を選択します
  3. NetSuite に接続するために必要な認証プロパティを入力します。

    NetSuiteへの接続

    NetSuite では、2種類のAPI でデータにアクセスできます。どちらのAPI を使用するかは、Schema 接続プロパティで以下のいずれかを選択して指定してください。

    • SuiteTalk は、NetSuite との通信に使用されるSOAP ベースの従来から提供されているサービスです。幅広いエンティティをサポートし、INSERT / UPDATE / DELETE の操作も対応しています。ただし、SuiteQL API と比べるとデータの取得速度が劣ります。また、サーバーサイドでのJOIN に対応していないため、これらの処理はCData 製品がクライアントサイドで実行します。
    • SuiteQL は、より新しいAPI です。JOIN、GROUP BY、集計、カラムフィルタリングをサーバーサイドで処理できるため、SuiteTalk よりもはるかに高速にデータを取得できます。ただし、NetSuite データへのアクセスは読み取り専用となります。

    データの取得のみが目的でしたらSuiteQL をお勧めします。データの取得と変更の両方が必要な場合は、SuiteTalk をお選びください。

    NetSuite への認証

    CData 製品では、以下の認証方式がご利用いただけます。

    • トークンベース認証(TBA)はOAuth1.0に似た仕組みです。2020.2以降のSuiteTalk とSuiteQL の両方で利用できます。
    • OAuth 2.0 認証(OAuth 2.0 認可コードグラントフロー)は、SuiteQL でのみご利用いただけます。
    • OAuth JWT 認証は、OAuth2.0 クライアント認証フローの一つで、クライアント認証情報を含むJWT を使用してNetSuite データへのアクセスを要求します。

    トークンベース認証(OAuth1.0)

    トークンベース認証(TBA)は、基本的にOAuth 1.0 の仕組みです。この認証方式はSuiteTalk とSuiteQL の両方でサポートされています。管理者権限をお持ちの方がNetSuite UI 内でOAuthClientId、OAuthClientSecret、OAuthAccessToken、OAuthAccessTokenSecret を直接作成することで設定できます。 NetSuite UI でのトークン作成手順については、ヘルプドキュメントの「はじめに」セクションをご参照ください。

    アクセストークンを作成したら、以下の接続プロパティを設定して接続してみましょう。

    • AuthScheme = Token
    • AccountId = 接続先のアカウント
    • OAuthClientId = アプリケーション作成時に表示されるコンシューマーキー
    • OAuthClientSecret = アプリケーション作成時に表示されるコンシューマーシークレット
    • OAuthAccessToken = アクセストークン作成時のトークンID
    • OAuthAccessTokenSecret = アクセストークン作成時のトークンシークレット

    その他の認証方法については、ヘルプドキュメントの「はじめに」をご確認ください。

  4. 「Save & Test」をクリックします
  5. 「Add NetSuite Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンの追加

パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
  2. 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。

これで接続の設定と PAT の生成が完了しました。Pipedream から NetSuite に接続する準備が整いました。


ステップ 2:Pipedream で環境変数を設定する

認証情報を Pipedream の環境変数として安全に保存します。

  1. Pipedream で Settings を開き、Environment Variables に移動します
  2. New Variable をクリックして、以下の変数を追加します:
  3. 変数名
    CDATA_EMAIL CData Connect AI のログイン用メールアドレス
    CDATA_PAT CData パーソナルアクセストークン
    OPENAI_API_KEY OpenAI API キー

ステップ 3:Pipedream ワークフローを作成する

3.1 HTTP トリガーの設定

  1. Pipedream で新しいワークフローを作成します
  2. トリガーとして HTTP / Webhook を選択します
  3. HTTP Response"Return a custom response from your workflow" に設定します

3.2 LLM ステップの追加

Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。

ステップのデフォルトコードを以下に置き換えてください:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 MCP ステップの追加

Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、NetSuite のデータに対して実行します。

このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 選択した接続のデータベーススキーマを取得します
getTables 選択したスキーマのすべてのテーブルとビューを取得します
queryData 生成された SQL クエリを実行し、結果を返します

ステップのデフォルトコードを以下に置き換えてください:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuerysteps.LLM.$return_value.userQuery に置き換えてください。

3.4 レスポンスステップの設定

  1. Return HTTP Response ステップを追加し、名前を Response とします
  2. Response Status Code を 200 に設定します
  3. Response Body{{steps.mcp.$return_value}} に設定します
  4. 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

ステップ 4:ワークフローをテストして NetSuite のデータを操作する

トリガーにテストイベントを設定する

  1. ワークフロー内の trigger ステップをクリックします
  2. Generate Test Event をクリックします
  3. イベントのボディを以下のように編集します:
  4. {
      "query": "list all tables"
    }
    

ワークフロー全体を実行する

  1. トリガーステップの下部にある Test workflow をクリックします
  2. Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
  3. 各ステップが正常に完了すると、緑色に変わります

各ステップの結果を確認する

テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:

ステップ Exports で確認するポイント
trigger body.query - クエリが正しく受信されたことを確認
LLM userQuery - クエリが正しく抽出されたことを確認
MCP connection, sql, data, schema - データが取得されたことを確認
Response $response.body - 最終的な JSON レスポンス

各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。

注意: Response ステップの Exports タブには、{ "success": true }"status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。

データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:

このワークフローは以下を自動的に実行します:

  1. CData の接続をすべて検出
  2. 質問に最も関連する接続を選択
  3. スキーマとテーブルを動的に検出
  4. 適切な SQL クエリを生成して実行
  5. 結果を返却

仕組みについて

この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 特定の接続のデータベーススキーマを取得します
getTables 特定のスキーマのすべてのテーブルとビューを取得します
queryData SQL クエリを実行し、結果を返します

OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。

いかがでしたか?Pipedream から NetSuite へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル