CData Connect AI MCP Server でPipedream からElasticsearch のデータと連携しよう!

Somya Sharma
Somya Sharma
Technical Marketing Engineer
CData Connect AI のリモート MCP Server で Pipedream から Elasticsearch へのリアルタイムアクセスを実現。ワークフロー自動化でデータの読み取りとアクション実行を簡単に。

Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って Elasticsearch のデータとリアルタイムでやり取りできるようになります。

CData Connect AI は、Elasticsearch に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と Elasticsearch の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから Elasticsearch に質問してデータを取得できます。

この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、Elasticsearch のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、Elasticsearch に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。

Elasticsearch データ連携について

CData を使用すれば、Elasticsearch のライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:

  • SQL エンドポイントと REST エンドポイントの両方にアクセスでき、接続を最適化し、Elasticsearch データの読み書きに関してより多くのオプションを提供します。
  • v2.2 以降およびオープンソース Elasticsearch サブスクリプションを含む、ほぼすべての Elasticsearch インスタンスに接続できます。
  • SCORE() 関数を明示的に要求することなく、常にクエリ結果の関連性スコアを受け取ることができます。これにより、サードパーティツールからのアクセスが簡素化され、クエリ結果のテキスト関連性のランキングを簡単に確認できます。
  • 複数のインデックスを検索でき、クライアントマシンではなく Elasticsearch がクエリと結果の管理・処理を担当します。

ユーザーは、Crystal Reports、Power BI、Excel などの分析ツールと Elasticsearch データを統合し、当社のツールを活用して、Elasticsearch を含むすべてのデータソースへの単一のフェデレートアクセスレイヤーを実現しています。

CData の Elasticsearch ソリューションの詳細については、ナレッジベース記事をご覧ください:CData Elasticsearch Driver Features & Differentiators


はじめに


前提条件

  1. CData Connect AI アカウント(Elasticsearch など、少なくとも1つのアクティブな接続が必要)
  2. Pipedream アカウント
  3. OpenAI アカウント(API キー付き)
  4. CData Connect AI の認証情報:
    • メールアドレス(Basic 認証のユーザー名として使用)
    • パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)

ステップ 1:Pipedream 用の Elasticsearch 接続を設定する

Pipedream から Elasticsearch への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から Elasticsearch とやり取りするために、まず CData Connect AI で Elasticsearch 接続を作成・設定していきましょう。

  1. Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
  2. 「Add Connection」パネルから「Elasticsearch」を選択します
  3. Elasticsearch に接続するために必要な認証プロパティを入力します。

    Elasticsearch 接続プロパティの取得・設定方法

    接続するには、Server およびPort 接続プロパティを設定します。 認証には、User とPassword プロパティ、PKI (public key infrastructure)、またはその両方を設定します。 PKI を使用するには、SSLClientCert、SSLClientCertType、SSLClientCertSubject、およびSSLClientCertPassword プロパティを設定します。

    CData 製品は、認証とTLS/SSL 暗号化にX-Pack Security を使用しています。TLS/SSL で接続するには、Server 値に'https://' を接頭します。Note: PKI を 使用するためには、TLS/SSL およびクライアント認証はX-Pack 上で有効化されていなければなりません。

    接続されると、X-Pack では、設定したリルムをベースにユーザー認証およびロールの許可が実施されます。

  4. 「Save & Test」をクリックします
  5. 「Add Elasticsearch Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンの追加

パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
  2. 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。

これで接続の設定と PAT の生成が完了しました。Pipedream から Elasticsearch に接続する準備が整いました。


ステップ 2:Pipedream で環境変数を設定する

認証情報を Pipedream の環境変数として安全に保存します。

  1. Pipedream で Settings を開き、Environment Variables に移動します
  2. New Variable をクリックして、以下の変数を追加します:
  3. 変数名
    CDATA_EMAIL CData Connect AI のログイン用メールアドレス
    CDATA_PAT CData パーソナルアクセストークン
    OPENAI_API_KEY OpenAI API キー

ステップ 3:Pipedream ワークフローを作成する

3.1 HTTP トリガーの設定

  1. Pipedream で新しいワークフローを作成します
  2. トリガーとして HTTP / Webhook を選択します
  3. HTTP Response"Return a custom response from your workflow" に設定します

3.2 LLM ステップの追加

Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。

ステップのデフォルトコードを以下に置き換えてください:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 MCP ステップの追加

Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、Elasticsearch のデータに対して実行します。

このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 選択した接続のデータベーススキーマを取得します
getTables 選択したスキーマのすべてのテーブルとビューを取得します
queryData 生成された SQL クエリを実行し、結果を返します

ステップのデフォルトコードを以下に置き換えてください:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuerysteps.LLM.$return_value.userQuery に置き換えてください。

3.4 レスポンスステップの設定

  1. Return HTTP Response ステップを追加し、名前を Response とします
  2. Response Status Code を 200 に設定します
  3. Response Body{{steps.mcp.$return_value}} に設定します
  4. 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

ステップ 4:ワークフローをテストして Elasticsearch のデータを操作する

トリガーにテストイベントを設定する

  1. ワークフロー内の trigger ステップをクリックします
  2. Generate Test Event をクリックします
  3. イベントのボディを以下のように編集します:
  4. {
      "query": "list all tables"
    }
    

ワークフロー全体を実行する

  1. トリガーステップの下部にある Test workflow をクリックします
  2. Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
  3. 各ステップが正常に完了すると、緑色に変わります

各ステップの結果を確認する

テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:

ステップ Exports で確認するポイント
trigger body.query - クエリが正しく受信されたことを確認
LLM userQuery - クエリが正しく抽出されたことを確認
MCP connection, sql, data, schema - データが取得されたことを確認
Response $response.body - 最終的な JSON レスポンス

各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。

注意: Response ステップの Exports タブには、{ "success": true }"status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。

データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:

このワークフローは以下を自動的に実行します:

  1. CData の接続をすべて検出
  2. 質問に最も関連する接続を選択
  3. スキーマとテーブルを動的に検出
  4. 適切な SQL クエリを生成して実行
  5. 結果を返却

仕組みについて

この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 特定の接続のデータベーススキーマを取得します
getTables 特定のスキーマのすべてのテーブルとビューを取得します
queryData SQL クエリを実行し、結果を返します

OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。

いかがでしたか?Pipedream から Elasticsearch へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル