CData Connect AI MCP Server でPipedream からPaylocity のデータと連携しよう!
Pipedream は、API の接続、タスクの自動化、サーバーレス関数を使ったイベント駆動型ワークフローの構築をサポートする、クラウドベースのワークフロー自動化プラットフォームです。CData Connect AI のリモート MCP と組み合わせることで、データレプリケーションを行わずに、Pipedream から自然言語を使って Paylocity のデータとリアルタイムでやり取りできるようになります。
CData Connect AI は、Paylocity に接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Pipedream と Paylocity の間でセキュアな通信が可能になります。LLM がデータソースをインテリジェントに検出し、SQL クエリを動的に生成することで、Pipedream のワークフローから Paylocity に質問してデータを取得できます。
この記事では、Pipedream で自然言語によるデータクエリワークフローを構築し、Paylocity のデータを会話形式で探索する方法をご紹介します。ここで紹介する接続の原則は、あらゆる Pipedream ワークフローに適用できます。Connect AI を使用すれば、Paylocity に加えて、数百の他のデータソースにもアクセスできるワークフローやエージェントを構築できます。
前提条件
- CData Connect AI アカウント(Paylocity など、少なくとも1つのアクティブな接続が必要)
- Pipedream アカウント
- OpenAI アカウント(API キー付き)
-
CData Connect AI の認証情報:
- メールアドレス(Basic 認証のユーザー名として使用)
- パーソナルアクセストークン(PAT)(CData Connect AI の設定ページから生成)
ステップ 1:Pipedream 用の Paylocity 接続を設定する
Pipedream から Paylocity への接続は、CData Connect AI のリモート MCP を通じて実現されます。Pipedream から Paylocity とやり取りするために、まず CData Connect AI で Paylocity 接続を作成・設定していきましょう。
- Connect AI にログインし、「Sources」をクリックして、「Add Connection」をクリックします
- 「Add Connection」パネルから「Paylocity」を選択します
-
Paylocity に接続するために必要な認証プロパティを入力します。
Paylocity への接続を確立するには以下を設定します。
- RSAPublicKey:Paylocity アカウントでRSA 暗号化が有効になっている場合は、Paylocity に関連付けられたRSA キーを設定。
このプロパティは、Insert およびUpdate ステートメントを実行するために必須です。この機能が無効になっている場合は必須ではありません。
- UseSandbox:サンドボックスアカウントを使用する場合はTrue に設定。
- CustomFieldsCategory:Customfields カテゴリに設定。これは、IncludeCustomFields がtrue に設定されている場合は必須です。デフォルト値はPayrollAndHR です。
- Key:Paylocity の公開鍵で暗号化されたAES 共通鍵(base 64 エンコード)。これはコンテンツを暗号化するためのキーです。
Paylocity は、RSA 復号化を使用してAES 鍵を復号化します。
これはオプションのプロパティで、IV の値が指定されていない場合、ドライバーは内部でキーを生成します。 - IV:コンテンツを暗号化するときに使用するAES IV(base 64 エンコード)。これはオプションのプロパティで、Key の値が指定されていない場合、ドライバーは内部でIV を生成します。
OAuth
OAuth を使用してPaylocity で認証する必要があります。OAuth では認証するユーザーにブラウザでPaylocity との通信を要求します。詳しくは、ヘルプドキュメントのOAuth セクションを参照してください。
Pay Entry API
Pay Entry API はPaylocity API の他の部分と完全に分離されています。個別のクライアントID とシークレットを使用し、アカウントへのアクセスを許可するにはPaylocity から明示的にリクエストする必要があります。 Pay Entry API を使用すると、個々の従業員の給与情報を自動的に送信できます。 Pay Entry API によって提供されるものの性質が非常に限られているため、CData では個別のスキーマを提供しないことを選択しましたが、UsePayEntryAPI 接続プロパティを介して有効にできます。
UsePayEntryAPI をtrue に設定する場合は、CreatePayEntryImportBatch、MergePayEntryImportBatch、Input_TimeEntry、およびOAuth ストアドプロシージャのみ利用できることに注意してください。 製品のその他の機能を使用しようとするとエラーが発生します。また、OAuthAccessToken を個別に保存する必要があります。これは、この接続プロパティを使用するときに異なるOAuthSettingsLocation を設定することを意味します。
- RSAPublicKey:Paylocity アカウントでRSA 暗号化が有効になっている場合は、Paylocity に関連付けられたRSA キーを設定。
- 「Save & Test」をクリックします
-
「Add Paylocity Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。
パーソナルアクセストークンの追加
パーソナルアクセストークン(PAT)は、Pipedream から Connect AI への接続を認証するために使用されます。アクセス制御の粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。
- Connect AI アプリの右上にある歯車アイコン()をクリックして、設定ページを開きます。
- 設定ページの「Access Tokens」セクションに移動し、「Create PAT」をクリックします。
- PAT に名前を付けて「Create」をクリックします。

- パーソナルアクセストークンは作成時にのみ表示されますので、必ずコピーして安全な場所に保管してください。
これで接続の設定と PAT の生成が完了しました。Pipedream から Paylocity に接続する準備が整いました。
ステップ 2:Pipedream で環境変数を設定する
認証情報を Pipedream の環境変数として安全に保存します。
- Pipedream で Settings を開き、Environment Variables に移動します
- New Variable をクリックして、以下の変数を追加します:
| 変数名 | 値 |
|---|---|
| CDATA_EMAIL | CData Connect AI のログイン用メールアドレス |
| CDATA_PAT | CData パーソナルアクセストークン |
| OPENAI_API_KEY | OpenAI API キー |
ステップ 3:Pipedream ワークフローを作成する
3.1 HTTP トリガーの設定
- Pipedream で新しいワークフローを作成します
- トリガーとして HTTP / Webhook を選択します
- HTTP Response を "Return a custom response from your workflow" に設定します
3.2 LLM ステップの追加
Node.js コードステップを追加し、名前を LLM とします。このステップでは、受信したリクエストから自然言語クエリを抽出します。
ステップのデフォルトコードを以下に置き換えてください:
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
if (steps.trigger.event.method === "OPTIONS") {
return { userQuery: null, isOptions: true };
}
const body = steps.trigger.event.body;
const parsed = typeof body === "string" ? JSON.parse(body) : body;
const userQuery = parsed?.query;
console.log("USER QUERY:", userQuery);
if (!userQuery) throw new Error("No query found in request body");
return { userQuery };
}
});
3.3 MCP ステップの追加
Node.js コードステップを追加し、名前を MCP とします。このステップでは、エージェント型の MCP フロー全体を実装します。利用可能な接続をすべて自動検出し、質問に最も関連する接続を選択し、スキーマとテーブルを動的に検出し、LLM を使って SQL クエリを生成し、Paylocity のデータに対して実行します。
このステップでは、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 選択した接続のデータベーススキーマを取得します |
| getTables | 選択したスキーマのすべてのテーブルとビューを取得します |
| queryData | 生成された SQL クエリを実行し、結果を返します |
ステップのデフォルトコードを以下に置き換えてください:
import fetch from "node-fetch";
import OpenAI from "openai";
export default defineComponent({
async run({ steps }) {
const email = process.env.CDATA_EMAIL;
const pat = process.env.CDATA_PAT;
const credentials = email + ":" + pat;
const auth = Buffer.from(credentials).toString("base64");
const llmOutput = steps.LLM;
const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
const NL = String.fromCharCode(10);
const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
const headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": "Basic " + auth
};
function parseSSE(raw) {
try {
const lines = raw.split(NL);
for (let i = 0; i < lines.length; i++) {
const line = lines.at(i);
const trimmed = line.trim();
if (trimmed.indexOf("data:") === 0) {
const jsonStr = trimmed.slice(5).trim();
if (jsonStr) {
const json = JSON.parse(jsonStr);
const result = json && json.result;
const content = result && result.content;
if (Array.isArray(content)) {
return {
parsed: content.map(function(c) { return c.text || ""; }).join(NL),
isError: (result && result.isError) || false,
full: json
};
}
}
}
}
} catch (e) {
console.log("SSE parse error:", e.message);
}
return { parsed: raw, isError: false, full: null };
}
function parseCSV(text) {
let clean = text || "";
if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
clean = clean.slice(1, -1);
}
const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
const ESC_QUOTE = String.fromCharCode(92) + '"';
const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
const SINGLE_SLASH = String.fromCharCode(92);
clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
}
async function initSession() {
const res = await fetch(MCP_URL, {
method: "POST",
headers: headers,
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "pipedream", version: "1.0" }
}
})
});
return res.headers.get("mcp-session-id");
}
async function callMCP(id, method, args, sessionId) {
const reqHeaders = Object.assign({}, headers);
if (sessionId) {
Object.assign(reqHeaders, { "mcp-session-id": sessionId });
}
const res = await fetch(MCP_URL, {
method: "POST",
headers: reqHeaders,
body: JSON.stringify({
jsonrpc: "2.0",
id: id,
method: "tools/call",
params: { name: method, arguments: args }
})
});
const raw = await res.text();
const result = parseSSE(raw);
result.raw = raw;
return result;
}
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completions = client.chat.completions;
const session1 = await initSession();
const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
const catalogs = parseCSV(catalogsResult.parsed);
const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
const connectionResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg1 },
{ role: "user", content: userQuery }
)
});
const connectionName = connectionResponse.choices.at(0).message.content.trim();
const session2 = await initSession();
const schemasResult = await callMCP(2, "getSchemas", {
connectionName: connectionName,
catalogName: connectionName
}, session2);
const schemas = parseCSV(schemasResult.parsed);
const schemaName = schemas.at(0) || "REST";
const session3 = await initSession();
const tablesResult = await callMCP(2, "getTables", {
connectionName: connectionName,
catalogName: connectionName,
schemaName: schemaName
}, session3);
const tableNames = parseCSV(tablesResult.parsed);
const queryLower = userQuery.toLowerCase();
const isListTablesQuery =
queryLower.indexOf("list") !== -1 ||
queryLower.indexOf("what tables") !== -1 ||
queryLower.indexOf("show tables") !== -1;
if (isListTablesQuery) {
return {
success: true,
connection: connectionName,
message: "Available tables in " + connectionName + "." + schemaName,
tables: tableNames
};
}
const tableList = tableNames.map(function(t) {
return connectionName + "." + schemaName + "." + t;
}).join(", ");
const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
const sqlResponse = await completions.create({
model: "gpt-4o-mini",
messages: new Array(
{ role: "system", content: systemMsg2 },
{ role: "user", content: userQuery }
)
});
const sql = sqlResponse.choices.at(0).message.content.trim();
if (!sql) { return { error: "LLM returned empty SQL" }; }
const session4 = await initSession();
const queryResult = await callMCP(2, "queryData", {
query: sql,
connectionName: connectionName
}, session4);
if (queryResult.full) {
const content = queryResult.full.result && queryResult.full.result.content;
if (Array.isArray(content)) {
try {
const parsed = JSON.parse(content.at(0).text);
const results = parsed.results && parsed.results.at(0);
return {
sql: sql,
connection: connectionName,
data: (results && results.rows) || new Array(),
schema: (results && results.schema) || new Array(),
success: true
};
} catch (e) {
return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
}
}
}
return { sql: sql, connection: connectionName, raw: queryResult.raw };
}
});
注意: Pipedream にコードを貼り付ける際は、該当行のコメントに記載されているとおり、llmOutput.return_value.userQuery を steps.LLM.$return_value.userQuery に置き換えてください。
3.4 レスポンスステップの設定
- Return HTTP Response ステップを追加し、名前を Response とします
- Response Status Code を 200 に設定します
- Response Body を {{steps.mcp.$return_value}} に設定します
- 以下の Response Headers を追加します。「Response Headers」をクリックし、 をクリックして追加してください:
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
ステップ 4:ワークフローをテストして Paylocity のデータを操作する
トリガーにテストイベントを設定する
- ワークフロー内の trigger ステップをクリックします
- Generate Test Event をクリックします
- イベントのボディを以下のように編集します:
{
"query": "list all tables"
}
ワークフロー全体を実行する
- トリガーステップの下部にある Test workflow をクリックします
- Pipedream がテストイベントを使用して、すべてのステップを順番に実行します
- 各ステップが正常に完了すると、緑色に変わります
各ステップの結果を確認する
テスト実行が完了したら、各ステップのタブをクリックして Exports タブで出力を確認しましょう:
| ステップ | Exports で確認するポイント |
|---|---|
| trigger | body.query - クエリが正しく受信されたことを確認 |
| LLM | userQuery - クエリが正しく抽出されたことを確認 |
| MCP | connection, sql, data, schema - データが取得されたことを確認 |
| Response | $response.body - 最終的な JSON レスポンス |
各ステップ内の Logs タブには、生成された SQL、選択された接続、MCP の生レスポンスなどの詳細な出力が表示されます。
注意: Response ステップの Exports タブには、{ "success": true } と "status 200" のようなサマリーのみが表示されます。これはワークフローが正常に実行されたことを示していますが、完全なデータは表示されません。
データ行、SQL、スキーマを含む完全な出力を確認するには、MCP ステップのタブをクリックし、Exports タブを確認してください。$return_value を展開すると、完全なレスポンスが表示されます:
このワークフローは以下を自動的に実行します:
- CData の接続をすべて検出
- 質問に最も関連する接続を選択
- スキーマとテーブルを動的に検出
- 適切な SQL クエリを生成して実行
- 結果を返却
仕組みについて
この連携では、CData Connect AI の以下の MCP ツールを順番に使用します:
| MCP ツール | 用途 |
|---|---|
| getCatalogs | CData Connect AI から利用可能な接続をすべて取得します |
| getSchemas | 特定の接続のデータベーススキーマを取得します |
| getTables | 特定のスキーマのすべてのテーブルとビューを取得します |
| queryData | SQL クエリを実行し、結果を返します |
OpenAI の LLM が、自然言語の質問と CData MCP ツールの間のインテリジェントレイヤーとして機能し、適切な接続の選択、データ構造の検出、正確な SQL クエリの生成を自動で行います。
CData Connect AI でビジネスシステムのデータ活用を今すぐスタート
Pipedream と CData Connect AI を組み合わせることで、自然言語クエリがエンタープライズシステム全体のリアルタイムデータ操作に自動変換される、インテリジェントな AI 駆動ワークフローを実現できます。ETL パイプライン、データ同期ジョブ、カスタム連携ロジックは不要です。このアプローチにより、ガバナンスの強化、運用コストの削減、AI ワークフローからのより迅速で正確なレスポンスが可能になります。
いかがでしたか?Pipedream から Paylocity へのデータ接続が簡単に完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルで AI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。