CData Connect AI MCP Server でPipedream からHubSpot のデータと連携しよう!
Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って HubSpot のデータとリアルタイムでやり取りできるようになります。
CData Connect AI は、HubSpot に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と HubSpot の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから HubSpot に質問してデータを取得できます。
この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、HubSpot のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、HubSpot に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。
HubSpot データ連携について
CData は、HubSpot のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:
- API の更新や変更を気にすることなく、HubSpot にアクセスできます。
- 追加の構成手順なしで、HubSpot のカスタムオブジェクトやフィールドにアクセスできます。
- SQL ストアドプロシージャを使用して、添付ファイルのアップロード・ダウンロード、エンゲージメントの挿入、カスタムオブジェクトやフィールドの作成・削除などの機能的な操作を実行できます。
ユーザーは、Tableau、Power BI、Excel などの分析ツールと HubSpot を統合し、当社のツールを活用して HubSpot データをデータベースやデータウェアハウスにレプリケートしています。
他のお客様が CData の HubSpot ソリューションをどのように使用しているかについては、ブログをご覧ください:Drivers in Focus: Simplified HubSpot Connectivity
はじめに
前提条件
- CData Connect AI アカウント(HubSpot など、少なくとも1つのアクティブな接続が必要)
- Pipedream アカウント
- OpenAI アカウント(API キー付き)
-
CData Connect AI の認証情報:
- メールアドレス(Basic 認証のユーザー名として使用)
- パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)
ステップ 1:Pipedream 用の HubSpot 接続を設定する
Pipedream から HubSpot への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から HubSpot とやり取りするために、まず CData Connect AI で HubSpot 接続を作成・設定していきましょう。
- Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
- 「Add Connection」パネルから「HubSpot」を選択します
-
HubSpot に接続するために必要な認証プロパティを入力します。
HubSpot 接続プロパティの取得・設定方法
HubSpot はOAuth 認証 およびPrivateAppToken ベース認証をサポートします。
OAuth
HubSpot は埋め込みOAuth 認証情報を提供しており、デスクトップアプリケーションまたはヘッドレスマシンから簡単に接続できます。 Web アプリケーションから接続するには、カスタムOAuth アプリケーションを作成する必要があります。 OAuth 経由で接続するには、すべての認証フローでAuthScheme をOAuth に設定します。 ヘルプドキュメントでは、利用可能なOAuth フローでのHubSpot への認証について詳しく説明します。 カスタムOAuth アプリケーションの作成についての情報と、すでに埋め込みOAuth 認証情報を持つ認証フローでもカスタムOAuth アプリケーションを作成したほうがよい場合の説明については、「カスタムOAuth アプリケーションの作成」セクション を参照してください。
また、PrivateAppToken ベース認証についてはヘルプドキュメントの「接続の確立」セクションを参照してください。
- 「Save & Test」をクリックします
-
「Add HubSpot Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。
パーソナルアクセストークンの追加
パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。
- Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
- 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
- PAT に名前を付けて「Create」をクリックします。

- パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。
これで接続の設定と PAT の生成が完了しました。Pipedream から HubSpot に接続する準備が整いました。
ステップ 2:Pipedream で環境変数を設定する
認証情報を Pipedream の環境変数として安全に保存します。
- Pipedream で Settings を開き、Environment Variables に移動します
- New Variable をクリックして、以下の変数を追加します:
| 変数名 | 値 |
|---|---|
| CDATA_EMAIL | CData Connect AI のログイン用メールアドレス |
| CDATA_PAT | CData パーソナルアクセストークン |
| OPENAI_API_KEY | OpenAI API キー |
ステップ 3:Pipedream ワークフローを作成する
3.1 HTTP トリガーの設定
- Pipedream で新しいワークフローを作成します
- トリガーとして HTTP / Webhook を選択します
- HTTP Response を "Return a custom response from your workflow" に設定します
3.2 LLM ステップの追加
Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。
ステップのデフォルトコードを以下に置き換えてください:
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
if (steps.trigger.event.method === "OPTIONS") {
return { userQuery: null, isOptions: true };
}
const body = steps.trigger.event.body;
const parsed = typeof body === "string" ? JSON.parse(body) : body;
const userQuery = parsed?.query;
console.log("USER QUERY:", userQuery);
if (!userQuery) throw new Error("No query found in request body");
return { userQuery };
}
});
3.3 MCP ステップの追加
Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、HubSpot のデータに対して実行します。
このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 選択した接続のデータベーススキーマを取得します |
| getTables | 選択したスキーマのすべてのテーブルとビューを取得します |
| queryData | 生成された SQL クエリを実行し、結果を返します |
ステップのデフォルトコードを以下に置き換えてください:
import fetch from "node-fetch";
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
const email = process.env.CDATA_EMAIL;
const pat = process.env.CDATA_PAT;
const credentials = email + ":" + pat;
const auth = Buffer.from(credentials).toString("base64");
const llmOutput = steps.LLM;
const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
const NL = String.fromCharCode(10);
const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
const headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": "Basic " + auth
};
function parseSSE(raw) {
try {
const lines = raw.split(NL);
for (let i = 0; i < lines.length; i++) {
const line = lines.at(i);
const trimmed = line.trim();
if (trimmed.indexOf("data:") === 0) {
const jsonStr = trimmed.slice(5).trim();
if (jsonStr) {
const json = JSON.parse(jsonStr);
const result = json && json.result;
const content = result && result.content;
if (Array.isArray(content)) {
return {
parsed: content.map(function(c) { return c.text || ""; }).join(NL),
isError: (result && result.isError) || false,
full: json
};
}
}
}
}
} catch (e) {
console.log("SSE parse error:", e.message);
}
return { parsed: raw, isError: false, full: null };
}
function parseCSV(text) {
let clean = text || "";
if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
clean = clean.slice(1, -1);
}
const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
const ESC_QUOTE = String.fromCharCode(92) + '"';
const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
const SINGLE_SLASH = String.fromCharCode(92);
clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
}
async function initSession() {
const res = await fetch(MCP_URL, {
method: "POST",
headers: headers,
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "pipedream", version: "1.0" }
}
})
});
return res.headers.get("mcp-session-id");
}
async function callMCP(id, method, args, sessionId) {
const reqHeaders = Object.assign({}, headers);
if (sessionId) {
Object.assign(reqHeaders, { "mcp-session-id": sessionId });
}
const res = await fetch(MCP_URL, {
method: "POST",
headers: reqHeaders,
body: JSON.stringify({
jsonrpc: "2.0",
id: id,
method: "tools/call",
params: { name: method, arguments: args }
})
});
const raw = await res.text();
const result = parseSSE(raw);
result.raw = raw;
return result;
}
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completions = client.chat.completions;
const session1 = await initSession();
const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
const catalogs = parseCSV(catalogsResult.parsed);
const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
const connectionResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg1 },
{ role: "user", content: userQuery }
)
});
const connectionName = connectionResponse.choices.at(0).message.content.trim();
const session2 = await initSession();
const schemasResult = await callMCP(2, "getSchemas", {
connectionName: connectionName,
catalogName: connectionName
}, session2);
const schemas = parseCSV(schemasResult.parsed);
const schemaName = schemas.at(0) || "REST";
const session3 = await initSession();
const tablesResult = await callMCP(2, "getTables", {
connectionName: connectionName,
catalogName: connectionName,
schemaName: schemaName
}, session3);
const tableNames = parseCSV(tablesResult.parsed);
const queryLower = userQuery.toLowerCase();
const isListTablesQuery =
queryLower.indexOf("list") !== -1 ||
queryLower.indexOf("what tables") !== -1 ||
queryLower.indexOf("show tables") !== -1;
if (isListTablesQuery) {
return {
success: true,
connection: connectionName,
message: "Available tables in " + connectionName + "." + schemaName,
tables: tableNames
};
}
const tableList = tableNames.map(function(t) {
return connectionName + "." + schemaName + "." + t;
}).join(", ");
const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
const sqlResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg2 },
{ role: "user", content: userQuery }
)
});
const sql = sqlResponse.choices.at(0).message.content.trim();
if (!sql) { return { error: "LLM returned empty SQL" }; }
const session4 = await initSession();
const queryResult = await callMCP(2, "queryData", {
query: sql,
connectionName: connectionName
}, session4);
if (queryResult.full) {
const content = queryResult.full.result && queryResult.full.result.content;
if (Array.isArray(content)) {
try {
const parsed = JSON.parse(content.at(0).text);
const results = parsed.results && parsed.results.at(0);
return {
sql: sql,
connection: connectionName,
data: (results && results.rows) || new Array(),
schema: (results && results.schema) || new Array(),
success: true
};
} catch (e) {
return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
}
}
}
return { sql: sql, connection: connectionName, raw: queryResult.raw };
}
});
注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuery を steps.LLM.$return_value.userQuery に置き換えてください。
3.4 レスポンスステップの設定
- Return HTTP Response ステップを追加し、名前を Response とします
- Response Status Code を 200 に設定します
- Response Body を {{steps.mcp.$return_value}} に設定します
- 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
ステップ 4:ワークフローをテストして HubSpot のデータを操作する
トリガーにテストイベントを設定する
- ワークフロー内の trigger ステップをクリックします
- Generate Test Event をクリックします
- イベントのボディを以下のように編集します:
{
"query": "list all tables"
}
ワークフロー全体を実行する
- トリガーステップの下部にある Test workflow をクリックします
- Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
- 各ステップが正常に完了すると、緑色に変わります
各ステップの結果を確認する
テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:
| ステップ | Exports で確認するポイント |
|---|---|
| trigger | body.query - クエリが正しく受信されたことを確認 |
| LLM | userQuery - クエリが正しく抽出されたことを確認 |
| MCP | connection, sql, data, schema - データが取得されたことを確認 |
| Response | $response.body - 最終的な JSON レスポンス |
各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。
注意: Response ステップの Exports タブには、{ "success": true } と "status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。
データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:
このワークフローは以下を自動的に実行します:
- CData の接続をすべて検出
- 質問に最も関連する接続を選択
- スキーマとテーブルを動的に検出
- 適切な SQL クエリを生成して実行
- 結果を返却
仕組みについて
この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 特定の接続のデータベーススキーマを取得します |
| getTables | 特定のスキーマのすべてのテーブルとビューを取得します |
| queryData | SQL クエリを実行し、結果を返します |
OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。
CData Connect AI でビジネスシステムのデータ活用を今すぐスタート
Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。
いかがでしたか?Pipedream から HubSpot へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。