【MCP Server】CrewAI エージェントからSplunk のデータにリアルタイム接続する方法

加藤龍彦
加藤龍彦
デジタルマーケティング
CData Connect AI のリモートMCP サーバー を活用してCrewAI エージェントから Splunk に安全にアクセスして、データの読み取りやアクション実行を可能にします。

CrewAI は、マルチエージェントシステムを構築するためのオープンソースのPython フレームワークです。CData Connect AI のリモートMCP と組み合わせることで、自然言語を使ったクエリでSplunk とリアルタイムに対話するインテリジェントなエージェントを構築できます。この記事では、Connect AI リモートMCP を使用したSplunk への接続方法と、CrewAI フレームワークと OpenAI を活用したシンプルなコンソールベースのチャットボットエージェントの構成方法をご紹介します。

CData Connect AI は、Splunk のデータに接続するための専用クラウド間インターフェースを提供します。CData Connect AI Remote MCP Server により、Google ADK エージェントと Splunkの間でセキュアな通信が可能になります。これにより、ネイティブ対応データベースへのデータレプリケーションを必要とせずに、エージェントから Splunk のデータの読み取りや操作を実行できます。CData Connect AIは最適化されたデータ処理機能を備えており、フィルタや JOIN を含むサポート対象のすべての SQL 操作を効率的に Splunkへ直接送信します。サーバーサイド処理を活用することで、要求されたSplunk のデータ を迅速に取得できます。

ステップ 1: CrewAI 用のSplunk 接続を設定

それでは早速、CrewAI エージェントからSplunk への接続を設定していきましょう。CrewAI エージェントから Splunk と対話するには、まずCData Connect AI でSplunk 接続を作成して構成します。

  1. Connect AI にログインし、「Connections」をクリックして「 Add Connection」をクリックします
  2. 「Add Connection」パネルから「Splunk」を選択します
  3. Splunk に接続するために必要な認証情報を入力しましょう。

    リクエストを認証するには、 User、Password、およびURL プロパティを有効なSplunk クレデンシャルに設定します。デフォルトでは、CData 製品はポート8089 でリクエストを行います。

    デフォルトでは、CData 製品はサーバーとのTLS/SSL ネゴシエーションを試みます。TLS/SSL 設定について詳しくは、ヘルプドキュメントの「高度な設定」を参照してください。

    「Create & Test」をクリックします
  4. 「Add Splunk Connection」ページの「Permissions」タブに移動し、ユーザーベースの権限を更新します。

パーソナルアクセストークンを追加する

パーソナルアクセストークン (PAT) は、CrewAI エージェントから Connect AI への接続を認証するために使用します。アクセスの粒度を維持するために、サービスごとに個別の PAT を作成することをおすすめします。

  1. Connect AI アプリの右上にある歯車アイコン () をクリックして、設定ページを開きます。
  2. 「Settings」ページで「Access Tokens」セクションに移動し、「 Create PAT」をクリックします。
  3. PAT に名前を付けて「Create」をクリックします。
  4. パーソナルアクセストークンは作成時にのみ表示されます。必ずコピーして、今後の使用のために安全に保管してください。

これで、CrewAI からSplunk に接続する準備が整いました!

ステップ 2: CrewAI 環境をセットアップ

続いて、CrewAI エージェントを構成する前に必要な依存関係をインストールして環境を設定していきましょう。

CData Connect AI のMCP Server を設定する

  1. 「cdata-mcp-crew-agent」という名前のフォルダを作成します。
  2. 「cdata-mcp-crew-agent」フォルダ内に、拡張子が「.env」のファイルを作成します。
  3. 以下の内容をコピーして貼り付けます。「CONNECT_AI_EMAIL」を CData Connect AI のユーザー名に、「CONNECT_AI_PAT」を先ほど取得した PAT に置き換えてください。OpenAI のAPI キーは https://platform.openai.com/ で確認できます。
    MCP_SERVER_URL=https://mcp.cloud.cdata.com/mcp
    MCP_USERNAME=YOUR_EMAIL
    MCP_PASSWORD=YOUR_PAT
    OPENAI_API_KEY=YOUR_OPENAI_API_KEY
        

CrewAI ライブラリをインストールする

ターミナルでpip install crewai crewai-tools python-dotenv requests を実行します。

CrewAI エージェントを作成して実行する

  1. 「crew-agent.py」というファイルを作成します。これが CrewAI エージェントになります。
  2. 「crew-agent.py」ファイルを構成して、CData Connect AI MCP Server を使用できるようにします。

    主な機能

    このPython スクリプトは、CData Connect AI の MCP (Model Context Protocol) Server に接続するインテリジェントな CrewAI エージェントを作成します。このエージェントは、接続されたデータソースと対話するための自然言語インターフェースを提供し、SQL を書くことなくデータベースのクエリ、スキーマの探索、ストアドプロシージャの実行が可能になります。

    ツールクラス

    このスクリプトは、データ操作のさまざまなステップを処理する8つの専用ツールを実装しています。

    • GetCatalogsTool: 利用可能なすべてのデータソースとデータベースをリスト表示
    • GetSchemasTool: 特定のカタログ内のデータベーススキーマを取得
    • GetTablesTool: カタログ/スキーマのフィルタリングオプション付きでテーブルを検索
    • GetColumnsTool: 列のメタデータと構造情報を取得
    • GetProceduresTool: 利用可能なストアドプロシージャをリスト表示
    • GetProcedureParametersTool: プロシージャのパラメータ要件を詳細に表示
    • ExecuteProcedureTool: パラメータ付きでストアドプロシージャを実行
    • QueryDataTool: データソースに対してSQL クエリを実行

    主要なコンポーネント

    実装は3つの主要なコンポーネントで構成されています:

    • BaseCDataTool: MCP Serverとの通信を管理し、Connect AI の認証情報を使用して認証を処理し、Server-Sent Events (SSE) レスポンスを解析する基底クラス
    • CDataConnectAgent: OpenAI のGPT-4 モデルを使用して自然言語クエリを処理し、ユーザーリクエストを満たすために適切なツールをインテリジェントに選択するメインエージェントクラス
    • インタラクティブコンソール: データの探索とクエリのための会話型チャットボット体験を提供するコマンドラインインターフェース

    以下が完全な実装です:

    import os
    import warnings
    from typing import Dict, Any, Optional
    from dotenv import load_dotenv
    from CrewAI import Agent, Task, Crew
    from CrewAI.tools import BaseTool
    import requests
    import json
    import base64
    
    # 警告を抑制
    warnings.filterwarnings('ignore')
    
    # 環境変数を読み込む
    load_dotenv()
    
    class BaseCDataTool(BaseTool):
        """CData Connect AI MCP ツールの基底クラス"""
        
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self._server_url = os.getenv('MCP_SERVER_URL', '').rstrip('/')
            self._username = os.getenv('MCP_USERNAME', '')
            self._password = os.getenv('MCP_PASSWORD', '')
            
            if not all([self._server_url, self._username, self._password]):
                raise ValueError("環境変数に MCP_SERVER_URL、MCP_USERNAME、MCP_PASSWORD を設定する必要があります")
            
            # Basic 認証ヘッダーを作成
            credentials = base64.b64encode(f"{self._username}:{self._password}".encode()).decode()
            self._headers = {
                'Authorization': f'Basic {credentials}',
                'Content-Type': 'application/json',
                'Accept': 'application/json, text/event-stream'
            }
        
        def _parse_sse_response(self, response_text: str) -> Dict[str, Any]:
            """Server-Sent Events レスポンス形式を解析"""
            lines = response_text.strip().split('
    ')
            data_line = None
            
            for line in lines:
                if line.startswith('data: '):
                    data_line = line[6:]  # 'data: ' プレフィックスを削除
                    break
            
            if data_line:
                try:
                    return json.loads(data_line)
                except json.JSONDecodeError as e:
                    return {"error": f"SSE データの JSON が無効です: {e}"}
            
            return {"error": "SSE レスポンスにデータが見つかりません"}
        
        def _make_mcp_request(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
            """Connect AI サーバーに MCP リクエストを送信"""
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": arguments
                }
            }
            
            try:
                response = requests.post(self._server_url, json=payload, headers=self._headers)
                
                # 解析する前にレスポンスの内容を確認
                if not response.text.strip():
                    return {"error": "サーバーからの応答が空です"}
                
                response.raise_for_status()
                
                # Server-Sent Events 形式を解析
                result = self._parse_sse_response(response.text)
                
                if "error" in result:
                    if "error" in result and isinstance(result["error"], dict):
                        return {"error": result["error"]["message"]}
                    return result
                
                return result.get("result", {})
            
            except requests.exceptions.RequestException as e:
                return {"error": f"リクエストが失敗しました: {str(e)}"}
    
    class GetCatalogsTool(BaseCDataTool):
        """利用可能なデータカタログ/データベースを取得するツール"""
        name: str = "Get Data Catalogs"
        description: str = "CData Connect AI で利用可能なデータソース、データベース、またはカタログのリストを取得"
        
        def _run(self, query: str = "") -> str:
            result = self._make_mcp_request("getCatalogs", {})
            
            if "error" in result:
                return f"カタログの取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class GetSchemasTool(BaseCDataTool):
        """カタログ内のスキーマを取得するツール"""
        name: str = "Get Schemas"
        description: str = "特定のカタログ内のデータベーススキーマを取得。カタログ名が必要です。"
        
        def _run(self, catalog_name: str) -> str:
            if not catalog_name:
                return "エラー: catalog_name が必要です"
                
            result = self._make_mcp_request("getSchemas", {"catalogName": catalog_name})
            
            if "error" in result:
                return f"スキーマの取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class GetTablesTool(BaseCDataTool):
        """カタログ/スキーマ内のテーブルを取得するツール"""
        name: str = "Get Tables"
        description: str = "データベース内のテーブルを取得。カタログやスキーマでフィルタリングすることもできます。"
        
        def _run(self, catalog_name: str = "", schema_name: str = "") -> str:
            params = {}
            if catalog_name:
                params["catalogName"] = catalog_name
            if schema_name:
                params["schemaName"] = schema_name
                
            result = self._make_mcp_request("getTables", params)
            
            if "error" in result:
                return f"テーブルの取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class GetColumnsTool(BaseCDataTool):
        """列情報を取得するツール"""
        name: str = "Get Columns"
        description: str = "テーブルの列情報を取得。カタログ、スキーマ、テーブル名でフィルタリングできます。"
        
        def _run(self, catalog_name: str = "", schema_name: str = "", table_name: str = "") -> str:
            params = {}
            if catalog_name:
                params["catalogName"] = catalog_name
            if schema_name:
                params["schemaName"] = schema_name
            if table_name:
                params["tableName"] = table_name
                
            result = self._make_mcp_request("getColumns", params)
            
            if "error" in result:
                return f"列の取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class GetProceduresTool(BaseCDataTool):
        """ストアドプロシージャを取得するツール"""
        name: str = "Get Stored Procedures"
        description: str = "特定のカタログとスキーマ内のストアドプロシージャを取得。カタログ名とスキーマ名の両方が必要です。"
        
        def _run(self, catalog_name: str, schema_name: str) -> str:
            if not catalog_name or not schema_name:
                return "エラー: catalog_name と schema_name の両方が必要です"
                
            params = {
                "catalogName": catalog_name,
                "schemaName": schema_name
            }
            result = self._make_mcp_request("getProcedures", params)
            
            if "error" in result:
                return f"プロシージャの取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class GetProcedureParametersTool(BaseCDataTool):
        """ストアドプロシージャのパラメータを取得するツール"""
        name: str = "Get Procedure Parameters"
        description: str = "特定のストアドプロシージャのパラメータ情報を取得。カタログ、スキーマ、プロシージャ名が必要です。"
        
        def _run(self, catalog_name: str, schema_name: str, procedure_name: str) -> str:
            if not all([catalog_name, schema_name, procedure_name]):
                return "エラー: catalog_name、schema_name、procedure_name のすべてが必要です"
                
            params = {
                "catalogName": catalog_name,
                "schemaName": schema_name,
                "procedureName": procedure_name
            }
            result = self._make_mcp_request("getProcedureParameters", params)
            
            if "error" in result:
                return f"プロシージャパラメータの取得エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class ExecuteProcedureTool(BaseCDataTool):
        """ストアドプロシージャを実行するツール"""
        name: str = "Execute Stored Procedure"
        description: str = "ストアドプロシージャを実行。カタログ、スキーマ、プロシージャ名が必要です。オプションでパラメータをキーと値のペアとして指定できます。"
        
        def _run(self, catalog_name: str, schema_name: str, procedure_name: str, parameters: str = "") -> str:
            if not all([catalog_name, schema_name, procedure_name]):
                return "エラー: catalog_name、schema_name、procedure_name のすべてが必要です"
                
            params = {
                "catalogName": catalog_name,
                "schemaName": schema_name,
                "procedureName": procedure_name
            }
            
            if parameters:
                # パラメータ文字列を辞書に解析
                try:
                    param_dict = json.loads(parameters)
                    params["parameters"] = param_dict
                except json.JSONDecodeError:
                    return f"エラー: パラメータは有効な JSON である必要があります。入力値: {parameters}"
                
            result = self._make_mcp_request("executeProcedure", params)
            
            if "error" in result:
                return f"プロシージャ実行エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class QueryDataTool(BaseCDataTool):
        """SQL クエリを実行するツール"""
        name: str = "Execute SQL Query"
        description: str = "データソースに対して SQL クエリを実行。SQL クエリを入力として指定します。オプションでパラメータとデフォルトスキーマを指定できます。"
        
        def _run(self, query: str, parameters: str = "", default_schema: str = "") -> str:
            if not query:
                return "エラー: SQL クエリが必要です"
                
            params = {"query": query}
            
            if parameters:
                try:
                    param_dict = json.loads(parameters)
                    params["parameters"] = param_dict
                except json.JSONDecodeError:
                    return f"エラー: パラメータは有効な JSON である必要があります。入力値: {parameters}"
            
            if default_schema:
                params["defaultSchema"] = default_schema
                
            result = self._make_mcp_request("queryData", params)
            
            if "error" in result:
                return f"クエリ実行エラー: {result['error']}"
            
            return json.dumps(result, indent=2)
    
    class CDataConnectAgent:
        """CData Connect AI 統合のメインエージェントクラス"""
        
        def __init__(self):
            # OpenAI を構成
            self.openai_api_key = os.getenv('OPENAI_API_KEY')
            if not self.openai_api_key:
                raise ValueError("環境変数に OPENAI_API_KEY を設定する必要があります")
            
            os.environ["OPENAI_API_KEY"] = self.openai_api_key
            os.environ["OPENAI_MODEL_NAME"] = "gpt-4"
            
            # すべての CData Connect ツールを初期化
            self.tools = [
                GetCatalogsTool(),
                GetSchemasTool(),
                GetTablesTool(),
                GetColumnsTool(),
                GetProceduresTool(),
                GetProcedureParametersTool(),
                ExecuteProcedureTool(),
                QueryDataTool()
            ]
            
            # エージェントを作成
            self.agent = Agent(
                role="データアナリスト兼クエリスペシャリスト",
                goal="自然言語を使って CData Connect AI からデータをクエリし、分析するユーザーをサポート",
                backstory=(
                    "あなたは CData Connect AI を専門とするエキスパートデータアナリストです。"
                    "ユーザーがデータソースを探索し、テーブル構造を理解し、"
                    "クエリを実行し、ストアドプロシージャを操作して必要な情報を取得できるようサポートします。"
                    "自然言語のリクエストを、利用可能なツールを使用した適切なアクションに変換できます。"
                    "CData Connect AI を通じて、Salesforce、SharePoint、QuickBooks など、さまざまなデータコネクタにアクセスできます。"
                    "ユーザーがデータについて質問したときは、インテリジェントに適切なツールを選択してサポートします。"
                    "常に実行内容を説明し、結果について役立つコンテキストを提供します。"
                ),
                verbose=True,
                allow_delegation=False,
                tools=self.tools
            )
        
        def process_query(self, user_query: str) -> str:
            """ユーザークエリを処理して応答を返す"""
            # ユーザーのクエリに対するタスクを作成
            task = Task(
                description=f"""
                CData Connect AI のデータに関するこのユーザークエリを処理してください: "{user_query}"
                
                クエリに答えるために、以下のツールにアクセスできます:
                1. Get Data Catalogs - 利用可能なデータソースを確認
                2. Get Schemas - カタログ内のスキーマを確認
                3. Get Tables - カタログ/スキーマ内のテーブルを確認
                4. Get Columns - 列の構造を確認
                5. Get Stored Procedures - 利用可能なプロシージャを確認
                6. Get Procedure Parameters - プロシージャの要件を理解
                7. Execute Stored Procedure - プロシージャを実行
                8. Execute SQL Query - SQL クエリを実行
                
                ユーザーの質問内容に基づいて適切なツールを選択してください。
                利用可能なデータソースを確認したい場合は、Get Data Catalogs を使用します。
                データをクエリしたい場合は、Execute SQL Query を使用します。
                テーブル構造について質問している場合は、Get Tables または Get Columns を使用します。
                ストアドプロシージャについて言及している場合は、プロシージャ関連のツールを使用します。
                
                常に実行内容を説明し、役立つフォーマットされた応答を提供してください。
                """,
                expected_output="ユーザーのデータに関するクエリに対する役立つ情報を含む応答",
                agent=self.agent
            )
            
            # このエージェントとタスクだけでクルーを作成
            crew = Crew(
                agents=[self.agent],
                tasks=[task],
                verbose=False
            )
            
            # タスクを実行
            result = crew.kickoff()
            return str(result)
    
    def main():
        """コンソールチャットボットを実行するメイン関数"""
        print("CData Connect AI AI アシスタント")
        print("=" * 50)
        print("CData Connect AI のデータのクエリと探索をサポートします!")
        print("以下について質問できます:")
        print("- 利用可能なデータソースとカタログ")
        print("- データベースのスキーマとテーブル")
        print("- 列の構造とテーブル情報")
        print("- ストアドプロシージャとそのパラメータ")
        print("- SQL クエリとストアドプロシージャの実行")
        print("- データの分析")
        print("
    終了するには 'quit'、'exit'、'bye' と入力してください。")
        print("=" * 50)
    
        try:
            # エージェントを初期化
            agent = CDataConnectAgent()
    
            while True:
                # ユーザー入力を取得
                user_input = input("
    データについて質問してください: ").strip()
    
                # 終了条件を確認
                if user_input.lower() in ['quit', 'exit', 'bye', 'q']:
                    print("
    さようなら! 楽しいデータ探索を!")
                    break
    
                if not user_input:
                    print("クエリまたは質問を入力してください。")
                    continue
    
                try:
                    print("
    クエリを処理中...")
                    # クエリを処理
                    response = agent.process_query(user_input)
                    print(f"
    応答:
    {response}")
    
                except Exception as e:
                    print(f"
    クエリ処理エラー: {str(e)}")
                    print("質問を言い換えるか、接続を確認してください。")
    
        except Exception as e:
            print(f"
    エージェントの初期化に失敗しました: {str(e)}")
            print("環境変数と接続設定を確認してください。")
    
    if __name__ == "__main__":
        main()
        
  3. ターミナルで「python crew-agent.py」を実行します。出力にタスクの結果が表示されます:
  4. 自然言語クエリを通じてSplunk との対話を開始しましょう。これで、エージェントは CData Connect AI MCP Server 経由で Splunk にアクセスできるようになりました。

ステップ 3: Splunk データにアクセスできるインテリジェントエージェントを構築

CrewAI エージェントを作成してCData Connect AI に接続できたので、自然言語を使用してSplunk と対話する高度なエージェントや、マルチエージェントシステムの構築が可能になります。MCP 統合により、エージェントに強力なデータアクセス機能が提供されます。

エージェントで利用可能なMCP ツール

CrewAI エージェントは、以下のCData Connect AI MCP ツールにアクセスできます。

  • queryData: 接続されたデータソースに対して SQL クエリを実行
  • execData: ストアドプロシージャを実行
  • getCatalogs: 利用可能なデータベース接続を取得
  • getSchemas: 特定のカタログのデータベーススキーマをリスト表示
  • getTables: スキーマ内のテーブルをリスト表示
  • getColumns: 特定のテーブルの列情報を取得
  • getPrimaryKeys: 主キー情報を取得
  • getIndexes: テーブルのインデックス情報を取得
  • getProcedures: 利用可能なストアドプロシージャをリスト表示

ユースケースの例

Splunk データへのアクセスが可能なCrewAI エージェントで実現できることの例をいくつかご紹介します。

  • データ分析エージェント: Splunk のトレンド、パターン、異常を分析するエージェントを構築
  • レポート生成エージェント: 自然言語のリクエストに基づいてカスタムレポートを生成するエージェントを作成
  • データ品質エージェント: リアルタイムでデータ品質を監視および検証するエージェントを開発
  • BI エージェント: 複数のデータソースをクエリして複雑なビジネス上の質問に答えるエージェントを構築
  • 自動化ワークフローエージェント: Splunk のデータ条件に基づいてアクションをトリガーするエージェントを作成

CrewAI エージェントは、これらの自然言語クエリを自動的に変換し、CData Connect AI MCP Server を通じて Splunk データに対して実行します。これにより、ユーザーが複雑な SQL を記述したり、基礎となるデータ構造を理解したりすることなく、リアルタイムのインサイトを提供できます。

CData Connect AI でビジネスシステムのデータ活用を今すぐスタート

いかがでしたか?CrewAI からSplunk へのデータ接続が10分もかからずに完了したのではないでしょうか。業務に使えそう、と感じてくださった方は、14日間の無償トライアルでAI ツールからビジネスシステムへのリアルタイムデータ接続をぜひお試しください。

はじめる準備はできましたか?

CData Connect AI の詳細、または無料トライアルにお申し込みください:

無料トライアル お問い合わせ