上級開発者ガイド - LangGraph で顧客ヘルスモニタリングエージェントを構築する
データスキーマを自律的に検出し、ライブのエンタープライズデータにクエリを実行して、エグゼクティブ向けのヘルスブリーフを生成する、 インテリジェントな顧客ヘルス分析エージェントを構築してみましょう。このガイドでは、 LangGraph のマルチエージェントワークフローと CData Connect AI を組み合わせ、 拡張可能な 3 ノードのエージェントパイプラインによる自律的なデータ検出・分析を実現する Python アプリケーションの作成方法を解説します。
NOTE: このガイドではデータソースとして Google Sheets を使用していますが、同じ手法は CData Connect AI がサポートする 350 以上のデータソースすべてに適用できます。
このガイドを最後まで進めると、以下のことができる Python アプリケーションが完成します。
- CData Connect AI を通じて LangGraph ReAct エージェントを 350 以上のエンタープライズデータソースに接続する
- 自律的なスキーマ検出、LLM によるデータ分析、HTML レンダリングの 3 ノードエージェントパイプラインを構築する
- 構成可能なファクトリを使って複数の LLM プロバイダー(OpenAI、Anthropic、Google、Ollama)をサポートする
- 検出済みのスキーマをキャッシュして、以降の実行を高速化する
- ヘルススコア、シグナル、レコメンデーション、リスクを構造化した HTML ブリーフを生成する
- カスタム分析ノードを追加してエージェントパイプラインを拡張する
アーキテクチャ概要
このアプリケーションは、Model Context Protocol(MCP)を使用して LangGraph とデータソースを橋渡しします。
┌────────────────────────────────────────────────┐
│ ReAct Gatherer Agent │
│ │
│ LLM decides next action │
│ | │
│ v │
│ MCP Tools (5 tools) CData Connect │
│ get_catalogs, get_schemas ──> AI MCP Server │
│ get_tables, get_columns (350+ sources) │
│ query_data │
│ | │
│ v loop until enough data gathered │
└───────────────────┬────────────────────────────┘
|
v
┌───────────────────────────────────────────────┐
│ Analyst Node (LLM) │
│ Structured JSON: health_score, signals, │
│ recommendations, risks, opportunities │
└───────────────────┬───────────────────────────┘
|
v
┌───────────────────────────────────────────────┐
│ Renderer Node (Deterministic) │
│ Jinja2 template -> HTML brief │
│ No LLM call, pure template rendering │
└───────────────────┬───────────────────────────┘
|
v
output/*.html
動作の仕組み:
- ReAct エージェントが CData Connect AI の MCP ツールを使ってスキーマを自律的に検出し、ツール呼び出しを繰り返してデータを収集します
- Analyst ノードが 1 回の LLM 呼び出しで構造化された JSON 形式のヘルスアセスメント(スコア、シグナル、レコメンデーション、リスク)を生成します
- Renderer ノードが分析結果を Jinja2 テンプレートに流し込み、スタイル付きの HTML ブリーフを保存します(LLM 不要、決定論的処理)
- 各ノードはツールを追加することでフルエージェントにアップグレード可能です。パイプラインはマルチエージェント対応の設計になっています
- スキーマキャッシュにより、以降の実行で冗長なスキーマ検出を回避します(デフォルト TTL: 24 時間)
前提条件
このガイドを進めるには、以下が必要です。
- Python 3.8 以上がインストール済みであること(Python のダウンロード)
- pip パッケージインストーラー(Python 3.4 以降に同梱)。pip --version で確認できます
- OpenAI API キー(有料アカウントが必要)、またはサポート対象の LLM プロバイダー(Anthropic、Google、Ollama)のいずれか
- CData Connect AI アカウント(無料トライアルはこちら)
- サンプルの Google Sheets データ用の Google アカウント
はじめに
概要
全体の流れを簡単にまとめると、以下のステップで進めていきます。
- Google Sheets にサンプルデータをセットアップする
- CData Connect AI を設定し、Personal Access Token を作成する
- Python プロジェクトをセットアップして依存関係をインストールする
- コードのアーキテクチャを理解する
- エージェントを実行する
STEP 1: Google Sheets にサンプルデータをセットアップする
まずは、顧客データを含むサンプルの Google Sheet を使って機能を確認していきましょう。 このデータセットには、アカウント情報、営業案件、サポートチケット、利用状況メトリクスが含まれています。
- サンプルの顧客ヘルススプレッドシートにアクセスします
- ファイル > コピーを作成をクリックして、自分の Google ドライブに保存します
- わかりやすい名前を付けましょう(例: "demo_organization")。この名前は後のステップで使用します
スプレッドシートには、以下の 4 つのシートが含まれています。
- account: 会社情報(会社名、業種、売上、従業員数)
- opportunity: 営業パイプラインデータ(ステージ、金額、確度)
- tickets: サポートチケット(優先度、ステータス、説明)
- usage: 製品利用状況メトリクス(ジョブ実行回数、処理レコード数)
STEP 2: CData Connect AI を設定する
2.1 サインアップまたはログイン
- https://www.cdata.com/ai/signup/ から新規アカウントを作成するか、 https://cloud.cdata.com/ にログインします
- 新規アカウントを作成する場合は、登録手順を完了してください
2.2 Google Sheets 接続を追加する
-
ログインしたら、左側のナビゲーションメニューで Sources をクリックし、Add Connection をクリックします
-
Add Connection パネルから Google Sheets を選択します
-
接続を設定します:
- Spreadsheet プロパティに、コピーしたシートの名前(例: "demo_organization")を設定します
- Sign in をクリックして Google OAuth で認証します
-
認証が完了したら、Permissions タブに移動して、ユーザーにアクセス権があることを確認します
2.3 Personal Access Token を作成する
Python アプリケーションは、Personal Access Token(PAT)を使って Connect AI に認証します。
- 右上の歯車アイコンをクリックして Settings を開きます
- Access Tokens セクションに移動します
- Create PAT をクリックします
-
トークンに名前を付け(例: "LangGraph Customer Health")、Create をクリックします
- 重要: トークンは一度しか表示されないため、すぐにコピーしてください!
STEP 3: Python プロジェクトをセットアップする
3.1 GitHub からクローン(推奨)
すべてのソースファイルを含む完全なプロジェクトをクローンします。
git clone https://github.com/CDataSoftware/langgraph-customer-health-agent.git
cd langgraph-customer-health-agent
pip install -r requirements.txt
python run.py
インタラクティブランナー(run.py)が、認証情報のセットアップと初回の分析実行をガイドしてくれます。
3.2 スクラッチから作成する場合
新しいプロジェクトディレクトリを作成し、依存関係をインストールします。
mkdir langgraph-customer-health-agent
cd langgraph-customer-health-agent
pip install langgraph langchain-core langchain-openai requests python-dotenv jinja2 rich
その後、STEP 4 と STEP 5 で説明するソースファイルを作成します。
3.3 環境変数を設定する
方法 A: python run.py を実行し、セットアップウィザード(オプション 1)を選択して、対話形式で認証情報を設定します。
方法 B: プロジェクトルートに .env ファイルを手動で作成します。
cp .env.example .env
.env をテキストエディタで開き、認証情報を入力します。
# CData Connect AI
[email protected]
CDATA_PAT=your-personal-access-token
# LLM Provider (openai, anthropic, google, ollama)
LLM_PROVIDER=openai
LLM_MODEL=gpt-4o
OPENAI_API_KEY=sk-proj-...
# Optional: force a specific catalog for demos
# CDATA_CATALOG=MCP_Apps_Demo
プレースホルダーの値を、実際の認証情報に置き換えてください。
STEP 4: コードアーキテクチャを理解する
このプロジェクトは、専用モジュールで構成されたマルチエージェントパイプラインで構成されています。
4.1 設定と LLM ファクトリ
config.py モジュールは環境変数を読み込み、複数の LLM プロバイダーをサポートする get_llm() ファクトリを提供します。
"""Configuration management for the customer health agent."""
import os
from dotenv import load_dotenv
load_dotenv(override=True)
# CData Connect AI
CDATA_EMAIL = os.getenv("CDATA_EMAIL")
CDATA_PAT = os.getenv("CDATA_PAT")
MCP_ENDPOINT = "https://mcp.cloud.cdata.com/mcp"
# Optional: force a specific catalog for demos
CDATA_CATALOG = os.getenv("CDATA_CATALOG")
# LLM configuration
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")
def get_llm(temperature=0, model_override=None):
"""Factory function to create an LLM instance based on LLM_PROVIDER."""
provider = LLM_PROVIDER.lower()
model = model_override or LLM_MODEL
if provider == "openai":
from langchain_openai import ChatOpenAI
return ChatOpenAI(model=model, temperature=temperature)
elif provider == "anthropic":
from langchain_anthropic import ChatAnthropic
return ChatAnthropic(model=model, temperature=temperature)
elif provider == "google":
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(model=model, temperature=temperature)
elif provider == "ollama":
from langchain_ollama import ChatOllama
return ChatOllama(model=model, temperature=temperature)
get_llm() ファクトリは遅延インポートを使用しているため、実際に使用するプロバイダーのパッケージのみインストールすれば OK です。.env ファイルで LLM_PROVIDER と LLM_MODEL を設定して、プロバイダーを切り替えられます。
4.2 MCP ツール
5 つの @tool デコレータ付き関数が CData Connect AI の MCP エンドポイントをラップします。共有の requests.Session が認証を処理します。
import base64
import json
import requests
from langchain_core.tools import tool
from config import CDATA_EMAIL, CDATA_PAT, MCP_ENDPOINT, CDATA_CATALOG
# Shared session with Basic Auth
_session = requests.Session()
_credentials = f"{CDATA_EMAIL}:{CDATA_PAT}"
_encoded = base64.b64encode(_credentials.encode()).decode()
_session.headers.update({
"Authorization": f"Basic {_encoded}",
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
})
def _call_mcp(method, params):
"""Send a JSON-RPC 2.0 request to the MCP endpoint."""
payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
resp = _session.post(MCP_ENDPOINT, json=payload, timeout=60, stream=True)
# Parse SSE response (data: {...})
for line in resp.text.split("
"):
if line.startswith("data: "):
return json.loads(line[6:]).get("result", {})
@tool
def get_catalogs() -> str:
"""List all available data source connections (catalogs)."""
if CDATA_CATALOG:
return f"Available catalogs:
- {CDATA_CATALOG}"
result = _call_mcp("tools/call", {"name": "getCatalogs", "arguments": {}})
return _extract_text(result)
@tool
def get_tables(catalog_name: str, schema_name: str) -> str:
"""List tables in a catalog and schema."""
result = _call_mcp("tools/call", {
"name": "getTables",
"arguments": {"catalogName": catalog_name, "schemaName": schema_name}
})
return _extract_text(result)
@tool
def query_data(sql_query: str) -> str:
"""Execute a SQL SELECT query. Use [Catalog].[Schema].[Table] format."""
result = _call_mcp("tools/call", {"name": "queryData", "arguments": {"query": sql_query}})
return _extract_text(result)
ReAct エージェントはこれらのツールを自律的に呼び出します。環境変数 CDATA_CATALOG を設定すると、デモ時にカタログ検出をスキップして既知のカタログ名を直接返すことができます。
4.3 スキーマキャッシュ
schema_cache.py モジュールは、検出したスキーマメタデータを ~/.cache/langgraph-health/schema.json にキャッシュします。TTL は設定可能で、デフォルトは 24 時間です。
import json, time
from pathlib import Path
from config import SCHEMA_CACHE_TTL
CACHE_FILE = Path.home() / ".cache" / "langgraph-health" / "schema.json"
def is_valid():
"""Check if cache exists and is within TTL."""
if not CACHE_FILE.exists():
return False
return (time.time() - CACHE_FILE.stat().st_mtime) < SCHEMA_CACHE_TTL
def load():
"""Load cached schema data."""
return json.loads(CACHE_FILE.read_text())
def save(schema_data):
"""Save schema to cache."""
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
CACHE_FILE.write_text(json.dumps(schema_data, indent=2))
キャッシュが有効な場合、gatherer エージェントはキャッシュ済みのスキーマをシステムプロンプトに注入するため、スキーマ検出をスキップしてすぐにクエリを開始できます。スキーマの再検出を強制するには --refresh-schema を使用してください。
4.4 LangGraph ワークフロー
このエージェントは、LangGraph の StateGraph で構築された 3 ノードのパイプラインを使用します。
gather (ReAct Agent) ──> analyze (LLM Node) ──> render (Deterministic Node)
| | |
Discover schema, Produce structured Fill HTML template,
query data via JSON assessment save styled brief
MCP tool calls (score, signals, to output/
recommendations)
from langgraph.graph import StateGraph
from state import AgentState
from agents.gatherer import gather_node
from agents.analyst import analyze_node
from agents.renderer import render_node
# Extensible pipeline -- add your agents to this list
PIPELINE = [
("gather", gather_node),
("analyze", analyze_node),
("render", render_node),
]
def build_graph():
"""Build and compile the LangGraph workflow."""
graph = StateGraph(AgentState)
for name, func in PIPELINE:
graph.add_node(name, func)
graph.set_entry_point(PIPELINE[0][0])
for i in range(len(PIPELINE) - 1):
graph.add_edge(PIPELINE[i][0], PIPELINE[i + 1][0])
return graph.compile()
PIPELINE リストを使えば、エージェントの追加・削除・順序変更が簡単にできます。各ノードは共有の AgentState ディクショナリを通じてデータの読み書きを行います。
gatherer ノードは LangGraph の create_react_agent を使って ReAct ループを作成します。
from langgraph.prebuilt import create_react_agent
from config import get_llm, CDATA_CATALOG
from mcp_tools import get_catalogs, get_schemas, get_tables, get_columns, query_data
import schema_cache
TOOLS = [get_catalogs, get_schemas, get_tables, get_columns, query_data]
def gather_node(state):
"""ReAct data gatherer -- discovers schemas and queries data."""
sys_prompt = "You are a data gathering agent..."
# Inject cached schema if available
if schema_cache.is_valid():
sys_prompt += f"
Cached schema:
{json.dumps(schema_cache.load())}"
llm = get_llm()
agent = create_react_agent(llm, TOOLS, prompt=sys_prompt)
result = agent.invoke({"messages": [("user", state["user_prompt"])]})
return {"gathered_data": result["messages"][-1].content}
ReAct エージェントは、どのツールをどの順番で呼び出すかを自律的に判断し、接続先のデータソースに適応します。
4.5 ロガー
logger.py モジュールは、カスタムフォーマットと実行統計を備えた軽量ロガーを提供します。--verbose を使用すると、MCP 呼び出しやタイミングなどの詳細な出力を確認できます。
[gatherer] 14:32:01 Schema cache hit [analyst] 14:32:05 Analyzing gathered data [renderer] 14:32:08 Brief saved to output/20260224_143208_premium_auto_health_brief.html [summary] 14:32:08 --- Run Summary --- [summary] 14:32:08 LLM calls: 4 [summary] 14:32:08 MCP calls: 12 [summary] 14:32:08 Total time: 7.23s
STEP 5: エージェントを実行する
5.1 インタラクティブランナー(初回推奨)
最も簡単な方法は、インタラクティブランナーを使うことです。メニュー形式のインターフェースで、認証情報のセットアップ、LLM プロバイダーの選択、エージェントの実行を案内してくれます。
python run.py
ランナーには 5 つのオプションがあります。
- セットアップウィザード — CData の認証情報を設定し、LLM プロバイダー(OpenAI、Gemini、DeepSeek)とモデルを選択します
- ヘルス分析の実行 — 特定のアカウントを分析します(サンプルアカウントの候補も表示されます)
- 自由形式クエリの実行 — データに関する任意の質問ができます(サンプルクエリの候補も表示されます)
- スキーマキャッシュの更新 — キャッシュ済みスキーマをクリアして再検出します
- セットアップの確認 — 認証情報の検証、MCP 接続のテスト、依存関係のチェックを行います
rich ライブラリは、未インストールの場合は初回実行時に自動でインストールされます。
5.2 CLI から直接実行: アカウントヘルス分析
コマンドラインからエージェントを直接実行することもできます。
python src/main.py --account "Premium Auto Group Europe"
実行結果の例:
- ReAct エージェントがスキーマを検出し、アカウント、営業案件、チケットのデータにクエリを実行します
- Analyst ノードがシグナルとレコメンデーション付きのヘルススコアを生成します
- HTML ブリーフが output/TIMESTAMP_AccountName_health_brief.html に保存されます
5.3 CLI から直接実行: 自由形式クエリ
自然言語で自由に質問できます。
python src/main.py "Show me the top 10 customers by revenue"
ReAct エージェントがどのテーブルにどんなクエリを実行すべきかを自動で判断します。複数のテーブルにまたがる複雑な質問も可能です。
python src/main.py "Which industries have the most high-priority open tickets?" --verbose
5.4 詳細出力モード
--verbose を付けると、ツール呼び出しやタイミングを含む詳細なエージェント出力を確認できます。
python src/main.py --account "Premium Auto Group Europe" --verbose
以下は、エージェントが生成したヘルスブリーフのサンプルです。
STEP 6: クエリの例
データを探索するためのクエリ例をいくつか紹介します。
| カテゴリ | クエリ |
|---|---|
| 売上 | python src/main.py "Show me the top 10 customers by annual revenue" |
| 業種 | python src/main.py "All customers in the energy sector" |
| パイプライン | python src/main.py "How many open opportunities do we have and total value" |
| サポート | python src/main.py "Show all high priority open tickets" |
| セグメンテーション | python src/main.py "Customer count by industry" |
| アカウントヘルス | python src/main.py --account "Premium Auto Group Europe" |
STEP 7: 利用可能な MCP ツール
AI エージェントは、以下の CData Connect AI ツールを使用できます。
| ツール | 説明 |
|---|---|
| getCatalogs | 利用可能なデータソース接続を一覧表示 |
| getSchemas | 特定のカタログのスキーマを取得 |
| getTables | スキーマ内のテーブルを取得 |
| getColumns | テーブルのカラムメタデータを取得 |
| queryData | SQL クエリを実行 |
| getProcedures | ストアドプロシージャを一覧表示 |
| getProcedureParameters | プロシージャのパラメータ詳細を取得 |
| executeProcedure | ストアドプロシージャを実行 |
トラブルシューティング
クエリの結果が返ってこない場合
- CData Connect AI での接続名が正しいことを確認してください
- Connect AI のデータエクスプローラーを使って、テーブル名やカラム名が存在するか確認してください
- まずはシンプルなクエリを試してみてください: python src/main.py "Show me all customers"
- --verbose を使って、エージェントが生成している SQL クエリを確認してください
LLM API エラー
- OPENAI_API_KEY(または同等のキー)が有効で、利用可能なクレジットがあることを確認してください
- このエージェントは GPT-4o または Claude Sonnet で最適に動作します。.env で LLM_MODEL を設定してください
- カスタム API エンドポイントを使用する場合は、.env で OPENAI_API_BASE を設定してください
認証エラー
- .env の CData メールアドレスと PAT が正しいことを確認してください
- PAT の有効期限が切れていないことを確認してください
- Connect AI アカウントがアクティブであることを確認してください
エージェントのループ回数が多すぎる場合
- .env で CDATA_CATALOG を設定して、カタログ検出をスキップし、エージェントのスコープを絞ってください
- MAX_ITERATIONS(デフォルト: 15)を減らして、ツール呼び出しループの上限を制限してください
- --verbose を使って、各ステップでエージェントが何をしているか確認してください
ツール呼び出しの失敗
- CData Connect AI インスタンスに、少なくとも 1 つのアクティブなデータソースが接続されていることを確認してください
- 完全修飾テーブル名を使用してください: [Catalog].[Schema].[Table]
- Connect AI のデータエクスプローラーを使って、カラム名が存在するか確認してください
次のステップ
顧客ヘルスエージェントが動作するようになったら、以下のような拡張が可能です。
- エージェントパイプラインを拡張する: graph.py の PIPELINE リストにカスタムエージェントを追加できます。たとえば、競合分析ノード、解約予測エージェント、財務モデリングステップなどを Analyst と Renderer の間に追加できます。
- データソースを追加する: CData Connect AI のダッシュボードから Salesforce、HubSpot、Snowflake をはじめ 350 以上のデータソースを追加できます。ReAct エージェントがスキーマを自動検出します。
- LLM プロバイダーを切り替える: .env の LLM_PROVIDER と LLM_MODEL を変更して、Anthropic Claude、Google Gemini、または Ollama 経由のローカルモデルを使用できます。
- スケジュール実行を追加する: ヘルス分析をスケジュールに沿って自動実行し、プロアクティブな顧客モニタリングを実現できます。
- Human-in-the-Loop を追加する: LangGraph は、エージェントのアクションを人間がレビュー・承認してから次に進む中断ポイントをサポートしています。
- 高度なパターンを探求する: LangGraph のドキュメントでは、サイクル、分岐、並列実行、マルチエージェント連携などの高度なパターンが紹介されています。
リソース
- GitHub リポジトリ - 完全なソースコード
- LangGraph ドキュメント - 高度なワークフローパターンとステート管理
- CData Connect AI ドキュメント - データソースの追加とガバナンス付きアクセスの設定
- CData プロンプトライブラリ - さまざまなユースケース向けのプロンプト例
- OpenAI API ドキュメント - OpenAI モデルと API リファレンス
- Model Context Protocol - MCP の仕様とドキュメント
CData Connect AI を始めましょう
AI 搭載のデータアプリケーションを構築する準備はできましたか? CData Connect AI は、350 以上のエンタープライズデータソースへのガバナンス付きのセキュアなアクセスを AI アプリケーションに提供します。 LangGraph エージェントは、単一の MCP インターフェースを通じて Salesforce、Snowflake、HubSpot、Google Sheets、各種データベースなどのライブビジネスデータにクエリを実行できます。
無料トライアルにサインアップして、 インテリジェントな顧客ヘルスエージェントの構築を始めましょう!