CData Connect AI MCP Server でPipedream からAmazon Athena のデータと連携しよう!

Somya Sharma
Somya Sharma
Technical Marketing Engineer
CData Connect AI のリモート MCP Server で Pipedream から Amazon Athena へのリアルタイムアクセスを実現。ワークフロー自動化でデータの読み取りとアクション実行を簡単に。

Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って Amazon Athena のデータとリアルタイムでやり取りできるようになります。

CData Connect AI は、Amazon Athena に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と Amazon Athena の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから Amazon Athena に質問してデータを取得できます。

この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、Amazon Athena のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、Amazon Athena に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。

Amazon Athena データ連携について

CData は、Amazon Athena のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:

  • IAM 認証情報、アクセスキー、インスタンスプロファイルなど、さまざまな方法で安全に認証できます。多様なセキュリティニーズに対応し、認証プロセスを簡素化します。
  • 詳細なエラーメッセージにより、セットアップを効率化し、問題を迅速に解決できます。
  • サーバーサイドでのクエリ実行により、パフォーマンスを向上させ、クライアントリソースへの負荷を最小限に抑えます。

ユーザーは、Tableau、Power BI、Excel などの分析ツールと Athena を統合し、お気に入りのツールから詳細な分析を行うことができます。

CData を使用した Amazon Athena のユニークなユースケースについては、ブログ記事をご覧ください:https://jp.cdata.com/blog/amazon-athena-use-cases


はじめに


前提条件

  1. CData Connect AI アカウント(Amazon Athena など、少なくとも1つのアクティブな接続が必要)
  2. Pipedream アカウント
  3. OpenAI アカウント(API キー付き)
  4. CData Connect AI の認証情報:
    • メールアドレス(Basic 認証のユーザー名として使用)
    • パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)

ステップ 1:Pipedream 用の Amazon Athena 接続を設定する

Pipedream から Amazon Athena への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から Amazon Athena とやり取りするために、まず CData Connect AI で Amazon Athena 接続を作成・設定していきましょう。

  1. Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
  2. 「Add Connection」パネルから「Amazon Athena」を選択します
  3. Amazon Athena に接続するために必要な認証プロパティを入力します。

    Amazon Athena 接続プロパティの取得・設定方法

    それでは、早速Athena に接続していきましょう。

    データに接続するには、以下の接続パラメータを指定します。

    • DataSource:接続するAmazon Athena データソース。
    • Database:接続するAmazon Athena データベース。
    • AWSRegion:Amazon Athena データがホストされているリージョン。
    • S3StagingDirectory:クエリの結果を保存するS3 フォルダ。

    Database またはDataSource が設定されていない場合、CData 製品はAmazon Athena の利用可能なデータソースからすべてのデータベースのリスト化を試みます。そのため、両方のプロパティを設定することでCData 製品のパフォーマンスが向上します。

    Amazon Athena の認証設定

    CData 製品は幅広い認証オプションに対応しています。詳しくはヘルプドキュメントの「はじめに」を参照してみてください。

    AWS キーを取得

    IAM ユーザーの認証情報を取得するには、以下のステップお試しください。

    1. IAM コンソールにサインインします。
    2. ナビゲーションペインでユーザーを選択します。
    3. ユーザーのアクセスキーを作成または管理するには、ユーザーを選択してからセキュリティ認証情報タブに移動します。

    AWS ルートアカウントの資格情報を取得するには、以下のステップをお試しください。

    1. ルートアカウントの認証情報を使用してAWS 管理コンソールにサインインします。
    2. アカウント名または番号を選択します。
    3. 表示されたメニューでMy Security Credentials を選択します。
    4. ルートアカウントのアクセスキーを管理または作成するには、Continue to Security Credentials をクリックし、[Access Keys]セクションを展開します。

    その他の認証オプションについては、ヘルプドキュメントの「Amazon Athena への認証」を参照してください。

  4. 「Save & Test」をクリックします
  5. 「Add Amazon Athena Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンの追加

パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
  2. 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。

これで接続の設定と PAT の生成が完了しました。Pipedream から Amazon Athena に接続する準備が整いました。


ステップ 2:Pipedream で環境変数を設定する

認証情報を Pipedream の環境変数として安全に保存します。

  1. Pipedream で Settings を開き、Environment Variables に移動します
  2. New Variable をクリックして、以下の変数を追加します:
  3. 変数名
    CDATA_EMAIL CData Connect AI のログイン用メールアドレス
    CDATA_PAT CData パーソナルアクセストークン
    OPENAI_API_KEY OpenAI API キー

ステップ 3:Pipedream ワークフローを作成する

3.1 HTTP トリガーの設定

  1. Pipedream で新しいワークフローを作成します
  2. トリガーとして HTTP / Webhook を選択します
  3. HTTP Response"Return a custom response from your workflow" に設定します

3.2 LLM ステップの追加

Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。

ステップのデフォルトコードを以下に置き換えてください:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 MCP ステップの追加

Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、Amazon Athena のデータに対して実行します。

このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 選択した接続のデータベーススキーマを取得します
getTables 選択したスキーマのすべてのテーブルとビューを取得します
queryData 生成された SQL クエリを実行し、結果を返します

ステップのデフォルトコードを以下に置き換えてください:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuerysteps.LLM.$return_value.userQuery に置き換えてください。

3.4 レスポンスステップの設定

  1. Return HTTP Response ステップを追加し、名前を Response とします
  2. Response Status Code を 200 に設定します
  3. Response Body{{steps.mcp.$return_value}} に設定します
  4. 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

ステップ 4:ワークフローをテストして Amazon Athena のデータを操作する

トリガーにテストイベントを設定する

  1. ワークフロー内の trigger ステップをクリックします
  2. Generate Test Event をクリックします
  3. イベントのボディを以下のように編集します:
  4. {
      "query": "list all tables"
    }
    

ワークフロー全体を実行する

  1. トリガーステップの下部にある Test workflow をクリックします
  2. Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
  3. 各ステップが正常に完了すると、緑色に変わります

各ステップの結果を確認する

テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:

ステップ Exports で確認するポイント
trigger body.query - クエリが正しく受信されたことを確認
LLM userQuery - クエリが正しく抽出されたことを確認
MCP connection, sql, data, schema - データが取得されたことを確認
Response $response.body - 最終的な JSON レスポンス

各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。

注意: Response ステップの Exports タブには、{ "success": true }"status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。

データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:

このワークフローは以下を自動的に実行します:

  1. CData の接続をすべて検出
  2. 質問に最も関連する接続を選択
  3. スキーマとテーブルを動的に検出
  4. 適切な SQL クエリを生成して実行
  5. 結果を返却

仕組みについて

この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:

MCP ツール 用途
getCatalogs CData Connect AI から利用可能な接続をすべて取得します
getSchemas 特定の接続のデータベーススキーマを取得します
getTables 特定のスキーマのすべてのテーブルとビューを取得します
queryData SQL クエリを実行し、結果を返します

OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。

いかがでしたか?Pipedream から Amazon Athena へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル