LlamaIndex を使って Python でSpark データに自然言語でクエリを実行する方法

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
Python で LlamaIndex を使用してリアルタイムのSpark のデータに自然言語でクエリを実行。

CData Python Connector for Apache Spark を使用して、Spark からリアルタイムデータへのクエリを開始しましょう。LlamaIndex と AI の力を活用して、複雑な SQL クエリを書くことなく、シンプルな自然言語でインサイトを取得できます。意思決定を強化するリアルタイムデータアクセスのメリットを享受しながら、既存の Python アプリケーションと簡単に統合できます。

CData Python Connector は、組み込みの最適化されたデータ処理により、Python でリアルタイムのSpark のデータを操作する際に比類のないパフォーマンスを提供します。Python から複雑な SQL クエリを発行すると、ドライバーはフィルターや集計などのサポートされた SQL 操作を直接 Spark にプッシュし、埋め込み SQL エンジンを使用してサポートされていない操作(多くの場合 SQL 関数や JOIN 操作)をクライアント側で処理します。

トレンド分析、レポート作成、データの可視化など、CData Python Connector を使用すれば、リアルタイムのデータソースの可能性を最大限に活用できます。

概要

LlamaIndex を使用して、CData Python Connector forSpark のデータでリアルタイムデータにクエリを実行する方法の概要です:

  • ロギング、データベース接続、NLP に必要な Python、CData、LlamaIndex モジュールをインポートします。
  • アプリケーションからの API リクエストを認証するための OpenAI API キーを取得します。
  • CData Python Connector を使用してリアルタイムのSpark のデータに接続します。
  • OpenAI を初期化し、自然言語クエリを処理するための SQLDatabase と NLSQLTableQueryEngine のインスタンスを作成します。
  • クエリエンジンと特定のデータベースインスタンスを作成します。
  • 自然言語クエリ(例:「最も稼いでいる従業員は誰ですか?」)を実行して、データベースから構造化されたレスポンスを取得します。
  • 取得したデータを分析してインサイトを得て、データドリブンな意思決定に役立てます。

必要なモジュールのインポート

CData、データベース接続、自然言語クエリに必要なモジュールをインポートします。

import os
import logging
import sys

# ロギングの設定
logging.basicConfig(stream=sys.stdout, level=logging.INFO, force=True)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

# CData と LlamaIndex に必要なモジュールをインポート
import cdata.sparksql as mod
from sqlalchemy import create_engine
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core import SQLDatabase
from llama_index.llms.openai import OpenAI

OpenAI API キーの設定

OpenAI の言語モデルを使用するには、API キーを環境変数として設定する必要があります。システムの環境変数で OpenAI API キーが利用可能であることを確認してください。

# 環境変数から OpenAI API キーを取得
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

''または、コード内で直接 API キーを追加することもできます(ただし、セキュリティリスクのため、本番環境ではこの方法は推奨されません):''

# API キーを直接設定(本番使用には非推奨)
OPENAI_API_KEY = "your-api-key-here"

データベース接続の作成

次に、必要な接続プロパティを含む接続文字列を使用して、CData Connector で Spark への接続を確立します。

SparkSQL への接続

SparkSQL への接続を確立するには以下を指定します。

  • Server:SparkSQL をホストするサーバーのホスト名またはIP アドレスに設定。
  • Port:SparkSQL インスタンスへの接続用のポートに設定。
  • TransportMode:SparkSQL サーバーとの通信に使用するトランスポートモード。有効な入力値は、BINARY およびHTTP です。デフォルトではBINARY が選択されます。
  • AuthScheme:使用される認証スキーム。有効な入力値はPLAIN、LDAP、NOSASL、およびKERBEROS です。デフォルトではPLAIN が選択されます。

Databricks への接続

Databricks クラスターに接続するには、以下の説明に従ってプロパティを設定します。Note:必要な値は、「クラスター」に移動して目的のクラスターを選択し、 「Advanced Options」の下にある「JDBC/ODBC」タブを選択することで、Databricks インスタンスで見つけることができます。

  • Server:Databricks クラスターのサーバーのホスト名に設定。
  • Port:443
  • TransportMode:HTTP
  • HTTPPath:Databricks クラスターのHTTP パスに設定。
  • UseSSL:True
  • AuthScheme:PLAIN
  • User:'token' に設定。
  • Password:パーソナルアクセストークンに設定(値は、Databricks インスタンスの「ユーザー設定」ページに移動して「アクセストークン」タブを選択することで取得できます)。

Spark への接続

# CData Python Connector for Apache Spark を使用してデータベースエンジンを作成
engine = create_engine("cdata_sparksql_2:///?User=Server=127.0.0.1;")

OpenAI インスタンスの初期化

OpenAI 言語モデルのインスタンスを作成します。ここで、temperature やモデルバージョンなどのパラメータを指定できます。

# OpenAI 言語モデルインスタンスを初期化
llm = OpenAI(temperature=0.0, model="gpt-3.5-turbo")

データベースとクエリエンジンの設定

SQL データベースとクエリエンジンを設定します。NLSQLTableQueryEngine を使用すると、SQL データベースに対して自然言語クエリを実行できます。

# SQL データベースインスタンスを作成
sql_db = SQLDatabase(engine)  # すべてのテーブルを含む

# 自然言語 SQL クエリ用のクエリエンジンを初期化
query_engine = NLSQLTableQueryEngine(sql_database=sql_db)

クエリの実行

これで、リアルタイムのデータソースに対して自然言語クエリを実行できます。この例では、最も稼いでいる従業員上位 2 名をクエリします。

# クエリ文字列を定義
query_str = "Who are the top earning employees?"

# クエリエンジンからレスポンスを取得
response = query_engine.query(query_str)

# レスポンスを出力
print(response)

CData Python Connector for Apache Spark の無料 30 日間トライアルをダウンロードして、リアルタイムデータへのシームレスなクエリを始めましょう。自然言語処理の力を体験し、データから貴重なインサイトを引き出してください。

はじめる準備はできましたか?

Apache Spark Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

Apache Spark Icon Apache Spark Python Connector お問い合わせ

Apache Spark へのデータ連携用のPython Connecotr ライブラリ。 pandas、SQLAlchemy、Dash、petl などの主要なPython ツールにApache Spark をシームレスに統合。