開発者ガイド(上級) - LangGraph による顧客ヘルス分析エージェントの構築

データスキーマを自律的に検出し、ライブのエンタープライズデータをクエリし、エグゼクティブ向けのヘルスブリーフを生成するインテリジェントな顧客ヘルス分析エージェントを構築します。 このガイドでは、LangGraph のマルチエージェントワークフローCData Connect AI を組み合わせて、拡張可能な 3 ノードのエージェントパイプラインによる自律的なデータ検出と分析を実現する Python アプリケーションの作成方法を説明します。

注意: このガイドでは Google スプレッドシートをデータソースとして使用していますが、CData Connect AI がサポートする 350 以上のデータソースのいずれにも同じ原則が適用されます。

このガイドを完了すると、以下の機能を持つ Python アプリケーションが完成します:

  • CData Connect AI を通じて 350 以上のエンタープライズデータソースに LangGraph ReAct エージェントから接続
  • 自律的なスキーマ検出、LLM を活用した分析、HTML レンダリングを備えた 3 ノードのエージェントパイプラインを構築
  • 設定可能なファクトリーを通じて複数の LLM プロバイダー(OpenAI、Anthropic、Google、Ollama)をサポート
  • 検出したスキーマをキャッシュして以降の実行を高速化
  • ヘルススコア、シグナル、推奨事項、リスクを構造化した HTML ブリーフを生成
  • カスタム分析ノードによるエージェントパイプラインの拡張

アーキテクチャの概要

このアプリケーションは、Model Context Protocol(MCP)を使用して LangGraph とデータソースを橋渡しします:

┌────────────────────────────────────────────────┐
│              ReAct Gatherer Agent               │
│                                                │
│   LLM decides next action                      │
│      |                                         │
│      v                                         │
│   MCP Tools (5 tools)          CData Connect   │
│   get_catalogs, get_schemas ──> AI MCP Server  │
│   get_tables, get_columns      (350+ sources)  │
│   query_data                                   │
│      |                                         │
│      v  loop until enough data gathered        │
└───────────────────┬────────────────────────────┘
                    |
                    v
┌───────────────────────────────────────────────┐
│           Analyst Node (LLM)                   │
│   Structured JSON: health_score, signals,      │
│   recommendations, risks, opportunities        │
└───────────────────┬───────────────────────────┘
                    |
                    v
