Apache Spark でLakebase のデータをSQL で操作する方法
Apache Spark は大規模データ処理のための高速エンジンです。CData JDBC Driver for Lakebase と組み合わせると、Spark はリアルタイムでLakebase のデータに連携して処理ができます。本記事では、Spark シェルに接続してLakebase をクエリする方法について解説します。
CData JDBC Driver は、最適化されたデータ処理がドライバーに組み込まれているため、リアルタイムLakebase と対話するための高いパフォーマンスを提供します。Lakebase に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計など、サポートされているSQL操作を直接Lakebase にプッシュし、組込みSQL エンジンを使用してサポートされていない操作(SQL 関数やJOIN 操作)をクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータ型を使用してLakebase を操作して分析できます。
CData JDBC Driver for Lakebase をインストール
まずは、本記事右側のサイドバーからLakebase JDBC Driver の無償トライアルをダウンロード・インストールしてください。30日間無償で、製品版の全機能が使用できます。
Spark Shell を起動してLakebase のデータに接続
- ターミナルを開き、Spark shell でCData JDBC Driver for Lakebase JAR file をjars パラメータに設定します:
$ spark-shell --jars /CData/CData JDBC Driver for Lakebase/lib/cdata.jdbc.lakebase.jar
- Shell でJDBC URL を使ってLakebase に接続し、SQL Context load() function でテーブルを読み込みます。
Databricks Lakebase に接続するには、以下のプロパティを設定します。
- DatabricksInstance: Databricks インスタンスまたはサーバーホスト名を指定します。形式は instance-abcdef12-3456-7890-abcd-abcdef123456.database.cloud.databricks.com です。
- Server: Lakebase データベースをホストするサーバーのホスト名または IP アドレスを指定します。
- Port(オプション): Lakebase データベースをホストするサーバーのポート番号を指定します。デフォルトは 5432 です。
- Database(オプション): Lakebase サーバーへの認証後に接続するデータベースを指定します。デフォルトでは認証ユーザーのデフォルトデータベースに接続します。
OAuth クライアント認証
OAuth クライアント資格情報を使用して認証するには、サービスプリンシパルで OAuth クライアントを構成します。手順の概要は以下のとおりです。
- 新しいサービスプリンシパルを作成・構成する
- サービスプリンシパルに権限を割り当てる
- サービスプリンシパル用の OAuth シークレットを作成する
詳細については、ヘルプドキュメントの「Setting Up OAuthClient Authentication」セクションをご参照ください。
OAuth PKCE 認証
PKCE(Proof Key for Code Exchange)を使用した OAuth code タイプで認証するには、以下のプロパティを設定します。
- AuthScheme: OAuthPKCE を指定します。
- User: 認証ユーザーのユーザー ID を指定します。
詳細については、ヘルプドキュメントをご参照ください。
組み込みの接続文字列デザイナー
JDBC 接続文字列URL の作成には、Lakebase JDBC Driver にビルトインされたデザイナを使用できます。JAR ファイルをダブルクリックするか、コマンドラインでJAR ファイルを実行するとデザイナが開きます。
java -jar cdata.jdbc.lakebase.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
scala> val lakebase_df = spark.sqlContext.read.format("jdbc").option("url", "jdbc:lakebase:DatabricksInstance=lakebase;Server=127.0.0.1;Port=5432;Database=my_database;").option("dbtable","Orders").option("driver","cdata.jdbc.lakebase.LakebaseDriver").load() - 接続が完了し、データがロードされたら、テーブルスキーマが表示されます。
Lakebase をテンポラリーテーブルとして登録します:
scala> lakebase_df.registerTable("orders")-
データに対して、次のようなカスタムSQL クエリを実行します。
scala> lakebase_df.sqlContext.sql("SELECT ShipName, ShipCity FROM Orders WHERE ShipCountry = USA").collect.foreach(println)コンソールで、次のようなLakebase のデータを取得できました!これでLakebase との連携は完了です。
CData JDBC Driver for Lakebase をApache Spark で使って、Lakebase に対して、複雑かつハイパフォーマンスなクエリを実行できます。30日の無償評価版 をダウンロードしてぜひお試しください。