CData Connect AI MCP Server でPipedream からSage Intacct のデータと連携しよう!

Somya Sharma
Somya Sharma
Technical Marketing Engineer
CData Connect AI のリモート MCP Server で Pipedream から Sage Intacct へのリアルタイムアクセスを実現。ワークフロー自動化でデータの読み取りとアクション実行を簡単に。

Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って Sage Intacct のデータとリアルタイムでやり取りできるようになります。

CData Connect AI は、Sage Intacct に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と Sage Intacct の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから Sage Intacct に質問してデータを取得できます。

この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、Sage Intacct のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、Sage Intacct に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。

Sage Intacct データ連携について

CData は、Sage Intacct のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:

  • API の更新や変更を気にすることなく、Sage Intacct にアクセスできます。
  • 追加の構成手順なしで、Sage Intacct のカスタムオブジェクトやフィールドにアクセスできます。
  • Basic 認証による組み込み Web サービス認証情報を使用して、Sage Intacct にデータを書き戻すことができます。
  • SQL ストアドプロシージャを使用して、ベンダーの承認・却下、エンゲージメントの挿入、カスタムオブジェクトやフィールドの作成・削除などの機能的な操作を実行できます。

ユーザーは、Tableau、Power BI、Excel などの分析ツールと Sage Intacct を統合し、当社のツールを活用して Sage Intacct データをデータベースやデータウェアハウスにレプリケートしています。

他のお客様が CData の Sage Intacct ソリューションをどのように使用しているかについては、ブログをご覧ください:Drivers in Focus: Accounting Connectivity


はじめに


前提条件

  1. CData Connect AI アカウント(Sage Intacct など、少なくとも1つのアクティブな接続が必要)
  2. Pipedream アカウント
  3. OpenAI アカウント(API キー付き)
  4. CData Connect AI の認証情報:
    • メールアドレス(Basic 認証のユーザー名として使用)
    • パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)

ステップ 1:Pipedream 用の Sage Intacct 接続を設定する

Pipedream から Sage Intacct への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から Sage Intacct とやり取りするために、まず CData Connect AI で Sage Intacct 接続を作成・設定していきましょう。

  1. Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
  2. 「Add Connection」パネルから「Sage Intacct」を選択します
  3. Sage Intacct に接続するために必要な認証プロパティを入力します。

    Sage Intacct 接続プロパティの取得・設定方法

    独自のWeb サービスクレデンシャル、埋め込みクレデンシャル(Basic 認証)、またはOkta クレデンシャルのいずれかを使用して、Sage Intacct への接続を確立できます。

    Sage Intacct への認証

    Sage Intacct は2種類の認証をサポートします。Basic およびOkta です。選択した認証方法に関連するプロパティを設定して、接続を構成します。

    Basic 認証

    Basic 認証スキームでは、埋め込みクレデンシャルを使用してデータの読み書きが可能です。オプションとして、独自のWeb サービスクレデンシャルを指定することもできます。

    Basic 認証を使用して認証を行うには、以下のプロパティを設定します。

    • AuthSchemeBasic
    • CompanyID:Sage Intacct にログインする際に会社を識別するために使用するID。
    • User:Sage Intacct へのログインに使用するログイン名。
    • Password:ログインクレデンシャル用のパスワード。
    • (オプション)SenderID およびSenderPassword:Web サービスのSender ID およびパスワード(独自のWeb サービスクレデンシャルを使用している場合のみ)。

    独自のWeb サービスクレデンシャルではなく、埋め込みクレデンシャルを使用する場合は、以下を実行する必要があります:

    • Web サービスダッシュボードで、会社 -> 会社情報 -> セキュリティタブに移動します。
    • Web サービス認証に"CData" を追加します。これは大文字・小文字が区別されます。これを行うには、会社 -> 会社情報(新しいUI では、設定 -> 会社)-> セキュリティ -> Web サービス認証 / 編集に移動します。
    Okta 認証についてはヘルプドキュメントを参照してください。
  4. 「Save & Test」をクリックします
  5. 「Add Sage Intacct Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンの追加

パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
  2. 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。

これで接続の設定と PAT の生成が完了しました。Pipedream から Sage Intacct に接続する準備が整いました。


ステップ 2:Pipedream で環境変数を設定する

認証情報を Pipedream の環境変数として安全に保存します。

  1. Pipedream で Settings を開き、Environment Variables に移動します
  2. New Variable をクリックして、以下の変数を追加します:
  3. 変数名
    CDATA_EMAIL CData Connect AI のログイン用メールアドレス
    CDATA_PAT CData パーソナルアクセストークン
    OPENAI_API_KEY OpenAI API キー

ステップ 3:Pipedream ワークフローを作成する

3.1 HTTP トリガーの設定

  1. Pipedream で新しいワークフローを作成します
  2. トリガーとして HTTP / Webhook を選択します
  3. HTTP Response"Return a custom response from your workflow" に設定します

3.2 LLM ステップの追加

Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。

ステップのデフォルトコードを以下に置き換えてください:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 MCP ステップの追加

Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、Sage Intacct のデータに対して実行します。

このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 選択した接続のデータベーススキーマを取得します
getTables 選択したスキーマのすべてのテーブルとビューを取得します
queryData 生成された SQL クエリを実行し、結果を返します

ステップのデフォルトコードを以下に置き換えてください:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuerysteps.LLM.$return_value.userQuery に置き換えてください。

3.4 レスポンスステップの設定

  1. Return HTTP Response ステップを追加し、名前を Response とします
  2. Response Status Code を 200 に設定します
  3. Response Body{{steps.mcp.$return_value}} に設定します
  4. 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

ステップ 4:ワークフローをテストして Sage Intacct のデータを操作する

トリガーにテストイベントを設定する

  1. ワークフロー内の trigger ステップをクリックします
  2. Generate Test Event をクリックします
  3. イベントのボディを以下のように編集します:
  4. {
      "query": "list all tables"
    }
    

ワークフロー全体を実行する

  1. トリガーステップの下部にある Test workflow をクリックします
  2. Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
  3. 各ステップが正常に完了すると、緑色に変わります

各ステップの結果を確認する

テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:

ステップ Exports で確認するポイント
trigger body.query - クエリが正しく受信されたことを確認
LLM userQuery - クエリが正しく抽出されたことを確認
MCP connection, sql, data, schema - データが取得されたことを確認
Response $response.body - 最終的な JSON レスポンス

各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。

注意: Response ステップの Exports タブには、{ "success": true }"status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。

データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:

このワークフローは以下を自動的に実行します:

  1. CData の接続をすべて検出
  2. 質問に最も関連する接続を選択
  3. スキーマとテーブルを動的に検出
  4. 適切な SQL クエリを生成して実行
  5. 結果を返却

仕組みについて

この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 特定の接続のデータベーススキーマを取得します
getTables 特定のスキーマのすべてのテーブルとビューを取得します
queryData SQL クエリを実行し、結果を返します

OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。

いかがでしたか?Pipedream から Sage Intacct へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル