開発者ガイド(上級) - LangGraph による顧客ヘルス分析エージェントの構築
データスキーマを自律的に検出し、ライブのエンタープライズデータをクエリし、エグゼクティブ向けのヘルスブリーフを生成するインテリジェントな顧客ヘルス分析エージェントを構築します。 このガイドでは、LangGraph のマルチエージェントワークフローと CData Connect AI を組み合わせて、拡張可能な 3 ノードのエージェントパイプラインによる自律的なデータ検出と分析を実現する Python アプリケーションの作成方法を説明します。
注意: このガイドでは Google スプレッドシートをデータソースとして使用していますが、CData Connect AI がサポートする 350 以上のデータソースのいずれにも同じ原則が適用されます。
このガイドを完了すると、以下の機能を持つ Python アプリケーションが完成します:
- CData Connect AI を通じて 350 以上のエンタープライズデータソースに LangGraph ReAct エージェントから接続
- 自律的なスキーマ検出、LLM を活用した分析、HTML レンダリングを備えた 3 ノードのエージェントパイプラインを構築
- 設定可能なファクトリーを通じて複数の LLM プロバイダー(OpenAI、Anthropic、Google、Ollama)をサポート
- 検出したスキーマをキャッシュして以降の実行を高速化
- ヘルススコア、シグナル、推奨事項、リスクを構造化した HTML ブリーフを生成
- カスタム分析ノードによるエージェントパイプラインの拡張
アーキテクチャの概要
このアプリケーションは、Model Context Protocol(MCP)を使用して LangGraph とデータソースを橋渡しします:
┌────────────────────────────────────────────────┐
│ ReAct Gatherer Agent │
│ │
│ LLM decides next action │
│ | │
│ v │
│ MCP Tools (5 tools) CData Connect │
│ get_catalogs, get_schemas ──> AI MCP Server │
│ get_tables, get_columns (350+ sources) │
│ query_data │
│ | │
│ v loop until enough data gathered │
└───────────────────┬────────────────────────────┘
|
v
┌───────────────────────────────────────────────┐
│ Analyst Node (LLM) │
│ Structured JSON: health_score, signals, │
│ recommendations, risks, opportunities │
└───────────────────┬───────────────────────────┘
|
v
┌───────────────────────────────────────────────┐
│ Renderer Node (Deterministic) │
│ Jinja2 template -> HTML brief │
│ No LLM call, pure template rendering │
└───────────────────┬───────────────────────────┘
|
v
output/*.html
仕組み:
- ReAct エージェントが CData Connect AI の MCP ツールを使用して自律的にスキーマを検出し、反復的なツール呼び出しでデータを収集
- Analyst ノードが単一の LLM 呼び出しで構造化された JSON ヘルスアセスメント(スコア、シグナル、推奨事項、リスク)を生成
- Renderer ノードが Jinja2 テンプレートに分析結果を埋め込み、スタイル付きの HTML ブリーフを保存(LLM 不使用、決定論的処理)
- 各ノードにツールを追加することで完全なエージェントにアップグレード可能。パイプラインはマルチエージェント対応
- スキーマキャッシュにより、以降の実行で冗長な検出を回避(デフォルト TTL: 24 時間)
前提条件
このガイドには以下が必要です:
- システムにインストールされた Python 3.8+(Python のダウンロード)
- pip パッケージインストーラー(Python 3.4+ に同梱)。pip --version で確認できます
- OpenAI API キー(有料アカウントが必要)、または対応する LLM プロバイダー(Anthropic、Google、Ollama)
- CData Connect AI アカウント(無料トライアル利用可能)
- サンプル Google スプレッドシートデータ用の Google アカウント
はじめに
概要
手順の概要は以下の通りです:
- Google スプレッドシートでサンプルデータをセットアップ
- CData Connect AI を設定し、パーソナルアクセストークンを作成
- Python プロジェクトをセットアップし、依存関係をインストール
- コードアーキテクチャを理解
- エージェントを実行
ステップ 1: Google スプレッドシートでサンプルデータをセットアップ
機能をデモンストレーションするために、顧客データを含むサンプル Google スプレッドシートを使用します。 このデータセットには、アカウント、販売機会、サポートチケット、使用状況メトリクスが含まれています。
- サンプル顧客ヘルススプレッドシートに移動します
- ファイル > コピーを作成をクリックして、Google ドライブに保存します
- わかりやすい名前を付けます(例:「demo_organization」)。この名前は後で接続を設定する際に必要になります。
スプレッドシートには 4 つのシートが含まれています:
- account: 会社情報(名前、業界、収益、従業員数)
- opportunity: 販売パイプラインデータ(ステージ、金額、確率)
- tickets: サポートチケット(優先度、ステータス、説明)
- usage: 製品使用状況メトリクス(ジョブ実行回数、処理レコード数)
ステップ 2: CData Connect AI の設定
2.1 サインアップまたはログイン
- https://jp.cdata.com/ai/signup/ で新規アカウントを作成するか、 https://cloud.cdata.com/ でログインします
- 新規アカウントを作成する場合は、登録プロセスを完了します
2.2 Google スプレッドシート接続を追加
-
ログイン後、左側のナビゲーションメニューでSourcesをクリックし、Add Connectionをクリックします
-
Add Connection パネルからGoogle Sheetsを選択します
-
接続を設定します:
- Spreadsheet プロパティに、コピーしたシートの名前を設定します(例:「demo_organization」)
- Sign inをクリックして Google OAuth で認証します
-
認証後、Permissionsタブに移動し、ユーザーがアクセス権を持っていることを確認します
2.3 パーソナルアクセストークンの作成
Python アプリケーションは、パーソナルアクセストークン(PAT)を使用して Connect AI と認証します。
- 右上の歯車アイコンをクリックして設定を開きます
- Access Tokensセクションに移動します
- Create PATをクリックします
-
トークンに名前を付け(例:「LangGraph Customer Health」)、Createをクリックします
- 重要: トークンはすぐにコピーしてください。一度しか表示されません!
ステップ 3: Python プロジェクトのセットアップ
3.1 GitHub からクローン(推奨)
すべてのソースファイルを含む完全なプロジェクトをクローンします:
git clone https://github.com/CDataSoftware/langgraph-customer-health-agent.git
cd langgraph-customer-health-agent
pip install -r requirements.txt
python run.py
インタラクティブランナー(run.py)が認証情報の設定と最初の分析実行をガイドします。
3.2 代替方法: ゼロから作成
新しいプロジェクトディレクトリを作成し、依存関係をインストールします:
mkdir langgraph-customer-health-agent
cd langgraph-customer-health-agent
pip install langgraph langchain-core langchain-openai requests python-dotenv jinja2 rich
次に、ステップ 4 と 5 で説明するソースファイルを作成します。
3.3 環境変数の設定
オプション A: python run.py を実行し、セットアップウィザード(オプション 1)を選択して、対話的に認証情報を設定します。
オプション B: プロジェクトルートに .env ファイルを手動で作成します:
cp .env.example .env
.env をテキストエディタで開き、認証情報を入力します:
# CData Connect AI
[email protected]
CDATA_PAT=your-personal-access-token
# LLM Provider (openai, anthropic, google, ollama)
LLM_PROVIDER=openai
LLM_MODEL=gpt-4o
OPENAI_API_KEY=sk-proj-...
# Optional: force a specific catalog for demos
# CDATA_CATALOG=MCP_Apps_Demo
プレースホルダーの値を実際の認証情報に置き換えてください。
ステップ 4: コードアーキテクチャの理解
プロジェクトは、専門モジュールで構成されたマルチエージェントパイプラインで構成されています:
4.1 設定と LLM ファクトリー
config.py モジュールは環境変数を読み込み、複数の LLM プロバイダーをサポートする get_llm() ファクトリーを提供します:
"""Configuration management for the customer health agent."""
import os
from dotenv import load_dotenv
load_dotenv(override=True)
# CData Connect AI
CDATA_EMAIL = os.getenv("CDATA_EMAIL")
CDATA_PAT = os.getenv("CDATA_PAT")
MCP_ENDPOINT = "https://mcp.cloud.cdata.com/mcp"
# Optional: force a specific catalog for demos
CDATA_CATALOG = os.getenv("CDATA_CATALOG")
# LLM configuration
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")
def get_llm(temperature=0, model_override=None):
"""Factory function to create an LLM instance based on LLM_PROVIDER."""
provider = LLM_PROVIDER.lower()
model = model_override or LLM_MODEL
if provider == "openai":
from langchain_openai import ChatOpenAI
return ChatOpenAI(model=model, temperature=temperature)
elif provider == "anthropic":
from langchain_anthropic import ChatAnthropic
return ChatAnthropic(model=model, temperature=temperature)
elif provider == "google":
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(model=model, temperature=temperature)
elif provider == "ollama":
from langchain_ollama import ChatOllama
return ChatOllama(model=model, temperature=temperature)
get_llm() ファクトリーは遅延インポートを使用するため、使用するプロバイダーのパッケージのみが必要です。.env ファイルで LLM_PROVIDER と LLM_MODEL を設定してプロバイダーを切り替えられます。
4.2 MCP ツール
5 つの @tool デコレータ付き関数が CData Connect AI の MCP エンドポイントをラップします。共有の requests.Session が認証を処理します:
import base64
import json
import requests
from langchain_core.tools import tool
from config import CDATA_EMAIL, CDATA_PAT, MCP_ENDPOINT, CDATA_CATALOG
# Shared session with Basic Auth
_session = requests.Session()
_credentials = f"{CDATA_EMAIL}:{CDATA_PAT}"
_encoded = base64.b64encode(_credentials.encode()).decode()
_session.headers.update({
"Authorization": f"Basic {_encoded}",
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
})
def _call_mcp(method, params):
"""Send a JSON-RPC 2.0 request to the MCP endpoint."""
payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
resp = _session.post(MCP_ENDPOINT, json=payload, timeout=60, stream=True)
# Parse SSE response (data: {...})
for line in resp.text.split("
"):
if line.startswith("data: "):
return json.loads(line[6:]).get("result", {})
@tool
def get_catalogs() -> str:
"""List all available data source connections (catalogs)."""
if CDATA_CATALOG:
return f"Available catalogs:
- {CDATA_CATALOG}"
result = _call_mcp("tools/call", {"name": "getCatalogs", "arguments": {}})
return _extract_text(result)
@tool
def get_tables(catalog_name: str, schema_name: str) -> str:
"""List tables in a catalog and schema."""
result = _call_mcp("tools/call", {
"name": "getTables",
"arguments": {"catalogName": catalog_name, "schemaName": schema_name}
})
return _extract_text(result)
@tool
def query_data(sql_query: str) -> str:
"""Execute a SQL SELECT query. Use [Catalog].[Schema].[Table] format."""
result = _call_mcp("tools/call", {"name": "queryData", "arguments": {"query": sql_query}})
return _extract_text(result)
ReAct エージェントがこれらのツールを自律的に呼び出します。CDATA_CATALOG 環境変数を設定すると、デモ用にカタログ検出をスキップし、指定されたカタログ名のみを返します。
4.3 スキーマキャッシュ
schema_cache.py モジュールは、検出されたスキーマメタデータを ~/.cache/langgraph-health/schema.json にキャッシュします(設定可能な TTL、デフォルト 24 時間):
import json, time
from pathlib import Path
from config import SCHEMA_CACHE_TTL
CACHE_FILE = Path.home() / ".cache" / "langgraph-health" / "schema.json"
def is_valid():
"""Check if cache exists and is within TTL."""
if not CACHE_FILE.exists():
return False
return (time.time() - CACHE_FILE.stat().st_mtime) < SCHEMA_CACHE_TTL
def load():
"""Load cached schema data."""
return json.loads(CACHE_FILE.read_text())
def save(schema_data):
"""Save schema to cache."""
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
CACHE_FILE.write_text(json.dumps(schema_data, indent=2))
キャッシュが有効な場合、Gatherer エージェントはキャッシュされたスキーマをシステムプロンプトに挿入し、検出をスキップして即座にクエリを開始できます。--refresh-schema を使用するとスキーマの再検出を強制できます。
4.4 LangGraph ワークフロー
エージェントは LangGraph の StateGraph で構築された 3 ノードのパイプラインを使用します:
gather (ReAct Agent) ──> analyze (LLM Node) ──> render (Deterministic Node)
| | |
Discover schema, Produce structured Fill HTML template,
query data via JSON assessment save styled brief
MCP tool calls (score, signals, to output/
recommendations)
from langgraph.graph import StateGraph
from state import AgentState
from agents.gatherer import gather_node
from agents.analyst import analyze_node
from agents.renderer import render_node
# Extensible pipeline -- add your agents to this list
PIPELINE = [
("gather", gather_node),
("analyze", analyze_node),
("render", render_node),
]
def build_graph():
"""Build and compile the LangGraph workflow."""
graph = StateGraph(AgentState)
for name, func in PIPELINE:
graph.add_node(name, func)
graph.set_entry_point(PIPELINE[0][0])
for i in range(len(PIPELINE) - 1):
graph.add_edge(PIPELINE[i][0], PIPELINE[i + 1][0])
return graph.compile()
PIPELINE リストを使用することで、エージェントの追加、削除、並べ替えが簡単にできます。各ノードは共有の AgentState ディクショナリから読み書きを行います。
Gatherer ノードは LangGraph の create_react_agent を使用して ReAct ループを作成します:
from langgraph.prebuilt import create_react_agent
from config import get_llm, CDATA_CATALOG
from mcp_tools import get_catalogs, get_schemas, get_tables, get_columns, query_data
import schema_cache
TOOLS = [get_catalogs, get_schemas, get_tables, get_columns, query_data]
def gather_node(state):
"""ReAct data gatherer -- discovers schemas and queries data."""
sys_prompt = "You are a data gathering agent..."
# Inject cached schema if available
if schema_cache.is_valid():
sys_prompt += f"
Cached schema:
{json.dumps(schema_cache.load())}"
llm = get_llm()
agent = create_react_agent(llm, TOOLS, prompt=sys_prompt)
result = agent.invoke({"messages": [("user", state["user_prompt"])]})
return {"gathered_data": result["messages"][-1].content}
ReAct エージェントは、接続されたデータソースに適応しながら、どのツールをどの順序で呼び出すかを自律的に判断します。
4.5 ロガー
logger.py モジュールは、カスタムフォーマットと実行統計を備えた軽量ロガーを提供します。--verbose を使用すると、MCP 呼び出しやタイミングを含む詳細な出力が表示されます:
[gatherer] 14:32:01 Schema cache hit [analyst] 14:32:05 Analyzing gathered data [renderer] 14:32:08 Brief saved to output/20260224_143208_premium_auto_health_brief.html [summary] 14:32:08 --- Run Summary --- [summary] 14:32:08 LLM calls: 4 [summary] 14:32:08 MCP calls: 12 [summary] 14:32:08 Total time: 7.23s
ステップ 5: エージェントの実行
5.1 インタラクティブランナー(初回推奨)
最も簡単に始められるのはインタラクティブランナーです。認証情報の設定、LLM プロバイダーの選択、エージェントの実行をメニュー形式のインターフェースでガイドします:
python run.py
ランナーには 5 つのオプションがあります:
- セットアップウィザード — CData の認証情報を設定し、LLM プロバイダー(OpenAI、Gemini、DeepSeek)とモデルを選択
- ヘルス分析の実行 — 特定のアカウントを分析(サンプルアカウントの提案あり)
- 自由形式のクエリを実行 — データに関する任意の質問が可能(サンプルクエリの提案あり)
- スキーマキャッシュの更新 — キャッシュされたスキーマをクリアして再検出
- セットアップの確認 — 認証情報の検証、MCP 接続のテスト、依存関係の確認
rich ライブラリがインストールされていない場合は、初回実行時に自動インストールされます。
5.2 CLI から直接実行: アカウントヘルス分析
コマンドラインからエージェントを直接実行することもできます:
python src/main.py --account "Premium Auto Group Europe"
想定される出力:
- ReAct エージェントがスキーマを検出し、アカウント、販売機会、チケットをクエリ
- Analyst ノードがシグナルと推奨事項を含むヘルススコアを生成
- HTML ブリーフが output/TIMESTAMP_AccountName_health_brief.html に保存
5.3 CLI から直接実行: 自由形式のクエリ
自然な英語で任意の質問ができます:
python src/main.py "Show me the top 10 customers by revenue"
ReAct エージェントが、どのテーブルとクエリを実行すべきかを自動的に判断します。複数のテーブルにまたがる複雑な質問も可能です:
python src/main.py "Which industries have the most high-priority open tickets?" --verbose
5.4 Verbose モード
--verbose を追加すると、ツール呼び出しやタイミングを含むエージェントの詳細な出力が表示されます:
python src/main.py --account "Premium Auto Group Europe" --verbose
以下は、エージェントが生成したヘルスブリーフのサンプルです:
ステップ 6: クエリの例
データを探索するためのクエリ例をいくつか紹介します:
| カテゴリ | クエリ |
|---|---|
| 収益 | python src/main.py "Show me the top 10 customers by annual revenue" |
| 業界 | python src/main.py "All customers in the energy sector" |
| パイプライン | python src/main.py "How many open opportunities do we have and total value" |
| サポート | python src/main.py "Show all high priority open tickets" |
| セグメンテーション | python src/main.py "Customer count by industry" |
| アカウントヘルス | python src/main.py --account "Premium Auto Group Europe" |
ステップ 7: 利用可能な MCP ツール
AI エージェントは以下の CData Connect AI ツールにアクセスできます:
| ツール | 説明 |
|---|---|
| getCatalogs | 利用可能なデータソース接続を一覧表示 |
| getSchemas | 特定のカタログのスキーマを取得 |
| getTables | スキーマ内のテーブルを取得 |
| getColumns | テーブルのカラムメタデータを取得 |
| queryData | SQL クエリを実行 |
| getProcedures | ストアドプロシージャを一覧表示 |
| getProcedureParameters | プロシージャのパラメータ詳細を取得 |
| executeProcedure | ストアドプロシージャを実行 |
トラブルシューティング
クエリが結果を返さない
- CData Connect AI の接続名が正しいことを確認してください
- Connect AI のデータエクスプローラーでテーブル名とカラム名が存在するか確認してください
- まずシンプルなクエリを試してみてください: python src/main.py "Show me all customers"
- --verbose でエージェントが生成する SQL クエリを確認してください
LLM API エラー
- OPENAI_API_KEY(または同等のキー)が有効で、利用可能なクレジットがあることを確認してください
- このエージェントは GPT-4o または Claude Sonnet で最も効果的に動作します。.env の LLM_MODEL で設定できます
- カスタム API エンドポイントを使用する場合は、.env で OPENAI_API_BASE を設定してください
認証エラー
- .env の CData メールアドレスと PAT が正しいことを確認してください
- PAT が期限切れになっていないことを確認してください
- Connect AI アカウントがアクティブであることを確認してください
エージェントがループしすぎる
- .env で CDATA_CATALOG を設定し、カタログ検出をスキップしてエージェントのスコープを絞ります
- MAX_ITERATIONS(デフォルト: 15)を減らしてツール呼び出しのループを制限します
- --verbose を使用して、各ステップでエージェントが何を行っているか確認してください
ツール呼び出しの失敗
- CData Connect AI インスタンスに少なくとも 1 つのアクティブなデータソースが接続されていることを確認してください
- 完全修飾テーブル名を使用してください: [Catalog].[Schema].[Table]
- Connect AI のデータエクスプローラーでカラム名が存在するか確認してください
次のステップ
顧客ヘルスエージェントが動作するようになったら、次のことを試してみましょう:
- エージェントパイプラインの拡張: graph.py の PIPELINE リストにカスタムエージェントを追加できます。例えば、Analyst と Renderer の間に競合分析ノード、解約予測エージェント、財務モデリングステップなどを追加できます。
- より多くのデータソースを接続: CData Connect AI ダッシュボードから、Salesforce、HubSpot、Snowflake など 350 以上のサポートされたソースを追加できます。ReAct エージェントがスキーマを自動的に検出します。
- LLM プロバイダーの切り替え: .env の LLM_PROVIDER と LLM_MODEL を変更して、Anthropic Claude、Google Gemini、または Ollama 経由のローカルモデルを使用できます。
- スケジュール実行の追加: プロアクティブな顧客モニタリングのために、ヘルス分析をスケジュールに従って自動実行できます。
- Human-in-the-Loop の追加: LangGraph は割り込みポイントをサポートしており、エージェントのアクションを人間がレビュー・承認してから次に進めることができます。
- 高度なパターンを探索: LangGraph ドキュメントでは、サイクル、ブランチ、並列実行、マルチエージェント連携について詳しく解説されています。
リソース
- GitHub リポジトリ - 完全なソースコード
- LangGraph ドキュメント - 高度なワークフローパターンとステート管理
- CData Connect AI ドキュメント - より多くのデータソースの接続とガバナンス設定
- CData プロンプトライブラリ - さまざまなユースケース向けのサンプルプロンプト
- OpenAI API ドキュメント - OpenAI モデルと API リファレンス
- Model Context Protocol - MCP の仕様とドキュメント
CData Connect AI を始めよう
AI 搭載のデータアプリケーションを構築する準備はできましたか?CData Connect AI は、AI アプリケーションから 350 以上のエンタープライズデータソースへのガバナンスされたセキュアなアクセスを提供します。 LangGraph エージェントから、Salesforce、Snowflake、HubSpot、Google スプレッドシート、データベースなどのライブビジネスデータを、単一の MCP インターフェースを通じてクエリできます。
無料トライアルにサインアップして、今日からインテリジェントな顧客ヘルスエージェントの構築を始めましょう!