CData Connect AI MCP Server でPipedream からAmazon S3 のデータと連携しよう!
Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って Amazon S3 のデータとリアルタイムでやり取りできるようになります。
CData Connect AI は、Amazon S3 に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と Amazon S3 の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから Amazon S3 に質問してデータを取得できます。
この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、Amazon S3 のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、Amazon S3 に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。
前提条件
- CData Connect AI アカウント(Amazon S3 など、少なくとも1つのアクティブな接続が必要)
- Pipedream アカウント
- OpenAI アカウント(API キー付き)
-
CData Connect AI の認証情報:
- メールアドレス(Basic 認証のユーザー名として使用)
- パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)
ステップ 1:Pipedream 用の Amazon S3 接続を設定する
Pipedream から Amazon S3 への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から Amazon S3 とやり取りするために、まず CData Connect AI で Amazon S3 接続を作成・設定していきましょう。
- Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
- 「Add Connection」パネルから「Amazon S3」を選択します
-
Amazon S3 に接続するために必要な認証プロパティを入力します。
Amazon S3 リクエストを認可するには、管理者アカウントまたはカスタム権限を持つIAM ユーザーの認証情報を入力します。AccessKey をアクセスキーID に設定します。SecretKey をシークレットアクセスキーに設定します。
Note: AWS アカウント管理者として接続できますが、AWS サービスにアクセスするにはIAM ユーザー認証情報を使用することをお勧めします。
尚、CData 製品はAmazon S3 のファイルの一覧表示やユーザー管理情報の取得用です。S3 に保管されているExcel、CSV、JSON などのファイル内のデータを読み込みたい場合には、Excel Driver、CSV Driver、JSON Driver をご利用ください。
アクセスキーの取得
IAM ユーザーの資格情報を取得するには:
- IAM コンソールにサインインします。
- ナビゲーションペインで「ユーザー」を選択します。
- ユーザーのアクセスキーを作成または管理するには、ユーザーを選択してから「セキュリティ認証情報」タブを選択します。
AWS ルートアカウントの資格情報を取得するには:
- ルートアカウントの資格情報を使用してAWS 管理コンソールにサインインします。
- アカウント名または番号を選択し、表示されたメニューで「My Security Credentials」を選択します。
- 「Continue to Security Credentials」をクリックし、「Access Keys」セクションを展開して、ルートアカウントのアクセスキーを管理または作成します。
AWS ロールとして認証
多くの場合、認証にはAWS ルートユーザーのダイレクトなセキュリティ認証情報ではなく、IAM ロールを使用することをお勧めします。RoleARN を指定することでAWS ロールを代わりに使用できます。これにより、CData 製品は指定されたロールの資格情報を取得しようと試みます。
(すでにEC2 インスタンスなどで接続されているのではなく)AWS に接続している場合は、ロールを引き受けるIAM ユーザーのAccessKey とSecretKey を追加で指定する必要があります。AWS ルートユーザーのAccessKey および SecretKey を指定する場合、ロールは使用できません。
SSO 認証
SSO 認証を必要とするユーザーおよびロールには、RoleARN およびPrincipalArn 接続プロパティを指定してください。各Identity Provider に固有のSSOProperties を指定し、AccessKey とSecretKey を空のままにする必要があります。これにより、CData 製品は一時的な認証資格情報を取得するために、リクエストでSSO 認証情報を送信します。
- 「Save & Test」をクリックします
-
「Add Amazon S3 Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。
パーソナルアクセストークンの追加
パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。
- Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
- 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
- PAT に名前を付けて「Create」をクリックします。

- パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。
これで接続の設定と PAT の生成が完了しました。Pipedream から Amazon S3 に接続する準備が整いました。
ステップ 2:Pipedream で環境変数を設定する
認証情報を Pipedream の環境変数として安全に保存します。
- Pipedream で Settings を開き、Environment Variables に移動します
- New Variable をクリックして、以下の変数を追加します:
| 変数名 | 値 |
|---|---|
| CDATA_EMAIL | CData Connect AI のログイン用メールアドレス |
| CDATA_PAT | CData パーソナルアクセストークン |
| OPENAI_API_KEY | OpenAI API キー |
ステップ 3:Pipedream ワークフローを作成する
3.1 HTTP トリガーの設定
- Pipedream で新しいワークフローを作成します
- トリガーとして HTTP / Webhook を選択します
- HTTP Response を "Return a custom response from your workflow" に設定します
3.2 LLM ステップの追加
Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。
ステップのデフォルトコードを以下に置き換えてください:
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
if (steps.trigger.event.method === "OPTIONS") {
return { userQuery: null, isOptions: true };
}
const body = steps.trigger.event.body;
const parsed = typeof body === "string" ? JSON.parse(body) : body;
const userQuery = parsed?.query;
console.log("USER QUERY:", userQuery);
if (!userQuery) throw new Error("No query found in request body");
return { userQuery };
}
});
3.3 MCP ステップの追加
Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、Amazon S3 のデータに対して実行します。
このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 選択した接続のデータベーススキーマを取得します |
| getTables | 選択したスキーマのすべてのテーブルとビューを取得します |
| queryData | 生成された SQL クエリを実行し、結果を返します |
ステップのデフォルトコードを以下に置き換えてください:
import fetch from "node-fetch";
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
const email = process.env.CDATA_EMAIL;
const pat = process.env.CDATA_PAT;
const credentials = email + ":" + pat;
const auth = Buffer.from(credentials).toString("base64");
const llmOutput = steps.LLM;
const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
const NL = String.fromCharCode(10);
const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
const headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": "Basic " + auth
};
function parseSSE(raw) {
try {
const lines = raw.split(NL);
for (let i = 0; i < lines.length; i++) {
const line = lines.at(i);
const trimmed = line.trim();
if (trimmed.indexOf("data:") === 0) {
const jsonStr = trimmed.slice(5).trim();
if (jsonStr) {
const json = JSON.parse(jsonStr);
const result = json && json.result;
const content = result && result.content;
if (Array.isArray(content)) {
return {
parsed: content.map(function(c) { return c.text || ""; }).join(NL),
isError: (result && result.isError) || false,
full: json
};
}
}
}
}
} catch (e) {
console.log("SSE parse error:", e.message);
}
return { parsed: raw, isError: false, full: null };
}
function parseCSV(text) {
let clean = text || "";
if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
clean = clean.slice(1, -1);
}
const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
const ESC_QUOTE = String.fromCharCode(92) + '"';
const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
const SINGLE_SLASH = String.fromCharCode(92);
clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
}
async function initSession() {
const res = await fetch(MCP_URL, {
method: "POST",
headers: headers,
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "pipedream", version: "1.0" }
}
})
});
return res.headers.get("mcp-session-id");
}
async function callMCP(id, method, args, sessionId) {
const reqHeaders = Object.assign({}, headers);
if (sessionId) {
Object.assign(reqHeaders, { "mcp-session-id": sessionId });
}
const res = await fetch(MCP_URL, {
method: "POST",
headers: reqHeaders,
body: JSON.stringify({
jsonrpc: "2.0",
id: id,
method: "tools/call",
params: { name: method, arguments: args }
})
});
const raw = await res.text();
const result = parseSSE(raw);
result.raw = raw;
return result;
}
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completions = client.chat.completions;
const session1 = await initSession();
const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
const catalogs = parseCSV(catalogsResult.parsed);
const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
const connectionResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg1 },
{ role: "user", content: userQuery }
)
});
const connectionName = connectionResponse.choices.at(0).message.content.trim();
const session2 = await initSession();
const schemasResult = await callMCP(2, "getSchemas", {
connectionName: connectionName,
catalogName: connectionName
}, session2);
const schemas = parseCSV(schemasResult.parsed);
const schemaName = schemas.at(0) || "REST";
const session3 = await initSession();
const tablesResult = await callMCP(2, "getTables", {
connectionName: connectionName,
catalogName: connectionName,
schemaName: schemaName
}, session3);
const tableNames = parseCSV(tablesResult.parsed);
const queryLower = userQuery.toLowerCase();
const isListTablesQuery =
queryLower.indexOf("list") !== -1 ||
queryLower.indexOf("what tables") !== -1 ||
queryLower.indexOf("show tables") !== -1;
if (isListTablesQuery) {
return {
success: true,
connection: connectionName,
message: "Available tables in " + connectionName + "." + schemaName,
tables: tableNames
};
}
const tableList = tableNames.map(function(t) {
return connectionName + "." + schemaName + "." + t;
}).join(", ");
const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
const sqlResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg2 },
{ role: "user", content: userQuery }
)
});
const sql = sqlResponse.choices.at(0).message.content.trim();
if (!sql) { return { error: "LLM returned empty SQL" }; }
const session4 = await initSession();
const queryResult = await callMCP(2, "queryData", {
query: sql,
connectionName: connectionName
}, session4);
if (queryResult.full) {
const content = queryResult.full.result && queryResult.full.result.content;
if (Array.isArray(content)) {
try {
const parsed = JSON.parse(content.at(0).text);
const results = parsed.results && parsed.results.at(0);
return {
sql: sql,
connection: connectionName,
data: (results && results.rows) || new Array(),
schema: (results && results.schema) || new Array(),
success: true
};
} catch (e) {
return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
}
}
}
return { sql: sql, connection: connectionName, raw: queryResult.raw };
}
});
注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuery を steps.LLM.$return_value.userQuery に置き換えてください。
3.4 レスポンスステップの設定
- Return HTTP Response ステップを追加し、名前を Response とします
- Response Status Code を 200 に設定します
- Response Body を {{steps.mcp.$return_value}} に設定します
- 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
ステップ 4:ワークフローをテストして Amazon S3 のデータを操作する
トリガーにテストイベントを設定する
- ワークフロー内の trigger ステップをクリックします
- Generate Test Event をクリックします
- イベントのボディを以下のように編集します:
{
"query": "list all tables"
}
ワークフロー全体を実行する
- トリガーステップの下部にある Test workflow をクリックします
- Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
- 各ステップが正常に完了すると、緑色に変わります
各ステップの結果を確認する
テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:
| ステップ | Exports で確認するポイント |
|---|---|
| trigger | body.query - クエリが正しく受信されたことを確認 |
| LLM | userQuery - クエリが正しく抽出されたことを確認 |
| MCP | connection, sql, data, schema - データが取得されたことを確認 |
| Response | $response.body - 最終的な JSON レスポンス |
各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。
注意: Response ステップの Exports タブには、{ "success": true } と "status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。
データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:
このワークフローは以下を自動的に実行します:
- CData の接続をすべて検出
- 質問に最も関連する接続を選択
- スキーマとテーブルを動的に検出
- 適切な SQL クエリを生成して実行
- 結果を返却
仕組みについて
この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 特定の接続のデータベーススキーマを取得します |
| getTables | 特定のスキーマのすべてのテーブルとビューを取得します |
| queryData | SQL クエリを実行し、結果を返します |
OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。
CData Connect AI でビジネスシステムのデータ活用を今すぐスタート
Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。
いかがでしたか?Pipedream から Amazon S3 へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。