CData Connect AI MCP Server でPipedream からeBay のデータと連携しよう!

Somya Sharma
Somya Sharma
Technical Marketing Engineer
CData Connect AI のリモート MCP Server で Pipedream から eBay へのリアルタイムアクセスを実現。ワークフロー自動化でデータの読み取りとアクション実行を簡単に。

Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って eBay のデータとリアルタイムでやり取りできるようになります。

CData Connect AI は、eBay に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と eBay の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから eBay に質問してデータを取得できます。

この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、eBay のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、eBay に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。

前提条件

  1. CData Connect AI アカウント(eBay など、少なくとも1つのアクティブな接続が必要)
  2. Pipedream アカウント
  3. OpenAI アカウント(API キー付き)
  4. CData Connect AI の認証情報:
    • メールアドレス(Basic 認証のユーザー名として使用)
    • パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)

ステップ 1:Pipedream 用の eBay 接続を設定する

Pipedream から eBay への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から eBay とやり取りするために、まず CData Connect AI で eBay 接続を作成・設定していきましょう。

  1. Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
  2. 「Add Connection」パネルから「eBay」を選択します
  3. eBay に接続するために必要な認証プロパティを入力します。

    eBay はOAuth 認証標準を利用しています。OAuth を使って認証するには、アプリケーションを作成してOAuthClientId、OAuthClientSecret、およびCallbackURL 接続プロパティを取得しなければなりません。

    詳細はヘルプドキュメントの「はじめに」を参照してください。

  4. 「Save & Test」をクリックします
  5. 「Add eBay Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンの追加

パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
  2. 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。

これで接続の設定と PAT の生成が完了しました。Pipedream から eBay に接続する準備が整いました。


ステップ 2:Pipedream で環境変数を設定する

認証情報を Pipedream の環境変数として安全に保存します。

  1. Pipedream で Settings を開き、Environment Variables に移動します
  2. New Variable をクリックして、以下の変数を追加します:
  3. 変数名
    CDATA_EMAIL CData Connect AI のログイン用メールアドレス
    CDATA_PAT CData パーソナルアクセストークン
    OPENAI_API_KEY OpenAI API キー

ステップ 3:Pipedream ワークフローを作成する

3.1 HTTP トリガーの設定

  1. Pipedream で新しいワークフローを作成します
  2. トリガーとして HTTP / Webhook を選択します
  3. HTTP Response"Return a custom response from your workflow" に設定します

3.2 LLM ステップの追加

Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。

ステップのデフォルトコードを以下に置き換えてください:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 MCP ステップの追加

Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、eBay のデータに対して実行します。

このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 選択した接続のデータベーススキーマを取得します
getTables 選択したスキーマのすべてのテーブルとビューを取得します
queryData 生成された SQL クエリを実行し、結果を返します

ステップのデフォルトコードを以下に置き換えてください:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuerysteps.LLM.$return_value.userQuery に置き換えてください。

3.4 レスポンスステップの設定

  1. Return HTTP Response ステップを追加し、名前を Response とします
  2. Response Status Code を 200 に設定します
  3. Response Body{{steps.mcp.$return_value}} に設定します
  4. 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

ステップ 4:ワークフローをテストして eBay のデータを操作する

トリガーにテストイベントを設定する

  1. ワークフロー内の trigger ステップをクリックします
  2. Generate Test Event をクリックします
  3. イベントのボディを以下のように編集します:
  4. {
      "query": "list all tables"
    }
    

ワークフロー全体を実行する

  1. トリガーステップの下部にある Test workflow をクリックします
  2. Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
  3. 各ステップが正常に完了すると、緑色に変わります

各ステップの結果を確認する

テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:

ステップ Exports で確認するポイント
trigger body.query - クエリが正しく受信されたことを確認
LLM userQuery - クエリが正しく抽出されたことを確認
MCP connection, sql, data, schema - データが取得されたことを確認
Response $response.body - 最終的な JSON レスポンス

各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。

注意: Response ステップの Exports タブには、{ "success": true }"status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。

データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:

このワークフローは以下を自動的に実行します:

  1. CData の接続をすべて検出
  2. 質問に最も関連する接続を選択
  3. スキーマとテーブルを動的に検出
  4. 適切な SQL クエリを生成して実行
  5. 結果を返却

仕組みについて

この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 特定の接続のデータベーススキーマを取得します
getTables 特定のスキーマのすべてのテーブルとビューを取得します
queryData SQL クエリを実行し、結果を返します

OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。

いかがでしたか?Pipedream から eBay へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル