CData Connect AI MCP Server でPipedream からNetSuite のデータと連携しよう!
Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って NetSuite のデータとリアルタイムでやり取りできるようになります。
CData Connect AI は、NetSuite に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と NetSuite の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから NetSuite に質問してデータを取得できます。
この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、NetSuite のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、NetSuite に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。
NetSuite データ連携について
CData は、Oracle NetSuite のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:
- Standard、CRM、OneWorld を含む、すべてのエディションの NetSuite にアクセスできます。
- SuiteTalk API(SOAP ベース)のすべてのバージョンと、SQL のように機能し、より簡単なデータクエリと操作を可能にする SuiteQL に接続できます。
- Saved Searches のサポートにより、事前定義されたレポートとカスタムレポートにアクセスできます。
- トークンベースおよび OAuth 2.0 で安全に認証でき、あらゆるユースケースで互換性とセキュリティを確保します。
- SQL ストアドプロシージャを使用して、ファイルのアップロード・ダウンロード、レコードや関連付けのアタッチ・デタッチ、ロールの取得、追加のテーブルやカラム情報の取得、ジョブ結果の取得などの機能的なアクションを実行できます。
お客様は、Power BI や Excel などのお気に入りの分析ツールからライブ NetSuite データにアクセスするために CData ソリューションを使用しています。また、CData Sync を直接使用するか、Azure Data Factory などの他のアプリケーションとの CData の互換性を活用して、NetSuite データを包括的なデータベースやデータウェアハウスに統合しています。CData は、Oracle NetSuite のお客様が NetSuite からデータを取得し、NetSuite にデータをプッシュするアプリを簡単に作成できるよう支援し、他のソースからのデータを NetSuite と統合することを可能にしています。
当社の Oracle NetSuite ソリューションの詳細については、ブログをご覧ください:Drivers in Focus Part 2: Replicating and Consolidating ... NetSuite Accounting Data
はじめに
前提条件
- CData Connect AI アカウント(NetSuite など、少なくとも1つのアクティブな接続が必要)
- Pipedream アカウント
- OpenAI アカウント(API キー付き)
-
CData Connect AI の認証情報:
- メールアドレス(Basic 認証のユーザー名として使用)
- パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)
ステップ 1:Pipedream 用の NetSuite 接続を設定する
Pipedream から NetSuite への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から NetSuite とやり取りするために、まず CData Connect AI で NetSuite 接続を作成・設定していきましょう。
- Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
- 「Add Connection」パネルから「NetSuite」を選択します
-
NetSuite に接続するために必要な認証プロパティを入力します。
NetSuiteへの接続
NetSuite では、2種類のAPI でデータにアクセスできます。どちらのAPI を使用するかは、Schema 接続プロパティで以下のいずれかを選択して指定してください。
- SuiteTalk は、NetSuite との通信に使用されるSOAP ベースの従来から提供されているサービスです。幅広いエンティティをサポートし、INSERT / UPDATE / DELETE の操作も対応しています。ただし、SuiteQL API と比べるとデータの取得速度が劣ります。また、サーバーサイドでのJOIN に対応していないため、これらの処理はCData 製品がクライアントサイドで実行します。
- SuiteQL は、より新しいAPI です。JOIN、GROUP BY、集計、カラムフィルタリングをサーバーサイドで処理できるため、SuiteTalk よりもはるかに高速にデータを取得できます。ただし、NetSuite データへのアクセスは読み取り専用となります。
データの取得のみが目的でしたらSuiteQL をお勧めします。データの取得と変更の両方が必要な場合は、SuiteTalk をお選びください。
NetSuite への認証
CData 製品では、以下の認証方式がご利用いただけます。
- トークンベース認証(TBA)はOAuth1.0に似た仕組みです。2020.2以降のSuiteTalk とSuiteQL の両方で利用できます。
- OAuth 2.0 認証(OAuth 2.0 認可コードグラントフロー)は、SuiteQL でのみご利用いただけます。
- OAuth JWT 認証は、OAuth2.0 クライアント認証フローの一つで、クライアント認証情報を含むJWT を使用してNetSuite データへのアクセスを要求します。
トークンベース認証(OAuth1.0)
トークンベース認証(TBA)は、基本的にOAuth 1.0 の仕組みです。この認証方式はSuiteTalk とSuiteQL の両方でサポートされています。管理者権限をお持ちの方がNetSuite UI 内でOAuthClientId、OAuthClientSecret、OAuthAccessToken、OAuthAccessTokenSecret を直接作成することで設定できます。 NetSuite UI でのトークン作成手順については、ヘルプドキュメントの「はじめに」セクションをご参照ください。
アクセストークンを作成したら、以下の接続プロパティを設定して接続してみましょう。
- AuthScheme = Token
- AccountId = 接続先のアカウント
- OAuthClientId = アプリケーション作成時に表示されるコンシューマーキー
- OAuthClientSecret = アプリケーション作成時に表示されるコンシューマーシークレット
- OAuthAccessToken = アクセストークン作成時のトークンID
- OAuthAccessTokenSecret = アクセストークン作成時のトークンシークレット
その他の認証方法については、ヘルプドキュメントの「はじめに」をご確認ください。
- 「Save & Test」をクリックします
-
「Add NetSuite Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。
パーソナルアクセストークンの追加
パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。
- Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
- 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
- PAT に名前を付けて「Create」をクリックします。

- パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。
これで接続の設定と PAT の生成が完了しました。Pipedream から NetSuite に接続する準備が整いました。
ステップ 2:Pipedream で環境変数を設定する
認証情報を Pipedream の環境変数として安全に保存します。
- Pipedream で Settings を開き、Environment Variables に移動します
- New Variable をクリックして、以下の変数を追加します:
| 変数名 | 値 |
|---|---|
| CDATA_EMAIL | CData Connect AI のログイン用メールアドレス |
| CDATA_PAT | CData パーソナルアクセストークン |
| OPENAI_API_KEY | OpenAI API キー |
ステップ 3:Pipedream ワークフローを作成する
3.1 HTTP トリガーの設定
- Pipedream で新しいワークフローを作成します
- トリガーとして HTTP / Webhook を選択します
- HTTP Response を "Return a custom response from your workflow" に設定します
3.2 LLM ステップの追加
Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。
ステップのデフォルトコードを以下に置き換えてください:
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
if (steps.trigger.event.method === "OPTIONS") {
return { userQuery: null, isOptions: true };
}
const body = steps.trigger.event.body;
const parsed = typeof body === "string" ? JSON.parse(body) : body;
const userQuery = parsed?.query;
console.log("USER QUERY:", userQuery);
if (!userQuery) throw new Error("No query found in request body");
return { userQuery };
}
});
3.3 MCP ステップの追加
Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、NetSuite のデータに対して実行します。
このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 選択した接続のデータベーススキーマを取得します |
| getTables | 選択したスキーマのすべてのテーブルとビューを取得します |
| queryData | 生成された SQL クエリを実行し、結果を返します |
ステップのデフォルトコードを以下に置き換えてください:
import fetch from "node-fetch";
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
const email = process.env.CDATA_EMAIL;
const pat = process.env.CDATA_PAT;
const credentials = email + ":" + pat;
const auth = Buffer.from(credentials).toString("base64");
const llmOutput = steps.LLM;
const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
const NL = String.fromCharCode(10);
const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
const headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": "Basic " + auth
};
function parseSSE(raw) {
try {
const lines = raw.split(NL);
for (let i = 0; i < lines.length; i++) {
const line = lines.at(i);
const trimmed = line.trim();
if (trimmed.indexOf("data:") === 0) {
const jsonStr = trimmed.slice(5).trim();
if (jsonStr) {
const json = JSON.parse(jsonStr);
const result = json && json.result;
const content = result && result.content;
if (Array.isArray(content)) {
return {
parsed: content.map(function(c) { return c.text || ""; }).join(NL),
isError: (result && result.isError) || false,
full: json
};
}
}
}
}
} catch (e) {
console.log("SSE parse error:", e.message);
}
return { parsed: raw, isError: false, full: null };
}
function parseCSV(text) {
let clean = text || "";
if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
clean = clean.slice(1, -1);
}
const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
const ESC_QUOTE = String.fromCharCode(92) + '"';
const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
const SINGLE_SLASH = String.fromCharCode(92);
clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
}
async function initSession() {
const res = await fetch(MCP_URL, {
method: "POST",
headers: headers,
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "pipedream", version: "1.0" }
}
})
});
return res.headers.get("mcp-session-id");
}
async function callMCP(id, method, args, sessionId) {
const reqHeaders = Object.assign({}, headers);
if (sessionId) {
Object.assign(reqHeaders, { "mcp-session-id": sessionId });
}
const res = await fetch(MCP_URL, {
method: "POST",
headers: reqHeaders,
body: JSON.stringify({
jsonrpc: "2.0",
id: id,
method: "tools/call",
params: { name: method, arguments: args }
})
});
const raw = await res.text();
const result = parseSSE(raw);
result.raw = raw;
return result;
}
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completions = client.chat.completions;
const session1 = await initSession();
const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
const catalogs = parseCSV(catalogsResult.parsed);
const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
const connectionResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg1 },
{ role: "user", content: userQuery }
)
});
const connectionName = connectionResponse.choices.at(0).message.content.trim();
const session2 = await initSession();
const schemasResult = await callMCP(2, "getSchemas", {
connectionName: connectionName,
catalogName: connectionName
}, session2);
const schemas = parseCSV(schemasResult.parsed);
const schemaName = schemas.at(0) || "REST";
const session3 = await initSession();
const tablesResult = await callMCP(2, "getTables", {
connectionName: connectionName,
catalogName: connectionName,
schemaName: schemaName
}, session3);
const tableNames = parseCSV(tablesResult.parsed);
const queryLower = userQuery.toLowerCase();
const isListTablesQuery =
queryLower.indexOf("list") !== -1 ||
queryLower.indexOf("what tables") !== -1 ||
queryLower.indexOf("show tables") !== -1;
if (isListTablesQuery) {
return {
success: true,
connection: connectionName,
message: "Available tables in " + connectionName + "." + schemaName,
tables: tableNames
};
}
const tableList = tableNames.map(function(t) {
return connectionName + "." + schemaName + "." + t;
}).join(", ");
const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
const sqlResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg2 },
{ role: "user", content: userQuery }
)
});
const sql = sqlResponse.choices.at(0).message.content.trim();
if (!sql) { return { error: "LLM returned empty SQL" }; }
const session4 = await initSession();
const queryResult = await callMCP(2, "queryData", {
query: sql,
connectionName: connectionName
}, session4);
if (queryResult.full) {
const content = queryResult.full.result && queryResult.full.result.content;
if (Array.isArray(content)) {
try {
const parsed = JSON.parse(content.at(0).text);
const results = parsed.results && parsed.results.at(0);
return {
sql: sql,
connection: connectionName,
data: (results && results.rows) || new Array(),
schema: (results && results.schema) || new Array(),
success: true
};
} catch (e) {
return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
}
}
}
return { sql: sql, connection: connectionName, raw: queryResult.raw };
}
});
注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuery を steps.LLM.$return_value.userQuery に置き換えてください。
3.4 レスポンスステップの設定
- Return HTTP Response ステップを追加し、名前を Response とします
- Response Status Code を 200 に設定します
- Response Body を {{steps.mcp.$return_value}} に設定します
- 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
ステップ 4:ワークフローをテストして NetSuite のデータを操作する
トリガーにテストイベントを設定する
- ワークフロー内の trigger ステップをクリックします
- Generate Test Event をクリックします
- イベントのボディを以下のように編集します:
{
"query": "list all tables"
}
ワークフロー全体を実行する
- トリガーステップの下部にある Test workflow をクリックします
- Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
- 各ステップが正常に完了すると、緑色に変わります
各ステップの結果を確認する
テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:
| ステップ | Exports で確認するポイント |
|---|---|
| trigger | body.query - クエリが正しく受信されたことを確認 |
| LLM | userQuery - クエリが正しく抽出されたことを確認 |
| MCP | connection, sql, data, schema - データが取得されたことを確認 |
| Response | $response.body - 最終的な JSON レスポンス |
各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。
注意: Response ステップの Exports タブには、{ "success": true } と "status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。
データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:
このワークフローは以下を自動的に実行します:
- CData の接続をすべて検出
- 質問に最も関連する接続を選択
- スキーマとテーブルを動的に検出
- 適切な SQL クエリを生成して実行
- 結果を返却
仕組みについて
この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 特定の接続のデータベーススキーマを取得します |
| getTables | 特定のスキーマのすべてのテーブルとビューを取得します |
| queryData | SQL クエリを実行し、結果を返します |
OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。
CData Connect AI でビジネスシステムのデータ活用を今すぐスタート
Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。
いかがでしたか?Pipedream から NetSuite へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。