┌───────────────────────────────────────────────┐
│          Renderer Node (Deterministic)         │
│   Jinja2 template -> HTML brief               │
│   No LLM call, pure template rendering         │
└───────────────────┬───────────────────────────┘
                    |
                    v
              output/*.html

仕組み:

  1. ReAct エージェントが CData Connect AI の MCP ツールを使用して自律的にスキーマを検出し、反復的なツール呼び出しでデータを収集
  2. Analyst ノードが単一の LLM 呼び出しで構造化された JSON ヘルスアセスメント(スコア、シグナル、推奨事項、リスク)を生成
  3. Renderer ノードが Jinja2 テンプレートに分析結果を埋め込み、スタイル付きの HTML ブリーフを保存(LLM 不使用、決定論的処理)
  4. 各ノードにツールを追加することで完全なエージェントにアップグレード可能。パイプラインはマルチエージェント対応
  5. スキーマキャッシュにより、以降の実行で冗長な検出を回避(デフォルト TTL: 24 時間)

前提条件

このガイドには以下が必要です:



はじめに

概要

手順の概要は以下の通りです:

  1. Google スプレッドシートでサンプルデータをセットアップ
  2. CData Connect AI を設定し、パーソナルアクセストークンを作成
  3. Python プロジェクトをセットアップし、依存関係をインストール
  4. コードアーキテクチャを理解
  5. エージェントを実行

ステップ 1: Google スプレッドシートでサンプルデータをセットアップ

機能をデモンストレーションするために、顧客データを含むサンプル Google スプレッドシートを使用します。 このデータセットには、アカウント、販売機会、サポートチケット、使用状況メトリクスが含まれています。

  1. サンプル顧客ヘルススプレッドシートに移動します
  2. ファイル > コピーを作成をクリックして、Google ドライブに保存します
  3. わかりやすい名前を付けます(例:「demo_organization」)。この名前は後で接続を設定する際に必要になります。

スプレッドシートには 4 つのシートが含まれています:

  • account: 会社情報(名前、業界、収益、従業員数)
  • opportunity: 販売パイプラインデータ(ステージ、金額、確率)
  • tickets: サポートチケット(優先度、ステータス、説明)
  • usage: 製品使用状況メトリクス(ジョブ実行回数、処理レコード数)

ステップ 2: CData Connect AI の設定

2.1 サインアップまたはログイン

  1. https://jp.cdata.com/ai/signup/ で新規アカウントを作成するか、 https://cloud.cdata.com/ でログインします
  2. 新規アカウントを作成する場合は、登録プロセスを完了します

2.2 Google スプレッドシート接続を追加

  1. ログイン後、左側のナビゲーションメニューでSourcesをクリックし、Add Connectionをクリックします
  2. Add Connection パネルからGoogle Sheetsを選択します
  3. 接続を設定します:
    • Spreadsheet プロパティに、コピーしたシートの名前を設定します(例:「demo_organization」)
    • Sign inをクリックして Google OAuth で認証します
  4. 認証後、Permissionsタブに移動し、ユーザーがアクセス権を持っていることを確認します

2.3 パーソナルアクセストークンの作成

Python アプリケーションは、パーソナルアクセストークン(PAT)を使用して Connect AI と認証します。

  1. 右上の歯車アイコンをクリックして設定を開きます
  2. Access Tokensセクションに移動します
  3. Create PATをクリックします
  4. トークンに名前を付け(例:「LangGraph Customer Health」)、Createをクリックします
  5. 重要: トークンはすぐにコピーしてください。一度しか表示されません!

ステップ 3: Python プロジェクトのセットアップ

3.1 GitHub からクローン(推奨)

すべてのソースファイルを含む完全なプロジェクトをクローンします:

git clone https://github.com/CDataSoftware/langgraph-customer-health-agent.git cd langgraph-customer-health-agent pip install -r requirements.txt python run.py

インタラクティブランナー(run.py)が認証情報の設定と最初の分析実行をガイドします。

3.2 代替方法: ゼロから作成

新しいプロジェクトディレクトリを作成し、依存関係をインストールします:

mkdir langgraph-customer-health-agent cd langgraph-customer-health-agent pip install langgraph langchain-core langchain-openai requests python-dotenv jinja2 rich

次に、ステップ 4 と 5 で説明するソースファイルを作成します。

3.3 環境変数の設定

オプション A: python run.py を実行し、セットアップウィザード(オプション 1)を選択して、対話的に認証情報を設定します。

オプション B: プロジェクトルートに .env ファイルを手動で作成します:

cp .env.example .env

.env をテキストエディタで開き、認証情報を入力します:

# CData Connect AI [email protected] CDATA_PAT=your-personal-access-token # LLM Provider (openai, anthropic, google, ollama) LLM_PROVIDER=openai LLM_MODEL=gpt-4o OPENAI_API_KEY=sk-proj-... # Optional: force a specific catalog for demos # CDATA_CATALOG=MCP_Apps_Demo

プレースホルダーの値を実際の認証情報に置き換えてください。


ステップ 4: コードアーキテクチャの理解

プロジェクトは、専門モジュールで構成されたマルチエージェントパイプラインで構成されています:

4.1 設定と LLM ファクトリー

config.py モジュールは環境変数を読み込み、複数の LLM プロバイダーをサポートする get_llm() ファクトリーを提供します:

"""Configuration management for the customer health agent.""" import os from dotenv import load_dotenv load_dotenv(override=True) # CData Connect AI CDATA_EMAIL = os.getenv("CDATA_EMAIL") CDATA_PAT = os.getenv("CDATA_PAT") MCP_ENDPOINT = "https://mcp.cloud.cdata.com/mcp" # Optional: force a specific catalog for demos CDATA_CATALOG = os.getenv("CDATA_CATALOG") # LLM configuration LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai") LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o") def get_llm(temperature=0, model_override=None): """Factory function to create an LLM instance based on LLM_PROVIDER.""" provider = LLM_PROVIDER.lower() model = model_override or LLM_MODEL if provider == "openai": from langchain_openai import ChatOpenAI return ChatOpenAI(model=model, temperature=temperature) elif provider == "anthropic": from langchain_anthropic import ChatAnthropic return ChatAnthropic(model=model, temperature=temperature) elif provider == "google": from langchain_google_genai import ChatGoogleGenerativeAI return ChatGoogleGenerativeAI(model=model, temperature=temperature) elif provider == "ollama": from langchain_ollama import ChatOllama return ChatOllama(model=model, temperature=temperature)

get_llm() ファクトリーは遅延インポートを使用するため、使用するプロバイダーのパッケージのみが必要です。.env ファイルで LLM_PROVIDERLLM_MODEL を設定してプロバイダーを切り替えられます。

4.2 MCP ツール

5 つの @tool デコレータ付き関数が CData Connect AI の MCP エンドポイントをラップします。共有の requests.Session が認証を処理します:

import base64 import json import requests from langchain_core.tools import tool from config import CDATA_EMAIL, CDATA_PAT, MCP_ENDPOINT, CDATA_CATALOG # Shared session with Basic Auth _session = requests.Session() _credentials = f"{CDATA_EMAIL}:{CDATA_PAT}" _encoded = base64.b64encode(_credentials.encode()).decode() _session.headers.update({ "Authorization": f"Basic {_encoded}", "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }) def _call_mcp(method, params): """Send a JSON-RPC 2.0 request to the MCP endpoint.""" payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params} resp = _session.post(MCP_ENDPOINT, json=payload, timeout=60, stream=True) # Parse SSE response (data: {...}) for line in resp.text.split(" "): if line.startswith("data: "): return json.loads(line[6:]).get("result", {}) @tool def get_catalogs() -> str: """List all available data source connections (catalogs).""" if CDATA_CATALOG: return f"Available catalogs: - {CDATA_CATALOG}" result = _call_mcp("tools/call", {"name": "getCatalogs", "arguments": {}}) return _extract_text(result) @tool def get_tables(catalog_name: str, schema_name: str) -> str: """List tables in a catalog and schema.""" result = _call_mcp("tools/call", { "name": "getTables", "arguments": {"catalogName": catalog_name, "schemaName": schema_name} }) return _extract_text(result) @tool def query_data(sql_query: str) -> str: """Execute a SQL SELECT query. Use [Catalog].[Schema].[Table] format.""" result = _call_mcp("tools/call", {"name": "queryData", "arguments": {"query": sql_query}}) return _extract_text(result)

ReAct エージェントがこれらのツールを自律的に呼び出します。CDATA_CATALOG 環境変数を設定すると、デモ用にカタログ検出をスキップし、指定されたカタログ名のみを返します。

4.3 スキーマキャッシュ

schema_cache.py モジュールは、検出されたスキーマメタデータを ~/.cache/langgraph-health/schema.json にキャッシュします(設定可能な TTL、デフォルト 24 時間):

import json, time from pathlib import Path from config import SCHEMA_CACHE_TTL CACHE_FILE = Path.home() / ".cache" / "langgraph-health" / "schema.json" def is_valid(): """Check if cache exists and is within TTL.""" if not CACHE_FILE.exists(): return False return (time.time() - CACHE_FILE.stat().st_mtime) < SCHEMA_CACHE_TTL def load(): """Load cached schema data.""" return json.loads(CACHE_FILE.read_text()) def save(schema_data): """Save schema to cache.""" CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) CACHE_FILE.write_text(json.dumps(schema_data, indent=2))

キャッシュが有効な場合、Gatherer エージェントはキャッシュされたスキーマをシステムプロンプトに挿入し、検出をスキップして即座にクエリを開始できます。--refresh-schema を使用するとスキーマの再検出を強制できます。

4.4 LangGraph ワークフロー

エージェントは LangGraph の StateGraph で構築された 3 ノードのパイプラインを使用します:

  gather (ReAct Agent) ──> analyze (LLM Node) ──> render (Deterministic Node)
       |                   |                   |
  Discover schema,   Produce structured   Fill HTML template,
  query data via     JSON assessment      save styled brief
  MCP tool calls     (score, signals,     to output/
                      recommendations)
from langgraph.graph import StateGraph from state import AgentState from agents.gatherer import gather_node from agents.analyst import analyze_node from agents.renderer import render_node # Extensible pipeline -- add your agents to this list PIPELINE = [ ("gather", gather_node), ("analyze", analyze_node), ("render", render_node), ] def build_graph(): """Build and compile the LangGraph workflow.""" graph = StateGraph(AgentState) for name, func in PIPELINE: graph.add_node(name, func) graph.set_entry_point(PIPELINE[0][0]) for i in range(len(PIPELINE) - 1): graph.add_edge(PIPELINE[i][0], PIPELINE[i + 1][0]) return graph.compile()

PIPELINE リストを使用することで、エージェントの追加、削除、並べ替えが簡単にできます。各ノードは共有の AgentState ディクショナリから読み書きを行います。

Gatherer ノードは LangGraph の create_react_agent を使用して ReAct ループを作成します:

from langgraph.prebuilt import create_react_agent from config import get_llm, CDATA_CATALOG from mcp_tools import get_catalogs, get_schemas, get_tables, get_columns, query_data import schema_cache TOOLS = [get_catalogs, get_schemas, get_tables, get_columns, query_data] def gather_node(state): """ReAct data gatherer -- discovers schemas and queries data.""" sys_prompt = "You are a data gathering agent..." # Inject cached schema if available if schema_cache.is_valid(): sys_prompt += f" Cached schema: {json.dumps(schema_cache.load())}" llm = get_llm() agent = create_react_agent(llm, TOOLS, prompt=sys_prompt) result = agent.invoke({"messages": [("user", state["user_prompt"])]}) return {"gathered_data": result["messages"][-1].content}

ReAct エージェントは、接続されたデータソースに適応しながら、どのツールをどの順序で呼び出すかを自律的に判断します。

4.5 ロガー

logger.py モジュールは、カスタムフォーマットと実行統計を備えた軽量ロガーを提供します。--verbose を使用すると、MCP 呼び出しやタイミングを含む詳細な出力が表示されます:

[gatherer] 14:32:01 Schema cache hit
[analyst] 14:32:05 Analyzing gathered data
[renderer] 14:32:08 Brief saved to output/20260224_143208_premium_auto_health_brief.html
[summary] 14:32:08 --- Run Summary ---
[summary] 14:32:08 LLM calls: 4
[summary] 14:32:08 MCP calls: 12
[summary] 14:32:08 Total time: 7.23s

ステップ 5: エージェントの実行

5.1 インタラクティブランナー(初回推奨)

最も簡単に始められるのはインタラクティブランナーです。認証情報の設定、LLM プロバイダーの選択、エージェントの実行をメニュー形式のインターフェースでガイドします:

python run.py

ランナーには 5 つのオプションがあります:

  1. セットアップウィザード — CData の認証情報を設定し、LLM プロバイダー(OpenAI、Gemini、DeepSeek)とモデルを選択
  2. ヘルス分析の実行 — 特定のアカウントを分析(サンプルアカウントの提案あり)
  3. 自由形式のクエリを実行 — データに関する任意の質問が可能(サンプルクエリの提案あり)
  4. スキーマキャッシュの更新 — キャッシュされたスキーマをクリアして再検出
  5. セットアップの確認 — 認証情報の検証、MCP 接続のテスト、依存関係の確認

rich ライブラリがインストールされていない場合は、初回実行時に自動インストールされます。

5.2 CLI から直接実行: アカウントヘルス分析

コマンドラインからエージェントを直接実行することもできます:

python src/main.py --account "Premium Auto Group Europe"

想定される出力:

  • ReAct エージェントがスキーマを検出し、アカウント、販売機会、チケットをクエリ
  • Analyst ノードがシグナルと推奨事項を含むヘルススコアを生成
  • HTML ブリーフが output/TIMESTAMP_AccountName_health_brief.html に保存

5.3 CLI から直接実行: 自由形式のクエリ

自然な英語で任意の質問ができます:

python src/main.py "Show me the top 10 customers by revenue"

ReAct エージェントが、どのテーブルとクエリを実行すべきかを自動的に判断します。複数のテーブルにまたがる複雑な質問も可能です:

python src/main.py "Which industries have the most high-priority open tickets?" --verbose

5.4 Verbose モード

--verbose を追加すると、ツール呼び出しやタイミングを含むエージェントの詳細な出力が表示されます:

python src/main.py --account "Premium Auto Group Europe" --verbose

以下は、エージェントが生成したヘルスブリーフのサンプルです:


ステップ 6: クエリの例

データを探索するためのクエリ例をいくつか紹介します:

カテゴリ クエリ
収益 python src/main.py "Show me the top 10 customers by annual revenue"
業界 python src/main.py "All customers in the energy sector"
パイプライン python src/main.py "How many open opportunities do we have and total value"
サポート python src/main.py "Show all high priority open tickets"
セグメンテーション python src/main.py "Customer count by industry"
アカウントヘルス python src/main.py --account "Premium Auto Group Europe"

ステップ 7: 利用可能な MCP ツール

AI エージェントは以下の CData Connect AI ツールにアクセスできます:

ツール説明
getCatalogs利用可能なデータソース接続を一覧表示
getSchemas特定のカタログのスキーマを取得
getTablesスキーマ内のテーブルを取得
getColumnsテーブルのカラムメタデータを取得
queryDataSQL クエリを実行
getProceduresストアドプロシージャを一覧表示
getProcedureParametersプロシージャのパラメータ詳細を取得
executeProcedureストアドプロシージャを実行

トラブルシューティング

クエリが結果を返さない

  • CData Connect AI の接続名が正しいことを確認してください
  • Connect AI のデータエクスプローラーでテーブル名とカラム名が存在するか確認してください
  • まずシンプルなクエリを試してみてください: python src/main.py "Show me all customers"
  • --verbose でエージェントが生成する SQL クエリを確認してください

LLM API エラー

  • OPENAI_API_KEY(または同等のキー)が有効で、利用可能なクレジットがあることを確認してください
  • このエージェントは GPT-4o または Claude Sonnet で最も効果的に動作します。.envLLM_MODEL で設定できます
  • カスタム API エンドポイントを使用する場合は、.envOPENAI_API_BASE を設定してください

認証エラー

  • .env の CData メールアドレスと PAT が正しいことを確認してください
  • PAT が期限切れになっていないことを確認してください
  • Connect AI アカウントがアクティブであることを確認してください

エージェントがループしすぎる

  • .envCDATA_CATALOG を設定し、カタログ検出をスキップしてエージェントのスコープを絞ります
  • MAX_ITERATIONS(デフォルト: 15)を減らしてツール呼び出しのループを制限します
  • --verbose を使用して、各ステップでエージェントが何を行っているか確認してください

ツール呼び出しの失敗

  • CData Connect AI インスタンスに少なくとも 1 つのアクティブなデータソースが接続されていることを確認してください
  • 完全修飾テーブル名を使用してください: [Catalog].[Schema].[Table]
  • Connect AI のデータエクスプローラーでカラム名が存在するか確認してください

次のステップ

顧客ヘルスエージェントが動作するようになったら、次のことを試してみましょう:

  • エージェントパイプラインの拡張: graph.pyPIPELINE リストにカスタムエージェントを追加できます。例えば、Analyst と Renderer の間に競合分析ノード、解約予測エージェント、財務モデリングステップなどを追加できます。
  • より多くのデータソースを接続: CData Connect AI ダッシュボードから、Salesforce、HubSpot、Snowflake など 350 以上のサポートされたソースを追加できます。ReAct エージェントがスキーマを自動的に検出します。
  • LLM プロバイダーの切り替え: .envLLM_PROVIDERLLM_MODEL を変更して、Anthropic Claude、Google Gemini、または Ollama 経由のローカルモデルを使用できます。
  • スケジュール実行の追加: プロアクティブな顧客モニタリングのために、ヘルス分析をスケジュールに従って自動実行できます。
  • Human-in-the-Loop の追加: LangGraph は割り込みポイントをサポートしており、エージェントのアクションを人間がレビュー・承認してから次に進めることができます。
  • 高度なパターンを探索: LangGraph ドキュメントでは、サイクル、ブランチ、並列実行、マルチエージェント連携について詳しく解説されています。

リソース


CData Connect AI を始めよう

AI 搭載のデータアプリケーションを構築する準備はできましたか?CData Connect AI は、AI アプリケーションから 350 以上のエンタープライズデータソースへのガバナンスされたセキュアなアクセスを提供します。 LangGraph エージェントから、Salesforce、Snowflake、HubSpot、Google スプレッドシート、データベースなどのライブビジネスデータを、単一の MCP インターフェースを通じてクエリできます。

無料トライアルにサインアップして、今日からインテリジェントな顧客ヘルスエージェントの構築を始めましょう!