CData Connect AI MCP Server でPipedream からYouTube Analytics のデータと連携しよう!
Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って YouTube Analytics のデータとリアルタイムでやり取りできるようになります。
CData Connect AI は、YouTube Analytics に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と YouTube Analytics の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから YouTube Analytics に質問してデータを取得できます。
この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、YouTube Analytics のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、YouTube Analytics に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。
前提条件
- CData Connect AI アカウント(YouTube Analytics など、少なくとも1つのアクティブな接続が必要)
- Pipedream アカウント
- OpenAI アカウント(API キー付き)
-
CData Connect AI の認証情報:
- メールアドレス(Basic 認証のユーザー名として使用)
- パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)
ステップ 1:Pipedream 用の YouTube Analytics 接続を設定する
Pipedream から YouTube Analytics への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から YouTube Analytics とやり取りするために、まず CData Connect AI で YouTube Analytics 接続を作成・設定していきましょう。
- Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
- 「Add Connection」パネルから「YouTube Analytics」を選択します
-
YouTube Analytics に接続するために必要な認証プロパティを入力します。
YouTube Analytics への接続には、OAuth 認証標準を使います。ユーザーアカウントまたはサービスアカウントで認証できます。組織全体のアクセススコープをCData 製品に許可するには、サービスアカウントが必要です。下記で説明するとおり、CData 製品はこれらの認証フローをサポートします。
ユーザー資格情報の接続プロパティを設定せずに接続できます。次を設定して、接続してください。ChannelId:YouTube チャンネルのId に設定。指定しない場合、認証されたユーザーのチャンネルのデータが返されます。ContentOwnerId:コンテンツ所有者のレポートを生成する場合に設定。接続すると、CData 製品はデフォルトブラウザでOAuth エンドポイントを開きます。ログインして、アプリケーションにアクセス許可を与えます。CData 製品がOAuth プロセスを完了します。
他のOAuth 認証フローについては、ヘルプドキュメントの「OAuth 認証の使用」を参照してください。
- 「Save & Test」をクリックします
-
「Add YouTube Analytics Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。
パーソナルアクセストークンの追加
パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。
- Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
- 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
- PAT に名前を付けて「Create」をクリックします。

- パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。
これで接続の設定と PAT の生成が完了しました。Pipedream から YouTube Analytics に接続する準備が整いました。
ステップ 2:Pipedream で環境変数を設定する
認証情報を Pipedream の環境変数として安全に保存します。
- Pipedream で Settings を開き、Environment Variables に移動します
- New Variable をクリックして、以下の変数を追加します:
| 変数名 | 値 |
|---|---|
| CDATA_EMAIL | CData Connect AI のログイン用メールアドレス |
| CDATA_PAT | CData パーソナルアクセストークン |
| OPENAI_API_KEY | OpenAI API キー |
ステップ 3:Pipedream ワークフローを作成する
3.1 HTTP トリガーの設定
- Pipedream で新しいワークフローを作成します
- トリガーとして HTTP / Webhook を選択します
- HTTP Response を "Return a custom response from your workflow" に設定します
3.2 LLM ステップの追加
Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。
ステップのデフォルトコードを以下に置き換えてください:
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
if (steps.trigger.event.method === "OPTIONS") {
return { userQuery: null, isOptions: true };
}
const body = steps.trigger.event.body;
const parsed = typeof body === "string" ? JSON.parse(body) : body;
const userQuery = parsed?.query;
console.log("USER QUERY:", userQuery);
if (!userQuery) throw new Error("No query found in request body");
return { userQuery };
}
});
3.3 MCP ステップの追加
Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、YouTube Analytics のデータに対して実行します。
このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 選択した接続のデータベーススキーマを取得します |
| getTables | 選択したスキーマのすべてのテーブルとビューを取得します |
| queryData | 生成された SQL クエリを実行し、結果を返します |
ステップのデフォルトコードを以下に置き換えてください:
import fetch from "node-fetch";
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
const email = process.env.CDATA_EMAIL;
const pat = process.env.CDATA_PAT;
const credentials = email + ":" + pat;
const auth = Buffer.from(credentials).toString("base64");
const llmOutput = steps.LLM;
const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
const NL = String.fromCharCode(10);
const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
const headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": "Basic " + auth
};
function parseSSE(raw) {
try {
const lines = raw.split(NL);
for (let i = 0; i < lines.length; i++) {
const line = lines.at(i);
const trimmed = line.trim();
if (trimmed.indexOf("data:") === 0) {
const jsonStr = trimmed.slice(5).trim();
if (jsonStr) {
const json = JSON.parse(jsonStr);
const result = json && json.result;
const content = result && result.content;
if (Array.isArray(content)) {
return {
parsed: content.map(function(c) { return c.text || ""; }).join(NL),
isError: (result && result.isError) || false,
full: json
};
}
}
}
}
} catch (e) {
console.log("SSE parse error:", e.message);
}
return { parsed: raw, isError: false, full: null };
}
function parseCSV(text) {
let clean = text || "";
if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
clean = clean.slice(1, -1);
}
const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
const ESC_QUOTE = String.fromCharCode(92) + '"';
const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
const SINGLE_SLASH = String.fromCharCode(92);
clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
}
async function initSession() {
const res = await fetch(MCP_URL, {
method: "POST",
headers: headers,
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "pipedream", version: "1.0" }
}
})
});
return res.headers.get("mcp-session-id");
}
async function callMCP(id, method, args, sessionId) {
const reqHeaders = Object.assign({}, headers);
if (sessionId) {
Object.assign(reqHeaders, { "mcp-session-id": sessionId });
}
const res = await fetch(MCP_URL, {
method: "POST",
headers: reqHeaders,
body: JSON.stringify({
jsonrpc: "2.0",
id: id,
method: "tools/call",
params: { name: method, arguments: args }
})
});
const raw = await res.text();
const result = parseSSE(raw);
result.raw = raw;
return result;
}
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completions = client.chat.completions;
const session1 = await initSession();
const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
const catalogs = parseCSV(catalogsResult.parsed);
const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
const connectionResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg1 },
{ role: "user", content: userQuery }
)
});
const connectionName = connectionResponse.choices.at(0).message.content.trim();
const session2 = await initSession();
const schemasResult = await callMCP(2, "getSchemas", {
connectionName: connectionName,
catalogName: connectionName
}, session2);
const schemas = parseCSV(schemasResult.parsed);
const schemaName = schemas.at(0) || "REST";
const session3 = await initSession();
const tablesResult = await callMCP(2, "getTables", {
connectionName: connectionName,
catalogName: connectionName,
schemaName: schemaName
}, session3);
const tableNames = parseCSV(tablesResult.parsed);
const queryLower = userQuery.toLowerCase();
const isListTablesQuery =
queryLower.indexOf("list") !== -1 ||
queryLower.indexOf("what tables") !== -1 ||
queryLower.indexOf("show tables") !== -1;
if (isListTablesQuery) {
return {
success: true,
connection: connectionName,
message: "Available tables in " + connectionName + "." + schemaName,
tables: tableNames
};
}
const tableList = tableNames.map(function(t) {
return connectionName + "." + schemaName + "." + t;
}).join(", ");
const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
const sqlResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg2 },
{ role: "user", content: userQuery }
)
});
const sql = sqlResponse.choices.at(0).message.content.trim();
if (!sql) { return { error: "LLM returned empty SQL" }; }
const session4 = await initSession();
const queryResult = await callMCP(2, "queryData", {
query: sql,
connectionName: connectionName
}, session4);
if (queryResult.full) {
const content = queryResult.full.result && queryResult.full.result.content;
if (Array.isArray(content)) {
try {
const parsed = JSON.parse(content.at(0).text);
const results = parsed.results && parsed.results.at(0);
return {
sql: sql,
connection: connectionName,
data: (results && results.rows) || new Array(),
schema: (results && results.schema) || new Array(),
success: true
};
} catch (e) {
return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
}
}
}
return { sql: sql, connection: connectionName, raw: queryResult.raw };
}
});
注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuery を steps.LLM.$return_value.userQuery に置き換えてください。
3.4 レスポンスステップの設定
- Return HTTP Response ステップを追加し、名前を Response とします
- Response Status Code を 200 に設定します
- Response Body を {{steps.mcp.$return_value}} に設定します
- 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
ステップ 4:ワークフローをテストして YouTube Analytics のデータを操作する
トリガーにテストイベントを設定する
- ワークフロー内の trigger ステップをクリックします
- Generate Test Event をクリックします
- イベントのボディを以下のように編集します:
{
"query": "list all tables"
}
ワークフロー全体を実行する
- トリガーステップの下部にある Test workflow をクリックします
- Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
- 各ステップが正常に完了すると、緑色に変わります
各ステップの結果を確認する
テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:
| ステップ | Exports で確認するポイント |
|---|---|
| trigger | body.query - クエリが正しく受信されたことを確認 |
| LLM | userQuery - クエリが正しく抽出されたことを確認 |
| MCP | connection, sql, data, schema - データが取得されたことを確認 |
| Response | $response.body - 最終的な JSON レスポンス |
各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。
注意: Response ステップの Exports タブには、{ "success": true } と "status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。
データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:
このワークフローは以下を自動的に実行します:
- CData の接続をすべて検出
- 質問に最も関連する接続を選択
- スキーマとテーブルを動的に検出
- 適切な SQL クエリを生成して実行
- 結果を返却
仕組みについて
この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 特定の接続のデータベーススキーマを取得します |
| getTables | 特定のスキーマのすべてのテーブルとビューを取得します |
| queryData | SQL クエリを実行し、結果を返します |
OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。
CData Connect AI でビジネスシステムのデータ活用を今すぐスタート
Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。
いかがでしたか?Pipedream から YouTube Analytics へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。